Skip to content

Commit

Permalink
feat: working phase 1 with protomainnet.
Browse files Browse the repository at this point in the history
  • Loading branch information
shawn-zil committed Jan 10, 2025
1 parent 5b738e8 commit 29d5e8e
Showing 1 changed file with 82 additions and 9 deletions.
91 changes: 82 additions & 9 deletions zilliqa/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use std::{
collections::{BTreeMap, BinaryHeap, VecDeque},
sync::Arc,
time::{Duration, Instant},
u64,
};

use alloy::primitives::BlockNumber;
use anyhow::Result;
use itertools::Itertools;
use libp2p::PeerId;
Expand Down Expand Up @@ -86,6 +88,8 @@ pub struct Sync {
recent_proposals: VecDeque<Proposal>,
// for statistics only
inject_at: Option<(std::time::Instant, usize)>,
// record starting number, for eth_syncing() RPC call.
started_at_block_number: u64,
}

impl Sync {
Expand Down Expand Up @@ -123,6 +127,7 @@ impl Sync {
state: SyncState::Phase0,
recent_proposals: VecDeque::with_capacity(max_batch_size),
inject_at: None,
started_at_block_number: u64::MIN,
})
}

Expand Down Expand Up @@ -158,6 +163,16 @@ impl Sync {
view_number,
};
self.request_missing_metadata(Some(meta))?;

let highest_block = self
.db
.get_canonical_block_by_number(
self.db
.get_highest_canonical_block_number()?
.expect("no highest block"),
)?
.expect("missing highest block");
self.started_at_block_number = highest_block.number();
}
}
// Continue phase 1, until we hit history/genesis.
Expand Down Expand Up @@ -258,16 +273,29 @@ impl Sync {
from: PeerId,
response: Vec<Proposal>,
) -> Result<()> {
if let Some(peer) = self.in_flight.as_ref() {
if peer.peer_id != from {
tracing::warn!(
"sync::MultiBlockResponse : unexpected peer={} != {from}",
peer.peer_id
);
return Ok(());
}
} else {
tracing::warn!("sync::MultiBlockResponse : spurious response {from}");
return Ok(());
}

// Process only a full response
if response.is_empty() {
// Empty response, downgrade peer and retry phase 1.
tracing::warn!("sync::MultiBlockResponse : empty blocks {from}",);
self.done_with_peer(DownGrade::Empty);
return self.retry_phase1();
} else if response.len() < self.max_batch_size {
// Partial response, downgrade peer but process the block.
// Partial response, process blocks.
tracing::warn!("sync::MultiBlockResponse : partial blocks {from}",);
self.done_with_peer(DownGrade::Partial);
self.done_with_peer(DownGrade::None);
} else {
self.done_with_peer(DownGrade::None);
}
Expand Down Expand Up @@ -550,6 +578,19 @@ impl Sync {
from: PeerId,
response: Vec<ChainMetaData>,
) -> Result<()> {
if let Some(peer) = self.in_flight.as_ref() {
if peer.peer_id != from {
tracing::warn!(
"sync::MetadataResponse : unexpected peer={} != {from}",
peer.peer_id
);
return Ok(());
}
} else {
tracing::warn!("sync::MetadataResponse : spurious response {from}");
return Ok(());
}

let segment_peer = self.in_flight.as_ref().unwrap().clone();
// Process whatever we have received.
if response.is_empty() {
Expand All @@ -558,9 +599,9 @@ impl Sync {
self.done_with_peer(DownGrade::Empty);
return Ok(());
} else if response.len() < self.max_batch_size {
// Partial response, downgrade peer but process the response.
// Partial response, process the response.
tracing::warn!("sync::MetadataResponse : partial blocks {from}",);
self.done_with_peer(DownGrade::Partial);
self.done_with_peer(DownGrade::None);
} else {
self.done_with_peer(DownGrade::None);
}
Expand Down Expand Up @@ -704,6 +745,7 @@ impl Sync {
}

if let Some(peer) = self.get_next_peer() {
let peer_id = peer.peer_id;
let message = match self.state {
SyncState::Phase1(ChainMetaData { parent_hash, .. })
if matches!(peer.version, PeerVer::V2) =>
Expand Down Expand Up @@ -746,12 +788,11 @@ impl Sync {
tracing::info!(
?message,
"sync::RequestMissingMetadata : requesting missing chain from {}",
peer.peer_id
peer_id
);
self.message_sender
.send_external_message(peer.peer_id, message)?;

self.in_flight = Some(peer);
self.message_sender
.send_external_message(peer_id, message)?;
} else {
tracing::warn!(
"sync::RequestMissingMetadata : insufficient peers to request missing blocks"
Expand Down Expand Up @@ -871,6 +912,39 @@ impl Sync {
peer.last_used = std::time::Instant::now(); // used to determine stale in-flight requests.
Some(peer)
}

/// Returns (am_syncing, current_highest_block)
pub fn am_syncing(&self) -> Result<(bool, Block)> {
let highest_block = self
.db
.get_canonical_block_by_number(
self.db
.get_highest_canonical_block_number()?
.expect("no highest block"),
)?
.expect("missing highest block");
Ok((
self.in_pipeline > 0 || !matches!(self.state, SyncState::Phase0),
highest_block,
))
}

// Returns (starting_block, current_block, highest_block) if we're syncing,
// None if we're not.
pub fn get_sync_data(&self) -> Result<Option<(BlockNumber, BlockNumber, BlockNumber)>> {
let (flag, highest_block) = self.am_syncing()?;
if !flag {
Ok(None)
} else {
let highest_saved_block_number = highest_block.number();
let highest_block_number_seen = self.recent_proposals.back().unwrap().number();
Ok(Some((
self.started_at_block_number,
highest_saved_block_number,
highest_block_number_seen,
)))
}
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -901,7 +975,6 @@ impl PartialOrd for PeerInfo {
#[derive(Debug)]
enum DownGrade {
None,
Partial,
Empty,
Timeout,
}
Expand Down

0 comments on commit 29d5e8e

Please sign in to comment.