Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(libp2p): track bandwidth per transport protocol stack #4727

Merged
merged 29 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9c940fa
feat(libp2p): track bandwidth per transport protocol stack
mxinden Oct 25, 2023
e1b4497
Move bandwidth logging to misc/metrics
mxinden Oct 27, 2023
84f68e7
Add bandwidth metrics to server
mxinden Oct 27, 2023
c3e785d
Rename bandwidth metric
mxinden Oct 27, 2023
76cbf19
Use Counter instead of Collector
mxinden Oct 28, 2023
f378cc7
Introduce with_bandwidth_metrics builder step
mxinden Oct 28, 2023
8cc2b1b
Remove collector
mxinden Oct 28, 2023
67fb7cd
Remove license header
mxinden Oct 30, 2023
fd7c5d9
Fix metrics example
mxinden Oct 30, 2023
f9fd560
Depend on StreamMuxer and not StreamMuxerBox
mxinden Oct 30, 2023
fe46696
Adjust doc comment
mxinden Oct 30, 2023
60c6e7a
Use subregistry
mxinden Oct 30, 2023
cda371d
Fix dead-lock when cloning metrics
mxinden Oct 31, 2023
68414d5
Box closure wrapping muxer
mxinden Nov 1, 2023
689a947
Fix wasm compilation
mxinden Nov 1, 2023
aa0fa95
Use protocol stack
mxinden Nov 1, 2023
77a427b
Minor changes
mxinden Nov 1, 2023
314bf0f
Expose through BandwidthMetricTransport
mxinden Nov 1, 2023
297130a
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into b…
mxinden Nov 1, 2023
78e564b
Add changelog entry
mxinden Nov 1, 2023
3276d96
Bump misc/server version
mxinden Nov 1, 2023
7641531
Deprecate BandwidthLogging
mxinden Nov 1, 2023
190492b
fmt
mxinden Nov 1, 2023
bd83ba9
Update misc/server/CHANGELOG.md
mxinden Nov 4, 2023
164c812
Rename to BandwidthTransport
mxinden Nov 4, 2023
756241d
Allow deprecated at the top of module
mxinden Nov 4, 2023
b01f319
fmt
mxinden Nov 10, 2023
d9b6d7d
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into b…
mxinden Nov 10, 2023
f4c1ec7
Fix changelog and version
mxinden Nov 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion examples/metrics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ mod http_service;
fn main() -> Result<(), Box<dyn Error>> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();

