Skip to content

Commit

Permalink
Merge pull request #23 from cowlicks/on-disk-comp
Browse files Browse the repository at this point in the history
Make on-disk storage identical to JavaScript Hyperbee
  • Loading branch information
cowlicks authored Mar 30, 2024
2 parents 4fcce30 + ba1f77f commit e91369f
Show file tree
Hide file tree
Showing 19 changed files with 908 additions and 510 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- `Debug` implementation for `Children`, `Child`, `Node` and `BlockEntry` changed to be more readable.
- On-disk binary output is now identical to JavaScript Hyperbee.
- `Hyperbee` no longer takes a generic parameter with `CoreMem` bound.

### Removed
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ name = "uniffi-bindgen"
path = "uniffi-bindgen.rs"
required-features = ["ffi"]

[features]
# TODO rm "ffi" from default when we split project into workspaces. So we can avoid using uniffi unless it is needed
[features]
default = ["ffi"]
clib = ["tokio/rt-multi-thread", "dep:libc"]
ffi = ["clib", "dep:uniffi"]
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ Distributable python packages are still a work-in-progress. Currently only Pytho
- [x] support `gt`, `lt`, etc bounds for key streaming
- [x] accept compare-and-swap for `put` and `del`.
- [x] support prefixed key operations like JS's [`sub`](https://docs.pears.com/building-blocks/hyperbee#db.sub)
- [ ] one-to-one binary output
- [x] support prefixed key operations like JS's [`sub`](https://docs.holepunch.to/building-blocks/hyperbee#const-sub-db.sub-sub-prefix-options)
- [x] one-to-one binary output [#23](https://github.com/cowlicks/hyperbee/pull/23/)

## Future work

Expand Down
133 changes: 116 additions & 17 deletions src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ use std::{collections::BTreeMap, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use derive_builder::Builder;
use hypercore::{AppendOutcome, Hypercore, Info};
use prost::Message;
use prost::{bytes::Buf, DecodeError, Message};
use random_access_storage::RandomAccess;
use tokio::sync::{Mutex, RwLock};
use tracing::trace;

use crate::{
changes::Changes,
messages::{Node as NodeSchema, YoloIndex},
BlockEntry, HyperbeeError, Shared,
wchildren, Child, HyperbeeError, KeyValue, Node, Shared, SharedNode,
};

#[derive(Builder, Debug)]
Expand Down Expand Up @@ -99,31 +99,23 @@ impl Blocks {

#[tracing::instrument(skip(self, changes))]
/// Commit [`Changes`](crate::changes::Changes) to the Hypercore
// TODO create a BlockEntry from changes and add it to self.cache
pub async fn add_changes(&self, changes: Changes) -> Result<AppendOutcome, HyperbeeError> {
let Changes {
key,
value,
nodes,
root,
seq,
..
} = changes;
trace!("Adding changes with # nodes [{}]", nodes.len());
let reordered_nodes = reorder_nodes(seq, &nodes).await;

trace!("adding changes with n_nodes = {}", nodes.len());
let mut new_nodes = vec![];
// encode nodes
new_nodes.push(
root.expect("Root *should* always be added in the put/del logic")
.read()
.await
.to_level()
.await,
);
for node in nodes.into_iter() {
new_nodes.push(node.read().await.to_level().await);
let mut levels = vec![];
for node in reordered_nodes {
levels.push(node.read().await.to_level().await)
}

let index = YoloIndex { levels: new_nodes };
let index = YoloIndex { levels };

let mut index_buf = Vec::with_capacity(index.encoded_len());
YoloIndex::encode(&index, &mut index_buf).map_err(HyperbeeError::YoloIndexEncodingError)?;
Expand All @@ -140,3 +132,110 @@ impl Blocks {
self.append(&node_schema_buf).await
}
}

/// Collect `(node, child_index)` pairs for every child of `node` whose `seq`
/// matches the provided `seq`.
/// NB: the result is reversed because callers push these pairs onto a stack and
/// want children popped from the stack in their original (ascending) order.
async fn take_children_with_seq(node: &SharedNode, seq: u64) -> Vec<(SharedNode, usize)> {
    let node_guard = node.read().await;
    let children_guard = node_guard.children.children.read().await;
    let mut matching = vec![];
    for (child_index, child) in children_guard.iter().enumerate() {
        if child.seq == seq {
            matching.push((node.clone(), child_index));
        }
    }
    // Reverse so that popping from a stack yields children in index order.
    matching.reverse();
    matching
}

/// To get the same on-disk binary data as JavaScript Hyperbee, we reorder the nodes
/// via a depth-first search, mirroring the order JS writes them:
/// https://github.com/holepunchto/hyperbee/blob/e1b398f5afef707b73e62f575f2b166bcef1fa34/index.js#L237-L249
///
/// Returns the nodes root-first, with each child's `offset` rewritten to its new
/// position in the returned `Vec`.
/// NB: assumes the root is the last element of `nodes` — TODO confirm against the
/// put/del callers that build `Changes::nodes`.
async fn reorder_nodes(seq: u64, nodes: &[SharedNode]) -> Vec<SharedNode> {
    let root = &nodes[nodes.len() - 1];
    let mut child_stack = vec![];
    let mut out = vec![root.clone()];

    child_stack.append(&mut take_children_with_seq(root, seq).await);

    while let Some((node, child_index)) = child_stack.pop() {
        // Get the next child and update its offset.

        // First read the child's old offset, so we can find the node it points to
        // within `nodes`. (Read lock is released before `wchildren!` takes the
        // write lock below.)
        let old_offset = node.read().await.children.children.read().await[child_index].offset;
        // The child's node is pushed into `out` next, so its new offset is `out.len()`.
        wchildren!(node)[child_index].offset = out.len() as u64;

        let childs_node = nodes[old_offset as usize].clone();
        // Push the child's node into the output
        out.push(childs_node.clone());
        // Stage the child's node's own children to be reordered.
        child_stack.append(&mut take_children_with_seq(&childs_node, seq).await);
    }
    out
}

/// Deserialize bytes from a Hypercore block into [`Node`]s.
/// Decodes a [`YoloIndex`] and builds one shared [`Node`] per level, pairing up
/// the flat `children` list into `(seq, offset)` [`Child`] entries.
fn make_node_vec<B: Buf>(buf: B, blocks: Shared<Blocks>) -> Result<Vec<SharedNode>, DecodeError> {
    let index = YoloIndex::decode(buf)?;
    let mut out = Vec::with_capacity(index.levels.len());
    for level in index.levels.iter() {
        let keys: Vec<_> = level.keys.iter().copied().map(KeyValue::new).collect();
        // `children` is a flat [seq, offset, seq, offset, ...] list; pair it up.
        // A trailing unpaired element panics, same as the original indexed loop.
        let children: Vec<_> = level
            .children
            .chunks(2)
            .map(|pair| Child::new(pair[0], pair[1]))
            .collect();
        out.push(Arc::new(RwLock::new(Node::new(keys, children, blocks.clone()))));
    }
    Ok(out)
}

/// A "block" from a [`Hypercore`](hypercore::Hypercore) deserialized into the form used in
/// Hyperbee
pub(crate) struct BlockEntry {
    /// Tree nodes decoded from the `index` field of the block's [`NodeSchema`]
    nodes: Vec<SharedNode>,
    /// The `key` field of the block's [`NodeSchema`]
    pub key: Vec<u8>,
    /// The `value` field of the block's [`NodeSchema`]; may be absent
    /// (presumably for deletions — TODO confirm with put/del logic)
    pub value: Option<Vec<u8>>,
}

impl BlockEntry {
    /// Build a [`BlockEntry`] by decoding the `index` bytes of the given
    /// [`NodeSchema`] into tree nodes.
    fn new(entry: NodeSchema, blocks: Shared<Blocks>) -> Result<Self, HyperbeeError> {
        let nodes = make_node_vec(&entry.index[..], blocks)?;
        Ok(Self {
            nodes,
            key: entry.key,
            value: entry.value,
        })
    }

    /// Get a [`Node`] from this [`BlockEntry`] at the provided `offset`.
    /// `offset` is the offset of the node within the hypercore block.
    pub fn get_tree_node(&self, offset: u64) -> Result<SharedNode, HyperbeeError> {
        let index = usize::try_from(offset)
            .map_err(|e| HyperbeeError::U64ToUsizeConversionError(offset, e))?;
        let node = self
            .nodes
            .get(index)
            .expect("offset *should* always point to a real node");
        Ok(node.clone())
    }
}

impl Debug for BlockEntry {
    /// Render as `BlockEntry { [node, ...] }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "BlockEntry {{ ")?;
        let mut nodes = vec![];
        for node in self.nodes.iter() {
            // `try_read` panics if a node is write-locked while formatting;
            // Debug is only expected to run when no write is in flight.
            nodes.push(node.try_read().unwrap());
        }
        f.debug_list().entries(nodes).finish()?;
        // Fix: include the leading space so the closing brace balances the
        // opening "BlockEntry { " (previously printed "...]}" with no space).
        write!(f, " }}")
    }
}
33 changes: 4 additions & 29 deletions src/changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ use crate::{Child, SharedNode};

#[derive(Debug, Default)]
/// Structure to store in-progress changes to the [`Hyperbee`]
/// NB: because of how hyperbee-js works, we need to distinguish between root/non-root nodes.
pub(crate) struct Changes {
seq: u64,
pub seq: u64,
pub key: Vec<u8>,
pub value: Option<Vec<u8>>,
pub nodes: Vec<SharedNode>,
pub root: Option<SharedNode>,
}

impl Changes {
Expand All @@ -18,41 +16,18 @@ impl Changes {
key: key.to_vec(),
value: value.map(<[u8]>::to_vec),
nodes: vec![],
root: None,
}
}

/// Add a node that's changed. Returns the stored node's reference
#[tracing::instrument(skip(self, node))]
pub fn add_node(&mut self, node: SharedNode) -> Child {
self.nodes.push(node.clone());
let offset: u64 = self
.nodes
.len()
.try_into()
.expect("this would happen when sizeof(usize) < sizeof(u64), lkey on 32bit. And when the offset (which is on the order of the height of the tree) is greater than usize::MAX. Well that would be crazy. We should Probably have a check for usize >= u64 on startup... or something... TODO");
Child::new(self.seq, offset, Some(node))
}

/// Should only be used when [`Hyperbee::del`] causes a dangling root
pub fn overwrite_root(&mut self, root: SharedNode) -> Child {
self.root = Some(root.clone());
Child::new(self.seq, 0, Some(root))
}

/// Add changed root
pub fn add_root(&mut self, root: SharedNode) -> Child {
if self.root.is_some() {
panic!("We should never be replacing a root on a changes");
}
self.overwrite_root(root)
}

/// adds a changed node and handles when the node should be used as the root
pub fn add_changed_node(&mut self, path_len: usize, node: SharedNode) -> Child {
if path_len == 0 {
self.add_root(node)
} else {
self.add_node(node)
}
self.nodes.push(node.clone());
Child::new(self.seq, offset)
}
}
Loading

0 comments on commit e91369f

Please sign in to comment.