Skip to content

Commit

Permalink
initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Feb 26, 2021
1 parent 655e752 commit bc5f585
Show file tree
Hide file tree
Showing 5 changed files with 374 additions and 2 deletions.
11 changes: 11 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,16 @@ authors = [
[features]

[dependencies]
hyperswarm-dht = { git = "https://github.com/Frando/hyperswarm-dht.git", branch = "hyperspace" }
async-std = { version = "1.9.0", features = ["unstable"] }
futures-lite = "1.11.3"
futures-channel = "0.3.13"
log = "0.4.14"
futures = "0.3.13"

[dev-dependencies]
env_logger = "0.8.3"
async-std = { version = "1.9.0", features = ["unstable", "attributes"] }

[patches.crates-io]
hyperswarm-dht = { path = "../hyperswarm-dht" }
92 changes: 92 additions & 0 deletions examples/simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::stream::StreamExt;
use async_std::task;
// use log::*;
// use std::convert::TryFrom;
// use std::net::SocketAddr;

use hyperswarm::{bootstrap_dht, Hyperswarm, IdBytes, JoinOpts, SwarmEvent};
use hyperswarm_dht::DhtConfig;

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let bs_addr = "localhost:6060";
let bs_addr = bootstrap_dht(Some(bs_addr)).await?;

let config1 = DhtConfig::default().set_bootstrap_nodes(&[bs_addr]);
let config2 = DhtConfig::default().set_bootstrap_nodes(&[bs_addr]);

let mut swarm1 = Hyperswarm::with_config(config1).await?;
let mut swarm2 = Hyperswarm::with_config(config2).await?;

let cmd1 = swarm1.commands();
let cmd2 = swarm2.commands();

let task1 = task::spawn(async move {
while let Some(event) = swarm1.next().await {
match event {
SwarmEvent::Connection(stream) => on_connection(stream, "rust1".into()),
_ => {}
}
}
});

let task2 = task::spawn(async move {
while let Some(event) = swarm2.next().await {
match event {
SwarmEvent::Connection(stream) => on_connection(stream, "rust2".into()),
_ => {}
}
}
});

let topic = IdBytes::from([0u8; 32]);
let opts = JoinOpts {
announce: true,
lookup: true,
};

cmd1.join(topic.clone(), opts.clone());
cmd2.join(topic.clone(), opts.clone());

task1.await;
task2.await;

Ok(())
}

fn on_connection(mut stream: TcpStream, local_name: String) {
task::spawn(async move {
stream
.write_all(format!("hi from {}", local_name).as_bytes())
.await
.unwrap();
let mut buf = vec![0u8; 100];
loop {
match stream.read(&mut buf).await {
Ok(n) if n > 0 => {
let text = String::from_utf8(buf[..n].to_vec()).unwrap();
eprintln!("[{}] read: {}", local_name, text);
}
Ok(_) => {
eprintln!("[{}]: connection closed", local_name);
break;
}
Err(e) => {
eprintln!("[{}]: error: {}", local_name, e);
break;
}
}
}
});
}

// async fn timeout(ms: u64) {
// let _ = async_std::future::timeout(
// std::time::Duration::from_millis(ms),
// futures::future::pending::<()>(),
// )
// .await;
// }
31 changes: 31 additions & 0 deletions src/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use async_std::net::ToSocketAddrs;
use async_std::stream::StreamExt;
use log::*;
use std::net::SocketAddr;

use hyperswarm_dht::{DhtConfig, HyperDht};

pub async fn bootstrap_dht<A: ToSocketAddrs>(local_addr: Option<A>) -> std::io::Result<SocketAddr> {
let config = DhtConfig::default()
.empty_bootstrap_nodes()
.set_ephemeral(false);
let config = if let Some(addr) = local_addr {
config.bind(addr).await.map_err(|(_, e)| e)?
} else {
config
};
let mut bs = HyperDht::with_config(config).await?;
let addr = bs.local_addr()?;
debug!("Running DHT on address: {}", addr);
async_std::task::spawn(async move {
loop {
bs.next().await;
}
// loop {
// process each incoming message
// let _event = bs.next().await;
// debug!("bootstrap event: {:?}", event);
// }
});
Ok(addr)
}
230 changes: 230 additions & 0 deletions src/hyperswarm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
use async_std::channel;
use async_std::net::{TcpListener, TcpStream};
// use async_std::prelude::*;
use async_std::stream::StreamExt;
// use async_std::sync::{Arc, Mutex};
use async_std::task;
use async_std::task::JoinHandle;
use futures::future::FutureExt;
// use futures_channel::oneshot;
use futures_lite::{future, Stream};
use log::*;
// use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

use hyperswarm_dht::{DhtConfig, HyperDht, HyperDhtEvent, IdBytes, Lookup, Peers, QueryOpts};

#[derive(Debug)]
pub enum SwarmEvent {
Bootstrapped,
Connection(TcpStream),
}

#[derive(Debug)]
pub enum Command {
Join(IdBytes, JoinOpts),
}

#[derive(Debug, Clone, PartialEq)]
pub struct JoinOpts {
pub lookup: bool,
pub announce: bool,
}

#[derive(Debug, Clone)]
pub struct QueryResult {
peers: Vec<Peers>,
topic: IdBytes,
}

#[derive(Debug)]
pub struct Hyperswarm {
task: Option<JoinHandle<io::Result<()>>>,
command_tx: channel::Sender<Command>,
events_rx: channel::Receiver<SwarmEvent>,
}