let mut swarm = libp2p::SwarmBuilder::with_new_identity()
let (builder, bandwidth_logging) = libp2p::SwarmBuilder::with_new_identity()
.with_async_std()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_bandwidth_logging();
let mut swarm = builder
.with_behaviour(|key| Behaviour::new(key.public()))?
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)))
.build();
Expand All @@ -58,6 +60,7 @@ fn main() -> Result<(), Box<dyn Error>> {

let mut metric_registry = Registry::default();
let metrics = Metrics::new(&mut metric_registry);
libp2p::bandwidth::register_bandwidth_sinks(&mut metric_registry, bandwidth_logging);
mxinden marked this conversation as resolved.
Show resolved Hide resolved
thread::spawn(move || block_on(http_service::metrics_server(metric_registry)));

block_on(async {
Expand Down
1 change: 1 addition & 0 deletions libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` featu
instant = "0.1.12" # Explicit dependency to be used in `wasm-bindgen` feature
# TODO feature flag?
rw-stream-sink = { workspace = true }
prometheus-client = { workspace = true }

libp2p-allow-block-list = { workspace = true }
libp2p-autonat = { workspace = true, optional = true }
Expand Down
37 changes: 36 additions & 1 deletion libp2p/src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@
prelude::*,
ready,
};
use prometheus_client::{
encoding::{DescriptorEncoder, EncodeMetric},
metrics::{counter::ConstCounter, MetricType},
};
use std::{
collections::HashMap,
convert::TryFrom as _,
io,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Arc, RwLock,
},
task::{Context, Poll},
};
Expand Down Expand Up @@ -101,6 +106,7 @@
}

/// Allows obtaining the average bandwidth of the streams.
#[derive(Default, Debug)]
pub struct BandwidthSinks {
inbound: AtomicU64,
outbound: AtomicU64,
Expand All @@ -108,7 +114,7 @@

impl BandwidthSinks {
/// Returns a new [`BandwidthSinks`].
pub(crate) fn new() -> Arc<Self> {

Check failure on line 117 in libp2p/src/bandwidth.rs

View workflow job for this annotation

GitHub Actions / Compile with MSRV

associated function `new` is never used

Check failure on line 117 in libp2p/src/bandwidth.rs

View workflow job for this annotation

GitHub Actions / Check rustdoc intra-doc links

associated function `new` is never used

Check failure on line 117 in libp2p/src/bandwidth.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-unknown-emscripten

associated function `new` is never used

Check failure on line 117 in libp2p/src/bandwidth.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-unknown-unknown

associated function `new` is never used

Check failure on line 117 in libp2p/src/bandwidth.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-wasi

associated function `new` is never used

Check failure on line 117 in libp2p/src/bandwidth.rs

View workflow job for this annotation

GitHub Actions / clippy (nightly-2023-09-10)

associated function `new` is never used

Check failure on line 117 in libp2p/src/bandwidth.rs

View workflow job for this annotation

GitHub Actions / Test libp2p

associated function `new` is never used

Check failure on line 117 in libp2p/src/bandwidth.rs

View workflow job for this annotation

GitHub Actions / Test libp2p-server

associated function `new` is never used
Arc::new(Self {
inbound: AtomicU64::new(0),
outbound: AtomicU64::new(0),
Expand Down Expand Up @@ -209,3 +215,32 @@
this.inner.poll_close(cx)
}
}

// TODO: Ideally this lives outside of `libp2p`, so that this crate does not have to depend on
// `prometheus-client`.
/// Registers a collector on `registry` that reports the counters stored in `sinks`.
///
/// The shared `sinks` map is wrapped in a `SinksCollector`, boxed, and handed to
/// `Registry::register_collector`. The map itself is not read here; it is only
/// read when the collector's `encode` is invoked.
pub fn register_bandwidth_sinks(
    registry: &mut prometheus_client::registry::Registry,
    sinks: Arc<RwLock<HashMap<String, Arc<BandwidthSinks>>>>,
) {
    let collector = SinksCollector(sinks);
    registry.register_collector(Box::new(collector));
}

/// Collector wrapping the shared map of [`BandwidthSinks`].
///
/// The map key is a `String` that the `Collector` implementation emits as the
/// value of the `protocols` label; the value is the sink whose `inbound` and
/// `outbound` counters are reported.
#[derive(Debug)]
struct SinksCollector(Arc<RwLock<HashMap<String, Arc<BandwidthSinks>>>>);

// Exposes every entry of the wrapped map as two counter series of a single
// metric family.
impl prometheus_client::collector::Collector for SinksCollector {
// Encodes one family named "bandwidth" (help text "todo", third argument
// `None`, type `Counter`) and, per map entry, one "inbound" and one
// "outbound" series.
//
// Errors from the encoder are propagated with `?`.
// Panics with the message "todo" if acquiring the read lock fails.
fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> {
let mut family_encoder =
encoder.encode_descriptor("bandwidth", "todo", None, MetricType::Counter)?;
mxinden marked this conversation as resolved.
Show resolved Hide resolved
// The read guard is a temporary of the iterator expression, so it is held
// for the whole loop.
for (protocols, sink) in self.0.read().expect("todo").iter() {
// Inbound series: labelled with the map key and direction="inbound".
let labels = [("protocols", protocols.as_str()), ("direction", "inbound")];
let metric_encoder = family_encoder.encode_family(&labels)?;
// Snapshot of the atomic counter, emitted as a constant counter value.
ConstCounter::new(sink.inbound.load(Ordering::Relaxed)).encode(metric_encoder)?;

// Outbound series: same key, direction="outbound".
let labels = [("protocols", protocols.as_str()), ("direction", "outbound")];
let metric_encoder = family_encoder.encode_family(&labels)?;
ConstCounter::new(sink.outbound.load(Ordering::Relaxed)).encode(metric_encoder)?;
}

Ok(())
}
}
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
25 changes: 9 additions & 16 deletions libp2p/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ mod tests {
use crate::SwarmBuilder;
use libp2p_core::{muxing::StreamMuxerBox, transport::dummy::DummyTransport};
use libp2p_identity::PeerId;
use libp2p_swarm::{NetworkBehaviour, Swarm};
use libp2p_swarm::NetworkBehaviour;

#[test]
#[cfg(all(feature = "tokio", feature = "tcp", feature = "tls", feature = "noise"))]
Expand Down Expand Up @@ -301,7 +301,7 @@ mod tests {
relay: libp2p_relay::client::Behaviour,
}

let (builder, _bandwidth_sinks) = SwarmBuilder::with_new_identity()
let _ = SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
Default::default(),
Expand All @@ -317,8 +317,7 @@ mod tests {
.unwrap()
.with_relay_client(libp2p_tls::Config::new, libp2p_yamux::Config::default)
.unwrap()
.with_bandwidth_logging();
let _: Swarm<MyBehaviour> = builder
.with_bandwidth_logging(&mut libp2p_metrics::Registry::default())
.with_behaviour(|_key, relay| MyBehaviour { relay })
.unwrap()
.build();
Expand All @@ -327,16 +326,14 @@ mod tests {
#[test]
#[cfg(all(feature = "tokio", feature = "tcp", feature = "tls", feature = "yamux"))]
fn tcp_bandwidth_logging() -> Result<(), Box<dyn std::error::Error>> {
let (builder, _logging) = SwarmBuilder::with_new_identity()
let _ = SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
Default::default(),
libp2p_tls::Config::new,
libp2p_yamux::Config::default,
)?
.with_bandwidth_logging();

builder
.with_bandwidth_logging(&mut libp2p_metrics::Registry::default())
.with_behaviour(|_| libp2p_swarm::dummy::Behaviour)
.unwrap()
.build();
mxinden marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -347,12 +344,10 @@ mod tests {
#[test]
#[cfg(all(feature = "tokio", feature = "quic"))]
fn quic_bandwidth_logging() -> Result<(), Box<dyn std::error::Error>> {
let (builder, _logging) = SwarmBuilder::with_new_identity()
let _ = SwarmBuilder::with_new_identity()
.with_tokio()
.with_quic()
.with_bandwidth_logging();

builder
.with_bandwidth_logging(&mut libp2p_metrics::Registry::default())
.with_behaviour(|_| libp2p_swarm::dummy::Behaviour)
.unwrap()
.build();
Expand All @@ -363,12 +358,10 @@ mod tests {
#[test]
#[cfg(feature = "tokio")]
fn other_transport_bandwidth_logging() -> Result<(), Box<dyn std::error::Error>> {
let (builder, _logging) = SwarmBuilder::with_new_identity()
let _ = SwarmBuilder::with_new_identity()
.with_tokio()
.with_other_transport(|_| DummyTransport::<(PeerId, StreamMuxerBox)>::new())?
.with_bandwidth_logging();

builder
.with_bandwidth_logging(&mut libp2p_metrics::Registry::default())
.with_behaviour(|_| libp2p_swarm::dummy::Behaviour)
.unwrap()
.build();
Expand Down
36 changes: 19 additions & 17 deletions libp2p/src/builder/phase/bandwidth_logging.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,40 @@
use multiaddr::Multiaddr;

use super::*;
use crate::bandwidth::BandwidthSinks;
use crate::metrics::bandwidth::{BandwidthSinks, Muxer};

Check failure on line 4 in libp2p/src/builder/phase/bandwidth_logging.rs

View workflow job for this annotation

GitHub Actions / Compile with select features (mdns tcp dns tokio)

failed to resolve: unresolved import

Check failure on line 4 in libp2p/src/builder/phase/bandwidth_logging.rs

View workflow job for this annotation

GitHub Actions / Compile with select features (mdns tcp dns async-std)

failed to resolve: unresolved import

Check failure on line 4 in libp2p/src/builder/phase/bandwidth_logging.rs

View workflow job for this annotation

GitHub Actions / examples

failed to resolve: unresolved import

Check failure on line 4 in libp2p/src/builder/phase/bandwidth_logging.rs

View workflow job for this annotation

GitHub Actions / IPFS Integration tests

failed to resolve: unresolved import

Check failure on line 4 in libp2p/src/builder/phase/bandwidth_logging.rs

View workflow job for this annotation

GitHub Actions / Test libp2p-perf

failed to resolve: unresolved import
use crate::transport_ext::TransportExt;
use crate::SwarmBuilder;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

/// State of the `SwarmBuilder` at the step where bandwidth logging is either
/// enabled or skipped.
///
/// Both fields are moved into the following `BehaviourPhase` by the builder
/// methods implemented for this phase.
pub struct BandwidthLoggingPhase<T, R> {
// Relay behaviour carried over unchanged into `BehaviourPhase`.
pub(crate) relay_behaviour: R,
// Transport assembled by the preceding builder phases.
pub(crate) transport: T,
}

#[cfg(feature = "metrics")]
impl<T: AuthenticatedMultiplexedTransport, Provider, R>
SwarmBuilder<Provider, BandwidthLoggingPhase<T, R>>
{
pub fn with_bandwidth_logging(
self,
) -> (
SwarmBuilder<Provider, BehaviourPhase<impl AuthenticatedMultiplexedTransport, R>>,
Arc<BandwidthSinks>,
) {
let (transport, sinks) = self.phase.transport.with_bandwidth_logging();
(
SwarmBuilder {
phase: BehaviourPhase {
relay_behaviour: self.phase.relay_behaviour,
transport,
},
keypair: self.keypair,
phantom: PhantomData,
registry: &mut libp2p_metrics::Registry,
) -> SwarmBuilder<Provider, BehaviourPhase<impl AuthenticatedMultiplexedTransport, R>> {
SwarmBuilder {

Check failure on line 24 in libp2p/src/builder/phase/bandwidth_logging.rs

View workflow job for this annotation

GitHub Actions / rustfmt

Diff in /home/runner/work/rust-libp2p/rust-libp2p/libp2p/src/builder/phase/bandwidth_logging.rs
phase: BehaviourPhase {
relay_behaviour: self.phase.relay_behaviour,
transport: crate::metrics::bandwidth::Transport::new(self.phase.transport, registry),
},
sinks,
)
keypair: self.keypair,
phantom: PhantomData,
}
}
}

impl<T: AuthenticatedMultiplexedTransport, Provider, R>
SwarmBuilder<Provider, BandwidthLoggingPhase<T, R>>
{
pub fn without_bandwidth_logging(self) -> SwarmBuilder<Provider, BehaviourPhase<T, R>> {
SwarmBuilder {
phase: BehaviourPhase {
Expand Down
18 changes: 9 additions & 9 deletions libp2p/src/builder/phase/other_transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::convert::Infallible;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
use libp2p_core::Transport;
Expand Down Expand Up @@ -143,23 +144,22 @@ impl<T: AuthenticatedMultiplexedTransport, Provider>
.with_relay_client(security_upgrade, multiplexer_upgrade)
}
}
#[cfg(feature = "metrics")]
impl<Provider, T: AuthenticatedMultiplexedTransport>
SwarmBuilder<Provider, OtherTransportPhase<T>>
{
pub fn with_bandwidth_logging(
self,
) -> (
SwarmBuilder<
Provider,
BehaviourPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
>,
Arc<BandwidthSinks>,
) {
registry: &mut libp2p_metrics::Registry,
) -> SwarmBuilder<
Provider,
BehaviourPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
> {
self.without_any_other_transports()
.without_dns()
.without_websocket()
.without_relay()
.with_bandwidth_logging()
.with_bandwidth_logging(registry)
}
}
impl<Provider, T: AuthenticatedMultiplexedTransport>
Expand Down
19 changes: 10 additions & 9 deletions libp2p/src/builder/phase/quic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use crate::bandwidth::BandwidthSinks;
use crate::SwarmBuilder;
#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
use libp2p_core::muxing::StreamMuxer;
Expand All @@ -8,7 +9,9 @@
all(not(target_arch = "wasm32"), feature = "websocket")
))]
use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
use std::{marker::PhantomData, sync::Arc};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::{Arc, RwLock};

pub struct QuicPhase<T> {
pub(crate) transport: T,
Expand Down Expand Up @@ -249,18 +252,16 @@
impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, QuicPhase<T>> {
pub fn with_bandwidth_logging(
self,
) -> (
SwarmBuilder<
Provider,
BehaviourPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
>,
Arc<crate::bandwidth::BandwidthSinks>,
) {
registry: &mut libp2p_metrics::Registry,

Check failure on line 255 in libp2p/src/builder/phase/quic.rs

View workflow job for this annotation

GitHub Actions / Compile with select features (mdns tcp dns tokio)

failed to resolve: use of undeclared crate or module `libp2p_metrics`

Check failure on line 255 in libp2p/src/builder/phase/quic.rs

View workflow job for this annotation

GitHub Actions / Compile with select features (mdns tcp dns async-std)

failed to resolve: use of undeclared crate or module `libp2p_metrics`

Check failure on line 255 in libp2p/src/builder/phase/quic.rs

View workflow job for this annotation

GitHub Actions / examples

failed to resolve: use of undeclared crate or module `libp2p_metrics`

Check failure on line 255 in libp2p/src/builder/phase/quic.rs

View workflow job for this annotation

GitHub Actions / IPFS Integration tests

failed to resolve: use of undeclared crate or module `libp2p_metrics`

Check failure on line 255 in libp2p/src/builder/phase/quic.rs

View workflow job for this annotation

GitHub Actions / Test libp2p-perf

failed to resolve: use of undeclared crate or module `libp2p_metrics`
) -> SwarmBuilder<
Provider,
BehaviourPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
> {
self.without_quic()
.without_any_other_transports()
.without_dns()
.without_websocket()
.without_relay()
.with_bandwidth_logging()
.with_bandwidth_logging(registry)

Check failure on line 265 in libp2p/src/builder/phase/quic.rs

View workflow job for this annotation

GitHub Actions / Compile with select features (mdns tcp dns tokio)

no method named `with_bandwidth_logging` found for struct `builder::SwarmBuilder<Provider, bandwidth_logging::BandwidthLoggingPhase<T, relay::NoRelayBehaviour>>` in the current scope

Check failure on line 265 in libp2p/src/builder/phase/quic.rs

View workflow job for this annotation

GitHub Actions / Compile with select features (mdns tcp dns async-std)

no method named `with_bandwidth_logging` found for struct `builder::SwarmBuilder<Provider, bandwidth_logging::BandwidthLoggingPhase<T, relay::NoRelayBehaviour>>` in the current scope

Check failure on line 265 in libp2p/src/builder/phase/quic.rs

View workflow job for this annotation

GitHub Actions / examples

no method named `with_bandwidth_logging` found for struct `builder::SwarmBuilder<Provider, bandwidth_logging::BandwidthLoggingPhase<T, relay::NoRelayBehaviour>>` in the current scope

Check failure on line 265 in libp2p/src/builder/phase/quic.rs

View workflow job for this annotation

GitHub Actions / IPFS Integration tests

no method named `with_bandwidth_logging` found for struct `builder::SwarmBuilder<Provider, bandwidth_logging::BandwidthLoggingPhase<T, relay::NoRelayBehaviour>>` in the current scope

Check failure on line 265 in libp2p/src/builder/phase/quic.rs

View workflow job for this annotation

GitHub Actions / Test libp2p-perf

no method named `with_bandwidth_logging` found for struct `builder::SwarmBuilder<Provider, bandwidth_logging::BandwidthLoggingPhase<T, relay::NoRelayBehaviour>>` in the current scope
}
}
38 changes: 33 additions & 5 deletions libp2p/src/transport_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
Transport,
};
use libp2p_identity::PeerId;
use std::sync::Arc;
use multiaddr::Multiaddr;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};

/// Trait automatically implemented on all objects that implement `Transport`. Provides some
/// additional utilities.
Expand Down Expand Up @@ -66,7 +70,12 @@
///
/// let (transport, sinks) = transport.with_bandwidth_logging();
/// ```
fn with_bandwidth_logging<S>(self) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc<BandwidthSinks>)
fn with_bandwidth_logging<S>(
self,
) -> (
Boxed<(PeerId, StreamMuxerBox)>,
Arc<RwLock<HashMap<String, Arc<BandwidthSinks>>>>,
)
where
Self: Sized + Send + Unpin + 'static,
Self::Dial: Send + 'static,
Expand All @@ -74,16 +83,35 @@
Self::Error: Send + Sync,
Self::Output: Into<(PeerId, S)>,
S: StreamMuxer + Send + 'static,
S::Substream: Send + 'static,

Check failure on line 86 in libp2p/src/transport_ext.rs

View workflow job for this annotation

GitHub Actions / rustfmt

Diff in /home/runner/work/rust-libp2p/rust-libp2p/libp2p/src/transport_ext.rs
S::Error: Send + Sync + 'static,
{
let sinks = BandwidthSinks::new();
let sinks: Arc<RwLock<HashMap<_, Arc<BandwidthSinks>>>> = Arc::new(RwLock::new(HashMap::new()));
let sinks_copy = sinks.clone();
let transport = Transport::map(self, |output, _| {
let transport = Transport::map(self, move |output, connected_point| {
/// Renders the protocol stack of `ma` as a single string, with every
/// protocol tag preceded by a `/`.
fn as_string(ma: &Multiaddr) -> String {
    // Each tag contributes its own length plus one byte for the leading '/'.
    let capacity: usize = ma.protocol_stack().map(|tag| tag.len() + 1).sum();

    ma.protocol_stack()
        .fold(String::with_capacity(capacity), |mut rendered, tag| {
            rendered.push('/');
            rendered.push_str(tag);
            rendered
        })
}

let sink = sinks_copy
.write()
.expect("todo")
.entry(as_string(connected_point.get_remote_address()))
.or_default()
.clone();

let (peer_id, stream_muxer_box) = output.into();
(
peer_id,
StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sinks_copy)),
StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sink)),
)
})
.boxed();
Expand Down
Loading
Loading