diff --git a/Cargo.toml b/Cargo.toml index d0a21a9..2fe9687 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,9 @@ vec_map = "0.8.2" smallvec = "1.8.0" arbitrary = { version = "1.2.3", features = ["derive"] } ethereum-types = { version = "0.14.1", features = ["arbitrary"] } +tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread"] } +futures = "0.3.29" +async-recursion = "1.0.0" [dev-dependencies] ssz_types = "0.5.0" diff --git a/benches/tree_hash_root.rs b/benches/tree_hash_root.rs index 940f249..b06a0a8 100644 --- a/benches/tree_hash_root.rs +++ b/benches/tree_hash_root.rs @@ -1,6 +1,7 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use milhouse::{List, Vector}; use ssz_types::VariableList; +use tokio::runtime::Builder; use tree_hash::TreeHash; type C = typenum::U1099511627776; @@ -21,6 +22,23 @@ pub fn tree_hash_root(c: &mut Criterion) { }, ); + // Tweaking the intervals doesn't really make much difference (a few percent). + let rt = Builder::new_multi_thread() + .global_queue_interval(1_000_000) + .event_interval(1_000_000) + .build() + .unwrap(); + c.bench_with_input( + BenchmarkId::new("async_tree_hash_root_list", size), + &(&rt, size), + |b, &(rt, size)| { + b.iter(|| { + let l1 = List::::try_from_iter(0..size).unwrap(); + rt.block_on(l1.async_tree_hash_root()) + }); + }, + ); + c.bench_with_input( BenchmarkId::new("tree_hash_root_vector", size), &size, diff --git a/src/leaf.rs b/src/leaf.rs index bd64bfe..88cb784 100644 --- a/src/leaf.rs +++ b/src/leaf.rs @@ -4,7 +4,7 @@ use crate::{ }; use arbitrary::Arbitrary; use derivative::Derivative; -use parking_lot::RwLock; +use tokio::sync::RwLock; use tree_hash::Hash256; #[derive(Debug, Derivative, Arbitrary)] @@ -17,18 +17,6 @@ pub struct Leaf { pub value: Arc, } -impl Clone for Leaf -where - T: Clone, -{ - fn clone(&self) -> Self { - Self { - hash: RwLock::new(*self.hash.read()), - value: self.value.clone(), - } - } -} - impl Leaf { pub fn new(value: T) -> Self { 
Self::with_hash(value, Hash256::zero()) diff --git a/src/list.rs b/src/list.rs index 1d449a0..747903c 100644 --- a/src/list.rs +++ b/src/list.rs @@ -271,7 +271,7 @@ impl Default for List { } } -impl TreeHash for List { +impl TreeHash for List { fn tree_hash_type() -> tree_hash::TreeHashType { tree_hash::TreeHashType::List } @@ -293,6 +293,16 @@ impl TreeHash for List { } } +impl List { + pub async fn async_tree_hash_root(&self) -> Hash256 { + // FIXME(sproul): remove assert + assert!(!self.interface.has_pending_updates()); + + let root = self.interface.backing.tree.async_tree_hash().await; + tree_hash::mix_in_length(&root, self.len()) + } +} + impl<'a, T: Value, N: Unsigned, U: UpdateMap> IntoIterator for &'a List { type Item = &'a T; type IntoIter = InterfaceIter<'a, T, U>; diff --git a/src/packed_leaf.rs b/src/packed_leaf.rs index b0f3770..43a0cb9 100644 --- a/src/packed_leaf.rs +++ b/src/packed_leaf.rs @@ -1,8 +1,8 @@ use crate::{utils::arb_rwlock, Error, UpdateMap}; use arbitrary::Arbitrary; use derivative::Derivative; -use parking_lot::RwLock; use std::ops::ControlFlow; +use tokio::sync::RwLock; use tree_hash::{Hash256, TreeHash, BYTES_PER_CHUNK}; #[derive(Debug, Derivative, Arbitrary)] @@ -14,21 +14,30 @@ pub struct PackedLeaf { pub(crate) values: Vec, } -impl Clone for PackedLeaf -where - T: TreeHash + Clone, -{ - fn clone(&self) -> Self { - Self { - hash: RwLock::new(*self.hash.read()), - values: self.values.clone(), +impl PackedLeaf { + pub fn tree_hash(&self) -> Hash256 { + let read_lock = self.hash.blocking_read(); + let mut hash = *read_lock; + drop(read_lock); + + if !hash.is_zero() { + return hash; } + + let hash_bytes = hash.as_bytes_mut(); + + let value_len = BYTES_PER_CHUNK / T::tree_hash_packing_factor(); + for (i, value) in self.values.iter().enumerate() { + hash_bytes[i * value_len..(i + 1) * value_len] + .copy_from_slice(&value.tree_hash_packed_encoding()); + } + + *self.hash.blocking_write() = hash; + hash } -} -impl PackedLeaf { - pub fn 
tree_hash(&self) -> Hash256 { - let read_lock = self.hash.read(); + pub async fn async_tree_hash(&self) -> Hash256 { + let read_lock = self.hash.read().await; let mut hash = *read_lock; drop(read_lock); @@ -44,7 +53,7 @@ impl PackedLeaf { .copy_from_slice(&value.tree_hash_packed_encoding()); } - *self.hash.write() = hash; + *self.hash.write().await = hash; hash } diff --git a/src/tests/proptest/operations.rs b/src/tests/proptest/operations.rs index f882ddd..1779030 100644 --- a/src/tests/proptest/operations.rs +++ b/src/tests/proptest/operations.rs @@ -157,7 +157,7 @@ where fn apply_ops_list(list: &mut List, spec: &mut Spec, ops: Vec>) where - T: Value + Debug + Send + Sync, + T: Value + Debug + Send + Sync + 'static, N: Unsigned + Debug, { let mut checkpoint = list.clone(); @@ -235,7 +235,7 @@ where fn apply_ops_vect(vect: &mut Vector, spec: &mut Spec, ops: Vec>) where - T: Value + Debug + Send + Sync, + T: Value + Debug + Send + Sync + 'static, N: Unsigned + Debug, { let mut checkpoint = vect.clone(); diff --git a/src/tests/repeat.rs b/src/tests/repeat.rs index 552041f..07cbc06 100644 --- a/src/tests/repeat.rs +++ b/src/tests/repeat.rs @@ -3,7 +3,7 @@ use std::fmt::Debug; use tree_hash::TreeHash; use typenum::{Unsigned, U1024, U64, U8}; -fn list_test(val: T) { +fn list_test(val: T) { for n in 96..=N::to_usize() { let fast = List::::repeat(val.clone(), n).unwrap(); let slow = List::::repeat_slow(val.clone(), n).unwrap(); diff --git a/src/tests/size_of.rs b/src/tests/size_of.rs index 00e0b9d..9c0f60f 100644 --- a/src/tests/size_of.rs +++ b/src/tests/size_of.rs @@ -1,6 +1,6 @@ use crate::{Arc, Leaf, PackedLeaf, Tree}; -use parking_lot::RwLock; use std::mem::size_of; +use tokio::sync::RwLock; use tree_hash::Hash256; /// It's important that the Tree nodes have a predictable size. 
diff --git a/src/tree.rs b/src/tree.rs index 77bb09e..27fa04c 100644 --- a/src/tree.rs +++ b/src/tree.rs @@ -1,11 +1,12 @@ use crate::utils::{arb_arc, arb_rwlock, opt_hash, opt_packing_depth, opt_packing_factor, Length}; use crate::{Arc, Error, Leaf, PackedLeaf, UpdateMap, Value}; use arbitrary::Arbitrary; +use async_recursion::async_recursion; use derivative::Derivative; use ethereum_hashing::{hash32_concat, ZERO_HASHES}; -use parking_lot::RwLock; use std::collections::BTreeMap; use std::ops::ControlFlow; +use tokio::{runtime::Handle, sync::RwLock}; use tree_hash::Hash256; #[derive(Debug, Derivative, Arbitrary)] @@ -25,21 +26,6 @@ pub enum Tree { Zero(usize), } -impl Clone for Tree { - fn clone(&self) -> Self { - match self { - Self::Node { hash, left, right } => Self::Node { - hash: RwLock::new(*hash.read()), - left: left.clone(), - right: right.clone(), - }, - Self::Leaf(leaf) => Self::Leaf(leaf.clone()), - Self::PackedLeaf(leaf) => Self::PackedLeaf(leaf.clone()), - Self::Zero(depth) => Self::Zero(*depth), - } - } -} - impl Tree { pub fn empty(depth: usize) -> Arc { Self::zero(depth) @@ -287,8 +273,8 @@ impl Tree { ) if full_depth > 0 => { use RebaseAction::*; - let orig_hash = *orig_hash_lock.read(); - let base_hash = *base_hash_lock.read(); + let orig_hash = *orig_hash_lock.blocking_read(); + let base_hash = *base_hash_lock.blocking_read(); // If hashes *and* lengths are equal then we can short-cut the recursion // and immediately replace `orig` by the `base` node. If `lengths` are `None` @@ -391,12 +377,12 @@ impl Tree { } } -impl Tree { +impl Tree { pub fn tree_hash(&self) -> Hash256 { match self { Self::Leaf(Leaf { hash, value }) => { // FIXME(sproul): upgradeable RwLock? 
- let read_lock = hash.read(); + let read_lock = hash.blocking_read(); let existing_hash = *read_lock; drop(read_lock); @@ -411,14 +397,14 @@ impl Tree { existing_hash } else { let tree_hash = value.tree_hash_root(); - *hash.write() = tree_hash; + *hash.blocking_write() = tree_hash; tree_hash } } Self::PackedLeaf(leaf) => leaf.tree_hash(), Self::Zero(depth) => Hash256::from_slice(&ZERO_HASHES[*depth]), Self::Node { hash, left, right } => { - let read_lock = hash.read(); + let read_lock = hash.blocking_read(); let existing_hash = *read_lock; drop(read_lock); @@ -430,7 +416,91 @@ impl Tree { rayon::join(|| left.tree_hash(), || right.tree_hash()); let tree_hash = Hash256::from(hash32_concat(left_hash.as_bytes(), right_hash.as_bytes())); - *hash.write() = tree_hash; + *hash.blocking_write() = tree_hash; + tree_hash + } + } + } + } + + #[async_recursion] + pub async fn async_tree_hash(&self) -> Hash256 { + match self { + Self::Leaf(Leaf { hash, value }) => { + // FIXME(sproul): upgradeable RwLock? + let read_lock = hash.read().await; + let existing_hash = *read_lock; + drop(read_lock); + + // NOTE: We re-compute the hash whenever it is zero. Computed hashes may + // legitimately be zero, but this only occurs at the leaf level when the value is + // entirely zeroes (e.g. [0u64, 0, 0, 0]). In order to avoid storing an + // `Option<Hash256>` we choose to re-compute the hash in this case. In practice + // this is unlikely to provide any performance penalty except at very small list + // lengths (<= 32), because a node higher in the tree will cache a non-zero hash + // preventing its children from being visited more than once. 
+ if !existing_hash.is_zero() { + existing_hash + } else { + let tree_hash = value.tree_hash_root(); + *hash.write().await = tree_hash; + tree_hash + } + } + Self::PackedLeaf(leaf) => leaf.async_tree_hash().await, + Self::Zero(depth) => Hash256::from_slice(&ZERO_HASHES[*depth]), + Self::Node { hash, left, right } => { + const MAX_QUEUE_DEPTH: usize = 4; + + let read_lock = hash.read().await; + let existing_hash = *read_lock; + drop(read_lock); + + if !existing_hash.is_zero() { + existing_hash + } else { + // Parallelism goes brrrr. + let rt_metrics = Handle::current().metrics(); + let num_workers = rt_metrics.num_workers(); + let max_queue_depth = (0..num_workers) + .map(|i| rt_metrics.worker_local_queue_depth(i)) + .max() + .unwrap(); + + let (left_res, right_res) = futures::future::join( + async { + if max_queue_depth >= MAX_QUEUE_DEPTH { + // Runtime is busy, use the current thread. + Ok(left.async_tree_hash().await) + } else { + // Runtime has some spare capacity, use new task. + let left_clone = left.clone(); + tokio::task::spawn( + async move { left_clone.async_tree_hash().await }, + ) + .await + } + }, + async { + if max_queue_depth >= MAX_QUEUE_DEPTH { + // Runtime is busy, use the current thread. + Ok(right.async_tree_hash().await) + } else { + // Runtime has some spare capacity, use new task. 
+ let right_clone = right.clone(); + tokio::task::spawn( + async move { right_clone.async_tree_hash().await }, + ) + .await + } + }, + ) + .await; + let left_hash = left_res.unwrap(); + let right_hash = right_res.unwrap(); + let tree_hash = + Hash256::from(hash32_concat(left_hash.as_bytes(), right_hash.as_bytes())); + *hash.write().await = tree_hash; tree_hash } } diff --git a/src/utils.rs b/src/utils.rs index e0aa78a..2b34041 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,7 +1,7 @@ use crate::{Arc, UpdateMap}; use arbitrary::Arbitrary; -use parking_lot::RwLock; use std::collections::BTreeMap; +use tokio::sync::RwLock; use tree_hash::{Hash256, TreeHash, TreeHashType}; /// Length type, to avoid confusion with depth and other `usize` parameters. diff --git a/src/vector.rs b/src/vector.rs index 7316bff..5263d06 100644 --- a/src/vector.rs +++ b/src/vector.rs @@ -245,7 +245,7 @@ impl Default for Vector { } } -impl tree_hash::TreeHash for Vector { +impl tree_hash::TreeHash for Vector { fn tree_hash_type() -> tree_hash::TreeHashType { tree_hash::TreeHashType::Vector }