From 9cb200f4ad8d1b922f2a711aafdb78fef227f78b Mon Sep 17 00:00:00 2001 From: Bibek Pandey Date: Tue, 21 Jan 2025 13:59:16 +0545 Subject: [PATCH] Enhance bundler task --- crates/btcio/src/writer/bundler.rs | 82 +++++++++++++++++---------- crates/btcio/src/writer/task.rs | 35 +++++++++--- crates/config/src/btcio.rs | 7 ++- crates/db/src/traits.rs | 2 +- crates/l1tx/src/filter.rs | 10 ++-- crates/rocksdb-store/src/writer/db.rs | 10 +--- 6 files changed, 92 insertions(+), 54 deletions(-) diff --git a/crates/btcio/src/writer/bundler.rs b/crates/btcio/src/writer/bundler.rs index 9fc17d849..2f8a8fe45 100644 --- a/crates/btcio/src/writer/bundler.rs +++ b/crates/btcio/src/writer/bundler.rs @@ -1,32 +1,61 @@ use std::{sync::Arc, time::Duration}; -use strata_db::types::{BundledPayloadEntry, IntentEntry, IntentStatus}; +use strata_config::btcio::WriterConfig; +use strata_db::{ + types::{BundledPayloadEntry, IntentEntry, IntentStatus}, + DbResult, +}; use strata_storage::ops::writer::EnvelopeDataOps; -use tokio::time::sleep; +use strata_tasks::ShutdownGuard; +use tokio::{select, sync::mpsc::Receiver}; use tracing::*; -// TODO: get this from config -const BUNDLE_INTERVAL: u64 = 200; // millis - /// Periodically bundles unbundled intents into payload entries. -pub(crate) async fn bundler_task(ops: Arc) -> anyhow::Result<()> { - let mut last_idx = 0; +pub(crate) async fn bundler_task( + mut unbundled: Vec, + ops: Arc, + config: Arc, + mut intent_rx: Receiver, + shutdown: ShutdownGuard, +) -> anyhow::Result<()> { + let interval = tokio::time::interval(Duration::from_millis(config.bundle_interval_ms)); + tokio::pin!(interval); loop { - let (unbundled, new_idx) = get_unbundled_intents_after(last_idx, ops.as_ref()).await?; - process_unbundled_entries(ops.as_ref(), unbundled).await?; - last_idx = new_idx; + select! { + maybe_intent = intent_rx.recv() => { + if shutdown.should_shutdown() { + info!("Bundler received shutdown. Stopping."); + break; + } + if let Some(intent) = maybe_intent { + unbundled.push(intent); + } else { + warn!("Intent receiver closed, stopping bundler task"); + break; + } + } - let _ = sleep(Duration::from_millis(BUNDLE_INTERVAL)).await; + _ = interval.tick() => { + if shutdown.should_shutdown() { + info!("Bundler received shutdown. Stopping."); + break; + } + // Process unbundled, returning entries which are unprocessed for some reason. + unbundled = process_unbundled_entries(ops.as_ref(), unbundled).await?; + } + } } + Ok(()) } -/// Processes and bundles a list of unbundled intents into payload entries. +/// Processes and bundles a list of unbundled intents into payload entries. Returns a vector of +/// entries which are unbundled for some reason. /// NOTE: The current logic is simply 1-1 mapping between intents and payloads, in future it can /// be sophisticated. async fn process_unbundled_entries( ops: &EnvelopeDataOps, unbundled: Vec, -) -> anyhow::Result<()> { +) -> DbResult> { for mut entry in unbundled { // NOTE: In future, the logic to create payload will be different. We need to group // intents and create payload entries accordingly @@ -42,22 +71,22 @@ async fn process_unbundled_entries( .await?; // Atomic Ends. } - Ok(()) + // Return empty Vec because each entry is being bundled right now. This might be different in + // future. + Ok(vec![]) } /// Retrieves unbundled intents after a given index in ascending order along with the latest -/// unbundled entry idx. -async fn get_unbundled_intents_after( - idx: u64, +/// entry idx. +pub(crate) fn get_initial_unbundled_entries( ops: &EnvelopeDataOps, -) -> anyhow::Result<(Vec, u64)> { - let latest_idx = ops.get_next_intent_idx_async().await?.saturating_sub(1); - let mut curr_idx = latest_idx; - +) -> anyhow::Result> { + let mut curr_idx = ops.get_next_intent_idx_blocking()?; let mut unbundled = Vec::new(); - while curr_idx >= idx { - if let Some(intent) = ops.get_intent_by_idx_async(curr_idx).await? { + while curr_idx > 0 { + curr_idx -= 1; + if let Some(intent) = ops.get_intent_by_idx_blocking(curr_idx)? { match intent.status { IntentStatus::Unbundled => unbundled.push(intent), IntentStatus::Bundled(_) => { @@ -69,15 +98,10 @@ async fn get_unbundled_intents_after( warn!(%curr_idx, "Could not find expected intent in db"); break; } - - if curr_idx == 0 { - break; - } - curr_idx -= 1; } // Reverse the items so that they are in ascending order of index unbundled.reverse(); - Ok((unbundled, latest_idx)) + Ok(unbundled) } diff --git a/crates/btcio/src/writer/task.rs b/crates/btcio/src/writer/task.rs index f71272f9d..4cdc65509 100644 --- a/crates/btcio/src/writer/task.rs +++ b/crates/btcio/src/writer/task.rs @@ -13,9 +13,10 @@ use strata_primitives::{ use strata_status::StatusChannel; use strata_storage::ops::writer::{Context, EnvelopeDataOps}; use strata_tasks::TaskExecutor; +use tokio::sync::mpsc::{self, Sender}; use tracing::*; -use super::bundler::bundler_task; +use super::bundler::{bundler_task, get_initial_unbundled_entries}; use crate::{ broadcaster::L1BroadcastHandle, rpc::{traits::WriterRpc, BitcoinClient}, @@ -28,11 +29,12 @@ use crate::{ /// A handle to the Envelope task. pub struct EnvelopeHandle { ops: Arc, + intent_tx: Sender, } impl EnvelopeHandle { - pub fn new(ops: Arc) -> Self { - Self { ops } + pub fn new(ops: Arc, intent_tx: Sender) -> Self { + Self { ops, intent_tx } } /// Checks if it is duplicate, if not creates a new [`IntentEntry`] from `intent` and puts it in @@ -56,7 +58,13 @@ impl EnvelopeHandle { // Create and store IntentEntry let entry = IntentEntry::new_unbundled(intent); - Ok(self.ops.put_intent_entry_blocking(id, entry)?) + self.ops.put_intent_entry_blocking(id, entry.clone())?; + + // Send to bundler + if let Err(e) = self.intent_tx.blocking_send(entry) { + warn!("Could not send intent entry to bundler: {:?}", e); + } + Ok(()) } /// Checks if it is duplicate, if not creates a new [`IntentEntry`] from `intent` and puts it in @@ -80,7 +88,14 @@ impl EnvelopeHandle { // Create and store IntentEntry let entry = IntentEntry::new_unbundled(intent); - Ok(self.ops.put_intent_entry_async(id, entry).await?) + self.ops.put_intent_entry_blocking(id, entry.clone())?; + + // Send to bundler + if let Err(e) = self.intent_tx.send(entry).await { + warn!("Could not send intent entry to bundler: {:?}", e); + } + + Ok(()) } } @@ -106,11 +121,12 @@ pub fn start_envelope_task( ) -> anyhow::Result> { let writer_ops = Arc::new(Context::new(db).into_ops(pool)); let next_watch_payload_idx = get_next_payloadidx_to_watch(writer_ops.as_ref())?; + let (intent_tx, intent_rx) = mpsc::channel::(64); - let envelope_handle = Arc::new(EnvelopeHandle::new(writer_ops.clone())); + let envelope_handle = Arc::new(EnvelopeHandle::new(writer_ops.clone(), intent_tx)); let ctx = Arc::new(WriterContext::new( params, - config, + config.clone(), sequencer_address, bitcoin_client, status_channel, @@ -121,8 +137,9 @@ pub fn start_envelope_task( watcher_task(next_watch_payload_idx, ctx, wops.clone(), broadcast_handle).await }); - executor.spawn_critical_async("btcio::bundler_task", async move { - bundler_task(writer_ops).await + let unbundled = get_initial_unbundled_entries(writer_ops.as_ref())?; + executor.spawn_critical_async_with_shutdown("btcio::bundler_task", |shutdown| async move { + bundler_task(unbundled, writer_ops, config.clone(), intent_rx, shutdown).await }); Ok(envelope_handle) diff --git a/crates/config/src/btcio.rs b/crates/config/src/btcio.rs index 9a29cba84..b7ca3a51d 100644 --- a/crates/config/src/btcio.rs +++ b/crates/config/src/btcio.rs @@ -17,13 +17,15 @@ pub struct ReaderConfig { /// Configuration for btcio writer/signer. #[derive(Debug, Clone, Deserialize)] pub struct WriterConfig { - /// How often to invoke the writer + /// How often to invoke the writer. pub write_poll_dur_ms: u64, /// How the fees for are determined. // FIXME: This should actually be a part of signer. pub fee_policy: FeePolicy, - /// How much amount(in sats) to send to reveal address + /// How much amount(in sats) to send to reveal address. pub reveal_amount: u64, + /// How often to bundle write intents. + pub bundle_interval_ms: u64, } /// Definition of how fees are determined while creating l1 transactions. @@ -42,6 +44,7 @@ impl Default for WriterConfig { write_poll_dur_ms: 1_000, fee_policy: FeePolicy::Smart, reveal_amount: 1_000, + bundle_interval_ms: 500, } } } diff --git a/crates/db/src/traits.rs b/crates/db/src/traits.rs index e48dee61a..8113df232 100644 --- a/crates/db/src/traits.rs +++ b/crates/db/src/traits.rs @@ -244,7 +244,7 @@ pub trait CheckpointDatabase { fn put_batch_checkpoint(&self, batchidx: u64, entry: CheckpointEntry) -> DbResult<()>; } -/// A trait encapsulating provider and store traits to create/update [`BundledPayloadEntry`] in the +/// Encapsulates provider and store traits to create/update [`BundledPayloadEntry`] in the /// database and to fetch [`BundledPayloadEntry`] and indices from the database pub trait L1WriterDatabase { /// Store the [`BundledPayloadEntry`]. diff --git a/crates/l1tx/src/filter.rs b/crates/l1tx/src/filter.rs index a3d93fc19..2cb23348c 100644 --- a/crates/l1tx/src/filter.rs +++ b/crates/l1tx/src/filter.rs @@ -178,7 +178,7 @@ mod test { // Create an envelope transaction. The focus here is to create a tapscript, rather than a // completely valid control block. Includes `n_envelopes` envelopes in the tapscript. - fn create_checkpoint_envelope_tx(params: Arc, n_envelopes: u32) -> Transaction { + fn create_checkpoint_envelope_tx(params: &Params, n_envelopes: u32) -> Transaction { let address = parse_addr(OTHER_ADDR); let inp_tx = create_test_tx(vec![create_test_txout(100000000, &address)]); let payloads: Vec<_> = (0..n_envelopes) @@ -218,7 +218,7 @@ mod test { // Testing multiple envelopes are parsed let num_envelopes = 2; - let tx = create_checkpoint_envelope_tx(params.clone().into(), num_envelopes); + let tx = create_checkpoint_envelope_tx(¶ms, num_envelopes); let block = create_test_block(vec![tx]); let ops = filter_protocol_op_tx_refs(&block, params.rollup(), &filter_config); @@ -235,7 +235,7 @@ mod test { let mut new_params = params.clone(); new_params.rollup.checkpoint_tag = "invalid_checkpoint_tag".to_string(); - let tx = create_checkpoint_envelope_tx(new_params.into(), 2); + let tx = create_checkpoint_envelope_tx(&new_params, 2); let block = create_test_block(vec![tx]); let result = filter_protocol_op_tx_refs(&block, params.rollup(), &filter_config); assert!(result.is_empty(), "Should filter out invalid name"); @@ -260,9 +260,9 @@ mod test { fn test_filter_relevant_txs_multiple_matches() { let params: Params = gen_params(); let filter_config = create_tx_filter_config(¶ms); - let tx1 = create_checkpoint_envelope_tx(params.clone().into(), 1); + let tx1 = create_checkpoint_envelope_tx(¶ms, 1); let tx2 = create_test_tx(vec![create_test_txout(100, &parse_addr(OTHER_ADDR))]); - let tx3 = create_checkpoint_envelope_tx(params.clone().into(), 1); + let tx3 = create_checkpoint_envelope_tx(¶ms, 1); let block = create_test_block(vec![tx1, tx2, tx3]); let txids: Vec = filter_protocol_op_tx_refs(&block, params.rollup(), &filter_config) diff --git a/crates/rocksdb-store/src/writer/db.rs b/crates/rocksdb-store/src/writer/db.rs index 94e7a3505..43016f3e1 100644 --- a/crates/rocksdb-store/src/writer/db.rs +++ b/crates/rocksdb-store/src/writer/db.rs @@ -51,24 +51,18 @@ impl L1WriterDatabase for RBL1WriterDb { } fn put_intent_entry(&self, intent_id: Buf32, intent_entry: IntentEntry) -> DbResult<()> { - let res = self - .db + self.db .with_optimistic_txn( rockbound::TransactionRetry::Count(self.ops.retry_count), |tx| -> Result<(), DbError> { - tracing::debug!(%intent_id, "putting intent"); let idx = get_next_id::(tx)?; - tracing::debug!(%idx, "next intent idx..."); tx.put::(&idx, &intent_id)?; tx.put::(&intent_id, &intent_entry)?; Ok(()) }, ) - .map_err(|e| DbError::TransactionError(e.to_string())); - let next = self.get_next_intent_idx()?; - tracing::debug!(%next, "next intent idx after put"); - res + .map_err(|e| DbError::TransactionError(e.to_string())) } fn get_intent_by_id(&self, id: Buf32) -> DbResult> {