diff --git a/Cargo.lock b/Cargo.lock index f8e3c6d07ee..6f8d942d372 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2753,6 +2753,7 @@ dependencies = [ name = "libp2p-metrics" version = "0.14.0" dependencies = [ + "futures", "instant", "libp2p-core", "libp2p-dcutr", @@ -2763,6 +2764,7 @@ dependencies = [ "libp2p-ping", "libp2p-relay", "libp2p-swarm", + "pin-project", "prometheus-client", ] diff --git a/libp2p/src/builder.rs b/libp2p/src/builder.rs index 60b977f4397..eb7695604f8 100644 --- a/libp2p/src/builder.rs +++ b/libp2p/src/builder.rs @@ -73,7 +73,7 @@ mod tests { use crate::SwarmBuilder; use libp2p_core::{muxing::StreamMuxerBox, transport::dummy::DummyTransport}; use libp2p_identity::PeerId; - use libp2p_swarm::{NetworkBehaviour, Swarm}; + use libp2p_swarm::NetworkBehaviour; #[test] #[cfg(all(feature = "tokio", feature = "tcp", feature = "tls", feature = "noise"))] @@ -301,7 +301,7 @@ mod tests { relay: libp2p_relay::client::Behaviour, } - let (builder, _bandwidth_sinks) = SwarmBuilder::with_new_identity() + let _ = SwarmBuilder::with_new_identity() .with_tokio() .with_tcp( Default::default(), @@ -317,8 +317,7 @@ mod tests { .unwrap() .with_relay_client(libp2p_tls::Config::new, libp2p_yamux::Config::default) .unwrap() - .with_bandwidth_logging(); - let _: Swarm = builder + .with_bandwidth_logging(&mut libp2p_metrics::Registry::default()) .with_behaviour(|_key, relay| MyBehaviour { relay }) .unwrap() .build(); @@ -327,16 +326,14 @@ mod tests { #[test] #[cfg(all(feature = "tokio", feature = "tcp", feature = "tls", feature = "yamux"))] fn tcp_bandwidth_logging() -> Result<(), Box> { - let (builder, _logging) = SwarmBuilder::with_new_identity() + let _ = SwarmBuilder::with_new_identity() .with_tokio() .with_tcp( Default::default(), libp2p_tls::Config::new, libp2p_yamux::Config::default, )? - .with_bandwidth_logging(); - - builder + .with_bandwidth_logging(&mut libp2p_metrics::Registry::default()) .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) .unwrap() .build(); @@ -347,12 +344,10 @@ mod tests { #[test] #[cfg(all(feature = "tokio", feature = "quic"))] fn quic_bandwidth_logging() -> Result<(), Box> { - let (builder, _logging) = SwarmBuilder::with_new_identity() + let _ = SwarmBuilder::with_new_identity() .with_tokio() .with_quic() - .with_bandwidth_logging(); - - builder + .with_bandwidth_logging(&mut libp2p_metrics::Registry::default()) .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) .unwrap() .build(); @@ -363,12 +358,10 @@ mod tests { #[test] #[cfg(feature = "tokio")] fn other_transport_bandwidth_logging() -> Result<(), Box> { - let (builder, _logging) = SwarmBuilder::with_new_identity() + let _ = SwarmBuilder::with_new_identity() .with_tokio() .with_other_transport(|_| DummyTransport::<(PeerId, StreamMuxerBox)>::new())? - .with_bandwidth_logging(); - - builder + .with_bandwidth_logging(&mut libp2p_metrics::Registry::default()) .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) .unwrap() .build(); diff --git a/libp2p/src/builder/phase/bandwidth_logging.rs b/libp2p/src/builder/phase/bandwidth_logging.rs index 134de7a924b..be848744cb0 100644 --- a/libp2p/src/builder/phase/bandwidth_logging.rs +++ b/libp2p/src/builder/phase/bandwidth_logging.rs @@ -1,5 +1,7 @@ +use multiaddr::Multiaddr; + use super::*; -use crate::bandwidth::BandwidthSinks; +use crate::metrics::bandwidth::{BandwidthSinks, Muxer}; use crate::transport_ext::TransportExt; use crate::SwarmBuilder; use std::collections::HashMap; @@ -11,29 +13,28 @@ pub struct BandwidthLoggingPhase { pub(crate) transport: T, } +#[cfg(feature = "metrics")] impl SwarmBuilder> { pub fn with_bandwidth_logging( self, - ) -> ( - SwarmBuilder>, - Arc>>>, - ) { - let (transport, sinks) = self.phase.transport.with_bandwidth_logging(); - ( - SwarmBuilder { - phase: BehaviourPhase { - relay_behaviour: self.phase.relay_behaviour, - transport, - }, - keypair: self.keypair, - phantom: PhantomData, + registry: &mut libp2p_metrics::Registry, + ) -> SwarmBuilder> { + SwarmBuilder { + phase: BehaviourPhase { + relay_behaviour: self.phase.relay_behaviour, + transport: crate::metrics::bandwidth::Transport::new(self.phase.transport, registry), }, - sinks, - ) + keypair: self.keypair, + phantom: PhantomData, + } } +} +impl + SwarmBuilder> +{ pub fn without_bandwidth_logging(self) -> SwarmBuilder> { SwarmBuilder { phase: BehaviourPhase { diff --git a/libp2p/src/builder/phase/other_transport.rs b/libp2p/src/builder/phase/other_transport.rs index 3699a278163..7891e7119b5 100644 --- a/libp2p/src/builder/phase/other_transport.rs +++ b/libp2p/src/builder/phase/other_transport.rs @@ -144,23 +144,22 @@ impl .with_relay_client(security_upgrade, multiplexer_upgrade) } } +#[cfg(feature = "metrics")] impl SwarmBuilder> { pub fn with_bandwidth_logging( self, - ) -> ( - SwarmBuilder< - Provider, - BehaviourPhase, - >, - Arc>>>, - ) { + registry: &mut libp2p_metrics::Registry, + ) -> SwarmBuilder< + Provider, + BehaviourPhase, + > { self.without_any_other_transports() .without_dns() .without_websocket() .without_relay() - .with_bandwidth_logging() + .with_bandwidth_logging(registry) } } impl diff --git a/libp2p/src/builder/phase/quic.rs b/libp2p/src/builder/phase/quic.rs index 716febd4abd..4ffc9846f3c 100644 --- a/libp2p/src/builder/phase/quic.rs +++ b/libp2p/src/builder/phase/quic.rs @@ -252,18 +252,16 @@ impl_quic_phase_with_websocket!( impl SwarmBuilder> { pub fn with_bandwidth_logging( self, - ) -> ( - SwarmBuilder< - Provider, - BehaviourPhase, - >, - Arc>>>, - ) { + registry: &mut libp2p_metrics::Registry, + ) -> SwarmBuilder< + Provider, + BehaviourPhase, + > { self.without_quic() .without_any_other_transports() .without_dns() .without_websocket() .without_relay() - .with_bandwidth_logging() + .with_bandwidth_logging(registry) } } diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index 506f8a574ce..c2b023db999 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -19,6 +19,7 @@ ping = ["libp2p-ping"] relay = ["libp2p-relay"] [dependencies] +futures = "0.3.26" instant = "0.1.12" libp2p-core = { workspace = true } libp2p-dcutr = { workspace = true, optional = true } @@ -29,6 +30,7 @@ libp2p-kad = { workspace = true, optional = true } libp2p-ping = { workspace = true, optional = true } libp2p-relay = { workspace = true, optional = true } libp2p-swarm = { workspace = true } +pin-project = "1.0.0" prometheus-client = { workspace = true } [dev-dependencies] diff --git a/misc/metrics/src/bandwidth.rs b/misc/metrics/src/bandwidth.rs new file mode 100644 index 00000000000..58f0fc81663 --- /dev/null +++ b/misc/metrics/src/bandwidth.rs @@ -0,0 +1,383 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_core::{ + muxing::{StreamMuxer, StreamMuxerBox, StreamMuxerEvent}, + transport::{ListenerId, TransportError, TransportEvent}, + Multiaddr, +}; + +use futures::{ + io::{IoSlice, IoSliceMut}, + prelude::*, + ready, +}; +use libp2p_identity::PeerId; +use prometheus_client::registry::Registry; +use prometheus_client::{ + encoding::{DescriptorEncoder, EncodeMetric}, + metrics::{counter::ConstCounter, MetricType}, +}; +use std::{ + collections::HashMap, + convert::TryFrom as _, + io, + pin::Pin, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, + }, + task::{Context, Poll}, +}; + +// TODO: rename to Transport +/// See `Transport::map`. +#[derive(Debug, Clone)] +#[pin_project::pin_project] +pub struct Transport { + #[pin] + transport: T, + sinks: Arc>>>, +} + +impl Transport { + pub fn new(transport: T, registry: &mut Registry) -> Self { + let sinks: Arc>>> = + Arc::new(RwLock::new(HashMap::new())); + + registry.register_collector(Box::new(SinksCollector(sinks.clone()))); + + Transport { transport, sinks } + } +} + +impl libp2p_core::Transport for Transport +where + // TODO: Consider depending on StreamMuxer only. + T: libp2p_core::Transport, +{ + type Output = (PeerId, StreamMuxerBox); + type Error = T::Error; + type ListenerUpgrade = MapFuture; + type Dial = MapFuture; + + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + self.transport.listen_on(id, addr) + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + self.transport.remove_listener(id) + } + + fn dial(&mut self, addr: Multiaddr) -> Result> { + let sinks = self + .sinks + .write() + .expect("todo") + .entry(as_string(&addr)) + .or_default() + .clone(); + let future = self.transport.dial(addr.clone())?; + Ok(MapFuture { + inner: future, + sinks: Some(sinks), + }) + } + + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + let sinks = self + .sinks + .write() + .expect("todo") + .entry(as_string(&addr)) + .or_default() + .clone(); + let future = self.transport.dial_as_listener(addr.clone())?; + Ok(MapFuture { + inner: future, + sinks: Some(sinks), + }) + } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + match this.transport.poll(cx) { + Poll::Ready(TransportEvent::Incoming { + listener_id, + upgrade, + local_addr, + send_back_addr, + }) => { + // TODO: Abstract into method? + let sinks = this + .sinks + .write() + .expect("todo") + .entry(as_string(&send_back_addr)) + .or_default() + .clone(); + Poll::Ready(TransportEvent::Incoming { + listener_id, + upgrade: MapFuture { + inner: upgrade, + sinks: Some(sinks), + }, + local_addr, + send_back_addr, + }) + } + Poll::Ready(other) => { + let mapped = other.map_upgrade(|_upgrade| unreachable!("case already matched")); + Poll::Ready(mapped) + } + Poll::Pending => Poll::Pending, + } + } +} + +/// Custom `Future` to avoid boxing. +/// +/// Applies a function to the inner future's result. +#[pin_project::pin_project] +#[derive(Clone, Debug)] +pub struct MapFuture { + #[pin] + inner: T, + sinks: Option>, +} + +impl Future for MapFuture +where + T: TryFuture, +{ + type Output = Result<(PeerId, StreamMuxerBox), T::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let (peer_id, stream_muxer) = match TryFuture::try_poll(this.inner, cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(v)) => v, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + }; + Poll::Ready(Ok(( + peer_id, + StreamMuxerBox::new(Muxer::new( + stream_muxer, + this.sinks.take().expect("todo"), + )), + ))) + } +} + +/// Wraps around a [`StreamMuxer`] and counts the number of bytes that go through all the opened +/// streams. +#[derive(Clone)] +#[pin_project::pin_project] +pub struct Muxer { + #[pin] + inner: SMInner, + sinks: Arc, +} + +impl Muxer { + /// Creates a new [`BandwidthLogging`] around the stream muxer. + pub fn new(inner: SMInner, sinks: Arc) -> Self { + Self { inner, sinks } + } +} + +impl StreamMuxer for Muxer +where + SMInner: StreamMuxer, +{ + type Substream = InstrumentedStream; + type Error = SMInner::Error; + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + this.inner.poll(cx) + } + + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let inner = ready!(this.inner.poll_inbound(cx)?); + let logged = InstrumentedStream { + inner, + sinks: this.sinks.clone(), + }; + Poll::Ready(Ok(logged)) + } + + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let inner = ready!(this.inner.poll_outbound(cx)?); + let logged = InstrumentedStream { + inner, + sinks: this.sinks.clone(), + }; + Poll::Ready(Ok(logged)) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.inner.poll_close(cx) + } +} + +/// Allows obtaining the average bandwidth of the streams. +#[derive(Default, Debug)] +pub struct BandwidthSinks { + inbound: AtomicU64, + outbound: AtomicU64, +} + +/// Wraps around an [`AsyncRead`] + [`AsyncWrite`] and logs the bandwidth that goes through it. +#[pin_project::pin_project] +pub struct InstrumentedStream { + #[pin] + inner: SMInner, + sinks: Arc, +} + +impl AsyncRead for InstrumentedStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_read(cx, buf))?; + this.sinks.inbound.fetch_add( + u64::try_from(num_bytes).unwrap_or(u64::max_value()), + Ordering::Relaxed, + ); + Poll::Ready(Ok(num_bytes)) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_read_vectored(cx, bufs))?; + this.sinks.inbound.fetch_add( + u64::try_from(num_bytes).unwrap_or(u64::max_value()), + Ordering::Relaxed, + ); + Poll::Ready(Ok(num_bytes)) + } +} + +impl AsyncWrite for InstrumentedStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_write(cx, buf))?; + this.sinks.outbound.fetch_add( + u64::try_from(num_bytes).unwrap_or(u64::max_value()), + Ordering::Relaxed, + ); + Poll::Ready(Ok(num_bytes)) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_write_vectored(cx, bufs))?; + this.sinks.outbound.fetch_add( + u64::try_from(num_bytes).unwrap_or(u64::max_value()), + Ordering::Relaxed, + ); + Poll::Ready(Ok(num_bytes)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.inner.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.inner.poll_close(cx) + } +} + +#[derive(Debug)] +pub struct SinksCollector(Arc>>>); + +impl prometheus_client::collector::Collector for SinksCollector { + fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { + let mut family_encoder = + encoder.encode_descriptor("bandwidth", "todo", None, MetricType::Counter)?; + for (protocols, sink) in self.0.read().expect("todo").iter() { + let labels = [("protocols", protocols.as_str()), ("direction", "inbound")]; + let metric_encoder = family_encoder.encode_family(&labels)?; + ConstCounter::new(sink.inbound.load(Ordering::Relaxed)).encode(metric_encoder)?; + + let labels = [("protocols", protocols.as_str()), ("direction", "outbound")]; + let metric_encoder = family_encoder.encode_family(&labels)?; + ConstCounter::new(sink.outbound.load(Ordering::Relaxed)).encode(metric_encoder)?; + } + + Ok(()) + } +} + +fn as_string(ma: &Multiaddr) -> String { + let len = ma + .protocol_stack() + .fold(0, |acc, proto| acc + proto.len() + 1); + let mut protocols = String::with_capacity(len); + for proto_tag in ma.protocol_stack() { + protocols.push('/'); + protocols.push_str(proto_tag); + } + protocols +} diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index 1f8dd224674..cd99bc7d7fd 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -27,6 +27,8 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +// TODO: pub? +pub mod bandwidth; #[cfg(feature = "dcutr")] mod dcutr; #[cfg(feature = "gossipsub")] @@ -42,9 +44,7 @@ mod protocol_stack; mod relay; mod swarm; -use std::{sync::{Arc, RwLock}, collections::HashMap}; - -use prometheus_client::registry::Registry; +pub use prometheus_client::registry::Registry; /// Set of Swarm and protocol metrics derived from emitted events. pub struct Metrics {