diff --git a/.github/workflows/base_benchmarks.yaml b/.github/workflows/base_benchmarks.yaml index 69c20fff5..5d0912ae1 100644 --- a/.github/workflows/base_benchmarks.yaml +++ b/.github/workflows/base_benchmarks.yaml @@ -33,7 +33,6 @@ jobs: --threshold-max-sample-size 64 \ --threshold-upper-boundary 0.99 \ --thresholds-reset \ - --err \ --adapter rust_criterion \ --github-actions '${{ secrets.GITHUB_TOKEN }}' \ cargo bench diff --git a/.github/workflows/pr_benchmarks.yaml b/.github/workflows/pr_benchmarks.yaml index 305e99d16..de51b72e5 100644 --- a/.github/workflows/pr_benchmarks.yaml +++ b/.github/workflows/pr_benchmarks.yaml @@ -34,7 +34,6 @@ jobs: --start-point-clone-thresholds \ --start-point-reset \ --testbed self-hosted \ - --err \ --adapter rust_criterion \ --github-actions '${{ secrets.GITHUB_TOKEN }}' \ cargo bench diff --git a/z2/src/converter.rs b/z2/src/converter.rs index 251a0a324..dbb1445d1 100644 --- a/z2/src/converter.rs +++ b/z2/src/converter.rs @@ -14,18 +14,14 @@ use bitvec::{bitarr, order::Msb0}; use eth_trie::{EthTrie, MemoryDB, Trie}; use indicatif::{ProgressBar, ProgressFinish, ProgressIterator, ProgressStyle}; use itertools::Itertools; -use libp2p::PeerId; use sha2::{Digest, Sha256}; -use tokio::sync::mpsc; use tracing::{debug, trace, warn}; use zilliqa::{ - block_store::BlockStore, cfg::{scilla_ext_libs_path_default, Amount, Config, NodeConfig}, crypto::{Hash, SecretKey}, db::Db, exec::store_external_libraries, message::{Block, QuorumCertificate, Vote, MAX_COMMITTEE_SIZE}, - node::{MessageSender, RequestId}, schnorr, scilla::{storage_key, CheckOutput, ParamValue, Transition}, state::{Account, Code, ContractInit, State}, @@ -346,27 +342,15 @@ pub async fn convert_persistence( "{msg} {wide_bar} [{per_sec}] {human_pos}/~{human_len} ({elapsed}/~{duration})", )?; - let (outbound_message_sender, _a) = mpsc::unbounded_channel(); - let (local_message_sender, _b) = mpsc::unbounded_channel(); - let message_sender = MessageSender { - our_shard: 0, - our_peer_id: PeerId::random(), - outbound_channel: outbound_message_sender, - local_channel: local_message_sender, - request_id: RequestId::default(), - }; + // let (outbound_message_sender, _a) = mpsc::unbounded_channel(); + // let (local_message_sender, _b) = mpsc::unbounded_channel(); let zq2_db = Arc::new(zq2_db); let node_config = &zq2_config.nodes[0]; - let block_store = Arc::new(BlockStore::new( - node_config, - zq2_db.clone(), - message_sender.clone(), - )?); let mut state = State::new_with_genesis( zq2_db.clone().state_trie()?, node_config.clone(), - block_store, + zq2_db.clone(), )?; let mut scilla_docker = run_scilla_docker()?; diff --git a/z2/src/docgen.rs b/z2/src/docgen.rs index 1f9b25a52..76d71a30c 100644 --- a/z2/src/docgen.rs +++ b/z2/src/docgen.rs @@ -14,7 +14,7 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use tera::Tera; use tokio::fs; -use zilliqa::{cfg::NodeConfig, crypto::SecretKey}; +use zilliqa::{cfg::NodeConfig, crypto::SecretKey, sync::SyncPeers}; const SUPPORTED_APIS_PATH_NAME: &str = "index"; @@ -352,10 +352,20 @@ pub fn get_implemented_jsonrpc_methods() -> Result Consensus { let secret_key = genesis_deposits[index].0; + let peer_id = secret_key.to_libp2p_keypair().public().to_peer_id(); let (outbound_message_sender, a) = mpsc::unbounded_channel(); let (local_message_sender, b) = mpsc::unbounded_channel(); let (reset_timeout_sender, c) = mpsc::unbounded_channel(); @@ -208,6 +212,7 @@ fn consensus( message_sender, reset_timeout_sender, Arc::new(db), + Arc::new(SyncPeers::new(peer_id)), ) 
.unwrap() } diff --git a/zilliqa/src/api/zilliqa.rs b/zilliqa/src/api/zilliqa.rs index e587597cf..5d0cc44b4 100644 --- a/zilliqa/src/api/zilliqa.rs +++ b/zilliqa/src/api/zilliqa.rs @@ -508,7 +508,7 @@ fn get_blockchain_info(_: Params, node: &Arc>) -> Result>) -> Result>) -> Result block.transactions.len(), @@ -1247,11 +1245,7 @@ fn get_recent_transactions( let mut txns = Vec::new(); let mut blocks_searched = 0; while block_number > 0 && txns.len() < 100 && blocks_searched < 100 { - let block = match node - .consensus - .block_store - .get_canonical_block_by_number(block_number)? - { + let block = match node.consensus.get_canonical_block_by_number(block_number)? { Some(block) => block, None => continue, }; @@ -1274,7 +1268,7 @@ fn get_recent_transactions( // GetNumTransactions fn get_num_transactions(_params: Params, node: &Arc>) -> Result { let node = node.lock().unwrap(); - let num_transactions = node.consensus.block_store.get_num_transactions()?; + let num_transactions = node.consensus.get_num_transactions()?; Ok(num_transactions.to_string()) } @@ -1283,7 +1277,6 @@ fn get_num_txns_tx_epoch(_params: Params, node: &Arc>) -> Result block.transactions.len(), @@ -1302,7 +1295,6 @@ fn get_num_txns_ds_epoch(_params: Params, node: &Arc>) -> Result Self { - Self { - parent_hash, - from, - proposal, - } - } -} - -/// A block cache. -/// We need to be careful to conserve block space in the presence of block flooding attacks, and we need to -/// make sure we don't lose blocks that form part of the main chain repeatedly, else we will never be able -/// to construct it. -/// -/// Similarly, we should ensure that we always buffer proposals close to the head of the tree, else we will -/// lose sync frequently and have to request, which will slow down block production. -/// -/// An easy way to do this is to put a hash of the node address (actually, we just use the low bits) in the -/// bottom (log2(N_WAYS)) bits of the view number. We then evict the largest tag le (max_view - buffer). -/// -/// I don't think it actually matters whether we use the view or the block number here, since we're not using -/// fixed-size arrays. -/// -#[derive(Debug, Serialize, Deserialize)] -pub struct BlockCache { - /// Caches proposals that are not yet blocks, and are before the head_cache. - pub cache: BTreeMap, - /// Caches proposals close to the head. - /// This buys us out of the situation where we are, say, 2 blocks behind the head. - /// We request those blocks, but by the time we get them, a new block is proposed. - /// So we're now a block behind. We request it, and then, by the time we get it ... - /// and so on. The head_cache caches broadcast proposals at the head of the chain - /// so we only need to get to (head_of_chain - head_cache_entries) and we can - /// then catch up using the head cache. - pub head_cache: BTreeMap, - /// Caches ranges where we think there is no block at all (just an empty view) - pub empty_view_ranges: RangeMap, - /// The head cache - this caches - /// An index into the cache by parent hash - pub by_parent_hash: HashMap>, - /// Set associative shift - pub shift: usize, - /// This is used to count the number of times we've looked for a fork. - /// The counter is zeroed when we receive (or pop) a new block, and counts 1 every - /// time we looked. - pub fork_counter: usize, - /// Copied from the parent to minimise the number of additional parameters we need. - pub max_blocks_in_flight: u64, - /// These are views which we have removed from the cache to process later. 
Remember not to re-request them, or - /// we will end up asking peers for views which we are about to process. - /// We need to remember to clear these flags once we have the proposal, because it might be a lie and we may need - /// to rerequest in order to get the right view (there will only ever be one legitimate view with a given number, - /// but peers may lie to us about what it is) - pub views_expecting_proposals: BTreeSet, -} - -impl BlockCache { - pub fn new(max_blocks_in_flight: u64) -> Self { - Self { - cache: BTreeMap::new(), - head_cache: BTreeMap::new(), - empty_view_ranges: RangeMap::new(), - by_parent_hash: HashMap::new(), - shift: 8 - constants::BLOCK_CACHE_LOG2_WAYS, - fork_counter: 0, - max_blocks_in_flight, - views_expecting_proposals: BTreeSet::new(), - } - } - - pub fn key_from_view(&self, peer: &PeerId, view_num: u64) -> u128 { - let ways = peer.to_bytes().pop().unwrap_or(0x00); - u128::from(ways >> self.shift) | (u128::from(view_num) << self.shift) - } - - pub fn view_from_key(&self, key: u128) -> u64 { - u64::try_from(key >> self.shift).unwrap() - } - - pub fn min_key_for_view(&self, view: u64) -> u128 { - u128::from(view) << self.shift - } - - pub fn expect_process_proposal(&mut self, view: u64) { - self.views_expecting_proposals.insert(view); - } - - pub fn received_process_proposal(&mut self, view: u64) { - self.views_expecting_proposals.remove(&view); - } - - /// returns the minimum key (view << shift) that we are prepared to store in the head cache. - /// keys smaller than this are stored in the main cache. - /// We compute this by subtracting a constant from (highest_known_view +1)<< shift - which is - /// the highest key we think could currently exist (highest view we've ever seen +1 shifted up). - /// (the constant is preshifted for efficiency) - /// This aims to keep the head cache at roughly BLOCK_CACHE_HEAD_BUFFER_ENTRIES entries - /// (note that this will be BLOCK_CACHE_HEAD_BUFFER_ENTRIES >> shift cached views, since the - /// head cache is set associative) - pub fn min_head_cache_key(&self, highest_known_view: u64) -> u128 { - let delta = u128::try_from(constants::BLOCK_CACHE_HEAD_BUFFER_ENTRIES).unwrap(); - let highest_key = u128::from(highest_known_view + 1) << self.shift; - highest_key // prevent underflowing for low views - .saturating_sub(delta) - } - - pub fn destructive_proposals_from_parent_hashes( - &mut self, - hashes: &[Hash], - ) -> Vec<(PeerId, Proposal)> { - // For each hash, find the list of blocks that have it as the parent. - let cache_keys = hashes - .iter() - .filter_map(|x| self.by_parent_hash.remove(x)) - .flatten() - .collect::>(); - let maybe = cache_keys - .iter() - .filter_map(|key| { - self.cache - .remove(key) - .or_else(|| self.head_cache.remove(key)) - .map(|entry| (entry.from, entry.proposal)) - }) - .collect::>(); - if !cache_keys.is_empty() { - let max_view = - self.view_from_key(cache_keys.iter().fold(0, |v1, v2| std::cmp::max(v1, *v2))); - // Ignore any gaps up to this point, because they may be lies. - (_, self.empty_view_ranges) = - self.empty_view_ranges - .diff_inter(&RangeMap::from_range(&Range { - start: 0, - end: max_view + 1, - })); - // We got a real block! Reset the fork counter. - self.fork_counter = 0; - } - maybe - } - - /// Delete all blocks in the cache up to and including block_number - pub fn delete_blocks_up_to(&mut self, block_number: u64) { - // note that this code embodies the assumption that increasing block number implies - // increasing view number. 
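A minimal standalone sketch (not part of this patch) of the set-associative keying scheme the deleted BlockCache comments above describe, assuming a hypothetical LOG2_WAYS of 2 in place of constants::BLOCK_CACHE_LOG2_WAYS: the top bits of the peer id's last byte pick the way and the view number fills the higher bits, so two peers advertising a block for the same view cannot evict each other's cache entry.

```rust
// Illustrative sketch only; the constant value and free-function form are assumptions.
const LOG2_WAYS: usize = 2;            // stand-in for constants::BLOCK_CACHE_LOG2_WAYS
const SHIFT: usize = 8 - LOG2_WAYS;    // mirrors the `shift` field set up in BlockCache::new

fn key_from_view(peer_last_byte: u8, view: u64) -> u128 {
    // Low bits: way index derived from the peer; high bits: the view number.
    u128::from(peer_last_byte >> SHIFT) | (u128::from(view) << SHIFT)
}

fn view_from_key(key: u128) -> u64 {
    (key >> SHIFT) as u64
}

fn main() {
    let a = key_from_view(0b1000_0000, 42);
    let b = key_from_view(0b0100_0000, 42);
    assert_ne!(a, b);                  // same view, different ways: no mutual eviction
    assert_eq!(view_from_key(a), 42);
    assert_eq!(view_from_key(b), 42);
}
```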
- self.trim_with_fn(|_, v| -> bool { v.proposal.number() <= block_number }); - } - - pub fn trim(&mut self, highest_confirmed_view: u64) { - let lowest_ignored_key = self.min_key_for_view(highest_confirmed_view); - debug!("trim: lowest_ignored_key = {0}", lowest_ignored_key); - self.trim_with_fn(|k, _| -> bool { *k < lowest_ignored_key }); - // We don't care about anything lower than what we're about to flush - self.views_expecting_proposals = self - .views_expecting_proposals - .split_off(&highest_confirmed_view); - } - - /// DANGER WILL ROBINSON! This function only searches from the minimum key to the maximum, so - /// any selector function which is not monotonic in key will not work properly. - fn trim_with_fn bool>(&mut self, selector: F) { - // We've deleted or replaced this key with this parent hash; remove it from the index. - fn unlink_parent_hash(cache: &mut HashMap>, key: &u128, hash: &Hash) { - let mut do_remove = false; - if let Some(val) = cache.get_mut(hash) { - val.remove(key); - if val.is_empty() { - do_remove = true - } - } - if do_remove { - cache.remove(hash); - } - } - - let cache_entries = self.max_blocks_in_flight << constants::BLOCK_CACHE_LOG2_WAYS; - // debug!("trim: cache had: {0}", self.extant_block_ranges()?); - // Should really be an option, but given that there is a convenient sentinel.. - let mut lowest_view_in_cache: Option = None; - let shift = self.shift; - - for cache_ptr in [&mut self.cache, &mut self.head_cache] { - while let Some((k, v)) = cache_ptr.first_key_value() { - if selector(k, v) { - // Kill it! - if let Some((k, v)) = cache_ptr.pop_first() { - unlink_parent_hash(&mut self.by_parent_hash, &k, &v.parent_hash); - }; - } else { - let view_number = u64::try_from(*k >> shift).unwrap(); - lowest_view_in_cache = Some( - lowest_view_in_cache.map_or(view_number, |x| std::cmp::min(x, view_number)), - ); - break; - } - } - } - - // Empty view ranges below the thing we last trimmed might not exist - zap them. - if let Some(v) = lowest_view_in_cache { - (_, self.empty_view_ranges) = - self.empty_view_ranges - .diff_inter(&RangeMap::from_range(&Range { - start: 0, - end: v + 1, - })); - } - // And trim. - let cache_size = usize::try_from(cache_entries).unwrap(); - self.empty_view_ranges.truncate(cache_size); - - while self.head_cache.len() > constants::BLOCK_CACHE_HEAD_BUFFER_ENTRIES { - if let Some((k, v)) = self.head_cache.pop_first() { - // Push this into the main cache, otherwise we will get into the state where - // blocks are removed from the head cache and lost and we are constantly - // requesting blocks to replace them. - self.cache.insert(k, v); - } - } - while self.cache.len() > cache_size { - if let Some((k, v)) = self.cache.pop_last() { - unlink_parent_hash(&mut self.by_parent_hash, &k, &v.parent_hash); - } - } - // Both caches are now at most the "right" number of entries long. - } - - pub fn no_blocks_at(&mut self, no_blocks_in: &Range) { - self.empty_view_ranges.with_range(no_blocks_in); - } - - pub fn delete_empty_view_range_cache(&mut self) { - self.empty_view_ranges = RangeMap::new(); - } - - /// Insert this proposal into the cache. 
- pub fn insert( - &mut self, - from: &PeerId, - parent_hash: &Hash, - proposal: Proposal, - highest_confirmed_view: u64, - highest_known_view: u64, - ) -> Result<()> { - fn insert_with_replacement( - into: &mut BTreeMap, - by_parent_hash: &mut HashMap>, - from: &PeerId, - parent_hash: &Hash, - key: u128, - value: Proposal, - ) { - into.insert(key, BlockCacheEntry::new(*parent_hash, *from, value)) - .map(|entry| { - by_parent_hash - .get_mut(&entry.parent_hash) - .map(|x| x.remove(&key)) - }); - if let Some(v) = by_parent_hash.get_mut(parent_hash) { - v.insert(key); - } else { - let mut new_set = HashSet::new(); - new_set.insert(key); - by_parent_hash.insert(*parent_hash, new_set); - } - } - - if proposal.header.view <= highest_confirmed_view { - // nothing to do. - return Ok(()); - } - // First, insert us. - let key = self.key_from_view(from, proposal.header.view); - if key > self.min_head_cache_key(highest_known_view) { - insert_with_replacement( - &mut self.head_cache, - &mut self.by_parent_hash, - from, - parent_hash, - key, - proposal, - ); - } else { - insert_with_replacement( - &mut self.cache, - &mut self.by_parent_hash, - from, - parent_hash, - key, - proposal, - ); - } - // Zero the fork counter. - self.fork_counter = 0; - // Now evict the worst entry - self.trim(highest_confirmed_view); - Ok(()) - } - - pub fn inc_fork_counter(&mut self) -> usize { - self.fork_counter += 1; - self.fork_counter - } - - pub fn reset_fork_counter(&mut self) { - self.fork_counter = 0; - } - - // For debugging - what view number ranges are in the cache? - pub fn extant_block_ranges(&self) -> RangeMap { - let mut result = RangeMap::new(); - let shift = 8 - constants::BLOCK_CACHE_LOG2_WAYS; - for key in self.cache.keys() { - let _ = u128::try_into(key >> shift).map(|x| result.with_elem(x)); - } - for key in self.head_cache.keys() { - let _ = u128::try_into(key >> shift).map(|x| result.with_elem(x)); - } - result - } - - pub fn expectant_block_ranges(&self) -> RangeMap { - let mut result = RangeMap::new(); - self.views_expecting_proposals.iter().for_each(|v| { - result.with_elem(*v); - }); - result - } -} - -/// Stores and manages the node's list of blocks. Also responsible for making requests for new blocks. -/// -/// # Syncing Algorithm -/// -/// We rely on [crate::consensus::Consensus] informing us of newly received block proposals via: -/// * [BlockStore::process_block] for blocks that can be part of our chain, because we already have their parent. -/// * [BlockStore::buffer_proposal] for blocks that can't (yet) be part of our chain. -/// -/// Both these code paths also call [BlockStore::request_missing_blocks]. This finds the greatest view of any proposal -/// we've seen (whether its part of our chain or not). -/// -/// -/// TODO(#1096): Retries for blocks we request but never receive. -#[derive(Debug)] -pub struct BlockStore { - db: Arc, - block_cache: Arc>>, - /// The maximum view of any proposal we have received, even if it is not part of our chain yet. - highest_known_view: u64, - /// Highest confirmed view - blocks we know to be correct. - highest_confirmed_view: u64, - /// Information we keep about our peers' state. - peers: BTreeMap, - /// The maximum number of blocks to send requests for at a time. - max_blocks_in_flight: u64, - /// When a request to a peer fails, do not send another request to this peer for this amount of time. - failed_request_sleep_duration: Duration, - /// Our block strategies. - strategies: Vec, - /// The block views we have available. 
This is read once from the DB at start-up and incrementally updated whenever - /// we receive a new block. We do this because obtaining the data from the DB is expensive. - available_blocks: RangeMap, - - /// Buffered block proposals. - buffered: BlockCache, - /// Requests we would like to send, but haven't been able to (e.g. because we have no peers). - unserviceable_requests: Option, - message_sender: MessageSender, - - /// Clock pointer - see request_blocks() - clock: usize, - - /// Where we last started syncing, so we can report it in get_sync_data() - started_syncing_at: BlockNumber, - /// Previous sync flag, so we can tell when it changes. - last_sync_flag: bool, -} - -/// Data about block availability sent between peers -#[derive(Clone, Debug, Serialize, Deserialize)] -struct BlockAvailability { - /// None means no information, Some([]) means the other node shouldn't be relied upon for any blocks at all. - strategies: Option>, - /// The largest view we've seen from a block that this peer sent us. - highest_known_view: u64, -} - -#[derive(Clone, Debug)] -struct PeerInfo { - /// Availability from this peer - availability: BlockAvailability, - /// When did we last update availability? - availability_updated_at: Option, - /// Last availability query - don't send them too often. - availability_requested_at: Option, - /// Requests we've sent to the peer. - pending_requests: HashMap, - /// If `Some`, the time of the most recently failed request. - last_request_failed_at: Option, -} - -impl PeerInfo { - fn new() -> Self { - Self { - availability: BlockAvailability::new(), - availability_updated_at: None, - availability_requested_at: None, - pending_requests: HashMap::new(), - last_request_failed_at: None, - } - } - - /// Do we have availability, or should we get it again? - fn have_availability(&self) -> bool { - self.availability_updated_at.is_some() - } - - /// Converts a set of block strategies into a rangemap - fn get_ranges(&self, max_view: Option) -> RangeMap { - let mut result = RangeMap::new(); - if let Some(strat) = &self.availability.strategies { - let mut max_end: Option = None; - let mut last_n: Option = None; - for s in strat { - match s { - BlockStrategy::CachedViewRange(views, until_view) => { - if until_view.map_or(true, |x| self.availability.highest_known_view <= x) { - result.with_range(views); - max_end = Some( - max_end.map_or(views.end - 1, |v| std::cmp::max(v, views.end - 1)), - ); - } - } - BlockStrategy::Latest(n) => { - last_n = Some(last_n.map_or(*n, |x| std::cmp::max(x, *n))); - } - } - } - if let Some(the_n) = last_n { - if let Some(max_view_nr) = max_view { - let start = max_view_nr.saturating_sub(the_n); - result.with_range(&Range { - start, - end: max_view_nr, - }); - } - } - } - result - } -} - -/// Data about a peer -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct PeerInfoStatus { - availability: BlockAvailability, - availability_updated_at: Option, - pending_requests: Vec<(String, SystemTime, u64, u64)>, - last_request_failed_at: Option, -} - -/// Data about the block store, used for debugging. 
-#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct BlockStoreStatus { - highest_known_view: u64, - views_held: Vec>, - peers: Vec<(String, PeerInfoStatus)>, - availability: Option>, -} - -impl BlockStoreStatus { - pub fn new(block_store: &mut BlockStore) -> Result { - let peers = block_store - .peers - .iter() - .map(|(k, v)| (format!("{:?}", k), PeerInfoStatus::new(v))) - .collect::>(); - Ok(Self { - highest_known_view: block_store.highest_known_view, - views_held: block_store.db.get_view_ranges()?, - peers, - availability: block_store.availability()?, - }) - } -} - -impl PeerInfoStatus { - // Annoyingly, this can't (easily) be allowed to fail without making generating debug info hard. - fn new(info: &PeerInfo) -> Self { - fn s_from_time(q: Option) -> Option { - q.map(|z| { - z.duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or(Duration::ZERO) - .as_secs() - }) - } - let pending_requests = info - .pending_requests - .iter() - .map(|(k, v)| (format!("{:?}", k), v.0, v.1, v.2)) - .collect::>(); - Self { - availability: info.availability.clone(), - availability_updated_at: s_from_time(info.availability_updated_at), - pending_requests, - last_request_failed_at: s_from_time(info.last_request_failed_at), - } - } -} - -impl BlockAvailability { - pub fn new() -> Self { - Self { - strategies: None, - highest_known_view: 0, - } - } -} - -impl BlockStore { - pub fn new(config: &NodeConfig, db: Arc, message_sender: MessageSender) -> Result { - let available_blocks = - db.get_view_ranges()? - .iter() - .fold(RangeMap::new(), |mut range_map, range| { - range_map.with_range(range); - range_map - }); - Ok(BlockStore { - db, - block_cache: Arc::new(RwLock::new(LruCache::new(NonZeroUsize::new(5).unwrap()))), - highest_known_view: 0, - highest_confirmed_view: 0, - peers: BTreeMap::new(), - max_blocks_in_flight: config.max_blocks_in_flight, - failed_request_sleep_duration: config.failed_request_sleep_duration, - strategies: vec![BlockStrategy::Latest(constants::RETAINS_LAST_N_BLOCKS)], - available_blocks, - buffered: BlockCache::new(config.max_blocks_in_flight), - unserviceable_requests: None, - message_sender, - clock: 0, - started_syncing_at: 0, - last_sync_flag: false, - }) - } - - /// The data set here is held in memory. It can be useful to update manually - /// For example after a restart to remind block_store of its peers and height - pub fn set_peers_and_view( - &mut self, - highest_known_view: u64, - peer_ids: &Vec, - ) -> Result<()> { - for peer_id in peer_ids { - self.peer_info(*peer_id); - } - self.highest_known_view = highest_known_view; - Ok(()) - } - - /// Create a read-only clone of this [BlockStore]. The read-only property must be upheld by the caller - Calling - /// any `&mut self` methods on the returned [BlockStore] will lead to problems. This clone is cheap. 
- pub fn clone_read_only(&self) -> Arc { - Arc::new(BlockStore { - db: self.db.clone(), - block_cache: self.block_cache.clone(), - highest_known_view: 0, - highest_confirmed_view: 0, - peers: BTreeMap::new(), - max_blocks_in_flight: 0, - failed_request_sleep_duration: Duration::ZERO, - strategies: self.strategies.clone(), - available_blocks: RangeMap::new(), - buffered: BlockCache::new(0), - unserviceable_requests: None, - message_sender: self.message_sender.clone(), - clock: 0, - started_syncing_at: self.started_syncing_at, - last_sync_flag: self.last_sync_flag, - }) - } - - /// Update someone else's availability - pub fn update_availability( - &mut self, - from: PeerId, - avail: &Option>, - ) -> Result<()> { - let the_peer = self.peer_info(from); - the_peer.availability.strategies.clone_from(avail); - the_peer.availability_updated_at = Some(SystemTime::now()); - Ok(()) - } - - /// Retrieve our availability. - /// We need to do this by view range, which means that we need to account for views where there was no block. - /// So, the underlying db function finds the view lower and upper bounds of our contiguous block ranges and we - /// advertise those. - pub fn availability(&self) -> Result>> { - let mut to_return = self.strategies.clone(); - to_return.extend( - self.available_blocks - .ranges - .iter() - .map(|range| BlockStrategy::CachedViewRange(range.clone(), None)), - ); - Ok(Some(to_return)) - } - - /// Buffer a block proposal whose parent we don't yet know about. - pub fn buffer_proposal(&mut self, from: PeerId, proposal: Proposal) -> Result<()> { - let view = proposal.view(); - - // If this is the highest block we've seen, remember its view. - if view > self.highest_known_view { - trace!(view, "block_store:: new highest known view"); - self.highest_known_view = view; - } - - trace!( - "block_store:: buffer_proposal {view}, hc {0}", - self.highest_confirmed_view - ); - self.buffered.insert( - &from, - &proposal.header.qc.block_hash.clone(), - proposal, - self.highest_confirmed_view, - self.highest_known_view, - )?; - - let peer = self.peer_info(from); - if view > peer.availability.highest_known_view { - trace!(%from, view, "block_store:: new highest known view for peer"); - peer.availability.highest_known_view = view; - } - - Ok(()) - } - - /// This function: - /// - /// * Looks through the blocks we have - /// * Finds the next blocks it thinks we need - /// * Iterates through our known peers. - /// - /// If we don't have availability for a peer, we will request it by - /// sending an empty block request. - /// - /// If we do, we will try to request whatever blocks it has that we want. - /// - /// We limit the number of outstanding requests per peer, in order to - /// avoid bufferbloat at the peer's input message queue. - /// - /// We don't ask for blocks that we think are in flight (ie. we've - /// requested them but they have not yet arrived), those we don't think a - /// peer has, or those we think are gaps (remember that requests are made - /// by view, so you can't guarantee that every view has a block). - /// - /// We time out outstanding requests on a flat-timeout basis (our model - /// being that if you haven't replied by now, the whole message has - /// probably been lost). - /// Returns whether this function thinks we are syncing or not. - pub fn request_missing_blocks(&mut self) -> Result { - // Get the highest view we currently have committed to our chain. 
- // This is a bit horrid - it can go down as well as up, because we can roll back blocks - // when we discover that they are ahead of what we think the rest of the chain - // has committed to - if we don't roll back here, we won't then fetch the canonical - // versions of those blocks (thinking we already have them). - let (syncing, current_block) = self.am_syncing()?; - self.highest_confirmed_view = current_block.view(); - let current_view = current_block.view(); - trace!( - "block_store::request_missing_blocks() : set highest_confirmed_view {0} (current = {1})", - self.highest_confirmed_view, - current_view, - ); - - // First off, let's load up the unserviceable requests. - let mut to_request = if let Some(us_requests) = self.unserviceable_requests.take() { - us_requests - } else { - RangeMap::new() - }; - - // If we think the network might be ahead of where we currently are, attempt to download the missing blocks. - // This is complicated, because we mustn't request more blocks than will fit in our cache, or we might - // end up evicting the critical part of the chain.. - // @todo I can't think of a more elegant way than this, but it's horrid - we want to exclude views which - // we might still be voting on. - if syncing { - trace!( - current_view, - self.highest_known_view, - self.max_blocks_in_flight, - "block_store::request_missing_blocks() : missing some blocks" - ); - { - // We need to request from current_view, because these blocks might never be returned by our peers - // deduplication of requests is done one level lower - in request_blocks(). - let from = current_view + 1; - // Never request more than current_view + max_blocks_in_flight, or the cache won't be able to hold - // the responses and we'll end up being unable to reconstruct the chain. Not strictly true, because - // the network will hold some blocks for us, but true enough that I think we ought to treat it as - // such. - let to = cmp::min( - current_view + self.max_blocks_in_flight, - self.highest_known_view, - ); - trace!("block_store::request_missing_blocks() : requesting blocks {from} to {to}"); - to_request.with_range(&Range { - start: from, - end: to + 1, - }); - } - if !to_request.is_empty() { - self.request_blocks(&to_request)?; - } - } else { - // We're synced - no need to try and guess forks. - trace!( - "block_store::request_missing_blocks() : synced with highest_known_view {0}, current_view {1}", - self.highest_known_view, - current_view - ); - self.buffered.reset_fork_counter(); - } - - if syncing && !self.last_sync_flag { - // We didn't used to be syncing; remember when we started. - self.started_syncing_at = current_block.number(); - } - self.last_sync_flag = syncing; - - Ok(syncing) - } - - pub fn prune_pending_requests(&mut self) -> Result<()> { - // In the good old days, we could've done this by linear interpolation on the timestamp. - let current_time = SystemTime::now(); - for peer in self.peers.keys().cloned().collect::>() { - let the_peer = self.peer_info(peer); - the_peer.pending_requests = the_peer - .pending_requests - .iter() - .filter_map(|(k, (v1, v2, v3))| { - // How long since this request was sent? - match current_time.duration_since(*v1) { - Ok(since) => { - if since > constants::BLOCK_REQUEST_RESPONSE_TIMEOUT { - // Time out everything. 
- trace!("block_store::prune_pending_requests: timing out pending request {k:?} {v1:?} {v2} {v3}"); - None - } else { - Some((*k, (*v1, *v2, *v3))) - } - } - _ => None, - } - }) - .collect(); - } - Ok(()) - } - - pub fn retry_us_requests(&mut self) -> Result<()> { - if let Some(us_requests) = self.unserviceable_requests.take() { - self.request_blocks(&us_requests)?; - } - Ok(()) - } - - /// Make a request for the blocks associated with a range of views. Returns `true` if a request was made and `false` if the request had to be - /// buffered because no peers were available. - /// Public so we can trigger it from the debug API - pub fn request_blocks(&mut self, req: &RangeMap) -> Result { - let mut remain = req.clone(); - let to = req.max(); - - // Prune the pending requests - self.prune_pending_requests()?; - - // If it's in our input queue, don't expect it again. - let expected = self.buffered.expectant_block_ranges(); - (_, remain) = remain.diff_inter(&expected); - - // If it's already buffered, don't request it again - wait for us to reject it and - // then we can re-request. - let extant = self.buffered.extant_block_ranges(); - - (_, remain) = remain.diff_inter(&extant); - (_, remain) = remain.diff_inter(&self.buffered.empty_view_ranges); - - // If it's in flight, don't request it again. - let mut in_flight = RangeMap::new(); - for peer in self.peers.values() { - for (_, start, end) in peer.pending_requests.values() { - in_flight.with_range(&Range { - start: *start, - end: end + 1, - }); - } - } - (_, remain) = remain.diff_inter(&in_flight); - - let now = SystemTime::now(); - let failed_request_sleep_duration = self.failed_request_sleep_duration; - - // If everything we have is in flight, we'll skip trying to request them (or update availability) - if remain.is_empty() { - trace!("block_store::request_blocks() : .. no non in_flight requests. Returning early"); - return Ok(true); - } - - for chance in 0..2 { - trace!( - "block_store::request_blocks() : chance = {chance} clock = {} peers = {}", - self.clock, - self.peers.len() - ); - // There may be no peers ... - self.clock = (self.clock + 1) % std::cmp::max(1, self.peers.len()); - // Slightly horrid - generate a list of peers which is the BTreeMap's list, shifted by clock. - let peers = self - .peers - .keys() - .skip(self.clock) - .chain(self.peers.keys().take(self.clock)) - .cloned() - .collect::>(); - - for peer in &peers { - debug!("block_store::request_blocks() : considering peer = {peer}"); - // If the last request failed < 10s or so ago, skip this peer, unless we're second-chance in - // which case, hey, why not? - let (requests, rem, query_availability) = { - let peer_info = self.peer_info(*peer); - if chance == 0 - && !peer_info - .last_request_failed_at - .and_then(|at| at.elapsed().ok()) - .map(|time_since| time_since > failed_request_sleep_duration) - .unwrap_or(true) - { - trace!("block_store::request_blocks() : .. Last request failed; skipping this peer"); - continue; - } - - if peer_info.pending_requests.len() - >= constants::MAX_PENDING_BLOCK_REQUESTS_PER_PEER - { - trace!( - "block_store::request_blocks() : .. Skipping peer {peer} - too many pending requests {0}", - peer_info.pending_requests.len() - ); - continue; - } - // Split .. 
- let left = constants::MAX_PENDING_BLOCK_REQUESTS_PER_PEER - - peer_info.pending_requests.len(); - let ranges = peer_info.get_ranges(to); - let (req, rem) = remain.diff_inter_limited(&ranges, Some(left)); - // If we are not about to make a request, and we do not have recent availability then - // make a synthetic request to get that availability. - let query_availability = req.is_empty() - && peer_info.pending_requests.is_empty() - && (!peer_info.have_availability() - || peer_info.availability_requested_at.map_or(true, |x| { - x.elapsed() - .map(|v| { - v > constants::REQUEST_PEER_VIEW_AVAILABILITY_NOT_BEFORE - }) - .unwrap_or(true) - })); - (req, rem, query_availability) - }; - - let mut request_sent = false; - // Send all requests now .. - for request in requests.ranges.iter() { - if !request.is_empty() { - trace!( - "block_store::request_blocks() : peer = {:?} request = {:?}: sending block request", - peer, - request, - ); - // Yay! - let message = ExternalMessage::BlockRequest(BlockRequest { - from_view: request.start, - to_view: request.end, - }); - let request_id = - self.message_sender.send_external_message(*peer, message)?; - self.peer_info(*peer) - .pending_requests - .insert(request_id, (now, request.start, request.end)); - request_sent = true; - } - } - // If we haven't got recent availability, and we haven't already asked for it, ask .. - if !request_sent && chance == 0 && query_availability { - trace!("block_store::request_blocks() : Querying availability"); - // Executive decision: Don't ask for any blocks here, because we are about to do so in duplicate - // later and we don't want to duplicate work - you could viably go for a slightly faster - // sync by just asking for all the blocks and letting the peer send what it has. - let message = ExternalMessage::BlockRequest(BlockRequest { - from_view: 0, - to_view: 0, - }); - let peer_info = self.peer_info(*peer); - peer_info.availability_requested_at = Some(now); - let _ = self.message_sender.send_external_message(*peer, message); - } - - // We only need to request stuff from peers if we haven't already done so. - remain = rem; - } - } - trace!("block_store::request_blocks() : all done"); - if !remain.is_empty() { - warn!( - "block_store::request_blocks() : Could not find peers for views {:?}", - remain - ); - if let Some(us) = &mut self.unserviceable_requests { - us.with_range_map(&remain); - } else { - self.unserviceable_requests = Some(remain); - } - } - Ok(true) - } - - pub fn get_block(&self, hash: Hash) -> Result> { - let mut block_cache = self - .block_cache - .write() - .map_err(|e| anyhow!("Failed to get write access to block cache: {e}"))?; - if let Some(block) = block_cache.get(&hash) { - return Ok(Some(block.clone())); - } - let Some(block) = self.db.get_block_by_hash(&hash)? else { - return Ok(None); - }; - block_cache.put(hash, block.clone()); - Ok(Some(block)) - } - - pub fn get_block_by_view(&self, view: u64) -> Result> { - let Some(hash) = self.db.get_block_hash_by_view(view)? else { - return Ok(None); - }; - self.get_block(hash) - } - - pub fn get_highest_canonical_block_number(&self) -> Result> { - self.db.get_highest_canonical_block_number() - } - - pub fn get_canonical_block_by_number(&self, number: u64) -> Result> { - self.db.get_canonical_block_by_number(number) - } - - /// Called to process a block which can be added to the chain. 
- /// - insert the block into any necessary databases - /// - update the highest known and confirmed views, if necessary, - /// - Return a list of proposals that can now be made part of the chain, removing - /// them from the cache to free up space as we do so. - /// - /// The caller should arrange to process the returned list asynchronously to avoid - /// blocking message processing for too long. - pub fn process_block( - &mut self, - from: Option, - block: Block, - ) -> Result> { - trace!(?from, number = block.number(), hash = ?block.hash(), "block_store::process_block() : starting"); - self.db.insert_block(&block)?; - self.available_blocks.with_elem(block.view()); - - if let Some(from) = from { - let peer = self.peer_info(from); - if block.view() > peer.availability.highest_known_view { - trace!(%from, view = block.view(), "new highest known view for peer"); - peer.availability.highest_known_view = block.view(); - } - } - - // There are two sets - let result = self - .buffered - .destructive_proposals_from_parent_hashes(&[block.hash()]); - - // Update highest_confirmed_view, but don't trim the cache if - // we're not changing anything. - if block.header.view > self.highest_confirmed_view { - self.highest_confirmed_view = block.header.view; - self.buffered.trim(self.highest_confirmed_view); - } - - Ok(result) - } - - pub fn report_outgoing_message_failure( - &mut self, - failure: OutgoingMessageFailure, - ) -> Result<()> { - let peer_info = self.peer_info(failure.peer); - let Some((_, from, to)) = peer_info.pending_requests.remove(&failure.request_id) else { - // A request we didn't know about failed. It must have been sent by someone else. - return Ok(()); - }; - peer_info.last_request_failed_at = Some(SystemTime::now()); - - trace!("block_store : outgoing_message_failure: re-requesting {from} - {to}"); - self.request_blocks(&RangeMap::from_closed_interval(from, to))?; - - Ok(()) - } - - fn peer_info(&mut self, peer: PeerId) -> &mut PeerInfo { - self.peers.entry(peer).or_insert_with(PeerInfo::new) - } - - pub fn forget_block_range(&mut self, blocks: Range) -> Result<()> { - self.db.forget_block_range(blocks) - } - - pub fn contains_block(&mut self, block_hash: &Hash) -> Result { - self.db.contains_block(block_hash) - } - - // Retrieve the plausible next blocks for the block with this hash - // Because of forks there might be many of these. - pub fn obtain_child_block_candidates_for( - &mut self, - hashes: &[Hash], - ) -> Result> { - // The easy case is that there's something in the buffer with us as its parent hash. - let with_parent_hashes = self - .buffered - .destructive_proposals_from_parent_hashes(hashes); - if with_parent_hashes.is_empty() { - // There isn't. There are three cases: - // - // 1. We simply haven't received the next block yet. Give up and wait for it. - // 2. We have received a lie for the next block. Delete it and try again. - // 3. There was a fork and so the true next block is a bit further on in the - // chain than we've looked so far. - // - // There would be a few easy optimisations if we could eg. assume that forks were max length - // 1. As it is, I can't think of a clever way to do this, so... - - // In any case, deleting any cached block that calls itself the next block is - // the right thing to do - if it really was the next block, we would not be - // executing this branch. - if let Some(highest_block_number) = self.db.get_highest_canonical_block_number()? 
{ - self.buffered.delete_blocks_up_to(highest_block_number + 1); - trace!( - "block_store::obtain_child_block_candidates : deleted cached blocks up to and including {0}", - highest_block_number + 1 - ); - } - - let fork_elems = - self.buffered.inc_fork_counter() * (1 + constants::EXAMINE_BLOCKS_PER_FORK_COUNT); - - // Limit the number of forks to process otherwise the db query can take too long - const MAX_FORK_BLOCKS_TO_QUERY: usize = 512; - let fork_elems = cmp::min(fork_elems, MAX_FORK_BLOCKS_TO_QUERY); - - let parent_hashes = self.db.get_highest_block_hashes(fork_elems)?; - let revised = self - .buffered - .destructive_proposals_from_parent_hashes(&parent_hashes); - if !revised.is_empty() { - // Found some! - self.buffered.reset_fork_counter(); - } - Ok(revised) - } else { - Ok(with_parent_hashes) - } - } - - pub fn next_proposals_if_likely(&mut self) -> Result> { - // This is a bit sneaky, but the db overhead is just stepping through its B-Tree and this - // lets us cut out a lot of forks with 0 retries. - self.obtain_child_block_candidates_for( - &self - .db - .get_highest_block_hashes(constants::EXAMINE_BLOCKS_PER_FORK_COUNT)?, - ) - } - - pub fn delete_empty_view_range_cache(&mut self) { - self.buffered.delete_empty_view_range_cache(); - } - - /// Suppose that there is a view with no associated block. - /// Because we request views, not blocks, we will ask for blocks for those views. - /// Because there are no valid blocks in those views, we won't get them. - /// We will therefore ask again, and continue doing so forever, potentially exhausting our capacity for outstanding - /// view requests and blocking us from requesting blocks from views in which they might be extant. - /// We avoid this by finding the gaps between the view numbers of proposals we receive and caching - /// this list in the block_cache. We then arrange not to rerequest blocks in views for which we know there are no - /// valid blocks - remembering to clear this periodically in case a malicious node has lied to us about it. - /// - /// this function takes a list of proposals in a block response, works out the gaps between them and caches - /// the result. Gaps at the beginning of the sequence are recorded in the space between from_view and the view of the - /// first proposal; gaps at the end are ignored (and will be returned when we ask for the next view up from where - /// this block proposal left off). - pub fn buffer_lack_of_proposals( - &mut self, - from_view: u64, - proposals: &Vec, - ) -> Result<()> { - // OK. Find the gaps and register them as areas not to ask about again, because - // we now "know" that there is no block in this range. - // If this turns out to be a lie, we will pop the first block in the gap and check to see - // if it our next block. This will have the side-effect of forgetting about any gaps before - // that point, which we will then re-query, realise our mistake and carry on. - // @todo this is horribly slow - speed it up! - let mut gap_start = from_view; - let mut gap_end; - for p in proposals { - gap_end = p.header.view; - if gap_end > gap_start { - self.buffered.no_blocks_at(&Range { - start: gap_start, - end: gap_end, - }); - } - gap_start = gap_end + 1; - } - // There's never a gap at the end, because we don't know at which view we stopped. 
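The gap bookkeeping performed by the deleted buffer_lack_of_proposals above can be summarised with a small sketch (an assumed simplification; the function name and return type are illustrative, the real code feeds each range into BlockCache::no_blocks_at): given the view we requested from and the views of the proposals that came back, record every [start, end) view range that held no block, ignoring any trailing gap because we cannot know where the response stopped.

```rust
use std::ops::Range;

// Illustrative sketch only; assumes `proposal_views` is sorted ascending,
// as the proposals in a block response are.
fn view_gaps(from_view: u64, proposal_views: &[u64]) -> Vec<Range<u64>> {
    let mut gaps = Vec::new();
    let mut gap_start = from_view;
    for &view in proposal_views {
        if view > gap_start {
            gaps.push(gap_start..view); // views [gap_start, view) had no block
        }
        gap_start = view + 1;
    }
    gaps // trailing gap deliberately not recorded
}

fn main() {
    // Requested from view 10; the peer returned proposals for views 10, 11, 14 and 15,
    // so views 12 and 13 are remembered as empty and will not be re-requested.
    assert_eq!(view_gaps(10, &[10, 11, 14, 15]), vec![12..14]);
}
```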
- Ok(()) - } - - pub fn get_num_transactions(&self) -> Result { - let count = self.db.get_total_transaction_count()?; - Ok(count) - } - - pub fn summarise_buffered(&self) -> RangeMap { - self.buffered.extant_block_ranges() - } - - pub fn expect_process_proposal(&mut self, view: u64) { - self.buffered.expect_process_proposal(view); - } - - pub fn received_process_proposal(&mut self, view: u64) { - self.buffered.received_process_proposal(view); - } - - /// Returns (am_syncing, current_highest_block) - pub fn am_syncing(&self) -> Result<(bool, Block)> { - let current_block = self - .db - .get_canonical_block_by_number( - self.db - .get_highest_canonical_block_number()? - .ok_or_else(|| anyhow!("no highest block"))?, - )? - .ok_or_else(|| anyhow!("missing highest block"))?; - Ok(( - (self.highest_known_view + 2) > current_block.view(), - current_block, - )) - } - - // Returns (starting_block, current_block, highest_block) if we're syncing, - // None if we're not. - pub fn get_sync_data(&self) -> Result> { - let (flag, highest_block) = self.am_syncing()?; - if !flag { - Ok(None) - } else { - // Compute the highest block. We're going to do this by taking the difference between - - // get an estimated block number if no more views were skipped. - let skipped_views = highest_block.view() - highest_block.number(); - let expected_highest_block_number = self.highest_known_view - skipped_views; - Ok(Some(( - self.started_syncing_at, - highest_block.number(), - expected_highest_block_number, - ))) - } - } -} diff --git a/zilliqa/src/cfg.rs b/zilliqa/src/cfg.rs index 21f46d86b..6fdeef28d 100644 --- a/zilliqa/src/cfg.rs +++ b/zilliqa/src/cfg.rs @@ -101,10 +101,10 @@ pub struct NodeConfig { pub block_request_limit: usize, /// The maximum number of blocks to have outstanding requests for at a time when syncing. #[serde(default = "max_blocks_in_flight_default")] - pub max_blocks_in_flight: u64, + pub max_blocks_in_flight: usize, /// The maximum number of blocks to request in a single message when syncing. #[serde(default = "block_request_batch_size_default")] - pub block_request_batch_size: u64, + pub block_request_batch_size: usize, /// The maximum number of key value pairs allowed to be returned withing the response of the `GetSmartContractState` RPC. Defaults to no limit. 
#[serde(default = "state_rpc_limit_default")] pub state_rpc_limit: usize, @@ -204,11 +204,11 @@ pub fn block_request_limit_default() -> usize { 100 } -pub fn max_blocks_in_flight_default() -> u64 { +pub fn max_blocks_in_flight_default() -> usize { 1000 } -pub fn block_request_batch_size_default() -> u64 { +pub fn block_request_batch_size_default() -> usize { 100 } diff --git a/zilliqa/src/consensus.rs b/zilliqa/src/consensus.rs index 9a75179ec..9a49c03b9 100644 --- a/zilliqa/src/consensus.rs +++ b/zilliqa/src/consensus.rs @@ -19,7 +19,6 @@ use tokio::sync::{broadcast, mpsc::UnboundedSender}; use tracing::*; use crate::{ - block_store::BlockStore, blockhooks, cfg::{ConsensusConfig, NodeConfig}, constants::TIME_TO_ALLOW_PROPOSAL_BROADCAST, @@ -29,13 +28,13 @@ use crate::{ inspector::{self, ScillaInspector, TouchedAddressInspector}, message::{ AggregateQc, BitArray, BitSlice, Block, BlockHeader, BlockRef, BlockStrategy, - ExternalMessage, InternalMessage, NewView, ProcessProposal, Proposal, QuorumCertificate, - Vote, MAX_COMMITTEE_SIZE, + ExternalMessage, InternalMessage, NewView, Proposal, QuorumCertificate, Vote, + MAX_COMMITTEE_SIZE, }, - node::{MessageSender, NetworkMessage, OutgoingMessageFailure}, + node::{MessageSender, NetworkMessage}, pool::{TransactionPool, TxAddResult, TxPoolContent}, - range_map::RangeMap, state::State, + sync::{Sync, SyncPeers}, time::SystemTime, transaction::{EvmGas, SignedTransaction, TransactionReceipt, VerifiedTransaction}, }; @@ -149,7 +148,7 @@ pub struct Consensus { config: NodeConfig, message_sender: MessageSender, reset_timeout: UnboundedSender, - pub block_store: BlockStore, + pub sync: Sync, latest_leader_cache: RefCell>, votes: BTreeMap, /// Votes for a block we don't have stored. They are retained in case we receive the block later. @@ -186,6 +185,7 @@ impl Consensus { message_sender: MessageSender, reset_timeout: UnboundedSender, db: Arc, + peers: Arc, ) -> Result { trace!( "Opening database in {:?} for shard {}", @@ -204,18 +204,16 @@ impl Consensus { )?; } - // It is important to create the `BlockStore` after the checkpoint has been loaded into the DB. The - // `BlockStore` pre-loads and caches information about the currently stored blocks. - let block_store = BlockStore::new(&config, db.clone(), message_sender.clone())?; - let latest_block = db .get_finalized_view()? - .map(|view| { - block_store - .get_block_by_view(view)? - .ok_or_else(|| anyhow!("no header found at view {view}")) + .and_then(|view| { + db.get_block_hash_by_view(view) + .expect("no header found at view {view}") }) - .transpose()?; + .and_then(|hash| { + db.get_block_by_hash(&hash) + .expect("no block found for hash {hash}") + }); let mut state = if let Some(latest_block) = &latest_block { trace!("Loading state from latest block"); @@ -223,15 +221,11 @@ impl Consensus { db.state_trie()?, latest_block.state_root_hash().into(), config.clone(), - block_store.clone_read_only(), + db.clone(), ) } else { trace!("Constructing new state from genesis"); - State::new_with_genesis( - db.state_trie()?, - config.clone(), - block_store.clone_read_only(), - ) + State::new_with_genesis(db.state_trie()?, config.clone(), db.clone()) }?; let (latest_block, latest_block_view) = match latest_block { @@ -245,10 +239,9 @@ impl Consensus { let (start_view, finalized_view, high_qc) = { match db.get_high_qc()? { Some(qc) => { - let high_block = block_store - .get_block(qc.block_hash)? + let high_block = db + .get_block_by_hash(&qc.block_hash)? 
.ok_or_else(|| anyhow!("missing block that high QC points to!"))?; - let finalized_number = db .get_finalized_view()? .ok_or_else(|| anyhow!("missing latest finalized view!"))?; @@ -282,22 +275,19 @@ impl Consensus { // If we have newer blocks, erase them // @todo .. more elegantly :-) loop { - let highest_block_number = db - .get_highest_canonical_block_number()? - .ok_or_else(|| anyhow!("can't find highest block num in database!"))?; - - let head_block = block_store - .get_canonical_block_by_number(highest_block_number)? - .ok_or_else(|| anyhow!("missing head block!"))?; + let head_block = db + .get_highest_recorded_block()? + .ok_or_else(|| anyhow!("can't find highest block in database!"))?; trace!( - "recovery: highest_block_number {highest_block_number} view {0}", + "recovery: highest_block_number {} view {}", + head_block.number(), head_block.view() ); if head_block.view() > high_block.view() && head_block.view() > finalized_number { - trace!("recovery: stored block {0} reverted", highest_block_number); + trace!("recovery: stored block {0} reverted", head_block.number()); db.remove_transactions_executed_in_block(&head_block.hash())?; db.remove_block(&head_block)?; } else { @@ -319,10 +309,18 @@ impl Consensus { } }; + let sync = Sync::new( + &config, + db.clone(), + &latest_block, + message_sender.clone(), + peers.clone(), + )?; + let mut consensus = Consensus { secret_key, config, - block_store, + sync, latest_leader_cache: RefCell::new(None), message_sender, reset_timeout, @@ -372,6 +370,8 @@ impl Consensus { .at_root(parent.state_root_hash().into()) .get_stakers(block.header)?, )?; + + consensus.sync.set_checkpoint(&block); } // If timestamp of when current high_qc was written exists then use it to estimate the minimum number of blocks the network has moved on since shut down @@ -394,8 +394,8 @@ impl Consensus { // Remind block_store of our peers and request any potentially missing blocks let high_block = consensus - .block_store - .get_block(high_qc.block_hash)? + .db + .get_block_by_hash(&high_qc.block_hash)? .ok_or_else(|| anyhow!("missing block that high QC points to!"))?; let executed_block = BlockHeader { @@ -406,24 +406,15 @@ impl Consensus { // Grab last seen committee's peerIds in case others also went offline let committee = state_at.get_stakers(executed_block)?; - let recent_peer_ids: Vec<_> = committee + let recent_peer_ids = committee .iter() .filter(|&&peer_public_key| peer_public_key != consensus.public_key()) .filter_map(|&peer_public_key| { state_at.get_peer_id(peer_public_key).unwrap_or(None) }) - .collect(); + .collect_vec(); - consensus - .block_store - .set_peers_and_view(high_block.view(), &recent_peer_ids)?; - // It is likley that we missed the most recent proposal. Request it now - consensus - .block_store - .request_blocks(&RangeMap::from_closed_interval( - high_block.view(), - high_block.view() + 1, - ))?; + peers.add_peers(recent_peer_ids); } Ok(consensus) @@ -456,11 +447,11 @@ impl Consensus { pub fn head_block(&self) -> Block { let highest_block_number = self - .block_store + .db .get_highest_canonical_block_number() .unwrap() .unwrap(); - self.block_store + self.db .get_canonical_block_by_number(highest_block_number) .unwrap() .unwrap() @@ -643,7 +634,9 @@ impl Consensus { block.hash() ); - if self.block_store.contains_block(&block.hash())? { + // FIXME: Cleanup + + if self.db.contains_block(&block.hash())? 
{ trace!("ignoring block proposal, block store contains this block already"); return Ok(None); } @@ -669,29 +662,9 @@ impl Consensus { return Ok(None); } - match self.check_block(&block, during_sync) { - Ok(()) => {} - Err((e, temporary)) => { - // If this block could become valid in the future, buffer it. - if temporary { - self.block_store.buffer_proposal( - from, - Proposal::from_parts_with_hashes( - block, - transactions - .into_iter() - .map(|tx| { - let hash = tx.calculate_hash(); - (tx, hash) - }) - .collect(), - ), - )?; - } else { - warn!(?e, "invalid block proposal received!"); - } - return Ok(None); - } + if let Err(e) = self.check_block(&block, during_sync) { + warn!(?e, "invalid block proposal received!"); + return Ok(None); } self.update_high_qc_and_view(block.agg.is_some(), block.header.qc)?; @@ -714,19 +687,6 @@ impl Consensus { block.view(), view ); - self.block_store.buffer_proposal( - from, - Proposal::from_parts_with_hashes( - block, - transactions - .into_iter() - .map(|tx| { - let hash = tx.calculate_hash(); - (tx, hash) - }) - .collect(), - ), - )?; return Ok(None); } @@ -870,7 +830,6 @@ impl Consensus { let proposer_address = parent_state.get_reward_address(proposer)?; - let mut total_cosigner_stake = 0; let cosigner_stake: Vec<_> = committee .iter() .enumerate() @@ -882,11 +841,15 @@ impl Consensus { .unwrap() .unwrap() .get(); - total_cosigner_stake += stake; (reward_address, stake) }) .collect(); + let total_cosigner_stake = cosigner_stake.iter().fold(0, |sum, c| sum + c.1); + if total_cosigner_stake == 0 { + return Err(anyhow!("total stake is 0")); + } + // Track total awards given out. This may be different to rewards_per_block because we round down on division when we split the rewards let mut total_rewards_issued = 0; @@ -1721,7 +1684,8 @@ impl Consensus { fn committee_for_hash(&self, parent_hash: Hash) -> Result> { let Ok(Some(parent)) = self.get_block(&parent_hash) else { - return Err(anyhow!("parent block not found: {:?}", parent_hash)); + // tracing::error!("parent block not found: {:?}", parent_hash); + return Ok(Vec::new()); // return an empty vector instead of Err for graceful app-level error-handling }; let parent_root_hash = parent.state_root_hash(); @@ -1963,7 +1927,7 @@ impl Consensus { new_high_qc: QuorumCertificate, ) -> Result<()> { let view = self.get_view()?; - let Some(new_high_qc_block) = self.block_store.get_block(new_high_qc.block_hash)? else { + let Some(new_high_qc_block) = self.db.get_block_by_hash(&new_high_qc.block_hash)? else { // We don't set high_qc to a qc if we don't have its block. warn!("Recieved potential high QC but didn't have the corresponding block"); return Ok(()); @@ -2244,41 +2208,32 @@ impl Consensus { /// Check the validity of a block. Returns `Err(_, true)` if this block could become valid in the future and /// `Err(_, false)` if this block could never be valid. - fn check_block(&self, block: &Block, during_sync: bool) -> Result<(), (anyhow::Error, bool)> { - block.verify_hash().map_err(|e| (e, false))?; + fn check_block(&self, block: &Block, during_sync: bool) -> Result<()> { + block.verify_hash()?; if block.view() == 0 { // We only check a block if we receive it from an external source. We obviously already have the genesis // block, so we aren't ever expecting to receive it. - return Err((anyhow!("tried to check genesis block"), false)); + return Err(anyhow!("tried to check genesis block")); } - let Some(parent) = self - .get_block(&block.parent_hash()) - .map_err(|e| (e, false))? 
- else { + let Some(parent) = self.get_block(&block.parent_hash())? else { warn!( "Missing parent block while trying to check validity of block number {}", block.number() ); - return Err((MissingBlockError::from(block.parent_hash()).into(), true)); + return Err(MissingBlockError::from(block.parent_hash()).into()); }; - let finalized_view = self.get_finalized_view().map_err(|e| (e, false))?; - let Some(finalized_block) = self - .get_block_by_view(finalized_view) - .map_err(|e| (e, false))? - else { - return Err((MissingBlockError::from(finalized_view).into(), false)); + let finalized_view = self.get_finalized_view()?; + let Some(finalized_block) = self.get_block_by_view(finalized_view)? else { + return Err(MissingBlockError::from(finalized_view).into()); }; if block.view() < finalized_block.view() { - return Err(( - anyhow!( - "block is too old: view is {} but we have finalized {}", - block.view(), - finalized_block.view() - ), - false, + return Err(anyhow!( + "block is too old: view is {} but we have finalized {}", + block.view(), + finalized_block.view() )); } @@ -2293,12 +2248,11 @@ impl Consensus { let committee = self .state .at_root(parent.state_root_hash().into()) - .get_stakers(block.header) - .map_err(|e| (e, false))?; + .get_stakers(block.header)?; if verified.is_err() { info!(?block, "Unable to verify block = "); - return Err((anyhow!("invalid block signature found! block hash: {:?} block view: {:?} committee len {:?}", block.hash(), block.view(), committee.len()), false)); + return Err(anyhow!("invalid block signature found! block hash: {:?} block view: {:?} committee len {:?}", block.hash(), block.view(), committee.len())); } // Check if the co-signers of the block's QC represent the supermajority. @@ -2307,13 +2261,11 @@ impl Consensus { &committee, parent.state_root_hash(), block, - ) - .map_err(|e| (e, false))?; + )?; // Verify the block's QC signature - note the parent should be the committee the QC // was signed over. - self.verify_qc_signature(&block.header.qc, committee.clone()) - .map_err(|e| (e, false))?; + self.verify_qc_signature(&block.header.qc, committee.clone())?; if let Some(agg) = &block.agg { // Check if the signers of the block's aggregate QC represent the supermajority self.check_quorum_in_indices( @@ -2321,24 +2273,16 @@ impl Consensus { &committee, parent.state_root_hash(), block, - ) - .map_err(|e| (e, false))?; + )?; // Verify the aggregate QC's signature - self.batch_verify_agg_signature(agg, &committee) - .map_err(|e| (e, false))?; + self.batch_verify_agg_signature(agg, &committee)?; } // Retrieve the highest among the aggregated QCs and check if it equals the block's QC. - let block_high_qc = self.get_high_qc_from_block(block).map_err(|e| (e, false))?; - let Some(block_high_qc_block) = self - .get_block(&block_high_qc.block_hash) - .map_err(|e| (e, false))? - else { + let block_high_qc = self.get_high_qc_from_block(block)?; + let Some(block_high_qc_block) = self.get_block(&block_high_qc.block_hash)? 
else { warn!("missing finalized block4"); - return Err(( - MissingBlockError::from(block_high_qc.block_hash).into(), - false, - )); + return Err(MissingBlockError::from(block_high_qc.block_hash).into()); }; // Prevent the creation of forks from the already committed chain if block_high_qc_block.view() < finalized_block.view() { @@ -2348,19 +2292,16 @@ impl Consensus { finalized_block.view(), self.high_qc, block); - return Err(( - anyhow!( - "invalid block - high QC view is {} while finalized is {}", - block_high_qc_block.view(), - finalized_block.view() - ), - false, + return Err(anyhow!( + "invalid block - high QC view is {} while finalized is {}", + block_high_qc_block.view(), + finalized_block.view() )); } // This block's timestamp must be greater than or equal to the parent block's timestamp. if block.timestamp() < parent.timestamp() { - return Err((anyhow!("timestamp decreased from parent"), false)); + return Err(anyhow!("timestamp decreased from parent")); } // This block's timestamp should be at most `self.allowed_timestamp_skew` away from the current time. Note this @@ -2370,31 +2311,22 @@ impl Consensus { .elapsed() .unwrap_or_else(|err| err.duration()); if !during_sync && difference > self.config.allowed_timestamp_skew { - return Err(( - anyhow!( - "timestamp difference for block {} greater than allowed skew: {difference:?}", - block.view() - ), - false, + return Err(anyhow!( + "timestamp difference for block {} greater than allowed skew: {difference:?}", + block.view() )); } // Blocks must be in sequential order if block.header.number != parent.header.number + 1 { - return Err(( - anyhow!( - "block number is not sequential: {} != {} + 1", - block.header.number, - parent.header.number - ), - false, + return Err(anyhow!( + "block number is not sequential: {} != {} + 1", + block.header.number, + parent.header.number )); } - if !self - .block_extends_from(block, &finalized_block) - .map_err(|e| (e, false))? - { + if !self.block_extends_from(block, &finalized_block)? { warn!( "invalid block {:?}, does not extend finalized block {:?} our head is {:?}", block, @@ -2402,9 +2334,8 @@ impl Consensus { self.head_block() ); - return Err(( - anyhow!("invalid block, does not extend from finalized block"), - false, + return Err(anyhow!( + "invalid block, does not extend from finalized block" )); } Ok(()) @@ -2414,11 +2345,10 @@ impl Consensus { pub fn receive_block_availability( &mut self, from: PeerId, - availability: &Option>, + _availability: &Option>, ) -> Result<()> { trace!("Received block availability from {:?}", from); - self.block_store.update_availability(from, availability)?; - Ok(()) + Ok(()) // FIXME: Stub } // Checks for the validity of a block and adds it to our block store if valid. @@ -2433,8 +2363,6 @@ impl Consensus { proposal.number(), proposal.view() ); - self.block_store - .received_process_proposal(proposal.header.view); let result = self.proposal(from, proposal, true)?; // Processing the received block can either result in: // * A `Proposal`, if we have buffered votes for this block which form a supermajority, meaning we can @@ -2450,25 +2378,7 @@ impl Consensus { let hash = block.hash(); debug!(?from, ?hash, ?block.header.view, ?block.header.number, "added block"); let _ = self.new_blocks.send(block.header); - // We may have child blocks; process them too. - self.block_store - .process_block(from, block)? 
- .into_iter() - .try_for_each(|(from_id, child_proposal)| -> Result<()> { - // The only reason this can fail is permanent failure of the messaging mechanism, so - // propagate it back here. - // Mark this block in the cache as "we're about to process this one" - let view = child_proposal.header.view; - self.message_sender.send_external_message( - self.peer_id(), - ExternalMessage::ProcessProposal(ProcessProposal { - from: from_id.to_bytes(), - block: child_proposal, - }), - )?; - self.block_store.expect_process_proposal(view); - Ok(()) - })?; + self.db.insert_block(&block)?; Ok(()) } @@ -2509,15 +2419,15 @@ impl Consensus { } pub fn get_block(&self, key: &Hash) -> Result> { - self.block_store.get_block(*key) + self.db.get_block_by_hash(key) } pub fn get_block_by_view(&self, view: u64) -> Result> { - self.block_store.get_block_by_view(view) + self.db.get_block_by_view(view) } pub fn get_canonical_block_by_number(&self, number: u64) -> Result> { - self.block_store.get_canonical_block_by_number(number) + self.db.get_canonical_block_by_number(number) } fn set_finalized_view(&mut self, view: u64) -> Result<()> { @@ -2590,7 +2500,7 @@ impl Consensus { pub fn state_at(&self, number: u64) -> Result> { Ok(self - .block_store + .db .get_canonical_block_by_number(number)? .map(|block| self.state.at_root(block.state_root_hash().into()))) } @@ -3191,71 +3101,26 @@ impl Consensus { } } - pub fn report_outgoing_message_failure( - &mut self, - failure: OutgoingMessageFailure, - ) -> Result<()> { - self.block_store.report_outgoing_message_failure(failure) + pub fn get_num_transactions(&self) -> Result { + let count = self.db.get_total_transaction_count()?; + Ok(count) } pub fn tick(&mut self) -> Result<()> { trace!("consensus::tick()"); trace!("request_missing_blocks from timer"); - // Drives the block fetching state machine - see docs/fetching_blocks.md - if self.block_store.request_missing_blocks()? { - // We're syncing.. - // Is it likely that the next thing in the buffer could be the next block? - let likely_blocks = self.block_store.next_proposals_if_likely()?; - if likely_blocks.is_empty() { - trace!("no blocks buffered"); - // If there are no next blocks buffered, someone may well have lied to us about - // where the gaps in the view range are. This should be a rare occurrence, so in - // lieu of timing it out, just zap the view range gap and we'll take the hit on - // any rerequests. - self.block_store.delete_empty_view_range_cache(); - } else { - likely_blocks.into_iter().for_each(|(from, block)| { - trace!( - "buffer may contain the next block - {0:?} v={1} n={2}", - block.hash(), - block.view(), - block.number() - ); - // Ignore errors here - just carry on and wait for re-request to clean up. - let view = block.view(); - let _ = self.message_sender.send_external_message( - self.peer_id(), - ExternalMessage::ProcessProposal(ProcessProposal { - from: from.to_bytes(), - block, - }), - ); - self.block_store.expect_process_proposal(view); - }); - } + // TODO: Drive passive-sync from Timeouts + if !self.sync.am_syncing()? 
{ + self.sync.sync_to_genesis()?; } else { trace!("not syncing ..."); } Ok(()) } - pub fn buffer_proposal(&mut self, from: PeerId, proposal: Proposal) -> Result<()> { - self.block_store.buffer_proposal(from, proposal)?; - Ok(()) - } - - pub fn buffer_lack_of_proposals( - &mut self, - from_view: u64, - proposals: &Vec, - ) -> Result<()> { - self.block_store - .buffer_lack_of_proposals(from_view, proposals) - } - pub fn get_sync_data(&self) -> Result> { - self.block_store.get_sync_data() + self.sync.get_sync_data() } } diff --git a/zilliqa/src/db.rs b/zilliqa/src/db.rs index f32bf52a5..f8fa197da 100644 --- a/zilliqa/src/db.rs +++ b/zilliqa/src/db.rs @@ -6,13 +6,14 @@ use std::{ ops::Range, path::{Path, PathBuf}, sync::{Arc, Mutex}, - time::Duration, + time::{Duration, Instant}, }; use alloy::primitives::Address; use anyhow::{anyhow, Context, Result}; use eth_trie::{EthTrie, MemoryDB, Trie, DB}; use itertools::Itertools; +use libp2p::PeerId; use lru_mem::LruCache; use lz4::{Decoder, EncoderBuilder}; use rusqlite::{ @@ -28,6 +29,7 @@ use crate::{ exec::{ScillaError, ScillaException, ScillaTransition}, message::{AggregateQc, Block, BlockHeader, QuorumCertificate}, state::Account, + sync::PeerInfo, time::SystemTime, transaction::{EvmGas, Log, SignedTransaction, TransactionReceipt}, }; @@ -175,6 +177,7 @@ enum BlockFilter { Hash(Hash), View(u64), Height(u64), + MaxHeight, } const CHECKPOINT_HEADER_BYTES: [u8; 8] = *b"ZILCHKPT"; @@ -333,6 +336,18 @@ impl Db { CREATE TABLE IF NOT EXISTS state_trie (key BLOB NOT NULL PRIMARY KEY, value BLOB NOT NULL) WITHOUT ROWID; ", )?; + connection.execute_batch( + "CREATE TEMP TABLE IF NOT EXISTS sync_metadata ( + block_hash BLOB NOT NULL UNIQUE, + parent_hash BLOB NOT NULL, + block_number INTEGER NOT NULL PRIMARY KEY, + version INTEGER DEFAULT 0, + peer BLOB DEFAULT NULL, + rawdata BLOB NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_sync_metadata ON sync_metadata(block_number) WHERE peer IS NOT NULL;", + )?; + Ok(()) } @@ -347,6 +362,150 @@ impl Db { Ok(Some(base_path.join("checkpoints").into_boxed_path())) } + /// Returns the number of stored sync segments + pub fn count_sync_segments(&self) -> Result { + Ok(self + .db + .lock() + .unwrap() + .prepare_cached("SELECT COUNT(block_number) FROM sync_metadata WHERE peer IS NOT NULL")? + .query_row([], |row| row.get(0)) + .optional()? + .unwrap_or_default()) + } + + /// Checks if the stored metadata exists + pub fn contains_sync_metadata(&self, block_hash: &Hash) -> Result { + Ok(self + .db + .lock() + .unwrap() + .prepare_cached("SELECT parent_hash FROM sync_metadata WHERE block_hash = ?1")? + .query_row([block_hash], |row| row.get::<_, Hash>(0)) + .optional()? + .is_some()) + } + + /// Retrieves bulk metadata information from the given block_hash (inclusive) + pub fn get_sync_segment(&self, hash: Hash) -> Result> { + let db = self.db.lock().unwrap(); + + let mut hashes = Vec::new(); + let mut block_hash = hash; + + while let Some(parent_hash) = db + .prepare_cached("SELECT parent_hash FROM sync_metadata WHERE block_hash = ?1")? + .query_row([block_hash], |row| row.get::<_, Hash>(0)) + .optional()? + { + hashes.push(block_hash); + block_hash = parent_hash; + } + Ok(hashes) + } + + /// Peeks into the top of the segment stack. + pub fn last_sync_segment(&self) -> Result> { + let db = self.db.lock().unwrap(); + let r = db.prepare_cached("SELECT rawdata, version, peer FROM sync_metadata WHERE peer IS NOT NULL ORDER BY block_number ASC LIMIT 1")? 
+ .query_row([], |row| Ok(( + serde_json::from_slice(row.get::<_,Vec>(0)?.as_slice()).unwrap(), + PeerInfo { + last_used: Instant::now(), + score: u32::MAX, + version: row.get(1)?, + peer_id: PeerId::from_bytes(row.get::<_,Vec>(2)?.as_slice()).unwrap(), + }))).optional()?; + Ok(r) + } + + /// Pushes a particular segment into the stack. + pub fn push_sync_segment(&self, peer: PeerInfo, meta: BlockHeader) -> Result<()> { + let db = self.db.lock().unwrap(); + db.prepare_cached( + "INSERT OR REPLACE INTO sync_metadata (parent_hash, block_hash, block_number, version, peer, rawdata) VALUES (:parent_hash, :block_hash, :block_number, :version, :peer, :rawdata)")? + .execute( + named_params! { + ":parent_hash": meta.qc.block_hash, + ":block_hash": meta.hash, + ":block_number": meta.number, + ":peer": peer.peer_id.to_bytes(), + ":version": peer.version, + ":rawdata": serde_json::to_vec(&meta).unwrap(), + }, + )?; + Ok(()) + } + + /// Bulk inserts a bunch of metadata. + pub fn insert_sync_metadata(&self, metas: &Vec) -> Result<()> { + let mut db = self.db.lock().unwrap(); + let tx = db.transaction()?; + + for meta in metas { + tx.prepare_cached( + "INSERT OR REPLACE INTO sync_metadata (parent_hash, block_hash, block_number, rawdata) VALUES (:parent_hash, :block_hash, :block_number, :rawdata)")? + .execute( + named_params! { + ":parent_hash": meta.qc.block_hash, + ":block_hash": meta.hash, + ":block_number": meta.number, + ":rawdata": serde_json::to_vec(meta).unwrap(), + })?; + } + tx.commit()?; + Ok(()) + } + + /// Empty the metadata table. + pub fn empty_sync_metadata(&self) -> Result<()> { + self.db + .lock() + .unwrap() + .execute("DELETE FROM sync_metadata", [])?; + Ok(()) + } + + /// Pops a segment from the stack; and bulk removes all metadata associated with it. + pub fn pop_sync_segment(&self) -> Result<()> { + let mut db = self.db.lock().unwrap(); + let c = db.transaction()?; + + if let Some(block_hash) = c.prepare_cached("SELECT block_hash FROM sync_metadata WHERE peer IS NOT NULL ORDER BY block_number ASC LIMIT 1")? + .query_row([], |row| row.get::<_,Hash>(0)).optional()? { + if let Some(parent_hash) = c.prepare_cached("SELECT parent_hash FROM sync_metadata WHERE block_hash = ?1")? + .query_row([block_hash], |row| row.get(0)).optional()? { + + // update marker + c.prepare_cached( + "UPDATE sync_metadata SET peer = NULL WHERE block_hash = ?1")? + .execute( + [block_hash] + )?; + + // remove segment + let mut hashes = Vec::new(); + let mut block_hash = parent_hash; + while let Some(parent_hash) = c + .prepare_cached("SELECT parent_hash FROM sync_metadata WHERE block_hash = ?1")? + .query_row([block_hash], |row| row.get::<_, Hash>(0)) + .optional()? + { + hashes.push(block_hash); + block_hash = parent_hash; + } + + for hash in hashes { + c.prepare_cached("DELETE FROM sync_metadata WHERE block_hash = ?1")? + .execute([hash])?; + } + } + } + + c.commit()?; + Ok(()) + } + /// Fetch checkpoint data from file and initialise db state /// Return checkpointed block and transactions which must be executed after this function /// Return None if checkpoint already loaded @@ -627,19 +786,6 @@ impl Db { .unwrap_or(None)) } - // Deliberately not named get_highest_block_number() because there used to be one - // of those with unclear semantics, so changing name to force the compiler to error - // if it was used. - pub fn get_highest_recorded_block_number(&self) -> Result> { - Ok(self - .db - .lock() - .unwrap() - .prepare_cached("SELECT height FROM blocks ORDER BY height DESC LIMIT 1")? 
- .query_row((), |row| row.get(0)) - .optional()?) - } - pub fn get_highest_canonical_block_number(&self) -> Result> { Ok(self .db @@ -874,8 +1020,8 @@ impl Db { }) } macro_rules! query_block { - ($cond: tt, $key: tt) => { - self.db.lock().unwrap().prepare_cached(concat!("SELECT block_hash, view, height, qc, signature, state_root_hash, transactions_root_hash, receipts_root_hash, timestamp, gas_used, gas_limit, agg FROM blocks WHERE ", $cond),)?.query_row([$key], make_block).optional()? + ($cond: tt $(, $key:tt)*) => { + self.db.lock().unwrap().prepare_cached(concat!("SELECT block_hash, view, height, qc, signature, state_root_hash, transactions_root_hash, receipts_root_hash, timestamp, gas_used, gas_limit, agg FROM blocks WHERE ", $cond),)?.query_row([$($key),*], make_block).optional()? }; } Ok(match filter { @@ -888,6 +1034,9 @@ impl Db { BlockFilter::Height(height) => { query_block!("height = ?1 AND is_canonical = TRUE", height) } + BlockFilter::MaxHeight => { + query_block!("TRUE ORDER BY height DESC LIMIT 1") + } }) } @@ -918,6 +1067,10 @@ impl Db { self.get_block(BlockFilter::Height(number)) } + pub fn get_highest_recorded_block(&self) -> Result> { + self.get_block(BlockFilter::MaxHeight) + } + pub fn contains_block(&self, block_hash: &Hash) -> Result { Ok(self .db diff --git a/zilliqa/src/exec.rs b/zilliqa/src/exec.rs index 49faece60..78eaafad5 100644 --- a/zilliqa/src/exec.rs +++ b/zilliqa/src/exec.rs @@ -415,7 +415,6 @@ impl DatabaseRef for &State { fn block_hash_ref(&self, number: u64) -> Result { Ok(self - .block_store .get_canonical_block_by_number(number)? .map(|block| B256::new(block.hash().0)) .unwrap_or_default()) @@ -1203,15 +1202,11 @@ impl PendingState { } pub fn get_canonical_block_by_number(&self, block_number: u64) -> Result> { - self.pre_state - .block_store - .get_canonical_block_by_number(block_number) + self.pre_state.get_canonical_block_by_number(block_number) } pub fn get_highest_canonical_block_number(&self) -> Result> { - self.pre_state - .block_store - .get_highest_canonical_block_number() + self.pre_state.get_highest_canonical_block_number() } pub fn load_account(&mut self, address: Address) -> Result<&mut PendingAccount> { diff --git a/zilliqa/src/lib.rs b/zilliqa/src/lib.rs index b949e6493..8da13f370 100644 --- a/zilliqa/src/lib.rs +++ b/zilliqa/src/lib.rs @@ -1,5 +1,4 @@ pub mod api; -pub mod block_store; mod blockhooks; pub mod cfg; pub mod consensus; @@ -23,6 +22,7 @@ pub mod scilla; mod scilla_proto; pub mod serde_util; pub mod state; +pub mod sync; pub mod test_util; pub mod time; pub mod transaction; diff --git a/zilliqa/src/message.rs b/zilliqa/src/message.rs index 25d33d897..9e1088d57 100644 --- a/zilliqa/src/message.rs +++ b/zilliqa/src/message.rs @@ -227,8 +227,23 @@ impl fmt::Debug for BlockResponse { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RequestBlocksByHeight { + pub request_at: SystemTime, + pub from_height: u64, + pub to_height: u64, +} + /// Used to convey proposal processing internally, to avoid blocking threads for too long. #[derive(Debug, Clone, Serialize, Deserialize)] +pub struct InjectedProposal { + // An encoded PeerId + pub from: PeerId, + pub block: Proposal, +} + +/// TODO: Remove. Unused in RFC161 algorithm +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProcessProposal { // An encoded PeerId pub from: Vec, @@ -259,6 +274,12 @@ pub enum ExternalMessage { /// An acknowledgement of the receipt of a message. 
Note this is only used as a response when the caller doesn't /// require any data in the response. Acknowledgement, + /// The following are used for the new sync protocol + InjectedProposal(InjectedProposal), + MetaDataRequest(RequestBlocksByHeight), + MetaDataResponse(Vec), + MultiBlockRequest(Vec), + MultiBlockResponse(Vec), } impl ExternalMessage { @@ -274,6 +295,25 @@ impl ExternalMessage { impl Display for ExternalMessage { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { + ExternalMessage::MultiBlockRequest(r) => { + write!(f, "MultiBlockRequest({})", r.len()) + } + ExternalMessage::MultiBlockResponse(r) => { + write!(f, "MultiBlockResponse({})", r.len()) + } + ExternalMessage::MetaDataResponse(r) => { + write!(f, "MetaDataResponse({})", r.len()) + } + ExternalMessage::MetaDataRequest(r) => { + write!( + f, + "MetaDataRequest(from={}, to={})", + r.from_height, r.to_height + ) + } + ExternalMessage::InjectedProposal(p) => { + write!(f, "InjectedProposal {}", p.block.number()) + } ExternalMessage::Proposal(p) => write!(f, "Proposal({})", p.view()), ExternalMessage::Vote(v) => write!(f, "Vote({})", v.view), ExternalMessage::NewView(n) => write!(f, "NewView({})", n.view), diff --git a/zilliqa/src/node.rs b/zilliqa/src/node.rs index 5eb0db947..f02d7fa46 100644 --- a/zilliqa/src/node.rs +++ b/zilliqa/src/node.rs @@ -33,13 +33,14 @@ use crate::{ exec::{PendingState, TransactionApplyResult}, inspector::{self, ScillaInspector}, message::{ - Block, BlockHeader, BlockResponse, ExternalMessage, InternalMessage, IntershardCall, - ProcessProposal, Proposal, + Block, BlockHeader, BlockResponse, ExternalMessage, InjectedProposal, InternalMessage, + IntershardCall, Proposal, }, node_launcher::ResponseChannel, p2p_node::{LocalMessageTuple, OutboundMessageTuple}, pool::{TxAddResult, TxPoolContent}, state::State, + sync::SyncPeers, transaction::{ EvmGas, SignedTransaction, TransactionReceipt, TxIntershard, VerifiedTransaction, }, @@ -170,6 +171,7 @@ impl ChainId { } impl Node { + #[allow(clippy::too_many_arguments)] pub fn new( config: NodeConfig, secret_key: SecretKey, @@ -178,6 +180,7 @@ impl Node { request_responses: UnboundedSender<(ResponseChannel, ExternalMessage)>, reset_timeout: UnboundedSender, peer_num: Arc, + peers: Arc, ) -> Result { config.validate()?; let peer_id = secret_key.to_libp2p_keypair().public().to_peer_id(); @@ -201,7 +204,14 @@ impl Node { reset_timeout: reset_timeout.clone(), db: db.clone(), chain_id: ChainId::new(config.eth_chain_id), - consensus: Consensus::new(secret_key, config, message_sender, reset_timeout, db)?, + consensus: Consensus::new( + secret_key, + config, + message_sender, + reset_timeout, + db, + peers, + )?, peer_num, }; Ok(node) @@ -269,59 +279,33 @@ impl Node { self.request_responses .send((response_channel, ExternalMessage::Acknowledgement))?; } - ExternalMessage::BlockRequest(request) => { - if from == self.peer_id { - debug!("block_store::BlockRequest : ignoring blocks request to self"); - return Ok(()); - } - - trace!( - "block_store::BlockRequest : received a block request - {}", - self.peer_id - ); - // Note that it is very important that we limit this by number of blocks - // returned, _not_ by max view range returned. If we don't, then any - // view gap larger than block_request_limit will never be filliable - // because no node will ever be prepared to return the block after it. 
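// Illustrative sketch (not part of the hunks above/below) of how a recovered block can be
// wrapped for re-injection, assuming the `InjectedProposal` struct and `ExternalMessage`
// variant added in this patch; `my_peer_id` and `proposal` are placeholders for the node's
// own peer id and a proposal fetched during sync.
fn wrap_for_injection(my_peer_id: PeerId, proposal: Proposal) -> ExternalMessage {
    ExternalMessage::InjectedProposal(InjectedProposal {
        // receivers drop InjectedProposal messages that do not come from themselves
        from: my_peer_id,
        block: proposal,
    })
}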
- let proposals: Vec = (request.from_view..=request.to_view) - .take(self.config.block_request_limit) - .filter_map(|view| { - self.consensus - .get_block_by_view(view) - .transpose() - .map(|block| Ok(self.block_to_proposal(block?))) - }) - .collect::>()?; - - let availability = self.consensus.block_store.availability()?; - trace!("block_store::BlockRequest - responding to new blocks request {id:?} from {from:?} of {request:?} with props {0:?} availability {availability:?}", - proposals.iter().fold("".to_string(), |state, x| format!("{},{}", state, x.header.view))); - - // Send the response to this block request. - self.request_responses.send(( - response_channel, - ExternalMessage::BlockResponse(BlockResponse { - proposals, - from_view: request.from_view, - availability, - }), - ))?; + // RFC-161 sync algorithm, phase 2. + ExternalMessage::MultiBlockRequest(request) => { + let message = self + .consensus + .sync + .handle_multiblock_request(from, request)?; + self.request_responses.send((response_channel, message))?; } - // We don't usually expect a [BlockResponse] to be received as a request, however this can occur when our - // [BlockStore] has re-sent a previously unusable block because we didn't (yet) have the block's parent. - // Having knowledge of this here breaks our abstraction boundaries slightly, but it also keeps things - // simple. - ExternalMessage::BlockResponse(m) => { - self.handle_block_response(from, m)?; - // Acknowledge this block response. This does nothing because the `BlockResponse` request was sent by - // us, but we keep it here for symmetry with the other handlers. - self.request_responses - .send((response_channel, ExternalMessage::Acknowledgement))?; + // RFC-161 sync algorithm, phase 1. + ExternalMessage::MetaDataRequest(request) => { + let message = self.consensus.sync.handle_metadata_request(from, request)?; + self.request_responses.send((response_channel, message))?; } // This just breaks down group block messages into individual messages to stop them blocking threads // for long periods. - ExternalMessage::ProcessProposal(m) => { - self.handle_process_proposal(from, m)?; + ExternalMessage::InjectedProposal(p) => { + self.handle_injected_proposal(from, p)?; + } + // Respond negatively to block request from old nodes + ExternalMessage::BlockRequest(_) => { + // respond with an invalid response + let message = ExternalMessage::BlockResponse(BlockResponse { + availability: None, + proposals: vec![], + from_view: u64::MAX, + }); + self.request_responses.send((response_channel, message))?; } // Handle requests which contain a block proposal. Initially sent as a broadcast, it is re-routed into // a Request by the underlying layer, with a faux request-id. 
This is to mitigate issues when there are @@ -333,8 +317,8 @@ impl Node { self.request_responses .send((response_channel, ExternalMessage::Acknowledgement))?; } - _ => { - warn!("unexpected message type"); + msg => { + warn!(%msg, "unexpected message type"); } } @@ -347,17 +331,30 @@ impl Node { failure: OutgoingMessageFailure, ) -> Result<()> { debug!(from = %self.peer_id, %to, ?failure, "handling message failure"); - self.consensus.report_outgoing_message_failure(failure)?; + self.consensus.sync.handle_request_failure(failure)?; Ok(()) } pub fn handle_response(&mut self, from: PeerId, message: ExternalMessage) -> Result<()> { debug!(%from, to = %self.peer_id, %message, "handling response"); match message { - ExternalMessage::BlockResponse(m) => self.handle_block_response(from, m)?, - ExternalMessage::Acknowledgement => {} - _ => { - warn!("unexpected message type"); + ExternalMessage::MultiBlockResponse(response) => self + .consensus + .sync + .handle_multiblock_response(from, response)?, + + ExternalMessage::MetaDataResponse(response) => self + .consensus + .sync + .handle_metadata_response(from, response)?, + ExternalMessage::BlockResponse(response) => { + self.consensus.sync.handle_block_response(from, response)? + } + ExternalMessage::Acknowledgement => { + self.consensus.sync.handle_acknowledgement(from)?; + } + msg => { + warn!(%msg, "unexpected message type"); } } @@ -904,26 +901,8 @@ impl Node { self.peer_num.load(std::sync::atomic::Ordering::Relaxed) } - /// Convenience function to convert a block to a proposal (add full txs) - /// NOTE: Includes intershard transactions. Should only be used for syncing history, - /// not for consensus messages regarding new blocks. - fn block_to_proposal(&self, block: Block) -> Proposal { - let txs: Vec<_> = block - .transactions - .iter() - .map(|tx_hash| { - self.consensus - .get_transaction_by_hash(*tx_hash) - .unwrap() - .unwrap() - }) - .collect(); - - Proposal::from_parts(block, txs) - } - fn handle_proposal(&mut self, from: PeerId, proposal: Proposal) -> Result<()> { - if let Some((to, message)) = self.consensus.proposal(from, proposal, false)? { + if let Some((to, message)) = self.consensus.proposal(from, proposal.clone(), false)? { self.reset_timeout .send(self.config.consensus.consensus_timeout)?; if let Some(to) = to { @@ -931,38 +910,21 @@ impl Node { } else { self.message_sender.broadcast_proposal(message)?; } + } else { + self.consensus.sync.sync_from_proposal(proposal)?; // proposal is already verified } Ok(()) } - fn handle_block_response(&mut self, from: PeerId, response: BlockResponse) -> Result<()> { - trace!( - "block_store::handle_block_response - received blocks response of length {}", - response.proposals.len() - ); - self.consensus - .receive_block_availability(from, &response.availability)?; - - self.consensus - .buffer_lack_of_proposals(response.from_view, &response.proposals)?; - - for block in response.proposals { - // Buffer the block so that we know we have it - in fact, add it to the cache so - // that we can include it in the chain if necessary. 
- self.consensus.buffer_proposal(from, block)?; - } - trace!("block_store::handle_block_response: finished handling response"); - Ok(()) - } - - fn handle_process_proposal(&mut self, from: PeerId, req: ProcessProposal) -> Result<()> { + fn handle_injected_proposal(&mut self, from: PeerId, req: InjectedProposal) -> Result<()> { if from != self.consensus.peer_id() { - warn!("Someone ({from}) sent me a ProcessProposal; illegal- ignoring"); + warn!("Someone ({from}) sent me a InjectedProposal; illegal- ignoring"); return Ok(()); } trace!("Handling proposal for view {0}", req.block.header.view); let proposal = self.consensus.receive_block(from, req.block)?; + self.consensus.sync.mark_received_proposal(req.from)?; if let Some(proposal) = proposal { trace!( " ... broadcasting proposal for view {0}", diff --git a/zilliqa/src/node_launcher.rs b/zilliqa/src/node_launcher.rs index bdc9d8581..2c80c231c 100644 --- a/zilliqa/src/node_launcher.rs +++ b/zilliqa/src/node_launcher.rs @@ -32,6 +32,7 @@ use crate::{ message::{ExternalMessage, InternalMessage}, node::{self, OutgoingMessageFailure}, p2p_node::{LocalMessageTuple, OutboundMessageTuple}, + sync::SyncPeers, }; pub struct NodeLauncher { @@ -96,7 +97,7 @@ impl NodeLauncher { local_outbound_message_sender: UnboundedSender, request_responses_sender: UnboundedSender<(ResponseChannel, ExternalMessage)>, peer_num: Arc, - ) -> Result<(Self, NodeInputChannels)> { + ) -> Result<(Self, NodeInputChannels, Arc)> { /// Helper to create a (sender, receiver) pair for a channel. fn sender_receiver() -> (UnboundedSender, UnboundedReceiverStream) { let (sender, receiver) = mpsc::unbounded_channel(); @@ -110,6 +111,9 @@ impl NodeLauncher { let (local_messages_sender, local_messages_receiver) = sender_receiver(); let (reset_timeout_sender, reset_timeout_receiver) = sender_receiver(); + let peer_id = secret_key.to_libp2p_keypair().public().to_peer_id(); + let peers: Arc = Arc::new(SyncPeers::new(peer_id)); + let node = Node::new( config.clone(), secret_key, @@ -118,7 +122,9 @@ impl NodeLauncher { request_responses_sender, reset_timeout_sender.clone(), peer_num, + peers.clone(), )?; + let node = Arc::new(Mutex::new(node)); for api_server in &config.api_servers { @@ -168,7 +174,7 @@ impl NodeLauncher { local_messages: local_messages_sender, }; - Ok((launcher, input_channels)) + Ok((launcher, input_channels, peers)) } pub async fn start_shard_node(&mut self) -> Result<()> { diff --git a/zilliqa/src/p2p_node.rs b/zilliqa/src/p2p_node.rs index 3e3d5e127..05a130181 100644 --- a/zilliqa/src/p2p_node.rs +++ b/zilliqa/src/p2p_node.rs @@ -37,6 +37,7 @@ use crate::{ message::{ExternalMessage, InternalMessage}, node::{OutgoingMessageFailure, RequestId}, node_launcher::{NodeInputChannels, NodeLauncher, ResponseChannel}, + sync::SyncPeers, }; /// Messages are a tuple of the destination shard ID and the actual message. @@ -61,6 +62,7 @@ pub type OutboundMessageTuple = (Option<(PeerId, RequestId)>, u64, ExternalMessa pub type LocalMessageTuple = (u64, u64, InternalMessage); pub struct P2pNode { + shard_peers: HashMap>, shard_nodes: HashMap, shard_threads: JoinSet>, task_threads: JoinSet>, @@ -133,7 +135,7 @@ impl P2pNode { // So, the nodes are unable to see each other directly and remain isolated, defeating kademlia and autonat. identify: identify::Behaviour::new( identify::Config::new("zilliqa/1.0.0".into(), key_pair.public()) - .with_hide_listen_addrs(!cfg!(debug_assertions)), + .with_hide_listen_addrs(true), ), }) })? 
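// A small sketch of the shared peer registry wired up here, assuming the `SyncPeers` API
// used elsewhere in this patch (`new`, `add_peer`, `remove_peer`); `key_pair` stands in for
// the node's libp2p keypair.
fn shared_peer_registry(key_pair: &libp2p::identity::Keypair) -> std::sync::Arc<SyncPeers> {
    let peer_id = key_pair.public().to_peer_id();
    // one handle is passed into Node::new() for the sync layer, while a clone stays with
    // P2pNode so gossipsub Subscribed/Unsubscribed events can add or remove peers for the
    // shard topic
    std::sync::Arc::new(SyncPeers::new(peer_id))
}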
@@ -147,6 +149,7 @@ impl P2pNode { .build(); Ok(Self { + shard_peers: HashMap::new(), shard_nodes: HashMap::new(), peer_id, secret_key, @@ -193,7 +196,7 @@ impl P2pNode { info!("LaunchShard message received for a shard we're already running. Ignoring..."); return Ok(()); } - let (mut node, input_channels) = NodeLauncher::new( + let (mut node, input_channels, peers) = NodeLauncher::new( self.secret_key, config, self.outbound_message_sender.clone(), @@ -202,6 +205,7 @@ impl P2pNode { self.peer_num.clone(), ) .await?; + self.shard_peers.insert(topic.hash(), peers); self.shard_nodes.insert(topic.hash(), input_channels); self.shard_threads .spawn(async move { node.start_shard_node().await }); @@ -264,6 +268,16 @@ impl P2pNode { .kademlia .add_address(&peer_id, address.clone()); } + SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(gossipsub::Event::Subscribed { peer_id, topic })) => { + if let Some(peers) = self.shard_peers.get(&topic) { + peers.add_peer(peer_id); + } + } + SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(gossipsub::Event::Unsubscribed { peer_id, topic })) => { + if let Some(peers) = self.shard_peers.get(&topic) { + peers.remove_peer(peer_id); + } + } SwarmEvent::Behaviour(BehaviourEvent::Gossipsub(gossipsub::Event::Message{ message_id: msg_id, message: gossipsub::Message { diff --git a/zilliqa/src/pool.rs b/zilliqa/src/pool.rs index 32b71d908..9239e4fc7 100644 --- a/zilliqa/src/pool.rs +++ b/zilliqa/src/pool.rs @@ -403,16 +403,13 @@ mod tests { primitives::{Address, Bytes, PrimitiveSignature, TxKind, U256}, }; use anyhow::Result; - use libp2p::PeerId; use rand::{seq::SliceRandom, thread_rng}; use super::TransactionPool; use crate::{ - block_store::BlockStore, cfg::NodeConfig, crypto::Hash, db::Db, - node::{MessageSender, RequestId}, state::State, transaction::{EvmGas, SignedTransaction, TxIntershard, VerifiedTransaction}, }; @@ -468,23 +465,10 @@ mod tests { fn get_in_memory_state() -> Result { let node_config = NodeConfig::default(); - let (s1, _) = tokio::sync::mpsc::unbounded_channel(); - let (s2, _) = tokio::sync::mpsc::unbounded_channel(); - - let message_sender = MessageSender { - our_shard: 0, - our_peer_id: PeerId::random(), - outbound_channel: s1, - local_channel: s2, - request_id: RequestId::default(), - }; - let db = Db::new::(None, 0, 0)?; let db = Arc::new(db); - let block_store = BlockStore::new(&node_config, db.clone(), message_sender.clone())?; - - State::new_with_genesis(db.state_trie()?, node_config, Arc::new(block_store)) + State::new_with_genesis(db.state_trie()?, node_config, db.clone()) } fn create_acc(state: &mut State, address: Address, balance: u128, nonce: u64) -> Result<()> { diff --git a/zilliqa/src/state.rs b/zilliqa/src/state.rs index c3b595238..ffdaace9c 100644 --- a/zilliqa/src/state.rs +++ b/zilliqa/src/state.rs @@ -17,13 +17,12 @@ use sha3::{Digest, Keccak256}; use tracing::{debug, info}; use crate::{ - block_store::BlockStore, cfg::{Amount, ContractUpgradesBlockHeights, Forks, NodeConfig, ScillaExtLibsPath}, contracts::{self, Contract}, crypto::{self, Hash}, - db::TrieStorage, + db::{Db, TrieStorage}, error::ensure_success, - message::{BlockHeader, MAX_COMMITTEE_SIZE}, + message::{Block, BlockHeader, MAX_COMMITTEE_SIZE}, node::ChainId, scilla::{ParamValue, Scilla, Transition}, serde_util::vec_param_value, @@ -40,6 +39,7 @@ use crate::{ /// the storage root is used to index into the state /// all the keys are hashed and stored in the same sled tree pub struct State { + sql: Arc, db: Arc, accounts: PatriciaTrie, /// The Scilla interpreter 
interface. Note that it is lazily initialized - This is a bit of a hack to ensure that @@ -54,15 +54,10 @@ pub struct State { pub scilla_call_gas_exempt_addrs: Vec
, pub chain_id: ChainId, pub forks: Forks, - pub block_store: Arc, } impl State { - pub fn new( - trie: TrieStorage, - config: &NodeConfig, - block_store: Arc, - ) -> Result { + pub fn new(trie: TrieStorage, config: &NodeConfig, sql: Arc) -> Result { let db = Arc::new(trie); let consensus_config = &config.consensus; Ok(Self { @@ -78,7 +73,7 @@ impl State { scilla_call_gas_exempt_addrs: consensus_config.scilla_call_gas_exempt_addrs.clone(), chain_id: ChainId::new(config.eth_chain_id), forks: consensus_config.get_forks()?, - block_store, + sql, }) } @@ -99,17 +94,13 @@ impl State { trie: TrieStorage, root_hash: B256, config: NodeConfig, - block_store: Arc, + sql: Arc, ) -> Result { - Ok(Self::new(trie, &config, block_store)?.at_root(root_hash)) + Ok(Self::new(trie, &config, sql)?.at_root(root_hash)) } - pub fn new_with_genesis( - trie: TrieStorage, - config: NodeConfig, - block_store: Arc, - ) -> Result { - let mut state = State::new(trie, &config, block_store)?; + pub fn new_with_genesis(trie: TrieStorage, config: NodeConfig, sql: Arc) -> Result { + let mut state = State::new(trie, &config, sql)?; if config.consensus.is_main { let shard_data = contracts::shard_registry::CONSTRUCTOR.encode_input( @@ -308,8 +299,8 @@ impl State { gas_price: self.gas_price, scilla_call_gas_exempt_addrs: self.scilla_call_gas_exempt_addrs.clone(), chain_id: self.chain_id, - block_store: self.block_store.clone(), forks: self.forks.clone(), + sql: self.sql.clone(), } } @@ -405,6 +396,14 @@ impl State { &bincode::serialize(&account)?, )?) } + + pub fn get_canonical_block_by_number(&self, number: u64) -> Result> { + self.sql.get_canonical_block_by_number(number) + } + + pub fn get_highest_canonical_block_number(&self) -> Result> { + self.sql.get_highest_canonical_block_number() + } } pub mod contract_addr { @@ -602,37 +601,18 @@ mod tests { use std::{path::PathBuf, sync::Arc}; use crypto::Hash; - use libp2p::PeerId; use revm::primitives::FixedBytes; use super::*; - use crate::{ - api::to_hex::ToHex, - block_store::BlockStore, - cfg::NodeConfig, - db::Db, - message::BlockHeader, - node::{MessageSender, RequestId}, - }; + use crate::{api::to_hex::ToHex, cfg::NodeConfig, db::Db, message::BlockHeader}; #[test] fn deposit_contract_updateability() { - let (s1, _) = tokio::sync::mpsc::unbounded_channel(); - let (s2, _) = tokio::sync::mpsc::unbounded_channel(); - let message_sender = MessageSender { - our_shard: 0, - our_peer_id: PeerId::random(), - outbound_channel: s1, - local_channel: s2, - request_id: RequestId::default(), - }; let db = Db::new::(None, 0, 0).unwrap(); let db = Arc::new(db); let config = NodeConfig::default(); - let block_store = - Arc::new(BlockStore::new(&config, db.clone(), message_sender.clone()).unwrap()); - let mut state = State::new(db.state_trie().unwrap(), &config, block_store).unwrap(); + let mut state = State::new(db.state_trie().unwrap(), &config, db).unwrap(); let deposit_init_addr = state.deploy_initial_deposit_contract(&config).unwrap(); diff --git a/zilliqa/src/sync.rs b/zilliqa/src/sync.rs new file mode 100644 index 000000000..0d761a883 --- /dev/null +++ b/zilliqa/src/sync.rs @@ -0,0 +1,1150 @@ +use std::{ + cmp::Ordering, + collections::{BinaryHeap, VecDeque}, + ops::Sub, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; + +use alloy::primitives::BlockNumber; +use anyhow::Result; +use itertools::Itertools; +use libp2p::PeerId; +use rusqlite::types::{FromSql, FromSqlResult, ToSql, ToSqlOutput, ValueRef}; + +use crate::{ + cfg::NodeConfig, + crypto::Hash, + db::Db, + message::{ + 
Block, BlockHeader, BlockRequest, BlockResponse, ExternalMessage, InjectedProposal, + Proposal, QuorumCertificate, RequestBlocksByHeight, + }, + node::{MessageSender, OutgoingMessageFailure, RequestId}, + time::SystemTime, + transaction::SignedTransaction, +}; + +// Syncing Algorithm +// +// When a Proposal is received by Consensus, we check if the parent exists in our DB. +// If not, then it triggers a syncing algorithm. +// +// PHASE 1: Request missing chain metadata. +// The entire chain metadata is stored in-memory, and is used to construct a chain of metadata. +// Each metadata basically contains the block_hash, block_number, parent_hash, and view_number. +// 1. We start with the latest Proposal and request the chain of metadata from a peer. +// 2. We construct the chain of metadata, based on the response received. +// 3. If the last block does not exist in our history, we request for additional metadata. +// 4. If the last block exists, we have hit our history, we move to Phase 2. +// +// PHASE 2: Request missing blocks. +// Once the chain metadata is constructed, we fill in the missing blocks to replay the history. +// We do not make any judgements (other than sanity) on the block and leave that up to consensus. +// 1. We construct a set of hashes, from the in-memory chain metadata. +// 2. We request these blocks from the same Peer that sent the metadata. +// 3. We inject the received Proposals into the pipeline. +// 4. If there are still missing blocks, we ask for more. +// 5. If there are no more missing blocks, we move to Phase 3. +// +// PHASE 3: Zip it up. +// Phase 1&2 may run several times and bring up 99% of the chain, but it will never catch up. +// This closes the final gap. +// 1. We queue all recently received Proposals, while Phase 1 & 2 were in progress. +// 2. We check the head of the queue, if its parent exists in our history. +// 3. If it does not, our history is too far away, we run Phase 1 again. +// 4. If it does, we inject the entire queue into the pipeline. +// 5. We are fully synced. + +#[derive(Debug)] +pub struct Sync { + // database + db: Arc, + // message bus + message_sender: MessageSender, + // internal peers + peers: Arc, + // peer handling an in-flight request + in_flight: Option<(PeerInfo, RequestId)>, + // how many blocks to request at once + max_batch_size: usize, + // how many blocks to inject into the queue + max_blocks_in_flight: usize, + // count of proposals pending in the pipeline + in_pipeline: usize, + // our peer id + peer_id: PeerId, + // internal sync state + state: SyncState, + // fixed-size queue of the most recent proposals + recent_proposals: VecDeque, + // for statistics only + inject_at: Option<(std::time::Instant, usize)>, + // record starting number, for eth_syncing() RPC call. + started_at_block_number: u64, + // checkpoint, if set + checkpoint_hash: Hash, +} + +impl Sync { + // Speed up syncing by speculatively fetching blocks in Phase 1 & 2. + #[cfg(not(debug_assertions))] + const DO_SPECULATIVE: bool = true; + #[cfg(debug_assertions)] + const DO_SPECULATIVE: bool = false; + + pub fn new( + config: &NodeConfig, + db: Arc, + latest_block: &Option, + message_sender: MessageSender, + peers: Arc, + ) -> Result { + let peer_id = message_sender.our_peer_id; + let max_batch_size = config.block_request_batch_size.clamp(30, 180); // up to 180 sec of blocks at a time. + let max_blocks_in_flight = config.max_blocks_in_flight.clamp(max_batch_size, 1800); // up to 30-mins worth of blocks in-pipeline. 
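// Worked example of the clamping above, using hypothetical config values: a 10-block batch
// is raised to the 30-block floor, and a 4000-block pipeline is capped at 1800 (roughly 30
// minutes of blocks at ~1s block time, which is what the comments above imply).
fn clamp_example() {
    let configured_batch: usize = 10;
    let configured_in_flight: usize = 4000;
    let max_batch_size = configured_batch.clamp(30, 180);
    let max_blocks_in_flight = configured_in_flight.clamp(max_batch_size, 1800);
    assert_eq!((max_batch_size, max_blocks_in_flight), (30, 1800));
}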
+ + // Start from reset, or continue sync + let state = if db.count_sync_segments()? == 0 { + SyncState::Phase0 + } else { + SyncState::Retry1 // continue sync + }; + + let (latest_block_number, latest_block_hash) = latest_block + .as_ref() + .map_or_else(|| (u64::MIN, Hash::ZERO), |b| (b.number(), b.hash())); + + Ok(Self { + db, + message_sender, + peer_id, + peers, + max_batch_size, + max_blocks_in_flight, + in_flight: None, + in_pipeline: usize::MIN, + state, + recent_proposals: VecDeque::with_capacity(max_batch_size), + inject_at: None, + started_at_block_number: latest_block_number, + checkpoint_hash: latest_block_hash, + }) + } + + /// Skip Failure + /// + /// We get a plain ACK in certain cases - treated as an empty response. + pub fn handle_acknowledgement(&mut self, from: PeerId) -> Result<()> { + if let Some((peer, _)) = self.in_flight.as_ref() { + // downgrade peer due to empty response + if peer.peer_id == from { + tracing::warn!(to = %peer.peer_id, + "sync::Acknowledgement : empty response" + ); + self.peers + .done_with_peer(self.in_flight.take(), DownGrade::Empty); + // Retry if failed in Phase 2 for whatever reason + match self.state { + SyncState::Phase1(_) if Self::DO_SPECULATIVE => { + self.request_missing_metadata(None)? + } + SyncState::Phase2(_) => self.state = SyncState::Retry1, + _ => {} + } + } else { + tracing::warn!(to = %peer.peer_id, + "sync::Acknowledgement : spurious" + ); + } + } + Ok(()) + } + + /// P2P Failure + /// + /// This gets called for any libp2p request failure - treated as a network failure + pub fn handle_request_failure(&mut self, failure: OutgoingMessageFailure) -> Result<()> { + // check if the request is a sync messages + if let Some((peer, req_id)) = self.in_flight.as_ref() { + // downgrade peer due to network failure + if peer.peer_id == failure.peer && *req_id == failure.request_id { + tracing::warn!(to = %peer.peer_id, err = %failure.error, + "sync::RequestFailure : network error" + ); + self.peers + .done_with_peer(self.in_flight.take(), DownGrade::Timeout); + // Retry if failed in Phase 2 for whatever reason + match self.state { + SyncState::Phase1(_) if Self::DO_SPECULATIVE => { + self.request_missing_metadata(None)? + } + SyncState::Phase2(_) => self.state = SyncState::Retry1, + _ => {} + } + } else { + tracing::warn!(to = %peer.peer_id, + "sync::RequestFailure : spurious" + ); + } + } + Ok(()) + } + + /// Phase 0: Sync a block proposal. + /// + /// This is the main entry point for active-syncing a block proposal. + /// We start by enqueuing all proposals, and then check if the parent block exists in history. + /// If the parent block exists, we do nothing. Otherwise, we check the least recent one. + /// If we find its parent in history, we inject the entire queue. Otherwise, we start syncing. + /// + /// We do not perform checks on the Proposal here. This is done in the consensus layer. + pub fn sync_from_proposal(&mut self, proposal: Proposal) -> Result<()> { + // just stuff the latest proposal into the fixed-size queue. + while self.recent_proposals.len() >= self.max_batch_size { + self.recent_proposals.pop_front(); + } + self.recent_proposals.push_back(proposal); + + self.internal_sync() + } + + // TODO: Passive-sync place-holder - https://github.com/Zilliqa/zq2/issues/2232 + pub fn sync_to_genesis(&mut self) -> Result<()> { + Ok(()) + } + + fn internal_sync(&mut self) -> Result<()> { + if self.recent_proposals.is_empty() { + // Do nothing if there's no recent proposals. 
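// The `recent_proposals` buffer above behaves like a fixed-size ring: once it is full, the
// oldest entry is dropped for every new one. A standalone sketch of that behaviour with
// integers (names here are illustrative only):
use std::collections::VecDeque;

fn push_bounded<T>(queue: &mut VecDeque<T>, item: T, cap: usize) {
    while queue.len() >= cap {
        queue.pop_front(); // discard the least recent entry
    }
    queue.push_back(item);
}

fn ring_example() {
    let mut q = VecDeque::with_capacity(3);
    for n in 0..5 {
        push_bounded(&mut q, n, 3);
    }
    assert_eq!(q, VecDeque::from([2, 3, 4])); // only the 3 most recent survive
}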
+ tracing::debug!("sync::Internal : missing recent proposals"); + return Ok(()); + } + + match self.state { + // Check if we are out of sync + SyncState::Phase0 if self.in_pipeline == 0 => { + let parent_hash = self.recent_proposals.back().unwrap().header.qc.block_hash; + if !self.db.contains_block(&parent_hash)? { + // No parent block, trigger sync + tracing::info!("sync::SyncProposal : syncing from {parent_hash}",); + let meta = self.recent_proposals.back().unwrap().header; + self.request_missing_metadata(Some(meta))?; + + let highest_block = self + .db + .get_canonical_block_by_number( + self.db + .get_highest_canonical_block_number()? + .expect("no highest block"), + )? + .expect("missing highest block"); + self.started_at_block_number = highest_block.number(); + } + } + // Continue phase 1, until we hit history/genesis. + SyncState::Phase1(_) if self.in_pipeline < self.max_batch_size => { + self.request_missing_metadata(None)?; + } + // Continue phase 2, until we have all segments. + SyncState::Phase2(_) if self.in_pipeline < self.max_blocks_in_flight => { + self.request_missing_blocks()?; + } + // Wait till 99% synced, zip it up! + SyncState::Phase3 if self.in_pipeline == 0 => { + let ancestor_hash = self.recent_proposals.front().unwrap().header.qc.block_hash; + if self.db.contains_block(&ancestor_hash)? { + tracing::info!( + "sync::SyncProposal : finishing {} blocks for segment #{} from {}", + self.recent_proposals.len(), + self.db.count_sync_segments()?, + self.peer_id, + ); + // inject the proposals + let proposals = self.recent_proposals.drain(..).collect_vec(); + self.inject_proposals(proposals)?; + } + self.db.empty_sync_metadata()?; + self.state = SyncState::Phase0; + } + // Retry to fix sync issues e.g. peers that are now offline + SyncState::Retry1 if self.in_pipeline == 0 => { + self.retry_phase1()?; + if self.started_at_block_number == 0 { + let highest_block = self + .db + .get_canonical_block_by_number( + self.db + .get_highest_canonical_block_number()? + .expect("no highest block"), + )? + .expect("missing highest block"); + self.started_at_block_number = highest_block.number(); + } + } + _ => { + tracing::debug!("sync::SyncProposal : syncing {} blocks", self.in_pipeline); + } + } + + Ok(()) + } + + /// Convenience function to convert a block to a proposal (add full txs) + /// Should only be used for syncing history, not for consensus messages regarding new blocks. + fn block_to_proposal(&self, block: Block) -> Proposal { + // since block must be valid, unwrap(s) are safe + let txs = block + .transactions + .iter() + .map(|hash| self.db.get_transaction(hash).unwrap().unwrap()) + // handle verification on the client-side + .map(|tx| { + let hash = tx.calculate_hash(); + (tx, hash) + }) + .collect_vec(); + Proposal::from_parts_with_hashes(block, txs) + } + + /// Phase 2: Retry Phase 1 + /// + /// If something went wrong in Phase 2, Phase 1 may need to be retried for the recently used segment. + /// Things that could go wrong e.g. the peer went offline, the peer pruned history, etc. + /// + /// Pop the most recently used segment from the segment marker, and retry phase 1. + /// This will rebuild history from the previous marker, with another peer. + /// If this function is called many times, it will eventually restart from Phase 0. + fn retry_phase1(&mut self) -> Result<()> { + if self.db.count_sync_segments()? 
== 0 { + tracing::error!("sync::RetryPhase1 : cannot retry phase 1 without chain segments!"); + self.state = SyncState::Phase0; + return Ok(()); + } + + tracing::debug!( + "sync::RetryPhase1 : retrying segment #{}", + self.db.count_sync_segments()?, + ); + + // remove the last segment from the chain metadata + let (meta, _) = self.db.last_sync_segment()?.unwrap(); + self.db.pop_sync_segment()?; + self.state = SyncState::Phase1(meta); + + Ok(()) + } + + /// Phase 2: Handle a multi-block response. + /// + /// This is Phase 2 in the syncing algorithm, where we receive a set of blocks and inject them into the pipeline. + /// We also remove the blocks from the chain metadata, because they are now in the pipeline. + pub fn handle_multiblock_response( + &mut self, + from: PeerId, + response: Vec, + ) -> Result<()> { + if let Some((peer, _)) = self.in_flight.as_ref() { + if peer.peer_id != from { + tracing::warn!( + "sync::MultiBlockResponse : unexpected peer={} != {from}", + peer.peer_id + ); + return Ok(()); + } + } else { + tracing::warn!("sync::MultiBlockResponse : spurious response {from}"); + return Ok(()); + } + + // Process only a full response + if response.is_empty() { + // Empty response, downgrade peer and retry phase 1. + tracing::warn!("sync::MultiBlockResponse : empty blocks {from}",); + self.peers + .done_with_peer(self.in_flight.take(), DownGrade::Empty); + self.state = SyncState::Retry1; + return Ok(()); + } else { + self.peers + .done_with_peer(self.in_flight.take(), DownGrade::None); + } + + let SyncState::Phase2(check_sum) = self.state else { + anyhow::bail!("sync::MultiBlockResponse : invalid state"); + }; + + tracing::info!( + "sync::MultiBlockResponse : received {} blocks for segment #{} from {}", + response.len(), + self.db.count_sync_segments()?, + from + ); + + // If the checksum does not match, retry phase 1. Maybe the node has pruned the segment. + let checksum = response + .iter() + .fold(Hash::builder().with(Hash::ZERO.as_bytes()), |sum, p| { + sum.with(p.hash().as_bytes()) + }) + .finalize(); + + if check_sum != checksum { + tracing::error!( + "sync::MultiBlockResponse : unexpected checksum={check_sum} != {checksum}" + ); + self.state = SyncState::Retry1; + return Ok(()); + } + + // Response seems sane. + let proposals = response + .into_iter() + .sorted_by_key(|p| p.number()) + .collect_vec(); + + self.db.pop_sync_segment()?; + self.inject_proposals(proposals)?; // txns are verified when processing InjectedProposal. + + // Done with phase 2 + if self.db.count_sync_segments()? == 0 { + self.state = SyncState::Phase3; + } else if Self::DO_SPECULATIVE { + // Speculatively request more blocks + self.request_missing_blocks()?; + } + + Ok(()) + } + + /// Returns a list of Proposals + /// + /// Given a set of block hashes, retrieve the list of proposals from its history. + /// Returns this list of proposals to the requestor. + pub fn handle_multiblock_request( + &mut self, + from: PeerId, + request: Vec, + ) -> Result { + tracing::debug!( + "sync::MultiBlockRequest : received a {} multiblock request from {}", + request.len(), + from + ); + + // TODO: Any additional checks + // Validators should not respond to this, unless they are free e.g. stuck in an exponential backoff. + + let batch_size: usize = self.max_batch_size.min(request.len()); // mitigate DOS by limiting the number of blocks we return + let mut proposals = Vec::with_capacity(batch_size); + for hash in request { + let Some(block) = self.db.get_block_by_hash(&hash)? else { + break; // that's all we have! 
+ }; + proposals.push(self.block_to_proposal(block)); + } + + let message = ExternalMessage::MultiBlockResponse(proposals); + Ok(message) + } + + /// Phase 2: Request missing blocks from the chain. + /// + /// It constructs a set of hashes, which constitute the series of blocks that are missing. + /// These hashes are then sent to a Peer for retrieval. + /// This is phase 2 of the syncing algorithm. + fn request_missing_blocks(&mut self) -> Result<()> { + if !matches!(self.state, SyncState::Phase2(_)) { + anyhow::bail!("sync::RequestMissingBlocks : invalid state"); + } + // Early exit if there's a request in-flight; and if it has not expired. + if self.in_flight.is_some() || self.in_pipeline > self.max_blocks_in_flight { + tracing::debug!( + "sync::RequestMissingBlocks : syncing {}/{} blocks", + self.in_pipeline, + self.max_blocks_in_flight + ); + return Ok(()); + } + + // will be re-inserted below + if let Some(peer) = self.peers.get_next_peer() { + // reinsert peer, as we will use a faux peer below, to force the request to go to the original responder + self.peers.reinsert_peer(peer)?; + + // If we have no chain_segments, we have nothing to do + if let Some((meta, peer_info)) = self.db.last_sync_segment()? { + let request_hashes = self.db.get_sync_segment(meta.qc.block_hash)?; + + // Checksum of the request hashes + let checksum = request_hashes + .iter() + .fold(Hash::builder().with(Hash::ZERO.as_bytes()), |sum, h| { + sum.with(h.as_bytes()) + }) + .finalize(); + self.state = SyncState::Phase2(checksum); + + // Fire request, to the original peer that sent the segment metadata + tracing::info!( + "sync::RequestMissingBlocks : requesting {} blocks of segment #{} from {}", + request_hashes.len(), + self.db.count_sync_segments()?, + peer_info.peer_id, + ); + let (peer_info, message) = match peer_info.version { + PeerVer::V2 => { + ( + PeerInfo { + version: PeerVer::V2, + peer_id: peer_info.peer_id, + last_used: std::time::Instant::now(), + score: u32::MAX, // used to indicate faux peer, will not be added to the group of peers + }, + ExternalMessage::MultiBlockRequest(request_hashes), + ) + } + PeerVer::V1 => { + ( + PeerInfo { + version: PeerVer::V1, + peer_id: peer_info.peer_id, + last_used: std::time::Instant::now(), + score: u32::MAX, // used to indicate faux peer, will not be added to the group of peers + }, + // do not add VIEW_DRIFT - the stored marker is accurate! + ExternalMessage::BlockRequest(BlockRequest { + to_view: meta.view.saturating_sub(1), + from_view: meta.view.saturating_sub(self.max_batch_size as u64), + }), + ) + } + }; + let request_id = self + .message_sender + .send_external_message(peer_info.peer_id, message)?; + self.in_flight = Some((peer_info, request_id)); + } + } else { + tracing::warn!("sync::RequestMissingBlocks : insufficient peers to handle request"); + } + Ok(()) + } + + /// Phase 1 / 2: Handle a V1 block response + /// + /// If the response if from a V2 peer, it will upgrade that peer to V2. + /// In phase 1, it will extract the metadata and feed it into handle_metadata_response. + /// In phase 2, it will extract the blocks and feed it into handle_multiblock_response. 
+ pub fn handle_block_response(&mut self, from: PeerId, response: BlockResponse) -> Result<()> { + // V2 response + if response.availability.is_none() + && response.proposals.is_empty() + && response.from_view == u64::MAX + { + tracing::info!("sync::HandleBlockResponse : new response from {from}",); + if let Some((mut peer, _)) = self.in_flight.take() { + if peer.peer_id == from && peer.version == PeerVer::V1 { + // upgrade to V2 peer + peer.version = PeerVer::V2; + self.peers.reinsert_peer(peer)?; + match self.state { + SyncState::Phase2(_) => { + self.state = SyncState::Retry1; + } + SyncState::Phase1(_) if Self::DO_SPECULATIVE => { + self.request_missing_metadata(None)?; + } + _ => {} + } + } + } + return Ok(()); + } + + tracing::trace!( + "sync::HandleBlockResponse : received {} blocks from {from}", + response.proposals.len() + ); + + // Convert the V1 response into a V2 response. + match self.state { + // Phase 1 - construct the metadata chain from the set of received proposals + SyncState::Phase1(BlockHeader { + number: block_number, + qc: + QuorumCertificate { + block_hash: parent_hash, + .. + }, + .. + }) => { + // We do not buffer the proposals, as it takes 250MB/day! + // Instead, we will re-request the proposals again, in Phase 2. + let mut parent_hash = parent_hash; + let metadata = response + .proposals + .into_iter() + // filter extras due to drift + .filter(|p| p.number() < block_number) + .sorted_by(|a, b| b.number().cmp(&a.number())) + // filter any forks + .filter(|p| { + if parent_hash != p.hash() { + return false; + } + parent_hash = p.header.qc.block_hash; + true + }) + .map(|p| p.header) + .collect_vec(); + + self.handle_metadata_response(from, metadata)?; + } + + // Phase 2 - extract the requested proposals only. + SyncState::Phase2(_) => { + let multi_blocks = response + .proposals + .into_iter() + // filter any blocks that are not in the chain e.g. forks + .filter(|p| { + self.db + .contains_sync_metadata(&p.hash()) + .unwrap_or_default() + }) + .sorted_by(|a, b| b.number().cmp(&a.number())) + .collect_vec(); + + self.handle_multiblock_response(from, multi_blocks)?; + } + _ => { + tracing::error!( + "sync::HandleBlockResponse : from={from} response={:?}", + response + ); + } + } + Ok(()) + } + + /// Phase 1: Handle a response to a metadata request. + /// + /// This is the first step in the syncing algorithm, where we receive a set of metadata and use it to + /// construct a chain history. We check that the metadata does indeed constitute a segment of a chain. + /// If it does, we record its segment marker and store the entire chain in-memory. + pub fn handle_metadata_response( + &mut self, + from: PeerId, + response: Vec, + ) -> Result<()> { + // Check for expected response + let segment_peer = if let Some((peer, _)) = self.in_flight.as_ref() { + if peer.peer_id != from { + tracing::warn!( + "sync::MetadataResponse : unexpected peer={} != {from}", + peer.peer_id + ); + return Ok(()); + } + peer.clone() + } else { + // We ignore any responses that arrived late, since the original request has already 'timed-out'. + tracing::warn!("sync::MetadataResponse : spurious response {from}"); + return Ok(()); + }; + + // Process whatever we have received. + if response.is_empty() { + // Empty response, downgrade peer and retry with a new peer. 
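// The V1->V2 peer upgrade hinges on the sentinel BlockResponse that V2 nodes return to old
// BlockRequests (see the handler earlier in this patch). A minimal predicate for detecting
// it, assuming the `BlockResponse` fields used above; a genuine V1 response always carries a
// concrete `from_view`, so `u64::MAX` plus an empty body marks "I no longer serve V1
// block requests".
fn is_v2_sentinel(r: &BlockResponse) -> bool {
    r.availability.is_none() && r.proposals.is_empty() && r.from_view == u64::MAX
}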
+            tracing::warn!("sync::MetadataResponse : empty blocks {from}",);
+            self.peers
+                .done_with_peer(self.in_flight.take(), DownGrade::Empty);
+            return Ok(());
+        } else {
+            self.peers
+                .done_with_peer(self.in_flight.take(), DownGrade::None);
+        }
+
+        let SyncState::Phase1(meta) = &self.state else {
+            anyhow::bail!("sync::MetadataResponse : invalid state");
+        };
+
+        // Check the linkage of the returned chain
+        let mut block_hash = meta.qc.block_hash;
+        let mut block_num = meta.number;
+        for meta in response.iter() {
+            // Check that the block hash and number are as expected.
+            if meta.hash != Hash::ZERO && block_hash == meta.hash && block_num == meta.number + 1 {
+                block_hash = meta.qc.block_hash;
+                block_num = meta.number;
+            } else {
+                // TODO: possibly, discard and rebuild entire chain
+                // if something does not match, do nothing and retry the request with the next peer.
+                tracing::error!(
+                    "sync::MetadataResponse : unexpected metadata hash={block_hash} != {}, num={block_num} != {}",
+                    meta.hash,
+                    meta.number,
+                );
+                return Ok(());
+            }
+            if meta.hash == response.last().unwrap().hash {
+                break; // done, we do not check the last parent, because that's outside this segment
+            }
+        }
+
+        // Chain segment is sane
+        let segment = response;
+
+        // Record the constructed chain metadata
+        self.db.insert_sync_metadata(&segment)?;
+
+        // Record landmark(s), including the peer that has this set of blocks
+        self.db.push_sync_segment(segment_peer, *meta)?;
+
+        tracing::info!(
+            "sync::MetadataResponse : received {} metadata segment #{} from {}",
+            segment.len(),
+            self.db.count_sync_segments()?,
+            from
+        );
+
+        // Record the oldest block in the chain's parent
+        self.state = SyncState::Phase1(segment.last().cloned().unwrap());
+
+        // If the checkpoint is in this segment
+        let checkpointed = segment.iter().any(|b| b.hash == self.checkpoint_hash);
+        let started = self.started_at_block_number <= segment.first().as_ref().unwrap().number
+            && self.started_at_block_number >= segment.last().as_ref().unwrap().number;
+        // If the segment hits our history, start Phase 2.
+        if started || checkpointed {
+            self.state = SyncState::Phase2(Hash::ZERO);
+        } else if Self::DO_SPECULATIVE {
+            self.request_missing_metadata(None)?;
+        }
+
+        Ok(())
+    }
+
+    /// Returns the metadata of the chain from a given hash.
+    ///
+    /// This constructs a historical chain going backwards from a hash, by following the parent_hash.
+    /// It collects N blocks and returns the metadata of that particular chain.
+    /// This is mainly used in Phase 1 of the syncing algorithm, to construct a chain history.
+    pub fn handle_metadata_request(
+        &mut self,
+        from: PeerId,
+        request: RequestBlocksByHeight,
+    ) -> Result<ExternalMessage> {
+        tracing::debug!(
+            "sync::MetadataRequest : received a metadata request from {}",
+            from
+        );
+
+        // Do not respond to stale requests as the client has probably timed-out
+        if request.request_at.elapsed()? > Duration::from_secs(5) {
+            tracing::warn!("sync::MetadataRequest : stale request");
+            return Ok(ExternalMessage::Acknowledgement);
+        }
+
+        // TODO: Check if we should service this request - https://github.com/Zilliqa/zq2/issues/1878
+
+        let batch_size: usize = self
+            .max_batch_size
+            .min(request.to_height.saturating_sub(request.from_height) as usize); // mitigate DOS by limiting the number of blocks we return
+        let mut metas = Vec::with_capacity(batch_size);
+        let Some(block) = self.db.get_canonical_block_by_number(request.to_height)? else {
+            tracing::warn!("sync::MetadataRequest : unknown block height");
+            return Ok(ExternalMessage::Acknowledgement);
+        };
+        metas.push(block.header);
+        let mut hash = block.parent_hash();
+        while metas.len() <= batch_size {
+            // grab the parent
+            let Some(block) = self.db.get_block_by_hash(&hash)? else {
+                break; // that's all we have!
+            };
+            hash = block.parent_hash();
+            metas.push(block.header);
+        }
+
+        let message = ExternalMessage::MetaDataResponse(metas);
+        tracing::trace!(
+            ?message,
+            "sync::MetadataFromHash : responding to block request"
+        );
+        Ok(message)
+    }
+
+    /// Phase 1: Request chain metadata from a peer.
+    ///
+    /// This constructs a chain history by requesting blocks from a peer, going backwards from a given block.
+    /// If Phase 1 is in progress, it continues requesting blocks from the last known Phase 1 block.
+    /// Otherwise, it requests blocks from the given starting metadata.
+    ///
+    /// TODO: speed it up - https://github.com/Zilliqa/zq2/issues/2158
+    pub fn request_missing_metadata(&mut self, meta: Option<BlockHeader>) -> Result<()> {
+        if !matches!(self.state, SyncState::Phase1(_)) && !matches!(self.state, SyncState::Phase0) {
+            anyhow::bail!("sync::RequestMissingMetadata : invalid state");
+        }
+        // Early exit if there is already a request in flight, or if too many blocks are still queued.
+        if self.in_flight.is_some() || self.in_pipeline > self.max_batch_size {
+            // anything more than this and we cannot be sure whether the segment hits history
+            tracing::debug!(
+                "sync::RequestMissingMetadata : syncing {}/{} blocks",
+                self.in_pipeline,
+                self.max_batch_size
+            );
+            return Ok(());
+        }
+
+        if let Some(peer_info) = self.peers.get_next_peer() {
+            tracing::info!(
+                "sync::RequestMissingMetadata : requesting {} metadata of segment #{} from {}",
+                self.max_batch_size,
+                self.db.count_sync_segments()? + 1,
+                peer_info.peer_id
+            );
+            let message = match (self.state.clone(), &peer_info.version) {
+                (
+                    SyncState::Phase1(BlockHeader {
+                        number: block_number,
+                        ..
+                    }),
+                    PeerVer::V2,
+                ) => ExternalMessage::MetaDataRequest(RequestBlocksByHeight {
+                    request_at: SystemTime::now(),
+                    to_height: block_number.saturating_sub(1),
+                    from_height: block_number.saturating_sub(self.max_batch_size as u64),
+                }),
+                (
+                    SyncState::Phase1(BlockHeader {
+                        view: view_number, ..
+                    }),
+                    PeerVer::V1,
+                ) => {
+                    // For V1 BlockRequest, we request a little more than we need, due to drift
+                    // Since the view number is an 'internal' clock, it is possible for the same block number
+                    // to have different view numbers.
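+                    // Over-fetching by ~10% of the batch size covers this drift; any extra
+                    // proposals are filtered out again in handle_block_response (Phase 1).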
+                    let drift = self.max_batch_size as u64 / 10;
+                    ExternalMessage::BlockRequest(BlockRequest {
+                        to_view: view_number.saturating_add(drift),
+                        from_view: view_number.saturating_sub(self.max_batch_size as u64),
+                    })
+                }
+                (SyncState::Phase0, PeerVer::V2) if meta.is_some() => {
+                    let meta = meta.unwrap();
+                    let block_number = meta.number;
+                    self.state = SyncState::Phase1(meta);
+                    ExternalMessage::MetaDataRequest(RequestBlocksByHeight {
+                        request_at: SystemTime::now(),
+                        to_height: block_number.sub(1),
+                        from_height: block_number.sub(self.max_batch_size as u64),
+                    })
+                }
+                (SyncState::Phase0, PeerVer::V1) if meta.is_some() => {
+                    let meta = meta.unwrap();
+                    let view_number = meta.view;
+                    self.state = SyncState::Phase1(meta);
+                    let drift = self.max_batch_size as u64 / 10;
+                    ExternalMessage::BlockRequest(BlockRequest {
+                        to_view: view_number.saturating_add(drift),
+                        from_view: view_number.saturating_sub(self.max_batch_size as u64),
+                    })
+                }
+                _ => anyhow::bail!("sync::MissingMetadata : invalid state"),
+            };
+            let request_id = self
+                .message_sender
+                .send_external_message(peer_info.peer_id, message)?;
+            self.in_flight = Some((peer_info, request_id));
+        } else {
+            tracing::warn!("sync::RequestMissingMetadata : insufficient peers to handle request");
+        }
+        Ok(())
+    }
+
+    /// Phase 2 / 3: Inject the proposals into the chain.
+    ///
+    /// It adds the list of proposals into the pipeline for execution.
+    /// It also outputs some syncing statistics.
+    fn inject_proposals(&mut self, proposals: Vec<Proposal>) -> Result<()> {
+        if proposals.is_empty() {
+            return Ok(());
+        }
+
+        // Output some stats
+        if let Some((when, injected)) = self.inject_at {
+            let diff = injected - self.in_pipeline;
+            let rate = diff as f32 / when.elapsed().as_secs_f32();
+            tracing::debug!("sync::InjectProposals : synced {} block/s", rate);
+        }
+
+        // Increment proposals injected
+        self.in_pipeline = self.in_pipeline.saturating_add(proposals.len());
+        tracing::debug!(
+            "sync::InjectProposals : injecting {}/{} proposals",
+            proposals.len(),
+            self.in_pipeline
+        );
+
+        // Just pump the Proposals back to ourselves.
+        for p in proposals {
+            if !p
+                .transactions
+                .iter()
+                .any(|t| matches!(t, SignedTransaction::Zilliqa { .. }))
+            {
+                tracing::trace!(
+                    number = %p.number(), hash = %p.hash(),
+                    "sync::InjectProposals : applying",
+                );
+            } else {
+                tracing::warn!(number = %p.number(), hash = %p.hash(), "sync::InjectProposals : storing");
+                // TODO: just store old ZIL blocks - https://github.com/Zilliqa/zq2/issues/2232
+            }
+            self.message_sender.send_external_message(
+                self.peer_id,
+                ExternalMessage::InjectedProposal(InjectedProposal {
+                    from: self.peer_id,
+                    block: p,
+                }),
+            )?;
+        }
+
+        self.inject_at = Some((std::time::Instant::now(), self.in_pipeline));
+        Ok(())
+    }
+
+    /// Mark a received proposal
+    ///
+    /// Marks a proposal as received and decrements the count of proposals still in the pipeline.
+    pub fn mark_received_proposal(&mut self, from: PeerId) -> Result<()> {
+        if from != self.peer_id {
+            tracing::error!(
+                "sync::MarkReceivedProposal : foreign InjectedProposal from {}",
+                from
+            );
+        }
+        self.in_pipeline = self.in_pipeline.saturating_sub(1);
+        Ok(())
+    }
+
+    /// Returns true if the node is currently syncing.
+    pub fn am_syncing(&self) -> Result<bool> {
+        Ok(self.in_pipeline != 0
+            || !matches!(self.state, SyncState::Phase0)
+            || self.db.count_sync_segments()? != 0)
+    }
+
+    /// Returns (starting_block, current_block, highest_block) if we're syncing,
+    /// None if we're not.
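+    /// The three values are the block height at which syncing started, the highest block
+    /// persisted locally, and the highest block number seen in recently received proposals.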
+    pub fn get_sync_data(&self) -> Result<Option<(u64, u64, u64)>> {
+        let flag = self.am_syncing()?;
+        if !flag {
+            Ok(None)
+        } else {
+            let highest_block = self
+                .db
+                .get_canonical_block_by_number(
+                    self.db
+                        .get_highest_canonical_block_number()?
+                        .expect("no highest block"),
+                )?
+                .expect("missing highest block");
+
+            let highest_saved_block_number = highest_block.number();
+            let highest_block_number_seen = self.recent_proposals.back().unwrap().number();
+            Ok(Some((
+                self.started_at_block_number,
+                highest_saved_block_number,
+                highest_block_number_seen,
+            )))
+        }
+    }
+
+    /// Sets the checkpoint, if node was started from a checkpoint.
+    pub fn set_checkpoint(&mut self, checkpoint: &Block) {
+        let hash = checkpoint.hash();
+        tracing::info!("sync::Checkpoint {}", hash);
+        self.checkpoint_hash = hash;
+    }
+}
+
+#[derive(Debug)]
+pub struct SyncPeers {
+    peer_id: PeerId,
+    peers: Arc<Mutex<BinaryHeap<PeerInfo>>>,
+}
+
+impl SyncPeers {
+    pub fn new(peer_id: PeerId) -> Self {
+        Self {
+            peer_id,
+            peers: Arc::new(Mutex::new(BinaryHeap::<PeerInfo>::new())),
+        }
+    }
+
+    /// Downgrade a peer based on the response received.
+    ///
+    /// This algorithm favours good peers that respond quickly (i.e. no timeout).
+    /// In most cases, it eventually converges on 2 sources, which avoids a single source of truth.
+    fn done_with_peer(&self, in_flight: Option<(PeerInfo, RequestId)>, downgrade: DownGrade) {
+        if let Some((mut peer, _)) = in_flight {
+            tracing::trace!("sync::DoneWithPeer {} {:?}", peer.peer_id, downgrade);
+            let mut peers = self.peers.lock().unwrap();
+            peer.score = peer.score.saturating_add(downgrade as u32);
+            if !peers.is_empty() {
+                // Ensure that the next peer is equal or better
+                peer.score = peer.score.max(peers.peek().unwrap().score);
+            }
+            // Reinsert peers that are good
+            if peer.score < u32::MAX {
+                peers.push(peer);
+            }
+        }
+    }
+
+    /// Add peers in bulk.
+    pub fn add_peers(&self, peers: Vec<PeerId>) {
+        tracing::debug!("sync::AddPeers {:?}", peers);
+        peers
+            .into_iter()
+            .filter(|p| *p != self.peer_id)
+            .for_each(|p| self.add_peer(p));
+    }
+
+    /// Add a peer to the list of peers.
+    pub fn add_peer(&self, peer: PeerId) {
+        let mut peers = self.peers.lock().unwrap();
+        // If the new peer is not synced, it will get downgraded to the back of the heap,
+        // but by placing it at the back of the 'best' pack, we get to try it out soon.
+        let new_peer = PeerInfo {
+            version: PeerVer::V1,
+            score: peers.iter().map(|p| p.score).min().unwrap_or_default(),
+            peer_id: peer,
+            last_used: Instant::now(),
+        };
+        // ensure that it is unique
+        peers.retain(|p: &PeerInfo| p.peer_id != peer);
+        peers.push(new_peer);
+
+        tracing::trace!("sync::AddPeer {peer}/{}", peers.len());
+    }
+
+    /// Remove a peer from the list of peers.
+    pub fn remove_peer(&self, peer: PeerId) {
+        let mut peers = self.peers.lock().unwrap();
+        peers.retain(|p: &PeerInfo| p.peer_id != peer);
+        tracing::trace!("sync::RemovePeer {peer}/{}", peers.len());
+    }
+
+    /// Get the next best peer to use
+    fn get_next_peer(&self) -> Option<PeerInfo> {
+        if let Some(mut peer) = self.peers.lock().unwrap().pop() {
+            peer.last_used = std::time::Instant::now();
+            tracing::trace!(peer = %peer.peer_id, score = %peer.score, "sync::GetNextPeer");
+            return Some(peer);
+        }
+        None
+    }
+
+    /// Reinserts the peer such that it is at the front of the queue.
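+    ///
+    /// This works by backdating `last_used` to just before the current best peer's, so that at
+    /// equal score the reinserted peer is popped next (the heap prefers lower scores, then less
+    /// recently used peers). Faux peers (score == u32::MAX) are never reinserted.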
+    fn reinsert_peer(&self, peer: PeerInfo) -> Result<()> {
+        if peer.score == u32::MAX {
+            return Ok(());
+        }
+        let mut peers = self.peers.lock().unwrap();
+        let mut peer = peer;
+        if !peers.is_empty() {
+            // Ensure that it gets to the head of the line
+            peer.last_used = peers
+                .peek()
+                .expect("peers.len() > 1")
+                .last_used
+                .checked_sub(Duration::from_secs(1))
+                .expect("time is ordinal");
+        }
+        peers.push(peer);
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct PeerInfo {
+    pub score: u32,
+    pub peer_id: PeerId,
+    pub last_used: Instant,
+    pub version: PeerVer,
+}
+
+impl Ord for PeerInfo {
+    fn cmp(&self, other: &Self) -> Ordering {
+        other
+            .score
+            .cmp(&self.score)
+            .then_with(|| other.last_used.cmp(&self.last_used))
+    }
+}
+
+impl PartialOrd for PeerInfo {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+/// For downgrading a peer from being selected in get_next_peer().
+/// Ordered by degree of offence, i.e. None is good, Timeout is worst.
+#[derive(Debug, Clone, Eq, PartialEq)]
+enum DownGrade {
+    None,
+    Empty,
+    Timeout,
+}
+
+impl Ord for DownGrade {
+    fn cmp(&self, other: &Self) -> Ordering {
+        (self.clone() as u32).cmp(&(other.clone() as u32))
+    }
+}
+
+impl PartialOrd for DownGrade {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+/// Sync state
+#[allow(clippy::large_enum_variant)]
+#[derive(Debug, Clone)]
+enum SyncState {
+    Phase0,
+    Phase1(BlockHeader),
+    Phase2(Hash),
+    Phase3,
+    Retry1,
+}
+
+/// Peer Version
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub enum PeerVer {
+    V1 = 1,
+    V2 = 2,
+}
+
+impl FromSql for PeerVer {
+    fn column_result(value: ValueRef) -> FromSqlResult<Self> {
+        u32::column_result(value).map(|i| match i {
+            1 => PeerVer::V1,
+            2 => PeerVer::V2,
+            _ => todo!("invalid version"),
+        })
+    }
+}
+
+impl ToSql for PeerVer {
+    fn to_sql(&self) -> Result<ToSqlOutput, rusqlite::Error> {
+        Ok((self.clone() as u32).into())
+    }
+}
diff --git a/zilliqa/tests/it/consensus.rs b/zilliqa/tests/it/consensus.rs
index 670ee3613..6946a5e17 100644
--- a/zilliqa/tests/it/consensus.rs
+++ b/zilliqa/tests/it/consensus.rs
@@ -169,6 +169,7 @@ async fn handle_forking_correctly(mut network: Network) {
     let original_receipt = first.unwrap();
 
     trace!("Running until the network has reverted the block");
+    network.run_until_synced(0).await;
     // Now we should be able to run the network until we get a different tx receipt from the first
     // node, which indicates that it has reverted the block
     network
diff --git a/zilliqa/tests/it/main.rs b/zilliqa/tests/it/main.rs
index 2033b261f..c7d7207e9 100644
--- a/zilliqa/tests/it/main.rs
+++ b/zilliqa/tests/it/main.rs
@@ -78,6 +78,7 @@ use zilliqa::{
     message::{ExternalMessage, InternalMessage},
     node::{Node, RequestId},
     node_launcher::ResponseChannel,
+    sync::SyncPeers,
     transaction::EvmGas,
 };
 
@@ -166,6 +167,9 @@ fn node(
     let (reset_timeout_sender, reset_timeout_receiver) = mpsc::unbounded_channel();
     std::mem::forget(reset_timeout_receiver);
 
+    let peer_id = secret_key.to_libp2p_keypair().public().to_peer_id();
+    let peers = Arc::new(SyncPeers::new(peer_id));
+
     let node = Node::new(
         NodeConfig {
             data_dir: datadir
@@ -179,6 +183,7 @@ fn node(
         request_responses_sender,
         reset_timeout_sender,
         Arc::new(AtomicUsize::new(0)),
+        peers.clone(),
     )?;
     let node = Arc::new(Mutex::new(node));
     let rpc_module: RpcModule<Arc<Mutex<Node>>> =
@@ -187,12 +192,13 @@
     Ok((
         TestNode {
             index,
-            peer_id: secret_key.to_libp2p_keypair().public().to_peer_id(),
+            peer_id,
             secret_key,
             onchain_key,
             inner: node,
             dir: datadir,
             rpc_module,
+            peers,
         },
         message_receiver,
         local_message_receiver,
@@ -209,6 +215,7 @@ struct TestNode {
     rpc_module: RpcModule<Arc<Mutex<Node>>>,
     inner: Arc<Mutex<Node>>,
     dir: Option<TempDir>,
+    peers: Arc<SyncPeers>,
 }
 
 struct Network {
@@ -403,6 +410,9 @@ impl Network {
         let receive_resend_message = UnboundedReceiverStream::new(receive_resend_message).boxed();
         receivers.push(receive_resend_message);
 
+        let mut peers = nodes.iter().map(|n| n.peer_id).collect_vec();
+        peers.shuffle(rng.lock().unwrap().deref_mut());
+
         for node in &nodes {
             trace!(
                 "Node {}: {} (dir: {})",
@@ -410,6 +420,7 @@ impl Network {
                 node.peer_id,
                 node.dir.as_ref().unwrap().path().to_string_lossy(),
             );
+            node.peers.add_peers(peers.clone());
         }
 
         Network {
@@ -508,6 +519,10 @@ impl Network {
         let (node, receiver, local_receiver, request_responses) =
             node(config, secret_key, onchain_key, self.nodes.len(), None).unwrap();
 
+        let mut peers = self.nodes.iter().map(|n| n.peer_id).collect_vec();
+        peers.shuffle(self.rng.lock().unwrap().deref_mut());
+        node.peers.add_peers(peers.clone());
+
         trace!("Node {}: {}", node.index, node.peer_id);
 
         let index = node.index;
@@ -570,6 +585,9 @@ impl Network {
             .chain(request_response_receivers)
             .collect();
 
+        let mut peers = nodes.iter().map(|n| n.peer_id).collect_vec();
+        peers.shuffle(self.rng.lock().unwrap().deref_mut());
+
         for node in &nodes {
             trace!(
                 "Node {}: {} (dir: {})",
@@ -577,6 +595,7 @@ impl Network {
                 node.peer_id,
                 node.dir.as_ref().unwrap().path().to_string_lossy(),
             );
+            node.peers.add_peers(peers.clone());
         }
 
         let (resend_message, receive_resend_message) = mpsc::unbounded_channel::<StreamMessage>();
@@ -820,23 +839,28 @@ impl Network {
                    true
                }
            }
+            AnyMessage::External(ExternalMessage::InjectedProposal(_)) => {
+                self.handle_message(m.clone());
+                false
+            }
            _ => true,
        });
 
        // Pick a random message
-        let index = self.rng.lock().unwrap().gen_range(0..messages.len());
-        let (source, destination, message) = messages.swap_remove(index);
-        // Requeue the other messages
-        for message in messages {
-            self.resend_message.send(message).unwrap();
-        }
-
-        trace!(
-            "{}",
-            format_message(&self.nodes, source, destination, &message)
-        );
+        if !messages.is_empty() {
+            let index = self.rng.lock().unwrap().gen_range(0..messages.len());
+            let (source, destination, message) = messages.swap_remove(index);
+            // Requeue the other messages
+            for message in messages {
+                self.resend_message.send(message).unwrap();
+            }
+            trace!(
+                "{}",
+                format_message(&self.nodes, source, destination, &message)
+            );
 
-        self.handle_message((source, destination, message))
+            self.handle_message((source, destination, message))
+        }
    }
 
    fn handle_message(&mut self, message: StreamMessage) {
@@ -1033,6 +1057,26 @@ impl Network {
        }
    }
 
+    async fn run_until_synced(&mut self, index: usize) {
+        let check = loop {
+            let i = self.random_index();
+            if i != index {
+                break i;
+            }
+        };
+        self.run_until(
+            |net| {
+                let syncing = net.get_node(index).consensus.sync.am_syncing().unwrap();
+                let height_i = net.get_node(index).get_finalized_height().unwrap();
+                let height_c = net.get_node(check).get_finalized_height().unwrap();
+                height_c == height_i && height_i > 0 && !syncing
+            },
+            2000,
+        )
+        .await
+        .unwrap();
+    }
+
    async fn run_until(
        &mut self,
        mut condition: impl FnMut(&mut Network) -> bool,
diff --git a/zilliqa/tests/it/persistence.rs b/zilliqa/tests/it/persistence.rs
index b4230308a..5505d522b 100644
--- a/zilliqa/tests/it/persistence.rs
+++ b/zilliqa/tests/it/persistence.rs
@@ -268,6 +268,7 @@ async fn checkpoints_test(mut network: Network) {
     assert_eq!(state["welcome_msg"], "default");
 
     // check the new node catches up and keeps up with block production
+    network.run_until_synced(new_node_idx).await;
     network
         .run_until_block(&new_node_wallet, 20.into(), 200)
         .await;
diff --git a/zilliqa/tests/it/staking.rs b/zilliqa/tests/it/staking.rs
index ed5c83473..966662f21 100644
--- a/zilliqa/tests/it/staking.rs
+++ b/zilliqa/tests/it/staking.rs
@@ -425,14 +425,14 @@ async fn rewards_are_sent_to_reward_address_of_proposer(mut network: Network) {
     check_miner_got_reward(&wallet, 1).await;
 }
 
-#[zilliqa_macros::test(blocks_per_epoch = 2, deposit_v3_upgrade_block_height = 12)]
+#[zilliqa_macros::test(blocks_per_epoch = 2, deposit_v3_upgrade_block_height = 24)]
 async fn validators_can_join_and_become_proposer(mut network: Network) {
     let wallet = network.genesis_wallet().await;
 
     // randomise the current epoch state and current leader
-    let blocks_to_prerun = network.rng.lock().unwrap().gen_range(0..8);
+    let blocks_to_prerun = network.rng.lock().unwrap().gen_range(0..4);
     network
-        .run_until_block(&wallet, blocks_to_prerun.into(), 100)
+        .run_until_block(&wallet, blocks_to_prerun.into(), 200)
         .await;
 
     // First test joining deposit_v2
@@ -447,6 +447,7 @@ async fn validators_can_join_and_become_proposer(mut network: Network) {
     let staker_wallet = network.wallet_of_node(index).await;
 
     let pop_sinature = new_validator_key.pop_prove();
+    // This has to be done before `contract_upgrade_block_heights`, which is 24 by default in this test
     let deposit_hash = deposit_stake(
         &mut network,
         &wallet,
@@ -513,7 +514,6 @@ async fn validators_can_join_and_become_proposer(mut network: Network) {
     check_miner_got_reward(&wallet, BlockNumber::Latest).await;
 
     // Now test joining deposit_v3
-    let deposit_v3_deploy_block = 12;
     let index = network.add_node();
     let new_validator_priv_key = network.get_node_raw(index).secret_key;
     let new_validator_pub_key = new_validator_priv_key.node_public_key();
@@ -532,7 +532,7 @@ async fn validators_can_join_and_become_proposer(mut network: Network) {
 
     // Give new node time to catch up to block including deposit_v3 deployment
     network
-        .run_until_block(&staker_wallet, deposit_v3_deploy_block.into(), 200)
+        .run_until_block(&staker_wallet, 24.into(), 424)
         .await;
 
     let deposit_hash = deposit_v3_stake(
@@ -607,6 +607,7 @@ async fn block_proposers_are_selected_proportionally_to_their_stake(mut network:
     let staker_wallet = network.wallet_of_node(index).await;
 
     let pop_signature = new_validator_key.pop_prove();
+    network.run_until_synced(index).await;
     deposit_stake(
         &mut network,
         &wallet,