-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.js
205 lines (171 loc) · 5.48 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
// const p = require('path')
const pump = require('pump')
const querystring = require('query-string')
const u = require('url')
// const through = require('through2')
// const crypto = require('crypto')
const hyperdriveHttp = require('hyperdrive-http')
const ndjson = require('./util/ndjson-duplex-stream')
const { collectStream } = require('./util/stream')
const { makeStore } = require('./store')
const mappings = require('./lib/mappings')
const log = require('./lib/log')
const { clock } = require('./util/debug')
if (require.main === module) {
const opts = extractOptions()
const server = makeServer(opts)
console.log(opts)
server.start()
} else {
module.exports = makeServer
}
function makeServer (opts) {
const fastify = require('fastify')({
logger: log.child({ name: 'fastify' })
})
const store = makeStore({
storage: opts.storage,
key: opts.key,
mappings
})
store.ready(() => {
// log.info('Store ready: %s', store.key.toString('hex'))
})
fastify.register(require('./frontend/fastify'), {
ssr: false
// prefix: '/app'
})
fastify.register(require('fastify-websocket'))
let hyperdriveRequestHandler = new Promise((resolve, reject) => {
store.writer((err, drive) => {
if (err) return reject(err)
const handler = hyperdriveHttp(drive)
resolve(handler)
})
})
fastify.get('/fs/*', (req, res) => {
// const path = req.params['*']
hyperdriveRequestHandler.then(handler => {
const rawReq = req.req
rawReq.url = rawReq.url.substring(3) // remove /fs prefix
handler(rawReq, res.res)
})
})
fastify.get('/batch', { websocket: true }, (rawStream, req, params) => {
// TODO: Add auth.
const stream = ndjson(rawStream)
const batchStream = store.createBatchStream()
pump(stream, batchStream, stream)
})
fastify.get('/get', { websocket: true }, (rawStream, req) => {
const stream = ndjson(rawStream)
const getStream = store.createGetStream()
pump(stream, getStream, stream)
})
fastify.get('/hyperdrive/writeFile/*', { websocket: true }, (rawStream, req, params) => {
const path = params['*']
// TODO: Check if path is valid.
// rawStream.on('data', d => console.log('data', d.toString()))
store.writer((err, drive) => {
if (err) return rawStream.destroy(err)
const ws = drive.createWriteStream(path)
ws.on('close', () => log.debug('Written file: ' + path))
rawStream.pipe(ws)
})
})
fastify.get('/hyperdrive/readFile/*', { websocket: true }, (rawStream, req, params) => {
const path = params['*']
// TODO: Check if path is valid.
// rawStream.on('data', d => console.log('data', d.toString()))
store.writer((err, drive) => {
if (err) return rawStream.destroy(err)
const rs = drive.createReadStream(path)
rs.pipe(rawStream)
})
})
fastify.get('/query/:name', { websocket: true }, (rawStream, req, params) => {
console.log('QUERY RECV')
const stream = ndjson(rawStream)
const { name } = params
const args = queryArgs(req.url)
const time = clock()
log.info('[query] %s START %o', name, args)
const [view, method] = name.split('.')
if (!(typeof store.api[view] === 'object' &&
typeof store.api[view][method] === 'function')) {
return stream.destroy(new Error('Invalid query name'))
}
// TODO: Support higher-level key than api.manifest
let manifest
if (store.api[view].manifest) {
manifest = store.api[view].manifest[method] || store.api[view].manifest.default
}
if (!manifest) manifest = 'streaming'
if (typeof manifest !== 'object') manifest = { type: manifest }
// TODO: Add safeguards / sanitze user input
// or formalize query args in other ways.
const result = store.api[view][method](args)
if (manifest.type === 'streaming') {
const transforms = []
// TODO: Formalize this without special casing.
if (view === 'entities') {
transforms.push(store.createGetStream())
}
log.info('[query] %s STREAM %s', name, time())
stream.on('end', () => log.info('[query] %s END'))
transforms.push(collectStream(100))
pump(result, ...transforms, stream)
} else if (manifest.type === 'promise') {
result
.then(data => {
log.info('[query] %s DONE %s', name, time())
stream.write(data)
stream.end()
})
.catch(err => stream.destroy(err))
} else {
log.info('[query] %s ERROR')
stream.destroy(new Error('Unsupported method: ' + name))
}
// result.on('end', () => console.log('result end: ', time()))
// stream.on('end', () => console.log('stream end: ', time()))
// TODO: Move to websocket middleware.
// TODO: Propagate errors somewhere else?
stream.on('error', e => {
if (e.code && e.code === 'ECONNRESET') return
log.error(e)
})
})
function start () {
fastify.listen(opts.port || 9191, 'localhost', (err) => {
if (err) return console.error(err)
})
}
return {
fastify, store, start
}
}
function queryArgs (url) {
url = u.parse(url)
let query = {}
if (url.search) {
query = { ...querystring.parse(url.search) }
}
return query
}
function extractOptions () {
const argv = require('minimist')(process.argv.slice(2), {
alias: {
p: 'port',
s: 'storage',
k: 'key'
}
})
console.log(argv)
const opts = {
port: argv.port || 9191,
storage: argv.storage || './data',
key: argv.key || null
}
return opts
}