diff --git a/crates/topos-sequencer-subnet-client/Cargo.toml b/crates/topos-certificate-producer-subnet-client/Cargo.toml similarity index 91% rename from crates/topos-sequencer-subnet-client/Cargo.toml rename to crates/topos-certificate-producer-subnet-client/Cargo.toml index 38adf8233..b227a10c5 100644 --- a/crates/topos-sequencer-subnet-client/Cargo.toml +++ b/crates/topos-certificate-producer-subnet-client/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "topos-sequencer-subnet-client" +name = "topos-certificate-producer-subnet-client" version = "0.1.0" edition = "2021" diff --git a/crates/topos-sequencer-subnet-client/src/lib.rs b/crates/topos-certificate-producer-subnet-client/src/lib.rs similarity index 100% rename from crates/topos-sequencer-subnet-client/src/lib.rs rename to crates/topos-certificate-producer-subnet-client/src/lib.rs diff --git a/crates/topos-sequencer-subnet-client/src/subnet_contract.rs b/crates/topos-certificate-producer-subnet-client/src/subnet_contract.rs similarity index 100% rename from crates/topos-sequencer-subnet-client/src/subnet_contract.rs rename to crates/topos-certificate-producer-subnet-client/src/subnet_contract.rs diff --git a/crates/topos-sequencer-subnet-runtime/Cargo.toml b/crates/topos-certificate-producer-subnet-runtime/Cargo.toml similarity index 84% rename from crates/topos-sequencer-subnet-runtime/Cargo.toml rename to crates/topos-certificate-producer-subnet-runtime/Cargo.toml index 6e503b549..96edd2261 100644 --- a/crates/topos-sequencer-subnet-runtime/Cargo.toml +++ b/crates/topos-certificate-producer-subnet-runtime/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "topos-sequencer-subnet-runtime" +name = "topos-certificate-producer-subnet-runtime" version = "0.1.0" edition = "2021" @@ -29,7 +29,7 @@ tracing-opentelemetry.workspace = true opentelemetry.workspace = true topos-core = { workspace = true, features = ["uci"] } -topos-sequencer-subnet-client = { package = "topos-sequencer-subnet-client", path = "../topos-sequencer-subnet-client" } +topos-certificate-producer-subnet-client = { package = "topos-certificate-producer-subnet-client", path = "../topos-certificate-producer-subnet-client" } topos-crypto = {package = "topos-crypto", path = "../topos-crypto"} [dev-dependencies] diff --git a/crates/topos-sequencer-subnet-runtime/src/certification.rs b/crates/topos-certificate-producer-subnet-runtime/src/certification.rs similarity index 98% rename from crates/topos-sequencer-subnet-runtime/src/certification.rs rename to crates/topos-certificate-producer-subnet-runtime/src/certification.rs index fa70e160b..daf5cab03 100644 --- a/crates/topos-sequencer-subnet-runtime/src/certification.rs +++ b/crates/topos-certificate-producer-subnet-runtime/src/certification.rs @@ -3,8 +3,8 @@ use std::collections::{HashSet, LinkedList}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; use tokio::sync::Mutex; +use topos_certificate_producer_subnet_client::{BlockInfo, SubnetEvent}; use topos_core::uci::{Certificate, CertificateId, SubnetId}; -use topos_sequencer_subnet_client::{BlockInfo, SubnetEvent}; pub struct Certification { /// Last known certificate id for subnet diff --git a/crates/topos-sequencer-subnet-runtime/src/lib.rs b/crates/topos-certificate-producer-subnet-runtime/src/lib.rs similarity index 99% rename from crates/topos-sequencer-subnet-runtime/src/lib.rs rename to crates/topos-certificate-producer-subnet-runtime/src/lib.rs index aabd1f670..afcdf59dc 100644 --- a/crates/topos-sequencer-subnet-runtime/src/lib.rs +++ b/crates/topos-certificate-producer-subnet-runtime/src/lib.rs @@ -38,7 +38,7 @@ pub enum Error { #[error("subnet client error: {source}")] SubnetError { #[from] - source: topos_sequencer_subnet_client::Error, + source: topos_certificate_producer_subnet_client::Error, }, #[error("Unable to retrieve key error: {source}")] diff --git a/crates/topos-sequencer-subnet-runtime/src/proxy.rs b/crates/topos-certificate-producer-subnet-runtime/src/proxy.rs similarity index 91% rename from crates/topos-sequencer-subnet-runtime/src/proxy.rs rename to crates/topos-certificate-producer-subnet-runtime/src/proxy.rs index 298453004..c56601328 100644 --- a/crates/topos-sequencer-subnet-runtime/src/proxy.rs +++ b/crates/topos-certificate-producer-subnet-runtime/src/proxy.rs @@ -9,9 +9,11 @@ use std::sync::Arc; use tokio::sync::Mutex; use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; +use topos_certificate_producer_subnet_client::{ + self, BlockInfo, SubnetClient, SubnetClientListener, +}; use topos_core::api::grpc::checkpoints::TargetStreamPosition; use topos_core::uci::{Certificate, CertificateId, SubnetId}; -use topos_sequencer_subnet_client::{self, BlockInfo, SubnetClient, SubnetClientListener}; use tracing::{debug, error, field, info, info_span, instrument, warn, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -102,7 +104,7 @@ impl SubnetRuntimeProxy { let runtime_proxy = runtime_proxy.clone(); let subnet_contract_address = subnet_contract_address.clone(); tokio::spawn(async move { - // If the `start_block` sequencer parameter is provided, first block retrieved from blockchain (for genesis certificate) + // If the `start_block` CP parameter is provided, first block retrieved from blockchain (for genesis certificate) // will be `start_block`. `default_block_sync_start` is hence `start_block`-1 // as first block retrieved from subnet node is `latest_acquired_subnet_block_number` + 1 let default_block_sync_start: i128 = config @@ -128,7 +130,7 @@ impl SubnetRuntimeProxy { certificate_and_position ); // If tce source head position is provided, continue synchronizing from it - // If the `start_block` sequencer parameter is provided and tce source head is missing, + // If the `start_block` CP parameter is provided and tce source head is missing, // we should start synchronizing from that block instead of genesis // If neither tce source head position nor start_block parameters are provided, // sync should start form -1, so that first fetched is subnet genesis block @@ -155,7 +157,7 @@ impl SubnetRuntimeProxy { // Establish the connection with the Subnet let subnet_listener: Option = tokio::select! { // Create subnet client - Ok(client) = topos_sequencer_subnet_client::connect_to_subnet_listener_with_retry( + Ok(client) = topos_certificate_producer_subnet_client::connect_to_subnet_listener_with_retry( ws_runtime_endpoint.as_str(), subnet_contract_address.as_str(), ) => { @@ -290,7 +292,7 @@ impl SubnetRuntimeProxy { // Establish the connection with the Subnet let mut subnet_client: Option = tokio::select! { // Create subnet client - Ok(client) = topos_sequencer_subnet_client::connect_to_subnet_with_retry( + Ok(client) = topos_certificate_producer_subnet_client::connect_to_subnet_with_retry( http_runtime_endpoint.as_ref(), Some(signing_key.clone()), subnet_contract_address.as_str(), @@ -361,10 +363,14 @@ impl SubnetRuntimeProxy { info!("Block {} processed", next_block); Ok(()) } - Err(topos_sequencer_subnet_client::Error::BlockNotAvailable(block_number)) => { + Err(topos_certificate_producer_subnet_client::Error::BlockNotAvailable( + block_number, + )) => { warn!("New block {block_number} not yet available, trying again soon"); Err(Error::SubnetError { - source: topos_sequencer_subnet_client::Error::BlockNotAvailable(block_number), + source: topos_certificate_producer_subnet_client::Error::BlockNotAvailable( + block_number, + ), }) } Err(e) => { @@ -568,25 +574,26 @@ impl SubnetRuntimeProxy { info!("Connecting to subnet to query for checkpoints..."); let http_runtime_endpoint = self.config.http_endpoint.as_ref(); // Create subnet client - let subnet_client = match topos_sequencer_subnet_client::connect_to_subnet_with_retry( - http_runtime_endpoint, - None, // We do not need actual key here as we are just reading state - self.config.subnet_contract_address.as_str(), - ) - .await - { - Ok(subnet_client) => { - info!( - "Connected to subnet node to acquire checkpoints {}", - http_runtime_endpoint - ); - subnet_client - } - Err(e) => { - error!("Unable to connect to the subnet node to get checkpoints: {e}"); - return Err(Error::SubnetError { source: e }); - } - }; + let subnet_client = + match topos_certificate_producer_subnet_client::connect_to_subnet_with_retry( + http_runtime_endpoint, + None, // We do not need actual key here as we are just reading state + self.config.subnet_contract_address.as_str(), + ) + .await + { + Ok(subnet_client) => { + info!( + "Connected to subnet node to acquire checkpoints {}", + http_runtime_endpoint + ); + subnet_client + } + Err(e) => { + error!("Unable to connect to the subnet node to get checkpoints: {e}"); + return Err(Error::SubnetError { source: e }); + } + }; match subnet_client.get_checkpoints(&self.config.subnet_id).await { Ok(checkpoints) => { @@ -611,25 +618,26 @@ impl SubnetRuntimeProxy { ) -> Result { info!("Connecting to subnet to query for subnet id..."); // Create subnet client - let subnet_client = match topos_sequencer_subnet_client::connect_to_subnet_with_retry( - http_endpoint, - None, // We do not need actual key here as we are just reading state - contract_address, - ) - .await - { - Ok(subnet_client) => { - info!( - "Connected to subnet node to acquire subnet id {}", - http_endpoint - ); - subnet_client - } - Err(e) => { - error!("Unable to connect to the subnet node to get subnet id: {e}"); - return Err(Error::SubnetError { source: e }); - } - }; + let subnet_client = + match topos_certificate_producer_subnet_client::connect_to_subnet_with_retry( + http_endpoint, + None, // We do not need actual key here as we are just reading state + contract_address, + ) + .await + { + Ok(subnet_client) => { + info!( + "Connected to subnet node to acquire subnet id {}", + http_endpoint + ); + subnet_client + } + Err(e) => { + error!("Unable to connect to the subnet node to get subnet id: {e}"); + return Err(Error::SubnetError { source: e }); + } + }; match subnet_client.get_subnet_id().await { Ok(subnet_id) => { diff --git a/crates/topos-sequencer-subnet-runtime/tests/common/abi.rs b/crates/topos-certificate-producer-subnet-runtime/tests/common/abi.rs similarity index 100% rename from crates/topos-sequencer-subnet-runtime/tests/common/abi.rs rename to crates/topos-certificate-producer-subnet-runtime/tests/common/abi.rs diff --git a/crates/topos-sequencer-subnet-runtime/tests/common/mod.rs b/crates/topos-certificate-producer-subnet-runtime/tests/common/mod.rs similarity index 100% rename from crates/topos-sequencer-subnet-runtime/tests/common/mod.rs rename to crates/topos-certificate-producer-subnet-runtime/tests/common/mod.rs diff --git a/crates/topos-sequencer-subnet-runtime/tests/common/subnet_test_data.rs b/crates/topos-certificate-producer-subnet-runtime/tests/common/subnet_test_data.rs similarity index 100% rename from crates/topos-sequencer-subnet-runtime/tests/common/subnet_test_data.rs rename to crates/topos-certificate-producer-subnet-runtime/tests/common/subnet_test_data.rs diff --git a/crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs b/crates/topos-certificate-producer-subnet-runtime/tests/subnet_contract.rs similarity index 98% rename from crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs rename to crates/topos-certificate-producer-subnet-runtime/tests/subnet_contract.rs index 7ca5e54f9..800113bd0 100644 --- a/crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs +++ b/crates/topos-certificate-producer-subnet-runtime/tests/subnet_contract.rs @@ -6,24 +6,29 @@ use ethers::{ middleware::SignerMiddleware, providers::{Http, Middleware, Provider}, signers::{LocalWallet, Signer}, - types::{Block, H256}, + types::{Block, H160, H256}, }; use rstest::*; use serial_test::serial; use std::collections::HashSet; use std::process::{Child, Command}; +use std::str::FromStr; use std::sync::Arc; use test_log::test; use tokio::sync::Mutex; +use topos_certificate_producer_subnet_runtime::proxy::{ + SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent, +}; use topos_core::uci::{Certificate, CertificateId, SubnetId, SUBNET_ID_LENGTH}; -use topos_sequencer_subnet_runtime::proxy::{SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent}; use tracing::{error, info, warn, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; mod common; use crate::common::subnet_test_data::generate_test_private_key; +use topos_certificate_producer_subnet_runtime::{ + SubnetRuntimeProxyConfig, SubnetRuntimeProxyWorker, +}; use topos_core::api::grpc::checkpoints::TargetStreamPosition; -use topos_sequencer_subnet_runtime::{SubnetRuntimeProxyConfig, SubnetRuntimeProxyWorker}; use topos_test_sdk::constants::*; @@ -497,7 +502,7 @@ async fn test_subnet_node_get_block_info( ) -> Result<(), Box> { //Context with subnet let context = context_running_subnet_node.await; - match topos_sequencer_subnet_client::SubnetClientListener::new( + match topos_certificate_producer_subnet_client::SubnetClientListener::new( &context.jsonrpc_ws(), &("0x".to_string() + &hex::encode(context.i_topos_core.address())), ) @@ -545,7 +550,8 @@ async fn test_create_runtime() -> Result<(), Box> { test_private_key, ) .await?; - let runtime_proxy = topos_sequencer_subnet_runtime::testing::get_runtime(&runtime_proxy_worker); + let runtime_proxy = + topos_certificate_producer_subnet_runtime::testing::get_runtime(&runtime_proxy_worker); let runtime_proxy = runtime_proxy.lock().await; info!("New runtime proxy created:{:?}", &runtime_proxy); Ok(()) @@ -687,7 +693,7 @@ async fn test_subnet_certificate_get_checkpoints_call( let subnet_jsonrpc_http = context.jsonrpc(); // Get checkpoints when contract is empty - let subnet_client = topos_sequencer_subnet_client::SubnetClient::new( + let subnet_client = topos_certificate_producer_subnet_client::SubnetClient::new( &subnet_jsonrpc_http, Some(hex::decode(TEST_SECRET_ETHEREUM_KEY).unwrap()), &subnet_smart_contract_address, @@ -805,7 +811,7 @@ async fn test_subnet_id_call( let subnet_jsonrpc_http = context.jsonrpc(); // Create subnet client - let subnet_client = topos_sequencer_subnet_client::SubnetClient::new( + let subnet_client = topos_certificate_producer_subnet_client::SubnetClient::new( &subnet_jsonrpc_http, Some(hex::decode(TEST_SECRET_ETHEREUM_KEY).unwrap()), &subnet_smart_contract_address, @@ -902,7 +908,7 @@ async fn test_subnet_send_token_processing( .i_erc20_messaging .send_token( TARGET_SUBNET_ID_2.into(), - TOKEN_SYMBOL.into(), + H160::from_str(TOKEN_SYMBOL).unwrap(), "00000000000000000000000000000000000000AA".parse()?, U256::from(2), ) @@ -1408,7 +1414,7 @@ async fn test_subnet_multiple_send_token_in_a_block( if let Err(e) = i_erc20_messaging .send_token( target_subnet.into(), - TOKEN_SYMBOL.into(), + H160::from_str(TOKEN_SYMBOL).unwrap(), "00000000000000000000000000000000000000AA".parse().unwrap(), U256::from(i), ) diff --git a/crates/topos-sequencer/Cargo.toml b/crates/topos-certificate-producer/Cargo.toml similarity index 77% rename from crates/topos-sequencer/Cargo.toml rename to crates/topos-certificate-producer/Cargo.toml index 61ea66c50..f0b0e3e73 100644 --- a/crates/topos-sequencer/Cargo.toml +++ b/crates/topos-certificate-producer/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "topos-sequencer" +name = "topos-certificate-producer" description = "Implementation of the Topos protocol" version = "0.1.0" edition = "2021" @@ -20,6 +20,5 @@ opentelemetry.workspace = true topos-crypto.workspace = true topos-wallet = { path = "../topos-wallet" } topos-core = { workspace = true, features = ["uci"] } -topos-sequencer-subnet-runtime = { package = "topos-sequencer-subnet-runtime", path = "../topos-sequencer-subnet-runtime" } +topos-certificate-producer-subnet-runtime = { package = "topos-certificate-producer-subnet-runtime", path = "../topos-certificate-producer-subnet-runtime" } topos-tce-proxy = { package = "topos-tce-proxy", path = "../topos-tce-proxy" } - diff --git a/crates/topos-sequencer/src/app_context.rs b/crates/topos-certificate-producer/src/app_context.rs similarity index 82% rename from crates/topos-sequencer/src/app_context.rs rename to crates/topos-certificate-producer/src/app_context.rs index 3ff343556..dadf795bc 100644 --- a/crates/topos-sequencer/src/app_context.rs +++ b/crates/topos-certificate-producer/src/app_context.rs @@ -1,17 +1,19 @@ //! //! Application logic glue //! -use crate::SequencerConfiguration; +use crate::CertificateProducerConfiguration; use opentelemetry::trace::FutureExt; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; -use topos_sequencer_subnet_runtime::proxy::{SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent}; -use topos_sequencer_subnet_runtime::SubnetRuntimeProxyWorker; +use topos_certificate_producer_subnet_runtime::proxy::{ + SubnetRuntimeProxyCommand, SubnetRuntimeProxyEvent, +}; +use topos_certificate_producer_subnet_runtime::SubnetRuntimeProxyWorker; use topos_tce_proxy::{worker::TceProxyWorker, TceProxyCommand, TceProxyEvent}; use tracing::{debug, error, info, info_span, warn, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; -/// Top-level transducer sequencer app context & driver (alike) +/// Top-level transducer certificate producer app context & driver (alike) /// /// Implements <...Host> traits for network and Api, listens for protocol events in events /// (store is not active component). @@ -20,7 +22,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; /// config+data as input and runs app returning data as output /// pub struct AppContext { - pub config: SequencerConfiguration, + pub config: CertificateProducerConfiguration, pub subnet_runtime_proxy_worker: SubnetRuntimeProxyWorker, pub tce_proxy_worker: TceProxyWorker, } @@ -33,7 +35,7 @@ pub enum AppContextStatus { impl AppContext { /// Factory pub fn new( - config: SequencerConfiguration, + config: CertificateProducerConfiguration, runtime_proxy_worker: SubnetRuntimeProxyWorker, tce_proxy_worker: TceProxyWorker, ) -> Self { @@ -63,15 +65,14 @@ impl AppContext { debug!("tce_proxy_worker.next_event(): {:?}", &tce_evt); match tce_evt { TceProxyEvent::TceServiceFailure | TceProxyEvent::WatchCertificatesChannelFailed => { - // Unrecoverable failure in interaction with the TCE. Sequencer needs to be restarted + // Unrecoverable failure in interaction with the TCE. Certificate Producer needs to be restarted warn!( - "Unrecoverable failure in sequencer <-> tce interaction. Shutting down sequencer \ - sequencer..." + "Unrecoverable failure in Certificate Producer <-> TCE interaction. Shutting down Certificate Producer." ); if let Err(e) = self.shutdown().await { warn!("Error happened during shutdown: {e:?}"); } - warn!("Shutdown finished, restarting sequencer..."); + warn!("Shutdown finished, restarting Certificate Producer..."); return AppContextStatus::Restarting; }, _ => self.on_tce_proxy_event(tce_evt).await, @@ -80,11 +81,11 @@ impl AppContext { // Shutdown signal _ = shutdown.0.cancelled() => { - info!("Shutting down Sequencer app context..."); + info!("Shutting down Certificate Producer app context..."); if let Err(e) = self.shutdown().await { - error!("Error shutting down Sequencer app context: {e}"); + error!("Error shutting down Certificate Producer app context: {e}"); } - // Drop the sender to notify the Sequencer termination + // Drop the sender to notify the Certificate Producer termination drop(shutdown.1); return AppContextStatus::Finished; } @@ -100,7 +101,7 @@ impl AppContext { block_number: _, ctx, } => { - let span = info_span!("Sequencer app context"); + let span = info_span!("Certificate Producer app context"); span.set_parent(ctx); if let Err(e) = self .tce_proxy_worker @@ -123,7 +124,7 @@ impl AppContext { async fn on_tce_proxy_event(&mut self, evt: TceProxyEvent) { if let TceProxyEvent::NewDeliveredCerts { certificates, ctx } = evt { - let span = info_span!("Sequencer app context"); + let span = info_span!("Certificate Producer app context"); span.set_parent(ctx); async { // New certificates acquired from TCE diff --git a/crates/topos-sequencer/src/lib.rs b/crates/topos-certificate-producer/src/lib.rs similarity index 82% rename from crates/topos-sequencer/src/lib.rs rename to crates/topos-certificate-producer/src/lib.rs index 08da48f62..34e211036 100644 --- a/crates/topos-sequencer/src/lib.rs +++ b/crates/topos-certificate-producer/src/lib.rs @@ -9,8 +9,10 @@ use tokio::{ }, }; use tokio_util::sync::CancellationToken; +use topos_certificate_producer_subnet_runtime::{ + SubnetRuntimeProxyConfig, SubnetRuntimeProxyWorker, +}; use topos_core::uci::{CertificateId, SubnetId}; -use topos_sequencer_subnet_runtime::{SubnetRuntimeProxyConfig, SubnetRuntimeProxyWorker}; use topos_tce_proxy::{worker::TceProxyWorker, TceProxyConfig}; use topos_wallet::SecretKey; use tracing::{debug, info, warn}; @@ -18,7 +20,7 @@ use tracing::{debug, info, warn}; mod app_context; #[derive(Debug, Clone)] -pub struct SequencerConfiguration { +pub struct CertificateProducerConfiguration { pub subnet_id: Option, pub public_key: Option>, pub subnet_jsonrpc_http: String, @@ -31,12 +33,12 @@ pub struct SequencerConfiguration { } async fn launch_workers( - config: SequencerConfiguration, + config: CertificateProducerConfiguration, ctx_send: Sender, subnet_id: SubnetId, ) -> Result<(), Box> { let (http_endpoint, mut ws_endpoint) = - topos_sequencer_subnet_runtime::derive_endpoints(&config.subnet_jsonrpc_http)?; + topos_certificate_producer_subnet_runtime::derive_endpoints(&config.subnet_jsonrpc_http)?; if let Some(config_ws_endpoint) = config.subnet_jsonrpc_ws.as_ref() { // Use explicitly provided websocket subnet endpoint @@ -72,9 +74,9 @@ async fn launch_workers( } }; - // Launch Tce proxy worker for handling interaction with TCE node - // For initialization it will retry using backoff algorithm, but if it fails we can not proceed and we restart sequencer - // Once it is initialized, TCE proxy will try reconnecting in the loop (with backoff) if TCE becomes unavailable + // Launch Tce proxy worker for handling interaction with the TCE node + // For initialization it will retry using backoff algorithm, but if it fails we can not proceed and we restart the certificate producer + // Once it is initialized, the TCE proxy will try to reconnect in a loop (with backoff) if TCE becomes unavailable let (tce_proxy_worker, source_head_certificate_id) = match TceProxyWorker::new(TceProxyConfig { subnet_id, tce_endpoint: config.tce_grpc_endpoint.clone(), @@ -127,10 +129,10 @@ async fn launch_workers( } pub async fn launch( - config: SequencerConfiguration, + config: CertificateProducerConfiguration, ctx_send: Sender, ) -> Result<(), Box> { - debug!("Starting topos-sequencer application"); + debug!("Starting topos-certificate-producer application"); // If subnetID is specified as command line argument, use it let subnet_id: SubnetId = if let Some(pk) = &config.public_key { @@ -147,15 +149,16 @@ pub async fn launch( // Get subnet id from the subnet node if not provided via the command line argument // It will retry using backoff algorithm, but if it fails (default max backoff elapsed time is 15 min) we can not proceed else { - let http_endpoint = - topos_sequencer_subnet_runtime::derive_endpoints(&config.subnet_jsonrpc_http) - .map_err(|e| { - Box::new(std::io::Error::new( - InvalidInput, - format!("Invalid subnet endpoint: {e}"), - )) - })? - .0; + let http_endpoint = topos_certificate_producer_subnet_runtime::derive_endpoints( + &config.subnet_jsonrpc_http, + ) + .map_err(|e| { + Box::new(std::io::Error::new( + InvalidInput, + format!("Invalid subnet endpoint: {e}"), + )) + })? + .0; match SubnetRuntimeProxyWorker::get_subnet_id( &http_endpoint, config.subnet_contract_address.as_str(), @@ -176,7 +179,7 @@ pub async fn launch( } pub async fn run( - config: SequencerConfiguration, + config: CertificateProducerConfiguration, shutdown: (CancellationToken, mpsc::Sender<()>), ) -> Result> { loop { @@ -202,7 +205,7 @@ pub async fn run( // Shutdown signal _ = shutdown.0.cancelled() => { - info!("Stopping Sequencer launch..."); + info!("Stopping Certificate Producer..."); drop(shutdown.1); launching.abort(); return Ok(ExitStatus::default()); @@ -212,18 +215,22 @@ pub async fn run( if let Some(mut app) = app_context { match app.run(shutdown_appcontext).await { AppContextStatus::Restarting => { - // We finish the loop, restarting sequencer here - warn!("Restarting sequencer..."); + // We finish the loop, restarting the CP here + warn!("Restarting Certificate Producer..."); tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; } AppContextStatus::Finished => { - info!("Sequencer app finished, exiting..."); + info!("Certificate Producer done, exiting..."); return Ok(ExitStatus::default()); } } } else { - warn!("Sequencer startup sequencer failed, restarting sequencer..."); - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + let retry = 10; + warn!( + "Certificate Producer startup sequence failed, retrying in {}s...", + retry + ); + tokio::time::sleep(tokio::time::Duration::from_secs(retry)).await; } } } diff --git a/crates/topos-certificate-spammer/src/lib.rs b/crates/topos-certificate-spammer/src/lib.rs index b0847894e..884a6c307 100644 --- a/crates/topos-certificate-spammer/src/lib.rs +++ b/crates/topos-certificate-spammer/src/lib.rs @@ -30,9 +30,9 @@ struct FileNodes { nodes: Vec, } -/// Represents connection from one sequencer to a TCE node +/// Represents connection from one Certificate Producer to a TCE node /// Multiple different subnets could be connected to the same TCE node address (represented with TargetNodeConnection with different SubnetId and created client) -/// Multiple topos-sequencers from the same subnet could be connected to the same TCE node address (so they would have same SubnetID, but different client instances) +/// Multiple topos-certificate-producers from the same subnet could be connected to the same TCE node address (so they would have same SubnetID, but different client instances) struct TargetNodeConnection { address: NodeApiAddress, client: Arc>, diff --git a/crates/topos-config/src/base.rs b/crates/topos-config/src/base.rs index ef0938b08..57d49d04e 100644 --- a/crates/topos-config/src/base.rs +++ b/crates/topos-config/src/base.rs @@ -46,8 +46,8 @@ impl BaseConfig { self.subnet == "topos" } - pub fn need_sequencer(&self) -> bool { - matches!(self.role, NodeRole::Sequencer) + pub fn need_certificate_producer(&self) -> bool { + matches!(self.role, NodeRole::CertificateProducer) } pub fn need_edge(&self) -> bool { diff --git a/crates/topos-config/src/sequencer.rs b/crates/topos-config/src/certificate_producer.rs similarity index 85% rename from crates/topos-config/src/sequencer.rs rename to crates/topos-config/src/certificate_producer.rs index 907958c41..9f7eb5435 100644 --- a/crates/topos-config/src/sequencer.rs +++ b/crates/topos-config/src/certificate_producer.rs @@ -9,8 +9,8 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "kebab-case")] -pub struct SequencerConfig { - /// SubnetId of your Sequencer, hex encoded 32 bytes prefixed with 0x +pub struct CertificateProducerConfig { + /// SubnetId of your Certificate Producer, hex encoded 32 bytes prefixed with 0x pub subnet_id: Option, /// JSON-RPC endpoint of the Edge node, websocket and http support expected @@ -54,17 +54,17 @@ fn default_tce_grpc_endpoint() -> String { "http://[::1]:1340".to_string() } -impl Config for SequencerConfig { +impl Config for CertificateProducerConfig { type Output = Self; fn load_from_file(figment: Figment, home: &Path) -> Figment { let home = home.join("config.toml"); - let sequencer = Figment::new() + let certificate_producer = Figment::new() .merge(Toml::file(home).nested()) - .select("sequencer"); + .select("certificate_producer"); - figment.merge(sequencer) + figment.merge(certificate_producer) } fn load_context(figment: Figment) -> Result { @@ -72,6 +72,6 @@ impl Config for SequencerConfig { } fn profile() -> String { - "sequencer".to_string() + "certificate_producer".to_string() } } diff --git a/crates/topos-config/src/lib.rs b/crates/topos-config/src/lib.rs index 59610b749..eaf99e13c 100644 --- a/crates/topos-config/src/lib.rs +++ b/crates/topos-config/src/lib.rs @@ -1,8 +1,8 @@ pub(crate) mod base; +pub mod certificate_producer; pub mod edge; pub mod genesis; pub mod node; -pub mod sequencer; pub mod tce; use std::path::Path; diff --git a/crates/topos-config/src/node.rs b/crates/topos-config/src/node.rs index 4297d6434..d45f17592 100644 --- a/crates/topos-config/src/node.rs +++ b/crates/topos-config/src/node.rs @@ -8,15 +8,15 @@ use figment::{ use serde::{Deserialize, Serialize}; use crate::{ - base::BaseConfig, edge::EdgeConfig, load_config, sequencer::SequencerConfig, tce::TceConfig, - Config, + base::BaseConfig, certificate_producer::CertificateProducerConfig, edge::EdgeConfig, + load_config, tce::TceConfig, Config, }; #[derive(clap::ValueEnum, Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] pub enum NodeRole { Validator, - Sequencer, + CertificateProducer, FullNode, } @@ -24,7 +24,7 @@ pub enum NodeRole { pub struct NodeConfig { pub base: BaseConfig, pub tce: Option, - pub sequencer: Option, + pub certificate_producer: Option, pub edge: Option, } @@ -34,9 +34,9 @@ impl NodeConfig { let mut config = NodeConfig { base: base.clone(), - sequencer: base - .need_sequencer() - .then(|| load_config::(home, None)), + certificate_producer: base + .need_certificate_producer() + .then(|| load_config::(home, None)), tce: base .need_tce() .then(|| load_config::(home, None)), diff --git a/crates/topos-core/proto/topos/tce/v1/api.proto b/crates/topos-core/proto/topos/tce/v1/api.proto index 935b17456..120f011d7 100644 --- a/crates/topos-core/proto/topos/tce/v1/api.proto +++ b/crates/topos-core/proto/topos/tce/v1/api.proto @@ -88,7 +88,7 @@ message WatchCertificatesResponse { repeated topos.shared.v1.SubnetId subnet_ids = 1; } - // Target Certificate pushed from the TCE to the sequencer + // Target Certificate pushed from the TCE to the certificate producer message CertificatePushed { topos.uci.v1.Certificate certificate = 1; repeated topos.shared.v1.Positions.TargetStreamPosition positions = 2; diff --git a/crates/topos-core/src/api/grpc/generated/topos.bin b/crates/topos-core/src/api/grpc/generated/topos.bin index 635dba028..a0359f540 100644 Binary files a/crates/topos-core/src/api/grpc/generated/topos.bin and b/crates/topos-core/src/api/grpc/generated/topos.bin differ diff --git a/crates/topos-core/src/api/grpc/generated/topos.tce.v1.rs b/crates/topos-core/src/api/grpc/generated/topos.tce.v1.rs index 7f2ea84af..be7c91157 100644 --- a/crates/topos-core/src/api/grpc/generated/topos.tce.v1.rs +++ b/crates/topos-core/src/api/grpc/generated/topos.tce.v1.rs @@ -562,7 +562,7 @@ pub mod watch_certificates_response { super::super::super::shared::v1::SubnetId, >, } - /// Target Certificate pushed from the TCE to the sequencer + /// Target Certificate pushed from the TCE to the certificate producer #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CertificatePushed { diff --git a/crates/topos-tce-proxy/src/client.rs b/crates/topos-tce-proxy/src/client.rs index 69f81adb9..228d173ab 100644 --- a/crates/topos-tce-proxy/src/client.rs +++ b/crates/topos-tce-proxy/src/client.rs @@ -348,8 +348,8 @@ impl TceClientBuilder { continue; } Err(e) => { - // Backoff maximum period timeout. We need to restart sequencer. - error!("Failed to submit certificate to the tce network, backoff timeout with error: {e}. Restarting sequencer..."); + // Backoff maximum period timeout. We need to restart the certificate producer. + error!("Failed to submit certificate to the tce network, backoff timeout with error: {e}. Restarting the certificate producer..."); if let Some(tce_proxy_event_sender) = tce_proxy_event_sender.clone() { if let Err(e) = tce_proxy_event_sender.send(TceProxyEvent::TceServiceFailure).await { error!("Unable to send tce communication failure signal: {e}"); @@ -529,7 +529,7 @@ impl TceClientBuilder { }; } None => { - error!("Unexpected termination of the TCE proxy service of the Sequencer"); + error!("Unexpected termination of the TCE proxy service of the Certificate Producer"); break; } } diff --git a/crates/topos-tce-proxy/src/lib.rs b/crates/topos-tce-proxy/src/lib.rs index fcef0a39f..a635838dc 100644 --- a/crates/topos-tce-proxy/src/lib.rs +++ b/crates/topos-tce-proxy/src/lib.rs @@ -77,9 +77,9 @@ pub enum TceProxyEvent { certificates: Vec<(Certificate, u64)>, ctx: Context, }, - /// Failed watching certificates channel. Requires a restart of the sequencer tce proxy to recover. + /// Failed watching certificates channel. Requires a restart of the certificate producer tce proxy to recover. WatchCertificatesChannelFailed, - /// Failure in communication with the TCE grpc service. Sequencer needs to be restarted + /// Failure in communication with the TCE grpc service. Certificate Producer needs to be restarted TceServiceFailure, } @@ -87,7 +87,7 @@ pub enum TceProxyEvent { pub struct TceProxyConfig { /// The [`SubnetId`] this config handles certificate proxying for. pub subnet_id: SubnetId, - /// The GRPC endpoint where the Sequencer is expecting to find a TCE node. + /// The GRPC endpoint where the Certificate Producer is expecting to find a TCE node. pub tce_endpoint: String, /// The positions in the index of the known Certificates. pub positions: Vec, diff --git a/crates/topos-tce-proxy/src/worker.rs b/crates/topos-tce-proxy/src/worker.rs index 0869893af..6cf7037cc 100644 --- a/crates/topos-tce-proxy/src/worker.rs +++ b/crates/topos-tce-proxy/src/worker.rs @@ -96,7 +96,7 @@ impl TceProxyWorker { Some(cmd) = command_rcv.recv() => { match cmd { TceProxyCommand::SubmitCertificate{cert, ctx} => { - let span = info_span!("Sequencer TCE Proxy"); + let span = info_span!("Certificate Producer TCE Proxy"); span.set_parent(ctx); async { info!("Submitting new certificate to the TCE network: {}", &cert.id); diff --git a/crates/topos-tce-storage/src/fullnode/mod.rs b/crates/topos-tce-storage/src/fullnode/mod.rs index e5de4d8b8..b48c5210e 100644 --- a/crates/topos-tce-storage/src/fullnode/mod.rs +++ b/crates/topos-tce-storage/src/fullnode/mod.rs @@ -150,7 +150,7 @@ impl WriteStore for FullNodeStore { }; // Return list of new target stream positions of certificate that will be persisted - // Information is needed by sequencer/subnet contract to know from + // Information is needed by certificate producer/subnet contract to know from // where to continue with streaming on restart let mut target_subnet_stream_positions: HashMap = HashMap::new(); diff --git a/crates/topos-tce-storage/src/lib.rs b/crates/topos-tce-storage/src/lib.rs index 1390aef1c..edf937ef6 100644 --- a/crates/topos-tce-storage/src/lib.rs +++ b/crates/topos-tce-storage/src/lib.rs @@ -136,7 +136,7 @@ pub struct CertificatePositions { /// Uniquely identify the source certificate stream head of one subnet. /// The head represent the internal state of the TCE regarding a source subnet stream for -/// certificates that it receives from local sequencer +/// certificates that it receives from the local certificate producer #[derive(Serialize, Deserialize, Debug, Clone)] pub struct SourceHead { /// Certificate id of the head diff --git a/crates/topos-test-sdk/src/sequencer/mod.rs b/crates/topos-test-sdk/src/certificate_producer/mod.rs similarity index 100% rename from crates/topos-test-sdk/src/sequencer/mod.rs rename to crates/topos-test-sdk/src/certificate_producer/mod.rs diff --git a/crates/topos-test-sdk/src/lib.rs b/crates/topos-test-sdk/src/lib.rs index 8c68c678d..a34105c63 100644 --- a/crates/topos-test-sdk/src/lib.rs +++ b/crates/topos-test-sdk/src/lib.rs @@ -1,8 +1,8 @@ pub mod certificates; +pub mod certificate_producer; pub mod networking; pub mod p2p; -pub mod sequencer; pub mod storage; pub mod tce; diff --git a/crates/topos/Cargo.toml b/crates/topos/Cargo.toml index ebf00e859..80eb72520 100644 --- a/crates/topos/Cargo.toml +++ b/crates/topos/Cargo.toml @@ -10,7 +10,7 @@ workspace = true topos-config = { path = "../topos-config/" } topos-tce = { path = "../topos-tce/" } topos-p2p = { path = "../topos-p2p" } -topos-sequencer = { path = "../topos-sequencer" } +topos-certificate-producer = { path = "../topos-certificate-producer" } topos-core = { workspace = true, features = ["api"] } topos-certificate-spammer = { path = "../topos-certificate-spammer" } topos-tce-broadcast = { path = "../topos-tce-broadcast", optional = true } diff --git a/crates/topos/src/components/node/commands/init.rs b/crates/topos/src/components/node/commands/init.rs index bad806672..261057ff4 100644 --- a/crates/topos/src/components/node/commands/init.rs +++ b/crates/topos/src/components/node/commands/init.rs @@ -24,7 +24,7 @@ pub struct Init { pub secrets_config: Option, /// For certain use cases, we manually provide private keys to a running node, and don't want to - /// rely on polygon-edge during runtime. Example: A sequencer which runs for an external EVM chain + /// rely on polygon-edge during runtime. Example: A Certificate Producer which runs for an external EVM chain #[arg(long, env = "TOPOS_NO_EDGE_PROCESS", action)] pub no_edge_process: bool, } diff --git a/crates/topos/src/components/node/mod.rs b/crates/topos/src/components/node/mod.rs index 4f7014775..7896b1eb5 100644 --- a/crates/topos/src/components/node/mod.rs +++ b/crates/topos/src/components/node/mod.rs @@ -230,18 +230,18 @@ pub(crate) async fn handle_command( std::process::exit(1); } - // Sequencer - if matches!(config.base.role, NodeRole::Sequencer) { - let sequencer_config = config - .sequencer + // Certificate Producer + if matches!(config.base.role, NodeRole::CertificateProducer) { + let cert_prod_config = config + .certificate_producer .clone() - .expect("valid sequencer configuration"); + .expect("valid Certificate Producer configuration"); info!( - "Running sequencer with configuration {:?}", - sequencer_config + "Running Certificate Producer with configuration {:?}", + cert_prod_config ); - processes.push(services::process::spawn_sequencer_process( - sequencer_config, + processes.push(services::process::spawn_certificate_producer_process( + cert_prod_config, &keys, (shutdown_token.clone(), shutdown_sender.clone()), )); diff --git a/crates/topos/src/components/node/services/process.rs b/crates/topos/src/components/node/services/process.rs index 3e8419dae..b26cf82fa 100644 --- a/crates/topos/src/components/node/services/process.rs +++ b/crates/topos/src/components/node/services/process.rs @@ -4,12 +4,12 @@ use std::process::ExitStatus; use thiserror::Error; use tokio::{spawn, sync::mpsc, task::JoinHandle}; use tokio_util::sync::CancellationToken; +use topos_certificate_producer::CertificateProducerConfiguration; +use topos_config::certificate_producer::CertificateProducerConfig; use topos_config::edge::command::CommandConfig; -use topos_config::sequencer::SequencerConfig; use topos_config::tce::broadcast::ReliableBroadcastParams; use topos_config::tce::{AuthKey, StorageConfiguration, TceConfig}; use topos_p2p::Multiaddr; -use topos_sequencer::SequencerConfiguration; use topos_wallet::SecretManager; use tracing::{debug, error, warn}; @@ -19,18 +19,18 @@ use topos_config::genesis::Genesis; pub enum Errors { #[error("TCE error")] TceFailure, - #[error("Sequencer error")] - SequencerFailure, + #[error("CertificateProducer error")] + CertificateProducerFailure, #[error("Edge error: {0}")] EdgeTerminated(#[from] std::io::Error), } -pub(crate) fn spawn_sequencer_process( - config: SequencerConfig, +pub(crate) fn spawn_certificate_producer_process( + config: CertificateProducerConfig, keys: &SecretManager, shutdown: (CancellationToken, mpsc::Sender<()>), ) -> JoinHandle> { - let config = SequencerConfiguration { + let config = CertificateProducerConfiguration { subnet_id: config.subnet_id, public_key: keys.validator_pubkey(), subnet_jsonrpc_http: config.subnet_jsonrpc_http, @@ -42,12 +42,14 @@ pub(crate) fn spawn_sequencer_process( start_block: config.start_block, }; - debug!("Sequencer args: {config:?}"); + debug!("Certificate Producer args: {config:?}"); spawn(async move { - topos_sequencer::run(config, shutdown).await.map_err(|e| { - error!("Sequencer failure: {e:?}"); - Errors::SequencerFailure - }) + topos_certificate_producer::run(config, shutdown) + .await + .map_err(|e| { + error!("Certificate Producer failure: {e:?}"); + Errors::CertificateProducerFailure + }) }) } diff --git a/crates/topos/tests/config.rs b/crates/topos/tests/config.rs index 33e619906..e1ee37cb0 100644 --- a/crates/topos/tests/config.rs +++ b/crates/topos/tests/config.rs @@ -204,7 +204,7 @@ mod serial_integration { let node_init_home_cli = tmp_home_dir_cli.path().to_str().unwrap(); let node_edge_path_cli = node_edge_path_env.clone(); let node_init_name_cli = "TEST_NODE_CLI"; - let node_init_role_cli = "sequencer"; + let node_init_role_cli = "certificate-producer"; let node_init_subnet_cli = "topos-cli"; let mut cmd = Command::cargo_bin("topos")?; @@ -240,7 +240,7 @@ mod serial_integration { // Check if config file params are according to cli params let config_contents = std::fs::read_to_string(&config_path).unwrap(); assert!(config_contents.contains("name = \"TEST_NODE_CLI\"")); - assert!(config_contents.contains("role = \"sequencer\"")); + assert!(config_contents.contains("role = \"certificate-producer\"")); assert!(config_contents.contains("subnet = \"topos-cli\"")); Ok(())