From 0b49fd1a817d9a71db4c870dce64768a479cb5ad Mon Sep 17 00:00:00 2001 From: Bibek Pandey Date: Mon, 9 Dec 2024 16:10:28 +0545 Subject: [PATCH] btcio/query: Rescan recent blocks if filter rules changed in an epoch --- bin/strata-client/src/extractor.rs | 6 +- crates/btcio/src/reader/query.rs | 112 +++++++++++++++++++++-- crates/btcio/src/reader/state.rs | 52 ++++++++++- crates/consensus-logic/src/l1_handler.rs | 12 +-- crates/db/src/traits.rs | 9 +- crates/primitives/src/l1.rs | 22 +++-- crates/primitives/src/params.rs | 2 +- crates/primitives/src/sorted_vec.rs | 2 +- crates/rocksdb-store/src/l1/db.rs | 17 ++-- crates/rocksdb-store/src/l1/schemas.rs | 4 +- crates/state/src/chain_state.rs | 4 + crates/status/src/status_manager.rs | 4 + crates/tx-parser/src/filter.rs | 4 +- crates/tx-parser/src/filter_types.rs | 2 +- crates/tx-parser/src/messages.rs | 6 +- 15 files changed, 207 insertions(+), 51 deletions(-) diff --git a/bin/strata-client/src/extractor.rs b/bin/strata-client/src/extractor.rs index 83ee86a1c..3188297a8 100644 --- a/bin/strata-client/src/extractor.rs +++ b/bin/strata-client/src/extractor.rs @@ -199,7 +199,9 @@ mod tests { use strata_primitives::{ bridge::OperatorIdx, buf::Buf32, - l1::{BitcoinAmount, L1BlockManifest, L1TxProof, OutputRef, XOnlyPk}, + l1::{ + BitcoinAmount, EpochedL1BlockManifest, L1BlockManifest, L1TxProof, OutputRef, XOnlyPk, + }, }; use strata_rocksdb::{test_utils::get_rocksdb_tmp_instance, L1Db}; use strata_state::{ @@ -381,7 +383,7 @@ mod tests { }) .collect(); - let mf: L1BlockManifest = arb.generate(); + let mf: EpochedL1BlockManifest = arb.generate(); // Insert block data let res = l1_db.put_block_data(idx, mf.clone(), txs.clone()); diff --git a/crates/btcio/src/reader/query.rs b/crates/btcio/src/reader/query.rs index edb0cc4d1..177912038 100644 --- a/crates/btcio/src/reader/query.rs +++ b/crates/btcio/src/reader/query.rs @@ -5,8 +5,8 @@ use std::{ }; use anyhow::bail; -use bitcoin::{hashes::Hash, BlockHash}; -use strata_primitives::buf::Buf32; +use bitcoin::{hashes::Hash, Block, BlockHash}; +use strata_primitives::{buf::Buf32, l1::Epoch}; use strata_state::l1::{ get_btc_params, get_difficulty_adjustment_height, BtcParams, HeaderVerificationState, L1BlockId, TimestampStore, @@ -21,7 +21,10 @@ use tokio::sync::mpsc; use tracing::*; use crate::{ - reader::{config::ReaderConfig, state::ReaderState}, + reader::{ + config::ReaderConfig, + state::{EpochFilterConfig, ReaderState}, + }, rpc::traits::Reader, status::{apply_status_updates, L1StatusUpdate}, }; @@ -73,6 +76,8 @@ async fn do_reader_task( let cur_best_height = state.best_block_idx(); let poll_span = debug_span!("l1poll", %cur_best_height); + check_and_update_epoch_scan_rules_change(&ctx, &mut state).await?; + if let Err(err) = poll_for_new_blocks(&ctx, &mut state, &mut status_updates) .instrument(poll_span) .await @@ -105,6 +110,68 @@ async fn do_reader_task( } } +/// Checks for epoch and scan rule update. If so, rescan recent blocks to account for possibly +/// missed relevant txs. +async fn check_and_update_epoch_scan_rules_change( + ctx: &ReaderContext, + state: &mut ReaderState, +) -> anyhow::Result<()> { + let latest_epoch = ctx.status_channel.epoch().unwrap_or(0); + // TODO: check if latest_epoch < current epoch. should panic if so? + let curr_epoch = match state.epoch() { + Epoch::Exact(e) => *e, + Epoch::AtTransition(_from, to) => *to, + }; + let is_new_epoch = curr_epoch == latest_epoch; + let curr_filter_config = match state.filter_config() { + EpochFilterConfig::AtEpoch(c) => c.clone(), + EpochFilterConfig::AtTransition(_, c) => c.clone(), + }; + if is_new_epoch { + // TODO: pass in chainstate to `derive_from` + let new_config = TxFilterConfig::derive_from(ctx.config.params.rollup())?; + + // If filter rule has changed, first revert recent scanned blocks and rescan them with new + // rule + if new_config != curr_filter_config { + // Send L1 revert so that the recent txs can be appropriately re-filtered + let revert_ev = + L1Event::RevertTo(state.best_block_idx() - state.recent_blocks().len() as u64); + if ctx.event_tx.send(revert_ev).await.is_err() { + warn!("unable to submit L1 reorg event, did persistence task exit?"); + } + + state.set_filter_config(EpochFilterConfig::AtTransition( + curr_filter_config, + new_config, + )); + state.set_epoch(Epoch::AtTransition(curr_epoch, latest_epoch)); + + rescan_recent_blocks(ctx, state).await?; + } else { + state.set_filter_config(EpochFilterConfig::AtEpoch(curr_filter_config)); + state.set_epoch(Epoch::Exact(curr_epoch)); + } + } + // TODO: anything else? + Ok(()) +} + +async fn rescan_recent_blocks( + ctx: &ReaderContext, + state: &mut ReaderState, +) -> anyhow::Result<()> { + let recent_len = state.recent_blocks().len() as u64; + let mut status_updates: Vec = Vec::new(); // TODO: is this necessary + let recent = state.recent_blocks().to_vec(); + for (i, blk_hash) in recent.iter().enumerate() { + let height = state.best_block_idx() - recent_len + i as u64; + let block = ctx.client.get_block(blk_hash).await?; + process_block(ctx, state, &mut status_updates, height, block).await?; + } + Ok(()) +} + /// Inits the reader state by trying to backfill blocks up to a target height. async fn init_reader_state( ctx: &ReaderContext, @@ -140,7 +207,16 @@ async fn init_reader_state( real_cur_height = height; } - let state = ReaderState::new(real_cur_height + 1, lookback, init_queue); + let params = ctx.config.params.clone(); + let filter_config = TxFilterConfig::derive_from(params.rollup())?; + let epoch = ctx.status_channel.epoch().unwrap_or(0); + let state = ReaderState::new( + real_cur_height + 1, + lookback, + init_queue, + EpochFilterConfig::AtEpoch(filter_config), // TODO: decide if it is transitional or exact + Epoch::Exact(epoch), // TODO: decide if it is transitional or exact + ); Ok(state) } @@ -229,11 +305,28 @@ async fn fetch_and_process_block( status_updates: &mut Vec, ) -> anyhow::Result { let block = ctx.client.get_block_at(height).await?; + process_block(ctx, state, status_updates, height, block).await +} + +async fn process_block( + ctx: &ReaderContext, + state: &mut ReaderState, + status_updates: &mut Vec, + height: u64, + block: Block, +) -> anyhow::Result { let txs = block.txdata.len(); let params = ctx.config.params.clone(); - let filter_config = TxFilterConfig::derive_from(params.rollup())?; - let filtered_txs = filter_protocol_op_tx_refs(&block, filter_config); + let filtered_txs = match state.filter_config() { + EpochFilterConfig::AtEpoch(config) => filter_protocol_op_tx_refs(&block, config), + EpochFilterConfig::AtTransition(from_config, to_config) => { + let mut txs = filter_protocol_op_tx_refs(&block, from_config); + let txs1 = filter_protocol_op_tx_refs(&block, to_config); + txs.extend_from_slice(&txs1); + txs + } + }; let block_data = BlockData::new(height, block, filtered_txs); let l1blkid = block_data.block().block_hash(); trace!(%height, %l1blkid, %txs, "fetched block from client"); @@ -264,7 +357,12 @@ async fn fetch_and_process_block( } } - if let Err(e) = ctx.event_tx.send(L1Event::BlockData(block_data)).await { + // TODO: probably need to send the current epoch as well + if let Err(e) = ctx + .event_tx + .send(L1Event::BlockData(block_data, state.epoch().clone())) + .await + { error!("failed to submit L1 block event, did the persistence task crash?"); return Err(e.into()); } diff --git a/crates/btcio/src/reader/state.rs b/crates/btcio/src/reader/state.rs index 761107335..da604b6b8 100644 --- a/crates/btcio/src/reader/state.rs +++ b/crates/btcio/src/reader/state.rs @@ -1,6 +1,8 @@ use std::collections::VecDeque; use bitcoin::BlockHash; +use strata_primitives::l1::Epoch; +use strata_tx_parser::filter::TxFilterConfig; /// State we use in various parts of the reader. #[derive(Debug)] @@ -13,17 +15,44 @@ pub struct ReaderState { /// Depth at which we start pulling recent blocks out of the front of the queue. max_depth: usize, + + /// Current transaction filtering config + filter_config: EpochFilterConfig, + + /// Current epoch + epoch: Epoch, +} + +/// Filter config that can be exact or transitioning from one epoch to another. +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +pub enum EpochFilterConfig { + /// Config at a particular epoch + AtEpoch(TxFilterConfig), + /// Configs at transitioning epochs(from, to). + /// NOTE: The assumption is that we don't have to consider for more than two transitioning + /// configs + AtTransition(TxFilterConfig, TxFilterConfig), } impl ReaderState { /// Constructs a new reader state instance using some context about how we /// want to manage it. - pub fn new(next_height: u64, max_depth: usize, recent_blocks: VecDeque) -> Self { + // FIXME: the args list is getting long + pub fn new( + next_height: u64, + max_depth: usize, + recent_blocks: VecDeque, + filter_config: EpochFilterConfig, + epoch: Epoch, + ) -> Self { assert!(!recent_blocks.is_empty()); Self { next_height, max_depth, recent_blocks, + filter_config, + epoch, } } @@ -31,6 +60,19 @@ impl ReaderState { self.next_height } + pub fn recent_blocks(&self) -> &[BlockHash] { + // TODO: is this correct? + self.recent_blocks.as_slices().0 + } + + pub fn epoch(&self) -> &Epoch { + &self.epoch + } + + pub fn set_epoch(&mut self, epoch: Epoch) { + self.epoch = epoch; + } + pub fn best_block(&self) -> &BlockHash { self.recent_blocks.back().unwrap() } @@ -39,6 +81,14 @@ impl ReaderState { self.next_height - 1 } + pub fn filter_config(&self) -> &EpochFilterConfig { + &self.filter_config + } + + pub fn set_filter_config(&mut self, filter_config: EpochFilterConfig) { + self.filter_config = filter_config; + } + /// Returns the idx of the deepest block in the reader state. #[allow(unused)] fn deepest_block(&self) -> u64 { diff --git a/crates/consensus-logic/src/l1_handler.rs b/crates/consensus-logic/src/l1_handler.rs index 0ce04636c..a06950cab 100644 --- a/crates/consensus-logic/src/l1_handler.rs +++ b/crates/consensus-logic/src/l1_handler.rs @@ -10,7 +10,7 @@ use strata_db::traits::{Database, L1Database}; use strata_primitives::{ block_credential::CredRule, buf::Buf32, - l1::{L1BlockManifest, L1BlockManifestWithScanRule, L1TxProof}, + l1::{Epoch, EpochedL1BlockManifest, L1BlockManifest, L1TxProof}, params::{Params, RollupParams}, vk::RollupVerifyingKey, }; @@ -80,7 +80,7 @@ where Ok(()) } - L1Event::BlockData(blockdata) => { + L1Event::BlockData(blockdata, epoch) => { let height = blockdata.block_num(); // Bail out fast if we don't have to care. @@ -92,8 +92,7 @@ where let l1blkid = blockdata.block().block_hash(); - // TODO; pass in epoch and scan rule reference - let manifest = generate_block_manifest(blockdata.block()); + let manifest = generate_block_manifest(blockdata.block(), epoch); let l1txs: Vec<_> = generate_l1txs(&blockdata); let num_txs = l1txs.len(); l1db.put_block_data(blockdata.block_num(), manifest, l1txs.clone())?; @@ -193,7 +192,7 @@ pub fn verify_proof(checkpoint: &BatchCheckpoint, rollup_params: &RollupParams) /// Given a block, generates a manifest of the parts we care about that we can /// store in the database. -fn generate_block_manifest(block: &Block) -> L1BlockManifestWithScanRule { +fn generate_block_manifest(block: &Block, epoch: Epoch) -> EpochedL1BlockManifest { let blockid = Buf32::from(block.block_hash().to_raw_hash().to_byte_array()); let root = block .witness_root() @@ -202,8 +201,7 @@ fn generate_block_manifest(block: &Block) -> L1BlockManifestWithScanRule { let header = serialize(&block.header); let mf = L1BlockManifest::new(blockid, header, Buf32::from(root)); - let scan_rules = vec![]; // TODO: get rules here - L1BlockManifestWithScanRule::new(mf, scan_rules) + EpochedL1BlockManifest::new(mf, epoch) } fn generate_l1txs(blockdata: &BlockData) -> Vec { diff --git a/crates/db/src/traits.rs b/crates/db/src/traits.rs index 04451a75c..a04ebee7d 100644 --- a/crates/db/src/traits.rs +++ b/crates/db/src/traits.rs @@ -42,12 +42,7 @@ pub trait L1Database { /// Atomically extends the chain with a new block, providing the manifest /// and a list of transactions we find relevant. Returns error if /// provided out-of-order. - fn put_block_data( - &self, - idx: u64, - mf: L1BlockManifestWithScanRule, - txs: Vec, - ) -> DbResult<()>; + fn put_block_data(&self, idx: u64, mf: EpochedL1BlockManifest, txs: Vec) -> DbResult<()>; /// Stores an MMR checkpoint so we have to query less far back. If the /// provided height does not match the entries in the MMR, will return an @@ -64,7 +59,7 @@ pub trait L1Database { fn get_chain_tip(&self) -> DbResult>; /// Gets the block manifest for a block index. - fn get_block_manifest(&self, idx: u64) -> DbResult>; + fn get_block_manifest(&self, idx: u64) -> DbResult>; /// Returns a half-open interval of block hashes, if we have all of them /// present. Otherwise, returns error. diff --git a/crates/primitives/src/l1.rs b/crates/primitives/src/l1.rs index 5f0162617..b2df1c1b5 100644 --- a/crates/primitives/src/l1.rs +++ b/crates/primitives/src/l1.rs @@ -118,17 +118,25 @@ impl L1TxProof { /// Includes `L1BlockManifest` along with scan rules that it is applied to #[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Arbitrary)] -pub struct L1BlockManifestWithScanRule { +pub struct EpochedL1BlockManifest { /// The actual l1 manifest manifest: L1BlockManifest, - /// Rules with which the corresponding block is scaned - // FIXME: use non-empty vec - rules: Vec, + /// Epoch, whose scan rules are applied to this manifest + epoch: Epoch, } -impl L1BlockManifestWithScanRule { - pub fn new(manifest: L1BlockManifest, rules: Vec) -> Self { - Self { manifest, rules } +/// Epoch that is either exact or transitioning from one to another. +#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Arbitrary)] +pub enum Epoch { + /// Exact epoch + Exact(u64), + /// Transitioning Epoch(from, to) + AtTransition(u64, u64), +} + +impl EpochedL1BlockManifest { + pub fn new(manifest: L1BlockManifest, epoch: Epoch) -> Self { + Self { manifest, epoch } } pub fn header(&self) -> &[u8] { diff --git a/crates/primitives/src/params.rs b/crates/primitives/src/params.rs index 5ddcaf1d1..39ed4d8cd 100644 --- a/crates/primitives/src/params.rs +++ b/crates/primitives/src/params.rs @@ -130,7 +130,7 @@ impl RollupParams { } /// Configuration common among deposit and deposit request transaction -#[derive(Clone, Debug, PartialEq, BorshSerialize, BorshDeserialize, Deserialize, Serialize)] +#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Deserialize, Serialize)] pub struct DepositTxParams { /// Magic bytes we use to regonize a deposit with. pub magic_bytes: Vec, diff --git a/crates/primitives/src/sorted_vec.rs b/crates/primitives/src/sorted_vec.rs index 9f1e3ad7f..0ea042233 100644 --- a/crates/primitives/src/sorted_vec.rs +++ b/crates/primitives/src/sorted_vec.rs @@ -3,7 +3,7 @@ use std::cmp::Ordering; use borsh::{BorshDeserialize, BorshSerialize}; /// A vector wrapper that ensures the elements are sorted -#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)] +#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] pub struct SortedVec { inner: Vec, } diff --git a/crates/rocksdb-store/src/l1/db.rs b/crates/rocksdb-store/src/l1/db.rs index f720144c9..5ef9dfe4f 100644 --- a/crates/rocksdb-store/src/l1/db.rs +++ b/crates/rocksdb-store/src/l1/db.rs @@ -8,7 +8,7 @@ use strata_db::{errors::DbError, traits::*, DbResult}; use strata_mmr::CompactMmr; use strata_primitives::{ buf::Buf32, - l1::{L1BlockManifestWithScanRule, L1TxRef}, + l1::{EpochedL1BlockManifest, L1TxRef}, }; use strata_state::l1::L1Tx; use tracing::*; @@ -42,12 +42,7 @@ impl L1Db { } impl L1Database for L1Db { - fn put_block_data( - &self, - idx: u64, - mf: L1BlockManifestWithScanRule, - txs: Vec, - ) -> DbResult<()> { + fn put_block_data(&self, idx: u64, mf: EpochedL1BlockManifest, txs: Vec) -> DbResult<()> { // If there is latest block then expect the idx to be 1 greater than the block number, else // allow arbitrary block number to be inserted match self.get_latest_block_number()? { @@ -184,7 +179,7 @@ impl L1Database for L1Db { Ok(res) } - fn get_block_manifest(&self, idx: u64) -> DbResult> { + fn get_block_manifest(&self, idx: u64) -> DbResult> { Ok(self.db.get::(&idx)?) } @@ -243,11 +238,11 @@ mod tests { idx: u64, db: &L1Db, num_txs: usize, - ) -> (L1BlockManifestWithScanRule, Vec, CompactMmr) { + ) -> (EpochedL1BlockManifest, Vec, CompactMmr) { let arb = ArbitraryGenerator::new(); // TODO maybe tweak this to make it a bit more realistic? - let mf: L1BlockManifestWithScanRule = arb.generate(); + let mf: EpochedL1BlockManifest = arb.generate(); let txs: Vec = (0..num_txs) .map(|i| { let proof = L1TxProof::new(i as u32, arb.generate()); @@ -294,7 +289,7 @@ mod tests { .collect(); let res = db.put_block_data( invalid_idx, - ArbitraryGenerator::new().generate::(), + ArbitraryGenerator::new().generate::(), txs, ); assert!(res.is_err(), "Should fail to insert to db"); diff --git a/crates/rocksdb-store/src/l1/schemas.rs b/crates/rocksdb-store/src/l1/schemas.rs index b84260899..a2cde46e9 100644 --- a/crates/rocksdb-store/src/l1/schemas.rs +++ b/crates/rocksdb-store/src/l1/schemas.rs @@ -1,5 +1,5 @@ use strata_mmr::CompactMmr; -use strata_primitives::{buf::Buf32, l1::L1BlockManifestWithScanRule}; +use strata_primitives::{buf::Buf32, l1::EpochedL1BlockManifest}; use strata_state::l1::L1Tx; use crate::{ @@ -13,7 +13,7 @@ type HeaderHash = Buf32; // L1 Block Schema and corresponding codecs implementation define_table_with_seek_key_codec!( /// A table to store L1 Block data. Maps block index to header - (L1BlockSchema) u64 => L1BlockManifestWithScanRule + (L1BlockSchema) u64 => EpochedL1BlockManifest ); // L1 Txns Schema and corresponding codecs implementation diff --git a/crates/state/src/chain_state.rs b/crates/state/src/chain_state.rs index 7453d9db9..dafdee4af 100644 --- a/crates/state/src/chain_state.rs +++ b/crates/state/src/chain_state.rs @@ -94,6 +94,10 @@ impl Chainstate { &self.l1_state } + pub fn epoch(&self) -> u64 { + self.epoch + } + /// Computes a commitment to a the chainstate. This is super expensive /// because it does a bunch of hashing. pub fn compute_state_root(&self) -> Buf32 { diff --git a/crates/status/src/status_manager.rs b/crates/status/src/status_manager.rs index 389f04ef1..a0a3d5b47 100644 --- a/crates/status/src/status_manager.rs +++ b/crates/status/src/status_manager.rs @@ -98,6 +98,10 @@ impl StatusChannel { self.receiver.l1.borrow().clone() } + pub fn epoch(&self) -> Option { + self.receiver.chs.borrow().to_owned().map(|ch| ch.epoch()) + } + /// Waits until there's a new client state and returns the client state. pub async fn wait_for_client_change(&self) -> Result { let mut s = self.receiver.cl.clone(); diff --git a/crates/tx-parser/src/filter.rs b/crates/tx-parser/src/filter.rs index bddf54b69..6cd29ab05 100644 --- a/crates/tx-parser/src/filter.rs +++ b/crates/tx-parser/src/filter.rs @@ -15,14 +15,14 @@ use crate::{ /// [`TxFilterConfig`]s pub fn filter_protocol_op_tx_refs( block: &Block, - filter_config: TxFilterConfig, + filter_config: &TxFilterConfig, ) -> Vec { block .txdata .iter() .enumerate() .flat_map(|(i, tx)| { - extract_protocol_ops(tx, &filter_config) + extract_protocol_ops(tx, filter_config) .into_iter() .map(move |relevant_tx| ProtocolOpTxRef::new(i as u32, relevant_tx)) }) diff --git a/crates/tx-parser/src/filter_types.rs b/crates/tx-parser/src/filter_types.rs index 84ef15f87..a5c653aa9 100644 --- a/crates/tx-parser/src/filter_types.rs +++ b/crates/tx-parser/src/filter_types.rs @@ -9,7 +9,7 @@ use strata_primitives::{ use crate::utils::{generate_taproot_address, get_operator_wallet_pks}; /// A configuration that determines how relevant transactions in a bitcoin block are filtered. -#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)] +#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize)] pub struct TxFilterConfig { /// For checkpoint update inscriptions. pub rollup_name: String, diff --git a/crates/tx-parser/src/messages.rs b/crates/tx-parser/src/messages.rs index dd2befc81..0501c4e64 100644 --- a/crates/tx-parser/src/messages.rs +++ b/crates/tx-parser/src/messages.rs @@ -1,12 +1,14 @@ use bitcoin::Block; use borsh::{BorshDeserialize, BorshSerialize}; +use strata_primitives::l1::Epoch; use strata_state::{l1::HeaderVerificationState, tx::ProtocolOperation}; /// L1 events that we observe and want the persistence task to work on. #[derive(Clone, Debug)] pub enum L1Event { - /// Data that contains block number, block and relevant transactions - BlockData(BlockData), + /// Data that contains block number, block and relevant transactions, and also the epoch whose + /// rules are applied to + BlockData(BlockData, Epoch), /// Revert to the provided block height RevertTo(u64),