-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbigquery.js
115 lines (103 loc) · 3.29 KB
/
bigquery.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
const { BigQuery } = require('@google-cloud/bigquery');
const fs = require('fs');
const JSONStream = require('JSONStream');
const es = require('event-stream');
const { settlementsSchema, bigqueryConfig } = require('./config/bigquery');
// Level code -> display name, filled in while streaming records (see insertData).
const levels = {};

// Column headers of the source JSON records, exactly as they appear in the input file.
const FIRST_LEVEL = 'Перший рівень'; // "First level" code
const SECOND_LEVEL = 'Другий рівень'; // "Second level" code
const THIRD_LEVEL = 'Третій рівень'; // "Third level" code
const FOURTH_LEVEL = 'Четвертий рівень'; // "Fourth level" code
const CATEGORY = 'Категорія'; // "Category"; only records with a value are loaded
const NAME = "Назва об'єкта українською мовою"; // "Object name in Ukrainian"
// Refuse to start when any BigQuery connection setting is undefined
// (presumably an unset environment variable; bigqueryConfig is built elsewhere).
if (Object.values(bigqueryConfig).some((setting) => setting === undefined)) {
  console.error('no connection settings in env');
  process.exit(100);
}
// BigQuery client authenticated with the service-account credentials from
// bigqueryConfig; used by createTable, getTable and insertData.
const db = (() => {
  const {
    projectID,
    client_email: clientEmail,
    private_key: privateKey,
    clientId,
  } = bigqueryConfig;

  return new BigQuery({
    projectId: projectID,
    credentials: { client_email: clientEmail, private_key: privateKey },
    clientOptions: { clientId },
  });
})();
/**
 * Creates the configured table in the configured dataset with settlementsSchema.
 *
 * @returns {Promise} the promise returned by Dataset#createTable.
 */
const createTable = () => {
  const { datasetID, tableID } = bigqueryConfig;
  console.log('create table');
  return db.dataset(datasetID).createTable(tableID, { schema: settlementsSchema });
};
/**
 * Despite its name, recreates the target table: an existing table is deleted
 * first, then a fresh one is created via createTable.
 *
 * @returns {Promise} resolves with whatever createTable resolves to.
 */
module.exports.getTable = async () => {
  const table = db.dataset(bigqueryConfig.datasetID).table(bigqueryConfig.tableID);
  const [tableExists] = await table.exists();

  if (tableExists) {
    console.log('Drop table');
    await table.delete();
  }

  return createTable();
};
/**
 * Records the display name of a level "header" record in the module-level
 * `levels` lookup (level code -> name). A record is a header for a level when
 * that level's code is set and every deeper level code is empty. The first
 * name seen for a code wins.
 *
 * @param {Object} data - one parsed source record.
 */
const recordLevelName = (data) => {
  if (
    data[FIRST_LEVEL] &&
    !data[SECOND_LEVEL] &&
    !data[THIRD_LEVEL] &&
    !data[FOURTH_LEVEL] &&
    !levels[data[FIRST_LEVEL]]
  ) {
    levels[data[FIRST_LEVEL]] = data[NAME];
  }
  // Second/third-level codes only count when their 0-based characters 3-4
  // (resp. 6-7) form a non-zero number. These are presumably the digits that
  // identify that level; TODO confirm against the code layout.
  if (
    data[SECOND_LEVEL] &&
    +data[SECOND_LEVEL].toString().slice(3, 5) &&
    !data[THIRD_LEVEL] &&
    !data[FOURTH_LEVEL] &&
    !levels[data[SECOND_LEVEL]]
  ) {
    levels[data[SECOND_LEVEL]] = data[NAME];
  }
  if (
    data[THIRD_LEVEL] &&
    +data[THIRD_LEVEL].toString().slice(6, 8) &&
    !data[FOURTH_LEVEL] &&
    !levels[data[THIRD_LEVEL]]
  ) {
    levels[data[THIRD_LEVEL]] = data[NAME];
  }
};

/**
 * Converts a categorized record into one newline-delimited JSON row.
 *
 * - Code is the deepest non-empty level code.
 * - Region/City/District come from `levels`. Region and City are omitted from
 *   the JSON when no header name was recorded, because JSON.stringify drops
 *   undefined values.
 * - Village is the record's own name when it has a fourth-level code.
 *
 * @param {Object} data - one parsed source record that has a category.
 * @returns {string|undefined} the JSON row terminated by '\n', or undefined
 *   (record skipped) when the record has no level code at all.
 */
const toSettlementRow = (data) => {
  const code = [
    data[FIRST_LEVEL],
    data[SECOND_LEVEL],
    data[THIRD_LEVEL],
    data[FOURTH_LEVEL],
  ].filter((level) => level);

  if (code.length === 0) {
    // There is no value for `Code`. Skip the record rather than throw,
    // since a throw would abort the whole load.
    console.warn('Skipping record without a level code:', data);
    return undefined;
  }

  const item = {
    Code: code[code.length - 1].toString(),
    Region: levels[data[FIRST_LEVEL]],
    City: levels[data[SECOND_LEVEL]],
    District: levels[data[THIRD_LEVEL]] || '',
    Village: data[FOURTH_LEVEL] ? data[NAME] : '',
  };
  // NEWLINE_DELIMITED_JSON expects exactly one JSON object per line.
  return `${JSON.stringify(item)}\n`;
};

/**
 * Streams the records of the JSON file at `file` into the configured BigQuery
 * table as newline-delimited JSON. Header records populate the `levels` name
 * lookup; only records with a category are written as rows.
 *
 * NOTE(review): `levels` is module-level, so names recorded by one call are
 * reused (and, being first-wins, never replaced) by later calls.
 *
 * @param {string} file - path of the JSON file; `JSONStream.parse('*')` emits
 *   each top-level element as one record.
 * @returns {import('stream').Writable} the BigQuery write stream, so callers
 *   can wait for its 'complete' or 'error' event. Errors are also logged here.
 */
module.exports.insertData = (file) => {
  const sink = db
    .dataset(bigqueryConfig.datasetID)
    .table(bigqueryConfig.tableID)
    .createWriteStream({
      sourceFormat: 'NEWLINE_DELIMITED_JSON',
    });

  const logError = (err) => {
    console.error('Error!', err);
  };
  // `.pipe()` neither forwards 'error' events nor closes the destination when a
  // source fails. Without a listener on every stage, a missing file, invalid
  // JSON or an exception in the transform becomes an uncaught 'error' that
  // crashes the process. Destroying the sink keeps the upload from being left
  // open after a failure.
  const abort = (err) => {
    logError(err);
    sink.destroy();
  };

  fs.createReadStream(file)
    .on('error', abort)
    .pipe(JSONStream.parse('*'))
    .on('error', abort)
    .pipe(
      es.mapSync((data) => {
        recordLevelName(data);
        // Only categorized records become rows; mapSync drops `undefined` results.
        return data[CATEGORY] ? toSettlementRow(data) : undefined;
      }),
    )
    .on('error', abort)
    .pipe(sink)
    .on('error', logError)
    .on('complete', () => {
      console.log('All done!');
    });

  return sink;
};