Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

penumbra: remove Builder factory #4169

Merged
merged 1 commit into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .cargo/config
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
[build]
# Enable Tokio's `tracing` support for `tokio-console`
rustflags = ["--cfg", "tokio_unstable"]
# rustflags = ["--cfg", "tokio_unstable"]
# Note(erwan): We decided to disable it for the time being,
# I'm keeping this around to be able to reactivate it on a whim.
12 changes: 4 additions & 8 deletions crates/bin/pd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,9 @@ async fn main() -> anyhow::Result<()> {
"starting pd"
);

let abci_server = tokio::task::Builder::new()
.name("abci_server")
.spawn(penumbra_app::server::new(storage.clone()).listen_tcp(abci_bind))
.expect("failed to spawn abci server");
let abci_server = tokio::task::spawn(
penumbra_app::server::new(storage.clone()).listen_tcp(abci_bind),
);

let grpc_server =
penumbra_app::rpc::router(&storage, cometbft_addr, enable_expensive_rpc)?;
Expand All @@ -148,10 +147,7 @@ async fn main() -> anyhow::Result<()> {
// resolver if auto-https has been enabled.
macro_rules! spawn_grpc_server {
($server:expr) => {
tokio::task::Builder::new()
.name("grpc_server")
.spawn($server.serve(make_svc))
.expect("failed to spawn grpc server")
tokio::task::spawn($server.serve(make_svc))
};
}
let grpc_server = axum_server::bind(grpc_bind);
Expand Down
253 changes: 111 additions & 142 deletions crates/cnidarium/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,11 @@ impl Snapshot {
db: db.clone(),
};

let (substore_value, substore_commitment_proof) = tokio::task::Builder::new()
.name("Snapshot::get_with_proof")
.spawn_blocking({
let span = span.clone();
move || span.in_scope(|| substore.get_with_proof(substore_key_bytes))
})?
.await??;
let (substore_value, substore_commitment_proof) = tokio::task::spawn_blocking({
let span = span.clone();
move || span.in_scope(|| substore.get_with_proof(substore_key_bytes))
})
.await??;

proofs.push(substore_commitment_proof);

Expand All @@ -104,13 +102,11 @@ impl Snapshot {
db,
};

let (_, main_commitment_proof) = tokio::task::Builder::new()
.name("Snapshot::get_with_proof")
.spawn_blocking({
let span = span.clone();
move || span.in_scope(|| mainstore.get_with_proof(key_to_substore_root.into()))
})?
.await??;
let (_, main_commitment_proof) = tokio::task::spawn_blocking({
let span = span.clone();
move || span.in_scope(|| mainstore.get_with_proof(key_to_substore_root.into()))
})
.await??;

proofs.push(main_commitment_proof);
}
Expand Down Expand Up @@ -172,10 +168,7 @@ impl Snapshot {
"fetching root hash for substore"
);

tokio::task::Builder::new()
.name("Snapshot::prefix_root_hash")
.spawn_blocking(move || span.in_scope(|| substore.root_hash()))?
.await?
tokio::task::spawn_blocking(move || span.in_scope(|| substore.root_hash())).await?
}

pub async fn root_hash(&self) -> Result<crate::RootHash> {
Expand Down Expand Up @@ -221,21 +214,15 @@ impl StateRead for Snapshot {
};
let key_hash = jmt::KeyHash::with::<sha2::Sha256>(key);

crate::future::SnapshotFuture(
tokio::task::Builder::new()
.name("Snapshot::get_raw")
.spawn_blocking(move || {
span.in_scope(|| {
let _start = std::time::Instant::now();
let rsp = substore.get_jmt(key_hash);
#[cfg(feature = "metrics")]
metrics::histogram!(metrics::STORAGE_GET_RAW_DURATION)
.record(_start.elapsed());
rsp
})
})
.expect("spawning threads is possible"),
)
crate::future::SnapshotFuture(tokio::task::spawn_blocking(move || {
span.in_scope(|| {
let _start = std::time::Instant::now();
let rsp = substore.get_jmt(key_hash);
#[cfg(feature = "metrics")]
metrics::histogram!(metrics::STORAGE_GET_RAW_DURATION).record(_start.elapsed());
rsp
})
}))
}

