Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Longarithm committed Feb 3, 2025
1 parent 3f08e78 commit 8b6b7ed
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 64 deletions.
125 changes: 61 additions & 64 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ use near_primitives::epoch_block_info::BlockInfo;
use near_primitives::errors::EpochError;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::{merklize, verify_path, PartialMerkleTree};
use near_primitives::optimistic_block::{BlockToApply, OptimisticBlock, OptimisticBlockKeySource};
use near_primitives::optimistic_block::{
BlockToApply, CachedShardUpdateKey, OptimisticBlock, OptimisticBlockKeySource,
};
use near_primitives::receipt::Receipt;
use near_primitives::sandbox::state_patch::SandboxStatePatch;
use near_primitives::shard_layout::{ShardLayout, ShardUId};
Expand Down Expand Up @@ -232,7 +234,7 @@ pub fn check_known(
}

pub struct ApplyChunksResultCache {
cache: LruCache<CryptoHash, ShardUpdateResult>,
cache: LruCache<CachedShardUpdateKey, ShardUpdateResult>,
/// We use Cell to record access statistics even if we don't have
/// mutability.
hits: Cell<usize>,
Expand All @@ -248,7 +250,7 @@ impl ApplyChunksResultCache {
}
}

pub fn peek(&self, key: &CryptoHash) -> Option<&ShardUpdateResult> {
pub fn peek(&self, key: &CachedShardUpdateKey) -> Option<&ShardUpdateResult> {
if let Some(result) = self.cache.peek(key) {
self.hits.set(self.hits.get() + 1);
return Some(result);
Expand All @@ -258,7 +260,7 @@ impl ApplyChunksResultCache {
None
}

pub fn push(&mut self, key: CryptoHash, result: ShardUpdateResult) {
pub fn push(&mut self, key: CachedShardUpdateKey, result: ShardUpdateResult) {
self.cache.put(key, result);
}

Expand All @@ -276,7 +278,7 @@ impl ApplyChunksResultCache {
}

type BlockApplyChunksResult =
(BlockToApply, Vec<(ShardId, CryptoHash, Result<ShardUpdateResult, Error>)>);
(BlockToApply, Vec<(ShardId, CachedShardUpdateKey, Result<ShardUpdateResult, Error>)>);

/// Facade to the blockchain block processing and storage.
/// Provides current view on the state according to the chain state.
Expand Down Expand Up @@ -348,7 +350,7 @@ impl Drop for Chain {
/// Execution context (latest blocks/chunks details) are already captured within.
type UpdateShardJob = (
    ShardId,
    // Key under which the job's result is cached in `ApplyChunksResultCache`.
    CachedShardUpdateKey,
    Box<dyn FnOnce(&Span) -> Result<ShardUpdateResult, Error> + Send + Sync + 'static>,
);

Expand Down Expand Up @@ -1621,14 +1623,8 @@ impl Chain {
state_patch: SandboxStatePatch::default(),
};

let block_key_source = OptimisticBlockKeySource {
height: block_context.height,
prev_block_hash: block_context.prev_block_hash,
block_timestamp: block_context.block_timestamp,
random_seed: block_context.random_seed,
};
let cached_shard_update_key =
Self::get_cached_shard_update_key(&block_key_source, &chunks, shard_id)?;
Self::get_cached_shard_update_key(&block_context, &chunks, shard_id)?;
let job = self.get_update_shard_job(
me,
cached_shard_update_key,
Expand Down Expand Up @@ -1708,45 +1704,7 @@ impl Chain {
}
}
BlockToApply::Optimistic(optimistic_block_hash) => {
let (optimistic_block, _) = self.blocks_in_processing.remove_optimistic(&optimistic_block_hash).unwrap_or_else(|| {
panic!(
"optimistic block {:?} finished applying chunks but not in blocks_in_processing pool",
optimistic_block_hash
)
});

let prev_block_hash = optimistic_block.prev_block_hash();
let block_height = optimistic_block.height();
for (shard_id, cached_shard_update_key, apply_result) in
apply_result.into_iter()
{
match apply_result {
Ok(result) => {
info!(
target: "chain",
?prev_block_hash,
block_height,
?shard_id,
?cached_shard_update_key,
"Caching ShardUpdate result from OptimisticBlock"
);
self.apply_chunk_results_cache
.push(cached_shard_update_key, result);
}
Err(e) => {
warn!(
target: "chain",
?e,
?shard_id,
?prev_block_hash,
block_height,
?optimistic_block_hash,
?cached_shard_update_key,
"Error applying chunk for OptimisticBlock"
);
}
}
}
self.postprocess_optimistic_block(optimistic_block_hash, apply_result);
}
}
}
Expand Down Expand Up @@ -2314,6 +2272,42 @@ impl Chain {
Ok(AcceptedBlock { hash: *block.hash(), status: block_status, provenance })
}

/// Finalizes chunk application for an optimistic block.
///
/// Removes the block from the in-processing pool (panicking if it is not
/// there, since that would be a broken invariant) and caches every
/// successful per-shard `ShardUpdateResult` under its
/// `CachedShardUpdateKey` so a later full block can reuse it. Failed shard
/// updates are only logged.
fn postprocess_optimistic_block(
    &mut self,
    optimistic_block_hash: CryptoHash,
    apply_result: Vec<(ShardId, CachedShardUpdateKey, Result<ShardUpdateResult, Error>)>,
) {
    let removed = self.blocks_in_processing.remove_optimistic(&optimistic_block_hash);
    let (optimistic_block, _) = removed.unwrap_or_else(|| {
        panic!(
            "optimistic block {:?} finished applying chunks but not in blocks_in_processing pool",
            optimistic_block_hash
        )
    });

    let prev_block_hash = optimistic_block.prev_block_hash();
    let block_height = optimistic_block.height();
    for (shard_id, cached_shard_update_key, shard_result) in apply_result {
        match shard_result {
            Ok(result) => {
                info!(
                    target: "chain", ?prev_block_hash, block_height,
                    ?shard_id, ?cached_shard_update_key,
                    "Caching ShardUpdate result from OptimisticBlock"
                );
                self.apply_chunk_results_cache.push(cached_shard_update_key, result);
            }
            Err(e) => {
                warn!(
                    target: "chain", ?e, ?optimistic_block_hash,
                    ?prev_block_hash, block_height, ?shard_id,
                    ?cached_shard_update_key,
                    "Error applying chunk for OptimisticBlock"
                );
            }
        }
    }
}

fn check_if_upgrade_needed(&self, block_hash: &CryptoHash) {
if let Ok(next_epoch_protocol_version) =
self.epoch_manager.get_next_epoch_protocol_version(block_hash)
Expand Down Expand Up @@ -3890,14 +3884,8 @@ impl Chain {
let storage_context =
StorageContext { storage_data_source: StorageDataSource::Db, state_patch };

let block_key_source = OptimisticBlockKeySource {
height: block_context.height,
prev_block_hash: block_context.prev_block_hash,
block_timestamp: block_context.block_timestamp,
random_seed: block_context.random_seed,
};
let cached_shard_update_key =
Self::get_cached_shard_update_key(&block_key_source, chunk_headers, shard_id)?;
Self::get_cached_shard_update_key(&block_context, chunk_headers, shard_id)?;
let job = self.get_update_shard_job(
me,
cached_shard_update_key,
Expand Down Expand Up @@ -3980,28 +3968,37 @@ impl Chain {
Ok(ShardContext { shard_uid, should_apply_chunk })
}

/// Get a key which can uniquely define result of applying a chunk based on
/// block execution context and other chunks.
fn get_cached_shard_update_key(
block: &OptimisticBlockKeySource,
block_context: &ApplyChunkBlockContext,
chunk_headers: &Chunks,
shard_id: ShardId,
) -> Result<CryptoHash, Error> {
) -> Result<CachedShardUpdateKey, Error> {
const BYTES_LEN: usize =
size_of::<CryptoHash>() + size_of::<CryptoHash>() + size_of::<u64>();

let mut bytes: Vec<u8> = Vec::with_capacity(BYTES_LEN);
let block = OptimisticBlockKeySource {
height: block_context.height,
prev_block_hash: block_context.prev_block_hash,
block_timestamp: block_context.block_timestamp,
random_seed: block_context.random_seed,
};
bytes.extend_from_slice(&hash(&borsh::to_vec(&block)?).0);

let chunks_key_source: Vec<_> = chunk_headers.iter_raw().map(|c| c.chunk_hash()).collect();
bytes.extend_from_slice(&hash(&borsh::to_vec(&chunks_key_source)?).0);
bytes.extend_from_slice(&shard_id.to_le_bytes());

Ok(hash(&bytes))
Ok(CachedShardUpdateKey::new(hash(&bytes)))
}

/// This method returns the closure that is responsible for updating a shard.
fn get_update_shard_job(
&self,
me: &Option<AccountId>,
cached_shard_update_key: CryptoHash,
cached_shard_update_key: CachedShardUpdateKey,
block: ApplyChunkBlockContext,
chunk_headers: &Chunks,
shard_index: ShardIndex,
Expand Down Expand Up @@ -4806,7 +4803,7 @@ pub fn do_apply_chunks(
block: BlockToApply,
block_height: BlockHeight,
work: Vec<UpdateShardJob>,
) -> Vec<(ShardId, CryptoHash, Result<ShardUpdateResult, Error>)> {
) -> Vec<(ShardId, CachedShardUpdateKey, Result<ShardUpdateResult, Error>)> {
let parent_span =
tracing::debug_span!(target: "chain", "do_apply_chunks", block_height, ?block).entered();
work.into_par_iter()
Expand Down
17 changes: 17 additions & 0 deletions core/primitives/src/optimistic_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ impl OptimisticBlock {
}
}

/// Optimistic block fields which are enough to define unique context for
/// applying chunks in that block. Thus the hash of this struct can be used to
/// cache *valid* optimistic blocks.
///
/// This struct is created just so that we can conveniently derive and use
/// `borsh` serialization for it.
#[derive(BorshSerialize)]
pub struct OptimisticBlockKeySource {
pub height: BlockHeight,
Expand All @@ -121,3 +127,14 @@ pub enum BlockToApply {
Normal(CryptoHash),
Optimistic(CryptoHash),
}

/// Key uniquely identifying the inputs of a single shard update; used as the
/// lookup key of `ApplyChunksResultCache`. A newtype over `CryptoHash` so it
/// cannot be confused with hashes of other entities.
#[derive(Hash, PartialEq, Eq, Clone, Copy, Debug)]
pub struct CachedShardUpdateKey(CryptoHash);

impl CachedShardUpdateKey {
    /// Explicit constructor to minimize the risk of using hashes of other
    /// entities accidentally.
    pub fn new(hash: CryptoHash) -> Self {
        Self(hash)
    }
}

0 comments on commit 8b6b7ed

Please sign in to comment.