From 4e9fef72c5e9409b8378ad5d90db78e37fe302e3 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 22 Oct 2018 10:23:43 +0200 Subject: [PATCH 01/15] Add libp2p-mdns --- Cargo.toml | 9 +- core/src/peer_id.rs | 14 + examples/mdns-passive-discovery.rs | 76 ++++ misc/mdns/Cargo.toml | 21 ++ misc/mdns/src/dns.rs | 369 +++++++++++++++++++ misc/mdns/src/lib.rs | 561 +++++++++++++++++++++++++++++ src/lib.rs | 1 + 7 files changed, 1045 insertions(+), 6 deletions(-) create mode 100644 examples/mdns-passive-discovery.rs create mode 100644 misc/mdns/Cargo.toml create mode 100644 misc/mdns/src/dns.rs create mode 100644 misc/mdns/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 85dd7987817..94b75f350c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ libp2p-mplex = { path = "./muxers/mplex" } libp2p-identify = { path = "./protocols/identify" } libp2p-kad = { path = "./protocols/kad" } libp2p-floodsub = { path = "./protocols/floodsub" } +libp2p-mdns = { path = "./misc/mdns" } libp2p-peerstore = { path = "./stores/peerstore" } libp2p-ping = { path = "./protocols/ping" } libp2p-ratelimit = { path = "./transports/ratelimit" } @@ -40,17 +41,13 @@ tokio-current-thread = "0.1" stdweb = { version = "0.1.3", default-features = false } [dev-dependencies] -bigint = "4.2" -env_logger = "0.5.4" -rand = "0.4" -structopt = "0.2" +rand = "0.5" tokio = "0.1" -tokio-io = "0.1" -tokio-stdin = "0.1" [workspace] members = [ "core", + "misc/mdns", "misc/multiaddr", "misc/multihash", "misc/multistream-select", diff --git a/core/src/peer_id.rs b/core/src/peer_id.rs index 395f4c53a08..5b52d68e75e 100644 --- a/core/src/peer_id.rs +++ b/core/src/peer_id.rs @@ -123,6 +123,20 @@ impl From for PeerId { } } +impl PartialEq for PeerId { + #[inline] + fn eq(&self, other: &multihash::Multihash) -> bool { + &self.multihash == other + } +} + +impl PartialEq for multihash::Multihash { + #[inline] + fn eq(&self, other: &PeerId) -> bool { + self == &other.multihash + } +} + impl Into for PeerId { #[inline] fn into(self) -> multihash::Multihash { diff --git a/examples/mdns-passive-discovery.rs b/examples/mdns-passive-discovery.rs new file mode 100644 index 00000000000..63a64502ced --- /dev/null +++ b/examples/mdns-passive-discovery.rs @@ -0,0 +1,76 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +extern crate futures; +extern crate libp2p; +extern crate rand; +extern crate tokio; + +use futures::prelude::*; +use libp2p::mdns::{MdnsPacket, MdnsService}; +use std::io; + +fn main() { + // This example provides passive discovery of the libp2p nodes on the network that send + // mDNS queries and answers. + + // We start by creating the service. + let mut service = MdnsService::new().expect("Error while creating mDNS service"); + + // Create a never-ending `Future` that polls the service for events. + let future = futures::future::poll_fn(move || -> Poll<(), io::Error> { + loop { + // Grab the next available packet from the service. + let packet = match service.poll() { + Async::Ready(packet) => packet, + Async::NotReady => return Ok(Async::NotReady), + }; + + match packet { + MdnsPacket::Query(query) => { + // We detected a libp2p mDNS query on the network. In a real application, you + // probably want to answer this query by doing `query.respond(...)`. + println!("Detected query from {:?}", query.remote_addr()); + } + MdnsPacket::Response(response) => { + // We detected a libp2p mDNS response on the network. Responses are for + // everyone and not just for the requester, which makes it possible to + // passively listen. + for peer in response.discovered_peers() { + println!("Discovered peer {:?}", peer.id()); + // These are the self-reported addresses of the peer we just discovered. + for addr in peer.addresses() { + println!(" Address = {:?}", addr); + } + } + } + MdnsPacket::ServiceDiscovery(query) => { + // The last possibility is a service detection query from DNS-SD. + // Just like `Query`, in a real application you probably want to call + // `query.respond`. + println!("Detected service query from {:?}", query.remote_addr()); + } + } + } + }); + + // Blocks the thread until the future runs to completion (which will never happen). + tokio::run(future.map_err(|err| panic!("{:?}", err))); +} diff --git a/misc/mdns/Cargo.toml b/misc/mdns/Cargo.toml new file mode 100644 index 00000000000..48307189a9e --- /dev/null +++ b/misc/mdns/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "libp2p-mdns" +version = "0.1.0" +description = "Implementation of the libp2p mDNS discovery method" +authors = ["Parity Technologies "] +license = "MIT" + +[dependencies] +data-encoding = "2.0" +dns-parser = "0.8" +futures = "0.1" +libp2p-core = { path = "../../core" } +multiaddr = { path = "../multiaddr" } +net2 = "0.2" +rand = "0.5" +tokio-reactor = "0.1" +tokio-timer = "0.2" +tokio-udp = "0.1" + +[dev-dependencies] +tokio = "0.1" diff --git a/misc/mdns/src/dns.rs b/misc/mdns/src/dns.rs new file mode 100644 index 00000000000..05f088cf4b1 --- /dev/null +++ b/misc/mdns/src/dns.rs @@ -0,0 +1,369 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_core::{Multiaddr, PeerId}; +use std::{cmp, error, fmt, str, time::Duration}; +use {META_QUERY_SERVICE, SERVICE_NAME}; + +/// Decodes a `` (as defined by RFC1035) into a `Vec` of ASCII characters. +// TODO: better error type? +pub fn decode_character_string(mut from: &[u8]) -> Result, ()> { + if from.is_empty() { + return Ok(Vec::new()); + } + + // Remove the initial and trailing " if any. + if from[0] == b'"' { + if from.len() == 1 || from.last() != Some(&b'"') { + return Err(()); + } + let len = from.len(); + from = &from[1..len - 1]; + } + + // TODO: remove the backslashes if any + Ok(from.to_vec()) +} + +/// Builds the binary representation of a DNS query to send on the network. +pub fn build_query() -> Vec { + let mut out = Vec::with_capacity(33); + + // Program-generated transaction ID ; unused by our implementation. + append_u16(&mut out, rand::random()); + + // Flags ; 0x0 for a regular query. + append_u16(&mut out, 0x0); + + // Number of questions. + append_u16(&mut out, 0x1); + + // Number of answers, authorities, and additionals. + append_u16(&mut out, 0x0); + append_u16(&mut out, 0x0); + append_u16(&mut out, 0x0); + + // Our single question. + // The name. + append_qname(&mut out, SERVICE_NAME); + + // Flags. + append_u16(&mut out, 0xc); + append_u16(&mut out, 0x1); + + // Since the output is constant, we reserve the right amount ahead of time. + // If this assert fails, adjust the capacity of `out` in the source code. + debug_assert_eq!(out.capacity(), out.len()); + out +} + +/// Builds the response to the DNS query. +/// +/// If there are more than 2^16-1 addresses, ignores the rest. +pub fn build_query_response( + id: u16, + peer_id: PeerId, + addresses: impl ExactSizeIterator, + ttl: Duration, +) -> Result, MdnsResponseError> { + // Convert the TTL into seconds. + let ttl = duration_to_secs(ttl); + + // Add a limit to 2^16-1 addresses, as the protocol limits to this number. + let addresses = addresses.take(65535); + + // This capacity was determined empirically and is a reasonable upper limit. + let mut out = Vec::with_capacity(320); + + append_u16(&mut out, id); + // Flags ; 0x80 for an answer. + append_u16(&mut out, 0x8000); + // Number of questions, answers, authorities, additionals. + append_u16(&mut out, 0x0); + append_u16(&mut out, 0x1); + append_u16(&mut out, 0x0); + append_u16(&mut out, addresses.len() as u16); + + // Our single answer. + // The name. + append_qname(&mut out, SERVICE_NAME); + + // Flags. + out.push(0x00); + out.push(0x0c); + out.push(0x80); + out.push(0x01); + + // TTL for the answer + append_u32(&mut out, ttl); + + let peer_id_base58 = peer_id.to_base58(); + + // Peer Id. + let peer_name = format!( + "{}.{}", + data_encoding::BASE32_DNSCURVE.encode(&peer_id.into_bytes()), + str::from_utf8(SERVICE_NAME).expect("SERVICE_NAME is always ASCII") + ); + let mut peer_id_bytes = Vec::with_capacity(64); + append_qname(&mut peer_id_bytes, peer_name.as_bytes()); + debug_assert!(peer_id_bytes.len() <= 0xffff); + append_u16(&mut out, peer_id_bytes.len() as u16); + out.extend_from_slice(&peer_id_bytes); + + // The TXT records for answers. + for addr in addresses { + let txt_to_send = format!("dnsaddr={}/p2p/{}", addr.to_string(), peer_id_base58); + let mut txt_to_send_bytes = Vec::with_capacity(txt_to_send.len()); + append_character_string(&mut txt_to_send_bytes, txt_to_send.as_bytes())?; + append_txt_record(&mut out, &peer_id_bytes, ttl, Some(&txt_to_send_bytes[..]))?; + } + + if out.len() > 9000 { + return Err(MdnsResponseError::ResponseTooLong); + } + + Ok(out) +} + +/// Builds the response to the DNS query. +pub fn build_service_discovery_response(id: u16, ttl: Duration) -> Vec { + // Convert the TTL into seconds. + let ttl = duration_to_secs(ttl); + + // This capacity was determined empirically. + let mut out = Vec::with_capacity(69); + + append_u16(&mut out, id); + // Flags ; 0x80 for an answer. + append_u16(&mut out, 0x8000); + // Number of questions, answers, authorities, additionals. + append_u16(&mut out, 0x0); + append_u16(&mut out, 0x1); + append_u16(&mut out, 0x0); + append_u16(&mut out, 0x0); + + // Our single answer. + // The name. + append_qname(&mut out, META_QUERY_SERVICE); + + // Flags. + out.push(0x00); + out.push(0x0c); + out.push(0x80); + out.push(0x01); + + // TTL for the answer + append_u32(&mut out, ttl); + + // Service name. + { + let mut name = Vec::new(); + append_qname(&mut name, SERVICE_NAME); + append_u16(&mut out, name.len() as u16); + out.extend_from_slice(&name); + } + + // Since the output size is constant, we reserve the right amount ahead of time. + // If this assert fails, adjust the capacity of `out` in the source code. + debug_assert_eq!(out.capacity(), out.len()); + out +} + +/// Returns the number of secs of a duration. +fn duration_to_secs(duration: Duration) -> u32 { + let secs = duration + .as_secs() + .saturating_add(if duration.subsec_nanos() > 0 { 1 } else { 0 }); + cmp::min(secs, From::from(u32::max_value())) as u32 +} + +/// Appends a big-endian u32 to `out`. +fn append_u32(out: &mut Vec, value: u32) { + out.push(((value >> 24) & 0xff) as u8); + out.push(((value >> 16) & 0xff) as u8); + out.push(((value >> 8) & 0xff) as u8); + out.push((value & 0xff) as u8); +} + +/// Appends a big-endian u16 to `out`. +fn append_u16(out: &mut Vec, value: u16) { + out.push(((value >> 8) & 0xff) as u8); + out.push((value & 0xff) as u8); +} + +/// Appends a `QNAME` (as defined by RFC1035) to the `Vec`. +/// +/// # Panic +/// +/// Panics if `name` has a zero-length component or a component that is too long. +/// This is fine considering that this function is not public and is only called in a controlled +/// environment. +/// +fn append_qname(out: &mut Vec, name: &[u8]) { + debug_assert!(name.is_ascii()); + + for element in name.split(|&c| c == b'.') { + assert!(element.len() < 256, "Service name has a label too long"); + assert_ne!(element.len(), 0, "Service name contains zero length label"); + out.push(element.len() as u8); + for chr in element.iter() { + out.push(*chr); + } + } + + out.push(0); +} + +/// Appends a `` (as defined by RFC1035) to the `Vec`. +fn append_character_string(out: &mut Vec, ascii_str: &[u8]) -> Result<(), MdnsResponseError> { + if !ascii_str.is_ascii() { + return Err(MdnsResponseError::NonAsciiMultiaddr); + } + + if !ascii_str.iter().any(|&c| c == b' ') { + for &chr in ascii_str.iter() { + out.push(chr); + } + return Ok(()); + } + + out.push(b'"'); + + for &chr in ascii_str.iter() { + if chr == b'\\' { + out.push(b'\\'); + out.push(b'\\'); + } else if chr == b'"' { + out.push(b'\\'); + out.push(b'"'); + } else { + out.push(chr); + } + } + + out.push(b'"'); + Ok(()) +} + +/// Appends a TXT record to the answer in `out`. +fn append_txt_record<'a>( + out: &mut Vec, + name: &[u8], + ttl_secs: u32, + entries: impl IntoIterator, +) -> Result<(), MdnsResponseError> { + // The name. + out.extend_from_slice(name); + + // Flags. + out.push(0x00); + out.push(0x10); + out.push(0x80); + out.push(0x01); + + // TTL for the answer + append_u32(out, ttl_secs); + + // Add the strings. + let mut buffer = Vec::new(); + for entry in entries { + if entry.len() > u8::max_value() as usize { + return Err(MdnsResponseError::TxtRecordTooLong); + } + buffer.push(entry.len() as u8); + buffer.extend_from_slice(entry); + } + + // It is illegal to have an empty TXT record, but we can have one zero-bytes entry, which does + // the same. + if buffer.is_empty() { + buffer.push(0); + } + + if buffer.len() > u16::max_value() as usize { + return Err(MdnsResponseError::TxtRecordTooLong); + } + append_u16(out, buffer.len() as u16); + out.extend_from_slice(&buffer); + Ok(()) +} + +/// Error that can happen when producing a DNS response. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum MdnsResponseError { + TxtRecordTooLong, + NonAsciiMultiaddr, + ResponseTooLong, +} + +impl fmt::Display for MdnsResponseError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + MdnsResponseError::TxtRecordTooLong => { + write!(f, "TXT record invalid because it is too long") + } + MdnsResponseError::NonAsciiMultiaddr => write!( + f, + "A multiaddr contains non-ASCII characters when serializd" + ), + MdnsResponseError::ResponseTooLong => write!(f, "DNS response is too long"), + } + } +} + +impl error::Error for MdnsResponseError {} + +#[cfg(test)] +mod tests { + use super::*; + use dns_parser::Packet; + use libp2p_core::{PeerId, PublicKey}; + use std::time::Duration; + + #[test] + fn build_query_correct() { + let query = build_query(); + assert!(Packet::parse(&query).is_ok()); + } + + #[test] + fn build_query_response_correct() { + let my_peer_id = PeerId::from_public_key(PublicKey::Rsa(vec![1, 2, 3, 4])); + let addr1 = "/ip4/1.2.3.4/tcp/5000".parse().unwrap(); + let addr2 = "/ip6/::1/udp/10000".parse().unwrap(); + let query = build_query_response( + 0xf8f8, + my_peer_id, + vec![addr1, addr2].into_iter(), + Duration::from_secs(60), + ) + .unwrap(); + assert!(Packet::parse(&query).is_ok()); + } + + #[test] + fn build_service_discovery_response_correct() { + let query = build_service_discovery_response(0x1234, Duration::from_secs(120)); + assert!(Packet::parse(&query).is_ok()); + } + + // TODO: test limits and errors +} diff --git a/misc/mdns/src/lib.rs b/misc/mdns/src/lib.rs new file mode 100644 index 00000000000..3fb11a24b0a --- /dev/null +++ b/misc/mdns/src/lib.rs @@ -0,0 +1,561 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! mDNS is a protocol defined by [RFC 6762](https://tools.ietf.org/html/rfc6762) that allows +//! querying nodes that correspond to a certain domain name. +//! +//! In the context of libp2p, the mDNS protocol is used to discover other nodes on the local +//! network that support libp2p. +//! +//! # Usage +//! +//! In order to use mDNS to discover peers on the local network, use the `MdnsService`. This is +//! done by creating a `MdnsService` then polling it in the same way as you would poll a stream. +//! +//! Polling the `MdnsService` can produce either an `MdnsQuery`, corresponding to an mDNS query +//! received by another node on the local network, or an `MdnsResponse` corresponding to a response +//! to a query previously emitted locally. The `MdnsService` will automatically produce queries, +//! which means that you will receive responses automatically. +//! +//! When you receive an `MdnsQuery`, use the `respond` method to send back an answer to the node +//! that emitted the query. +//! +//! When you receive an `MdnsResponse`, use the provided methods to query the information received +//! in the response. +//! +//! # Example +//! +//! ```rust +//! # extern crate futures; +//! # extern crate libp2p_core; +//! # extern crate libp2p_mdns; +//! # use futures::prelude::*; +//! # use libp2p_mdns::{MdnsService, MdnsPacket}; +//! # use std::{io, time::Duration}; +//! # fn main() { +//! # let my_peer_id = libp2p_core::PublicKey::Rsa(vec![1, 2, 3, 4]).into_peer_id(); +//! # let my_listened_addrs = Vec::new(); +//! let mut service = MdnsService::new().expect("Error while creating mDNS service"); +//! let _future_to_poll = futures::stream::poll_fn(move || -> Poll, io::Error> { +//! loop { +//! let packet = match service.poll() { +//! Async::Ready(packet) => packet, +//! Async::NotReady => return Ok(Async::NotReady), +//! }; +//! +//! match packet { +//! MdnsPacket::Query(query) => { +//! println!("Query from {:?}", query.remote_addr()); +//! query.respond( +//! my_peer_id.clone(), +//! my_listened_addrs.clone(), +//! Duration::from_secs(120), +//! ); +//! } +//! MdnsPacket::Response(response) => { +//! for peer in response.discovered_peers() { +//! println!("Discovered peer {:?}", peer.id()); +//! for addr in peer.addresses() { +//! println!("Address = {:?}", addr); +//! } +//! } +//! } +//! MdnsPacket::ServiceDiscovery(query) => { +//! query.respond(std::time::Duration::from_secs(120)); +//! } +//! } +//! } +//! }).for_each(|_| Ok(())); +//! # } + +extern crate data_encoding; +extern crate dns_parser; +extern crate futures; +extern crate libp2p_core; +extern crate multiaddr; +extern crate net2; +extern crate rand; +extern crate tokio_reactor; +extern crate tokio_timer; +extern crate tokio_udp; + +#[cfg(test)] +extern crate tokio; + +use dns_parser::{Packet, RData}; +use futures::{prelude::*, task}; +use libp2p_core::{Multiaddr, PeerId}; +use multiaddr::Protocol; +use std::{fmt, io, net::SocketAddr, str, time::Duration, time::Instant}; +use tokio_reactor::Handle; +use tokio_timer::Interval; +use tokio_udp::UdpSocket; + +pub use dns::MdnsResponseError; + +mod dns; + +/// Hardcoded name of the mDNS service. Part of the mDNS libp2p specifications. +const SERVICE_NAME: &'static [u8] = b"_p2p._udp.local"; +/// Hardcoded name of the service used for DNS-SD. +const META_QUERY_SERVICE: &'static [u8] = b"_services._dns-sd._udp.local"; + +/// A running service that discovers libp2p peers and responds to other libp2p peers' queries on +/// the local network. +/// +/// See the crate root documentation for more info. +pub struct MdnsService { + socket: UdpSocket, + /// Interval for sending queries. + query_interval: Interval, + /// Whether we send queries on the network at all. + /// Note that we still need to have an interval for querying, as we need to wake up the socket + /// regularly to recover from errors. Otherwise we could simply use an `Option`. + silent: bool, + /// Buffer used for receiving data. + recv_buffer: [u8; 2048], + /// Buffers pending to send on the socket. + send_buffers: Vec>, +} + +impl MdnsService { + /// Starts a new mDNS service. + #[inline] + pub fn new() -> io::Result { + Self::new_inner(false) + } + + /// Same as `new`, but we don't send automatically send queries on the network. + #[inline] + pub fn silent() -> io::Result { + Self::new_inner(true) + } + + /// Starts a new mDNS service. + fn new_inner(silent: bool) -> io::Result { + let socket = { + #[cfg(unix)] + fn platform_specific(s: &net2::UdpBuilder) -> io::Result<()> { + net2::unix::UnixUdpBuilderExt::reuse_port(s, true)?; + Ok(()) + } + #[cfg(not(unix))] + fn platform_specific(_: &net2::UdpBuilder) -> io::Result<()> {} + let builder = net2::UdpBuilder::new_v4()?; + builder.reuse_address(true)?; + platform_specific(&builder)?; + builder.bind(("0.0.0.0", 5353))? + }; + + let socket = UdpSocket::from_std(socket, &Handle::default())?; + socket.set_multicast_loop_v4(true)?; + socket.set_multicast_ttl_v4(255)?; + // TODO: correct interfaces? + socket.join_multicast_v4(&From::from([224, 0, 0, 251]), &From::from([0, 0, 0, 0]))?; + + Ok(MdnsService { + socket, + query_interval: Interval::new(Instant::now(), Duration::from_secs(20)), + silent, + recv_buffer: [0; 2048], + send_buffers: Vec::new(), + }) + } + + /// Polls the service for packets. + pub fn poll(&mut self) -> Async { + // Send a query every time `query_interval` fires. + // Note that we don't use a loop here ; it is pretty unlikely that we need it, and there is + // no point in sending multiple requests in a row. + match self.query_interval.poll() { + Ok(Async::Ready(_)) => { + if !self.silent { + let query = dns::build_query(); + self.send_buffers.push(query.to_vec()); + } + } + Ok(Async::NotReady) => (), + _ => unreachable!("A tokio_timer::Interval never errors"), // TODO: is that true? + }; + + // Flush the send buffer. + // This has to be after the push to `send_buffers`. + while !self.send_buffers.is_empty() { + let to_send = self.send_buffers.remove(0); + match self + .socket + .poll_send_to(&to_send, &From::from(([224, 0, 0, 251], 5353))) + { + Ok(Async::Ready(bytes_written)) => { + debug_assert_eq!(bytes_written, to_send.len()); + } + Ok(Async::NotReady) => { + self.send_buffers.insert(0, to_send); + break; + } + Err(_) => { + // Errors are non-fatal because they can happen for example if we lose + // connection to the network. + self.send_buffers.clear(); + break; + } + } + } + + // Check for any incoming packet. + match self.socket.poll_recv_from(&mut self.recv_buffer) { + Ok(Async::Ready((len, from))) => { + match Packet::parse(&self.recv_buffer[..len]) { + Ok(packet) => { + if packet.header.query { + if packet + .questions + .iter() + .any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME) + { + return Async::Ready(MdnsPacket::Query(MdnsQuery { + from, + query_id: packet.header.id, + send_buffers: &mut self.send_buffers, + })); + } else if packet + .questions + .iter() + .any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE) + { + // TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE? + return Async::Ready(MdnsPacket::ServiceDiscovery( + MdnsServiceDiscovery { + from, + query_id: packet.header.id, + send_buffers: &mut self.send_buffers, + }, + )); + } else { + // Note that ideally we would use a loop instead. However as of the + // writing of this code non-lexical lifetimes haven't been merged + // yet, and I can't manage to write this code without having borrow + // issues. + task::current().notify(); + return Async::NotReady; + } + } else { + return Async::Ready(MdnsPacket::Response(MdnsResponse { + packet, + from, + })); + } + } + Err(_) => { + // Ignore errors while parsing the packet. We need to poll again for the + // next packet. + // Note that ideally we would use a loop instead. However as of the writing + // of this code non-lexical lifetimes haven't been merged yet, and I can't + // manage to write this code without having borrow issues. + task::current().notify(); + return Async::NotReady; + } + } + } + Ok(Async::NotReady) => (), + Err(_) => { + // Error are non-fatal and can happen if we get disconnected from example. + // The query interval will wake up the task at some point so that we can try again. + } + }; + + Async::NotReady + } +} + +/// A valid mDNS packet received by the service. +#[derive(Debug)] +pub enum MdnsPacket<'a> { + /// A query made by a remote. + Query(MdnsQuery<'a>), + /// A response received by a remote to one of our queries. + Response(MdnsResponse<'a>), + /// A request for service discovery. + ServiceDiscovery(MdnsServiceDiscovery<'a>), +} + +/// A received mDNS query. +pub struct MdnsQuery<'a> { + /// Sender of the address. + from: SocketAddr, + /// Id of the received DNS query. We need to pass this ID back in the results. + query_id: u16, + /// Queue of pending buffers. + send_buffers: &'a mut Vec>, +} + +impl<'a> MdnsQuery<'a> { + /// Respond to the query. + /// + /// Pass the ID of the local peer, and the list o addresses we're listening on. + /// + /// If there are more than 2^16-1 addresses, ignores the other. + /// + /// > **Note**: Keep in mind that we will also receive this response in an `MdnsResponse`. + #[inline] + pub fn respond( + self, + peer_id: PeerId, + addresses: TAddresses, + ttl: Duration, + ) -> Result<(), MdnsResponseError> + where + TAddresses: IntoIterator, + TAddresses::IntoIter: ExactSizeIterator, + { + let response = + dns::build_query_response(self.query_id, peer_id, addresses.into_iter(), ttl)?; + self.send_buffers.push(response); + Ok(()) + } + + /// Source address of the packet. + #[inline] + pub fn remote_addr(&self) -> &SocketAddr { + &self.from + } +} + +impl<'a> fmt::Debug for MdnsQuery<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MdnsQuery") + .field("from", self.remote_addr()) + .field("query_id", &self.query_id) + .finish() + } +} + +/// A received mDNS service discovery query. +pub struct MdnsServiceDiscovery<'a> { + /// Sender of the address. + from: SocketAddr, + /// Id of the received DNS query. We need to pass this ID back in the results. + query_id: u16, + /// Queue of pending buffers. + send_buffers: &'a mut Vec>, +} + +impl<'a> MdnsServiceDiscovery<'a> { + /// Respond to the query. + #[inline] + pub fn respond(self, ttl: Duration) { + let response = dns::build_service_discovery_response(self.query_id, ttl); + self.send_buffers.push(response); + } + + /// Source address of the packet. + #[inline] + pub fn remote_addr(&self) -> &SocketAddr { + &self.from + } +} + +impl<'a> fmt::Debug for MdnsServiceDiscovery<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MdnsServiceDiscovery") + .field("from", self.remote_addr()) + .field("query_id", &self.query_id) + .finish() + } +} + +/// A received mDNS response. +pub struct MdnsResponse<'a> { + packet: Packet<'a>, + from: SocketAddr, +} + +impl<'a> MdnsResponse<'a> { + /// Returns the list of peers that have been reported in this packet. + /// + /// > **Note**: Keep in mind that this will also contain the responses we sent ourselves. + pub fn discovered_peers<'b>(&'b self) -> impl Iterator> { + let packet = &self.packet; + self.packet.answers.iter().filter_map(move |record| { + if record.name.to_string().as_bytes() != SERVICE_NAME { + return None; + } + + let record_value = match record.data { + RData::PTR(record) => record.0.to_string(), + _ => return None, + }; + + let peer_name = { + let mut iter = record_value.splitn(2, |c| c == '.'); + let name = match iter.next() { + Some(n) => n.to_owned(), + None => return None, + }; + if iter.next().map(|v| v.as_bytes()) != Some(SERVICE_NAME) { + return None; + } + name + }; + + let peer_id = match data_encoding::BASE32_DNSCURVE.decode(peer_name.as_bytes()) { + Ok(bytes) => match PeerId::from_bytes(bytes) { + Ok(id) => id, + Err(_) => return None, + }, + Err(_) => return None, + }; + + Some(MdnsPeer { + packet, + record_value, + peer_id, + }) + }) + } + + /// Source address of the packet. + #[inline] + pub fn remote_addr(&self) -> &SocketAddr { + &self.from + } +} + +impl<'a> fmt::Debug for MdnsResponse<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MdnsResponse") + .field("from", self.remote_addr()) + .finish() + } +} + +/// A peer discovered by the service. +pub struct MdnsPeer<'a> { + /// The original packet ; will be used to determine the addresses. + packet: &'a Packet<'a>, + /// Cached value of `concat(base32(peer_id), service name)`. + record_value: String, + /// Id of the peer. + peer_id: PeerId, +} + +impl<'a> MdnsPeer<'a> { + /// Returns the id of the peer. + #[inline] + pub fn id(&self) -> &PeerId { + &self.peer_id + } + + /// Returns the list of addresses the peer says it is listening on. + /// + /// Filters out invalid addresses. + pub fn addresses<'b>(&'b self) -> impl Iterator + 'b { + let my_peer_id = &self.peer_id; + let record_value = &self.record_value; + self.packet + .additional + .iter() + .filter_map(move |add_record| { + if &add_record.name.to_string() != record_value { + return None; + } + + if let RData::TXT(ref txt) = add_record.data { + Some(txt) + } else { + None + } + }) + .flat_map(|txt| txt.iter()) + .filter_map(move |txt| { + let addr = match dns::decode_character_string(txt) { + Ok(a) => a, + Err(_) => return None, + }; + if !addr.starts_with(b"dnsaddr=") { + return None; + } + let addr = match str::from_utf8(&addr[8..]) { + Ok(a) => a, + Err(_) => return None, + }; + let mut addr = match addr.parse::() { + Ok(a) => a, + Err(_) => return None, + }; + match addr.pop() { + Some(Protocol::P2p(ref peer_id)) if peer_id == my_peer_id => (), + _ => return None, + }; + Some(addr) + }) + } +} + +impl<'a> fmt::Debug for MdnsPeer<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("MdnsPeer") + .field("peer_id", &self.peer_id) + .finish() + } +} + +#[cfg(test)] +mod tests { + use libp2p_core::PublicKey; + use std::{io, time::Duration}; + use tokio::{self, prelude::*}; + use {MdnsPacket, MdnsService}; + + #[test] + fn discover_ourselves() { + let mut service = MdnsService::new().unwrap(); + let peer_id = + PublicKey::Rsa((0..32).map(|_| rand::random::()).collect()).into_peer_id(); + let stream = stream::poll_fn(move || -> Poll, io::Error> { + loop { + let packet = match service.poll() { + Async::Ready(packet) => packet, + Async::NotReady => return Ok(Async::NotReady), + }; + + match packet { + MdnsPacket::Query(query) => { + query.respond(peer_id.clone(), None, Duration::from_secs(120)); + } + MdnsPacket::Response(response) => { + for peer in response.discovered_peers() { + if peer.id() == &peer_id { + return Ok(Async::Ready(None)); + } + } + } + MdnsPacket::ServiceDiscovery(_) => {} + } + } + }); + + tokio::run( + stream + .map_err(|err| panic!("{:?}", err)) + .for_each(|_| Ok(())), + ); + } +} diff --git a/src/lib.rs b/src/lib.rs index 71550220a1d..22ebdd524de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -147,6 +147,7 @@ pub extern crate libp2p_identify as identify; pub extern crate libp2p_kad as kad; pub extern crate libp2p_floodsub as floodsub; pub extern crate libp2p_mplex as mplex; +pub extern crate libp2p_mdns as mdns; pub extern crate libp2p_peerstore as peerstore; pub extern crate libp2p_ping as ping; pub extern crate libp2p_ratelimit as ratelimit; From 6c4ad950f4b8e21ee32593be73105fad416cb121 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sat, 27 Oct 2018 13:08:16 +0200 Subject: [PATCH 02/15] Fix win32 --- misc/mdns/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/mdns/src/lib.rs b/misc/mdns/src/lib.rs index 3fb11a24b0a..d834bff175c 100644 --- a/misc/mdns/src/lib.rs +++ b/misc/mdns/src/lib.rs @@ -157,7 +157,7 @@ impl MdnsService { Ok(()) } #[cfg(not(unix))] - fn platform_specific(_: &net2::UdpBuilder) -> io::Result<()> {} + fn platform_specific(_: &net2::UdpBuilder) -> io::Result<()> { Ok(()) } let builder = net2::UdpBuilder::new_v4()?; builder.reuse_address(true)?; platform_specific(&builder)?; From f1e0c4996f281b3a9aefeeefa77400f149d75336 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sat, 27 Oct 2018 13:17:59 +0200 Subject: [PATCH 03/15] Make compatible with Rust 1.29 --- misc/mdns/src/dns.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/misc/mdns/src/dns.rs b/misc/mdns/src/dns.rs index 05f088cf4b1..65c8b3320cf 100644 --- a/misc/mdns/src/dns.rs +++ b/misc/mdns/src/dns.rs @@ -18,7 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use data_encoding; use libp2p_core::{Multiaddr, PeerId}; +use rand; use std::{cmp, error, fmt, str, time::Duration}; use {META_QUERY_SERVICE, SERVICE_NAME}; From 795f7e2c3ed5d88392bb3eab965bb56ae44bf824 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sun, 28 Oct 2018 18:06:58 +0100 Subject: [PATCH 04/15] Remove mDNS on esmcripten --- Cargo.toml | 2 +- src/lib.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 94b75f350c0..13d09600399 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,6 @@ libp2p-mplex = { path = "./muxers/mplex" } libp2p-identify = { path = "./protocols/identify" } libp2p-kad = { path = "./protocols/kad" } libp2p-floodsub = { path = "./protocols/floodsub" } -libp2p-mdns = { path = "./misc/mdns" } libp2p-peerstore = { path = "./stores/peerstore" } libp2p-ping = { path = "./protocols/ping" } libp2p-ratelimit = { path = "./transports/ratelimit" } @@ -34,6 +33,7 @@ tokio-io = "0.1" [target.'cfg(not(target_os = "emscripten"))'.dependencies] libp2p-dns = { path = "./transports/dns" } +libp2p-mdns = { path = "./misc/mdns" } libp2p-tcp-transport = { path = "./transports/tcp" } tokio-current-thread = "0.1" diff --git a/src/lib.rs b/src/lib.rs index 22ebdd524de..eb84e2aa086 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -147,6 +147,7 @@ pub extern crate libp2p_identify as identify; pub extern crate libp2p_kad as kad; pub extern crate libp2p_floodsub as floodsub; pub extern crate libp2p_mplex as mplex; +#[cfg(not(target_os = "emscripten"))] pub extern crate libp2p_mdns as mdns; pub extern crate libp2p_peerstore as peerstore; pub extern crate libp2p_ping as ping; From 183e4ebe87d4c77eb557b3edacdac494cd7173d9 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 29 Oct 2018 13:52:03 +0100 Subject: [PATCH 05/15] Fix concerns --- misc/mdns/src/dns.rs | 13 ++++++++----- misc/mdns/src/lib.rs | 4 ++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/misc/mdns/src/dns.rs b/misc/mdns/src/dns.rs index 65c8b3320cf..a4f84b67ab1 100644 --- a/misc/mdns/src/dns.rs +++ b/misc/mdns/src/dns.rs @@ -18,6 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +//! Contains methods that handle the DNS encoding and decoding capabilities not available in the +//! `dns_parser` library. + use data_encoding; use libp2p_core::{Multiaddr, PeerId}; use rand; @@ -67,8 +70,8 @@ pub fn build_query() -> Vec { append_qname(&mut out, SERVICE_NAME); // Flags. - append_u16(&mut out, 0xc); - append_u16(&mut out, 0x1); + append_u16(&mut out, 0x0c); + append_u16(&mut out, 0x01); // Since the output is constant, we reserve the right amount ahead of time. // If this assert fails, adjust the capacity of `out` in the source code. @@ -109,7 +112,7 @@ pub fn build_query_response( // Flags. out.push(0x00); - out.push(0x0c); + out.push(0x0c); // PTR record. out.push(0x80); out.push(0x01); @@ -168,7 +171,7 @@ pub fn build_service_discovery_response(id: u16, ttl: Duration) -> Vec { // Flags. out.push(0x00); - out.push(0x0c); + out.push(0x0c); // PTR record requested. out.push(0x80); out.push(0x01); @@ -277,7 +280,7 @@ fn append_txt_record<'a>( // Flags. out.push(0x00); - out.push(0x10); + out.push(0x10); // TXT record. out.push(0x80); out.push(0x01); diff --git a/misc/mdns/src/lib.rs b/misc/mdns/src/lib.rs index d834bff175c..91003c03eff 100644 --- a/misc/mdns/src/lib.rs +++ b/misc/mdns/src/lib.rs @@ -290,7 +290,7 @@ impl MdnsService { pub enum MdnsPacket<'a> { /// A query made by a remote. Query(MdnsQuery<'a>), - /// A response received by a remote to one of our queries. + /// A response sent by a remote in response to one of our queries. Response(MdnsResponse<'a>), /// A request for service discovery. ServiceDiscovery(MdnsServiceDiscovery<'a>), @@ -311,7 +311,7 @@ impl<'a> MdnsQuery<'a> { /// /// Pass the ID of the local peer, and the list o addresses we're listening on. /// - /// If there are more than 2^16-1 addresses, ignores the other. + /// If there are more than 2^16-1 addresses, ignores the others. /// /// > **Note**: Keep in mind that we will also receive this response in an `MdnsResponse`. #[inline] From 4b435ee99eb6096f279285da9393e249125bc925 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 30 Oct 2018 08:24:02 +0100 Subject: [PATCH 06/15] More concern --- misc/mdns/src/dns.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/misc/mdns/src/dns.rs b/misc/mdns/src/dns.rs index a4f84b67ab1..4782c98d804 100644 --- a/misc/mdns/src/dns.rs +++ b/misc/mdns/src/dns.rs @@ -141,6 +141,7 @@ pub fn build_query_response( append_txt_record(&mut out, &peer_id_bytes, ttl, Some(&txt_to_send_bytes[..]))?; } + // The DNS specs specify that the maximum allowed size is 9000 bytes. if out.len() > 9000 { return Err(MdnsResponseError::ResponseTooLong); } From ff9b1c36d14d76f1d37fb0a748e6ce45d2abc30f Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 2 Nov 2018 13:31:04 +0100 Subject: [PATCH 07/15] Use append_u16 --- misc/mdns/src/dns.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/misc/mdns/src/dns.rs b/misc/mdns/src/dns.rs index 4782c98d804..8214eefa9a7 100644 --- a/misc/mdns/src/dns.rs +++ b/misc/mdns/src/dns.rs @@ -111,10 +111,8 @@ pub fn build_query_response( append_qname(&mut out, SERVICE_NAME); // Flags. - out.push(0x00); - out.push(0x0c); // PTR record. - out.push(0x80); - out.push(0x01); + append_u16(&mut out, 0x000c); + append_u16(&mut out, 0x8001); // TTL for the answer append_u32(&mut out, ttl); @@ -171,10 +169,8 @@ pub fn build_service_discovery_response(id: u16, ttl: Duration) -> Vec { append_qname(&mut out, META_QUERY_SERVICE); // Flags. - out.push(0x00); - out.push(0x0c); // PTR record requested. - out.push(0x80); - out.push(0x01); + append_u16(&mut out, 0x000c); + append_u16(&mut out, 0x8001); // TTL for the answer append_u32(&mut out, ttl); From 8843055c8abb58e13b5644dae7f3c1521b8b3d72 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 2 Nov 2018 13:34:27 +0100 Subject: [PATCH 08/15] Make decode_character_string return a Cow --- misc/mdns/src/dns.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/misc/mdns/src/dns.rs b/misc/mdns/src/dns.rs index 8214eefa9a7..9a707bd7b52 100644 --- a/misc/mdns/src/dns.rs +++ b/misc/mdns/src/dns.rs @@ -24,14 +24,14 @@ use data_encoding; use libp2p_core::{Multiaddr, PeerId}; use rand; -use std::{cmp, error, fmt, str, time::Duration}; +use std::{borrow::Cow, cmp, error, fmt, str, time::Duration}; use {META_QUERY_SERVICE, SERVICE_NAME}; /// Decodes a `` (as defined by RFC1035) into a `Vec` of ASCII characters. // TODO: better error type? -pub fn decode_character_string(mut from: &[u8]) -> Result, ()> { +pub fn decode_character_string(mut from: &[u8]) -> Result, ()> { if from.is_empty() { - return Ok(Vec::new()); + return Ok(Cow::Owned(Vec::new())); } // Remove the initial and trailing " if any. @@ -44,7 +44,7 @@ pub fn decode_character_string(mut from: &[u8]) -> Result, ()> { } // TODO: remove the backslashes if any - Ok(from.to_vec()) + Ok(Cow::Borrowed(from)) } /// Builds the binary representation of a DNS query to send on the network. From 323e3b0a9e2b32827add02ab14d329c522a13902 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 2 Nov 2018 13:51:01 +0100 Subject: [PATCH 09/15] Add TODO --- misc/mdns/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/misc/mdns/src/lib.rs b/misc/mdns/src/lib.rs index 91003c03eff..42293cc2a18 100644 --- a/misc/mdns/src/lib.rs +++ b/misc/mdns/src/lib.rs @@ -485,6 +485,7 @@ impl<'a> MdnsPeer<'a> { }) .flat_map(|txt| txt.iter()) .filter_map(move |txt| { + // TODO: wrong, txt can be multiple character strings let addr = match dns::decode_character_string(txt) { Ok(a) => a, Err(_) => return None, From 739739745e1d1ecee2430e69b90a745b8f9da398 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 21 Nov 2018 16:14:19 +0100 Subject: [PATCH 10/15] Don't send queries from 5353 --- misc/mdns/src/lib.rs | 40 +++++++++++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/misc/mdns/src/lib.rs b/misc/mdns/src/lib.rs index 42293cc2a18..36255e07c5f 100644 --- a/misc/mdns/src/lib.rs +++ b/misc/mdns/src/lib.rs @@ -122,17 +122,22 @@ const META_QUERY_SERVICE: &'static [u8] = b"_services._dns-sd._udp.local"; /// /// See the crate root documentation for more info. pub struct MdnsService { + /// Main socket for listening. socket: UdpSocket, + /// Socket for sending queries on the network. + query_socket: UdpSocket, /// Interval for sending queries. query_interval: Interval, /// Whether we send queries on the network at all. /// Note that we still need to have an interval for querying, as we need to wake up the socket /// regularly to recover from errors. Otherwise we could simply use an `Option`. silent: bool, - /// Buffer used for receiving data. + /// Buffer used for receiving data from the main socket. recv_buffer: [u8; 2048], - /// Buffers pending to send on the socket. + /// Buffers pending to send on the main socket. send_buffers: Vec>, + /// Buffers pending to send on the query socket. + query_send_buffers: Vec>, } impl MdnsService { @@ -172,10 +177,12 @@ impl MdnsService { Ok(MdnsService { socket, + query_socket: UdpSocket::bind(&From::from(([0, 0, 0, 0], 0)))?, query_interval: Interval::new(Instant::now(), Duration::from_secs(20)), silent, recv_buffer: [0; 2048], send_buffers: Vec::new(), + query_send_buffers: Vec::new(), }) } @@ -188,15 +195,14 @@ impl MdnsService { Ok(Async::Ready(_)) => { if !self.silent { let query = dns::build_query(); - self.send_buffers.push(query.to_vec()); + self.query_send_buffers.push(query.to_vec()); } } Ok(Async::NotReady) => (), _ => unreachable!("A tokio_timer::Interval never errors"), // TODO: is that true? }; - // Flush the send buffer. - // This has to be after the push to `send_buffers`. + // Flush the send buffer of the main socket. while !self.send_buffers.is_empty() { let to_send = self.send_buffers.remove(0); match self @@ -219,6 +225,30 @@ impl MdnsService { } } + // Flush the query send buffer. + // This has to be after the push to `query_send_buffers`. + while !self.query_send_buffers.is_empty() { + let to_send = self.query_send_buffers.remove(0); + match self + .query_socket + .poll_send_to(&to_send, &From::from(([224, 0, 0, 251], 5353))) + { + Ok(Async::Ready(bytes_written)) => { + debug_assert_eq!(bytes_written, to_send.len()); + } + Ok(Async::NotReady) => { + self.query_send_buffers.insert(0, to_send); + break; + } + Err(_) => { + // Errors are non-fatal because they can happen for example if we lose + // connection to the network. + self.query_send_buffers.clear(); + break; + } + } + } + // Check for any incoming packet. match self.socket.poll_recv_from(&mut self.recv_buffer) { Ok(Async::Ready((len, from))) => { From d748cf514858f3fb2efa715efca8d3373e8677d2 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 21 Nov 2018 16:19:26 +0100 Subject: [PATCH 11/15] Fix flags --- misc/mdns/src/dns.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/misc/mdns/src/dns.rs b/misc/mdns/src/dns.rs index 9a707bd7b52..013905d4b06 100644 --- a/misc/mdns/src/dns.rs +++ b/misc/mdns/src/dns.rs @@ -98,8 +98,8 @@ pub fn build_query_response( let mut out = Vec::with_capacity(320); append_u16(&mut out, id); - // Flags ; 0x80 for an answer. - append_u16(&mut out, 0x8000); + // Flags ; 0x84 for an answer. + append_u16(&mut out, 0x8400); // Number of questions, answers, authorities, additionals. append_u16(&mut out, 0x0); append_u16(&mut out, 0x1); From 0bb333561174dd9886f0d900c58448b1aa17cf90 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 21 Nov 2018 16:39:45 +0100 Subject: [PATCH 12/15] More flags fix --- misc/mdns/src/dns.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/misc/mdns/src/dns.rs b/misc/mdns/src/dns.rs index 013905d4b06..b5ffb9cfe2f 100644 --- a/misc/mdns/src/dns.rs +++ b/misc/mdns/src/dns.rs @@ -156,8 +156,8 @@ pub fn build_service_discovery_response(id: u16, ttl: Duration) -> Vec { let mut out = Vec::with_capacity(69); append_u16(&mut out, id); - // Flags ; 0x80 for an answer. - append_u16(&mut out, 0x8000); + // Flags ; 0x84 for an answer. + append_u16(&mut out, 0x8400); // Number of questions, answers, authorities, additionals. append_u16(&mut out, 0x0); append_u16(&mut out, 0x1); From e68a788efdbb001e659e6a72f79f6d0edb73e2aa Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 22 Nov 2018 18:14:45 +0100 Subject: [PATCH 13/15] More concerns --- misc/mdns/src/dns.rs | 2 +- misc/mdns/src/lib.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/misc/mdns/src/dns.rs b/misc/mdns/src/dns.rs index b5ffb9cfe2f..2c34510c304 100644 --- a/misc/mdns/src/dns.rs +++ b/misc/mdns/src/dns.rs @@ -112,7 +112,7 @@ pub fn build_query_response( // Flags. append_u16(&mut out, 0x000c); - append_u16(&mut out, 0x8001); + append_u16(&mut out, 0x8000); // TTL for the answer append_u32(&mut out, ttl); diff --git a/misc/mdns/src/lib.rs b/misc/mdns/src/lib.rs index 36255e07c5f..8e2836feb22 100644 --- a/misc/mdns/src/lib.rs +++ b/misc/mdns/src/lib.rs @@ -103,7 +103,7 @@ use dns_parser::{Packet, RData}; use futures::{prelude::*, task}; use libp2p_core::{Multiaddr, PeerId}; use multiaddr::Protocol; -use std::{fmt, io, net::SocketAddr, str, time::Duration, time::Instant}; +use std::{fmt, io, net::Ipv4Addr, net::SocketAddr, str, time::Duration, time::Instant}; use tokio_reactor::Handle; use tokio_timer::Interval; use tokio_udp::UdpSocket; @@ -173,7 +173,7 @@ impl MdnsService { socket.set_multicast_loop_v4(true)?; socket.set_multicast_ttl_v4(255)?; // TODO: correct interfaces? - socket.join_multicast_v4(&From::from([224, 0, 0, 251]), &From::from([0, 0, 0, 0]))?; + socket.join_multicast_v4(&From::from([224, 0, 0, 251]), &Ipv4Addr::UNSPECIFIED)?; Ok(MdnsService { socket, From e1e97455250e4b57b496a25e567d18608dbc10f1 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 23 Nov 2018 16:07:47 +0100 Subject: [PATCH 14/15] Fix flags --- misc/mdns/src/dns.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/mdns/src/dns.rs b/misc/mdns/src/dns.rs index 2c34510c304..6260285601a 100644 --- a/misc/mdns/src/dns.rs +++ b/misc/mdns/src/dns.rs @@ -112,7 +112,7 @@ pub fn build_query_response( // Flags. append_u16(&mut out, 0x000c); - append_u16(&mut out, 0x8000); + append_u16(&mut out, 0x0001); // TTL for the answer append_u32(&mut out, ttl); From 65b5edebe5765279d0aad698e32ada8a1a1dab02 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sat, 24 Nov 2018 13:45:38 +0100 Subject: [PATCH 15/15] Fix removed env_logger --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index ea7962d0ddb..d3c0b455f76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ libp2p-tcp-transport = { path = "./transports/tcp" } stdweb = { version = "0.1.3", default-features = false } [dev-dependencies] +env_logger = "0.5.4" rand = "0.5" tokio = "0.1" tokio-stdin-stdout = "0.1"