/// Fetch a key from nonverifiable storage.
Expand All @@ -258,26 +245,21 @@ impl StateRead for Snapshot {
};
let key: Vec<u8> = key.to_vec();

crate::future::SnapshotFuture(
tokio::task::Builder::new()
.name("Snapshot::nonverifiable_get_raw")
.spawn_blocking(move || {
span.in_scope(|| {
let _start = std::time::Instant::now();

let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
let rsp = substore
.rocksdb_snapshot
.get_cf(cf_nonverifiable, key)
.map_err(Into::into);
#[cfg(feature = "metrics")]
metrics::histogram!(metrics::STORAGE_NONCONSENSUS_GET_RAW_DURATION)
.record(_start.elapsed());
rsp
})
})
.expect("spawning threads is possible"),
)
crate::future::SnapshotFuture(tokio::task::spawn_blocking(move || {
span.in_scope(|| {
let _start = std::time::Instant::now();

let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
let rsp = substore
.rocksdb_snapshot
.get_cf(cf_nonverifiable, key)
.map_err(Into::into);
#[cfg(feature = "metrics")]
metrics::histogram!(metrics::STORAGE_NONCONSENSUS_GET_RAW_DURATION)
.record(_start.elapsed());
rsp
})
}))
}

/// Returns a stream of all key-value pairs with the given prefix.
Expand Down Expand Up @@ -309,36 +291,33 @@ impl StateRead for Snapshot {
// Since the JMT keys are hashed, we can't use a prefix iterator directly.
// We need to first prefix range the key preimages column family, then use the hashed matches to fetch the values
// from the JMT column family.
tokio::task::Builder::new()
.name("Snapshot::prefix_raw")
.spawn_blocking(move || {
span.in_scope(|| {
let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
let jmt_keys_iterator =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_jmt_keys, options, mode);

for tuple in jmt_keys_iterator {
// For each key that matches the prefix, fetch the value from the JMT column family.
let (key_preimage, _) = tuple?;

let k = std::str::from_utf8(key_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings")
.to_string();

let key_hash = jmt::KeyHash::with::<sha2::Sha256>(k.as_bytes());

let v = substore
.get_jmt(key_hash)?
.expect("keys in jmt_keys should have a corresponding value in jmt");

tx_prefix_item.blocking_send(Ok((k, v)))?;
}
anyhow::Ok(())
})
tokio::task::spawn_blocking(move || {
span.in_scope(|| {
let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
let jmt_keys_iterator =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_jmt_keys, options, mode);

for tuple in jmt_keys_iterator {
// For each key that matches the prefix, fetch the value from the JMT column family.
let (key_preimage, _) = tuple?;

let k = std::str::from_utf8(key_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings")
.to_string();

let key_hash = jmt::KeyHash::with::<sha2::Sha256>(k.as_bytes());

let v = substore
.get_jmt(key_hash)?
.expect("keys in jmt_keys should have a corresponding value in jmt");

tx_prefix_item.blocking_send(Ok((k, v)))?;
}
anyhow::Ok(())
})
.expect("should be able to spawn_blocking");
});

tokio_stream::wrappers::ReceiverStream::new(rx_prefix_query)
}
Expand Down Expand Up @@ -371,27 +350,23 @@ impl StateRead for Snapshot {
let mode = rocksdb::IteratorMode::Start;
let (tx_prefix_keys, rx_prefix_keys) = mpsc::channel(10);

tokio::task::Builder::new()
.name("Snapshot::prefix_keys")
.spawn_blocking(move || {
span.in_scope(|| {
let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
let iter =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_jmt_keys, options, mode);

for key_and_keyhash in iter {
let (raw_preimage, _) = key_and_keyhash?;
let preimage = std::str::from_utf8(raw_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings")
.to_string();
tx_prefix_keys.blocking_send(Ok(preimage))?;
}
anyhow::Ok(())
})
tokio::task::spawn_blocking(move || {
span.in_scope(|| {
let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
let iter = substore
.rocksdb_snapshot
.iterator_cf_opt(cf_jmt_keys, options, mode);

for key_and_keyhash in iter {
let (raw_preimage, _) = key_and_keyhash?;
let preimage = std::str::from_utf8(raw_preimage.as_ref())
.expect("saved jmt keys are utf-8 strings")
.to_string();
tx_prefix_keys.blocking_send(Ok(preimage))?;
}
anyhow::Ok(())
})
.expect("should be able to spawn_blocking");
});

tokio_stream::wrappers::ReceiverStream::new(rx_prefix_keys)
}
Expand Down Expand Up @@ -421,23 +396,20 @@ impl StateRead for Snapshot {

let (tx_prefix_query, rx_prefix_query) = mpsc::channel(10);

tokio::task::Builder::new()
.name("Snapshot::nonverifiable_prefix_raw")
.spawn_blocking(move || {
span.in_scope(|| {
let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
let iter =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_nonverifiable, options, mode);
for i in iter {
let (key, value) = i?;
tx_prefix_query.blocking_send(Ok((key.into(), value.into())))?;
}
anyhow::Ok(())
})
tokio::task::spawn_blocking(move || {
span.in_scope(|| {
let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
let iter =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_nonverifiable, options, mode);
for i in iter {
let (key, value) = i?;
tx_prefix_query.blocking_send(Ok((key.into(), value.into())))?;
}
anyhow::Ok(())
})
.expect("should be able to spawn_blocking");
});

tokio_stream::wrappers::ReceiverStream::new(rx_prefix_query)
}
Expand Down Expand Up @@ -512,32 +484,29 @@ impl StateRead for Snapshot {
let prefix = prefix.to_vec();

let (tx, rx) = mpsc::channel::<Result<(Vec<u8>, Vec<u8>)>>(10);
tokio::task::Builder::new()
.name("Snapshot::nonverifiable_range_raw")
.spawn_blocking(move || {
span.in_scope(|| {
let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
let iter =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_nonverifiable, options, mode);

for i in iter {
let (key, value) = i?;

// This is a bit of a hack, but RocksDB doesn't let us express the "prefixed range-queries",
// that we want to support. In particular, we want to be able to do a prefix query that starts
// at a particular key, and does not have an upper bound. Since we can't create an iterator that
// cover this range, we have to filter out the keys that don't match the prefix.
if !prefix.is_empty() && !key.starts_with(&prefix) {
break;
}
tx.blocking_send(Ok((key.into(), value.into())))?;
tokio::task::spawn_blocking(move || {
span.in_scope(|| {
let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
let iter =
substore
.rocksdb_snapshot
.iterator_cf_opt(cf_nonverifiable, options, mode);

for i in iter {
let (key, value) = i?;

// This is a bit of a hack, but RocksDB doesn't let us express the "prefixed range-queries",
// that we want to support. In particular, we want to be able to do a prefix query that starts
// at a particular key, and does not have an upper bound. Since we can't create an iterator that
// cover this range, we have to filter out the keys that don't match the prefix.
if !prefix.is_empty() && !key.starts_with(&prefix) {
break;
}
Ok::<(), anyhow::Error>(())
})
tx.blocking_send(Ok((key.into(), value.into())))?;
}
Ok::<(), anyhow::Error>(())
})
.expect("should be able to spawn_blocking");
});

Ok(tokio_stream::wrappers::ReceiverStream::new(rx))
}
Expand Down
Loading
Loading