cnidarium: implement deferred commits via StagedWriteBatch #4122

Merged · 9 commits · Apr 3, 2024
crates/cnidarium/src/lib.rs (2 additions, 0 deletions)

@@ -70,6 +70,7 @@ mod store;
mod tests;
mod utils;
mod write;
+mod write_batch;

#[cfg(feature = "metrics")]
pub use crate::metrics::register_metrics;
@@ -81,6 +82,7 @@ pub use read::StateRead;
pub use snapshot::Snapshot;
pub use storage::{Storage, TempStorage};
pub use write::StateWrite;
+pub use write_batch::StagedWriteBatch;

pub mod future;

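The new `write_batch` module itself is not shown in this diff, but the way storage.rs constructs and later destructures the batch pins down its shape. The following is a hedged reconstruction, with field types inferred from usage rather than copied from the merged file; `Cache`, `MultistoreCache`, `SubstoreConfig`, and `RootHash` are the crate-internal types that appear elsewhere in this diff.

// Hypothetical sketch of crates/cnidarium/src/write_batch.rs, inferred
// from how prepare_commit_inner builds the batch and commit_batch
// consumes it below. Field types are assumptions, not the merged code.
use std::{collections::HashMap, sync::Arc};

pub struct StagedWriteBatch {
    /// The staged RocksDB writes, not yet applied to the database.
    pub(crate) write_batch: rocksdb::WriteBatch,
    /// The JMT version this batch will commit as.
    pub(crate) version: jmt::Version,
    /// The new version of every store, re-checked at commit time.
    pub(crate) multistore_versions: multistore::MultistoreCache,
    /// The global root hash the staged writes produce.
    pub(crate) root_hash: crate::RootHash,
    /// Per-substore root hash and new version, keyed by substore config.
    pub(crate) substore_roots: HashMap<Arc<SubstoreConfig>, (crate::RootHash, jmt::Version)>,
    /// Whether the batch was prepared for an in-place migration.
    pub(crate) perform_migration: bool,
    /// A copy of the changes, sent to subscribers once the write lands.
    pub(crate) changes: Arc<Cache>,
}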
crates/cnidarium/src/snapshot.rs (11 additions, 14 deletions)

@@ -124,14 +124,15 @@ impl Snapshot {
    }

    pub fn prefix_version(&self, prefix: &str) -> Result<Option<jmt::Version>> {
-        let config = self
+        let Some(config) = self
            .0
            .multistore_cache
            .config
-            .find_substore(prefix.as_bytes());
-        if prefix != config.prefix {
-            anyhow::bail!("requested substore (prefix={prefix}) does not exist")
-        }
+            .find_substore(prefix.as_bytes())
+        else {
+            anyhow::bail!("requested a version for a prefix that does not exist (prefix={prefix})")
+        };

        Ok(self.substore_version(&config))
    }

@@ -145,18 +146,14 @@
        let rocksdb_snapshot = self.0.snapshot.clone();
        let db = self.0.db.clone();

-        let config = self
+        let Some(config) = self
            .0
            .multistore_cache
            .config
-            .find_substore(prefix.as_bytes());
-
-        // If a substore is not found, `find_substore` will default to the main store.
-        // However, we do not want to mislead the caller by returning a root hash
-        // that does not correspond to the queried prefix, so we error out instead.
-        if prefix != config.prefix {
-            anyhow::bail!("requested substore (prefix={prefix}) does not exist")
-        }
+            .find_substore(prefix.as_bytes())
+        else {
+            anyhow::bail!("requested a root for a substore that does not exist (prefix={prefix})")
+        };

        let version = self
            .substore_version(&config)
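Both snapshot.rs hunks swap a post-hoc prefix comparison for a `let ... else` binding, which presumes `find_substore` now returns an `Option` rather than silently falling back to the main store. A minimal standalone sketch of the pattern; `Registry` and `lookup` are illustrative stand-ins, not cnidarium APIs:

use anyhow::Result;

// Illustrative stand-in for the multistore config and its find_substore.
struct Registry {
    prefixes: Vec<String>,
}

impl Registry {
    /// Returns the matching prefix, or None instead of a main-store fallback.
    fn lookup(&self, prefix: &str) -> Option<&str> {
        self.prefixes.iter().map(|p| p.as_str()).find(|p| *p == prefix)
    }
}

fn prefix_version(registry: &Registry, prefix: &str) -> Result<Option<u64>> {
    // `let ... else` must diverge in the else arm, so the unknown-prefix
    // case is handled once, with no second comparison against the result.
    let Some(_config) = registry.lookup(prefix) else {
        anyhow::bail!("requested a version for a prefix that does not exist (prefix={prefix})")
    };
    Ok(Some(0)) // placeholder: the real method reads the substore's version
}

fn main() -> Result<()> {
    let registry = Registry { prefixes: vec!["dex".to_string()] };
    assert!(prefix_version(&registry, "dex").is_ok());
    assert!(prefix_version(&registry, "nonexistent").is_err());
    Ok(())
}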
crates/cnidarium/src/storage.rs (133 additions, 40 deletions)

@@ -1,9 +1,9 @@
use std::{path::PathBuf, sync::Arc};
// use tokio_stream::wrappers::WatchStream;

use anyhow::{bail, ensure, Result};
use parking_lot::RwLock;
use rocksdb::{Options, DB};
+use std::collections::HashMap;
use tokio::sync::watch;
use tracing::Span;

@@ -15,7 +15,7 @@
use crate::{
substore::{SubstoreConfig, SubstoreSnapshot, SubstoreStorage},
},
};
-use crate::{snapshot_cache::SnapshotCache, StateDelta};
+use crate::{snapshot_cache::SnapshotCache, StagedWriteBatch, StateDelta};

mod temp;
pub use temp::TempStorage;
@@ -271,9 +271,9 @@ impl Storage {
self.0.snapshots.read().get(version)
}

-    /// Commits the provided [`StateDelta`] to persistent storage as the latest
-    /// version of the chain state.
-    pub async fn commit(&self, delta: StateDelta<Snapshot>) -> Result<crate::RootHash> {
+    /// Prepares a commit for the provided [`StateDelta`], returning a [`StagedWriteBatch`].
+    /// The batch can be committed to the database using the [`Storage::commit_batch`] method.
+    pub async fn prepare_commit(&self, delta: StateDelta<Snapshot>) -> Result<StagedWriteBatch> {
// Extract the snapshot and the changes from the state delta
let (snapshot, changes) = delta.flatten();
let prev_snapshot_version = snapshot.version();
@@ -286,35 +286,28 @@

        ensure!(
            prev_storage_version == prev_snapshot_version,
-            "trying to commit a delta forked from version {}, but the latest version is {}",
+            "trying to prepare a commit for a delta forked from version {}, but the latest version is {}",
            prev_snapshot_version,
            prev_storage_version
        );

-        self.commit_inner(snapshot, changes, next_storage_version, false)
+        self.prepare_commit_inner(snapshot, changes, next_storage_version, false)
            .await
    }

-    /// Commits the supplied [`Cache`] to persistent storage.
-    ///
-    /// # Migrations
-    /// In the case of chain state migrations we need to commit the new state
-    /// without incrementing the version. If `perform_migration` is `true` the
-    /// snapshot will _not_ be written to the snapshot cache, and no subscribers
-    /// will be notified. Substore versions will not be updated.
-    async fn commit_inner(
+    async fn prepare_commit_inner(
        &self,
        snapshot: Snapshot,
        cache: Cache,
        version: jmt::Version,
        perform_migration: bool,
-    ) -> Result<crate::RootHash> {
-        tracing::debug!(new_jmt_version = ?version, "committing state delta");
+    ) -> Result<StagedWriteBatch> {
+        tracing::debug!(new_jmt_version = ?version, "preparing to commit state delta");
// Save a copy of the changes to send to subscribers later.
let changes = Arc::new(cache.clone_changes());

let mut changes_by_substore = cache.shard_by_prefix(&self.0.multistore_config);
-        let mut substore_roots = Vec::new();
+        let mut substore_roots = HashMap::new();
        let mut multistore_versions =
            multistore::MultistoreCache::from_config(self.0.multistore_config.clone());
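The hunk above turns the old single-step `commit` into a pure prepare phase. A sketch of how a caller might drive the resulting two-phase API; it assumes an existing `Storage` handle and a populated delta, and uses `commit_batch`, which appears further down in this diff:

use cnidarium::{Snapshot, StagedWriteBatch, StateDelta, Storage};

// Sketch of the deferred-commit flow this PR enables.
async fn two_phase_commit(storage: Storage, delta: StateDelta<Snapshot>) -> anyhow::Result<()> {
    // Phase 1: compute all root hashes and stage the RocksDB writes,
    // without touching the database or the snapshot cache.
    let batch: StagedWriteBatch = storage.prepare_commit(delta).await?;

    // The caller can hold the staged batch here, e.g. to report an app
    // hash to consensus before deciding to persist the block's state.

    // Phase 2: re-check version coherence and atomically write the batch.
    let root_hash = storage.commit_batch(batch)?;
    println!("committed state with root hash {root_hash:?}");
    Ok(())
}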

@@ -366,24 +359,24 @@
                continue;
            };

-            let version = if perform_migration {
+            let new_version = if perform_migration {
                old_substore_version
            } else {
                old_substore_version.wrapping_add(1)
            };
-            new_versions.push(version);
+            new_versions.push(new_version);
            let substore_snapshot = SubstoreSnapshot {
                config: config.clone(),
                rocksdb_snapshot: rocksdb_snapshot.clone(),
-                version,
+                version: new_version,
                db: db.clone(),
            };

let substore_storage = SubstoreStorage { substore_snapshot };

            // Commit the substore and collect its root hash
            let (root_hash, substore_batch) = substore_storage
-                .commit(changeset, write_batch, version, perform_migration)
+                .commit(changeset, write_batch, new_version, perform_migration)
                .await?;
            write_batch = substore_batch;

@@ -393,7 +386,15 @@
?version,
"added substore to write batch"
);
-            substore_roots.push((config.clone(), root_hash, version));
+            substore_roots.insert(config.clone(), (root_hash, new_version));
+
+            tracing::debug!(
+                ?root_hash,
+                prefix = ?config.prefix,
+                ?new_version,
+                "updating substore version"
+            );
+            multistore_versions.set_version(config.clone(), new_version);
}

// Add substore roots to the main store changeset
@@ -405,7 +406,7 @@
Cache::default()
});

-        for (config, root_hash, _) in substore_roots.iter() {
+        for (config, (root_hash, _)) in substore_roots.iter() {
            main_store_changes
                .unwritten_changes
                .insert(config.prefix.to_string(), Some(root_hash.0.to_vec()));
@@ -432,27 +433,117 @@
"added main store to write batch"
);

+        tracing::debug!(?global_root_hash, version = ?version, "updating main store version");
+        let main_store_config = self.0.multistore_config.main_store.clone();
+        multistore_versions.set_version(main_store_config, version);
+
+        Ok(StagedWriteBatch {
+            write_batch,
+            version,
+            multistore_versions,
+            root_hash: global_root_hash,
+            substore_roots,
+            perform_migration,
+            changes,
+        })
+    }
+
+    /// Commits the provided [`StateDelta`] to persistent storage as the latest
+    /// version of the chain state.
+    pub async fn commit(&self, delta: StateDelta<Snapshot>) -> Result<crate::RootHash> {
+        let batch = self.prepare_commit(delta).await?;
+        self.commit_batch(batch)
+    }

+    /// Commits the supplied [`StagedWriteBatch`] to persistent storage.
+    ///
+    /// # Migrations
+    /// In the case of chain state migrations we need to commit the new state
+    /// without incrementing the version. If `perform_migration` is `true` the
+    /// snapshot will _not_ be written to the snapshot cache, and no subscribers
+    /// will be notified. Substore versions will not be updated.
+    pub fn commit_batch(&self, batch: StagedWriteBatch) -> Result<crate::RootHash> {
+        let StagedWriteBatch {
+            write_batch,
+            version,
+            multistore_versions,
+            root_hash: global_root_hash,
+            substore_roots,
+            perform_migration,
+            changes,
+        } = batch;
+
+        let db = self.0.db.clone();
+
+        // check that the version of the batch being committed is the correct next version
+        let old_version = self.latest_version();
+        let expected_new_version = if perform_migration {
+            old_version
+        } else {
+            old_version.wrapping_add(1)
+        };
+
+        ensure!(
+            expected_new_version == version,
+            "new version mismatch: expected {} but got {}",
+            expected_new_version,
+            version
+        );
+
+        // also check that each of the substore versions is the correct next version
+        let snapshot = self.latest_snapshot();
+
+        // Warning: we MUST check version coherence for **every** substore.
+        // These checks are a second line of defense. They must consider
+        // the case when two deltas affect distinct substores.
+        //
+        // version: (m, ss_1, ss_2)
+        // D_0: (_, 1, 0) <- initial state
+        // D_1: (A, 1, 1) <- multiwrite to ss_1 AND ss_2
+        // D_1*: (A, 1, 0) <- isolated write to ss_1
+        //
+        // A comprehensive check lets us catch the stale write D_1* even if
+        // locally it does not directly affect the second substore at all.
+        // And even if the main version check passes (spuriously, or because of
+        // a migration).
+        for (substore_config, new_version) in &multistore_versions.substores {
+            if substore_config.prefix.is_empty() {
+                // this is the main store, ignore
+                continue;
+            }
+
+            let old_substore_version = snapshot
+                .substore_version(&substore_config)
+                .expect("substores must be initialized at startup");
+
+            // if the substore exists in `substore_roots`, there have been updates to the substore.
+            // if `perform_migration` is false and there are updates, the next version should be previous + 1.
+            // otherwise, the version should remain the same.
+            let expected_substore_version =
+                if substore_roots.get(substore_config).is_some() && !perform_migration {
+                    old_substore_version.wrapping_add(1)
+                } else {
+                    old_substore_version
+                };
+
+            ensure!(
+                expected_substore_version == *new_version,
+                "substore new version mismatch for substore with prefix {}: expected {} but got {}",
+                substore_config.prefix,
+                expected_substore_version,
+                new_version
+            );
+        }
+
+        tracing::debug!(new_jmt_version = ?batch.version, "committing batch to db");
+
        db.write(write_batch).expect("can write to db");
        tracing::debug!(
            ?global_root_hash,
            ?version,
            "committed main store and substores to db"
        );

-        // Update the tracked versions for each substore.
-        for (config, root_hash, new_version) in substore_roots {
-            tracing::debug!(
-                ?root_hash,
-                prefix = ?config.prefix,
-                ?new_version,
-                "updating substore version"
-            );
-            multistore_versions.set_version(config, new_version);
-        }
-
-        tracing::debug!(?global_root_hash, ?version, "updating main store version");
-        multistore_versions.set_version(main_store_config, version);

        // If we're not performing a migration, we should update the snapshot cache
        if !perform_migration {
            tracing::debug!("updating snapshot cache");
@@ -487,8 +578,10 @@
    pub async fn commit_in_place(&self, delta: StateDelta<Snapshot>) -> Result<crate::RootHash> {
        let (snapshot, changes) = delta.flatten();
        let old_version = self.latest_version();
-        self.commit_inner(snapshot, changes, old_version, true)
-            .await
+        let batch = self
+            .prepare_commit_inner(snapshot, changes, old_version, true)
+            .await?;
+        self.commit_batch(batch)
    }

/// Returns the internal handle to RocksDB, this is useful to test adjacent storage crates.
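Finally, `commit_in_place` is rebuilt on the same two-phase path, passing `perform_migration = true` so migrations rewrite state at a fixed version, skip the snapshot cache, and notify no subscribers. The version arithmetic shared by `prepare_commit_inner` and `commit_batch` reduces to the rule sketched below; the idea that a fresh store reports `u64::MAX` as its pre-genesis version (so the wrapping add lands the first commit at 0) is an assumption inferred from the use of `wrapping_add`, not stated in this diff.

/// Sketch of the version-bump rule used in both the prepare and commit
/// phases: migrations keep the version fixed, ordinary commits advance it
/// by one, wrapping from the assumed pre-genesis sentinel.
fn next_version(old_version: u64, perform_migration: bool) -> u64 {
    if perform_migration {
        old_version
    } else {
        old_version.wrapping_add(1)
    }
}

fn main() {
    // Assumed pre-genesis sentinel: the first real commit lands at version 0.
    assert_eq!(next_version(u64::MAX, false), 0);
    // An ordinary commit advances the version by one...
    assert_eq!(next_version(41, false), 42);
    // ...while an in-place migration leaves it unchanged.
    assert_eq!(next_version(42, true), 42);
}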