diff --git a/chains/ethereum/server/src/block_provider.rs b/chains/ethereum/server/src/block_provider.rs index d03a4433..182d1f0d 100644 --- a/chains/ethereum/server/src/block_provider.rs +++ b/chains/ethereum/server/src/block_provider.rs @@ -366,86 +366,3 @@ where Ok(guard.0.clone()) } } - -// /// A block stream that fetches blocks in order -// pub struct OrderedBlockStream

{ -// provider: P, -// /// Maximum number of blocks to fetch concurrently -// capacity: usize, -// /// requests which have been queued for later processing -// pending: BTreeSet, -// /// requests which are currently underway -// fut: FuturesUnordered>>>>, -// } - -// impl OrderedBlockStream

{ -// pub fn new(provider: P, capacity: usize) -> Self { -// Self { provider, capacity, pending: BTreeSet::new(), fut: FuturesUnordered::new() } -// } - -// pub fn fetch(&mut self, block_ref: BlockRef) -> Option { -// if self.pending.contains(&block_ref) { -// Some(false) -// } else if self.fut.len() < self.capacity { -// self.pending.insert(block_ref); -// self.fut -// .push(BlockFuture { block_ref, future: self.callback.get_block(block_ref) }); -// Ok(()) -// } else { -// Err(()) -// } -// } - -// pub fn len(&self) -> usize { -// self.fut.len() -// } - -// pub fn is_empty(&self) -> bool { -// self.fut.is_empty() -// } -// } - -// impl Stream for OrderedBlockStream

{ -// type Item = Result, F::Error>; - -// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { -// if self.fut.is_empty() { -// return Poll::Ready(None); -// } -// let future = unsafe { Pin::new_unchecked(&mut self.fut) }; -// match future.poll_next(cx) { -// Poll::Ready(Some((block_ref, result))) => { -// self.pending.remove(&block_ref); -// Poll::Ready(Some(result)) -// }, -// Poll::Ready(None) => Poll::Ready(None), -// Poll::Pending => Poll::Pending, -// } -// } - -// fn size_hint(&self) -> (usize, Option) { -// (self.fut.len(), None) -// } -// } - -// struct BlockFuture { -// block_ref: BlockRef, -// future: Fut, -// } - -// impl Unpin for BlockFuture where Fut: Unpin {} -// impl Future for BlockFuture -// where -// Fut: Future, -// { -// type Output = (BlockRef, Fut::Output); - -// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { -// let this = unsafe { self.get_unchecked_mut() }; -// let pinned = unsafe { Pin::new_unchecked(&mut this.future) }; -// match pinned.poll(cx) { -// Poll::Ready(output) => Poll::Ready((this.block_ref, output)), -// Poll::Pending => Poll::Pending, -// } -// } -// } diff --git a/chains/ethereum/server/src/block_stream.rs b/chains/ethereum/server/src/block_stream.rs index abaa7dc9..0e1eab39 100644 --- a/chains/ethereum/server/src/block_stream.rs +++ b/chains/ethereum/server/src/block_stream.rs @@ -14,11 +14,9 @@ use rosetta_ethereum_backend::{ EthereumPubSub, }; use std::{ - // collections::VecDeque, pin::Pin, task::{Context, Poll}, }; -// use tinyvec::TinyVec; pub struct BlockStream where diff --git a/chains/ethereum/server/src/chain_sync.rs b/chains/ethereum/server/src/chain_sync.rs deleted file mode 100644 index 631c544d..00000000 --- a/chains/ethereum/server/src/chain_sync.rs +++ /dev/null @@ -1,93 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -use futures_util::{Stream, FutureExt}; -use hashbrown::{HashMap, HashSet}; -use rosetta_config_ethereum::H256; -use tracing::Level; - -use super::{ - block_provider::BlockProvider, - multi_block::{BlockRef, MultiBlock}, -}; - -/// Maximum blocks to store in the import queue. -const MAX_IMPORTING_BLOCKS: usize = 2048; - -/// Maximum blocks to download ahead of any gap. -const MAX_DOWNLOAD_AHEAD: u32 = 2048; - -/// Status of a block. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum BlockSyncStatus { - Syncing, - Complete, - Queued, -} - -/// Block data with status. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct BlockData { - /// The Block Message from the wire - pub block: MultiBlock, - /// The peer, we received this from - pub status: BlockSyncStatus, -} - -pub struct ChainSync

{ - provider: P, - /// A collection of blocks that are being downloaded from peers - blocks: HashMap, - /// The best block in our queue of blocks to import - best_block: BlockRef, - /// A set of hashes of blocks that are being downloaded or have been - /// downloaded and are queued for import. - queue_blocks: HashSet, -} - -impl ChainSync

{ - pub fn new(provider: P) -> Self { - Self { - provider, - blocks: HashMap::new(), - best_block: BlockRef::default(), - queue_blocks: HashSet::new(), - } - } -} - -impl Stream for ChainSync

-where - P::Error: core::fmt::Debug, - P::BlockAtFut: Unpin, -{ - type Item = (); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.provider.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(block))) => { - let block_ref = BlockRef::from(&block); - tracing::event!( - Level::DEBUG, - event = "ImportBlock", - best_block_number = self.best_block.number, - best_block_hash = %self.best_block.hash, - block_number = block_ref.number, - block_hash = %block_ref.hash, - ); - if !self.queue_blocks.remove(&block_ref) { - tracing::warn!("block not in the"); - } - self.blocks.insert(block_ref.hash, block.into()); - }, - Poll::Ready(Some(Err(err))) => { - tracing::event!(Level::WARN, "Error fetching block: {:?}", err); - }, - Poll::Ready(None) => {}, - Poll::Pending => {}, - }; - Poll::Pending - } -} diff --git a/chains/ethereum/server/src/client.rs b/chains/ethereum/server/src/client.rs index fe3dd93a..994280fe 100644 --- a/chains/ethereum/server/src/client.rs +++ b/chains/ethereum/server/src/client.rs @@ -526,9 +526,6 @@ where impl

