From 5b525d763fef3cd08a9a51ba5aa1a5673392e3ea Mon Sep 17 00:00:00 2001 From: Shawn Date: Fri, 10 Jan 2025 16:32:15 +0800 Subject: [PATCH] chore: comments, cleanup. --- zilliqa/src/sync.rs | 145 +++++++++++++++++++++++++------------------- 1 file changed, 81 insertions(+), 64 deletions(-) diff --git a/zilliqa/src/sync.rs b/zilliqa/src/sync.rs index cfc6140bc..c300780ef 100644 --- a/zilliqa/src/sync.rs +++ b/zilliqa/src/sync.rs @@ -29,33 +29,29 @@ use crate::{ // // PHASE 1: Request missing chain metadata. // The entire chain metadata is stored in-memory, and is used to construct a chain of metadata. +// Each metadata basically contains the block_hash, block_number, parent_hash, and view_number. // 1. We start with the latest Proposal and request the chain of metadata from a peer. // 2. We construct the chain of metadata, based on the response received. -// 3. If the last block does not exist in our canonical history, we request for additional metadata. -// 4. If the last block exists, we have hit our canonical history. -// 5. Move to Phase 2. +// 3. If the last block does not exist in our history, we request for additional metadata. +// 4. If the last block exists, we have hit our history, we move to Phase 2. // // PHASE 2: Request missing blocks. -// Once the chain metadata is constructed, we request the missing blocks to replay the history. +// Once the chain metadata is constructed, we fill in the missing blocks to replay the history. +// We do not make any judgements (other than sanity) on the block and leave that up to consensus. // 1. We construct a set of hashes, from the in-memory chain metadata. -// 2. We send these block hashes to the same Peer (that sent the metadata) for retrieval. -// 3. We inject the Proposals into the pipeline, when the response is received. -// 4. If there are still missing blocks, we ask for more, from 1. -// 5. If there are no more missing blocks, we have filled up all blocks from the chain metadata. -// 6. Ready for Phase 3. 
+// 2. We request these blocks from the same Peer that sent the metadata. +// 3. We inject the received Proposals into the pipeline. +// 4. If there are still missing blocks, we ask for more. +// 5. If there are no more missing blocks, we move to Phase 3. // // PHASE 3: Zip it up. -// Phase 1&2 may run several times that brings up 99% of the chain. This closes the final gap. -// 1. We queue all newly received Proposals, while Phase 1 & 2 were in progress. -// 2. We check the head of the queue if its parent exists in our canonical history. -// 3. If it does not, we trigger Phase 1&2. +// Phase 1 & 2 may run several times and bring up 99% of the chain, but they will never fully catch up. +// This closes the final gap. +// 1. We queue all Proposals received while Phase 1 & 2 are in progress. +// 2. We check whether the parent of the head of the queue exists in our history. +// 3. If it does not, our history is too far away, so we run Phase 1 again. // 4. If it does, we inject the entire queue into the pipeline. -// 5. We are caught up. - -#[cfg(debug_assertions)] -const DO_SPECULATIVE: bool = false; -#[cfg(not(debug_assertions))] -const DO_SPECULATIVE: bool = true; // Speeds up syncing by speculatively fetching blocks. +// 5. We are fully synced. #[derive(Debug)] pub struct Sync { @@ -92,6 +88,20 @@ pub struct Sync { } impl Sync { + // Speed up syncing by speculatively fetching blocks in Phase 1 & 2. + #[cfg(not(debug_assertions))] + const DO_SPECULATIVE: bool = true; + #[cfg(debug_assertions)] + const DO_SPECULATIVE: bool = false; + + // For V1 BlockRequest, we request a little more than we need, due to drift. + // Since the view number is an 'internal' clock, it is possible for the same block number + // to have different view numbers. 
+ // 10 ~ 1-min + // 20 ~ 1-hr + // 30 ~ 2-days + const VIEW_DRIFT: u64 = 10; + pub fn new( config: &NodeConfig, db: Arc, @@ -108,7 +118,9 @@ impl Sync { }) .collect(); let peer_id = message_sender.our_peer_id; - let max_batch_size = config.block_request_batch_size.clamp(30, 180); // 30-180 sec of blocks at a time. + let max_batch_size = config + .block_request_batch_size + .clamp(Self::VIEW_DRIFT as usize * 2, 180); // up to 180 sec of blocks at a time. let max_blocks_in_flight = config.max_blocks_in_flight.clamp(max_batch_size, 1800); // up to 30-mins worth of blocks in-pipeline. Ok(Self { @@ -130,12 +142,12 @@ impl Sync { }) } - /// Sync a block proposal. + /// Phase 0: Sync a block proposal. /// /// This is the main entry point for syncing a block proposal. /// We start by enqueuing all proposals, and then check if the parent block exists in history. - /// If the parent block exists, we do nothing. Ttherwise, we check the oldest one in the queue. - /// If we find its parent in history, we inject the entire queue. + /// If the parent block exists, we do nothing. Otherwise, we check the least recent one. + /// If we find its parent in history, we inject the entire queue. Otherwise, we start syncing. /// /// We do not perform checks on the Proposal here. This is done in the consensus layer. pub fn sync_proposal(&mut self, proposal: Proposal) -> Result<()> { @@ -232,10 +244,13 @@ impl Sync { } } - /// Retry phase 1 + /// Phase 2: Retry Phase 1 /// - /// If something went wrong, phase 1 may need to be retried for the most recent segment. - /// Pop the segment from the segment marker, and continue phase 1. + /// If something went wrong in Phase 2, Phase 1 may need to be retried for the recently used segment. + /// Things that could go wrong e.g. the peer went offline, the peer pruned history, etc. + /// + /// Pop the most recently used segment from the segment marker, and retry phase 1. + /// This will rebuild history from the previous marker, with another peer. 
fn retry_phase1(&mut self) -> Result<()> { if self.chain_segments.is_empty() { tracing::error!("sync::RetryPhase1 : cannot retry phase 1 without chain_segments!"); @@ -250,22 +265,22 @@ impl Sync { key = p.parent_hash; } - // allow retry from p1 + // retry from Phase 1 tracing::info!( "sync::RetryPhase1 : retrying block {} from {}", meta.parent_hash, peer_info.peer_id, ); self.state = SyncState::Phase1(meta); - if DO_SPECULATIVE { + if Self::DO_SPECULATIVE { self.request_missing_metadata(None)?; } Ok(()) } - /// Handle a multi-block response. + /// Phase 2: Handle a multi-block response. /// - /// This is phase 2 in the syncing algorithm, where we receive a set of blocks and inject them into the pipeline. + /// This is Phase 2 in the syncing algorithm, where we receive a set of blocks and inject them into the pipeline. /// We also remove the blocks from the chain metadata, because they are now in the pipeline. pub fn handle_multiblock_response( &mut self, @@ -354,7 +369,7 @@ impl Sync { // Done with phase 2 if self.chain_segments.is_empty() { self.state = SyncState::Phase3; - } else if DO_SPECULATIVE { + } else if Self::DO_SPECULATIVE { // Speculatively request more blocks self.request_missing_blocks()?; } @@ -393,7 +408,7 @@ impl Sync { Ok(message) } - /// Request missing blocks from the chain. + /// Phase 2: Request missing blocks from the chain. /// /// It constructs a set of hashes, which constitute the series of blocks that are missing. /// These hashes are then sent to a Peer for retrieval. @@ -487,14 +502,8 @@ impl Sync { Ok(()) } - // we request a little more than we need, due to drift - // 10 ~ 1min - // 20 ~ 1hr - const VIEW_DRIFT: u64 = 10; - - /// Handle a V1 block response + /// Phase 1 / 2: Handle a V1 block response /// - /// This will be called during both Phase 1 & Phase 2 block responses. /// If the response if from a V2 peer, it will upgrade that peer to V2. /// In phase 1, it will extract the metadata and feed it into handle_metadata_response. 
/// In phase 2, it will extract the blocks and feed it into handle_multiblock_response. @@ -510,6 +519,7 @@ impl Sync { return Ok(()); } + // Downgrade empty responses if response.proposals.is_empty() { tracing::info!("sync::HandleBlockResponse : empty V1 from {from}"); self.done_with_peer(DownGrade::Empty); @@ -518,25 +528,28 @@ impl Sync { // Convert the V1 response into a V2 response. match self.state { - // Phase 1 - extract metadata from the set of proposals + // Phase 1 - construct the metadata chain from the set of received proposals SyncState::Phase1(ChainMetaData { block_number, mut parent_hash, .. }) => { // We do not buffer the proposals, as it takes 250MB/day! + // Instead, we will re-request the proposals again, in Phase 2. let metadata = response .proposals .into_iter() - .filter(|p| p.number() < block_number) // filter extras + // filter extras due to drift + .filter(|p| p.number() < block_number) .sorted_by(|a, b| b.number().cmp(&a.number())) + // filter any forks .filter(|p| { if parent_hash != p.hash() { return false; } parent_hash = p.header.qc.block_hash; true - }) // filter forks + }) .map(|p| ChainMetaData { block_hash: p.hash(), parent_hash: p.header.qc.block_hash, @@ -547,14 +560,17 @@ impl Sync { self.handle_metadata_response(from, metadata)?; } - // Phase 2 - extract the requested blocks only + + // Phase 2 - extract the requested proposals only. SyncState::Phase2(_) => { let multi_blocks = response .proposals .into_iter() - .filter(|p| self.chain_metadata.contains_key(&p.hash())) // filter extras + // filter any blocks that are not needed + .filter(|p| self.chain_metadata.contains_key(&p.hash())) .sorted_by(|a, b| b.number().cmp(&a.number())) .collect_vec(); + self.handle_multiblock_response(from, multi_blocks)?; } _ => { @@ -567,17 +583,18 @@ impl Sync { Ok(()) } - /// Handle a response to a metadata request. + /// Phase 1: Handle a response to a metadata request. 
/// /// This is the first step in the syncing algorithm, where we receive a set of metadata and use it to - /// construct a chain history. We check that the metadata does indeed constitute a chain. If it does, - /// we record its segment marker and store the entire chain in-memory. + /// construct a chain history. We check that the metadata does indeed constitute a segment of a chain. + /// If it does, we record its segment marker and store the entire chain in-memory. pub fn handle_metadata_response( &mut self, from: PeerId, response: Vec, ) -> Result<()> { - if let Some(peer) = self.in_flight.as_ref() { + // Check for expected response + let segment_peer = if let Some(peer) = self.in_flight.as_ref() { if peer.peer_id != from { tracing::warn!( "sync::MetadataResponse : unexpected peer={} != {from}", @@ -585,12 +602,13 @@ impl Sync { ); return Ok(()); } + peer.clone() } else { + // We ignore any responses that arrived late, since the original request has already 'timed-out'. tracing::warn!("sync::MetadataResponse : spurious response {from}"); return Ok(()); - } + }; - let segment_peer = self.in_flight.as_ref().unwrap().clone(); // Process whatever we have received. if response.is_empty() { // Empty response, downgrade peer and retry with a new peer. @@ -624,7 +642,7 @@ impl Sync { // TODO: possibly, discard and rebuild entire chain // if something does not match, do nothing and retry the request with the next peer. 
tracing::error!( - "sync::MetadataResponse : retry metadata expected hash={block_hash} != {} num={block_num} != {}", + "sync::MetadataResponse : unexpected metadata hash={block_hash} != {}, num={block_num} != {}", meta.block_hash, meta.block_number, ); @@ -652,18 +670,17 @@ impl Sync { from ); - // Record the actual chain metadata + // Record the constructed chain metadata, check for loops for meta in segment { if self.chain_metadata.insert(meta.block_hash, meta).is_some() { - anyhow::bail!("loop in chain!"); // there is a possible loop in the chain + anyhow::bail!("sync::MetadataResponse : loop in chain!"); // there is a possible loop in the chain } } - // If the segment does not link to our canonical history, fire the next request + // If the segment hits our history, start Phase 2. if self.db.get_block_by_hash(&last_block_hash)?.is_some() { - // Hit our internal history. Next, phase 2. self.state = SyncState::Phase2(Hash::ZERO); - } else if DO_SPECULATIVE { + } else if Self::DO_SPECULATIVE { self.request_missing_metadata(None)?; } @@ -714,10 +731,10 @@ impl Sync { Ok(message) } - /// Request missing chain from a peer. + /// Phase 1: Request chain metadata from a peer. /// /// This constructs a chain history by requesting blocks from a peer, going backwards from a given block. - /// If phase 1 is in progress, it continues requesting blocks from the last known phase 1 block. + /// If Phase 1 is in progress, it continues requesting blocks from the last known Phase 1 block. /// Otherwise, it requests blocks from the given starting metadata. pub fn request_missing_metadata(&mut self, meta: Option) -> Result<()> { if matches!(self.state, SyncState::Phase2(_)) || matches!(self.state, SyncState::Phase3) { @@ -800,7 +817,7 @@ impl Sync { Ok(()) } - /// Inject the proposals into the chain. + /// Phase 2 / 3: Inject the proposals into the chain. /// /// It adds the list of proposals into the pipeline for execution. /// It also outputs some syncing statistics. 
@@ -852,7 +869,7 @@ impl Sync { /// Mark a received proposal /// - /// Mark a proposal as received, and remove it from the cache. + /// Mark a proposal as received, and remove it from the chain. pub fn mark_received_proposal(&mut self, prop: &InjectedProposal) -> Result<()> { if prop.from != self.peer_id { tracing::error!( @@ -873,7 +890,6 @@ impl Sync { /// Downgrade a peer based on the response received. fn done_with_peer(&mut self, downgrade: DownGrade) { if let Some(mut peer) = self.in_flight.take() { - // Downgrade peer, if necessary peer.score = peer.score.saturating_add(downgrade as u32); // Ensure that the next peer is equal or better, to avoid a single source of truth. peer.score = peer.score.max(self.peers.peek().unwrap().score); @@ -886,10 +902,10 @@ impl Sync { /// Add a peer to the list of peers. pub fn add_peer(&mut self, peer: PeerId) { - // new peers should be tried last, which gives them time to sync first. + // new peers should be tried later, which gives them time to sync first. let new_peer = PeerInfo { version: PeerVer::V1, // default V2 - score: self.peers.iter().map(|p| p.score).max().unwrap_or_default(), + score: self.peers.iter().map(|p| p.score).min().unwrap_or_default(), peer_id: peer, last_used: Instant::now(), }; @@ -901,6 +917,7 @@ impl Sync { self.peers.retain(|p| p.peer_id != peer); } + /// Get the next best peer to use fn get_next_peer(&mut self) -> Option { // Minimum of 2 peers to avoid single source of truth. if self.peers.len() < 2 { @@ -908,7 +925,7 @@ impl Sync { } let mut peer = self.peers.pop()?; - peer.last_used = std::time::Instant::now(); // used to determine stale in-flight requests. + peer.last_used = std::time::Instant::now(); // used to determine stale requests. Some(peer) }