From 8b6b7ed522c9dee624606519f1c6e971951487eb Mon Sep 17 00:00:00 2001
From: Longarithm
Date: Mon, 3 Feb 2025 23:38:33 +0400
Subject: [PATCH] wip

---
 chain/chain/src/chain.rs                | 125 ++++++++++++------------
 core/primitives/src/optimistic_block.rs |  17 ++++
 2 files changed, 78 insertions(+), 64 deletions(-)

diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index 80a75c2a27b..69c8c19aabf 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -67,7 +67,9 @@ use near_primitives::epoch_block_info::BlockInfo;
 use near_primitives::errors::EpochError;
 use near_primitives::hash::{hash, CryptoHash};
 use near_primitives::merkle::{merklize, verify_path, PartialMerkleTree};
-use near_primitives::optimistic_block::{BlockToApply, OptimisticBlock, OptimisticBlockKeySource};
+use near_primitives::optimistic_block::{
+    BlockToApply, CachedShardUpdateKey, OptimisticBlock, OptimisticBlockKeySource,
+};
 use near_primitives::receipt::Receipt;
 use near_primitives::sandbox::state_patch::SandboxStatePatch;
 use near_primitives::shard_layout::{ShardLayout, ShardUId};
@@ -232,7 +234,7 @@ pub fn check_known(
 }
 
 pub struct ApplyChunksResultCache {
-    cache: LruCache<CryptoHash, ShardUpdateResult>,
+    cache: LruCache<CachedShardUpdateKey, ShardUpdateResult>,
     /// We use Cell to record access statistics even if we don't have
     /// mutability.
     hits: Cell<usize>,
@@ -248,7 +250,7 @@ impl ApplyChunksResultCache {
         }
     }
 
-    pub fn peek(&self, key: &CryptoHash) -> Option<&ShardUpdateResult> {
+    pub fn peek(&self, key: &CachedShardUpdateKey) -> Option<&ShardUpdateResult> {
         if let Some(result) = self.cache.peek(key) {
             self.hits.set(self.hits.get() + 1);
             return Some(result);
@@ -258,7 +260,7 @@
         None
     }
 
-    pub fn push(&mut self, key: CryptoHash, result: ShardUpdateResult) {
+    pub fn push(&mut self, key: CachedShardUpdateKey, result: ShardUpdateResult) {
         self.cache.put(key, result);
     }
 
@@ -276,7 +278,7 @@
 }
 
 type BlockApplyChunksResult =
-    (BlockToApply, Vec<(ShardId, CryptoHash, Result<ShardUpdateResult, Error>)>);
+    (BlockToApply, Vec<(ShardId, CachedShardUpdateKey, Result<ShardUpdateResult, Error>)>);
 
 /// Facade to the blockchain block processing and storage.
 /// Provides current view on the state according to the chain state.
@@ -348,7 +350,7 @@ impl Drop for Chain {
 /// Execution context (latest blocks/chunks details) are already captured within.
 type UpdateShardJob = (
     ShardId,
-    CryptoHash,
+    CachedShardUpdateKey,
     Box<dyn FnOnce(&Span) -> Result<ShardUpdateResult, Error> + Send + Sync + 'static>,
 );
 
@@ -1621,14 +1623,8 @@ impl Chain {
                 state_patch: SandboxStatePatch::default(),
             };
 
-            let block_key_source = OptimisticBlockKeySource {
-                height: block_context.height,
-                prev_block_hash: block_context.prev_block_hash,
-                block_timestamp: block_context.block_timestamp,
-                random_seed: block_context.random_seed,
-            };
             let cached_shard_update_key =
-                Self::get_cached_shard_update_key(&block_key_source, &chunks, shard_id)?;
+                Self::get_cached_shard_update_key(&block_context, &chunks, shard_id)?;
             let job = self.get_update_shard_job(
                 me,
                 cached_shard_update_key,
@@ -1708,45 +1704,7 @@ impl Chain {
                     }
                 }
                 BlockToApply::Optimistic(optimistic_block_hash) => {
-                    let (optimistic_block, _) = self.blocks_in_processing.remove_optimistic(&optimistic_block_hash).unwrap_or_else(|| {
-                        panic!(
-                            "optimistic block {:?} finished applying chunks but not in blocks_in_processing pool",
-                            optimistic_block_hash
-                        )
-                    });
-
-                    let prev_block_hash = optimistic_block.prev_block_hash();
-                    let block_height = optimistic_block.height();
-                    for (shard_id, cached_shard_update_key, apply_result) in
-                        apply_result.into_iter()
-                    {
-                        match apply_result {
-                            Ok(result) => {
-                                info!(
-                                    target: "chain",
-                                    ?prev_block_hash,
-                                    block_height,
-                                    ?shard_id,
-                                    ?cached_shard_update_key,
-                                    "Caching ShardUpdate result from OptimisticBlock"
-                                );
-                                self.apply_chunk_results_cache
-                                    .push(cached_shard_update_key, result);
-                            }
-                            Err(e) => {
-                                warn!(
-                                    target: "chain",
-                                    ?e,
-                                    ?shard_id,
-                                    ?prev_block_hash,
-                                    block_height,
-                                    ?optimistic_block_hash,
-                                    ?cached_shard_update_key,
-                                    "Error applying chunk for OptimisticBlock"
-                                );
-                            }
-                        }
-                    }
+                    self.postprocess_optimistic_block(optimistic_block_hash, apply_result);
                 }
             }
         }
@@ -2314,6 +2272,42 @@ impl Chain {
         Ok(AcceptedBlock { hash: *block.hash(), status: block_status, provenance })
     }
 
+    fn postprocess_optimistic_block(
+        &mut self,
+        optimistic_block_hash: CryptoHash,
+        apply_result: Vec<(ShardId, CachedShardUpdateKey, Result<ShardUpdateResult, Error>)>,
+    ) {
+        let (optimistic_block, _) = self.blocks_in_processing.remove_optimistic(&optimistic_block_hash).unwrap_or_else(|| {
+            panic!(
+                "optimistic block {:?} finished applying chunks but not in blocks_in_processing pool",
+                optimistic_block_hash
+            )
+        });
+
+        let prev_block_hash = optimistic_block.prev_block_hash();
+        let block_height = optimistic_block.height();
+        for (shard_id, cached_shard_update_key, apply_result) in apply_result.into_iter() {
+            match apply_result {
+                Ok(result) => {
+                    info!(
+                        target: "chain", ?prev_block_hash, block_height,
+                        ?shard_id, ?cached_shard_update_key,
+                        "Caching ShardUpdate result from OptimisticBlock"
+                    );
+                    self.apply_chunk_results_cache.push(cached_shard_update_key, result);
+                }
+                Err(e) => {
+                    warn!(
+                        target: "chain", ?e, ?optimistic_block_hash,
+                        ?prev_block_hash, block_height, ?shard_id,
+                        ?cached_shard_update_key,
+                        "Error applying chunk for OptimisticBlock"
+                    );
+                }
+            }
+        }
+    }
+
     fn check_if_upgrade_needed(&self, block_hash: &CryptoHash) {
         if let Ok(next_epoch_protocol_version) =
             self.epoch_manager.get_next_epoch_protocol_version(block_hash)
@@ -3890,14 +3884,8 @@ impl Chain {
         let storage_context =
             StorageContext { storage_data_source: StorageDataSource::Db, state_patch };
 
-        let block_key_source = OptimisticBlockKeySource {
-            height: block_context.height,
-            prev_block_hash: block_context.prev_block_hash,
-            block_timestamp: block_context.block_timestamp,
-            random_seed: block_context.random_seed,
-        };
         let cached_shard_update_key =
-            Self::get_cached_shard_update_key(&block_key_source, chunk_headers, shard_id)?;
+            Self::get_cached_shard_update_key(&block_context, chunk_headers, shard_id)?;
         let job = self.get_update_shard_job(
             me,
             cached_shard_update_key,
@@ -3980,28 +3968,37 @@ impl Chain {
         Ok(ShardContext { shard_uid, should_apply_chunk })
     }
 
+    /// Get a key which can uniquely define the result of applying a chunk,
+    /// based on the block execution context and the other chunks.
     fn get_cached_shard_update_key(
-        block: &OptimisticBlockKeySource,
+        block_context: &ApplyChunkBlockContext,
         chunk_headers: &Chunks,
         shard_id: ShardId,
-    ) -> Result<CryptoHash, Error> {
+    ) -> Result<CachedShardUpdateKey, Error> {
         const BYTES_LEN: usize =
             size_of::<CryptoHash>() + size_of::<CryptoHash>() + size_of::<u64>();
         let mut bytes: Vec<u8> = Vec::with_capacity(BYTES_LEN);
+        let block = OptimisticBlockKeySource {
+            height: block_context.height,
+            prev_block_hash: block_context.prev_block_hash,
+            block_timestamp: block_context.block_timestamp,
+            random_seed: block_context.random_seed,
+        };
         bytes.extend_from_slice(&hash(&borsh::to_vec(&block)?).0);
+
         let chunks_key_source: Vec<_> = chunk_headers.iter_raw().map(|c| c.chunk_hash()).collect();
         bytes.extend_from_slice(&hash(&borsh::to_vec(&chunks_key_source)?).0);
         bytes.extend_from_slice(&shard_id.to_le_bytes());
-        Ok(hash(&bytes))
+        Ok(CachedShardUpdateKey::new(hash(&bytes)))
     }
 
     /// This method returns the closure that is responsible for updating a shard.
     fn get_update_shard_job(
         &self,
         me: &Option<AccountId>,
-        cached_shard_update_key: CryptoHash,
+        cached_shard_update_key: CachedShardUpdateKey,
         block: ApplyChunkBlockContext,
         chunk_headers: &Chunks,
         shard_index: ShardIndex,
@@ -4806,7 +4803,7 @@ pub fn do_apply_chunks(
     block: BlockToApply,
     block_height: BlockHeight,
     work: Vec<UpdateShardJob>,
-) -> Vec<(ShardId, CryptoHash, Result<ShardUpdateResult, Error>)> {
+) -> Vec<(ShardId, CachedShardUpdateKey, Result<ShardUpdateResult, Error>)> {
     let parent_span =
         tracing::debug_span!(target: "chain", "do_apply_chunks", block_height, ?block).entered();
     work.into_par_iter()
diff --git a/core/primitives/src/optimistic_block.rs b/core/primitives/src/optimistic_block.rs
index 9a4966bd18a..b16deed1b19 100644
--- a/core/primitives/src/optimistic_block.rs
+++ b/core/primitives/src/optimistic_block.rs
@@ -108,6 +108,12 @@ impl OptimisticBlock {
     }
 }
 
+/// Optimistic block fields which are enough to define a unique context for
+/// applying chunks in that block. Thus the hash of this struct can be used to
+/// cache *valid* optimistic blocks.
+///
+/// This struct is created just so that we can conveniently derive and use
+/// `borsh` serialization for it.
 #[derive(BorshSerialize)]
 pub struct OptimisticBlockKeySource {
     pub height: BlockHeight,
@@ -121,3 +127,14 @@ pub enum BlockToApply {
     Normal(CryptoHash),
     Optimistic(CryptoHash),
 }
+
+#[derive(Hash, PartialEq, Eq, Clone, Copy, Debug)]
+pub struct CachedShardUpdateKey(CryptoHash);
+
+impl CachedShardUpdateKey {
+    /// Explicit constructor to minimize the risk of using hashes of other
+    /// entities accidentally.
+    pub fn new(hash: CryptoHash) -> Self {
+        Self(hash)
+    }
+}
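Note (not part of the patch): the sketch below illustrates the caching scheme that `get_cached_shard_update_key` and `ApplyChunksResultCache` implement. The key commits to everything that determines a shard update result: the block execution context (height, previous block hash, timestamp, random seed), the chunk hashes, and the shard id. A result computed while applying an optimistic block can therefore be reused when the corresponding full block arrives with the same context. The simplified types and std's `DefaultHasher` are stand-ins chosen for the sketch, not the nearcore APIs.

// Illustrative sketch, not part of the patch. It mirrors the keying scheme from
// `get_cached_shard_update_key`: hash the block execution context, the chunk hashes
// and the shard id into one key, then use that key to reuse a shard update result
// that was already computed for an optimistic block.
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

type CryptoHash = [u8; 32]; // stand-in for near_primitives::hash::CryptoHash

#[derive(Hash, PartialEq, Eq, Clone, Copy, Debug)]
struct CachedShardUpdateKey(u64); // the real key wraps a CryptoHash

#[derive(Hash)]
struct OptimisticBlockKeySource {
    height: u64,
    prev_block_hash: CryptoHash,
    block_timestamp: u64,
    random_seed: CryptoHash,
}

fn cached_shard_update_key(
    block: &OptimisticBlockKeySource,
    chunk_hashes: &[CryptoHash],
    shard_id: u64,
) -> CachedShardUpdateKey {
    // Same structure as the patch: combine H(block context), H(chunks) and the shard id.
    let mut hasher = DefaultHasher::new();
    block.hash(&mut hasher);
    chunk_hashes.hash(&mut hasher);
    shard_id.hash(&mut hasher);
    CachedShardUpdateKey(hasher.finish())
}

fn main() {
    let block = OptimisticBlockKeySource {
        height: 100,
        prev_block_hash: [1; 32],
        block_timestamp: 1_700_000_000,
        random_seed: [2; 32],
    };
    let chunks = vec![[3u8; 32], [4u8; 32]];

    // Applying a chunk for the optimistic block produces a result; cache it under the key.
    let mut cache: HashMap<CachedShardUpdateKey, &'static str> = HashMap::new();
    let key = cached_shard_update_key(&block, &chunks, 0);
    cache.insert(key, "shard update result computed optimistically");

    // When the full block arrives with the same height, prev hash, timestamp, random seed
    // and chunks, the key matches and the cached result is reused instead of re-applying.
    let same_key = cached_shard_update_key(&block, &chunks, 0);
    assert!(cache.contains_key(&same_key));
    println!("cache hit: {}", cache[&same_key]);
}

Any change to the previous block, the chunk set, or the shard id yields a different key, so a stale optimistic result is never looked up; in the patch the key wraps a `CryptoHash` and the cache is a size-bounded `LruCache`.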