diff --git a/chain/chain/src/block_processing_utils.rs b/chain/chain/src/block_processing_utils.rs index d0fc1e36e01..3d416de308e 100644 --- a/chain/chain/src/block_processing_utils.rs +++ b/chain/chain/src/block_processing_utils.rs @@ -109,7 +109,7 @@ impl BlocksInProcessing { block: Block, preprocess_info: BlockPreprocessInfo, ) -> Result<(), AddError> { - self.add_dry_run(&BlockToApply::Normal(block.hash().clone()))?; + self.add_dry_run(&BlockToApply::Normal(*block.hash()))?; self.preprocessed_blocks.insert(*block.hash(), (block, preprocess_info)); Ok(()) @@ -120,7 +120,7 @@ impl BlocksInProcessing { block: OptimisticBlock, preprocess_info: OptimisticBlockInfo, ) -> Result<(), AddError> { - self.add_dry_run(&BlockToApply::Optimistic(block.hash().clone()))?; + self.add_dry_run(&BlockToApply::Optimistic(*block.hash()))?; self.optimistic_blocks.insert(*block.hash(), (block, preprocess_info)); Ok(()) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 2dc9be90d8a..a8586556f7d 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -214,7 +214,7 @@ pub fn check_known( if block_hash == &head.last_block_hash || block_hash == &head.prev_block_hash { return Ok(Err(BlockKnownError::KnownInHead)); } - if chain.blocks_in_processing.contains(&BlockToApply::Normal(block_hash)) { + if chain.blocks_in_processing.contains(&BlockToApply::Normal(*block_hash)) { return Ok(Err(BlockKnownError::KnownInProcessing)); } // Check if this block is in the set of known orphans. @@ -2162,6 +2162,8 @@ impl Chain { Ok(new_head) => new_head, }; + self.update_optimistic_blocks_pool(&block)?; + let epoch_id = block.header().epoch_id(); let mut shards_cares_this_or_next_epoch = vec![]; for shard_id in self.epoch_manager.shard_ids(epoch_id)? { @@ -2367,6 +2369,20 @@ impl Chain { Ok(()) } + /// If `block` is committed, the `last_final_block(block)` is final. + /// Thus it is enough to keep only chunks which are built on top of block + /// with `height(last_final_block(block))` or higher. + fn update_optimistic_blocks_pool(&mut self, block: &Block) -> Result<(), Error> { + let final_block = block.header().last_final_block(); + if final_block == &CryptoHash::default() { + return Ok(()); + } + + let final_block_height = self.chain_store.get_block_header(final_block)?.height(); + self.optimistic_block_chunks.update_minimal_base_height(final_block_height); + Ok(()) + } + /// Preprocess a block before applying chunks, verify that we have the necessary information /// to process the block and the block is valid. /// Note that this function does NOT introduce any changes to chain state. @@ -2383,7 +2399,7 @@ impl Chain { let header = block.header(); // see if the block is already in processing or if there are too many blocks being processed - self.blocks_in_processing.add_dry_run(&BlockToApply::Normal(block.hash().clone()))?; + self.blocks_in_processing.add_dry_run(&BlockToApply::Normal(*block.hash()))?; debug!(target: "chain", height=header.height(), num_approvals = header.num_approvals(), "preprocess_block"); @@ -4576,7 +4592,7 @@ impl Chain { /// Check if hash is for a block that is being processed #[inline] pub fn is_in_processing(&self, hash: &CryptoHash) -> bool { - self.blocks_in_processing.contains(&BlockToApply::Normal(hash.clone())) + self.blocks_in_processing.contains(&BlockToApply::Normal(*hash)) } #[inline] diff --git a/chain/chain/src/missing_chunks.rs b/chain/chain/src/missing_chunks.rs index 3b9fd1bc98f..a8e6f03bf68 100644 --- a/chain/chain/src/missing_chunks.rs +++ b/chain/chain/src/missing_chunks.rs @@ -190,10 +190,15 @@ impl MissingChunksPool { } } +/// Stores chunks for some optimistic block received so far. #[derive(Debug, Default)] struct OptimisticBlockChunks { + /// Number of remaining chunks to be received. Stored to avoid naive + /// recomputation. remaining_chunks: usize, + /// Height of the previous block. prev_block_height: BlockHeight, + /// Stores chunks for each shard, if received. chunks: Vec>, } @@ -202,36 +207,52 @@ impl OptimisticBlockChunks { Self { remaining_chunks: num_shards, prev_block_height, chunks: vec![None; num_shards] } } } -/// Stores optimistic blocks which are waiting for chunks to be received. +/// Stores optimistic blocks and chunks which are waiting to be processed. +/// +/// Once block and all chunks on top of the same previous block are received, +/// it can provide the optimistic block to be processed. #[derive(Debug, Default)] pub struct OptimisticBlockChunksPool { - /// Strict tip. Everything dies before this height. - final_tip: BlockHeight, - /// Optimistic blocks tip. Softer to ensure unique optimistic block processing. - blocks_tip: BlockHeight, + /// All blocks and chunks built on top of blocks *smaller* than this height + /// can be discarded. Needed to garbage collect old data in case of forks + /// which never can get finalized. + minimal_base_height: BlockHeight, + /// Optimistic block with heights *not greater* than this height are discarded. + /// Needed on top of `minimal_base_height` as well to ensure that only one + /// optimistic block on each height can be processed. + block_height_threshold: BlockHeight, /// Maps previous block hash to the received optimistic block with the /// highest height on top of it. blocks: HashMap, /// Maps previous block hash to the vector of chunk headers corresponding /// to complete chunks. chunks: HashMap, - /// Result. + /// Optimistic block with the largest height which is ready to process. + /// Block and chunks must correspond to the same previous block hash. latest_ready_block: Option<(OptimisticBlock, Vec)>, } impl OptimisticBlockChunksPool { pub fn new() -> Self { Self { - final_tip: 0, - blocks_tip: 0, + minimal_base_height: 0, + block_height_threshold: 0, blocks: Default::default(), chunks: Default::default(), latest_ready_block: None, } } + pub fn num_blocks(&self) -> usize { + self.blocks.len() + } + + pub fn num_chunks(&self) -> usize { + self.chunks.len() + } + pub fn add_block(&mut self, block: OptimisticBlock) { - if block.height() <= self.blocks_tip { + if block.height() <= self.block_height_threshold { return; } @@ -240,13 +261,10 @@ impl OptimisticBlockChunksPool { self.update_latest_ready_block(&prev_block_hash); } - pub fn accept_chunk( - &mut self, - shard_layout: &ShardLayout, - prev_block_height: BlockHeight, - chunk_header: ShardChunkHeader, - ) { - if prev_block_height <= self.final_tip { + pub fn add_chunk(&mut self, shard_layout: &ShardLayout, chunk_header: ShardChunkHeader) { + // We assume that `chunk_header.height_created() = prev_block_height + 1`. + let prev_block_height = chunk_header.height_created().saturating_sub(1); + if prev_block_height < self.minimal_base_height { return; } @@ -284,10 +302,14 @@ impl OptimisticBlockChunksPool { } } + /// Takes the latest optimistic block and chunks which are ready to + /// be processed. pub fn take_latest_ready_block(&mut self) -> Option<(OptimisticBlock, Vec)> { self.latest_ready_block.take() } + /// If the optimistic block on top of `prev_block_hash` is ready to + /// process, sets the latest ready block to it. fn update_latest_ready_block(&mut self, prev_block_hash: &CryptoHash) { let Some(chunks) = self.chunks.get(prev_block_hash) else { return; @@ -298,7 +320,7 @@ impl OptimisticBlockChunksPool { let Some(block) = self.blocks.remove(prev_block_hash) else { return; }; - if block.height() <= self.blocks_tip { + if block.height() <= self.block_height_threshold { return; } @@ -319,31 +341,17 @@ impl OptimisticBlockChunksPool { chunk }) .collect(); - self.update_blocks_tip(block.height()); + self.update_block_height_threshold(block.height()); self.latest_ready_block = Some((block, chunks)); } - pub fn update_tip(&mut self, tip: BlockHeight) { - self.update_blocks_tip(tip); - - self.final_tip = std::cmp::max(self.final_tip, tip); - let hashes_to_remove: Vec<_> = self - .chunks - .iter() - .filter(|(_, h)| h.prev_block_height <= self.final_tip) - .map(|(h, _)| *h) - .collect(); - for h in hashes_to_remove { - self.chunks.remove(&h); - } - } - - pub fn update_blocks_tip(&mut self, blocks_tip: BlockHeight) { - self.blocks_tip = std::cmp::max(self.blocks_tip, blocks_tip); + /// Updates the block height threshold and cleans up old blocks. + pub fn update_block_height_threshold(&mut self, height: BlockHeight) { + self.block_height_threshold = std::cmp::max(self.block_height_threshold, height); let hashes_to_remove: Vec<_> = self .blocks .iter() - .filter(|(_, h)| h.height() <= self.blocks_tip) + .filter(|(_, h)| h.height() <= self.block_height_threshold) .map(|(h, _)| *h) .collect(); for h in hashes_to_remove { @@ -353,14 +361,32 @@ impl OptimisticBlockChunksPool { let Some((block, _)) = &self.latest_ready_block else { return; }; - if block.height() <= self.blocks_tip { + if block.height() <= self.block_height_threshold { self.latest_ready_block = None; } } + + /// Updates the minimal base height and cleans up old blocks and chunks. + pub fn update_minimal_base_height(&mut self, height: BlockHeight) { + // If *new* chunks must be built on top of *at least* this height, + // optimistic block height must be *strictly greater* than this height. + self.update_block_height_threshold(height); + + self.minimal_base_height = std::cmp::max(self.minimal_base_height, height); + let hashes_to_remove: Vec<_> = self + .chunks + .iter() + .filter(|(_, h)| h.prev_block_height < self.minimal_base_height) + .map(|(h, _)| *h) + .collect(); + for h in hashes_to_remove { + self.chunks.remove(&h); + } + } } #[cfg(test)] -mod test { +mod missing_chunks_test { use super::{BlockHash, BlockLike, MissingChunksPool, MAX_BLOCKS_MISSING_CHUNKS}; use near_primitives::hash::{hash, CryptoHash}; use near_primitives::sharding::ChunkHash; @@ -465,3 +491,80 @@ mod test { assert!(pool.contains(&later_block_hash)); } } + +#[cfg(test)] +mod optimistic_block_chunks_pool_test { + use super::{OptimisticBlock, OptimisticBlockChunksPool, ShardChunkHeader}; + use itertools::Itertools; + use near_primitives::hash::CryptoHash; + use near_primitives::shard_layout::ShardLayout; + use near_primitives::types::ShardId; + + #[test] + fn test_add_block() { + let mut pool = OptimisticBlockChunksPool::new(); + let prev_hash = CryptoHash::default(); + let block = OptimisticBlock::new_dummy(1, prev_hash); + + pool.add_block(block); + assert_eq!(pool.num_blocks(), 1); + assert!(pool.blocks.contains_key(&prev_hash)); + } + + #[test] + fn test_add_chunk() { + let mut pool = OptimisticBlockChunksPool::new(); + let prev_hash = CryptoHash::default(); + let shard_layout = ShardLayout::single_shard(); + let chunk_header = ShardChunkHeader::new_dummy(0, ShardId::new(0), prev_hash); + + pool.add_chunk(&shard_layout, chunk_header); + assert_eq!(pool.num_chunks(), 1); + assert!(pool.chunks.contains_key(&prev_hash)); + } + + #[test] + fn test_ready_block() { + let mut pool = OptimisticBlockChunksPool::new(); + let prev_hash = CryptoHash::default(); + let block = OptimisticBlock::new_dummy(1, prev_hash); + let shard_layout = ShardLayout::multi_shard(2, 3); + let shard_ids = shard_layout.shard_ids().collect_vec(); + + let chunk_header = ShardChunkHeader::new_dummy(0, shard_ids[0], prev_hash); + pool.add_block(block); + pool.add_chunk(&shard_layout, chunk_header); + + pool.update_latest_ready_block(&prev_hash); + assert!(pool.take_latest_ready_block().is_none()); + + let new_chunk_header = ShardChunkHeader::new_dummy(0, shard_ids[1], prev_hash); + pool.add_chunk(&shard_layout, new_chunk_header); + + assert!(pool.take_latest_ready_block().is_some()); + assert!(pool.take_latest_ready_block().is_none()); + } + + #[test] + fn test_thresholds() { + let mut pool = OptimisticBlockChunksPool::new(); + pool.block_height_threshold = 10; + pool.minimal_base_height = 5; + + let prev_hash = CryptoHash::default(); + let block_below_threshold = OptimisticBlock::new_dummy(10, prev_hash); + pool.add_block(block_below_threshold); + assert_eq!(pool.num_blocks(), 0, "Block below threshold should not be added"); + + let shard_layout = ShardLayout::single_shard(); + let chunk_header_below_threshold = + ShardChunkHeader::new_dummy(5, ShardId::new(0), prev_hash); + pool.add_chunk(&shard_layout, chunk_header_below_threshold); + assert_eq!(pool.num_chunks(), 0, "Chunk strictly below threshold should not be added"); + + let chunk_header_passing_threshold = + ShardChunkHeader::new_dummy(6, ShardId::new(0), prev_hash); + pool.add_chunk(&shard_layout, chunk_header_passing_threshold); + assert_eq!(pool.num_chunks(), 1); + } +} diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs index 64fd8ce031b..a390302af0a 100644 --- a/chain/chain/src/test_utils.rs +++ b/chain/chain/src/test_utils.rs @@ -91,7 +91,7 @@ pub fn wait_for_all_blocks_in_processing(chain: &Chain) -> bool { } pub fn is_block_in_processing(chain: &Chain, block_hash: &CryptoHash) -> bool { - chain.blocks_in_processing.contains(&BlockToApply::Normal(block_hash.clone())) + chain.blocks_in_processing.contains(&BlockToApply::Normal(*block_hash)) } pub fn wait_for_block_in_processing( diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 47b68724621..8153a6d6e7a 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -1631,13 +1631,7 @@ impl Client { .expect("Could not persist chunk"); // We're marking chunk as accepted. self.chain.blocks_with_missing_chunks.accept_chunk(&chunk_header.chunk_hash()); - // TODO: ugly assumption on height_created - let prev_block_height = chunk_header.height_created().saturating_sub(1); - self.chain.optimistic_block_chunks.accept_chunk( - &shard_layout, - prev_block_height, - chunk_header, - ); + self.chain.optimistic_block_chunks.add_chunk(&shard_layout, chunk_header); // If this was the last chunk that was missing for a block, it will be processed now. self.process_blocks_with_missing_chunks(apply_chunks_done_sender, &signer); self.maybe_process_optimistic_block(); diff --git a/core/primitives/src/optimistic_block.rs b/core/primitives/src/optimistic_block.rs index 2d4c12f9d85..9a4966bd18a 100644 --- a/core/primitives/src/optimistic_block.rs +++ b/core/primitives/src/optimistic_block.rs @@ -1,12 +1,12 @@ -use std::fmt::Debug; - -use borsh::{BorshDeserialize, BorshSerialize}; -use near_crypto::Signature; -use near_schema_checker_lib::ProtocolSchema; - use crate::block::BlockHeader; use crate::hash::{hash, CryptoHash}; use crate::types::{BlockHeight, SignatureDifferentiator}; +use borsh::{BorshDeserialize, BorshSerialize}; +use near_crypto::{InMemorySigner, Signature}; +use near_primitives_core::types::AccountId; +use near_schema_checker_lib::ProtocolSchema; +use std::fmt::Debug; +use std::str::FromStr; #[derive(BorshSerialize, BorshDeserialize, Clone, Debug, Eq, PartialEq, ProtocolSchema)] pub struct OptimisticBlockInner { @@ -88,6 +88,24 @@ impl OptimisticBlock { pub fn random_value(&self) -> &CryptoHash { &self.inner.random_value } + + pub fn new_dummy(height: BlockHeight, prev_hash: CryptoHash) -> Self { + let signer = InMemorySigner::test_signer(&AccountId::from_str("test".into()).unwrap()); + let (vrf_value, vrf_proof) = signer.compute_vrf_with_proof(Default::default()); + Self { + inner: OptimisticBlockInner { + block_height: height, + prev_block_hash: prev_hash, + block_timestamp: 0, + random_value: Default::default(), + vrf_value, + vrf_proof, + signature_differentiator: "test".to_string(), + }, + signature: Default::default(), + hash: Default::default(), + } + } } #[derive(BorshSerialize)] diff --git a/core/primitives/src/sharding.rs b/core/primitives/src/sharding.rs index 76825f85991..1471511531e 100644 --- a/core/primitives/src/sharding.rs +++ b/core/primitives/src/sharding.rs @@ -6,11 +6,12 @@ use crate::receipt::Receipt; use crate::transaction::SignedTransaction; use crate::types::validator_stake::{ValidatorStake, ValidatorStakeIter, ValidatorStakeV1}; use crate::types::{Balance, BlockHeight, Gas, MerkleHash, ShardId, StateRoot}; -use crate::validator_signer::ValidatorSigner; +use crate::validator_signer::{EmptyValidatorSigner, ValidatorSigner}; use crate::version::{ProtocolFeature, ProtocolVersion, SHARD_CHUNK_HEADER_UPGRADE_VERSION}; use borsh::{BorshDeserialize, BorshSerialize}; use near_crypto::Signature; use near_fmt::AbbrBytes; +use near_primitives_core::version::PROTOCOL_VERSION; use near_schema_checker_lib::ProtocolSchema; use shard_chunk_header_inner::ShardChunkHeaderInnerV4; use std::cmp::Ordering; @@ -344,6 +345,32 @@ pub enum ShardChunkHeader { } impl ShardChunkHeader { + pub fn new_dummy(height: BlockHeight, shard_id: ShardId, prev_block_hash: CryptoHash) -> Self { + let congestion_info = ProtocolFeature::CongestionControl + .enabled(PROTOCOL_VERSION) + .then_some(CongestionInfo::default()); + + ShardChunkHeader::V3(ShardChunkHeaderV3::new( + PROTOCOL_VERSION, + prev_block_hash, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + height, + shard_id, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + Default::default(), + Default::default(), + congestion_info, + BandwidthRequests::default_for_protocol_version(PROTOCOL_VERSION), + &EmptyValidatorSigner::default().into(), + )) + } + #[inline] pub fn take_inner(self) -> ShardChunkHeaderInner { match self { @@ -362,6 +389,10 @@ impl ShardChunkHeader { hash(&inner_bytes.expect("Failed to serialize")) } + /// Height at which the chunk was created. + /// TODO: this is always `height(prev_block_hash) + 1`. Consider using + /// `prev_block_height` instead as this is more explicit and + /// `height_created` also conflicts with `height_included`. #[inline] pub fn height_created(&self) -> BlockHeight { match self { diff --git a/core/primitives/src/stateless_validation/state_witness.rs b/core/primitives/src/stateless_validation/state_witness.rs index d5ed9d6d249..e1526c2508b 100644 --- a/core/primitives/src/stateless_validation/state_witness.rs +++ b/core/primitives/src/stateless_validation/state_witness.rs @@ -2,21 +2,17 @@ use std::collections::HashMap; use std::fmt::Debug; use super::ChunkProductionKey; -use crate::bandwidth_scheduler::BandwidthRequests; use crate::challenge::PartialState; -use crate::congestion_info::CongestionInfo; #[cfg(feature = "solomon")] use crate::reed_solomon::{ReedSolomonEncoderDeserialize, ReedSolomonEncoderSerialize}; -use crate::sharding::{ChunkHash, ReceiptProof, ShardChunkHeader, ShardChunkHeaderV3}; +use crate::sharding::{ChunkHash, ReceiptProof, ShardChunkHeader}; use crate::transaction::SignedTransaction; use crate::types::{EpochId, SignatureDifferentiator}; use crate::utils::compression::CompressedData; -use crate::validator_signer::EmptyValidatorSigner; use borsh::{BorshDeserialize, BorshSerialize}; use bytesize::ByteSize; use near_primitives_core::hash::CryptoHash; use near_primitives_core::types::{AccountId, BlockHeight, ShardId}; -use near_primitives_core::version::{ProtocolFeature, PROTOCOL_VERSION}; use near_schema_checker_lib::ProtocolSchema; /// Represents max allowed size of the raw (not compressed) state witness, @@ -199,29 +195,7 @@ impl ChunkStateWitness { } pub fn new_dummy(height: BlockHeight, shard_id: ShardId, prev_block_hash: CryptoHash) -> Self { - let congestion_info = ProtocolFeature::CongestionControl - .enabled(PROTOCOL_VERSION) - .then_some(CongestionInfo::default()); - - let header = ShardChunkHeader::V3(ShardChunkHeaderV3::new( - PROTOCOL_VERSION, - prev_block_hash, - Default::default(), - Default::default(), - Default::default(), - Default::default(), - height, - shard_id, - Default::default(), - Default::default(), - Default::default(), - Default::default(), - Default::default(), - Default::default(), - congestion_info, - BandwidthRequests::default_for_protocol_version(PROTOCOL_VERSION), - &EmptyValidatorSigner::default().into(), - )); + let header = ShardChunkHeader::new_dummy(height, shard_id, prev_block_hash); Self::new( "alice.near".parse().unwrap(), EpochId::default(), diff --git a/integration-tests/src/test_loop/tests/optimistic_block.rs b/integration-tests/src/test_loop/tests/optimistic_block.rs index 1e3d0f039f9..aac834ecd30 100644 --- a/integration-tests/src/test_loop/tests/optimistic_block.rs +++ b/integration-tests/src/test_loop/tests/optimistic_block.rs @@ -45,5 +45,12 @@ fn test_optimistic_block() { env.test_loop.run_for(Duration::seconds(10)); + { + let chain = + &env.test_loop.data.get(&env.datas[0].client_sender.actor_handle()).client.chain; + println!("num_blocks: {}", chain.optimistic_block_chunks.num_blocks()); + println!("num_chunks: {}", chain.optimistic_block_chunks.num_chunks()); + } + env.shutdown_and_drain_remaining_events(Duration::seconds(20)); }