btcio/query: Rescan recent blocks if filter rules changed in an epoch
Bibek Pandey committed Dec 9, 2024
1 parent 7797aa3 commit 0b49fd1
Showing 15 changed files with 207 additions and 51 deletions.
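Roughly, the reader state now carries the epoch and the filter config it last scanned with; when the epoch reported by the status channel moves on and the filter config derived for it differs, the reader emits an L1Event::RevertTo for the start of its recent-block window and re-processes those blocks under both the old and the new rules. The following is a minimal, self-contained sketch of that decision, using simplified stand-in types rather than the crate's actual TxFilterConfig, Epoch, or EpochFilterConfig:

// Hypothetical, simplified stand-ins for illustration only.
#[derive(Clone, Debug, PartialEq)]
struct FilterConfig {
    magic: Vec<u8>,
}

#[derive(Debug)]
enum Epoch {
    Exact(u64),
    AtTransition(u64, u64),
}

#[derive(Debug)]
enum EpochFilterConfig {
    AtEpoch(FilterConfig),
    AtTransition(FilterConfig, FilterConfig),
}

/// Sketch of the choice made once a new epoch is observed: if the freshly
/// derived filter config differs from the one we scanned with, mark the state
/// as transitional and request a rescan; otherwise keep an exact epoch.
fn on_epoch_update(
    curr_epoch: u64,
    latest_epoch: u64,
    curr_cfg: FilterConfig,
    derived_cfg: FilterConfig,
) -> (Epoch, EpochFilterConfig, bool) {
    if derived_cfg != curr_cfg {
        (
            Epoch::AtTransition(curr_epoch, latest_epoch),
            EpochFilterConfig::AtTransition(curr_cfg, derived_cfg),
            true, // revert and rescan recent blocks with both configs
        )
    } else {
        (Epoch::Exact(curr_epoch), EpochFilterConfig::AtEpoch(curr_cfg), false)
    }
}

fn main() {
    let old = FilterConfig { magic: b"strata".to_vec() };
    let new = FilterConfig { magic: b"strata2".to_vec() };
    let (epoch, cfg, rescan) = on_epoch_update(3, 4, old, new);
    println!("{epoch:?}\n{cfg:?}\nrescan = {rescan}");
}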
6 changes: 4 additions & 2 deletions bin/strata-client/src/extractor.rs
@@ -199,7 +199,9 @@ mod tests {
use strata_primitives::{
bridge::OperatorIdx,
buf::Buf32,
l1::{BitcoinAmount, L1BlockManifest, L1TxProof, OutputRef, XOnlyPk},
l1::{
BitcoinAmount, EpochedL1BlockManifest, L1BlockManifest, L1TxProof, OutputRef, XOnlyPk,
},
};
use strata_rocksdb::{test_utils::get_rocksdb_tmp_instance, L1Db};
use strata_state::{
@@ -381,7 +383,7 @@ mod tests {
})
.collect();

let mf: L1BlockManifest = arb.generate();
let mf: EpochedL1BlockManifest = arb.generate();

// Insert block data
let res = l1_db.put_block_data(idx, mf.clone(), txs.clone());
112 changes: 105 additions & 7 deletions crates/btcio/src/reader/query.rs
@@ -5,8 +5,8 @@ use std::{
};

use anyhow::bail;
use bitcoin::{hashes::Hash, BlockHash};
use strata_primitives::buf::Buf32;
use bitcoin::{hashes::Hash, Block, BlockHash};
use strata_primitives::{buf::Buf32, l1::Epoch};
use strata_state::l1::{
get_btc_params, get_difficulty_adjustment_height, BtcParams, HeaderVerificationState,
L1BlockId, TimestampStore,
@@ -21,7 +21,10 @@ use tokio::sync::mpsc;
use tracing::*;

use crate::{
reader::{config::ReaderConfig, state::ReaderState},
reader::{
config::ReaderConfig,
state::{EpochFilterConfig, ReaderState},
},
rpc::traits::Reader,
status::{apply_status_updates, L1StatusUpdate},
};
@@ -73,6 +76,8 @@ async fn do_reader_task<R: Reader>(
let cur_best_height = state.best_block_idx();
let poll_span = debug_span!("l1poll", %cur_best_height);

check_and_update_epoch_scan_rules_change(&ctx, &mut state).await?;

if let Err(err) = poll_for_new_blocks(&ctx, &mut state, &mut status_updates)
.instrument(poll_span)
.await
@@ -105,6 +110,68 @@ async fn do_reader_task<R: Reader>(
}
}

/// Checks whether the epoch or scan rules have changed and, if so, rescans recent blocks to
/// account for possibly missed relevant txs.
async fn check_and_update_epoch_scan_rules_change<R: Reader>(
ctx: &ReaderContext<R>,
state: &mut ReaderState,
) -> anyhow::Result<()> {
let latest_epoch = ctx.status_channel.epoch().unwrap_or(0);
// TODO: check if latest_epoch < current epoch. should panic if so?
let curr_epoch = match state.epoch() {
Epoch::Exact(e) => *e,
Epoch::AtTransition(_from, to) => *to,
};
let is_new_epoch = curr_epoch == latest_epoch;
let curr_filter_config = match state.filter_config() {
EpochFilterConfig::AtEpoch(c) => c.clone(),
EpochFilterConfig::AtTransition(_, c) => c.clone(),
};
if is_new_epoch {
// TODO: pass in chainstate to `derive_from`
let new_config = TxFilterConfig::derive_from(ctx.config.params.rollup())?;

// If the filter rule has changed, first revert recently scanned blocks and rescan them
// with the new rule
if new_config != curr_filter_config {
// Send L1 revert so that the recent txs can be appropriately re-filtered
let revert_ev =
L1Event::RevertTo(state.best_block_idx() - state.recent_blocks().len() as u64);
if ctx.event_tx.send(revert_ev).await.is_err() {
warn!("unable to submit L1 reorg event, did persistence task exit?");
}

state.set_filter_config(EpochFilterConfig::AtTransition(
curr_filter_config,
new_config,
));
state.set_epoch(Epoch::AtTransition(curr_epoch, latest_epoch));

rescan_recent_blocks(ctx, state).await?;
} else {
state.set_filter_config(EpochFilterConfig::AtEpoch(curr_filter_config));
state.set_epoch(Epoch::Exact(curr_epoch));
}
}
// TODO: anything else?
Ok(())
}

async fn rescan_recent_blocks<R: Reader>(
ctx: &ReaderContext<R>,
state: &mut ReaderState,
) -> anyhow::Result<()> {
let recent_len = state.recent_blocks().len() as u64;
let mut status_updates: Vec<L1StatusUpdate> = Vec::new(); // TODO: is this necessary
let recent = state.recent_blocks().to_vec();
for (i, blk_hash) in recent.iter().enumerate() {
let height = state.best_block_idx() - recent_len + i as u64;
let block = ctx.client.get_block(blk_hash).await?;
process_block(ctx, state, &mut status_updates, height, block).await?;
}
Ok(())
}

/// Inits the reader state by trying to backfill blocks up to a target height.
async fn init_reader_state<R: Reader>(
ctx: &ReaderContext<R>,
@@ -140,7 +207,16 @@ async fn init_reader_state<R: Reader>(
real_cur_height = height;
}

let state = ReaderState::new(real_cur_height + 1, lookback, init_queue);
let params = ctx.config.params.clone();
let filter_config = TxFilterConfig::derive_from(params.rollup())?;
let epoch = ctx.status_channel.epoch().unwrap_or(0);
let state = ReaderState::new(
real_cur_height + 1,
lookback,
init_queue,
EpochFilterConfig::AtEpoch(filter_config), // TODO: decide if it is transitional or exact
Epoch::Exact(epoch), // TODO: decide if it is transitional or exact
);
Ok(state)
}

@@ -229,11 +305,28 @@ async fn fetch_and_process_block<R: Reader>(
status_updates: &mut Vec<L1StatusUpdate>,
) -> anyhow::Result<BlockHash> {
let block = ctx.client.get_block_at(height).await?;
process_block(ctx, state, status_updates, height, block).await
}

async fn process_block<R: Reader>(
ctx: &ReaderContext<R>,
state: &mut ReaderState,
status_updates: &mut Vec<L1StatusUpdate>,
height: u64,
block: Block,
) -> anyhow::Result<BlockHash> {
let txs = block.txdata.len();

let params = ctx.config.params.clone();
let filter_config = TxFilterConfig::derive_from(params.rollup())?;
let filtered_txs = filter_protocol_op_tx_refs(&block, filter_config);
let filtered_txs = match state.filter_config() {
EpochFilterConfig::AtEpoch(config) => filter_protocol_op_tx_refs(&block, config),
EpochFilterConfig::AtTransition(from_config, to_config) => {
let mut txs = filter_protocol_op_tx_refs(&block, from_config);
let txs1 = filter_protocol_op_tx_refs(&block, to_config);
txs.extend_from_slice(&txs1);
txs
}
};
let block_data = BlockData::new(height, block, filtered_txs);
let l1blkid = block_data.block().block_hash();
trace!(%height, %l1blkid, %txs, "fetched block from client");
@@ -264,7 +357,12 @@
}
}

if let Err(e) = ctx.event_tx.send(L1Event::BlockData(block_data)).await {
// TODO: probably need to send the current epoch as well
if let Err(e) = ctx
.event_tx
.send(L1Event::BlockData(block_data, state.epoch().clone()))
.await
{
error!("failed to submit L1 block event, did the persistence task crash?");
return Err(e.into());
}
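When the state is mid-transition, process_block above runs the block through both the outgoing and the incoming filter config and concatenates the matches, so a tx relevant to either epoch is kept. Below is a small, self-contained sketch of that union step; the types are hypothetical stand-ins for bitcoin::Block, TxFilterConfig, and filter_protocol_op_tx_refs:

// Hypothetical stand-ins; the real code filters a bitcoin::Block with the
// crate's TxFilterConfig via filter_protocol_op_tx_refs.
#[derive(Clone, Debug, PartialEq)]
struct TxRef(u32);

struct Block {
    txids: Vec<u32>,
}

struct FilterConfig {
    wanted: Vec<u32>,
}

enum EpochFilterConfig {
    AtEpoch(FilterConfig),
    AtTransition(FilterConfig, FilterConfig),
}

fn filter_txs(block: &Block, cfg: &FilterConfig) -> Vec<TxRef> {
    block
        .txids
        .iter()
        .copied()
        .filter(|t| cfg.wanted.contains(t))
        .map(TxRef)
        .collect()
}

/// During a transition, a tx is kept if either the old or the new filter
/// config matches it.
fn filter_for_state(block: &Block, cfg: &EpochFilterConfig) -> Vec<TxRef> {
    match cfg {
        EpochFilterConfig::AtEpoch(c) => filter_txs(block, c),
        EpochFilterConfig::AtTransition(from, to) => {
            let mut txs = filter_txs(block, from);
            txs.extend_from_slice(&filter_txs(block, to));
            txs
        }
    }
}

fn main() {
    let block = Block { txids: vec![1, 2, 3, 4] };
    let cfg = EpochFilterConfig::AtTransition(
        FilterConfig { wanted: vec![1] },
        FilterConfig { wanted: vec![3, 4] },
    );
    assert_eq!(filter_for_state(&block, &cfg), vec![TxRef(1), TxRef(3), TxRef(4)]);
}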
52 changes: 51 additions & 1 deletion crates/btcio/src/reader/state.rs
@@ -1,6 +1,8 @@
use std::collections::VecDeque;

use bitcoin::BlockHash;
use strata_primitives::l1::Epoch;
use strata_tx_parser::filter::TxFilterConfig;

/// State we use in various parts of the reader.
#[derive(Debug)]
@@ -13,24 +15,64 @@ pub struct ReaderState {

/// Depth at which we start pulling recent blocks out of the front of the queue.
max_depth: usize,

/// Current transaction filtering config
filter_config: EpochFilterConfig,

/// Current epoch
epoch: Epoch,
}

/// Filter config that can be exact or transitioning from one epoch to another.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum EpochFilterConfig {
/// Config at a particular epoch
AtEpoch(TxFilterConfig),
/// Configs at transitioning epochs (from, to).
/// NOTE: The assumption is that we never have to consider more than two transitioning
/// configs.
AtTransition(TxFilterConfig, TxFilterConfig),
}

impl ReaderState {
/// Constructs a new reader state instance using some context about how we
/// want to manage it.
pub fn new(next_height: u64, max_depth: usize, recent_blocks: VecDeque<BlockHash>) -> Self {
// FIXME: the args list is getting long
pub fn new(
next_height: u64,
max_depth: usize,
recent_blocks: VecDeque<BlockHash>,
filter_config: EpochFilterConfig,
epoch: Epoch,
) -> Self {
assert!(!recent_blocks.is_empty());
Self {
next_height,
max_depth,
recent_blocks,
filter_config,
epoch,
}
}

pub fn next_height(&self) -> u64 {
self.next_height
}

pub fn recent_blocks(&self) -> &[BlockHash] {
// TODO: is this correct?
self.recent_blocks.as_slices().0
}

pub fn epoch(&self) -> &Epoch {
&self.epoch
}

pub fn set_epoch(&mut self, epoch: Epoch) {
self.epoch = epoch;
}

pub fn best_block(&self) -> &BlockHash {
self.recent_blocks.back().unwrap()
}
@@ -39,6 +81,14 @@ impl ReaderState {
self.next_height - 1
}

pub fn filter_config(&self) -> &EpochFilterConfig {
&self.filter_config
}

pub fn set_filter_config(&mut self, filter_config: EpochFilterConfig) {
self.filter_config = filter_config;
}

/// Returns the idx of the deepest block in the reader state.
#[allow(unused)]
fn deepest_block(&self) -> u64 {
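One note on the recent_blocks() accessor above, whose TODO asks whether it is correct: VecDeque::as_slices().0 is only the first contiguous run of the ring buffer, so if the deque has wrapped, the tail elements are not part of the returned slice; make_contiguous() is the standard-library way to get everything as one slice. A standalone illustration of that behavior (this example is not part of the commit):

use std::collections::VecDeque;

fn main() {
    let mut dq: VecDeque<u32> = VecDeque::with_capacity(4);
    dq.extend([1, 2, 3, 4]);
    dq.pop_front();
    dq.push_back(5); // may wrap around inside the ring buffer

    // as_slices() returns (head, tail); .0 alone misses any wrapped tail.
    let (head, tail) = dq.as_slices();
    println!("head = {head:?}, tail = {tail:?}"); // e.g. head = [2, 3, 4], tail = [5]

    // make_contiguous() rearranges the storage so one slice covers everything.
    assert_eq!(dq.make_contiguous(), &[2, 3, 4, 5]);
}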
12 changes: 5 additions & 7 deletions crates/consensus-logic/src/l1_handler.rs
@@ -10,7 +10,7 @@ use strata_db::traits::{Database, L1Database};
use strata_primitives::{
block_credential::CredRule,
buf::Buf32,
l1::{L1BlockManifest, L1BlockManifestWithScanRule, L1TxProof},
l1::{Epoch, EpochedL1BlockManifest, L1BlockManifest, L1TxProof},
params::{Params, RollupParams},
vk::RollupVerifyingKey,
};
@@ -80,7 +80,7 @@
Ok(())
}

L1Event::BlockData(blockdata) => {
L1Event::BlockData(blockdata, epoch) => {
let height = blockdata.block_num();

// Bail out fast if we don't have to care.
@@ -92,8 +92,7 @@

let l1blkid = blockdata.block().block_hash();

// TODO; pass in epoch and scan rule reference
let manifest = generate_block_manifest(blockdata.block());
let manifest = generate_block_manifest(blockdata.block(), epoch);
let l1txs: Vec<_> = generate_l1txs(&blockdata);
let num_txs = l1txs.len();
l1db.put_block_data(blockdata.block_num(), manifest, l1txs.clone())?;
@@ -193,7 +192,7 @@ pub fn verify_proof(checkpoint: &BatchCheckpoint, rollup_params: &RollupParams)

/// Given a block, generates a manifest of the parts we care about that we can
/// store in the database.
fn generate_block_manifest(block: &Block) -> L1BlockManifestWithScanRule {
fn generate_block_manifest(block: &Block, epoch: Epoch) -> EpochedL1BlockManifest {
let blockid = Buf32::from(block.block_hash().to_raw_hash().to_byte_array());
let root = block
.witness_root()
@@ -202,8 +201,7 @@ fn generate_block_manifest(block: &Block) -> L1BlockManifestWithScanRule {
let header = serialize(&block.header);

let mf = L1BlockManifest::new(blockid, header, Buf32::from(root));
let scan_rules = vec![]; // TODO: get rules here
L1BlockManifestWithScanRule::new(mf, scan_rules)
EpochedL1BlockManifest::new(mf, epoch)
}

fn generate_l1txs(blockdata: &BlockData) -> Vec<L1Tx> {
9 changes: 2 additions & 7 deletions crates/db/src/traits.rs
@@ -42,12 +42,7 @@ pub trait L1Database {
/// Atomically extends the chain with a new block, providing the manifest
/// and a list of transactions we find relevant. Returns error if
/// provided out-of-order.
fn put_block_data(
&self,
idx: u64,
mf: L1BlockManifestWithScanRule,
txs: Vec<L1Tx>,
) -> DbResult<()>;
fn put_block_data(&self, idx: u64, mf: EpochedL1BlockManifest, txs: Vec<L1Tx>) -> DbResult<()>;

/// Stores an MMR checkpoint so we have to query less far back. If the
/// provided height does not match the entries in the MMR, will return an
@@ -64,7 +59,7 @@
fn get_chain_tip(&self) -> DbResult<Option<u64>>;

/// Gets the block manifest for a block index.
fn get_block_manifest(&self, idx: u64) -> DbResult<Option<L1BlockManifestWithScanRule>>;
fn get_block_manifest(&self, idx: u64) -> DbResult<Option<EpochedL1BlockManifest>>;

/// Returns a half-open interval of block hashes, if we have all of them
/// present. Otherwise, returns error.
22 changes: 15 additions & 7 deletions crates/primitives/src/l1.rs
@@ -118,17 +118,25 @@ impl L1TxProof {

/// Includes `L1BlockManifest` along with scan rules that it is applied to
#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Arbitrary)]
pub struct L1BlockManifestWithScanRule {
pub struct EpochedL1BlockManifest {
/// The actual l1 manifest
manifest: L1BlockManifest,
/// Rules with which the corresponding block is scaned
// FIXME: use non-empty vec
rules: Vec<ScanRule>,
/// Epoch whose scan rules are applied to this manifest
epoch: Epoch,
}

impl L1BlockManifestWithScanRule {
pub fn new(manifest: L1BlockManifest, rules: Vec<ScanRule>) -> Self {
Self { manifest, rules }
/// Epoch that is either exact or transitioning from one to another.
#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Arbitrary)]
pub enum Epoch {
/// Exact epoch
Exact(u64),
/// Transitioning epoch (from, to)
AtTransition(u64, u64),
}

impl EpochedL1BlockManifest {
pub fn new(manifest: L1BlockManifest, epoch: Epoch) -> Self {
Self { manifest, epoch }
}

pub fn header(&self) -> &[u8] {
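For reference, a tiny runnable sketch of how a transitional epoch collapses to a single number, mirroring the match in check_and_update_epoch_scan_rules_change in query.rs; the effective_epoch helper below is hypothetical and not part of this commit, and the enum is a stand-in copy of the Epoch added above:

// Stand-in copy of the Epoch enum above; effective_epoch is a hypothetical
// helper for illustration only.
#[derive(Clone, Debug, PartialEq)]
enum Epoch {
    Exact(u64),
    AtTransition(u64, u64),
}

/// When a single epoch number is needed, a transitional epoch resolves to its
/// destination, matching how the reader picks curr_epoch.
fn effective_epoch(e: &Epoch) -> u64 {
    match e {
        Epoch::Exact(n) => *n,
        Epoch::AtTransition(_from, to) => *to,
    }
}

fn main() {
    assert_eq!(effective_epoch(&Epoch::Exact(7)), 7);
    assert_eq!(effective_epoch(&Epoch::AtTransition(7, 8)), 8);
}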
2 changes: 1 addition & 1 deletion crates/primitives/src/params.rs
@@ -130,7 +130,7 @@ impl RollupParams {
}

/// Configuration common among deposit and deposit request transaction
#[derive(Clone, Debug, PartialEq, BorshSerialize, BorshDeserialize, Deserialize, Serialize)]
#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize, Deserialize, Serialize)]
pub struct DepositTxParams {
/// Magic bytes we use to regonize a deposit with.
pub magic_bytes: Vec<u8>,
2 changes: 1 addition & 1 deletion crates/primitives/src/sorted_vec.rs
@@ -3,7 +3,7 @@ use std::cmp::Ordering;
use borsh::{BorshDeserialize, BorshSerialize};

/// A vector wrapper that ensures the elements are sorted
#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)]
#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
pub struct SortedVec<T> {
inner: Vec<T>,
}