Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

feat: update tce config addresses #415

Merged
merged 3 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions crates/topos-p2p/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@ pub struct NetworkClient {
}

impl NetworkClient {
pub async fn start_listening(&self, peer_addr: libp2p::Multiaddr) -> Result<(), P2PError> {
let (sender, receiver) = oneshot::channel();
let command = Command::StartListening { peer_addr, sender };

Self::send_command_with_receiver(&self.sender, command, receiver).await
}

pub async fn connected_peers(&self) -> Result<Vec<PeerId>, P2PError> {
let (sender, receiver) = oneshot::channel();
Self::send_command_with_receiver(&self.sender, Command::ConnectedPeers { sender }, receiver)
Expand Down
7 changes: 0 additions & 7 deletions crates/topos-p2p/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ use crate::{

#[derive(Debug)]
pub enum Command {
/// Executed when the node is starting
StartListening {
peer_addr: Multiaddr,
sender: oneshot::Sender<Result<(), P2PError>>,
},

/// Command to ask for the current connected peer id list
ConnectedPeers {
sender: oneshot::Sender<Result<Vec<PeerId>, P2PError>>,
Expand Down Expand Up @@ -56,7 +50,6 @@ pub enum Command {
impl Display for Command {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Command::StartListening { .. } => write!(f, "StartListening"),
Command::ConnectedPeers { .. } => write!(f, "ConnectedPeers"),
Command::RandomKnownPeer { .. } => write!(f, "RandomKnownPeer"),
Command::Disconnect { .. } => write!(f, "Disconnect"),
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-p2p/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub enum P2PError {

#[error("Unable to create gRPC client")]
UnableToCreateGrpcClient(#[from] OutboundConnectionError),

#[error("Public addresses is empty")]
MissingPublicAddresses,
}

#[derive(Error, Debug)]
Expand Down
38 changes: 24 additions & 14 deletions crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ const TWO_HOURS: Duration = Duration::from_secs(60 * 60 * 2);
pub struct NetworkBuilder<'a> {
discovery_protocol: Option<&'static str>,
peer_key: Option<Keypair>,
listen_addr: Option<Multiaddr>,
exposed_addresses: Option<Multiaddr>,
listen_addresses: Option<Vec<Multiaddr>>,
public_addresses: Option<Vec<Multiaddr>>,
store: Option<MemoryStore>,
known_peers: &'a [(PeerId, Multiaddr)],
local_port: Option<u8>,
Expand Down Expand Up @@ -79,14 +79,14 @@ impl<'a> NetworkBuilder<'a> {
self
}

pub fn exposed_addresses(mut self, addr: Multiaddr) -> Self {
self.exposed_addresses = Some(addr);
pub fn public_addresses(mut self, addresses: Vec<Multiaddr>) -> Self {
self.public_addresses = Some(addresses);

self
}

pub fn listen_addr(mut self, addr: Multiaddr) -> Self {
self.listen_addr = Some(addr);
pub fn listen_addresses(mut self, addresses: Vec<Multiaddr>) -> Self {
Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved
self.listen_addresses = Some(addresses);

self
}
Expand Down Expand Up @@ -171,6 +171,22 @@ impl<'a> NetworkBuilder<'a> {

let grpc_over_p2p = GrpcOverP2P::new(command_sender.clone());

let listen_addr = self
Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved
.listen_addresses
.take()
.expect("Node requires at least one address to listen for incoming connections");

let public_addresses = self
.public_addresses
.map(|addresses| {
if addresses.is_empty() {
listen_addr.clone()
} else {
addresses
}
})
.unwrap_or(listen_addr.clone());

Ok((
NetworkClient {
retry_ttl: self.config.client_retry_ttl,
Expand All @@ -188,14 +204,8 @@ impl<'a> NetworkBuilder<'a> {
command_receiver,
event_sender,
local_peer_id: peer_id,
listening_on: self
.listen_addr
.take()
.expect("P2P runtime expect a MultiAddr"),
addresses: self
.exposed_addresses
.take()
.expect("P2P runtime expect a MultiAddr"),
listening_on: listen_addr,
public_addresses,
bootstrapped: false,
active_listeners: HashSet::new(),
pending_record_requests: HashMap::new(),
Expand Down
5 changes: 0 additions & 5 deletions crates/topos-p2p/src/runtime/handle_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ impl Runtime {

_ = response.send(connection);
}
Command::StartListening { peer_addr, sender } => {
if sender.send(self.start_listening(peer_addr)).is_err() {
warn!("Unable to notify StartListening response: initiator is dropped");
}
}

Command::ConnectedPeers { sender } => {
if sender
Expand Down
40 changes: 20 additions & 20 deletions crates/topos-p2p/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ pub struct Runtime {
pub(crate) command_receiver: mpsc::Receiver<Command>,
pub(crate) event_sender: mpsc::Sender<Event>,
pub(crate) local_peer_id: PeerId,
pub(crate) listening_on: Multiaddr,
#[allow(unused)]
pub(crate) addresses: Multiaddr,
pub(crate) listening_on: Vec<Multiaddr>,
pub(crate) public_addresses: Vec<Multiaddr>,
pub(crate) bootstrapped: bool,
pub(crate) is_boot_node: bool,

Expand All @@ -46,13 +45,6 @@ mod handle_command;
mod handle_event;

impl Runtime {
fn start_listening(&mut self, peer_addr: Multiaddr) -> Result<(), P2PError> {
self.swarm
.listen_on(peer_addr)
.map(|_| ())
.map_err(Into::into)
}

pub async fn bootstrap(mut self) -> Result<Self, Box<dyn std::error::Error>> {
if self.bootstrapped {
return Err(Box::new(P2PError::BootstrapError(
Expand All @@ -62,16 +54,24 @@ impl Runtime {

self.bootstrapped = true;

self.swarm.add_external_address(self.addresses.clone());
debug!("Added public addresses: {:?}", self.public_addresses);
for address in &self.public_addresses {
self.swarm.add_external_address(address.clone());
}

let addr = self.listening_on.clone();
if let Err(error) = self.swarm.listen_on(addr) {
error!(
"Couldn't start listening on {} because of {error:?}",
self.listening_on
);
let dht_address = self
.public_addresses
.first()
.map(Multiaddr::to_vec)
.ok_or(P2PError::MissingPublicAddresses)?;

return Err(Box::new(error));
debug!("Starting to listen on {:?}", self.listening_on);
for addr in &self.listening_on {
if let Err(error) = self.swarm.listen_on(addr.clone()) {
error!("Couldn't start listening on {} because of {error:?}", addr);

return Err(Box::new(error));
}
}

debug!("Starting a boot node ? {:?}", self.is_boot_node);
Expand Down Expand Up @@ -114,7 +114,7 @@ impl Runtime {
let key = Key::new(&self.local_peer_id.to_string());
addr_query_id = if let Ok(query_id_record) =
self.swarm.behaviour_mut().discovery.inner.put_record(
Record::new(key, self.addresses.to_vec()),
Record::new(key, dht_address.clone()),
Quorum::Majority,
) {
Some(query_id_record)
Expand Down Expand Up @@ -170,7 +170,7 @@ impl Runtime {
let key = Key::new(&self.local_peer_id.to_string());
if let Ok(query_id_record) =
self.swarm.behaviour_mut().discovery.inner.put_record(
Record::new(key, self.addresses.to_vec()),
Record::new(key, dht_address.clone()),
Quorum::Majority,
)
{
Expand Down
12 changes: 6 additions & 6 deletions crates/topos-p2p/src/tests/command/random_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ async fn no_random_peer() {

let (client, _, runtime) = crate::network::builder()
.peer_key(local.keypair.clone())
.exposed_addresses(local.addr.clone())
.listen_addr(local.addr.clone())
.public_addresses(vec![local.addr.clone()])
.listen_addresses(vec![local.addr.clone()])
.build()
.await
.expect("Unable to create p2p network");
Expand Down Expand Up @@ -46,8 +46,8 @@ async fn return_a_peer() {

let (client, _, runtime) = crate::network::builder()
.peer_key(local.keypair.clone())
.exposed_addresses(local.addr.clone())
.listen_addr(local.addr.clone())
.public_addresses(vec![local.addr.clone()])
.listen_addresses(vec![local.addr.clone()])
.build()
.await
.expect("Unable to create p2p network");
Expand Down Expand Up @@ -75,8 +75,8 @@ async fn return_a_random_peer_among_100() {

let (client, _, runtime) = crate::network::builder()
.peer_key(local.keypair.clone())
.exposed_addresses(local.addr.clone())
.listen_addr(local.addr.clone())
.public_addresses(vec![local.addr.clone()])
.listen_addresses(vec![local.addr.clone()])
.build()
.await
.expect("Unable to create p2p network");
Expand Down
14 changes: 11 additions & 3 deletions crates/topos-p2p/src/tests/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::StreamExt;
use libp2p::{
kad::{record::Key, KademliaEvent, PutRecordOk, QueryResult, Record},
swarm::SwarmEvent,
Multiaddr,
};
use rstest::rstest;
use test_log::test;
Expand All @@ -23,8 +24,8 @@ async fn put_value_in_dht() {
let (_, _, runtime) = crate::network::builder()
.peer_key(peer_2.keypair.clone())
.known_peers(&[(peer_1.peer_id(), peer_1.addr.clone())])
.exposed_addresses(peer_2.addr.clone())
.listen_addr(peer_2.addr.clone())
.public_addresses(vec![peer_2.addr.clone()])
.listen_addresses(vec![peer_2.addr.clone()])
.minimum_cluster_size(1)
.discovery_config(
DiscoveryConfig::default().with_replication_factor(NonZeroUsize::new(1).unwrap()),
Expand All @@ -40,7 +41,14 @@ async fn put_value_in_dht() {
_ = kad
.inner
.put_record(
Record::new(input_key.clone(), runtime.addresses.to_vec()),
Record::new(
input_key.clone(),
runtime
.public_addresses
.first()
.map(Multiaddr::to_vec)
.unwrap(),
Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved
),
libp2p::kad::Quorum::One,
)
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-p2p/src/tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub async fn dummy_peer() -> (NetworkClient, PeerAddr) {

let (client, _stream, runtime): (_, _, Runtime) = NetworkBuilder::default()
.peer_key(key)
.listen_addr(addr_dummy.clone())
.exposed_addresses(addr_dummy)
.listen_addresses(vec![addr_dummy.clone()])
.public_addresses(vec![addr_dummy])
.build()
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ pub struct TceConfiguration {
pub api_addr: SocketAddr,
pub graphql_api_addr: SocketAddr,
pub metrics_api_addr: SocketAddr,
pub tce_addr: String,
pub tce_local_port: u16,
pub storage: StorageConfiguration,
pub network_bootstrap_timeout: Duration,
pub minimum_cluster_size: usize,
pub version: &'static str,
pub listen_addresses: Vec<Multiaddr>,
pub public_addresses: Vec<Multiaddr>,
}

#[derive(Debug)]
Expand Down
9 changes: 2 additions & 7 deletions crates/topos-tce/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ pub async fn run(

tracing::Span::current().record("peer_id", &peer_id.to_string());

let external_addr: Multiaddr =
format!("{}/tcp/{}", config.tce_addr, config.tce_local_port).parse()?;

let addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", config.tce_local_port).parse()?;

let mut boot_peers = config.boot_peers.clone();
// Remove myself from the bootnode list
boot_peers.retain(|(p, _)| *p != peer_id);
Expand Down Expand Up @@ -142,9 +137,9 @@ pub async fn run(

let (network_client, event_stream, unbootstrapped_runtime) = topos_p2p::network::builder()
.peer_key(key)
.listen_addr(addr)
.listen_addresses(config.listen_addresses.clone())
.minimum_cluster_size(config.minimum_cluster_size)
.exposed_addresses(external_addr)
.public_addresses(config.public_addresses.clone())
.known_peers(&boot_peers)
.grpc_context(grpc_context)
.build()
Expand Down
37 changes: 36 additions & 1 deletion crates/topos-test-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,19 @@ pub mod sequencer;
pub mod storage;
pub mod tce;

use std::{collections::HashSet, net::SocketAddr, sync::Mutex};
use std::{
collections::HashSet,
net::SocketAddr,
path::PathBuf,
str::FromStr,
sync::Mutex,
thread,
time::{SystemTime, UNIX_EPOCH},
};

use lazy_static::lazy_static;
use rand::Rng;
use rstest::fixture;

lazy_static! {
pub static ref PORT_MAPPING: Mutex<HashSet<u16>> = Mutex::new(HashSet::new());
Expand Down Expand Up @@ -92,3 +102,28 @@ fn next_available_port() -> SocketAddr {

addr
}

#[fixture]
fn folder_name() -> &'static str {
Box::leak(Box::new(
thread::current().name().unwrap().replace("::", "_"),
))
}

#[fixture]
pub fn create_folder(folder_name: &str) -> PathBuf {
let dir = env!("TOPOS_TEST_SDK_TMP");
let mut temp_dir =
std::path::PathBuf::from_str(dir).expect("Unable to read CARGO_TARGET_TMPDIR");
let time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let mut rng = rand::thread_rng();

temp_dir.push(format!(
"{}/data_{}_{}",
folder_name,
time.as_nanos(),
rng.gen::<u64>()
));

temp_dir
}
Loading
Loading