Commit

chore: comments, cleanup.
shawn-zil committed Jan 10, 2025
1 parent 7b7a6e0 commit 5b525d7
Showing 1 changed file with 81 additions and 64 deletions.
145 changes: 81 additions & 64 deletions zilliqa/src/sync.rs
@@ -29,33 +29,29 @@ use crate::{
//
// PHASE 1: Request missing chain metadata.
// The entire chain metadata is stored in-memory, and is used to construct a chain of metadata.
// Each metadata basically contains the block_hash, block_number, parent_hash, and view_number.
// 1. We start with the latest Proposal and request the chain of metadata from a peer.
// 2. We construct the chain of metadata, based on the response received.
// 3. If the last block does not exist in our canonical history, we request for additional metadata.
// 4. If the last block exists, we have hit our canonical history.
// 5. Move to Phase 2.
// 3. If the last block does not exist in our history, we request for additional metadata.
// 4. If the last block exists, we have hit our history, we move to Phase 2.
//
// PHASE 2: Request missing blocks.
// Once the chain metadata is constructed, we request the missing blocks to replay the history.
// Once the chain metadata is constructed, we fill in the missing blocks to replay the history.
// We do not make any judgements (other than sanity) on the block and leave that up to consensus.
// 1. We construct a set of hashes, from the in-memory chain metadata.
// 2. We send these block hashes to the same Peer (that sent the metadata) for retrieval.
// 3. We inject the Proposals into the pipeline, when the response is received.
// 4. If there are still missing blocks, we ask for more, from 1.
// 5. If there are no more missing blocks, we have filled up all blocks from the chain metadata.
// 6. Ready for Phase 3.
// 2. We request these blocks from the same Peer that sent the metadata.
// 3. We inject the received Proposals into the pipeline.
// 4. If there are still missing blocks, we ask for more.
// 5. If there are no more missing blocks, we move to Phase 3.
//
// PHASE 3: Zip it up.
// Phase 1&2 may run several times that brings up 99% of the chain. This closes the final gap.
// 1. We queue all newly received Proposals, while Phase 1 & 2 were in progress.
// 2. We check the head of the queue if its parent exists in our canonical history.
// 3. If it does not, we trigger Phase 1&2.
// Phase 1&2 may run several times and bring up 99% of the chain, but it will never catch up.
// This closes the final gap.
// 1. We queue all recently received Proposals, while Phase 1 & 2 were in progress.
// 2. We check the head of the queue, if its parent exists in our history.
// 3. If it does not, our history is too far away, we run Phase 1 again.
// 4. If it does, we inject the entire queue into the pipeline.
// 5. We are caught up.

#[cfg(debug_assertions)]
const DO_SPECULATIVE: bool = false;
#[cfg(not(debug_assertions))]
const DO_SPECULATIVE: bool = true; // Speeds up syncing by speculatively fetching blocks.
// 5. We are fully synced.

#[derive(Debug)]
pub struct Sync {
@@ -92,6 +88,20 @@ pub struct Sync {
}

impl Sync {
// Speed up syncing by speculatively fetching blocks in Phase 1 & 2.
#[cfg(not(debug_assertions))]
const DO_SPECULATIVE: bool = true;
#[cfg(debug_assertions)]
const DO_SPECULATIVE: bool = false;

// For V1 BlockRequest, we request a little more than we need, due to drift
// Since the view number is an 'internal' clock, it is possible for the same block number
// to have different view numbers.
// 10 ~ 1-min
// 20 ~ 1-hr
// 30 ~ 2-days
const VIEW_DRIFT: u64 = 10;

pub fn new(
config: &NodeConfig,
db: Arc<Db>,
@@ -108,7 +118,9 @@ impl Sync {
})
.collect();
let peer_id = message_sender.our_peer_id;
let max_batch_size = config.block_request_batch_size.clamp(30, 180); // 30-180 sec of blocks at a time.
let max_batch_size = config
.block_request_batch_size
.clamp(Self::VIEW_DRIFT as usize * 2, 180); // up to 180 sec of blocks at a time.
let max_blocks_in_flight = config.max_blocks_in_flight.clamp(max_batch_size, 1800); // up to 30-mins worth of blocks in-pipeline.

Ok(Self {
@@ -130,12 +142,12 @@ impl Sync {
})
}

/// Sync a block proposal.
/// Phase 0: Sync a block proposal.
///
/// This is the main entry point for syncing a block proposal.
/// We start by enqueuing all proposals, and then check if the parent block exists in history.
/// If the parent block exists, we do nothing. Ttherwise, we check the oldest one in the queue.
/// If we find its parent in history, we inject the entire queue.
/// If the parent block exists, we do nothing. Otherwise, we check the least recent one.
/// If we find its parent in history, we inject the entire queue. Otherwise, we start syncing.
///
/// We do not perform checks on the Proposal here. This is done in the consensus layer.
pub fn sync_proposal(&mut self, proposal: Proposal) -> Result<()> {
@@ -232,10 +244,13 @@ impl Sync {
}
}

/// Retry phase 1
/// Phase 2: Retry Phase 1
///
/// If something went wrong, phase 1 may need to be retried for the most recent segment.
/// Pop the segment from the segment marker, and continue phase 1.
/// If something went wrong in Phase 2, Phase 1 may need to be retried for the recently used segment.
/// Things that could go wrong e.g. the peer went offline, the peer pruned history, etc.
///
/// Pop the most recently used segment from the segment marker, and retry phase 1.
/// This will rebuild history from the previous marker, with another peer.
fn retry_phase1(&mut self) -> Result<()> {
if self.chain_segments.is_empty() {
tracing::error!("sync::RetryPhase1 : cannot retry phase 1 without chain_segments!");
@@ -250,22 +265,22 @@ impl Sync {
key = p.parent_hash;
}

// allow retry from p1
// retry from Phase 1
tracing::info!(
"sync::RetryPhase1 : retrying block {} from {}",
meta.parent_hash,
peer_info.peer_id,
);
self.state = SyncState::Phase1(meta);
if DO_SPECULATIVE {
if Self::DO_SPECULATIVE {
self.request_missing_metadata(None)?;
}
Ok(())
}

/// Handle a multi-block response.
/// Phase 2: Handle a multi-block response.
///
/// This is phase 2 in the syncing algorithm, where we receive a set of blocks and inject them into the pipeline.
/// This is Phase 2 in the syncing algorithm, where we receive a set of blocks and inject them into the pipeline.
/// We also remove the blocks from the chain metadata, because they are now in the pipeline.
pub fn handle_multiblock_response(
&mut self,
@@ -354,7 +369,7 @@ impl Sync {
// Done with phase 2
if self.chain_segments.is_empty() {
self.state = SyncState::Phase3;
} else if DO_SPECULATIVE {
} else if Self::DO_SPECULATIVE {
// Speculatively request more blocks
self.request_missing_blocks()?;
}
@@ -393,7 +408,7 @@ impl Sync {
Ok(message)
}

/// Request missing blocks from the chain.
/// Phase 2: Request missing blocks from the chain.
///
/// It constructs a set of hashes, which constitute the series of blocks that are missing.
/// These hashes are then sent to a Peer for retrieval.
@@ -487,14 +502,8 @@ impl Sync {
Ok(())
}

// we request a little more than we need, due to drift
// 10 ~ 1min
// 20 ~ 1hr
const VIEW_DRIFT: u64 = 10;

/// Handle a V1 block response
/// Phase 1 / 2: Handle a V1 block response
///
/// This will be called during both Phase 1 & Phase 2 block responses.
/// If the response is from a V2 peer, it will upgrade that peer to V2.
/// In phase 1, it will extract the metadata and feed it into handle_metadata_response.
/// In phase 2, it will extract the blocks and feed it into handle_multiblock_response.
@@ -510,6 +519,7 @@ impl Sync {
return Ok(());
}

// Downgrade empty responses
if response.proposals.is_empty() {
tracing::info!("sync::HandleBlockResponse : empty V1 from {from}");
self.done_with_peer(DownGrade::Empty);
@@ -518,25 +528,28 @@ impl Sync {

// Convert the V1 response into a V2 response.
match self.state {
// Phase 1 - extract metadata from the set of proposals
// Phase 1 - construct the metadata chain from the set of received proposals
SyncState::Phase1(ChainMetaData {
block_number,
mut parent_hash,
..
}) => {
// We do not buffer the proposals, as it takes 250MB/day!
// Instead, we will re-request the proposals again, in Phase 2.
let metadata = response
.proposals
.into_iter()
.filter(|p| p.number() < block_number) // filter extras
// filter extras due to drift
.filter(|p| p.number() < block_number)
.sorted_by(|a, b| b.number().cmp(&a.number()))
// filter any forks
.filter(|p| {
if parent_hash != p.hash() {
return false;
}
parent_hash = p.header.qc.block_hash;
true
}) // filter forks
})
.map(|p| ChainMetaData {
block_hash: p.hash(),
parent_hash: p.header.qc.block_hash,
@@ -547,14 +560,17 @@ impl Sync {

self.handle_metadata_response(from, metadata)?;
}
// Phase 2 - extract the requested blocks only

// Phase 2 - extract the requested proposals only.
SyncState::Phase2(_) => {
let multi_blocks = response
.proposals
.into_iter()
.filter(|p| self.chain_metadata.contains_key(&p.hash())) // filter extras
// filter any blocks that are not needed
.filter(|p| self.chain_metadata.contains_key(&p.hash()))
.sorted_by(|a, b| b.number().cmp(&a.number()))
.collect_vec();

self.handle_multiblock_response(from, multi_blocks)?;
}
_ => {
@@ -567,30 +583,32 @@ impl Sync {
Ok(())
}

/// Handle a response to a metadata request.
/// Phase 1: Handle a response to a metadata request.
///
/// This is the first step in the syncing algorithm, where we receive a set of metadata and use it to
/// construct a chain history. We check that the metadata does indeed constitute a chain. If it does,
/// we record its segment marker and store the entire chain in-memory.
/// construct a chain history. We check that the metadata does indeed constitute a segment of a chain.
/// If it does, we record its segment marker and store the entire chain in-memory.
pub fn handle_metadata_response(
&mut self,
from: PeerId,
response: Vec<ChainMetaData>,
) -> Result<()> {
if let Some(peer) = self.in_flight.as_ref() {
// Check for expected response
let segment_peer = if let Some(peer) = self.in_flight.as_ref() {
if peer.peer_id != from {
tracing::warn!(
"sync::MetadataResponse : unexpected peer={} != {from}",
peer.peer_id
);
return Ok(());
}
peer.clone()
} else {
// We ignore any responses that arrived late, since the original request has already 'timed-out'.
tracing::warn!("sync::MetadataResponse : spurious response {from}");
return Ok(());
}
};

let segment_peer = self.in_flight.as_ref().unwrap().clone();
// Process whatever we have received.
if response.is_empty() {
// Empty response, downgrade peer and retry with a new peer.
@@ -624,7 +642,7 @@ impl Sync {
// TODO: possibly, discard and rebuild entire chain
// if something does not match, do nothing and retry the request with the next peer.
tracing::error!(
"sync::MetadataResponse : retry metadata expected hash={block_hash} != {} num={block_num} != {}",
"sync::MetadataResponse : unexpected metadata hash={block_hash} != {}, num={block_num} != {}",
meta.block_hash,
meta.block_number,
);
@@ -652,18 +670,17 @@ impl Sync {
from
);

// Record the actual chain metadata
// Record the constructed chain metadata, check for loops
for meta in segment {
if self.chain_metadata.insert(meta.block_hash, meta).is_some() {
anyhow::bail!("loop in chain!"); // there is a possible loop in the chain
anyhow::bail!("sync::MetadataResponse : loop in chain!"); // there is a possible loop in the chain
}
}

// If the segment does not link to our canonical history, fire the next request
// If the segment hits our history, start Phase 2.
if self.db.get_block_by_hash(&last_block_hash)?.is_some() {
// Hit our internal history. Next, phase 2.
self.state = SyncState::Phase2(Hash::ZERO);
} else if DO_SPECULATIVE {
} else if Self::DO_SPECULATIVE {
self.request_missing_metadata(None)?;
}

@@ -714,10 +731,10 @@ impl Sync {
Ok(message)
}

/// Request missing chain from a peer.
/// Phase 1: Request chain metadata from a peer.
///
/// This constructs a chain history by requesting blocks from a peer, going backwards from a given block.
/// If phase 1 is in progress, it continues requesting blocks from the last known phase 1 block.
/// If Phase 1 is in progress, it continues requesting blocks from the last known Phase 1 block.
/// Otherwise, it requests blocks from the given starting metadata.
pub fn request_missing_metadata(&mut self, meta: Option<ChainMetaData>) -> Result<()> {
if matches!(self.state, SyncState::Phase2(_)) || matches!(self.state, SyncState::Phase3) {
@@ -800,7 +817,7 @@ impl Sync {
Ok(())
}

/// Inject the proposals into the chain.
/// Phase 2 / 3: Inject the proposals into the chain.
///
/// It adds the list of proposals into the pipeline for execution.
/// It also outputs some syncing statistics.
@@ -852,7 +869,7 @@ impl Sync {

/// Mark a received proposal
///
/// Mark a proposal as received, and remove it from the cache.
/// Mark a proposal as received, and remove it from the chain.
pub fn mark_received_proposal(&mut self, prop: &InjectedProposal) -> Result<()> {
if prop.from != self.peer_id {
tracing::error!(
@@ -873,7 +890,6 @@ impl Sync {
/// Downgrade a peer based on the response received.
fn done_with_peer(&mut self, downgrade: DownGrade) {
if let Some(mut peer) = self.in_flight.take() {
// Downgrade peer, if necessary
peer.score = peer.score.saturating_add(downgrade as u32);
// Ensure that the next peer is equal or better, to avoid a single source of truth.
peer.score = peer.score.max(self.peers.peek().unwrap().score);
@@ -886,10 +902,10 @@ impl Sync {

/// Add a peer to the list of peers.
pub fn add_peer(&mut self, peer: PeerId) {
// new peers should be tried last, which gives them time to sync first.
// new peers should be tried later, which gives them time to sync first.
let new_peer = PeerInfo {
version: PeerVer::V1, // default V2
score: self.peers.iter().map(|p| p.score).max().unwrap_or_default(),
score: self.peers.iter().map(|p| p.score).min().unwrap_or_default(),
peer_id: peer,
last_used: Instant::now(),
};
@@ -901,14 +917,15 @@ impl Sync {
self.peers.retain(|p| p.peer_id != peer);
}

/// Get the next best peer to use
fn get_next_peer(&mut self) -> Option<PeerInfo> {
// Minimum of 2 peers to avoid single source of truth.
if self.peers.len() < 2 {
return None;
}

let mut peer = self.peers.pop()?;
peer.last_used = std::time::Instant::now(); // used to determine stale in-flight requests.
peer.last_used = std::time::Instant::now(); // used to determine stale requests.
Some(peer)
}

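
Note on the batch-size change in `Sync::new`: the lower bound of the batch size is now derived from `VIEW_DRIFT` instead of the literal 30, and the in-flight limit is still clamped to be at least one batch. A small sketch of how the two clamps interact, assuming the same constants as in the diff; the free function `batch_limits` is hypothetical and exists only to make the arithmetic easy to check.

```rust
/// Same value as `Sync::VIEW_DRIFT` in the diff.
const VIEW_DRIFT: u64 = 10;

/// Hypothetical helper mirroring the two clamps in `Sync::new`.
/// Returns `(max_batch_size, max_blocks_in_flight)`.
fn batch_limits(block_request_batch_size: usize, max_blocks_in_flight: usize) -> (usize, usize) {
    // At least twice the view drift, at most 180 blocks per request.
    let max_batch_size = block_request_batch_size.clamp(VIEW_DRIFT as usize * 2, 180);
    // At least one full batch, at most 1800 blocks in the pipeline.
    // The lower bound never exceeds 180, so this clamp cannot panic.
    let max_blocks_in_flight = max_blocks_in_flight.clamp(max_batch_size, 1800);
    (max_batch_size, max_blocks_in_flight)
}
```

With `VIEW_DRIFT = 10`, the effective minimum batch size drops from 30 to 20, and a configured in-flight limit smaller than the batch size is raised to the batch size.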