Skip to content

Commit

Permalink
Add a history stream for apply ticks / views updates
Browse files Browse the repository at this point in the history
`base.system.createApplyTickHistoryStream()` emits once per `apply` tick
with an object of the form:

```
{
  index: Number, // the index of the 'info' block in the `system.db`
  views: [{ key, appends }], // A list of changed views with the number of blocks appended
  writer: { key, length } // Which writer appended the blocks and what its new length is
}
```

This can be used for debugging how the apply function changed views over
the history of the autobase system.
  • Loading branch information
lejeunerenard committed Sep 16, 2024
1 parent e34377a commit 76e7541
Show file tree
Hide file tree
Showing 3 changed files with 424 additions and 0 deletions.
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,39 @@ Generate a local core to be used for an Autobase.

Get user data associated with the autobase core.

### `SystemView`

The view of the autobase system.

#### `system.createApplyTickHistoryStream(opts)`

Returns a stream of `apply` 'ticks' history for debugging how apply changed
the view(s) and what writer wrote the batch of blocks processed. Each update has
the following structure:

```js
{
index: Number, // the index of the 'info' block in the `system.db`
views: [{ key, appends }], // A list of changed views with the number of blocks appended
writer: { key, length } // Which writer appended the blocks and what its new length is
}
```

Supported options are the same as `hyperbee`'s
[`.createHistoryStream()`](https://github.com/holepunchto/hyperbee?tab=readme-ov-file#const-stream--dbcreatehistorystreamoptions)
with two exceptions being `reverse` & `encoding`, so:

```js
{
live: false, // If true the stream will wait for new data and never end
gte: seq, // Start with this seq (inclusive)
gt: seq, // Start after this index
lte: seq, // Stop after this index
lt: seq, // Stop before this index
limit: -1 // Set to the max number of entries you want
}
```

### `AutoStore`

Each autobase creates a `AutoStore` which is used to create views. The store is passed to the `open` function.
Expand Down
50 changes: 50 additions & 0 deletions lib/system.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const SubEncoder = require('sub-encoder')
const ReadyResource = require('ready-resource')
const b4a = require('b4a')
const c = require('compact-encoding')
const { pipeline, Transform } = require('streamx')

const { Info, Member } = require('./messages')

Expand Down Expand Up @@ -402,6 +403,55 @@ module.exports = class SystemView extends ReadyResource {
await co.close()
}
}

createApplyTickHistoryStream (opts = {}) {
const lastSeenViews = []
let writerSinceInfoBlock = null

const infoKey = b4a.from('info')

return pipeline(
this.db.createHistoryStream({
...opts,
reverse: false, // force to run forward
encoding: { key: DIGEST }
}),
new Transform({
transform (node, cb) {
if (b4a.equals(node.key, infoKey)) {
const value = c.decode(Info, node.value)

// Get view changes
const views = []
for (let i = 0; i < value.views.length; i++) {
const view = value.views[i]
const lastLength = lastSeenViews[i] || 0
if (view.length !== lastLength) {
views.push({
key: view.key,
appends: view.length - lastLength
})
}

lastSeenViews[i] = view.length
}

const event = { index: node.seq, views, writer: writerSinceInfoBlock }
writerSinceInfoBlock = null

return cb(null, event)
}

const value = c.decode(Member, node.value)
const writerLength = value.length
if (writerLength <= 0) return cb(null)

writerSinceInfoBlock = { key: node.key, length: value.length }
return cb(null)
}
})
)
}
}

function hasDependency (node, dep) {
Expand Down
Loading

0 comments on commit 76e7541

Please sign in to comment.