Skip to content

Commit

Permalink
Rewrote the implementation and added a README.
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Mar 10, 2021
1 parent bc5f585 commit 3a0ca76
Show file tree
Hide file tree
Showing 20 changed files with 1,405 additions and 297 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ target/
tmp/
Cargo.lock
.DS_Store
SANDBOX
16 changes: 12 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hyperswarm"
version = "1.0.0"
version = "0.1.0"
license = "MIT OR Apache-2.0"
repository = "https://github.com/Frando/hyperswarm"
documentation = "https://docs.rs/hyperswarm"
Expand All @@ -16,16 +16,24 @@ authors = [
[features]

[dependencies]
hyperswarm-dht = { git = "https://github.com/Frando/hyperswarm-dht.git", branch = "hyperspace" }
async-std = { version = "1.9.0", features = ["unstable"] }
futures-lite = "1.11.3"
futures-channel = "0.3.13"
log = "0.4.14"
futures = "0.3.13"
async-trait = "0.1.42"
async-compat = "0.1.0"
multicast-socket = "0.2.1"
hex = "0.4.3"
pretty-hash = "0.4.1"
hyperswarm-dht = { git = "https://github.com/Frando/hyperswarm-dht.git", branch = "hyperspace" }
colmeia-hyperswarm-mdns = { git = "https://github.com/bltavares/colmeia.git", rev = "e92ab71981356197a21592b7ce6854e209582985" }
libutp-rs = { git = "https://github.com/Frando/libutp-rs.git", branch = "feat/clone" }

[dev-dependencies]
env_logger = "0.8.3"
async-std = { version = "1.9.0", features = ["unstable", "attributes"] }

[patches.crates-io]
hyperswarm-dht = { path = "../hyperswarm-dht" }
# [patch.crates-io]
# hyperswarm-dht = { path = "../hyperswarm-dht" }
# libutp-rs = { path = "../libutp-rs" }
94 changes: 87 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<h1 align="center">hyperswarm</h1>
<h1 align="center">hyperswarm-rs</h1>
<div align="center">
<strong>
Peer to peer networking stack
Expand Down Expand Up @@ -26,26 +26,106 @@
</div>

<div align="center">
<h3>
<h4>
<a href="https://docs.rs/hyperswarm">
API Docs
</a>
<span> | </span>
<a href="https://github.com/Frando/hyperswarm/releases">
Releases
</a>
<span> | </span>
<a href="https://github.com/Frando/hyperswarm/blob/master.github/CONTRIBUTING.md">
Contributing
</a>
</h3>
</h4>
</div>

## Installation
```sh
$ cargo add hyperswarm
```

## Usage

Hyperswarm is a networking stack for connecting peers who are interested in a topic. This project is a port of the [Node.js implementation of Hyperswarm](https://github.com/hyperswarm/hyperswarm).

This crate exposes a `Hyperswarm` struct. After binding it, this will:

- Start and bootstrap a local DHT node
- Bind a socket for mDNS discovery
- Announce and lookup any 32 byte topic key over both mDNS and the DHT
- Connect to all peers that are found over both TCP and UTP

It currently depends on the unreleased [hyperswarm-dht](https://github.com/mattsse/hyperswarm-dht) crate and therefore is also not yet released on crates.io.

The API is designed to be very simple:

```rust
use async_std::task;
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use hyperswarm::{Config, Hyperswarm, HyperswarmStream, TopicConfig};
use std::io;

#[async_std::main]
async fn main() -> io::Result<()> {
// Bind and initialize the swarm with the default config.
// On the config you can e.g. set bootstrap addresses.
let config = Config::default();
let mut swarm = Hyperswarm::bind(config).await?;

// A topic is any 32 byte array. Usually, this would be the hash of some identifier.
// Configuring the swarm for a topic starts to lookup and/or announce this topic
// and connect to peers that are found.
let topic = [0u8; 32];
swarm.configure(topic, TopicConfig::announce_and_lookup());

// The swarm is a Stream of new HyperswarmStream peer connections.
// The HyperswarmStream is a wrapper around either a TcpStream or a UtpSocket.
// Usually you'll want to run some loop over the connection, so let's spawn a task
// for each connection.
while let Some(stream) = swarm.next().await {
task::spawn(on_connection(stream?));
}

Ok(())
}

// A HyperswarmStream is AsyncRead + AsyncWrite, so you can use it just
// like a TcpStream. Here, we'll send an initial message and then keep
// reading from the stream until it is closed by the remote.
async fn on_connection(mut stream: HyperswarmStream) -> io::Result<()> {
stream.write_all(b"hello there").await?;
let mut buf = vec![0u8; 64];
loop {
match stream.read(&mut buf).await {
Ok(0) => return Ok(()),
Err(e) => return Err(e),
Ok(n) => eprintln!("received: {}", std::str::from_utf8(&buf[..n]).unwrap()),
}
}
}
```

See [`examples/simple.rs`](examples/simple.rs) for a working example that also runs a bootstrap node. That example can also find and connect to NodeJS peers. To try it out:

```sh
cargo run --example simple
# in another terminal
node js/simple.js
```

Currently, the DHT bootstrap node has to be run from Rust. The Rust implementation does not find peers on a NodeJS bootstrap node.

## Roadmap

- [x] Initial implementation
- [ ] Find peers over the Hyperswarm DHT
- [x] Both NodeJS and Rust peers are found if connecting to a Rust bootstrap node
- [ ] Fix [hyperswarm-dht](https://github.com/mattsse/hyperswarm-dht) to work with NodeJS bootstrap nodes
- [ ] Find peers over mDNS
- [ ] Change colmeia-mdns to better fit the architecture here or copy and adapt the mdns code over into the mdns module
- [x] Connect to peers over TCP
- [ ] Connect to peers over UTP
- [x] Can connect to peers over UTP
- [ ] Fix issues in [libutp-rs](https://github.com/johsunds/libutp-rs) - sometimes the connection is not flushed properly

## Safety
This crate uses ``#![deny(unsafe_code)]`` to ensure everything is implemented in
100% Safe Rust.
Expand Down
73 changes: 30 additions & 43 deletions examples/simple.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,58 @@
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::stream::StreamExt;
use async_std::task;
// use log::*;
// use std::convert::TryFrom;
// use std::net::SocketAddr;
// use std::net::{SocketAddr, ToSocketAddrs};

use hyperswarm::{bootstrap_dht, Hyperswarm, IdBytes, JoinOpts, SwarmEvent};
use hyperswarm_dht::DhtConfig;
use hyperswarm::{run_bootstrap_node, Config, Hyperswarm, HyperswarmStream, TopicConfig};

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let bs_addr = "localhost:6060";
let bs_addr = bootstrap_dht(Some(bs_addr)).await?;
let (bs_addr, bs_task) = run_bootstrap_node(Some(bs_addr)).await?;
// let bs_addr: SocketAddr = bs_addr.to_socket_addrs().unwrap().next().unwrap();

let config1 = DhtConfig::default().set_bootstrap_nodes(&[bs_addr]);
let config2 = DhtConfig::default().set_bootstrap_nodes(&[bs_addr]);
let config = Config::default().set_bootstrap_nodes(vec![bs_addr]);

let mut swarm1 = Hyperswarm::with_config(config1).await?;
let mut swarm2 = Hyperswarm::with_config(config2).await?;
let mut swarm1 = Hyperswarm::bind(config.clone()).await?;
let mut swarm2 = Hyperswarm::bind(config).await?;

let cmd1 = swarm1.commands();
let cmd2 = swarm2.commands();
let handle1 = swarm1.handle();
let handle2 = swarm2.handle();

let task1 = task::spawn(async move {
while let Some(event) = swarm1.next().await {
match event {
SwarmEvent::Connection(stream) => on_connection(stream, "rust1".into()),
_ => {}
}
while let Some(stream) = swarm1.next().await {
let stream = stream.unwrap();
on_connection(stream, "rust1".into());
}
});

let task2 = task::spawn(async move {
while let Some(event) = swarm2.next().await {
match event {
SwarmEvent::Connection(stream) => on_connection(stream, "rust2".into()),
_ => {}
}
while let Some(stream) = swarm2.next().await {
let stream = stream.unwrap();
on_connection(stream, "rust2".into());
}
});

let topic = IdBytes::from([0u8; 32]);
let opts = JoinOpts {
announce: true,
lookup: true,
};

cmd1.join(topic.clone(), opts.clone());
cmd2.join(topic.clone(), opts.clone());
let topic = [0u8; 32];
handle1.configure(topic, TopicConfig::both());
handle2.configure(topic, TopicConfig::both());

task1.await;
task2.await;
bs_task.await?;

Ok(())
}

fn on_connection(mut stream: TcpStream, local_name: String) {
fn on_connection(mut stream: HyperswarmStream, local_name: String) {
let label = format!(
"[{} -> {}://{}]",
local_name,
stream.protocol(),
stream.peer_addr()
);
eprintln!("{} connect", label);
task::spawn(async move {
stream
.write_all(format!("hi from {}", local_name).as_bytes())
Expand All @@ -68,25 +63,17 @@ fn on_connection(mut stream: TcpStream, local_name: String) {
match stream.read(&mut buf).await {
Ok(n) if n > 0 => {
let text = String::from_utf8(buf[..n].to_vec()).unwrap();
eprintln!("[{}] read: {}", local_name, text);
eprintln!("{} read: {}", label, text);
}
Ok(_) => {
eprintln!("[{}]: connection closed", local_name);
eprintln!("{} close", label);
break;
}
Err(e) => {
eprintln!("[{}]: error: {}", local_name, e);
eprintln!("{} error: {}", label, e);
break;
}
}
}
});
}

// async fn timeout(ms: u64) {
// let _ = async_std::future::timeout(
// std::time::Duration::from_millis(ms),
// futures::future::pending::<()>(),
// )
// .await;
// }
3 changes: 3 additions & 0 deletions js/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
node_modules
yarn.lock
package-lock.json
9 changes: 9 additions & 0 deletions js/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"name": "js",
"version": "1.0.0",
"main": "index.js",
"license": "MIT",
"dependencies": {
"hyperswarm": "^2.15.3"
}
}
65 changes: 65 additions & 0 deletions js/simple.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
const hyperswarm = require('hyperswarm')

const opts = {
runBootstrap: false
}
main(opts).catch(console.error)

async function main (opts = {}) {
let bootstrap
if (opts.runBootstrap) {
bootstrap = await bootstrapDHT(6060)
} else {
bootstrap = 'localhost:6060'
}
console.log({ bootstrap })

const topic = Buffer.alloc(32, 0)

const swarm1 = runNode(bootstrap, 'node1')
const swarm2 = runNode(bootstrap, 'node2')

const config = { announce: true, lookup: true }
swarm1.join(topic, config)
swarm2.join(topic, config)
}

function runNode (bootstrap, name) {
const swarm = hyperswarm({
announceLocalAddress: true,
bootstrap: [bootstrap]
})

swarm.on('connection', (socket, info) => {
const peer = info.peer
let peerAddr = peer ? `${peer.host}:${peer.port}` : 'unknown'
const label = `[${name} -> ${info.type}://${peerAddr}]`
console.log(`${label} connect`)
socket.write(Buffer.from(`hi from ${name}!`))
socket.on('data', buf => {
console.log(`${label} read: ${buf.toString()}`)
})
socket.on('error', err => {
console.log(`${label} error: ${err.toString()}`)
})
socket.on('close', () => {
console.log(`${label} close`)
})
})

return swarm
}

async function bootstrapDHT (port) {
const bootstrapper = require('@hyperswarm/dht')({
bootstrap: false
})
bootstrapper.listen(port)
await new Promise(resolve => {
return bootstrapper.once('listening', resolve)
})
const bootstrapPort = bootstrapper.address().port
const bootstrapAddr = `localhost:${bootstrapPort}`
console.log(`bootstrap node running on ${bootstrapAddr}`)
return bootstrapAddr
}
18 changes: 9 additions & 9 deletions src/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use async_std::net::ToSocketAddrs;
use async_std::stream::StreamExt;
use async_std::task::JoinHandle;
use log::*;
use std::io;
use std::net::SocketAddr;

use hyperswarm_dht::{DhtConfig, HyperDht};

pub async fn bootstrap_dht<A: ToSocketAddrs>(local_addr: Option<A>) -> std::io::Result<SocketAddr> {
pub async fn run_bootstrap_node<A: ToSocketAddrs>(
local_addr: Option<A>,
) -> io::Result<(SocketAddr, JoinHandle<io::Result<()>>)> {
let config = DhtConfig::default()
.empty_bootstrap_nodes()
.set_ephemeral(false);
Expand All @@ -17,15 +21,11 @@ pub async fn bootstrap_dht<A: ToSocketAddrs>(local_addr: Option<A>) -> std::io::
let mut bs = HyperDht::with_config(config).await?;
let addr = bs.local_addr()?;
debug!("Running DHT on address: {}", addr);
async_std::task::spawn(async move {
let task = async_std::task::spawn(async move {
loop {
bs.next().await;
let event = bs.next().await;
trace!("[bootstrap node] event {:?}", event);
}
// loop {
// process each incoming message
// let _event = bs.next().await;
// debug!("bootstrap event: {:?}", event);
// }
});
Ok(addr)
Ok((addr, task))
}
Loading

0 comments on commit 3a0ca76

Please sign in to comment.