Skip to content

Commit

Permalink
Use Arc to avoid cloning the block
Browse files Browse the repository at this point in the history
  • Loading branch information
Lohann committed Sep 2, 2024
1 parent a4a0bb0 commit 2ebd422
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 25 deletions.
39 changes: 17 additions & 22 deletions chains/ethereum/server/src/block_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ pub trait BlockProvider: Unpin {
/// Error type
type Error: Unpin + Send + Sync + 'static;
/// Future type
type BlockAtFut: Future<Output = Result<Option<PartialBlock>, Self::Error>>
type BlockAtFut: Future<Output = Result<Option<Arc<PartialBlock>>, Self::Error>>
+ Unpin
+ Send
+ 'static;
/// Future type
type LatestFut: Future<Output = Result<PartialBlock, Self::Error>> + Unpin + Send + 'static;
type LatestFut: Future<Output = Result<Arc<PartialBlock>, Self::Error>> + Unpin + Send + 'static;
/// Future type
type FinalizedFut: Future<Output = Result<PartialBlock, Self::Error>> + Unpin + Send + 'static;
type FinalizedFut: Future<Output = Result<Arc<PartialBlock>, Self::Error>>
+ Unpin
+ Send
+ 'static;

/// Get block by identifier
fn block_at(&self, block_ref: BlockIdentifier) -> Self::BlockAtFut;
Expand Down Expand Up @@ -147,11 +150,11 @@ where
/// Error type
type Error = BlockProviderError<RPC::Error>;
/// Future type
type BlockAtFut = BoxFuture<'static, Result<Option<PartialBlock>, Self::Error>>;
type BlockAtFut = BoxFuture<'static, Result<Option<Arc<PartialBlock>>, Self::Error>>;
/// Future type
type LatestFut = BoxFuture<'static, Result<PartialBlock, Self::Error>>;
type LatestFut = BoxFuture<'static, Result<Arc<PartialBlock>, Self::Error>>;
/// Future type
type FinalizedFut = BoxFuture<'static, Result<PartialBlock, Self::Error>>;
type FinalizedFut = BoxFuture<'static, Result<Arc<PartialBlock>, Self::Error>>;

/// Get block by identifier
fn block_at(&self, block_ref: BlockIdentifier) -> Self::BlockAtFut {
Expand All @@ -160,7 +163,7 @@ where
let maybe_block = retrieve_sealed_block(rpc, block_ref.into())
.await
.map_err(BlockProviderError::Rpc)?;
Ok(maybe_block)
Ok(maybe_block.map(Arc::new))
}
.boxed()
}
Expand All @@ -175,7 +178,7 @@ where
else {
return Err(BlockProviderError::LatestBlockNotFound);
};
Ok(latest_block)
Ok(Arc::new(latest_block))
}
.boxed()
}
Expand All @@ -190,7 +193,7 @@ where
else {
return Err(BlockProviderError::FinalizedBlockNotFound);
};
Ok(best_block)
Ok(Arc::new(best_block))
}
.boxed()
}
Expand Down Expand Up @@ -268,11 +271,11 @@ where
/// Error type
type Error = BlockProviderError<RPC::Error>;
/// Future type
type BlockAtFut = BoxFuture<'static, Result<Option<PartialBlock>, Self::Error>>;
type BlockAtFut = BoxFuture<'static, Result<Option<Arc<PartialBlock>>, Self::Error>>;
/// Future type
type LatestFut = BoxFuture<'static, Result<PartialBlock, Self::Error>>;
type LatestFut = BoxFuture<'static, Result<Arc<PartialBlock>, Self::Error>>;
/// Future type
type FinalizedFut = BoxFuture<'static, Result<PartialBlock, Self::Error>>;
type FinalizedFut = BoxFuture<'static, Result<Arc<PartialBlock>, Self::Error>>;

/// Get block by identifier
fn block_at(&self, block_ref: BlockIdentifier) -> Self::BlockAtFut {
Expand All @@ -282,21 +285,13 @@ where
/// Retrieve the latest block
fn latest(&self) -> Self::LatestFut {
let this = self.inner.clone();
async move {
let latest_block = this.latest_block().await?.as_ref().clone();
Ok(latest_block)
}
.boxed()
async move { this.latest_block().await }.boxed()
}

/// Retrieve the latest finalized block, following the specified finality strategy
fn finalized(&self) -> Self::FinalizedFut {
let this = self.inner.clone();
async move {
let best_block = this.best_block().await?.as_ref().clone();
Ok(best_block)
}
.boxed()
async move { this.best_block().await }.boxed()
}
}

Expand Down
7 changes: 4 additions & 3 deletions chains/ethereum/server/src/finalized_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures_util::{future::BoxFuture, FutureExt, Stream};
use rosetta_ethereum_backend::ext::types::Header;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
Expand Down Expand Up @@ -129,10 +130,10 @@ pub struct FinalizedBlockStream<P: BlockProvider> {
statistics: Statistics,

/// Latest known finalized block and the timestamp when it was received.
best_finalized_block: Option<(PartialBlock, Instant)>,
best_finalized_block: Option<(Arc<PartialBlock>, Instant)>,

/// State machine that controls fetching the latest finalized block.
state: Option<StateMachine<'static, Result<PartialBlock, P::Error>>>,
state: Option<StateMachine<'static, Result<Arc<PartialBlock>, P::Error>>>,

/// Count of consecutive errors.
consecutive_errors: u32,
Expand Down Expand Up @@ -205,7 +206,7 @@ where
self.state = Some(StateMachine::Wait(Delay::new(
self.statistics.polling_interval,
)));
return Poll::Ready(Some(block));
return Poll::Ready(Some(block.as_ref().clone()));
}
self.consecutive_errors = 0;
Some(StateMachine::Wait(Delay::new(self.statistics.polling_interval)))
Expand Down

0 comments on commit 2ebd422

Please sign in to comment.