Skip to content

Commit

Permalink
Streamify the 2_importAndProcessOsmDataToFiles task
Browse files Browse the repository at this point in the history
Add new classes inherent from DataGeojson and DataOsmRaw.
Instead of reading the whole file at once, these classes will stream it piece by piece asynchronously, allowing for large files to be read without crashing the application.
  • Loading branch information
GabrielBruno24 committed Jan 23, 2025
1 parent e95cabc commit a53af89
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
*/
import { DataBase } from './dataBase';
import GeoJSON from 'geojson';
import fs from 'fs';
import { pipeline } from 'node:stream/promises';
import JSONStream from 'JSONStream';

export class DataGeojson extends DataBase<GeoJSON.Feature> {
private _data: GeoJSON.Feature[];
Expand Down Expand Up @@ -67,3 +70,64 @@ export class DataFileGeojson extends DataGeojson {
return this._fileData || [];
}
}

// Instead of reading the entire file at once, this class streams it asynchronously. This allows for large files to be read without crashing the application.
export class DataStreamGeojson extends DataGeojson {
private _fileData: GeoJSON.Feature[] | undefined = undefined;
private _filename: string;
private _dataInitialized: boolean;

constructor(filename: string) {
super({ type: 'FeatureCollection', features: [] });
this._filename = filename;
this._dataInitialized = false;
}

// Factory method so that we can create the class while calling an async function.
static async Create(filename: string): Promise<DataStreamGeojson> {
const instance = new DataStreamGeojson(filename);
await instance.streamDataFromFile();
return instance;
}

protected getData(): GeoJSON.Feature[] {
if (!this._dataInitialized) {
console.error('The GeoJSON data has not been properly initialized.');
}
return this._fileData || [];
}

private async streamDataFromFile(): Promise<void> {
try {
this._fileData = await this.readGeojsonData();
this._dataInitialized = true;
} catch (error) {
console.error('Error reading GeoJSON data file ' + this._filename, error);
}
}

private async readGeojsonData(): Promise<GeoJSON.Feature[]> {
console.log('Start streaming GeoJSON data.');
const readStream = fs.createReadStream(this._filename);
const jsonParser = JSONStream.parse('features.*');
const features: GeoJSON.Feature[] = [];

return new Promise((resolve, reject) => {
jsonParser.on('error', (error) => {
console.error(error);
reject(error);
});

jsonParser.on('data', (feature) => {
features.push(feature);
});

jsonParser.on('end', () => {
console.log('End of reading GeoJSON data.');
resolve(features);
});

pipeline(readStream, jsonParser);
});
}
}
65 changes: 65 additions & 0 deletions packages/chaire-lib-common/src/tasks/dataImport/data/dataOsmRaw.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
* License text available at https://opensource.org/licenses/MIT
*/
import { DataBase } from './dataBase';
import fs from 'fs';
import { pipeline } from 'node:stream/promises';
import JSONStream from 'JSONStream';

export interface OsmRawDataTypeIncoming {
type: 'way' | 'relation' | 'node';
Expand Down Expand Up @@ -243,3 +246,65 @@ export class DataFileOsmRaw extends DataOsmRaw {
return this._fileData || [];
}
}

// Instead of reading the entire file at once, this class streams it asynchronously. This allows for large files to be read without crashing the application.
export class DataStreamOsmRaw extends DataOsmRaw {
private _fileData: OsmRawDataType[] | undefined = undefined;
private _filename: string;
private _dataInitialized: boolean;

private constructor(filename: string) {
super([]);
this._filename = filename;
this._dataInitialized = false;
}

// Factory method so that we can create the class while calling an async function.
static async Create(filename: string): Promise<DataStreamOsmRaw> {
const instance = new DataStreamOsmRaw(filename);
await instance.streamDataFromFile();
return instance;
}

protected getData(): OsmRawDataType[] {
if (!this._dataInitialized) {
console.error('The raw OSM data has not been properly initialized.');
}
return this._fileData || [];
}

private async streamDataFromFile(): Promise<void> {
try {
const elements = await this.readRawJsonData();
this._fileData = elements ? this.splitTags(elements) : [];
this._dataInitialized = true;
} catch (error) {
console.error('Error reading osm raw data file ' + this._filename, error);
}
}

private async readRawJsonData(): Promise<OsmRawDataTypeIncoming[]> {
console.log('Start streaming raw OSM data.');
const readStream = fs.createReadStream(this._filename);
const jsonParser = JSONStream.parse('elements.*');
const elements: OsmRawDataTypeIncoming[] = [];

return new Promise((resolve, reject) => {
jsonParser.on('error', (error) => {
console.error(error);
reject(error);
});

jsonParser.on('data', (element) => {
elements.push(element);
});

jsonParser.on('end', () => {
console.log('End of reading raw OSM data.');
resolve(elements);
});

pipeline(readStream, jsonParser);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
* License text available at https://opensource.org/licenses/MIT
*/
import GenericDataImportTask from './genericDataImportTask';
import { DataFileOsmRaw } from './data/dataOsmRaw';
import { DataFileGeojson } from './data/dataGeojson';
import { DataStreamOsmRaw } from './data/dataOsmRaw';
import { DataStreamGeojson } from './data/dataGeojson';
import OsmDataPreparationResidential from './OsmDataPreparationResidential';
import OsmDataPreparationNonResidential from './OsmDataPreparationNonResidential';

Expand Down Expand Up @@ -46,14 +46,8 @@ export default class PrepareOsmDataForImport extends GenericDataImportTask {
const absoluteDsDir = this._importDir + dataSourceDirectory + '/';
this.assertDataDownloaded(absoluteDsDir);

const osmRawData = new DataFileOsmRaw(
absoluteDsDir + GenericDataImportTask.OSM_RAW_DATA_FILE,
this.fileManager
);
const osmGeojsonData = new DataFileGeojson(
absoluteDsDir + GenericDataImportTask.OSM_GEOJSON_FILE,
this.fileManager
);
const osmRawData = await DataStreamOsmRaw.Create(absoluteDsDir + GenericDataImportTask.OSM_RAW_DATA_FILE);
const osmGeojsonData = await DataStreamGeojson.Create(absoluteDsDir + GenericDataImportTask.OSM_GEOJSON_FILE);

// Calculate residential data if required
const entrancesDataFile = absoluteDsDir + GenericDataImportTask.RESIDENTIAL_ENTRANCES_FILE;
Expand Down

0 comments on commit a53af89

Please sign in to comment.