Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: blobStore.setDownloadFilter() & createEntriesReadableStream() #940

Merged
merged 29 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
50c6a6b
WIP: blobStore.entriesStream()
gmaclennan Oct 23, 2024
30a552e
WIP: not quite working yet
gmaclennan Oct 24, 2024
04da19f
WIP: IterableWeakMap for referencing live external objects
gmaclennan Oct 25, 2024
8e4742c
WIP: cleanup, reduce, fix
gmaclennan Oct 28, 2024
9f12572
cleanup one more thing
gmaclennan Oct 28, 2024
7e6fa62
WIP more cleanup
gmaclennan Oct 28, 2024
1222673
cleanup and remove unused code
gmaclennan Oct 28, 2024
6fd542b
other small fixes
gmaclennan Oct 28, 2024
edd8d15
cleanup package-lock
gmaclennan Oct 28, 2024
cb4d1c8
cleanup error handling
gmaclennan Oct 29, 2024
35f297d
fix tests, fix bugs
gmaclennan Oct 29, 2024
81c9253
fix method name change
gmaclennan Oct 29, 2024
b847084
add blobStore.close()
gmaclennan Oct 29, 2024
b154b44
Merge branch 'main' into chore/entries-stream
gmaclennan Oct 29, 2024
d26ef47
setDownloadFilter based on archive device setting
gmaclennan Oct 29, 2024
9d936a2
don't auto-download blobs in core-manager
gmaclennan Oct 29, 2024
074710a
Merge branch 'main' into chore/entries-stream
EvanHahn Oct 30, 2024
cf1cd2b
Merge branch 'main' into chore/entries-stream
EvanHahn Nov 4, 2024
1d2489c
Remove unnecessary comment
EvanHahn Nov 4, 2024
98c9dd3
Run npm install
EvanHahn Nov 4, 2024
6496672
Use more accurate types for Hyperbee options
EvanHahn Nov 4, 2024
a79a0cc
`getBySeq` is not defined in our version of Hyperbee
EvanHahn Nov 4, 2024
f0ab997
Add a type for error
EvanHahn Nov 4, 2024
13a4314
Add another throw if aborted
EvanHahn Nov 5, 2024
dcff18b
Clean up AddDriveIds
EvanHahn Nov 5, 2024
3625631
Invert "does this entry match the filter" logic (#947)
EvanHahn Oct 30, 2024
01b045f
Clean up a comment
EvanHahn Nov 5, 2024
d6feaee
Merge branch 'main' into chore/entries-stream
EvanHahn Nov 5, 2024
d374b20
E2E tests for sparse blob downloads
EvanHahn Nov 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
"@mapeo/crypto": "1.0.0-alpha.10",
"@mapeo/sqlite-indexer": "1.0.0-alpha.9",
"@sinclair/typebox": "^0.29.6",
"@sindresorhus/merge-streams": "^4.0.0",
"b4a": "^1.6.3",
"bcp-47": "^2.1.0",
"better-sqlite3": "^8.7.0",
Expand Down Expand Up @@ -199,6 +200,7 @@
"tiny-typed-emitter": "^2.1.0",
"type-fest": "^4.5.0",
"undici": "^6.13.0",
"unix-path-resolve": "^1.0.2",
"varint": "^6.0.0",
"yauzl-promise": "^4.0.0"
}
Expand Down
131 changes: 131 additions & 0 deletions src/blob-store/downloader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import { TypedEmitter } from 'tiny-typed-emitter'
import { createEntriesStream } from './entries-stream.js'
import { pathPrefixesFromFilter } from './utils.js'

/** @import Hyperdrive from 'hyperdrive' */

