Skip to content

Commit

Permalink
compute-connections as async generator 💥
Browse files Browse the repository at this point in the history
  • Loading branch information
derhuerst committed Aug 30, 2020
1 parent 574bcb5 commit 4577070
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 81 deletions.
121 changes: 43 additions & 78 deletions compute-connections.js
Original file line number Diff line number Diff line change
@@ -1,108 +1,73 @@
'use strict'

const recordSort = require('sort-array-by-another')

const inMemoryStore = require('./lib/in-memory-store')
const readStopTimes = require('./lib/read-stop-times')
const parseRelativeTime = require('./lib/parse-relative-time')
const errorsWithRow = require('./lib/errors-with-row')

const isObj = o => 'object' === typeof o && o !== null && !Array.isArray(o)

const computeConnections = async (readFile, timezone, filters = {}, opt = {}) => {
// todo: respect stopover.stop_timezone & agency.agency_timezone
const computeConnections = async function* (readFile, filters = {}, opt = {}) {
if ('function' !== typeof readFile) {
throw new Error('readFile must be a function.')
}

if (!isObj(filters)) throw new Error('filters must be an object.')
filters = {
trip: () => true,
stopover: () => true,
stopTime: () => true,
frequenciesRow: () => true,
...filters,
}
if ('function' !== typeof filters.trip) {
throw new Error('filters.trip must be a function.')
}
if ('function' !== typeof filters.stopover) {
throw new Error('filters.stopover must be a function.')
if ('function' !== typeof filters.stopTime) {
throw new Error('filters.stopTime must be a function.')
}

const {
createStore,
} = {
createStore: inMemoryStore,
...opt,
if ('function' !== typeof filters.frequenciesRow) {
throw new Error('filters.frequenciesRow must be a function.')
}

const {
stopsByTripId,
arrivalsByTripId,
departuresByTripId,
headwayBasedStarts, headwayBasedEnds, headwayBasedHeadways,
closeStores,
} = await readStopTimes(readFile, filters, {createStore})
for await (const _ of readStopTimes(readFile, filters)) {
const {
tripId,
stops, arrivals, departures,
headwayBasedStarts: hwStarts,
headwayBasedEnds: hwEnds,
headwayBasedHeadways: hwHeadways,
} = _

const generateConnectionsByTripId = async function* () {
for await (const tripId of stopsByTripId.keys()) {
const [
stops,
arrivals,
departures,
hwStarts,
hwEnds,
hwHeadways,
] = await Promise.all([
stopsByTripId.get(tripId),
arrivalsByTripId.get(tripId),
departuresByTripId.get(tripId),
headwayBasedStarts.get(tripId),
headwayBasedEnds.get(tripId),
headwayBasedHeadways.get(tripId),
])
const connections = []
const connections = []

// scheduled connections
for (let i = 1; i < stops.length; i++) {
connections.push({
tripId,
fromStop: stops[i - 1],
departure: departures[i - 1],
toStop: stops[i],
arrival: arrivals[i],
})
}
// scheduled connections
for (let i = 1; i < stops.length; i++) {
connections.push({
tripId,
fromStop: stops[i - 1],
departure: departures[i - 1],
toStop: stops[i],
arrival: arrivals[i],
headwayBased: false, // todo: pick a more helpful flag name?
})
}

// headway-based connections
// todo: DRY with compute-stopovers
if (hwStarts) {
const t0 = arrivals[0]
for (let i = 0; i < hwStarts.length; i++) {
for (let t = hwStarts[i]; t < hwEnds[i]; t += hwHeadways[i]) {
for (let j = 1; j < stops.length; j++) {
connections.push({
tripId,
fromStop: stops[j - 1],
departure: t + departures[j - 1] - t0,
toStop: stops[j],
arrival: t + arrivals[j] - t0,
headwayBased: true, // todo: pick a more helpful flag?
})
}
}
// headway-based connections
// todo: DRY with compute-stopover-times
const t0 = arrivals[0]
const hwStartsL = hwStarts ? hwStarts.length : 0
for (let h = 0; h < hwStartsL; h++) {
for (let t = hwStarts[h]; t < hwEnds[h]; t += hwHeadways[h]) {
for (let i = 1; i < stops.length; i++) {
connections.push({
tripId,
fromStop: stops[i - 1],
departure: t + departures[i - 1] - t0,
toStop: stops[i],
arrival: t + arrivals[i] - t0,
headwayBased: true, // todo: pick a more helpful flag name?
})
}
}

yield connections
}

await closeStores()
yield connections
}

const out = {}
out[Symbol.asyncIterator] = generateConnectionsByTripId
out.closeStores = closeStores
return out
}

module.exports = computeConnections
5 changes: 2 additions & 3 deletions examples/compute-connections.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

const readCsv = require('../read-csv')
const computeConnections = require('../compute-connections')
const redisStore = require('../lib/redis-store')

const readFile = (file) => {
return readCsv(require.resolve('sample-gtfs-feed/gtfs/' + file + '.txt'))
}

;(async () => {
const connectionsByTripId = await computeConnections(readFile, 'Europe/Berlin')
const connectionsByTripId = await computeConnections(readFile)
for await (const connectionsOfTrip of connectionsByTripId) {
for (const connection of connectionsOfTrip) {
console.log(connection)
Expand All @@ -18,5 +17,5 @@ const readFile = (file) => {
})()
.catch((err) => {
console.error(err)
process.exitCode = 1
process.exit(1)
})
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"read-trips.js",
"read-services-and-exceptions.js",
"compute-stopovers.js",
"compute-connections.js",
"lib",
"examples",
"test"
Expand Down
47 changes: 47 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- [`computeServiceBreaks(sortedConnections)`](#computeservicebreakssortedconnections)
- [`routeTypes`](#routetypes)
- [`readServicesAndExceptions(readFile, timezone, filters)`](#readservicesandexceptionsreadfile-timezone-filters)
- [`computeConnections(readFile, filters)`](#computeconnectionsreadfile-filters)
- [`computeStopovers(readFile, timezone, filters)`](#computestopoversreadfile-timezone-filters)


Expand Down Expand Up @@ -545,6 +546,52 @@ service-2 [
//
```

### `computeConnections(readFile, filter)`

```js
const readCsv = require('gtfs-utils/read-csv')
const computeConnections = require('gtfs-utils/compute-connections')

const readFile = name => readCsv('path/to/gtfs/' + name + '.txt')

const filters = {
stopTime: s => s.stop_id === 'some-stop-id',
}

const connectionsByTrip = computeConnections(readFile, filters)
for await (const connections of connectionsByTrip) {
for await (const connection of connections) {
console.log(connection)
}
}
```

- `readFile` must be a function that, when called with a file name, returns a [readable stream](https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_readable_streams) in [`objectMode`](https://nodejs.org/docs/latest-v10.x/api/stream.html#stream_object_mode).
- `timezone` must a timezone name from the [tz database](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones).
- `filters` must be an object; It may have the fields `trip`, `stopTime`, `frequencies`, each with a filter function.

Returns an [async iterable](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/asyncIterator) of `[{tripId, fromStop, departure, toStop, arrival}]` lists.

The code above will print the following:

```js
{
tripId: 'a-downtown-all-day',
fromStop: 'airport',
departure: 55440,
toStop: 'museum',
arrival: 55800,
}
{
tripId: 'a-downtown-all-day',
fromStop: 'museum',
departure: 55860,
toStop: 'center',
arrival: 56100,
}
//
```

### `computeStopovers(readFile, filters, timezone)`

```js
Expand Down

0 comments on commit 4577070

Please sign in to comment.