From ad77ff828ce194278bd25fdf6da43ee5429c5ba7 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Wed, 16 Oct 2024 19:09:44 -0700 Subject: [PATCH 01/11] Add ipa-metrics crate Follows the design described here https://docs.google.com/document/d/1cRnE024bi7KJYeqMT8yMYluIplA8992bNIaeKBmrXp4/edit#heading=h.d8g3vju6yaqd --- Cargo.toml | 2 +- ipa-metrics/Cargo.toml | 20 +++ ipa-metrics/src/collector.rs | 162 +++++++++++++++++++ ipa-metrics/src/context.rs | 151 +++++++++++++++++ ipa-metrics/src/controller.rs | 30 ++++ ipa-metrics/src/key.rs | 287 +++++++++++++++++++++++++++++++++ ipa-metrics/src/kind.rs | 6 + ipa-metrics/src/label.rs | 160 ++++++++++++++++++ ipa-metrics/src/lib.rs | 56 +++++++ ipa-metrics/src/partitioned.rs | 211 ++++++++++++++++++++++++ ipa-metrics/src/producer.rs | 37 +++++ ipa-metrics/src/store.rs | 201 +++++++++++++++++++++++ 12 files changed, 1322 insertions(+), 1 deletion(-) create mode 100644 ipa-metrics/Cargo.toml create mode 100644 ipa-metrics/src/collector.rs create mode 100644 ipa-metrics/src/context.rs create mode 100644 ipa-metrics/src/controller.rs create mode 100644 ipa-metrics/src/key.rs create mode 100644 ipa-metrics/src/kind.rs create mode 100644 ipa-metrics/src/label.rs create mode 100644 ipa-metrics/src/lib.rs create mode 100644 ipa-metrics/src/partitioned.rs create mode 100644 ipa-metrics/src/producer.rs create mode 100644 ipa-metrics/src/store.rs diff --git a/Cargo.toml b/Cargo.toml index 377020368..1aed2b4b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["ipa-core", "ipa-step", "ipa-step-derive", "ipa-step-test"] +members = ["ipa-core", "ipa-step", "ipa-step-derive", "ipa-step-test", "ipa-metrics"] [profile.release] incremental = true diff --git a/ipa-metrics/Cargo.toml b/ipa-metrics/Cargo.toml new file mode 100644 index 000000000..ebaeb9473 --- /dev/null +++ b/ipa-metrics/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "ipa-metrics" +version = "0.1.0" +edition = "2021" + +[features] 
+default = [] +# support metric partitioning +partitions = [] + +[dependencies] +# crossbeam channels are faster than std +crossbeam-channel = "0.5" +# This crate uses raw entry API that is unstable in stdlib +hashbrown = "0.15" +# Fast non-collision-resistant hashing +rustc-hash = "2.0.0" +# logging +tracing = "0.1" + diff --git a/ipa-metrics/src/collector.rs b/ipa-metrics/src/collector.rs new file mode 100644 index 000000000..bf0a6bc06 --- /dev/null +++ b/ipa-metrics/src/collector.rs @@ -0,0 +1,162 @@ +use std::cell::RefCell; + +use crossbeam_channel::{Receiver, Select}; + +use crate::{ControllerCommand, MetricsStore}; + +thread_local! { + /// Collector that is installed in a thread. It is responsible for receiving metrics from + /// all threads and aggregating them. + static COLLECTOR: RefCell> = const { RefCell::new(None) } +} + +pub struct Installed; + +impl Installed { + pub fn block_until_shutdown(&self) -> MetricsStore { + MetricsCollector::with_current_mut(|c| { + c.event_loop(); + + std::mem::take(&mut c.local_store) + }) + } +} + +pub struct MetricsCollector { + pub(super) rx: Receiver, + pub(super) local_store: MetricsStore, + pub(super) command_rx: Receiver, +} + +impl MetricsCollector { + pub fn install(self) -> Installed { + COLLECTOR.with_borrow_mut(|c| { + assert!(c.replace(self).is_none(), "Already initialized"); + }); + + Installed + } + + fn event_loop(&mut self) { + let mut select = Select::new(); + let data_idx = select.recv(&self.rx); + let command_idx = select.recv(&self.command_rx); + + loop { + let next_op = select.select(); + match next_op.index() { + i if i == data_idx => match next_op.recv(&self.rx) { + Ok(store) => { + tracing::trace!("Collector received more data: {store:?}"); + println!("Collector received more data: {store:?}"); + self.local_store.merge(store) + } + Err(e) => { + tracing::debug!("No more threads collecting metrics. 
Disconnected: {e}"); + select.remove(data_idx); + } + }, + i if i == command_idx => match next_op.recv(&self.command_rx) { + Ok(ControllerCommand::Snapshot(tx)) => { + tracing::trace!("Snapshot request received"); + println!("snapshot request received"); + tx.send(self.local_store.clone()).unwrap(); + } + Ok(ControllerCommand::Stop(tx)) => { + tx.send(()).unwrap(); + break; + } + Err(e) => { + tracing::debug!("Metric controller is disconnected: {e}"); + break; + } + }, + _ => unreachable!(), + } + } + } + + pub fn with_current_mut T, T>(f: F) -> T { + COLLECTOR.with_borrow_mut(|c| { + let collector = c.as_mut().expect("Collector is installed"); + f(collector) + }) + } +} + +impl Drop for MetricsCollector { + fn drop(&mut self) { + tracing::debug!("Collector is dropped"); + } +} + +#[cfg(test)] +mod tests { + use std::{ + thread, + thread::{Scope, ScopedJoinHandle}, + }; + + use crate::{counter, installer, producer::Producer, thread_installer}; + + struct MeteredScope<'scope, 'env: 'scope>(&'scope Scope<'scope, 'env>, Producer); + + impl<'scope, 'env: 'scope> MeteredScope<'scope, 'env> { + fn spawn(&self, f: F) -> ScopedJoinHandle<'scope, T> + where + F: FnOnce() -> T + Send + 'scope, + T: Send + 'scope, + { + let producer = self.1.clone(); + + self.0.spawn(move || { + producer.install(); + let r = f(); + let _ = producer.drop_handle(); + + r + }) + } + } + + trait IntoMetered<'scope, 'env: 'scope> { + fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env>; + } + + impl<'scope, 'env: 'scope> IntoMetered<'scope, 'env> for Scope<'scope, 'env> { + fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env> { + MeteredScope(self, meter) + } + } + + #[test] + fn start_stop() { + let (collector, producer, controller) = installer(); + let handle = thread::spawn(|| { + let store = collector.install().block_until_shutdown(); + store.counter_value(counter!("foo")) + }); + + thread::scope(move |s| { + let s = s.metered(producer); + s.spawn(|| 
counter!("foo", 3)).join().unwrap(); + s.spawn(|| counter!("foo", 5)).join().unwrap(); + controller.stop().unwrap(); + }); + + assert_eq!(8, handle.join().unwrap()) + } + + #[test] + fn with_thread() { + let (producer, controller, handle) = thread_installer().unwrap(); + thread::scope(move |s| { + let s = s.metered(producer); + s.spawn(|| counter!("baz", 4)); + s.spawn(|| counter!("bar", 1)); + s.spawn(|| controller.stop().unwrap()); + }); + + handle.join().unwrap() // Collector thread should be terminated by now + } +} diff --git a/ipa-metrics/src/context.rs b/ipa-metrics/src/context.rs new file mode 100644 index 000000000..2680c0876 --- /dev/null +++ b/ipa-metrics/src/context.rs @@ -0,0 +1,151 @@ +use std::{cell::RefCell, mem}; + +use crossbeam_channel::Sender; + +use crate::MetricsStore; + +thread_local! { + pub(crate) static METRICS_CTX: RefCell = const { RefCell::new(MetricsContext::new()) } +} + +#[macro_export] +macro_rules! counter { + ($metric:expr, $val:expr $(, $l:expr => $v:expr)*) => {{ + let name = $crate::metric_name!($metric $(, $l => $v)*); + $crate::MetricsCurrentThreadContext::store_mut(|store| store.counter(&name).inc($val)) + }}; + ($metric:expr $(, $l:expr => $v:expr)*) => {{ + $crate::metric_name!($metric $(, $l => $v)*) + }}; +} + +/// Provides access to the metric store associated with the current thread. +/// If there is no store associated with the current thread, it will create a new one. 
+pub struct CurrentThreadContext; + +impl CurrentThreadContext { + pub fn init(tx: Sender) { + METRICS_CTX.with_borrow_mut(|ctx| ctx.init(tx)); + } + + pub fn flush() { + METRICS_CTX.with_borrow_mut(|ctx| ctx.flush()); + } + + pub fn is_connected() -> bool { + METRICS_CTX.with_borrow(|ctx| ctx.is_connected()) + } + + pub fn store T, T>(f: F) -> T { + METRICS_CTX.with_borrow(|ctx| f(ctx.store())) + } + + pub fn store_mut T, T>(f: F) -> T { + METRICS_CTX.with_borrow_mut(|ctx| f(ctx.store_mut())) + } +} + +/// This context is used inside thread-local storage, +/// so it must be wrapped inside [`std::cell::RefCell`]. +/// +/// For single-threaded applications, it is possible +/// to use it w/o connecting to the collector thread. +pub struct MetricsContext { + store: MetricsStore, + /// Handle to send metrics to the collector thread + tx: Option>, +} + +impl Default for MetricsContext { + fn default() -> Self { + Self::new() + } +} + +impl MetricsContext { + pub const fn new() -> Self { + Self { + store: MetricsStore::new(), + tx: None, + } + } + + /// Connects this context to the collector thread. 
+ /// Sender will be used to send data from this thread + fn init(&mut self, tx: Sender) { + assert!(self.tx.is_none(), "Already connected"); + + self.tx = Some(tx); + } + + pub fn store(&self) -> &MetricsStore { + &self.store + } + + pub fn store_mut(&mut self) -> &mut MetricsStore { + &mut self.store + } + + fn is_connected(&self) -> bool { + self.tx.is_some() + } + + fn flush(&mut self) { + if self.is_connected() { + let store = mem::take(&mut self.store); + match self.tx.as_ref().unwrap().send(store) { + Ok(_) => {} + Err(e) => { + tracing::warn!("MetricsContext is not connected: {e}"); + } + } + } else { + tracing::warn!("MetricsContext is not connected"); + } + } +} + +impl Drop for MetricsContext { + fn drop(&mut self) { + if !self.store.is_empty() { + tracing::warn!( + "Non-empty metric store is dropped: {} metrics lost", + self.store.len() + ); + } + } +} + +#[cfg(test)] +mod tests { + use crate::MetricsContext; + + /// Each thread has its local store by default, and it is exclusive to it + #[test] + #[cfg(feature = "partitions")] + fn local_store() { + use crate::context::CurrentThreadContext; + + crate::set_partition(0xdeadbeef); + counter!("foo", 7); + + std::thread::spawn(|| { + counter!("foo", 1); + counter!("foo", 5); + assert_eq!( + 5, + CurrentThreadContext::store(|store| store.counter_value(&counter!("foo"))) + ); + }); + + assert_eq!( + 7, + CurrentThreadContext::store(|store| store.counter_value(&counter!("foo"))) + ); + } + + #[test] + fn default() { + assert_eq!(0, MetricsContext::default().store().len()) + } +} diff --git a/ipa-metrics/src/controller.rs b/ipa-metrics/src/controller.rs new file mode 100644 index 000000000..a70802f38 --- /dev/null +++ b/ipa-metrics/src/controller.rs @@ -0,0 +1,30 @@ +use crossbeam_channel::Sender; + +use crate::MetricsStore; + +pub enum Command { + Snapshot(Sender), + Stop(Sender<()>), +} + +pub struct Controller { + pub(super) tx: Sender, +} + +impl Controller { + pub fn snapshot(&self) -> Result { + let (tx, 
rx) = crossbeam_channel::bounded(0); + self.tx + .send(Command::Snapshot(tx)) + .map_err(|e| format!("An error occurred while requesting metrics snapshot: {e}"))?; + rx.recv().map_err(|e| format!("Disconnected channel: {e}")) + } + + pub fn stop(self) -> Result<(), String> { + let (tx, rx) = crossbeam_channel::bounded(0); + self.tx + .send(Command::Stop(tx)) + .map_err(|e| format!("An error occurred while requesting metrics snapshot: {e}"))?; + rx.recv().map_err(|e| format!("Disconnected channel: {e}")) + } +} diff --git a/ipa-metrics/src/key.rs b/ipa-metrics/src/key.rs new file mode 100644 index 000000000..dec06a108 --- /dev/null +++ b/ipa-metrics/src/key.rs @@ -0,0 +1,287 @@ +//! Metric names supported by this crate. +//! +//! Providing a good use for metrics is a tradeoff between +//! performance and ergonomics. Flexible metric engines support +//! dynamic names, like "bytes.sent.{ip}" or "cpu.{core}.instructions" +//! but that comes with a significant performance cost. +//! String interning helps to mitigate this on the storage site +//! but callsites need to allocate those at every call. +//! +//! IPA metrics can be performance sensitive. There are counters +//! incremented on every send and receive operation, so they need +//! to be fast. For this reason, dynamic metric names are not supported. +//! Metric name can only be a string, known at compile time. +//! +//! However, it is not flexible enough. Most metrics have dimensions +//! attached to them. IPA example is `bytes.sent` metric with step breakdown. +//! It is very useful to know the required throughput per circuit. +//! +//! This metric engine supports up to 5 dimensions attached to every metric, +//! again trying to strike a good balance between performance and usability. 
+ +use std::{ + array, + hash::{Hash, Hasher}, + iter, + iter::repeat, +}; + +pub use Name as MetricName; +pub(super) use OwnedName as OwnedMetricName; + +use crate::label::{Label, OwnedLabel, MAX_LABELS}; + +#[macro_export] +macro_rules! metric_name { + // Match when two key-value pairs are provided + // TODO: enforce uniqueness at compile time + ($metric:expr, $l1:expr => $v1:expr, $l2:expr => $v2:expr) => {{ + use $crate::UniqueElements; + let labels = [ + $crate::Label { + name: $l1, + val: $v1, + }, + $crate::Label { + name: $l2, + val: $v2, + }, + ] + .enforce_unique(); + $crate::MetricName::from_parts($metric, labels) + }}; + // Match when one key-value pair is provided + ($metric:expr, $l1:expr => $v1:expr) => {{ + $crate::MetricName::from_parts( + $metric, + [$crate::Label { + name: $l1, + val: $v1, + }], + ) + }}; + // Match when no key-value pairs are provided + ($metric:expr) => {{ + $crate::MetricName::from_parts($metric, []) + }}; +} + +/// Metric name that is created at callsite on each metric invocation. +/// For this reason, it is performance sensitive - it tries to borrow +/// whatever it can from callee stack. +#[derive(Debug, PartialEq)] +pub struct Name<'lv, const LABELS: usize = 0> { + pub(super) key: &'static str, + labels: [Label<'lv>; LABELS], +} + +impl<'lv, const LABELS: usize> Name<'lv, LABELS> { + pub fn from_parts>(key: I, labels: [Label<'lv>; LABELS]) -> Self { + assert!( + LABELS <= MAX_LABELS, + "Maximum 5 labels per metric is supported" + ); + + Self { + key: key.into(), + labels, + } + } + + /// [`ToOwned`] trait does not work because of + /// extra [`Borrow`] requirement + pub(super) fn to_owned(&self) -> OwnedName { + let labels: [_; 5] = array::from_fn(|i| { + if i < self.labels.len() { + Some(self.labels[i].to_owned()) + } else { + None + } + }); + + OwnedName { + key: self.key, + labels, + } + } +} + +/// Same as [`Name`], but intended for internal use. 
This is an owned +/// version of it, that does not borrow anything from outside. +/// This is the key inside metric stores which are simple hashmaps. +#[derive(Debug, Clone)] +pub struct OwnedName { + pub(super) key: &'static str, + labels: [Option; 5], +} + +impl OwnedName { + pub fn key(&self) -> &'static str { + self.key + } + + pub fn labels(&self) -> impl Iterator { + self.labels.iter().filter_map(|l| l.as_ref()) + } +} + +impl Hash for Name<'_, LABELS> { + fn hash(&self, state: &mut H) { + state.write(self.key.as_bytes()); + for label in &self.labels { + label.hash(state) + } + } +} + +impl From<&'static str> for Name<'_, 0> { + fn from(value: &'static str) -> Self { + Self { + key: value, + labels: [], + } + } +} + +pub trait UniqueElements { + fn enforce_unique(self) -> Self; +} + +impl UniqueElements for [Label<'_>; 2] { + fn enforce_unique(self) -> Self { + if self[0].name == self[1].name { + panic!("label names must be unique") + } + + self + } +} + +impl<'a, const LABELS: usize> PartialEq> for OwnedName { + fn eq(&self, other: &Name<'a, LABELS>) -> bool { + self.key == other.key + && iter::zip( + &self.labels, + other.labels.iter().map(Some).chain(repeat(None)), + ) + .all(|(a, b)| match (a, b) { + (Some(a), Some(b)) => a.as_borrowed() == *b, + (None, None) => true, + _ => false, + }) + } +} + +impl PartialEq for OwnedName { + fn eq(&self, other: &OwnedName) -> bool { + self.key == other.key + && iter::zip(&self.labels, &other.labels).all(|(a, b)| match (a, b) { + (Some(a), Some(b)) => a == b, + (None, None) => true, + _ => false, + }) + } +} + +impl Eq for OwnedName {} + +impl Hash for OwnedName { + fn hash(&self, state: &mut H) { + state.write(self.key.as_bytes()); + for label in self.labels.iter().flatten() { + label.hash(state) + } + } +} + +#[cfg(test)] +pub fn compute_hash(value: V) -> u64 { + let mut hasher = std::hash::DefaultHasher::default(); + value.hash(&mut hasher); + + hasher.finish() +} + +#[cfg(test)] +mod tests { + + use crate::{ + 
key::{compute_hash, Name}, + label::Label, + }; + + #[test] + fn eq() { + let name = Name::from("foo"); + assert_eq!(name.to_owned(), name); + } + + #[test] + fn hash_eq() { + let a = Name::from("foo"); + let b = Name::from("foo"); + assert_eq!(compute_hash(&a), compute_hash(b)); + assert_eq!(compute_hash(&a), compute_hash(a.to_owned())); + } + + #[test] + fn not_eq() { + let foo = Name::from("foo"); + let bar = Name::from("bar"); + assert_ne!(foo.to_owned(), bar); + } + + #[test] + fn hash_not_eq() { + let foo = Name::from("foo"); + let bar = Name::from("bar"); + assert_ne!(compute_hash(&foo), compute_hash(&bar)); + assert_ne!(compute_hash(foo), compute_hash(bar.to_owned())); + } + + #[test] + #[should_panic(expected = "Maximum 5 labels per metric is supported")] + fn more_than_5_labels() { + let _ = Name::from_parts( + "foo", + [ + Label { + name: "label_1", + val: &1, + }, + Label { + name: "label_2", + val: &1, + }, + Label { + name: "label_3", + val: &1, + }, + Label { + name: "label_4", + val: &1, + }, + Label { + name: "label_5", + val: &1, + }, + Label { + name: "label_6", + val: &1, + }, + ], + ); + } + + #[test] + fn eq_is_consistent() { + let a_name = metric_name!("foo", "label_1" => &1); + let b_name = metric_name!("foo", "label_1" => &1, "label_2" => &2); + + assert_eq!(a_name, a_name); + assert_eq!(a_name.to_owned(), a_name); + + assert_ne!(b_name.to_owned(), a_name); + assert_ne!(a_name.to_owned(), b_name); + } +} diff --git a/ipa-metrics/src/kind.rs b/ipa-metrics/src/kind.rs new file mode 100644 index 000000000..b6abf7e4e --- /dev/null +++ b/ipa-metrics/src/kind.rs @@ -0,0 +1,6 @@ +//! Different metric types supported by this crate. +//! Currently, only counters are supported. +//! TODO: add more + +/// Counters are simple 8 byte values. 
+pub type CounterValue = u64; diff --git a/ipa-metrics/src/label.rs b/ipa-metrics/src/label.rs new file mode 100644 index 000000000..d35680e08 --- /dev/null +++ b/ipa-metrics/src/label.rs @@ -0,0 +1,160 @@ +use std::{ + fmt::{Debug, Display, Formatter}, + hash::{Hash, Hasher}, +}; + +pub use Value as LabelValue; + +pub const MAX_LABELS: usize = 5; + +/// Dimension value (or label value) must be sendable to another thread +/// and there must be a way to show it +pub trait Value: Display + Send { + /// Creates a unique hash for this value. + /// It is easy to create collisions, so better avoid them, + /// by assigning a unique integer to each value + fn hash(&self) -> u64; + + /// Creates an owned copy of this value. Dynamic dispatch + /// is required, because values are stored in a generic store + /// that can't be specialized for value types. + fn boxed(&self) -> Box; +} + +impl LabelValue for u32 { + fn hash(&self) -> u64 { + u64::from(*self) + } + + fn boxed(&self) -> Box { + Box::new(*self) + } +} + +#[derive()] +pub struct Label<'lv> { + pub name: &'static str, + pub val: &'lv dyn Value, +} + +impl Label<'_> { + pub fn to_owned(&self) -> OwnedLabel { + OwnedLabel { + name: self.name, + val: self.val.boxed(), + } + } +} + +impl Debug for Label<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Label") + .field("name", &self.name) + .field("val", &format!("{}", self.val)) + .finish() + } +} + +impl Hash for Label<'_> { + fn hash(&self, state: &mut H) { + state.write(self.name.as_bytes()); + state.write_u64(self.val.hash()); + } +} + +impl PartialEq for Label<'_> { + fn eq(&self, other: &Self) -> bool { + // name check should be fast - just pointer comparison. 
+ // val check is more involved with dynamic dispatch, so we can consider + // making label immutable and storing a hash of the value in place + self.name == other.name && self.val.hash() == other.val.hash() + } +} + +pub struct OwnedLabel { + pub name: &'static str, + pub val: Box, +} + +impl Clone for OwnedLabel { + fn clone(&self) -> Self { + Self { + name: self.name, + val: self.val.boxed(), + } + } +} + +impl OwnedLabel { + pub fn as_borrowed(&self) -> Label<'_> { + Label { + name: self.name, + val: self.val.as_ref(), + } + } + + pub fn name(&self) -> &'static str { + self.name + } + + pub fn str_value(&self) -> String { + self.val.to_string() + } +} + +impl Hash for OwnedLabel { + fn hash(&self, state: &mut H) { + self.as_borrowed().hash(state) + } +} + +impl Debug for OwnedLabel { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OwnedLabel") + .field("name", &self.name) + .field("val", &format!("{}", self.val)) + .finish() + } +} + +impl PartialEq for OwnedLabel { + fn eq(&self, other: &Self) -> bool { + self.name == other.name && self.val.hash() == other.val.hash() + } +} + +#[cfg(test)] +mod tests { + + use crate::{key::compute_hash, metric_name}; + + #[test] + fn one_label() { + let foo_1 = metric_name!("foo", "l1" => &1); + let foo_2 = metric_name!("foo", "l1" => &2); + + assert_ne!(foo_1.to_owned(), foo_2); + assert_ne!(compute_hash(&foo_1), compute_hash(&foo_2)); + assert_ne!(foo_2.to_owned(), foo_1); + + assert_eq!(compute_hash(&foo_1), compute_hash(foo_1.to_owned())) + } + + #[test] + #[should_panic(expected = "label names must be unique")] + fn unique() { + metric_name!("foo", "l1" => &1, "l1" => &0); + } + + #[test] + fn non_commutative() { + assert_ne!( + compute_hash(&metric_name!("foo", "l1" => &1, "l2" => &0)), + compute_hash(&metric_name!("foo", "l1" => &0, "l2" => &1)), + ); + assert_ne!( + compute_hash(&metric_name!("foo", "l1" => &1)), + compute_hash(&metric_name!("foo", "l1" => &1, "l2" => &1)), + ); + } +} 
diff --git a/ipa-metrics/src/lib.rs b/ipa-metrics/src/lib.rs new file mode 100644 index 000000000..87786e91d --- /dev/null +++ b/ipa-metrics/src/lib.rs @@ -0,0 +1,56 @@ +mod collector; +mod context; +mod controller; +mod key; +mod kind; +mod label; +#[cfg(feature = "partitions")] +mod partitioned; +mod producer; +mod store; + +use std::{io, thread::JoinHandle}; + +pub use collector::MetricsCollector; +pub use context::{CurrentThreadContext as MetricsCurrentThreadContext, MetricsContext}; +pub use controller::{Command as ControllerCommand, Controller as MetricsCollectorController}; +pub use key::{MetricName, OwnedName, UniqueElements}; +pub use label::{Label, LabelValue}; +#[cfg(feature = "partitions")] +pub use partitioned::{ + current_partition, set_or_unset_partition, set_partition, Partition as MetricPartition, + PartitionedStore as MetricsStore, +}; +pub use producer::Producer as MetricsProducer; +#[cfg(not(feature = "partitions"))] +pub use store::Store as MetricsStore; + +pub fn installer() -> ( + MetricsCollector, + MetricsProducer, + MetricsCollectorController, +) { + let (command_tx, command_rx) = crossbeam_channel::unbounded(); + let (tx, rx) = crossbeam_channel::unbounded(); + ( + MetricsCollector { + rx, + local_store: MetricsStore::default(), + command_rx, + }, + MetricsProducer { tx }, + MetricsCollectorController { tx: command_tx }, + ) +} + +pub fn thread_installer( +) -> io::Result<(MetricsProducer, MetricsCollectorController, JoinHandle<()>)> { + let (collector, producer, controller) = installer(); + let handle = std::thread::Builder::new() + .name("metric-collector".to_string()) + .spawn(|| { + collector.install().block_until_shutdown(); + })?; + + Ok((producer, controller, handle)) +} diff --git a/ipa-metrics/src/partitioned.rs b/ipa-metrics/src/partitioned.rs new file mode 100644 index 000000000..6e97b2446 --- /dev/null +++ b/ipa-metrics/src/partitioned.rs @@ -0,0 +1,211 @@ +use std::{borrow::Borrow, cell::Cell}; + +use 
hashbrown::hash_map::Entry; +use rustc_hash::FxBuildHasher; + +use crate::{ + kind::CounterValue, + store::{CounterHandle, Store}, + MetricName, +}; + +/// Each partition is a unique 16 byte value. +pub type Partition = u128; + +pub fn set_partition(new: Partition) { + PARTITION.set(Some(new)); +} + +pub fn set_or_unset_partition(new: Option) { + PARTITION.set(new); +} + +pub fn current_partition() -> Option { + PARTITION.get() +} + +thread_local! { + static PARTITION: Cell> = Cell::new(None); +} + +/// Provides the same functionality as [`Store`], but partitioned +/// across many dimensions. There is an extra price for it, so +/// don't use it, unless you need it. +/// The dimension is set through [`std::thread::LocalKey`], so +/// each thread can set only one dimension at a time. +/// +/// The API of this struct will match [`Store`] as they +/// can be used interchangeably. +#[derive(Clone, Debug)] +pub struct PartitionedStore { + inner: hashbrown::HashMap, + default_store: Store, +} + +impl Default for PartitionedStore { + fn default() -> Self { + Self::new() + } +} + +impl PartitionedStore { + pub const fn new() -> Self { + Self { + inner: hashbrown::HashMap::with_hasher(FxBuildHasher), + default_store: Store::new(), + } + } + + pub fn with_current_partition T, T>(&mut self, f: F) -> T { + let mut store = self.get_mut(current_partition()); + f(&mut store) + } + + pub fn with_partition T, T>( + &self, + partition: Partition, + f: F, + ) -> Option { + let store = self.inner.get(&partition); + store.map(f) + } + + pub fn with_partition_mut T, T>( + &mut self, + partition: Partition, + f: F, + ) -> T { + let mut store = self.get_mut(Some(partition)); + f(&mut store) + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() && self.default_store.is_empty() + } + + pub fn merge(&mut self, other: Self) { + for (partition, store) in other.inner { + self.get_mut(Some(partition)).merge(store); + } + self.default_store.merge(other.default_store); + } + + pub fn 
counter_value>(&self, name: B) -> CounterValue { + let name = name.borrow(); + if let Some(partition) = current_partition() { + self.inner + .get(&partition) + .map(|store| store.counter_value(name)) + .unwrap_or_default() + } else { + self.default_store.counter_value(name) + } + } + + pub fn counter( + &mut self, + key: &MetricName<'_, LABELS>, + ) -> CounterHandle<'_, LABELS> { + if let Some(partition) = current_partition() { + self.inner + .entry(partition) + .or_insert_with(|| Store::default()) + .counter(key) + } else { + self.default_store.counter(key) + } + } + + pub fn len(&self) -> usize { + self.inner.len() + self.default_store.len() + } + + fn get_mut(&mut self, partition: Option) -> &mut Store { + if let Some(v) = partition { + match self.inner.entry(v) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => entry.insert(Store::default()), + } + } else { + &mut self.default_store + } + } +} + +#[cfg(test)] +mod tests { + use crate::{ + metric_name, + partitioned::{set_partition, PartitionedStore}, + }; + + #[test] + fn unique_partition() { + let metric = metric_name!("foo"); + let mut store = PartitionedStore::new(); + store.with_partition_mut(1, |store| { + store.counter(&metric).inc(1); + }); + store.with_partition_mut(5, |store| { + store.counter(&metric).inc(5); + }); + + assert_eq!( + 5, + store.with_partition_mut(5, |store| store.counter(&metric).get()) + ); + assert_eq!( + 1, + store.with_partition_mut(1, |store| store.counter(&metric).get()) + ); + assert_eq!( + 0, + store.with_partition_mut(10, |store| store.counter(&metric).get()) + ); + } + + #[test] + fn current_partition() { + let metric = metric_name!("foo"); + let mut store = PartitionedStore::new(); + set_partition(4); + + store.with_current_partition(|store| { + store.counter(&metric).inc(1); + }); + store.with_current_partition(|store| { + store.counter(&metric).inc(5); + }); + + assert_eq!( + 6, + store.with_current_partition(|store| store.counter(&metric).get()) + 
); + } + + #[test] + fn empty() { + let mut store = PartitionedStore::new(); + store.with_current_partition(|store| { + store.counter(&metric_name!("foo")).inc(1); + }); + + assert!(!store.is_empty()); + } + + #[test] + fn len() { + let mut store = PartitionedStore::new(); + store.with_current_partition(|store| { + store.counter(&metric_name!("foo")).inc(1); + }); + set_partition(4); + store.with_current_partition(|store| { + store.counter(&metric_name!("foo")).inc(1); + }); + + // one metric in partition 4, another one in default. Even that they are the same, + // partitioned store cannot distinguish between them + assert_eq!(2, store.len()); + } +} diff --git a/ipa-metrics/src/producer.rs b/ipa-metrics/src/producer.rs new file mode 100644 index 000000000..27925bed2 --- /dev/null +++ b/ipa-metrics/src/producer.rs @@ -0,0 +1,37 @@ +use crossbeam_channel::Sender; + +use crate::{context::CurrentThreadContext, MetricsStore}; + +#[derive(Clone)] +pub struct Producer { + pub(super) tx: Sender, +} + +impl Producer { + pub fn install(&self) { + CurrentThreadContext::init(self.tx.clone()); + } + + /// Returns a drop handle that should be used when thread is stopped. + /// In an ideal world, a destructor on [`MetricsContext`] could do this, + /// but as pointed in [`LocalKey`] documentation, deadlocks are possible + /// if another TLS storage is accessed at destruction time. + /// + /// I actually ran into this problem with crossbeam channels. Send operation + /// requires access to `thread::current` and that panics at runtime if called + /// from inside `Drop`. 
+ /// + /// [`LocalKey`]: + pub fn drop_handle(&self) -> ProducerDropHandle { + ProducerDropHandle + } +} + +#[must_use] +pub struct ProducerDropHandle; + +impl Drop for ProducerDropHandle { + fn drop(&mut self) { + CurrentThreadContext::flush() + } +} diff --git a/ipa-metrics/src/store.rs b/ipa-metrics/src/store.rs new file mode 100644 index 000000000..6a18b7b6b --- /dev/null +++ b/ipa-metrics/src/store.rs @@ -0,0 +1,201 @@ +use std::{borrow::Borrow, hash::BuildHasher}; + +use hashbrown::hash_map::RawEntryMut; +use rustc_hash::FxBuildHasher; + +use crate::{ + key::{OwnedMetricName, OwnedName}, + kind::CounterValue, + MetricName, +}; + +/// A basic store. Currently only supports counters. +#[derive(Clone, Debug)] +pub struct Store { + // Counters and other metrics are stored to optimize writes. That means, one lookup + // per write. The cost of assembling the total count across all dimensions is absorbed + // by readers + counters: hashbrown::HashMap, +} + +impl Default for Store { + fn default() -> Self { + Self::new() + } +} + +impl Store { + pub const fn new() -> Self { + Self { + counters: hashbrown::HashMap::with_hasher(FxBuildHasher), + } + } + + pub(crate) fn merge(&mut self, other: Self) { + for (k, v) in other.counters { + let hash_builder = self.counters.hasher(); + let hash = hash_builder.hash_one(&k); + *self + .counters + .raw_entry_mut() + .from_hash(hash, |other| other.eq(&k)) + .or_insert(k, 0) + .1 += v; + } + } + + pub fn is_empty(&self) -> bool { + self.counters.is_empty() + } +} + +impl Store { + pub fn counter( + &mut self, + key: &MetricName<'_, LABELS>, + ) -> CounterHandle<'_, LABELS> { + let hash_builder = self.counters.hasher(); + let hash = hash_builder.hash_one(key); + let entry = self + .counters + .raw_entry_mut() + .from_hash(hash, |key_found| key_found.eq(key)); + match entry { + RawEntryMut::Occupied(slot) => CounterHandle { + val: slot.into_mut(), + }, + RawEntryMut::Vacant(slot) => { + let (_, val) = 
slot.insert_hashed_nocheck(hash, key.to_owned(), Default::default()); + CounterHandle { val } + } + } + } + + /// Returns the value for the specified metric across all dimensions. + /// The cost of this operation is `O(N*M)` where `N` - number of unique metrics + /// and `M` - number of all dimensions across all metrics. + /// + /// Note that the cost can be improved if it ever becomes a bottleneck by + /// creating a specialized two-level map (metric -> label -> value). + pub fn counter_value<'a, B: Borrow>>(&'a self, key: B) -> CounterValue { + let key = key.borrow(); + let mut answer = 0; + for (metric, value) in &self.counters { + if metric.key == key.key { + answer += value + } + } + + answer + } + + pub fn counters(&self) -> impl Iterator { + self.counters.iter().map(|(key, value)| (key, *value)) + } + + pub fn len(&self) -> usize { + self.counters.len() + } +} + +pub struct CounterHandle<'a, const LABELS: usize> { + val: &'a mut CounterValue, +} + +impl CounterHandle<'_, LABELS> { + pub fn inc(&mut self, inc: CounterValue) { + *self.val += inc; + } + + pub fn get(&self) -> CounterValue { + *self.val + } +} + +#[cfg(test)] +mod tests { + use std::hash::{DefaultHasher, Hash, Hasher}; + + use crate::{metric_name, store::Store, LabelValue}; + + impl LabelValue for &'static str { + fn hash(&self) -> u64 { + // TODO: use fast hashing here + let mut hasher = DefaultHasher::default(); + Hash::hash(self, &mut hasher); + + hasher.finish() + } + + fn boxed(&self) -> Box { + Box::new(*self) + } + } + + #[test] + fn counter() { + let mut store = Store::default(); + let name = metric_name!("foo"); + { + let mut handle = store.counter(&name); + assert_eq!(0, handle.get()); + handle.inc(3); + assert_eq!(3, handle.get()); + } + + { + store.counter(&name).inc(0); + assert_eq!(3, store.counter(&name).get()); + } + } + + #[test] + fn with_labels() { + let mut store = Store::default(); + let valid_name = metric_name!("foo", "h1" => &1, "h2" => &"2"); + let wrong_name = 
metric_name!("foo", "h1" => &2, "h2" => &"2"); + store.counter(&valid_name).inc(2); + + assert_eq!(2, store.counter(&valid_name).get()); + assert_eq!(0, store.counter(&wrong_name).get()); + } + + #[test] + fn merge() { + let mut store1 = Store::default(); + let mut store2 = Store::default(); + let foo = metric_name!("foo", "h1" => &1, "h2" => &"2"); + let bar = metric_name!("bar", "h2" => &"2"); + let baz = metric_name!("baz"); + store1.counter(&foo).inc(2); + store2.counter(&foo).inc(1); + + store1.counter(&bar).inc(7); + store2.counter(&baz).inc(3); + + store1.merge(store2); + + assert_eq!(3, store1.counter(&foo).get()); + assert_eq!(7, store1.counter(&bar).get()); + assert_eq!(3, store1.counter(&baz).get()); + } + + #[test] + fn counter_value() { + let mut store = Store::default(); + store + .counter(&metric_name!("foo", "h1" => &1, "h2" => &"1")) + .inc(1); + store + .counter(&metric_name!("foo", "h1" => &1, "h2" => &"2")) + .inc(1); + store + .counter(&metric_name!("foo", "h1" => &2, "h2" => &"1")) + .inc(1); + store + .counter(&metric_name!("foo", "h1" => &2, "h2" => &"2")) + .inc(1); + + assert_eq!(4, store.counter_value(&metric_name!("foo"))); + } +} From e130a30790d614aa2fd44548714446883fa0387a Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 17 Oct 2024 10:40:39 -0700 Subject: [PATCH 02/11] Improve documentation and API for partitioned store --- ipa-metrics/src/collector.rs | 2 +- ipa-metrics/src/context.rs | 29 +++++- ipa-metrics/src/key.rs | 13 +++ ipa-metrics/src/lib.rs | 2 +- ipa-metrics/src/partitioned.rs | 176 ++++++++++++++++++++------------- ipa-metrics/src/producer.rs | 13 ++- ipa-metrics/src/store.rs | 61 +++++++----- 7 files changed, 196 insertions(+), 100 deletions(-) diff --git a/ipa-metrics/src/collector.rs b/ipa-metrics/src/collector.rs index bf0a6bc06..8cd4f105a 100644 --- a/ipa-metrics/src/collector.rs +++ b/ipa-metrics/src/collector.rs @@ -134,7 +134,7 @@ mod tests { let (collector, producer, controller) = installer(); let handle 
= thread::spawn(|| { let store = collector.install().block_until_shutdown(); - store.counter_value(counter!("foo")) + store.counter_val(counter!("foo")) }); thread::scope(move |s| { diff --git a/ipa-metrics/src/context.rs b/ipa-metrics/src/context.rs index 2680c0876..c91fc7906 100644 --- a/ipa-metrics/src/context.rs +++ b/ipa-metrics/src/context.rs @@ -91,6 +91,10 @@ impl MetricsContext { } fn flush(&mut self) { + if self.store.is_empty() { + return; + } + if self.is_connected() { let store = mem::take(&mut self.store); match self.tx.as_ref().unwrap().send(store) { @@ -118,7 +122,9 @@ impl Drop for MetricsContext { #[cfg(test)] mod tests { - use crate::MetricsContext; + use std::thread; + + use crate::{CurrentThreadPartitionContext, MetricsContext}; /// Each thread has its local store by default, and it is exclusive to it #[test] @@ -126,7 +132,7 @@ mod tests { fn local_store() { use crate::context::CurrentThreadContext; - crate::set_partition(0xdeadbeef); + CurrentThreadPartitionContext::set(0xdeadbeef); counter!("foo", 7); std::thread::spawn(|| { @@ -134,13 +140,13 @@ mod tests { counter!("foo", 5); assert_eq!( 5, - CurrentThreadContext::store(|store| store.counter_value(&counter!("foo"))) + CurrentThreadContext::store(|store| store.counter_val(counter!("foo"))) ); }); assert_eq!( 7, - CurrentThreadContext::store(|store| store.counter_value(&counter!("foo"))) + CurrentThreadContext::store(|store| store.counter_val(counter!("foo"))) ); } @@ -148,4 +154,19 @@ mod tests { fn default() { assert_eq!(0, MetricsContext::default().store().len()) } + + #[test] + fn ignore_empty_store_on_flush() { + let (tx, rx) = crossbeam_channel::unbounded(); + let mut ctx = MetricsContext::new(); + ctx.init(tx); + let handle = thread::spawn(move || { + if let Ok(_) = rx.recv() { + panic!("Context sent empty store"); + } + }); + ctx.flush(); + drop(ctx); + handle.join().unwrap(); + } } diff --git a/ipa-metrics/src/key.rs b/ipa-metrics/src/key.rs index dec06a108..50a34b187 100644 --- 
a/ipa-metrics/src/key.rs +++ b/ipa-metrics/src/key.rs @@ -123,6 +123,19 @@ impl OwnedName { pub fn labels(&self) -> impl Iterator { self.labels.iter().filter_map(|l| l.as_ref()) } + + /// Checks that a subset of labels in `self` matches all values in `other`. + pub fn partial_match(&self, other: &Name<'_, LABELS>) -> bool { + if self.key != other.key { + false + } else { + other.labels.iter().all(|l| self.find_label(l)) + } + } + + fn find_label(&self, label: &Label<'_>) -> bool { + self.labels().any(|l| l.as_borrowed().eq(label)) + } } impl Hash for Name<'_, LABELS> { diff --git a/ipa-metrics/src/lib.rs b/ipa-metrics/src/lib.rs index 87786e91d..843f327a2 100644 --- a/ipa-metrics/src/lib.rs +++ b/ipa-metrics/src/lib.rs @@ -18,7 +18,7 @@ pub use key::{MetricName, OwnedName, UniqueElements}; pub use label::{Label, LabelValue}; #[cfg(feature = "partitions")] pub use partitioned::{ - current_partition, set_or_unset_partition, set_partition, Partition as MetricPartition, + CurrentThreadContext as CurrentThreadPartitionContext, Partition as MetricPartition, PartitionedStore as MetricsStore, }; pub use producer::Producer as MetricsProducer; diff --git a/ipa-metrics/src/partitioned.rs b/ipa-metrics/src/partitioned.rs index 6e97b2446..9e4653992 100644 --- a/ipa-metrics/src/partitioned.rs +++ b/ipa-metrics/src/partitioned.rs @@ -1,3 +1,19 @@ +//! This module enables metric partitioning that can be useful +//! when threads that emit metrics are shared across multiple executions. +//! A typical example for it are unit tests in Rust that share threads. +//! Having a global per-thread store would mean that it is not possible +//! to distinguish between different runs. +//! +//! Partitioning attempts to solve this with a global 16 byte identifier that +//! is set in thread local storage and read automatically by [`PartitionedStore`] +//! +//! Note that this module does not provide means to automatically set and unset +//! partitions. 
`ipa-metrics-tracing` defines a way to do it via tracing context +//! that is good enough for the vast majority of use cases. +//! +//! Because partitioned stores carry additional cost of extra lookup (partition -> store), +//! it is disabled by default and requires explicit opt-in via `partitioning` feature. + use std::{borrow::Borrow, cell::Cell}; use hashbrown::hash_map::Entry; @@ -9,23 +25,28 @@ use crate::{ MetricName, }; -/// Each partition is a unique 16 byte value. -pub type Partition = u128; - -pub fn set_partition(new: Partition) { - PARTITION.set(Some(new)); +thread_local! { + static PARTITION: Cell> = Cell::new(None); } -pub fn set_or_unset_partition(new: Option) { - PARTITION.set(new); -} +/// Each partition is a unique 8 byte value, meaning roughly 1B partitions +/// can be supported and the limiting factor is birthday bound. +pub type Partition = u64; -pub fn current_partition() -> Option { - PARTITION.get() -} +pub struct CurrentThreadContext; -thread_local! { - static PARTITION: Cell> = Cell::new(None); +impl CurrentThreadContext { + pub fn set(new: Partition) { + Self::toggle(Some(new)) + } + + pub fn toggle(new: Option) { + PARTITION.set(new); + } + + pub fn get() -> Option { + PARTITION.get() + } } /// Provides the same functionality as [`Store`], but partitioned @@ -38,7 +59,10 @@ thread_local! { /// can be used interchangeably. #[derive(Clone, Debug)] pub struct PartitionedStore { + /// Set of stores partitioned by [`Partition`] inner: hashbrown::HashMap, + /// We don't want to lose metrics that are emitted when partitions are not set. 
+ /// So we provide a default store for those default_store: Store, } @@ -56,11 +80,6 @@ impl PartitionedStore { } } - pub fn with_current_partition T, T>(&mut self, f: F) -> T { - let mut store = self.get_mut(current_partition()); - f(&mut store) - } - pub fn with_partition T, T>( &self, partition: Partition, @@ -70,19 +89,6 @@ impl PartitionedStore { store.map(f) } - pub fn with_partition_mut T, T>( - &mut self, - partition: Partition, - f: F, - ) -> T { - let mut store = self.get_mut(Some(partition)); - f(&mut store) - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() && self.default_store.is_empty() - } - pub fn merge(&mut self, other: Self) { for (partition, store) in other.inner { self.get_mut(Some(partition)).merge(store); @@ -90,23 +96,26 @@ impl PartitionedStore { self.default_store.merge(other.default_store); } - pub fn counter_value>(&self, name: B) -> CounterValue { - let name = name.borrow(); - if let Some(partition) = current_partition() { + pub fn counter_val<'a, const LABELS: usize, B: Borrow>>( + &'a self, + key: B, + ) -> CounterValue { + let name = key.borrow(); + if let Some(partition) = CurrentThreadContext::get() { self.inner .get(&partition) - .map(|store| store.counter_value(name)) + .map(|store| store.counter_val(name)) .unwrap_or_default() } else { - self.default_store.counter_value(name) + self.default_store.counter_val(name) } } - pub fn counter( - &mut self, - key: &MetricName<'_, LABELS>, - ) -> CounterHandle<'_, LABELS> { - if let Some(partition) = current_partition() { + pub fn counter<'a, const LABELS: usize, B: Borrow>>( + &'a mut self, + key: B, + ) -> CounterHandle<'a, LABELS> { + if let Some(partition) = CurrentThreadContext::get() { self.inner .entry(partition) .or_insert_with(|| Store::default()) @@ -120,7 +129,20 @@ impl PartitionedStore { self.inner.len() + self.default_store.len() } - fn get_mut(&mut self, partition: Option) -> &mut Store { + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn 
with_partition_mut T, T>( + &mut self, + partition: Partition, + f: F, + ) -> T { + let mut store = self.get_mut(Some(partition)); + f(&mut store) + } + + fn get_mut(&mut self, partition: Option) -> &mut Store { if let Some(v) = partition { match self.inner.entry(v) { Entry::Occupied(entry) => entry.into_mut(), @@ -135,8 +157,8 @@ impl PartitionedStore { #[cfg(test)] mod tests { use crate::{ - metric_name, - partitioned::{set_partition, PartitionedStore}, + counter, metric_name, + partitioned::{CurrentThreadContext, PartitionedStore}, }; #[test] @@ -168,27 +190,23 @@ mod tests { fn current_partition() { let metric = metric_name!("foo"); let mut store = PartitionedStore::new(); - set_partition(4); + store.counter(&metric).inc(7); - store.with_current_partition(|store| { - store.counter(&metric).inc(1); - }); - store.with_current_partition(|store| { - store.counter(&metric).inc(5); - }); + CurrentThreadContext::set(4); - assert_eq!( - 6, - store.with_current_partition(|store| store.counter(&metric).get()) - ); + store.counter(&metric).inc(1); + store.counter(&metric).inc(5); + + assert_eq!(6, store.counter_val(&metric)); + CurrentThreadContext::toggle(None); + assert_eq!(7, store.counter_val(&metric)); } #[test] fn empty() { - let mut store = PartitionedStore::new(); - store.with_current_partition(|store| { - store.counter(&metric_name!("foo")).inc(1); - }); + let mut store = PartitionedStore::default(); + assert!(store.is_empty()); + store.counter(&metric_name!("foo")).inc(1); assert!(!store.is_empty()); } @@ -196,16 +214,42 @@ mod tests { #[test] fn len() { let mut store = PartitionedStore::new(); - store.with_current_partition(|store| { - store.counter(&metric_name!("foo")).inc(1); - }); - set_partition(4); - store.with_current_partition(|store| { - store.counter(&metric_name!("foo")).inc(1); - }); + assert_eq!(0, store.len()); + + store.counter(metric_name!("foo")).inc(1); + CurrentThreadContext::set(4); + store.counter(metric_name!("foo")).inc(1); // one metric 
in partition 4, another one in default. Even that they are the same, // partitioned store cannot distinguish between them assert_eq!(2, store.len()); } + + #[test] + fn merge() { + let mut store1 = PartitionedStore::new(); + let mut store2 = PartitionedStore::new(); + store1.with_partition_mut(1, |store| store.counter(counter!("foo")).inc(1)); + store2.with_partition_mut(1, |store| store.counter(counter!("foo")).inc(1)); + store1.with_partition_mut(2, |store| store.counter(counter!("foo")).inc(2)); + store2.with_partition_mut(2, |store| store.counter(counter!("foo")).inc(2)); + + store1.counter(counter!("foo")).inc(3); + store2.counter(counter!("foo")).inc(3); + + store1.merge(store2); + assert_eq!( + 2, + store1 + .with_partition(1, |store| store.counter_val(counter!("foo"))) + .unwrap() + ); + assert_eq!( + 4, + store1 + .with_partition(2, |store| store.counter_val(counter!("foo"))) + .unwrap() + ); + assert_eq!(6, store1.counter_val(counter!("foo"))); + } } diff --git a/ipa-metrics/src/producer.rs b/ipa-metrics/src/producer.rs index 27925bed2..ddd445922 100644 --- a/ipa-metrics/src/producer.rs +++ b/ipa-metrics/src/producer.rs @@ -2,6 +2,17 @@ use crossbeam_channel::Sender; use crate::{context::CurrentThreadContext, MetricsStore}; +/// A handle to enable centralized metrics collection from the current thread. +/// +/// This is a cloneable handle, so it can be installed in multiple threads. +/// The handle is installed by calling [`install`], which returns a drop handle. +/// When the drop handle is dropped, the context of local store is flushed +/// to the collector thread. +/// +/// Thread local store is always enabled by [`MetricsContext`], so it is always +/// possible to have a local view of metrics emitted by this thread. +/// +/// [`install`]: Producer::install #[derive(Clone)] pub struct Producer { pub(super) tx: Sender, @@ -13,7 +24,7 @@ impl Producer { } /// Returns a drop handle that should be used when thread is stopped. 
- /// In an ideal world, a destructor on [`MetricsContext`] could do this, + /// One may think destructor on [`MetricsContext`] could do this, /// but as pointed in [`LocalKey`] documentation, deadlocks are possible /// if another TLS storage is accessed at destruction time. /// diff --git a/ipa-metrics/src/store.rs b/ipa-metrics/src/store.rs index 6a18b7b6b..c662eca76 100644 --- a/ipa-metrics/src/store.rs +++ b/ipa-metrics/src/store.rs @@ -3,18 +3,14 @@ use std::{borrow::Borrow, hash::BuildHasher}; use hashbrown::hash_map::RawEntryMut; use rustc_hash::FxBuildHasher; -use crate::{ - key::{OwnedMetricName, OwnedName}, - kind::CounterValue, - MetricName, -}; +use crate::{key::OwnedMetricName, kind::CounterValue, MetricName}; /// A basic store. Currently only supports counters. +/// Counters and other metrics are stored to optimize writes. That means, one lookup +/// per write. The cost of assembling the total count across all dimensions is absorbed +/// by readers #[derive(Clone, Debug)] pub struct Store { - // Counters and other metrics are stored to optimize writes. That means, one lookup - // per write. The cost of assembling the total count across all dimensions is absorbed - // by readers counters: hashbrown::HashMap, } @@ -50,10 +46,11 @@ impl Store { } impl Store { - pub fn counter( - &mut self, - key: &MetricName<'_, LABELS>, - ) -> CounterHandle<'_, LABELS> { + pub fn counter<'a, const LABELS: usize, B: Borrow>>( + &'a mut self, + key: B, + ) -> CounterHandle<'a, LABELS> { + let key = key.borrow(); let hash_builder = self.counters.hasher(); let hash = hash_builder.hash_one(key); let entry = self @@ -71,17 +68,22 @@ impl Store { } } - /// Returns the value for the specified metric across all dimensions. + /// Returns the value for the specified metric taking into account + /// its dimensionality. That is (foo, dim1 = 1, dim2 = 2) will be + /// different from (foo, dim1 = 1). 
/// The cost of this operation is `O(N*M)` where `N` - number of unique metrics - /// and `M` - number of all dimensions across all metrics. + /// registered in this store and `M` number of dimensions. /// /// Note that the cost can be improved if it ever becomes a bottleneck by /// creating a specialized two-level map (metric -> label -> value). - pub fn counter_value<'a, B: Borrow>>(&'a self, key: B) -> CounterValue { + pub fn counter_val<'a, const LABELS: usize, B: Borrow>>( + &'a self, + key: B, + ) -> CounterValue { let key = key.borrow(); let mut answer = 0; for (metric, value) in &self.counters { - if metric.key == key.key { + if metric.partial_match(key) { answer += value } } @@ -89,10 +91,6 @@ impl Store { answer } - pub fn counters(&self) -> impl Iterator { - self.counters.iter().map(|(key, value)| (key, *value)) - } - pub fn len(&self) -> usize { self.counters.len() } @@ -116,7 +114,7 @@ impl CounterHandle<'_, LABELS> { mod tests { use std::hash::{DefaultHasher, Hash, Hasher}; - use crate::{metric_name, store::Store, LabelValue}; + use crate::{counter, metric_name, store::Store, LabelValue}; impl LabelValue for &'static str { fn hash(&self) -> u64 { @@ -184,18 +182,27 @@ mod tests { fn counter_value() { let mut store = Store::default(); store - .counter(&metric_name!("foo", "h1" => &1, "h2" => &"1")) + .counter(counter!("foo", "h1" => &1, "h2" => &"1")) .inc(1); store - .counter(&metric_name!("foo", "h1" => &1, "h2" => &"2")) + .counter(counter!("foo", "h1" => &1, "h2" => &"2")) .inc(1); store - .counter(&metric_name!("foo", "h1" => &2, "h2" => &"1")) + .counter(counter!("foo", "h1" => &2, "h2" => &"1")) .inc(1); store - .counter(&metric_name!("foo", "h1" => &2, "h2" => &"2")) + .counter(counter!("foo", "h1" => &2, "h2" => &"2")) .inc(1); - - assert_eq!(4, store.counter_value(&metric_name!("foo"))); + store + .counter(counter!("bar", "h1" => &1, "h2" => &"1")) + .inc(3); + + assert_eq!(4, store.counter_val(counter!("foo"))); + assert_eq!( + 1, + 
store.counter_val(&counter!("foo", "h1" => &1, "h2" => &"2")) + ); + assert_eq!(2, store.counter_val(&counter!("foo", "h1" => &1))); + assert_eq!(2, store.counter_val(&counter!("foo", "h2" => &"2"))); } } From 07be2603f3d350fdcd65df919ad885e67771c4c5 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 17 Oct 2024 15:07:59 -0700 Subject: [PATCH 03/11] Clippy --- ipa-metrics/src/collector.rs | 27 +++++++++++++++++++-------- ipa-metrics/src/context.rs | 22 +++++++++++----------- ipa-metrics/src/controller.rs | 34 +++++++++++++++++++++++++++++++++- ipa-metrics/src/key.rs | 20 ++++++++++++-------- ipa-metrics/src/kind.rs | 1 - ipa-metrics/src/label.rs | 5 +++-- ipa-metrics/src/lib.rs | 15 ++++++++++++--- ipa-metrics/src/producer.rs | 2 +- ipa-metrics/src/store.rs | 17 +++++++++-------- 9 files changed, 100 insertions(+), 43 deletions(-) diff --git a/ipa-metrics/src/collector.rs b/ipa-metrics/src/collector.rs index 8cd4f105a..872d1ccbe 100644 --- a/ipa-metrics/src/collector.rs +++ b/ipa-metrics/src/collector.rs @@ -10,9 +10,11 @@ thread_local! { static COLLECTOR: RefCell> = const { RefCell::new(None) } } +/// Convenience struct to block the current thread on metric collection pub struct Installed; impl Installed { + #[allow(clippy::unused_self)] pub fn block_until_shutdown(&self) -> MetricsStore { MetricsCollector::with_current_mut(|c| { c.event_loop(); @@ -29,6 +31,11 @@ pub struct MetricsCollector { } impl MetricsCollector { + /// This installs metrics collection mechanism to current thread. + /// + /// ## Panics + /// It panics if there is another collector system already installed. 
+ #[allow(clippy::must_use_candidate)] pub fn install(self) -> Installed { COLLECTOR.with_borrow_mut(|c| { assert!(c.replace(self).is_none(), "Already initialized"); @@ -49,7 +56,7 @@ impl MetricsCollector { Ok(store) => { tracing::trace!("Collector received more data: {store:?}"); println!("Collector received more data: {store:?}"); - self.local_store.merge(store) + self.local_store.merge(store); } Err(e) => { tracing::debug!("No more threads collecting metrics. Disconnected: {e}"); @@ -76,7 +83,7 @@ impl MetricsCollector { } } - pub fn with_current_mut T, T>(f: F) -> T { + fn with_current_mut T, T>(f: F) -> T { COLLECTOR.with_borrow_mut(|c| { let collector = c.as_mut().expect("Collector is installed"); f(collector) @@ -97,7 +104,7 @@ mod tests { thread::{Scope, ScopedJoinHandle}, }; - use crate::{counter, installer, producer::Producer, thread_installer}; + use crate::{counter, install, install_new_thread, producer::Producer}; struct MeteredScope<'scope, 'env: 'scope>(&'scope Scope<'scope, 'env>, Producer); @@ -131,7 +138,7 @@ mod tests { #[test] fn start_stop() { - let (collector, producer, controller) = installer(); + let (collector, producer, controller) = install(); let handle = thread::spawn(|| { let store = collector.install().block_until_shutdown(); store.counter_val(counter!("foo")) @@ -144,19 +151,23 @@ mod tests { controller.stop().unwrap(); }); - assert_eq!(8, handle.join().unwrap()) + assert_eq!(8, handle.join().unwrap()); } #[test] fn with_thread() { - let (producer, controller, handle) = thread_installer().unwrap(); + let (producer, controller, handle) = install_new_thread().unwrap(); thread::scope(move |s| { let s = s.metered(producer); s.spawn(|| counter!("baz", 4)); s.spawn(|| counter!("bar", 1)); - s.spawn(|| controller.stop().unwrap()); + s.spawn(|| { + let snapshot = controller.snapshot().unwrap(); + println!("snapshot: {snapshot:?}"); + controller.stop().unwrap(); + }); }); - handle.join().unwrap() // Collector thread should be terminated by 
now + handle.join().unwrap(); // Collector thread should be terminated by now } } diff --git a/ipa-metrics/src/context.rs b/ipa-metrics/src/context.rs index c91fc7906..ed5061415 100644 --- a/ipa-metrics/src/context.rs +++ b/ipa-metrics/src/context.rs @@ -29,11 +29,11 @@ impl CurrentThreadContext { } pub fn flush() { - METRICS_CTX.with_borrow_mut(|ctx| ctx.flush()); + METRICS_CTX.with_borrow_mut(MetricsContext::flush); } pub fn is_connected() -> bool { - METRICS_CTX.with_borrow(|ctx| ctx.is_connected()) + METRICS_CTX.with_borrow(MetricsContext::is_connected) } pub fn store T, T>(f: F) -> T { @@ -63,6 +63,7 @@ impl Default for MetricsContext { } impl MetricsContext { + #[must_use] pub const fn new() -> Self { Self { store: MetricsStore::new(), @@ -78,6 +79,7 @@ impl MetricsContext { self.tx = Some(tx); } + #[must_use] pub fn store(&self) -> &MetricsStore { &self.store } @@ -98,7 +100,7 @@ impl MetricsContext { if self.is_connected() { let store = mem::take(&mut self.store); match self.tx.as_ref().unwrap().send(store) { - Ok(_) => {} + Ok(()) => {} Err(e) => { tracing::warn!("MetricsContext is not connected: {e}"); } @@ -124,13 +126,13 @@ impl Drop for MetricsContext { mod tests { use std::thread; - use crate::{CurrentThreadPartitionContext, MetricsContext}; + use crate::MetricsContext; /// Each thread has its local store by default, and it is exclusive to it #[test] #[cfg(feature = "partitions")] fn local_store() { - use crate::context::CurrentThreadContext; + use crate::{context::CurrentThreadContext, CurrentThreadPartitionContext}; CurrentThreadPartitionContext::set(0xdeadbeef); counter!("foo", 7); @@ -152,7 +154,7 @@ mod tests { #[test] fn default() { - assert_eq!(0, MetricsContext::default().store().len()) + assert_eq!(0, MetricsContext::default().store().len()); } #[test] @@ -160,11 +162,9 @@ mod tests { let (tx, rx) = crossbeam_channel::unbounded(); let mut ctx = MetricsContext::new(); ctx.init(tx); - let handle = thread::spawn(move || { - if let Ok(_) = 
rx.recv() { - panic!("Context sent empty store"); - } - }); + let handle = + thread::spawn(move || assert!(rx.recv().is_err(), "Context sent non-empty store")); + ctx.flush(); drop(ctx); handle.join().unwrap(); diff --git a/ipa-metrics/src/controller.rs b/ipa-metrics/src/controller.rs index a70802f38..2f6e31194 100644 --- a/ipa-metrics/src/controller.rs +++ b/ipa-metrics/src/controller.rs @@ -7,11 +7,27 @@ pub enum Command { Stop(Sender<()>), } +/// Handle to communicate with centralized metrics collection system. pub struct Controller { pub(super) tx: Sender, } impl Controller { + /// Request new metric snapshot from the collector thread. + /// Blocks current thread until the snapshot is received + /// + /// ## Errors + /// If collector thread is disconnected or an error occurs during snapshot request + /// + /// ## Example + /// ```rust + /// use ipa_metrics::{install_new_thread, MetricsStore}; + /// + /// let (_, controller, _handle) = install_new_thread().unwrap(); + /// let snapshot = controller.snapshot().unwrap(); + /// println!("Current metrics: {snapshot:?}"); + /// ``` + #[inline] pub fn snapshot(&self) -> Result { let (tx, rx) = crossbeam_channel::bounded(0); self.tx @@ -20,11 +36,27 @@ impl Controller { rx.recv().map_err(|e| format!("Disconnected channel: {e}")) } + /// Send request to terminate the collector thread. + /// Blocks current thread until the snapshot is received. + /// If this request is successful, any subsequent snapshot + /// or stop requests will return an error. + /// + /// ## Errors + /// If collector thread is disconnected or an error occurs while sending + /// or receiving data from the collector thread. 
+ /// + /// ## Example + /// ```rust + /// use ipa_metrics::{install_new_thread, MetricsStore}; + /// + /// let (_, controller, _handle) = install_new_thread().unwrap(); + /// controller.stop().unwrap(); + /// ``` pub fn stop(self) -> Result<(), String> { let (tx, rx) = crossbeam_channel::bounded(0); self.tx .send(Command::Stop(tx)) - .map_err(|e| format!("An error occurred while requesting metrics snapshot: {e}"))?; + .map_err(|e| format!("An error occurred while requesting termination: {e}"))?; rx.recv().map_err(|e| format!("Disconnected channel: {e}")) } } diff --git a/ipa-metrics/src/key.rs b/ipa-metrics/src/key.rs index 50a34b187..d5ec9644a 100644 --- a/ipa-metrics/src/key.rs +++ b/ipa-metrics/src/key.rs @@ -76,6 +76,9 @@ pub struct Name<'lv, const LABELS: usize = 0> { } impl<'lv, const LABELS: usize> Name<'lv, LABELS> { + /// Constructs this instance from key and labels. + /// ## Panics + /// If number of labels exceeds `MAX_LABELS`. pub fn from_parts>(key: I, labels: [Label<'lv>; LABELS]) -> Self { assert!( LABELS <= MAX_LABELS, @@ -116,6 +119,7 @@ pub struct OwnedName { } impl OwnedName { + #[must_use] pub fn key(&self) -> &'static str { self.key } @@ -125,11 +129,12 @@ impl OwnedName { } /// Checks that a subset of labels in `self` matches all values in `other`. 
+ #[must_use] pub fn partial_match(&self, other: &Name<'_, LABELS>) -> bool { - if self.key != other.key { - false - } else { + if self.key == other.key { other.labels.iter().all(|l| self.find_label(l)) + } else { + false } } @@ -142,7 +147,7 @@ impl Hash for Name<'_, LABELS> { fn hash(&self, state: &mut H) { state.write(self.key.as_bytes()); for label in &self.labels { - label.hash(state) + label.hash(state); } } } @@ -157,14 +162,13 @@ impl From<&'static str> for Name<'_, 0> { } pub trait UniqueElements { + #[must_use] fn enforce_unique(self) -> Self; } impl UniqueElements for [Label<'_>; 2] { fn enforce_unique(self) -> Self { - if self[0].name == self[1].name { - panic!("label names must be unique") - } + assert_ne!(self[0].name, self[1].name, "label names must be unique"); self } @@ -202,7 +206,7 @@ impl Hash for OwnedName { fn hash(&self, state: &mut H) { state.write(self.key.as_bytes()); for label in self.labels.iter().flatten() { - label.hash(state) + label.hash(state); } } } diff --git a/ipa-metrics/src/kind.rs b/ipa-metrics/src/kind.rs index b6abf7e4e..3a48d105b 100644 --- a/ipa-metrics/src/kind.rs +++ b/ipa-metrics/src/kind.rs @@ -1,6 +1,5 @@ //! Different metric types supported by this crate. //! Currently, only counters are supported. -//! TODO: add more /// Counters are simple 8 byte values. 
pub type CounterValue = u64; diff --git a/ipa-metrics/src/label.rs b/ipa-metrics/src/label.rs index d35680e08..f2ac183e9 100644 --- a/ipa-metrics/src/label.rs +++ b/ipa-metrics/src/label.rs @@ -38,6 +38,7 @@ pub struct Label<'lv> { } impl Label<'_> { + #[must_use] pub fn to_owned(&self) -> OwnedLabel { OwnedLabel { name: self.name, @@ -104,7 +105,7 @@ impl OwnedLabel { impl Hash for OwnedLabel { fn hash(&self, state: &mut H) { - self.as_borrowed().hash(state) + self.as_borrowed().hash(state); } } @@ -137,7 +138,7 @@ mod tests { assert_ne!(compute_hash(&foo_1), compute_hash(&foo_2)); assert_ne!(foo_2.to_owned(), foo_1); - assert_eq!(compute_hash(&foo_1), compute_hash(foo_1.to_owned())) + assert_eq!(compute_hash(&foo_1), compute_hash(foo_1.to_owned())); } #[test] diff --git a/ipa-metrics/src/lib.rs b/ipa-metrics/src/lib.rs index 843f327a2..552fac5a1 100644 --- a/ipa-metrics/src/lib.rs +++ b/ipa-metrics/src/lib.rs @@ -1,3 +1,7 @@ +#![deny(clippy::pedantic)] +#![allow(clippy::similar_names)] +#![allow(clippy::module_name_repetitions)] + mod collector; mod context; mod controller; @@ -25,7 +29,8 @@ pub use producer::Producer as MetricsProducer; #[cfg(not(feature = "partitions"))] pub use store::Store as MetricsStore; -pub fn installer() -> ( +#[must_use] +pub fn install() -> ( MetricsCollector, MetricsProducer, MetricsCollectorController, @@ -43,9 +48,13 @@ pub fn installer() -> ( ) } -pub fn thread_installer( +/// Same as [`installer]` but spawns a new thread to run the collector. 
+/// +/// ## Errors +/// if thread cannot be started +pub fn install_new_thread( ) -> io::Result<(MetricsProducer, MetricsCollectorController, JoinHandle<()>)> { - let (collector, producer, controller) = installer(); + let (collector, producer, controller) = install(); let handle = std::thread::Builder::new() .name("metric-collector".to_string()) .spawn(|| { diff --git a/ipa-metrics/src/producer.rs b/ipa-metrics/src/producer.rs index ddd445922..f9ee42cc3 100644 --- a/ipa-metrics/src/producer.rs +++ b/ipa-metrics/src/producer.rs @@ -43,6 +43,6 @@ pub struct ProducerDropHandle; impl Drop for ProducerDropHandle { fn drop(&mut self) { - CurrentThreadContext::flush() + CurrentThreadContext::flush(); } } diff --git a/ipa-metrics/src/store.rs b/ipa-metrics/src/store.rs index c662eca76..34982a0b1 100644 --- a/ipa-metrics/src/store.rs +++ b/ipa-metrics/src/store.rs @@ -21,13 +21,14 @@ impl Default for Store { } impl Store { + #[must_use] pub const fn new() -> Self { Self { counters: hashbrown::HashMap::with_hasher(FxBuildHasher), } } - pub(crate) fn merge(&mut self, other: Self) { + pub fn merge(&mut self, other: Self) { for (k, v) in other.counters { let hash_builder = self.counters.hasher(); let hash = hash_builder.hash_one(&k); @@ -40,12 +41,6 @@ impl Store { } } - pub fn is_empty(&self) -> bool { - self.counters.is_empty() - } -} - -impl Store { pub fn counter<'a, const LABELS: usize, B: Borrow>>( &'a mut self, key: B, @@ -84,16 +79,22 @@ impl Store { let mut answer = 0; for (metric, value) in &self.counters { if metric.partial_match(key) { - answer += value + answer += value; } } answer } + #[must_use] pub fn len(&self) -> usize { self.counters.len() } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } pub struct CounterHandle<'a, const LABELS: usize> { From f064fb90916d4362df859ce8307271edba04f478 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 17 Oct 2024 15:53:27 -0700 Subject: [PATCH 04/11] Final touches --- 
ipa-metrics/src/collector.rs | 1 - ipa-metrics/src/context.rs | 4 ---- ipa-metrics/src/key.rs | 7 +------ ipa-metrics/src/label.rs | 25 +++++++++++++++++-------- ipa-metrics/src/store.rs | 29 ++++++++++++++++++++++------- 5 files changed, 40 insertions(+), 26 deletions(-) diff --git a/ipa-metrics/src/collector.rs b/ipa-metrics/src/collector.rs index 872d1ccbe..50f2b9b8f 100644 --- a/ipa-metrics/src/collector.rs +++ b/ipa-metrics/src/collector.rs @@ -66,7 +66,6 @@ impl MetricsCollector { i if i == command_idx => match next_op.recv(&self.command_rx) { Ok(ControllerCommand::Snapshot(tx)) => { tracing::trace!("Snapshot request received"); - println!("snapshot request received"); tx.send(self.local_store.clone()).unwrap(); } Ok(ControllerCommand::Stop(tx)) => { diff --git a/ipa-metrics/src/context.rs b/ipa-metrics/src/context.rs index ed5061415..1c1ff3d7f 100644 --- a/ipa-metrics/src/context.rs +++ b/ipa-metrics/src/context.rs @@ -32,10 +32,6 @@ impl CurrentThreadContext { METRICS_CTX.with_borrow_mut(MetricsContext::flush); } - pub fn is_connected() -> bool { - METRICS_CTX.with_borrow(MetricsContext::is_connected) - } - pub fn store T, T>(f: F) -> T { METRICS_CTX.with_borrow(|ctx| f(ctx.store())) } diff --git a/ipa-metrics/src/key.rs b/ipa-metrics/src/key.rs index d5ec9644a..eadb5e392 100644 --- a/ipa-metrics/src/key.rs +++ b/ipa-metrics/src/key.rs @@ -114,16 +114,11 @@ impl<'lv, const LABELS: usize> Name<'lv, LABELS> { /// This is the key inside metric stores which are simple hashmaps. 
#[derive(Debug, Clone)] pub struct OwnedName { - pub(super) key: &'static str, + key: &'static str, labels: [Option; 5], } impl OwnedName { - #[must_use] - pub fn key(&self) -> &'static str { - self.key - } - pub fn labels(&self) -> impl Iterator { self.labels.iter().filter_map(|l| l.as_ref()) } diff --git a/ipa-metrics/src/label.rs b/ipa-metrics/src/label.rs index f2ac183e9..9ee414e26 100644 --- a/ipa-metrics/src/label.rs +++ b/ipa-metrics/src/label.rs @@ -72,6 +72,8 @@ impl PartialEq for Label<'_> { } } +/// Same as [`Label`] but owns the values. This instance is stored +/// inside metric hashmaps as they need to own the keys. pub struct OwnedLabel { pub name: &'static str, pub val: Box, @@ -93,14 +95,6 @@ impl OwnedLabel { val: self.val.as_ref(), } } - - pub fn name(&self) -> &'static str { - self.name - } - - pub fn str_value(&self) -> String { - self.val.to_string() - } } impl Hash for OwnedLabel { @@ -158,4 +152,19 @@ mod tests { compute_hash(&metric_name!("foo", "l1" => &1, "l2" => &1)), ); } + + #[test] + fn clone() { + let metric = metric_name!("foo", "l1" => &1).to_owned(); + assert_eq!(&metric.labels().next(), &metric.labels().next().clone()); + } + + #[test] + fn fields() { + let metric = metric_name!("foo", "l1" => &1).to_owned(); + let label = metric.labels().next().unwrap().to_owned(); + + assert_eq!(label.name, "l1"); + assert_eq!(label.val.to_string(), "1"); + } } diff --git a/ipa-metrics/src/store.rs b/ipa-metrics/src/store.rs index 34982a0b1..58160ed4c 100644 --- a/ipa-metrics/src/store.rs +++ b/ipa-metrics/src/store.rs @@ -76,14 +76,12 @@ impl Store { key: B, ) -> CounterValue { let key = key.borrow(); - let mut answer = 0; - for (metric, value) in &self.counters { - if metric.partial_match(key) { - answer += value; - } - } - answer + self.counters + .iter() + .filter(|(counter, _)| counter.partial_match(key)) + .map(|(_, val)| val) + .sum() } #[must_use] @@ -206,4 +204,21 @@ mod tests { assert_eq!(2, store.counter_val(&counter!("foo", "h1" => 
&1))); assert_eq!(2, store.counter_val(&counter!("foo", "h2" => &"2"))); } + + #[test] + fn len_empty() { + let mut store = Store::default(); + assert!(store.is_empty()); + assert_eq!(0, store.len()); + + store.counter(counter!("foo")).inc(1); + assert!(!store.is_empty()); + assert_eq!(1, store.len()); + + store.counter(counter!("foo")).inc(1); + assert_eq!(1, store.len()); + + store.counter(counter!("bar")).inc(1); + assert_eq!(2, store.len()); + } } From dd8417b6eee2979c370d3e14a95f6fcc8db30f9d Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Thu, 17 Oct 2024 17:56:49 -0700 Subject: [PATCH 05/11] Fix flaky test --- ipa-metrics/src/collector.rs | 16 +++++++++++++--- ipa-metrics/src/controller.rs | 36 +++++++++++++++++++++++++++++++++++ ipa-metrics/src/lib.rs | 5 ++++- 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/ipa-metrics/src/collector.rs b/ipa-metrics/src/collector.rs index 50f2b9b8f..4d5995af3 100644 --- a/ipa-metrics/src/collector.rs +++ b/ipa-metrics/src/collector.rs @@ -2,7 +2,10 @@ use std::cell::RefCell; use crossbeam_channel::{Receiver, Select}; -use crate::{ControllerCommand, MetricsStore}; +use crate::{ + controller::{Command, Status}, + ControllerCommand, MetricsStore, +}; thread_local! { /// Collector that is installed in a thread. It is responsible for receiving metrics from @@ -48,6 +51,7 @@ impl MetricsCollector { let mut select = Select::new(); let data_idx = select.recv(&self.rx); let command_idx = select.recv(&self.command_rx); + let mut state = Status::Active; loop { let next_op = select.select(); @@ -55,12 +59,12 @@ impl MetricsCollector { i if i == data_idx => match next_op.recv(&self.rx) { Ok(store) => { tracing::trace!("Collector received more data: {store:?}"); - println!("Collector received more data: {store:?}"); self.local_store.merge(store); } Err(e) => { tracing::debug!("No more threads collecting metrics. 
Disconnected: {e}"); select.remove(data_idx); + state = Status::Disconnected; } }, i if i == command_idx => match next_op.recv(&self.command_rx) { @@ -69,9 +73,13 @@ impl MetricsCollector { tx.send(self.local_store.clone()).unwrap(); } Ok(ControllerCommand::Stop(tx)) => { + tracing::trace!("Stop signal received"); tx.send(()).unwrap(); break; } + Ok(Command::Status(tx)) => { + tx.send(state).unwrap(); + } Err(e) => { tracing::debug!("Metric controller is disconnected: {e}"); break; @@ -103,7 +111,7 @@ mod tests { thread::{Scope, ScopedJoinHandle}, }; - use crate::{counter, install, install_new_thread, producer::Producer}; + use crate::{controller::Status, counter, install, install_new_thread, producer::Producer}; struct MeteredScope<'scope, 'env: 'scope>(&'scope Scope<'scope, 'env>, Producer); @@ -147,6 +155,8 @@ mod tests { let s = s.metered(producer); s.spawn(|| counter!("foo", 3)).join().unwrap(); s.spawn(|| counter!("foo", 5)).join().unwrap(); + drop(s); // this causes collector to eventually stop receiving signals + while controller.status().unwrap() == Status::Active {} controller.stop().unwrap(); }); diff --git a/ipa-metrics/src/controller.rs b/ipa-metrics/src/controller.rs index 2f6e31194..265dacf45 100644 --- a/ipa-metrics/src/controller.rs +++ b/ipa-metrics/src/controller.rs @@ -2,9 +2,22 @@ use crossbeam_channel::Sender; use crate::MetricsStore; +/// Indicates the current status of collector thread +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum Status { + /// There is at least one active thread that can send + /// the store snapshots to the collector. Collector is actively + /// listening for new snapshots. + Active, + /// All threads have been disconnected from this collector, + /// and it is currently awaiting shutdown via [`Command::Stop`] + Disconnected, +} + pub enum Command { Snapshot(Sender), Stop(Sender<()>), + Status(Sender), } /// Handle to communicate with centralized metrics collection system. 
@@ -59,4 +72,27 @@ impl Controller { .map_err(|e| format!("An error occurred while requesting termination: {e}"))?; rx.recv().map_err(|e| format!("Disconnected channel: {e}")) } + + /// Request current collector status. + /// + /// ## Errors + /// If collector thread is disconnected or an error occurs while sending + /// or receiving data from the collector thread. + /// + /// ## Example + /// ```rust + /// use ipa_metrics::{install_new_thread, ControllerStatus}; + /// + /// let (_, controller, _handle) = install_new_thread().unwrap(); + /// let status = controller.status().unwrap(); + /// println!("Collector status: {status:?}"); + /// ``` + #[inline] + pub fn status(&self) -> Result { + let (tx, rx) = crossbeam_channel::bounded(0); + self.tx + .send(Command::Status(tx)) + .map_err(|e| format!("An error occurred while requesting status: {e}"))?; + rx.recv().map_err(|e| format!("Disconnected channel: {e}")) + } } diff --git a/ipa-metrics/src/lib.rs b/ipa-metrics/src/lib.rs index 552fac5a1..2ee9d5be0 100644 --- a/ipa-metrics/src/lib.rs +++ b/ipa-metrics/src/lib.rs @@ -17,7 +17,10 @@ use std::{io, thread::JoinHandle}; pub use collector::MetricsCollector; pub use context::{CurrentThreadContext as MetricsCurrentThreadContext, MetricsContext}; -pub use controller::{Command as ControllerCommand, Controller as MetricsCollectorController}; +pub use controller::{ + Command as ControllerCommand, Controller as MetricsCollectorController, + Status as ControllerStatus, +}; pub use key::{MetricName, OwnedName, UniqueElements}; pub use label::{Label, LabelValue}; #[cfg(feature = "partitions")] From 497220ef8e32e78d4e4b2a2f109f7cd61b75b98b Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Fri, 18 Oct 2024 09:56:54 -0700 Subject: [PATCH 06/11] Add coverage for partitions --- scripts/coverage-ci | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scripts/coverage-ci b/scripts/coverage-ci index 2d448daa7..c652e9f65 100755 --- a/scripts/coverage-ci +++ b/scripts/coverage-ci @@ -11,6 
+11,9 @@ cargo build --all-targets # Need to be kept in sync manually with tests we run inside check.yml. cargo test --features "cli test-fixture relaxed-dp" +# Provide code coverage stats for ipa-metrics crate with partitions enabled +cargo test -p ipa-metrics --features "partitions" + # descriptive-gate does not require a feature flag. for gate in "compact-gate" ""; do cargo test --no-default-features --features "cli web-app real-world-infra test-fixture $gate" From 6dd937c599b2b906756f321e7004b8b922c35d47 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Fri, 18 Oct 2024 10:04:19 -0700 Subject: [PATCH 07/11] Clippy --- ipa-metrics/src/partitioned.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/ipa-metrics/src/partitioned.rs b/ipa-metrics/src/partitioned.rs index 9e4653992..f723759c3 100644 --- a/ipa-metrics/src/partitioned.rs +++ b/ipa-metrics/src/partitioned.rs @@ -26,7 +26,7 @@ use crate::{ }; thread_local! { - static PARTITION: Cell> = Cell::new(None); + static PARTITION: Cell> = const { Cell::new(None) } } /// Each partition is a unique 8 byte value, meaning roughly 1B partitions @@ -37,13 +37,14 @@ pub struct CurrentThreadContext; impl CurrentThreadContext { pub fn set(new: Partition) { - Self::toggle(Some(new)) + Self::toggle(Some(new)); } pub fn toggle(new: Option) { PARTITION.set(new); } + #[must_use] pub fn get() -> Option { PARTITION.get() } @@ -73,6 +74,7 @@ impl Default for PartitionedStore { } impl PartitionedStore { + #[must_use] pub const fn new() -> Self { Self { inner: hashbrown::HashMap::with_hasher(FxBuildHasher), @@ -118,28 +120,31 @@ impl PartitionedStore { if let Some(partition) = CurrentThreadContext::get() { self.inner .entry(partition) - .or_insert_with(|| Store::default()) + .or_insert_with(Store::default) .counter(key) } else { self.default_store.counter(key) } } + #[must_use] pub fn len(&self) -> usize { self.inner.len() + self.default_store.len() } + #[must_use] pub fn is_empty(&self) -> bool { 
self.len() == 0 } + #[allow(dead_code)] fn with_partition_mut T, T>( &mut self, partition: Partition, f: F, ) -> T { - let mut store = self.get_mut(Some(partition)); - f(&mut store) + let store = self.get_mut(Some(partition)); + f(store) } fn get_mut(&mut self, partition: Option) -> &mut Store { From 2f431f81cfea1764f26b65a6a14504576e41eb4f Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Fri, 18 Oct 2024 15:58:15 -0700 Subject: [PATCH 08/11] Improve coverage a bit --- ipa-metrics/src/partitioned.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/ipa-metrics/src/partitioned.rs b/ipa-metrics/src/partitioned.rs index f723759c3..0f71d0e28 100644 --- a/ipa-metrics/src/partitioned.rs +++ b/ipa-metrics/src/partitioned.rs @@ -117,14 +117,7 @@ impl PartitionedStore { &'a mut self, key: B, ) -> CounterHandle<'a, LABELS> { - if let Some(partition) = CurrentThreadContext::get() { - self.inner - .entry(partition) - .or_insert_with(Store::default) - .counter(key) - } else { - self.default_store.counter(key) - } + self.get_mut(CurrentThreadContext::get()).counter(key) } #[must_use] From c38d472857ba8f4ae0e25c9f9111a277b8d97ecb Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Mon, 21 Oct 2024 11:02:58 -0700 Subject: [PATCH 09/11] Fix another clippy error --- ipa-metrics/src/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipa-metrics/src/context.rs b/ipa-metrics/src/context.rs index 1c1ff3d7f..2020f14fd 100644 --- a/ipa-metrics/src/context.rs +++ b/ipa-metrics/src/context.rs @@ -130,7 +130,7 @@ mod tests { fn local_store() { use crate::{context::CurrentThreadContext, CurrentThreadPartitionContext}; - CurrentThreadPartitionContext::set(0xdeadbeef); + CurrentThreadPartitionContext::set(0xdead_beef); counter!("foo", 7); std::thread::spawn(|| { From 008cee361ed1cbdd614df7542a0e2090a5ea846b Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Tue, 22 Oct 2024 09:59:07 -0700 Subject: [PATCH 10/11] Feedback --- 
ipa-metrics/src/context.rs | 14 +++++++------- ipa-metrics/src/key.rs | 18 ++++++------------ ipa-metrics/src/label.rs | 14 ++++++++------ ipa-metrics/src/lib.rs | 26 +++++++++++++++++++++++++- ipa-metrics/src/store.rs | 7 ++++--- 5 files changed, 50 insertions(+), 29 deletions(-) diff --git a/ipa-metrics/src/context.rs b/ipa-metrics/src/context.rs index 2020f14fd..938d4560b 100644 --- a/ipa-metrics/src/context.rs +++ b/ipa-metrics/src/context.rs @@ -84,21 +84,21 @@ impl MetricsContext { &mut self.store } - fn is_connected(&self) -> bool { - self.tx.is_some() - } - fn flush(&mut self) { if self.store.is_empty() { return; } - if self.is_connected() { + if let Some(tx) = self.tx.as_ref() { let store = mem::take(&mut self.store); - match self.tx.as_ref().unwrap().send(store) { + match tx.send(store) { Ok(()) => {} Err(e) => { - tracing::warn!("MetricsContext is not connected: {e}"); + // Note that the store is dropped at this point. + // If it becomes a problem with collector threads disconnecting + // somewhat randomly, we can keep the old store around + // and clone it when sending. + tracing::warn!("MetricsContext is disconnected from the collector: {e}"); } } } else { diff --git a/ipa-metrics/src/key.rs b/ipa-metrics/src/key.rs index eadb5e392..8f01ea2f4 100644 --- a/ipa-metrics/src/key.rs +++ b/ipa-metrics/src/key.rs @@ -112,7 +112,7 @@ impl<'lv, const LABELS: usize> Name<'lv, LABELS> { /// Same as [`Name`], but intended for internal use. This is an owned /// version of it, that does not borrow anything from outside. /// This is the key inside metric stores which are simple hashmaps. 
-#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq)] pub struct OwnedName { key: &'static str, labels: [Option; 5], @@ -140,7 +140,9 @@ impl OwnedName { impl Hash for Name<'_, LABELS> { fn hash(&self, state: &mut H) { - state.write(self.key.as_bytes()); + Hash::hash(&self.key, state); + // to be consistent with `OwnedName` hashing, we need to + // serialize labels without slice length prefix. for label in &self.labels { label.hash(state); } @@ -186,20 +188,13 @@ impl<'a, const LABELS: usize> PartialEq> for OwnedName { impl PartialEq for OwnedName { fn eq(&self, other: &OwnedName) -> bool { - self.key == other.key - && iter::zip(&self.labels, &other.labels).all(|(a, b)| match (a, b) { - (Some(a), Some(b)) => a == b, - (None, None) => true, - _ => false, - }) + self.key == other.key && self.labels.eq(&other.labels) } } -impl Eq for OwnedName {} - impl Hash for OwnedName { fn hash(&self, state: &mut H) { - state.write(self.key.as_bytes()); + Hash::hash(self.key, state); for label in self.labels.iter().flatten() { label.hash(state); } @@ -216,7 +211,6 @@ pub fn compute_hash(value: V) -> u64 { #[cfg(test)] mod tests { - use crate::{ key::{compute_hash, Name}, label::Label, diff --git a/ipa-metrics/src/label.rs b/ipa-metrics/src/label.rs index 9ee414e26..b4e37a704 100644 --- a/ipa-metrics/src/label.rs +++ b/ipa-metrics/src/label.rs @@ -3,16 +3,17 @@ use std::{ hash::{Hash, Hasher}, }; -pub use Value as LabelValue; - pub const MAX_LABELS: usize = 5; /// Dimension value (or label value) must be sendable to another thread /// and there must be a way to show it -pub trait Value: Display + Send { +pub trait LabelValue: Display + Send { /// Creates a unique hash for this value. /// It is easy to create collisions, so better avoid them, /// by assigning a unique integer to each value + /// + /// Note that this value is used for uniqueness check inside + /// metric stores fn hash(&self) -> u64; /// Creates an owned copy of this value. 
Dynamic dispatch @@ -31,10 +32,9 @@ impl LabelValue for u32 { } } -#[derive()] pub struct Label<'lv> { pub name: &'static str, - pub val: &'lv dyn Value, + pub val: &'lv dyn LabelValue, } impl Label<'_> { @@ -76,7 +76,7 @@ impl PartialEq for Label<'_> { /// inside metric hashmaps as they need to own the keys. pub struct OwnedLabel { pub name: &'static str, - pub val: Box, + pub val: Box, } impl Clone for OwnedLabel { @@ -118,6 +118,8 @@ impl PartialEq for OwnedLabel { } } +impl Eq for OwnedLabel {} + #[cfg(test)] mod tests { diff --git a/ipa-metrics/src/lib.rs b/ipa-metrics/src/lib.rs index 2ee9d5be0..f84f8dc1c 100644 --- a/ipa-metrics/src/lib.rs +++ b/ipa-metrics/src/lib.rs @@ -32,6 +32,30 @@ pub use producer::Producer as MetricsProducer; #[cfg(not(feature = "partitions"))] pub use store::Store as MetricsStore; +/// Creates metric infrastructure that is ready to use +/// in the application code. It consists of a triple of +/// [`MetricsCollector`], [`MetricsProducer`], and +/// [`MetricsCollectorController`]. +/// +/// Collector is used in the centralized place (a dedicated thread) +/// to collect metrics coming from thread local stores. +/// +/// Metric producer must be installed on every thread that is used +/// to emit telemetry, and it connects that thread to the collector. +/// +/// Controller provides a command-based interface to the collector. +/// A thread that owns the controller can request the current snapshot. +/// For more information about API, see [`Command`]. +/// +/// ## Example +/// ```rust +/// let (collector, producer, controller) = ipa_metrics::install(); +/// ``` +/// +/// [`MetricsCollector`]: crate::MetricsCollector +/// [`MetricsProducer`]: crate::MetricsProducer +/// [`MetricsCollectorController`]: crate::MetricsCollectorController +/// [`Command`]: crate::ControllerCommand #[must_use] pub fn install() -> ( MetricsCollector, @@ -51,7 +75,7 @@ pub fn install() -> ( ) } -/// Same as [`installer]` but spawns a new thread to run the collector. 
+/// Same as [`install`] but spawns a new thread to run the collector. /// /// ## Errors /// if thread cannot be started diff --git a/ipa-metrics/src/store.rs b/ipa-metrics/src/store.rs index 58160ed4c..501b875a2 100644 --- a/ipa-metrics/src/store.rs +++ b/ipa-metrics/src/store.rs @@ -63,9 +63,10 @@ impl Store { } } - /// Returns the value for the specified metric taking into account - /// its dimensionality. That is (foo, dim1 = 1, dim2 = 2) will be - /// different from (foo, dim1 = 1). + /// Returns the value for the specified metric, limited by any specified dimensions, + /// but not by any unspecified dimensions. If metric foo has dimensions dim1 and dim2, + /// a query for (foo, dim1 = 1) will sum the counter values having dim1 = 1 + /// and any value of dim2. /// The cost of this operation is `O(N*M)` where `N` - number of unique metrics /// registered in this store and `M` number of dimensions. /// From 7331a1cd442a36f1bcfea607bca70bc7346abb95 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Tue, 22 Oct 2024 19:00:09 -0700 Subject: [PATCH 11/11] Fix `Label` Hash implementation --- ipa-metrics/src/label.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ipa-metrics/src/label.rs b/ipa-metrics/src/label.rs index b4e37a704..27da2b116 100644 --- a/ipa-metrics/src/label.rs +++ b/ipa-metrics/src/label.rs @@ -58,8 +58,8 @@ impl Debug for Label<'_> { impl Hash for Label<'_> { fn hash(&self, state: &mut H) { - state.write(self.name.as_bytes()); - state.write_u64(self.val.hash()); + Hash::hash(&self.name, state); + Hash::hash(&self.val.hash(), state); } }