diff --git a/.github/workflows/docker_build_push.yml b/.github/workflows/docker_build_push.yml index dd2dd2d42..78acde8fd 100644 --- a/.github/workflows/docker_build_push.yml +++ b/.github/workflows/docker_build_push.yml @@ -26,8 +26,6 @@ jobs: network: uses: ./.github/workflows/docker_utils.yml secrets: inherit - with: - features: "network" docker_e2e: runs-on: ubuntu-latest-16-core @@ -84,8 +82,7 @@ jobs: frontend-erc20-e2e: runs-on: ubuntu-latest needs: docker - # if: ${{ github.event_name == 'pull_request' }} - if: ${{ false }} # disable for now because of: TP-768 + if: ${{ github.event_name == 'pull_request' }} steps: - name: Set environment run: | diff --git a/.github/workflows/docker_utils.yml b/.github/workflows/docker_utils.yml index 4610b2719..549a99c24 100644 --- a/.github/workflows/docker_utils.yml +++ b/.github/workflows/docker_utils.yml @@ -1,7 +1,6 @@ name: template - docker env: - DEFAULT_FEATURES: tce,sequencer,network,node,subnet REGISTRY: ghcr.io IMAGE_NAME: ${{ github.repository }} AWS_SHARED_CREDENTIALS_FILE: "${{ github.workspace }}/.aws/credentials" @@ -19,10 +18,6 @@ on: required: false type: string default: stable - features: - required: false - type: string - default: tce,sequencer,network,node,subnet outputs: tags: description: "Docker tags" @@ -32,8 +27,6 @@ jobs: docker: name: Build and push docker image to GitHub Container Registry runs-on: ubuntu-latest-16-core - outputs: - tags: ${{ steps.meta.outputs.tags }} steps: - name: Checkout uses: actions/checkout@v4 @@ -73,17 +66,6 @@ jobs: aws configure set aws_secret_access_key ${{ env.AWS_SECRET_ACCESS_KEY }} --profile default aws configure set aws_session_token ${{ env.AWS_SESSION_TOKEN }} --profile default - - name: Extract metadata (tags, labels) for Docker - id: meta - uses: docker/metadata-action@v4 - with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} - tags: | - type=ref,event=branch,suffix=${{ inputs.features != env.DEFAULT_FEATURES && format('-{0}', inputs.features) || '' }} - type=ref,event=pr,suffix=${{ inputs.features != env.DEFAULT_FEATURES && format('-{0}', inputs.features) || '' }} - type=semver,suffix=${{ inputs.features != env.DEFAULT_FEATURES && format('-{0}', inputs.features) || '' }},pattern={{version}} - type=semver,suffix=${{ inputs.features != env.DEFAULT_FEATURES && format('-{0}', inputs.features) || '' }},pattern={{major}}.{{minor}} - - name: Push to GitHub Container Registry uses: docker/build-push-action@v3 with: @@ -92,7 +74,6 @@ jobs: # push only images targeting topos (e.g.: exclude test, lint, etc.) push: ${{ inputs.target == 'topos' }} target: ${{ inputs.target }} - tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} secret-files: | "aws=${{ github.workspace }}/.aws/credentials" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6a31d5957..112259cef 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -60,6 +60,6 @@ jobs: with: AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID}} AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY}} - - run: cargo nextest run cert_delivery --locked --no-default-features -F tce + - run: cargo nextest run cert_delivery --locked --no-default-features env: RUST_LOG: topos=warn diff --git a/crates/topos-p2p/Cargo.toml b/crates/topos-p2p/Cargo.toml index 037011e61..66769469c 100644 --- a/crates/topos-p2p/Cargo.toml +++ b/crates/topos-p2p/Cargo.toml @@ -41,5 +41,5 @@ test-log.workspace = true env_logger.workspace = true rstest = { workspace = true, features = ["async-timeout"] } tracing-subscriber.workspace = true -topos-test-sdk = { path = "../topos-test-sdk/", features = ["tce"] } +topos-test-sdk = { path = "../topos-test-sdk/" } rand.workspace = true diff --git a/crates/topos-tce-proxy/Cargo.toml b/crates/topos-tce-proxy/Cargo.toml index 6ceecc9e8..a2b388de2 100644 --- a/crates/topos-tce-proxy/Cargo.toml +++ b/crates/topos-tce-proxy/Cargo.toml @@ -49,4 +49,4 @@ serial_test.workspace = true byteorder = "1.4.3" dockertest = "0.3.1" topos-tce-storage = { path = "../topos-tce-storage" } -topos-test-sdk = { path = "../topos-test-sdk/", features=["tce"] } +topos-test-sdk = { path = "../topos-test-sdk/" } diff --git a/crates/topos-test-sdk/Cargo.toml b/crates/topos-test-sdk/Cargo.toml index 331ac4778..ed8428451 100644 --- a/crates/topos-test-sdk/Cargo.toml +++ b/crates/topos-test-sdk/Cargo.toml @@ -11,13 +11,13 @@ workspace = true topos-core = { workspace = true, features = ["uci", "api"] } topos-crypto = { path = "../topos-crypto/" } topos-p2p = { path = "../topos-p2p/" } -topos-tce = { path = "../topos-tce/", optional = true } -topos-tce-api = { path = "../topos-tce-api/", optional = true } -topos-tce-broadcast = { path = "../topos-tce-broadcast/", optional = true } -topos-tce-gatekeeper = { path = "../topos-tce-gatekeeper/", optional = true } +topos-tce = { path = "../topos-tce/" } +topos-tce-api = { path = "../topos-tce-api/" } +topos-tce-broadcast = { path = "../topos-tce-broadcast/" } +topos-tce-gatekeeper = { path = "../topos-tce-gatekeeper/" } topos-tce-storage = { path = "../topos-tce-storage/" } -topos-tce-synchronizer = { path = "../topos-tce-synchronizer/", optional = true } -topos-tce-transport = { path = "../topos-tce-transport/", optional = true } +topos-tce-synchronizer = { path = "../topos-tce-synchronizer/" } +topos-tce-transport = { path = "../topos-tce-transport/" } hex.workspace = true ethers.workspace = true @@ -42,17 +42,5 @@ tokio.workspace = true tracing.workspace = true async-stream.workspace = true -[features] -default = [] -tce = [ - "topos-core/api", - "topos-tce", - "topos-tce-api", - "topos-tce-broadcast", - "topos-tce-gatekeeper", - "topos-tce-synchronizer", - "topos-tce-transport", -] - [build-dependencies] tonic-build.workspace = true diff --git a/crates/topos-test-sdk/src/lib.rs b/crates/topos-test-sdk/src/lib.rs index e219508ef..d52976982 100644 --- a/crates/topos-test-sdk/src/lib.rs +++ b/crates/topos-test-sdk/src/lib.rs @@ -4,7 +4,6 @@ pub mod networking; pub mod p2p; pub mod sequencer; pub mod storage; -#[cfg(feature = "tce")] pub mod tce; use std::{collections::HashSet, net::SocketAddr, sync::Mutex}; diff --git a/crates/topos-wallet/src/lib.rs b/crates/topos-wallet/src/lib.rs index fceebe7c5..54eb30ea8 100644 --- a/crates/topos-wallet/src/lib.rs +++ b/crates/topos-wallet/src/lib.rs @@ -46,6 +46,7 @@ impl SecretManager { } pub fn from_aws(_secrets_config: &str) -> Self { + println!("loading from aws-sm"); todo!() } diff --git a/crates/topos/Cargo.toml b/crates/topos/Cargo.toml index 06001077d..17b8289e4 100644 --- a/crates/topos/Cargo.toml +++ b/crates/topos/Cargo.toml @@ -7,12 +7,12 @@ edition = "2021" workspace = true [dependencies] -topos-tce = { path = "../topos-tce/", optional = true } +topos-tce = { path = "../topos-tce/" } topos-p2p = { path = "../topos-p2p" } -topos-tce-transport = { path = "../topos-tce-transport", optional = true } -topos-sequencer = { path = "../topos-sequencer", optional = true } +topos-tce-transport = { path = "../topos-tce-transport" } +topos-sequencer = { path = "../topos-sequencer" } topos-core = { workspace = true, features = ["api"] } -topos-certificate-spammer = { path = "../topos-certificate-spammer", optional = true } +topos-certificate-spammer = { path = "../topos-certificate-spammer" } topos-tce-broadcast = { path = "../topos-tce-broadcast", optional = true } topos-wallet = { path = "../topos-wallet" } @@ -54,7 +54,7 @@ topos-tce-synchronizer = { path = "../topos-tce-synchronizer" } topos-tce-gatekeeper = { path = "../topos-tce-gatekeeper" } topos-tce-api = { path = "../topos-tce-api" } topos-tce-storage = { path = "../topos-tce-storage" } -topos-test-sdk = { path = "../topos-test-sdk", features = ["tce"] } +topos-test-sdk = { path = "../topos-test-sdk" } serde.workspace = true serde_json.workspace = true test-log.workspace = true @@ -68,11 +68,5 @@ rstest = { workspace = true, features = ["async-timeout"] } tempfile = "3.8.0" [features] -default = ["tce", "sequencer", "network", "node", "setup", "subnet"] +default = [] broadcast_via_channels = ["default", "topos-tce-broadcast/task-manager-channels"] -tce = ["topos-tce", "topos-tce-transport"] -sequencer = ["topos-sequencer"] -network = ["topos-certificate-spammer"] -node = ["tce", "sequencer"] -setup = [] -subnet = [] diff --git a/crates/topos/src/components/doctor.rs b/crates/topos/src/components/doctor.rs deleted file mode 100644 index ebd35cf94..000000000 --- a/crates/topos/src/components/doctor.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub async fn handle_doctor() -> Result<(), Box> { - println!("Topos CLI: version {}", env!("TOPOS_VERSION")); - - Ok(()) -} diff --git a/crates/topos/src/components/mod.rs b/crates/topos/src/components/mod.rs index 440682fc8..2c665c653 100644 --- a/crates/topos/src/components/mod.rs +++ b/crates/topos/src/components/mod.rs @@ -1,13 +1,3 @@ -pub(crate) mod doctor; -#[cfg(feature = "network")] -pub(crate) mod network; -#[cfg(feature = "node")] pub(crate) mod node; -#[cfg(feature = "sequencer")] -pub(crate) mod sequencer; -#[cfg(feature = "setup")] +pub(crate) mod regtest; pub(crate) mod setup; -#[cfg(feature = "subnet")] -pub(crate) mod subnet; -#[cfg(feature = "tce")] -pub(crate) mod tce; diff --git a/crates/topos/src/components/network/commands.rs b/crates/topos/src/components/network/commands.rs deleted file mode 100644 index 2749f7c46..000000000 --- a/crates/topos/src/components/network/commands.rs +++ /dev/null @@ -1,30 +0,0 @@ -use clap::{Args, Subcommand}; - -mod spam; - -pub(crate) use spam::Spam; - -/// Topos CLI subcommand for network related functionalities (e.g., running the certificate spammer) -#[derive(Args, Debug)] -pub(crate) struct NetworkCommand { - #[clap(from_global)] - pub(crate) verbose: u8, - - #[clap(subcommand)] - pub(crate) subcommands: Option, -} - -#[derive(Subcommand, Debug)] -pub(crate) enum NetworkCommands { - Spam(Box), -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_run() { - assert!(NetworkCommands::has_subcommand("spam")); - } -} diff --git a/crates/topos/src/components/node/commands.rs b/crates/topos/src/components/node/commands.rs index bd465f795..5af078110 100644 --- a/crates/topos/src/components/node/commands.rs +++ b/crates/topos/src/components/node/commands.rs @@ -1,13 +1,22 @@ use std::path::PathBuf; use clap::{Args, Subcommand}; +use serde::Serialize; mod init; +mod status; mod up; pub(crate) use init::Init; +pub(crate) use status::Status; pub(crate) use up::Up; +#[derive(Args, Debug, Serialize)] +pub(crate) struct NodeArgument { + #[clap(short, long, default_value = "http://[::1]:1340")] + pub(crate) node: String, +} + /// Utility to manage your nodes in the Topos network #[derive(Args, Debug)] pub(crate) struct NodeCommand { @@ -25,10 +34,11 @@ pub(crate) struct NodeCommand { pub(crate) subcommands: Option, } -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Debug, Serialize)] pub(crate) enum NodeCommands { Up(Box), Init(Box), + Status(Status), } #[cfg(test)] diff --git a/crates/topos/src/components/node/commands/init.rs b/crates/topos/src/components/node/commands/init.rs index b4ee14682..fb4637673 100644 --- a/crates/topos/src/components/node/commands/init.rs +++ b/crates/topos/src/components/node/commands/init.rs @@ -3,7 +3,7 @@ use clap::Args; use serde::Serialize; #[derive(Args, Debug, Serialize)] -#[command(about = "Setup your node!", trailing_var_arg = true)] +#[command(about = "Setup your node", trailing_var_arg = true)] #[serde(rename_all = "kebab-case")] pub struct Init { /// Name to identify your node diff --git a/crates/topos/src/components/tce/commands/status.rs b/crates/topos/src/components/node/commands/status.rs similarity index 65% rename from crates/topos/src/components/tce/commands/status.rs rename to crates/topos/src/components/node/commands/status.rs index 58845cd21..3a70b74dd 100644 --- a/crates/topos/src/components/tce/commands/status.rs +++ b/crates/topos/src/components/node/commands/status.rs @@ -1,7 +1,9 @@ use super::NodeArgument; use clap::Args; +use serde::Serialize; -#[derive(Args, Debug)] +#[derive(Args, Debug, Serialize)] +#[command(about = "Get node status")] pub(crate) struct Status { #[command(flatten)] pub(crate) node_args: NodeArgument, diff --git a/crates/topos/src/components/node/commands/up.rs b/crates/topos/src/components/node/commands/up.rs index 40e2032d5..d479ea73c 100644 --- a/crates/topos/src/components/node/commands/up.rs +++ b/crates/topos/src/components/node/commands/up.rs @@ -1,8 +1,8 @@ use clap::Args; use serde::Serialize; -#[derive(Args, Debug, Serialize)] -#[command(about = "Spawn your node!")] +#[derive(Args, Clone, Debug, Serialize)] +#[command(about = "Spawn your node")] #[serde(rename_all = "kebab-case")] pub struct Up { /// Name to identify your node @@ -18,4 +18,12 @@ pub struct Up { /// Usable for cases where edge endpoint is available as infura (or similar cloud provider) endpoint #[arg(long, env = "TOPOS_NO_EDGE_PROCESS", action)] pub no_edge_process: bool, + /// Socket of the opentelemetry agent endpoint. + /// If not provided open telemetry will not be used + #[arg(long, env = "TOPOS_OTLP_AGENT")] + pub otlp_agent: Option, + /// Otlp service name. + /// If not provided open telemetry will not be used + #[arg(long, env = "TOPOS_OTLP_SERVICE_NAME")] + pub otlp_service_name: Option, } diff --git a/crates/topos/src/components/node/mod.rs b/crates/topos/src/components/node/mod.rs index 3bad0cc5f..79f72356f 100644 --- a/crates/topos/src/components/node/mod.rs +++ b/crates/topos/src/components/node/mod.rs @@ -1,13 +1,21 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; -use std::path::Path; +use opentelemetry::global; +use opentelemetry::sdk::metrics::controllers::BasicController; use std::{ fs::{create_dir_all, remove_dir_all, OpenOptions}, io::Write, }; -use tokio::{signal, sync::mpsc}; +use std::{path::Path, sync::Arc}; +use tokio::{ + signal, + sync::{mpsc, Mutex}, +}; use tokio_util::sync::CancellationToken; +use tonic::transport::{Channel, Endpoint}; +use tower::Service; use tracing::{error, info}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use self::commands::{NodeCommand, NodeCommands}; use crate::config::genesis::Genesis; @@ -16,11 +24,23 @@ use crate::{ config::{insert_into_toml, node::NodeConfig, node::NodeRole}, tracing::setup_tracing, }; -use services::*; +use topos_core::api::grpc::tce::v1::console_service_client::ConsoleServiceClient; use topos_wallet::SecretManager; pub(crate) mod commands; -pub mod services; +pub(crate) mod services; + +pub(crate) struct NodeService { + pub(crate) console_client: Arc>>, +} + +impl NodeService { + pub(crate) fn with_grpc_endpoint(endpoint: &str) -> Self { + Self { + console_client: setup_console_tce_grpc(endpoint), + } + } +} pub(crate) async fn handle_command( NodeCommand { @@ -30,8 +50,6 @@ pub(crate) async fn handle_command( edge_path, }: NodeCommand, ) -> Result<(), Box> { - setup_tracing(verbose, None, None)?; - match subcommands { Some(NodeCommands::Init(cmd)) => { let cmd = *cmd; @@ -57,8 +75,11 @@ pub(crate) async fn handle_command( let mut config_toml = toml::Table::new(); // Generate the Edge configuration - if let Ok(result) = - generate_edge_config(edge_path.join(BINARY_NAME), node_path.clone()).await + if let Ok(result) = services::process::generate_edge_config( + edge_path.join(BINARY_NAME), + node_path.clone(), + ) + .await { if result.is_err() { println!("Failed to generate edge config"); @@ -67,7 +88,7 @@ pub(crate) async fn handle_command( } } - let node_config = NodeConfig::new(&node_path, Some(cmd)); + let node_config = NodeConfig::new(&node_path, Some(NodeCommands::Init(Box::new(cmd)))); // Creating the TOML output insert_into_toml(&mut config_toml, node_config); @@ -92,7 +113,10 @@ pub(crate) async fn handle_command( Ok(()) } Some(NodeCommands::Up(cmd)) => { - let name = cmd + let cmd_cloned = cmd.clone(); + let command = *cmd; + + let name = cmd_cloned .name .as_ref() .expect("No name or default was given for node"); @@ -107,9 +131,8 @@ pub(crate) async fn handle_command( std::process::exit(1); } - // FIXME: Handle properly the `cmd` - let config = NodeConfig::new(&node_path, None); - info!( + let config = NodeConfig::new(&node_path, Some(NodeCommands::Up(Box::new(command)))); + println!( "⚙️ Reading the configuration from {}/config.toml", node_path.display() ); @@ -122,7 +145,7 @@ pub(crate) async fn handle_command( let genesis = match Genesis::new(genesis_file_path.clone()) { Ok(genesis) => genesis, Err(_) => { - error!( + println!( "Could not load genesis.json file on path {} \n Please make sure to have \ a valid genesis.json file for your subnet in the {}/subnet/{} folder.", genesis_file_path.display(), @@ -146,12 +169,18 @@ pub(crate) async fn handle_command( let shutdown_token = CancellationToken::new(); let shutdown_trigger = shutdown_token.clone(); + + // Setup instrumentation if both otlp agent and otlp service name + // are provided as arguments + let basic_controller = + setup_tracing(verbose, cmd_cloned.otlp_agent, cmd_cloned.otlp_service_name)?; + let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); let mut processes = FuturesUnordered::new(); // Edge node - if cmd.no_edge_process { + if cmd_cloned.no_edge_process { info!("Using external edge node, skip running of local edge instance...") } else if let Some(edge_config) = config.edge { let data_dir = node_path.clone(); @@ -162,7 +191,7 @@ pub(crate) async fn handle_command( data_dir.display(), edge_config.args ); - processes.push(services::spawn_edge_process( + processes.push(services::process::spawn_edge_process( edge_path.join(BINARY_NAME), data_dir, genesis.path.clone(), @@ -183,7 +212,7 @@ pub(crate) async fn handle_command( "Running sequencer with configuration {:?}", sequencer_config ); - processes.push(services::spawn_sequencer_process( + processes.push(services::process::spawn_sequencer_process( sequencer_config, &keys, (shutdown_token.clone(), shutdown_sender.clone()), @@ -193,7 +222,7 @@ pub(crate) async fn handle_command( // TCE if config.base.subnet == "topos" { info!("Running topos TCE service...",); - processes.push(services::spawn_tce_process( + processes.push(services::process::spawn_tce_process( config.tce.clone().unwrap(), keys, genesis, @@ -206,28 +235,56 @@ pub(crate) async fn handle_command( tokio::select! { _ = signal::ctrl_c() => { info!("Received ctrl_c, shutting down application..."); - shutdown(shutdown_trigger, shutdown_receiver).await; + shutdown(basic_controller, shutdown_trigger, shutdown_receiver).await; } Some(result) = processes.next() => { info!("Terminate: {result:?}"); if let Err(e) = result { error!("Termination: {e}"); } - shutdown(shutdown_trigger, shutdown_receiver).await; + shutdown(basic_controller, shutdown_trigger, shutdown_receiver).await; processes.clear(); } }; Ok(()) } + Some(NodeCommands::Status(status)) => { + let mut node_service = NodeService::with_grpc_endpoint(&status.node_args.node); + let exit_code = i32::from(!(node_service.call(status).await?)); + std::process::exit(exit_code); + } None => Ok(()), } } -pub async fn shutdown(trigger: CancellationToken, mut termination: mpsc::Receiver<()>) { +fn setup_console_tce_grpc(endpoint: &str) -> Arc>> { + match Endpoint::from_shared(endpoint.to_string()) { + Ok(endpoint) => Arc::new(Mutex::new(ConsoleServiceClient::new( + endpoint.connect_lazy(), + ))), + Err(e) => { + error!("Failure to setup the gRPC API endpoint on {endpoint}: {e}"); + std::process::exit(1); + } + } +} + +pub async fn shutdown( + basic_controller: Option, + trigger: CancellationToken, + mut termination: mpsc::Receiver<()>, +) { trigger.cancel(); // Wait that all sender get dropped info!("Waiting that all components dropped"); let _ = termination.recv().await; info!("Shutdown procedure finished, exiting..."); + // Shutdown tracing + global::shutdown_tracer_provider(); + if let Some(basic_controller) = basic_controller { + if let Err(e) = basic_controller.stop(&tracing::Span::current().context()) { + error!("Error stopping tracing: {e}"); + } + } } diff --git a/crates/topos/src/components/node/services.rs b/crates/topos/src/components/node/services.rs index 6526869b2..102a1bd4c 100644 --- a/crates/topos/src/components/node/services.rs +++ b/crates/topos/src/components/node/services.rs @@ -1,137 +1,2 @@ -use crate::config::sequencer::SequencerConfig; -use crate::config::tce::TceConfig; -use crate::edge::CommandConfig; -use std::collections::HashMap; -use std::path::PathBuf; -use std::time::Duration; -use thiserror::Error; -use tokio::{spawn, sync::mpsc, task::JoinHandle}; -use tokio_util::sync::CancellationToken; -use topos_p2p::config::NetworkConfig; -use topos_sequencer::SequencerConfiguration; -use topos_tce::config::{AuthKey, StorageConfiguration, TceConfiguration}; -use topos_tce_transport::ReliableBroadcastParams; -use topos_wallet::SecretManager; -use tracing::{debug, error, info}; - -use crate::config::genesis::Genesis; - -#[derive(Error, Debug)] -pub enum Errors { - #[error("Failure on the TCE")] - TceFailure, - #[error("Failure on the Sequencer")] - SequencerFailure, - #[error("Failure on the Edge")] - EdgeTerminated(#[from] std::io::Error), -} - -pub fn generate_edge_config( - edge_path: PathBuf, - config_path: PathBuf, -) -> JoinHandle> { - // Create the Polygon Edge config - info!("Generating the configuration at {config_path:?}"); - info!("Polygon-edge binary located at: {edge_path:?}"); - spawn(async move { - match CommandConfig::new(edge_path) - .init(&config_path) - .spawn() - .await - { - Ok(status) => { - info!("Edge process terminated: {status:?}"); - Ok(()) - } - Err(e) => { - println!("Failed to run the edge binary: {e:?}"); - Err(Errors::EdgeTerminated(e)) - } - } - }) -} - -pub(crate) fn spawn_sequencer_process( - config: SequencerConfig, - keys: &SecretManager, - shutdown: (CancellationToken, mpsc::Sender<()>), -) -> JoinHandle> { - let config = SequencerConfiguration { - subnet_id: config.subnet_id, - public_key: keys.validator_pubkey(), - subnet_jsonrpc_http: config.subnet_jsonrpc_http, - subnet_jsonrpc_ws: config.subnet_jsonrpc_ws, - subnet_contract_address: config.subnet_contract_address, - tce_grpc_endpoint: config.tce_grpc_endpoint, - signing_key: keys.validator.clone().unwrap(), - verifier: 0, - start_block: config.start_block, - }; - - debug!("Sequencer args: {config:?}"); - spawn(async move { - topos_sequencer::run(config, shutdown).await.map_err(|e| { - error!("Failure on the Sequencer: {e:?}"); - Errors::SequencerFailure - }) - }) -} - -pub(crate) fn spawn_tce_process( - config: TceConfig, - keys: SecretManager, - genesis: Genesis, - shutdown: (CancellationToken, mpsc::Sender<()>), -) -> JoinHandle> { - let tce_config = TceConfiguration { - boot_peers: genesis - .boot_peers(Some(topos_p2p::constants::TCE_BOOTNODE_PORT)) - .into_iter() - .chain(config.parse_boot_peers()) - .collect::>(), - validators: genesis.validators().expect("Cannot parse validators"), - auth_key: keys.network.map(AuthKey::PrivateKey), - signing_key: keys.validator.map(AuthKey::PrivateKey), - tce_addr: format!("/ip4/{}", config.libp2p_api_addr.ip()), - tce_local_port: config.libp2p_api_addr.port(), - tce_params: ReliableBroadcastParams::new(genesis.validator_count()), - api_addr: config.grpc_api_addr, - graphql_api_addr: config.graphql_api_addr, - metrics_api_addr: config.metrics_api_addr, - storage: StorageConfiguration::RocksDB(Some(config.db_path)), - network_bootstrap_timeout: Duration::from_secs(180), - minimum_cluster_size: config - .minimum_tce_cluster_size - .unwrap_or(NetworkConfig::MINIMUM_CLUSTER_SIZE), - version: env!("TOPOS_VERSION"), - }; - - debug!("TCE args: {tce_config:?}"); - spawn(async move { - topos_tce::run(&tce_config, shutdown).await.map_err(|e| { - error!("TCE process terminated: {e:?}"); - Errors::TceFailure - }) - }) -} - -pub fn spawn_edge_process( - edge_path: PathBuf, - data_dir: PathBuf, - genesis_path: PathBuf, - edge_args: HashMap, -) -> JoinHandle> { - spawn(async move { - match CommandConfig::new(edge_path) - .server(&data_dir, &genesis_path, edge_args) - .spawn() - .await - { - Ok(status) => { - info!("Edge process terminated: {status:?}"); - Ok(()) - } - Err(e) => Err(Errors::EdgeTerminated(e)), - } - }) -} +pub(crate) mod process; +pub(crate) mod status; diff --git a/crates/topos/src/components/node/services/process.rs b/crates/topos/src/components/node/services/process.rs new file mode 100644 index 000000000..ce8b4fde4 --- /dev/null +++ b/crates/topos/src/components/node/services/process.rs @@ -0,0 +1,137 @@ +use crate::config::sequencer::SequencerConfig; +use crate::config::tce::TceConfig; +use crate::edge::CommandConfig; +use std::collections::HashMap; +use std::path::PathBuf; +use std::time::Duration; +use thiserror::Error; +use tokio::{spawn, sync::mpsc, task::JoinHandle}; +use tokio_util::sync::CancellationToken; +use topos_p2p::config::NetworkConfig; +use topos_sequencer::SequencerConfiguration; +use topos_tce::config::{AuthKey, StorageConfiguration, TceConfiguration}; +use topos_tce_transport::ReliableBroadcastParams; +use topos_wallet::SecretManager; +use tracing::{debug, error, info}; + +use crate::config::genesis::Genesis; + +#[derive(Error, Debug)] +pub enum Errors { + #[error("TCE error")] + TceFailure, + #[error("Sequencer error")] + SequencerFailure, + #[error("Edge error: {0}")] + EdgeTerminated(#[from] std::io::Error), +} + +pub fn generate_edge_config( + edge_path: PathBuf, + config_path: PathBuf, +) -> JoinHandle> { + // Create the Polygon Edge config + info!("Generating the configuration at {config_path:?}"); + info!("Polygon-edge binary located at: {edge_path:?}"); + spawn(async move { + match CommandConfig::new(edge_path) + .init(&config_path) + .spawn() + .await + { + Ok(status) => { + info!("Edge process terminated: {status:?}"); + Ok(()) + } + Err(e) => { + println!("Failed to run the edge binary: {e:?}"); + Err(Errors::EdgeTerminated(e)) + } + } + }) +} + +pub(crate) fn spawn_sequencer_process( + config: SequencerConfig, + keys: &SecretManager, + shutdown: (CancellationToken, mpsc::Sender<()>), +) -> JoinHandle> { + let config = SequencerConfiguration { + subnet_id: config.subnet_id, + public_key: keys.validator_pubkey(), + subnet_jsonrpc_http: config.subnet_jsonrpc_http, + subnet_jsonrpc_ws: config.subnet_jsonrpc_ws, + subnet_contract_address: config.subnet_contract_address, + tce_grpc_endpoint: config.tce_grpc_endpoint, + signing_key: keys.validator.clone().unwrap(), + verifier: 0, + start_block: config.start_block, + }; + + debug!("Sequencer args: {config:?}"); + spawn(async move { + topos_sequencer::run(config, shutdown).await.map_err(|e| { + error!("Sequencer failure: {e:?}"); + Errors::SequencerFailure + }) + }) +} + +pub(crate) fn spawn_tce_process( + config: TceConfig, + keys: SecretManager, + genesis: Genesis, + shutdown: (CancellationToken, mpsc::Sender<()>), +) -> JoinHandle> { + let tce_config = TceConfiguration { + boot_peers: genesis + .boot_peers(Some(topos_p2p::constants::TCE_BOOTNODE_PORT)) + .into_iter() + .chain(config.parse_boot_peers()) + .collect::>(), + validators: genesis.validators().expect("Cannot parse validators"), + auth_key: keys.network.map(AuthKey::PrivateKey), + signing_key: keys.validator.map(AuthKey::PrivateKey), + tce_addr: format!("/ip4/{}", config.libp2p_api_addr.ip()), + tce_local_port: config.libp2p_api_addr.port(), + tce_params: ReliableBroadcastParams::new(genesis.validator_count()), + api_addr: config.grpc_api_addr, + graphql_api_addr: config.graphql_api_addr, + metrics_api_addr: config.metrics_api_addr, + storage: StorageConfiguration::RocksDB(Some(config.db_path)), + network_bootstrap_timeout: Duration::from_secs(90), + minimum_cluster_size: config + .minimum_tce_cluster_size + .unwrap_or(NetworkConfig::MINIMUM_CLUSTER_SIZE), + version: env!("TOPOS_VERSION"), + }; + + debug!("TCE args: {tce_config:?}"); + spawn(async move { + topos_tce::run(&tce_config, shutdown).await.map_err(|e| { + error!("TCE process terminated: {e:?}"); + Errors::TceFailure + }) + }) +} + +pub fn spawn_edge_process( + edge_path: PathBuf, + data_dir: PathBuf, + genesis_path: PathBuf, + edge_args: HashMap, +) -> JoinHandle> { + spawn(async move { + match CommandConfig::new(edge_path) + .server(&data_dir, &genesis_path, edge_args) + .spawn() + .await + { + Ok(status) => { + info!("Edge process terminated: {status:?}"); + Ok(()) + } + Err(e) => Err(Errors::EdgeTerminated(e)), + } + }) +} diff --git a/crates/topos/src/components/tce/services/status.rs b/crates/topos/src/components/node/services/status.rs similarity index 92% rename from crates/topos/src/components/tce/services/status.rs rename to crates/topos/src/components/node/services/status.rs index 09b7bd5ac..62feac520 100644 --- a/crates/topos/src/components/tce/services/status.rs +++ b/crates/topos/src/components/node/services/status.rs @@ -10,9 +10,9 @@ use topos_core::api::grpc::tce::v1::StatusRequest; use tower::Service; use tracing::{debug, error}; -use crate::components::tce::{commands::Status, TCEService}; +use crate::components::node::{commands::Status, NodeService}; -impl Service for TCEService { +impl Service for NodeService { type Response = bool; type Error = std::io::Error; diff --git a/crates/topos/src/components/regtest/commands.rs b/crates/topos/src/components/regtest/commands.rs new file mode 100644 index 000000000..865109c1a --- /dev/null +++ b/crates/topos/src/components/regtest/commands.rs @@ -0,0 +1,34 @@ +use clap::{Args, Subcommand}; + +mod push_certificate; +mod spam; + +pub(crate) use push_certificate::PushCertificate; +pub(crate) use spam::Spam; + +/// Run test commands (e.g., pushing a certificate to a TCE process) +#[derive(Args, Debug)] +pub(crate) struct RegtestCommand { + #[clap(from_global)] + pub(crate) verbose: u8, + + #[clap(subcommand)] + pub(crate) subcommands: Option, +} + +#[derive(Subcommand, Debug)] +pub(crate) enum RegtestCommands { + PushCertificate(Box), + Spam(Box), +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_run() { + assert!(RegtestCommands::has_subcommand("push-certificate")); + assert!(RegtestCommands::has_subcommand("spam")); + } +} diff --git a/crates/topos/src/components/tce/commands/push_certificate.rs b/crates/topos/src/components/regtest/commands/push_certificate.rs similarity index 92% rename from crates/topos/src/components/tce/commands/push_certificate.rs rename to crates/topos/src/components/regtest/commands/push_certificate.rs index f688c3859..1d8681f68 100644 --- a/crates/topos/src/components/tce/commands/push_certificate.rs +++ b/crates/topos/src/components/regtest/commands/push_certificate.rs @@ -3,6 +3,7 @@ use clap::Args; use crate::options::input_format::InputFormat; #[derive(Args, Debug)] +#[command(about = "Push a certificate to a TCE process")] pub(crate) struct PushCertificate { #[arg(short, long="format", value_enum, default_value_t = InputFormat::Plain)] pub(crate) format: InputFormat, diff --git a/crates/topos/src/components/network/commands/spam.rs b/crates/topos/src/components/regtest/commands/spam.rs similarity index 100% rename from crates/topos/src/components/network/commands/spam.rs rename to crates/topos/src/components/regtest/commands/spam.rs diff --git a/crates/topos/src/components/network/mod.rs b/crates/topos/src/components/regtest/mod.rs similarity index 76% rename from crates/topos/src/components/network/mod.rs rename to crates/topos/src/components/regtest/mod.rs index 20f53f139..9cf04db8f 100644 --- a/crates/topos/src/components/network/mod.rs +++ b/crates/topos/src/components/regtest/mod.rs @@ -1,4 +1,4 @@ -use self::commands::{NetworkCommand, NetworkCommands}; +use self::commands::{RegtestCommand, RegtestCommands}; use opentelemetry::global; use tokio::{ @@ -6,21 +6,43 @@ use tokio::{ sync::{mpsc, oneshot}, }; use topos_certificate_spammer::CertificateSpammerConfig; -use tracing::{error, info}; +use tracing::{debug, error, info}; use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::tracing::setup_tracing; pub(crate) mod commands; +pub(crate) mod services; pub(crate) async fn handle_command( - NetworkCommand { - subcommands, + RegtestCommand { verbose, - }: NetworkCommand, + subcommands, + }: RegtestCommand, ) -> Result<(), Box> { match subcommands { - Some(NetworkCommands::Spam(cmd)) => { + Some(RegtestCommands::PushCertificate(cmd)) => { + debug!("Start executing PushCertificate command"); + match services::push_certificate::check_delivery( + cmd.timeout_broadcast, + cmd.format, + cmd.nodes, + cmd.timeout, + ) + .await + .map_err(Box::::from) + { + Err(e) => { + error!("Check failed: {:?}", e); + std::process::exit(1); + } + _ => { + info!("Check passed"); + Ok(()) + } + } + } + Some(RegtestCommands::Spam(cmd)) => { let config = CertificateSpammerConfig { target_nodes: cmd.target_nodes, target_nodes_path: cmd.target_nodes_path, @@ -34,7 +56,6 @@ pub(crate) async fn handle_command( // Setup instrumentation if both otlp agent and otlp service name // are provided as arguments - // We may want to use instrumentation in e2e tests let basic_controller = setup_tracing(verbose, cmd.otlp_agent, cmd.otlp_service_name)?; let (shutdown_sender, shutdown_receiver) = mpsc::channel::>(1); diff --git a/crates/topos/src/components/tce/services.rs b/crates/topos/src/components/regtest/services.rs similarity index 73% rename from crates/topos/src/components/tce/services.rs rename to crates/topos/src/components/regtest/services.rs index 9dc8c56b7..bc5b128b6 100644 --- a/crates/topos/src/components/tce/services.rs +++ b/crates/topos/src/components/regtest/services.rs @@ -1,2 +1 @@ pub(crate) mod push_certificate; -mod status; diff --git a/crates/topos/src/components/tce/services/push_certificate.rs b/crates/topos/src/components/regtest/services/push_certificate.rs similarity index 100% rename from crates/topos/src/components/tce/services/push_certificate.rs rename to crates/topos/src/components/regtest/services/push_certificate.rs diff --git a/crates/topos/src/components/sequencer/commands.rs b/crates/topos/src/components/sequencer/commands.rs deleted file mode 100644 index 6b65785f8..000000000 --- a/crates/topos/src/components/sequencer/commands.rs +++ /dev/null @@ -1,30 +0,0 @@ -use clap::{Args, Subcommand}; - -mod run; - -pub(crate) use run::Run; - -/// Topos CLI subcommand for the Sequencer components -#[derive(Args, Debug)] -pub(crate) struct SequencerCommand { - #[clap(from_global)] - pub(crate) verbose: u8, - - #[clap(subcommand)] - pub(crate) subcommands: Option, -} - -#[derive(Subcommand, Debug)] -pub(crate) enum SequencerCommands { - Run(Box), -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_run() { - assert!(SequencerCommands::has_subcommand("run")); - } -} diff --git a/crates/topos/src/components/sequencer/commands/run.rs b/crates/topos/src/components/sequencer/commands/run.rs deleted file mode 100644 index 6f01e5b9d..000000000 --- a/crates/topos/src/components/sequencer/commands/run.rs +++ /dev/null @@ -1,65 +0,0 @@ -use clap::Args; -use serde::Serialize; -use std::path::PathBuf; - -#[derive(Args, Debug, Serialize)] -#[command(about = "Run a full Topos Sequencer instance")] -pub struct Run { - /// SubnetId of the local subnet node, hex encoded 32 bytes starting with 0x - #[clap(long, env = "TOPOS_LOCAL_SUBNET_ID")] - pub subnet_id: Option, - - /// Subnet endpoint in the form [ip address]:[port] - /// Topos sequencer expects both websocket and http protocol available - /// on this subnet endpoint. If optional `subnet_jsonrpc_ws` is not provided websocket endpoint - /// will be deduced from this parameter. - #[clap( - long, - default_value = "127.0.0.1:8545", - env = "TOPOS_SUBNET_JSONRPC_HTTP" - )] - pub subnet_jsonrpc_http: String, - - /// Optional explicit websocket endpoint for the subnet jsonrpc api. If this parameter is not provided, - /// it will be derived from the `subnet_jsonrpc_http`. - /// Full uri value is expected, e.g. `wss://arbitrum.infura.com/v3/ws/mykey` or `ws://127.0.0.1/ws` - #[clap(long, env = "TOPOS_SUBNET_JSONRPC_WS")] - pub subnet_jsonrpc_ws: Option, - - // Core contract address - #[clap(long, env = "SUBNET_CONTRACT_ADDRESS")] - pub subnet_contract_address: String, - - /// Base Uri of TCE node to call grpc service api - #[clap( - long, - default_value = "http://[::1]:1340", - env = "TOPOS_BASE_TCE_API_URL" - )] - pub base_tce_api_url: String, - - /// Polygon subnet node data dir, containing `consensus/validator.key`, e.g. `../test-chain-1` - #[clap(long, env = "TOPOS_LOCAL_SUBNET_DATA_DIR")] - pub subnet_data_dir: PathBuf, - - /// Verifier version - #[clap(long, default_value = "0", env = "TOPOS_SEQUENCER_VERIFIER_VERSION")] - pub verifier: u32, - - /// Socket of the opentelemetry agent endpoint - /// If not provided open telemetry will not be used - #[arg(long, env = "TOPOS_OTLP_AGENT")] - pub otlp_agent: Option, - - /// Otlp service name - /// If not provided open telemetry will not be used - #[arg(long, env = "TOPOS_OTLP_SERVICE_NAME")] - pub otlp_service_name: Option, - - /// Start synchronizing from particular block number - /// Default is to sync from genesis block (0) - #[arg(long, env = "TOPOS_START_BLOCK")] - pub start_block: Option, -} - -impl Run {} diff --git a/crates/topos/src/components/sequencer/mod.rs b/crates/topos/src/components/sequencer/mod.rs deleted file mode 100644 index a874c59bc..000000000 --- a/crates/topos/src/components/sequencer/mod.rs +++ /dev/null @@ -1,71 +0,0 @@ -use self::commands::{SequencerCommand, SequencerCommands}; -use tokio::{signal, sync::mpsc}; -use tokio_util::sync::CancellationToken; -use topos_sequencer::{self, SequencerConfiguration}; -use topos_wallet::SecretManager; -use tracing::{error, info, warn}; - -use crate::tracing::setup_tracing; - -pub(crate) mod commands; - -pub(crate) async fn handle_command( - SequencerCommand { - verbose, - subcommands, - }: SequencerCommand, -) -> Result<(), Box> { - match subcommands { - Some(SequencerCommands::Run(cmd)) => { - let keys = SecretManager::from_fs(cmd.subnet_data_dir); - let config = SequencerConfiguration { - subnet_id: cmd.subnet_id, - public_key: None, - subnet_jsonrpc_http: cmd.subnet_jsonrpc_http, - subnet_jsonrpc_ws: cmd.subnet_jsonrpc_ws, - subnet_contract_address: cmd.subnet_contract_address, - tce_grpc_endpoint: cmd.base_tce_api_url, - signing_key: keys.validator.clone().unwrap(), - verifier: cmd.verifier, - start_block: cmd.start_block, - }; - - // Setup instrumentation if both otlp agent and otlp service name are provided as arguments - setup_tracing(verbose, cmd.otlp_agent, cmd.otlp_service_name)?; - - warn!("DEPRECATED: Please run with `topos node up`"); - - print_sequencer_info(&config); - let shutdown_token = CancellationToken::new(); - let shutdown_trigger = shutdown_token.clone(); - - let (shutdown_sender, mut shutdown_receiver) = mpsc::channel(1); - - tokio::select! { - _ = signal::ctrl_c() => { - info!("Received ctrl_c, shutting down application..."); - shutdown_trigger.cancel(); - - // Wait that all sender get dropped - let _ = shutdown_receiver.recv().await; - - info!("Shutdown procedure finished, exiting..."); - } - result = topos_sequencer::run(config, (shutdown_token, shutdown_sender)) => { - if let Err(ref error) = result { - error!("Sequencer node terminated {:?}", error); - std::process::exit(1); - } - } - } - - Ok(()) - } - None => Ok(()), - } -} - -pub fn print_sequencer_info(config: &SequencerConfiguration) { - info!("Sequencer Node"); - info!("{:?}", config); -} diff --git a/crates/topos/src/components/setup/commands.rs b/crates/topos/src/components/setup/commands.rs index 585e95802..0d5072a1c 100644 --- a/crates/topos/src/components/setup/commands.rs +++ b/crates/topos/src/components/setup/commands.rs @@ -7,9 +7,6 @@ pub(crate) use subnet::Subnet; /// Topos CLI subcommand for the setup of various Topos related components (e.g., installation of Polygon Edge binary) #[derive(Args, Debug)] pub(crate) struct SetupCommand { - #[clap(from_global)] - pub(crate) verbose: u8, - #[clap(subcommand)] pub(crate) subcommands: Option, } diff --git a/crates/topos/src/components/setup/mod.rs b/crates/topos/src/components/setup/mod.rs index baeb0c9f9..8d93c47fd 100644 --- a/crates/topos/src/components/setup/mod.rs +++ b/crates/topos/src/components/setup/mod.rs @@ -2,22 +2,15 @@ use self::commands::{SetupCommand, SetupCommands}; use tokio::{signal, spawn}; use tracing::{error, info}; -use crate::tracing::setup_tracing; - use topos::{install_polygon_edge, list_polygon_edge_releases}; pub(crate) mod commands; pub(crate) async fn handle_command( - SetupCommand { - subcommands, - verbose, - }: SetupCommand, + SetupCommand { subcommands }: SetupCommand, ) -> Result<(), Box> { match subcommands { Some(SetupCommands::Subnet(cmd)) => { - setup_tracing(verbose, None, None)?; - spawn(async move { if cmd.list_releases { info!( diff --git a/crates/topos/src/components/subnet/commands.rs b/crates/topos/src/components/subnet/commands.rs deleted file mode 100644 index 8b6bf74fb..000000000 --- a/crates/topos/src/components/subnet/commands.rs +++ /dev/null @@ -1,30 +0,0 @@ -use clap::{Args, Subcommand}; - -mod subnet; - -pub(crate) use subnet::Run; - -/// Topos CLI subcommand for the Polygon Edge related functionalities -#[derive(Args, Debug)] -pub(crate) struct SubnetCommand { - #[clap(from_global)] - pub(crate) verbose: u8, - - #[clap(subcommand)] - pub(crate) subcommands: Option, -} - -#[derive(Subcommand, Debug)] -pub(crate) enum SubnetCommands { - Run(Run), -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_run() { - assert!(SubnetCommands::has_subcommand("run")); - } -} diff --git a/crates/topos/src/components/subnet/commands/subnet.rs b/crates/topos/src/components/subnet/commands/subnet.rs deleted file mode 100644 index 34b074d59..000000000 --- a/crates/topos/src/components/subnet/commands/subnet.rs +++ /dev/null @@ -1,19 +0,0 @@ -use clap::Parser; - -use serde::Serialize; -use std::path::PathBuf; - -#[derive(Parser, Debug, Serialize)] -#[clap(about = "Run Polygon Edge", trailing_var_arg = true)] -pub struct Run { - /// Installation directory path for Polygon Edge binary. - /// If not provided, Polygon Edge binary will be expected in the current directory - #[arg(long, env = "TOPOS_POLYGON_EDGE_BIN_PATH", default_value = ".")] - pub path: PathBuf, - - /// Polygon Edge command line arguments - #[clap(allow_hyphen_values = true)] - pub polygon_edge_arguments: Vec, -} - -impl Run {} diff --git a/crates/topos/src/components/subnet/mod.rs b/crates/topos/src/components/subnet/mod.rs deleted file mode 100644 index 2e190a625..000000000 --- a/crates/topos/src/components/subnet/mod.rs +++ /dev/null @@ -1,62 +0,0 @@ -use self::commands::{SubnetCommand, SubnetCommands}; -use std::process::Stdio; -use tokio::{process::Command, signal, spawn}; -use tracing::{error, info}; - -use crate::tracing::setup_tracing; - -pub(crate) mod commands; - -pub(crate) async fn handle_command( - SubnetCommand { - subcommands, - verbose, - }: SubnetCommand, -) -> Result<(), Box> { - match subcommands { - Some(SubnetCommands::Run(cmd)) => { - setup_tracing(verbose, None, None)?; - - let binary_name = "polygon-edge".to_string(); - let polygon_edge_path = cmd.path.join(&binary_name); - - info!( - "Running binary {} with arguments:{:?}", - polygon_edge_path.display(), - cmd.polygon_edge_arguments - ); - - spawn(async move { - // Run Polygon Edge command. Pass all parameters. - match Command::new(polygon_edge_path) - .args(cmd.polygon_edge_arguments) - .stderr(Stdio::inherit()) - .stdout(Stdio::inherit()) - .stdin(Stdio::inherit()) - .spawn() - { - Ok(mut child) => { - if let Some(pid) = child.id() { - info!("Polygon Edge child process with pid {pid} successfully started"); - } - if let Err(e) = child.wait().await { - info!("Polygon Edge child process finished with error: {e}"); - } - std::process::exit(0); - } - Err(e) => { - error!("Error executing Polygon Edge: {e}"); - std::process::exit(1); - } - } - }); - - signal::ctrl_c() - .await - .expect("failed to listen for signals"); - - Ok(()) - } - None => Ok(()), - } -} diff --git a/crates/topos/src/components/tce/commands.rs b/crates/topos/src/components/tce/commands.rs deleted file mode 100644 index 871ee0842..000000000 --- a/crates/topos/src/components/tce/commands.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::path::PathBuf; - -use clap::{Args, Subcommand}; - -mod peer_id; -mod push_certificate; -mod run; -mod status; - -pub(crate) use push_certificate::PushCertificate; -pub(crate) use run::Run; -pub(crate) use status::Status; - -use self::peer_id::Keys; - -#[derive(Args, Debug)] -pub(crate) struct NodeArgument { - #[clap(short, long, default_value = "http://[::1]:1340")] - pub(crate) node: String, -} - -/// Topos CLI subcommand for the TCE related functionalities -#[derive(Args, Debug)] -pub(crate) struct TceCommand { - #[clap(from_global)] - pub(crate) verbose: u8, - - #[clap(from_global)] - pub(crate) home: PathBuf, - - #[clap(subcommand)] - pub(crate) subcommands: Option, -} - -#[derive(Subcommand, Debug)] -pub(crate) enum TceCommands { - PushCertificate(PushCertificate), - Keys(Keys), - Run(Box), - Status(Status), -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_run() { - assert!(TceCommands::has_subcommand("run")); - } -} diff --git a/crates/topos/src/components/tce/commands/peer_id.rs b/crates/topos/src/components/tce/commands/peer_id.rs deleted file mode 100644 index 26461f6d3..000000000 --- a/crates/topos/src/components/tce/commands/peer_id.rs +++ /dev/null @@ -1,7 +0,0 @@ -use clap::Args; - -#[derive(Args, Debug)] -pub(crate) struct Keys { - #[arg(long = "from-seed")] - pub(crate) from_seed: Option, -} diff --git a/crates/topos/src/components/tce/commands/run.rs b/crates/topos/src/components/tce/commands/run.rs deleted file mode 100644 index d4faf83b9..000000000 --- a/crates/topos/src/components/tce/commands/run.rs +++ /dev/null @@ -1,113 +0,0 @@ -use clap::Args; -use serde::Serialize; -use std::collections::HashSet; -use std::net::SocketAddr; -use std::str::FromStr; -use topos_core::types::{ValidatorId, ValidatorIdConversionError}; -use topos_p2p::{Multiaddr, PeerId}; -use topos_tce_transport::ReliableBroadcastParams; - -#[derive(Args, Debug, Serialize)] -#[command(about = "Run a full TCE instance")] -pub struct Run { - /// Boot nodes to connect to, pairs of , space separated, - /// quoted list like --boot-peers='a a1,b b1' - #[arg(long, default_value = "", env = "TCE_BOOT_PEERS")] - pub boot_peers: String, - - /// Validator nodes to connect to, list of Ethereum addresses, space separated, - /// quoted list like --validators='0xfd530a60b4b4cf799d74' - #[arg(long, default_value = "", env = "TCE_VALIDATORS", default_value = "")] - pub validators: String, - - /// Advertised (externally visible) , - /// if empty this machine ip address(es) are used - #[arg(long, env = "TCE_EXT_HOST", default_value = "/ip4/0.0.0.0")] - pub tce_ext_host: String, - - /// Port to listen on (host is 0.0.0.0, should be good for most installations) - #[arg(long, default_value_t = 0, env = "TCE_PORT")] - pub tce_local_port: u16, - - /// WebAPI external url (optional) - #[clap(long, env = "TCE_WEB_API_EXT_URL")] - pub web_api_ext_url: Option, - - /// WebAPI port - #[clap(long, default_value_t = 8080, env = "TCE_WEB_API_PORT")] - pub web_api_local_port: u16, - - /// Local peer secret key seed (optional, used for testing) - #[clap(long, env = "TCE_LOCAL_KS")] - pub local_key_seed: Option, - - /// Local peer secret key seed (optional, used for testing) - #[clap(long, env = "TCE_LOCAL_VPK")] - pub local_validator_private_key: Option, - - /// Storage database path, if not set RAM storage is used - #[clap(long, default_value = "./default_db/", env = "TCE_DB_PATH")] - pub db_path: Option, - - /// gRPC API Addr - #[clap(long, env = "TCE_API_ADDR", default_value = "[::1]:1340")] - pub api_addr: SocketAddr, - - /// GraphQL API Addr - #[clap(long, env = "TCE_GRAPHQL_API_ADDR", default_value = "[::1]:4000")] - pub graphql_api_addr: SocketAddr, - - /// Metrics server API Addr - #[clap(long, env = "TCE_METRICS_API_ADDR", default_value = "[::1]:3000")] - pub metrics_api_addr: SocketAddr, - - /// Broadcast parameters - #[command(flatten)] - pub tce_params: ReliableBroadcastParams, - - /// Socket of the opentelemetry agent endpoint - /// If not provided open telemetry will not be used - #[arg(long, env = "TOPOS_OTLP_AGENT")] - pub otlp_agent: Option, - - /// Otlp service name - /// If not provided open telemetry will not be used - #[arg(long, env = "TOPOS_OTLP_SERVICE_NAME")] - pub otlp_service_name: Option, - - #[arg(long, env = "TOPOS_MINIMUM_TCE_CLUSTER_SIZE")] - pub minimum_tce_cluster_size: Option, -} - -impl Run { - pub fn parse_boot_peers(&self) -> Vec<(PeerId, Multiaddr)> { - self.boot_peers - .split(&[',', ' ']) - .map(|s| s.to_string()) - .collect::>() - .chunks(2) - .filter_map(|pair| { - if pair.len() > 1 { - Some(( - pair[0].as_str().parse().unwrap(), - pair[1].as_str().parse().unwrap(), - )) - } else { - None - } - }) - .collect() - } - - pub fn parse_validators(&self) -> Result, ValidatorIdConversionError> { - if !self.validators.is_empty() { - return self - .validators - .split(&[',', ' ']) - .map(ValidatorId::from_str) - .collect::, ValidatorIdConversionError>>(); - } - - Ok(HashSet::new()) - } -} diff --git a/crates/topos/src/components/tce/mod.rs b/crates/topos/src/components/tce/mod.rs deleted file mode 100644 index 73cfb1bae..000000000 --- a/crates/topos/src/components/tce/mod.rs +++ /dev/null @@ -1,209 +0,0 @@ -use opentelemetry::global; -use std::str::FromStr; -use std::{path::PathBuf, sync::Arc, time::Duration}; -use tokio::{ - signal, - sync::{mpsc, Mutex}, -}; -use tokio_util::sync::CancellationToken; -use tonic::transport::{Channel, Endpoint}; -use topos_core::api::grpc::tce::v1::{ - api_service_client::ApiServiceClient, console_service_client::ConsoleServiceClient, -}; -use topos_p2p::config::NetworkConfig; -use topos_tce::config::{AuthKey, StorageConfiguration, TceConfiguration}; -use tower::Service; -use tracing::{debug, error, info, warn}; - -use crate::tracing::setup_tracing; - -use self::commands::{TceCommand, TceCommands}; - -pub(crate) mod commands; -pub(crate) mod parser; -pub(crate) mod services; - -pub(crate) struct TCEService { - pub(crate) console_client: Arc>>, - pub(crate) _api_client: Arc>>, -} - -impl TCEService { - pub(crate) fn with_grpc_endpoint(endpoint: &str) -> Self { - Self { - console_client: setup_console_tce_grpc(endpoint), - _api_client: setup_api_tce_grpc(endpoint), - } - } -} - -pub(crate) async fn handle_command( - TceCommand { - verbose, - mut subcommands, - .. - }: TceCommand, -) -> Result<(), Box> { - if let Some(TceCommands::Run(cmd)) = subcommands.as_mut() { - // Setup instrumentation if both otlp agent and otlp service name are provided as arguments - setup_tracing(verbose, cmd.otlp_agent.take(), cmd.otlp_service_name.take())?; - } else { - setup_tracing(verbose, None, None)?; - }; - - match subcommands { - Some(TceCommands::PushCertificate(cmd)) => { - debug!("Start executing PushCertificate command"); - match services::push_certificate::check_delivery( - cmd.timeout_broadcast, - cmd.format, - cmd.nodes, - cmd.timeout, - ) - .await - .map_err(Box::::from) - { - Err(e) => { - error!("Check failed: {:?}", e); - std::process::exit(1); - } - _ => { - info!("Check passed"); - Ok(()) - } - } - } - - Some(TceCommands::Run(cmd)) => { - let config = TceConfiguration { - boot_peers: cmd.parse_boot_peers(), - validators: cmd - .parse_validators() - .map_err(|_| Box::new(topos::Error::InvalidValidatorAddress))?, - auth_key: cmd - .local_key_seed - .clone() - .map(|s| AuthKey::Seed(s.as_bytes().to_vec())), - signing_key: cmd - .local_validator_private_key - .clone() - .map(|s| { - hex::decode(s) - .map(AuthKey::PrivateKey) - .map_err(|_| Box::new(topos::Error::InvalidPrivateKey)) - }) - .map_or(Ok(None), |v| v.map(Some))?, - tce_addr: cmd.tce_ext_host, - tce_local_port: cmd.tce_local_port, - tce_params: cmd.tce_params, - api_addr: cmd.api_addr, - graphql_api_addr: cmd.graphql_api_addr, - metrics_api_addr: cmd.metrics_api_addr, - storage: StorageConfiguration::RocksDB( - cmd.db_path - .as_ref() - .and_then(|path| PathBuf::from_str(path).ok()), - ), - network_bootstrap_timeout: Duration::from_secs(10), - minimum_cluster_size: cmd - .minimum_tce_cluster_size - .unwrap_or(NetworkConfig::MINIMUM_CLUSTER_SIZE), - version: env!("TOPOS_VERSION"), - }; - - print_node_info(&config); - - warn!("DEPRECATED: Please run with `topos node up`"); - - let shutdown_token = CancellationToken::new(); - let shutdown_trigger = shutdown_token.clone(); - - let (shutdown_sender, mut shutdown_receiver) = mpsc::channel(1); - - tokio::select! { - _ = signal::ctrl_c() => { - info!("Received ctrl_c, shutting down application..."); - shutdown_trigger.cancel(); - - // Wait that all sender get dropped - let _ = shutdown_receiver.recv().await; - - info!("Shutdown procedure finished, exiting..."); - } - result = topos_tce::run(&config, (shutdown_token, shutdown_sender)) => { - global::shutdown_tracer_provider(); - if let Err(ref error) = result { - error!("TCE node terminated {:?}", error); - std::process::exit(1); - } - } - - } - - Ok(()) - } - - Some(TceCommands::Keys(cmd)) => { - if let Some(slice) = cmd.from_seed { - println!( - "{}", - topos_p2p::utils::local_key_pair_from_slice(slice.as_bytes()) - .public() - .to_peer_id() - ) - }; - - Ok(()) - } - - Some(TceCommands::Status(status)) => { - debug!("Start executing Status command"); - - let mut tce_service = TCEService::with_grpc_endpoint(&status.node_args.node); - - debug!("Executing the Status on the TCE service"); - let exit_code = i32::from(!(tce_service.call(status).await?)); - - std::process::exit(exit_code); - } - - None => Ok(()), - } -} - -pub fn print_node_info(config: &TceConfiguration) { - tracing::warn!("Topos - version: {}", config.version); - - if let StorageConfiguration::RocksDB(Some(ref path)) = config.storage { - info!("RocksDB at {:?}", path); - } - - warn!("API gRPC endpoint reachable at {}", config.api_addr); - info!( - "API GraphQL endpoint reachable at {}", - config.graphql_api_addr - ); - warn!("Broadcast Parameters {:?}", config.tce_params); -} - -fn setup_console_tce_grpc(endpoint: &str) -> Arc>> { - match Endpoint::from_shared(endpoint.to_string()) { - Ok(endpoint) => Arc::new(Mutex::new(ConsoleServiceClient::new( - endpoint.connect_lazy(), - ))), - Err(e) => { - error!("Failure to setup the gRPC API endpoint on {endpoint}: {e}"); - std::process::exit(1); - } - } -} - -fn setup_api_tce_grpc(endpoint: &str) -> Arc>> { - match Endpoint::from_shared(endpoint.to_string()) { - Ok(endpoint) => Arc::new(Mutex::new(ApiServiceClient::new(endpoint.connect_lazy()))), - Err(e) => { - error!("Failure to setup the gRPC API endpoint on {endpoint}: {e}"); - std::process::exit(1); - } - } -} diff --git a/crates/topos/src/components/tce/parser.rs b/crates/topos/src/components/tce/parser.rs deleted file mode 100644 index 8c5395860..000000000 --- a/crates/topos/src/components/tce/parser.rs +++ /dev/null @@ -1,39 +0,0 @@ -use std::{ - fs::File, - io::{self, Read}, - path::Path, - str::FromStr, -}; - -use topos_p2p::PeerId; - -use crate::options::input_format::{InputFormat, Parser}; - -pub(crate) struct PeerList(pub(crate) Option); - -impl Parser for InputFormat { - type Result = Result, io::Error>; - - fn parse(&self, PeerList(input): PeerList) -> Self::Result { - let mut input_string = String::new(); - _ = match input { - Some(path) if Path::new(&path).is_file() => { - File::open(path)?.read_to_string(&mut input_string)? - } - Some(string) => { - input_string = string; - 0 - } - None => io::stdin().read_to_string(&mut input_string)?, - }; - - match self { - InputFormat::Json => Ok(serde_json::from_str::>(&input_string)?), - InputFormat::Plain => Ok(input_string - .trim() - .split(&[',', '\n']) - .filter_map(|s| PeerId::from_str(s.trim()).ok()) - .collect()), - } - } -} diff --git a/crates/topos/src/config/base.rs b/crates/topos/src/config/base.rs index f8092bfec..59bde8a3a 100644 --- a/crates/topos/src/config/base.rs +++ b/crates/topos/src/config/base.rs @@ -6,7 +6,6 @@ use figment::{ }; use serde::{Deserialize, Serialize}; -use crate::components::node::commands::Init; use crate::config::node::NodeRole; use crate::config::Config; @@ -57,8 +56,6 @@ impl BaseConfig { } impl Config for BaseConfig { - type Command = Init; - type Output = Self; fn load_from_file(figment: Figment, home: &Path) -> Figment { @@ -75,7 +72,7 @@ impl Config for BaseConfig { figment.extract() } - fn profile(&self) -> String { + fn profile() -> String { "base".to_string() } } diff --git a/crates/topos/src/config/edge.rs b/crates/topos/src/config/edge.rs index 0f55ae636..0886cbf4b 100644 --- a/crates/topos/src/config/edge.rs +++ b/crates/topos/src/config/edge.rs @@ -1,13 +1,10 @@ -use std::{collections::HashMap, path::Path}; - use crate::config::Config; use figment::{ providers::{Format, Toml}, Figment, }; use serde::{Deserialize, Serialize}; - -use crate::components::subnet::commands::Run; +use std::{collections::HashMap, path::Path}; // TODO: Provides the default arguments here // Serde `flatten` and `default` doesn't work together yet @@ -20,8 +17,6 @@ pub struct EdgeConfig { } impl Config for EdgeConfig { - type Command = Run; - type Output = EdgeConfig; fn load_from_file(figment: Figment, home: &Path) -> Figment { @@ -38,7 +33,7 @@ impl Config for EdgeConfig { figment.extract() } - fn profile(&self) -> String { + fn profile() -> String { "edge".to_string() } } diff --git a/crates/topos/src/config/mod.rs b/crates/topos/src/config/mod.rs index 40a954db6..8e6169ea1 100644 --- a/crates/topos/src/config/mod.rs +++ b/crates/topos/src/config/mod.rs @@ -1,24 +1,19 @@ -#[cfg(feature = "node")] pub(crate) mod base; -#[cfg(feature = "node")] pub(crate) mod edge; -#[cfg(feature = "node")] pub(crate) mod node; -#[cfg(feature = "sequencer")] pub(crate) mod sequencer; -#[cfg(feature = "tce")] pub mod tce; pub(crate) mod genesis; +use crate::components::node::commands::NodeCommands; use std::path::Path; -use figment::{error::Kind, providers::Serialized, Figment}; +use figment::providers::Serialized; +use figment::{error::Kind, Figment}; use serde::Serialize; pub(crate) trait Config: Serialize { - /// The command line command to load the configuration. - type Command: Serialize; /// The configuration type returned (should be Self). type Output; @@ -33,36 +28,39 @@ pub(crate) trait Config: Serialize { /// Return the profile name of the configuration to be used /// when generating the file. - fn profile(&self) -> String; + fn profile() -> String; /// Convert the configuration to a TOML table. fn to_toml(&self) -> Result { toml::Table::try_from(self) } - /// Load the configuration from the command line command. - fn load_from_command(figment: Figment, command: Self::Command) -> Figment { - figment.merge(Serialized::defaults(command)) - } - /// Main function to load the configuration. /// It will load the configuration from the file and the command line (if any) /// and then extract the configuration from the context in order to build the Config. /// The Config is then returned or an error if the configuration is not valid. - fn load(home: &Path, command: Option) -> Result { + fn load(home: &Path, command: Option) -> Result { let mut figment = Figment::new(); figment = Self::load_from_file(figment, home); if let Some(command) = command { - figment = Self::load_from_command(figment, command); + match command { + NodeCommands::Up(up) => { + figment = figment.merge(Serialized::from(up, Self::profile())) + } + NodeCommands::Init(init) => { + figment = figment.merge(Serialized::from(init, Self::profile())) + } + _ => (), + } } Self::load_context(figment) } } -pub(crate) fn load_config(node_path: &Path, command: Option) -> T::Output { +pub(crate) fn load_config(node_path: &Path, command: Option) -> T::Output { match T::load(node_path, command) { Ok(config) => config, Err(figment::Error { diff --git a/crates/topos/src/config/node.rs b/crates/topos/src/config/node.rs index 156bdc99b..cbe61171d 100644 --- a/crates/topos/src/config/node.rs +++ b/crates/topos/src/config/node.rs @@ -5,15 +5,14 @@ use figment::{ Figment, }; +use crate::components::node::commands::NodeCommands; use serde::{Deserialize, Serialize}; -use crate::components::node::{self, commands::Up}; use crate::config::{ - base::BaseConfig, edge::EdgeConfig, sequencer::SequencerConfig, tce::TceConfig, Config, + base::BaseConfig, edge::EdgeConfig, load_config, sequencer::SequencerConfig, tce::TceConfig, + Config, }; -use super::load_config; - #[derive(clap::ValueEnum, Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] pub enum NodeRole { @@ -31,25 +30,25 @@ pub(crate) struct NodeConfig { } impl NodeConfig { - pub fn new(from: &Path, cmd: Option) -> Self { - let base = load_config::(from, cmd); + pub fn new(home: &Path, cmd: Option) -> Self { + let base = load_config::(home, cmd); let mut config = NodeConfig { base: base.clone(), sequencer: base .need_sequencer() - .then(|| load_config::(from, None)), + .then(|| load_config::(home, None)), tce: base .need_tce() - .then(|| load_config::(from, None)), + .then(|| load_config::(home, None)), edge: base .need_edge() - .then(|| load_config::(from, None)), + .then(|| load_config::(home, None)), }; // Make the TCE DB path relative to the folder if let Some(config) = config.tce.as_mut() { - config.db_path = from.join(&config.db_path); + config.db_path = home.join(&config.db_path); } config @@ -57,13 +56,8 @@ impl NodeConfig { } impl Config for NodeConfig { - type Command = Up; type Output = NodeConfig; - fn profile(&self) -> String { - "default".to_string() - } - fn load_from_file(figment: Figment, home: &Path) -> Figment { let home = home.join("config.toml"); @@ -73,4 +67,8 @@ impl Config for NodeConfig { fn load_context(figment: Figment) -> Result { figment.extract() } + + fn profile() -> String { + "default".to_string() + } } diff --git a/crates/topos/src/config/sequencer.rs b/crates/topos/src/config/sequencer.rs index 0cfb84a4d..6f82b9f36 100644 --- a/crates/topos/src/config/sequencer.rs +++ b/crates/topos/src/config/sequencer.rs @@ -1,6 +1,5 @@ use std::path::Path; -use crate::components::sequencer::commands::Run; use crate::config::Config; use figment::{ providers::{Format, Toml}, @@ -56,8 +55,6 @@ fn default_tce_grpc_endpoint() -> String { } impl Config for SequencerConfig { - type Command = Run; - type Output = Self; fn load_from_file(figment: Figment, home: &Path) -> Figment { @@ -74,7 +71,7 @@ impl Config for SequencerConfig { figment.extract() } - fn profile(&self) -> String { + fn profile() -> String { "sequencer".to_string() } } diff --git a/crates/topos/src/config/tce.rs b/crates/topos/src/config/tce.rs index fb1132ad6..b2f7b1bb2 100644 --- a/crates/topos/src/config/tce.rs +++ b/crates/topos/src/config/tce.rs @@ -7,7 +7,6 @@ use figment::{ }; use serde::{Deserialize, Serialize}; -use crate::components::tce::commands::Run; use crate::config::Config; use topos_p2p::{Multiaddr, PeerId}; @@ -103,13 +102,8 @@ impl TceConfig { } impl Config for TceConfig { - type Command = Run; type Output = TceConfig; - fn profile(&self) -> String { - "tce".to_string() - } - fn load_from_file(figment: Figment, home: &Path) -> Figment { let home = home.join("config.toml"); @@ -123,4 +117,8 @@ impl Config for TceConfig { fn load_context(figment: Figment) -> Result { figment.extract() } + + fn profile() -> String { + "tce".to_string() + } } diff --git a/crates/topos/src/main.rs b/crates/topos/src/main.rs index 4b153a06e..7d00e4f20 100644 --- a/crates/topos/src/main.rs +++ b/crates/topos/src/main.rs @@ -4,10 +4,7 @@ pub(crate) mod components; pub(crate) mod options; mod tracing; -#[cfg(feature = "node")] mod config; - -#[cfg(feature = "node")] mod edge; use crate::options::ToposCommand; @@ -20,18 +17,8 @@ async fn main() -> Result<(), Box> { let args = options::Opt::parse(); match args.commands { - #[cfg(feature = "tce")] - ToposCommand::Tce(cmd) => components::tce::handle_command(cmd).await, - #[cfg(feature = "sequencer")] - ToposCommand::Sequencer(cmd) => components::sequencer::handle_command(cmd).await, - #[cfg(feature = "network")] - ToposCommand::Network(cmd) => components::network::handle_command(cmd).await, - #[cfg(feature = "setup")] ToposCommand::Setup(cmd) => components::setup::handle_command(cmd).await, - #[cfg(feature = "subnet")] - ToposCommand::Subnet(cmd) => components::subnet::handle_command(cmd).await, - #[cfg(feature = "node")] ToposCommand::Node(cmd) => components::node::handle_command(cmd).await, - ToposCommand::Doctor => components::doctor::handle_doctor().await, + ToposCommand::Regtest(cmd) => components::regtest::handle_command(cmd).await, } } diff --git a/crates/topos/src/options.rs b/crates/topos/src/options.rs index d48169802..e62339bc6 100644 --- a/crates/topos/src/options.rs +++ b/crates/topos/src/options.rs @@ -1,23 +1,9 @@ use clap::{Parser, Subcommand}; use std::{ffi::OsString, path::PathBuf}; -#[cfg(feature = "sequencer")] -use crate::components::sequencer::commands::SequencerCommand; - -#[cfg(feature = "tce")] -use crate::components::tce::commands::TceCommand; - -#[cfg(feature = "network")] -use crate::components::network::commands::NetworkCommand; - -#[cfg(feature = "setup")] -use crate::components::setup::commands::SetupCommand; - -#[cfg(feature = "subnet")] -use crate::components::subnet::commands::SubnetCommand; - -#[cfg(feature = "node")] use crate::components::node::commands::NodeCommand; +use crate::components::regtest::commands::RegtestCommand; +use crate::components::setup::commands::SetupCommand; pub(crate) mod input_format; @@ -57,17 +43,7 @@ fn get_default_home() -> OsString { #[derive(Subcommand, Debug)] pub(crate) enum ToposCommand { - #[cfg(feature = "tce")] - Tce(TceCommand), - #[cfg(feature = "sequencer")] - Sequencer(SequencerCommand), - #[cfg(feature = "network")] - Network(NetworkCommand), - #[cfg(feature = "setup")] Setup(SetupCommand), - #[cfg(feature = "subnet")] - Subnet(SubnetCommand), - #[cfg(feature = "node")] Node(NodeCommand), - Doctor, + Regtest(RegtestCommand), } diff --git a/crates/topos/src/tracing.rs b/crates/topos/src/tracing.rs index 2cb78e862..4e8c6945a 100644 --- a/crates/topos/src/tracing.rs +++ b/crates/topos/src/tracing.rs @@ -197,6 +197,8 @@ pub(crate) fn setup_tracing( opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); + global::set_text_map_propagator(TraceContextPropagator::new()); + None } else { None diff --git a/crates/topos/tests/config.rs b/crates/topos/tests/config.rs index 9da6bdefe..69098128f 100644 --- a/crates/topos/tests/config.rs +++ b/crates/topos/tests/config.rs @@ -1,4 +1,5 @@ use assert_cmd::prelude::*; +use std::fs::remove_dir_all; use std::path::PathBuf; use std::process::Command; use tempfile::tempdir; @@ -25,9 +26,9 @@ async fn polygon_edge_path(path: &str) -> String { } #[tokio::test] -async fn test_handle_command_init() -> Result<(), Box> { - let temporary_test_folder = "/tmp/topos/handle_command_init"; - let path = polygon_edge_path(temporary_test_folder).await; +async fn handle_command_init() -> Result<(), Box> { + let tmp_home_dir = tempdir()?; + let path = polygon_edge_path(tmp_home_dir.path().to_str().unwrap()).await; let mut cmd = Command::cargo_bin("topos")?; cmd.arg("node") @@ -35,14 +36,14 @@ async fn test_handle_command_init() -> Result<(), Box> { .arg(path) .arg("init") .arg("--home") - .arg(temporary_test_folder); + .arg(tmp_home_dir.path().to_str().unwrap()); let output = cmd.assert().success(); let result: &str = std::str::from_utf8(&output.get_output().stdout)?; assert!(result.contains("Created node config file")); - let home = PathBuf::from(temporary_test_folder); + let home = PathBuf::from(tmp_home_dir.path()); // Verification: check that the config file was created let config_path = home.join("node").join("default").join("config.toml"); @@ -56,14 +57,12 @@ async fn test_handle_command_init() -> Result<(), Box> { assert!(config_contents.contains("[edge]")); assert!(config_contents.contains("[tce]")); - std::fs::remove_dir_all(temporary_test_folder)?; - Ok(()) } #[test] -fn test_nothing_written_if_failure() -> Result<(), Box> { - let temporary_test_folder = "/tmp/topos/test_nothing_written_if_failure"; +fn nothing_written_if_failure() -> Result<(), Box> { + let tmp_home_dir = tempdir()?; let mut cmd = Command::cargo_bin("topos")?; cmd.arg("node") @@ -71,35 +70,33 @@ fn test_nothing_written_if_failure() -> Result<(), Box> { .arg("./inexistent/folder/") // Command will fail .arg("init") .arg("--home") - .arg(temporary_test_folder); + .arg(tmp_home_dir.path().to_str().unwrap()); // Should fail cmd.assert().failure(); - let home = PathBuf::from(temporary_test_folder); + let home = PathBuf::from(tmp_home_dir.path().to_str().unwrap()); // Check that files were NOT created let config_path = home.join("node").join("default"); assert!(!config_path.exists()); - std::fs::remove_dir_all(temporary_test_folder)?; - Ok(()) } #[tokio::test] -async fn test_handle_command_init_with_custom_name() -> Result<(), Box> { - let temporary_test_folder = "/tmp/topos/test_handle_command_init_with_custom_name"; +async fn handle_command_init_with_custom_name() -> Result<(), Box> { + let tmp_home_dir = tempdir()?; let node_name = "TEST_NODE"; - let path = polygon_edge_path(temporary_test_folder).await; + let path = polygon_edge_path(tmp_home_dir.path().to_str().unwrap()).await; let mut cmd = Command::cargo_bin("topos")?; cmd.arg("node") .arg("--edge-path") - .arg(path) + .arg(path.clone()) .arg("init") .arg("--home") - .arg(temporary_test_folder) + .arg(tmp_home_dir.path().to_str().unwrap()) .arg("--name") .arg(node_name); @@ -108,7 +105,7 @@ async fn test_handle_command_init_with_custom_name() -> Result<(), Box Result<(), Box Result<(), Box> { +async fn command_init_precedence_env() -> Result<(), Box> { let tmp_home_directory = tempdir()?; // Test node init with env variables @@ -151,6 +146,7 @@ async fn test_command_init_precedence_env() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { +async fn command_init_precedence_cli_env() -> Result<(), Box> { let tmp_home_dir = tempdir()?; // Test node init with both cli and env flags @@ -221,5 +218,7 @@ async fn test_command_init_precedence_cli_env() -> Result<(), Box Result<(), Box> { let mut cmd = Command::cargo_bin("topos")?; - cmd.arg("tce").arg("push-certificate").arg("-h"); + cmd.arg("regtest").arg("push-certificate").arg("-h"); let output = cmd.assert().success(); @@ -15,3 +20,55 @@ fn help_display() -> Result<(), Box> { Ok(()) } + +#[rstest] +#[test_log::test(tokio::test)] +#[timeout(Duration::from_secs(20))] +// FIXME: This test is flaky, it fails sometimes because of sample failure +async fn assert_delivery() -> Result<(), Box> { + let mut peers_context = create_network(5, vec![]).await; + + let mut status: Vec = Vec::new(); + + for (_peer_id, client) in peers_context.iter_mut() { + let response = client + .console_grpc_client + .status(StatusRequest {}) + .await + .expect("Can't get status"); + + status.push(response.into_inner().has_active_sample); + } + + assert!(status.iter().all(|s| *s)); + + let nodes: String = peers_context + .iter() + .map(|peer| peer.1.api_entrypoint.clone()) + .collect::>() + .join(","); + + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + + _ = thread::spawn(|| { + let mut cmd = Command::cargo_bin("topos").unwrap(); + cmd.env("TOPOS_LOG_FORMAT", "json"); + cmd.env("RUST_LOG", "topos=debug"); + + cmd.arg("tce") + .arg("push-certificates") + .args(["-f", "plain"]) + .arg("-n") + .arg(nodes); + + cmd.assert().success(); + + tx.send(()).unwrap(); + }); + + _ = tokio::time::timeout(Duration::from_secs(15), rx) + .await + .unwrap(); + + Ok(()) +} diff --git a/crates/topos/tests/network.rs b/crates/topos/tests/regtest.rs similarity index 75% rename from crates/topos/tests/network.rs rename to crates/topos/tests/regtest.rs index c7cdddaa4..ad194558c 100644 --- a/crates/topos/tests/network.rs +++ b/crates/topos/tests/regtest.rs @@ -5,9 +5,9 @@ use std::process::Command; use assert_cmd::prelude::*; #[test] -fn network_spam_help_display() -> Result<(), Box> { +fn regtest_spam_help_display() -> Result<(), Box> { let mut cmd = Command::cargo_bin("topos")?; - cmd.arg("network").arg("spam").arg("-h"); + cmd.arg("regtest").arg("spam").arg("-h"); let output = cmd.assert().success(); diff --git a/crates/topos/tests/sequencer.rs b/crates/topos/tests/sequencer.rs deleted file mode 100644 index e9c4ab805..000000000 --- a/crates/topos/tests/sequencer.rs +++ /dev/null @@ -1,19 +0,0 @@ -mod utils; - -use std::process::Command; - -use assert_cmd::prelude::*; - -#[test] -fn sequencer_help_display() -> Result<(), Box> { - let mut cmd = Command::cargo_bin("topos")?; - cmd.arg("sequencer").arg("run").arg("-h"); - - let output = cmd.assert().success(); - - let result: &str = std::str::from_utf8(&output.get_output().stdout)?; - - insta::assert_snapshot!(utils::sanitize_config_folder_path(result)); - - Ok(()) -} diff --git a/crates/topos/tests/snapshots/node__help_display.snap b/crates/topos/tests/snapshots/node__help_display.snap index d992989b6..e2392dbf5 100644 --- a/crates/topos/tests/snapshots/node__help_display.snap +++ b/crates/topos/tests/snapshots/node__help_display.snap @@ -1,5 +1,6 @@ --- source: crates/topos/tests/node.rs +assertion_line: 16 expression: "utils::sanitize_config_folder_path(result)" --- Utility to manage your nodes in the Topos network @@ -7,9 +8,10 @@ Utility to manage your nodes in the Topos network Usage: topos node [OPTIONS] [COMMAND] Commands: - up Spawn your node! - init Setup your node! - help Print this message or the help of the given subcommand(s) + up Spawn your node + init Setup your node + status Get node status + help Print this message or the help of the given subcommand(s) Options: --edge-path Installation directory path for Polygon Edge binary [env: TOPOS_POLYGON_EDGE_BIN_PATH=] [default: .] diff --git a/crates/topos/tests/snapshots/push_certificate__help_display.snap b/crates/topos/tests/snapshots/push_certificate__help_display.snap index 6cd2e1097..584913996 100644 --- a/crates/topos/tests/snapshots/push_certificate__help_display.snap +++ b/crates/topos/tests/snapshots/push_certificate__help_display.snap @@ -1,8 +1,11 @@ --- source: crates/topos/tests/push-certificate.rs -expression: result +assertion_line: 19 +expression: "utils::sanitize_config_folder_path(result)" --- -Usage: topos tce push-certificate [OPTIONS] +Push a certificate to a TCE process + +Usage: topos regtest push-certificate [OPTIONS] Options: -f, --format diff --git a/crates/topos/tests/snapshots/network__network_spam_help_display.snap b/crates/topos/tests/snapshots/regtest__regtest_spam_help_display.snap similarity index 94% rename from crates/topos/tests/snapshots/network__network_spam_help_display.snap rename to crates/topos/tests/snapshots/regtest__regtest_spam_help_display.snap index 14d60e60a..1f5d22d73 100644 --- a/crates/topos/tests/snapshots/network__network_spam_help_display.snap +++ b/crates/topos/tests/snapshots/regtest__regtest_spam_help_display.snap @@ -1,10 +1,11 @@ --- -source: crates/topos/tests/network.rs -expression: result +source: crates/topos/tests/regtest.rs +assertion_line: 16 +expression: "utils::sanitize_config_folder_path(result)" --- Run a test topos certificate spammer to send test certificates to the network, generating randomly among the `nb_subnets` subnets the batch of `cert_per_batch` certificates at every `batch-interval` -Usage: topos network spam [OPTIONS] +Usage: topos regtest spam [OPTIONS] Options: --target-nodes diff --git a/crates/topos/tests/snapshots/sequencer__sequencer_help_display.snap b/crates/topos/tests/snapshots/sequencer__sequencer_help_display.snap deleted file mode 100644 index a2a3306ea..000000000 --- a/crates/topos/tests/snapshots/sequencer__sequencer_help_display.snap +++ /dev/null @@ -1,36 +0,0 @@ ---- -source: crates/topos/tests/sequencer.rs -expression: "utils::sanitize_config_folder_path(result)" ---- -Run a full Topos Sequencer instance - -Usage: topos sequencer run [OPTIONS] --subnet-contract-address --subnet-data-dir - -Options: - --subnet-id - SubnetId of the local subnet node, hex encoded 32 bytes starting with 0x [env: TOPOS_LOCAL_SUBNET_ID=] - -v, --verbose... - Defines the verbosity level - --home - Home directory for the configuration [env: TOPOS_HOME=] [default: /home/runner/.config/topos] - --subnet-jsonrpc-http - Subnet endpoint in the form [ip address]:[port] Topos sequencer expects both websocket and http protocol available on this subnet endpoint. If optional `subnet_jsonrpc_ws` is not provided websocket endpoint will be deduced from this parameter [env: TOPOS_SUBNET_JSONRPC_HTTP=] [default: 127.0.0.1:8545] - --subnet-jsonrpc-ws - Optional explicit websocket endpoint for the subnet jsonrpc api. If this parameter is not provided, it will be derived from the `subnet_jsonrpc_http`. Full uri value is expected, e.g. `wss://arbitrum.infura.com/v3/ws/mykey` or `ws://127.0.0.1/ws` [env: TOPOS_SUBNET_JSONRPC_WS=] - --subnet-contract-address - [env: SUBNET_CONTRACT_ADDRESS=] - --base-tce-api-url - Base Uri of TCE node to call grpc service api [env: TOPOS_BASE_TCE_API_URL=] [default: http://[::1]:1340] - --subnet-data-dir - Polygon subnet node data dir, containing `consensus/validator.key`, e.g. `../test-chain-1` [env: TOPOS_LOCAL_SUBNET_DATA_DIR=] - --verifier - Verifier version [env: TOPOS_SEQUENCER_VERIFIER_VERSION=] [default: 0] - --otlp-agent - Socket of the opentelemetry agent endpoint If not provided open telemetry will not be used [env: TOPOS_OTLP_AGENT=] - --otlp-service-name - Otlp service name If not provided open telemetry will not be used [env: TOPOS_OTLP_SERVICE_NAME=] - --start-block - Start synchronizing from particular block number Default is to sync from genesis block (0) [env: TOPOS_START_BLOCK=] - -h, --help - Print help - diff --git a/crates/topos/tests/snapshots/tce__can_get_a_peer_id_from_a_seed.snap b/crates/topos/tests/snapshots/tce__can_get_a_peer_id_from_a_seed.snap deleted file mode 100644 index 7bdbd0580..000000000 --- a/crates/topos/tests/snapshots/tce__can_get_a_peer_id_from_a_seed.snap +++ /dev/null @@ -1,6 +0,0 @@ ---- -source: crates/topos/tests/tce.rs -expression: result ---- -12D3KooWRhFCXBhmsMnur3up3vJsDoqWh4c39PKXgSWwzAzDHNLn - diff --git a/crates/topos/tests/snapshots/tce__do_not_push_empty_list.snap b/crates/topos/tests/snapshots/tce__do_not_push_empty_list.snap deleted file mode 100644 index 3d2a1913d..000000000 --- a/crates/topos/tests/snapshots/tce__do_not_push_empty_list.snap +++ /dev/null @@ -1,12 +0,0 @@ ---- -source: crates/topos/tests/tce.rs -expression: "serde_json::from_slice::(&output.get_output().stdout).unwrap()" ---- -{ - "fields": { - "message": "Pushing an empty list is prevented unless you provide the --force flag" - }, - "level": "ERROR", - "target": "topos::components::tce::services::push_peer_list", - "timestamp": "[timestamp]" -} diff --git a/crates/topos/tests/snapshots/tce__help_display.snap b/crates/topos/tests/snapshots/tce__help_display.snap deleted file mode 100644 index 60707477e..000000000 --- a/crates/topos/tests/snapshots/tce__help_display.snap +++ /dev/null @@ -1,52 +0,0 @@ ---- -source: crates/topos/tests/tce.rs -expression: "utils::sanitize_config_folder_path(result)" ---- -Run a full TCE instance - -Usage: topos tce run [OPTIONS] - -Options: - --boot-peers - Boot nodes to connect to, pairs of , space separated, quoted list like --boot-peers='a a1,b b1' [env: TCE_BOOT_PEERS=] [default: ] - -v, --verbose... - Defines the verbosity level - --home - Home directory for the configuration [env: TOPOS_HOME=] [default: /home/runner/.config/topos] - --validators - Validator nodes to connect to, list of Ethereum addresses, space separated, quoted list like --validators='0xfd530a60b4b4cf799d74' [env: TCE_VALIDATORS=] [default: ] - --tce-ext-host - Advertised (externally visible) , if empty this machine ip address(es) are used [env: TCE_EXT_HOST=] [default: /ip4/0.0.0.0] - --tce-local-port - Port to listen on (host is 0.0.0.0, should be good for most installations) [env: TCE_PORT=] [default: 0] - --web-api-ext-url - WebAPI external url (optional) [env: TCE_WEB_API_EXT_URL=] - --web-api-local-port - WebAPI port [env: TCE_WEB_API_PORT=] [default: 8080] - --local-key-seed - Local peer secret key seed (optional, used for testing) [env: TCE_LOCAL_KS=] - --local-validator-private-key - Local peer secret key seed (optional, used for testing) [env: TCE_LOCAL_VPK=] - --db-path - Storage database path, if not set RAM storage is used [env: TCE_DB_PATH=] [default: ./default_db/] - --api-addr - gRPC API Addr [env: TCE_API_ADDR=] [default: [::1]:1340] - --graphql-api-addr - GraphQL API Addr [env: TCE_GRAPHQL_API_ADDR=] [default: [::1]:4000] - --metrics-api-addr - Metrics server API Addr [env: TCE_METRICS_API_ADDR=] [default: [::1]:3000] - --echo-threshold - Echo threshold [env: TCE_ECHO_THRESHOLD=] [default: 1] - --ready-threshold - Ready threshold [env: TCE_READY_THRESHOLD=] [default: 1] - --delivery-threshold - Delivery threshold [env: TCE_DELIVERY_THRESHOLD=] [default: 1] - --otlp-agent - Socket of the opentelemetry agent endpoint If not provided open telemetry will not be used [env: TOPOS_OTLP_AGENT=] - --otlp-service-name - Otlp service name If not provided open telemetry will not be used [env: TOPOS_OTLP_SERVICE_NAME=] - --minimum-tce-cluster-size - [env: TOPOS_MINIMUM_TCE_CLUSTER_SIZE=] - -h, --help - Print help - diff --git a/crates/topos/tests/tce.rs b/crates/topos/tests/tce.rs deleted file mode 100644 index ae612fbfb..000000000 --- a/crates/topos/tests/tce.rs +++ /dev/null @@ -1,47 +0,0 @@ -mod utils; - -use std::process::Command; - -use assert_cmd::prelude::*; -use tonic::{Request, Response, Status}; - -use topos_core::api::grpc::tce::v1::{ - console_service_server::ConsoleService, StatusRequest, StatusResponse, -}; - -#[test] -fn help_display() -> Result<(), Box> { - let mut cmd = Command::cargo_bin("topos")?; - cmd.arg("tce").arg("run").arg("-h"); - - let output = cmd.assert().success(); - - let result: &str = std::str::from_utf8(&output.get_output().stdout)?; - - insta::assert_snapshot!(utils::sanitize_config_folder_path(result)); - - Ok(()) -} - -#[tokio::test] -async fn can_get_a_peer_id_from_a_seed() -> Result<(), Box> { - let mut cmd = Command::cargo_bin("topos")?; - cmd.arg("tce").arg("keys").arg("--from-seed").arg("1"); - - let output = cmd.assert().success(); - - let result: &str = std::str::from_utf8(&output.get_output().stdout)?; - - insta::assert_snapshot!(result); - - Ok(()) -} - -struct DummyServer; - -#[tonic::async_trait] -impl ConsoleService for DummyServer { - async fn status(&self, _: Request) -> Result, Status> { - unimplemented!() - } -} diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index ef8626e40..cc0deae45 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -28,7 +28,7 @@ services: "autoheal": "true" "prometheus-job": "boot" healthcheck: - test: ./topos tce status --node http://localhost:1340 + test: ./topos node status --node http://localhost:1340 interval: 15s volumes: - shared:/tmp/shared @@ -40,7 +40,6 @@ services: context: ../ args: - TOOLCHAIN_VERSION=stable - - FEATURES=tce,network,node,subnet depends_on: init: condition: service_completed_successfully @@ -56,7 +55,7 @@ services: - env/node.env - env/telemetry.env environment: - - RUST_LOG=topos=debug,topos_tce_storage=info,topos_tce_synchronizer=info + - RUST_LOG=topos=info,topos_tce_storage=info,topos_tce_synchronizer=info peer: image: ghcr.io/topos-protocol/topos:main @@ -65,7 +64,7 @@ services: labels: "autoheal": "true" healthcheck: - test: ./topos tce status --node http://localhost:1340 + test: ./topos node status --node http://localhost:1340 interval: 5s volumes: - shared:/tmp/shared @@ -75,7 +74,6 @@ services: context: ../ args: - TOOLCHAIN_VERSION=stable - - FEATURES=tce,network,node,subnet depends_on: init: condition: service_completed_successfully @@ -95,7 +93,7 @@ services: - env/node.env - env/telemetry.env environment: - - RUST_LOG=topos=debug,topos_tce_storage=info,topos_tce_synchronizer=debug + - RUST_LOG=topos=info,topos_tce_storage=info,topos_tce_synchronizer=info sync: image: ghcr.io/topos-protocol/topos:main @@ -111,7 +109,6 @@ services: context: ../ args: - TOOLCHAIN_VERSION=stable - - FEATURES=tce depends_on: autoheal: condition: service_started @@ -127,19 +124,18 @@ services: - env/node.env - env/telemetry.env environment: - - RUST_LOG=topos=debug + - RUST_LOG=topos=info spammer: container_name: spam - command: network spam + command: regtest spam image: ghcr.io/topos-protocol/topos:main init: true build: context: ../ args: - TOOLCHAIN_VERSION=stable - - FEATURES=network volumes: - shared:/tmp/shared env_file: @@ -155,7 +151,7 @@ services: check: container_name: check image: ghcr.io/topos-protocol/topos:main - command: tce push-certificate -f json + command: regtest push-certificate -f json profiles: - CI - check @@ -167,7 +163,6 @@ services: args: - TOOLCHAIN_VERSION=stable - GITHUB_TOKEN - - FEATURES=tce depends_on: boot: condition: service_healthy diff --git a/tools/init.sh b/tools/init.sh index f5c6eafe5..c606db6fd 100755 --- a/tools/init.sh +++ b/tools/init.sh @@ -32,9 +32,7 @@ case "$1" in ;; "peer") if [[ ${LOCAL_TEST_NET:-"false"} == "true" ]]; then - - PEER=$($TOPOS_BIN tce keys --from-seed=$HOSTNAME) - + export TOPOS_HOME=$TOPOS_HOME export TCE_LOCAL_KS=$HOSTNAME export TCE_EXT_HOST