diff --git a/crates/cnidarium/src/lib.rs b/crates/cnidarium/src/lib.rs
index bb9d415cf8..a967a60b10 100644
--- a/crates/cnidarium/src/lib.rs
+++ b/crates/cnidarium/src/lib.rs
@@ -70,6 +70,7 @@ mod store;
 mod tests;
 mod utils;
 mod write;
+mod write_batch;
 
 #[cfg(feature = "metrics")]
 pub use crate::metrics::register_metrics;
@@ -81,6 +82,7 @@ pub use read::StateRead;
 pub use snapshot::Snapshot;
 pub use storage::{Storage, TempStorage};
 pub use write::StateWrite;
+pub use write_batch::StagedWriteBatch;
 
 pub mod future;
diff --git a/crates/cnidarium/src/snapshot.rs b/crates/cnidarium/src/snapshot.rs
index 706215e41c..c59a652a12 100644
--- a/crates/cnidarium/src/snapshot.rs
+++ b/crates/cnidarium/src/snapshot.rs
@@ -124,14 +124,15 @@ impl Snapshot {
     }
 
     pub fn prefix_version(&self, prefix: &str) -> Result<Option<jmt::Version>> {
-        let config = self
+        let Some(config) = self
             .0
             .multistore_cache
             .config
-            .find_substore(prefix.as_bytes());
-        if prefix != config.prefix {
-            anyhow::bail!("requested substore (prefix={prefix}) does not exist")
-        }
+            .find_substore(prefix.as_bytes())
+        else {
+            anyhow::bail!("requested a version for a prefix that does not exist (prefix={prefix})")
+        };
+
         Ok(self.substore_version(&config))
     }
 
@@ -145,18 +146,14 @@ impl Snapshot {
         let rocksdb_snapshot = self.0.snapshot.clone();
         let db = self.0.db.clone();
 
-        let config = self
+        let Some(config) = self
             .0
             .multistore_cache
             .config
-            .find_substore(prefix.as_bytes());
-
-        // If a substore is not found, `find_substore` will default to the main store.
-        // However, we do not want to mislead the caller by returning a root hash
-        // that does not correspond to the queried prefix, so we error out instead.
-        if prefix != config.prefix {
-            anyhow::bail!("requested substore (prefix={prefix}) does not exist")
-        }
+            .find_substore(prefix.as_bytes())
+        else {
+            anyhow::bail!("requested a root for a substore that does not exist (prefix={prefix})")
+        };
 
         let version = self
             .substore_version(&config)
diff --git a/crates/cnidarium/src/storage.rs b/crates/cnidarium/src/storage.rs
index adf8708bb1..350ae8f86d 100644
--- a/crates/cnidarium/src/storage.rs
+++ b/crates/cnidarium/src/storage.rs
@@ -1,9 +1,9 @@
 use std::{path::PathBuf, sync::Arc};
 
-// use tokio_stream::wrappers::WatchStream;
 use anyhow::{bail, ensure, Result};
 use parking_lot::RwLock;
 use rocksdb::{Options, DB};
+use std::collections::HashMap;
 use tokio::sync::watch;
 use tracing::Span;
 
@@ -15,7 +15,7 @@ use crate::{
         substore::{SubstoreConfig, SubstoreSnapshot, SubstoreStorage},
     },
 };
-use crate::{snapshot_cache::SnapshotCache, StateDelta};
+use crate::{snapshot_cache::SnapshotCache, StagedWriteBatch, StateDelta};
 
 mod temp;
 pub use temp::TempStorage;
@@ -271,9 +271,9 @@ impl Storage {
         self.0.snapshots.read().get(version)
     }
 
-    /// Commits the provided [`StateDelta`] to persistent storage as the latest
-    /// version of the chain state.
-    pub async fn commit(&self, delta: StateDelta<Snapshot>) -> Result<RootHash> {
+    /// Prepares a commit for the provided [`StateDelta`], returning a [`StagedWriteBatch`].
+    /// The batch can be committed to the database using the [`Storage::commit_batch`] method.
+    pub async fn prepare_commit(&self, delta: StateDelta<Snapshot>) -> Result<StagedWriteBatch> {
         // Extract the snapshot and the changes from the state delta
         let (snapshot, changes) = delta.flatten();
         let prev_snapshot_version = snapshot.version();
@@ -286,35 +286,28 @@ impl Storage {
         ensure!(
             prev_storage_version == prev_snapshot_version,
-            "trying to commit a delta forked from version {}, but the latest version is {}",
+            "trying to prepare a commit for a delta forked from version {}, but the latest version is {}",
             prev_snapshot_version,
             prev_storage_version
         );
 
-        self.commit_inner(snapshot, changes, next_storage_version, false)
+        self.prepare_commit_inner(snapshot, changes, next_storage_version, false)
             .await
     }
 
-    /// Commits the supplied [`Cache`] to persistent storage.
-    ///
-    /// # Migrations
-    /// In the case of chain state migrations we need to commit the new state
-    /// without incrementing the version. If `perform_migration` is `true` the
-    /// snapshot will _not_ be written to the snapshot cache, and no subscribers
-    /// will be notified. Substore versions will not be updated.
-    async fn commit_inner(
+    async fn prepare_commit_inner(
         &self,
         snapshot: Snapshot,
         cache: Cache,
         version: jmt::Version,
         perform_migration: bool,
-    ) -> Result<RootHash> {
-        tracing::debug!(new_jmt_version = ?version, "committing state delta");
+    ) -> Result<StagedWriteBatch> {
+        tracing::debug!(new_jmt_version = ?version, "preparing to commit state delta");
         // Save a copy of the changes to send to subscribers later.
         let changes = Arc::new(cache.clone_changes());
 
         let mut changes_by_substore = cache.shard_by_prefix(&self.0.multistore_config);
-        let mut substore_roots = Vec::new();
+        let mut substore_roots = HashMap::new();
         let mut multistore_versions =
             multistore::MultistoreCache::from_config(self.0.multistore_config.clone());
@@ -366,16 +359,16 @@ impl Storage {
                 continue;
             };
 
-            let version = if perform_migration {
+            let new_version = if perform_migration {
                 old_substore_version
             } else {
                 old_substore_version.wrapping_add(1)
             };
-            new_versions.push(version);
+            new_versions.push(new_version);
 
             let substore_snapshot = SubstoreSnapshot {
                 config: config.clone(),
                 rocksdb_snapshot: rocksdb_snapshot.clone(),
-                version,
+                version: new_version,
                 db: db.clone(),
             };
 
@@ -383,7 +376,7 @@ impl Storage {
             // Commit the substore and collect its root hash
             let (root_hash, substore_batch) = substore_storage
-                .commit(changeset, write_batch, version, perform_migration)
+                .commit(changeset, write_batch, new_version, perform_migration)
                 .await?;
             write_batch = substore_batch;
 
@@ -393,7 +386,15 @@ impl Storage {
                 ?version,
                 "added substore to write batch"
             );
-            substore_roots.push((config.clone(), root_hash, version));
+            substore_roots.insert(config.clone(), (root_hash, new_version));
+
+            tracing::debug!(
+                ?root_hash,
+                prefix = ?config.prefix,
+                ?new_version,
+                "updating substore version"
+            );
+            multistore_versions.set_version(config.clone(), new_version);
         }
 
         // Add substore roots to the main store changeset
@@ -405,7 +406,7 @@ impl Storage {
                 Cache::default()
             });
 
-        for (config, root_hash, _) in substore_roots.iter() {
+        for (config, (root_hash, _)) in substore_roots.iter() {
             main_store_changes
                 .unwritten_changes
                 .insert(config.prefix.to_string(), Some(root_hash.0.to_vec()));
@@ -432,6 +433,110 @@ impl Storage {
             "added main store to write batch"
         );
 
+        tracing::debug!(?global_root_hash, version = ?version, "updating main store version");
+        let main_store_config = self.0.multistore_config.main_store.clone();
+        multistore_versions.set_version(main_store_config, version);
+
+        Ok(StagedWriteBatch {
+            write_batch,
+            version,
+            multistore_versions,
+            root_hash: global_root_hash,
+            substore_roots,
+            perform_migration,
+            changes,
+        })
+    }
+
+    /// Commits the provided [`StateDelta`] to persistent storage as the latest
+    /// version of the chain state.
+    pub async fn commit(&self, delta: StateDelta<Snapshot>) -> Result<RootHash> {
+        let batch = self.prepare_commit(delta).await?;
+        self.commit_batch(batch)
+    }
+
+    /// Commits the supplied [`StagedWriteBatch`] to persistent storage.
+    ///
+    /// # Migrations
+    /// In the case of chain state migrations we need to commit the new state
+    /// without incrementing the version. If `perform_migration` is `true` the
+    /// snapshot will _not_ be written to the snapshot cache, and no subscribers
+    /// will be notified. Substore versions will not be updated.
+    pub fn commit_batch(&self, batch: StagedWriteBatch) -> Result<RootHash> {
+        let StagedWriteBatch {
+            write_batch,
+            version,
+            multistore_versions,
+            root_hash: global_root_hash,
+            substore_roots,
+            perform_migration,
+            changes,
+        } = batch;
+
+        let db = self.0.db.clone();
+
+        // check that the version of the batch being committed is the correct next version
+        let old_version = self.latest_version();
+        let expected_new_version = if perform_migration {
+            old_version
+        } else {
+            old_version.wrapping_add(1)
+        };
+
+        ensure!(
+            expected_new_version == version,
+            "new version mismatch: expected {} but got {}",
+            expected_new_version,
+            version
+        );
+
+        // also check that each of the substore versions is the correct next version
+        let snapshot = self.latest_snapshot();
+
+        // Warning: we MUST check version coherence for **every** substore.
+        // These checks are a second line of defense. They must consider
+        // the case when two deltas affect distinct substores.
+        //
+        // version: (m, ss_1, ss_2)
+        // D_0: (_, 1, 0) <- initial state
+        // D_1: (A, 1, 1) <- multiwrite to ss_1 AND ss_2
+        // D_1*: (A, 1, 0) <- isolated write to ss_1
+        //
+        // A comprehensive check lets us catch the stale write D_1* even if
+        // locally it does not directly affect the second substore at all.
+        // And even if the main version check passes (spuriously, or because of
+        // a migration).
+        for (substore_config, new_version) in &multistore_versions.substores {
+            if substore_config.prefix.is_empty() {
+                // this is the main store, ignore
+                continue;
+            }
+
+            let old_substore_version = snapshot
+                .substore_version(&substore_config)
+                .expect("substores must be initialized at startup");
+
+            // if the substore exists in `substore_roots`, there have been updates to the substore.
+            // if `perform_migration` is false and there are updates, the next version should be previous + 1.
+            // otherwise, the version should remain the same.
+            let expected_substore_version =
+                if substore_roots.get(substore_config).is_some() && !perform_migration {
+                    old_substore_version.wrapping_add(1)
+                } else {
+                    old_substore_version
+                };
+
+            ensure!(
+                expected_substore_version == *new_version,
+                "substore new version mismatch for substore with prefix {}: expected {} but got {}",
+                substore_config.prefix,
+                expected_substore_version,
+                new_version
+            );
+        }
+
+        tracing::debug!(new_jmt_version = ?batch.version, "committing batch to db");
+        db.write(write_batch).expect("can write to db");
         tracing::debug!(
             ?global_root_hash,
             version = ?version,
             "committed main store and substores to db"
         );
 
-        // Update the tracked versions for each substore.
-        for (config, root_hash, new_version) in substore_roots {
-            tracing::debug!(
-                ?root_hash,
-                prefix = ?config.prefix,
-                ?new_version,
-                "updating substore version"
-            );
-            multistore_versions.set_version(config, new_version);
-        }
-
-        tracing::debug!(?global_root_hash, ?version, "updating main store version");
-        multistore_versions.set_version(main_store_config, version);
-
         // If we're not performing a migration, we should update the snapshot cache
         if !perform_migration {
             tracing::debug!("updating snapshot cache");
@@ -487,8 +578,10 @@ pub async fn commit_in_place(&self, delta: StateDelta<Snapshot>) -> Result<RootHash> {
         let (snapshot, changes) = delta.flatten();
         let old_version = self.latest_version();
-        self.commit_inner(snapshot, changes, old_version, true)
-            .await
+        let batch = self
+            .prepare_commit_inner(snapshot, changes, old_version, true)
+            .await?;
+        self.commit_batch(batch)
     }
 
     /// Returns the internal handle to RocksDB, this is useful to test adjacent storage crates.
diff --git a/crates/cnidarium/src/store/multistore.rs b/crates/cnidarium/src/store/multistore.rs
index 3fcb7d8604..c08f6a59e7 100644
--- a/crates/cnidarium/src/store/multistore.rs
+++ b/crates/cnidarium/src/store/multistore.rs
@@ -15,13 +15,16 @@ impl MultistoreConfig {
     }
 
     /// Returns the substore matching the key's prefix, or `None` otherwise.
-    pub fn find_substore(&self, key: &[u8]) -> Arc<SubstoreConfig> {
+    pub fn find_substore(&self, key: &[u8]) -> Option<Arc<SubstoreConfig>> {
+        if key.is_empty() {
+            return Some(self.main_store.clone());
+        }
+
         // Note: This is a linear search, but the number of substores is small.
         self.substores
             .iter()
             .find(|s| key.starts_with(s.prefix.as_bytes()))
             .cloned()
-            .unwrap_or(self.main_store.clone())
     }
 
     /// Route a key to a substore, and return the truncated key and the corresponding `SubstoreConfig`.
@@ -46,7 +49,12 @@ impl MultistoreConfig {
     /// `prefix_a/` -> `prefix_a/` in `main_store`
     /// `nonexistent_prefix` -> `nonexistent_prefix` in `main_store`
     pub fn route_key_str<'a>(&self, key: &'a str) -> (&'a str, Arc<SubstoreConfig>) {
-        let config = self.find_substore(key.as_bytes());
+        let config = self
+            .find_substore(key.as_bytes())
+            .unwrap_or_else(|| self.main_store.clone());
+
+        // If the key is a total match, we want to return the key bound to the
+        // main store. This is where the root hash of the prefix tree is located.
         if key == config.prefix {
             return (key, self.main_store.clone());
         }
@@ -93,7 +101,9 @@ impl MultistoreConfig {
     /// `prefix_a/` -> `prefix_a/` in `main_store`
     /// `nonexistent_prefix` -> `nonexistent_prefix` in `main_store`
     pub fn route_key_bytes<'a>(&self, key: &'a [u8]) -> (&'a [u8], Arc<SubstoreConfig>) {
-        let config = self.find_substore(key);
+        let config = self
+            .find_substore(key)
+            .unwrap_or_else(|| self.main_store.clone());
 
         // If the key is a total match for the prefix, we return the original key
         // routed to the main store. This is where subtree root hashes are stored.
@@ -136,7 +146,9 @@ impl MultistoreConfig {
     /// `prefix_a/` -> "" in `substore_a`
     /// `nonexistent_prefix` -> "" in `main_store`
     pub fn match_prefix_str<'a>(&self, prefix: &'a str) -> (&'a str, Arc<SubstoreConfig>) {
-        let config = self.find_substore(prefix.as_bytes());
+        let config = self
+            .find_substore(prefix.as_bytes())
+            .unwrap_or_else(|| self.main_store.clone());
 
         let truncated_prefix = prefix
             .strip_prefix(&config.prefix)
@@ -162,7 +174,9 @@ impl MultistoreConfig {
     /// `prefix_a/` -> "" in `substore_a`
     /// `nonexistent_prefix` -> "" in `main_store`
     pub fn match_prefix_bytes<'a>(&self, prefix: &'a [u8]) -> (&'a [u8], Arc<SubstoreConfig>) {
-        let config = self.find_substore(prefix);
+        let config = self
+            .find_substore(prefix)
+            .unwrap_or_else(|| self.main_store.clone());
 
         let truncated_prefix = prefix
             .strip_prefix(config.prefix.as_bytes())
diff --git a/crates/cnidarium/src/store/substore.rs b/crates/cnidarium/src/store/substore.rs
index ad9eadfdf9..9fc6b68521 100644
--- a/crates/cnidarium/src/store/substore.rs
+++ b/crates/cnidarium/src/store/substore.rs
@@ -18,7 +18,7 @@ use jmt::storage::TreeWriter;
 
 /// Specifies the configuration of a substore, which is a prefixed subset of
 /// the main store with its own merkle tree, nonverifiable data, preimage index, etc.
-#[derive(Debug, Eq, PartialEq, PartialOrd, Ord)]
+#[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
 pub struct SubstoreConfig {
     /// The prefix of the substore. If empty, it is the root-level store config.
     pub prefix: String,
diff --git a/crates/cnidarium/src/write_batch.rs b/crates/cnidarium/src/write_batch.rs
new file mode 100644
index 0000000000..a70f63cb24
--- /dev/null
+++ b/crates/cnidarium/src/write_batch.rs
@@ -0,0 +1,57 @@
+use std::sync::Arc;
+
+use std::collections::HashMap;
+
+use crate::{
+    cache::Cache,
+    store::{multistore, substore::SubstoreConfig},
+    RootHash,
+};
+
+/// A staged write batch that can be committed to RocksDB.
+///
+/// This allows for write batches to be prepared and committed at a later time.
+pub struct StagedWriteBatch {
+    /// The write batch to commit to RocksDB.
+    pub(crate) write_batch: rocksdb::WriteBatch,
+    /// The new version of the chain state.
+    pub(crate) version: jmt::Version,
+    /// The new versions of each substore.
+    pub(crate) multistore_versions: multistore::MultistoreCache,
+    /// The root hash of the chain state corresponding to this set of changes.
+    pub(crate) root_hash: RootHash,
+    /// The configs, root hashes, and new versions of each substore
+    /// that was updated in this batch.
+    pub(crate) substore_roots: HashMap<Arc<SubstoreConfig>, (RootHash, u64)>,
+    /// Whether or not to perform a migration.
+    pub(crate) perform_migration: bool,
+    /// A lightweight copy of the changeset; this is useful to provide
+    /// a stream of changes to subscribers.
+    pub(crate) changes: Arc<Cache>,
+}
+
+impl StagedWriteBatch {
+    /// Returns the new version of the chain state corresponding to this set of changes.
+    pub fn version(&self) -> jmt::Version {
+        self.version
+    }
+
+    /// Returns the root hash of the jmt corresponding to this set of changes.
+    pub fn root_hash(&self) -> &RootHash {
+        &self.root_hash
+    }
+
+    /// Returns the version of a substore in this batch, if it exists,
+    /// and `None` otherwise.
+    pub fn substore_version(&self, prefix: &str) -> Option<jmt::Version> {
+        let Some(substore_config) = self
+            .multistore_versions
+            .config
+            .find_substore(prefix.as_bytes())
+        else {
+            return None;
+        };
+
+        self.multistore_versions.get_version(&substore_config)
+    }
+}
diff --git a/crates/cnidarium/tests/write_batch.rs b/crates/cnidarium/tests/write_batch.rs
new file mode 100644
index 0000000000..0717400fd0
--- /dev/null
+++ b/crates/cnidarium/tests/write_batch.rs
@@ -0,0 +1,302 @@
+use anyhow::Result;
+use cnidarium::{StateDelta, StateWrite, Storage};
+use tempfile;
+use tokio;
+
+#[tokio::test]
+/// A simple test that checks that we cannot commit a stale batch to storage.
+/// Strategy:
+/// Create three state deltas: one that writes to every substore, and two others
+/// that target specific substores or none at all.
+pub async fn test_write_batch_stale_version_substores() -> Result<()> {
+    let _ = tracing_subscriber::fmt::try_init();
+    let tmpdir = tempfile::tempdir()?;
+    let db_path = tmpdir.into_path();
+    let substore_prefixes = vec![
+        "ibc".to_string(),
+        "dex".to_string(),
+        "misc".to_string(),
+        "cometbft-data".to_string(),
+    ];
+    let storage = Storage::load(db_path.clone(), substore_prefixes.clone()).await?;
+    let initial_snapshot = storage.latest_snapshot();
+    let initial_version = initial_snapshot.version();
+    let initial_root_hash = initial_snapshot.root_hash().await?;
+    assert_eq!(
+        initial_version,
+        u64::MAX,
+        "initial version should be u64::MAX"
+    );
+    assert_eq!(initial_root_hash.0, [0u8; 32]);
+
+    /* ************************ Prepare three deltas ************************** */
+    // Our goal is to check that we can't commit a batch with a stale version.
+    // We create three deltas:
+    // 1. Empty delta
+    // 2. Delta that writes to one substore
+    // 3. Delta that writes to each substore and also writes to the main store
+
+    /* We create an empty delta that writes no keys. */
+    let delta_1 = StateDelta::new(initial_snapshot);
+    let write_batch_1 = storage.prepare_commit(delta_1).await?;
+    let version_1 = write_batch_1.version();
+    let root_hash_1 = write_batch_1.root_hash().clone();
+    assert_eq!(version_1, initial_version.wrapping_add(1));
+    assert_ne!(root_hash_1.0, initial_root_hash.0);
+
+    // We check that merely preparing a batch does not write anything.
+    let state_snapshot = storage.latest_snapshot();
+    assert_eq!(state_snapshot.version(), initial_version);
+    assert_eq!(state_snapshot.root_hash().await?.0, initial_root_hash.0);
+    for prefix in substore_prefixes.iter() {
+        // We didn't write to any substores, so their version should be unchanged.
+        assert_eq!(
+            write_batch_1
+                .substore_version(prefix)
+                .expect("substore exists"),
+            u64::MAX
+        )
+    }
+
+    /* We create a new delta that writes to a single substore. */
+    let mut delta_2 = StateDelta::new(state_snapshot.clone());
+    delta_2.put_raw("ibc/key".to_string(), [1u8; 32].to_vec());
+    let write_batch_2 = storage.prepare_commit(delta_2).await?;
+    let version_2 = write_batch_2.version();
+    let root_hash_2 = write_batch_2.root_hash();
+    assert_eq!(version_2, initial_version.wrapping_add(1));
+    assert_ne!(root_hash_2.0, initial_root_hash.0);
+
+    // Now, we check that the main store version is incremented, and that
+    // only the ibc substore's version is incremented.
+    assert_eq!(write_batch_2.version(), initial_version.wrapping_add(1));
+    assert_eq!(
+        write_batch_2
+            .substore_version("ibc")
+            .expect("substore exists"),
+        initial_version.wrapping_add(1)
+    );
+    for prefix in substore_prefixes.iter().filter(|p| *p != "ibc") {
+        assert_eq!(
+            write_batch_2
+                .substore_version(prefix)
+                .expect("substore exists"),
+            u64::MAX
+        )
+    }
+
+    /* We create a new delta that writes to each substore. */
+    let mut delta_3 = StateDelta::new(state_snapshot);
+    for substore_prefix in substore_prefixes.iter() {
+        let key = format!("{}/key", substore_prefix);
+        tracing::debug!(?key, "adding to delta_3");
+        delta_3.put_raw(key, [1u8; 32].to_vec());
+    }
+    let write_batch_3 = storage.prepare_commit(delta_3).await?;
+    let version_3 = write_batch_3.version();
+    let root_hash_3 = write_batch_3.root_hash().clone();
+
+    // Once again, we check that we incremented the main store version.
+    assert_eq!(version_3, initial_version.wrapping_add(1));
+    assert_ne!(root_hash_3.0, initial_root_hash.0);
+    // In addition to that, we check that we incremented the version of each substore.
+    for prefix in substore_prefixes.iter() {
+        assert_eq!(
+            write_batch_3
+                .substore_version(prefix)
+                .expect("substore exists"),
+            initial_version.wrapping_add(1)
+        )
+    }
+
+    /* Persist `write_batch_1` and check that the two other (stale) deltas cannot be applied. */
+    let final_root = storage
+        .commit_batch(write_batch_1)
+        .expect("committing batch 1 should work");
+    let final_snapshot = storage.latest_snapshot();
+    assert_eq!(root_hash_1.0, final_root.0);
+    assert_eq!(root_hash_1.0, final_snapshot.root_hash().await?.0);
+    assert_eq!(version_1, final_snapshot.version());
+    assert!(
+        storage.commit_batch(write_batch_2).is_err(),
+        "committing batch 2 should fail"
+    );
+    assert!(
+        storage.commit_batch(write_batch_3).is_err(),
+        "committing batch 3 should fail"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+/// Test that we can commit a batch without incrementing the substore versions
+/// if there are no keys to write.
+pub async fn test_two_empty_writes() -> Result<()> {
+    let _ = tracing_subscriber::fmt::try_init();
+    let tmpdir = tempfile::tempdir()?;
+    let db_path = tmpdir.into_path();
+    let substore_prefixes = vec![
+        "ibc".to_string(),
+        "dex".to_string(),
+        "misc".to_string(),
+        "cometbft-data".to_string(),
+    ];
+    let storage = Storage::load(db_path.clone(), substore_prefixes.clone()).await?;
+    let initial_snapshot = storage.latest_snapshot();
+    let initial_version = initial_snapshot.version();
+    let initial_root_hash = initial_snapshot.root_hash().await?;
+    assert_eq!(
+        initial_version,
+        u64::MAX,
+        "initial version should be u64::MAX"
+    );
+    assert_eq!(initial_root_hash.0, [0u8; 32]);
+
+    let mut delta_1 = StateDelta::new(initial_snapshot);
+    for substore_prefix in substore_prefixes.iter() {
+        let key = format!("{}/key", substore_prefix);
+        tracing::debug!(?key, "adding to delta_1");
+        delta_1.put_raw(key, [1u8; 12].to_vec());
+    }
+    let write_batch_1 = storage.prepare_commit(delta_1).await?;
+    let version_1 = write_batch_1.version();
+    let root_hash_1 = write_batch_1.root_hash().clone();
+
+    assert_eq!(version_1, initial_version.wrapping_add(1));
+    assert_ne!(root_hash_1.0, initial_root_hash.0);
+    for prefix in substore_prefixes.iter() {
+        assert_eq!(
+            write_batch_1
+                .substore_version(prefix)
+                .expect("substore exists"),
+            initial_version.wrapping_add(1)
+        )
+    }
+
+    // We check that merely preparing a batch does not write anything.
+    let state_snapshot = storage.latest_snapshot();
+    assert_eq!(state_snapshot.version(), initial_version);
+    assert_eq!(state_snapshot.root_hash().await?.0, initial_root_hash.0);
+
+    /* We create a new delta that writes no keys. */
+    let delta_2 = StateDelta::new(state_snapshot.clone());
+    let write_batch_2 = storage.prepare_commit(delta_2).await?;
+    let version_2 = write_batch_2.version();
+    let root_hash_2 = write_batch_2.root_hash();
+    assert_eq!(version_2, initial_version.wrapping_add(1));
+    assert_ne!(root_hash_2.0, initial_root_hash.0);
+    assert_eq!(write_batch_2.version(), initial_version.wrapping_add(1));
+    for prefix in substore_prefixes.iter() {
+        assert_eq!(
+            write_batch_2
+                .substore_version(prefix)
+                .expect("substore exists"),
+            initial_version
+        )
+    }
+
+    let block_1_root = storage
+        .commit_batch(write_batch_1)
+        .expect("committing batch 1 should work");
+    let block_1_snapshot = storage.latest_snapshot();
+    let block_1_version = block_1_snapshot.version();
+    assert_eq!(root_hash_1.0, block_1_root.0);
+    assert_eq!(root_hash_1.0, block_1_snapshot.root_hash().await?.0);
+    assert_eq!(version_1, block_1_version);
+    assert!(
+        storage.commit_batch(write_batch_2).is_err(),
+        "committing batch 2 should fail"
+    );
+
+    /* We create an empty delta that writes no keys. */
+    let delta_3 = StateDelta::new(block_1_snapshot);
+    let write_batch_3 = storage.prepare_commit(delta_3).await?;
+    let version_3 = write_batch_3.version();
+    let root_hash_3 = write_batch_3.root_hash().clone();
+    assert_eq!(version_3, block_1_version.wrapping_add(1));
+
+    /* Check that we can apply `write_batch_3`. */
+    let block_2_root = storage
+        .commit_batch(write_batch_3)
+        .expect("committing batch 3 should work");
+    let block_2_snapshot = storage.latest_snapshot();
+    let block_2_version = block_2_snapshot.version();
+    assert_eq!(root_hash_3.0, block_2_root.0);
+    assert_eq!(root_hash_3.0, block_2_snapshot.root_hash().await?.0);
+    assert_eq!(version_3, block_2_version);
+    Ok(())
+}
+
+#[tokio::test]
+/// Test that we can prepare and commit batches that write to every substore.
+/// Intuition: we want to make sure that the version check that guards us against
+/// writing stale batches works as expected.
+pub async fn test_batch_substore() -> Result<()> {
+    let _ = tracing_subscriber::fmt::try_init();
+    let tmpdir = tempfile::tempdir()?;
+    let db_path = tmpdir.into_path();
+    let substore_prefixes = vec![
+        "ibc".to_string(),
+        "dex".to_string(),
+        "misc".to_string(),
+        "cometbft-data".to_string(),
+    ];
+    let storage = Storage::load(db_path.clone(), substore_prefixes.clone()).await?;
+    let initial_snapshot = storage.latest_snapshot();
+    let initial_version = initial_snapshot.version();
+    let initial_root_hash = initial_snapshot.root_hash().await?;
+    assert_eq!(
+        initial_version,
+        u64::MAX,
+        "initial version should be u64::MAX"
+    );
+    assert_eq!(initial_root_hash.0, [0u8; 32]);
+
+    for i in 0..100 {
+        let snapshot = storage.latest_snapshot();
+        let prev_version = snapshot.version();
+        let prev_root = snapshot
+            .root_hash()
+            .await
+            .expect("a root hash is available");
+
+        let mut delta = StateDelta::new(snapshot);
+        for substore_prefix in substore_prefixes.iter() {
+            let key = format!("{}/key_{i}", substore_prefix);
+            tracing::debug!(?key, index = i, "adding to delta");
+            delta.put_raw(key, [1u8; 12].to_vec());
+        }
+        let write_batch = storage.prepare_commit(delta).await?;
+        let next_version = write_batch.version();
+        let next_root = write_batch.root_hash().clone();
+
+        assert_eq!(next_version, prev_version.wrapping_add(1));
+        assert_ne!(next_root.0, prev_root.0);
+        for prefix in substore_prefixes.iter() {
+            assert_eq!(
+                write_batch
+                    .substore_version(prefix)
+                    .expect("substore exists"),
+                prev_version.wrapping_add(1)
+            )
+        }
+
+        // We check that merely preparing a batch does not write anything.
+        let state_snapshot = storage.latest_snapshot();
+        assert_eq!(state_snapshot.version(), prev_version);
+        assert_eq!(state_snapshot.root_hash().await?.0, prev_root.0);
+
+        let block_root = storage
+            .commit_batch(write_batch)
+            .expect("committing the batch should work");
+        let block_snapshot = storage.latest_snapshot();
+        let block_version = block_snapshot.version();
+        assert_eq!(next_root.0, block_root.0);
+        assert_eq!(next_root.0, block_snapshot.root_hash().await?.0);
+        assert_eq!(next_version, block_version);
+    }
+
+    Ok(())
+}
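For reference, a minimal usage sketch of the two-phase commit API introduced above, modeled on the new `tests/write_batch.rs`; the temporary database, the single "ibc" substore, and the key/value written here are illustrative assumptions rather than part of the diff:

use anyhow::Result;
use cnidarium::{StateDelta, StateWrite, Storage};

async fn prepare_then_commit() -> Result<()> {
    // Open a throwaway storage instance with one substore prefix (illustrative).
    let tmpdir = tempfile::tempdir()?;
    let storage = Storage::load(tmpdir.into_path(), vec!["ibc".to_string()]).await?;

    // Stage some writes against the latest snapshot.
    let mut delta = StateDelta::new(storage.latest_snapshot());
    delta.put_raw("ibc/key".to_string(), b"value".to_vec());

    // Phase 1: prepare the commit. Nothing is written to RocksDB yet, but the
    // root hash and per-substore versions of the staged state are already known.
    let batch = storage.prepare_commit(delta).await?;
    let staged_root = batch.root_hash().clone();
    let staged_version = batch.version();
    assert_eq!(batch.substore_version("ibc"), Some(staged_version));

    // Phase 2: commit the staged batch. commit_batch re-checks that the batch
    // is still the correct next version for the main store and for every
    // substore before writing it to the database.
    let committed_root = storage.commit_batch(batch)?;
    assert_eq!(committed_root.0, staged_root.0);
    assert_eq!(storage.latest_snapshot().version(), staged_version);

    Ok(())
}

The version checks exercised in `test_write_batch_stale_version_substores` apply here as well: a batch prepared against a snapshot that is no longer the latest is rejected by `commit_batch`.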