Skip to content

Commit

Permalink
Enhance bundler task
Browse files Browse the repository at this point in the history
  • Loading branch information
Bibek Pandey committed Jan 21, 2025
1 parent c175668 commit 9cb200f
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 54 deletions.
82 changes: 53 additions & 29 deletions crates/btcio/src/writer/bundler.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,61 @@
use std::{sync::Arc, time::Duration};

use strata_db::types::{BundledPayloadEntry, IntentEntry, IntentStatus};
use strata_config::btcio::WriterConfig;
use strata_db::{
types::{BundledPayloadEntry, IntentEntry, IntentStatus},
DbResult,
};
use strata_storage::ops::writer::EnvelopeDataOps;
use tokio::time::sleep;
use strata_tasks::ShutdownGuard;
use tokio::{select, sync::mpsc::Receiver};
use tracing::*;

// TODO: get this from config
const BUNDLE_INTERVAL: u64 = 200; // millis

/// Periodically bundles unbundled intents into payload entries.
pub(crate) async fn bundler_task(ops: Arc<EnvelopeDataOps>) -> anyhow::Result<()> {
let mut last_idx = 0;
pub(crate) async fn bundler_task(
mut unbundled: Vec<IntentEntry>,
ops: Arc<EnvelopeDataOps>,
config: Arc<WriterConfig>,
mut intent_rx: Receiver<IntentEntry>,
shutdown: ShutdownGuard,
) -> anyhow::Result<()> {
let interval = tokio::time::interval(Duration::from_millis(config.bundle_interval_ms));
tokio::pin!(interval);
loop {
let (unbundled, new_idx) = get_unbundled_intents_after(last_idx, ops.as_ref()).await?;
process_unbundled_entries(ops.as_ref(), unbundled).await?;
last_idx = new_idx;
select! {
maybe_intent = intent_rx.recv() => {
if shutdown.should_shutdown() {
info!("Bundler received shutdown. Stopping.");
break;
}
if let Some(intent) = maybe_intent {
unbundled.push(intent);
} else {
warn!("Intent receiver closed, stopping bundler task");
break;
}
}

let _ = sleep(Duration::from_millis(BUNDLE_INTERVAL)).await;
_ = interval.tick() => {
if shutdown.should_shutdown() {
info!("Bundler received shutdown. Stopping.");
break;
}
// Process unbundled, returning entries which are unprocessed for some reason.
unbundled = process_unbundled_entries(ops.as_ref(), unbundled).await?;
}
}
}
Ok(())
}

/// Processes and bundles a list of unbundled intents into payload entries.
/// Processes and bundles a list of unbundled intents into payload entries. Returns a vector of
/// entries which are unbundled for some reason.
/// NOTE: The current logic is simply 1-1 mapping between intents and payloads, in future it can
/// be sophisticated.
async fn process_unbundled_entries(
ops: &EnvelopeDataOps,
unbundled: Vec<IntentEntry>,
) -> anyhow::Result<()> {
) -> DbResult<Vec<IntentEntry>> {
for mut entry in unbundled {
// NOTE: In future, the logic to create payload will be different. We need to group
// intents and create payload entries accordingly
Expand All @@ -42,22 +71,22 @@ async fn process_unbundled_entries(
.await?;
// Atomic Ends.
}
Ok(())
// Return empty Vec because each entry is being bundled right now. This might be different in
// future.
Ok(vec![])
}

/// Retrieves unbundled intents after a given index in ascending order along with the latest
/// unbundled entry idx.
async fn get_unbundled_intents_after(
idx: u64,
/// entry idx.
pub(crate) fn get_initial_unbundled_entries(
ops: &EnvelopeDataOps,
) -> anyhow::Result<(Vec<IntentEntry>, u64)> {
let latest_idx = ops.get_next_intent_idx_async().await?.saturating_sub(1);
let mut curr_idx = latest_idx;

) -> anyhow::Result<Vec<IntentEntry>> {
let mut curr_idx = ops.get_next_intent_idx_blocking()?;
let mut unbundled = Vec::new();

while curr_idx >= idx {
if let Some(intent) = ops.get_intent_by_idx_async(curr_idx).await? {
while curr_idx > 0 {
curr_idx -= 1;
if let Some(intent) = ops.get_intent_by_idx_blocking(curr_idx)? {
match intent.status {
IntentStatus::Unbundled => unbundled.push(intent),
IntentStatus::Bundled(_) => {
Expand All @@ -69,15 +98,10 @@ async fn get_unbundled_intents_after(
warn!(%curr_idx, "Could not find expected intent in db");
break;
}

if curr_idx == 0 {
break;
}
curr_idx -= 1;
}

// Reverse the items so that they are in ascending order of index
unbundled.reverse();

Ok((unbundled, latest_idx))
Ok(unbundled)
}
35 changes: 26 additions & 9 deletions crates/btcio/src/writer/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ use strata_primitives::{
use strata_status::StatusChannel;
use strata_storage::ops::writer::{Context, EnvelopeDataOps};
use strata_tasks::TaskExecutor;
use tokio::sync::mpsc::{self, Sender};
use tracing::*;

use super::bundler::bundler_task;
use super::bundler::{bundler_task, get_initial_unbundled_entries};
use crate::{
broadcaster::L1BroadcastHandle,
rpc::{traits::WriterRpc, BitcoinClient},
Expand All @@ -28,11 +29,12 @@ use crate::{
/// A handle to the Envelope task.
pub struct EnvelopeHandle {
ops: Arc<EnvelopeDataOps>,
intent_tx: Sender<IntentEntry>,
}

impl EnvelopeHandle {
pub fn new(ops: Arc<EnvelopeDataOps>) -> Self {
Self { ops }
pub fn new(ops: Arc<EnvelopeDataOps>, intent_tx: Sender<IntentEntry>) -> Self {
Self { ops, intent_tx }
}

/// Checks if it is duplicate, if not creates a new [`IntentEntry`] from `intent` and puts it in
Expand All @@ -56,7 +58,13 @@ impl EnvelopeHandle {

// Create and store IntentEntry
let entry = IntentEntry::new_unbundled(intent);
Ok(self.ops.put_intent_entry_blocking(id, entry)?)
self.ops.put_intent_entry_blocking(id, entry.clone())?;

// Send to bundler
if let Err(e) = self.intent_tx.blocking_send(entry) {
warn!("Could not send intent entry to bundler: {:?}", e);
}
Ok(())
}

/// Checks if it is duplicate, if not creates a new [`IntentEntry`] from `intent` and puts it in
Expand All @@ -80,7 +88,14 @@ impl EnvelopeHandle {

// Create and store IntentEntry
let entry = IntentEntry::new_unbundled(intent);
Ok(self.ops.put_intent_entry_async(id, entry).await?)
self.ops.put_intent_entry_blocking(id, entry.clone())?;

// Send to bundler
if let Err(e) = self.intent_tx.send(entry).await {
warn!("Could not send intent entry to bundler: {:?}", e);
}

Ok(())
}
}

Expand All @@ -106,11 +121,12 @@ pub fn start_envelope_task<D: L1WriterDatabase + Send + Sync + 'static>(
) -> anyhow::Result<Arc<EnvelopeHandle>> {
let writer_ops = Arc::new(Context::new(db).into_ops(pool));
let next_watch_payload_idx = get_next_payloadidx_to_watch(writer_ops.as_ref())?;
let (intent_tx, intent_rx) = mpsc::channel::<IntentEntry>(64);

let envelope_handle = Arc::new(EnvelopeHandle::new(writer_ops.clone()));
let envelope_handle = Arc::new(EnvelopeHandle::new(writer_ops.clone(), intent_tx));
let ctx = Arc::new(WriterContext::new(
params,
config,
config.clone(),
sequencer_address,
bitcoin_client,
status_channel,
Expand All @@ -121,8 +137,9 @@ pub fn start_envelope_task<D: L1WriterDatabase + Send + Sync + 'static>(
watcher_task(next_watch_payload_idx, ctx, wops.clone(), broadcast_handle).await
});

executor.spawn_critical_async("btcio::bundler_task", async move {
bundler_task(writer_ops).await
let unbundled = get_initial_unbundled_entries(writer_ops.as_ref())?;
executor.spawn_critical_async_with_shutdown("btcio::bundler_task", |shutdown| async move {
bundler_task(unbundled, writer_ops, config.clone(), intent_rx, shutdown).await
});

Ok(envelope_handle)
Expand Down
7 changes: 5 additions & 2 deletions crates/config/src/btcio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ pub struct ReaderConfig {
/// Configuration for btcio writer/signer.
#[derive(Debug, Clone, Deserialize)]
pub struct WriterConfig {
/// How often to invoke the writer
/// How often to invoke the writer.
pub write_poll_dur_ms: u64,
/// How the fees for are determined.
// FIXME: This should actually be a part of signer.
pub fee_policy: FeePolicy,
/// How much amount(in sats) to send to reveal address
/// How much amount(in sats) to send to reveal address.
pub reveal_amount: u64,
/// How often to bundle write intents.
pub bundle_interval_ms: u64,
}

/// Definition of how fees are determined while creating l1 transactions.
Expand All @@ -42,6 +44,7 @@ impl Default for WriterConfig {
write_poll_dur_ms: 1_000,
fee_policy: FeePolicy::Smart,
reveal_amount: 1_000,
bundle_interval_ms: 500,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/db/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ pub trait CheckpointDatabase {
fn put_batch_checkpoint(&self, batchidx: u64, entry: CheckpointEntry) -> DbResult<()>;
}

/// A trait encapsulating provider and store traits to create/update [`BundledPayloadEntry`] in the
/// Encapsulates provider and store traits to create/update [`BundledPayloadEntry`] in the
/// database and to fetch [`BundledPayloadEntry`] and indices from the database
pub trait L1WriterDatabase {
/// Store the [`BundledPayloadEntry`].
Expand Down
10 changes: 5 additions & 5 deletions crates/l1tx/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ mod test {

// Create an envelope transaction. The focus here is to create a tapscript, rather than a
// completely valid control block. Includes `n_envelopes` envelopes in the tapscript.
fn create_checkpoint_envelope_tx(params: Arc<Params>, n_envelopes: u32) -> Transaction {
fn create_checkpoint_envelope_tx(params: &Params, n_envelopes: u32) -> Transaction {
let address = parse_addr(OTHER_ADDR);
let inp_tx = create_test_tx(vec![create_test_txout(100000000, &address)]);
let payloads: Vec<_> = (0..n_envelopes)
Expand Down Expand Up @@ -218,7 +218,7 @@ mod test {

// Testing multiple envelopes are parsed
let num_envelopes = 2;
let tx = create_checkpoint_envelope_tx(params.clone().into(), num_envelopes);
let tx = create_checkpoint_envelope_tx(&params, num_envelopes);
let block = create_test_block(vec![tx]);

let ops = filter_protocol_op_tx_refs(&block, params.rollup(), &filter_config);
Expand All @@ -235,7 +235,7 @@ mod test {
let mut new_params = params.clone();
new_params.rollup.checkpoint_tag = "invalid_checkpoint_tag".to_string();

let tx = create_checkpoint_envelope_tx(new_params.into(), 2);
let tx = create_checkpoint_envelope_tx(&new_params, 2);
let block = create_test_block(vec![tx]);
let result = filter_protocol_op_tx_refs(&block, params.rollup(), &filter_config);
assert!(result.is_empty(), "Should filter out invalid name");
Expand All @@ -260,9 +260,9 @@ mod test {
fn test_filter_relevant_txs_multiple_matches() {
let params: Params = gen_params();
let filter_config = create_tx_filter_config(&params);
let tx1 = create_checkpoint_envelope_tx(params.clone().into(), 1);
let tx1 = create_checkpoint_envelope_tx(&params, 1);
let tx2 = create_test_tx(vec![create_test_txout(100, &parse_addr(OTHER_ADDR))]);
let tx3 = create_checkpoint_envelope_tx(params.clone().into(), 1);
let tx3 = create_checkpoint_envelope_tx(&params, 1);
let block = create_test_block(vec![tx1, tx2, tx3]);

let txids: Vec<u32> = filter_protocol_op_tx_refs(&block, params.rollup(), &filter_config)
Expand Down
10 changes: 2 additions & 8 deletions crates/rocksdb-store/src/writer/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,18 @@ impl L1WriterDatabase for RBL1WriterDb {
}

fn put_intent_entry(&self, intent_id: Buf32, intent_entry: IntentEntry) -> DbResult<()> {
let res = self
.db
self.db
.with_optimistic_txn(
rockbound::TransactionRetry::Count(self.ops.retry_count),
|tx| -> Result<(), DbError> {
tracing::debug!(%intent_id, "putting intent");
let idx = get_next_id::<IntentIdxSchema, DB>(tx)?;
tracing::debug!(%idx, "next intent idx...");
tx.put::<IntentIdxSchema>(&idx, &intent_id)?;
tx.put::<IntentSchema>(&intent_id, &intent_entry)?;

Ok(())
},
)
.map_err(|e| DbError::TransactionError(e.to_string()));
let next = self.get_next_intent_idx()?;
tracing::debug!(%next, "next intent idx after put");
res
.map_err(|e| DbError::TransactionError(e.to_string()))
}

fn get_intent_by_id(&self, id: Buf32) -> DbResult<Option<IntentEntry>> {
Expand Down

0 comments on commit 9cb200f

Please sign in to comment.