diff --git a/Cargo.lock b/Cargo.lock index ea7fc40b..f569f9a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -193,7 +193,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -209,7 +209,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "syn-solidity", "tiny-keccak", ] @@ -225,7 +225,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "syn-solidity", ] @@ -739,7 +739,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -786,7 +786,7 @@ checksum = "3c87f3f15e7794432337fc718554eaa4dc8f04c9677a950ffe366f20a162ae42" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1049,9 +1049,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" dependencies = [ "serde", ] @@ -1111,13 +1111,12 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.0" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaff6f8ce506b9773fa786672d63fc7a191ffea1be33f72bbd4aeacefca9ffc8" +checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052" dependencies = [ "jobserver", "libc", - "once_cell", ] [[package]] @@ -1557,7 +1556,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1605,7 +1604,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1627,7 +1626,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core 0.20.10", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1703,7 +1702,7 @@ checksum = "d65d7ce8132b7c0e54497a4d9a55a1c2a0912a0d786cf894472ba818fba45762" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1714,7 +1713,7 @@ checksum = "62d671cc41a825ebabc75757b62d3d168c577f9149b2d49ece1dad1f72119d25" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1727,7 +1726,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version 0.4.0", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1820,7 +1819,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.70", + "syn 2.0.71", "termcolor", "toml", "walkdir", @@ -2208,7 +2207,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "syn 2.0.70", + "syn 2.0.71", "toml", "walkdir", ] @@ -2226,7 +2225,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -2252,7 +2251,7 @@ dependencies = [ "serde", "serde_json", "strum 0.26.3", - "syn 2.0.70", + "syn 2.0.71", "tempfile", "thiserror", "tiny-keccak", @@ -2440,7 +2439,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -2762,7 +2761,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3154,9 +3153,9 @@ dependencies = [ [[package]] name = "http-body" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", "http 1.1.0", @@ -3171,7 +3170,7 @@ dependencies = [ "bytes", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -3261,7 +3260,7 @@ dependencies = [ "futures-util", "h2 0.4.5", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "httparse", "itoa", "pin-project-lite", @@ -3314,7 +3313,7 @@ dependencies = [ "futures-channel", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "hyper 1.4.1", "pin-project-lite", "socket2 0.5.7", @@ -3738,7 +3737,7 @@ dependencies = [ "futures-timer", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "http-body-util", "jsonrpsee-types 0.24.0", "pin-project", @@ -3779,7 +3778,7 @@ checksum = "52dc99c70619e252e6adc5e95144323505a69a1742771de5b3f2071e1595b363" dependencies = [ "async-trait", "base64 0.22.1", - "http-body 1.0.0", + "http-body 1.0.1", "hyper 1.4.1", "hyper-rustls 0.27.2", "hyper-util", @@ -4396,7 +4395,7 @@ dependencies = [ "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4480,7 +4479,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4736,7 +4735,7 @@ dependencies = [ "phf_shared 0.11.2", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4774,7 +4773,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4855,7 +4854,7 @@ dependencies = [ "polkavm-common 0.8.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4867,7 +4866,7 @@ dependencies = [ "polkavm-common 0.9.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4877,7 +4876,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "15e85319a0d5129dc9f021c62607e0804f5fb777a05cdda44d750ac0732def66" dependencies = [ "polkavm-derive-impl 0.8.0", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4887,7 +4886,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ba81f7b5faac81e528eb6158a6f3c9e0bb1008e0ffa19653bc8dea925ecb429" dependencies = [ "polkavm-derive-impl 0.9.0", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4968,7 +4967,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" dependencies = [ "proc-macro2", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -5265,7 +5264,7 @@ checksum = "bcc303e793d3734489387d205e9b186fac9c6cfacedd98cbb2e8a5943595f3e6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -5497,6 +5496,7 @@ version = "0.6.0" dependencies = [ "anyhow", "async-trait", + "const-hex", "fluent-uri", "futures-util", "rosetta-crypto", @@ -5682,6 +5682,7 @@ dependencies = [ "sha3", "thiserror", "tokio", + "tokio-stream", "tracing", "tracing-subscriber 0.3.18", "url", @@ -6233,7 +6234,7 @@ dependencies = [ "proc-macro2", "quote", "scale-info", - "syn 2.0.70", + "syn 2.0.71", "thiserror", ] @@ -6369,9 +6370,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.6.0", "core-foundation", @@ -6383,9 +6384,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" +checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" dependencies = [ "core-foundation-sys", "libc", @@ -6471,7 +6472,7 @@ checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -6542,7 +6543,7 @@ dependencies = [ "darling 0.20.10", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -6981,7 +6982,7 @@ checksum = "48d09fa0a5f7299fb81ee25ae3853d26200f7a348148aed6de76be905c007dbe" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7113,7 +7114,7 @@ dependencies = [ "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7399,7 +7400,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7486,7 +7487,7 @@ dependencies = [ "scale-info", "scale-typegen", "subxt-metadata", - "syn 2.0.70", + "syn 2.0.71", "thiserror", "tokio", ] @@ -7549,7 +7550,7 @@ dependencies = [ "quote", "scale-typegen", "subxt-codegen", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7619,9 +7620,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.70" +version = "2.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0209b68b3613b093e0ec905354eccaedcfe83b8cb37cbdeae64026c3064c16" +checksum = "b146dcf730474b4bcd16c311627b31ede9ab149045db4d6088b3becaea046462" dependencies = [ "proc-macro2", "quote", @@ -7637,7 +7638,7 @@ dependencies = [ "paste", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7724,22 +7725,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" +checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" +checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7872,7 +7873,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7937,6 +7938,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -8090,7 +8092,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -8509,7 +8511,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "wasm-bindgen-shared", ] @@ -8543,7 +8545,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -9114,7 +9116,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -9134,7 +9136,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] diff --git a/chains/ethereum/server/Cargo.toml b/chains/ethereum/server/Cargo.toml index e240d804..a00f670a 100644 --- a/chains/ethereum/server/Cargo.toml +++ b/chains/ethereum/server/Cargo.toml @@ -25,6 +25,7 @@ serde.workspace = true serde_json.workspace = true thiserror = "1.0" tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } +tokio-stream = { version = "0.1", features = ["sync"] } tracing = "0.1" url = "2.4" diff --git a/chains/ethereum/server/src/block_stream.rs b/chains/ethereum/server/src/block_stream.rs index 1cb2628f..00920549 100644 --- a/chains/ethereum/server/src/block_stream.rs +++ b/chains/ethereum/server/src/block_stream.rs @@ -1,22 +1,106 @@ -#![allow(dead_code)] -use crate::{ - finalized_block_stream::FinalizedBlockStream, new_heads::NewHeadsStream, state::State, +use super::{ + event_stream::{EthereumEventStream, NewBlock}, + state::State, }; +use futures_util::StreamExt; +use rosetta_config_ethereum::Event as EthEvent; +use rosetta_core::{stream::Stream, types::BlockIdentifier, BlockOrIdentifier, ClientEvent}; use rosetta_ethereum_backend::{ ext::types::{rpc::RpcBlock, H256}, jsonrpsee::core::{client::Subscription, ClientError as RpcError}, EthereumPubSub, }; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; pub struct BlockStream where RPC: for<'s> EthereumPubSub = Subscription>> + + Clone + Unpin + Send + Sync + 'static, + RPC::SubscriptionError: Send + Sync, { - finalized: FinalizedBlockStream, - new_heads: NewHeadsStream, + block_stream: Option>, state: State, } + +impl BlockStream +where + RPC: for<'s> EthereumPubSub = Subscription>> + + Clone + + Unpin + + Send + + Sync + + 'static, + RPC::SubscriptionError: Send + Sync, +{ + #[must_use] + pub fn new(client: RPC, state: State) -> Self { + Self { block_stream: Some(EthereumEventStream::new(client)), state } + } +} + +impl Stream for BlockStream +where + RPC: for<'s> EthereumPubSub = Subscription>> + + Clone + + Unpin + + Send + + Sync + + 'static, + RPC::SubscriptionError: Send + Sync, +{ + type Item = ClientEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Some(mut block_stream) = self.block_stream.take() else { + return Poll::Ready(None); + }; + + let mut failures = 0; + loop { + match block_stream.poll_next_unpin(cx) { + Poll::Ready(Some(new_block)) => { + let block_id = { + let header = new_block.sealed_block().header(); + BlockOrIdentifier::Identifier(BlockIdentifier { + index: header.number(), + hash: header.hash().0, + }) + }; + let is_finalized = matches!(new_block, NewBlock::Finalized(_)); + if let Err(err) = self.state.import(new_block.into_sealed_block()) { + failures += 1; + tracing::warn!("failed to import block {block_id} ({failures}): {:?}", err); + if failures >= 5 { + return Poll::Ready(None); + } + continue; + } + + let event = if is_finalized { + ClientEvent::NewFinalized(block_id) + } else { + ClientEvent::NewHead(block_id) + }; + self.block_stream = Some(block_stream); + break Poll::Ready(Some(event)); + }, + Poll::Ready(None) => break Poll::Ready(None), + Poll::Pending => { + self.block_stream = Some(block_stream); + break Poll::Pending; + }, + } + } + } + + fn size_hint(&self) -> (usize, Option) { + (0, None) + } +} diff --git a/chains/ethereum/server/src/client.rs b/chains/ethereum/server/src/client.rs index e5b00bc4..79844a2d 100644 --- a/chains/ethereum/server/src/client.rs +++ b/chains/ethereum/server/src/client.rs @@ -1,14 +1,17 @@ #![allow(clippy::option_if_let_else)] use crate::{ - event_stream::EthereumEventStream, + block_stream::BlockStream, log_filter::LogFilter, proof::verify_proof, + shared_stream::SharedStream, + state::State, utils::{ AtBlockExt, DefaultFeeEstimatorConfig, EthereumRpcExt, PartialBlock, PolygonFeeEstimatorConfig, }, }; use anyhow::{Context, Result}; +use futures_util::StreamExt; use rosetta_config_ethereum::{ ext::types::{ crypto::{Crypto, DefaultCrypto, Keypair, Signer}, @@ -27,14 +30,14 @@ use rosetta_config_ethereum::{ use rosetta_core::{ crypto::{address::Address, PublicKey}, types::{BlockIdentifier, PartialBlockIdentifier}, - BlockchainConfig, + BlockchainConfig, ClientEvent, }; use rosetta_ethereum_backend::{ jsonrpsee::{ core::client::{ClientT, SubscriptionClientT}, Adapter, }, - BlockRange, EthereumPubSub, EthereumRpc, ExitReason, + BlockRange, EthereumRpc, ExitReason, }; use std::sync::{ atomic::{self, Ordering}, @@ -73,6 +76,7 @@ pub struct EthereumClient

{ nonce: Arc, private_key: Option<[u8; 32]>, log_filter: Arc>, + // event_stream: SharedStream>> } impl

Clone for EthereumClient

@@ -516,11 +520,17 @@ where impl

EthereumClient

where - P: SubscriptionClientT + Send + Sync + 'static, + P: SubscriptionClientT + Unpin + Clone + Send + Sync + 'static, { #[allow(clippy::missing_errors_doc)] - pub async fn listen(&self) -> Result> { - let new_heads = EthereumPubSub::new_heads(&self.backend).await?; - Ok(EthereumEventStream::new(self, new_heads)) + pub async fn listen(&self) -> Result>>> { + let best_finalized_block = self.finalized_block(None).await?; + let mut stream = BlockStream::new(self.backend.clone(), State::new(best_finalized_block)); + match stream.next().await { + Some(ClientEvent::Close(msg)) => anyhow::bail!(msg), + None => anyhow::bail!("Failed to open the event stream"), + Some(_) => {}, + } + Ok(SharedStream::new(stream, 100)) } } diff --git a/chains/ethereum/server/src/event_stream.rs b/chains/ethereum/server/src/event_stream.rs index 3c411a2d..7ddfe042 100644 --- a/chains/ethereum/server/src/event_stream.rs +++ b/chains/ethereum/server/src/event_stream.rs @@ -1,46 +1,99 @@ -use crate::{client::EthereumClient, utils::PartialBlock}; -use futures_util::{future::BoxFuture, FutureExt, StreamExt}; -use rosetta_config_ethereum::Event; -use rosetta_core::{stream::Stream, types::BlockIdentifier, BlockOrIdentifier, ClientEvent}; +use super::{finalized_block_stream::FinalizedBlockStream, new_heads::NewHeadsStream}; +use futures_util::StreamExt; +use rosetta_config_ethereum::ext::types::SealedBlock; +use rosetta_core::stream::Stream; use rosetta_ethereum_backend::{ - ext::types::{crypto::DefaultCrypto, rpc::RpcBlock, H256}, - jsonrpsee::core::client::{Subscription, SubscriptionClientT}, + ext::types::{rpc::RpcBlock, H256}, + jsonrpsee::core::client::{error::Error as RpcError, Subscription}, + EthereumPubSub, }; -use std::{cmp::Ordering, pin::Pin, task::Poll}; +use std::{pin::Pin, task::Poll}; -// Maximum number of failures in sequence before closing the stream -const FAILURE_THRESHOLD: u32 = 10; +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum NewBlock { + NewHead(SealedBlock), + Finalized(SealedBlock), +} + +impl NewBlock { + #[must_use] + pub const fn new_head(block: SealedBlock) -> Self { + Self::NewHead(block) + } + + #[must_use] + pub const fn new_finalized(block: SealedBlock) -> Self { + Self::Finalized(block) + } -pub struct EthereumEventStream<'a, P: SubscriptionClientT + Send + Sync + 'static> { - /// Ethereum subscription for new heads - new_head_stream: Option>>, + #[must_use] + pub fn into_sealed_block(self) -> SealedBlock { + match self { + Self::Finalized(block) | Self::NewHead(block) => block, + } + } + + #[must_use] + pub const fn sealed_block(&self) -> &SealedBlock { + match self { + Self::Finalized(block) | Self::NewHead(block) => block, + } + } +} + +impl From for SealedBlock { + fn from(new_block: NewBlock) -> Self { + match new_block { + NewBlock::Finalized(block) | NewBlock::NewHead(block) => block, + } + } +} + +pub struct EthereumEventStream +where + C: for<'s> EthereumPubSub = Subscription>> + + Clone + + Unpin + + Send + + Sync + + 'static, + C::SubscriptionError: Send + Sync, +{ + /// Latest block stream + new_head_stream: Option>, /// Finalized blocks stream - finalized_stream: Option>, - /// Count the number of failed attempts to retrieve the latest block - failures: u32, + finalized_stream: Option>, } -impl

EthereumEventStream<'_, P> +impl EthereumEventStream where - P: SubscriptionClientT + Send + Sync + 'static, + C: for<'s> EthereumPubSub = Subscription>> + + Clone + + Unpin + + Send + + Sync + + 'static, + C::SubscriptionError: Send + Sync, { - pub fn new( - client: &EthereumClient

, - subscription: Subscription>, - ) -> EthereumEventStream<'_, P> { - EthereumEventStream { - new_head_stream: Some(subscription), + pub fn new(client: C) -> Self { + Self { + new_head_stream: Some(NewHeadsStream::new(client.clone())), finalized_stream: Some(FinalizedBlockStream::new(client)), - failures: 0, } } } -impl

Stream for EthereumEventStream<'_, P> +impl Stream for EthereumEventStream where - P: SubscriptionClientT + Send + Sync + 'static, + C: for<'s> EthereumPubSub = Subscription>> + + Clone + + Unpin + + Send + + Sync + + 'static, + C::SubscriptionError: Send + Sync, { - type Item = ClientEvent; + type Item = NewBlock; fn poll_next( mut self: Pin<&mut Self>, @@ -53,19 +106,9 @@ where // Poll the finalized block stream match finalized_stream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(block))) => { + Poll::Ready(Some(block)) => { self.finalized_stream = Some(finalized_stream); - - return Poll::Ready(Some(ClientEvent::NewFinalized( - BlockOrIdentifier::Identifier(BlockIdentifier::new( - block.header().header().number, - block.header().hash().0, - )), - ))); - }, - Poll::Ready(Some(Err(error))) => { - self.new_head_stream = None; - return Poll::Ready(Some(ClientEvent::Close(error))); + return Poll::Ready(Some(NewBlock::new_finalized(block))); }, Poll::Ready(None) => { self.new_head_stream = None; @@ -82,183 +125,19 @@ where return Poll::Ready(None); }; - loop { - if self.failures >= FAILURE_THRESHOLD { - self.new_head_stream = None; + match new_head_stream.poll_next_unpin(cx) { + Poll::Ready(Some(block)) => { + self.new_head_stream = Some(new_head_stream); + Poll::Ready(Some(NewBlock::new_head(block))) + }, + Poll::Ready(None) => { self.finalized_stream = None; - return Poll::Ready(Some(ClientEvent::Close( - "More than 10 failures in sequence".into(), - ))); - } - - match new_head_stream.poll_next_unpin(cx) { - Poll::Ready(Some(block)) => { - // Convert raw block to block identifier - let block = match block { - Ok(block) => { - let header = if let Some(hash) = block.hash { - block.header.seal(hash) - } else { - block.header.seal_slow::() - }; - BlockIdentifier::new(header.number(), header.hash().0) - }, - Err(error) => { - self.failures += 1; - println!("[RPC BUG] invalid latest block: {error}"); - tracing::error!("[RPC BUG] invalid latest block: {error}"); - continue; - }, - }; - - // Reset failure counter - self.failures = 0; - - // Store the new latest block - if let Some(finalized_stream) = self.finalized_stream.as_mut() { - finalized_stream.update_latest_block(block.index); - } - - self.new_head_stream = Some(new_head_stream); - return Poll::Ready(Some(ClientEvent::NewHead(BlockOrIdentifier::Identifier( - block, - )))); - }, - Poll::Ready(None) => return Poll::Ready(None), - Poll::Pending => { - self.new_head_stream = Some(new_head_stream); - break Poll::Pending; - }, - }; - } - } -} - -struct FinalizedBlockStream<'a, P> -where - P: SubscriptionClientT + Send + Sync + 'static, -{ - /// Ethereum client used to retrieve the finalized block - client: &'a EthereumClient

, - /// Cache the latest block, used for retrieve the latest finalized block - /// see [`BlockFinalityStrategy`] - latest_block: Option, - /// Ethereum client doesn't support subscribing for finalized blocks, as workaround - /// everytime we receive a new head, we query the latest finalized block - future: Option>>, - /// Cache the best finalized block, we use this to avoid emitting two - /// [`ClientEvent::NewFinalized`] for the same block - best_finalized_block: Option, - /// Count the number of failed attempts to retrieve the finalized block - failures: u32, - /// Waker used to wake up the stream when a new block is available - waker: Option, -} - -impl<'a, P> FinalizedBlockStream<'a, P> -where - P: SubscriptionClientT + Send + Sync + 'static, -{ - pub fn new(client: &EthereumClient

) -> FinalizedBlockStream<'_, P> { - FinalizedBlockStream { - client, - latest_block: None, - future: None, - best_finalized_block: None, - failures: 0, - waker: None, - } - } - - pub fn update_latest_block(&mut self, number: u64) { - if Some(number) == self.latest_block { - return; - } - self.latest_block = Some(number); - if self.future.is_none() { - self.future = Some(self.finalized_block()); - } - if let Some(waker) = self.waker.take() { - waker.wake(); - } - } - - fn finalized_block<'c>(&'c self) -> BoxFuture<'a, anyhow::Result> { - self.client.finalized_block(self.latest_block).boxed() - } -} - -impl

Stream for FinalizedBlockStream<'_, P> -where - P: SubscriptionClientT + Send + Sync + 'static, -{ - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - loop { - // Check the failure count - match self.failures.cmp(&FAILURE_THRESHOLD) { - Ordering::Greater => return Poll::Ready(None), - Ordering::Equal => { - self.failures += 1; - self.future = None; - return Poll::Ready(Some(Err(format!( - "More than {FAILURE_THRESHOLD} failures in sequence", - )))); - }, - Ordering::Less => {}, - } - - // If the future is not ready, store the waker and return pending - let Some(mut future) = self.future.take() else { - self.waker = Some(cx.waker().clone()); - return Poll::Pending; - }; - - match future.poll_unpin(cx) { - Poll::Ready(Ok(block)) => { - // Store the waker - self.waker = Some(cx.waker().clone()); - - // Skip if the finalized block is equal to the best finalized block - if let Some(best_finalized_block) = self.best_finalized_block.take() { - if block.header().hash() == best_finalized_block.header().hash() { - self.best_finalized_block = Some(best_finalized_block); - break Poll::Pending; - } - tracing::debug!( - "new finalized block {} {:?}", - block.header().number(), - block.header().hash() - ); - } - - // Cache the new best finalized block - self.best_finalized_block = Some(block.clone()); - - // Return the best finalized block - break Poll::Ready(Some(Ok(block))); - }, - Poll::Ready(Err(error)) => { - // Increment failure count - self.failures += 1; - tracing::error!( - "failed to retrieve finalized block: {error:?} {}", - self.failures - ); - - // Retry to retrieve the latest finalized block. - self.future = Some(self.finalized_block()); - continue; - }, - Poll::Pending => { - self.future = Some(future); - break Poll::Pending; - }, - } + Poll::Ready(None) + }, + Poll::Pending => { + self.new_head_stream = Some(new_head_stream); + Poll::Pending + }, } } } diff --git a/chains/ethereum/server/src/finalized_block_stream.rs b/chains/ethereum/server/src/finalized_block_stream.rs index 466b0998..ef3e7d08 100644 --- a/chains/ethereum/server/src/finalized_block_stream.rs +++ b/chains/ethereum/server/src/finalized_block_stream.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use crate::utils::{EthereumRpcExt, PartialBlock}; use futures_timer::Delay; use futures_util::{future::BoxFuture, FutureExt, Stream}; @@ -41,9 +40,6 @@ struct Statistics { /// Latest known finalized block. best_finalized_block: Option

, - /// required number of successful polls before starting to adjust the polling interval. - probation_period: u32, - /// Incremented the best finalized block is parent of the new block. /// Ex: if the best known finalized block is 100, and the new block is 101. new: u32, @@ -91,7 +87,13 @@ impl Statistics { self.new += 1; true } else { - let gap_size = i32::try_from(new_block.number - expected).unwrap_or(1); + debug_assert!( + new_block.number > expected, + "Non monotonically increasing finalized block number" + ); + // Cap the gap_size to `ADJUST_THRESHOLD`. + let gap_size = + i32::try_from(new_block.number - expected).unwrap_or(1).min(ADJUST_THRESHOLD); self.gaps += 1; self.adjust_threshold -= gap_size; true @@ -149,7 +151,6 @@ where backend, statistics: Statistics { best_finalized_block: None, - probation_period: 0, new: 0, duplicated: 0, gaps: 0, diff --git a/chains/ethereum/server/src/lib.rs b/chains/ethereum/server/src/lib.rs index 82ff9e18..650cf95e 100644 --- a/chains/ethereum/server/src/lib.rs +++ b/chains/ethereum/server/src/lib.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use block_stream::BlockStream; pub use client::EthereumClient; pub use rosetta_config_ethereum::{ EthereumMetadata, EthereumMetadataParams, Event, Query as EthQuery, QueryItem, @@ -9,6 +10,7 @@ use rosetta_core::{ types::{BlockIdentifier, PartialBlockIdentifier}, BlockchainClient, BlockchainConfig, }; +use rosetta_ethereum_backend::jsonrpsee::Adapter; use rosetta_server::ws::{default_client, default_http_client, DefaultClient, HttpClient}; use url::Url; @@ -20,6 +22,7 @@ mod log_filter; mod multi_block; mod new_heads; mod proof; +mod shared_stream; mod state; mod utils; @@ -34,6 +37,7 @@ pub mod config { #[doc(hidden)] pub mod ext { pub use anyhow; + pub use futures_util; pub use rosetta_config_ethereum as config; pub use rosetta_core as core; pub use rosetta_ethereum_backend as backend; @@ -77,9 +81,11 @@ impl MaybeWsEthereumClient { ) -> Result { let uri = Url::parse(addr.as_ref())?; if uri.scheme() == "ws" || uri.scheme() == "wss" { + tracing::trace!("Initializing Ethereum client with Websocket at {uri}"); let client = default_client(uri.as_str(), None).await?; Self::from_jsonrpsee(config, client, private_key).await } else { + tracing::trace!("Initializing Ethereum client with Http at {uri}"); let http_connection = default_http_client(uri.as_str())?; // let http_connection = Http::new(uri); let client = EthereumClient::new(config, http_connection, private_key).await?; @@ -106,7 +112,7 @@ impl MaybeWsEthereumClient { impl BlockchainClient for MaybeWsEthereumClient { type MetadataParams = EthereumMetadataParams; type Metadata = EthereumMetadata; - type EventStream<'a> = EthereumEventStream<'a, DefaultClient>; + type EventStream<'a> = shared_stream::SharedStream>> where Self: 'a; type Call = EthQuery; type CallResult = EthQueryResult; diff --git a/chains/ethereum/server/src/logs_stream.rs b/chains/ethereum/server/src/logs_stream.rs index c05a8d7f..6533cb79 100644 --- a/chains/ethereum/server/src/logs_stream.rs +++ b/chains/ethereum/server/src/logs_stream.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use futures_util::{future::BoxFuture, Stream, StreamExt}; use rosetta_ethereum_backend::{ ext::types::{crypto::DefaultCrypto, rpc::RpcBlock, AtBlock, SealedBlock, H256}, diff --git a/chains/ethereum/server/src/multi_block.rs b/chains/ethereum/server/src/multi_block.rs index 0c0d08f0..ef48e3a4 100644 --- a/chains/ethereum/server/src/multi_block.rs +++ b/chains/ethereum/server/src/multi_block.rs @@ -1,10 +1,10 @@ -#![allow(dead_code)] use std::{ cmp::Ordering, hash::{Hash, Hasher}, }; use crate::utils::{FullBlock, PartialBlock}; +use rosetta_core::{types::BlockIdentifier, BlockOrIdentifier}; use rosetta_ethereum_backend::ext::types::{ crypto::DefaultCrypto, Header, SealedHeader, H256, U256, }; @@ -183,3 +183,42 @@ impl Ord for BlockRef { } } } + +impl From<&'_ MultiBlock> for BlockRef { + fn from(block: &'_ MultiBlock) -> Self { + block.as_block_ref() + } +} + +impl From<&'_ SealedHeader> for BlockRef { + fn from(block: &'_ SealedHeader) -> Self { + Self { number: block.number(), hash: block.hash() } + } +} + +impl From<&'_ PartialBlock> for BlockRef { + fn from(block: &'_ PartialBlock) -> Self { + Self::from(block.header()) + } +} + +impl From<&'_ FullBlock> for BlockRef { + fn from(block: &'_ FullBlock) -> Self { + Self::from(block.header()) + } +} + +impl From<&'_ BlockIdentifier> for BlockRef { + fn from(identifier: &'_ BlockIdentifier) -> Self { + Self { number: identifier.index, hash: H256(identifier.hash) } + } +} + +impl From<&'_ BlockOrIdentifier> for BlockRef { + fn from(identifier: &'_ BlockOrIdentifier) -> Self { + match identifier { + BlockOrIdentifier::Identifier(id) => Self::from(id), + BlockOrIdentifier::Block(block) => Self::from(&block.block_identifier), + } + } +} diff --git a/chains/ethereum/server/src/new_heads.rs b/chains/ethereum/server/src/new_heads.rs index 8b42fc73..080cbc42 100644 --- a/chains/ethereum/server/src/new_heads.rs +++ b/chains/ethereum/server/src/new_heads.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt}; use rosetta_ethereum_backend::{ ext::types::{crypto::DefaultCrypto, rpc::RpcBlock, AtBlock, SealedBlock, H256}, @@ -26,20 +25,22 @@ struct PollLatestBlock(RPC); impl FutureFactory for PollLatestBlock where RPC: EthereumRpc + Send + Sync + 'static, + RPC::Error: Send + Sync, { - type Output = Result, RpcError>; + type Output = Result, ::Error>; type Future<'a> = BoxFuture<'a, Self::Output>; fn new_future(&mut self) -> Self::Future<'_> { async move { let Some(block) = self.0.block(AtBlock::Latest).await? else { return Ok(None); }; - let Some(hash) = block.hash else { - return Err(RpcError::Custom( - "[report this bug] the api returned the latest block without hash".to_string(), - )); + let block = if let Some(hash) = block.hash { + block.seal(hash) + } else { + tracing::warn!("[report this bug] the api returned the latest block without hash, computing block hash manually"); + block.seal_slow::() }; - Ok(Some(block.seal(hash))) + Ok(Some(block)) } .boxed() } @@ -88,6 +89,7 @@ where + Send + Sync + 'static, + RPC::SubscriptionError: Send + Sync, { Subscription(AutoSubscribe, NewHeadsSubscriber>), Polling(CircuitBreaker>, ()>), @@ -101,6 +103,7 @@ where + Send + Sync + 'static, + RPC::SubscriptionError: Send + Sync, { #[must_use] pub const fn new(backend: RPC) -> Self { @@ -117,6 +120,7 @@ where + Send + Sync + 'static, + RPC::SubscriptionError: Send + Sync, { /// Subscription or Polling to new block headers. state: State, @@ -135,6 +139,7 @@ where + Send + Sync + 'static, + RPC::SubscriptionError: Send + Sync, { #[must_use] pub const fn new(backend: RPC) -> Self { @@ -149,6 +154,7 @@ where + Send + Sync + 'static, + RPC::SubscriptionError: Send + Sync, { type Item = PartialBlock; diff --git a/chains/ethereum/server/src/shared_stream.rs b/chains/ethereum/server/src/shared_stream.rs new file mode 100644 index 00000000..ce68385d --- /dev/null +++ b/chains/ethereum/server/src/shared_stream.rs @@ -0,0 +1,180 @@ +use futures_util::{future::Shared, Future, FutureExt, Stream, StreamExt}; +use std::{ + pin::Pin, + sync::{Arc, Weak}, + task::{Context, Poll}, +}; +use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; + +pub struct SharedStream +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + inner: Inner, + stream: Option::Item>>, +} + +impl SharedStream +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + #[must_use] + pub fn new(stream: T, capacity: usize) -> Self { + let (tx, rx) = tokio::sync::broadcast::channel::<::Item>(capacity); + let inner = Inner::new(stream, tx); + Self { inner, stream: Some(BroadcastStream::new(rx)) } + } +} + +impl Stream for SharedStream +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Some(mut stream) = self.stream.take() else { + panic!("stream polled after completion"); + }; + + // Poll the transmitter + match self.inner.future.poll_unpin(cx) { + Poll::Ready(()) => return Poll::Ready(None), + Poll::Pending => {}, + } + + // Poll the receiver + loop { + match stream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(value))) => { + self.stream = Some(stream); + break Poll::Ready(Some(value)); + }, + Poll::Ready(Some(Err(value))) => match value { + BroadcastStreamRecvError::Lagged(gap) => { + tracing::warn!("broadcast stream lagged by {gap} messages"); + continue; + }, + }, + Poll::Ready(None) => { + // Stream has ended + break Poll::Ready(None); + }, + Poll::Pending => { + self.stream = Some(stream); + break Poll::Pending; + }, + } + } + } +} + +impl Clone for SharedStream +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + stream: self.inner.outbound_channel.upgrade().map(|channel| { + let receiver = channel.subscribe(); + BroadcastStream::new(receiver) + }), + } + } +} + +struct Inner +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + future: Shared>, + /// Map of listener IDs to their respective channels + outbound_channel: Weak::Item>>, +} + +impl Inner +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + #[must_use] + pub fn new( + stream: T, + outbound_channel: tokio::sync::broadcast::Sender<::Item>, + ) -> Self { + let outbound_channel = Arc::new(outbound_channel); + let outbound_channel_ref = Arc::downgrade(&outbound_channel); + let future = BroadcastFuture { stream, outbound_channel: Some(outbound_channel) }; + Self { future: future.shared(), outbound_channel: outbound_channel_ref } + } +} + +impl Clone for Inner +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + fn clone(&self) -> Self { + Self { + future: Shared::clone(&self.future), + outbound_channel: self.outbound_channel.clone(), + } + } +} + +#[pin_project::pin_project] +struct BroadcastFuture +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + #[pin] + stream: T, + /// Map of listener IDs to their respective channels + outbound_channel: Option::Item>>>, +} + +impl Future for BroadcastFuture +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + // Check if the stream has ended + let Some(outbound_channel) = this.outbound_channel.take() else { + panic!("future polled after completion"); + }; + + // Poll the stream + loop { + match this.stream.poll_next_unpin(cx) { + Poll::Ready(Some(value)) => { + // Broadcast the message to all listeners + // SAFETY: this should never happen, there must be always at least one listener + assert!( + outbound_channel.send(value).is_ok(), + "[report this bug] failed to broadcast message, no one is listening." + ); + }, + Poll::Ready(None) => { + // Stream has ended + break Poll::Ready(()); + }, + Poll::Pending => { + *this.outbound_channel = Some(outbound_channel); + break Poll::Pending; + }, + } + } + } +} diff --git a/chains/ethereum/server/src/state.rs b/chains/ethereum/server/src/state.rs index c7d9bbb8..99bf6269 100644 --- a/chains/ethereum/server/src/state.rs +++ b/chains/ethereum/server/src/state.rs @@ -1,5 +1,7 @@ -#![allow(dead_code)] -use std::collections::{BTreeMap, VecDeque}; +use std::{ + collections::{BTreeMap, VecDeque}, + sync::{Arc, RwLock}, +}; use crate::multi_block::{BlockRef, MultiBlock}; use fork_tree::FinalizationResult; @@ -8,17 +10,40 @@ use rosetta_config_ethereum::ext::types::H256; type ForkTree = fork_tree::ForkTree; -/// Maximum number of blocks that can be skipped when importing a block -const MAX_BLOCK_GAP: u64 = 1000; - #[derive(Debug, PartialEq, Eq, thiserror::Error)] pub enum Error { #[error("block not found: {0}")] BlockNotFound(H256), } -/// Manages the client state +#[derive(Debug, Clone)] pub struct State { + inner: Arc>, +} + +impl State { + pub fn new>(best_finalized_block: B) -> Self { + Self { inner: Arc::new(RwLock::new(StateInner::new(best_finalized_block))) } + } + + pub fn import>(&self, block: B) -> Result<(), fork_tree::Error> { + #[allow(clippy::unwrap_used)] + self.inner.write().unwrap().import(block) + } + + pub fn finalize>( + &self, + finalized_block_ref: B, + ) -> Result, fork_tree::Error> { + let finalized_block_ref = finalized_block_ref.into(); + #[allow(clippy::unwrap_used)] + self.inner.write().unwrap().finalize(finalized_block_ref) + } +} + +/// Manages the client state +#[derive(Debug, PartialEq)] +struct StateInner { /// Map of block hashes to their full block data blocks: HashMap, /// Maps an orphan block to missing block @@ -29,10 +54,12 @@ pub struct State { fork_tree: ForkTree, /// List of finalized finalized blocks finalized_blocks: VecDeque, + /// latest known block + latest_block: BlockRef, } -impl State { - pub fn new>(best_finalized_block: B) -> Self { +impl StateInner { + fn new>(best_finalized_block: B) -> Self { let best_finalized_block = best_finalized_block.into(); let best_finalized_block_ref = best_finalized_block.as_block_ref(); let best_finalized_block_parent = best_finalized_block.parent_hash(); @@ -69,6 +96,7 @@ impl State { missing: HashMap::new(), fork_tree, finalized_blocks, + latest_block: best_finalized_block_ref, } } @@ -85,46 +113,8 @@ impl State { Ok(()) } - fn insert_orphan_block( - &mut self, - block: MultiBlock, - mut children: BTreeMap, - ) { - // Add block to the orphan list - let missing_ref = if let Some(parent_ref) = self.orphans.get(&block.parent_ref()).copied() { - self.orphans.insert(block.as_block_ref(), parent_ref); - parent_ref - } else { - let parent_ref = block.parent_ref(); - self.orphans.insert(block.as_block_ref(), parent_ref); - parent_ref - }; - - // Update children missing references - for child_ref in children.keys().copied() { - self.orphans.insert(child_ref, missing_ref); - } - - // Add block to the orphan list - match self.missing.entry(missing_ref) { - Entry::Occupied(mut entry) => { - let orphans = entry.get_mut(); - if let Some(cached) = orphans.get_mut(&block.as_block_ref()) { - cached.upgrade(block); - } else { - orphans.insert(block.as_block_ref(), block); - } - orphans.extend(children); - }, - Entry::Vacant(entry) => { - children.insert(block.as_block_ref(), block); - entry.insert(children); - }, - } - } - #[allow(clippy::too_many_lines)] - pub fn import>(&mut self, block: B) -> Result<(), fork_tree::Error> { + fn import>(&mut self, block: B) -> Result<(), fork_tree::Error> { let block = block.into(); // Check if the block is already in the cache, if so, update it @@ -234,7 +224,7 @@ impl State { Ok(()) } - pub fn finalize( + fn finalize( &mut self, finalized_block_ref: BlockRef, ) -> Result, fork_tree::Error> { @@ -373,7 +363,7 @@ mod tests { // // (where N is not a part of fork tree) let block_a = create_block(H256::zero(), 1, 1); - let mut state = State::new(block_a.clone()); + let state = State::new(block_a.clone()); let block_b = create_block(block_a.hash(), 2, 2); let block_c = create_block(block_b.hash(), 3, 3); let block_d = create_block(block_c.hash(), 4, 4); @@ -436,7 +426,7 @@ mod tests { // // (where N is not a part of fork tree) let block_a = create_block(H256::zero(), 1, 1); - let mut state = State::new(block_a.clone()); + let state = State::new(block_a.clone()); let block_b = create_block(block_a.hash(), 2, 2); let block_c = create_block(block_b.hash(), 3, 3); let block_d = create_block(block_c.hash(), 4, 4); @@ -476,8 +466,13 @@ mod tests { state.import(block).unwrap(); } - assert_eq!(state.orphans.len(), 4); - assert_eq!(state.missing.len(), 1); + #[allow(clippy::significant_drop_tightening)] + { + let inner = state.inner.read().unwrap(); + assert_eq!(inner.orphans.len(), 4); + assert_eq!(inner.missing.len(), 1); + drop(inner); + } // Finalize block A let retracted = state.finalize(block_a.as_block_ref()).unwrap(); diff --git a/deny.toml b/deny.toml index 0cce99c9..6bcefd73 100644 --- a/deny.toml +++ b/deny.toml @@ -1,3 +1,4 @@ +[graph] # cargo-deny is really only ever intended to run on the "normal" tier-1 targets targets = [ { triple = "x86_64-unknown-linux-gnu" }, @@ -6,8 +7,7 @@ targets = [ ] [licenses] # ----------------------------------------------------------------- # -# The lint level for crates which do not have a detectable license -unlicensed = "deny" +version = 2 # List of explicitly allowed licenses # See https://spdx.org/licenses/ for list of possible licenses @@ -25,27 +25,10 @@ allow = [ "Unicode-DFS-2016", ] -# Lint level for licenses considered copyleft -copyleft = "deny" -# Blanket approval or denial for OSI-approved or FSF Free/Libre licenses -# * both - The license will be approved if it is both OSI-approved *AND* FSF -# * either - The license will be approved if it is either OSI-approved *OR* FSF -# * osi-only - The license will be approved if is OSI-approved *AND NOT* FSF -# * fsf-only - The license will be approved if is FSF *AND NOT* OSI-approved -# * neither - This predicate is ignored and the default lint level is used - -allow-osi-fsf-free = "neither" -# Lint level used when no other predicates are matched -# 1. License isn't in the allow or deny lists -# 2. License isn't copyleft -# 3. License isn't OSI/FSF, or allow-osi-fsf-free = "neither" - -default = "deny" # The confidence threshold for detecting a license from license text. # The higher the value, the more closely the license text must be to the # canonical license text of a valid SPDX license file. # [possible values: any between 0.0 and 1.0]. - confidence-threshold = 0.9 # Allow 1 or more licenses on a per-crate basis, so that particular licenses # aren't accepted for every possible crate as with the normal allow list @@ -90,34 +73,31 @@ wildcards = "allow" highlight = "all" [advisories] # --------------------------------------------------------------- # +version = 2 + # The path where the advisory database is cloned/fetched into db-path = "~/.cargo/advisory-db" # The url(s) of the advisory databases to use db-urls = ["https://github.com/rustsec/advisory-db"] -# The lint level for security vulnerabilities -vulnerability = "deny" - -# The lint level for unmaintained crates -unmaintained = "deny" - # The lint level for crates that have been yanked from their source registry yanked = "deny" -# The lint level for crates with security notices. Note that as of -# 2019-12-17 there are no security notice advisories in -# https://github.com/rustsec/advisory-db -notice = "deny" - # A list of advisory IDs to ignore. Note that ignored advisories will still # output a note when they are encountered. ignore = [ - 'RUSTSEC-2021-0060', # Create `aes-soft` has been merged into the `aes` crate - 'RUSTSEC-2021-0064', # Crate `cpuid-bool` has been renamed to `cpufeatures` - 'RUSTSEC-2021-0139', # ansi_term is Unmaintained - 'RUSTSEC-2022-0093', # related issue: https://github.com/Analog-Labs/chain-connectors/issues/162 - 'RUSTSEC-2024-0344', # Timing variabilit on curve25519-dalek, which can potentially leak private keys + # Create `aes-soft` has been merged into the `aes` crate + { id = 'RUSTSEC-2021-0060', reason = "Will be fixed in a future PR" }, + + # Crate `cpuid-bool` has been renamed to `cpufeatures` + { id = 'RUSTSEC-2021-0064', reason = "Will be fixed in a future PR" }, + + # ansi_term is Unmaintained + { id = 'RUSTSEC-2021-0139', reason = "Will be fixed in a future PR" }, + + # Timing variabilit on curve25519-dalek, which can potentially leak private keys + { id = 'RUSTSEC-2024-0344', reason = "Waiting for third-part libraries to update to the fixed version" }, ] # This section is considered when running `cargo deny check sources`. diff --git a/rosetta-core/Cargo.toml b/rosetta-core/Cargo.toml index 8a79d25e..6b39d82b 100644 --- a/rosetta-core/Cargo.toml +++ b/rosetta-core/Cargo.toml @@ -9,6 +9,7 @@ description = "Provides traits and definitions shared by the server and client c [dependencies] anyhow = "1.0" async-trait = "0.1" +const-hex = { version = "1.9", default-features = false, features = ["alloc"] } fluent-uri = "0.1" futures-util = "0.3" rosetta-crypto.workspace = true diff --git a/rosetta-core/src/lib.rs b/rosetta-core/src/lib.rs index a393bfb0..55528713 100644 --- a/rosetta-core/src/lib.rs +++ b/rosetta-core/src/lib.rs @@ -13,7 +13,10 @@ use anyhow::Result; use async_trait::async_trait; pub use futures_util::{future, stream}; use serde::{de::DeserializeOwned, Serialize}; -use std::sync::Arc; +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; use futures_util::stream::Empty; pub use node_uri::{NodeUri, NodeUriError}; @@ -41,7 +44,7 @@ pub struct BlockchainConfig { pub testnet: bool, } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, PartialEq, Eq)] pub enum BlockOrIdentifier { Identifier(ID), Block(Block), @@ -73,6 +76,30 @@ impl From for BlockOrIdentifier { } } +impl Debug for BlockOrIdentifier +where + ID: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Identifier(id) => Debug::fmt(id, f), + Self::Block(block) => Debug::fmt(&block.block_identifier, f), + } + } +} + +impl Display for BlockOrIdentifier +where + ID: Display, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Identifier(id) => Display::fmt(id, f), + Self::Block(block) => Display::fmt(&block.block_identifier, f), + } + } +} + /// Event produced by a handler. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ClientEvent { diff --git a/rosetta-core/src/types.rs b/rosetta-core/src/types.rs index fbe645ae..008d7c8e 100644 --- a/rosetta-core/src/types.rs +++ b/rosetta-core/src/types.rs @@ -11,7 +11,7 @@ pub use rosetta_types::{ SignatureType, TransactionIdentifier, }; -use std::vec::Vec; +use std::{fmt::Display, vec::Vec}; /// Block : Blocks contain an array of Transactions that occurred at a particular `BlockIdentifier`. /// A hard requirement for blocks returned by Rosetta implementations is that they MUST be @@ -35,7 +35,7 @@ pub struct Block { } /// `BlockIdentifier` : The `block_identifier` uniquely identifies a block in a particular network. -#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Eq, Default, Serialize, Deserialize)] pub struct BlockIdentifier { /// This is also known as the block height. #[serde(rename = "index")] @@ -54,6 +54,23 @@ impl BlockIdentifier { } } +impl Display for BlockIdentifier { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let hash_hex = const_hex::encode_prefixed(self.hash); + write!(f, "{}: {}", self.index, hash_hex) + } +} + +impl Debug for BlockIdentifier { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let hash_hex = const_hex::encode_prefixed(self.hash); + f.debug_struct("BlockIdentifier") + .field("index", &self.index) + .field("hash", &hash_hex) + .finish() + } +} + /// `PartialBlockIdentifier` : When fetching data by `BlockIdentifier`, it may be possible to only /// specify the index or hash. If neither property is specified, it is assumed that the client is /// making a request at the current block.