impl Hyperswarm {
pub fn with_state(state: HyperDht) -> Self {
let (command_tx, command_rx) = channel::unbounded();
let (events_tx, events_rx) = channel::unbounded();
let task = task::spawn(run_loop(state, command_rx, events_tx));
Self {
events_rx,
command_tx,
task: Some(task),
}
}

pub async fn with_config(config: DhtConfig) -> io::Result<Self> {
let state = HyperDht::with_config(config).await?;
Ok(Self::with_state(state))
}

pub fn commands(&self) -> SwarmCommands {
SwarmCommands(self.command_tx.clone())
}
}

impl Stream for Hyperswarm {
type Item = SwarmEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.events_rx).poll_next(cx)
}
}

#[derive(Debug)]
pub struct SwarmCommands(channel::Sender<Command>);

impl SwarmCommands {
pub fn join(&self, topic: IdBytes, opts: JoinOpts) {
self.0.try_send(Command::Join(topic, opts)).unwrap()
}
}

async fn run_loop(
mut state: HyperDht,
mut command_rx: channel::Receiver<Command>,
events_tx: channel::Sender<SwarmEvent>,
) -> io::Result<()> {
enum Event {
Dht(HyperDhtEvent),
Command(Command),
};

let (connect_tx, connect_rx) = channel::unbounded();
let connect_task = task::spawn(connect_loop(connect_rx, events_tx.clone()));
// TODO: Allow to configure local port on which to accept peer connections.
let (local_addr, accept_task) = accept_task(None, events_tx.clone()).await?;
let local_port = local_addr.port() as u32;

// wait for bootstrap event first.
wait_for_bootstrap(&mut state).await?;
events_tx.try_send(SwarmEvent::Bootstrapped).unwrap();

while let Some(event) = future::race(
state.next().map(|e| e.map(Event::Dht)),
command_rx.next().map(|e| e.map(Event::Command)),
)
.await
{
match event {
Event::Dht(event) => {
debug!("swarm event: {:?}", event);
match event {
HyperDhtEvent::Bootstrapped { .. } => {
// handled above, may not occur again?
unreachable!("received second bootstrap event");
}
HyperDhtEvent::AnnounceResult { .. } => {}
HyperDhtEvent::LookupResult { lookup, .. } => {
connect_tx.try_send(lookup).unwrap();
}
HyperDhtEvent::UnAnnounceResult { .. } => {}
_ => {}
}
}
Event::Command(command) => {
debug!("swarm command: {:?}", command);
match command {
Command::Join(topic, join_opts) => {
let opts = QueryOpts {
topic,
port: Some(local_port),
local_addr: None,
};
if join_opts.announce {
state.announce(opts.clone());
}
if join_opts.lookup {
state.lookup(opts);
}
}
}
}
}
}

connect_task.await;
accept_task.await?;

Ok(())
}

async fn connect_loop(
mut connect_rx: channel::Receiver<Lookup>,
events_tx: channel::Sender<SwarmEvent>,
) {
while let Some(lookup) = connect_rx.next().await {
let peers = lookup.remotes();
// TODO: Connect over utp if tcp fails.
for addr in peers {
debug!("Connecting to peer {}", addr);
let tcp_socket = TcpStream::connect(addr).await;
// TODO: Also connect via UTP.
// .race(UtpStream::connect(addr));
match tcp_socket {
Ok(stream) => {
debug!("Connected to peer {}", addr);
events_tx
.send(SwarmEvent::Connection(stream))
.await
.unwrap();
}
Err(err) => {
error!("Error connecting to peer {}: {}", addr, err);
}
}
}
}
}

async fn accept_task(
local_port: Option<u32>,
events_tx: channel::Sender<SwarmEvent>,
) -> io::Result<(SocketAddr, JoinHandle<io::Result<()>>)> {
let port = local_port.unwrap_or(0);
let address = format!("127.0.0.1:{}", port);
let listener = TcpListener::bind(&address).await?;
let local_addr = listener.local_addr()?;
let accept_task = task::spawn(accept_loop(listener, events_tx));
Ok((local_addr, accept_task))
}

async fn accept_loop(
listener: TcpListener,
events_tx: channel::Sender<SwarmEvent>,
) -> io::Result<()> {
debug!(
"accepting peer connections on tcp://{}",
listener.local_addr()?
);

let mut incoming = listener.incoming();
while let Some(Ok(stream)) = incoming.next().await {
let peer_addr = stream.peer_addr().unwrap().to_string();
debug!("Accepted connection from peer {}", peer_addr);
events_tx
.send(SwarmEvent::Connection(stream))
.await
.unwrap();
}
Ok(())
}

async fn wait_for_bootstrap(state: &mut HyperDht) -> io::Result<()> {
let event = state.next().await;
match event {
Some(HyperDhtEvent::Bootstrapped { .. }) => {
debug!("swarm bootstrapped!");
Ok(())
}
_ => Err(io::Error::new(
io::ErrorKind::Other,
"Did not receive bootstrap as first event, abort.",
)),
}
}
12 changes: 10 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,13 @@
//! ```
#![forbid(unsafe_code, future_incompatible, rust_2018_idioms)]
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, missing_doc_code_examples, unreachable_pub)]
// #![deny(missing_debug_implementations, nonstandard_style)]
// #![warn(missing_docs, missing_doc_code_examples, unreachable_pub)]

mod bootstrap;
mod hyperswarm;

pub use bootstrap::*;
pub use hyperswarm::*;

pub use hyperswarm_dht::IdBytes;

0 comments on commit bc5f585

Please sign in to comment.