Skip to content

Commit

Permalink
fix: do not stream output (#7)
Browse files Browse the repository at this point in the history
This PR sets the `stream-channels` option to `false` which disables streaming of output.

We are buffering the output so there is no need to stream or have the complexity of a newline delimited parser to deal with it.

This also fixes a problem with IPFS Clusters hosted behind an nginx reverse proxy. Currently, if any response is received before the request has finished uploading, nginx doesn't know how to deal with that and terminates the connection. What this means is that if you upload a small file and then a big file, the request will fail, whereas a big file followed by a small file _might_ succeed.
  • Loading branch information
Alan Shaw authored May 26, 2021
1 parent a375aa1 commit 815c6d1
Showing 1 changed file with 5 additions and 45 deletions.
50 changes: 5 additions & 45 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export class Cluster {
url.searchParams.set('cid-version', 1)
url.searchParams.set('raw-leaves', true)
url.searchParams.set('wrap-with-directory', true)
url.searchParams.set('stream-channels', false)
setPinOptions(options, url.searchParams)

const headers = this.options.headers
Expand All @@ -79,12 +80,11 @@ export class Cluster {
throw Object.assign(new Error(`${response.status}: ${response.statusText}`), { response })
}

const result = []
for await (const item of parseLines(response.body, JSON.parse)) {
result.push({ ...item, cid: item.cid['/'] })
const results = await response.json()
for (const f of results) {
f.cid = f.cid['/']
}

return result
return results
}

/**
Expand Down Expand Up @@ -283,43 +283,3 @@ function toPinResponse (data) {
reference: data.reference
}
}

const BR = /\r?\n/

/**
 * Takes a stream and returns an async iterable of non-whitespace lines parsed
 * via the provided parser.
 *
 * @template T
 * @param {ReadableStream<Uint8Array>} source - byte stream to read lines from
 * @param {(line: string) => T} parseLine - parser applied to each non-blank line
 * @returns {AsyncIterable<T>}
 */
const parseLines = async function * (source, parseLine) {
  const decoder = new TextDecoder()
  const reader = source.getReader()
  let buffer = ''

  try {
    while (true) {
      const chunk = await reader.read()
      if (chunk.done) {
        // Flush any bytes still held by the decoder (e.g. a multi-byte UTF-8
        // sequence truncated at end of stream) so they are not silently lost.
        buffer += decoder.decode()
        if (buffer.trim().length > 0) {
          yield parseLine(buffer)
        }
        break
      }
      // stream: true keeps partial multi-byte sequences buffered across chunks.
      buffer += decoder.decode(chunk.value, { stream: true })
      const lines = buffer.split(BR)
      // The final element is either an incomplete line or '' — keep it in the
      // buffer until more data (or end of stream) arrives.
      buffer = lines.pop()
      for (const line of lines) {
        if (line.trim().length > 0) {
          yield parseLine(line)
        }
      }
    }
  } finally {
    reader.releaseLock()
  }
}

0 comments on commit 815c6d1

Please sign in to comment.