chore: small fixes

merklefruit committed Sep 5, 2024
1 parent b92ccc5 commit 6f969e4
Showing 3 changed files with 41 additions and 48 deletions.
38 changes: 14 additions & 24 deletions crates/rollup/src/driver/context/standalone.rs
@@ -22,8 +22,8 @@ use url::Url;
use super::{Blocks, ChainNotification, DriverContext};

/// The number of blocks to keep in the reorg cache.
-/// Equivalent to 2 epochs at 32 slots/epoch.
-const FINALIZATION_BLOCKS: u64 = 64;
+/// Equivalent to 2 epochs at 32 slots/epoch on Ethereum Mainnet.
+const FINALIZATION_TIMEOUT: u64 = 64;

#[allow(unused)]
#[derive(Debug)]
@@ -36,11 +36,8 @@ pub struct StandaloneContext {
/// We can safely prune all cached blocks below this tip once they
/// become finalized on L1.
processed_tip: BlockNumber,
-/// The channel to send processed block numbers to the driver
-processed_block_tx: mpsc::Sender<BlockNumber>,
-/// The channel to receive processed block numbers from the driver
-processed_block_rx: mpsc::Receiver<BlockNumber>,
-/// Cache of blocks that might be reorged out
+/// Cache of blocks that might be reorged out. In normal conditions,
+/// this cache will not grow beyond [`FINALIZATION_TIMEOUT`] keys.
reorg_cache: BTreeMap<BlockNumber, HashMap<B256, Block<TxEnvelope>>>,
/// Handle to the background task that fetches and processes new blocks.
_handle: JoinHandle<()>,
@@ -137,17 +134,7 @@ impl StandaloneContext {
new_block_rx: mpsc::Receiver<Block<TxEnvelope>>,
_handle: JoinHandle<()>,
) -> Self {
-let (processed_block_tx, processed_block_rx) = mpsc::channel(128);
-
-Self {
-new_block_rx,
-_handle,
-l1_tip: 0,
-processed_tip: 0,
-processed_block_tx,
-processed_block_rx,
-reorg_cache: BTreeMap::new(),
-}
+Self { new_block_rx, _handle, l1_tip: 0, processed_tip: 0, reorg_cache: BTreeMap::new() }
}
}

@@ -167,16 +154,19 @@ impl DriverContext for StandaloneContext {
if block_num <= self.l1_tip {
todo!("handle reorgs");
} else {
+self.l1_tip = block_num;

// upon a new tip, prune the reorg cache for all blocks that have been finalized,
// as they are no longer candidates for reorgs.
-self.reorg_cache.retain(|num, _| *num > block_num - FINALIZATION_BLOCKS);
+self.reorg_cache.retain(|num, _| *num > block_num - FINALIZATION_TIMEOUT);
}

Some(ChainNotification::New { new_blocks: Blocks::from(block) })
}

fn send_processed_tip_event(&mut self, tip: BlockNumber) -> Result<(), SendError<BlockNumber>> {
-self.processed_block_tx.try_send(tip).map_err(|_| SendError(tip))
+self.processed_tip = tip;
+Ok(())
}
}

@@ -265,12 +255,12 @@ mod tests {
.insert(new_block.header.hash.unwrap(), new_block.clone());

// Manually call the pruning logic
-ctx.reorg_cache.retain(|num, _| *num > 101 - FINALIZATION_BLOCKS);
+ctx.reorg_cache.retain(|num, _| *num > 101 - FINALIZATION_TIMEOUT);

// Check that only the last FINALIZATION_BLOCKS are kept
-assert_eq!(ctx.reorg_cache.len(), FINALIZATION_BLOCKS as usize);
-assert!(ctx.reorg_cache.contains_key(&(101 - FINALIZATION_BLOCKS + 1)));
-assert!(!ctx.reorg_cache.contains_key(&(101 - FINALIZATION_BLOCKS)));
+assert_eq!(ctx.reorg_cache.len(), FINALIZATION_TIMEOUT as usize);
+assert!(ctx.reorg_cache.contains_key(&(101 - FINALIZATION_TIMEOUT + 1)));
+assert!(!ctx.reorg_cache.contains_key(&(101 - FINALIZATION_TIMEOUT)));
}

#[tokio::test]
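
Note on the pruning rule exercised by the test above: the retain call keeps only blocks strictly newer than tip - FINALIZATION_TIMEOUT. Below is a minimal, self-contained sketch of that rule, not the crate's API: prune_finalized is a hypothetical helper standing in for the retain call inside handle_new_block, and the cache value type is simplified to a plain &str instead of HashMap<B256, Block<TxEnvelope>>.

use std::collections::BTreeMap;

/// Number of trailing L1 blocks kept as reorg candidates
/// (2 epochs * 32 slots/epoch on Ethereum Mainnet, as above).
const FINALIZATION_TIMEOUT: u64 = 64;

/// Hypothetical helper mirroring the `retain` call in `handle_new_block`:
/// drop every cached block that is now final relative to the new tip.
fn prune_finalized(cache: &mut BTreeMap<u64, &'static str>, new_tip: u64) {
    cache.retain(|num, _| *num > new_tip.saturating_sub(FINALIZATION_TIMEOUT));
}

fn main() {
    // Cache blocks 1..=101, then advance the tip to 101 and prune.
    let mut cache: BTreeMap<u64, &'static str> = (1..=101).map(|n| (n, "block")).collect();
    prune_finalized(&mut cache, 101);

    // Exactly the last FINALIZATION_TIMEOUT blocks survive, matching the test above.
    assert_eq!(cache.len(), FINALIZATION_TIMEOUT as usize);
    assert!(cache.contains_key(&(101 - FINALIZATION_TIMEOUT + 1)));
    assert!(!cache.contains_key(&(101 - FINALIZATION_TIMEOUT)));
}
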
47 changes: 25 additions & 22 deletions crates/rollup/src/driver/cursor.rs
@@ -1,7 +1,11 @@
-use std::collections::{BTreeMap, VecDeque};
-
use hashbrown::HashMap;
+use std::{
+collections::{BTreeMap, VecDeque},
+sync::Arc,
+};
+
use kona_primitives::{BlockInfo, L2BlockInfo};
+use superchain_registry::RollupConfig;

/// A cursor that keeps track of the L2 tip block for a given L1 origin block.
///
@@ -12,6 +16,8 @@ pub struct SyncCursor {
/// The block cache capacity before evicting old entries
/// (to avoid unbounded memory growth)
capacity: usize,
+/// The channel timeout for the [RollupConfig] used to create the cursor.
+channel_timeout: u64,
/// The L1 origin block numbers for which we have an L2 block in the cache.
/// Used to keep track of the order of insertion and evict the oldest entry.
l1_origin_key_order: VecDeque<u64>,
@@ -21,21 +27,15 @@ pub struct SyncCursor {
l1_origin_to_l2_blocks: BTreeMap<u64, L2BlockInfo>,
}

#[allow(unused)]
impl SyncCursor {
-/// Create a new cursor with the default cache capacity.
-pub fn new() -> Self {
-// NOTE: this value must be greater than the `CHANNEL_TIMEOUT` to allow
-// for derivation to proceed through a deep reorg. This value is set
-// to 300 blocks before the Granite hardfork and 50 blocks after it.
-// Ref: <https://specs.optimism.io/protocol/derivation.html#timeouts>
-Self::with_capacity(350)
-}

-/// Create a new cursor with the given cache capacity.
-fn with_capacity(capacity: usize) -> Self {
+pub fn new(channel_timeout: u64) -> Self {
Self {
-capacity,
+// NOTE: capacity must be greater than the `channel_timeout` to allow
+// for derivation to proceed through a deep reorg.
+// Ref: <https://specs.optimism.io/protocol/derivation.html#timeouts>
+capacity: channel_timeout + 5,
+channel_timeout,
l1_origin_key_order: VecDeque::with_capacity(capacity),
l1_origin_block_info: HashMap::with_capacity(capacity),
l1_origin_to_l2_blocks: BTreeMap::new(),
@@ -66,22 +66,25 @@ impl SyncCursor {
self.l1_origin_to_l2_blocks.insert(l1_origin_block.number, l2_tip_block);
}

-/// When the L1 undergoes a reorg, we need to reset the cursor to the fork block.
-/// This is the last L1 block for which we have a corresponding L2 block in the cache.
+/// When the L1 undergoes a reorg, we need to reset the cursor to the fork block minus
+/// the channel timeout, because an L2 block might have started to be derived at the
+/// beginning of the channel.
///
/// Returns the (L2 block info, L1 origin block info) tuple for the new cursor state.
pub fn reset(&mut self, fork_block: u64) -> (BlockInfo, BlockInfo) {
-match self.l1_origin_to_l2_blocks.get(&fork_block) {
+let channel_start = fork_block - self.channel_timeout;
+
+match self.l1_origin_to_l2_blocks.get(&channel_start) {
Some(l2_safe_tip) => {
-// The fork block is in the cache, we can use it to reset the cursor.
-(l2_safe_tip.block_info, self.l1_origin_block_info[&fork_block])
+// The channel start block is in the cache, we can use it to reset the cursor.
+(l2_safe_tip.block_info, self.l1_origin_block_info[&channel_start])
}
None => {
-// If the fork block is not in the cache, we reset the cursor
-// to the last known L1 block for which we have a corresponding L2 block.
+// If the channel start block is not in the cache, we reset the cursor
+// to the closest known L1 block for which we have a corresponding L2 block.
let (last_l1_known_tip, l2_known_tip) = self
.l1_origin_to_l2_blocks
-.range(..=fork_block)
+.range(..=channel_start)
.next_back()
.expect("walked back to genesis without finding anchor origin block");

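
The new reset path is easier to follow with concrete numbers. The sketch below is a simplified stand-in rather than the crate's code: reset_target is a hypothetical function, the map holds plain block numbers instead of L2BlockInfo/BlockInfo values, and the 50-block figure is the post-Granite channel timeout cited in the removed comment.

use std::collections::BTreeMap;

/// Hypothetical stand-in for `SyncCursor::reset`: maps L1 origin numbers to L2
/// tip numbers and returns the (L1 origin, L2 tip) pair to resume from.
fn reset_target(cache: &BTreeMap<u64, u64>, fork_block: u64, channel_timeout: u64) -> (u64, u64) {
    // Rewind a full channel timeout below the fork block, since an L2 block
    // derived there may belong to a channel opened that many L1 blocks earlier.
    let channel_start = fork_block.saturating_sub(channel_timeout);

    match cache.get(&channel_start) {
        // The channel start is cached: reset the cursor exactly there.
        Some(l2_tip) => (channel_start, *l2_tip),
        // Otherwise walk back to the closest older L1 origin we know about.
        None => {
            let (l1, l2) = cache
                .range(..=channel_start)
                .next_back()
                .expect("no cached L1 origin at or below the channel start");
            (*l1, *l2)
        }
    }
}

fn main() {
    // L1 origins 100, 105 and 110, each with some derived L2 tip.
    let cache = BTreeMap::from([(100u64, 1000u64), (105, 1050), (110, 1100)]);

    // Reorg at L1 block 160 with a 50-block channel timeout: 160 - 50 = 110 is cached.
    assert_eq!(reset_target(&cache, 160, 50), (110, 1100));

    // Reorg at 158: 158 - 50 = 108 is not cached, so we fall back to origin 105.
    assert_eq!(reset_target(&cache, 158, 50), (105, 1050));
}
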
4 changes: 2 additions & 2 deletions crates/rollup/src/driver/mod.rs
@@ -57,7 +57,7 @@ where
chain_provider: cp,
blob_provider: bp,
l2_chain_provider: l2_cp,
-cursor: SyncCursor::new(),
+cursor: SyncCursor::new(cfg.channel_timeout),
}
}
}
@@ -78,7 +78,7 @@ impl Driver<StandaloneContext, AlloyChainProvider, DurableBlobProvider, AlloyL2C
chain_provider: cp,
blob_provider: bp,
l2_chain_provider: l2_cp,
-cursor: SyncCursor::new(),
+cursor: SyncCursor::new(cfg.channel_timeout),
}
}
}
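
The two constructor changes above only thread cfg.channel_timeout through to the cursor. A compressed sketch of that wiring, assuming a hypothetical Config in place of superchain_registry::RollupConfig and adding the usize cast needed for it to compile on its own:

/// Hypothetical stand-in for the fields of `RollupConfig` used here.
struct Config {
    channel_timeout: u64,
}

struct SyncCursor {
    capacity: usize,
    channel_timeout: u64,
}

impl SyncCursor {
    /// Derive the cache capacity from the channel timeout (plus a small margin)
    /// instead of the previous hard-coded 350, so the invariant
    /// `capacity > channel_timeout` holds for any rollup config.
    fn new(channel_timeout: u64) -> Self {
        Self { capacity: channel_timeout as usize + 5, channel_timeout }
    }
}

fn main() {
    // Pre-Granite OP Stack channel timeout is 300 blocks; post-Granite it is 50.
    let cfg = Config { channel_timeout: 300 };
    let cursor = SyncCursor::new(cfg.channel_timeout);
    assert_eq!(cursor.capacity, 305);
    assert!(cursor.capacity as u64 > cursor.channel_timeout);
}
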
