Skip to content

Commit

Permalink
compute-sorted-connections: use for-await loops & read-stop-times hel…
Browse files Browse the repository at this point in the history
…per 💥
  • Loading branch information
derhuerst committed Aug 30, 2020
1 parent 1d351ad commit a880ccb
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 143 deletions.
188 changes: 50 additions & 138 deletions compute-sorted-connections.js
Original file line number Diff line number Diff line change
@@ -1,162 +1,74 @@
'use strict'

const debug = require('debug')('gtfs-utils:compute-sorted-connections')
const pump = require('pump')
const {Writable} = require('stream')
const {DateTime} = require('luxon')
const {gte} = require('sorted-array-functions')

const readServicesAndExceptions = require('./read-services-and-exceptions')
const inMemoryStore = require('./lib/in-memory-store')
const readTrips = require('./read-trips')
const parseTime = require('./parse-time')
const readServicesAndExceptions = require('./read-services-and-exceptions')
const computeConnections = require('./compute-connections')
const resolveTime = require('./lib/resolve-time')

const isObj = o => 'object' === typeof o && o !== null && !Array.isArray(o)
// todo: respect stopover.stop_timezone & agency.agency_timezone
const computeSortedConnections = async (readFile, timezone, filters = {}, opt = {}) => {
if ('string' !== typeof timezone || !timezone) {
throw new Error('timezone must be a non-empty string.')
}

const computeStopoversByTrip = (readFile, filters, timezone) => {
const {
stopover: stopoverFilter,
} = filters

return Promise.all([
readServicesAndExceptions(readFile, timezone, filters),
readTrips(readFile, filters.trip)
])
.then(([services, trips]) => {
for (let tripId in trips) {
trips[tripId] = {
serviceId: trips[tripId].service_id,
routeId: trips[tripId].route_id
}
}

const stopovers = Object.create(null) // by trip ID
const onStopover = (s) => {
if (!stopoverFilter(s)) return;

const trip = trips[s.trip_id]
if (!trip) {
debug('unknown trip', s.trip_id)
return;
}
const days = services[trip.serviceId]
if (!days) {
debug('unknown service', trip.serviceId)
return;
}

s.service_id = trip.serviceId
s.route_id = trip.routeId
s.stop_sequence = parseInt(s.stop_sequence)

if (!(s.trip_id in stopovers)) stopovers[s.trip_id] = [s]
else stopovers[s.trip_id].push(s)
}

let row = 0
const parser = new Writable({
objectMode: true,
write: function parseStopover (s, _, cb) {
row++
try {
onStopover(s)
} catch (err) {
err.row = row
err.message += ' – row ' + row
return cb(err)
}
cb()
},
writev: function parseStopovers (chunks, _, cb) {
for (let i = 0; i < chunks.length; i++) {
const s = chunks[i].chunk
row++
try {
onStopover(s)
} catch (err) {
err.row = row
err.message += ' – row ' + row
return cb(err)
}
}
cb()
}
})
createStore,
} = {
createStore: inMemoryStore,
...opt,
}

return new Promise((resolve, reject) => {
pump(
readFile('stop_times'),
parser,
(err) => {
if (err) reject(err)
else resolve({stopovers, trips, services})
}
)
})
debug('reading trips')
const svcIdsByTrip = await readTrips(readFile, filters, {
...opt,
formatTrip: t => t.service_id,
})
}

const sortStopovers = (s1, s2) => s1.stop_sequence - s2.stop_sequence

const computeSortedConnections = (readFile, filters, timezone) => {
if ('function' !== typeof readFile) {
throw new Error('readFile must be a function.')
}

if (!isObj(filters)) throw new Error('filters must be an object.')
filters = {
service: () => true,
trip: () => true,
stopover: () => true,
...filters,
}
if ('function' !== typeof filters.service) {
throw new Error('filters.service must be a function.')
}
if ('function' !== typeof filters.trip) {
throw new Error('filters.trip must be a function.')
}
if ('function' !== typeof filters.stopover) {
throw new Error('filters.stopover must be a function.')
debug('reading services & exceptions')
const _services = readServicesAndExceptions(readFile, timezone, filters)
const services = createStore() // by service ID
for await (const [id, days] of _services) {
await services.set(id, days)
}

return computeStopoversByTrip(readFile, filters, timezone)
.then(({stopovers: allStopovers, trips, services}) => {
const byDeparture = []
// todo: use store API to support memory-constrained environments
const sortedConnections = []
const compareConnections = (a, b) => a.departure - b.departure

for (const tripId in allStopovers) {
const stopovers = allStopovers[tripId].sort(sortStopovers)
allStopovers[tripId] = null // allow GC
debug('reading connections')
const connectionsByTrip = computeConnections(readFile, filters, opt)
for await (const connections of connectionsByTrip) {
if (connections.length === 0) continue

const {serviceId, routeId} = trips[tripId]
const days = services[serviceId]
const serviceId = await svcIdsByTrip.get(connections[0].tripId)
if (!serviceId) continue
const days = await services.get(serviceId)
if (!days) continue // todo: log error?

for (const c of connections) {
for (let i = 0; i < days.length; i++) {
const day = DateTime.fromMillis(days[i] * 1000, {zone: timezone})

const maxJ = stopovers.length - 1
for (let j = 0; j < maxJ; j++) {
const s1 = stopovers[j]
const s2 = stopovers[j + 1]

const dep = day.plus(parseTime(s1.departure_time)) / 1000 | 0
byDeparture.push([dep, {
tripId,
fromStop: s1.stop_id,
departure: dep,
toStop: s2.stop_id,
arrival: day.plus(parseTime(s2.arrival_time)) / 1000 | 0,
routeId,
serviceId
}])
const dep = resolveTime(timezone, days[i], c.departure)
const newCon = {
tripId: c.tripId,
fromStop: c.fromStop,
departure: dep,
toStop: c.toStop,
arrival: resolveTime(timezone, days[i], c.arrival),
headwayBased: !!c.headwayBased,
}

const idx = gte(sortedConnections, newCon, compareConnections)
if (idx === -1) sortedConnections.push(newCon)
else sortedConnections.splice(idx, 0, newCon)
}
}
}

byDeparture.sort((a, b) => a[0] - b[0])
for (let i = 0; i < byDeparture.length; i++) {
byDeparture[i] = byDeparture[i][1]
}
return byDeparture
})
return sortedConnections
}

module.exports = computeSortedConnections
2 changes: 1 addition & 1 deletion examples/compute-service-breaks.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const readFile = (file) => {
return readCsv(require.resolve('sample-gtfs-feed/gtfs/' + file + '.txt'))
}

computeSortedConnections(readFile, {}, 'Europe/Berlin')
computeSortedConnections(readFile, 'Europe/Berlin')
.then((connections) => {
const {findBetween, data} = computeServiceBreaks(connections)

Expand Down
13 changes: 9 additions & 4 deletions examples/compute-sorted-connections.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@ const readFile = (file) => {
return readCsv(require.resolve('sample-gtfs-feed/gtfs/' + file + '.txt'))
}

computeSortedConnections(readFile, {}, 'Europe/Berlin')
.then((sortedConnections) => {
console.log(sortedConnections)
;(async () => {
const sortedCons = await computeSortedConnections(readFile, 'Europe/Berlin')
for (const connection of sortedCons) {
console.log(connection)
}
})()
.catch((err) => {
console.error(err)
process.exit(1)
})
.catch(console.error)

0 comments on commit a880ccb

Please sign in to comment.