Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/ipa-metrics-migrate' into perf-w…
Browse files Browse the repository at this point in the history
…ork-oct2024
  • Loading branch information
akoshelev committed Oct 28, 2024
2 parents 631bfc3 + ce1afea commit 203b70d
Show file tree
Hide file tree
Showing 20 changed files with 301 additions and 279 deletions.
9 changes: 5 additions & 4 deletions ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ web-app = [
"http-body",
"http-body-util",
]
test-fixture = ["weak-field"]
test-fixture = ["weak-field", "ipa-metrics-tracing", "ipa-metrics/partitions"]
# Include observability instruments that detect lack of progress inside MPC. If there is a bug that leads to helper
# miscommunication, this feature helps to detect it. Turning it on has some cost.
# If "shuttle" feature is enabled, turning this on has no effect.
Expand Down Expand Up @@ -86,6 +86,8 @@ ipa-prf = []
relaxed-dp = []

[dependencies]
ipa-metrics = { path = "../ipa-metrics", features = [] }
ipa-metrics-tracing = { optional = true, path = "../ipa-metrics-tracing" }
ipa-step = { version = "*", path = "../ipa-step" }
ipa-step-derive = { version = "*", path = "../ipa-step-derive" }

Expand Down Expand Up @@ -128,9 +130,6 @@ hyper-util = { version = "0.1.3", optional = true, features = ["http2"] }
http-body-util = { version = "0.1.1", optional = true }
http-body = { version = "1", optional = true }
iai = { version = "0.1.1", optional = true }
metrics = "0.21.0"
metrics-tracing-context = "0.14.0"
metrics-util = { version = "0.15.0" }
once_cell = "1.18"
pin-project = "1.0"
rand = "0.8"
Expand Down Expand Up @@ -175,6 +174,8 @@ permutation = "0.4.1"
proptest = "1.4"
rustls = { version = "0.23" }
tempfile = "3"
ipa-metrics-tracing = { path = "../ipa-metrics-tracing" }
ipa-metrics = { path = "../ipa-metrics", features = ["partitions"] }


[lib]
Expand Down
61 changes: 39 additions & 22 deletions ipa-core/src/bin/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use clap::{self, Parser, Subcommand};
use hyper::http::uri::Scheme;
use ipa_core::{
cli::{
client_config_setup, keygen, test_setup, ConfGenArgs, KeygenArgs, TestSetupArgs, Verbosity,
client_config_setup, keygen, test_setup, ConfGenArgs, KeygenArgs, LoggingHandle,
TestSetupArgs, Verbosity,
},
config::{hpke_registry, HpkeServerConfig, NetworkConfig, ServerConfig, TlsConfig},
error::BoxError,
Expand Down Expand Up @@ -113,7 +114,7 @@ fn read_file(path: &Path) -> Result<BufReader<fs::File>, BoxError> {
.map_err(|e| format!("failed to open file {}: {e:?}", path.display()))?)
}

