diff --git a/Cargo.toml b/Cargo.toml index 377020368..1aed2b4b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["ipa-core", "ipa-step", "ipa-step-derive", "ipa-step-test"] +members = ["ipa-core", "ipa-step", "ipa-step-derive", "ipa-step-test", "ipa-metrics"] [profile.release] incremental = true diff --git a/ipa-metrics/Cargo.toml b/ipa-metrics/Cargo.toml new file mode 100644 index 000000000..ebaeb9473 --- /dev/null +++ b/ipa-metrics/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "ipa-metrics" +version = "0.1.0" +edition = "2021" + +[features] +default = [] +# support metric partitioning +partitions = [] + +[dependencies] +# crossbeam channels are faster than std +crossbeam-channel = "0.5" +# This crate uses raw entry API that is unstable in stdlib +hashbrown = "0.15" +# Fast non-collision-resistant hashing +rustc-hash = "2.0.0" +# logging +tracing = "0.1" + diff --git a/ipa-metrics/src/collector.rs b/ipa-metrics/src/collector.rs new file mode 100644 index 000000000..4d5995af3 --- /dev/null +++ b/ipa-metrics/src/collector.rs @@ -0,0 +1,182 @@ +use std::cell::RefCell; + +use crossbeam_channel::{Receiver, Select}; + +use crate::{ + controller::{Command, Status}, + ControllerCommand, MetricsStore, +}; + +thread_local! { + /// Collector that is installed in a thread. It is responsible for receiving metrics from + /// all threads and aggregating them. 
+ static COLLECTOR: RefCell> = const { RefCell::new(None) } +} + +/// Convenience struct to block the current thread on metric collection +pub struct Installed; + +impl Installed { + #[allow(clippy::unused_self)] + pub fn block_until_shutdown(&self) -> MetricsStore { + MetricsCollector::with_current_mut(|c| { + c.event_loop(); + + std::mem::take(&mut c.local_store) + }) + } +} + +pub struct MetricsCollector { + pub(super) rx: Receiver, + pub(super) local_store: MetricsStore, + pub(super) command_rx: Receiver, +} + +impl MetricsCollector { + /// This installs metrics collection mechanism to current thread. + /// + /// ## Panics + /// It panics if there is another collector system already installed. + #[allow(clippy::must_use_candidate)] + pub fn install(self) -> Installed { + COLLECTOR.with_borrow_mut(|c| { + assert!(c.replace(self).is_none(), "Already initialized"); + }); + + Installed + } + + fn event_loop(&mut self) { + let mut select = Select::new(); + let data_idx = select.recv(&self.rx); + let command_idx = select.recv(&self.command_rx); + let mut state = Status::Active; + + loop { + let next_op = select.select(); + match next_op.index() { + i if i == data_idx => match next_op.recv(&self.rx) { + Ok(store) => { + tracing::trace!("Collector received more data: {store:?}"); + self.local_store.merge(store); + } + Err(e) => { + tracing::debug!("No more threads collecting metrics. 
Disconnected: {e}"); + select.remove(data_idx); + state = Status::Disconnected; + } + }, + i if i == command_idx => match next_op.recv(&self.command_rx) { + Ok(ControllerCommand::Snapshot(tx)) => { + tracing::trace!("Snapshot request received"); + tx.send(self.local_store.clone()).unwrap(); + } + Ok(ControllerCommand::Stop(tx)) => { + tracing::trace!("Stop signal received"); + tx.send(()).unwrap(); + break; + } + Ok(Command::Status(tx)) => { + tx.send(state).unwrap(); + } + Err(e) => { + tracing::debug!("Metric controller is disconnected: {e}"); + break; + } + }, + _ => unreachable!(), + } + } + } + + fn with_current_mut T, T>(f: F) -> T { + COLLECTOR.with_borrow_mut(|c| { + let collector = c.as_mut().expect("Collector is installed"); + f(collector) + }) + } +} + +impl Drop for MetricsCollector { + fn drop(&mut self) { + tracing::debug!("Collector is dropped"); + } +} + +#[cfg(test)] +mod tests { + use std::{ + thread, + thread::{Scope, ScopedJoinHandle}, + }; + + use crate::{controller::Status, counter, install, install_new_thread, producer::Producer}; + + struct MeteredScope<'scope, 'env: 'scope>(&'scope Scope<'scope, 'env>, Producer); + + impl<'scope, 'env: 'scope> MeteredScope<'scope, 'env> { + fn spawn(&self, f: F) -> ScopedJoinHandle<'scope, T> + where + F: FnOnce() -> T + Send + 'scope, + T: Send + 'scope, + { + let producer = self.1.clone(); + + self.0.spawn(move || { + producer.install(); + let r = f(); + let _ = producer.drop_handle(); + + r + }) + } + } + + trait IntoMetered<'scope, 'env: 'scope> { + fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env>; + } + + impl<'scope, 'env: 'scope> IntoMetered<'scope, 'env> for Scope<'scope, 'env> { + fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env> { + MeteredScope(self, meter) + } + } + + #[test] + fn start_stop() { + let (collector, producer, controller) = install(); + let handle = thread::spawn(|| { + let store = collector.install().block_until_shutdown(); + 
store.counter_val(counter!("foo")) + }); + + thread::scope(move |s| { + let s = s.metered(producer); + s.spawn(|| counter!("foo", 3)).join().unwrap(); + s.spawn(|| counter!("foo", 5)).join().unwrap(); + drop(s); // this causes collector to eventually stop receiving signals + while controller.status().unwrap() == Status::Active {} + controller.stop().unwrap(); + }); + + assert_eq!(8, handle.join().unwrap()); + } + + #[test] + fn with_thread() { + let (producer, controller, handle) = install_new_thread().unwrap(); + thread::scope(move |s| { + let s = s.metered(producer); + s.spawn(|| counter!("baz", 4)); + s.spawn(|| counter!("bar", 1)); + s.spawn(|| { + let snapshot = controller.snapshot().unwrap(); + println!("snapshot: {snapshot:?}"); + controller.stop().unwrap(); + }); + }); + + handle.join().unwrap(); // Collector thread should be terminated by now + } +} diff --git a/ipa-metrics/src/context.rs b/ipa-metrics/src/context.rs new file mode 100644 index 000000000..938d4560b --- /dev/null +++ b/ipa-metrics/src/context.rs @@ -0,0 +1,168 @@ +use std::{cell::RefCell, mem}; + +use crossbeam_channel::Sender; + +use crate::MetricsStore; + +thread_local! { + pub(crate) static METRICS_CTX: RefCell = const { RefCell::new(MetricsContext::new()) } +} + +#[macro_export] +macro_rules! counter { + ($metric:expr, $val:expr $(, $l:expr => $v:expr)*) => {{ + let name = $crate::metric_name!($metric $(, $l => $v)*); + $crate::MetricsCurrentThreadContext::store_mut(|store| store.counter(&name).inc($val)) + }}; + ($metric:expr $(, $l:expr => $v:expr)*) => {{ + $crate::metric_name!($metric $(, $l => $v)*) + }}; +} + +/// Provides access to the metric store associated with the current thread. +/// If there is no store associated with the current thread, it will create a new one. 
+pub struct CurrentThreadContext; + +impl CurrentThreadContext { + pub fn init(tx: Sender) { + METRICS_CTX.with_borrow_mut(|ctx| ctx.init(tx)); + } + + pub fn flush() { + METRICS_CTX.with_borrow_mut(MetricsContext::flush); + } + + pub fn store T, T>(f: F) -> T { + METRICS_CTX.with_borrow(|ctx| f(ctx.store())) + } + + pub fn store_mut T, T>(f: F) -> T { + METRICS_CTX.with_borrow_mut(|ctx| f(ctx.store_mut())) + } +} + +/// This context is used inside thread-local storage, +/// so it must be wrapped inside [`std::cell::RefCell`]. +/// +/// For single-threaded applications, it is possible +/// to use it w/o connecting to the collector thread. +pub struct MetricsContext { + store: MetricsStore, + /// Handle to send metrics to the collector thread + tx: Option>, +} + +impl Default for MetricsContext { + fn default() -> Self { + Self::new() + } +} + +impl MetricsContext { + #[must_use] + pub const fn new() -> Self { + Self { + store: MetricsStore::new(), + tx: None, + } + } + + /// Connects this context to the collector thread. + /// Sender will be used to send data from this thread + fn init(&mut self, tx: Sender) { + assert!(self.tx.is_none(), "Already connected"); + + self.tx = Some(tx); + } + + #[must_use] + pub fn store(&self) -> &MetricsStore { + &self.store + } + + pub fn store_mut(&mut self) -> &mut MetricsStore { + &mut self.store + } + + fn flush(&mut self) { + if self.store.is_empty() { + return; + } + + if let Some(tx) = self.tx.as_ref() { + let store = mem::take(&mut self.store); + match tx.send(store) { + Ok(()) => {} + Err(e) => { + // Note that the store is dropped at this point. + // If it becomes a problem with collector threads disconnecting + // somewhat randomly, we can keep the old store around + // and clone it when sending. 
+ tracing::warn!("MetricsContext is disconnected from the collector: {e}"); + } + } + } else { + tracing::warn!("MetricsContext is not connected"); + } + } +} + +impl Drop for MetricsContext { + fn drop(&mut self) { + if !self.store.is_empty() { + tracing::warn!( + "Non-empty metric store is dropped: {} metrics lost", + self.store.len() + ); + } + } +} + +#[cfg(test)] +mod tests { + use std::thread; + + use crate::MetricsContext; + + /// Each thread has its local store by default, and it is exclusive to it + #[test] + #[cfg(feature = "partitions")] + fn local_store() { + use crate::{context::CurrentThreadContext, CurrentThreadPartitionContext}; + + CurrentThreadPartitionContext::set(0xdead_beef); + counter!("foo", 7); + + std::thread::spawn(|| { + counter!("foo", 1); + counter!("foo", 5); + assert_eq!( + 5, + CurrentThreadContext::store(|store| store.counter_val(counter!("foo"))) + ); + }); + + assert_eq!( + 7, + CurrentThreadContext::store(|store| store.counter_val(counter!("foo"))) + ); + } + + #[test] + fn default() { + assert_eq!(0, MetricsContext::default().store().len()); + } + + #[test] + fn ignore_empty_store_on_flush() { + let (tx, rx) = crossbeam_channel::unbounded(); + let mut ctx = MetricsContext::new(); + ctx.init(tx); + let handle = + thread::spawn(move || assert!(rx.recv().is_err(), "Context sent non-empty store")); + + ctx.flush(); + drop(ctx); + handle.join().unwrap(); + } +} diff --git a/ipa-metrics/src/controller.rs b/ipa-metrics/src/controller.rs new file mode 100644 index 000000000..265dacf45 --- /dev/null +++ b/ipa-metrics/src/controller.rs @@ -0,0 +1,98 @@ +use crossbeam_channel::Sender; + +use crate::MetricsStore; + +/// Indicates the current status of collector thread +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum Status { + /// There are at least one active thread that can send + /// the store snapshots to the collector. Collector is actively + /// listening for new snapshots. 
+ Active, + /// All threads have been disconnected from this collector, + /// and it is currently awaiting shutdown via [`Command::Stop`] + Disconnected, +} + +pub enum Command { + Snapshot(Sender), + Stop(Sender<()>), + Status(Sender), +} + +/// Handle to communicate with centralized metrics collection system. +pub struct Controller { + pub(super) tx: Sender, +} + +impl Controller { + /// Request new metric snapshot from the collector thread. + /// Blocks current thread until the snapshot is received + /// + /// ## Errors + /// If collector thread is disconnected or an error occurs during snapshot request + /// + /// ## Example + /// ```rust + /// use ipa_metrics::{install_new_thread, MetricsStore}; + /// + /// let (_, controller, _handle) = install_new_thread().unwrap(); + /// let snapshot = controller.snapshot().unwrap(); + /// println!("Current metrics: {snapshot:?}"); + /// ``` + #[inline] + pub fn snapshot(&self) -> Result { + let (tx, rx) = crossbeam_channel::bounded(0); + self.tx + .send(Command::Snapshot(tx)) + .map_err(|e| format!("An error occurred while requesting metrics snapshot: {e}"))?; + rx.recv().map_err(|e| format!("Disconnected channel: {e}")) + } + + /// Send request to terminate the collector thread. + /// Blocks current thread until the collector acknowledges the request. + /// If this request is successful, any subsequent snapshot + /// or stop requests will return an error. + /// + /// ## Errors + /// If collector thread is disconnected or an error occurs while sending + /// or receiving data from the collector thread. 
+ /// + /// ## Example + /// ```rust + /// use ipa_metrics::{install_new_thread, MetricsStore}; + /// + /// let (_, controller, _handle) = install_new_thread().unwrap(); + /// controller.stop().unwrap(); + /// ``` + pub fn stop(self) -> Result<(), String> { + let (tx, rx) = crossbeam_channel::bounded(0); + self.tx + .send(Command::Stop(tx)) + .map_err(|e| format!("An error occurred while requesting termination: {e}"))?; + rx.recv().map_err(|e| format!("Disconnected channel: {e}")) + } + + /// Request current collector status. + /// + /// ## Errors + /// If collector thread is disconnected or an error occurs while sending + /// or receiving data from the collector thread. + /// + /// ## Example + /// ```rust + /// use ipa_metrics::{install_new_thread, ControllerStatus}; + /// + /// let (_, controller, _handle) = install_new_thread().unwrap(); + /// let status = controller.status().unwrap(); + /// println!("Collector status: {status:?}"); + /// ``` + #[inline] + pub fn status(&self) -> Result { + let (tx, rx) = crossbeam_channel::bounded(0); + self.tx + .send(Command::Status(tx)) + .map_err(|e| format!("An error occurred while requesting status: {e}"))?; + rx.recv().map_err(|e| format!("Disconnected channel: {e}")) + } +} diff --git a/ipa-metrics/src/key.rs b/ipa-metrics/src/key.rs new file mode 100644 index 000000000..8f01ea2f4 --- /dev/null +++ b/ipa-metrics/src/key.rs @@ -0,0 +1,293 @@ +//! Metric names supported by this crate. +//! +//! Providing a good use for metrics is a tradeoff between +//! performance and ergonomics. Flexible metric engines support +//! dynamic names, like "bytes.sent.{ip}" or "cpu.{core}.instructions" +//! but that comes with a significant performance cost. +//! String interning helps to mitigate this on the storage site +//! but callsites need to allocate those at every call. +//! +//! IPA metrics can be performance sensitive. There are counters +//! incremented on every send and receive operation, so they need +//! to be fast. 
For this reason, dynamic metric names are not supported. +//! Metric name can only be a string, known at compile time. +//! +//! However, it is not flexible enough. Most metrics have dimensions +//! attached to them. IPA example is `bytes.sent` metric with step breakdown. +//! It is very useful to know the required throughput per circuit. +//! +//! This metric engine supports up to 5 dimensions attached to every metric, +//! again trying to strike a good balance between performance and usability. + +use std::{ + array, + hash::{Hash, Hasher}, + iter, + iter::repeat, +}; + +pub use Name as MetricName; +pub(super) use OwnedName as OwnedMetricName; + +use crate::label::{Label, OwnedLabel, MAX_LABELS}; + +#[macro_export] +macro_rules! metric_name { + // Match when two key-value pairs are provided + // TODO: enforce uniqueness at compile time + ($metric:expr, $l1:expr => $v1:expr, $l2:expr => $v2:expr) => {{ + use $crate::UniqueElements; + let labels = [ + $crate::Label { + name: $l1, + val: $v1, + }, + $crate::Label { + name: $l2, + val: $v2, + }, + ] + .enforce_unique(); + $crate::MetricName::from_parts($metric, labels) + }}; + // Match when one key-value pair is provided + ($metric:expr, $l1:expr => $v1:expr) => {{ + $crate::MetricName::from_parts( + $metric, + [$crate::Label { + name: $l1, + val: $v1, + }], + ) + }}; + // Match when no key-value pairs are provided + ($metric:expr) => {{ + $crate::MetricName::from_parts($metric, []) + }}; +} + +/// Metric name that is created at callsite on each metric invocation. +/// For this reason, it is performance sensitive - it tries to borrow +/// whatever it can from the caller's stack. +#[derive(Debug, PartialEq)] +pub struct Name<'lv, const LABELS: usize = 0> { + pub(super) key: &'static str, + labels: [Label<'lv>; LABELS], +} + +impl<'lv, const LABELS: usize> Name<'lv, LABELS> { + /// Constructs this instance from key and labels. + /// ## Panics + /// If number of labels exceeds `MAX_LABELS`. 
+ pub fn from_parts>(key: I, labels: [Label<'lv>; LABELS]) -> Self { + assert!( + LABELS <= MAX_LABELS, + "Maximum 5 labels per metric is supported" + ); + + Self { + key: key.into(), + labels, + } + } + + /// [`ToOwned`] trait does not work because of + /// extra [`Borrow`] requirement + pub(super) fn to_owned(&self) -> OwnedName { + let labels: [_; 5] = array::from_fn(|i| { + if i < self.labels.len() { + Some(self.labels[i].to_owned()) + } else { + None + } + }); + + OwnedName { + key: self.key, + labels, + } + } +} + +/// Same as [`Name`], but intended for internal use. This is an owned +/// version of it, that does not borrow anything from outside. +/// This is the key inside metric stores which are simple hashmaps. +#[derive(Debug, Clone, Eq)] +pub struct OwnedName { + key: &'static str, + labels: [Option; 5], +} + +impl OwnedName { + pub fn labels(&self) -> impl Iterator { + self.labels.iter().filter_map(|l| l.as_ref()) + } + + /// Checks that a subset of labels in `self` matches all values in `other`. + #[must_use] + pub fn partial_match(&self, other: &Name<'_, LABELS>) -> bool { + if self.key == other.key { + other.labels.iter().all(|l| self.find_label(l)) + } else { + false + } + } + + fn find_label(&self, label: &Label<'_>) -> bool { + self.labels().any(|l| l.as_borrowed().eq(label)) + } +} + +impl Hash for Name<'_, LABELS> { + fn hash(&self, state: &mut H) { + Hash::hash(&self.key, state); + // to be consistent with `OwnedName` hashing, we need to + // serialize labels without slice length prefix. 
+ for label in &self.labels { + label.hash(state); + } + } +} + +impl From<&'static str> for Name<'_, 0> { + fn from(value: &'static str) -> Self { + Self { + key: value, + labels: [], + } + } +} + +pub trait UniqueElements { + #[must_use] + fn enforce_unique(self) -> Self; +} + +impl UniqueElements for [Label<'_>; 2] { + fn enforce_unique(self) -> Self { + assert_ne!(self[0].name, self[1].name, "label names must be unique"); + + self + } +} + +impl<'a, const LABELS: usize> PartialEq> for OwnedName { + fn eq(&self, other: &Name<'a, LABELS>) -> bool { + self.key == other.key + && iter::zip( + &self.labels, + other.labels.iter().map(Some).chain(repeat(None)), + ) + .all(|(a, b)| match (a, b) { + (Some(a), Some(b)) => a.as_borrowed() == *b, + (None, None) => true, + _ => false, + }) + } +} + +impl PartialEq for OwnedName { + fn eq(&self, other: &OwnedName) -> bool { + self.key == other.key && self.labels.eq(&other.labels) + } +} + +impl Hash for OwnedName { + fn hash(&self, state: &mut H) { + Hash::hash(self.key, state); + for label in self.labels.iter().flatten() { + label.hash(state); + } + } +} + +#[cfg(test)] +pub fn compute_hash(value: V) -> u64 { + let mut hasher = std::hash::DefaultHasher::default(); + value.hash(&mut hasher); + + hasher.finish() +} + +#[cfg(test)] +mod tests { + use crate::{ + key::{compute_hash, Name}, + label::Label, + }; + + #[test] + fn eq() { + let name = Name::from("foo"); + assert_eq!(name.to_owned(), name); + } + + #[test] + fn hash_eq() { + let a = Name::from("foo"); + let b = Name::from("foo"); + assert_eq!(compute_hash(&a), compute_hash(b)); + assert_eq!(compute_hash(&a), compute_hash(a.to_owned())); + } + + #[test] + fn not_eq() { + let foo = Name::from("foo"); + let bar = Name::from("bar"); + assert_ne!(foo.to_owned(), bar); + } + + #[test] + fn hash_not_eq() { + let foo = Name::from("foo"); + let bar = Name::from("bar"); + assert_ne!(compute_hash(&foo), compute_hash(&bar)); + assert_ne!(compute_hash(foo), 
compute_hash(bar.to_owned())); + } + + #[test] + #[should_panic(expected = "Maximum 5 labels per metric is supported")] + fn more_than_5_labels() { + let _ = Name::from_parts( + "foo", + [ + Label { + name: "label_1", + val: &1, + }, + Label { + name: "label_2", + val: &1, + }, + Label { + name: "label_3", + val: &1, + }, + Label { + name: "label_4", + val: &1, + }, + Label { + name: "label_5", + val: &1, + }, + Label { + name: "label_6", + val: &1, + }, + ], + ); + } + + #[test] + fn eq_is_consistent() { + let a_name = metric_name!("foo", "label_1" => &1); + let b_name = metric_name!("foo", "label_1" => &1, "label_2" => &2); + + assert_eq!(a_name, a_name); + assert_eq!(a_name.to_owned(), a_name); + + assert_ne!(b_name.to_owned(), a_name); + assert_ne!(a_name.to_owned(), b_name); + } +} diff --git a/ipa-metrics/src/kind.rs b/ipa-metrics/src/kind.rs new file mode 100644 index 000000000..3a48d105b --- /dev/null +++ b/ipa-metrics/src/kind.rs @@ -0,0 +1,5 @@ +//! Different metric types supported by this crate. +//! Currently, only counters are supported. + +/// Counters are simple 8 byte values. +pub type CounterValue = u64; diff --git a/ipa-metrics/src/label.rs b/ipa-metrics/src/label.rs new file mode 100644 index 000000000..27da2b116 --- /dev/null +++ b/ipa-metrics/src/label.rs @@ -0,0 +1,172 @@ +use std::{ + fmt::{Debug, Display, Formatter}, + hash::{Hash, Hasher}, +}; + +pub const MAX_LABELS: usize = 5; + +/// Dimension value (or label value) must be sendable to another thread +/// and there must be a way to show it +pub trait LabelValue: Display + Send { + /// Creates a unique hash for this value. + /// It is easy to create collisions, so better avoid them, + /// by assigning a unique integer to each value + /// + /// Note that this value is used for uniqueness check inside + /// metric stores + fn hash(&self) -> u64; + + /// Creates an owned copy of this value. 
Dynamic dispatch + /// is required, because values are stored in a generic store + /// that can't be specialized for value types. + fn boxed(&self) -> Box; +} + +impl LabelValue for u32 { + fn hash(&self) -> u64 { + u64::from(*self) + } + + fn boxed(&self) -> Box { + Box::new(*self) + } +} + +pub struct Label<'lv> { + pub name: &'static str, + pub val: &'lv dyn LabelValue, +} + +impl Label<'_> { + #[must_use] + pub fn to_owned(&self) -> OwnedLabel { + OwnedLabel { + name: self.name, + val: self.val.boxed(), + } + } +} + +impl Debug for Label<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Label") + .field("name", &self.name) + .field("val", &format!("{}", self.val)) + .finish() + } +} + +impl Hash for Label<'_> { + fn hash(&self, state: &mut H) { + Hash::hash(&self.name, state); + Hash::hash(&self.val.hash(), state); + } +} + +impl PartialEq for Label<'_> { + fn eq(&self, other: &Self) -> bool { + // name check should be fast - just pointer comparison. + // val check is more involved with dynamic dispatch, so we can consider + // making label immutable and storing a hash of the value in place + self.name == other.name && self.val.hash() == other.val.hash() + } +} + +/// Same as [`Label`] but owns the values. This instance is stored +/// inside metric hashmaps as they need to own the keys. 
+pub struct OwnedLabel { + pub name: &'static str, + pub val: Box, +} + +impl Clone for OwnedLabel { + fn clone(&self) -> Self { + Self { + name: self.name, + val: self.val.boxed(), + } + } +} + +impl OwnedLabel { + pub fn as_borrowed(&self) -> Label<'_> { + Label { + name: self.name, + val: self.val.as_ref(), + } + } +} + +impl Hash for OwnedLabel { + fn hash(&self, state: &mut H) { + self.as_borrowed().hash(state); + } +} + +impl Debug for OwnedLabel { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OwnedLabel") + .field("name", &self.name) + .field("val", &format!("{}", self.val)) + .finish() + } +} + +impl PartialEq for OwnedLabel { + fn eq(&self, other: &Self) -> bool { + self.name == other.name && self.val.hash() == other.val.hash() + } +} + +impl Eq for OwnedLabel {} + +#[cfg(test)] +mod tests { + + use crate::{key::compute_hash, metric_name}; + + #[test] + fn one_label() { + let foo_1 = metric_name!("foo", "l1" => &1); + let foo_2 = metric_name!("foo", "l1" => &2); + + assert_ne!(foo_1.to_owned(), foo_2); + assert_ne!(compute_hash(&foo_1), compute_hash(&foo_2)); + assert_ne!(foo_2.to_owned(), foo_1); + + assert_eq!(compute_hash(&foo_1), compute_hash(foo_1.to_owned())); + } + + #[test] + #[should_panic(expected = "label names must be unique")] + fn unique() { + metric_name!("foo", "l1" => &1, "l1" => &0); + } + + #[test] + fn non_commutative() { + assert_ne!( + compute_hash(&metric_name!("foo", "l1" => &1, "l2" => &0)), + compute_hash(&metric_name!("foo", "l1" => &0, "l2" => &1)), + ); + assert_ne!( + compute_hash(&metric_name!("foo", "l1" => &1)), + compute_hash(&metric_name!("foo", "l1" => &1, "l2" => &1)), + ); + } + + #[test] + fn clone() { + let metric = metric_name!("foo", "l1" => &1).to_owned(); + assert_eq!(&metric.labels().next(), &metric.labels().next().clone()); + } + + #[test] + fn fields() { + let metric = metric_name!("foo", "l1" => &1).to_owned(); + let label = metric.labels().next().unwrap().to_owned(); + + 
assert_eq!(label.name, "l1"); + assert_eq!(label.val.to_string(), "1"); + } +} diff --git a/ipa-metrics/src/lib.rs b/ipa-metrics/src/lib.rs new file mode 100644 index 000000000..f84f8dc1c --- /dev/null +++ b/ipa-metrics/src/lib.rs @@ -0,0 +1,92 @@ +#![deny(clippy::pedantic)] +#![allow(clippy::similar_names)] +#![allow(clippy::module_name_repetitions)] + +mod collector; +mod context; +mod controller; +mod key; +mod kind; +mod label; +#[cfg(feature = "partitions")] +mod partitioned; +mod producer; +mod store; + +use std::{io, thread::JoinHandle}; + +pub use collector::MetricsCollector; +pub use context::{CurrentThreadContext as MetricsCurrentThreadContext, MetricsContext}; +pub use controller::{ + Command as ControllerCommand, Controller as MetricsCollectorController, + Status as ControllerStatus, +}; +pub use key::{MetricName, OwnedName, UniqueElements}; +pub use label::{Label, LabelValue}; +#[cfg(feature = "partitions")] +pub use partitioned::{ + CurrentThreadContext as CurrentThreadPartitionContext, Partition as MetricPartition, + PartitionedStore as MetricsStore, +}; +pub use producer::Producer as MetricsProducer; +#[cfg(not(feature = "partitions"))] +pub use store::Store as MetricsStore; + +/// Creates metric infrastructure that is ready to use +/// in the application code. It consists of a triple of +/// [`MetricsCollector`], [`MetricsProducer`], and +/// [`MetricsCollectorController`]. +/// +/// Collector is used in the centralized place (a dedicated thread) +/// to collect metrics coming from thread local stores. +/// +/// Metric producer must be installed on every thread that is used +/// to emit telemetry, and it connects that thread to the collector. +/// +/// Controller provides a command-based API to the collector. +/// A thread that owns the controller can request the current snapshot. +/// For more information about API, see [`Command`]. 
+/// +/// ## Example +/// ```rust +/// let (collector, producer, controller) = ipa_metrics::install(); +/// ``` +/// +/// [`MetricsCollector`]: crate::MetricsCollector +/// [`MetricsProducer`]: crate::MetricsProducer +/// [`MetricsCollectorController`]: crate::MetricsCollectorController +/// [`Command`]: crate::ControllerCommand +#[must_use] +pub fn install() -> ( + MetricsCollector, + MetricsProducer, + MetricsCollectorController, +) { + let (command_tx, command_rx) = crossbeam_channel::unbounded(); + let (tx, rx) = crossbeam_channel::unbounded(); + ( + MetricsCollector { + rx, + local_store: MetricsStore::default(), + command_rx, + }, + MetricsProducer { tx }, + MetricsCollectorController { tx: command_tx }, + ) +} + +/// Same as [`install`] but spawns a new thread to run the collector. +/// +/// ## Errors +/// If the thread cannot be started +pub fn install_new_thread( +) -> io::Result<(MetricsProducer, MetricsCollectorController, JoinHandle<()>)> { + let (collector, producer, controller) = install(); + let handle = std::thread::Builder::new() + .name("metric-collector".to_string()) + .spawn(|| { + collector.install().block_until_shutdown(); + })?; + + Ok((producer, controller, handle)) +} diff --git a/ipa-metrics/src/partitioned.rs b/ipa-metrics/src/partitioned.rs new file mode 100644 index 000000000..0f71d0e28 --- /dev/null +++ b/ipa-metrics/src/partitioned.rs @@ -0,0 +1,253 @@ +//! This module enables metric partitioning that can be useful +//! when threads that emit metrics are shared across multiple executions. +//! A typical example is unit tests in Rust that share threads. +//! Having a global per-thread store would mean that it is not possible +//! to distinguish between different runs. +//! +//! Partitioning attempts to solve this with a global 8 byte identifier that +//! is set in thread local storage and read automatically by [`PartitionedStore`]. +//! +//! Note that this module does not provide means to automatically set and unset +//! 
partitions. `ipa-metrics-tracing` defines a way to do it via tracing context +//! that is good enough for the vast majority of use cases. +//! +//! Because partitioned stores carry additional cost of extra lookup (partition -> store), +//! it is disabled by default and requires explicit opt-in via `partitions` feature. + +use std::{borrow::Borrow, cell::Cell}; + +use hashbrown::hash_map::Entry; +use rustc_hash::FxBuildHasher; + +use crate::{ + kind::CounterValue, + store::{CounterHandle, Store}, + MetricName, +}; + +thread_local! { + static PARTITION: Cell> = const { Cell::new(None) } +} + +/// Each partition is a unique 8 byte value, meaning roughly 1B partitions +/// can be supported and the limiting factor is birthday bound. +pub type Partition = u64; + +pub struct CurrentThreadContext; + +impl CurrentThreadContext { + pub fn set(new: Partition) { + Self::toggle(Some(new)); + } + + pub fn toggle(new: Option) { + PARTITION.set(new); + } + + #[must_use] + pub fn get() -> Option { + PARTITION.get() + } +} + +/// Provides the same functionality as [`Store`], but partitioned +/// across many dimensions. There is an extra price for it, so +/// don't use it, unless you need it. +/// The dimension is set through [`std::thread::LocalKey`], so +/// each thread can set only one dimension at a time. +/// +/// The API of this struct will match [`Store`] as they +/// can be used interchangeably. +#[derive(Clone, Debug)] +pub struct PartitionedStore { + /// Set of stores partitioned by [`Partition`] + inner: hashbrown::HashMap, + /// We don't want to lose metrics that are emitted when partitions are not set. 
+ /// So we provide a default store for those + default_store: Store, +} + +impl Default for PartitionedStore { + fn default() -> Self { + Self::new() + } +} + +impl PartitionedStore { + #[must_use] + pub const fn new() -> Self { + Self { + inner: hashbrown::HashMap::with_hasher(FxBuildHasher), + default_store: Store::new(), + } + } + + pub fn with_partition T, T>( + &self, + partition: Partition, + f: F, + ) -> Option { + let store = self.inner.get(&partition); + store.map(f) + } + + pub fn merge(&mut self, other: Self) { + for (partition, store) in other.inner { + self.get_mut(Some(partition)).merge(store); + } + self.default_store.merge(other.default_store); + } + + pub fn counter_val<'a, const LABELS: usize, B: Borrow>>( + &'a self, + key: B, + ) -> CounterValue { + let name = key.borrow(); + if let Some(partition) = CurrentThreadContext::get() { + self.inner + .get(&partition) + .map(|store| store.counter_val(name)) + .unwrap_or_default() + } else { + self.default_store.counter_val(name) + } + } + + pub fn counter<'a, const LABELS: usize, B: Borrow>>( + &'a mut self, + key: B, + ) -> CounterHandle<'a, LABELS> { + self.get_mut(CurrentThreadContext::get()).counter(key) + } + + #[must_use] + pub fn len(&self) -> usize { + self.inner.len() + self.default_store.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + #[allow(dead_code)] + fn with_partition_mut T, T>( + &mut self, + partition: Partition, + f: F, + ) -> T { + let store = self.get_mut(Some(partition)); + f(store) + } + + fn get_mut(&mut self, partition: Option) -> &mut Store { + if let Some(v) = partition { + match self.inner.entry(v) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => entry.insert(Store::default()), + } + } else { + &mut self.default_store + } + } +} + +#[cfg(test)] +mod tests { + use crate::{ + counter, metric_name, + partitioned::{CurrentThreadContext, PartitionedStore}, + }; + + #[test] + fn unique_partition() { + let metric = 
metric_name!("foo"); + let mut store = PartitionedStore::new(); + store.with_partition_mut(1, |store| { + store.counter(&metric).inc(1); + }); + store.with_partition_mut(5, |store| { + store.counter(&metric).inc(5); + }); + + assert_eq!( + 5, + store.with_partition_mut(5, |store| store.counter(&metric).get()) + ); + assert_eq!( + 1, + store.with_partition_mut(1, |store| store.counter(&metric).get()) + ); + assert_eq!( + 0, + store.with_partition_mut(10, |store| store.counter(&metric).get()) + ); + } + + #[test] + fn current_partition() { + let metric = metric_name!("foo"); + let mut store = PartitionedStore::new(); + store.counter(&metric).inc(7); + + CurrentThreadContext::set(4); + + store.counter(&metric).inc(1); + store.counter(&metric).inc(5); + + assert_eq!(6, store.counter_val(&metric)); + CurrentThreadContext::toggle(None); + assert_eq!(7, store.counter_val(&metric)); + } + + #[test] + fn empty() { + let mut store = PartitionedStore::default(); + assert!(store.is_empty()); + store.counter(&metric_name!("foo")).inc(1); + + assert!(!store.is_empty()); + } + + #[test] + fn len() { + let mut store = PartitionedStore::new(); + assert_eq!(0, store.len()); + + store.counter(metric_name!("foo")).inc(1); + CurrentThreadContext::set(4); + store.counter(metric_name!("foo")).inc(1); + + // one metric in partition 4, another one in default. 
Even though they are the same, + // partitioned store cannot distinguish between them + assert_eq!(2, store.len()); + } + + #[test] + fn merge() { + let mut store1 = PartitionedStore::new(); + let mut store2 = PartitionedStore::new(); + store1.with_partition_mut(1, |store| store.counter(counter!("foo")).inc(1)); + store2.with_partition_mut(1, |store| store.counter(counter!("foo")).inc(1)); + store1.with_partition_mut(2, |store| store.counter(counter!("foo")).inc(2)); + store2.with_partition_mut(2, |store| store.counter(counter!("foo")).inc(2)); + + store1.counter(counter!("foo")).inc(3); + store2.counter(counter!("foo")).inc(3); + + store1.merge(store2); + assert_eq!( + 2, + store1 + .with_partition(1, |store| store.counter_val(counter!("foo"))) + .unwrap() + ); + assert_eq!( + 4, + store1 + .with_partition(2, |store| store.counter_val(counter!("foo"))) + .unwrap() + ); + assert_eq!(6, store1.counter_val(counter!("foo"))); + } +} diff --git a/ipa-metrics/src/producer.rs b/ipa-metrics/src/producer.rs new file mode 100644 index 000000000..f9ee42cc3 --- /dev/null +++ b/ipa-metrics/src/producer.rs @@ -0,0 +1,48 @@ +use crossbeam_channel::Sender; + +use crate::{context::CurrentThreadContext, MetricsStore}; + +/// A handle to enable centralized metrics collection from the current thread. +/// +/// This is a cloneable handle, so it can be installed in multiple threads. +/// The handle is installed by calling [`install`]; [`Producer::drop_handle`] returns a drop handle. +/// When the drop handle is dropped, the context of local store is flushed +/// to the collector thread. +/// +/// Thread local store is always enabled by [`MetricsContext`], so it is always +/// possible to have a local view of metrics emitted by this thread. 
+/// +/// [`install`]: Producer::install +#[derive(Clone)] +pub struct Producer { + pub(super) tx: Sender, +} + +impl Producer { + pub fn install(&self) { + CurrentThreadContext::init(self.tx.clone()); + } + + /// Returns a drop handle that should be used when thread is stopped. + /// One may think destructor on [`MetricsContext`] could do this, + /// but as pointed out in [`LocalKey`] documentation, deadlocks are possible + /// if another TLS storage is accessed at destruction time. + /// + /// I actually ran into this problem with crossbeam channels. Send operation + /// requires access to `thread::current` and that panics at runtime if called + /// from inside `Drop`. + /// + /// [`LocalKey`]: std::thread::LocalKey + pub fn drop_handle(&self) -> ProducerDropHandle { + ProducerDropHandle + } +} + +#[must_use] +pub struct ProducerDropHandle; + +impl Drop for ProducerDropHandle { + fn drop(&mut self) { + CurrentThreadContext::flush(); + } +} diff --git a/ipa-metrics/src/store.rs b/ipa-metrics/src/store.rs new file mode 100644 index 000000000..501b875a2 --- /dev/null +++ b/ipa-metrics/src/store.rs @@ -0,0 +1,225 @@ +use std::{borrow::Borrow, hash::BuildHasher}; + +use hashbrown::hash_map::RawEntryMut; +use rustc_hash::FxBuildHasher; + +use crate::{key::OwnedMetricName, kind::CounterValue, MetricName}; + +/// A basic store. Currently only supports counters. +/// Counters and other metrics are stored to optimize writes. That means, one lookup +/// per write. 
The cost of assembling the total count across all dimensions is absorbed +/// by readers +#[derive(Clone, Debug)] +pub struct Store { + counters: hashbrown::HashMap, +} + +impl Default for Store { + fn default() -> Self { + Self::new() + } +} + +impl Store { + #[must_use] + pub const fn new() -> Self { + Self { + counters: hashbrown::HashMap::with_hasher(FxBuildHasher), + } + } + + pub fn merge(&mut self, other: Self) { + for (k, v) in other.counters { + let hash_builder = self.counters.hasher(); + let hash = hash_builder.hash_one(&k); + *self + .counters + .raw_entry_mut() + .from_hash(hash, |other| other.eq(&k)) + .or_insert(k, 0) + .1 += v; + } + } + + pub fn counter<'a, const LABELS: usize, B: Borrow>>( + &'a mut self, + key: B, + ) -> CounterHandle<'a, LABELS> { + let key = key.borrow(); + let hash_builder = self.counters.hasher(); + let hash = hash_builder.hash_one(key); + let entry = self + .counters + .raw_entry_mut() + .from_hash(hash, |key_found| key_found.eq(key)); + match entry { + RawEntryMut::Occupied(slot) => CounterHandle { + val: slot.into_mut(), + }, + RawEntryMut::Vacant(slot) => { + let (_, val) = slot.insert_hashed_nocheck(hash, key.to_owned(), Default::default()); + CounterHandle { val } + } + } + } + + /// Returns the value for the specified metric, limited by any specified dimensions, + /// but not by any unspecified dimensions. If metric foo has dimensions dim1 and dim2, + /// a query for (foo, dim1 = 1) will sum the counter values having dim1 = 1 + /// and any value of dim2. + /// The cost of this operation is `O(N*M)` where `N` - number of unique metrics + /// registered in this store and `M` number of dimensions. + /// + /// Note that the cost can be improved if it ever becomes a bottleneck by + /// creating a specialized two-level map (metric -> label -> value). 
+ pub fn counter_val<'a, const LABELS: usize, B: Borrow>>( + &'a self, + key: B, + ) -> CounterValue { + let key = key.borrow(); + + self.counters + .iter() + .filter(|(counter, _)| counter.partial_match(key)) + .map(|(_, val)| val) + .sum() + } + + #[must_use] + pub fn len(&self) -> usize { + self.counters.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +pub struct CounterHandle<'a, const LABELS: usize> { + val: &'a mut CounterValue, +} + +impl CounterHandle<'_, LABELS> { + pub fn inc(&mut self, inc: CounterValue) { + *self.val += inc; + } + + pub fn get(&self) -> CounterValue { + *self.val + } +} + +#[cfg(test)] +mod tests { + use std::hash::{DefaultHasher, Hash, Hasher}; + + use crate::{counter, metric_name, store::Store, LabelValue}; + + impl LabelValue for &'static str { + fn hash(&self) -> u64 { + // TODO: use fast hashing here + let mut hasher = DefaultHasher::default(); + Hash::hash(self, &mut hasher); + + hasher.finish() + } + + fn boxed(&self) -> Box { + Box::new(*self) + } + } + + #[test] + fn counter() { + let mut store = Store::default(); + let name = metric_name!("foo"); + { + let mut handle = store.counter(&name); + assert_eq!(0, handle.get()); + handle.inc(3); + assert_eq!(3, handle.get()); + } + + { + store.counter(&name).inc(0); + assert_eq!(3, store.counter(&name).get()); + } + } + + #[test] + fn with_labels() { + let mut store = Store::default(); + let valid_name = metric_name!("foo", "h1" => &1, "h2" => &"2"); + let wrong_name = metric_name!("foo", "h1" => &2, "h2" => &"2"); + store.counter(&valid_name).inc(2); + + assert_eq!(2, store.counter(&valid_name).get()); + assert_eq!(0, store.counter(&wrong_name).get()); + } + + #[test] + fn merge() { + let mut store1 = Store::default(); + let mut store2 = Store::default(); + let foo = metric_name!("foo", "h1" => &1, "h2" => &"2"); + let bar = metric_name!("bar", "h2" => &"2"); + let baz = metric_name!("baz"); + store1.counter(&foo).inc(2); + 
store2.counter(&foo).inc(1); + + store1.counter(&bar).inc(7); + store2.counter(&baz).inc(3); + + store1.merge(store2); + + assert_eq!(3, store1.counter(&foo).get()); + assert_eq!(7, store1.counter(&bar).get()); + assert_eq!(3, store1.counter(&baz).get()); + } + + #[test] + fn counter_value() { + let mut store = Store::default(); + store + .counter(counter!("foo", "h1" => &1, "h2" => &"1")) + .inc(1); + store + .counter(counter!("foo", "h1" => &1, "h2" => &"2")) + .inc(1); + store + .counter(counter!("foo", "h1" => &2, "h2" => &"1")) + .inc(1); + store + .counter(counter!("foo", "h1" => &2, "h2" => &"2")) + .inc(1); + store + .counter(counter!("bar", "h1" => &1, "h2" => &"1")) + .inc(3); + + assert_eq!(4, store.counter_val(counter!("foo"))); + assert_eq!( + 1, + store.counter_val(&counter!("foo", "h1" => &1, "h2" => &"2")) + ); + assert_eq!(2, store.counter_val(&counter!("foo", "h1" => &1))); + assert_eq!(2, store.counter_val(&counter!("foo", "h2" => &"2"))); + } + + #[test] + fn len_empty() { + let mut store = Store::default(); + assert!(store.is_empty()); + assert_eq!(0, store.len()); + + store.counter(counter!("foo")).inc(1); + assert!(!store.is_empty()); + assert_eq!(1, store.len()); + + store.counter(counter!("foo")).inc(1); + assert_eq!(1, store.len()); + + store.counter(counter!("bar")).inc(1); + assert_eq!(2, store.len()); + } +} diff --git a/scripts/coverage-ci b/scripts/coverage-ci index 2d448daa7..c652e9f65 100755 --- a/scripts/coverage-ci +++ b/scripts/coverage-ci @@ -11,6 +11,9 @@ cargo build --all-targets # Need to be kept in sync manually with tests we run inside check.yml. cargo test --features "cli test-fixture relaxed-dp" +# Provide code coverage stats for ipa-metrics crate with partitions enabled +cargo test -p ipa-metrics --features "partitions" + # descriptive-gate does not require a feature flag. for gate in "compact-gate" ""; do cargo test --no-default-features --features "cli web-app real-world-infra test-fixture $gate"