Skip to content

Commit

Permalink
Feature/improve event stream (#251)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lohann authored Jul 17, 2024
1 parent bc495ac commit dcab7f8
Show file tree
Hide file tree
Showing 16 changed files with 626 additions and 399 deletions.
132 changes: 67 additions & 65 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions chains/ethereum/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ serde.workspace = true
serde_json.workspace = true
thiserror = "1.0"
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tracing = "0.1"
url = "2.4"

Expand Down
94 changes: 89 additions & 5 deletions chains/ethereum/server/src/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,106 @@
#![allow(dead_code)]
use crate::{
finalized_block_stream::FinalizedBlockStream, new_heads::NewHeadsStream, state::State,
use super::{
event_stream::{EthereumEventStream, NewBlock},
state::State,
};
use futures_util::StreamExt;
use rosetta_config_ethereum::Event as EthEvent;
use rosetta_core::{stream::Stream, types::BlockIdentifier, BlockOrIdentifier, ClientEvent};
use rosetta_ethereum_backend::{
ext::types::{rpc::RpcBlock, H256},
jsonrpsee::core::{client::Subscription, ClientError as RpcError},
EthereumPubSub,
};
use std::{
pin::Pin,
task::{Context, Poll},
};

pub struct BlockStream<RPC>
where
RPC: for<'s> EthereumPubSub<Error = RpcError, NewHeadsStream<'s> = Subscription<RpcBlock<H256>>>
+ Clone
+ Unpin
+ Send
+ Sync
+ 'static,
RPC::SubscriptionError: Send + Sync,
{
finalized: FinalizedBlockStream<RPC>,
new_heads: NewHeadsStream<RPC>,
block_stream: Option<EthereumEventStream<RPC>>,
state: State,
}

impl<RPC> BlockStream<RPC>
where
RPC: for<'s> EthereumPubSub<Error = RpcError, NewHeadsStream<'s> = Subscription<RpcBlock<H256>>>
+ Clone
+ Unpin
+ Send
+ Sync
+ 'static,
RPC::SubscriptionError: Send + Sync,
{
#[must_use]
pub fn new(client: RPC, state: State) -> Self {
Self { block_stream: Some(EthereumEventStream::new(client)), state }
}
}

impl<RPC> Stream for BlockStream<RPC>
where
RPC: for<'s> EthereumPubSub<Error = RpcError, NewHeadsStream<'s> = Subscription<RpcBlock<H256>>>
+ Clone
+ Unpin
+ Send
+ Sync
+ 'static,
RPC::SubscriptionError: Send + Sync,
{
type Item = ClientEvent<BlockIdentifier, EthEvent>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Some(mut block_stream) = self.block_stream.take() else {
return Poll::Ready(None);
};

let mut failures = 0;
loop {
match block_stream.poll_next_unpin(cx) {
Poll::Ready(Some(new_block)) => {
let block_id = {
let header = new_block.sealed_block().header();
BlockOrIdentifier::Identifier(BlockIdentifier {
index: header.number(),
hash: header.hash().0,
})
};
let is_finalized = matches!(new_block, NewBlock::Finalized(_));
if let Err(err) = self.state.import(new_block.into_sealed_block()) {
failures += 1;
tracing::warn!("failed to import block {block_id} ({failures}): {:?}", err);
if failures >= 5 {
return Poll::Ready(None);
}
continue;
}

let event = if is_finalized {
ClientEvent::NewFinalized(block_id)
} else {
ClientEvent::NewHead(block_id)
};
self.block_stream = Some(block_stream);
break Poll::Ready(Some(event));
},
Poll::Ready(None) => break Poll::Ready(None),
Poll::Pending => {
self.block_stream = Some(block_stream);
break Poll::Pending;
},
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
24 changes: 17 additions & 7 deletions chains/ethereum/server/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
#![allow(clippy::option_if_let_else)]
use crate::{
event_stream::EthereumEventStream,
block_stream::BlockStream,
log_filter::LogFilter,
proof::verify_proof,
shared_stream::SharedStream,
state::State,
utils::{
AtBlockExt, DefaultFeeEstimatorConfig, EthereumRpcExt, PartialBlock,
PolygonFeeEstimatorConfig,
},
};
use anyhow::{Context, Result};
use futures_util::StreamExt;
use rosetta_config_ethereum::{
ext::types::{
crypto::{Crypto, DefaultCrypto, Keypair, Signer},
Expand All @@ -27,14 +30,14 @@ use rosetta_config_ethereum::{
use rosetta_core::{
crypto::{address::Address, PublicKey},
types::{BlockIdentifier, PartialBlockIdentifier},
BlockchainConfig,
BlockchainConfig, ClientEvent,
};
use rosetta_ethereum_backend::{
jsonrpsee::{
core::client::{ClientT, SubscriptionClientT},
Adapter,
},
BlockRange, EthereumPubSub, EthereumRpc, ExitReason,
BlockRange, EthereumRpc, ExitReason,
};
use std::sync::{
atomic::{self, Ordering},
Expand Down Expand Up @@ -73,6 +76,7 @@ pub struct EthereumClient<P> {
nonce: Arc<std::sync::atomic::AtomicU64>,
private_key: Option<[u8; 32]>,
log_filter: Arc<std::sync::Mutex<LogFilter>>,
// event_stream: SharedStream<BlockStream<Adapter<P>>>
}

impl<P> Clone for EthereumClient<P>
Expand Down Expand Up @@ -516,11 +520,17 @@ where

impl<P> EthereumClient<P>
where
P: SubscriptionClientT + Send + Sync + 'static,
P: SubscriptionClientT + Unpin + Clone + Send + Sync + 'static,
{
#[allow(clippy::missing_errors_doc)]
pub async fn listen(&self) -> Result<EthereumEventStream<'_, P>> {
let new_heads = EthereumPubSub::new_heads(&self.backend).await?;
Ok(EthereumEventStream::new(self, new_heads))
pub async fn listen(&self) -> Result<SharedStream<BlockStream<Adapter<P>>>> {
let best_finalized_block = self.finalized_block(None).await?;
let mut stream = BlockStream::new(self.backend.clone(), State::new(best_finalized_block));
match stream.next().await {
Some(ClientEvent::Close(msg)) => anyhow::bail!(msg),
None => anyhow::bail!("Failed to open the event stream"),
Some(_) => {},
}
Ok(SharedStream::new(stream, 100))
}
}
Loading

0 comments on commit dcab7f8

Please sign in to comment.