Skip to content

Commit

Permalink
Rework config structure
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Apr 1, 2022
1 parent e4e06b9 commit 48deb12
Show file tree
Hide file tree
Showing 14 changed files with 312 additions and 114 deletions.
14 changes: 8 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,23 @@ futures-lite = "1.11.3"
futures-channel = "0.3.13"
log = "0.4.14"
futures = "0.3.13"
async-trait = "0.1.42"
async-compat = "0.1.0"
multicast-socket = "0.2.1"
hex = "0.4.3"
pretty-hash = "0.4.1"
hyperswarm-dht = { git = "https://github.com/Frando/hyperswarm-dht.git", branch = "hyperspace" }
colmeia-hyperswarm-mdns = { git = "https://github.com/bltavares/colmeia.git", rev = "e92ab71981356197a21592b7ce6854e209582985" }
libutp-rs = { git = "https://github.com/Frando/libutp-rs.git", branch = "feat/clone", optional = true }
hyperswarm-dht = { git = "https://github.com/datrs/hyperswarm-dht.git", branch = "hyperspace" }
# hyperswarm-dht = { path = "../hyperswarm-dht" }
colmeia-hyperswarm-mdns = { git = "https://github.com/bltavares/colmeia.git", rev = "53761799f7a9ee123875534e0108d7483a117885" }
libutp-rs = { git = "https://github.com/Frando/libutp-rs.git", branch = "feat/clone2", optional = true }
async-trait = "0.1.53"
async-compat = "0.2.1"

[dev-dependencies]
env_logger = "0.8.3"
async-std = { version = "1.9.0", features = ["unstable", "attributes"] }
clap = "3.0.0-beta.2"
rand = "0.8.3"
blake2-rfc = "0.2.18"
anyhow = "1.0.56"
clap = { version = "3.1.7", features = ["derive"] }

# [patch.crates-io]
# hyperswarm-dht = { path = "../hyperswarm-dht" }
Expand Down
84 changes: 64 additions & 20 deletions src/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,75 @@
use async_std::net::ToSocketAddrs;
use async_std::stream::StreamExt;
use async_std::task::JoinHandle;
use hyperswarm_dht::{DhtConfig, HyperDht};
use log::*;
use std::io;
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs};

use hyperswarm_dht::{DhtConfig, HyperDht};
use crate::util::to_socket_addr;

pub const DEFAULT_DHT_BIND_ADDR: &str = "0.0.0.0:49737";

pub async fn run_bootstrap_node<A: ToSocketAddrs>(
local_addr: Option<A>,
addr: Option<A>,
) -> io::Result<(SocketAddr, JoinHandle<io::Result<()>>)> {
let config = DhtConfig::default()
.empty_bootstrap_nodes()
.set_ephemeral(false);
let config = if let Some(addr) = local_addr {
config.bind(addr).await.map_err(|(_, e)| e)?
let node = if let Some(addr) = addr {
BootstrapNode::with_addr(addr)?
} else {
config
BootstrapNode::default()
};
let mut bs = HyperDht::with_config(config).await?;
let addr = bs.local_addr()?;
debug!("Running DHT on address: {}", addr);
let task = async_std::task::spawn(async move {
loop {
let event = bs.next().await;
trace!("[bootstrap node] event {:?}", event);
}
});
Ok((addr, task))
node.run().await
}

#[derive(Debug)]
pub struct BootstrapNode {
config: DhtConfig,
addr: Option<SocketAddr>,
}

impl Default for BootstrapNode {
fn default() -> Self {
Self::with_addr(DEFAULT_DHT_BIND_ADDR).unwrap()
}
}

impl BootstrapNode {
pub fn new(config: DhtConfig, addr: Option<SocketAddr>) -> Self {
Self { config, addr }
}

pub fn with_addr<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
let addr = to_socket_addr(addr)?;
let config = DhtConfig::default()
.empty_bootstrap_nodes()
.set_ephemeral(false);
Ok(Self::new(config, Some(addr)))
}

pub fn with_config<A: ToSocketAddrs>(config: DhtConfig, addr: Option<A>) -> io::Result<Self> {
let addr = if let Some(addr) = addr {
to_socket_addr(addr)?
} else {
to_socket_addr(DEFAULT_DHT_BIND_ADDR)?
};
Ok(Self::new(config, Some(addr)))
}

pub async fn run(self) -> io::Result<(SocketAddr, JoinHandle<io::Result<()>>)> {
let Self { addr, config } = self;
let addr = addr.unwrap_or(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
49737,
));
let config = config.bind(addr).await.map_err(|(_, e)| e)?;
let mut bs = HyperDht::with_config(config).await?;
let addr = bs.local_addr()?;
debug!("Running DHT on address: {}", addr);
let task = async_std::task::spawn(async move {
loop {
let event = bs.next().await;
trace!("[bootstrap node] event {:?}", event);
}
});
Ok((addr, task))
}
}
56 changes: 49 additions & 7 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,61 @@
use std::net::SocketAddr;
use std::net::{SocketAddr, ToSocketAddrs};

