diff --git a/Cargo.lock b/Cargo.lock
index cde004878..ff04cc857 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1890,6 +1890,7 @@ dependencies = [
  "hyper",
  "jsonrpsee",
  "lru",
+ "metrics",
  "serde",
  "serde_json",
  "sov-db",
@@ -1902,6 +1903,7 @@ dependencies = [
  "tokio",
  "tokio-util",
  "toml",
+ "tower 0.4.13",
  "tower-http",
  "tracing",
 ]
diff --git a/bin/citrea/Cargo.toml b/bin/citrea/Cargo.toml
index c3b99f452..0d25dd12d 100644
--- a/bin/citrea/Cargo.toml
+++ b/bin/citrea/Cargo.toml
@@ -48,7 +48,7 @@ anyhow = { workspace = true }
 async-trait = { workspace = true }
 borsh = { workspace = true, features = ["bytes"] }
 clap = { workspace = true }
-hex = { workspace = true, optional = true }
+hex = { workspace = true }
 jsonrpsee = { workspace = true, features = ["http-client", "server"] }
 lazy_static = { workspace = true }
 log-panics = { workspace = true }
@@ -80,7 +80,6 @@ alloy-rpc-types = { workspace = true }
 alloy-rpc-types-trace = { workspace = true }
 bincode = { workspace = true }
 borsh = { workspace = true }
-hex = { workspace = true }
 jmt = { workspace = true }
 rand = { workspace = true }
 reqwest = { workspace = true }
diff --git a/bin/citrea/src/cli.rs b/bin/citrea/src/cli.rs
new file mode 100644
index 000000000..b05afd75b
--- /dev/null
+++ b/bin/citrea/src/cli.rs
@@ -0,0 +1,116 @@
+use anyhow::Context;
+use citrea::NetworkArg;
+use citrea_common::{
+    from_toml_path, BatchProverConfig, FromEnv, LightClientProverConfig, SequencerConfig,
+};
+use clap::{command, Parser};
+
+#[derive(clap::ValueEnum, Clone, Debug)]
+pub(crate) enum SupportedDaLayer {
+    Mock,
+    Bitcoin,
+}
+
+#[derive(Parser, Debug)]
+#[command(author, version, about, long_about = None)]
+pub(crate) struct Args {
+    /// The mode in which the node runs.
+    /// This determines which guest code to use.
+    /// Default is Mainnet.
+    #[clap(short, long, default_value_t, value_enum)]
+    pub(crate) network: NetworkArg,
+
+    /// Run the development chain
+    #[arg(long, default_value_t)]
+    pub(crate) dev: bool,
+
+    /// Run the regtest chain
+    #[arg(long, default_value_t, conflicts_with = "dev")]
+    pub(crate) dev_all_forks: bool,
+
+    /// Path to the genesis configuration.
+    /// Defines the genesis of module states like evm.
+    #[arg(long)]
+    pub(crate) genesis_paths: String,
+
+    /// The data layer type.
+    #[arg(long, default_value = "mock")]
+    pub(crate) da_layer: SupportedDaLayer,
+
+    /// Path to the rollup config. If a path is provided, the rollup config is read from that
+    /// file; otherwise it is read from environment variables.
+    #[arg(long)]
+    pub(crate) rollup_config_path: Option<String>,
+
+    /// Run the node in sequencer mode. If a path is provided, the sequencer config is read from
+    /// that file; otherwise it is read from environment variables.
+    #[arg(long, conflicts_with_all = ["batch_prover", "light_client_prover"])]
+    pub(crate) sequencer: Option<Option<String>>,
+
+    /// Run the node in batch prover mode. If a path is provided, the batch prover config is read
+    /// from that file; otherwise it is read from environment variables.
+    #[arg(long, conflicts_with_all = ["sequencer", "light_client_prover"])]
+    pub(crate) batch_prover: Option<Option<String>>,
+
+    /// Run the node in light client prover mode. If a path is provided, the light client prover
+    /// config is read from that file; otherwise it is read from environment variables.
+    #[arg(long, conflicts_with_all = ["sequencer", "batch_prover"])]
+    pub(crate) light_client_prover: Option<Option<String>>,
+
+    /// Logging verbosity
+    #[arg(long, short = 'v', action = clap::ArgAction::Count, default_value = "2")]
+    pub(crate) verbose: u8,
+    /// Silence all logging output
+    #[arg(long, short = 'q', action)]
+    pub(crate) quiet: bool,
+}
+
+pub(crate) enum NodeType {
+    Sequencer(SequencerConfig),
+    FullNode,
+    BatchProver(BatchProverConfig),
+    LightClientProver(LightClientProverConfig),
+}
+
+pub(crate) fn node_type_from_args(args: &Args) -> anyhow::Result<NodeType> {
+    let sequencer_config = match &args.sequencer {
+        Some(Some(path)) => Some(
+            from_toml_path(path)
+                .context("Failed to read sequencer configuration from the config file")?,
+        ),
+        Some(None) => Some(
+            SequencerConfig::from_env()
+                .context("Failed to read sequencer configuration from the environment")?,
+        ),
+        None => None,
+    };
+
+    let batch_prover_config = match &args.batch_prover {
+        Some(Some(path)) => Some(
+            from_toml_path(path)
+                .context("Failed to read batch prover configuration from the config file")?,
+        ),
+        Some(None) => Some(
+            BatchProverConfig::from_env()
+                .context("Failed to read batch prover configuration from the environment")?,
+        ),
+        None => None,
+    };
+
+    let light_client_prover_config = match &args.light_client_prover {
+        Some(Some(path)) => Some(
+            from_toml_path(path)
+                .context("Failed to read light client prover configuration from the config file")?,
+        ),
+        Some(None) => Some(
+            LightClientProverConfig::from_env()
+                .context("Failed to read light client prover configuration from the environment")?,
+        ),
+        None => None,
+    };
+
+    if let Some(sequencer_config) = sequencer_config {
+        return Ok(NodeType::Sequencer(sequencer_config));
+    } else if let Some(batch_prover_config) = batch_prover_config {
+        return Ok(NodeType::BatchProver(batch_prover_config));
+    } else if let Some(light_client_prover_config) = light_client_prover_config {
+        return Ok(NodeType::LightClientProver(light_client_prover_config));
+    }
+    Ok(NodeType::FullNode)
+}
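Note on the `--sequencer`/`--batch-prover`/`--light-client-prover` flags above: they rely on clap's nested-`Option` convention, where `Option<Option<String>>` makes the flag's value itself optional. A minimal standalone sketch of the three states (not part of this PR):

    use clap::Parser;

    #[derive(Parser, Debug)]
    struct Demo {
        /// Flag with an optional value.
        #[arg(long)]
        sequencer: Option<Option<String>>,
    }

    fn main() {
        // (no flag)                  -> None
        // --sequencer                -> Some(None)     (config from env)
        // --sequencer=cfg.toml       -> Some(Some(..)) (config from file)
        match Demo::parse().sequencer {
            Some(Some(path)) => println!("load TOML from {path}"),
            Some(None) => println!("load from environment"),
            None => println!("not running as sequencer"),
        }
    }

With the derive API the optional value must be attached with `=` (`--sequencer=cfg.toml`); a space-separated value would be parsed as a positional argument.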
diff --git a/bin/citrea/src/main.rs b/bin/citrea/src/main.rs
index 8bab72632..78fd29252 100644
--- a/bin/citrea/src/main.rs
+++ b/bin/citrea/src/main.rs
@@ -5,16 +5,22 @@ use std::time::Duration;
 use anyhow::{anyhow, Context as _};
 use bitcoin_da::service::BitcoinServiceConfig;
 use citrea::{
-    initialize_logging, BitcoinRollup, CitreaRollupBlueprint, MockDemoRollup, NetworkArg,
-};
-use citrea_common::{
-    from_toml_path, BatchProverConfig, FromEnv, FullNodeConfig, LightClientProverConfig,
-    SequencerConfig,
+    initialize_logging, BitcoinRollup, CitreaRollupBlueprint, Dependencies, MockDemoRollup, Storage,
 };
+use citrea_common::da::get_start_l1_height;
+use citrea_common::rpc::server::start_rpc_server;
+use citrea_common::{from_toml_path, FromEnv, FullNodeConfig};
+use citrea_light_client_prover::da_block_handler::StartVariant;
 use citrea_stf::genesis_config::GenesisPaths;
 use clap::Parser;
 use metrics_exporter_prometheus::PrometheusBuilder;
 use metrics_util::MetricKindMask;
+use sov_db::ledger_db::SharedLedgerOps;
+use sov_db::rocks_db_config::RocksdbConfig;
+use sov_db::schema::tables::{
+    BATCH_PROVER_LEDGER_TABLES, FULL_NODE_LEDGER_TABLES, LIGHT_CLIENT_PROVER_LEDGER_TABLES,
+    SEQUENCER_LEDGER_TABLES,
+};
 use sov_mock_da::MockDaConfig;
 use sov_modules_api::Spec;
 use sov_modules_rollup_blueprint::RollupBlueprint;
@@ -22,69 +28,15 @@ use sov_rollup_interface::Network;
 use sov_state::storage::NativeStorage;
 use tracing::{debug, error, info, instrument};
 
+use crate::cli::{node_type_from_args, Args, NodeType, SupportedDaLayer};
+
+mod cli;
 #[cfg(test)]
 mod test_rpc;
 
 /// Main runner. Initializes a DA service, and starts a node using the provided arguments.
-
-#[derive(Parser, Debug)]
-#[command(author, version, about, long_about = None)]
-struct Args {
-    /// The mode in which the node runs.
-    /// This determines which guest code to use.
-    /// Default is Mainnet.
-    #[clap(short, long, default_value_t, value_enum)]
-    network: NetworkArg,
-
-    /// Run the development chain
-    #[arg(long, default_value_t)]
-    dev: bool,
-
-    /// Run the regtest chain
-    #[arg(long, default_value_t, conflicts_with = "dev")]
-    dev_all_forks: bool,
-
-    /// Path to the genesis configuration.
-    /// Defines the genesis of module states like evm.
-    #[arg(long)]
-    genesis_paths: String,
-
-    /// The data layer type.
-    #[arg(long, default_value = "mock")]
-    da_layer: SupportedDaLayer,
-
-    /// The path to the rollup config, if a string is provided, it will be used as the path to the rollup config, otherwise environment variables will be used.
-    #[arg(long)]
-    rollup_config_path: Option<String>,
-
-    /// The option to run the node in sequencer mode, if a string is provided, it will be used as the path to the sequencer config, otherwise environment variables will be used.
-    #[arg(long, conflicts_with_all = ["batch_prover", "light_client_prover"])]
-    sequencer: Option<Option<String>>,
-
-    /// The option to run the node in batch prover mode, if a string is provided, it will be used as the path to the batch prover config, otherwise the environment variables will be used.
-    #[arg(long, conflicts_with_all = ["sequencer", "light_client_prover"])]
-    batch_prover: Option<Option<String>>,
-
-    /// The option to run the node in light client prover mode, if a string is provided, it will be used as the path to the light client prover config, otherwise the environment variables will be used.
-    #[arg(long, conflicts_with_all = ["sequencer", "batch_prover"])]
-    light_client_prover: Option<Option<String>>,
-
-    /// Logging verbosity
-    #[arg(long, short = 'v', action = clap::ArgAction::Count, default_value = "2")]
-    verbose: u8,
-    /// Logging verbosity
-    #[arg(long, short = 'q', action)]
-    quiet: bool,
-}
-
-#[derive(clap::ValueEnum, Clone, Debug)]
-enum SupportedDaLayer {
-    Mock,
-    Bitcoin,
-}
-
 #[tokio::main]
-async fn main() -> Result<(), anyhow::Error> {
+async fn main() -> anyhow::Result<()> {
     let mut args = Args::parse();
 
     if args.quiet {
@@ -100,57 +52,7 @@ async fn main() -> Result<(), anyhow::Error> {
     };
     initialize_logging(logging_level);
 
-    let sequencer_config = match args.sequencer {
-        Some(Some(path)) => Some(
-            from_toml_path(path)
-                .context("Failed to read sequencer configuration from the config file")?,
-        ),
-        Some(None) => Some(
-            SequencerConfig::from_env()
-                .context("Failed to read sequencer configuration from the environment")?,
-        ),
-        None => None,
-    };
-
-    let batch_prover_config = match args.batch_prover {
-        Some(Some(path)) => Some(
-            from_toml_path(path)
-                .context("Failed to read prover configuration from the config file")?,
-        ),
-        Some(None) => Some(
-            BatchProverConfig::from_env()
-                .context("Failed to read prover configuration from the environment")?,
-        ),
-        None => None,
-    };
-
-    let light_client_prover_config = match args.light_client_prover {
-        Some(Some(path)) => Some(
-            from_toml_path(path)
-                .context("Failed to read prover configuration from the config file")?,
-        ),
-        Some(None) => Some(
-            LightClientProverConfig::from_env()
-                .context("Failed to read prover configuration from the environment")?,
-        ),
-        None => None,
-    };
-
-    if batch_prover_config.is_some() && sequencer_config.is_some() {
-        return Err(anyhow::anyhow!(
-            "Cannot run in both batch prover and sequencer mode at the same time"
-        ));
-    }
-    if batch_prover_config.is_some() && light_client_prover_config.is_some() {
-        return Err(anyhow::anyhow!(
-            "Cannot run in both batch prover and light client prover mode at the same time"
-        ));
-    }
-    if light_client_prover_config.is_some() && sequencer_config.is_some() {
-        return Err(anyhow::anyhow!(
-            "Cannot run in both light client prover and sequencer mode at the same time"
-        ));
-    }
+    let node_type = node_type_from_args(&args)?;
 
     let mut network = args.network.into();
     if args.dev {
@@ -169,9 +71,7 @@ async fn main() -> Result<(), anyhow::Error> {
             network,
             &GenesisPaths::from_dir(&args.genesis_paths),
             args.rollup_config_path,
-            batch_prover_config,
-            light_client_prover_config,
-            sequencer_config,
+            node_type,
         )
         .await?;
     }
@@ -180,9 +80,7 @@ async fn main() -> Result<(), anyhow::Error> {
             network,
             &GenesisPaths::from_dir(&args.genesis_paths),
             args.rollup_config_path,
-            batch_prover_config,
-            light_client_prover_config,
-            sequencer_config,
+            node_type,
        )
        .await?;
    }
@@ -194,17 +92,15 @@ async fn main() -> Result<(), anyhow::Error> {
 #[instrument(level = "trace", skip_all, err)]
 async fn start_rollup<S, DaC>(
     network: Network,
-    rt_genesis_paths: &<<S as RollupBlueprint>::NativeRuntime as sov_modules_stf_blueprint::Runtime<
+    runtime_genesis_paths: &<<S as RollupBlueprint>::NativeRuntime as sov_modules_stf_blueprint::Runtime<
         <S as RollupBlueprint>::NativeContext,
         <S as RollupBlueprint>::DaSpec,
     >>::GenesisPaths,
     rollup_config_path: Option<String>,
-    batch_prover_config: Option<BatchProverConfig>,
-    light_client_prover_config: Option<LightClientProverConfig>,
-    sequencer_config: Option<SequencerConfig>,
+    node_type: NodeType,
 ) -> Result<(), anyhow::Error>
 where
-    DaC: serde::de::DeserializeOwned + DebugTrait + Clone + FromEnv,
+    DaC: serde::de::DeserializeOwned + DebugTrait + Clone + FromEnv + Send + Sync + 'static,
     S: CitreaRollupBlueprint<DaConfig = DaC>,
     <<S as RollupBlueprint>::NativeContext as Spec>::Storage: NativeStorage,
 {
@@ -236,64 +132,220 @@ where
 
     let rollup_blueprint = S::new(network);
 
-    if let Some(sequencer_config) = sequencer_config {
-        let (mut sequencer, rpc_methods) = rollup_blueprint
-            .create_new_sequencer(rt_genesis_paths, rollup_config.clone(), sequencer_config)
-            .await
-            .expect("Could not start sequencer");
-        sequencer.start_rpc_server(rpc_methods, None).await.unwrap();
-
-        if let Err(e) = sequencer.run().await {
-            error!("Error: {}", e);
+    // Based on the node's type, execute migrations before constructing an instance of LedgerDB
+    // so as to avoid locking the DB.
+    let (tables, migrations) = match node_type {
+        NodeType::Sequencer(_) => (
+            SEQUENCER_LEDGER_TABLES
+                .iter()
+                .map(|table| table.to_string())
+                .collect::<Vec<_>>(),
+            citrea_sequencer::db_migrations::migrations(),
+        ),
+        NodeType::FullNode => (
+            FULL_NODE_LEDGER_TABLES
+                .iter()
+                .map(|table| table.to_string())
+                .collect::<Vec<_>>(),
+            citrea_fullnode::db_migrations::migrations(),
+        ),
+        NodeType::BatchProver(_) => (
+            BATCH_PROVER_LEDGER_TABLES
+                .iter()
+                .map(|table| table.to_string())
+                .collect::<Vec<_>>(),
+            citrea_batch_prover::db_migrations::migrations(),
+        ),
+        NodeType::LightClientProver(_) => (
+            LIGHT_CLIENT_PROVER_LEDGER_TABLES
+                .iter()
+                .map(|table| table.to_string())
+                .collect::<Vec<_>>(),
+            citrea_light_client_prover::db_migrations::migrations(),
+        ),
+    };
+    rollup_blueprint.run_ledger_migrations(&rollup_config, tables.clone(), migrations)?;
+
+    let genesis_config =
+        rollup_blueprint.create_genesis_config(runtime_genesis_paths, &rollup_config)?;
+
+    let rocksdb_path = rollup_config.storage.path.clone();
+    let rocksdb_config = RocksdbConfig::new(
+        rocksdb_path.as_path(),
+        rollup_config.storage.db_max_open_files,
+        Some(tables),
+    );
+
+    let Storage {
+        ledger_db,
+        storage_manager,
+        prover_storage,
+    } = rollup_blueprint.setup_storage(&rollup_config, &rocksdb_config)?;
+
+    let Dependencies {
+        da_service,
+        mut task_manager,
+        soft_confirmation_channel,
+    } = rollup_blueprint.setup_dependencies(&rollup_config).await?;
+
+    let sequencer_client_url = rollup_config
+        .runner
+        .clone()
+        .map(|runner| runner.sequencer_client_url);
+    let soft_confirmation_rx = match node_type {
+        NodeType::Sequencer(_) | NodeType::BatchProver(_) | NodeType::FullNode => {
+            soft_confirmation_channel.1
         }
-    } else if let Some(batch_prover_config) = batch_prover_config {
-        let (mut prover, rpc_methods) = CitreaRollupBlueprint::create_new_batch_prover(
-            &rollup_blueprint,
-            rt_genesis_paths,
-            rollup_config,
-            batch_prover_config,
-        )
-        .await
-        .expect("Could not start batch prover");
-
-        prover
-            .start_rpc_server(rpc_methods, None)
-            .await
-            .expect("Failed to start rpc server");
+        _ => None,
+    };
 
-        if let Err(e) = prover.run().await {
-            error!("Error: {}", e);
+    let rpc_module = rollup_blueprint.setup_rpc(
+        &prover_storage,
+        ledger_db.clone(),
+        da_service.clone(),
+        sequencer_client_url,
+        soft_confirmation_rx,
+    )?;
+
+    match node_type {
+        NodeType::Sequencer(sequencer_config) => {
+            let (mut sequencer, rpc_module) = rollup_blueprint
+                .create_sequencer(
+                    genesis_config,
+                    rollup_config.clone(),
+                    sequencer_config,
+                    da_service,
+                    ledger_db,
+                    storage_manager,
+                    prover_storage,
+                    soft_confirmation_channel.0,
+                    rpc_module,
+                )
+                .expect("Could not start sequencer");
+
+            start_rpc_server(
+                rollup_config.rpc.clone(),
+                &mut task_manager,
+                rpc_module,
+                None,
+            );
+
+            if let Err(e) = sequencer.run(task_manager).await {
+                error!("Error: {}", e);
+            }
         }
-    } else if let Some(light_client_prover_config) = light_client_prover_config {
-        let (mut prover, rpc_methods) = CitreaRollupBlueprint::create_new_light_client_prover(
-            &rollup_blueprint,
-            rollup_config,
-            light_client_prover_config,
-        )
-        .await
-        .expect("Could not start light client prover");
-
-        prover
-            .start_rpc_server(rpc_methods, None)
-            .await
-            .expect("Failed to start rpc server");
-
-        if let Err(e) = prover.run().await {
-            error!("Error: {}", e);
+        NodeType::BatchProver(batch_prover_config) => {
+            let (mut prover, l1_block_handler, rpc_module) =
+                CitreaRollupBlueprint::create_batch_prover(
+                    &rollup_blueprint,
+                    batch_prover_config,
+                    genesis_config,
+                    rollup_config.clone(),
+                    da_service,
+                    ledger_db.clone(),
+                    storage_manager,
+                    prover_storage,
+                    soft_confirmation_channel.0,
+                    rpc_module,
+                )
+                .await
+                .expect("Could not start batch prover");
+
+            start_rpc_server(
+                rollup_config.rpc.clone(),
+                &mut task_manager,
+                rpc_module,
+                None,
+            );
+
+            task_manager.spawn(|cancellation_token| async move {
+                let Ok(start_l1_height) = get_start_l1_height(&rollup_config, &ledger_db).await
+                else {
+                    error!("Failed to start prover L1 block handler: starting L1 height is not present");
+                    return;
+                };
+                l1_block_handler
+                    .run(start_l1_height, cancellation_token)
+                    .await
+            });
+
+            if let Err(e) = prover.run(task_manager).await {
+                error!("Error: {}", e);
+            }
+        }
+        NodeType::LightClientProver(light_client_prover_config) => {
+            let starting_block = match ledger_db.get_last_scanned_l1_height()? {
+                Some(l1_height) => StartVariant::LastScanned(l1_height.0),
+                // first time starting the prover
+                // start from the block given in the config
+                None => StartVariant::FromBlock(light_client_prover_config.initial_da_height),
+            };
+
+            let (mut prover, l1_block_handler, rpc_module) =
+                CitreaRollupBlueprint::create_light_client_prover(
+                    &rollup_blueprint,
+                    light_client_prover_config,
+                    rollup_config.clone(),
+                    &rocksdb_config,
+                    da_service,
+                    ledger_db,
+                    rpc_module,
+                )
+                .await
+                .expect("Could not start light client prover");
+
+            start_rpc_server(
+                rollup_config.rpc.clone(),
+                &mut task_manager,
+                rpc_module,
+                None,
+            );
+
+            task_manager.spawn(|cancellation_token| async move {
+                l1_block_handler
+                    .run(starting_block, cancellation_token)
+                    .await
+            });
+
+            if let Err(e) = prover.run(task_manager).await {
+                error!("Error: {}", e);
+            }
         }
-    } else {
-        let (mut rollup, rpc_methods) = CitreaRollupBlueprint::create_new_rollup(
-            &rollup_blueprint,
-            rt_genesis_paths,
-            rollup_config,
-        )
-        .await
-        .expect("Could not start full-node");
-
-        rollup.start_rpc_server(rpc_methods, None).await;
-
-        if let Err(e) = rollup.run().await {
-            error!("Error: {}", e);
+        _ => {
+            let (mut full_node, l1_block_handler) = CitreaRollupBlueprint::create_full_node(
+                &rollup_blueprint,
+                genesis_config,
+                rollup_config.clone(),
+                da_service,
+                ledger_db.clone(),
+                storage_manager,
+                prover_storage,
+                soft_confirmation_channel.0,
+            )
+            .await
+            .expect("Could not start full-node");
+
+            start_rpc_server(
+                rollup_config.rpc.clone(),
+                &mut task_manager,
+                rpc_module,
+                None,
+            );
+
+            task_manager.spawn(|cancellation_token| async move {
+                let Ok(start_l1_height) = get_start_l1_height(&rollup_config, &ledger_db).await
+                else {
+                    error!("Failed to start fullnode L1 block handler: starting L1 height is not present");
+                    return;
+                };
+                l1_block_handler
+                    .run(start_l1_height, cancellation_token)
+                    .await
+            });
+
+            if let Err(e) = full_node.run(task_manager).await {
+                error!("Error: {}", e);
+            }
         }
     }
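Note on the handler tasks above: each L1 block handler is handed to `task_manager.spawn` together with a cancellation token. A condensed sketch of the underlying shutdown pattern, written against tokio/tokio-util directly since TaskManager's internals are not part of this diff:

    use std::time::Duration;

    use tokio_util::sync::CancellationToken;

    async fn l1_handler_loop(cancellation_token: CancellationToken) {
        let mut poll = tokio::time::interval(Duration::from_secs(2));
        loop {
            tokio::select! {
                // Resolves once the task manager cancels the token on shutdown,
                // letting the loop exit cleanly instead of being dropped mid-write.
                _ = cancellation_token.cancelled() => break,
                _ = poll.tick() => {
                    // scan and process the next L1 block here
                }
            }
        }
    }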
diff --git a/bin/citrea/src/rollup/bitcoin.rs b/bin/citrea/src/rollup/bitcoin.rs
index 117b713c4..15a0f3b69 100644
--- a/bin/citrea/src/rollup/bitcoin.rs
+++ b/bin/citrea/src/rollup/bitcoin.rs
@@ -235,7 +235,7 @@ impl RollupBlueprint for BitcoinRollup {
         }
     }
 
-    fn get_light_client_proof_code_commitment(
+    fn get_light_client_proof_code_commitments(
         &self,
     ) -> HashMap<SpecId, <Self::Vm as Zkvm>::CodeCommitment> {
         match self.network {
diff --git a/bin/citrea/src/rollup/mock.rs b/bin/citrea/src/rollup/mock.rs
index 5297b098f..721cea004 100644
--- a/bin/citrea/src/rollup/mock.rs
+++ b/bin/citrea/src/rollup/mock.rs
@@ -118,7 +118,7 @@ impl RollupBlueprint for MockDemoRollup {
         .collect()
     }
 
-    fn get_light_client_proof_code_commitment(
+    fn get_light_client_proof_code_commitments(
         &self,
     ) -> HashMap<SpecId, <Self::Vm as Zkvm>::CodeCommitment> {
         LIGHT_CLIENT_LATEST_MOCK_GUESTS
diff --git a/bin/citrea/src/rollup/mod.rs b/bin/citrea/src/rollup/mod.rs
index 044dd8ea4..703a6d931 100644
--- a/bin/citrea/src/rollup/mod.rs
+++ b/bin/citrea/src/rollup/mod.rs
@@ -1,97 +1,82 @@
 use std::sync::Arc;
 
-use anyhow::anyhow;
+use anyhow::{anyhow, Result};
 use async_trait::async_trait;
+use citrea_batch_prover::da_block_handler::L1BlockHandler as BatchProverL1BlockHandler;
 use citrea_batch_prover::CitreaBatchProver;
 use citrea_common::tasks::manager::TaskManager;
 use citrea_common::{BatchProverConfig, FullNodeConfig, LightClientProverConfig, SequencerConfig};
+use citrea_fullnode::da_block_handler::L1BlockHandler as FullNodeL1BlockHandler;
 use citrea_fullnode::CitreaFullnode;
+use citrea_light_client_prover::da_block_handler::L1BlockHandler as LightClientProverL1BlockHandler;
 use citrea_light_client_prover::runner::CitreaLightClientProver;
 use citrea_primitives::forks::get_forks;
 use citrea_sequencer::CitreaSequencer;
 use jsonrpsee::RpcModule;
-use sov_db::ledger_db::migrations::LedgerDBMigrator;
+use sov_db::ledger_db::migrations::{LedgerDBMigrator, Migrations};
 use sov_db::ledger_db::{LedgerDB, SharedLedgerOps};
-use sov_db::mmr_db::MmrDB;
 use sov_db::rocks_db_config::RocksdbConfig;
-use sov_db::schema::tables::{
-    BATCH_PROVER_LEDGER_TABLES, FULL_NODE_LEDGER_TABLES, LIGHT_CLIENT_PROVER_LEDGER_TABLES,
-    SEQUENCER_LEDGER_TABLES,
-};
 use sov_db::schema::types::SoftConfirmationNumber;
+use sov_modules_api::transaction::Transaction;
 use sov_modules_api::Spec;
 use sov_modules_rollup_blueprint::RollupBlueprint;
-use sov_modules_stf_blueprint::{Runtime as RuntimeTrait, StfBlueprint};
+use sov_modules_stf_blueprint::{
+    GenesisParams as StfGenesisParams, Runtime as RuntimeTrait, StfBlueprint,
+};
+use sov_prover_storage_manager::{ProverStorageManager, SnapshotManager};
 use sov_rollup_interface::fork::ForkManager;
+use sov_rollup_interface::stf::StateTransitionFunction;
+use sov_rollup_interface::zk::StorageRootHash;
 use sov_state::storage::NativeStorage;
-use sov_stf_runner::InitVariant;
+use sov_state::{ArrayWitness, ProverStorage};
+use sov_stf_runner::InitParams;
 use tokio::sync::broadcast;
-use tracing::{info, instrument};
+use tracing::{debug, info, instrument};
 
 mod bitcoin;
 mod mock;
 
 pub use bitcoin::*;
 pub use mock::*;
 
+type GenesisParams<T> = StfGenesisParams<
+    <<T as RollupBlueprint>::NativeRuntime as RuntimeTrait<
+        <T as RollupBlueprint>::NativeContext,
+        <T as RollupBlueprint>::DaSpec,
+    >>::GenesisConfig,
+>;
+
+/// Group for storage instances
+pub struct Storage<T: RollupBlueprint> {
+    /// The ledger DB instance
+    pub ledger_db: LedgerDB,
+    /// The prover storage manager instance
+    pub storage_manager: ProverStorageManager<<T as RollupBlueprint>::DaSpec>,
+    /// The prover storage
+    pub prover_storage: ProverStorage<SnapshotManager>,
+}
+
+/// Group for initialization dependencies
+pub struct Dependencies<T: RollupBlueprint> {
+    /// The task manager
+    pub task_manager: TaskManager<()>,
+    /// The DA service
+    pub da_service: Arc<<T as RollupBlueprint>::DaService>,
+    /// The channel on which the L2 block number is broadcast.
+    pub soft_confirmation_channel: (broadcast::Sender<u64>, Option<broadcast::Receiver<u64>>),
+}
+
 /// Overrides RollupBlueprint methods
 #[async_trait]
 pub trait CitreaRollupBlueprint: RollupBlueprint {
-    /// Creates a new sequencer
-    #[instrument(level = "trace", skip_all)]
-    async fn create_new_sequencer(
+    /// Setup the rollup's dependencies
+    async fn setup_dependencies(
         &self,
-        runtime_genesis_paths: &<Self::NativeRuntime as RuntimeTrait<Self::NativeContext, Self::DaSpec>>::GenesisPaths,
-        rollup_config: FullNodeConfig<Self::DaConfig>,
-        sequencer_config: SequencerConfig,
-    ) -> Result<
-        (
-            CitreaSequencer<Self::NativeContext, Self::DaService, LedgerDB, Self::NativeRuntime>,
-            RpcModule<()>,
-        ),
-        anyhow::Error,
-    >
-    where
-        <Self::NativeContext as Spec>::Storage: NativeStorage,
-    {
+        rollup_config: &FullNodeConfig<Self::DaConfig>,
+    ) -> Result<Dependencies<Self>> {
         let mut task_manager = TaskManager::default();
         let da_service = self
-            .create_da_service(&rollup_config, true, &mut task_manager)
+            .create_da_service(rollup_config, true, &mut task_manager)
             .await?;
-
-        // TODO: Double check what kind of storage needed here.
-        // Maybe whole "prev_root" can be initialized inside runner
-        // Getting block here, so prover_service doesn't have to be `Send`
-
-        // Migrate before constructing ledger_db instance so that no lock is present.
-        let migrator = LedgerDBMigrator::new(
-            rollup_config.storage.path.as_path(),
-            citrea_sequencer::db_migrations::migrations(),
-        );
-
-        let sequencer_tables = SEQUENCER_LEDGER_TABLES
-            .iter()
-            .map(|table| table.to_string())
-            .collect::<Vec<_>>();
-
-        migrator.migrate(
-            rollup_config.storage.db_max_open_files,
-            sequencer_tables.clone(),
-        )?;
-
-        let rocksdb_config = RocksdbConfig::new(
-            rollup_config.storage.path.as_path(),
-            rollup_config.storage.db_max_open_files,
-            Some(sequencer_tables),
-        );
-        let ledger_db = self.create_ledger_db(&rocksdb_config);
-        let genesis_config = self.create_genesis_config(runtime_genesis_paths, &rollup_config)?;
-
-        let mut storage_manager = self.create_storage_manager(&rollup_config)?;
-        let prover_storage = storage_manager.create_finalized_storage()?;
-
         let (soft_confirmation_tx, soft_confirmation_rx) = broadcast::channel(10);
         // If subscriptions disabled, pass None
         let soft_confirmation_rx = if rollup_config.rpc.enable_subscriptions {
@@ -99,40 +84,70 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
         } else {
             None
         };
-        // TODO(https://github.com/Sovereign-Labs/sovereign-sdk/issues/1218)
-        let rpc_methods = self.create_rpc_methods(
-            &prover_storage,
-            &ledger_db,
-            &da_service,
-            None,
-            soft_confirmation_rx,
-        )?;
 
-        let native_stf = StfBlueprint::new();
+        Ok(Dependencies {
+            task_manager,
+            da_service,
+            soft_confirmation_channel: (soft_confirmation_tx, soft_confirmation_rx),
+        })
+    }
 
-        let genesis_root = prover_storage.get_root_hash(1);
+    /// Setup the rollup's storage access
+    fn setup_storage(
+        &self,
+        rollup_config: &FullNodeConfig<Self::DaConfig>,
+        rocksdb_config: &RocksdbConfig,
+    ) -> Result<Storage<Self>> {
+        let ledger_db = self.create_ledger_db(rocksdb_config);
+        let mut storage_manager = self.create_storage_manager(rollup_config)?;
+        let prover_storage = storage_manager.create_finalized_storage()?;
 
-        let init_variant = match ledger_db.get_head_soft_confirmation()? {
-            // At least one soft confirmation was processed
-            Some((number, soft_confirmation)) => {
-                info!("Initialize sequencer at batch number {:?}. State root: {:?}. Last soft confirmation hash: {:?}.", number, prover_storage.get_root_hash(number.0 + 1)?.as_ref(), soft_confirmation.hash);
-
-                InitVariant::Initialized((
-                    prover_storage.get_root_hash(number.0 + 1)?,
-                    soft_confirmation.hash,
-                ))
-            }
-            None => {
-                info!("Initialize sequencer at genesis.");
-                match genesis_root {
-                    // Chain was initialized but no soft confirmations was processed
-                    Ok(root_hash) => InitVariant::Initialized((root_hash, [0; 32])),
-                    // Not even initialized
-                    _ => InitVariant::Genesis(genesis_config),
-                }
-            }
-        };
+        Ok(Storage {
+            ledger_db,
+            storage_manager,
+            prover_storage,
+        })
+    }
 
+    /// Setup the RPC server
+    fn setup_rpc(
+        &self,
+        prover_storage: &ProverStorage<SnapshotManager>,
+        ledger_db: LedgerDB,
+        da_service: Arc<<Self as RollupBlueprint>::DaService>,
+        sequencer_client_url: Option<String>,
+        soft_confirmation_rx: Option<broadcast::Receiver<u64>>,
+    ) -> Result<RpcModule<()>> {
+        self.create_rpc_methods(
+            prover_storage,
+            &ledger_db,
+            &da_service,
+            sequencer_client_url,
+            soft_confirmation_rx,
+        )
+    }
+
+    /// Creates a new sequencer
+    #[instrument(level = "trace", skip_all)]
+    #[allow(clippy::type_complexity, clippy::too_many_arguments)]
+    fn create_sequencer(
+        &self,
+        genesis_config: GenesisParams<Self>,
+        rollup_config: FullNodeConfig<Self::DaConfig>,
+        sequencer_config: SequencerConfig,
+        da_service: Arc<<Self as RollupBlueprint>::DaService>,
+        ledger_db: LedgerDB,
+        mut storage_manager: ProverStorageManager<<Self as RollupBlueprint>::DaSpec>,
+        prover_storage: ProverStorage<SnapshotManager>,
+        soft_confirmation_tx: broadcast::Sender<u64>,
+        rpc_module: RpcModule<()>,
+    ) -> Result<(
+        CitreaSequencer<Self::NativeContext, Self::DaService, LedgerDB, Self::NativeRuntime>,
+        RpcModule<()>,
+    )>
+    where
+        <Self::NativeContext as Spec>::Storage: NativeStorage,
+    {
         let current_l2_height = ledger_db
             .get_head_soft_confirmation()
             .map_err(|e| anyhow!("Failed to get head soft confirmation: {}", e))?
@@ -142,135 +157,65 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
         let mut fork_manager = ForkManager::new(get_forks(), current_l2_height.0);
         fork_manager.register_handler(Box::new(ledger_db.clone()));
 
-        let seq = CitreaSequencer::new(
-            da_service,
-            prover_storage,
+        let native_stf = StfBlueprint::new();
+        let init_params = self.init_chain(
+            genesis_config,
+            &native_stf,
+            &ledger_db,
+            &mut storage_manager,
+            &prover_storage,
+        )?;
+
+        citrea_sequencer::build_services(
             sequencer_config,
+            init_params,
             native_stf,
-            storage_manager,
-            init_variant,
             rollup_config.public_keys,
+            da_service,
             ledger_db,
-            rollup_config.rpc,
-            fork_manager,
+            storage_manager,
+            prover_storage,
             soft_confirmation_tx,
-            task_manager,
+            fork_manager,
+            rpc_module,
         )
-        .unwrap();
-
-        Ok((seq, rpc_methods))
     }
 
     /// Creates a new rollup.
     #[instrument(level = "trace", skip_all)]
-    async fn create_new_rollup(
+    #[allow(clippy::too_many_arguments)]
+    async fn create_full_node(
         &self,
-        runtime_genesis_paths: &<Self::NativeRuntime as RuntimeTrait<Self::NativeContext, Self::DaSpec>>::GenesisPaths,
+        genesis_config: GenesisParams<Self>,
         rollup_config: FullNodeConfig<Self::DaConfig>,
-    ) -> Result<
-        (
-            CitreaFullnode<
-                Self::DaService,
-                Self::Vm,
-                Self::NativeContext,
-                LedgerDB,
-                Self::NativeRuntime,
-            >,
-            RpcModule<()>,
-        ),
-        anyhow::Error,
-    >
+        da_service: Arc<<Self as RollupBlueprint>::DaService>,
+        ledger_db: LedgerDB,
+        mut storage_manager: ProverStorageManager<<Self as RollupBlueprint>::DaSpec>,
+        prover_storage: ProverStorage<SnapshotManager>,
+        soft_confirmation_tx: broadcast::Sender<u64>,
+    ) -> Result<(
+        CitreaFullnode<Self::DaService, Self::Vm, Self::NativeContext, LedgerDB, Self::NativeRuntime>,
+        FullNodeL1BlockHandler<
+            Self::NativeContext,
+            Self::Vm,
+            Self::DaService,
+            StorageRootHash,
+            LedgerDB,
+        >,
+    )>
     where
         <Self::NativeContext as Spec>::Storage: NativeStorage,
     {
-        let mut task_manager = TaskManager::default();
-        let da_service = self
-            .create_da_service(&rollup_config, false, &mut task_manager)
-            .await?;
-
-        // TODO: Double check what kind of storage needed here.
-        // Maybe whole "prev_root" can be initialized inside runner
-        // Getting block here, so prover_service doesn't have to be `Send`
-
-        // Migrate before constructing ledger_db instance so that no lock is present.
-        let migrator = LedgerDBMigrator::new(
-            rollup_config.storage.path.as_path(),
-            citrea_fullnode::db_migrations::migrations(),
-        );
-
-        let full_node_tables = FULL_NODE_LEDGER_TABLES
-            .iter()
-            .map(|table| table.to_string())
-            .collect::<Vec<_>>();
-
-        migrator.migrate(
-            rollup_config.storage.db_max_open_files,
-            full_node_tables.clone(),
-        )?;
-
-        let rocksdb_config = RocksdbConfig::new(
-            rollup_config.storage.path.as_path(),
-            rollup_config.storage.db_max_open_files,
-            Some(full_node_tables),
-        );
-
-        let ledger_db = self.create_ledger_db(&rocksdb_config);
-
-        let genesis_config = self.create_genesis_config(runtime_genesis_paths, &rollup_config)?;
-
-        let mut storage_manager = self.create_storage_manager(&rollup_config)?;
-
-        let prover_storage = storage_manager.create_finalized_storage()?;
-
         let runner_config = rollup_config.runner.expect("Runner config is missing");
-
-        let (soft_confirmation_tx, soft_confirmation_rx) = broadcast::channel(10);
-        // If subscriptions disabled, pass None
-        let soft_confirmation_rx = if rollup_config.rpc.enable_subscriptions {
-            Some(soft_confirmation_rx)
-        } else {
-            None
-        };
-        // TODO(https://github.com/Sovereign-Labs/sovereign-sdk/issues/1218)
-        let rpc_methods = self.create_rpc_methods(
-            &prover_storage,
-            &ledger_db,
-            &da_service,
-            Some(runner_config.sequencer_client_url.clone()),
-            soft_confirmation_rx,
-        )?;
         let native_stf = StfBlueprint::new();
-
-        let genesis_root = prover_storage.get_root_hash(1);
-
-        let head_sc = ledger_db.get_head_soft_confirmation()?;
-
-        let init_variant = match head_sc {
-            // At least one soft confirmation was processed
-            Some((number, soft_confirmation)) => {
-                let state_root = prover_storage.get_root_hash(number.0 + 1)?;
-                info!("Initialize node at batch number {:?}. State root: {:?}. Last soft confirmation hash: {:?}.", number, state_root.as_ref(), soft_confirmation.hash);
-
-                InitVariant::Initialized((
-                    prover_storage.get_root_hash(number.0 + 1)?,
-                    soft_confirmation.hash,
-                ))
-            }
-            None => {
-                info!("Initialize node at genesis.");
-                match genesis_root {
-                    // Chain was initialized but no soft confirmations was processed
-                    Ok(root_hash) => InitVariant::Initialized((root_hash, [0; 32])),
-                    // Not even initialized
-                    _ => InitVariant::Genesis(genesis_config),
-                }
-            }
-        };
-
-        let code_commitments_by_spec = self.get_batch_proof_code_commitments();
+        let init_params = self.init_chain(
+            genesis_config,
+            &native_stf,
+            &ledger_db,
+            &mut storage_manager,
+            &prover_storage,
+        )?;
 
         let current_l2_height = ledger_db
             .get_head_soft_confirmation()
@@ -281,141 +226,62 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
         let mut fork_manager = ForkManager::new(get_forks(), current_l2_height.0);
         fork_manager.register_handler(Box::new(ledger_db.clone()));
 
-        let runner = CitreaFullnode::new(
+        let code_commitments = self.get_batch_proof_code_commitments();
+
+        citrea_fullnode::build_services(
             runner_config,
+            init_params,
+            native_stf,
             rollup_config.public_keys,
-            rollup_config.rpc,
             da_service,
             ledger_db,
-            native_stf,
             storage_manager,
-            init_variant,
-            code_commitments_by_spec,
-            fork_manager,
             soft_confirmation_tx,
-            task_manager,
-        )?;
-
-        Ok((runner, rpc_methods))
+            fork_manager,
+            code_commitments,
+        )
     }
 
     /// Creates a new prover
     #[instrument(level = "trace", skip_all)]
-    async fn create_new_batch_prover(
+    #[allow(clippy::type_complexity, clippy::too_many_arguments)]
+    async fn create_batch_prover(
         &self,
-        runtime_genesis_paths: &<Self::NativeRuntime as RuntimeTrait<Self::NativeContext, Self::DaSpec>>::GenesisPaths,
-        rollup_config: FullNodeConfig<Self::DaConfig>,
         prover_config: BatchProverConfig,
-    ) -> Result<
-        (
-            CitreaBatchProver<
-                Self::NativeContext,
-                Self::DaService,
-                Self::Vm,
-                Self::ProverService,
-                LedgerDB,
-                Self::NativeRuntime,
-            >,
-            RpcModule<()>,
-        ),
-        anyhow::Error,
-    >
+        genesis_config: GenesisParams<Self>,
+        rollup_config: FullNodeConfig<Self::DaConfig>,
+        da_service: Arc<<Self as RollupBlueprint>::DaService>,
+        ledger_db: LedgerDB,
+        mut storage_manager: ProverStorageManager<<Self as RollupBlueprint>::DaSpec>,
+        prover_storage: ProverStorage<SnapshotManager>,
+        soft_confirmation_tx: broadcast::Sender<u64>,
+        rpc_module: RpcModule<()>,
+    ) -> Result<(
+        CitreaBatchProver<Self::NativeContext, Self::DaService, Self::Vm, Self::ProverService, LedgerDB, Self::NativeRuntime>,
+        BatchProverL1BlockHandler<
+            Self::Vm,
+            Self::DaService,
+            Self::ProverService,
+            LedgerDB,
+            StorageRootHash,
+            ArrayWitness,
+            Transaction<<Self as RollupBlueprint>::NativeContext>,
+        >,
+        RpcModule<()>,
+    )>
     where
        <Self::NativeContext as Spec>::Storage: NativeStorage,
     {
-        let mut task_manager = TaskManager::default();
-        let da_service = self
-            .create_da_service(&rollup_config, true, &mut task_manager)
-            .await?;
-
-        // Migrate before constructing ledger_db instance so that no lock is present.
-        let migrator = LedgerDBMigrator::new(
-            rollup_config.storage.path.as_path(),
-            citrea_batch_prover::db_migrations::migrations(),
-        );
-
-        let batch_prover_tables = BATCH_PROVER_LEDGER_TABLES
-            .iter()
-            .map(|table| table.to_string())
-            .collect::<Vec<_>>();
-
-        migrator.migrate(
-            rollup_config.storage.db_max_open_files,
-            batch_prover_tables.clone(),
-        )?;
-
-        let rocksdb_config = RocksdbConfig::new(
-            rollup_config.storage.path.as_path(),
-            rollup_config.storage.db_max_open_files,
-            Some(batch_prover_tables),
-        );
-        let ledger_db = self.create_ledger_db(&rocksdb_config);
-
-        let prover_service = self
-            .create_prover_service(
-                prover_config.proving_mode,
-                &da_service,
-                ledger_db.clone(),
-                prover_config.proof_sampling_number,
-            )
-            .await;
-
-        // TODO: Double check what kind of storage needed here.
-        // Maybe whole "prev_root" can be initialized inside runner
-        // Getting block here, so prover_service doesn't have to be `Send`
-
-        let genesis_config = self.create_genesis_config(runtime_genesis_paths, &rollup_config)?;
-
-        let mut storage_manager = self.create_storage_manager(&rollup_config)?;
-        let prover_storage = storage_manager.create_finalized_storage()?;
-
-        let (soft_confirmation_tx, soft_confirmation_rx) = broadcast::channel(10);
-        // If subscriptions disabled, pass None
-        let soft_confirmation_rx = if rollup_config.rpc.enable_subscriptions {
-            Some(soft_confirmation_rx)
-        } else {
-            None
-        };
         let runner_config = rollup_config.runner.expect("Runner config is missing");
-        // TODO(https://github.com/Sovereign-Labs/sovereign-sdk/issues/1218)
-        let rpc_methods = self.create_rpc_methods(
-            &prover_storage,
-            &ledger_db,
-            &da_service,
-            Some(runner_config.sequencer_client_url.clone()),
-            soft_confirmation_rx,
-        )?;
         let native_stf = StfBlueprint::new();
-
-        let genesis_root = prover_storage.get_root_hash(1);
-
-        let init_variant = match ledger_db.get_head_soft_confirmation()? {
-            // At least one soft confirmation was processed
-            Some((number, soft_confirmation)) => {
-                info!("Initialize prover at batch number {:?}. State root: {:?}. Last soft confirmation hash: {:?}.", number, prover_storage.get_root_hash(number.0 + 1)?.as_ref(), soft_confirmation.hash);
-
-                InitVariant::Initialized((
-                    prover_storage.get_root_hash(number.0 + 1)?,
-                    soft_confirmation.hash,
-                ))
-            }
-            None => {
-                info!("Initialize prover at genesis.");
-                match genesis_root {
-                    // Chain was initialized but no soft confirmations was processed
-                    Ok(root_hash) => InitVariant::Initialized((root_hash, [0; 32])),
-                    // Not even initialized
-                    _ => InitVariant::Genesis(genesis_config),
-                }
-            }
-        };
-
-        let code_commitments_by_spec = self.get_batch_proof_code_commitments();
-        let elfs_by_spec = self.get_batch_proof_elfs();
+        let init_params = self.init_chain(
+            genesis_config,
+            &native_stf,
+            &ledger_db,
+            &mut storage_manager,
+            &prover_storage,
+        )?;
 
         let current_l2_height = ledger_db
             .get_head_soft_confirmation_height()
@@ -425,101 +291,56 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
         let mut fork_manager = ForkManager::new(get_forks(), current_l2_height);
         fork_manager.register_handler(Box::new(ledger_db.clone()));
 
-        let runner = CitreaBatchProver::new(
+        let prover_service = Arc::new(
+            self.create_prover_service(
+                prover_config.proving_mode,
+                &da_service,
+                ledger_db.clone(),
+                prover_config.proof_sampling_number,
+            )
+            .await,
+        );
+        let code_commitments = self.get_batch_proof_code_commitments();
+        let elfs = self.get_batch_proof_elfs();
+
+        citrea_batch_prover::build_services(
+            prover_config,
             runner_config,
+            init_params,
+            native_stf,
             rollup_config.public_keys,
-            rollup_config.rpc,
             da_service,
+            prover_service,
             ledger_db,
-            native_stf,
             storage_manager,
-            init_variant,
-            Arc::new(prover_service),
-            prover_config,
-            code_commitments_by_spec,
-            elfs_by_spec,
-            fork_manager,
             soft_confirmation_tx,
-            task_manager,
-        )?;
-
-        Ok((runner, rpc_methods))
+            fork_manager,
+            code_commitments,
+            elfs,
+            rpc_module,
        )
        .await
     }
 
     /// Creates a new light client prover
     #[instrument(level = "trace", skip_all)]
-    async fn create_new_light_client_prover(
+    async fn create_light_client_prover(
         &self,
-        rollup_config: FullNodeConfig<Self::DaConfig>,
         prover_config: LightClientProverConfig,
-    ) -> Result<
-        (
-            CitreaLightClientProver<Self::DaService, Self::Vm, Self::ProverService, LedgerDB>,
-            RpcModule<()>,
-        ),
-        anyhow::Error,
-    >
+        rollup_config: FullNodeConfig<Self::DaConfig>,
+        rocksdb_config: &RocksdbConfig,
+        da_service: Arc<<Self as RollupBlueprint>::DaService>,
+        ledger_db: LedgerDB,
+        rpc_module: RpcModule<()>,
+    ) -> Result<(
+        CitreaLightClientProver<Self::DaService, Self::Vm, Self::ProverService, LedgerDB>,
+        LightClientProverL1BlockHandler<Self::Vm, Self::DaService, Self::ProverService, LedgerDB>,
+        RpcModule<()>,
+    )>
     where
         <Self::NativeContext as Spec>::Storage: NativeStorage,
     {
-        // Migrate before constructing ledger_db instance so that no lock is present.
-        let migrator = LedgerDBMigrator::new(
-            rollup_config.storage.path.as_path(),
-            citrea_light_client_prover::db_migrations::migrations(),
-        );
-
-        let light_client_prover_tables = LIGHT_CLIENT_PROVER_LEDGER_TABLES
-            .iter()
-            .map(|table| table.to_string())
-            .collect::<Vec<_>>();
-
-        migrator.migrate(
-            rollup_config.storage.db_max_open_files,
-            light_client_prover_tables.clone(),
-        )?;
-
-        let mut task_manager = TaskManager::default();
-        let da_service = self
-            .create_da_service(&rollup_config, true, &mut task_manager)
-            .await?;
-
-        let rocksdb_config = RocksdbConfig::new(
-            rollup_config.storage.path.as_path(),
-            rollup_config.storage.db_max_open_files,
-            Some(light_client_prover_tables),
-        );
-        let ledger_db = self.create_ledger_db(&rocksdb_config);
-        let mmr_db = MmrDB::new(&rocksdb_config)?;
-
-        let prover_service = self
-            .create_prover_service(
-                prover_config.proving_mode,
-                &da_service,
-                ledger_db.clone(),
-                prover_config.proof_sampling_number,
-            )
-            .await;
-
-        // TODO: Double check what kind of storage needed here.
-        // Maybe whole "prev_root" can be initialized inside runner
-        // Getting block here, so prover_service doesn't have to be `Send`
-
-        let mut storage_manager = self.create_storage_manager(&rollup_config)?;
-        let prover_storage = storage_manager.create_finalized_storage()?;
-
         let runner_config = rollup_config.runner.expect("Runner config is missing");
-        // TODO(https://github.com/Sovereign-Labs/sovereign-sdk/issues/1218)
-        let rpc_methods = self.create_rpc_methods(
-            &prover_storage,
-            &ledger_db,
-            &da_service,
-            Some(runner_config.sequencer_client_url.clone()),
-            None,
-        )?;
-
-        let batch_prover_code_commitments_by_spec = self.get_batch_proof_code_commitments();
-        let light_client_prover_code_commitment = self.get_light_client_proof_code_commitment();
-        let light_client_prover_elfs = self.get_light_client_elfs();
 
         let current_l2_height = ledger_db
             .get_head_soft_confirmation()
@@ -530,21 +351,97 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
         let mut fork_manager = ForkManager::new(get_forks(), current_l2_height.0);
         fork_manager.register_handler(Box::new(ledger_db.clone()));
 
-        let runner = CitreaLightClientProver::new(
+        let prover_service = Arc::new(
+            self.create_prover_service(
+                prover_config.proving_mode,
+                &da_service,
+                ledger_db.clone(),
+                prover_config.proof_sampling_number,
+            )
+            .await,
+        );
+
+        let batch_prover_code_commitments = self.get_batch_proof_code_commitments();
+        let code_commitments = self.get_light_client_proof_code_commitments();
+        let elfs = self.get_light_client_elfs();
+
+        citrea_light_client_prover::build_services(
+            prover_config,
             runner_config,
-            rollup_config.public_keys,
-            rollup_config.rpc,
-            da_service,
+            rocksdb_config,
             ledger_db,
-            Arc::new(prover_service),
-            prover_config,
-            batch_prover_code_commitments_by_spec,
-            light_client_prover_code_commitment,
-            light_client_prover_elfs,
-            mmr_db,
-            task_manager,
-        )?;
+            da_service,
+            prover_service,
+            rollup_config.public_keys,
+            batch_prover_code_commitments,
+            code_commitments,
+            elfs,
+            rpc_module,
+        )
+    }
 
-        Ok((runner, rpc_methods))
+    /// Run Ledger DB migrations
+    fn run_ledger_migrations(
+        &self,
+        rollup_config: &FullNodeConfig<Self::DaConfig>,
+        tables: Vec<String>,
+        migrations: Migrations,
+    ) -> anyhow::Result<()> {
+        // Migrate before constructing the ledger_db instance so that no lock is present.
+        let migrator = LedgerDBMigrator::new(rollup_config.storage.path.as_path(), migrations);
+        migrator.migrate(rollup_config.storage.db_max_open_files, tables)?;
+        Ok(())
+    }
+
+    /// Initialize the chain from existing data, if any.
+    /// Otherwise, fall back to initialization from genesis.
+    #[allow(clippy::type_complexity)]
+    fn init_chain(
+        &self,
+        genesis_config: GenesisParams<Self>,
+        stf: &StfBlueprint<Self::NativeContext, Self::DaSpec, Self::NativeRuntime>,
+        ledger_db: &LedgerDB,
+        storage_manager: &mut ProverStorageManager<Self::DaSpec>,
+        prover_storage: &ProverStorage<SnapshotManager>,
+    ) -> anyhow::Result<
+        InitParams<
+            StfBlueprint<Self::NativeContext, Self::DaSpec, Self::NativeRuntime>,
+            Self::DaSpec,
+        >,
+    > {
+        let genesis_root = prover_storage.get_root_hash(1);
+        if let Some((number, soft_confirmation)) = ledger_db.get_head_soft_confirmation()? {
+            // At least one soft confirmation was processed
+            info!("Initializing chain at batch number {:?}. State root: {:?}. Last soft confirmation hash: {:?}.", number, prover_storage.get_root_hash(number.0 + 1)?.as_ref(), soft_confirmation.hash);
+
+            return Ok(InitParams {
+                state_root: prover_storage.get_root_hash(number.0 + 1)?,
+                batch_hash: soft_confirmation.hash,
+            });
+        }
+
+        if let Ok(state_root) = genesis_root {
+            // Chain was initialized but no soft confirmations were processed
+            debug!("Chain is already initialized. Skipping initialization.");
+            return Ok(InitParams {
+                state_root,
+                batch_hash: [0; 32],
+            });
+        }
+
+        info!("No history detected. Initializing chain...");
+        let storage = storage_manager.create_storage_on_l2_height(0)?;
+        let (genesis_root, initialized_storage) = stf.init_chain(storage, genesis_config);
+        storage_manager.save_change_set_l2(0, initialized_storage)?;
+        storage_manager.finalize_l2(0)?;
+        ledger_db.set_l2_genesis_state_root(&genesis_root)?;
+        info!(
+            "Chain initialization is done. Genesis root: 0x{}",
+            hex::encode(genesis_root.as_ref())
+        );
+        Ok(InitParams {
+            state_root: genesis_root,
+            batch_hash: [0; 32],
+        })
     }
 }
diff --git a/bin/citrea/tests/evm/fee.rs b/bin/citrea/tests/evm/fee.rs
index 47e79b095..b874db7af 100644
--- a/bin/citrea/tests/evm/fee.rs
+++ b/bin/citrea/tests/evm/fee.rs
@@ -22,7 +22,7 @@ async fn test_minimum_base_fee() -> Result<(), anyhow::Error> {
     let rollup_config =
         create_default_rollup_config(true, &sequencer_db_dir, &da_db_dir, NodeMode::SequencerNode);
     let sequencer_config = SequencerConfig::default();
-    tokio::spawn(async {
+    let seq_task = tokio::spawn(async {
         // Don't provide a prover since the EVM is not currently provable
         start_rollup(
             port_tx,
@@ -59,5 +59,6 @@ async fn test_minimum_base_fee() -> Result<(), anyhow::Error> {
     // Base fee should at most be 0.01 gwei
     assert_eq!(block.header.base_fee_per_gas.unwrap(), 10000000);
 
+    seq_task.abort();
     Ok(())
 }
diff --git a/bin/citrea/tests/test_helpers/mod.rs b/bin/citrea/tests/test_helpers/mod.rs
index 6e913436f..63680cda9 100644
--- a/bin/citrea/tests/test_helpers/mod.rs
+++ b/bin/citrea/tests/test_helpers/mod.rs
@@ -4,13 +4,22 @@ use std::time::{Duration, SystemTime};
 
 use anyhow::bail;
 use borsh::BorshDeserialize;
-use citrea::{CitreaRollupBlueprint, MockDemoRollup};
+use citrea::{CitreaRollupBlueprint, Dependencies, MockDemoRollup, Storage};
+use citrea_common::da::get_start_l1_height;
+use citrea_common::rpc::server::start_rpc_server;
 use citrea_common::{
     BatchProverConfig, FullNodeConfig, LightClientProverConfig, RollupPublicKeys, RpcConfig,
     RunnerConfig, SequencerConfig, StorageConfig,
 };
+use citrea_light_client_prover::da_block_handler::StartVariant;
 use citrea_primitives::TEST_PRIVATE_KEY;
 use citrea_stf::genesis_config::GenesisPaths;
+use sov_db::ledger_db::SharedLedgerOps;
+use sov_db::rocks_db_config::RocksdbConfig;
+use sov_db::schema::tables::{
+    BATCH_PROVER_LEDGER_TABLES, FULL_NODE_LEDGER_TABLES, LIGHT_CLIENT_PROVER_LEDGER_TABLES,
+    SEQUENCER_LEDGER_TABLES,
+};
 use sov_mock_da::{MockAddress, MockBlock, MockDaConfig, MockDaService};
 use sov_modules_api::default_signature::private_key::DefaultPrivateKey;
 use sov_modules_api::PrivateKey;
@@ -38,7 +47,7 @@ pub enum NodeMode {
 
 pub async fn start_rollup(
     rpc_reporting_channel: oneshot::Sender<SocketAddr>,
-    rt_genesis_paths: GenesisPaths,
+    runtime_genesis_paths: GenesisPaths,
     rollup_prover_config: Option<BatchProverConfig>,
     light_client_prover_config: Option<LightClientProverConfig>,
     rollup_config: FullNodeConfig<MockDaConfig>,
@@ -62,6 +71,89 @@ pub async fn start_rollup(
         panic!("Both batch prover and light client prover config cannot be set at the same time");
     }
 
+    let (tables, migrations) = if sequencer_config.is_some() {
+        (
+            SEQUENCER_LEDGER_TABLES
+                .iter()
+                .map(|table| table.to_string())
+                .collect::<Vec<_>>(),
+            citrea_sequencer::db_migrations::migrations(),
+        )
+    } else if rollup_prover_config.is_some() {
+        (
+            BATCH_PROVER_LEDGER_TABLES
+                .iter()
+                .map(|table| table.to_string())
+                .collect::<Vec<_>>(),
+            citrea_batch_prover::db_migrations::migrations(),
+        )
+    } else if light_client_prover_config.is_some() {
+        (
+            LIGHT_CLIENT_PROVER_LEDGER_TABLES
+                .iter()
+                .map(|table| table.to_string())
+                .collect::<Vec<_>>(),
+            citrea_light_client_prover::db_migrations::migrations(),
+        )
+    } else {
+        (
+            FULL_NODE_LEDGER_TABLES
+                .iter()
+                .map(|table| table.to_string())
+                .collect::<Vec<_>>(),
+            citrea_fullnode::db_migrations::migrations(),
+        )
+    };
+    mock_demo_rollup
+        .run_ledger_migrations(&rollup_config, tables.clone(), migrations)
+        .expect("Migrations should have executed successfully");
+
+    let genesis_config = mock_demo_rollup
+        .create_genesis_config(&runtime_genesis_paths, &rollup_config)
+        .expect("Should be able to create genesis config");
+
+    let rocksdb_path = rollup_config.storage.path.clone();
+    let rocksdb_config = RocksdbConfig::new(
+        rocksdb_path.as_path(),
+        rollup_config.storage.db_max_open_files,
+        Some(tables),
+    );
+    let Storage {
+        ledger_db,
+        storage_manager,
+        prover_storage,
+    } = mock_demo_rollup
+        .setup_storage(&rollup_config, &rocksdb_config)
+        .expect("Storage setup should work");
+
+    let Dependencies {
+        da_service,
+        mut task_manager,
+        soft_confirmation_channel,
+    } = mock_demo_rollup
+        .setup_dependencies(&rollup_config)
+        .await
+        .expect("Dependencies setup should work");
+
+    let sequencer_client_url = rollup_config
+        .runner
+        .clone()
+        .map(|runner| runner.sequencer_client_url);
+    let soft_confirmation_rx = if light_client_prover_config.is_none() {
+        soft_confirmation_channel.1
+    } else {
+        None
+    };
+    let rpc_module = mock_demo_rollup
+        .setup_rpc(
+            &prover_storage,
+            ledger_db.clone(),
+            da_service.clone(),
+            sequencer_client_url,
+            soft_confirmation_rx,
+        )
+        .expect("RPC module setup should work");
+
     if let Some(sequencer_config) = sequencer_config {
         warn!(
             "Starting sequencer node pub key: {:?}",
@@ -70,77 +162,146 @@ pub async fn start_rollup(
             DefaultPrivateKey::from_hex(TEST_PRIVATE_KEY)
                 .unwrap()
                 .pub_key()
         );
         let span = info_span!("Sequencer");
-        let (mut sequencer, rpc_methods) = CitreaRollupBlueprint::create_new_sequencer(
+
+        let (mut sequencer, rpc_module) = CitreaRollupBlueprint::create_sequencer(
             &mock_demo_rollup,
-            &rt_genesis_paths,
+            genesis_config,
             rollup_config.clone(),
             sequencer_config,
+            da_service,
+            ledger_db,
+            storage_manager,
+            prover_storage,
+            soft_confirmation_channel.0,
+            rpc_module,
         )
-        .instrument(span.clone())
-        .await
         .unwrap();
 
-        sequencer
-            .start_rpc_server(rpc_methods, Some(rpc_reporting_channel))
-            .instrument(span.clone())
-            .await
-            .unwrap();
+        start_rpc_server(
+            rollup_config.rpc,
+            &mut task_manager,
+            rpc_module,
+            Some(rpc_reporting_channel),
+        );
 
-        sequencer.run().instrument(span).await.unwrap();
+        sequencer.run(task_manager).instrument(span).await.unwrap();
     } else if let Some(rollup_prover_config) = rollup_prover_config {
         let span = info_span!("Prover");
-        let (mut rollup, rpc_methods) = CitreaRollupBlueprint::create_new_batch_prover(
-            &mock_demo_rollup,
-            &rt_genesis_paths,
-            rollup_config,
-            rollup_prover_config,
-        )
-        .instrument(span.clone())
-        .await
-        .unwrap();
-        rollup
-            .start_rpc_server(rpc_methods, Some(rpc_reporting_channel))
+        let (mut prover, l1_block_handler, rpc_module) =
+            CitreaRollupBlueprint::create_batch_prover(
+                &mock_demo_rollup,
+                rollup_prover_config,
+                genesis_config,
+                rollup_config.clone(),
+                da_service,
+                ledger_db.clone(),
+                storage_manager,
+                prover_storage,
+                soft_confirmation_channel.0,
+                rpc_module,
+            )
             .instrument(span.clone())
             .await
             .unwrap();
 
-        rollup.run().instrument(span).await.unwrap();
+        start_rpc_server(
+            rollup_config.rpc.clone(),
+            &mut task_manager,
+            rpc_module,
+            Some(rpc_reporting_channel),
+        );
+
+        let handler_span = span.clone();
+        task_manager.spawn(|cancellation_token| async move {
+            let start_l1_height = get_start_l1_height(&rollup_config, &ledger_db)
+                .await
+                .expect("Failed to fetch start L1 height");
+            l1_block_handler
+                .run(start_l1_height, cancellation_token)
+                .instrument(handler_span.clone())
+                .await
+        });
+        prover.run(task_manager).instrument(span).await.unwrap();
     } else if let Some(light_client_prover_config) = light_client_prover_config {
         let span = info_span!("LightClientProver");
-        let (mut rollup, rpc_methods) = CitreaRollupBlueprint::create_new_light_client_prover(
-            &mock_demo_rollup,
-            rollup_config.clone(),
-            light_client_prover_config,
-        )
-        .instrument(span.clone())
-        .await
-        .unwrap();
-        rollup
-            .start_rpc_server(rpc_methods, Some(rpc_reporting_channel))
+        let starting_block = match ledger_db
+            .get_last_scanned_l1_height()
+            .expect("Should be able to read DB")
+        {
+            Some(l1_height) => StartVariant::LastScanned(l1_height.0),
+            // first time starting the prover
+            // start from the block given in the config
+            None => StartVariant::FromBlock(light_client_prover_config.initial_da_height),
+        };
+
+        let (mut rollup, l1_block_handler, rpc_module) =
+            CitreaRollupBlueprint::create_light_client_prover(
+                &mock_demo_rollup,
+                light_client_prover_config,
+                rollup_config.clone(),
+                &rocksdb_config,
+                da_service,
+                ledger_db,
+                rpc_module,
+            )
             .instrument(span.clone())
             .await
             .unwrap();
 
-        rollup.run().instrument(span).await.unwrap();
+        start_rpc_server(
+            rollup_config.rpc.clone(),
+            &mut task_manager,
+            rpc_module,
+            Some(rpc_reporting_channel),
+        );
+
+        let handler_span = span.clone();
+        task_manager.spawn(|cancellation_token| async move {
+            l1_block_handler
+                .run(starting_block, cancellation_token)
+                .instrument(handler_span.clone())
+                .await
+        });
+
+        rollup.run(task_manager).instrument(span).await.unwrap();
     } else {
         let span = info_span!("FullNode");
-        let (mut rollup, rpc_methods) = CitreaRollupBlueprint::create_new_rollup(
+
+        let (mut rollup, l1_block_handler) = CitreaRollupBlueprint::create_full_node(
             &mock_demo_rollup,
-            &rt_genesis_paths,
+            genesis_config,
             rollup_config.clone(),
+            da_service,
+            ledger_db.clone(),
+            storage_manager,
+            prover_storage,
+            soft_confirmation_channel.0,
         )
         .instrument(span.clone())
         .await
         .unwrap();
 
-        rollup
-            .start_rpc_server(rpc_methods, Some(rpc_reporting_channel))
-            .instrument(span.clone())
-            .await;
+        start_rpc_server(
+            rollup_config.rpc.clone(),
+            &mut task_manager,
+            rpc_module,
+            Some(rpc_reporting_channel),
+        );
+
+        let handler_span = span.clone();
+        task_manager.spawn(|cancellation_token| async move {
+            let start_l1_height = get_start_l1_height(&rollup_config, &ledger_db)
+                .await
+                .expect("Failed to fetch starting L1 height");
+            l1_block_handler
+                .run(start_l1_height, cancellation_token)
+                .instrument(handler_span.clone())
+                .await
+        });
 
-        rollup.run().instrument(span).await.unwrap();
+        rollup.run(task_manager).instrument(span).await.unwrap();
     }
 }
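Related to the fee.rs change above: node futures spawned in tests are now explicitly aborted once assertions finish, so the spawned loop cannot outlive the test. A minimal standalone sketch of that teardown pattern with plain tokio (the hypothetical loop stands in for `start_rollup`):

    #[tokio::test]
    async fn node_is_torn_down() {
        let node_task = tokio::spawn(async {
            loop {
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        });

        // ... assertions against the running node would go here ...

        // Abort the node loop so it does not keep running after the test body.
        node_task.abort();
        assert!(node_task.await.unwrap_err().is_cancelled());
    }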
diff --git a/crates/batch-prover/src/da_block_handler.rs b/crates/batch-prover/src/da_block_handler.rs
index 44e301558..a191cff0d 100644
--- a/crates/batch-prover/src/da_block_handler.rs
+++ b/crates/batch-prover/src/da_block_handler.rs
@@ -7,7 +7,7 @@ use std::sync::Arc;
 use anyhow::{anyhow, Context as _};
 use borsh::{BorshDeserialize, BorshSerialize};
 use citrea_common::cache::L1BlockCache;
-use citrea_common::da::get_da_block_at_height;
+use citrea_common::da::{get_da_block_at_height, sync_l1};
 use citrea_common::utils::merge_state_diffs;
 use citrea_common::BatchProverConfig;
 use citrea_primitives::compression::compress_blob;
@@ -27,7 +27,7 @@ use sov_rollup_interface::zk::ZkvmHost;
 use sov_stf_runner::{ProverGuestRunConfig, ProverService};
 use tokio::select;
 use tokio::sync::{mpsc, Mutex};
-use tokio::time::{sleep, Duration};
+use tokio::time::Duration;
 use tokio_util::sync::CancellationToken;
 use tracing::{error, info, warn};
@@ -41,7 +41,7 @@ type CommitmentStateTransitionData<'txs, Witness, Da, Tx> = (
     VecDeque<Vec<<<Da as DaService>::Spec as DaSpec>::BlockHeader>>,
 );
 
-pub(crate) struct L1BlockHandler<Vm, Da, Ps, DB, StateRoot, Witness, Tx>
+pub struct L1BlockHandler<Vm, Da, Ps, DB, StateRoot, Witness, Tx>
 where
     Da: DaService,
     Vm: ZkvmHost + Zkvm,
@@ -137,6 +137,7 @@ where
             self.da_service.clone(),
             l1_tx,
             self.l1_block_cache.clone(),
+            BATCH_PROVER_METRICS.scan_l1_block.clone(),
         );
         tokio::pin!(l1_sync_worker);
@@ -316,55 +317,6 @@ where
     }
 }
 
-async fn sync_l1<Da>(
-    start_l1_height: u64,
-    da_service: Arc<Da>,
-    sender: mpsc::Sender<Da::FilteredBlock>,
-    l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
-) where
-    Da: DaService,
-{
-    let mut l1_height = start_l1_height;
-    info!("Starting to sync from L1 height {}", l1_height);
-
-    'block_sync: loop {
-        let last_finalized_l1_block_header =
-            match da_service.get_last_finalized_block_header().await {
-                Ok(header) => header,
-                Err(e) => {
-                    error!("Could not fetch last finalized L1 block header: {}", e);
-                    sleep(Duration::from_secs(2)).await;
-                    continue;
-                }
-            };
-
-        let new_l1_height = last_finalized_l1_block_header.height();
-
-        for block_number in l1_height + 1..=new_l1_height {
-            let l1_block =
-                match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone())
-                    .await
-                {
-                    Ok(block) => block,
-                    Err(e) => {
-                        error!("Could not fetch last finalized L1 block: {}", e);
-                        sleep(Duration::from_secs(2)).await;
-                        continue 'block_sync;
-                    }
-                };
-            if block_number > l1_height {
-                l1_height = block_number;
-                if let Err(e) = sender.send(l1_block).await {
-                    error!("Could not notify about L1 block: {}", e);
-                    continue 'block_sync;
-                }
-            }
-        }
-
-        sleep(Duration::from_secs(2)).await;
-    }
-}
-
 pub(crate) async fn get_batch_proof_circuit_input_from_commitments<
     'txs,
     Da: DaService,
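The `sync_l1` loop removed here now lives in `citrea_common::da`; per the call site above it takes an additional `Histogram` handle, though the shared function's full signature is not shown in this diff. A minimal sketch of the timing pattern presumably used inside it, with hypothetical names, using the `metrics` crate's handle API:

    use std::time::Instant;

    use metrics::Histogram;

    // Hypothetical loop body: time one scan-and-dispatch iteration and record
    // the duration (in seconds) into the histogram handle passed by the caller.
    async fn scan_and_record(scan_l1_block: Histogram) {
        let start = Instant::now();
        // ... fetch the next finalized L1 block and send it to the handler ...
        scan_l1_block.record(start.elapsed().as_secs_f64());
    }

Cloning the handle at the call site (`BATCH_PROVER_METRICS.scan_l1_block.clone()`) is cheap; `metrics` handles are reference-counted and designed to be passed around this way.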
Clone + 'static, + RT: Runtime, + Vm: ZkvmHost + Zkvm + 'static, + Ps: ProverService + Send + Sync + 'static, + StateRoot: BorshDeserialize + + BorshSerialize + + Serialize + + DeserializeOwned + + Clone + + AsRef<[u8]> + + Debug, + Witness: Default + BorshSerialize + BorshDeserialize + Serialize + DeserializeOwned, + Tx: Clone + BorshSerialize + BorshDeserialize, +{ + let l1_block_cache = Arc::new(Mutex::new(L1BlockCache::new())); + + let rpc_context = rpc::create_rpc_context::( + da_service.clone(), + prover_service.clone(), + ledger_db.clone(), + public_keys.sequencer_da_pub_key.clone(), + public_keys.sequencer_public_key.clone(), + l1_block_cache, + code_commitments.clone(), + elfs.clone(), + ); + let rpc_module = rpc::register_rpc_methods::(rpc_context, rpc_module)?; + + let batch_prover = CitreaBatchProver::new( + runner_config, + init_params, + native_stf, + public_keys.clone(), + da_service.clone(), + ledger_db.clone(), + storage_manager, + fork_manager, + soft_confirmation_tx, + )?; + let skip_submission_until_l1 = + std::env::var("SKIP_PROOF_SUBMISSION_UNTIL_L1").map_or(0u64, |v| v.parse().unwrap_or(0)); + + let l1_block_handler = L1BlockHandler::new( + prover_config, + prover_service, + ledger_db, + da_service, + public_keys.sequencer_public_key, + public_keys.sequencer_da_pub_key, + code_commitments, + elfs, + skip_submission_until_l1, + Arc::new(Mutex::new(L1BlockCache::new())), + ); + Ok((batch_prover, l1_block_handler, rpc_module)) +} diff --git a/crates/batch-prover/src/metrics.rs b/crates/batch-prover/src/metrics.rs index 9d48e279d..707096f91 100644 --- a/crates/batch-prover/src/metrics.rs +++ b/crates/batch-prover/src/metrics.rs @@ -11,6 +11,8 @@ pub struct BatchProverMetrics { pub current_l2_block: Gauge, #[metric(describe = "The duration of processing a single soft confirmation")] pub process_soft_confirmation: Histogram, + #[metric(describe = "The duration of scanning and processing a single L1 block")] + pub scan_l1_block: Histogram, } /// Batch prover metrics diff --git a/crates/batch-prover/src/rpc.rs b/crates/batch-prover/src/rpc.rs index 20aaca78c..3c16bef3c 100644 --- a/crates/batch-prover/src/rpc.rs +++ b/crates/batch-prover/src/rpc.rs @@ -16,12 +16,14 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use sov_db::ledger_db::BatchProverLedgerOps; use sov_modules_api::{SpecId, Zkvm}; +use sov_modules_stf_blueprint::Runtime; use sov_rollup_interface::services::da::DaService; use sov_rollup_interface::zk::{BatchProofCircuitInputV1, ZkvmHost}; use sov_stf_runner::ProverService; use tokio::sync::Mutex; use crate::proving::{data_to_prove, prove_l1, GroupCommitments}; +use crate::{StfStateRoot, StfTransaction, StfWitness}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ProverInputResponse { @@ -30,21 +32,13 @@ pub struct ProverInputResponse { pub encoded_serialized_batch_proof_input: String, } -pub struct RpcContext +pub struct RpcContext where C: sov_modules_api::Context, Da: DaService, DB: BatchProverLedgerOps + Clone, Vm: ZkvmHost + Zkvm, - Ps: ProverService, - StateRoot: BorshDeserialize - + BorshSerialize - + Serialize - + DeserializeOwned - + Clone - + AsRef<[u8]> - + Debug, - Witness: Default + BorshDeserialize + Serialize + DeserializeOwned, + Ps: ProverService + Send + Sync, { pub da_service: Arc, pub prover_service: Arc, @@ -56,9 +50,67 @@ where pub elfs_by_spec: HashMap>, pub(crate) phantom_c: PhantomData C>, pub(crate) phantom_vm: PhantomData Vm>, - pub(crate) phantom_sr: PhantomData StateRoot>, - pub(crate) phantom_w: 
-    pub(crate) phantom_tx: PhantomData<fn() -> Tx>,
+}
+
+/// Creates a shared RpcContext with all required data.
+#[allow(clippy::type_complexity, clippy::too_many_arguments)]
+pub fn create_rpc_context<C, Da, Ps, Vm, DB, RT>(
+    da_service: Arc<Da>,
+    prover_service: Arc<Ps>,
+    ledger: DB,
+    sequencer_da_pub_key: Vec<u8>,
+    sequencer_pub_key: Vec<u8>,
+    l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
+    code_commitments_by_spec: HashMap<SpecId, Vm::CodeCommitment>,
+    elfs_by_spec: HashMap<SpecId, Vec<u8>>,
+) -> RpcContext<C, Da, Ps, Vm, DB>
+where
+    C: sov_modules_api::Context,
+    Da: DaService,
+    DB: BatchProverLedgerOps + Clone,
+    Vm: ZkvmHost + Zkvm,
+    Ps: ProverService<DaService = Da> + Send + Sync,
+    RT: Runtime<C, Da::Spec>,
+{
+    RpcContext {
+        ledger: ledger.clone(),
+        da_service: da_service.clone(),
+        sequencer_da_pub_key: sequencer_da_pub_key.clone(),
+        sequencer_pub_key: sequencer_pub_key.clone(),
+        l1_block_cache: l1_block_cache.clone(),
+        prover_service: prover_service.clone(),
+        code_commitments_by_spec: code_commitments_by_spec.clone(),
+        elfs_by_spec: elfs_by_spec.clone(),
+        phantom_c: std::marker::PhantomData,
+        phantom_vm: std::marker::PhantomData,
+    }
+}
+
+/// Updates the given RpcModule with Prover methods.
+pub fn register_rpc_methods<C, Da, Ps, Vm, DB, RT>(
+    rpc_context: RpcContext<C, Da, Ps, Vm, DB>,
+    mut rpc_methods: jsonrpsee::RpcModule<()>,
+) -> Result<jsonrpsee::RpcModule<()>, jsonrpsee::core::RegisterMethodError>
+where
+    C: sov_modules_api::Context,
+    Da: DaService,
+    DB: BatchProverLedgerOps + Clone + 'static,
+    Vm: ZkvmHost + Zkvm + 'static,
+    Ps: ProverService<DaService = Da> + Send + Sync + 'static,
+    RT: Runtime<C, Da::Spec>,
+{
+    let rpc = create_rpc_module::<
+        C,
+        Da,
+        Ps,
+        Vm,
+        DB,
+        StfStateRoot<C, Da::Spec, RT>,
+        StfWitness<C, Da::Spec, RT>,
+        StfTransaction<C, Da::Spec, RT>,
+    >(rpc_context);
+    rpc_methods.merge(rpc)?;
+    Ok(rpc_methods)
 }
 
 #[rpc(client, server, namespace = "batchProver")]
@@ -86,7 +138,7 @@ where
     Da: DaService,
     DB: BatchProverLedgerOps + Clone + Send + Sync + 'static,
     Vm: ZkvmHost + Zkvm,
-    Ps: ProverService,
+    Ps: ProverService<DaService = Da> + Send + Sync,
     StateRoot: BorshDeserialize
         + BorshSerialize
         + Serialize
@@ -96,7 +148,10 @@ where
         + Debug,
     Witness: Default + BorshDeserialize + Serialize + DeserializeOwned,
 {
-    context: Arc<RpcContext<C, Da, Ps, Vm, DB, StateRoot, Witness, Tx>>,
+    context: Arc<RpcContext<C, Da, Ps, Vm, DB>>,
+    _state_root: PhantomData<StateRoot>,
+    _witness: PhantomData<Witness>,
+    _tx: PhantomData<Tx>,
 }
 
 impl<C, Da, Ps, Vm, DB, StateRoot, Witness, Tx>
@@ -106,19 +161,24 @@ where
     Da: DaService,
     DB: BatchProverLedgerOps + Clone + Send + Sync + 'static,
     Vm: ZkvmHost + Zkvm,
-    Ps: ProverService,
+    Ps: ProverService<DaService = Da> + Send + Sync,
     StateRoot: BorshDeserialize
         + BorshSerialize
         + Serialize
         + DeserializeOwned
         + Clone
         + AsRef<[u8]>
-        + Debug,
-    Witness: Default + BorshDeserialize + Serialize + DeserializeOwned,
+        + Debug
+        + Send
+        + Sync,
+    Witness: Default + BorshDeserialize + Serialize + DeserializeOwned + Send + Sync,
 {
-    pub fn new(context: RpcContext<C, Da, Ps, Vm, DB, StateRoot, Witness, Tx>) -> Self {
+    pub fn new(context: RpcContext<C, Da, Ps, Vm, DB>) -> Self {
         Self {
             context: Arc::new(context),
+            _state_root: PhantomData,
+            _witness: PhantomData,
+            _tx: PhantomData,
         }
     }
 }
@@ -140,9 +200,16 @@ where
         + AsRef<[u8]>
         + Debug
         + Send
+        + Sync
+        + 'static,
+    Witness: Default
+        + BorshSerialize
+        + BorshDeserialize
+        + Serialize
+        + DeserializeOwned
+        + Send
+        + Sync
         + 'static,
-    Witness:
-        Default + BorshSerialize + BorshDeserialize + Serialize + DeserializeOwned + Send + 'static,
     Tx: Clone + BorshSerialize + BorshDeserialize + Send + Sync + 'static,
 {
     async fn generate_input(
@@ -270,7 +337,7 @@ where
 }
 
 pub fn create_rpc_module<C, Da, Ps, Vm, DB, StateRoot, Witness, Tx>(
-    rpc_context: RpcContext<C, Da, Ps, Vm, DB, StateRoot, Witness, Tx>,
+    rpc_context: RpcContext<C, Da, Ps, Vm, DB>,
 ) -> jsonrpsee::RpcModule<BatchProverRpcServerImpl<C, Da, Ps, Vm, DB, StateRoot, Witness, Tx>>
 where
     C: sov_modules_api::Context,
     Da: DaService,
     DB: BatchProverLedgerOps + Clone + Send + Sync + 'static,
     Vm: ZkvmHost + Zkvm,
-    Ps: ProverService,
+    Ps: ProverService<DaService = Da> + Send + Sync,
     StateRoot: BorshDeserialize
         + BorshSerialize
         + Serialize
@@ -286,9 +353,16 @@ where
         + AsRef<[u8]>
         + Debug
         + Send
+        + Sync
+        + 'static,
+    Witness: Default
+        + BorshSerialize
+        + BorshDeserialize
+        + Serialize
+        + DeserializeOwned
+        + Send
+        + Sync
         + 'static,
-    Witness:
-        Default + BorshSerialize + BorshDeserialize + Serialize + DeserializeOwned + Send + 'static,
     Tx: Clone + BorshSerialize + BorshDeserialize + Send + Sync + 'static,
 {
     let server = BatchProverRpcServerImpl::new(rpc_context);
diff --git a/crates/batch-prover/src/runner.rs b/crates/batch-prover/src/runner.rs
index 1a0f3ff3b..bb734ecf5 100644
--- a/crates/batch-prover/src/runner.rs
+++ b/crates/batch-prover/src/runner.rs
@@ -1,23 +1,20 @@
 use core::panic;
-use std::collections::{HashMap, VecDeque};
-use std::net::SocketAddr;
+use std::collections::VecDeque;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
 use alloy_primitives::U64;
-use anyhow::{anyhow, bail, Context as _};
+use anyhow::{bail, Context as _};
 use backoff::exponential::ExponentialBackoffBuilder;
 use backoff::future::retry as retry_backoff;
 use citrea_common::cache::L1BlockCache;
-use citrea_common::da::{get_da_block_at_height, get_initial_slot_height};
+use citrea_common::da::get_da_block_at_height;
 use citrea_common::tasks::manager::TaskManager;
 use citrea_common::utils::{create_shutdown_signal, soft_confirmation_to_receipt};
-use citrea_common::{BatchProverConfig, RollupPublicKeys, RpcConfig, RunnerConfig};
+use citrea_common::{RollupPublicKeys, RunnerConfig};
 use citrea_primitives::types::SoftConfirmationHash;
 use jsonrpsee::core::client::Error as JsonrpseeError;
 use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
-use jsonrpsee::server::{BatchRequestConfig, ServerBuilder};
-use jsonrpsee::RpcModule;
 use sov_db::ledger_db::BatchProverLedgerOps;
 use sov_db::schema::types::{SlotNumber, SoftConfirmationNumber};
 use sov_ledger_rpc::LedgerRpcClient;
@@ -28,30 +25,26 @@ use sov_rollup_interface::da::BlockHeaderTrait;
 use sov_rollup_interface::fork::ForkManager;
 use sov_rollup_interface::rpc::SoftConfirmationResponse;
 use sov_rollup_interface::services::da::DaService;
-use sov_rollup_interface::spec::SpecId;
 use sov_rollup_interface::stf::StateTransitionFunction;
-use sov_rollup_interface::zk::ZkvmHost;
-use sov_stf_runner::{InitVariant, ProverService};
+use sov_stf_runner::InitParams;
 use tokio::select;
-use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
+use tokio::sync::{broadcast, mpsc, Mutex};
 use tokio::time::sleep;
 use tracing::{debug, error, info, instrument};
 
-use crate::da_block_handler::L1BlockHandler;
 use crate::metrics::BATCH_PROVER_METRICS;
-use crate::rpc::{create_rpc_module, RpcContext};
 
-type StfStateRoot<C, Da, RT> = <StfBlueprint<C, Da, RT> as StateTransitionFunction<Da>>::StateRoot;
-type StfTransaction<C, Da, RT> =
+pub(crate) type StfStateRoot<C, Da, RT> =
+    <StfBlueprint<C, Da, RT> as StateTransitionFunction<Da>>::StateRoot;
+pub(crate) type StfTransaction<C, Da, RT> =
     <StfBlueprint<C, Da, RT> as StateTransitionFunction<Da>>::Transaction;
-type StfWitness<C, Da, RT> = <StfBlueprint<C, Da, RT> as StateTransitionFunction<Da>>::Witness;
+pub(crate) type StfWitness<C, Da, RT> =
+    <StfBlueprint<C, Da, RT> as StateTransitionFunction<Da>>::Witness;
 
-pub struct CitreaBatchProver<C, Da, Vm, Ps, DB, RT>
+pub struct CitreaBatchProver<C, Da, DB, RT>
 where
     C: Context + Spec<Storage = ProverStorage<SnapshotManager>>,
     Da: DaService,
-    Vm: ZkvmHost,
-    Ps: ProverService,
     DB: BatchProverLedgerOps + Clone,
     RT: Runtime<C, Da::Spec>,
 {
@@ -62,28 +55,19 @@ where
     ledger_db: DB,
     state_root: StfStateRoot<C, Da::Spec, RT>,
     batch_hash: SoftConfirmationHash,
-    rpc_config: RpcConfig,
-    prover_service: Arc<Ps>,
     sequencer_client: HttpClient,
     sequencer_pub_key: Vec<u8>,
-    sequencer_da_pub_key: Vec<u8>,
     phantom: std::marker::PhantomData<C>,
-    prover_config: BatchProverConfig,
-    code_commitments_by_spec: HashMap<SpecId, Vm::CodeCommitment>,
-    elfs_by_spec: HashMap<SpecId, Vec<u8>>,
     l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
     sync_blocks_count: u64,
     fork_manager: ForkManager<'static>,
     soft_confirmation_tx: broadcast::Sender<u64>,
-    task_manager: TaskManager<()>,
 }
 
-impl<C, Da, Vm, Ps, DB, RT> CitreaBatchProver<C, Da, Vm, Ps, DB, RT>
+impl<C, Da, DB, RT> CitreaBatchProver<C, Da, DB, RT>
 where
     C: Context + Spec<Storage = ProverStorage<SnapshotManager>>,
     Da: DaService + Send + 'static,
-    Vm: ZkvmHost + 'static,
-    Ps: ProverService<DaService = Da> + Send + Sync + 'static,
     DB: BatchProverLedgerOps + Clone + 'static,
     RT: Runtime<C, Da::Spec>,
 {
@@ -95,41 +79,15 @@ where
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         runner_config: RunnerConfig,
+        init_params: InitParams<StfBlueprint<C, Da::Spec, RT>, Da::Spec>,
+        stf: StfBlueprint<C, Da::Spec, RT>,
         public_keys: RollupPublicKeys,
-        rpc_config: RpcConfig,
         da_service: Arc<Da>,
         ledger_db: DB,
-        stf: StfBlueprint<C, Da::Spec, RT>,
-        mut storage_manager: ProverStorageManager<Da::Spec>,
-        init_variant: InitVariant<StfBlueprint<C, Da::Spec, RT>, Da::Spec>,
-        prover_service: Arc<Ps>,
-        prover_config: BatchProverConfig,
-        code_commitments_by_spec: HashMap<SpecId, Vm::CodeCommitment>,
-        elfs_by_spec: HashMap<SpecId, Vec<u8>>,
+        storage_manager: ProverStorageManager<Da::Spec>,
         fork_manager: ForkManager<'static>,
         soft_confirmation_tx: broadcast::Sender<u64>,
-        task_manager: TaskManager<()>,
     ) -> Result<Self, anyhow::Error> {
-        let (prev_state_root, prev_batch_hash) = match init_variant {
-            InitVariant::Initialized((state_root, batch_hash)) => {
-                debug!("Chain is already initialized. Skipping initialization.");
-                (state_root, batch_hash)
-            }
-            InitVariant::Genesis(params) => {
-                info!("No history detected. Initializing chain...");
-                let storage = storage_manager.create_storage_on_l2_height(0)?;
-                let (genesis_root, initialized_storage) = stf.init_chain(storage, params);
-                storage_manager.save_change_set_l2(0, initialized_storage)?;
-                storage_manager.finalize_l2(0)?;
-                ledger_db.set_l2_genesis_state_root(&genesis_root)?;
-                info!(
-                    "Chain initialization is done. Genesis root: 0x{}",
-                    hex::encode(genesis_root.as_ref()),
-                );
-                (genesis_root, [0; 32])
-            }
-        };
-
         // Last L1/L2 height before shutdown.
         let start_l2_height = ledger_db.get_head_soft_confirmation_height()?.unwrap_or(0) + 1;
 
@@ -139,186 +97,22 @@ where
             stf,
             storage_manager,
             ledger_db,
-            state_root: prev_state_root,
-            batch_hash: prev_batch_hash,
-            rpc_config,
-            prover_service,
+            state_root: init_params.state_root,
+            batch_hash: init_params.batch_hash,
             sequencer_client: HttpClientBuilder::default()
                 .build(runner_config.sequencer_client_url)?,
             sequencer_pub_key: public_keys.sequencer_public_key,
-            sequencer_da_pub_key: public_keys.sequencer_da_pub_key,
             phantom: std::marker::PhantomData,
-            prover_config,
-            code_commitments_by_spec,
-            elfs_by_spec,
             l1_block_cache: Arc::new(Mutex::new(L1BlockCache::new())),
             sync_blocks_count: runner_config.sync_blocks_count,
             fork_manager,
             soft_confirmation_tx,
-            task_manager,
         })
     }
 
-    /// Creates a shared RpcContext with all required data.
-    #[allow(clippy::type_complexity)]
-    fn create_rpc_context(
-        &self,
-    ) -> RpcContext<
-        C,
-        Da,
-        Ps,
-        Vm,
-        DB,
-        StfStateRoot<C, Da::Spec, RT>,
-        StfWitness<C, Da::Spec, RT>,
-        StfTransaction<C, Da::Spec, RT>,
-    > {
-        RpcContext {
-            ledger: self.ledger_db.clone(),
-            da_service: self.da_service.clone(),
-            sequencer_da_pub_key: self.sequencer_da_pub_key.clone(),
-            sequencer_pub_key: self.sequencer_pub_key.clone(),
-            l1_block_cache: self.l1_block_cache.clone(),
-            prover_service: self.prover_service.clone(),
-            code_commitments_by_spec: self.code_commitments_by_spec.clone(),
-            elfs_by_spec: self.elfs_by_spec.clone(),
-            phantom_c: std::marker::PhantomData,
-            phantom_vm: std::marker::PhantomData,
-            phantom_sr: std::marker::PhantomData,
-            phantom_w: std::marker::PhantomData,
-            phantom_tx: std::marker::PhantomData,
-        }
-    }
-
-    /// Updates the given RpcModule with Prover methods.
-    pub fn register_rpc_methods(
-        &self,
-        mut rpc_methods: jsonrpsee::RpcModule<()>,
-    ) -> Result<jsonrpsee::RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
-        let rpc_context = self.create_rpc_context();
-        let rpc = create_rpc_module(rpc_context);
-        rpc_methods.merge(rpc)?;
-        Ok(rpc_methods)
-    }
-
-    /// Starts a RPC server with provided rpc methods.
-    pub async fn start_rpc_server(
-        &mut self,
-        methods: RpcModule<()>,
-        channel: Option<oneshot::Sender<SocketAddr>>,
-    ) -> anyhow::Result<()> {
-        let methods = self.register_rpc_methods(methods)?;
-
-        let listen_address = SocketAddr::new(
-            self.rpc_config
-                .bind_host
-                .parse()
-                .map_err(|e| anyhow!("Failed to parse bind host: {}", e))?,
-            self.rpc_config.bind_port,
-        );
-
-        let max_connections = self.rpc_config.max_connections;
-        let max_subscriptions_per_connection = self.rpc_config.max_subscriptions_per_connection;
-        let max_request_body_size = self.rpc_config.max_request_body_size;
-        let max_response_body_size = self.rpc_config.max_response_body_size;
-        let batch_requests_limit = self.rpc_config.batch_requests_limit;
-
-        let middleware = tower::ServiceBuilder::new().layer(citrea_common::rpc::get_cors_layer());
-        // .layer(citrea_common::rpc::get_healthcheck_proxy_layer());
-
-        self.task_manager.spawn(|cancellation_token| async move {
-            let server = ServerBuilder::default()
-                .max_connections(max_connections)
-                .max_subscriptions_per_connection(max_subscriptions_per_connection)
-                .max_request_body_size(max_request_body_size)
-                .max_response_body_size(max_response_body_size)
-                .set_batch_request_config(BatchRequestConfig::Limit(batch_requests_limit))
-                .set_http_middleware(middleware)
-                .build([listen_address].as_ref())
-                .await;
-
-            match server {
-                Ok(server) => {
-                    let bound_address = match server.local_addr() {
-                        Ok(address) => address,
-                        Err(e) => {
-                            error!("{}", e);
-                            return;
-                        }
-                    };
-                    if let Some(channel) = channel {
-                        if let Err(e) = channel.send(bound_address) {
-                            error!("Could not send bound_address {}: {}", bound_address, e);
-                            return;
-                        }
-                    }
-                    info!("Starting RPC server at {} ", &bound_address);
-
-                    let _server_handle = server.start(methods);
-                    cancellation_token.cancelled().await;
-                }
-                Err(e) => {
-                    error!("Could not start RPC server: {}", e);
-                }
-            }
-        });
-        Ok(())
-    }
-
     /// Runs the rollup.
     #[instrument(level = "trace", skip_all, err)]
-    pub async fn run(&mut self) -> Result<(), anyhow::Error> {
-        let skip_submission_until_l1 = std::env::var("SKIP_PROOF_SUBMISSION_UNTIL_L1")
-            .map_or(0u64, |v| v.parse().unwrap_or(0));
-
-        // Prover node should sync when a new sequencer commitment arrives
-        // Check da block get and sync up to the latest block in the latest commitment
-        let last_scanned_l1_height = self
-            .ledger_db
-            .get_last_scanned_l1_height()
-            .unwrap_or_else(|_| panic!("Failed to get last scanned l1 height from the ledger db"));
-
-        let start_l1_height = match last_scanned_l1_height {
-            Some(height) => height.0,
-            None => get_initial_slot_height(&self.sequencer_client).await,
-        };
-
-        let ledger_db = self.ledger_db.clone();
-        let prover_config = self.prover_config.clone();
-        let prover_service = self.prover_service.clone();
-        let da_service = self.da_service.clone();
-        let sequencer_pub_key = self.sequencer_pub_key.clone();
-        let sequencer_da_pub_key = self.sequencer_da_pub_key.clone();
-        let code_commitments_by_spec = self.code_commitments_by_spec.clone();
-        let elfs_by_spec = self.elfs_by_spec.clone();
-        let l1_block_cache = self.l1_block_cache.clone();
-
-        self.task_manager.spawn(|cancellation_token| async move {
-            let l1_block_handler = L1BlockHandler::<
-                Vm,
-                Da,
-                Ps,
-                DB,
-                StfStateRoot<C, Da::Spec, RT>,
-                StfWitness<C, Da::Spec, RT>,
-                StfTransaction<C, Da::Spec, RT>,
-            >::new(
-                prover_config,
-                prover_service,
-                ledger_db,
-                da_service,
-                sequencer_pub_key,
-                sequencer_da_pub_key,
-                code_commitments_by_spec,
-                elfs_by_spec,
-                skip_submission_until_l1,
-                l1_block_cache.clone(),
-            );
-            l1_block_handler
-                .run(start_l1_height, cancellation_token)
-                .await
-        });
-
+    pub async fn run(&mut self, task_manager: TaskManager<()>) -> Result<(), anyhow::Error> {
         // Create l2 sync worker task
         let (l2_tx, mut l2_rx) = mpsc::channel(1);
@@ -376,17 +170,14 @@ where
                     }
                 }
             },
-            Some(_) = shutdown_signal.recv() => return self.shutdown().await,
+            Some(_) = shutdown_signal.recv() => {
+                info!("Shutting down");
+                task_manager.abort().await;
+            },
         }
     }
 }
 
-    async fn shutdown(&self) -> anyhow::Result<()> {
-        info!("Shutting down");
-        self.task_manager.abort().await;
-        Ok(())
-    }
-
     async fn process_l2_block(
         &mut self,
         l2_height: u64,
diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml
index 0849a6e0a..5dd207ccc 100644
--- a/crates/common/Cargo.toml
+++ b/crates/common/Cargo.toml
@@ -21,11 +21,13 @@ hex = { workspace = true }
 hyper = { workspace = true }
 jsonrpsee = { workspace = true, features = ["http-client", "server"] }
 lru = { workspace = true }
+metrics = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 tokio = { workspace = true }
 tokio-util = { workspace = true }
 toml = { workspace = true }
+tower = { workspace = true }
 tower-http = { workspace = true }
 tracing = { workspace = true }
 
diff --git a/crates/common/src/da.rs b/crates/common/src/da.rs
index 11a8a3224..3765446b9 100644
--- a/crates/common/src/da.rs
+++ b/crates/common/src/da.rs
@@ -1,19 +1,86 @@
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 use alloy_primitives::U64;
 use anyhow::anyhow;
 use backoff::future::retry as retry_backoff;
 use backoff::ExponentialBackoffBuilder;
-use jsonrpsee::http_client::HttpClient;
+use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
+use metrics::Histogram;
+use sov_db::ledger_db::{LedgerDB, SharedLedgerOps};
 use sov_ledger_rpc::LedgerRpcClient;
 use sov_rollup_interface::da::{BlockHeaderTrait, SequencerCommitment};
 use sov_rollup_interface::services::da::{DaService, SlotData};
 use sov_rollup_interface::zk::Proof;
-use tokio::sync::Mutex;
-use tracing::warn;
+use tokio::sync::{mpsc, Mutex};
+use tokio::time::sleep;
+use tracing::{error, info, warn};
 
 use crate::cache::L1BlockCache;
+use crate::FullNodeConfig;
+
+pub async fn sync_l1<Da>(
+    last_scanned_l1_height: u64,
+    da_service: Arc<Da>,
+    sender: mpsc::Sender<Da::FilteredBlock>,
+    l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
+    l1_block_scan_histogram: Histogram,
+) where
+    Da: DaService,
+{
+    let mut last_scanned_l1_height = last_scanned_l1_height;
+    info!("Starting to sync from L1 height {}", last_scanned_l1_height);
+
+    let start = Instant::now();
+
+    loop {
+        let last_finalized_l1_block_header =
+            match da_service.get_last_finalized_block_header().await {
+                Ok(header) => header,
+                Err(e) => {
+                    error!("Could not fetch last finalized L1 block header: {}", e);
+                    sleep(Duration::from_secs(2)).await;
+                    continue;
+                }
+            };
+
+        let new_l1_height = last_finalized_l1_block_header.height();
+
+        for block_number in last_scanned_l1_height + 1..=new_l1_height {
+            let l1_block =
+                match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone())
+                    .await
+                {
+                    Ok(block) => block,
+                    Err(e) => {
+                        error!("Could not fetch last finalized L1 block: {}", e);
+                        sleep(Duration::from_secs(2)).await;
+                        // In case of a failure in fetching the L1 block, trigger the retry loop.
+                        break;
+                    }
+                };
+
+            if block_number > last_scanned_l1_height {
+                if let Err(e) = sender.send(l1_block).await {
+                    error!("Could not notify about L1 block: {}", e);
+                    // We should not continue with the internal loop since we were not
+                    // able to notify about the L1 block
+                    break;
+                }
+                // If the send above does not succeed, we don't set new values
+                // nor do we record any metrics.
+                last_scanned_l1_height = block_number;
+                l1_block_scan_histogram.record(
+                    Instant::now()
+                        .saturating_duration_since(start)
+                        .as_secs_f64(),
+                );
+            }
+        }
+
+        sleep(Duration::from_secs(2)).await;
+    }
+}
 
 pub async fn get_da_block_at_height<Da: DaService>(
     da_service: &Arc<Da>,
@@ -88,3 +155,24 @@ pub async fn get_initial_slot_height(client: &HttpClient) -> u64 {
         }
     }
 }
+
+pub async fn get_start_l1_height<DaC>(
+    rollup_config: &FullNodeConfig<DaC>,
+    ledger_db: &LedgerDB,
+) -> anyhow::Result<u64> {
+    let last_scanned_l1_height = ledger_db.get_last_scanned_l1_height()?;
+
+    let height = match last_scanned_l1_height {
+        Some(height) => height.0,
+        None => {
+            let runner_config = rollup_config
+                .runner
+                .clone()
+                .expect("Runner config should be set");
+            let sequencer_client =
+                HttpClientBuilder::default().build(runner_config.sequencer_client_url)?;
+            get_initial_slot_height(&sequencer_client).await
+        }
+    };
+    Ok(height)
+}
diff --git a/crates/common/src/rpc/mod.rs b/crates/common/src/rpc/mod.rs
index 142f162ef..db00b3ece 100644
--- a/crates/common/src/rpc/mod.rs
+++ b/crates/common/src/rpc/mod.rs
@@ -14,6 +14,8 @@ use sov_db::ledger_db::{LedgerDB, SharedLedgerOps};
 use sov_db::schema::types::SoftConfirmationNumber;
 use tower_http::cors::{Any, CorsLayer};
 
+pub mod server;
+
 // Exit early if head_batch_num is below this threshold
 const BLOCK_NUM_THRESHOLD: u64 = 2;
 
diff --git a/crates/common/src/rpc/server.rs b/crates/common/src/rpc/server.rs
new file mode 100644
index 000000000..406bf136c
--- /dev/null
+++ b/crates/common/src/rpc/server.rs
@@ -0,0 +1,75 @@
+use std::net::SocketAddr;
+
+use jsonrpsee::server::{BatchRequestConfig, RpcServiceBuilder, ServerBuilder};
+use jsonrpsee::RpcModule;
+use tokio::sync::oneshot;
+use tracing::{error, info};
+
+use crate::tasks::manager::TaskManager;
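// [Editor's illustrative sketch, not part of the patch] The shared `sync_l1`
// worker above and the `start_rpc_server` helper below are consumed the same
// way by every node type: spawn the worker, then drain blocks from the channel,
// exactly as the da_block_handler hunks in this diff do. A minimal wiring,
// assuming only the signatures shown in this diff (the `process` closure and
// local variable names are hypothetical):
//
//     let (l1_tx, mut l1_rx) = tokio::sync::mpsc::channel(1);
//     let l1_sync_worker = sync_l1(
//         get_start_l1_height(&rollup_config, &ledger_db).await?,
//         da_service.clone(),
//         l1_tx,
//         Arc::new(Mutex::new(L1BlockCache::new())),
//         FULLNODE_METRICS.scan_l1_block.clone(),
//     );
//     tokio::pin!(l1_sync_worker);
//     loop {
//         tokio::select! {
//             _ = &mut l1_sync_worker => {},
//             Some(l1_block) = l1_rx.recv() => process(l1_block),
//         }
//     }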
+use crate::RpcConfig;
+
+/// Starts a RPC server with provided rpc methods.
+pub fn start_rpc_server(
+    rpc_config: RpcConfig,
+    task_manager: &mut TaskManager<()>,
+    methods: RpcModule<()>,
+    channel: Option<oneshot::Sender<SocketAddr>>,
+) {
+    let bind_host = match rpc_config.bind_host.parse() {
+        Ok(bind_host) => bind_host,
+        Err(e) => {
+            error!("Failed to parse bind host: {}", e);
+            return;
+        }
+    };
+    let listen_address = SocketAddr::new(bind_host, rpc_config.bind_port);
+
+    let max_connections = rpc_config.max_connections;
+    let max_subscriptions_per_connection = rpc_config.max_subscriptions_per_connection;
+    let max_request_body_size = rpc_config.max_request_body_size;
+    let max_response_body_size = rpc_config.max_response_body_size;
+    let batch_requests_limit = rpc_config.batch_requests_limit;
+
+    let middleware = tower::ServiceBuilder::new()
+        .layer(super::get_cors_layer())
+        .layer(super::get_healthcheck_proxy_layer());
+    let rpc_middleware = RpcServiceBuilder::new().layer_fn(super::Logger);
+
+    task_manager.spawn(move |cancellation_token| async move {
+        let server = ServerBuilder::default()
+            .max_connections(max_connections)
+            .max_subscriptions_per_connection(max_subscriptions_per_connection)
+            .max_request_body_size(max_request_body_size)
+            .max_response_body_size(max_response_body_size)
+            .set_batch_request_config(BatchRequestConfig::Limit(batch_requests_limit))
+            .set_http_middleware(middleware)
+            .set_rpc_middleware(rpc_middleware)
+            .build([listen_address].as_ref())
+            .await;
+
+        match server {
+            Ok(server) => {
+                let bound_address = match server.local_addr() {
+                    Ok(address) => address,
+                    Err(e) => {
+                        error!("{}", e);
+                        return;
+                    }
+                };
+                if let Some(channel) = channel {
+                    if let Err(e) = channel.send(bound_address) {
+                        error!("Could not send bound_address {}: {}", bound_address, e);
+                        return;
+                    }
+                }
+                info!("Starting RPC server at {} ", &bound_address);
+
+                let _server_handle = server.start(methods);
+                cancellation_token.cancelled().await;
+            }
+            Err(e) => {
+                error!("Could not start RPC server: {}", e);
+            }
+        }
+    });
+}
diff --git a/crates/fullnode/Cargo.toml b/crates/fullnode/Cargo.toml
index 0dcf91d8a..793d16238 100644
--- a/crates/fullnode/Cargo.toml
+++ b/crates/fullnode/Cargo.toml
@@ -21,6 +21,7 @@ sov-modules-api = { path = "../sovereign-sdk/module-system/sov-modules-api", default-features = false }
 sov-modules-stf-blueprint = { path = "../sovereign-sdk/module-system/sov-modules-stf-blueprint", features = ["native"] }
 sov-prover-storage-manager = { path = "../sovereign-sdk/full-node/sov-prover-storage-manager" }
 sov-rollup-interface = { path = "../sovereign-sdk/rollup-interface" }
+sov-state = { path = "../sovereign-sdk/module-system/sov-state" }
 sov-stf-runner = { path = "../sovereign-sdk/full-node/sov-stf-runner" }
 
 # 3rd-party deps
diff --git a/crates/fullnode/src/da_block_handler.rs b/crates/fullnode/src/da_block_handler.rs
index 6979100c9..570ca12c1 100644
--- a/crates/fullnode/src/da_block_handler.rs
+++ b/crates/fullnode/src/da_block_handler.rs
@@ -2,12 +2,11 @@ use std::collections::{HashMap, VecDeque};
 use std::fmt::Debug;
 use std::marker::PhantomData;
 use std::sync::Arc;
-use std::time::Instant;
 
 use anyhow::anyhow;
 use borsh::{BorshDeserialize, BorshSerialize};
 use citrea_common::cache::L1BlockCache;
-use citrea_common::da::{extract_sequencer_commitments, extract_zk_proofs, get_da_block_at_height};
+use citrea_common::da::{extract_sequencer_commitments, extract_zk_proofs, sync_l1};
 use citrea_common::error::SyncError;
 use citrea_common::utils::check_l2_range_exists;
 use citrea_primitives::forks::fork_from_block_number;
@@ -29,13 +28,13 @@ use sov_rollup_interface::zk::{
 };
 use tokio::select;
 use tokio::sync::{mpsc, Mutex};
-use tokio::time::{sleep, Duration};
+use tokio::time::Duration;
 use tokio_util::sync::CancellationToken;
 use tracing::{error, info, warn};
 
 use crate::metrics::FULLNODE_METRICS;
 
-pub(crate) struct L1BlockHandler<C, Da, Vm, StateRoot, DB>
+pub struct L1BlockHandler<C, Da, Vm, StateRoot, DB>
 where
     C: Context,
     Da: DaService,
@@ -109,6 +108,7 @@ where
             self.da_service.clone(),
             l1_tx,
             self.l1_block_cache.clone(),
+            FULLNODE_METRICS.scan_l1_block.clone(),
         );
         tokio::pin!(l1_sync_worker);
@@ -454,60 +454,3 @@ where
         Ok(())
     }
 }
-
-async fn sync_l1<Da>(
-    start_l1_height: u64,
-    da_service: Arc<Da>,
-    sender: mpsc::Sender<Da::FilteredBlock>,
-    l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
-) where
-    Da: DaService,
-{
-    let mut l1_height = start_l1_height;
-    info!("Starting to sync from L1 height {}", l1_height);
-
-    let start = Instant::now();
-
-    'block_sync: loop {
-        let last_finalized_l1_block_header =
-            match da_service.get_last_finalized_block_header().await {
-                Ok(header) => header,
-                Err(e) => {
-                    error!("Could not fetch last finalized L1 block header: {}", e);
-                    sleep(Duration::from_secs(2)).await;
-                    continue;
-                }
-            };
-
-        let new_l1_height = last_finalized_l1_block_header.height();
-
-        for block_number in l1_height + 1..=new_l1_height {
-            let l1_block =
-                match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone())
-                    .await
-                {
-                    Ok(block) => block,
-                    Err(e) => {
-                        error!("Could not fetch last finalized L1 block: {}", e);
-                        sleep(Duration::from_secs(2)).await;
-                        continue 'block_sync;
-                    }
-                };
-
-            if block_number > l1_height {
-                l1_height = block_number;
-                FULLNODE_METRICS.scan_l1_block.record(
-                    Instant::now()
-                        .saturating_duration_since(start)
-                        .as_secs_f64(),
-                );
-                if let Err(e) = sender.send(l1_block).await {
-                    error!("Could not notify about L1 block: {}", e);
-                    continue 'block_sync;
-                }
-            }
-        }
-
-        sleep(Duration::from_secs(2)).await;
-    }
-}
diff --git a/crates/fullnode/src/lib.rs b/crates/fullnode/src/lib.rs
index 28406da97..c4bb4ab6b 100644
--- a/crates/fullnode/src/lib.rs
+++ b/crates/fullnode/src/lib.rs
@@ -1,6 +1,82 @@
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use anyhow::Result;
+use borsh::{BorshDeserialize, BorshSerialize};
+use citrea_common::cache::L1BlockCache;
+use citrea_common::{RollupPublicKeys, RunnerConfig};
+use da_block_handler::L1BlockHandler;
 pub use runner::*;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use sov_db::ledger_db::NodeLedgerOps;
+use sov_modules_api::fork::ForkManager;
+use sov_modules_api::{Context, Spec, SpecId, Zkvm};
+use sov_modules_stf_blueprint::{Runtime, StfBlueprint};
+use sov_prover_storage_manager::{ProverStorageManager, SnapshotManager};
+use sov_rollup_interface::services::da::DaService;
+use sov_rollup_interface::zk::ZkvmHost;
+use sov_state::ProverStorage;
+use sov_stf_runner::InitParams;
+use tokio::sync::{broadcast, Mutex};
 
-mod da_block_handler;
+pub mod da_block_handler;
 pub mod db_migrations;
 mod metrics;
 mod runner;
+
+#[allow(clippy::type_complexity, clippy::too_many_arguments)]
+pub fn build_services<Da, C, DB, RT, Vm, StateRoot>(
+    runner_config: RunnerConfig,
+    init_params: InitParams<StfBlueprint<C, Da::Spec, RT>, Da::Spec>,
+    native_stf: StfBlueprint<C, Da::Spec, RT>,
+    public_keys: RollupPublicKeys,
+    da_service: Arc<Da>,
+    ledger_db: DB,
+    storage_manager: ProverStorageManager<Da::Spec>,
+    soft_confirmation_tx: broadcast::Sender<u64>,
+    fork_manager: ForkManager<'static>,
+    code_commitments: HashMap<SpecId, <Vm as Zkvm>::CodeCommitment>,
+) -> Result<(
+    CitreaFullnode<Da, C, DB, RT>,
+    L1BlockHandler<C, Da, Vm, StateRoot, DB>,
+)>
+where
+    Da: DaService,
+    C: Context + Spec<Storage = ProverStorage<SnapshotManager>> + Send + Sync,
+    DB: NodeLedgerOps + Send + Sync + Clone + 'static,
+    RT: Runtime<C, Da::Spec>,
+    Vm: ZkvmHost + Zkvm,
+    StateRoot: BorshDeserialize
+        + BorshSerialize
+        + Serialize
+        + DeserializeOwned
+        + Clone
+        + AsRef<[u8]>
+        + Debug,
+{
+    let runner = CitreaFullnode::new(
+        runner_config,
+        init_params,
+        native_stf,
+        public_keys.clone(),
+        da_service.clone(),
+        ledger_db.clone(),
+        storage_manager,
+        fork_manager,
+        soft_confirmation_tx,
+    )?;
+
+    let l1_block_handler = L1BlockHandler::new(
+        ledger_db,
+        da_service,
+        public_keys.sequencer_public_key,
+        public_keys.sequencer_da_pub_key,
+        public_keys.prover_da_pub_key,
+        code_commitments,
+        Arc::new(Mutex::new(L1BlockCache::new())),
+    );
+
+    Ok((runner, l1_block_handler))
+}
diff --git a/crates/fullnode/src/runner.rs b/crates/fullnode/src/runner.rs
index ed37e9c6d..f1f162121 100644
--- a/crates/fullnode/src/runner.rs
+++ b/crates/fullnode/src/runner.rs
@@ -1,5 +1,4 @@
-use std::collections::{HashMap, VecDeque};
-use std::net::SocketAddr;
+use std::collections::VecDeque;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -11,13 +10,11 @@ use citrea_common::cache::L1BlockCache;
 use citrea_common::da::get_da_block_at_height;
 use citrea_common::tasks::manager::TaskManager;
 use citrea_common::utils::{create_shutdown_signal, soft_confirmation_to_receipt};
-use citrea_common::{RollupPublicKeys, RpcConfig, RunnerConfig};
+use citrea_common::{RollupPublicKeys, RunnerConfig};
 use citrea_primitives::types::SoftConfirmationHash;
 use citrea_pruning::{Pruner, PruningConfig};
 use jsonrpsee::core::client::Error as JsonrpseeError;
 use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
-use jsonrpsee::server::{BatchRequestConfig, RpcServiceBuilder, ServerBuilder};
-use jsonrpsee::RpcModule;
 use sov_db::ledger_db::NodeLedgerOps;
 use sov_db::schema::types::{SlotNumber, SoftConfirmationNumber};
 use sov_ledger_rpc::LedgerRpcClient;
@@ -28,16 +25,13 @@ use sov_rollup_interface::da::BlockHeaderTrait;
 use sov_rollup_interface::fork::ForkManager;
 use sov_rollup_interface::rpc::SoftConfirmationResponse;
 use sov_rollup_interface::services::da::{DaService, SlotData};
-use sov_rollup_interface::spec::SpecId;
 use sov_rollup_interface::stf::StateTransitionFunction;
-use sov_rollup_interface::zk::{Zkvm, ZkvmHost};
-use sov_stf_runner::InitVariant;
+use sov_stf_runner::InitParams;
 use tokio::select;
-use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
+use tokio::sync::{broadcast, mpsc, Mutex};
 use tokio::time::{sleep, Duration};
 use tracing::{debug, error, info, instrument};
 
-use crate::da_block_handler::L1BlockHandler;
 use crate::metrics::FULLNODE_METRICS;
 
 type StateRoot<C, Da, RT> = <StfBlueprint<C, Da, RT> as StateTransitionFunction<Da>>::StateRoot;
 type StfTransaction<C, Da, RT> =
     <StfBlueprint<C, Da, RT> as StateTransitionFunction<Da>>::Transaction;
 
 /// Citrea's own STF runner implementation.
-pub struct CitreaFullnode<Da, Vm, C, DB, RT>
+pub struct CitreaFullnode<Da, C, DB, RT>
 where
     Da: DaService,
-    Vm: ZkvmHost + Zkvm,
     C: Context + Spec<Storage = ProverStorage<SnapshotManager>>,
     DB: NodeLedgerOps + Clone,
     RT: Runtime<C, Da::Spec>,
@@ -60,27 +53,20 @@ where
     ledger_db: DB,
     state_root: StateRoot<C, Da::Spec, RT>,
     batch_hash: SoftConfirmationHash,
-    rpc_config: RpcConfig,
     sequencer_client: HttpClient,
     sequencer_pub_key: Vec<u8>,
-    sequencer_da_pub_key: Vec<u8>,
-    prover_da_pub_key: Vec<u8>,
     phantom: std::marker::PhantomData<C>,
     include_tx_body: bool,
-    code_commitments_by_spec: HashMap<SpecId, Vm::CodeCommitment>,
    l1_block_cache: Arc<Mutex<L1BlockCache<Da>>>,
     sync_blocks_count: u64,
     fork_manager: ForkManager<'static>,
     soft_confirmation_tx: broadcast::Sender<u64>,
     pruning_config: Option<PruningConfig>,
-    task_manager: TaskManager<()>,
 }
 
-impl<Da, Vm, C, DB, RT> CitreaFullnode<Da, Vm, C, DB, RT>
+impl<Da, C, DB, RT> CitreaFullnode<Da, C, DB, RT>
 where
     Da: DaService,
-    Vm: ZkvmHost + Zkvm,
-    <Vm as Zkvm>::CodeCommitment: Send,
     C: Context + Spec<Storage = ProverStorage<SnapshotManager>> + Send + Sync,
     DB: NodeLedgerOps + Clone + Send + Sync + 'static,
     RT: Runtime<C, Da::Spec>,
@@ -93,38 +79,15 @@ where
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         runner_config: RunnerConfig,
+        init_params: InitParams<StfBlueprint<C, Da::Spec, RT>, Da::Spec>,
+        stf: StfBlueprint<C, Da::Spec, RT>,
         public_keys: RollupPublicKeys,
-        rpc_config: RpcConfig,
         da_service: Arc<Da>,
         ledger_db: DB,
-        stf: StfBlueprint<C, Da::Spec, RT>,
-        mut storage_manager: ProverStorageManager<Da::Spec>,
-        init_variant: InitVariant<StfBlueprint<C, Da::Spec, RT>, Da::Spec>,
-        code_commitments_by_spec: HashMap<SpecId, Vm::CodeCommitment>,
+        storage_manager: ProverStorageManager<Da::Spec>,
         fork_manager: ForkManager<'static>,
         soft_confirmation_tx: broadcast::Sender<u64>,
-        task_manager: TaskManager<()>,
     ) -> Result<Self, anyhow::Error> {
-        let (prev_state_root, prev_batch_hash) = match init_variant {
-            InitVariant::Initialized((state_root, batch_hash)) => {
-                info!("Chain is already initialized. Skipping initialization. State root: {}. Previous soft confirmation hash: {}", hex::encode(state_root.as_ref()), hex::encode(batch_hash));
-                (state_root, batch_hash)
-            }
-            InitVariant::Genesis(params) => {
-                info!("No history detected. Initializing chain...");
-                let storage = storage_manager.create_storage_on_l2_height(0)?;
-                let (genesis_root, initialized_storage) = stf.init_chain(storage, params);
-                storage_manager.save_change_set_l2(0, initialized_storage)?;
-                storage_manager.finalize_l2(0)?;
-                ledger_db.set_l2_genesis_state_root(&genesis_root)?;
-                info!(
-                    "Chain initialization is done. Genesis root: 0x{}",
-                    hex::encode(genesis_root.as_ref()),
-                );
-                (genesis_root, [0; 32])
-            }
-        };
-
         let start_l2_height = ledger_db.get_head_soft_confirmation_height()?.unwrap_or(0) + 1;
 
         info!("Starting L2 height: {}", start_l2_height);
@@ -135,92 +98,21 @@ where
             stf,
             storage_manager,
             ledger_db,
-            state_root: prev_state_root,
-            batch_hash: prev_batch_hash,
-            rpc_config,
+            state_root: init_params.state_root,
+            batch_hash: init_params.batch_hash,
             sequencer_client: HttpClientBuilder::default()
                 .build(runner_config.sequencer_client_url)?,
             sequencer_pub_key: public_keys.sequencer_public_key,
-            sequencer_da_pub_key: public_keys.sequencer_da_pub_key,
-            prover_da_pub_key: public_keys.prover_da_pub_key,
             phantom: std::marker::PhantomData,
             include_tx_body: runner_config.include_tx_body,
-            code_commitments_by_spec,
             sync_blocks_count: runner_config.sync_blocks_count,
             l1_block_cache: Arc::new(Mutex::new(L1BlockCache::new())),
             fork_manager,
             soft_confirmation_tx,
             pruning_config: runner_config.pruning_config,
-            task_manager,
         })
     }
 
-    /// Starts a RPC server with provided rpc methods.
-    pub async fn start_rpc_server(
-        &mut self,
-        methods: RpcModule<()>,
-        channel: Option<oneshot::Sender<SocketAddr>>,
-    ) {
-        let bind_host = match self.rpc_config.bind_host.parse() {
-            Ok(bind_host) => bind_host,
-            Err(e) => {
-                error!("Failed to parse bind host: {}", e);
-                return;
-            }
-        };
-        let listen_address = SocketAddr::new(bind_host, self.rpc_config.bind_port);
-
-        let max_connections = self.rpc_config.max_connections;
-        let max_subscriptions_per_connection = self.rpc_config.max_subscriptions_per_connection;
-        let max_request_body_size = self.rpc_config.max_request_body_size;
-        let max_response_body_size = self.rpc_config.max_response_body_size;
-        let batch_requests_limit = self.rpc_config.batch_requests_limit;
-
-        let middleware = tower::ServiceBuilder::new()
-            .layer(citrea_common::rpc::get_cors_layer())
-            .layer(citrea_common::rpc::get_healthcheck_proxy_layer());
-        let rpc_middleware = RpcServiceBuilder::new().layer_fn(citrea_common::rpc::Logger);
-
-        self.task_manager
-            .spawn(move |cancellation_token| async move {
-                let server = ServerBuilder::default()
-                    .max_connections(max_connections)
-                    .max_subscriptions_per_connection(max_subscriptions_per_connection)
-                    .max_request_body_size(max_request_body_size)
-                    .max_response_body_size(max_response_body_size)
-                    .set_batch_request_config(BatchRequestConfig::Limit(batch_requests_limit))
-                    .set_http_middleware(middleware)
-                    .set_rpc_middleware(rpc_middleware)
-                    .build([listen_address].as_ref())
-                    .await;
-
-                match server {
-                    Ok(server) => {
-                        let bound_address = match server.local_addr() {
-                            Ok(address) => address,
-                            Err(e) => {
-                                error!("{}", e);
-                                return;
-                            }
-                        };
-                        if let Some(channel) = channel {
-                            if let Err(e) = channel.send(bound_address) {
-                                error!("Could not send bound_address {}: {}", bound_address, e);
-                                return;
-                            }
-                        }
-                        info!("Starting RPC server at {} ", &bound_address);
-
-                        let _server_handle = server.start(methods);
-                        cancellation_token.cancelled().await;
-                    }
-                    Err(e) => {
-                        error!("Could not start RPC server: {}", e);
-                    }
-                }
-            });
-    }
-
     async fn process_l2_block(
         &mut self,
         l2_height: u64,
@@ -323,22 +215,8 @@ where
 
     /// Runs the rollup.
     #[instrument(level = "trace", skip_all, err)]
-    pub async fn run(&mut self) -> Result<(), anyhow::Error> {
+    pub async fn run(&mut self, mut task_manager: TaskManager<()>) -> Result<(), anyhow::Error> {
         // Last L1/L2 height before shutdown.
-        let start_l1_height = {
-            let last_scanned_l1_height = self
-                .ledger_db
-                .get_last_scanned_l1_height()
-                .unwrap_or_else(|_| {
-                    panic!("Failed to get last scanned l1 height from the ledger db")
-                });
-
-            match last_scanned_l1_height {
-                Some(height) => height.0,
-                None => get_initial_slot_height(&self.sequencer_client).await,
-            }
-        };
-
         if let Some(config) = &self.pruning_config {
             let pruner = Pruner::<DB>::new(
                 config.clone(),
@@ -347,35 +225,9 @@ where
                 self.ledger_db.clone(),
             );
 
-            self.task_manager
-                .spawn(|cancellation_token| pruner.run(cancellation_token));
+            task_manager.spawn(|cancellation_token| pruner.run(cancellation_token));
         }
 
-        let ledger_db = self.ledger_db.clone();
-        let da_service = self.da_service.clone();
-        let sequencer_pub_key = self.sequencer_pub_key.clone();
-        let sequencer_da_pub_key = self.sequencer_da_pub_key.clone();
-        let prover_da_pub_key = self.prover_da_pub_key.clone();
-        let code_commitments_by_spec = self.code_commitments_by_spec.clone();
-        let l1_block_cache = self.l1_block_cache.clone();
-
-        self.task_manager
-            .spawn(move |cancellation_token| async move {
-                let l1_block_handler =
-                    L1BlockHandler::<C, Da, Vm, StateRoot<C, Da::Spec, RT>, DB>::new(
-                        ledger_db,
-                        da_service,
-                        sequencer_pub_key,
-                        sequencer_da_pub_key,
-                        prover_da_pub_key,
-                        code_commitments_by_spec,
-                        l1_block_cache.clone(),
-                    );
-                l1_block_handler
-                    .run(start_l1_height, cancellation_token)
-                    .await
-            });
-
         let (l2_tx, mut l2_rx) = mpsc::channel(1);
         let l2_sync_worker = sync_l2(
             self.start_l2_height,
@@ -432,17 +284,14 @@ where
                     }
                 }
             },
-            Some(_) = shutdown_signal.recv() => return self.shutdown().await,
+            Some(_) = shutdown_signal.recv() => {
+                info!("Shutting down");
+                task_manager.abort().await;
+            },
         }
     }
 }
 
-    async fn shutdown(&self) -> anyhow::Result<()> {
-        info!("Shutting down");
-        self.task_manager.abort().await;
-        Ok(())
-    }
-
     /// Allows to read current state root
     pub fn get_state_root(&self) -> &StateRoot<C, Da::Spec, RT> {
         &self.state_root
@@ -528,16 +377,3 @@ where
         }
     }
 }
-
-async fn get_initial_slot_height(client: &HttpClient) -> u64 {
-    loop {
-        match client.get_soft_confirmation_by_number(U64::from(1)).await {
-            Ok(Some(soft_confirmation)) => return soft_confirmation.da_slot_height,
-            _ => {
-                // sleep 1
-                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                continue;
-            }
-        }
-    }
-}
diff --git a/crates/light-client-prover/src/da_block_handler.rs b/crates/light-client-prover/src/da_block_handler.rs
index 2f485a7f4..7474f4871 100644
--- a/crates/light-client-prover/src/da_block_handler.rs
+++ b/crates/light-client-prover/src/da_block_handler.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
 
 use borsh::BorshDeserialize;
 use citrea_common::cache::L1BlockCache;
-use citrea_common::da::get_da_block_at_height;
+use citrea_common::da::sync_l1;
 use citrea_common::LightClientProverConfig;
 use citrea_primitives::forks::fork_from_block_number;
 use sov_db::ledger_db::{LightClientProverLedgerOps, SharedLedgerOps};
@@ -20,14 +20,18 @@ use sov_rollup_interface::zk::{
 };
 use sov_stf_runner::{ProofData, ProverService};
 use tokio::select;
 use tokio::sync::{mpsc, Mutex};
-use tokio::time::{sleep, Duration};
+use tokio::time::Duration;
 use tokio_util::sync::CancellationToken;
 use tracing::{error, info, warn};
 
 use crate::metrics::LIGHT_CLIENT_METRICS;
-use crate::runner::StartVariant;
 
-pub(crate) struct L1BlockHandler<Da, Vm, Ps, DB>
+pub enum StartVariant {
+    LastScanned(u64),
+    FromBlock(u64),
+}
+
+pub struct L1BlockHandler<Da, Vm, Ps, DB>
 where
     Da: DaService,
     Vm: ZkvmHost + Zkvm,
@@ -97,13 +101,17 @@ where
         //     .clear_pending_proving_sessions()
        //     .expect("Failed to clear pending proving sessions");
pending proving sessions"); // } - + let start_l1_height = match last_l1_height_scanned { + StartVariant::LastScanned(height) => height + 1, // last scanned block + 1 + StartVariant::FromBlock(height) => height, // first block to scan + }; let (l1_tx, mut l1_rx) = mpsc::channel(1); let l1_sync_worker = sync_l1( - last_l1_height_scanned, + start_l1_height, self.da_service.clone(), l1_tx, self.l1_block_cache.clone(), + LIGHT_CLIENT_METRICS.scan_l1_block.clone(), ); tokio::pin!(l1_sync_worker); @@ -477,58 +485,3 @@ where Ok(proofs[0].clone()) } } - -async fn sync_l1( - start_l1_height: StartVariant, - da_service: Arc, - sender: mpsc::Sender, - l1_block_cache: Arc>>, -) where - Da: DaService, -{ - let mut l1_height = match start_l1_height { - StartVariant::LastScanned(height) => height + 1, // last scanned block + 1 - StartVariant::FromBlock(height) => height, // first block to scan - }; - info!("Starting to sync from L1 height {}", l1_height); - - 'block_sync: loop { - let last_finalized_l1_block_header = - match da_service.get_last_finalized_block_header().await { - Ok(header) => header, - Err(e) => { - error!("Could not fetch last finalized L1 block header: {}", e); - sleep(Duration::from_secs(2)).await; - continue; - } - }; - - let new_l1_height = last_finalized_l1_block_header.height(); - - for block_number in l1_height..=new_l1_height { - let l1_block = - match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone()) - .await - { - Ok(block) => block, - Err(e) => { - error!("Could not fetch last finalized L1 block: {}", e); - sleep(Duration::from_secs(2)).await; - continue 'block_sync; - } - }; - - if let Err(e) = sender.send(l1_block).await { - error!("Could not notify about L1 block: {}", e); - continue 'block_sync; - } - } - - // next iteration of he loop will start from the next block - if new_l1_height >= l1_height { - l1_height = new_l1_height + 1; - } - - sleep(Duration::from_secs(2)).await; - } -} diff --git a/crates/light-client-prover/src/lib.rs b/crates/light-client-prover/src/lib.rs index 8b9812818..4887e0f1d 100644 --- a/crates/light-client-prover/src/lib.rs +++ b/crates/light-client-prover/src/lib.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "native")] +pub use services::*; + pub mod circuit; #[cfg(feature = "native")] pub mod da_block_handler; @@ -9,6 +12,8 @@ pub mod metrics; pub mod rpc; #[cfg(feature = "native")] pub mod runner; +#[cfg(feature = "native")] +mod services; #[cfg(test)] mod tests; pub(crate) mod utils; diff --git a/crates/light-client-prover/src/metrics.rs b/crates/light-client-prover/src/metrics.rs index d07673242..fe2775cbd 100644 --- a/crates/light-client-prover/src/metrics.rs +++ b/crates/light-client-prover/src/metrics.rs @@ -1,4 +1,4 @@ -use metrics::Gauge; +use metrics::{Gauge, Histogram}; use metrics_derive::Metrics; use once_cell::sync::Lazy; @@ -7,6 +7,8 @@ use once_cell::sync::Lazy; pub struct LightClientProverMetrics { #[metric(describe = "The current L1 block number which is used to produce L2 blocks")] pub current_l1_block: Gauge, + #[metric(describe = "The duration of scanning and processing a single L1 block")] + pub scan_l1_block: Histogram, } /// Light client metrics diff --git a/crates/light-client-prover/src/rpc.rs b/crates/light-client-prover/src/rpc.rs index 298c70822..e7799595b 100644 --- a/crates/light-client-prover/src/rpc.rs +++ b/crates/light-client-prover/src/rpc.rs @@ -14,6 +14,32 @@ where pub ledger: DB, } +/// Creates a shared RpcContext with all required data. 
+pub fn create_rpc_context(ledger_db: DB) -> RpcContext { + RpcContext { ledger: ledger_db } +} + +pub fn create_rpc_module( + rpc_context: RpcContext, +) -> jsonrpsee::RpcModule> +where + DB: LightClientProverLedgerOps + Clone + Send + Sync + 'static, +{ + let server = LightClientProverRpcServerImpl::new(rpc_context); + + LightClientProverRpcServer::into_rpc(server) +} + +/// Updates the given RpcModule with Prover methods. +pub fn register_rpc_methods( + mut rpc_methods: jsonrpsee::RpcModule<()>, + rpc_context: RpcContext, +) -> Result, jsonrpsee::core::RegisterMethodError> { + let rpc = create_rpc_module(rpc_context); + rpc_methods.merge(rpc)?; + Ok(rpc_methods) +} + #[rpc(client, server, namespace = "lightClientProver")] pub trait LightClientProverRpc { /// Generate state transition data for the given L1 block height, and return the data as a borsh serialized hex string. @@ -66,14 +92,3 @@ where Ok(res) } } - -pub fn create_rpc_module( - rpc_context: RpcContext, -) -> jsonrpsee::RpcModule> -where - DB: LightClientProverLedgerOps + Clone + Send + Sync + 'static, -{ - let server = LightClientProverRpcServerImpl::new(rpc_context); - - LightClientProverRpcServer::into_rpc(server) -} diff --git a/crates/light-client-prover/src/runner.rs b/crates/light-client-prover/src/runner.rs index 4bf8e1464..29bf54bf1 100644 --- a/crates/light-client-prover/src/runner.rs +++ b/crates/light-client-prover/src/runner.rs @@ -1,209 +1,26 @@ -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::Arc; - use citrea_common::tasks::manager::TaskManager; -use citrea_common::{LightClientProverConfig, RollupPublicKeys, RpcConfig, RunnerConfig}; -use jsonrpsee::server::{BatchRequestConfig, ServerBuilder}; -use jsonrpsee::RpcModule; -use sov_db::ledger_db::{LightClientProverLedgerOps, SharedLedgerOps}; -use sov_db::mmr_db::MmrDB; -use sov_rollup_interface::services::da::DaService; -use sov_rollup_interface::spec::SpecId; -use sov_rollup_interface::zk::ZkvmHost; -use sov_stf_runner::ProverService; +use citrea_common::RunnerConfig; use tokio::signal; -use tokio::sync::oneshot; -use tracing::{error, info, instrument}; - -use crate::da_block_handler::L1BlockHandler; -use crate::rpc::{create_rpc_module, RpcContext}; +use tracing::instrument; -pub(crate) enum StartVariant { - LastScanned(u64), - FromBlock(u64), -} - -pub struct CitreaLightClientProver -where - Da: DaService + Send + Sync, - Vm: ZkvmHost, - Ps: ProverService, - DB: LightClientProverLedgerOps + SharedLedgerOps + Clone, -{ +pub struct CitreaLightClientProver { _runner_config: RunnerConfig, - public_keys: RollupPublicKeys, - rpc_config: RpcConfig, - da_service: Arc, - ledger_db: DB, - prover_service: Arc, - prover_config: LightClientProverConfig, - task_manager: TaskManager<()>, - batch_proof_commitments_by_spec: HashMap, - light_client_proof_commitment: HashMap, - light_client_proof_elfs: HashMap>, - mmr_db: MmrDB, } -impl CitreaLightClientProver -where - Da: DaService + Send + Sync + 'static, - Vm: ZkvmHost, - Ps: ProverService + Send + Sync + 'static, - DB: LightClientProverLedgerOps + SharedLedgerOps + Clone + 'static, -{ +impl CitreaLightClientProver { #[allow(clippy::too_many_arguments)] - pub fn new( - runner_config: RunnerConfig, - public_keys: RollupPublicKeys, - rpc_config: RpcConfig, - da_service: Arc, - ledger_db: DB, - prover_service: Arc, - prover_config: LightClientProverConfig, - batch_proof_commitments_by_spec: HashMap, - light_client_proof_commitment: HashMap, - light_client_proof_elfs: HashMap>, - mmr_db: MmrDB, - 
task_manager: TaskManager<()>, - ) -> Result { + pub fn new(runner_config: RunnerConfig) -> Result { Ok(Self { _runner_config: runner_config, - public_keys, - rpc_config, - da_service, - ledger_db, - prover_service, - prover_config, - task_manager, - batch_proof_commitments_by_spec, - light_client_proof_commitment, - light_client_proof_elfs, - mmr_db, }) } - /// Starts a RPC server with provided rpc methods. - pub async fn start_rpc_server( - &mut self, - methods: RpcModule<()>, - channel: Option>, - ) -> anyhow::Result<()> { - let methods = self.register_rpc_methods(methods)?; - let listen_address = SocketAddr::new( - self.rpc_config - .bind_host - .parse() - .map_err(|e| anyhow::anyhow!("Failed to parse bind host: {}", e))?, - self.rpc_config.bind_port, - ); - - let max_connections = self.rpc_config.max_connections; - let max_subscriptions_per_connection = self.rpc_config.max_subscriptions_per_connection; - let max_request_body_size = self.rpc_config.max_request_body_size; - let max_response_body_size = self.rpc_config.max_response_body_size; - let batch_requests_limit = self.rpc_config.batch_requests_limit; - - let middleware = tower::ServiceBuilder::new().layer(citrea_common::rpc::get_cors_layer()); - // .layer(citrea_common::rpc::get_healthcheck_proxy_layer()); - - self.task_manager.spawn(|cancellation_token| async move { - let server = ServerBuilder::default() - .max_connections(max_connections) - .max_subscriptions_per_connection(max_subscriptions_per_connection) - .max_request_body_size(max_request_body_size) - .max_response_body_size(max_response_body_size) - .set_batch_request_config(BatchRequestConfig::Limit(batch_requests_limit)) - .set_http_middleware(middleware) - .build([listen_address].as_ref()) - .await; - - match server { - Ok(server) => { - let bound_address = match server.local_addr() { - Ok(address) => address, - Err(e) => { - error!("{}", e); - return; - } - }; - if let Some(channel) = channel { - if let Err(e) = channel.send(bound_address) { - error!("Could not send bound_address {}: {}", bound_address, e); - return; - } - } - info!("Starting RPC server at {} ", &bound_address); - - let _server_handle = server.start(methods); - cancellation_token.cancelled().await; - } - Err(e) => { - error!("Could not start RPC server: {}", e); - } - } - }); - Ok(()) - } - /// Runs the rollup. #[instrument(level = "trace", skip_all, err)] - pub async fn run(&mut self) -> Result<(), anyhow::Error> { - let starting_block = match self.ledger_db.get_last_scanned_l1_height()? 
{ - Some(l1_height) => StartVariant::LastScanned(l1_height.0), - // first time starting the prover - // start from the block given in the config - None => StartVariant::FromBlock(self.prover_config.initial_da_height), - }; - - let prover_config = self.prover_config.clone(); - let prover_service = self.prover_service.clone(); - let ledger_db = self.ledger_db.clone(); - let mmr_db = self.mmr_db.clone(); - let da_service = self.da_service.clone(); - let batch_prover_da_pub_key = self.public_keys.prover_da_pub_key.clone(); - let batch_proof_commitments_by_spec = self.batch_proof_commitments_by_spec.clone(); - let light_client_proof_commitment = self.light_client_proof_commitment.clone(); - let light_client_proof_elfs = self.light_client_proof_elfs.clone(); - - self.task_manager.spawn(|cancellation_token| async move { - let l1_block_handler = L1BlockHandler::::new( - prover_config, - prover_service, - ledger_db, - da_service, - batch_prover_da_pub_key, - batch_proof_commitments_by_spec, - light_client_proof_commitment, - light_client_proof_elfs, - mmr_db, - ); - l1_block_handler - .run(starting_block, cancellation_token) - .await - }); - + pub async fn run(&mut self, task_manager: TaskManager<()>) -> Result<(), anyhow::Error> { signal::ctrl_c().await.expect("Failed to listen ctrl+c"); - self.task_manager.abort().await; + task_manager.abort().await; Ok(()) } - - /// Creates a shared RpcContext with all required data. - fn create_rpc_context(&self) -> RpcContext { - RpcContext { - ledger: self.ledger_db.clone(), - } - } - - /// Updates the given RpcModule with Prover methods. - pub fn register_rpc_methods( - &self, - mut rpc_methods: jsonrpsee::RpcModule<()>, - ) -> Result, jsonrpsee::core::RegisterMethodError> { - let rpc_context = self.create_rpc_context(); - let rpc = create_rpc_module(rpc_context); - rpc_methods.merge(rpc)?; - Ok(rpc_methods) - } } diff --git a/crates/light-client-prover/src/services.rs b/crates/light-client-prover/src/services.rs new file mode 100644 index 000000000..f10a556b1 --- /dev/null +++ b/crates/light-client-prover/src/services.rs @@ -0,0 +1,62 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::Result; +use citrea_common::{LightClientProverConfig, RollupPublicKeys, RunnerConfig}; +use jsonrpsee::RpcModule; +use sov_db::ledger_db::{LightClientProverLedgerOps, SharedLedgerOps}; +use sov_db::mmr_db::MmrDB; +use sov_db::rocks_db_config::RocksdbConfig; +use sov_modules_api::{SpecId, Zkvm}; +use sov_rollup_interface::services::da::DaService; +use sov_rollup_interface::zk::ZkvmHost; +use sov_stf_runner::ProverService; + +use crate::da_block_handler::L1BlockHandler; +use crate::rpc; +use crate::runner::CitreaLightClientProver; + +#[allow(clippy::type_complexity, clippy::too_many_arguments)] +pub fn build_services( + prover_config: LightClientProverConfig, + runner_config: RunnerConfig, + rocksdb_config: &RocksdbConfig, + ledger_db: DB, + da_service: Arc, + prover_service: Arc, + public_keys: RollupPublicKeys, + batch_prover_code_commitments: HashMap, + light_client_prover_code_commitments: HashMap, + light_client_prover_elfs: HashMap>, + rpc_module: RpcModule<()>, +) -> Result<( + CitreaLightClientProver, + L1BlockHandler, + RpcModule<()>, +)> +where + Da: DaService, + Vm: ZkvmHost + Zkvm, + Ps: ProverService, + DB: LightClientProverLedgerOps + SharedLedgerOps + Clone + 'static, +{ + let rpc_context = rpc::create_rpc_context(ledger_db.clone()); + let rpc_module = rpc::register_rpc_methods(rpc_module, rpc_context)?; + + let mmr_db = 
MmrDB::new(rocksdb_config)?; + let l1_block_handler = L1BlockHandler::new( + prover_config, + prover_service, + ledger_db, + da_service, + public_keys.prover_da_pub_key, + batch_prover_code_commitments, + light_client_prover_code_commitments, + light_client_prover_elfs, + mmr_db, + ); + + let prover = CitreaLightClientProver::new(runner_config)?; + + Ok((prover, l1_block_handler, rpc_module)) +} diff --git a/crates/sequencer/src/commitment/mod.rs b/crates/sequencer/src/commitment/mod.rs index aefba3dfa..7dd333162 100644 --- a/crates/sequencer/src/commitment/mod.rs +++ b/crates/sequencer/src/commitment/mod.rs @@ -3,8 +3,6 @@ use std::sync::Arc; use std::time::Instant; use anyhow::anyhow; -use futures::channel::mpsc::UnboundedReceiver; -use futures::StreamExt; use parking_lot::RwLock; use rs_merkle::algorithms::Sha256; use rs_merkle::MerkleTree; @@ -14,6 +12,7 @@ use sov_modules_api::StateDiff; use sov_rollup_interface::da::{BlockHeaderTrait, DaTxRequest, SequencerCommitment}; use sov_rollup_interface::services::da::{DaService, TxRequestWithNotifier}; use tokio::select; +use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument}; @@ -73,7 +72,7 @@ where _ = cancellation_token.cancelled() => { return; }, - info = self.soft_confirmation_rx.next() => { + info = self.soft_confirmation_rx.recv() => { let Some((height, state_diff)) = info else { // An error is returned because the channel is either // closed or lagged. diff --git a/crates/sequencer/src/deposit_data_mempool.rs b/crates/sequencer/src/deposit_data_mempool.rs index e9b8aa1e9..3d679dbc3 100644 --- a/crates/sequencer/src/deposit_data_mempool.rs +++ b/crates/sequencer/src/deposit_data_mempool.rs @@ -6,16 +6,14 @@ use citrea_evm::system_contracts::BridgeWrapper; use citrea_evm::SYSTEM_SIGNER; use tracing::instrument; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct DepositDataMempool { accepted_deposit_txs: VecDeque>, } impl DepositDataMempool { pub fn new() -> Self { - Self { - accepted_deposit_txs: VecDeque::new(), - } + Self::default() } pub fn make_deposit_tx_from_data(&mut self, deposit_tx_data: Vec) -> TransactionRequest { diff --git a/crates/sequencer/src/lib.rs b/crates/sequencer/src/lib.rs index bcf94aca1..6d0bdf3fb 100644 --- a/crates/sequencer/src/lib.rs +++ b/crates/sequencer/src/lib.rs @@ -1,13 +1,91 @@ +use std::sync::Arc; + +use anyhow::Result; +use citrea_common::RollupPublicKeys; +pub use citrea_common::SequencerConfig; +use db_provider::DbProvider; +use deposit_data_mempool::DepositDataMempool; +use jsonrpsee::RpcModule; +use mempool::CitreaMempool; +use parking_lot::Mutex; +pub use rpc::SequencerRpcClient; +pub use runner::CitreaSequencer; +use sov_db::ledger_db::SequencerLedgerOps; +use sov_modules_api::{Context, Spec}; +use sov_modules_stf_blueprint::{Runtime, StfBlueprint}; +use sov_prover_storage_manager::{ProverStorageManager, SnapshotManager}; +use sov_rollup_interface::fork::ForkManager; +use sov_rollup_interface::services::da::DaService; +use sov_state::ProverStorage; +use sov_stf_runner::InitParams; +use tokio::sync::broadcast; +use tokio::sync::mpsc::unbounded_channel; + mod commitment; pub mod db_migrations; mod db_provider; mod deposit_data_mempool; mod mempool; mod metrics; -mod rpc; +pub mod rpc; mod runner; mod utils; -pub use citrea_common::{SequencerConfig, SequencerMempoolConfig}; -pub use rpc::SequencerRpcClient; -pub use runner::CitreaSequencer; +#[allow(clippy::type_complexity, 
+pub fn build_services<C, Da, DB, RT>(
+    sequencer_config: SequencerConfig,
+    init_params: InitParams<StfBlueprint<C, Da::Spec, RT>, Da::Spec>,
+    native_stf: StfBlueprint<C, <Da as DaService>::Spec, RT>,
+    public_keys: RollupPublicKeys,
+    da_service: Arc<Da>,
+    ledger_db: DB,
+    storage_manager: ProverStorageManager,
+    prover_storage: ProverStorage<SnapshotManager>,
+    soft_confirmation_tx: broadcast::Sender<u64>,
+    fork_manager: ForkManager<'static>,
+    rpc_module: RpcModule<()>,
+) -> Result<(CitreaSequencer<C, Da, DB, RT>, RpcModule<()>)>
+where
+    C: Context + Spec<Storage = ProverStorage<SnapshotManager>>,
+    Da: DaService,
+    DB: SequencerLedgerOps + Send + Sync + Clone + 'static,
+    RT: Runtime<C, Da::Spec>,
+{
+    let (l2_force_block_tx, l2_force_block_rx) = unbounded_channel();
+    // used as client of reth's mempool
+    let db_provider = DbProvider::new(prover_storage.clone());
+    let mempool = Arc::new(CitreaMempool::new(
+        db_provider.clone(),
+        sequencer_config.mempool_conf.clone(),
+    )?);
+    let deposit_mempool = Arc::new(Mutex::new(DepositDataMempool::new()));
+
+    let rpc_context = rpc::create_rpc_context(
+        mempool.clone(),
+        deposit_mempool.clone(),
+        l2_force_block_tx,
+        prover_storage.clone(),
+        ledger_db.clone(),
+        sequencer_config.test_mode,
+    );
+    let rpc_module = rpc::register_rpc_methods::<C, DB>(rpc_context, rpc_module)?;
+
+    let seq = CitreaSequencer::new(
+        da_service,
+        sequencer_config,
+        init_params,
+        native_stf,
+        storage_manager,
+        public_keys,
+        ledger_db,
+        db_provider,
+        mempool,
+        deposit_mempool,
+        fork_manager,
+        soft_confirmation_tx,
+        l2_force_block_rx,
+    )
+    .unwrap();
+
+    Ok((seq, rpc_module))
+}
diff --git a/crates/sequencer/src/mempool.rs b/crates/sequencer/src/mempool.rs
index 72cccc0a3..ad3cf4161 100644
--- a/crates/sequencer/src/mempool.rs
+++ b/crates/sequencer/src/mempool.rs
@@ -26,7 +26,7 @@ type CitreaMempoolImpl<C> = Pool<
 
 type Transaction<C> = <CitreaMempoolImpl<C> as TransactionPool>::Transaction;
 
-pub(crate) struct CitreaMempool<C: sov_modules_api::Context>(CitreaMempoolImpl<C>);
+pub struct CitreaMempool<C: sov_modules_api::Context>(CitreaMempoolImpl<C>);
 
 impl<C: sov_modules_api::Context> CitreaMempool<C> {
     pub(crate) fn new(
diff --git a/crates/sequencer/src/rpc.rs b/crates/sequencer/src/rpc.rs
index 6cd27373d..82101c2e2 100644
--- a/crates/sequencer/src/rpc.rs
+++ b/crates/sequencer/src/rpc.rs
@@ -4,7 +4,6 @@ use alloy_eips::eip2718::Encodable2718;
 use alloy_network::AnyNetwork;
 use alloy_primitives::{Bytes, B256};
 use citrea_evm::Evm;
-use futures::channel::mpsc::UnboundedSender;
 use jsonrpsee::core::RpcResult;
 use jsonrpsee::proc_macros::rpc;
 use jsonrpsee::types::error::{INTERNAL_ERROR_CODE, INTERNAL_ERROR_MSG};
@@ -16,7 +15,8 @@ use reth_rpc_eth_types::error::EthApiError;
 use reth_rpc_types_compat::transaction::from_recovered;
 use reth_transaction_pool::{EthPooledTransaction, PoolTransaction};
 use sov_db::ledger_db::SequencerLedgerOps;
-use sov_modules_api::WorkingSet;
+use sov_modules_api::{Context, WorkingSet};
+use tokio::sync::mpsc::UnboundedSender;
 use tracing::{debug, error};
 
 use crate::deposit_data_mempool::DepositDataMempool;
@@ -24,7 +24,7 @@ use crate::mempool::CitreaMempool;
 use crate::metrics::SEQUENCER_METRICS;
 use crate::utils::recover_raw_transaction;
 
-pub(crate) struct RpcContext<C: sov_modules_api::Context, DB: SequencerLedgerOps> {
+pub struct RpcContext<C: sov_modules_api::Context, DB: SequencerLedgerOps> {
     pub mempool: Arc<CitreaMempool<C>>,
     pub deposit_mempool: Arc<Mutex<DepositDataMempool>>,
     pub l2_force_block_tx: UnboundedSender<()>,
@@ -33,6 +33,42 @@ pub(crate) struct RpcContext<C: sov_modules_api::Context, DB: SequencerLedgerOps>
     pub test_mode: bool,
 }
 
+/// Creates a shared RpcContext with all required data.
+pub fn create_rpc_context<C, DB>(
+    mempool: Arc<CitreaMempool<C>>,
+    deposit_mempool: Arc<Mutex<DepositDataMempool>>,
+    l2_force_block_tx: UnboundedSender<()>,
+    storage: C::Storage,
+    ledger_db: DB,
+    test_mode: bool,
+) -> RpcContext<C, DB>
+where
+    C: Context,
+    DB: SequencerLedgerOps + Send + Clone + 'static,
+{
+    RpcContext {
+        mempool,
+        deposit_mempool,
+        l2_force_block_tx,
+        storage,
+        ledger: ledger_db,
+        test_mode,
+    }
+}
+
+/// Updates the given RpcModule with Sequencer methods.
+pub fn register_rpc_methods<
+    C: sov_modules_api::Context,
+    DB: SequencerLedgerOps + Send + Sync + 'static,
+>(
+    rpc_context: RpcContext<C, DB>,
+    mut rpc_methods: jsonrpsee::RpcModule<()>,
+) -> Result<jsonrpsee::RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
+    let rpc = create_rpc_module(rpc_context);
+    rpc_methods.merge(rpc)?;
+    Ok(rpc_methods)
+}
+
 #[rpc(client, server)]
 pub trait SequencerRpc {
     #[method(name = "eth_sendRawTransaction")]
@@ -177,16 +213,13 @@ impl
[...]
diff --git a/crates/sequencer/src/runner.rs b/crates/sequencer/src/runner.rs
[...]
 type StateRoot<C, Da, RT> = <StfBlueprint<C, Da::Spec, RT> as StateTransitionFunction<Da::Spec>>::StateRoot;
@@ -80,10 +75,8 @@ where
     da_service: Arc<Da>,
     mempool: Arc<CitreaMempool<C>>,
     sov_tx_signer_priv_key: C::PrivateKey,
-    l2_force_block_tx: UnboundedSender<()>,
     l2_force_block_rx: UnboundedReceiver<()>,
     db_provider: DbProvider<C>,
-    storage: C::Storage,
     ledger_db: DB,
     config: SequencerConfig,
     stf: StfBlueprint<C, Da::Spec, RT>,
@@ -93,10 +86,8 @@ where
     batch_hash: SoftConfirmationHash,
     sequencer_pub_key: Vec<u8>,
     sequencer_da_pub_key: Vec<u8>,
-    rpc_config: RpcConfig,
     fork_manager: ForkManager<'static>,
     soft_confirmation_tx: broadcast::Sender<u64>,
-    task_manager: TaskManager<()>,
 }
 
 enum L2BlockMode {
@@ -114,138 +105,41 @@ where
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         da_service: Arc<Da>,
-        storage: C::Storage,
         config: SequencerConfig,
+        init_params: InitParams<StfBlueprint<C, Da::Spec, RT>, Da::Spec>,
         stf: StfBlueprint<C, Da::Spec, RT>,
-        mut storage_manager: ProverStorageManager,
-        init_variant: InitVariant<StfBlueprint<C, Da::Spec, RT>, Da::Spec>,
+        storage_manager: ProverStorageManager,
         public_keys: RollupPublicKeys,
         ledger_db: DB,
-        rpc_config: RpcConfig,
+        db_provider: DbProvider<C>,
+        mempool: Arc<CitreaMempool<C>>,
+        deposit_mempool: Arc<Mutex<DepositDataMempool>>,
         fork_manager: ForkManager<'static>,
         soft_confirmation_tx: broadcast::Sender<u64>,
-        task_manager: TaskManager<()>,
+        l2_force_block_rx: UnboundedReceiver<()>,
     ) -> anyhow::Result<Self> {
-        let (l2_force_block_tx, l2_force_block_rx) = unbounded();
-
-        let (prev_state_root, prev_batch_hash) = match init_variant {
-            InitVariant::Initialized((state_root, batch_hash)) => {
-                debug!("Chain is already initialized. Skipping initialization.");
-                (state_root, batch_hash)
-            }
-            InitVariant::Genesis(params) => {
-                info!("No history detected. Initializing chain...",);
-                let storage = storage_manager.create_storage_on_l2_height(0)?;
-                let (genesis_root, initialized_storage) = stf.init_chain(storage, params);
-                storage_manager.save_change_set_l2(0, initialized_storage)?;
-                storage_manager.finalize_l2(0)?;
-                ledger_db.set_l2_genesis_state_root(&genesis_root)?;
-                info!(
-                    "Chain initialization is done. Genesis root: 0x{}",
-                    hex::encode(genesis_root.as_ref()),
-                );
-                (genesis_root, [0; 32])
-            }
-        };
-
-        // used as client of reth's mempool
-        let db_provider = DbProvider::new(storage.clone());
-
-        let pool = CitreaMempool::new(db_provider.clone(), config.mempool_conf.clone())?;
-
-        let deposit_mempool = Arc::new(Mutex::new(DepositDataMempool::new()));
-
         let sov_tx_signer_priv_key = C::PrivateKey::try_from(&hex::decode(&config.private_key)?)?;
 
         Ok(Self {
             da_service,
-            mempool: Arc::new(pool),
+            mempool,
             sov_tx_signer_priv_key,
-            l2_force_block_tx,
             l2_force_block_rx,
             db_provider,
-            storage,
             ledger_db,
             config,
             stf,
             deposit_mempool,
             storage_manager,
-            state_root: prev_state_root,
-            batch_hash: prev_batch_hash,
+            state_root: init_params.state_root,
+            batch_hash: init_params.batch_hash,
             sequencer_pub_key: public_keys.sequencer_public_key,
             sequencer_da_pub_key: public_keys.sequencer_da_pub_key,
-            rpc_config,
             fork_manager,
             soft_confirmation_tx,
-            task_manager,
         })
     }
 
-    pub async fn start_rpc_server(
-        &mut self,
-        methods: RpcModule<()>,
-        channel: Option<oneshot::Sender<SocketAddr>>,
-    ) -> anyhow::Result<()> {
-        let methods = self.register_rpc_methods(methods).await?;
-
-        let listen_address = SocketAddr::new(
-            self.rpc_config
-                .bind_host
-                .parse()
-                .map_err(|e| anyhow!("Failed to parse bind host: {}", e))?,
-            self.rpc_config.bind_port,
-        );
-
-        let max_connections = self.rpc_config.max_connections;
-        let max_subscriptions_per_connection = self.rpc_config.max_subscriptions_per_connection;
-        let max_request_body_size = self.rpc_config.max_request_body_size;
-        let max_response_body_size = self.rpc_config.max_response_body_size;
-        let batch_requests_limit = self.rpc_config.batch_requests_limit;
-
-        let middleware = tower::ServiceBuilder::new().layer(citrea_common::rpc::get_cors_layer());
-        // .layer(citrea_common::rpc::get_healthcheck_proxy_layer());
-        let rpc_middleware = RpcServiceBuilder::new().layer_fn(citrea_common::rpc::Logger);
-
-        self.task_manager.spawn(|cancellation_token| async move {
-            let server = ServerBuilder::default()
-                .max_connections(max_connections)
-                .max_subscriptions_per_connection(max_subscriptions_per_connection)
-                .max_request_body_size(max_request_body_size)
-                .max_response_body_size(max_response_body_size)
-                .set_batch_request_config(BatchRequestConfig::Limit(batch_requests_limit))
-                .set_http_middleware(middleware)
-                .set_rpc_middleware(rpc_middleware)
-                .build([listen_address].as_ref())
-                .await;
-
-            match server {
-                Ok(server) => {
-                    let bound_address = match server.local_addr() {
-                        Ok(address) => address,
-                        Err(e) => {
-                            error!("{}", e);
-                            return;
-                        }
-                    };
-                    if let Some(channel) = channel {
-                        if let Err(e) = channel.send(bound_address) {
-                            error!("Could not send bound_address {}: {}", bound_address, e);
-                            return;
-                        }
-                    }
-                    info!("Starting RPC server at {} ", &bound_address);
-
-                    let _server_handle = server.start(methods);
-                    cancellation_token.cancelled().await;
-                }
-                Err(e) => {
-                    error!("Could not start RPC server: {}", e);
-                }
-            }
-        });
-        Ok(())
-    }
-
     #[allow(clippy::too_many_arguments)]
     async fn dry_run_transactions(
         &mut self,
@@ -659,8 +553,8 @@ where
         }
     }
 
-    #[instrument(level = "trace", skip(self), err, ret)]
-    pub async fn run(&mut self) -> Result<(), anyhow::Error> {
+    #[instrument(level = "trace", skip(self, task_manager), err, ret)]
+    pub async fn run(&mut self, mut task_manager: TaskManager<()>) -> Result<(), anyhow::Error> {
         // TODO: hotfix for mock da
         self.da_service
             .get_block_at(1)
@@ -696,7 +590,7 @@
         // Setup required workers to update our knowledge of the DA layer every X seconds (configurable).
         let (da_height_update_tx, mut da_height_update_rx) = mpsc::channel(1);
-        let (da_commitment_tx, da_commitment_rx) = unbounded::<(u64, StateDiff)>();
+        let (da_commitment_tx, da_commitment_rx) = unbounded_channel::<(u64, StateDiff)>();
 
         let mut commitment_service = CommitmentService::new(
             self.ledger_db.clone(),
@@ -709,10 +603,10 @@ where
             // Resubmit if there were pending commitments on restart, skip it on first init
             commitment_service.resubmit_pending_commitments().await?;
         }
-        self.task_manager
-            .spawn(|cancellation_token| commitment_service.run(cancellation_token));
-        self.task_manager.spawn(|cancellation_token| {
+        task_manager.spawn(|cancellation_token| commitment_service.run(cancellation_token));
+
+        task_manager.spawn(|cancellation_token| {
             da_block_monitor(
                 self.da_service.clone(),
                 da_height_update_tx,
@@ -751,7 +645,7 @@
                 // If sequencer is in test mode, it will build a block every time it receives a message
                 // The RPC from which the sender can be called is only registered for test mode. This means
                 // that even though we check the receiver here, it'll never be "ready" to be consumed unless in test mode.
-                _ = self.l2_force_block_rx.next(), if self.config.test_mode => {
+                _ = self.l2_force_block_rx.recv(), if self.config.test_mode => {
                     if missed_da_blocks_count > 0 {
                         if let Err(e) = self.process_missed_da_blocks(missed_da_blocks_count, last_used_l1_height, l1_fee_rate).await {
                             error!("Sequencer error: {}", e);
@@ -768,7 +662,7 @@
                         // Only errors when there are no receivers
                         let _ = self.soft_confirmation_tx.send(l2_height);
-                        let _ = da_commitment_tx.unbounded_send((l2_height, state_diff));
+                        let _ = da_commitment_tx.send((l2_height, state_diff));
                     },
                     Err(e) => {
                         error!("Sequencer error: {}", e);
@@ -800,7 +694,7 @@
                         // Only errors when there are no receivers
                         let _ = self.soft_confirmation_tx.send(l2_height);
-                        let _ = da_commitment_tx.unbounded_send((l2_height, state_diff));
+                        let _ = da_commitment_tx.send((l2_height, state_diff));
                     },
                     Err(e) => {
                         error!("Sequencer error: {}", e);
@@ -809,7 +703,7 @@
                 },
                 _ = signal::ctrl_c() => {
                     info!("Shutting down sequencer");
-                    self.task_manager.abort().await;
+                    task_manager.abort().await;
                     return Ok(());
                 }
             }
@@ -954,31 +848,6 @@ where
         }
     }
 
-    /// Creates a shared RpcContext with all required data.
-    async fn create_rpc_context(&self) -> RpcContext<C, DB> {
-        let l2_force_block_tx = self.l2_force_block_tx.clone();
-
-        RpcContext {
-            mempool: self.mempool.clone(),
-            deposit_mempool: self.deposit_mempool.clone(),
-            l2_force_block_tx,
-            storage: self.storage.clone(),
-            ledger: self.ledger_db.clone(),
-            test_mode: self.config.test_mode,
-        }
-    }
-
-    /// Updates the given RpcModule with Sequencer methods.
-    pub async fn register_rpc_methods(
-        &self,
-        mut rpc_methods: jsonrpsee::RpcModule<()>,
-    ) -> Result<jsonrpsee::RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
-        let rpc_context = self.create_rpc_context().await;
-        let rpc = create_rpc_module(rpc_context);
-        rpc_methods.merge(rpc)?;
-        Ok(rpc_methods)
-    }
-
     pub async fn restore_mempool(&self) -> Result<(), anyhow::Error> {
         let mempool_txs = self.ledger_db.get_mempool_txs()?;
         for (_, tx) in mempool_txs {
diff --git a/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/migrations/mod.rs b/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/migrations/mod.rs
index ef1773819..11fdf140e 100644
--- a/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/migrations/mod.rs
+++ b/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/migrations/mod.rs
@@ -18,6 +18,8 @@ pub mod utils;
 pub type MigrationName = String;
 /// Alias for migration version type
 pub type MigrationVersion = u64;
+/// Alias for migrations list
+pub type Migrations = &'static Vec<Box<dyn LedgerMigration>>;
 
 /// A trait that should be implemented by migrations.
 pub trait LedgerMigration {
@@ -39,7 +41,7 @@ pub trait LedgerMigration {
 /// made by any run migration useless.
 pub struct LedgerDBMigrator<'a> {
     ledger_path: &'a Path,
-    migrations: &'static Vec<Box<dyn LedgerMigration>>,
+    migrations: Migrations,
 }
 
 impl<'a> LedgerDBMigrator<'a> {
diff --git a/crates/sovereign-sdk/full-node/sov-stf-runner/src/lib.rs b/crates/sovereign-sdk/full-node/sov-stf-runner/src/lib.rs
index 5f6173f73..a7aa0ec13 100644
--- a/crates/sovereign-sdk/full-node/sov-stf-runner/src/lib.rs
+++ b/crates/sovereign-sdk/full-node/sov-stf-runner/src/lib.rs
@@ -19,8 +19,6 @@ use sov_modules_api::DaSpec;
 #[cfg(feature = "native")]
 use sov_rollup_interface::stf::StateTransitionFunction;
 
-#[cfg(feature = "native")]
-type GenesisParams<Stf, Da> = <Stf as StateTransitionFunction<Da>>::GenesisParams;
 #[cfg(feature = "native")]
 type SoftConfirmationHash = [u8; 32];
 
@@ -41,10 +39,9 @@ pub fn read_json_file<T: DeserializeOwned, P: AsRef<Path>>(
 
 #[cfg(feature = "native")]
 /// How [`StateTransitionRunner`] is initialized
-pub enum InitVariant<Stf: StateTransitionFunction<Da>, Da: DaSpec> {
-    /// From given state root and soft confirmation hash
-    Initialized((Stf::StateRoot, SoftConfirmationHash)),
-    /// From empty state root
-    /// Genesis params for Stf::init
-    Genesis(GenesisParams<Stf, Da>),
+pub struct InitParams<Stf: StateTransitionFunction<Da>, Da: DaSpec> {
+    /// The last known state root
+    pub state_root: Stf::StateRoot,
+    /// The last known batch hash
+    pub batch_hash: SoftConfirmationHash,
 }
diff --git a/crates/sovereign-sdk/module-system/sov-modules-rollup-blueprint/src/lib.rs b/crates/sovereign-sdk/module-system/sov-modules-rollup-blueprint/src/lib.rs
index e1757e7a7..6b7e514cc 100644
--- a/crates/sovereign-sdk/module-system/sov-modules-rollup-blueprint/src/lib.rs
+++ b/crates/sovereign-sdk/module-system/sov-modules-rollup-blueprint/src/lib.rs
@@ -71,7 +71,7 @@ pub trait RollupBlueprint: Sized + Send + Sync {
     ) -> HashMap<SpecId, <Self::Vm as Zkvm>::CodeCommitment>;
 
     /// Get light client prover code commitment.
-    fn get_light_client_proof_code_commitment(
+    fn get_light_client_proof_code_commitments(
         &self,
     ) -> HashMap<SpecId, <Self::Vm as Zkvm>::CodeCommitment>;
diff --git a/crates/sovereign-sdk/module-system/sov-modules-stf-blueprint/src/lib.rs b/crates/sovereign-sdk/module-system/sov-modules-stf-blueprint/src/lib.rs
index 14166153a..f67381879 100644
--- a/crates/sovereign-sdk/module-system/sov-modules-stf-blueprint/src/lib.rs
+++ b/crates/sovereign-sdk/module-system/sov-modules-stf-blueprint/src/lib.rs
@@ -323,7 +323,6 @@ where
     type TxReceiptContents = TxEffect;
 
     type BatchReceiptContents = ();
-    // SequencerOutcome<<C as Spec>::Address>;
 
     type Witness = <C::Storage as Storage>::Witness;