Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Longarithm committed Feb 3, 2025
1 parent da16b54 commit 3f08e78
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 9 deletions.
55 changes: 48 additions & 7 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ use near_store::get_genesis_state_roots;
use near_store::DBCol;
use node_runtime::bootstrap_congestion_info;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::cell::Cell;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -230,6 +231,50 @@ pub fn check_known(
check_known_store(chain, block_hash)
}

pub struct ApplyChunksResultCache {
cache: LruCache<CryptoHash, ShardUpdateResult>,
/// We use Cell to record access statistics even if we don't have
/// mutability.
hits: Cell<usize>,
misses: Cell<usize>,
}

impl ApplyChunksResultCache {
pub fn new(size: usize) -> Self {
Self {
cache: LruCache::new(NonZeroUsize::new(size).unwrap()),
hits: Cell::new(0),
misses: Cell::new(0),
}
}

pub fn peek(&self, key: &CryptoHash) -> Option<&ShardUpdateResult> {
if let Some(result) = self.cache.peek(key) {
self.hits.set(self.hits.get() + 1);
return Some(result);
}

self.misses.set(self.misses.get() + 1);
None
}

pub fn push(&mut self, key: CryptoHash, result: ShardUpdateResult) {
self.cache.put(key, result);
}

pub fn hits(&self) -> usize {
self.hits.get()
}

pub fn misses(&self) -> usize {
self.misses.get()
}

pub fn len(&self) -> usize {
self.cache.len()
}
}

type BlockApplyChunksResult =
(BlockToApply, Vec<(ShardId, CryptoHash, Result<ShardUpdateResult, Error>)>);

Expand Down Expand Up @@ -261,7 +306,7 @@ pub struct Chain {
apply_chunks_receiver: Receiver<BlockApplyChunksResult>,
/// Used to spawn the apply chunks jobs.
apply_chunks_spawner: Arc<dyn AsyncComputationSpawner>,
apply_chunk_results_cache: LruCache<CryptoHash, ShardUpdateResult>,
pub apply_chunk_results_cache: ApplyChunksResultCache,
/// Time when head was updated most recently.
last_time_head_updated: Instant,
/// Prevents re-application of known-to-be-invalid blocks, so that in case of a
Expand Down Expand Up @@ -414,9 +459,7 @@ impl Chain {
apply_chunks_sender: sc,
apply_chunks_receiver: rc,
apply_chunks_spawner: Arc::new(RayonAsyncComputationSpawner),
apply_chunk_results_cache: LruCache::new(
NonZeroUsize::new(APPLY_CHUNK_RESULTS_CACHE_SIZE).unwrap(),
),
apply_chunk_results_cache: ApplyChunksResultCache::new(APPLY_CHUNK_RESULTS_CACHE_SIZE),
last_time_head_updated: clock.now(),
invalid_blocks: LruCache::new(NonZeroUsize::new(INVALID_CHUNKS_POOL_SIZE).unwrap()),
pending_state_patch: Default::default(),
Expand Down Expand Up @@ -607,9 +650,7 @@ impl Chain {
apply_chunks_sender: sc,
apply_chunks_receiver: rc,
apply_chunks_spawner,
apply_chunk_results_cache: LruCache::new(
NonZeroUsize::new(APPLY_CHUNK_RESULTS_CACHE_SIZE).unwrap(),
),
apply_chunk_results_cache: ApplyChunksResultCache::new(APPLY_CHUNK_RESULTS_CACHE_SIZE),
last_time_head_updated: clock.now(),
pending_state_patch: Default::default(),
requested_state_parts: StateRequestTracker::new(),
Expand Down
18 changes: 16 additions & 2 deletions integration-tests/src/test_loop/tests/optimistic_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ fn test_optimistic_block() {
&[],
);
let shard_layout = ShardLayout::multi_shard(3, 1);
let num_shards = shard_layout.num_shards() as usize;

let (genesis, epoch_config_store) = build_genesis_and_epoch_config_store(
GenesisAndEpochConfigParams {
Expand All @@ -48,8 +49,21 @@ fn test_optimistic_block() {
{
let chain =
&env.test_loop.data.get(&env.datas[0].client_sender.actor_handle()).client.chain;
println!("num_blocks: {}", chain.optimistic_block_chunks.num_blocks());
println!("num_chunks: {}", chain.optimistic_block_chunks.num_chunks());
// Under normal block processing, there can be only one optimistic
// block waiting to be processed.
assert!(chain.optimistic_block_chunks.num_blocks() <= 1);
// Under normal block processing, number of waiting chunks can't exceed
// delta between the highest block height and the final block height
// (normally 3), multiplied by the number of shards.
assert!(chain.optimistic_block_chunks.num_chunks() <= 3 * num_shards);
// There should be at least one optimistic block result in the cache.
assert!(chain.apply_chunk_results_cache.len() > 0);
// Optimistic block result should be used at least once.
assert!(chain.apply_chunk_results_cache.hits() > 0);
// Because there is no optimistic block distribution yet, there should
// be at least one miss for each shard.
// TODO: after distribution is implemented, this may change.
assert!(chain.apply_chunk_results_cache.misses() > 0);
}

env.shutdown_and_drain_remaining_events(Duration::seconds(20));
Expand Down

0 comments on commit 3f08e78

Please sign in to comment.