-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathBioAnnotate_BioSample.2.sra.metadata.Amazon.Location.js
77 lines (56 loc) · 2.9 KB
/
BioAnnotate_BioSample.2.sra.metadata.Amazon.Location.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// Sample run output — # of records processed: [34,922,262, 146,493] 96m55s (input records read, keys in the location cache, elapsed time)
import { LocationClient, SearchPlaceIndexForTextCommand } from '@aws-sdk/client-location';
import { parse } from 'csv-parse';
import { createReadStream, createWriteStream } from 'node:fs';
import { dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';
import { Queue, arrayChunk, escapeCSVValue, msToMS, stringNormalize, warnInline } from '../common/common.js';
import { access, writeFile } from 'node:fs/promises';
// ES modules have no built-in __dirname; derive it from this module's URL.
const __dirname = dirname(fileURLToPath(import.meta.url));

// Input CSV; the main loop below reads its `attribute_value` column.
const INPUT_READ_STREAM = createReadStream(join(__dirname, '../tmp/BioAnnotate_BioSample.1.sra.metadata'));

// Output CSV with columns `attributeValueNormalized,latlon`. The same file is
// re-read at start-up to seed CACHE, so results survive across runs.
const OUTPUT_FILE_PATH = join(__dirname, '../tmp/BioAnnotate_BioSample.2.sra.metadata.Amazon.Location');

try {
  await access(OUTPUT_FILE_PATH);
} catch {
  // The file is not accessible (typically the first run): create it with just the header row.
  await writeFile(OUTPUT_FILE_PATH, `${['attributeValueNormalized', 'latlon'].map(escapeCSVValue).join(',')}\n`);
}

const OUTPUT_READ_STREAM = createReadStream(OUTPUT_FILE_PATH);
// Opened in append mode so previously stored rows are kept.
const OUTPUT_WRITE_STREAM = createWriteStream(OUTPUT_FILE_PATH, { flags: 'a' });

// Maps a normalized attribute value to its stored `latlon` string.
// A null-prototype object is used so that keys coming from the data (e.g.
// "constructor", "__proto__") can never collide with inherited
// Object.prototype members when the cache is probed with `!== undefined`.
const CACHE = Object.create(null);
for await (const { attributeValueNormalized, latlon } of OUTPUT_READ_STREAM.pipe(parse({ columns: true }))) {
  CACHE[attributeValueNormalized] = latlon;
}
const client = new LocationClient();

// Batches uncached attribute values and geocodes them when the queue flushes.
// NOTE(review): `Queue` comes from ../common/common.js; this code assumes each
// queued entry exposes the pushed string at index 0 and that `onFlush` receives
// the buffered entries once `n` of them have accumulated — confirm there.
// `n` and `ms` are the counters/start time declared further down in this file;
// they are initialized before the first flush can run.
const queue = Queue({
  n: 1024 * 32,
  onFlush: async q => {
    // Geocode every distinct text only once per flush.
    const uniqueTexts = [...new Set(q.map(v => v[0]))];
    // Send at most 8 requests concurrently.
    for (const chunk of arrayChunk(uniqueTexts, 8)) {
      const resolved = await Promise.all(chunk.map(async text => {
        const response = await client.send(new SearchPlaceIndexForTextCommand({
          IndexName: 'AmazonLocationPlaceIndexHERE',
          // Query text is capped at 200 characters (presumably the service limit — confirm).
          Text: text.slice(0, 200),
        }));
        const point = response.Results?.[0]?.Place?.Geometry?.Point;
        // The point's coordinates are reversed to build the `latlon` value; a
        // copy is reversed so the SDK response is not mutated. A lookup with no
        // result is stored as '' rather than undefined: the main loop treats
        // undefined as "not cached", which would re-query (and re-append) the
        // same text on every later flush of this run.
        return [text, point ? [...point].reverse().join(',') : ''];
      }));
      for (const [text, latlon] of resolved) {
        ++n[1];
        CACHE[text] = latlon;
        OUTPUT_WRITE_STREAM.write(`${[text, latlon].map(escapeCSVValue).join(',')}\n`);
      }
      warnInline(`# of records processed: [${n[0].toLocaleString()}, ${Object.keys(CACHE).length.toLocaleString()}] ${msToMS(Date.now() - ms)}`);
    }
  },
});
// Start time and counters. Both are also read by the queue's onFlush handler
// defined above, so the names `ms` and `n` must stay as they are.
// n[0] = input records read, n[1] = records resolved (cache hit or geocoded).
const ms = Date.now();
const n = [0, 0, 0];

// Prints the running totals: records read, number of cached keys, elapsed time.
const reportProgress = (suffix = '') =>
  warnInline(`# of records processed: [${n[0].toLocaleString()}, ${Object.keys(CACHE).length.toLocaleString()}] ${msToMS(Date.now() - ms)}${suffix}`);

let failed = false;
try {
  for await (const record of INPUT_READ_STREAM.pipe(parse({ columns: true }))) {
    ++n[0];
    const attributeValueNormalized = stringNormalize(record['attribute_value']);
    // Values containing "missing" are skipped entirely.
    if (attributeValueNormalized.includes('missing')) {
      continue;
    }
    if (CACHE[attributeValueNormalized] !== undefined) {
      ++n[1];
    } else {
      // Awaited so that a flush triggered by this push completes before reading on.
      await queue.push(attributeValueNormalized);
    }
    if (n[0] % 1024 === 0) {
      reportProgress();
    }
  }
  // Geocode whatever is still buffered; kept inside the try so a failure here
  // goes through the same error path as a failure during the loop.
  await queue.flush();
  reportProgress('\n');
} catch (e) {
  console.error(e);
  failed = true;
} finally {
  // Let every row already handed to the append stream reach the file.
  // process.exit() does not wait for pending asynchronous writes.
  await new Promise(resolve => OUTPUT_WRITE_STREAM.end(resolve));
}
if (failed) {
  process.exit(1);
}