Skip to content

Commit

Permalink
chore: allow crater runs (ZcashFoundation#8171)
Browse files Browse the repository at this point in the history
* chore(dev-deps): always specify the version for dev-dependencies

* test(tower-batch-control): remove zebra-consensus dev dependency

Copies the Ed25519Verifier code directly to tower-batch-control tests

* test(zebra-consensus): update ed25519 verifier tests with ones from batch-control

* test(zebra-consensus): restore previous timeout values

The timeouts copied from tower-batch-control seems too low
and there must have been a reasoning why were they introduced

* chore: update dev-deps versions to beta.33

* chore(tower-batch-control): remove dev-dependency on metrics

* chore(tower-batch-control): remove zebra-chain dev-dependency

* Update zebra-scan/Cargo.toml

* bump all versions to match current release

* fix missed commas in version bumps

---------

Co-authored-by: Arya <[email protected]>
Co-authored-by: Alfredo Garcia <[email protected]>
Co-authored-by: Pili Guerra <[email protected]>
  • Loading branch information
4 people authored and idky137 committed Feb 28, 2024
1 parent 59aecf6 commit 66d9558
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 85 deletions.
1 change: 0 additions & 1 deletion Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4702,7 +4702,6 @@ dependencies = [
"tower-test",
"tracing",
"tracing-futures",
"zebra-consensus",
"zebra-test",
]

Expand Down
10 changes: 6 additions & 4 deletions tower-batch-control/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
[package]
name = "tower-batch-control"
version = "0.2.41-beta.10"
authors = ["Zcash Foundation <[email protected]>", "Tower Maintainers <[email protected]>"]
authors = [
"Zcash Foundation <[email protected]>",
"Tower Maintainers <[email protected]>",
]
description = "Tower middleware for batch request processing"
# # Legal
#
Expand Down Expand Up @@ -43,8 +46,7 @@ rand = "0.8.5"

tokio = { version = "1.36.0", features = ["full", "tracing", "test-util"] }
tokio-test = "0.4.3"
tower-fallback = { path = "../tower-fallback/" }
tower-fallback = { path = "../tower-fallback/", version = "0.2.41-beta.10" }
tower-test = "0.4.0"

zebra-consensus = { path = "../zebra-consensus/" }
zebra-test = { path = "../zebra-test/" }
zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.34" }
182 changes: 163 additions & 19 deletions tower-batch-control/tests/ed25519.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,172 @@
//! Test batching using ed25519 verification.
use std::time::Duration;
use std::{
mem,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use color_eyre::{eyre::eyre, Report};
use ed25519_zebra::*;
use ed25519_zebra::{batch, Error, SigningKey, VerificationKeyBytes};
use futures::stream::{FuturesOrdered, StreamExt};
use futures::FutureExt;
use futures_core::Future;
use rand::thread_rng;
use tokio::sync::{oneshot::error::RecvError, watch};
use tower::{Service, ServiceExt};
use tower_batch_control::Batch;
use tower_batch_control::{Batch, BatchControl};
use tower_fallback::Fallback;

// ============ service impl ============

use zebra_consensus::ed25519::{Item as Ed25519Item, Verifier as Ed25519Verifier};
/// A boxed [`std::error::Error`].
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// The type of the batch verifier.
type BatchVerifier = batch::Verifier;

/// The type of verification results.
type VerifyResult = Result<(), Error>;

/// The type of the batch sender channel.
type Sender = watch::Sender<Option<VerifyResult>>;

/// The type of the batch item.
/// This is an `Ed25519Item`.
type Item = batch::Item;

/// Ed25519 signature verifier service
struct Verifier {
/// A batch verifier for ed25519 signatures.
batch: BatchVerifier,

/// A channel for broadcasting the result of a batch to the futures for each batch item.
///
/// Each batch gets a newly created channel, so there is only ever one result sent per channel.
/// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
tx: Sender,
}

impl Default for Verifier {
fn default() -> Self {
let batch = BatchVerifier::default();
let (tx, _) = watch::channel(None);
Self { batch, tx }
}
}

impl Verifier {
/// Returns the batch verifier and channel sender from `self`,
/// replacing them with a new empty batch.
fn take(&mut self) -> (BatchVerifier, Sender) {
// Use a new verifier and channel for each batch.
let batch = mem::take(&mut self.batch);

let (tx, _) = watch::channel(None);
let tx = mem::replace(&mut self.tx, tx);

(batch, tx)
}

/// Synchronously process the batch, and send the result using the channel sender.
/// This function blocks until the batch is completed.
fn verify(batch: BatchVerifier, tx: Sender) {
let result = batch.verify(thread_rng());
let _ = tx.send(Some(result));
}

/// Flush the batch using a thread pool, and return the result via the channel.
/// This returns immediately, usually before the batch is completed.
fn flush_blocking(&mut self) {
let (batch, tx) = self.take();

// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// We don't care about execution order here, because this method is only called on drop.
tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
}

/// Flush the batch using a thread pool, and return the result via the channel.
/// This function returns a future that becomes ready when the batch is completed.
async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
}
}

impl Service<BatchControl<Item>> for Verifier {
type Response = ();
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
match req {
BatchControl::Item(item) => {
tracing::trace!("got ed25519 item");
self.batch.queue(item);
let mut rx = self.tx.subscribe();

Box::pin(async move {
match rx.changed().await {
Ok(()) => {
// We use a new channel for each batch,
// so we always get the correct batch result here.
let result = rx.borrow()
.ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;

if result.is_ok() {
tracing::trace!(?result, "validated ed25519 signature");
} else {
tracing::trace!(?result, "invalid ed25519 signature");
}
result.map_err(BoxError::from)
}
Err(_recv_error) => panic!("ed25519 verifier was dropped without flushing"),
}
})
}

BatchControl::Flush => {
tracing::trace!("got ed25519 flush command");

let (batch, tx) = self.take();

Box::pin(Self::flush_spawning(batch, tx).map(Ok))
}
}
}
}