#[derive(Debug, Default, Clone)]
use crate::discovery::dht::DhtConfig;
use crate::discovery::mdns::MdnsConfig;

#[derive(Debug, Default)]
pub struct Config {
pub bootstrap: Option<Vec<SocketAddr>>,
pub ephemeral: bool,
pub mdns: Option<MdnsConfig>,
pub dht: Option<DhtConfig>,
pub local_addr: Option<SocketAddr>,
}

impl Config {
pub fn set_bootstrap_nodes(mut self, nodes: Option<Vec<SocketAddr>>) -> Self {
self.bootstrap = nodes;
pub fn new() -> Self {
Self::default()
}

pub fn all() -> Self {
Self {
mdns: Some(MdnsConfig::default()),
dht: Some(DhtConfig::default()),
local_addr: None,
}
}

pub fn with_defaults(mut self) -> Self {
self.mdns = self.mdns.or_else(|| Default::default());
self.dht = self.dht.or_else(|| Default::default());
self
}

/// Set DHT config
pub fn set_dht(mut self, config: DhtConfig) -> Self {
self.dht = Some(config);
self
}

/// Set MDNS config
pub fn set_mdns(mut self, config: MdnsConfig) -> Self {
self.mdns = Some(config);
self
}

/// Set local address bind TCP and UTP sockets for incoming connections on
pub fn set_local_addr(mut self, address: SocketAddr) -> Self {
self.local_addr = Some(address);
self
}

// shortcuts, not sure if this is good style?

pub fn set_bootstrap_nodes<T: ToSocketAddrs>(mut self, addresses: &[T]) -> Self {
self.dht = Some(self.dht.unwrap_or_default().set_bootstrap_nodes(addresses));
self
}

pub fn set_ephemeral(mut self, ephemeral: bool) -> Self {
self.ephemeral = ephemeral;
self.dht = Some(self.dht.unwrap_or_default().set_ephemeral(ephemeral));
self
}
}
Expand Down
11 changes: 3 additions & 8 deletions src/discovery/combined.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use async_std::stream::Stream;
use log::*;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand All @@ -16,22 +15,20 @@ pub struct CombinedDiscovery {
}

impl CombinedDiscovery {
pub async fn bind(local_port: u16, config: Config) -> io::Result<Self> {
let mdns = MdnsDiscovery::bind(local_port, config.clone()).await?;
let dht = DhtDiscovery::bind(local_port, config).await?;
pub async fn bind(config: Config, announce_port: u16) -> io::Result<Self> {
let mdns = MdnsDiscovery::bind(config.mdns.unwrap_or_default(), announce_port).await?;
let dht = DhtDiscovery::bind(config.dht.unwrap_or_default(), announce_port).await?;
Ok(Self { mdns, dht })
}
}

impl Discovery for CombinedDiscovery {
fn lookup(&mut self, topic: Topic) {
debug!("lookup topic {}", hex::encode(topic));
self.mdns.lookup(topic);
self.dht.lookup(topic);
}

fn announce(&mut self, topic: Topic) {
debug!("announce topic {}", hex::encode(topic));
self.mdns.announce(topic);
self.dht.announce(topic);
}
Expand All @@ -43,12 +40,10 @@ impl Stream for CombinedDiscovery {
let this = self.get_mut();
let next = Pin::new(&mut this.dht).poll_next(cx);
if next.is_ready() {
debug!("Found on DHT: {:?}", next);
return next;
}
let next = Pin::new(&mut this.mdns).poll_next(cx);
if next.is_ready() {
debug!("Found on MDNS: {:?}", next);
return next;
}
Poll::Pending
Expand Down
27 changes: 10 additions & 17 deletions src/discovery/dht.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use async_std::stream::Stream;
use futures_lite::ready;
use hyperswarm_dht::{DhtConfig, HyperDht, HyperDhtEvent, QueryOpts};
use hyperswarm_dht::{HyperDht, HyperDhtEvent, QueryOpts};
use log::*;
use std::collections::VecDeque;
use std::fmt;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::config::Config;

use super::{Discovery, DiscoveryMethod, PeerInfo, Topic};

pub use hyperswarm_dht::DhtConfig;

// #[derive(Debug)]
pub struct DhtDiscovery {
state: HyperDht,
bootstrapped: bool,
local_port: u16,
announce_port: u16,
pending_commands: VecDeque<Command>,
pending_events: VecDeque<PeerInfo>,
}
Expand All @@ -25,7 +25,7 @@ impl fmt::Debug for DhtDiscovery {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DhtDiscovery")
.field("bootstrapped", &self.bootstrapped)
.field("local_port", &self.local_port)
.field("announce_port", &self.announce_port)
.finish()
}
}
Expand All @@ -37,18 +37,11 @@ enum Command {
}

impl DhtDiscovery {
pub async fn bind(local_port: u16, config: Config) -> io::Result<Self> {
let dht_config = DhtConfig::default();
let dht_config = if let Some(bootstrap) = config.bootstrap.as_ref() {
dht_config.set_bootstrap_nodes(bootstrap)
} else {
dht_config
};
let dht_config = dht_config.set_ephemeral(config.ephemeral);
let state = HyperDht::with_config(dht_config).await?;
pub async fn bind(config: DhtConfig, announce_port: u16) -> io::Result<Self> {
let state = HyperDht::with_config(config).await?;
let this = Self {
state,
local_port,
announce_port,
bootstrapped: false,
pending_commands: VecDeque::new(),
pending_events: VecDeque::new(),
Expand All @@ -70,7 +63,7 @@ impl Discovery for DhtDiscovery {
fn lookup(&mut self, topic: Topic) {
let opts = QueryOpts {
topic: topic.into(),
port: Some(self.local_port as u32),
port: Some(self.announce_port as u32),
local_addr: None,
};
self.pending_commands.push_back(Command::Lookup(opts))
Expand All @@ -79,7 +72,7 @@ impl Discovery for DhtDiscovery {
fn announce(&mut self, topic: Topic) {
let opts = QueryOpts {
topic: topic.into(),
port: Some(self.local_port as u32),
port: Some(self.announce_port as u32),
local_addr: None,
};
self.pending_commands.push_back(Command::Announce(opts))
Expand Down
52 changes: 44 additions & 8 deletions src/discovery/mdns.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use async_std::channel;
use async_std::stream::Stream;
use async_std::task::{Context, Poll};
use colmeia_hyperswarm_mdns::{self_id, Announcer, Locator};
use colmeia_hyperswarm_mdns::{Announcer, Locator};
use futures_lite::ready;
// use log::*;
use std::convert::TryInto;
use std::fmt;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::time::Duration;

use crate::Config;
use crate::IdBytes;

use super::{Discovery, DiscoveryMethod, PeerInfo, Topic};

Expand All @@ -28,6 +27,44 @@ mod socket {
}
}

#[derive(Debug, Clone, Default)]
pub struct MdnsConfig {
// pub socket: Option<MulticastSocket>,
pub id: Option<IdBytes>,
pub lookup_interval: Duration,
pub announce_port: Option<u16>,
}

impl MdnsConfig {
pub fn new() -> Self {
Self {
lookup_interval: Duration::from_secs(60),
id: None,
announce_port: None,
}
}

// pub fn set_socket(self, socket: MulticastSocket) -> Self {
// self.socket = Some(socket);
// self
// }

pub fn set_id(mut self, id: IdBytes) -> Self {
self.id = Some(id);
self
}

pub fn set_lookup_interval(mut self, interval: Duration) -> Self {
self.lookup_interval = interval;
self
}

pub fn set_announce_port(mut self, port: u16) -> Self {
self.announce_port = Some(port);
self
}
}

enum Command {
Lookup(Topic),
Announce(Topic),
Expand All @@ -52,13 +89,12 @@ impl fmt::Debug for MdnsDiscovery {
}

impl MdnsDiscovery {
pub async fn bind(local_port: u16, _config: Config) -> io::Result<Self> {
let self_id = self_id();
pub async fn bind(config: MdnsConfig, announce_port: u16) -> io::Result<Self> {
let self_id = config.id.unwrap_or_else(|| IdBytes::random());
let socket = socket::create()?;
let lookup_interval = Duration::from_secs(60);
let locator = Locator::listen(socket, lookup_interval, self_id.as_bytes());
let locator = Locator::listen(socket, config.lookup_interval, &self_id.0);
let socket = socket::create()?;
let announcer = Announcer::listen(socket, local_port, self_id.clone());
let announcer = Announcer::listen(socket, announce_port, hex::encode(&self_id.0));
let (pending_commands_tx, pending_commands_rx) = channel::unbounded();
Ok(Self {
locator,
Expand Down
Loading

0 comments on commit 48deb12

Please sign in to comment.