diff --git a/Cargo.toml b/Cargo.toml index 7b93286b28b..d3c0b455f76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,16 +36,15 @@ tokio-io = "0.1" [target.'cfg(not(target_os = "emscripten"))'.dependencies] libp2p-dns = { path = "./transports/dns" } +libp2p-mdns = { path = "./misc/mdns" } libp2p-tcp-transport = { path = "./transports/tcp" } [target.'cfg(target_os = "emscripten")'.dependencies] stdweb = { version = "0.1.3", default-features = false } [dev-dependencies] -bigint = "4.2" env_logger = "0.5.4" -rand = "0.4" -structopt = "0.2" +rand = "0.5" tokio = "0.1" tokio-stdin-stdout = "0.1" @@ -53,6 +52,7 @@ tokio-stdin-stdout = "0.1" members = [ "core", "misc/core-derive", + "misc/mdns", "misc/multiaddr", "misc/multihash", "misc/multistream-select", diff --git a/core/src/peer_id.rs b/core/src/peer_id.rs index c54df19c14c..2994cf62e7c 100644 --- a/core/src/peer_id.rs +++ b/core/src/peer_id.rs @@ -133,6 +133,20 @@ impl From for PeerId { } } +impl PartialEq for PeerId { + #[inline] + fn eq(&self, other: &multihash::Multihash) -> bool { + &self.multihash == other + } +} + +impl PartialEq for multihash::Multihash { + #[inline] + fn eq(&self, other: &PeerId) -> bool { + self == &other.multihash + } +} + impl Into for PeerId { #[inline] fn into(self) -> multihash::Multihash { diff --git a/examples/mdns-passive-discovery.rs b/examples/mdns-passive-discovery.rs new file mode 100644 index 00000000000..63a64502ced --- /dev/null +++ b/examples/mdns-passive-discovery.rs @@ -0,0 +1,76 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +extern crate futures; +extern crate libp2p; +extern crate rand; +extern crate tokio; + +use futures::prelude::*; +use libp2p::mdns::{MdnsPacket, MdnsService}; +use std::io; + +fn main() { + // This example provides passive discovery of the libp2p nodes on the network that send + // mDNS queries and answers. + + // We start by creating the service. + let mut service = MdnsService::new().expect("Error while creating mDNS service"); + + // Create a never-ending `Future` that polls the service for events. + let future = futures::future::poll_fn(move || -> Poll<(), io::Error> { + loop { + // Grab the next available packet from the service. + let packet = match service.poll() { + Async::Ready(packet) => packet, + Async::NotReady => return Ok(Async::NotReady), + }; + + match packet { + MdnsPacket::Query(query) => { + // We detected a libp2p mDNS query on the network. In a real application, you + // probably want to answer this query by doing `query.respond(...)`. + println!("Detected query from {:?}", query.remote_addr()); + } + MdnsPacket::Response(response) => { + // We detected a libp2p mDNS response on the network. Responses are for + // everyone and not just for the requester, which makes it possible to + // passively listen. + for peer in response.discovered_peers() { + println!("Discovered peer {:?}", peer.id()); + // These are the self-reported addresses of the peer we just discovered. + for addr in peer.addresses() { + println!(" Address = {:?}", addr); + } + } + } + MdnsPacket::ServiceDiscovery(query) => { + // The last possibility is a service detection query from DNS-SD. + // Just like `Query`, in a real application you probably want to call + // `query.respond`. + println!("Detected service query from {:?}", query.remote_addr()); + } + } + } + }); + + // Blocks the thread until the future runs to completion (which will never happen). + tokio::run(future.map_err(|err| panic!("{:?}", err))); +} diff --git a/misc/mdns/Cargo.toml b/misc/mdns/Cargo.toml new file mode 100644 index 00000000000..48307189a9e --- /dev/null +++ b/misc/mdns/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "libp2p-mdns" +version = "0.1.0" +description = "Implementation of the libp2p mDNS discovery method" +authors = ["Parity Technologies "] +license = "MIT" + +[dependencies] +data-encoding = "2.0" +dns-parser = "0.8" +futures = "0.1" +libp2p-core = { path = "../../core" } +multiaddr = { path = "../multiaddr" } +net2 = "0.2" +rand = "0.5" +tokio-reactor = "0.1" +tokio-timer = "0.2" +tokio-udp = "0.1" + +[dev-dependencies] +tokio = "0.1" diff --git a/misc/mdns/src/dns.rs b/misc/mdns/src/dns.rs new file mode 100644 index 00000000000..6260285601a --- /dev/null +++ b/misc/mdns/src/dns.rs @@ -0,0 +1,371 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Contains methods that handle the DNS encoding and decoding capabilities not available in the +//! `dns_parser` library. + +use data_encoding; +use libp2p_core::{Multiaddr, PeerId}; +use rand; +use std::{borrow::Cow, cmp, error, fmt, str, time::Duration}; +use {META_QUERY_SERVICE, SERVICE_NAME}; + +/// Decodes a `` (as defined by RFC1035) into a `Vec` of ASCII characters. +// TODO: better error type? +pub fn decode_character_string(mut from: &[u8]) -> Result, ()> { + if from.is_empty() { + return Ok(Cow::Owned(Vec::new())); + } + + // Remove the initial and trailing " if any. + if from[0] == b'"' { + if from.len() == 1 || from.last() != Some(&b'"') { + return Err(()); + } + let len = from.len(); + from = &from[1..len - 1]; + } + + // TODO: remove the backslashes if any + Ok(Cow::Borrowed(from)) +} + +/// Builds the binary representation of a DNS query to send on the network. +pub fn build_query() -> Vec { + let mut out = Vec::with_capacity(33); + + // Program-generated transaction ID ; unused by our implementation. + append_u16(&mut out, rand::random()); + + // Flags ; 0x0 for a regular query. + append_u16(&mut out, 0x0); + + // Number of questions. + append_u16(&mut out, 0x1); + + // Number of answers, authorities, and additionals. + append_u16(&mut out, 0x0); + append_u16(&mut out, 0x0); + append_u16(&mut out, 0x0); + + // Our single question. + // The name. + append_qname(&mut out, SERVICE_NAME); + + // Flags. + append_u16(&mut out, 0x0c); + append_u16(&mut out, 0x01); + + // Since the output is constant, we reserve the right amount ahead of time. + // If this assert fails, adjust the capacity of `out` in the source code. + debug_assert_eq!(out.capacity(), out.len()); + out +} + +/// Builds the response to the DNS query. +/// +/// If there are more than 2^16-1 addresses, ignores the rest. +pub fn build_query_response( + id: u16, + peer_id: PeerId, + addresses: impl ExactSizeIterator, + ttl: Duration, +) -> Result, MdnsResponseError> { + // Convert the TTL into seconds. + let ttl = duration_to_secs(ttl); + + // Add a limit to 2^16-1 addresses, as the protocol limits to this number. + let addresses = addresses.take(65535); + + // This capacity was determined empirically and is a reasonable upper limit. + let mut out = Vec::with_capacity(320); + + append_u16(&mut out, id); + // Flags ; 0x84 for an answer. + append_u16(&mut out, 0x8400); + // Number of questions, answers, authorities, additionals. + append_u16(&mut out, 0x0); + append_u16(&mut out, 0x1); + append_u16(&mut out, 0x0); + append_u16(&mut out, addresses.len() as u16); + + // Our single answer. + // The name. + append_qname(&mut out, SERVICE_NAME); + + // Flags. + append_u16(&mut out, 0x000c); + append_u16(&mut out, 0x0001); + + // TTL for the answer + append_u32(&mut out, ttl); + + let peer_id_base58 = peer_id.to_base58(); + + // Peer Id. + let peer_name = format!( + "{}.{}", + data_encoding::BASE32_DNSCURVE.encode(&peer_id.into_bytes()), + str::from_utf8(SERVICE_NAME).expect("SERVICE_NAME is always ASCII") + ); + let mut peer_id_bytes = Vec::with_capacity(64); + append_qname(&mut peer_id_bytes, peer_name.as_bytes()); + debug_assert!(peer_id_bytes.len() <= 0xffff); + append_u16(&mut out, peer_id_bytes.len() as u16); + out.extend_from_slice(&peer_id_bytes); + + // The TXT records for answers. + for addr in addresses { + let txt_to_send = format!("dnsaddr={}/p2p/{}", addr.to_string(), peer_id_base58); + let mut txt_to_send_bytes = Vec::with_capacity(txt_to_send.len()); + append_character_string(&mut txt_to_send_bytes, txt_to_send.as_bytes())?; + append_txt_record(&mut out, &peer_id_bytes, ttl, Some(&txt_to_send_bytes[..]))?; + } + + // The DNS specs specify that the maximum allowed size is 9000 bytes. + if out.len() > 9000 { + return Err(MdnsResponseError::ResponseTooLong); + } + + Ok(out) +} + +/// Builds the response to the DNS query. +pub fn build_service_discovery_response(id: u16, ttl: Duration) -> Vec { + // Convert the TTL into seconds. + let ttl = duration_to_secs(ttl); + + // This capacity was determined empirically. + let mut out = Vec::with_capacity(69); + + append_u16(&mut out, id); + // Flags ; 0x84 for an answer. + append_u16(&mut out, 0x8400); + // Number of questions, answers, authorities, additionals. + append_u16(&mut out, 0x0); + append_u16(&mut out, 0x1); + append_u16(&mut out, 0x0); + append_u16(&mut out, 0x0); + + // Our single answer. + // The name. + append_qname(&mut out, META_QUERY_SERVICE); + + // Flags. + append_u16(&mut out, 0x000c); + append_u16(&mut out, 0x8001); + + // TTL for the answer + append_u32(&mut out, ttl); + + // Service name. + { + let mut name = Vec::new(); + append_qname(&mut name, SERVICE_NAME); + append_u16(&mut out, name.len() as u16); + out.extend_from_slice(&name); + } + + // Since the output size is constant, we reserve the right amount ahead of time. + // If this assert fails, adjust the capacity of `out` in the source code. + debug_assert_eq!(out.capacity(), out.len()); + out +} + +/// Returns the number of secs of a duration. +fn duration_to_secs(duration: Duration) -> u32 { + let secs = duration + .as_secs() + .saturating_add(if duration.subsec_nanos() > 0 { 1 } else { 0 }); + cmp::min(secs, From::from(u32::max_value())) as u32 +} + +/// Appends a big-endian u32 to `out`. +fn append_u32(out: &mut Vec, value: u32) { + out.push(((value >> 24) & 0xff) as u8); + out.push(((value >> 16) & 0xff) as u8); + out.push(((value >> 8) & 0xff) as u8); + out.push((value & 0xff) as u8); +} + +/// Appends a big-endian u16 to `out`. +fn append_u16(out: &mut Vec, value: u16) { + out.push(((value >> 8) & 0xff) as u8); + out.push((value & 0xff) as u8); +} + +/// Appends a `QNAME` (as defined by RFC1035) to the `Vec`. +/// +/// # Panic +/// +/// Panics if `name` has a zero-length component or a component that is too long. +/// This is fine considering that this function is not public and is only called in a controlled +/// environment. +/// +fn append_qname(out: &mut Vec, name: &[u8]) { + debug_assert!(name.is_ascii()); + + for element in name.split(|&c| c == b'.') { + assert!(element.len() < 256, "Service name has a label too long"); + assert_ne!(element.len(), 0, "Service name contains zero length label"); + out.push(element.len() as u8); + for chr in element.iter() { + out.push(*chr); + } + } + + out.push(0); +} + +/// Appends a `` (as defined by RFC1035) to the `Vec`. +fn append_character_string(out: &mut Vec, ascii_str: &[u8]) -> Result<(), MdnsResponseError> { + if !ascii_str.is_ascii() { + return Err(MdnsResponseError::NonAsciiMultiaddr); + } + + if !ascii_str.iter().any(|&c| c == b' ') { + for &chr in ascii_str.iter() { + out.push(chr); + } + return Ok(()); + } + + out.push(b'"'); + + for &chr in ascii_str.iter() { + if chr == b'\\' { + out.push(b'\\'); + out.push(b'\\'); + } else if chr == b'"' { + out.push(b'\\'); + out.push(b'"'); + } else { + out.push(chr); + } + } + + out.push(b'"'); + Ok(()) +} + +/// Appends a TXT record to the answer in `out`. +fn append_txt_record<'a>( + out: &mut Vec, + name: &[u8], + ttl_secs: u32, + entries: impl IntoIterator, +) -> Result<(), MdnsResponseError> { + // The name. + out.extend_from_slice(name); + + // Flags. + out.push(0x00); + out.push(0x10); // TXT record. + out.push(0x80); + out.push(0x01); + + // TTL for the answer + append_u32(out, ttl_secs); + + // Add the strings. + let mut buffer = Vec::new(); + for entry in entries { + if entry.len() > u8::max_value() as usize { + return Err(MdnsResponseError::TxtRecordTooLong); + } + buffer.push(entry.len() as u8); + buffer.extend_from_slice(entry); + } + + // It is illegal to have an empty TXT record, but we can have one zero-bytes entry, which does + // the same. + if buffer.is_empty() { + buffer.push(0); + } + + if buffer.len() > u16::max_value() as usize { + return Err(MdnsResponseError::TxtRecordTooLong); + } + append_u16(out, buffer.len() as u16); + out.extend_from_slice(&buffer); + Ok(()) +} + +/// Error that can happen when producing a DNS response. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum MdnsResponseError { + TxtRecordTooLong, + NonAsciiMultiaddr, + ResponseTooLong, +} + +impl fmt::Display for MdnsResponseError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + MdnsResponseError::TxtRecordTooLong => { + write!(f, "TXT record invalid because it is too long") + } + MdnsResponseError::NonAsciiMultiaddr => write!( + f, + "A multiaddr contains non-ASCII characters when serializd" + ), + MdnsResponseError::ResponseTooLong => write!(f, "DNS response is too long"), + } + } +} + +impl error::Error for MdnsResponseError {} + +#[cfg(test)] +mod tests { + use super::*; + use dns_parser::Packet; + use libp2p_core::{PeerId, PublicKey}; + use std::time::Duration; + + #[test] + fn build_query_correct() { + let query = build_query(); + assert!(Packet::parse(&query).is_ok()); + } + + #[test] + fn build_query_response_correct() { + let my_peer_id = PeerId::from_public_key(PublicKey::Rsa(vec![1, 2, 3, 4])); + let addr1 = "/ip4/1.2.3.4/tcp/5000".parse().unwrap(); + let addr2 = "/ip6/::1/udp/10000".parse().unwrap(); + let query = build_query_response( + 0xf8f8, + my_peer_id, + vec![addr1, addr2].into_iter(), + Duration::from_secs(60), + ) + .unwrap(); + assert!(Packet::parse(&query).is_ok()); + } + + #[test] + fn build_service_discovery_response_correct() { + let query = build_service_discovery_response(0x1234, Duration::from_secs(120)); + assert!(Packet::parse(&query).is_ok()); + } + + // TODO: test limits and errors +} diff --git a/misc/mdns/src/lib.rs b/misc/mdns/src/lib.rs new file mode 100644 index 00000000000..8e2836feb22 --- /dev/null +++ b/misc/mdns/src/lib.rs @@ -0,0 +1,592 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! mDNS is a protocol defined by [RFC 6762](https://tools.ietf.org/html/rfc6762) that allows +//! querying nodes that correspond to a certain domain name. +//! +//! In the context of libp2p, the mDNS protocol is used to discover other nodes on the local +//! network that support libp2p. +//! +//! # Usage +//! +//! In order to use mDNS to discover peers on the local network, use the `MdnsService`. This is +//! done by creating a `MdnsService` then polling it in the same way as you would poll a stream. +//! +//! Polling the `MdnsService` can produce either an `MdnsQuery`, corresponding to an mDNS query +//! received by another node on the local network, or an `MdnsResponse` corresponding to a response +//! to a query previously emitted locally. The `MdnsService` will automatically produce queries, +//! which means that you will receive responses automatically. +//! +//! When you receive an `MdnsQuery`, use the `respond` method to send back an answer to the node +//! that emitted the query. +//! +//! When you receive an `MdnsResponse`, use the provided methods to query the information received +//! in the response. +//! +//! # Example +//! +//! ```rust +//! # extern crate futures; +//! # extern crate libp2p_core; +//! # extern crate libp2p_mdns; +//! # use futures::prelude::*; +//! # use libp2p_mdns::{MdnsService, MdnsPacket}; +//! # use std::{io, time::Duration}; +//! # fn main() { +//! # let my_peer_id = libp2p_core::PublicKey::Rsa(vec![1, 2, 3, 4]).into_peer_id(); +//! # let my_listened_addrs = Vec::new(); +//! let mut service = MdnsService::new().expect("Error while creating mDNS service"); +//! let _future_to_poll = futures::stream::poll_fn(move || -> Poll, io::Error> { +//! loop { +//! let packet = match service.poll() { +//! Async::Ready(packet) => packet, +//! Async::NotReady => return Ok(Async::NotReady), +//! }; +//! +//! match packet { +//! MdnsPacket::Query(query) => { +//! println!("Query from {:?}", query.remote_addr()); +//! query.respond( +//! my_peer_id.clone(), +//! my_listened_addrs.clone(), +//! Duration::from_secs(120), +//! ); +//! } +//! MdnsPacket::Response(response) => { +//! for peer in response.discovered_peers() { +//! println!("Discovered peer {:?}", peer.id()); +//! for addr in peer.addresses() { +//! println!("Address = {:?}", addr); +//! } +//! } +//! } +//! MdnsPacket::ServiceDiscovery(query) => { +//! query.respond(std::time::Duration::from_secs(120)); +//! } +//! } +//! } +//! }).for_each(|_| Ok(())); +//! # } + +extern crate data_encoding; +extern crate dns_parser; +extern crate futures; +extern crate libp2p_core; +extern crate multiaddr; +extern crate net2; +extern crate rand; +extern crate tokio_reactor; +extern crate tokio_timer; +extern crate tokio_udp; + +#[cfg(test)] +extern crate tokio; + +use dns_parser::{Packet, RData}; +use futures::{prelude::*, task}; +use libp2p_core::{Multiaddr, PeerId}; +use multiaddr::Protocol; +use std::{fmt, io, net::Ipv4Addr, net::SocketAddr, str, time::Duration, time::Instant}; +use tokio_reactor::Handle; +use tokio_timer::Interval; +use tokio_udp::UdpSocket; + +pub use dns::MdnsResponseError; + +mod dns; + +/// Hardcoded name of the mDNS service. Part of the mDNS libp2p specifications. +const SERVICE_NAME: &'static [u8] = b"_p2p._udp.local"; +/// Hardcoded name of the service used for DNS-SD. +const META_QUERY_SERVICE: &'static [u8] = b"_services._dns-sd._udp.local"; + +/// A running service that discovers libp2p peers and responds to other libp2p peers' queries on +/// the local network. +/// +/// See the crate root documentation for more info. +pub struct MdnsService { + /// Main socket for listening. + socket: UdpSocket, + /// Socket for sending queries on the network. + query_socket: UdpSocket, + /// Interval for sending queries. + query_interval: Interval, + /// Whether we send queries on the network at all. + /// Note that we still need to have an interval for querying, as we need to wake up the socket + /// regularly to recover from errors. Otherwise we could simply use an `Option`. + silent: bool, + /// Buffer used for receiving data from the main socket. + recv_buffer: [u8; 2048], + /// Buffers pending to send on the main socket. + send_buffers: Vec>, + /// Buffers pending to send on the query socket. + query_send_buffers: Vec>, +} + +impl MdnsService { + /// Starts a new mDNS service. + #[inline] + pub fn new() -> io::Result { + Self::new_inner(false) + } + + /// Same as `new`, but we don't send automatically send queries on the network. + #[inline] + pub fn silent() -> io::Result { + Self::new_inner(true) + } + + /// Starts a new mDNS service. + fn new_inner(silent: bool) -> io::Result { + let socket = { + #[cfg(unix)] + fn platform_specific(s: &net2::UdpBuilder) -> io::Result<()> { + net2::unix::UnixUdpBuilderExt::reuse_port(s, true)?; + Ok(()) + } + #[cfg(not(unix))] + fn platform_specific(_: &net2::UdpBuilder) -> io::Result<()> { Ok(()) } + let builder = net2::UdpBuilder::new_v4()?; + builder.reuse_address(true)?; + platform_specific(&builder)?; + builder.bind(("0.0.0.0", 5353))? + }; + + let socket = UdpSocket::from_std(socket, &Handle::default())?; + socket.set_multicast_loop_v4(true)?; + socket.set_multicast_ttl_v4(255)?; + // TODO: correct interfaces? + socket.join_multicast_v4(&From::from([224, 0, 0, 251]), &Ipv4Addr::UNSPECIFIED)?; + + Ok(MdnsService { + socket, + query_socket: UdpSocket::bind(&From::from(([0, 0, 0, 0], 0)))?, + query_interval: Interval::new(Instant::now(), Duration::from_secs(20)), + silent, + recv_buffer: [0; 2048], + send_buffers: Vec::new(), + query_send_buffers: Vec::new(), + }) + } + + /// Polls the service for packets. + pub fn poll(&mut self) -> Async { + // Send a query every time `query_interval` fires. + // Note that we don't use a loop here ; it is pretty unlikely that we need it, and there is + // no point in sending multiple requests in a row. + match self.query_interval.poll() { + Ok(Async::Ready(_)) => { + if !self.silent { + let query = dns::build_query(); + self.query_send_buffers.push(query.to_vec()); + } + } + Ok(Async::NotReady) => (), + _ => unreachable!("A tokio_timer::Interval never errors"), // TODO: is that true? + }; + + // Flush the send buffer of the main socket. + while !self.send_buffers.is_empty() { + let to_send = self.send_buffers.remove(0); + match self + .socket + .poll_send_to(&to_send, &From::from(([224, 0, 0, 251], 5353))) + { + Ok(Async::Ready(bytes_written)) => { + debug_assert_eq!(bytes_written, to_send.len()); + } + Ok(Async::NotReady) => { + self.send_buffers.insert(0, to_send); + break; + } + Err(_) => { + // Errors are non-fatal because they can happen for example if we lose + // connection to the network. + self.send_buffers.clear(); + break; + } + } + } + + // Flush the query send buffer. + // This has to be after the push to `query_send_buffers`. + while !self.query_send_buffers.is_empty() { + let to_send = self.query_send_buffers.remove(0); + match self + .query_socket + .poll_send_to(&to_send, &From::from(([224, 0, 0, 251], 5353))) + { + Ok(Async::Ready(bytes_written)) => { + debug_assert_eq!(bytes_written, to_send.len()); + } + Ok(Async::NotReady) => { + self.query_send_buffers.insert(0, to_send); + break; + } + Err(_) => { + // Errors are non-fatal because they can happen for example if we lose + // connection to the network. + self.query_send_buffers.clear(); + break; + } + } + } + + // Check for any incoming packet. + match self.socket.poll_recv_from(&mut self.recv_buffer) { + Ok(Async::Ready((len, from))) => { + match Packet::parse(&self.recv_buffer[..len]) { + Ok(packet) => { + if packet.header.query { + if packet + .questions + .iter() + .any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME) + { + return Async::Ready(MdnsPacket::Query(MdnsQuery { + from, + query_id: packet.header.id, + send_buffers: &mut self.send_buffers, + })); + } else if packet + .questions + .iter() + .any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE) + { + // TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE? + return Async::Ready(MdnsPacket::ServiceDiscovery( + MdnsServiceDiscovery { + from, + query_id: packet.header.id, + send_buffers: &mut self.send_buffers, + }, + )); + } else { + // Note that ideally we would use a loop instead. However as of the + // writing of this code non-lexical lifetimes haven't been merged + // yet, and I can't manage to write this code without having borrow + // issues. + task::current().notify(); + return Async::NotReady; + } + } else { + return Async::Ready(MdnsPacket::Response(MdnsResponse { + packet, + from, + })); + } + } + Err(_) => { + // Ignore errors while parsing the packet. We need to poll again for the + // next packet. + // Note that ideally we would use a loop instead. However as of the writing + // of this code non-lexical lifetimes haven't been merged yet, and I can't + // manage to write this code without having borrow issues. + task::current().notify(); + return Async::NotReady; + } + } + } + Ok(Async::NotReady) => (), + Err(_) => { + // Error are non-fatal and can happen if we get disconnected from example. + // The query interval will wake up the task at some point so that we can try again. + } + }; + + Async::NotReady + } +} + +/// A valid mDNS packet received by the service. +#[derive(Debug)] +pub enum MdnsPacket<'a> { + /// A query made by a remote. + Query(MdnsQuery<'a>), + /// A response sent by a remote in response to one of our queries. + Response(MdnsResponse<'a>), + /// A request for service discovery. + ServiceDiscovery(MdnsServiceDiscovery<'a>), +} + +/// A received mDNS query. +pub struct MdnsQuery<'a> { + /// Sender of the address. + from: SocketAddr, + /// Id of the received DNS query. We need to pass this ID back in the results. + query_id: u16, + /// Queue of pending buffers. + send_buffers: &'a mut Vec>, +} + +impl<'a> MdnsQuery<'a> { + /// Respond to the query. + /// + /// Pass the ID of the local peer, and the list o addresses we're listening on. + /// + /// If there are more than 2^16-1 addresses, ignores the others. + /// + /// > **Note**: Keep in mind that we will also receive this response in an `MdnsResponse`. + #[inline] + pub fn respond( + self, + peer_id: PeerId, + addresses: TAddresses, + ttl: Duration, + ) -> Result<(), MdnsResponseError> + where + TAddresses: IntoIterator, + TAddresses::IntoIter: ExactSizeIterator, + { + let response = + dns::build_query_response(self.query_id, peer_id, addresses.into_iter(), ttl)?; + self.send_buffers.push(response); + Ok(()) + } + + /// Source address of the packet. + #[inline] + pub fn remote_addr(&self) -> &SocketAddr { + &self.from + } +} + +impl<'a> fmt::Debug for MdnsQuery<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MdnsQuery") + .field("from", self.remote_addr()) + .field("query_id", &self.query_id) + .finish() + } +} + +/// A received mDNS service discovery query. +pub struct MdnsServiceDiscovery<'a> { + /// Sender of the address. + from: SocketAddr, + /// Id of the received DNS query. We need to pass this ID back in the results. + query_id: u16, + /// Queue of pending buffers. + send_buffers: &'a mut Vec>, +} + +impl<'a> MdnsServiceDiscovery<'a> { + /// Respond to the query. + #[inline] + pub fn respond(self, ttl: Duration) { + let response = dns::build_service_discovery_response(self.query_id, ttl); + self.send_buffers.push(response); + } + + /// Source address of the packet. + #[inline] + pub fn remote_addr(&self) -> &SocketAddr { + &self.from + } +} + +impl<'a> fmt::Debug for MdnsServiceDiscovery<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MdnsServiceDiscovery") + .field("from", self.remote_addr()) + .field("query_id", &self.query_id) + .finish() + } +} + +/// A received mDNS response. +pub struct MdnsResponse<'a> { + packet: Packet<'a>, + from: SocketAddr, +} + +impl<'a> MdnsResponse<'a> { + /// Returns the list of peers that have been reported in this packet. + /// + /// > **Note**: Keep in mind that this will also contain the responses we sent ourselves. + pub fn discovered_peers<'b>(&'b self) -> impl Iterator> { + let packet = &self.packet; + self.packet.answers.iter().filter_map(move |record| { + if record.name.to_string().as_bytes() != SERVICE_NAME { + return None; + } + + let record_value = match record.data { + RData::PTR(record) => record.0.to_string(), + _ => return None, + }; + + let peer_name = { + let mut iter = record_value.splitn(2, |c| c == '.'); + let name = match iter.next() { + Some(n) => n.to_owned(), + None => return None, + }; + if iter.next().map(|v| v.as_bytes()) != Some(SERVICE_NAME) { + return None; + } + name + }; + + let peer_id = match data_encoding::BASE32_DNSCURVE.decode(peer_name.as_bytes()) { + Ok(bytes) => match PeerId::from_bytes(bytes) { + Ok(id) => id, + Err(_) => return None, + }, + Err(_) => return None, + }; + + Some(MdnsPeer { + packet, + record_value, + peer_id, + }) + }) + } + + /// Source address of the packet. + #[inline] + pub fn remote_addr(&self) -> &SocketAddr { + &self.from + } +} + +impl<'a> fmt::Debug for MdnsResponse<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MdnsResponse") + .field("from", self.remote_addr()) + .finish() + } +} + +/// A peer discovered by the service. +pub struct MdnsPeer<'a> { + /// The original packet ; will be used to determine the addresses. + packet: &'a Packet<'a>, + /// Cached value of `concat(base32(peer_id), service name)`. + record_value: String, + /// Id of the peer. + peer_id: PeerId, +} + +impl<'a> MdnsPeer<'a> { + /// Returns the id of the peer. + #[inline] + pub fn id(&self) -> &PeerId { + &self.peer_id + } + + /// Returns the list of addresses the peer says it is listening on. + /// + /// Filters out invalid addresses. + pub fn addresses<'b>(&'b self) -> impl Iterator + 'b { + let my_peer_id = &self.peer_id; + let record_value = &self.record_value; + self.packet + .additional + .iter() + .filter_map(move |add_record| { + if &add_record.name.to_string() != record_value { + return None; + } + + if let RData::TXT(ref txt) = add_record.data { + Some(txt) + } else { + None + } + }) + .flat_map(|txt| txt.iter()) + .filter_map(move |txt| { + // TODO: wrong, txt can be multiple character strings + let addr = match dns::decode_character_string(txt) { + Ok(a) => a, + Err(_) => return None, + }; + if !addr.starts_with(b"dnsaddr=") { + return None; + } + let addr = match str::from_utf8(&addr[8..]) { + Ok(a) => a, + Err(_) => return None, + }; + let mut addr = match addr.parse::() { + Ok(a) => a, + Err(_) => return None, + }; + match addr.pop() { + Some(Protocol::P2p(ref peer_id)) if peer_id == my_peer_id => (), + _ => return None, + }; + Some(addr) + }) + } +} + +impl<'a> fmt::Debug for MdnsPeer<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MdnsPeer") + .field("peer_id", &self.peer_id) + .finish() + } +} + +#[cfg(test)] +mod tests { + use libp2p_core::PublicKey; + use std::{io, time::Duration}; + use tokio::{self, prelude::*}; + use {MdnsPacket, MdnsService}; + + #[test] + fn discover_ourselves() { + let mut service = MdnsService::new().unwrap(); + let peer_id = + PublicKey::Rsa((0..32).map(|_| rand::random::()).collect()).into_peer_id(); + let stream = stream::poll_fn(move || -> Poll, io::Error> { + loop { + let packet = match service.poll() { + Async::Ready(packet) => packet, + Async::NotReady => return Ok(Async::NotReady), + }; + + match packet { + MdnsPacket::Query(query) => { + query.respond(peer_id.clone(), None, Duration::from_secs(120)); + } + MdnsPacket::Response(response) => { + for peer in response.discovered_peers() { + if peer.id() == &peer_id { + return Ok(Async::Ready(None)); + } + } + } + MdnsPacket::ServiceDiscovery(_) => {} + } + } + }); + + tokio::run( + stream + .map_err(|err| panic!("{:?}", err)) + .for_each(|_| Ok(())), + ); + } +} diff --git a/src/lib.rs b/src/lib.rs index d38fc20b498..f96774f2264 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -147,6 +147,8 @@ pub extern crate libp2p_identify as identify; pub extern crate libp2p_kad as kad; pub extern crate libp2p_floodsub as floodsub; pub extern crate libp2p_mplex as mplex; +#[cfg(not(target_os = "emscripten"))] +pub extern crate libp2p_mdns as mdns; pub extern crate libp2p_peerstore as peerstore; pub extern crate libp2p_ping as ping; pub extern crate libp2p_plaintext as plaintext;