impl Drop for Verifier {
fn drop(&mut self) {
// We need to flush the current batch in case there are still any pending futures.
// This returns immediately, usually before the batch is completed.
self.flush_blocking();
}
}

/// Fires off a task into the Rayon threadpool and awaits the result through a oneshot channel.
async fn spawn_fifo<
E: 'static + std::error::Error + Sync + Send,
F: 'static + FnOnce() -> Result<(), E> + Send,
>(
f: F,
) -> Result<Result<(), E>, RecvError> {
// Rayon doesn't have a spawn function that returns a value,
// so we use a oneshot channel instead.
let (rsp_tx, rsp_rx) = tokio::sync::oneshot::channel();

rayon::spawn_fifo(move || {
let _ = rsp_tx.send(f());
});

rsp_rx.await
}

// =============== testing code ========

Expand All @@ -22,7 +176,7 @@ async fn sign_and_verify<V>(
bad_index: Option<usize>,
) -> Result<(), V::Error>
where
V: Service<Ed25519Item, Response = ()>,
V: Service<Item, Response = ()>,
{
let mut results = FuturesOrdered::new();
for i in 0..n {
Expand Down Expand Up @@ -61,7 +215,7 @@ async fn batch_flushes_on_max_items() -> Result<(), Report> {
// flushing is happening based on hitting max_items.
//
// Create our own verifier, so we don't shut down a shared verifier used by other tests.
let verifier = Batch::new(Ed25519Verifier::default(), 10, 5, Duration::from_secs(1000));
let verifier = Batch::new(Verifier::default(), 10, 5, Duration::from_secs(1000));
timeout(Duration::from_secs(1), sign_and_verify(verifier, 100, None))
.await
.map_err(|e| eyre!(e))?
Expand All @@ -79,12 +233,7 @@ async fn batch_flushes_on_max_latency() -> Result<(), Report> {
// flushing is happening based on hitting max_latency.
//
// Create our own verifier, so we don't shut down a shared verifier used by other tests.
let verifier = Batch::new(
Ed25519Verifier::default(),
100,
10,
Duration::from_millis(500),
);
let verifier = Batch::new(Verifier::default(), 100, 10, Duration::from_millis(500));
timeout(Duration::from_secs(1), sign_and_verify(verifier, 10, None))
.await
.map_err(|e| eyre!(e))?
Expand All @@ -99,13 +248,8 @@ async fn fallback_verification() -> Result<(), Report> {

// Create our own verifier, so we don't shut down a shared verifier used by other tests.
let verifier = Fallback::new(
Batch::new(
Ed25519Verifier::default(),
10,
1,
Duration::from_millis(100),
),
tower::service_fn(|item: Ed25519Item| async move { item.verify_single() }),
Batch::new(Verifier::default(), 10, 1, Duration::from_millis(100)),
tower::service_fn(|item: Item| async move { item.verify_single() }),
);

sign_and_verify(verifier, 100, Some(39))
Expand Down
2 changes: 1 addition & 1 deletion tower-fallback/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ tracing = "0.1.39"
[dev-dependencies]
tokio = { version = "1.36.0", features = ["full", "tracing", "test-util"] }

zebra-test = { path = "../zebra-test/" }
zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.34" }
2 changes: 1 addition & 1 deletion zebra-chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ rand_chacha = "0.3.1"

tokio = { version = "1.36.0", features = ["full", "tracing", "test-util"] }

zebra-test = { path = "../zebra-test/" }
zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.34" }

[[bench]]
name = "block"
Expand Down
6 changes: 3 additions & 3 deletions zebra-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ tokio = { version = "1.36.0", features = ["full", "tracing", "test-util"] }
tracing-error = "0.2.0"
tracing-subscriber = "0.3.18"

zebra-state = { path = "../zebra-state", features = ["proptest-impl"] }
zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] }
zebra-test = { path = "../zebra-test/" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.34", features = ["proptest-impl"] }
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.34", features = ["proptest-impl"] }
zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.34" }
Loading

0 comments on commit 66d9558

Please sign in to comment.