feat: removed sending BlockRequest from block_store.rs
shawn-zil committed Jan 10, 2025
1 parent 29d5e8e commit 7b7a6e0
Showing 2 changed files with 6 additions and 228 deletions.
229 changes: 3 additions & 226 deletions zilliqa/src/block_store.rs
@@ -19,7 +19,7 @@ use crate::{
constants,
crypto::Hash,
db::Db,
message::{Block, BlockRequest, BlockStrategy, ExternalMessage, Proposal},
message::{Block, BlockStrategy, Proposal},
node::{MessageSender, OutgoingMessageFailure, RequestId},
range_map::RangeMap,
time::SystemTime,
@@ -416,8 +416,6 @@ pub struct BlockStore {
peers: BTreeMap<PeerId, PeerInfo>,
/// The maximum number of blocks to send requests for at a time.
max_blocks_in_flight: u64,
/// When a request to a peer fails, do not send another request to this peer for this amount of time.
failed_request_sleep_duration: Duration,
/// Our block strategies.
strategies: Vec<BlockStrategy>,
/// The block views we have available. This is read once from the DB at start-up and incrementally updated whenever
@@ -430,9 +428,6 @@ pub struct BlockStore {
unserviceable_requests: Option<RangeMap>,
message_sender: MessageSender,

/// Clock pointer - see request_blocks()
clock: usize,

/// Where we last started syncing, so we can report it in get_sync_data()
started_syncing_at: BlockNumber,
/// Previous sync flag, so we can tell when it changes.
@@ -454,8 +449,6 @@ struct PeerInfo {
availability: BlockAvailability,
/// When did we last update availability?
availability_updated_at: Option<SystemTime>,
/// Last availability query - don't send them too often.
availability_requested_at: Option<SystemTime>,
/// Requests we've sent to the peer.
pending_requests: HashMap<RequestId, (SystemTime, u64, u64)>,
/// If `Some`, the time of the most recently failed request.
@@ -467,50 +460,10 @@ impl PeerInfo {
Self {
availability: BlockAvailability::new(),
availability_updated_at: None,
availability_requested_at: None,
pending_requests: HashMap::new(),
last_request_failed_at: None,
}
}

/// Do we have availability, or should we get it again?
fn have_availability(&self) -> bool {
self.availability_updated_at.is_some()
}

/// Converts a set of block strategies into a rangemap
fn get_ranges(&self, max_view: Option<u64>) -> RangeMap {
let mut result = RangeMap::new();
if let Some(strat) = &self.availability.strategies {
let mut max_end: Option<u64> = None;
let mut last_n: Option<u64> = None;
for s in strat {
match s {
BlockStrategy::CachedViewRange(views, until_view) => {
if until_view.map_or(true, |x| self.availability.highest_known_view <= x) {
result.with_range(views);
max_end = Some(
max_end.map_or(views.end - 1, |v| std::cmp::max(v, views.end - 1)),
);
}
}
BlockStrategy::Latest(n) => {
last_n = Some(last_n.map_or(*n, |x| std::cmp::max(x, *n)));
}
}
}
if let Some(the_n) = last_n {
if let Some(max_view_nr) = max_view {
let start = max_view_nr.saturating_sub(the_n);
result.with_range(&Range {
start,
end: max_view_nr,
});
}
}
}
result
}
}

/// Data about a peer
@@ -596,13 +549,11 @@ impl BlockStore {
highest_confirmed_view: 0,
peers: BTreeMap::new(),
max_blocks_in_flight: config.max_blocks_in_flight as u64,
failed_request_sleep_duration: config.failed_request_sleep_duration,
strategies: vec![BlockStrategy::Latest(constants::RETAINS_LAST_N_BLOCKS)],
available_blocks,
buffered: BlockCache::new(config.max_blocks_in_flight as u64),
unserviceable_requests: None,
message_sender,
clock: 0,
started_syncing_at: 0,
last_sync_flag: false,
})
@@ -632,13 +583,11 @@ impl BlockStore {
highest_confirmed_view: 0,
peers: BTreeMap::new(),
max_blocks_in_flight: 0,
failed_request_sleep_duration: Duration::ZERO,
strategies: self.strategies.clone(),
available_blocks: RangeMap::new(),
buffered: BlockCache::new(0),
unserviceable_requests: None,
message_sender: self.message_sender.clone(),
clock: 0,
started_syncing_at: self.started_syncing_at,
last_sync_flag: self.last_sync_flag,
})
@@ -837,180 +786,8 @@ impl BlockStore {
/// Make a request for the blocks associated with a range of views. Returns `true` if a request was made and `false` if the request had to be
/// buffered because no peers were available.
/// Public so we can trigger it from the debug API
pub fn request_blocks(&mut self, req: &RangeMap) -> Result<bool> {
let mut remain = req.clone();
let to = req.max();

// Prune the pending requests
self.prune_pending_requests()?;

trace!(
"block_store::request_blocks() : entry remain {:?} clock {}",
remain,
self.clock
);

// If it's in our input queue, don't expect it again.
let expected = self.buffered.expectant_block_ranges();
trace!("block_store::request_blocks() : in our input queue {expected:?}");
(_, remain) = remain.diff_inter(&expected);

// If it's already buffered, don't request it again - wait for us to reject it and
// then we can re-request.
let extant = self.buffered.extant_block_ranges();
trace!(
"block_store::request_blocks() : cache has {0:?} with {1}/{2}",
extant,
self.buffered.cache.len(),
self.buffered.head_cache.len()
);

(_, remain) = remain.diff_inter(&extant);
(_, remain) = remain.diff_inter(&self.buffered.empty_view_ranges);

// If it's in flight, don't request it again.
let mut in_flight = RangeMap::new();
for peer in self.peers.values() {
for (_, start, end) in peer.pending_requests.values() {
in_flight.with_range(&Range {
start: *start,
end: end + 1,
});
}
}
debug!("block_store::request_blocks() : requests in flight {in_flight:?}");
(_, remain) = remain.diff_inter(&in_flight);

let now = SystemTime::now();
let failed_request_sleep_duration = self.failed_request_sleep_duration;

// If everything we have is in flight, we'll skip trying to request them (or update availability)
if remain.is_empty() {
trace!("block_store::request_blocks() : .. no non in_flight requests. Returning early");
return Ok(true);
}

for chance in 0..2 {
trace!(
"block_store::request_blocks() : chance = {chance} clock = {} peers = {}",
self.clock,
self.peers.len()
);
// There may be no peers ...
self.clock = (self.clock + 1) % std::cmp::max(1, self.peers.len());
// Slightly horrid - generate a list of peers which is the BTreeMap's list, shifted by clock.
let peers = self
.peers
.keys()
.skip(self.clock)
.chain(self.peers.keys().take(self.clock))
.cloned()
.collect::<Vec<PeerId>>();

for peer in &peers {
debug!("block_store::request_blocks() : considering peer = {peer}");
// If the last request failed < 10s or so ago, skip this peer, unless we're second-chance in
// which case, hey, why not?
let (requests, rem, query_availability) = {
let peer_info = self.peer_info(*peer);
if chance == 0
&& !peer_info
.last_request_failed_at
.and_then(|at| at.elapsed().ok())
.map(|time_since| time_since > failed_request_sleep_duration)
.unwrap_or(true)
{
trace!("block_store::request_blocks() : .. Last request failed; skipping this peer");
continue;
}

if peer_info.pending_requests.len()
>= constants::MAX_PENDING_BLOCK_REQUESTS_PER_PEER
{
trace!(
"block_store::request_blocks() : .. Skipping peer {peer} - too many pending requests {0}",
peer_info.pending_requests.len()
);
continue;
}
// Split ..
let left = constants::MAX_PENDING_BLOCK_REQUESTS_PER_PEER
- peer_info.pending_requests.len();
let ranges = peer_info.get_ranges(to);
let (req, rem) = remain.diff_inter_limited(&ranges, Some(left));
// If we are not about to make a request, and we do not have recent availability then
// make a synthetic request to get that availability.
let query_availability = req.is_empty()
&& peer_info.pending_requests.is_empty()
&& (!peer_info.have_availability()
|| peer_info.availability_requested_at.map_or(true, |x| {
x.elapsed()
.map(|v| {
v > constants::REQUEST_PEER_VIEW_AVAILABILITY_NOT_BEFORE
})
.unwrap_or(true)
}));
(req, rem, query_availability)
};

let mut request_sent = false;
debug!(
" block_store::request_blocks() :.. Requests to send: {:?}",
requests
);
// Send all requests now ..
for request in requests.ranges.iter() {
if !request.is_empty() {
trace!(
"block_store::request_blocks() : peer = {:?} request = {:?}: sending block request",
peer,
request,
);
// Yay!
let message = ExternalMessage::BlockRequest(BlockRequest {
from_view: request.start,
to_view: request.end,
});
let request_id =
self.message_sender.send_external_message(*peer, message)?;
self.peer_info(*peer)
.pending_requests
.insert(request_id, (now, request.start, request.end));
request_sent = true;
}
}
// If we haven't got recent availability, and we haven't already asked for it, ask ..
if !request_sent && chance == 0 && query_availability {
trace!("block_store::request_blocks() : Querying availability");
// Executive decision: Don't ask for any blocks here, because we are about to do so in duplicate
// later and we don't want to duplicate work - you could viably go for a slightly faster
// sync by just asking for all the blocks and letting the peer send what it has.
let message = ExternalMessage::BlockRequest(BlockRequest {
from_view: 0,
to_view: 0,
});
let peer_info = self.peer_info(*peer);
peer_info.availability_requested_at = Some(now);
let _ = self.message_sender.send_external_message(*peer, message);
}

// We only need to request stuff from peers if we haven't already done so.
remain = rem;
}
}
trace!("block_store::request_blocks() : all done");
if !remain.is_empty() {
warn!(
"block_store::request_blocks() : Could not find peers for views {:?}",
remain
);
if let Some(us) = &mut self.unserviceable_requests {
us.with_range_map(&remain);
} else {
self.unserviceable_requests = Some(remain);
}
}
Ok(true)
pub fn request_blocks(&mut self, _req: &RangeMap) -> Result<bool> {
Ok(false) // FIXME: Stub
}

pub fn get_block(&self, hash: Hash) -> Result<Option<Block>> {
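With the request path removed, request_blocks() keeps its signature but is reduced to a stub that always returns Ok(false). A minimal caller-side sketch of what that means under the function's documented contract — RangeMap, Range, and trace! are as used elsewhere in block_store.rs, while request_demo and the view numbers are made-up illustration, not code from this commit:

    use std::ops::Range;

    // Hypothetical helper, not in the codebase. Per request_blocks()'s doc
    // comment, Ok(true) means block requests went out to peers and Ok(false)
    // means none were sent; the new stub always takes the Ok(false) path.
    fn request_demo(block_store: &mut BlockStore) -> Result<()> {
        let mut wanted = RangeMap::new();
        wanted.with_range(&Range { start: 100, end: 110 }); // views 100..110 (made up)
        if !block_store.request_blocks(&wanted)? {
            // No peer was asked for these views; fetching now has to happen
            // outside block_store.rs.
            trace!("request_blocks sent nothing for {wanted:?}");
        }
        Ok(())
    }

The stub keeps the public API intact, so callers such as the debug API mentioned in the doc comment continue to compile; given the companion change in sync.rs below, the fetching responsibility presumably shifts to the sync path.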
5 changes: 3 additions & 2 deletions zilliqa/src/sync.rs
@@ -3,7 +3,6 @@ use std::{
collections::{BTreeMap, BinaryHeap, VecDeque},
sync::Arc,
time::{Duration, Instant},
u64,
};

use alloy::primitives::BlockNumber;
@@ -924,7 +923,9 @@ impl Sync {
)?
.expect("missing highest block");
Ok((
self.in_pipeline > 0 || !matches!(self.state, SyncState::Phase0),
!self.chain_metadata.is_empty()
|| !self.chain_segments.is_empty()
|| !self.recent_proposals.is_empty(),
highest_block,
))
}
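The sync.rs hunk replaces the previous phase-based syncing indicator (in_pipeline > 0, or any state other than SyncState::Phase0) with one derived purely from whether three work queues are empty. A standalone sketch of the new predicate — the struct, its field types, and main are hypothetical stand-ins; only the three field names come from the diff:

    use std::collections::{BTreeMap, VecDeque};

    // Hypothetical stand-in for the queues on Sync; element types are made up.
    struct SyncQueues {
        chain_metadata: BTreeMap<u64, ()>,
        chain_segments: VecDeque<()>,
        recent_proposals: VecDeque<()>,
    }

    impl SyncQueues {
        // Syncing is inferred from outstanding work rather than the state
        // machine: any queued metadata, segment, or proposal counts as
        // "still syncing".
        fn is_syncing(&self) -> bool {
            !self.chain_metadata.is_empty()
                || !self.chain_segments.is_empty()
                || !self.recent_proposals.is_empty()
        }
    }

    fn main() {
        let queues = SyncQueues {
            chain_metadata: BTreeMap::new(),
            chain_segments: VecDeque::new(),
            recent_proposals: VecDeque::new(),
        };
        assert!(!queues.is_syncing()); // all queues drained => not syncing
    }

The practical effect is that the flag clears exactly when the queues drain, rather than depending on the state machine having been advanced out of Phase0.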