async fn server(args: ServerArgs) -> Result<(), BoxError> {
async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), BoxError> {
let my_identity = HelperIdentity::try_from(args.identity.expect("enforced by clap")).unwrap();

let (identity, server_tls) = match (args.tls_cert, args.tls_key) {
Expand All @@ -136,7 +137,7 @@ async fn server(args: ServerArgs) -> Result<(), BoxError> {
private_key_file: sk_path,
});

let query_runtime = new_query_runtime();
let query_runtime = new_query_runtime(&logging_handle);
let app_config = AppConfig::default()
.with_key_registry(hpke_registry(mk_encryption.as_ref()).await?)
.with_active_work(args.active_work)
Expand Down Expand Up @@ -165,7 +166,7 @@ async fn server(args: ServerArgs) -> Result<(), BoxError> {
let shard_server_config = server_config.clone();
// ---

let http_runtime = new_http_runtime();
let http_runtime = new_http_runtime(&logging_handle);
let clients = MpcHelperClient::from_conf(
&IpaRuntime::from_tokio_runtime(&http_runtime),
&network_config,
Expand Down Expand Up @@ -230,18 +231,26 @@ async fn server(args: ServerArgs) -> Result<(), BoxError> {
/// if for some reason query runtime becomes overloaded.
/// When multi-threading feature is enabled it creates a runtime with thread-per-core,
/// otherwise a single-threaded runtime is created.
fn new_http_runtime() -> Runtime {
fn new_http_runtime(logging_handle: &LoggingHandle) -> Runtime {
if cfg!(feature = "multi-threading") {
tokio::runtime::Builder::new_multi_thread()
.thread_name("http-worker")
.enable_all()
logging_handle
.metrics_handle
.tokio_bind(
tokio::runtime::Builder::new_multi_thread()
.thread_name("http-worker")
.enable_all(),
)
.build()
.unwrap()
} else {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.thread_name("http-worker")
.enable_all()
logging_handle
.metrics_handle
.tokio_bind(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.thread_name("http-worker")
.enable_all(),
)
.build()
.unwrap()
}
Expand All @@ -250,21 +259,29 @@ fn new_http_runtime() -> Runtime {
/// This function creates a runtime suitable for executing MPC queries.
/// When multi-threading feature is enabled it creates a runtime with thread-per-core,
/// otherwise a single-threaded runtime is created.
fn new_query_runtime() -> Runtime {
fn new_query_runtime(logging_handle: &LoggingHandle) -> Runtime {
// it is intentional that IO driver is not enabled here (enable_time() call only).
// query runtime is supposed to use CPU/memory only, no writes to disk and all
// network communication is handled by HTTP runtime.
if cfg!(feature = "multi-threading") {
tokio::runtime::Builder::new_multi_thread()
.thread_name("query-executor")
.enable_time()
logging_handle
.metrics_handle
.tokio_bind(
tokio::runtime::Builder::new_multi_thread()
.thread_name("query-executor")
.enable_time(),
)
.build()
.unwrap()
} else {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.thread_name("query-executor")
.enable_time()
logging_handle
.metrics_handle
.tokio_bind(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.thread_name("query-executor")
.enable_time(),
)
.build()
.unwrap()
}
Expand All @@ -275,10 +292,10 @@ fn new_query_runtime() -> Runtime {
#[tokio::main(flavor = "current_thread")]
pub async fn main() {
let args = Args::parse();
let _handle = args.logging.setup_logging();
let handle = args.logging.setup_logging();

let res = match args.command {
None => server(args.server).await,
None => server(args.server, handle).await,
Some(HelperCommand::Keygen(args)) => keygen(&args),
Some(HelperCommand::TestSetup(args)) => test_setup(args),
Some(HelperCommand::Confgen(args)) => client_config_setup(args),
Expand Down
72 changes: 40 additions & 32 deletions ipa-core/src/cli/metric_collector.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,56 @@
use std::{io::stderr, thread};
use std::{io, thread, thread::JoinHandle};

use metrics_tracing_context::TracingContextLayer;
use metrics_util::{
debugging::{DebuggingRecorder, Snapshotter},
layers::Layer,
use ipa_metrics::{
MetricChannelType, MetricsCollectorController, MetricsCurrentThreadContext, MetricsProducer,
};
use tokio::runtime::Builder;

use crate::telemetry::stats::Metrics;

/// Collects metrics using `DebuggingRecorder` and dumps them to `stderr` when dropped.
/// Holds a reference to metrics controller and producer
pub struct CollectorHandle {
snapshotter: Snapshotter,
thread_handle: JoinHandle<()>,
/// This will be used once we start consuming metrics
_controller: MetricsCollectorController,
producer: MetricsProducer,
}

///
/// Initializes this collector by installing `DebuggingRecorder` to keep track of metrics
/// emitted from different parts of the app.
///
/// ## Panics
/// Panics if metric recorder has already been set
#[must_use]
pub fn install_collector() -> CollectorHandle {
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

// use span fields as dimensions for metric
let recorder = TracingContextLayer::all().layer(recorder);
metrics::set_boxed_recorder(Box::new(recorder))
.expect("Metric recorder has been installed already");

// register metrics
crate::telemetry::metrics::register();
tracing::info!("Metrics enabled");

CollectorHandle { snapshotter }
/// ## Errors
/// If it fails to start a new thread
pub fn install_collector() -> io::Result<CollectorHandle> {
let (producer, controller, handle) =
ipa_metrics::install_new_thread(MetricChannelType::Unbounded)?;
tracing::info!("Metrics engine is enabled");

Ok(CollectorHandle {
thread_handle: handle,
_controller: controller,
producer,
})
}

impl Drop for CollectorHandle {
fn drop(&mut self) {
if !thread::panicking() {
let stats = Metrics::from_snapshot(self.snapshotter.snapshot());
stats
.print(&mut stderr())
.expect("Failed to dump metrics to stderr");
}
if !thread::panicking() && !self.thread_handle.is_finished() {
tracing::warn!("Metrics thread is still running");
};
}
}

impl CollectorHandle {
pub fn tokio_bind<'a>(&self, target: &'a mut Builder) -> &'a mut Builder {
let flush_fn = || MetricsCurrentThreadContext::flush();

target
.on_thread_start({
let producer = self.producer.clone();
move || {
producer.install();
}
})
.on_thread_stop(flush_fn)
.on_thread_park(flush_fn)
}
}
2 changes: 1 addition & 1 deletion ipa-core/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ pub use metric_collector::{install_collector, CollectorHandle};
pub use paths::PathExt as CliPaths;
#[cfg(feature = "web-app")]
pub use test_setup::{test_setup, TestSetupArgs};
pub use verbosity::Verbosity;
pub use verbosity::{LoggingHandle, Verbosity};
24 changes: 10 additions & 14 deletions ipa-core/src/cli/verbosity.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::io::{stderr, IsTerminal};

use clap::Parser;
use metrics_tracing_context::MetricsLayer;
use tracing::{info, metadata::LevelFilter, Level};
use tracing_subscriber::{
fmt, fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter,
Expand All @@ -24,11 +23,14 @@ pub struct Verbosity {
}

pub struct LoggingHandle {
#[allow(dead_code)] // we care about handle's drop semantic so it is ok to not read it
metrics_handle: Option<CollectorHandle>,
pub metrics_handle: CollectorHandle,
}

impl Verbosity {
/// Sets up logging and metrics infrastructure
///
/// ## Panics
/// If metrics failed to setup
#[must_use]
pub fn setup_logging(&self) -> LoggingHandle {
let filter_layer = self.log_filter();
Expand All @@ -39,20 +41,14 @@ impl Verbosity {
.with_ansi(std::io::stderr().is_terminal())
.with_writer(stderr);

let registry = tracing_subscriber::registry()
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer);
.with(fmt_layer)
.init();

if cfg!(feature = "disable-metrics") {
registry.init();
} else {
registry.with(MetricsLayer::new()).init();
}
let metrics_handle = install_collector().expect("Can install metrics");

let handle = LoggingHandle {
metrics_handle: (!self.quiet && !cfg!(feature = "disable-metrics"))
.then(install_collector),
};
let handle = LoggingHandle { metrics_handle };
set_global_panic_hook();

handle
Expand Down
13 changes: 7 additions & 6 deletions ipa-core/src/helpers/gateway/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{

use dashmap::{mapref::entry::Entry, DashMap};
use futures::Stream;
use ipa_metrics::counter;
#[cfg(all(test, feature = "shuttle"))]
use shuttle::future as tokio;
use typenum::Unsigned;
Expand Down Expand Up @@ -158,13 +159,13 @@ impl<I: TransportIdentity, M: Message> SendingEnd<I, M> {
))]
pub async fn send<B: Borrow<M>>(&self, record_id: RecordId, msg: B) -> Result<(), Error<I>> {
let r = self.inner.send(record_id, msg).await;
metrics::increment_counter!(RECORDS_SENT,
STEP => self.inner.channel_id.gate.as_ref().to_string(),
ROLE => self.sender_id.as_str(),
counter!(RECORDS_SENT, 1,
STEP => &self.inner.channel_id.gate,
ROLE => &self.sender_id
);
metrics::counter!(BYTES_SENT, M::Size::U64,
STEP => self.inner.channel_id.gate.as_ref().to_string(),
ROLE => self.sender_id.as_str(),
counter!(BYTES_SENT, M::Size::U64,
STEP => &self.inner.channel_id.gate,
ROLE => &self.sender_id
);

r
Expand Down
Loading

0 comments on commit 203b70d

Please sign in to comment.