diff --git a/.gitignore b/.gitignore index b75a144..e00bcd7 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ target/ tmp/ Cargo.lock .DS_Store +SANDBOX diff --git a/Cargo.toml b/Cargo.toml index 2cbc2df..3fe6ba6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hyperswarm" -version = "1.0.0" +version = "0.1.0" license = "MIT OR Apache-2.0" repository = "https://github.com/Frando/hyperswarm" documentation = "https://docs.rs/hyperswarm" @@ -16,16 +16,24 @@ authors = [ [features] [dependencies] -hyperswarm-dht = { git = "https://github.com/Frando/hyperswarm-dht.git", branch = "hyperspace" } async-std = { version = "1.9.0", features = ["unstable"] } futures-lite = "1.11.3" futures-channel = "0.3.13" log = "0.4.14" futures = "0.3.13" +async-trait = "0.1.42" +async-compat = "0.1.0" +multicast-socket = "0.2.1" +hex = "0.4.3" +pretty-hash = "0.4.1" +hyperswarm-dht = { git = "https://github.com/Frando/hyperswarm-dht.git", branch = "hyperspace" } +colmeia-hyperswarm-mdns = { git = "https://github.com/bltavares/colmeia.git", rev = "e92ab71981356197a21592b7ce6854e209582985" } +libutp-rs = { git = "https://github.com/Frando/libutp-rs.git", branch = "feat/clone" } [dev-dependencies] env_logger = "0.8.3" async-std = { version = "1.9.0", features = ["unstable", "attributes"] } -[patches.crates-io] -hyperswarm-dht = { path = "../hyperswarm-dht" } +# [patch.crates-io] +# hyperswarm-dht = { path = "../hyperswarm-dht" } +# libutp-rs = { path = "../libutp-rs" } diff --git a/README.md b/README.md index 218985a..f9037a5 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -

hyperswarm

+

hyperswarm-rs

Peer to peer networking stack @@ -26,19 +26,15 @@
-

+

API Docs | - - Releases - - | Contributing -

