diff --git a/Cargo.lock b/Cargo.lock
index 38e1345c6..c5e17cb2e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1367,7 +1367,7 @@ dependencies = [
 [[package]]
 name = "bitcoincore-rpc"
 version = "0.18.0"
-source = "git+https://github.com/chainwayxyz/rust-bitcoincore-rpc.git?rev=ca3cfa2#ca3cfa2a2a6fac070d1a6116db1a3894b8fd5415"
+source = "git+https://github.com/chainwayxyz/rust-bitcoincore-rpc.git?rev=c863fd0#c863fd0538e1e0513048c2f764383478ec7aa181"
 dependencies = [
  "async-trait",
  "bitcoincore-rpc-json",
@@ -1382,7 +1382,7 @@ dependencies = [
 [[package]]
 name = "bitcoincore-rpc-json"
 version = "0.18.0"
-source = "git+https://github.com/chainwayxyz/rust-bitcoincore-rpc.git?rev=ca3cfa2#ca3cfa2a2a6fac070d1a6116db1a3894b8fd5415"
+source = "git+https://github.com/chainwayxyz/rust-bitcoincore-rpc.git?rev=c863fd0#c863fd0538e1e0513048c2f764383478ec7aa181"
 dependencies = [
  "bitcoin",
  "serde",
@@ -1909,7 +1909,7 @@ dependencies = [
 [[package]]
 name = "citrea-e2e"
 version = "0.1.0"
-source = "git+https://github.com/chainwayxyz/citrea-e2e?rev=af85eae3010331df0e0cda5288f741b6d298813f#af85eae3010331df0e0cda5288f741b6d298813f"
+source = "git+https://github.com/chainwayxyz/citrea-e2e?rev=6cee848#6cee84887501a44887cefa9e92c2237f64471fbc"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index 50603528c..c2a7fbd46 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -154,10 +154,10 @@ alloy-eips = { version = "0.4.2", default-features = false }
 alloy-consensus = { version = "0.4.2", default-features = false, features = ["serde", "serde-bincode-compat"] }
 alloy-network = { version = "0.4.2", default-features = false }
 
-citrea-e2e = { git = "https://github.com/chainwayxyz/citrea-e2e", rev = "af85eae3010331df0e0cda5288f741b6d298813f" }
+citrea-e2e = { git = "https://github.com/chainwayxyz/citrea-e2e", rev = "6cee848" }
 
 [patch.crates-io]
-bitcoincore-rpc = { version = "0.18.0", git = "https://github.com/chainwayxyz/rust-bitcoincore-rpc.git", rev = "ca3cfa2" }
+bitcoincore-rpc = { version = "0.18.0", git = "https://github.com/chainwayxyz/rust-bitcoincore-rpc.git", rev = "c863fd0" }
 
 [profile.release]
 opt-level = 3
diff --git a/bin/citrea/src/rollup/bitcoin.rs b/bin/citrea/src/rollup/bitcoin.rs
index 117b713c4..f22f05ec1 100644
--- a/bin/citrea/src/rollup/bitcoin.rs
+++ b/bin/citrea/src/rollup/bitcoin.rs
@@ -8,7 +8,7 @@ use bitcoin_da::spec::{BitcoinSpec, RollupParams};
 use bitcoin_da::verifier::BitcoinVerifier;
 use citrea_common::rpc::register_healthcheck_rpc;
 use citrea_common::tasks::manager::TaskManager;
-use citrea_common::FullNodeConfig;
+use citrea_common::{FeeThrottleConfig, FullNodeConfig};
 use citrea_primitives::forks::use_network_forks;
 use citrea_primitives::{TO_BATCH_PROOF_PREFIX, TO_LIGHT_CLIENT_PREFIX};
 use citrea_risc0_adapter::host::Risc0BonsaiHost;
@@ -117,6 +117,7 @@ impl RollupBlueprint for BitcoinRollup {
         rollup_config: &FullNodeConfig,
         require_wallet_check: bool,
         task_manager: &mut TaskManager<()>,
+        throttle_config: Option<FeeThrottleConfig>,
     ) -> Result<Arc<Self::DaService>, anyhow::Error> {
         let (tx, rx) = unbounded_channel::>();
@@ -128,6 +129,7 @@ impl RollupBlueprint for BitcoinRollup {
                     to_batch_proof_prefix: TO_BATCH_PROOF_PREFIX.to_vec(),
                 },
                 tx,
+                throttle_config,
             )
             .await?
         } else {
@@ -142,6 +144,7 @@ impl RollupBlueprint for BitcoinRollup {
             .await?
         };
         let service = Arc::new(bitcoin_service);
+
         // until forced transactions are implemented,
         // require_wallet_check is set false for full nodes.
         if require_wallet_check {
diff --git a/bin/citrea/src/rollup/mock.rs b/bin/citrea/src/rollup/mock.rs
index 5297b098f..d3fdf08b7 100644
--- a/bin/citrea/src/rollup/mock.rs
+++ b/bin/citrea/src/rollup/mock.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use citrea_common::rpc::register_healthcheck_rpc;
 use citrea_common::tasks::manager::TaskManager;
-use citrea_common::FullNodeConfig;
+use citrea_common::{FeeThrottleConfig, FullNodeConfig};
 use citrea_primitives::forks::use_network_forks;
 // use citrea_sp1::host::SP1Host;
 use citrea_risc0_adapter::host::Risc0BonsaiHost;
@@ -84,6 +84,7 @@ impl RollupBlueprint for MockDemoRollup {
         rollup_config: &FullNodeConfig,
         _require_wallet_check: bool,
         _task_manager: &mut TaskManager<()>,
+        _throttle_config: Option<FeeThrottleConfig>,
     ) -> Result<Arc<Self::DaService>, anyhow::Error> {
         Ok(Arc::new(MockDaService::new(
             rollup_config.da.sender_address.clone(),
diff --git a/bin/citrea/src/rollup/mod.rs b/bin/citrea/src/rollup/mod.rs
index ce328412f..e3b473938 100644
--- a/bin/citrea/src/rollup/mod.rs
+++ b/bin/citrea/src/rollup/mod.rs
@@ -54,7 +54,12 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
     {
         let mut task_manager = TaskManager::default();
         let da_service = self
-            .create_da_service(&rollup_config, true, &mut task_manager)
+            .create_da_service(
+                &rollup_config,
+                true,
+                &mut task_manager,
+                Some(sequencer_config.fee_throttle.clone()),
+            )
             .await?;
 
         // TODO: Double check what kind of storage needed here.
@@ -175,7 +180,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
     {
         let mut task_manager = TaskManager::default();
         let da_service = self
-            .create_da_service(&rollup_config, false, &mut task_manager)
+            .create_da_service(&rollup_config, false, &mut task_manager, None)
             .await?;
 
         // TODO: Double check what kind of storage needed here.
@@ -307,7 +312,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
     {
         let mut task_manager = TaskManager::default();
         let da_service = self
-            .create_da_service(&rollup_config, true, &mut task_manager)
+            .create_da_service(&rollup_config, true, &mut task_manager, None)
             .await?;
 
         // Migrate before constructing ledger_db instance so that no lock is present.
@@ -441,7 +446,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
         let mut task_manager = TaskManager::default();
 
         let da_service = self
-            .create_da_service(&rollup_config, true, &mut task_manager)
+            .create_da_service(&rollup_config, true, &mut task_manager, None)
             .await?;
 
         let rocksdb_config = RocksdbConfig::new(
diff --git a/bin/citrea/tests/bitcoin_e2e/batch_prover_test.rs b/bin/citrea/tests/bitcoin_e2e/batch_prover_test.rs
index 6815127b0..73a950de1 100644
--- a/bin/citrea/tests/bitcoin_e2e/batch_prover_test.rs
+++ b/bin/citrea/tests/bitcoin_e2e/batch_prover_test.rs
@@ -208,6 +208,7 @@ impl TestCase for SkipPreprovenCommitmentsTest {
                     to_batch_proof_prefix: TO_BATCH_PROOF_PREFIX.to_vec(),
                 },
                 tx,
+                None,
             )
             .await
             .unwrap(),
diff --git a/bin/citrea/tests/bitcoin_e2e/light_client_test.rs b/bin/citrea/tests/bitcoin_e2e/light_client_test.rs
index d6453b673..40a8d7cb7 100644
--- a/bin/citrea/tests/bitcoin_e2e/light_client_test.rs
+++ b/bin/citrea/tests/bitcoin_e2e/light_client_test.rs
@@ -531,6 +531,7 @@ impl TestCase for LightClientBatchProofMethodIdUpdateTest {
                     to_batch_proof_prefix: TO_BATCH_PROOF_PREFIX.to_vec(),
                 },
                 tx,
+                None,
            )
            .await
            .unwrap(),
@@ -810,6 +811,7 @@ impl TestCase for LightClientUnverifiableBatchProofTest {
                     to_batch_proof_prefix: TO_BATCH_PROOF_PREFIX.to_vec(),
                 },
                 tx,
+                None,
            )
            .await
            .unwrap(),
diff --git a/bin/citrea/tests/bitcoin_e2e/sequencer_test.rs b/bin/citrea/tests/bitcoin_e2e/sequencer_test.rs
index 01c888cb7..65b45c117 100644
--- a/bin/citrea/tests/bitcoin_e2e/sequencer_test.rs
+++ b/bin/citrea/tests/bitcoin_e2e/sequencer_test.rs
@@ -1,14 +1,21 @@
+use std::net::SocketAddr;
+use std::time::{Duration, SystemTime};
+
 use alloy_primitives::U64;
 use anyhow::bail;
 use async_trait::async_trait;
+use bitcoin_da::rpc::DaRpcClient;
 use citrea_e2e::config::SequencerConfig;
 use citrea_e2e::framework::TestFramework;
+use citrea_e2e::node::Config;
 use citrea_e2e::test_case::{TestCase, TestCaseRunner};
-use citrea_e2e::traits::Restart;
+use citrea_e2e::traits::{NodeT, Restart};
 use citrea_e2e::Result;
+use reth_primitives::BlockNumberOrTag;
 use sov_ledger_rpc::LedgerRpcClient;
 
 use super::get_citrea_path;
+use crate::evm::make_test_client;
 
 struct BasicSequencerTest;
 
@@ -145,3 +152,114 @@ async fn test_sequencer_missed_da_blocks() -> Result<()> {
         .run()
         .await
 }
+
+struct DaThrottleTest;
+
+#[async_trait]
+impl TestCase for DaThrottleTest {
+    async fn run_test(&mut self, f: &mut TestFramework) -> Result<()> {
+        let sequencer = f.sequencer.as_ref().unwrap();
+        let da = f.bitcoin_nodes.get(0).expect("DA not running.");
+
+        let seq_config = sequencer.config();
+        let seq_test_client = make_test_client(SocketAddr::new(
+            seq_config.rpc_bind_host().parse()?,
+            seq_config.rpc_bind_port(),
+        ))
+        .await?;
+
+        let base_l1_fee_rate = 2_500_000_000f64;
+
+        // Get initial usage stats
+        let initial_usage = sequencer.client.http_client().da_usage_window().await?;
+        assert_eq!(initial_usage.total_bytes, 0);
+        assert_eq!(initial_usage.usage_ratio, 0.0);
+
+        sequencer.client.send_publish_batch_request().await?;
+        sequencer.wait_for_l2_height(1, None).await?;
+
+        let seq_block = seq_test_client
+            .eth_get_block_by_number_with_detail(Some(BlockNumberOrTag::Latest))
+            .await;
+
+        let l1_fee_rate = seq_block.other.get("l1FeeRate").unwrap().as_f64().unwrap();
+        assert_eq!(l1_fee_rate, base_l1_fee_rate);
+
+        // Generate seqcommitments to increase DA usage
+        for _ in 0..sequencer.min_soft_confirmations_per_commitment() - 1 {
+            sequencer.client.send_publish_batch_request().await?;
+        }
+
+        // Wait for tx to hit mempool and check DA usage increased
+        da.wait_mempool_len(2, None).await?;
+        let da_usage = sequencer.client.http_client().da_usage_window().await?;
+        assert!(da_usage.total_bytes > 0);
+        assert!(da_usage.usage_ratio > 0.0);
+
+        // Generate more seqcoms to exceed threshold
+        let n_txs = 3;
+        for _ in 0..n_txs {
+            for _ in 0..sequencer.min_soft_confirmations_per_commitment() {
+                sequencer.client.send_publish_batch_request().await?;
+            }
+        }
+        da.wait_mempool_len(2 + 2 * n_txs, None).await?;
+
+        // Check that usage is above threshold and multiplier > 1
+        let usage_after_seqcom = sequencer.client.http_client().da_usage_window().await?;
+        assert!(usage_after_seqcom.total_bytes > da_usage.total_bytes);
+        assert!(usage_after_seqcom.usage_ratio > da_usage.usage_ratio);
+        assert!(usage_after_seqcom.fee_multiplier > Some(1.0));
+
+        sequencer.client.send_publish_batch_request().await?;
+
+        let seq_block = seq_test_client
+            .eth_get_block_by_number_with_detail(Some(BlockNumberOrTag::Latest))
+            .await;
+        let throttled_l1_fee_rate = seq_block.other.get("l1FeeRate").unwrap().as_f64().unwrap();
+        assert_eq!(
+            throttled_l1_fee_rate,
+            (base_l1_fee_rate * usage_after_seqcom.fee_multiplier.unwrap()).floor()
+        );
+
+        // Check that the usage window is correctly reset on interval
+        let interval = seq_config
+            .rollup_config()
+            .da
+            .monitoring
+            .as_ref()
+            .unwrap()
+            .window_duration_secs;
+        let next_reset = interval
+            - (SystemTime::now()
+                .duration_since(std::time::UNIX_EPOCH)
+                .unwrap()
+                .as_secs()
+                - da_usage.start_time);
+
+        // Sleep until next_reset + a 1s buffer
+        tokio::time::sleep(Duration::from_secs(next_reset + 1)).await;
+        let resetted_usage = sequencer.client.http_client().da_usage_window().await?;
+        assert_eq!(resetted_usage.total_bytes, 0);
+        assert_eq!(resetted_usage.usage_ratio, 0.0);
+        assert_eq!(resetted_usage.fee_multiplier, Some(1.0));
+        assert_eq!(resetted_usage.start_time, da_usage.start_time + interval);
+
+        sequencer.client.send_publish_batch_request().await?;
+
+        let seq_block = seq_test_client
+            .eth_get_block_by_number_with_detail(Some(BlockNumberOrTag::Latest))
+            .await;
+        let l1_fee_rate = seq_block.other.get("l1FeeRate").unwrap().as_f64().unwrap();
+        assert_eq!(l1_fee_rate, base_l1_fee_rate);
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn test_da_throttle() -> Result<()> {
+    TestCaseRunner::new(DaThrottleTest)
+        .set_citrea_path(get_citrea_path())
+        .run()
+        .await
+}
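Reviewer note: the throttled l1FeeRate assertion above is easier to sanity-check in isolation. Below is a minimal, standalone sketch of the arithmetic the test relies on, assuming the regtest estimate resolves to 1 sat/vB and a usage ratio of 0.8 under the default throttle settings; both inputs are illustrative, not taken from the diff.

    // Sketch of the fee path DaThrottleTest exercises (assumed inputs, illustrative only).
    fn multiplier(usage_ratio: f64) -> f64 {
        // Default throttle parameters from FeeThrottleConfig::default()
        let (threshold, base, scalar, exponent, cap) = (0.5_f64, 1.0_f64, 10.0_f64, 4.0_f64, 4.0_f64);
        if usage_ratio <= threshold {
            return base;
        }
        let x = (usage_ratio - threshold) / (1.0 - threshold);
        (base * (1.0 + scalar * x.powf(exponent))).min(cap)
    }

    fn main() {
        let sat_vb_ceil: u128 = 1; // assumed regtest fee estimate
        let throttled = (2_500_000_000_f64 * multiplier(0.8)) as u128;
        // ~5_740_000_000 for a 2.296 multiplier (exact value subject to f64 rounding)
        println!("l1FeeRate = {}", sat_vb_ceil.saturating_mul(throttled));
    }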
diff --git a/crates/bitcoin-da/src/fee.rs b/crates/bitcoin-da/src/fee.rs
index f45a7ab81..d3a4fc12a 100644
--- a/crates/bitcoin-da/src/fee.rs
+++ b/crates/bitcoin-da/src/fee.rs
@@ -11,6 +11,7 @@ use bitcoincore_rpc::json::{
     BumpFeeResult, CreateRawTransactionInput, WalletCreateFundedPsbtOptions,
 };
 use bitcoincore_rpc::{Client, RpcApi};
+use citrea_common::FeeThrottleConfig;
 use tracing::{debug, instrument, trace, warn};
 
 use crate::monitoring::{MonitoredTx, MonitoredTxKind};
@@ -186,10 +187,45 @@ pub(crate) async fn get_fee_rate_from_mempool_space(
     Ok(Some(fee_rate))
 }
 
+#[derive(Debug, Clone)]
+pub struct FeeThrottleService {
+    config: FeeThrottleConfig,
+}
+
+impl FeeThrottleService {
+    pub fn new(config: FeeThrottleConfig) -> Result<Self> {
+        config.validate()?;
+
+        Ok(Self { config })
+    }
+
+    /// Get adjusted fee rate according to current da usage.
+    /// Returns base_fee_multiplier (1.0) when usage is below the capacity threshold.
+    /// When usage exceeds the threshold, increases as: base_fee_multiplier * (1 + scalar * x^factor),
+    /// where x is the normalized excess usage.
+    /// The resulting multiplier is capped at max_fee_multiplier.
+    #[instrument(level = "trace", skip_all, ret)]
+    pub fn get_fee_rate_multiplier(&self, usage_ratio: f64) -> f64 {
+        if usage_ratio <= self.config.capacity_threshold {
+            return self.config.base_fee_multiplier;
+        }
+
+        let excess = usage_ratio - self.config.capacity_threshold;
+        let normalized_excess = excess / (1.0 - self.config.capacity_threshold);
+        let multiplier = (self.config.base_fee_multiplier
+            * (1.0
+                + self.config.fee_multiplier_scalar
+                    * normalized_excess.powf(self.config.fee_exponential_factor)))
+        .min(self.config.max_fee_multiplier);
+
+        debug!("DA usage ratio: {usage_ratio:.2}, multiplier: {multiplier:.2}");
+        multiplier
+    }
+}
+
 #[cfg(test)]
 mod tests {
-
-    use super::{get_fee_rate_from_mempool_space, DEFAULT_MEMPOOL_SPACE_URL};
+    use super::*;
 
     #[tokio::test]
     async fn test_mempool_space_fee_rate() {
@@ -216,4 +252,32 @@ mod tests {
             .unwrap()
         );
     }
+
+    #[test]
+    fn test_fee_multiplier() {
+        let test_cases = vec![
+            (0.0, 1.0),   // No usage
+            (0.25, 1.0),  // Below threshold
+            (0.5, 1.0),   // At threshold
+            (0.6, 1.016), // Above threshold, start increasing fee
+            (0.7, 1.256),
+            (0.8, 2.296),
+            (0.85, 3.40),
+            (0.9, 4.0), // Max multiplier hit
+            (0.95, 4.0),
+            (1.0, 4.0),
+        ];
+
+        let fee_service = FeeThrottleService::new(FeeThrottleConfig::default()).unwrap();
+        for (usage, expected) in test_cases {
+            let multiplier = fee_service.get_fee_rate_multiplier(usage);
+            assert!(
+                (multiplier - expected).abs() < 0.1,
+                "Usage {}: expected multiplier {}, got {}",
+                usage,
+                expected,
+                multiplier
+            );
+        }
+    }
 }
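A quick check of the table in test_fee_multiplier using the default config (threshold 0.5, scalar 10, exponent 4, cap 4): at a usage ratio of 0.8 the normalized excess is (0.8 - 0.5) / (1 - 0.5) = 0.6, so the multiplier is 1.0 * (1 + 10 * 0.6^4) = 1 + 1.296 = 2.296, matching the table; at 0.9 the raw value 1 + 10 * 0.8^4 = 5.096 exceeds the cap and is clamped to 4.0; anything at or below 0.5 stays at the 1.0 base.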
diff --git a/crates/bitcoin-da/src/monitoring.rs b/crates/bitcoin-da/src/monitoring.rs
index 751aa0bd9..e1e2bc3fc 100644
--- a/crates/bitcoin-da/src/monitoring.rs
+++ b/crates/bitcoin-da/src/monitoring.rs
@@ -1,5 +1,5 @@
 use std::collections::HashMap;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
@@ -16,7 +16,7 @@ use tokio::select;
 use tokio::sync::{Mutex, RwLock};
 use tokio::time::interval;
 use tokio_util::sync::CancellationToken;
-use tracing::{debug, error, info, instrument};
+use tracing::{debug, error, info, instrument, warn};
 
 use crate::service::FINALITY_DEPTH;
 use crate::spec::utxo::UTXO;
@@ -24,6 +24,44 @@ use crate::spec::utxo::UTXO;
 type BlockHeight = u64;
 type Result<T> = std::result::Result<T, MonitorError>;
 
+#[derive(Debug)]
+pub struct DaUsageWindow {
+    pub start_time: AtomicU64,
+    pub current_da_usage: AtomicU64,
+    max_da_bandwith_bytes: u64,
+}
+
+impl DaUsageWindow {
+    fn new(max_da_bandwith_bytes: u64) -> Self {
+        Self {
+            start_time: AtomicU64::new(
+                SystemTime::now()
+                    .duration_since(UNIX_EPOCH)
+                    .unwrap()
+                    .as_secs(),
+            ),
+            current_da_usage: AtomicU64::new(0),
+            max_da_bandwith_bytes,
+        }
+    }
+
+    pub fn usage_ratio(&self) -> f64 {
+        let current_usage = self.current_da_usage.load(Ordering::SeqCst);
+        current_usage as f64 / self.max_da_bandwith_bytes as f64
+    }
+
+    fn reset(&self) {
+        self.start_time.store(
+            SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_secs(),
+            Ordering::SeqCst,
+        );
+        self.current_da_usage.store(0, Ordering::SeqCst);
+    }
+}
+
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub enum TxStatus {
     Pending {
@@ -123,6 +161,13 @@ pub enum MonitorError {
 }
 
 mod monitoring_defaults {
+    pub const fn max_da_bandwidth_bytes() -> u64 {
+        4 * 1024 * 1024 // 4MB
+    }
+
+    pub const fn window_duration_secs() -> u64 {
+        600 // 10 minutes
+    }
     pub const fn check_interval() -> u64 {
         60
     }
@@ -144,6 +189,10 @@ pub struct MonitoringConfig {
     pub history_limit: usize,
     #[serde(default = "monitoring_defaults::max_history_size")]
     pub max_history_size: usize,
+    #[serde(default = "monitoring_defaults::max_da_bandwidth_bytes")]
+    max_da_bandwidth_bytes: u64,
+    #[serde(default = "monitoring_defaults::window_duration_secs")]
+    window_duration_secs: u64,
 }
 
 impl Default for MonitoringConfig {
@@ -152,6 +201,8 @@ impl Default for MonitoringConfig {
             check_interval: monitoring_defaults::check_interval(),
             history_limit: monitoring_defaults::history_limit(),
             max_history_size: monitoring_defaults::max_history_size(),
+            max_da_bandwidth_bytes: monitoring_defaults::max_da_bandwidth_bytes(),
+            window_duration_secs: monitoring_defaults::window_duration_secs(),
         }
     }
 }
@@ -162,9 +213,17 @@ impl FromEnv for MonitoringConfig {
             std::env::var("DA_MONITORING_CHECK_INTERVAL"),
             std::env::var("DA_MONITORING_HISTORY_LIMIT"),
             std::env::var("DA_MONITORING_MAX_HISTORY_SIZE"),
+            std::env::var("DA_MONITORING_MAX_DA_BANDWIDTH_BYTES"),
+            std::env::var("DA_MONITORING_WINDOW_DURATION_SECS"),
         ) {
-            (Err(_), Err(_), Err(_)) => Err(anyhow!("Missing monitoring config")),
-            (check_interval, history_limit, max_history_size) => Ok(MonitoringConfig {
+            (Err(_), Err(_), Err(_), Err(_), Err(_)) => Err(anyhow!("Missing monitoring config")),
+            (
+                check_interval,
+                history_limit,
+                max_history_size,
+                max_da_bandwidth_bytes,
+                window_duration_secs,
+            ) => Ok(MonitoringConfig {
                 check_interval: check_interval.map_or_else(
                     |_| Ok(monitoring_defaults::check_interval()),
                     |v| v.parse().map_err(Into::<anyhow::Error>::into),
                 )?,
@@ -177,6 +236,14 @@ impl FromEnv for MonitoringConfig {
                     |_| Ok(monitoring_defaults::max_history_size()),
                     |v| v.parse().map_err(Into::<anyhow::Error>::into),
                 )?,
+                max_da_bandwidth_bytes: max_da_bandwidth_bytes.map_or_else(
+                    |_| Ok(monitoring_defaults::max_da_bandwidth_bytes()),
+                    |v| v.parse().map_err(Into::<anyhow::Error>::into),
+                )?,
+                window_duration_secs: window_duration_secs.map_or_else(
+                    |_| Ok(monitoring_defaults::window_duration_secs()),
+                    |v| v.parse().map_err(Into::<anyhow::Error>::into),
+                )?,
             }),
         }
     }
@@ -193,15 +260,18 @@ pub struct MonitoringService {
     // Keep track of total monitored transaction size
    // Only takes into account inner tx field from MonitoredTx
    total_size: AtomicUsize,
+    usage_window: Arc<DaUsageWindow>,
 }
 
 impl MonitoringService {
     pub fn new(client: Arc<Client>, config: Option<MonitoringConfig>) -> Self {
+        let config = config.unwrap_or_default();
         Self {
             client,
             monitored_txs: RwLock::new(HashMap::new()),
             chain_state: RwLock::new(ChainState::default()),
-            config: config.unwrap_or_default(),
+            usage_window: Arc::new(DaUsageWindow::new(config.max_da_bandwidth_bytes)),
+            config,
             last_tx: Mutex::new(None),
             total_size: AtomicUsize::new(0),
         }
     }
@@ -251,7 +321,8 @@ impl MonitoringService {
 
     /// Run monitoring to keep track of TX status and chain re-orgs
     pub async fn run(self: Arc<Self>, token: CancellationToken) {
-        let mut interval = interval(Duration::from_secs(self.config.check_interval));
+        let mut check_interval = interval(Duration::from_secs(self.config.check_interval));
+        let mut window_interval = interval(Duration::from_secs(self.config.window_duration_secs));
 
         loop {
             select! {
                 biased;
@@ -259,7 +330,10 @@ impl MonitoringService {
                     debug!("Monitoring service received shutdown signal");
                     break;
                 }
-                _ = interval.tick() => {
+                _ = window_interval.tick() => {
+                    self.usage_window.reset();
+                }
+                _ = check_interval.tick() => {
                     if let Err(e) = self.check_chain_state().await {
                         error!("Error checking chain state: {}", e);
                     }
@@ -353,6 +427,8 @@ impl MonitoringService {
             kind,
         };
 
+        self.record_da_usage(&monitored_tx.tx).await;
+
         self.monitored_txs.write().await.insert(txid, monitored_tx);
         *self.last_tx.lock().await = Some(txid);
         debug!("[monitor_transaction_chain] setting last_tx : {:?}", txid);
@@ -396,6 +472,13 @@ impl MonitoringService {
             next_txid: monitored_tx.next_txid,
         };
 
+        // Update da usage with replacing tx
+        let old_tx_size = monitored_tx.tx.total_size() as u64;
+        self.usage_window
+            .current_da_usage
+            .fetch_sub(old_tx_size, Ordering::SeqCst);
+        self.record_da_usage(&new_tx.tx).await;
+
         {
             let mut monitored_txs = self.monitored_txs.write().await;
             if let Some(prev_tx) = monitored_txs.get_mut(&prev_txid) {
@@ -602,4 +685,37 @@ impl MonitoringService {
             parent.next_txid = Some(next_txid);
         }
     }
+
+    #[instrument(level = "trace", skip_all)]
+    pub fn get_current_usage_window(&self) -> Arc<DaUsageWindow> {
+        self.usage_window.clone()
+    }
+
+    #[instrument(level = "trace", skip_all)]
+    pub fn get_da_usage_ratio(&self) -> f64 {
+        self.usage_window.usage_ratio()
+    }
+
+    #[instrument(level = "trace", skip_all)]
+    pub async fn record_da_usage(&self, tx: &Transaction) {
+        let tx_size = tx.total_size() as u64;
+        let new_usage = self
+            .usage_window
+            .current_da_usage
+            .fetch_add(tx_size, Ordering::SeqCst)
+            + tx_size;
+
+        debug!(
+            "Recording usage for tx {tx:?}, size {tx_size} bytes. Current total: {} bytes",
+            new_usage
+        );
+
+        // TODO decide what to do when TX goes above max_da_bandwidth_bytes.
+        if new_usage > self.config.max_da_bandwidth_bytes {
+            warn!(
+                "DA usage above the max limit. Current usage {}, limit {}",
+                new_usage, self.config.max_da_bandwidth_bytes
+            )
+        }
+    }
 }
diff --git a/crates/bitcoin-da/src/rpc.rs b/crates/bitcoin-da/src/rpc.rs
index eaaf606e5..4edb0dfbf 100644
--- a/crates/bitcoin-da/src/rpc.rs
+++ b/crates/bitcoin-da/src/rpc.rs
@@ -1,3 +1,4 @@
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
 use bitcoin::Txid;
@@ -44,6 +45,14 @@ impl From<(Txid, MonitoredTx)> for MonitoredTxResponse {
     }
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DaUsageResponse {
+    pub total_bytes: u64,
+    pub start_time: u64,
+    pub usage_ratio: f64,
+    pub fee_multiplier: Option<f64>,
+}
+
 #[rpc(client, server, namespace = "da")]
 pub trait DaRpc {
     #[method(name = "getPendingTransactions")]
@@ -58,6 +67,9 @@ pub trait DaRpc {
     #[method(name = "getLastMonitoredTx")]
     async fn da_get_last_monitored_tx(&self) -> RpcResult<Option<MonitoredTxResponse>>;
 
+    #[method(name = "usageWindow")]
+    async fn da_usage_window(&self) -> RpcResult<DaUsageResponse>;
+
     #[method(name = "bumpFeeCpfp")]
     async fn da_bump_transaction_fee_cpfp(
         &self,
@@ -145,6 +157,23 @@ impl DaRpcServer for DaRpcServerImpl {
             )
         })
     }
+
+    async fn da_usage_window(&self) -> RpcResult<DaUsageResponse> {
+        let usage_window = self.da.monitoring.get_current_usage_window();
+        let usage_ratio = usage_window.usage_ratio();
+        let fee_multiplier = self
+            .da
+            .fee_throttle
+            .as_ref()
+            .map(|throttler| throttler.get_fee_rate_multiplier(usage_ratio));
+
+        Ok(DaUsageResponse {
+            total_bytes: usage_window.current_da_usage.load(Ordering::Relaxed),
+            start_time: usage_window.start_time.load(Ordering::Relaxed),
+            usage_ratio,
+            fee_multiplier,
+        })
+    }
 }
 
 pub struct DaRpcServerImpl {
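For reference, the new endpoint is exposed as da_usageWindow in the da namespace, and the e2e test reaches it through the jsonrpsee-generated DaRpcClient trait. A minimal standalone sketch of the same call follows; the address is a placeholder and the surrounding setup is assumed, not taken from the diff.

    // Sketch only: querying da_usageWindow via the client generated from the DaRpc trait.
    use bitcoin_da::rpc::DaRpcClient;
    use jsonrpsee::http_client::HttpClientBuilder;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Placeholder address; use the node's actual RPC bind host/port.
        let client = HttpClientBuilder::default().build("http://127.0.0.1:12345")?;
        let usage = client.da_usage_window().await?;
        println!(
            "window start {}: {} bytes used, ratio {:.2}, multiplier {:?}",
            usage.start_time, usage.total_bytes, usage.usage_ratio, usage.fee_multiplier
        );
        Ok(())
    }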
diff --git a/crates/bitcoin-da/src/service.rs b/crates/bitcoin-da/src/service.rs
index 972c6b670..3edcf4fa6 100644
--- a/crates/bitcoin-da/src/service.rs
+++ b/crates/bitcoin-da/src/service.rs
@@ -20,6 +20,7 @@ use bitcoin::{Amount, BlockHash, CompactTarget, Transaction, Txid, Wtxid};
 use bitcoincore_rpc::json::{SignRawTransactionInput, TestMempoolAcceptResult};
 use bitcoincore_rpc::{Auth, Client, Error as BitcoinError, Error, RpcApi, RpcError};
 use borsh::BorshDeserialize;
+use citrea_common::FeeThrottleConfig;
 use citrea_primitives::compression::{compress_blob, decompress_blob};
 use citrea_primitives::MAX_TXBODY_SIZE;
 use serde::{Deserialize, Serialize};
@@ -34,7 +35,7 @@ use tokio::sync::oneshot::channel as oneshot_channel;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, instrument, trace, warn};
 
-use crate::fee::{BumpFeeMethod, FeeService};
+use crate::fee::{BumpFeeMethod, FeeService, FeeThrottleService};
 use crate::helpers::builders::batch_proof_namespace::{
     create_seqcommitment_transactions, BatchProvingTxs,
 };
@@ -110,7 +111,8 @@ pub struct BitcoinService {
     inscribes_queue: UnboundedSender>,
     tx_backup_dir: PathBuf,
     pub monitoring: Arc<MonitoringService>,
-    fee: FeeService,
+    pub fee: FeeService,
+    pub fee_throttle: Option<FeeThrottleService>,
 }
 
 impl BitcoinService {
@@ -119,6 +121,7 @@ impl BitcoinService {
         config: BitcoinServiceConfig,
         chain_params: RollupParams,
         tx: UnboundedSender>,
+        throttle_config: Option<FeeThrottleConfig>,
     ) -> Result<Self> {
         let client = Arc::new(
             Client::new(
@@ -162,6 +165,7 @@ impl BitcoinService {
             tx_backup_dir: tx_backup_dir.to_path_buf(),
             monitoring,
             fee,
+            fee_throttle: throttle_config.map(FeeThrottleService::new).transpose()?,
         })
     }
@@ -205,6 +209,7 @@ impl BitcoinService {
             tx_backup_dir: tx_backup_dir.to_path_buf(),
             monitoring,
             fee,
+            fee_throttle: None,
         })
     }
@@ -1087,8 +1092,16 @@ impl DaService for BitcoinService {
     async fn get_fee_rate(&self) -> Result<u128> {
         let sat_vb_ceil = self.fee.get_fee_rate_as_sat_vb().await? as u128;
+        let usage_ratio = self.monitoring.get_da_usage_ratio();
+        let throttle_multiplier = self.fee_throttle.as_ref().map_or(1.0f64, |throttler| {
+            throttler.get_fee_rate_multiplier(usage_ratio)
+        });
+
         // multiply with 10^10/4 = 25*10^8 = 2_500_000_000 for BTC to CBTC conversion (decimals)
-        let multiplied_fee = sat_vb_ceil.saturating_mul(2_500_000_000);
+        let base_multiplier = 2_500_000_000f64;
+        let multiplier = (base_multiplier * throttle_multiplier) as u128;
+
+        let multiplied_fee = sat_vb_ceil.saturating_mul(multiplier);
         Ok(multiplied_fee)
     }
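In effect, get_fee_rate now scales the sat/vB estimate by between 1.0x and 4.0x the 2_500_000_000 conversion constant under the default config: below 50% window usage the result is unchanged, while at full usage the per-sat/vB factor tops out at 4 * 2_500_000_000 = 10_000_000_000.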
diff --git a/crates/common/src/config.rs b/crates/common/src/config/mod.rs
similarity index 82%
rename from crates/common/src/config.rs
rename to crates/common/src/config/mod.rs
index 35258c95b..f01d08ed5 100644
--- a/crates/common/src/config.rs
+++ b/crates/common/src/config/mod.rs
@@ -2,6 +2,7 @@ use std::fs::File;
 use std::io::Read;
 use std::path::{Path, PathBuf};
 
+use anyhow::bail;
 use citrea_pruning::PruningConfig;
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
@@ -328,6 +329,8 @@ pub struct SequencerConfig {
     pub da_update_interval_ms: u64,
     /// Block production interval in ms
     pub block_production_interval_ms: u64,
+    /// Fee throttle config
+    pub fee_throttle: FeeThrottleConfig,
 }
 
 impl Default for SequencerConfig {
@@ -341,6 +344,7 @@ impl Default for SequencerConfig {
             block_production_interval_ms: 100,
             da_update_interval_ms: 100,
             mempool_conf: Default::default(),
+            fee_throttle: FeeThrottleConfig::default(),
         }
     }
 }
@@ -358,6 +362,7 @@ impl FromEnv for SequencerConfig {
             mempool_conf: SequencerMempoolConfig::from_env()?,
             da_update_interval_ms: std::env::var("DA_UPDATE_INTERVAL_MS")?.parse()?,
             block_production_interval_ms: std::env::var("BLOCK_PRODUCTION_INTERVAL_MS")?.parse()?,
+            fee_throttle: FeeThrottleConfig::from_env()?,
         })
     }
 }
@@ -410,6 +415,127 @@ impl FromEnv for SequencerMempoolConfig {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
+pub struct FeeThrottleConfig {
+    #[serde(default = "defaults::capacity_threshold")]
+    pub capacity_threshold: f64,
+    #[serde(default = "defaults::base_fee_multiplier")]
+    pub base_fee_multiplier: f64,
+    #[serde(default = "defaults::max_fee_multiplier")]
+    pub max_fee_multiplier: f64,
+    #[serde(default = "defaults::fee_exponential_factor")]
+    pub fee_exponential_factor: f64,
+    #[serde(default = "defaults::fee_multiplier_scalar")]
+    pub fee_multiplier_scalar: f64,
+}
+
+mod defaults {
+    // Threshold after which fees start to increase exponentially
+    pub const fn capacity_threshold() -> f64 {
+        0.50
+    }
+
+    // Multiplier used while below capacity_threshold
+    pub const fn base_fee_multiplier() -> f64 {
+        1.0
+    }
+
+    // Max multiplier over threshold
+    pub const fn max_fee_multiplier() -> f64 {
+        4.0
+    }
+
+    // Exponential factor to adjust steepness of fee rise
+    pub const fn fee_exponential_factor() -> f64 {
+        4.0
+    }
+
+    pub const fn fee_multiplier_scalar() -> f64 {
+        10.0
+    }
+}
+
+impl Default for FeeThrottleConfig {
+    fn default() -> Self {
+        Self {
+            capacity_threshold: defaults::capacity_threshold(),
+            base_fee_multiplier: defaults::base_fee_multiplier(),
+            max_fee_multiplier: defaults::max_fee_multiplier(),
+            fee_exponential_factor: defaults::fee_exponential_factor(),
+            fee_multiplier_scalar: defaults::fee_multiplier_scalar(),
+        }
+    }
+}
+
+impl FromEnv for FeeThrottleConfig {
+    fn from_env() -> anyhow::Result<Self> {
+        Ok(FeeThrottleConfig {
+            capacity_threshold: std::env::var("L1_FEE_CAPACITY_THRESHOLD").map_or_else(
+                |_| Ok(defaults::capacity_threshold()),
+                |v| v.parse().map_err(Into::<anyhow::Error>::into),
+            )?,
+            base_fee_multiplier: std::env::var("L1_FEE_BASE_FEE_MULTIPLIER").map_or_else(
+                |_| Ok(defaults::base_fee_multiplier()),
+                |v| v.parse().map_err(Into::<anyhow::Error>::into),
+            )?,
+            max_fee_multiplier: std::env::var("L1_FEE_MAX_FEE_MULTIPLIER").map_or_else(
+                |_| Ok(defaults::max_fee_multiplier()),
+                |v| v.parse().map_err(Into::<anyhow::Error>::into),
+            )?,
+            fee_exponential_factor: std::env::var("L1_FEE_EXPONENTIAL_FACTOR").map_or_else(
+                |_| Ok(defaults::fee_exponential_factor()),
+                |v| v.parse().map_err(Into::<anyhow::Error>::into),
+            )?,
+            fee_multiplier_scalar: std::env::var("L1_FEE_MULTIPLIER_SCALAR").map_or_else(
+                |_| Ok(defaults::fee_multiplier_scalar()),
+                |v| v.parse().map_err(Into::<anyhow::Error>::into),
+            )?,
+        })
+    }
+}
+
+impl FeeThrottleConfig {
+    pub fn validate(&self) -> anyhow::Result<()> {
+        if !(0.0..=1.0).contains(&self.capacity_threshold) {
+            bail!(
+                "capacity_threshold must be between 0 and 1, got {}",
+                self.capacity_threshold
+            );
+        }
+
+        if self.base_fee_multiplier < 1.0 {
+            bail!(
+                "base_fee_multiplier must be >= 1.0, got {}",
+                self.base_fee_multiplier
+            );
+        }
+
+        if self.max_fee_multiplier <= self.base_fee_multiplier {
+            bail!(
+                "max_fee_multiplier must be > base_fee_multiplier ({} <= {})",
+                self.max_fee_multiplier,
+                self.base_fee_multiplier
+            );
+        }
+
+        if self.fee_exponential_factor <= 0.0 {
+            bail!(
+                "fee_exponential_factor must be > 0, got {}",
+                self.fee_exponential_factor
+            );
+        }
+
+        if self.fee_multiplier_scalar <= 0.0 {
+            bail!(
+                "fee_multiplier_scalar must be > 0, got {}",
+                self.fee_multiplier_scalar
+            );
+        }
+
+        Ok(())
+    }
+}
+
 /// Telemetry configuration.
 #[derive(Debug, Default, Clone, PartialEq, Deserialize, Serialize)]
 pub struct TelemetryConfig {
@@ -555,6 +681,12 @@ mod tests {
             base_fee_tx_limit = 100000
             base_fee_tx_size = 200
             max_account_slots = 16
+            [fee_throttle]
+            capacity_threshold = 0.5
+            base_fee_multiplier = 1.0
+            max_fee_multiplier = 4.0
+            fee_exponential_factor = 4.0
+            fee_multiplier_scalar = 10.0
         "#;
 
         let config_file = create_config_from(config);
@@ -578,6 +710,13 @@ mod tests {
             },
             da_update_interval_ms: 1000,
             block_production_interval_ms: 1000,
+            fee_throttle: FeeThrottleConfig {
+                capacity_threshold: 0.5,
+                base_fee_multiplier: 1.0,
+                max_fee_multiplier: 4.0,
+                fee_exponential_factor: 4.0,
+                fee_multiplier_scalar: 10.0,
+            },
         };
         assert_eq!(config, expected);
     }
@@ -615,6 +754,11 @@ mod tests {
         std::env::set_var("BASE_FEE_TX_LIMIT", "100000");
         std::env::set_var("BASE_FEE_TX_SIZE", "200");
         std::env::set_var("MAX_ACCOUNT_SLOTS", "16");
+        std::env::set_var("L1_FEE_CAPACITY_THRESHOLD", "0.5");
+        std::env::set_var("L1_FEE_BASE_FEE_MULTIPLIER", "1.0");
+        std::env::set_var("L1_FEE_MAX_FEE_MULTIPLIER", "4.0");
+        std::env::set_var("L1_FEE_EXPONENTIAL_FACTOR", "4.0");
+        std::env::set_var("L1_FEE_MULTIPLIER_SCALAR", "10.0");
 
         let sequencer_config = SequencerConfig::from_env().unwrap();
 
@@ -635,6 +779,13 @@ mod tests {
             },
             da_update_interval_ms: 1000,
             block_production_interval_ms: 1000,
+            fee_throttle: FeeThrottleConfig {
+                capacity_threshold: 0.5,
+                base_fee_multiplier: 1.0,
+                max_fee_multiplier: 4.0,
+                fee_exponential_factor: 4.0,
+                fee_multiplier_scalar: 10.0,
+            },
         };
         assert_eq!(sequencer_config, expected);
     }
diff --git a/crates/sovereign-sdk/module-system/sov-modules-rollup-blueprint/src/lib.rs b/crates/sovereign-sdk/module-system/sov-modules-rollup-blueprint/src/lib.rs
index e1757e7a7..fe828ffcc 100644
--- a/crates/sovereign-sdk/module-system/sov-modules-rollup-blueprint/src/lib.rs
+++ b/crates/sovereign-sdk/module-system/sov-modules-rollup-blueprint/src/lib.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use citrea_common::tasks::manager::TaskManager;
-use citrea_common::FullNodeConfig;
+use citrea_common::{FeeThrottleConfig, FullNodeConfig};
 use sov_db::ledger_db::LedgerDB;
 use sov_db::rocks_db_config::RocksdbConfig;
 use sov_modules_api::{Context, DaSpec, Spec};
@@ -115,6 +115,7 @@ pub trait RollupBlueprint: Sized + Send + Sync {
         rollup_config: &FullNodeConfig,
         require_wallet_check: bool,
         task_manager: &mut TaskManager<()>,
+        throttle_config: Option<FeeThrottleConfig>,
     ) -> Result<Arc<Self::DaService>, anyhow::Error>;
 
     /// Creates instance of [`BitcoinDaVerifier`]
diff --git a/resources/configs/bitcoin-regtest/sequencer_config.toml b/resources/configs/bitcoin-regtest/sequencer_config.toml
index f4c639071..a8a97f68b 100644
--- a/resources/configs/bitcoin-regtest/sequencer_config.toml
+++ b/resources/configs/bitcoin-regtest/sequencer_config.toml
@@ -13,3 +13,10 @@ queue_tx_size = 200
 base_fee_tx_limit = 100000
 base_fee_tx_size = 200
 max_account_slots = 16
+
+[fee_throttle]
+capacity_threshold = 0.5
+base_fee_multiplier = 1.0
+max_fee_multiplier = 4.0
+fee_exponential_factor = 4.0
+fee_multiplier_scalar = 10.0
diff --git a/resources/configs/bitcoin-regtest/sequencer_rollup_config.toml b/resources/configs/bitcoin-regtest/sequencer_rollup_config.toml
index d327bf6a9..810240a2d 100644
--- a/resources/configs/bitcoin-regtest/sequencer_rollup_config.toml
+++ b/resources/configs/bitcoin-regtest/sequencer_rollup_config.toml
@@ -14,6 +14,13 @@ network = "regtest"
 da_private_key = "E9873D79C6D87DC0FB6A5778633389F4453213303DA61F20BD67FC233AA33262"
 tx_backup_dir = "resources/bitcoin/inscription_txs"
 
+[da.monitoring]
+check_interval = 1
+history_limit = 100
+max_history_size = 1000000
+max_da_bandwidth_bytes = 4194304
+window_duration_secs = 600
+
 [storage]
 # The path to the rollup's data directory. Paths that do not begin with `/` are interpreted as relative paths.
 path = "resources/dbs/sequencer-db"
diff --git a/resources/configs/mock-dockerized/sequencer_config.toml b/resources/configs/mock-dockerized/sequencer_config.toml
index 97795bf4d..b79af9531 100644
--- a/resources/configs/mock-dockerized/sequencer_config.toml
+++ b/resources/configs/mock-dockerized/sequencer_config.toml
@@ -13,3 +13,10 @@ queue_tx_size = 200
 base_fee_tx_limit = 100000
 base_fee_tx_size = 200
 max_account_slots = 16
+
+[fee_throttle]
+capacity_threshold = 0.5
+base_fee_multiplier = 1.0
+max_fee_multiplier = 4.0
+fee_exponential_factor = 4.0
+fee_multiplier_scalar = 10.0
diff --git a/resources/configs/mock/sequencer_config.toml b/resources/configs/mock/sequencer_config.toml
index 97795bf4d..b79af9531 100644
--- a/resources/configs/mock/sequencer_config.toml
+++ b/resources/configs/mock/sequencer_config.toml
@@ -13,3 +13,10 @@ queue_tx_size = 200
 base_fee_tx_limit = 100000
 base_fee_tx_size = 200
 max_account_slots = 16
+
+[fee_throttle]
+capacity_threshold = 0.5
+base_fee_multiplier = 1.0
+max_fee_multiplier = 4.0
+fee_exponential_factor = 4.0
+fee_multiplier_scalar = 10.0
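The validation rules in FeeThrottleConfig::validate are enforced before a FeeThrottleService is constructed (FeeThrottleService::new calls validate). A minimal sketch of what they accept and reject; the mutated value is made up purely for illustration:

    // Sketch: exercising FeeThrottleConfig::validate with the defaults and one invalid value.
    use citrea_common::FeeThrottleConfig;

    fn main() {
        let mut config = FeeThrottleConfig::default();
        assert!(config.validate().is_ok()); // 0.5 / 1.0 / 4.0 / 4.0 / 10.0 defaults pass

        config.max_fee_multiplier = 0.5; // not greater than base_fee_multiplier, so rejected
        assert!(config.validate().is_err());
    }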