Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Longarithm committed Feb 3, 2025
1 parent 423ded8 commit da16b54
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 86 deletions.
4 changes: 2 additions & 2 deletions chain/chain/src/block_processing_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl BlocksInProcessing {
block: Block,
preprocess_info: BlockPreprocessInfo,
) -> Result<(), AddError> {
self.add_dry_run(&BlockToApply::Normal(block.hash().clone()))?;
self.add_dry_run(&BlockToApply::Normal(*block.hash()))?;

self.preprocessed_blocks.insert(*block.hash(), (block, preprocess_info));
Ok(())
Expand All @@ -120,7 +120,7 @@ impl BlocksInProcessing {
block: OptimisticBlock,
preprocess_info: OptimisticBlockInfo,
) -> Result<(), AddError> {
self.add_dry_run(&BlockToApply::Optimistic(block.hash().clone()))?;
self.add_dry_run(&BlockToApply::Optimistic(*block.hash()))?;

self.optimistic_blocks.insert(*block.hash(), (block, preprocess_info));
Ok(())
Expand Down
22 changes: 19 additions & 3 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ pub fn check_known(
if block_hash == &head.last_block_hash || block_hash == &head.prev_block_hash {
return Ok(Err(BlockKnownError::KnownInHead));
}
if chain.blocks_in_processing.contains(&BlockToApply::Normal(block_hash)) {
if chain.blocks_in_processing.contains(&BlockToApply::Normal(*block_hash)) {
return Ok(Err(BlockKnownError::KnownInProcessing));
}
// Check if this block is in the set of known orphans.
Expand Down Expand Up @@ -2162,6 +2162,8 @@ impl Chain {
Ok(new_head) => new_head,
};

self.update_optimistic_blocks_pool(&block)?;

let epoch_id = block.header().epoch_id();
let mut shards_cares_this_or_next_epoch = vec![];
for shard_id in self.epoch_manager.shard_ids(epoch_id)? {
Expand Down Expand Up @@ -2367,6 +2369,20 @@ impl Chain {
Ok(())
}

/// If `block` is committed, the `last_final_block(block)` is final.
/// Thus it is enough to keep only chunks which are built on top of block
/// with `height(last_final_block(block))` or higher.
fn update_optimistic_blocks_pool(&mut self, block: &Block) -> Result<(), Error> {
let final_block = block.header().last_final_block();
if final_block == &CryptoHash::default() {
return Ok(());
}

let final_block_height = self.chain_store.get_block_header(final_block)?.height();
self.optimistic_block_chunks.update_minimal_base_height(final_block_height);
Ok(())
}

/// Preprocess a block before applying chunks, verify that we have the necessary information
/// to process the block and the block is valid.
/// Note that this function does NOT introduce any changes to chain state.
Expand All @@ -2383,7 +2399,7 @@ impl Chain {
let header = block.header();

// see if the block is already in processing or if there are too many blocks being processed
self.blocks_in_processing.add_dry_run(&BlockToApply::Normal(block.hash().clone()))?;
self.blocks_in_processing.add_dry_run(&BlockToApply::Normal(*block.hash()))?;

debug!(target: "chain", height=header.height(), num_approvals = header.num_approvals(), "preprocess_block");

Expand Down Expand Up @@ -4576,7 +4592,7 @@ impl Chain {
/// Check if hash is for a block that is being processed
#[inline]
pub fn is_in_processing(&self, hash: &CryptoHash) -> bool {
self.blocks_in_processing.contains(&BlockToApply::Normal(hash.clone()))
self.blocks_in_processing.contains(&BlockToApply::Normal(*hash))
}

#[inline]
Expand Down
179 changes: 141 additions & 38 deletions chain/chain/src/missing_chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,15 @@ impl<Block: BlockLike> MissingChunksPool<Block> {
}
}

/// Stores chunks for some optimistic block received so far.
#[derive(Debug, Default)]
struct OptimisticBlockChunks {
/// Number of remaining chunks to be received. Stored to avoid naive
/// recomputation.
remaining_chunks: usize,
/// Height of the previous block.
prev_block_height: BlockHeight,
/// Stores chunks for each shard, if received.
chunks: Vec<Option<ShardChunkHeader>>,
}

Expand All @@ -202,36 +207,52 @@ impl OptimisticBlockChunks {
Self { remaining_chunks: num_shards, prev_block_height, chunks: vec![None; num_shards] }
}
}
/// Stores optimistic blocks which are waiting for chunks to be received.
/// Stores optimistic blocks and chunks which are waiting to be processed.
///
/// Once block and all chunks on top of the same previous block are received,
/// it can provide the optimistic block to be processed.
#[derive(Debug, Default)]
pub struct OptimisticBlockChunksPool {
/// Strict tip. Everything dies before this height.
final_tip: BlockHeight,
/// Optimistic blocks tip. Softer to ensure unique optimistic block processing.
blocks_tip: BlockHeight,
/// All blocks and chunks built on top of blocks *smaller* than this height
/// can be discarded. Needed to garbage collect old data in case of forks
/// which never can get finalized.
minimal_base_height: BlockHeight,
/// Optimistic block with heights *not greater* than this height are discarded.
/// Needed on top of `minimal_base_height` as well to ensure that only one
/// optimistic block on each height can be processed.
block_height_threshold: BlockHeight,
/// Maps previous block hash to the received optimistic block with the
/// highest height on top of it.
blocks: HashMap<CryptoHash, OptimisticBlock>,
/// Maps previous block hash to the vector of chunk headers corresponding
/// to complete chunks.
chunks: HashMap<CryptoHash, OptimisticBlockChunks>,
/// Result.
/// Optimistic block with the largest height which is ready to process.
/// Block and chunks must correspond to the same previous block hash.
latest_ready_block: Option<(OptimisticBlock, Vec<ShardChunkHeader>)>,
}

impl OptimisticBlockChunksPool {
pub fn new() -> Self {
Self {
final_tip: 0,
blocks_tip: 0,
minimal_base_height: 0,
block_height_threshold: 0,
blocks: Default::default(),
chunks: Default::default(),
latest_ready_block: None,
}
}

pub fn num_blocks(&self) -> usize {
self.blocks.len()
}

pub fn num_chunks(&self) -> usize {
self.chunks.len()
}

pub fn add_block(&mut self, block: OptimisticBlock) {
if block.height() <= self.blocks_tip {
if block.height() <= self.block_height_threshold {
return;
}

Expand All @@ -240,13 +261,10 @@ impl OptimisticBlockChunksPool {
self.update_latest_ready_block(&prev_block_hash);
}

pub fn accept_chunk(
&mut self,
shard_layout: &ShardLayout,
prev_block_height: BlockHeight,
chunk_header: ShardChunkHeader,
) {
if prev_block_height <= self.final_tip {
pub fn add_chunk(&mut self, shard_layout: &ShardLayout, chunk_header: ShardChunkHeader) {
// We assume that `chunk_header.height_created() = prev_block_height + 1`.
let prev_block_height = chunk_header.height_created().saturating_sub(1);
if prev_block_height < self.minimal_base_height {
return;
}

Expand Down Expand Up @@ -284,10 +302,14 @@ impl OptimisticBlockChunksPool {
}
}

/// Takes the latest optimistic block and chunks which are ready to
/// be processed.
pub fn take_latest_ready_block(&mut self) -> Option<(OptimisticBlock, Vec<ShardChunkHeader>)> {
self.latest_ready_block.take()
}

/// If the optimistic block on top of `prev_block_hash` is ready to
/// process, sets the latest ready block to it.
fn update_latest_ready_block(&mut self, prev_block_hash: &CryptoHash) {
let Some(chunks) = self.chunks.get(prev_block_hash) else {
return;
Expand All @@ -298,7 +320,7 @@ impl OptimisticBlockChunksPool {
let Some(block) = self.blocks.remove(prev_block_hash) else {
return;
};
if block.height() <= self.blocks_tip {
if block.height() <= self.block_height_threshold {
return;
}

Expand All @@ -319,31 +341,17 @@ impl OptimisticBlockChunksPool {
chunk
})
.collect();
self.update_blocks_tip(block.height());
self.update_block_height_threshold(block.height());
self.latest_ready_block = Some((block, chunks));
}

pub fn update_tip(&mut self, tip: BlockHeight) {
self.update_blocks_tip(tip);

self.final_tip = std::cmp::max(self.final_tip, tip);
let hashes_to_remove: Vec<_> = self
.chunks
.iter()
.filter(|(_, h)| h.prev_block_height <= self.final_tip)
.map(|(h, _)| *h)
.collect();
for h in hashes_to_remove {
self.chunks.remove(&h);
}
}

pub fn update_blocks_tip(&mut self, blocks_tip: BlockHeight) {
self.blocks_tip = std::cmp::max(self.blocks_tip, blocks_tip);
/// Updates the block height threshold and cleans up old blocks.
pub fn update_block_height_threshold(&mut self, height: BlockHeight) {
self.block_height_threshold = std::cmp::max(self.block_height_threshold, height);
let hashes_to_remove: Vec<_> = self
.blocks
.iter()
.filter(|(_, h)| h.height() <= self.blocks_tip)
.filter(|(_, h)| h.height() <= self.block_height_threshold)
.map(|(h, _)| *h)
.collect();
for h in hashes_to_remove {
Expand All @@ -353,14 +361,32 @@ impl OptimisticBlockChunksPool {
let Some((block, _)) = &self.latest_ready_block else {
return;
};
if block.height() <= self.blocks_tip {
if block.height() <= self.block_height_threshold {
self.latest_ready_block = None;
}
}

/// Updates the minimal base height and cleans up old blocks and chunks.
pub fn update_minimal_base_height(&mut self, height: BlockHeight) {
// If *new* chunks must be built on top of *at least* this height,
// optimistic block height must be *strictly greater* than this height.
self.update_block_height_threshold(height);

self.minimal_base_height = std::cmp::max(self.minimal_base_height, height);
let hashes_to_remove: Vec<_> = self
.chunks
.iter()
.filter(|(_, h)| h.prev_block_height < self.minimal_base_height)
.map(|(h, _)| *h)
.collect();
for h in hashes_to_remove {
self.chunks.remove(&h);
}
}
}

#[cfg(test)]
mod test {
mod missing_chunks_test {
use super::{BlockHash, BlockLike, MissingChunksPool, MAX_BLOCKS_MISSING_CHUNKS};
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::sharding::ChunkHash;
Expand Down Expand Up @@ -465,3 +491,80 @@ mod test {
assert!(pool.contains(&later_block_hash));
}
}

#[cfg(test)]
mod optimistic_block_chunks_pool_test {
use super::{OptimisticBlock, OptimisticBlockChunksPool, ShardChunkHeader};
use itertools::Itertools;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::types::ShardId;

#[test]
fn test_add_block() {
let mut pool = OptimisticBlockChunksPool::new();
let prev_hash = CryptoHash::default();
let block = OptimisticBlock::new_dummy(1, prev_hash);

pool.add_block(block);
assert_eq!(pool.num_blocks(), 1);
assert!(pool.blocks.contains_key(&prev_hash));
}

#[test]
fn test_add_chunk() {
let mut pool = OptimisticBlockChunksPool::new();
let prev_hash = CryptoHash::default();
let shard_layout = ShardLayout::single_shard();
let chunk_header = ShardChunkHeader::new_dummy(0, ShardId::new(0), prev_hash);

pool.add_chunk(&shard_layout, chunk_header);
assert_eq!(pool.num_chunks(), 1);
assert!(pool.chunks.contains_key(&prev_hash));
}

#[test]
fn test_ready_block() {
let mut pool = OptimisticBlockChunksPool::new();
let prev_hash = CryptoHash::default();
let block = OptimisticBlock::new_dummy(1, prev_hash);
let shard_layout = ShardLayout::multi_shard(2, 3);
let shard_ids = shard_layout.shard_ids().collect_vec();

let chunk_header = ShardChunkHeader::new_dummy(0, shard_ids[0], prev_hash);
pool.add_block(block);
pool.add_chunk(&shard_layout, chunk_header);

pool.update_latest_ready_block(&prev_hash);
assert!(pool.take_latest_ready_block().is_none());

let new_chunk_header = ShardChunkHeader::new_dummy(0, shard_ids[1], prev_hash);
pool.add_chunk(&shard_layout, new_chunk_header);

assert!(pool.take_latest_ready_block().is_some());
assert!(pool.take_latest_ready_block().is_none());
}

#[test]
fn test_thresholds() {
let mut pool = OptimisticBlockChunksPool::new();
pool.block_height_threshold = 10;
pool.minimal_base_height = 5;

let prev_hash = CryptoHash::default();
let block_below_threshold = OptimisticBlock::new_dummy(10, prev_hash);
pool.add_block(block_below_threshold);
assert_eq!(pool.num_blocks(), 0, "Block below threshold should not be added");

let shard_layout = ShardLayout::single_shard();
let chunk_header_below_threshold =
ShardChunkHeader::new_dummy(5, ShardId::new(0), prev_hash);
pool.add_chunk(&shard_layout, chunk_header_below_threshold);
assert_eq!(pool.num_chunks(), 0, "Chunk strictly below threshold should not be added");

let chunk_header_passing_threshold =
ShardChunkHeader::new_dummy(6, ShardId::new(0), prev_hash);
pool.add_chunk(&shard_layout, chunk_header_passing_threshold);
assert_eq!(pool.num_chunks(), 1);
}
}
2 changes: 1 addition & 1 deletion chain/chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub fn wait_for_all_blocks_in_processing(chain: &Chain) -> bool {
}

pub fn is_block_in_processing(chain: &Chain, block_hash: &CryptoHash) -> bool {
chain.blocks_in_processing.contains(&BlockToApply::Normal(block_hash.clone()))
chain.blocks_in_processing.contains(&BlockToApply::Normal(*block_hash))
}

pub fn wait_for_block_in_processing(
Expand Down
8 changes: 1 addition & 7 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1631,13 +1631,7 @@ impl Client {
.expect("Could not persist chunk");
// We're marking chunk as accepted.
self.chain.blocks_with_missing_chunks.accept_chunk(&chunk_header.chunk_hash());
// TODO: ugly assumption on height_created
let prev_block_height = chunk_header.height_created().saturating_sub(1);
self.chain.optimistic_block_chunks.accept_chunk(
&shard_layout,
prev_block_height,
chunk_header,
);
self.chain.optimistic_block_chunks.add_chunk(&shard_layout, chunk_header);
// If this was the last chunk that was missing for a block, it will be processed now.
self.process_blocks_with_missing_chunks(apply_chunks_done_sender, &signer);
self.maybe_process_optimistic_block();
Expand Down
Loading

0 comments on commit da16b54

Please sign in to comment.