diff --git a/chains/ethereum/server/src/block_provider.rs b/chains/ethereum/server/src/block_provider.rs index 079a772c..9e5fea59 100644 --- a/chains/ethereum/server/src/block_provider.rs +++ b/chains/ethereum/server/src/block_provider.rs @@ -21,14 +21,17 @@ pub trait BlockProvider: Unpin { /// Error type type Error: Unpin + Send + Sync + 'static; /// Future type - type BlockAtFut: Future, Self::Error>> + type BlockAtFut: Future>, Self::Error>> + Unpin + Send + 'static; /// Future type - type LatestFut: Future> + Unpin + Send + 'static; + type LatestFut: Future, Self::Error>> + Unpin + Send + 'static; /// Future type - type FinalizedFut: Future> + Unpin + Send + 'static; + type FinalizedFut: Future, Self::Error>> + + Unpin + + Send + + 'static; /// Get block by identifier fn block_at(&self, block_ref: BlockIdentifier) -> Self::BlockAtFut; @@ -147,11 +150,11 @@ where /// Error type type Error = BlockProviderError; /// Future type - type BlockAtFut = BoxFuture<'static, Result, Self::Error>>; + type BlockAtFut = BoxFuture<'static, Result>, Self::Error>>; /// Future type - type LatestFut = BoxFuture<'static, Result>; + type LatestFut = BoxFuture<'static, Result, Self::Error>>; /// Future type - type FinalizedFut = BoxFuture<'static, Result>; + type FinalizedFut = BoxFuture<'static, Result, Self::Error>>; /// Get block by identifier fn block_at(&self, block_ref: BlockIdentifier) -> Self::BlockAtFut { @@ -160,7 +163,7 @@ where let maybe_block = retrieve_sealed_block(rpc, block_ref.into()) .await .map_err(BlockProviderError::Rpc)?; - Ok(maybe_block) + Ok(maybe_block.map(Arc::new)) } .boxed() } @@ -175,7 +178,7 @@ where else { return Err(BlockProviderError::LatestBlockNotFound); }; - Ok(latest_block) + Ok(Arc::new(latest_block)) } .boxed() } @@ -190,7 +193,7 @@ where else { return Err(BlockProviderError::FinalizedBlockNotFound); }; - Ok(best_block) + Ok(Arc::new(best_block)) } .boxed() } @@ -268,11 +271,11 @@ where /// Error type type Error = BlockProviderError; /// Future type - type BlockAtFut = BoxFuture<'static, Result, Self::Error>>; + type BlockAtFut = BoxFuture<'static, Result>, Self::Error>>; /// Future type - type LatestFut = BoxFuture<'static, Result>; + type LatestFut = BoxFuture<'static, Result, Self::Error>>; /// Future type - type FinalizedFut = BoxFuture<'static, Result>; + type FinalizedFut = BoxFuture<'static, Result, Self::Error>>; /// Get block by identifier fn block_at(&self, block_ref: BlockIdentifier) -> Self::BlockAtFut { @@ -282,21 +285,13 @@ where /// Retrieve the latest block fn latest(&self) -> Self::LatestFut { let this = self.inner.clone(); - async move { - let latest_block = this.latest_block().await?.as_ref().clone(); - Ok(latest_block) - } - .boxed() + async move { this.latest_block().await }.boxed() } /// Retrieve the latest finalized block, following the specified finality strategy fn finalized(&self) -> Self::FinalizedFut { let this = self.inner.clone(); - async move { - let best_block = this.best_block().await?.as_ref().clone(); - Ok(best_block) - } - .boxed() + async move { this.best_block().await }.boxed() } } diff --git a/chains/ethereum/server/src/finalized_block_stream.rs b/chains/ethereum/server/src/finalized_block_stream.rs index 9308af4b..3f50b13b 100644 --- a/chains/ethereum/server/src/finalized_block_stream.rs +++ b/chains/ethereum/server/src/finalized_block_stream.rs @@ -4,6 +4,7 @@ use futures_util::{future::BoxFuture, FutureExt, Stream}; use rosetta_ethereum_backend::ext::types::Header; use std::{ pin::Pin, + sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -129,10 +130,10 @@ pub struct FinalizedBlockStream { statistics: Statistics, /// Latest known finalized block and the timestamp when it was received. - best_finalized_block: Option<(PartialBlock, Instant)>, + best_finalized_block: Option<(Arc, Instant)>, /// State machine that controls fetching the latest finalized block. - state: Option>>, + state: Option, P::Error>>>, /// Count of consecutive errors. consecutive_errors: u32, @@ -205,7 +206,7 @@ where self.state = Some(StateMachine::Wait(Delay::new( self.statistics.polling_interval, ))); - return Poll::Ready(Some(block)); + return Poll::Ready(Some(block.as_ref().clone())); } self.consecutive_errors = 0; Some(StateMachine::Wait(Delay::new(self.statistics.polling_interval)))