EthereumClient

where - // P: BlockProvider + Unpin + Send + Sync + 'static, - // P::FinalizedFut: Unpin + Send + Sync + 'static, - // P::Error: std::error::Error, P: SubscriptionClientT + Unpin + Clone + Send + Sync + 'static, { #[allow(clippy::missing_errors_doc)] diff --git a/chains/ethereum/server/src/lib.rs b/chains/ethereum/server/src/lib.rs index 177b09d3..863b182f 100644 --- a/chains/ethereum/server/src/lib.rs +++ b/chains/ethereum/server/src/lib.rs @@ -14,7 +14,6 @@ use url::Url; mod block_provider; mod block_stream; -// mod chain_sync; mod client; mod event_stream; mod finalized_block_stream; diff --git a/chains/ethereum/server/src/log_filter.rs b/chains/ethereum/server/src/log_filter.rs index 54c6e7bd..1cc79241 100644 --- a/chains/ethereum/server/src/log_filter.rs +++ b/chains/ethereum/server/src/log_filter.rs @@ -54,8 +54,6 @@ impl LogFilter { #[cfg(test)] mod tests { use super::*; - // use hex_literal::hex; - // use rosetta_config_ethereum::ext::types::Bloom; #[test] fn add_remove_works() { @@ -71,33 +69,4 @@ mod tests { assert!(filter.remove(&address).is_none()); assert!(filter.is_empty()); } - - // #[test] - // fn filter_topics_works() { - // let mut filter = LogFilter::new(); - // let logs_bloom = - // Bloom::from(hex!(" - // 00000000000000000000000000000000000000000000020200040000000000000000000000000000000000000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000000002000000000000004000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000200000000000000000" - // )); - - // // Empty filter - // let mut logs = filter.topics_from_bloom(logs_bloom); - // assert!(logs.next().is_none()); - // drop(logs); - - // let expect_address = Address::from(hex!("97be939b2eb5a462c634414c8134b09ebad04d83")); - // let expect_topics = [ - // H256(hex!("b7dbf4f78c37528484cb9761beaca968c613f3c6c534b25b1988b912413c68bc")), - // H256(hex!("fca76ae197bb7f913a92bd1f31cb362d0fdbf27b2cc56d8b9bc22d0d76c58dc8")), - // ]; - // filter.add(expect_address, expect_topics.into_iter()); - - // let mut logs = filter.topics_from_bloom(logs_bloom); - // let (address, mut topics) = logs.next().unwrap(); - // assert_eq!(address, expect_address); - // assert_eq!(topics.next().unwrap(), expect_topics[0]); - // assert_eq!(topics.next().unwrap(), expect_topics[1]); - // assert!(logs.next().is_none()); - // assert!(topics.next().is_none()); - // } } diff --git a/chains/ethereum/server/src/state.rs b/chains/ethereum/server/src/state.rs index 5e6238e0..99bf6269 100644 --- a/chains/ethereum/server/src/state.rs +++ b/chains/ethereum/server/src/state.rs @@ -267,7 +267,6 @@ impl StateInner { self.finalized_blocks.push_back(finalized_block_ref); // Remove retracted blocks - let head_block_number = finalized_block_ref.number + 1; let finalized_blocks = &self.finalized_blocks; let mut removed = self .blocks @@ -277,13 +276,7 @@ impl StateInner { return false; } // Check if the block exists in the fork tree - let exists = - self.fork_tree.iter().any(|(block_ref, _, _)| block_ref.hash == block.hash()); - if exists { - return false; - } - // Check if the block is descendent of the finalized block - block.number() < head_block_number + !self.fork_tree.iter().any(|(block_ref, _, _)| block_ref.hash == block.hash()) }) .map(|(_, block)| block) .collect::>();