+
## Installation @@ -46,6 +42,90 @@ $ cargo add hyperswarm ``` +## Usage + +Hyperswarm is a networking stack for connecting peers who are interested in a topic. This project is a port of the [Node.js implementation of Hyperswarm](https://github.com/hyperswarm/hyperswarm). + +This crate exposes a `Hyperswarm` struct. After binding it, this will: + +- Start and bootstrap a local DHT node +- Bind a socket for mDNS discovery +- Announce and lookup any 32 byte topic key over both mDNS and the DHT +- Connect to all peers that are found over both TCP and UTP + +It currently depends on the unreleased [hyperswarm-dht](https://github.com/mattsse/hyperswarm-dht) crate and therefore is also not yet released on crates.io. + +The API is designed to be very simple: + +```rust +use async_std::task; +use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; +use hyperswarm::{Config, Hyperswarm, HyperswarmStream, TopicConfig}; +use std::io; + +#[async_std::main] +async fn main() -> io::Result<()> { + // Bind and initialize the swarm with the default config. + // On the config you can e.g. set bootstrap addresses. + let config = Config::default(); + let mut swarm = Hyperswarm::bind(config).await?; + + // A topic is any 32 byte array. Usually, this would be the hash of some identifier. + // Configuring the swarm for a topic starts to lookup and/or announce this topic + // and connect to peers that are found. + let topic = [0u8; 32]; + swarm.configure(topic, TopicConfig::announce_and_lookup()); + + // The swarm is a Stream of new HyperswarmStream peer connections. + // The HyperswarmStream is a wrapper around either a TcpStream or a UtpSocket. + // Usually you'll want to run some loop over the connection, so let's spawn a task + // for each connection. + while let Some(stream) = swarm.next().await { + task::spawn(on_connection(stream?)); + } + + Ok(()) +} + +// A HyperswarmStream is AsyncRead + AsyncWrite, so you can use it just +// like a TcpStream. Here, we'll send an initial message and then keep +// reading from the stream until it is closed by the remote. +async fn on_connection(mut stream: HyperswarmStream) -> io::Result<()> { + stream.write_all(b"hello there").await?; + let mut buf = vec![0u8; 64]; + loop { + match stream.read(&mut buf).await { + Ok(0) => return Ok(()), + Err(e) => return Err(e), + Ok(n) => eprintln!("received: {}", std::str::from_utf8(&buf[..n]).unwrap()), + } + } +} +``` + +See [`examples/simple.rs`](examples/simple.rs) for a working example that also runs a bootstrap node. That example can also find and connect to NodeJS peers. To try it out: + +```sh +cargo run --example simple +# in another terminal +node js/simple.js +``` + +Currently, the DHT bootstrap node has to be run from Rust. The Rust implementation does not find peers on a NodeJS bootstrap node. + +## Roadmap + +- [x] Initial implementation +- [ ] Find peers over the Hyperswarm DHT + - [x] Both NodeJS and Rust peers are found if connecting to a Rust bootstrap node + - [ ] Fix [hyperswarm-dht](https://github.com/mattsse/hyperswarm-dht) to work with NodeJS bootstrap nodes +- [ ] Find peers over mDNS + - [ ] Change colmeia-mdns to better fit the architecture here or copy and adapt the mdns code over into the mdns module +- [x] Connect to peers over TCP +- [ ] Connect to peers over UTP + - [x] Can connect to peers over UTP + - [ ] Fix issues in [libutp-rs](https://github.com/johsunds/libutp-rs) - sometimes the connection is not flushed properly + ## Safety This crate uses ``#![deny(unsafe_code)]`` to ensure everything is implemented in 100% Safe Rust. diff --git a/examples/simple.rs b/examples/simple.rs index 3b91ec3..5fd06d4 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -1,63 +1,58 @@ -use async_std::net::TcpStream; use async_std::prelude::*; use async_std::stream::StreamExt; use async_std::task; -// use log::*; -// use std::convert::TryFrom; -// use std::net::SocketAddr; +// use std::net::{SocketAddr, ToSocketAddrs}; -use hyperswarm::{bootstrap_dht, Hyperswarm, IdBytes, JoinOpts, SwarmEvent}; -use hyperswarm_dht::DhtConfig; +use hyperswarm::{run_bootstrap_node, Config, Hyperswarm, HyperswarmStream, TopicConfig}; #[async_std::main] async fn main() -> Result<(), Box> { env_logger::init(); let bs_addr = "localhost:6060"; - let bs_addr = bootstrap_dht(Some(bs_addr)).await?; + let (bs_addr, bs_task) = run_bootstrap_node(Some(bs_addr)).await?; + // let bs_addr: SocketAddr = bs_addr.to_socket_addrs().unwrap().next().unwrap(); - let config1 = DhtConfig::default().set_bootstrap_nodes(&[bs_addr]); - let config2 = DhtConfig::default().set_bootstrap_nodes(&[bs_addr]); + let config = Config::default().set_bootstrap_nodes(vec![bs_addr]); - let mut swarm1 = Hyperswarm::with_config(config1).await?; - let mut swarm2 = Hyperswarm::with_config(config2).await?; + let mut swarm1 = Hyperswarm::bind(config.clone()).await?; + let mut swarm2 = Hyperswarm::bind(config).await?; - let cmd1 = swarm1.commands(); - let cmd2 = swarm2.commands(); + let handle1 = swarm1.handle(); + let handle2 = swarm2.handle(); let task1 = task::spawn(async move { - while let Some(event) = swarm1.next().await { - match event { - SwarmEvent::Connection(stream) => on_connection(stream, "rust1".into()), - _ => {} - } + while let Some(stream) = swarm1.next().await { + let stream = stream.unwrap(); + on_connection(stream, "rust1".into()); } }); let task2 = task::spawn(async move { - while let Some(event) = swarm2.next().await { - match event { - SwarmEvent::Connection(stream) => on_connection(stream, "rust2".into()), - _ => {} - } + while let Some(stream) = swarm2.next().await { + let stream = stream.unwrap(); + on_connection(stream, "rust2".into()); } }); - let topic = IdBytes::from([0u8; 32]); - let opts = JoinOpts { - announce: true, - lookup: true, - }; - - cmd1.join(topic.clone(), opts.clone()); - cmd2.join(topic.clone(), opts.clone()); + let topic = [0u8; 32]; + handle1.configure(topic, TopicConfig::both()); + handle2.configure(topic, TopicConfig::both()); task1.await; task2.await; + bs_task.await?; Ok(()) } -fn on_connection(mut stream: TcpStream, local_name: String) { +fn on_connection(mut stream: HyperswarmStream, local_name: String) { + let label = format!( + "[{} -> {}://{}]", + local_name, + stream.protocol(), + stream.peer_addr() + ); + eprintln!("{} connect", label); task::spawn(async move { stream .write_all(format!("hi from {}", local_name).as_bytes()) @@ -68,25 +63,17 @@ fn on_connection(mut stream: TcpStream, local_name: String) { match stream.read(&mut buf).await { Ok(n) if n > 0 => { let text = String::from_utf8(buf[..n].to_vec()).unwrap(); - eprintln!("[{}] read: {}", local_name, text); + eprintln!("{} read: {}", label, text); } Ok(_) => { - eprintln!("[{}]: connection closed", local_name); + eprintln!("{} close", label); break; } Err(e) => { - eprintln!("[{}]: error: {}", local_name, e); + eprintln!("{} error: {}", label, e); break; } } } }); } - -// async fn timeout(ms: u64) { -// let _ = async_std::future::timeout( -// std::time::Duration::from_millis(ms), -// futures::future::pending::<()>(), -// ) -// .await; -// } diff --git a/js/.gitignore b/js/.gitignore new file mode 100644 index 0000000..22fbdbe --- /dev/null +++ b/js/.gitignore @@ -0,0 +1,3 @@ +node_modules +yarn.lock +package-lock.json diff --git a/js/package.json b/js/package.json new file mode 100644 index 0000000..e0a855e --- /dev/null +++ b/js/package.json @@ -0,0 +1,9 @@ +{ + "name": "js", + "version": "1.0.0", + "main": "index.js", + "license": "MIT", + "dependencies": { + "hyperswarm": "^2.15.3" + } +} diff --git a/js/simple.js b/js/simple.js new file mode 100644 index 0000000..bb5cb28 --- /dev/null +++ b/js/simple.js @@ -0,0 +1,65 @@ +const hyperswarm = require('hyperswarm') + +const opts = { + runBootstrap: false +} +main(opts).catch(console.error) + +async function main (opts = {}) { + let bootstrap + if (opts.runBootstrap) { + bootstrap = await bootstrapDHT(6060) + } else { + bootstrap = 'localhost:6060' + } + console.log({ bootstrap }) + + const topic = Buffer.alloc(32, 0) + + const swarm1 = runNode(bootstrap, 'node1') + const swarm2 = runNode(bootstrap, 'node2') + + const config = { announce: true, lookup: true } + swarm1.join(topic, config) + swarm2.join(topic, config) +} + +function runNode (bootstrap, name) { + const swarm = hyperswarm({ + announceLocalAddress: true, + bootstrap: [bootstrap] + }) + + swarm.on('connection', (socket, info) => { + const peer = info.peer + let peerAddr = peer ? `${peer.host}:${peer.port}` : 'unknown' + const label = `[${name} -> ${info.type}://${peerAddr}]` + console.log(`${label} connect`) + socket.write(Buffer.from(`hi from ${name}!`)) + socket.on('data', buf => { + console.log(`${label} read: ${buf.toString()}`) + }) + socket.on('error', err => { + console.log(`${label} error: ${err.toString()}`) + }) + socket.on('close', () => { + console.log(`${label} close`) + }) + }) + + return swarm +} + +async function bootstrapDHT (port) { + const bootstrapper = require('@hyperswarm/dht')({ + bootstrap: false + }) + bootstrapper.listen(port) + await new Promise(resolve => { + return bootstrapper.once('listening', resolve) + }) + const bootstrapPort = bootstrapper.address().port + const bootstrapAddr = `localhost:${bootstrapPort}` + console.log(`bootstrap node running on ${bootstrapAddr}`) + return bootstrapAddr +} diff --git a/src/bootstrap.rs b/src/bootstrap.rs index 0dcf3cd..18571c4 100644 --- a/src/bootstrap.rs +++ b/src/bootstrap.rs @@ -1,11 +1,15 @@ use async_std::net::ToSocketAddrs; use async_std::stream::StreamExt; +use async_std::task::JoinHandle; use log::*; +use std::io; use std::net::SocketAddr; use hyperswarm_dht::{DhtConfig, HyperDht}; -pub async fn bootstrap_dht(local_addr: Option) -> std::io::Result { +pub async fn run_bootstrap_node( + local_addr: Option, +) -> io::Result<(SocketAddr, JoinHandle>)> { let config = DhtConfig::default() .empty_bootstrap_nodes() .set_ephemeral(false); @@ -17,15 +21,11 @@ pub async fn bootstrap_dht(local_addr: Option) -> std::io:: let mut bs = HyperDht::with_config(config).await?; let addr = bs.local_addr()?; debug!("Running DHT on address: {}", addr); - async_std::task::spawn(async move { + let task = async_std::task::spawn(async move { loop { - bs.next().await; + let event = bs.next().await; + trace!("[bootstrap node] event {:?}", event); } - // loop { - // process each incoming message - // let _event = bs.next().await; - // debug!("bootstrap event: {:?}", event); - // } }); - Ok(addr) + Ok((addr, task)) } diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..8664a26 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,38 @@ +use std::net::SocketAddr; + +#[derive(Default, Clone)] +pub struct Config { + pub bootstrap: Option>, + pub ephemeral: bool, +} + +impl Config { + pub fn set_bootstrap_nodes(mut self, nodes: Vec) -> Self { + self.bootstrap = Some(nodes); + self + } + + pub fn set_ephemeral(mut self, ephemeral: bool) -> Self { + self.ephemeral = ephemeral; + self + } +} + +#[derive(Debug, Default, PartialEq, Clone)] +pub struct TopicConfig { + pub announce: bool, + pub lookup: bool, +} + +impl TopicConfig { + pub fn both() -> Self { + Self { + announce: true, + lookup: true, + } + } + + pub fn announce_and_lookup() -> Self { + Self::both() + } +} diff --git a/src/discovery/combined.rs b/src/discovery/combined.rs new file mode 100644 index 0000000..0cc88c9 --- /dev/null +++ b/src/discovery/combined.rs @@ -0,0 +1,51 @@ +use async_std::stream::Stream; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use super::dht::DhtDiscovery; +use super::mdns::MdnsDiscovery; +use super::{Discovery, PeerInfo, Topic}; +use crate::config::Config; + +#[derive(Debug)] +pub struct CombinedDiscovery { + dht: DhtDiscovery, + mdns: MdnsDiscovery, +} + +impl CombinedDiscovery { + pub async fn listen(local_port: u16, config: Config) -> io::Result { + let mdns = MdnsDiscovery::listen(local_port).await?; + let dht = DhtDiscovery::listen(local_port, config).await?; + Ok(Self { mdns, dht }) + } +} + +impl Discovery for CombinedDiscovery { + fn lookup(&mut self, topic: Topic) { + self.mdns.lookup(topic); + self.dht.lookup(topic); + } + + fn announce(&mut self, topic: Topic) { + self.mdns.announce(topic); + self.dht.announce(topic); + } +} + +impl Stream for CombinedDiscovery { + type Item = io::Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let next = Pin::new(&mut this.dht).poll_next(cx); + if next.is_ready() { + return next; + } + let next = Pin::new(&mut this.mdns).poll_next(cx); + if next.is_ready() { + return next; + } + Poll::Pending + } +} diff --git a/src/discovery/dht.rs b/src/discovery/dht.rs new file mode 100644 index 0000000..a3627fb --- /dev/null +++ b/src/discovery/dht.rs @@ -0,0 +1,126 @@ +use async_std::stream::Stream; +use futures_lite::ready; +use hyperswarm_dht::{DhtConfig, HyperDht, HyperDhtEvent, QueryOpts}; +use log::*; +use std::collections::VecDeque; +use std::fmt; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::config::Config; + +use super::{Discovery, DiscoveryMethod, PeerInfo, Topic}; + +// #[derive(Debug)] +pub struct DhtDiscovery { + state: HyperDht, + bootstrapped: bool, + local_port: u16, + pending_commands: VecDeque, + pending_events: VecDeque, +} + +impl fmt::Debug for DhtDiscovery { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DhtDiscovery") + .field("bootstrapped", &self.bootstrapped) + .field("local_port", &self.local_port) + .finish() + } +} + +#[derive(Debug)] +enum Command { + Lookup(QueryOpts), + Announce(QueryOpts), +} + +impl DhtDiscovery { + pub async fn listen(local_port: u16, config: Config) -> io::Result { + let dht_config = DhtConfig::default(); + let dht_config = if let Some(bootstrap) = config.bootstrap.as_ref() { + dht_config.set_bootstrap_nodes(bootstrap) + } else { + dht_config + }; + let dht_config = dht_config.set_ephemeral(config.ephemeral); + let state = HyperDht::with_config(dht_config).await?; + let this = Self { + state, + local_port, + bootstrapped: false, + pending_commands: VecDeque::new(), + pending_events: VecDeque::new(), + }; + Ok(this) + } + + fn execute_pending_commands(&mut self) { + while let Some(command) = self.pending_commands.pop_front() { + match command { + Command::Announce(opts) => self.state.announce(opts), + Command::Lookup(opts) => self.state.lookup(opts), + }; + } + } +} + +impl Discovery for DhtDiscovery { + fn lookup(&mut self, topic: Topic) { + let opts = QueryOpts { + topic: topic.into(), + port: Some(self.local_port as u32), + local_addr: None, + }; + if self.bootstrapped { + self.state.lookup(opts); + } else { + self.pending_commands.push_back(Command::Lookup(opts)) + } + } + + fn announce(&mut self, topic: Topic) { + let opts = QueryOpts { + topic: topic.into(), + port: Some(self.local_port as u32), + local_addr: None, + }; + if self.bootstrapped { + self.state.lookup(opts); + } else { + self.pending_commands.push_back(Command::Announce(opts)) + } + } +} + +impl Stream for DhtDiscovery { + type Item = io::Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(Some(Ok(event))); + } + + let event = ready!(Pin::new(&mut self.state).poll_next(cx)); + trace!("DHT event: {:?}", event); + let event = event.unwrap(); + match event { + HyperDhtEvent::Bootstrapped { .. } => { + self.execute_pending_commands(); + } + HyperDhtEvent::AnnounceResult { .. } => {} + HyperDhtEvent::LookupResult { lookup, .. } => { + let topic = lookup.topic.0; + let peers = lookup.remotes(); + for addr in peers { + let info = PeerInfo::new(*addr, Some(topic), DiscoveryMethod::Dht); + self.pending_events.push_back(info); + } + } + HyperDhtEvent::UnAnnounceResult { .. } => {} + _ => {} + } + } + } +} diff --git a/src/discovery/mdns.rs b/src/discovery/mdns.rs new file mode 100644 index 0000000..0d447c3 --- /dev/null +++ b/src/discovery/mdns.rs @@ -0,0 +1,157 @@ +use async_std::channel; +use async_std::stream::Stream; +use async_std::task::{Context, Poll}; +use colmeia_hyperswarm_mdns::{self_id, Announcer, Locator}; +use futures_lite::ready; +// use log::*; +use std::convert::TryInto; +use std::fmt; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::time::Duration; + +use super::{Discovery, DiscoveryMethod, PeerInfo, Topic}; + +mod socket { + use multicast_socket::MulticastSocket; + use std::io; + use std::net::{Ipv4Addr, SocketAddrV4}; + + const MDNS_IP: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251); + + pub fn create() -> io::Result { + let addr = SocketAddrV4::new(MDNS_IP, 5353); + MulticastSocket::all_interfaces(addr) + } +} + +enum Command { + Lookup(Topic), + Announce(Topic), +} + +pub type CommandFut = Pin> + Send>>; + +pub struct MdnsDiscovery { + announcer: Announcer, + locator: Locator, + // local_port: u16, + // self_id: String, + pending_commands_rx: channel::Receiver, + pending_commands_tx: channel::Sender, + pending_future: Option> + Send>>>, +} + +impl fmt::Debug for MdnsDiscovery { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MdnsDiscovery").finish() + } +} + +impl Discovery for MdnsDiscovery { + fn lookup(&mut self, topic: Topic) { + self.pending_commands_tx + .try_send(Command::Lookup(topic)) + .unwrap(); + } + + fn announce(&mut self, topic: Topic) { + self.pending_commands_tx + .try_send(Command::Announce(topic)) + .unwrap(); + } +} + +impl MdnsDiscovery { + pub async fn listen(local_port: u16) -> io::Result { + let self_id = self_id(); + let socket = socket::create()?; + let lookup_interval = Duration::from_secs(60); + let locator = Locator::listen(socket, lookup_interval, self_id.as_bytes()); + let socket = socket::create()?; + let announcer = Announcer::listen(socket, local_port, self_id.clone()); + let (pending_commands_tx, pending_commands_rx) = channel::unbounded(); + Ok(Self { + locator, + announcer, + // self_id, + // local_port, + pending_commands_rx, + pending_commands_tx, + pending_future: None, + }) + } + + fn poll_pending_future(&mut self, cx: &mut Context<'_>) -> Poll> { + if let Some(ref mut fut) = self.pending_future { + let res = ready!(Pin::new(fut).poll(cx)); + self.pending_future = None; + if let Err(e) = res { + return Poll::Ready(Err(e)); + } + } + Poll::Ready(Ok(())) + } +} + +impl Stream for MdnsDiscovery { + type Item = io::Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + if let Err(e) = ready!(this.poll_pending_future(cx)) { + return Poll::Ready(Some(Err(e))); + } + + if let Poll::Ready(Some(_command)) = Pin::new(&mut this.pending_commands_rx).poll_next(cx) { + // TODO: Boxing the add_topic future does not work because there's no valid + // lifetime. Best would be to make the add_topic functions sync, or return + // a future that can be boxed. + // let fut = match command { + // Command::Lookup(topic) => { + // let fut = this.locator.add_topic(&topic); + // let fut = fut.map(|r| { + // r.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e))) + // }); + // let fut: CommandFut = fut.boxed(); + // fut + // } + // Command::Announce(topic) => { + // let fut = this.announcer.add_topic(&topic); + // let fut = fut.map(|r| { + // r.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e))) + // }); + // let fut: CommandFut = fut.boxed(); + // fut + // } + // }; + // this.pending_future = Some(fut); + } + + if let Err(e) = ready!(this.poll_pending_future(cx)) { + return Poll::Ready(Some(Err(e))); + } + + let _ = Pin::new(&mut this.announcer).poll_next(cx); + + let res = ready!(Pin::new(&mut this.locator).poll_next(cx)); + if let Some((topic, peer_addr)) = res { + let topic = topic.try_into(); + if let Ok(topic) = topic { + Poll::Ready(Some(Ok(PeerInfo::new( + peer_addr, + Some(topic), + DiscoveryMethod::Mdns, + )))) + } else { + Poll::Ready(Some(Err(io::Error::new( + io::ErrorKind::Other, + "Received invalid topic", + )))) + } + } else { + Poll::Pending + } + } +} diff --git a/src/discovery/mod.rs b/src/discovery/mod.rs new file mode 100644 index 0000000..90fc343 --- /dev/null +++ b/src/discovery/mod.rs @@ -0,0 +1,55 @@ +use async_std::stream::Stream; +use std::fmt; +use std::io; +use std::net::SocketAddr; + +pub mod combined; +pub mod dht; +pub mod mdns; + +pub type Topic = [u8; 32]; + +#[derive(Clone, Debug)] +pub enum DiscoveryMethod { + Mdns, + Dht, +} + +#[derive(Clone)] +pub struct PeerInfo { + addr: SocketAddr, + topic: Option, + discovery_method: DiscoveryMethod, +} + +impl fmt::Debug for PeerInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PeerInfo") + .field("addr", &self.addr) + .field( + "topic", + &self.topic.map(|topic| pretty_hash::fmt(&topic).unwrap()), + ) + .field("discovery_method", &self.discovery_method) + .finish() + } +} + +impl PeerInfo { + pub fn new(addr: SocketAddr, topic: Option, discovery_method: DiscoveryMethod) -> Self { + Self { + addr, + topic, + discovery_method, + } + } + + pub fn addr(&self) -> SocketAddr { + self.addr + } +} + +pub trait Discovery: Stream> { + fn lookup(&mut self, topic: Topic); + fn announce(&mut self, topic: Topic); +} diff --git a/src/hyperswarm.rs b/src/hyperswarm.rs deleted file mode 100644 index 5767cfa..0000000 --- a/src/hyperswarm.rs +++ /dev/null @@ -1,230 +0,0 @@ -use async_std::channel; -use async_std::net::{TcpListener, TcpStream}; -// use async_std::prelude::*; -use async_std::stream::StreamExt; -// use async_std::sync::{Arc, Mutex}; -use async_std::task; -use async_std::task::JoinHandle; -use futures::future::FutureExt; -// use futures_channel::oneshot; -use futures_lite::{future, Stream}; -use log::*; -// use std::collections::HashMap; -use std::io; -use std::net::SocketAddr; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use hyperswarm_dht::{DhtConfig, HyperDht, HyperDhtEvent, IdBytes, Lookup, Peers, QueryOpts}; - -#[derive(Debug)] -pub enum SwarmEvent { - Bootstrapped, - Connection(TcpStream), -} - -#[derive(Debug)] -pub enum Command { - Join(IdBytes, JoinOpts), -} - -#[derive(Debug, Clone, PartialEq)] -pub struct JoinOpts { - pub lookup: bool, - pub announce: bool, -} - -#[derive(Debug, Clone)] -pub struct QueryResult { - peers: Vec, - topic: IdBytes, -} - -#[derive(Debug)] -pub struct Hyperswarm { - task: Option>>, - command_tx: channel::Sender, - events_rx: channel::Receiver, -} - -impl Hyperswarm { - pub fn with_state(state: HyperDht) -> Self { - let (command_tx, command_rx) = channel::unbounded(); - let (events_tx, events_rx) = channel::unbounded(); - let task = task::spawn(run_loop(state, command_rx, events_tx)); - Self { - events_rx, - command_tx, - task: Some(task), - } - } - - pub async fn with_config(config: DhtConfig) -> io::Result { - let state = HyperDht::with_config(config).await?; - Ok(Self::with_state(state)) - } - - pub fn commands(&self) -> SwarmCommands { - SwarmCommands(self.command_tx.clone()) - } -} - -impl Stream for Hyperswarm { - type Item = SwarmEvent; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.events_rx).poll_next(cx) - } -} - -#[derive(Debug)] -pub struct SwarmCommands(channel::Sender); - -impl SwarmCommands { - pub fn join(&self, topic: IdBytes, opts: JoinOpts) { - self.0.try_send(Command::Join(topic, opts)).unwrap() - } -} - -async fn run_loop( - mut state: HyperDht, - mut command_rx: channel::Receiver, - events_tx: channel::Sender, -) -> io::Result<()> { - enum Event { - Dht(HyperDhtEvent), - Command(Command), - }; - - let (connect_tx, connect_rx) = channel::unbounded(); - let connect_task = task::spawn(connect_loop(connect_rx, events_tx.clone())); - // TODO: Allow to configure local port on which to accept peer connections. - let (local_addr, accept_task) = accept_task(None, events_tx.clone()).await?; - let local_port = local_addr.port() as u32; - - // wait for bootstrap event first. - wait_for_bootstrap(&mut state).await?; - events_tx.try_send(SwarmEvent::Bootstrapped).unwrap(); - - while let Some(event) = future::race( - state.next().map(|e| e.map(Event::Dht)), - command_rx.next().map(|e| e.map(Event::Command)), - ) - .await - { - match event { - Event::Dht(event) => { - debug!("swarm event: {:?}", event); - match event { - HyperDhtEvent::Bootstrapped { .. } => { - // handled above, may not occur again? - unreachable!("received second bootstrap event"); - } - HyperDhtEvent::AnnounceResult { .. } => {} - HyperDhtEvent::LookupResult { lookup, .. } => { - connect_tx.try_send(lookup).unwrap(); - } - HyperDhtEvent::UnAnnounceResult { .. } => {} - _ => {} - } - } - Event::Command(command) => { - debug!("swarm command: {:?}", command); - match command { - Command::Join(topic, join_opts) => { - let opts = QueryOpts { - topic, - port: Some(local_port), - local_addr: None, - }; - if join_opts.announce { - state.announce(opts.clone()); - } - if join_opts.lookup { - state.lookup(opts); - } - } - } - } - } - } - - connect_task.await; - accept_task.await?; - - Ok(()) -} - -async fn connect_loop( - mut connect_rx: channel::Receiver, - events_tx: channel::Sender, -) { - while let Some(lookup) = connect_rx.next().await { - let peers = lookup.remotes(); - // TODO: Connect over utp if tcp fails. - for addr in peers { - debug!("Connecting to peer {}", addr); - let tcp_socket = TcpStream::connect(addr).await; - // TODO: Also connect via UTP. - // .race(UtpStream::connect(addr)); - match tcp_socket { - Ok(stream) => { - debug!("Connected to peer {}", addr); - events_tx - .send(SwarmEvent::Connection(stream)) - .await - .unwrap(); - } - Err(err) => { - error!("Error connecting to peer {}: {}", addr, err); - } - } - } - } -} - -async fn accept_task( - local_port: Option, - events_tx: channel::Sender, -) -> io::Result<(SocketAddr, JoinHandle>)> { - let port = local_port.unwrap_or(0); - let address = format!("127.0.0.1:{}", port); - let listener = TcpListener::bind(&address).await?; - let local_addr = listener.local_addr()?; - let accept_task = task::spawn(accept_loop(listener, events_tx)); - Ok((local_addr, accept_task)) -} - -async fn accept_loop( - listener: TcpListener, - events_tx: channel::Sender, -) -> io::Result<()> { - debug!( - "accepting peer connections on tcp://{}", - listener.local_addr()? - ); - - let mut incoming = listener.incoming(); - while let Some(Ok(stream)) = incoming.next().await { - let peer_addr = stream.peer_addr().unwrap().to_string(); - debug!("Accepted connection from peer {}", peer_addr); - events_tx - .send(SwarmEvent::Connection(stream)) - .await - .unwrap(); - } - Ok(()) -} - -async fn wait_for_bootstrap(state: &mut HyperDht) -> io::Result<()> { - let event = state.next().await; - match event { - Some(HyperDhtEvent::Bootstrapped { .. }) => { - debug!("swarm bootstrapped!"); - Ok(()) - } - _ => Err(io::Error::new( - io::ErrorKind::Other, - "Did not receive bootstrap as first event, abort.", - )), - } -} diff --git a/src/lib.rs b/src/lib.rs index fd489af..4380a24 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,9 +11,13 @@ // #![warn(missing_docs, missing_doc_code_examples, unreachable_pub)] mod bootstrap; -mod hyperswarm; +mod config; +mod swarm; -pub use bootstrap::*; -pub use hyperswarm::*; +pub mod discovery; +pub mod transport; -pub use hyperswarm_dht::IdBytes; +pub use bootstrap::run_bootstrap_node; +pub use config::{Config, TopicConfig}; +pub use swarm::{Hyperswarm, HyperswarmStream}; +// pub use hyperswarm_dht::{DhtConfig, IdBytes}; diff --git a/src/swarm.rs b/src/swarm.rs new file mode 100644 index 0000000..9013452 --- /dev/null +++ b/src/swarm.rs @@ -0,0 +1,307 @@ +use async_std::channel; +use futures_lite::{ready, AsyncRead, AsyncWrite, FutureExt, Stream}; +use log::*; +use std::collections::HashMap; +use std::fmt; +use std::future::Future; +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::config::{Config, TopicConfig}; +use crate::discovery::Topic; +use crate::discovery::{combined::CombinedDiscovery, Discovery}; +use crate::transport::Incoming; +use crate::transport::{ + combined::{CombinedStream, CombinedTransport}, + Transport, +}; + +pub type ConnectFut = Pin> + Send + 'static>>; +type ConfigureCommand = (Topic, TopicConfig); + +pub struct Hyperswarm { + topics: HashMap, + discovery: CombinedDiscovery, + transport: CombinedTransport, + incoming: Incoming, + command_tx: channel::Sender, + command_rx: channel::Receiver, + pending_connects: Vec, +} +impl fmt::Debug for Hyperswarm { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Driver") + .field("topics", &self.topics) + .field("local_addr", &self.incoming.local_addr()) + // .field("discovery", &self.discovery) + // .field("transport", &self.transport) + // .field("incoming", &self.incoming) + .field( + "pending_connects", + &format!("<{}>", self.pending_connects.len()), + ) + .finish() + } +} + +impl Hyperswarm { + pub async fn bind(config: Config) -> io::Result { + let local_addr = "localhost:0"; + let mut transport = CombinedTransport::new(); + let incoming = transport.listen(local_addr).await?; + let local_addr = incoming.local_addr(); + let port = local_addr.port(); + let discovery = CombinedDiscovery::listen(port, config).await?; + let (command_tx, command_rx) = channel::unbounded::(); + Ok(Self { + topics: HashMap::new(), + discovery, + transport, + incoming, + pending_connects: vec![], + command_tx, + command_rx, + }) + } + + pub fn configure(&mut self, topic: Topic, config: TopicConfig) { + let old = self.topics.remove(&topic).unwrap_or_default(); + if config.announce && !old.announce { + self.discovery.announce(topic); + } + if config.lookup && !old.lookup { + self.discovery.lookup(topic); + } + // TODO: unannounce and stop-lookup + self.topics.insert(topic, config); + } + + fn poll_pending_connects( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut i = 0; + let mut iter = self.pending_connects.iter_mut(); + while let Some(ref mut fut) = iter.next() { + let res = Pin::new(fut).poll(cx); + if let Poll::Ready(res) = res { + self.pending_connects.remove(i); + return Poll::Ready(Some(res)); + } + i += 1; + } + Poll::Pending + } + + pub fn handle(&self) -> SwarmHandle { + SwarmHandle { + command_tx: self.command_tx.clone(), + } + } +} + +#[derive(Debug, Clone)] +pub struct SwarmHandle { + command_tx: channel::Sender, +} + +impl SwarmHandle { + pub fn configure(&self, topic: Topic, config: TopicConfig) { + self.command_tx.try_send((topic, config)).unwrap(); + } +} + +impl Stream for Hyperswarm { + type Item = io::Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + // Poll pending connect futures. + let res = this.poll_pending_connects(cx); + if res.is_ready() { + return res; + } + + // Poll commands. + while let Poll::Ready(Some((topic, config))) = Pin::new(&mut this.command_rx).poll_next(cx) + { + this.configure(topic, config); + } + + // Poll discovery results. + let discovery = Pin::new(&mut this.discovery).poll_next(cx); + match discovery { + Poll::Pending | Poll::Ready(None) => {} + Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), + Poll::Ready(Some(Ok(peer_info))) => { + debug!("discovery: {:?}", peer_info); + let fut = connect(this.transport.clone(), peer_info.addr()); + let mut fut = fut.boxed(); + let res = fut.poll(cx); + if let Poll::Ready(res) = res { + return Poll::Ready(Some(res)); + } else { + this.pending_connects.push(fut); + } + } + } + + // Poll incoming streams. + let stream = ready!(Pin::new(&mut this.incoming).poll_next(cx)); + let stream = stream.unwrap(); + let stream = stream.map(|stream| HyperswarmStream::new(stream, false)); + Poll::Ready(Some(stream)) + } +} + +async fn connect( + mut transport: CombinedTransport, + peer_addr: SocketAddr, +) -> io::Result { + transport + .connect(peer_addr) + .await + .map(|stream| HyperswarmStream::new(stream, true)) +} + +#[derive(Debug)] +pub struct HyperswarmStream { + inner: CombinedStream, + peer_addr: SocketAddr, + is_initiator: bool, +} + +impl HyperswarmStream { + pub fn new(inner: CombinedStream, is_initiator: bool) -> Self { + Self { + peer_addr: inner.peer_addr(), + inner, + is_initiator, + } + } + + pub fn peer_addr(&self) -> SocketAddr { + self.peer_addr + } + + pub fn protocol(&self) -> String { + self.inner.protocol() + } + + pub fn is_initiator(&self) -> bool { + self.is_initiator + } + + pub fn get_ref(&self) -> &CombinedStream { + &self.inner + } + + pub fn get_mut(&mut self) -> &mut CombinedStream { + &mut self.inner + } +} + +impl AsyncRead for HyperswarmStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl AsyncWrite for HyperswarmStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) + } +} + +#[cfg(test)] +mod test { + use super::{Config, Hyperswarm, HyperswarmStream, Topic, TopicConfig}; + use crate::run_bootstrap_node; + use async_std::task::{self, JoinHandle}; + use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; + use std::io::Result; + use std::net::SocketAddr; + + fn drive_stream(mut stream: HyperswarmStream, name: &'static str) { + task::spawn(async move { + stream + .write_all(format!("hello, here is {}", name).as_bytes()) + .await + .unwrap(); + let mut buf = vec![0u8; 64]; + loop { + match stream.read(&mut buf[..]).await { + Err(e) => eprintln!("[{}] ERROR: {}", name, e), + Ok(0) => { + eprintln!("[{}] stream closed", name,); + return; + } + Ok(n) => eprintln!( + "[{}] RECV: {:?}", + name, + String::from_utf8(buf[..n].to_vec()) + ), + }; + } + }); + } + + #[async_std::test] + async fn test_driver() -> Result<()> { + let (bs_addr, bs_task) = run_bootstrap_node::(None).await?; + eprintln!("ok go"); + let mut config = Config::default().set_bootstrap_nodes(vec![bs_addr]); + let mut swarm_a = Hyperswarm::bind(config.clone()).await?; + let mut swarm_b = Hyperswarm::bind(config).await?; + eprintln!("A {:?}", swarm_a); + eprintln!("B {:?}", swarm_b); + + let topic = [0u8; 32]; + let config = TopicConfig::both(); + swarm_a.configure(topic, config.clone()); + swarm_b.configure(topic, config.clone()); + + // let topic = [1u8; 32]; + // let config = TopicConfig::both(); + // swarm_a.configure(topic, config.clone()); + // swarm_b.configure(topic, config.clone()); + + let task_a = task::spawn(async move { + while let Some(stream) = swarm_a.next().await { + let stream = stream.unwrap(); + eprintln!("A incoming: {:?}", stream); + drive_stream(stream, "alice"); + } + }); + let task_b = task::spawn(async move { + while let Some(stream) = swarm_b.next().await { + let stream = stream.unwrap(); + eprintln!("B incoming: {:?}", stream); + drive_stream(stream, "bob"); + } + }); + + task_a.await; + task_b.await; + bs_task.await?; + Ok(()) + } +} diff --git a/src/transport/combined.rs b/src/transport/combined.rs new file mode 100644 index 0000000..41ce1ec --- /dev/null +++ b/src/transport/combined.rs @@ -0,0 +1,128 @@ +use async_std::stream::StreamExt; +use async_trait::async_trait; +use futures_lite::{AsyncRead, AsyncWrite, FutureExt}; +use std::io; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use super::tcp::{TcpStream, TcpTransport}; +use super::utp::{UtpStream, UtpTransport}; +use super::{Incoming, Transport}; + +#[derive(Clone, Debug)] +pub struct CombinedTransport { + tcp: TcpTransport, + utp: UtpTransport, +} + +#[async_trait] +impl Transport for CombinedTransport { + type Connection = CombinedStream; + fn new() -> Self { + Self { + tcp: TcpTransport::new(), + utp: UtpTransport::new(), + } + } + async fn listen(&mut self, local_addr: A) -> io::Result> + where + A: ToSocketAddrs + Send, + { + let addr = local_addr.to_socket_addrs()?.next().unwrap(); + + let tcp_incoming = self.tcp.listen(addr).await?; + let addr = tcp_incoming.local_addr(); + let tcp_incoming = tcp_incoming.map(|s| s.map(CombinedStream::Tcp)); + + let utp_incoming = self.utp.listen(addr).await?; + let utp_incoming = utp_incoming.map(|s| s.map(CombinedStream::Utp)); + + let combined = tcp_incoming.merge(utp_incoming); + let incoming = Incoming::new(combined, addr); + + // let incoming = Incoming::new(utp_incoming, addr); + + Ok(incoming) + } + async fn connect(&mut self, peer_addr: A) -> io::Result + where + A: ToSocketAddrs + Send, + { + let addr = peer_addr.to_socket_addrs()?.next().unwrap(); + let utp = &mut self.utp; + let tcp = &mut self.tcp; + let utp_fut = async { + let res = utp.connect(addr).await; + res.map(CombinedStream::Utp) + }; + // utp_fut.await + let tcp_fut = async { + let res = tcp.connect(addr).await; + res.map(CombinedStream::Tcp) + }; + utp_fut.race(tcp_fut).await + } +} + +#[derive(Debug, Clone)] +pub enum CombinedStream { + Tcp(TcpStream), + Utp(UtpStream), +} + +impl CombinedStream { + pub fn peer_addr(&self) -> SocketAddr { + match self { + Self::Tcp(stream) => stream.peer_addr().unwrap(), + Self::Utp(stream) => stream.peer_addr(), + } + } + + pub fn protocol(&self) -> String { + match self { + CombinedStream::Tcp(_) => "tcp".into(), + CombinedStream::Utp(_) => "utp".into(), + } + } +} + +impl AsyncRead for CombinedStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + match self.get_mut() { + CombinedStream::Tcp(ref mut stream) => Pin::new(stream).poll_read(cx, buf), + CombinedStream::Utp(ref mut stream) => Pin::new(stream).poll_read(cx, buf), + } + } +} + +impl AsyncWrite for CombinedStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match self.get_mut() { + CombinedStream::Tcp(ref mut stream) => Pin::new(stream).poll_write(cx, buf), + CombinedStream::Utp(ref mut stream) => Pin::new(stream).poll_write(cx, buf), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + CombinedStream::Tcp(ref mut stream) => Pin::new(stream).poll_flush(cx), + CombinedStream::Utp(ref mut stream) => Pin::new(stream).poll_flush(cx), + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + CombinedStream::Tcp(ref mut stream) => Pin::new(stream).poll_close(cx), + CombinedStream::Utp(ref mut stream) => Pin::new(stream).poll_close(cx), + } + } +} diff --git a/src/transport/mod.rs b/src/transport/mod.rs new file mode 100644 index 0000000..4dbaf30 --- /dev/null +++ b/src/transport/mod.rs @@ -0,0 +1,117 @@ +use async_std::stream::Stream; +use async_trait::async_trait; +use futures::io::{AsyncRead, AsyncWrite}; +use std::fmt; +use std::io; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub mod combined; +pub mod tcp; +pub mod utp; + +pub struct Incoming { + local_addr: SocketAddr, + stream: Box> + Send + Unpin>, +} + +impl fmt::Debug for Incoming +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Incoming") + .field("local_addr", &self.local_addr) + // .field("stream", &*self.stream) + .finish() + } +} + +impl Incoming +where + S: AsyncRead + AsyncWrite + Send + Clone, +{ + pub fn new(listener: L, local_addr: SocketAddr) -> Self + where + L: Stream> + Send + Unpin + 'static, + { + let listener = Box::new(listener); + Self { + stream: listener, + local_addr, + } + } + + pub fn local_addr(&self) -> SocketAddr { + self.local_addr + } +} + +impl Stream for Incoming { + type Item = io::Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(cx) + } +} + +#[async_trait] +pub trait Transport: Clone { + type Connection: AsyncRead + AsyncWrite + Send; + fn new() -> Self; + async fn listen(&mut self, local_addr: A) -> io::Result> + where + A: ToSocketAddrs + Send; + async fn connect(&mut self, peer_addr: A) -> io::Result + where + A: ToSocketAddrs + Send; +} + +// trait TransportStream: AsyncRead + AsyncWrite + Send + Clone {} + +// impl TransportStream for TcpStream {} +// impl TransportStream for super::Connection {} + +// enum Transports { +// Tcp(TcpStream), +// Utp(super::Connection) +// } + +// impl Transports { +// // fn into_inner(&mut self) -> Box { +// // match self { +// // Self::Tcp(stream) => Box::new(stream), +// // Self::Utp(stream) => Box::new(stream), +// // } +// // } +// // fn as_mut(&mut self) -> &mut dyn TransportStream { +// // match self { +// // Self::Tcp(ref mut stream) => stream, +// // Self::Utp(ref mut stream) => stream, +// // } +// // } +// } + +// pub async fn demo() { +// let stream1 = { +// let stream = TcpStream::connect("localhost:1234").await.unwrap(); +// let stream: Box> = Box::new(stream); +// stream +// }; + +// let stream2 = { +// let addr = "localhost:1233"; +// // let stream = TcpStream::connect(addr).await.unwrap(); +// // let stream = super::Connection::tcp(stream, true, addr, None); +// let stream = FakeStream {}; +// let stream: Box> = Box::new(stream); +// stream +// }; +// let streams: HashMap<&str, Box> = HashMap::new(); +// streams.insert("stream1", stream1); +// streams.insert("stream2", stream2); +// } + +// pub struct Connection { +// stream: Box +// } diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs new file mode 100644 index 0000000..2ba6bd5 --- /dev/null +++ b/src/transport/tcp.rs @@ -0,0 +1,86 @@ +pub use async_std::net::TcpStream; +use async_std::net::{SocketAddr, TcpListener}; +use async_std::stream::Stream; +use async_trait::async_trait; +use futures_lite::{ready, Future}; +use std::fmt; +use std::io; +use std::net::ToSocketAddrs; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use super::{Incoming, Transport}; + +#[derive(Clone, Debug)] +pub struct TcpTransport; + +#[async_trait] +impl Transport for TcpTransport { + type Connection = TcpStream; + fn new() -> Self { + Self {} + } + async fn listen(&mut self, local_addr: A) -> io::Result> + where + A: ToSocketAddrs + Send, + { + let addr = local_addr.to_socket_addrs()?.next().unwrap(); + let listener = TcpListener::bind(addr).await?; + let incoming = TcpIncoming::new(listener)?; + let local_addr = incoming.local_addr()?; + let incoming = Incoming::new(incoming, local_addr); + Ok(incoming) + } + async fn connect(&mut self, peer_addr: A) -> io::Result + where + A: ToSocketAddrs + Send, + { + let addr = peer_addr.to_socket_addrs()?.next().unwrap(); + let stream = TcpStream::connect(addr).await?; + Ok(stream) + } +} + +pub struct TcpIncoming { + // listener: TcpListener, + local_addr: SocketAddr, + accept: Pin< + Box)> + Send + Sync>, + >, +} + +impl TcpIncoming { + pub fn new(listener: TcpListener) -> io::Result { + let local_addr = listener.local_addr()?; + let accept = Box::pin(accept(listener)); + Ok(Self { local_addr, accept }) + } + + pub fn local_addr(&self) -> io::Result { + Ok(self.local_addr) + } +} + +impl Stream for TcpIncoming { + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let (listener, res) = ready!(self.accept.as_mut().poll(cx)); + self.accept = Box::pin(accept(listener)); + let res = res.map(|r| r.0); + Poll::Ready(Some(res)) + } +} + +async fn accept(listener: TcpListener) -> (TcpListener, io::Result<(TcpStream, SocketAddr)>) { + let result = listener.accept().await; + (listener, result) +} + +impl fmt::Debug for TcpIncoming { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TcpIncoming") + .field("local_addr", &self.local_addr) + .finish() + } +} diff --git a/src/transport/utp.rs b/src/transport/utp.rs new file mode 100644 index 0000000..ef1035d --- /dev/null +++ b/src/transport/utp.rs @@ -0,0 +1,116 @@ +use async_compat::Compat; +use async_trait::async_trait; +use futures_lite::StreamExt; +use futures_lite::{AsyncRead, AsyncWrite}; +use libutp_rs::{UtpContext, UtpSocket}; +use std::fmt; +use std::io; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use super::{Incoming, Transport}; + +#[derive(Clone)] +pub struct UtpTransport { + context: Option, +} + +impl fmt::Debug for UtpTransport { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("UtpTransport").finish() + } +} + +#[async_trait] +impl Transport for UtpTransport { + type Connection = UtpStream; + fn new() -> Self { + Self { context: None } + } + async fn listen(&mut self, local_addr: A) -> io::Result> + where + A: ToSocketAddrs + Send, + { + if self.context.is_some() { + panic!("may not listen more than once"); + } + let addr = local_addr.to_socket_addrs()?.next().unwrap(); + let context = UtpContext::bind(addr)?; + let listener = context.listener(); + self.context = Some(context); + let listener = listener.map(|s| s.map(|s| UtpStream::new(s))); + let incoming = Incoming::new(listener, addr); + Ok(incoming) + } + async fn connect(&mut self, peer_addr: A) -> io::Result + where + A: ToSocketAddrs + Send, + { + if self.context.is_none() { + panic!("socket is not bound! use listen() first"); + } + let addr = peer_addr.to_socket_addrs()?.next().unwrap(); + let stream = self.context.as_ref().unwrap().connect(addr).await?; + let stream = UtpStream::new(stream); + Ok(stream) + } +} + +pub struct UtpStream { + inner: Compat, +} + +impl fmt::Debug for UtpStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("UtpStream").finish() + } +} + +impl UtpStream { + pub fn new(socket: UtpSocket) -> Self { + Self { + inner: Compat::new(socket), + } + } + + pub fn peer_addr(&self) -> SocketAddr { + self.inner.get_ref().peer_addr() + } +} + +impl AsyncRead for UtpStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl AsyncWrite for UtpStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) + } +} + +impl Clone for UtpStream { + fn clone(&self) -> Self { + Self { + inner: Compat::new(self.inner.get_ref().clone()), + } + } +}