Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(net): Refactor GossipDriver #47

Merged
merged 4 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions crates/net/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,12 @@ impl NetworkDriverBuilder {
let addr = self.socket.take().ok_or_else(|| eyre::eyre!("socket address not set"))?;
let addr = NetworkAddress::try_from(addr)?;
let swarm_addr = Multiaddr::from(addr);
let swarm = GossipDriver::new(swarm, swarm_addr);
let gossip = GossipDriver::new(swarm, swarm_addr, handler);

// Build the discovery service
let discovery =
DiscoveryBuilder::new().with_address(addr).with_chain_id(chain_id).build()?;

Ok(NetworkDriver {
unsafe_block_recv,
unsafe_block_signer_sender,
handler,
swarm,
discovery,
})
Ok(NetworkDriver { unsafe_block_recv, unsafe_block_signer_sender, gossip, discovery })
}
}
37 changes: 7 additions & 30 deletions crates/net/src/driver.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
//! Driver for network services.

use crate::{
builder::NetworkDriverBuilder,
discovery::driver::DiscoveryDriver,
gossip::{
driver::GossipDriver,
event::Event,
handler::{BlockHandler, Handler},
},
types::envelope::ExecutionPayloadEnvelope,
builder::NetworkDriverBuilder, discovery::driver::DiscoveryDriver,
gossip::driver::GossipDriver, types::envelope::ExecutionPayloadEnvelope,
};
use alloy::primitives::Address;
use eyre::Result;
use libp2p::swarm::SwarmEvent;
use std::sync::mpsc::Receiver;
use tokio::{select, sync::watch};

Expand All @@ -27,10 +20,8 @@ pub struct NetworkDriver {
pub unsafe_block_recv: Receiver<ExecutionPayloadEnvelope>,
/// Channel to send unsafe signer updates.
pub unsafe_block_signer_sender: watch::Sender<Address>,
/// Block handler.
pub handler: BlockHandler,
/// The swarm instance.
pub swarm: GossipDriver,
pub gossip: GossipDriver,
/// The discovery service driver.
pub discovery: DiscoveryDriver,
}
Expand All @@ -45,29 +36,15 @@ impl NetworkDriver {
/// and continually listens for new peers and messages to handle
pub fn start(mut self) -> Result<()> {
let mut peer_recv = self.discovery.start()?;
self.swarm.listen()?;
self.gossip.listen()?;
tokio::spawn(async move {
loop {
select! {
peer = peer_recv.recv() => {
if let Some(peer) = peer {
_ = self.swarm.dial(peer);
}
self.gossip.dial_opt(peer).await;
},
event = self.swarm.select_next_some() => {
if let SwarmEvent::Behaviour(Event::Gossipsub(libp2p::gossipsub::Event::Message {
propagation_source: src,
message_id: id,
message,
})) = event {
if self.handler.topics().contains(&message.topic) {
let status = self.handler.handle(message);
_ = self.swarm
.behaviour_mut()
.gossipsub
.report_message_validation_result(&id, &src, status);
}
}
event = self.gossip.select_next_some() => {
self.gossip.handle_event(event);
},
}
}
Expand Down
45 changes: 40 additions & 5 deletions crates/net/src/gossip/driver.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
//! Consensus-layer gossipsub driver for Optimism.

use crate::gossip::{behaviour::Behaviour, event::Event};
use crate::gossip::{
behaviour::Behaviour,
event::Event,
handler::{BlockHandler, Handler},
};
use eyre::Result;
use futures::stream::StreamExt;
use libp2p::{Multiaddr, Swarm};
use libp2p::{swarm::SwarmEvent, Multiaddr, Swarm};

/// A [libp2p::Swarm] instance with an associated address to listen on.
pub struct GossipDriver {
/// The [libp2p::Swarm] instance.
pub swarm: Swarm<Behaviour>,
/// The address to listen on.
pub addr: Multiaddr,
/// Block handler.
pub handler: BlockHandler,
}

impl GossipDriver {
/// Creates a new [GossipDriver] instance.
pub fn new(swarm: Swarm<Behaviour>, addr: Multiaddr) -> Self {
Self { swarm, addr }
pub fn new(swarm: Swarm<Behaviour>, addr: Multiaddr, handler: BlockHandler) -> Self {
Self { swarm, addr, handler }
}

/// Listens on the address.
Expand All @@ -31,14 +37,43 @@ impl GossipDriver {
}

/// Attempts to select the next event from the Swarm.
pub async fn select_next_some(&mut self) -> libp2p::swarm::SwarmEvent<Event> {
pub async fn select_next_some(&mut self) -> SwarmEvent<Event> {
self.swarm.select_next_some().await
}

/// Dials the given [`Option<Multiaddr>`].
pub async fn dial_opt(&mut self, peer: Option<impl Into<Multiaddr>>) {
let Some(addr) = peer else {
return;
};
if let Err(e) = self.dial(addr).await {
tracing::error!("Failed to dial peer: {:?}", e);
}
}

/// Dials the given [Multiaddr].
pub async fn dial(&mut self, peer: impl Into<Multiaddr>) -> Result<()> {
let addr: Multiaddr = peer.into();
self.swarm.dial(addr).map_err(|_| eyre::eyre!("dial failed"))?;
Ok(())
}

/// Handles the [`SwarmEvent<Event>`].
pub fn handle_event(&mut self, event: SwarmEvent<Event>) {
if let SwarmEvent::Behaviour(Event::Gossipsub(libp2p::gossipsub::Event::Message {
propagation_source: src,
message_id: id,
message,
})) = event
{
if self.handler.topics().contains(&message.topic) {
let status = self.handler.handle(message);
_ = self
.swarm
.behaviour_mut()
.gossipsub
.report_message_validation_result(&id, &src, status);
}
}
}
}