diff --git a/crates/topos-p2p/src/client.rs b/crates/topos-p2p/src/client.rs index 68dfb57bd..8c094360f 100644 --- a/crates/topos-p2p/src/client.rs +++ b/crates/topos-p2p/src/client.rs @@ -23,13 +23,6 @@ pub struct NetworkClient { } impl NetworkClient { - pub async fn start_listening(&self, peer_addr: libp2p::Multiaddr) -> Result<(), P2PError> { - let (sender, receiver) = oneshot::channel(); - let command = Command::StartListening { peer_addr, sender }; - - Self::send_command_with_receiver(&self.sender, command, receiver).await - } - pub async fn connected_peers(&self) -> Result, P2PError> { let (sender, receiver) = oneshot::channel(); Self::send_command_with_receiver(&self.sender, Command::ConnectedPeers { sender }, receiver) diff --git a/crates/topos-p2p/src/command.rs b/crates/topos-p2p/src/command.rs index f819e421c..40cf5ac94 100644 --- a/crates/topos-p2p/src/command.rs +++ b/crates/topos-p2p/src/command.rs @@ -10,12 +10,6 @@ use crate::{ #[derive(Debug)] pub enum Command { - /// Executed when the node is starting - StartListening { - peer_addr: Multiaddr, - sender: oneshot::Sender>, - }, - /// Command to ask for the current connected peer id list ConnectedPeers { sender: oneshot::Sender, P2PError>>, @@ -56,7 +50,6 @@ pub enum Command { impl Display for Command { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Command::StartListening { .. } => write!(f, "StartListening"), Command::ConnectedPeers { .. } => write!(f, "ConnectedPeers"), Command::RandomKnownPeer { .. } => write!(f, "RandomKnownPeer"), Command::Disconnect { .. } => write!(f, "Disconnect"), diff --git a/crates/topos-p2p/src/error.rs b/crates/topos-p2p/src/error.rs index 3ef76a25e..44b939dcf 100644 --- a/crates/topos-p2p/src/error.rs +++ b/crates/topos-p2p/src/error.rs @@ -41,6 +41,9 @@ pub enum P2PError { #[error("Unable to create gRPC client")] UnableToCreateGrpcClient(#[from] OutboundConnectionError), + + #[error("Public addresses is empty")] + MissingPublicAddresses, } #[derive(Error, Debug)] diff --git a/crates/topos-p2p/src/network.rs b/crates/topos-p2p/src/network.rs index 1ef9e7d1d..f6f2ebbde 100644 --- a/crates/topos-p2p/src/network.rs +++ b/crates/topos-p2p/src/network.rs @@ -39,8 +39,8 @@ const TWO_HOURS: Duration = Duration::from_secs(60 * 60 * 2); pub struct NetworkBuilder<'a> { discovery_protocol: Option<&'static str>, peer_key: Option, - listen_addr: Option, - exposed_addresses: Option, + listen_addresses: Option>, + public_addresses: Option>, store: Option, known_peers: &'a [(PeerId, Multiaddr)], local_port: Option, @@ -79,14 +79,14 @@ impl<'a> NetworkBuilder<'a> { self } - pub fn exposed_addresses(mut self, addr: Multiaddr) -> Self { - self.exposed_addresses = Some(addr); + pub fn public_addresses(mut self, addresses: Vec) -> Self { + self.public_addresses = Some(addresses); self } - pub fn listen_addr(mut self, addr: Multiaddr) -> Self { - self.listen_addr = Some(addr); + pub fn listen_addresses(mut self, addresses: Vec) -> Self { + self.listen_addresses = Some(addresses); self } @@ -171,6 +171,22 @@ impl<'a> NetworkBuilder<'a> { let grpc_over_p2p = GrpcOverP2P::new(command_sender.clone()); + let listen_addr = self + .listen_addresses + .take() + .expect("Node requires at least one address to listen for incoming connections"); + + let public_addresses = self + .public_addresses + .map(|addresses| { + if addresses.is_empty() { + listen_addr.clone() + } else { + addresses + } + }) + .unwrap_or(listen_addr.clone()); + Ok(( NetworkClient { retry_ttl: self.config.client_retry_ttl, @@ -188,14 +204,8 @@ impl<'a> NetworkBuilder<'a> { command_receiver, event_sender, local_peer_id: peer_id, - listening_on: self - .listen_addr - .take() - .expect("P2P runtime expect a MultiAddr"), - addresses: self - .exposed_addresses - .take() - .expect("P2P runtime expect a MultiAddr"), + listening_on: listen_addr, + public_addresses, bootstrapped: false, active_listeners: HashSet::new(), pending_record_requests: HashMap::new(), diff --git a/crates/topos-p2p/src/runtime/handle_command.rs b/crates/topos-p2p/src/runtime/handle_command.rs index 8540919cc..148f05ba5 100644 --- a/crates/topos-p2p/src/runtime/handle_command.rs +++ b/crates/topos-p2p/src/runtime/handle_command.rs @@ -26,11 +26,6 @@ impl Runtime { _ = response.send(connection); } - Command::StartListening { peer_addr, sender } => { - if sender.send(self.start_listening(peer_addr)).is_err() { - warn!("Unable to notify StartListening response: initiator is dropped"); - } - } Command::ConnectedPeers { sender } => { if sender diff --git a/crates/topos-p2p/src/runtime/mod.rs b/crates/topos-p2p/src/runtime/mod.rs index 3ef9e6060..721e3d662 100644 --- a/crates/topos-p2p/src/runtime/mod.rs +++ b/crates/topos-p2p/src/runtime/mod.rs @@ -26,9 +26,8 @@ pub struct Runtime { pub(crate) command_receiver: mpsc::Receiver, pub(crate) event_sender: mpsc::Sender, pub(crate) local_peer_id: PeerId, - pub(crate) listening_on: Multiaddr, - #[allow(unused)] - pub(crate) addresses: Multiaddr, + pub(crate) listening_on: Vec, + pub(crate) public_addresses: Vec, pub(crate) bootstrapped: bool, pub(crate) is_boot_node: bool, @@ -46,13 +45,6 @@ mod handle_command; mod handle_event; impl Runtime { - fn start_listening(&mut self, peer_addr: Multiaddr) -> Result<(), P2PError> { - self.swarm - .listen_on(peer_addr) - .map(|_| ()) - .map_err(Into::into) - } - pub async fn bootstrap(mut self) -> Result> { if self.bootstrapped { return Err(Box::new(P2PError::BootstrapError( @@ -62,16 +54,24 @@ impl Runtime { self.bootstrapped = true; - self.swarm.add_external_address(self.addresses.clone()); + debug!("Added public addresses: {:?}", self.public_addresses); + for address in &self.public_addresses { + self.swarm.add_external_address(address.clone()); + } - let addr = self.listening_on.clone(); - if let Err(error) = self.swarm.listen_on(addr) { - error!( - "Couldn't start listening on {} because of {error:?}", - self.listening_on - ); + let dht_address = self + .public_addresses + .first() + .map(Multiaddr::to_vec) + .ok_or(P2PError::MissingPublicAddresses)?; - return Err(Box::new(error)); + debug!("Starting to listen on {:?}", self.listening_on); + for addr in &self.listening_on { + if let Err(error) = self.swarm.listen_on(addr.clone()) { + error!("Couldn't start listening on {} because of {error:?}", addr); + + return Err(Box::new(error)); + } } debug!("Starting a boot node ? {:?}", self.is_boot_node); @@ -114,7 +114,7 @@ impl Runtime { let key = Key::new(&self.local_peer_id.to_string()); addr_query_id = if let Ok(query_id_record) = self.swarm.behaviour_mut().discovery.inner.put_record( - Record::new(key, self.addresses.to_vec()), + Record::new(key, dht_address.clone()), Quorum::Majority, ) { Some(query_id_record) @@ -170,7 +170,7 @@ impl Runtime { let key = Key::new(&self.local_peer_id.to_string()); if let Ok(query_id_record) = self.swarm.behaviour_mut().discovery.inner.put_record( - Record::new(key, self.addresses.to_vec()), + Record::new(key, dht_address.clone()), Quorum::Majority, ) { diff --git a/crates/topos-p2p/src/tests/command/random_peer.rs b/crates/topos-p2p/src/tests/command/random_peer.rs index 2cea9e4de..dee0f34d9 100644 --- a/crates/topos-p2p/src/tests/command/random_peer.rs +++ b/crates/topos-p2p/src/tests/command/random_peer.rs @@ -15,8 +15,8 @@ async fn no_random_peer() { let (client, _, runtime) = crate::network::builder() .peer_key(local.keypair.clone()) - .exposed_addresses(local.addr.clone()) - .listen_addr(local.addr.clone()) + .public_addresses(vec![local.addr.clone()]) + .listen_addresses(vec![local.addr.clone()]) .build() .await .expect("Unable to create p2p network"); @@ -46,8 +46,8 @@ async fn return_a_peer() { let (client, _, runtime) = crate::network::builder() .peer_key(local.keypair.clone()) - .exposed_addresses(local.addr.clone()) - .listen_addr(local.addr.clone()) + .public_addresses(vec![local.addr.clone()]) + .listen_addresses(vec![local.addr.clone()]) .build() .await .expect("Unable to create p2p network"); @@ -75,8 +75,8 @@ async fn return_a_random_peer_among_100() { let (client, _, runtime) = crate::network::builder() .peer_key(local.keypair.clone()) - .exposed_addresses(local.addr.clone()) - .listen_addr(local.addr.clone()) + .public_addresses(vec![local.addr.clone()]) + .listen_addresses(vec![local.addr.clone()]) .build() .await .expect("Unable to create p2p network"); diff --git a/crates/topos-p2p/src/tests/dht.rs b/crates/topos-p2p/src/tests/dht.rs index 7751eea00..7de14c4c3 100644 --- a/crates/topos-p2p/src/tests/dht.rs +++ b/crates/topos-p2p/src/tests/dht.rs @@ -4,6 +4,7 @@ use futures::StreamExt; use libp2p::{ kad::{record::Key, KademliaEvent, PutRecordOk, QueryResult, Record}, swarm::SwarmEvent, + Multiaddr, }; use rstest::rstest; use test_log::test; @@ -23,8 +24,8 @@ async fn put_value_in_dht() { let (_, _, runtime) = crate::network::builder() .peer_key(peer_2.keypair.clone()) .known_peers(&[(peer_1.peer_id(), peer_1.addr.clone())]) - .exposed_addresses(peer_2.addr.clone()) - .listen_addr(peer_2.addr.clone()) + .public_addresses(vec![peer_2.addr.clone()]) + .listen_addresses(vec![peer_2.addr.clone()]) .minimum_cluster_size(1) .discovery_config( DiscoveryConfig::default().with_replication_factor(NonZeroUsize::new(1).unwrap()), @@ -40,7 +41,14 @@ async fn put_value_in_dht() { _ = kad .inner .put_record( - Record::new(input_key.clone(), runtime.addresses.to_vec()), + Record::new( + input_key.clone(), + runtime + .public_addresses + .first() + .map(Multiaddr::to_vec) + .unwrap(), + ), libp2p::kad::Quorum::One, ) .unwrap(); diff --git a/crates/topos-p2p/src/tests/support/mod.rs b/crates/topos-p2p/src/tests/support/mod.rs index 08d878874..f76458f7f 100644 --- a/crates/topos-p2p/src/tests/support/mod.rs +++ b/crates/topos-p2p/src/tests/support/mod.rs @@ -19,8 +19,8 @@ pub async fn dummy_peer() -> (NetworkClient, PeerAddr) { let (client, _stream, runtime): (_, _, Runtime) = NetworkBuilder::default() .peer_key(key) - .listen_addr(addr_dummy.clone()) - .exposed_addresses(addr_dummy) + .listen_addresses(vec![addr_dummy.clone()]) + .public_addresses(vec![addr_dummy]) .build() .await .unwrap(); diff --git a/crates/topos-tce/src/config.rs b/crates/topos-tce/src/config.rs index c5ad9b95b..ddc9dfb53 100644 --- a/crates/topos-tce/src/config.rs +++ b/crates/topos-tce/src/config.rs @@ -25,12 +25,12 @@ pub struct TceConfiguration { pub api_addr: SocketAddr, pub graphql_api_addr: SocketAddr, pub metrics_api_addr: SocketAddr, - pub tce_addr: String, - pub tce_local_port: u16, pub storage: StorageConfiguration, pub network_bootstrap_timeout: Duration, pub minimum_cluster_size: usize, pub version: &'static str, + pub listen_addresses: Vec, + pub public_addresses: Vec, } #[derive(Debug)] diff --git a/crates/topos-tce/src/lib.rs b/crates/topos-tce/src/lib.rs index 2a6525acf..60fd39a7a 100644 --- a/crates/topos-tce/src/lib.rs +++ b/crates/topos-tce/src/lib.rs @@ -71,11 +71,6 @@ pub async fn run( tracing::Span::current().record("peer_id", &peer_id.to_string()); - let external_addr: Multiaddr = - format!("{}/tcp/{}", config.tce_addr, config.tce_local_port).parse()?; - - let addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", config.tce_local_port).parse()?; - let mut boot_peers = config.boot_peers.clone(); // Remove myself from the bootnode list boot_peers.retain(|(p, _)| *p != peer_id); @@ -142,9 +137,9 @@ pub async fn run( let (network_client, event_stream, unbootstrapped_runtime) = topos_p2p::network::builder() .peer_key(key) - .listen_addr(addr) + .listen_addresses(config.listen_addresses.clone()) .minimum_cluster_size(config.minimum_cluster_size) - .exposed_addresses(external_addr) + .public_addresses(config.public_addresses.clone()) .known_peers(&boot_peers) .grpc_context(grpc_context) .build() diff --git a/crates/topos-test-sdk/src/lib.rs b/crates/topos-test-sdk/src/lib.rs index d52976982..34e248294 100644 --- a/crates/topos-test-sdk/src/lib.rs +++ b/crates/topos-test-sdk/src/lib.rs @@ -6,9 +6,19 @@ pub mod sequencer; pub mod storage; pub mod tce; -use std::{collections::HashSet, net::SocketAddr, sync::Mutex}; +use std::{ + collections::HashSet, + net::SocketAddr, + path::PathBuf, + str::FromStr, + sync::Mutex, + thread, + time::{SystemTime, UNIX_EPOCH}, +}; use lazy_static::lazy_static; +use rand::Rng; +use rstest::fixture; lazy_static! { pub static ref PORT_MAPPING: Mutex> = Mutex::new(HashSet::new()); @@ -92,3 +102,28 @@ fn next_available_port() -> SocketAddr { addr } + +#[fixture] +fn folder_name() -> &'static str { + Box::leak(Box::new( + thread::current().name().unwrap().replace("::", "_"), + )) +} + +#[fixture] +pub fn create_folder(folder_name: &str) -> PathBuf { + let dir = env!("TOPOS_TEST_SDK_TMP"); + let mut temp_dir = + std::path::PathBuf::from_str(dir).expect("Unable to read CARGO_TARGET_TMPDIR"); + let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + let mut rng = rand::thread_rng(); + + temp_dir.push(format!( + "{}/data_{}_{}", + folder_name, + time.as_nanos(), + rng.gen::() + )); + + temp_dir +} diff --git a/crates/topos-test-sdk/src/storage/mod.rs b/crates/topos-test-sdk/src/storage/mod.rs index ecfb92d74..d6aff1207 100644 --- a/crates/topos-test-sdk/src/storage/mod.rs +++ b/crates/topos-test-sdk/src/storage/mod.rs @@ -15,12 +15,7 @@ use topos_tce_storage::{ validator::ValidatorStore, StorageClient, }; -#[fixture] -fn folder_name() -> &'static str { - Box::leak(Box::new( - thread::current().name().unwrap().replace("::", "_"), - )) -} +use crate::folder_name; #[fixture(certificates = Vec::new())] pub async fn storage_client(certificates: Vec) -> StorageClient { @@ -31,20 +26,11 @@ pub async fn storage_client(certificates: Vec) -> StorageC #[fixture] pub fn create_folder(folder_name: &str) -> PathBuf { - let dir = env!("TOPOS_TEST_SDK_TMP"); - let mut temp_dir = - std::path::PathBuf::from_str(dir).expect("Unable to read CARGO_TARGET_TMPDIR"); - let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); - let mut rng = rand::thread_rng(); - - temp_dir.push(format!( - "{}/data_{}_{}/rocksdb", - folder_name, - time.as_nanos(), - rng.gen::() - )); - - temp_dir + let mut path = crate::create_folder(folder_name); + + path.push("rocksdb"); + + path } #[fixture(certificates = Vec::new())] diff --git a/crates/topos-test-sdk/src/tce/mod.rs b/crates/topos-test-sdk/src/tce/mod.rs index 7cfb8703f..e65653749 100644 --- a/crates/topos-test-sdk/src/tce/mod.rs +++ b/crates/topos-test-sdk/src/tce/mod.rs @@ -156,7 +156,7 @@ impl NodeConfig { create_network_worker( self.seed, self.port, - self.addr.clone(), + vec![self.addr.clone()], peers, self.minimum_cluster_size, router, diff --git a/crates/topos-test-sdk/src/tce/p2p.rs b/crates/topos-test-sdk/src/tce/p2p.rs index 9920a9d28..2dfacf798 100644 --- a/crates/topos-test-sdk/src/tce/p2p.rs +++ b/crates/topos-test-sdk/src/tce/p2p.rs @@ -12,7 +12,7 @@ use super::NodeConfig; pub async fn create_network_worker( seed: u8, _port: u16, - addr: Multiaddr, + addr: Vec, peers: &[NodeConfig], minimum_cluster_size: usize, router: Option, @@ -50,8 +50,8 @@ pub async fn create_network_worker( topos_p2p::network::builder() .peer_key(key.clone()) .known_peers(&known_peers) - .exposed_addresses(addr.clone()) - .listen_addr(addr) + .public_addresses(addr.clone()) + .listen_addresses(addr) .minimum_cluster_size(minimum_cluster_size) .grpc_context(grpc_context) .build() @@ -74,7 +74,7 @@ pub async fn bootstrap_network( Box, > { let (network_client, network_stream, runtime) = - create_network_worker(seed, port, addr, peers, minimum_cluster_size, router).await?; + create_network_worker(seed, port, vec![addr], peers, minimum_cluster_size, router).await?; let runtime = runtime.bootstrap().await?; diff --git a/crates/topos/src/components/node/services/process.rs b/crates/topos/src/components/node/services/process.rs index 992111ce2..83e5ddd95 100644 --- a/crates/topos/src/components/node/services/process.rs +++ b/crates/topos/src/components/node/services/process.rs @@ -8,11 +8,12 @@ use thiserror::Error; use tokio::{spawn, sync::mpsc, task::JoinHandle}; use tokio_util::sync::CancellationToken; use topos_p2p::config::NetworkConfig; +use topos_p2p::Multiaddr; use topos_sequencer::SequencerConfiguration; use topos_tce::config::{AuthKey, StorageConfiguration, TceConfiguration}; use topos_tce_transport::ReliableBroadcastParams; use topos_wallet::SecretManager; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use crate::config::genesis::Genesis; @@ -78,7 +79,7 @@ pub(crate) fn spawn_sequencer_process( } pub(crate) fn spawn_tce_process( - config: TceConfig, + mut config: TceConfig, keys: SecretManager, genesis: Genesis, shutdown: (CancellationToken, mpsc::Sender<()>), @@ -86,6 +87,21 @@ pub(crate) fn spawn_tce_process( let validators = genesis.validators().expect("Cannot parse validators"); let tce_params = ReliableBroadcastParams::new(validators.len()); + if let Some(socket) = config.libp2p_api_addr { + warn!( + "`libp2p_api_addr` is deprecated in favor of `listen_addresses` and \ + `public_addresses` and will be removed in the next version. In order to keep your \ + node running, `libp2p_api_addr` will be used." + ); + + let addr: Multiaddr = format!("/ip4/{}/tcp/{}", socket.ip(), socket.port()) + .parse() + .expect("Unable to generate Multiaddr from `libp2p_api_addr`"); + + config.p2p.listen_addresses = vec![addr.clone()]; + config.p2p.public_addresses = vec![addr]; + } + let tce_config = TceConfiguration { boot_peers: genesis .boot_peers(Some(topos_p2p::constants::TCE_BOOTNODE_PORT)) @@ -95,14 +111,14 @@ pub(crate) fn spawn_tce_process( validators, auth_key: keys.network.map(AuthKey::PrivateKey), signing_key: keys.validator.map(AuthKey::PrivateKey), - tce_addr: format!("/ip4/{}", config.libp2p_api_addr.ip()), - tce_local_port: config.libp2p_api_addr.port(), + listen_addresses: config.p2p.listen_addresses, + public_addresses: config.p2p.public_addresses, tce_params, api_addr: config.grpc_api_addr, graphql_api_addr: config.graphql_api_addr, metrics_api_addr: config.metrics_api_addr, storage: StorageConfiguration::RocksDB(Some(config.db_path)), - network_bootstrap_timeout: Duration::from_secs(90), + network_bootstrap_timeout: Duration::from_secs(config.network_bootstrap_timeout), minimum_cluster_size: config .minimum_tce_cluster_size .unwrap_or(NetworkConfig::MINIMUM_CLUSTER_SIZE), diff --git a/crates/topos/src/config/tce.rs b/crates/topos/src/config/tce.rs index 127263d6a..a414c8405 100644 --- a/crates/topos/src/config/tce.rs +++ b/crates/topos/src/config/tce.rs @@ -1,11 +1,13 @@ use std::path::Path; use std::{net::SocketAddr, path::PathBuf}; +use figment::providers::Serialized; use figment::{ providers::{Format, Toml}, Figment, }; -use serde::{Deserialize, Serialize}; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Deserializer, Serialize}; use crate::config::Config; use topos_p2p::{Multiaddr, PeerId}; @@ -20,17 +22,16 @@ pub struct TceConfig { pub db_path: PathBuf, /// Array of extra boot nodes to connect to pub extra_boot_peers: Option, - /// Ip for the p2p Multiaddr - pub tce_ext_host: Option, - /// Port for the p2p Multiaddr - pub tce_local_port: Option, - /// Local peer secret key seed (optional, used for testing) - pub local_key_seed: Option, /// Connection degree for the GossipSub overlay pub minimum_tce_cluster_size: Option, - /// gRPC API Addr - #[serde(default = "default_libp2p_api_addr")] - pub libp2p_api_addr: SocketAddr, + + /// libp2p addresses + pub libp2p_api_addr: Option, + + /// P2P configuration + #[serde(default)] + pub p2p: P2PConfig, + /// gRPC API Addr #[serde(default = "default_grpc_api_addr")] pub grpc_api_addr: SocketAddr, @@ -46,16 +47,73 @@ pub struct TceConfig { /// Otlp service name /// If not provided open telemetry will not be used pub otlp_service_name: Option, + + #[serde(default = "default_network_bootstrap_timeout")] + pub(crate) network_bootstrap_timeout: u64, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct P2PConfig { + /// List of multiaddresses to listen for incoming connections + #[serde(default = "default_listen_addresses")] + pub listen_addresses: Vec, + /// List of multiaddresses to advertise to the network + #[serde(default = "default_public_addresses")] + pub public_addresses: Vec, +} + +impl Default for P2PConfig { + fn default() -> Self { + Self { + listen_addresses: default_listen_addresses(), + public_addresses: default_public_addresses(), + } + } } fn default_db_path() -> PathBuf { PathBuf::from("./tce_rocksdb") } +const fn default_network_bootstrap_timeout() -> u64 { + 90 +} + const fn default_libp2p_api_addr() -> SocketAddr { SocketAddr::V4(std::net::SocketAddrV4::new(DEFAULT_IP, 9090)) } +fn default_listen_addresses() -> Vec { + vec![format!( + "/ip4/{}/tcp/{}", + default_libp2p_api_addr().ip(), + default_libp2p_api_addr().port() + ) + .parse() + .expect( + r#" + Listen multiaddresses generation failure. + This is a critical bug that need to be report on `https://github.com/topos-protocol/topos/issues` + "#, + )] +} + +fn default_public_addresses() -> Vec { + vec![format!( + "/ip4/{}/tcp/{}", + default_libp2p_api_addr().ip(), + default_libp2p_api_addr().port() + ) + .parse() + .expect( + r#" + Public multiaddresses generation failure. + This is a critical bug that need to be report on `https://github.com/topos-protocol/topos/issues` + "#, + )] +} + const fn default_grpc_api_addr() -> SocketAddr { SocketAddr::V4(std::net::SocketAddrV4::new(DEFAULT_IP, 1340)) } diff --git a/crates/topos/tests/config.rs b/crates/topos/tests/config.rs index f4e349675..3916f8466 100644 --- a/crates/topos/tests/config.rs +++ b/crates/topos/tests/config.rs @@ -1,9 +1,19 @@ -use assert_cmd::prelude::*; +use assert_cmd::{assert, prelude::*}; +use libp2p::swarm::dial_opts::DialOpts; +use libp2p::swarm::{DialError, ListenError, SwarmEvent}; +use libp2p::{build_multiaddr, Multiaddr, PeerId, Swarm}; use predicates::prelude::*; use std::path::PathBuf; -use std::process::Command; +use std::process::{Command, Stdio}; +use std::thread; +use std::time::Duration; use tempfile::tempdir; +use tokio::fs::OpenOptions; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +use toml::map::Map; +use toml::Value; use topos::install_polygon_edge; +use topos_test_sdk::create_folder; use crate::utils::setup_polygon_edge; @@ -320,3 +330,108 @@ async fn command_node_up() -> Result<(), Box> { Ok(()) } + +/// Test node up running from config file +#[rstest::rstest] +#[test_log::test(tokio::test)] +async fn command_node_up_with_old_config( + create_folder: PathBuf, +) -> Result<(), Box> { + let tmp_home_dir = create_folder; + + // Create config file + let node_up_home_env = tmp_home_dir.to_str().unwrap(); + let node_edge_path_env = setup_polygon_edge(node_up_home_env).await; + let node_up_role_env = "validator"; + let node_up_name_env = "test_node_up_old_config"; + let node_up_subnet_env = "topos"; + + let mut cmd = Command::cargo_bin("topos")?; + cmd.arg("node") + .env("TOPOS_POLYGON_EDGE_BIN_PATH", &node_edge_path_env) + .env("TOPOS_HOME", node_up_home_env) + .env("TOPOS_NODE_NAME", node_up_name_env) + .env("TOPOS_NODE_SUBNET", node_up_subnet_env) + .arg("init"); + + let output = cmd.assert().success(); + let result: &str = std::str::from_utf8(&output.get_output().stdout)?; + assert!(result.contains("Created node config file")); + + // Run node init with cli flags + let home = PathBuf::from(node_up_home_env); + // Verification: check that the config file was created + let config_path = home.join("node").join(node_up_name_env).join("config.toml"); + println!("config path {:?}", config_path); + assert!(config_path.exists()); + + let mut file = OpenOptions::new() + .read(true) + .write(true) + .open(config_path.clone()) + .await?; + + let mut buf = String::new(); + let _ = file.read_to_string(&mut buf).await?; + + let mut current: Map = toml::from_str(&buf)?; + let tce = current.get_mut("tce").unwrap(); + + if let Value::Table(tce_table) = tce { + tce_table.insert( + "libp2p-api-addr".to_string(), + Value::String("0.0.0.0:9091".to_string()), + ); + tce_table.insert("network-bootstrap-timeout".to_string(), Value::Integer(5)); + tce_table.remove("p2p"); + } else { + panic!("TCE configuration table malformed"); + } + + file.set_len(0).await; + file.seek(std::io::SeekFrom::Start(0)).await; + file.write_all(toml::to_string(¤t)?.as_bytes()).await; + + drop(file); + + // Generate polygon edge genesis file + let polygon_edge_bin = format!("{}/polygon-edge", node_edge_path_env); + println!("polygon_edge_bin {:?}", polygon_edge_bin); + utils::generate_polygon_edge_genesis_file( + &polygon_edge_bin, + node_up_home_env, + node_up_name_env, + node_up_subnet_env, + ) + .await?; + let polygon_edge_genesis_path = home + .join("subnet") + .join(node_up_subnet_env) + .join("genesis.json"); + assert!(polygon_edge_genesis_path.exists()); + + let mut cmd = Command::cargo_bin("topos")?; + let child = cmd + .arg("node") + .env("TOPOS_POLYGON_EDGE_BIN_PATH", &node_edge_path_env) + .env("TOPOS_NODE_NAME", node_up_name_env) + .env("TOPOS_HOME", node_up_home_env) + .env("RUST_LOG", "topos=info") + .arg("up") + .stdout(Stdio::piped()) + .spawn()?; + + let join = thread::spawn(move || child.wait_with_output()); + tokio::time::sleep(Duration::from_secs(10)).await; + + let stdout = join.join().unwrap()?.stdout; + let stdout = String::from_utf8_lossy(&stdout); + + println!("STDOUT: {}", stdout); + assert!(stdout.contains(r#"Local node is listening on "/ip4/127.0.0.1/tcp/9091/p2p/"#)); + + // Cleanup + std::fs::remove_dir_all(node_up_home_env)?; + + Ok(()) +}