Skip to content

Commit

Permalink
make readFile async 💥
Browse files Browse the repository at this point in the history
  • Loading branch information
derhuerst committed May 22, 2021
1 parent 29d71df commit 8e0c037
Show file tree
Hide file tree
Showing 14 changed files with 27 additions and 39 deletions.
4 changes: 2 additions & 2 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ readCsv('path-to-file.txt')
.on('error', console.error)
.on('data', row => console.log(row))
// or
for await (const row of readCsv('path-to-file.txt')) {
for await (const row of await readCsv('path-to-file.txt')) {
console.log(row)
}
```

`readCsv(path)` returns a [readable stream](https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_readable_streams) in [`objectMode`](https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_object_mode).
`readCsv(path)` is an async function that returns a [readable stream](https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_readable_streams) in [`objectMode`](https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_object_mode).

`path` can also be a [readable stream](https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_readable_streams) like [`process.stdin`](https://nodejs.org/api/process.html#process_process_stdin).

Expand Down
16 changes: 3 additions & 13 deletions docs/zip.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,15 @@ The following code sample shows you how to benefit from `gtfs-util`'s streaming/

```js
const {async: ZipArchive} = require('node-stream-zip') // node-stream-zip@1
const {PassThrough} = require('stream')
const readCsv = require('gtfs-utils/read-csv')
const computeStopovers = require('gtfs-utils/compute-stopovers')

// Define a readFile() function that reads from the GTFS .zip
// archive on-the-fly and parses the CSV data.
const zip = new ZipArchive('path/to/gtfs.zip')
const readFile = (name) => {
const stream = new PassThrough({highWaterMark: 0})
zip.stream(name + '.txt')
.then((file) => new Promise((resolve, reject) => {
pipeline(file, stream, (err) => {
if (err) reject(err)
else resolve()
})
}))
.catch(err => stream.destroy(err))

return readCsv(stream)
const readFile = async (name) => {
const file = await zip.stream(name + '.txt')
return await readCsv(file)
}

const stopovers = computeStopovers(readFile, 'Europe/Berlin')
Expand Down
4 changes: 2 additions & 2 deletions examples/read-stops.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
const readCsv = require('../read-csv')
const readStops = require('../read-stops')

const readFile = (file) => {
return readCsv(require.resolve('sample-gtfs-feed/gtfs/' + file + '.txt'))
const readFile = async (file) => {
return await readCsv(require.resolve('sample-gtfs-feed/gtfs/' + file + '.txt'))
}

;(async () => {
Expand Down
2 changes: 1 addition & 1 deletion lib/read-stop-stations.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const readStopStations = async (readFile, filters, createStore) => {
}

const stations = createStore() // station ID by stop_id
for await (const s of readFile('stops')) {
for await (const s of await readFile('stops')) {
if (!filters.stop(s)) continue

const stationId = s.parent_station
Expand Down
6 changes: 3 additions & 3 deletions lib/read-stop-times.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ const readStopTimes = async function* (readFile, filters) {

// todo: DRY with read-trips.js

const trips = readFile('trips')
const trips = await readFile('trips')
const checkTripsSorting = expectSorting('trips', (a, b) => {
if (a.trip_id === b.trip_id) return 0
return a.trip_id < b.trip_id ? -1 : 1
})

const stopTimes = readFile('stop_times')
const stopTimes = await readFile('stop_times')
const compareStopTimes = (trip, stopTime) => {
if (trip.trip_id === stopTime.trip_id) return 0
return trip.trip_id < stopTime.trip_id ? -1 : 1
Expand All @@ -48,7 +48,7 @@ const readStopTimes = async function* (readFile, filters) {
[Symbol.asyncIterator]: async function* () {},
})
try {
const frequencies = readFile('frequencies')
const frequencies = await readFile('frequencies')
const compareFrequencies = (trip, freq) => {
if (trip.trip_id === freq.trip_id) return 0
return trip.trip_id < freq.trip_id ? -1 : 1
Expand Down
2 changes: 1 addition & 1 deletion lib/read-stop-timezones.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const readStopTimezones = async (readFile, filters, createStore) => {
}

const tzs = createStore() // station/stop timezone, by stop_id
for await (const s of readFile('stops')) {
for await (const s of await readFile('stops')) {
if (
('location_type' in s)
&& !['', STOP, STATION].includes(s.location_type)
Expand Down
2 changes: 1 addition & 1 deletion read-csv.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const {pipeline} = require('stream')
const stripBomStream = require('strip-bom-stream')
const parseCsv = require('csv-parser')

const readCsv = (path) => {
const readCsv = async (path) => {
const isPathStream = isReadable(path)
if (typeof path !== 'string' && !isPathStream) {
throw new Error('path must be a string or a Readable stream')
Expand Down
2 changes: 1 addition & 1 deletion read-pathways.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const readPathways = async function* (readFile, filters = {}, opt = {}) {
const pathways = createStore() // by node/stop ID
const pathwaysByFrom = createStore() // pathway IDs by from_stop_id

for await (let pw of readFile('pathways')) {
for await (let pw of await readFile('pathways')) {
if (!pathwayFilter(pw)) continue
if (
!pw.pathway_id
Expand Down
4 changes: 2 additions & 2 deletions read-services-and-exceptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ const readServicesAndExceptions = async function* (readFile, timezone, filters =
throw new TypeError('filters.serviceException must be a function')
}

const services = readFile('calendar')
const services = await readFile('calendar')
const checkServicesSorting = expectSorting('calendar', (a, b) => {
if (a.service_id === b.service_id) return 0
return a.service_id < b.service_id ? -1 : 1
})

const exceptions = readFile('calendar_dates')
const exceptions = await readFile('calendar_dates')
const compareException = (svc, ex) => {
if (svc.service_id === ex.service_id) return 0
return svc.service_id < ex.service_id ? -1 : 1
Expand Down
2 changes: 1 addition & 1 deletion read-shapes.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const readShapes = async function* (readFile, filters = {}) {
throw new Error('filters.shapesRow must be a function.')
}

const shapes = readFile('shapes')
const shapes = await readFile('shapes')
const checkShapesSorting = expectSorting('shapes', (a, b) => {
if (a.shape_id < b.shape_id) return -1
if (a.shape_id > b.shape_id) return 1
Expand Down
2 changes: 1 addition & 1 deletion read-stops.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const readStops = async (readFile, filters = {}, opt = {}) => {
let curLocType = NaN
let childIds = []

for await (let s of readFile('stops')) {
for await (let s of await readFile('stops')) {
if (!stopFilter(s)) continue


Expand Down
2 changes: 1 addition & 1 deletion read-trips.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const readTrips = async (readFile, filters = {}, opt = {}) => {
})

const trips = createStore() // by ID
for await (const t of readFile('trips')) {
for await (const t of await readFile('trips')) {
if (!tripFilter(t)) continue
checkSorting(t)
await trips.set(t.trip_id, formatTrip(t))
Expand Down
14 changes: 6 additions & 8 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,14 @@ const readFile = (file) => {
const utc = 'Etc/UTC'
const berlin = 'Europe/Berlin'

test('read-csv: accept a readable stream as input', (t) => {
test('read-csv: accept a readable stream as input', async (t) => {
const readable = createReadStream(require.resolve('sample-gtfs-feed/gtfs/stops.txt'))
const src = readCsv(readable)
const src = await readCsv(readable)

src.once('data', (stop) => {
t.ok(stop)
t.ok(stop.stop_id)
src.destroy()
t.end()
})
const stop = await new Promise(res => src.once('data', res))
t.ok(stop)
t.ok(stop.stop_id)
src.destroy()
})

test('format-date', (t) => {
Expand Down
4 changes: 2 additions & 2 deletions test/read-pathways.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ const test = require('tape')
const readCsv = require('../read-csv')
const readPathways = require('../read-pathways')

const readFile = (file) => {
return readCsv(require.resolve('sample-gtfs-feed/gtfs/' + file + '.txt'))
const readFile = async (file) => {
return await readCsv(require.resolve('sample-gtfs-feed/gtfs/' + file + '.txt'))
}

const pw = (id, a, b, data = {}) => ({
Expand Down

0 comments on commit 8e0c037

Please sign in to comment.