From e42420d9f6253b7157de6c0a7182d90eeb845cfd Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 27 Aug 2024 10:12:56 -0400 Subject: [PATCH 1/3] fix(net): Allow custom noise configuration --- crates/net/src/builder.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/crates/net/src/builder.rs b/crates/net/src/builder.rs index 89da688..39a555d 100644 --- a/crates/net/src/builder.rs +++ b/crates/net/src/builder.rs @@ -33,8 +33,8 @@ pub struct NetworkDriverBuilder { keypair: Option, /// The [TcpConfig] for the swarm. tcp_config: Option, - // /// The [NoiseConfig] for the swarm. - // noise_config: Option, + /// The [NoiseConfig] for the swarm. + noise_config: Option, /// The [YamuxConfig] for the swarm. yamux_config: Option, } @@ -75,11 +75,11 @@ impl NetworkDriverBuilder { self } - // /// Specifies the [NoiseConfig] for the swarm. - // pub fn with_noise_config(&mut self, noise_config: NoiseConfig) -> &mut Self { - // self.noise_config = Some(noise_config); - // self - // } + /// Specifies the [NoiseConfig] for the swarm. + pub fn with_noise_config(&mut self, noise_config: NoiseConfig) -> &mut Self { + self.noise_config = Some(noise_config); + self + } /// Specifies the [YamuxConfig] for the swarm. pub fn with_yamux_config(&mut self, yamux_config: YamuxConfig) -> &mut Self { @@ -111,12 +111,18 @@ impl NetworkDriverBuilder { let behaviour = Behaviour::new(config, &[Box::new(handler.clone())])?; // Build the swarm. + let noise_config = self.noise_config.take(); let keypair = self.keypair.take().unwrap_or(Keypair::generate_secp256k1()); let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp(self.tcp_config.take().unwrap_or_default(), NoiseConfig::new, || { - self.yamux_config.take().unwrap_or_default() - })? + .with_tcp( + self.tcp_config.take().unwrap_or_default(), + |i: &Keypair| match noise_config { + Some(cfg) => core::result::Result::Ok(cfg), + None => NoiseConfig::new(i), + }, + || self.yamux_config.take().unwrap_or_default(), + )? .with_behaviour(|_| behaviour)? .build(); let addr = self.socket.take().ok_or_else(|| eyre::eyre!("socket address not set"))?; From 338b5546204144e41f62a2283c317fd5b7ed1d95 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 27 Aug 2024 10:38:56 -0400 Subject: [PATCH 2/3] feat(net): refactor gossip driver --- crates/net/src/builder.rs | 10 ++------ crates/net/src/driver.rs | 37 +++++---------------------- crates/net/src/gossip/driver.rs | 45 +++++++++++++++++++++++++++++---- 3 files changed, 49 insertions(+), 43 deletions(-) diff --git a/crates/net/src/builder.rs b/crates/net/src/builder.rs index 39a555d..a84e349 100644 --- a/crates/net/src/builder.rs +++ b/crates/net/src/builder.rs @@ -128,18 +128,12 @@ impl NetworkDriverBuilder { let addr = self.socket.take().ok_or_else(|| eyre::eyre!("socket address not set"))?; let addr = NetworkAddress::try_from(addr)?; let swarm_addr = Multiaddr::from(addr); - let swarm = GossipDriver::new(swarm, swarm_addr); + let gossip = GossipDriver::new(swarm, swarm_addr, handler); // Build the discovery service let discovery = DiscoveryBuilder::new().with_address(addr).with_chain_id(chain_id).build()?; - Ok(NetworkDriver { - unsafe_block_recv, - unsafe_block_signer_sender, - handler, - swarm, - discovery, - }) + Ok(NetworkDriver { unsafe_block_recv, unsafe_block_signer_sender, gossip, discovery }) } } diff --git a/crates/net/src/driver.rs b/crates/net/src/driver.rs index 9d6e98f..c14e1db 100644 --- a/crates/net/src/driver.rs +++ b/crates/net/src/driver.rs @@ -1,18 +1,11 @@ //! Driver for network services. use crate::{ - builder::NetworkDriverBuilder, - discovery::driver::DiscoveryDriver, - gossip::{ - driver::GossipDriver, - event::Event, - handler::{BlockHandler, Handler}, - }, - types::envelope::ExecutionPayloadEnvelope, + builder::NetworkDriverBuilder, discovery::driver::DiscoveryDriver, + gossip::driver::GossipDriver, types::envelope::ExecutionPayloadEnvelope, }; use alloy::primitives::Address; use eyre::Result; -use libp2p::swarm::SwarmEvent; use std::sync::mpsc::Receiver; use tokio::{select, sync::watch}; @@ -27,10 +20,8 @@ pub struct NetworkDriver { pub unsafe_block_recv: Receiver, /// Channel to send unsafe signer updates. pub unsafe_block_signer_sender: watch::Sender
, - /// Block handler. - pub handler: BlockHandler, /// The swarm instance. - pub swarm: GossipDriver, + pub gossip: GossipDriver, /// The discovery service driver. pub discovery: DiscoveryDriver, } @@ -45,29 +36,15 @@ impl NetworkDriver { /// and continually listens for new peers and messages to handle pub fn start(mut self) -> Result<()> { let mut peer_recv = self.discovery.start()?; - self.swarm.listen()?; + self.gossip.listen()?; tokio::spawn(async move { loop { select! { peer = peer_recv.recv() => { - if let Some(peer) = peer { - _ = self.swarm.dial(peer); - } + self.gossip.dial_opt(peer).await; }, - event = self.swarm.select_next_some() => { - if let SwarmEvent::Behaviour(Event::Gossipsub(libp2p::gossipsub::Event::Message { - propagation_source: src, - message_id: id, - message, - })) = event { - if self.handler.topics().contains(&message.topic) { - let status = self.handler.handle(message); - _ = self.swarm - .behaviour_mut() - .gossipsub - .report_message_validation_result(&id, &src, status); - } - } + event = self.gossip.select_next_some() => { + self.gossip.handle_event(event); }, } } diff --git a/crates/net/src/gossip/driver.rs b/crates/net/src/gossip/driver.rs index 247ba92..856cd6d 100644 --- a/crates/net/src/gossip/driver.rs +++ b/crates/net/src/gossip/driver.rs @@ -1,9 +1,13 @@ //! Consensus-layer gossipsub driver for Optimism. -use crate::gossip::{behaviour::Behaviour, event::Event}; +use crate::gossip::{ + behaviour::Behaviour, + event::Event, + handler::{BlockHandler, Handler}, +}; use eyre::Result; use futures::stream::StreamExt; -use libp2p::{Multiaddr, Swarm}; +use libp2p::{swarm::SwarmEvent, Multiaddr, Swarm}; /// A [libp2p::Swarm] instance with an associated address to listen on. pub struct GossipDriver { @@ -11,12 +15,14 @@ pub struct GossipDriver { pub swarm: Swarm, /// The address to listen on. pub addr: Multiaddr, + /// Block handler. + pub handler: BlockHandler, } impl GossipDriver { /// Creates a new [GossipDriver] instance. - pub fn new(swarm: Swarm, addr: Multiaddr) -> Self { - Self { swarm, addr } + pub fn new(swarm: Swarm, addr: Multiaddr, handler: BlockHandler) -> Self { + Self { swarm, addr, handler } } /// Listens on the address. @@ -31,14 +37,43 @@ impl GossipDriver { } /// Attempts to select the next event from the Swarm. - pub async fn select_next_some(&mut self) -> libp2p::swarm::SwarmEvent { + pub async fn select_next_some(&mut self) -> SwarmEvent { self.swarm.select_next_some().await } + /// Dials the given [Option]. + pub async fn dial_opt(&mut self, peer: Option>) { + let Some(addr) = peer else { + return; + }; + if let Err(e) = self.dial(addr).await { + tracing::error!("Failed to dial peer: {:?}", e); + } + } + /// Dials the given [Multiaddr]. pub async fn dial(&mut self, peer: impl Into) -> Result<()> { let addr: Multiaddr = peer.into(); self.swarm.dial(addr).map_err(|_| eyre::eyre!("dial failed"))?; Ok(()) } + + /// Handles the [`SwarmEvent`]. + pub fn handle_event(&mut self, event: SwarmEvent) { + if let SwarmEvent::Behaviour(Event::Gossipsub(libp2p::gossipsub::Event::Message { + propagation_source: src, + message_id: id, + message, + })) = event + { + if self.handler.topics().contains(&message.topic) { + let status = self.handler.handle(message); + _ = self + .swarm + .behaviour_mut() + .gossipsub + .report_message_validation_result(&id, &src, status); + } + } + } } From dbbd049fd2ef8f2fdc406eb135d260b169378ca1 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 27 Aug 2024 10:41:59 -0400 Subject: [PATCH 3/3] fix(net): doc lints --- crates/net/src/gossip/driver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/net/src/gossip/driver.rs b/crates/net/src/gossip/driver.rs index 856cd6d..9418d21 100644 --- a/crates/net/src/gossip/driver.rs +++ b/crates/net/src/gossip/driver.rs @@ -41,7 +41,7 @@ impl GossipDriver { self.swarm.select_next_some().await } - /// Dials the given [Option]. + /// Dials the given [`Option`]. pub async fn dial_opt(&mut self, peer: Option>) { let Some(addr) = peer else { return;