/**
* Like hyperdrive.download() but 'live', and for multiple drives.
*
* Will emit an 'error' event for any unexpected errors. A consumer must attach
* an error listener to avoid uncaught errors. Sources of errors include:
*
* - If the entries stream emits an error
* - If a drive referenced in an entry is not found
* - If core.has() throws (e.g. if hypercore is closed)
* - If core.download().done() throws, which should not happen according to
* current hypercore code.
* - If the entries stream ends unexpectedly (it should be live and not end)
*
* NB: unlike hyperdrive.download(), this will also download deleted and
* previous versions of blobs - we don't currently support editing or deleting
* of blobs, so this should not be an issue, and if we do in the future,
* downloading deleted and previous versions may be desirable behavior anyway
*
* @extends {TypedEmitter<{ error: (error: Error) => void }>}
*/
export class Downloader extends TypedEmitter {
/** @type {import('./index.js').THyperdriveIndex} */
#driveIndex
/** @type {Set<{ done(): Promise<void>, destroy(): void }>} */
#queuedDownloads = new Set()
#entriesStream
#processEntriesPromise
#ac = new AbortController()
#pathPrefixes

/**
* @param {import('./index.js').THyperdriveIndex} driveIndex
* @param {object} [options]
* @param {import('../types.js').BlobFilter | null} [options.filter] Filter blobs of specific types and/or sizes to download
*/
constructor(driveIndex, { filter } = {}) {
super()
this.#pathPrefixes = filter ? pathPrefixesFromFilter(filter) : []
this.#driveIndex = driveIndex

this.#entriesStream = createEntriesStream(driveIndex, { live: true })
this.#entriesStream.once('error', this.#handleError)

this.#ac.signal.addEventListener('abort', this.#handleAbort, { once: true })

this.#processEntriesPromise = this.#processEntries()
this.#processEntriesPromise.catch(this.#handleError)
}

/**
* Start processing entries from the entries stream - if an entry matches the
* filter, and we don't already have it, queue it for download. If the
* Downloader is live, this method will never resolve, otherwise it will
* resolve when all the entries have been processed and downloaded.
*/
async #processEntries() {
for await (const entry of this.#entriesStream) {
this.#ac.signal.throwIfAborted()
const {
driveId,
key: filePath,
value: { blob },
} = entry
if (!this.#shouldDownloadFile(filePath)) continue
const drive = this.#driveIndex.get(driveId)
// ERROR HANDLING: this is unexpected and should not happen
if (!drive) throw new Error('Drive not found: ' + driveId)
const blobs = await drive.getBlobs()
this.#ac.signal.throwIfAborted()
await this.#processEntry(blobs.core, blob)
}
EvanHahn marked this conversation as resolved.
Show resolved Hide resolved
throw new Error('Entries stream ended unexpectedly')
}

/** @param {string} filePath */
#shouldDownloadFile(filePath) {
if (!this.#pathPrefixes.length) return true
return this.#pathPrefixes.some((prefix) => filePath.startsWith(prefix))
}

/**
* Update state and queue missing entries for download
*
* @param {import('hypercore')} blobsCore
* @param {{ blockOffset: number, blockLength: number, byteLength: number }} blob
*/
async #processEntry(blobsCore, { blockOffset: start, blockLength: length }) {
const end = start + length
const have = await blobsCore.has(start, end)
this.#ac.signal.throwIfAborted()
if (have) return
const download = blobsCore.download({ start, end })
this.#queuedDownloads.add(download)
download
.done()
// According to the code, this should never throw.
.catch(this.#handleError)
.finally(() => {
this.#queuedDownloads.delete(download)
})
}

/**
* Cancel the downloads and clean up resources.
*/
destroy() {
this.#ac.abort()
}

/** @param {any} error */
EvanHahn marked this conversation as resolved.
Show resolved Hide resolved
#handleError = (error) => {
if (this.#ac.signal.aborted) return
this.emit('error', error)
this.#ac.abort(error)
}

#handleAbort = () => {
for (const download of this.#queuedDownloads) download.destroy()
this.#ac.signal.removeEventListener('abort', this.#handleAbort)
this.#entriesStream.removeListener('error', this.#ac.abort)
// queuedDownloads should always be empty by here anyway, but just in case.
EvanHahn marked this conversation as resolved.
Show resolved Hide resolved
this.#queuedDownloads.clear()
this.#entriesStream.destroy()
}
}
75 changes: 75 additions & 0 deletions src/blob-store/entries-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import SubEncoder from 'sub-encoder'
import mergeStreams from '@sindresorhus/merge-streams'
import { Transform, pipeline } from 'node:stream'
import { noop } from '../utils.js'

/** @import Hyperdrive from 'hyperdrive' */
/** @import { BlobStoreEntriesStream } from '../types.js' */

const keyEncoding = new SubEncoder('files', 'utf-8')

/**
*
* @param {import('./index.js').THyperdriveIndex} driveIndex
* @param {object} opts
* @param {boolean} [opts.live=false]
* @returns {BlobStoreEntriesStream}
*/
export function createEntriesStream(driveIndex, { live = false } = {}) {
const mergedEntriesStreams = mergeStreams(
[...driveIndex].map((drive) => getHistoryStream(drive.db, { live }))
)
driveIndex.on('add-drive', addDrive)
// Close is always emitted, so we can use it to remove the listener
mergedEntriesStreams.once('close', () =>
driveIndex.off('add-drive', addDrive)
)
return mergedEntriesStreams

/** @param {Hyperdrive} drive */
function addDrive(drive) {
mergedEntriesStreams.add(getHistoryStream(drive.db, { live }))
}
}

/**
*
* @param {import('hyperbee')} bee
* @param {object} opts
* @param {boolean} opts.live
*/
function getHistoryStream(bee, { live }) {
// This will also include old versions of files, but it is the only way to
// get a live stream from a Hyperbee, however we currently do not support
// edits of blobs, so this should not be an issue, and the consequence is
// that old versions are downloaded too, which is acceptable.
const historyStream = bee.createHistoryStream({
live,
// `keyEncoding` is necessary because hyperdrive stores file index data
// under the `files` sub-encoding key
keyEncoding,
})
return pipeline(historyStream, new AddDriveIds(bee.core), noop)
}

class AddDriveIds extends Transform {
#core
#discoveryKey

/** @param {import('hypercore')} core */
constructor(core) {
super({ objectMode: true })
this.#core = core
this.#discoveryKey = core.discoveryKey?.toString('hex')
}

/** @type {Transform['_transform']} */
_transform(entry, _, callback) {
// Minimal performance optimization to only call toString() once.
// core.discoveryKey will always be defined by the time it starts
// streaming, but could be null when the instance is first created.
const driveId =
this.#discoveryKey || this.#core.discoveryKey?.toString('hex')
callback(null, { ...entry, driveId })
}
}
EvanHahn marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading