Skip to content

Commit

Permalink
Merge pull request private-attribution#1356 from akoshelev/metrics-crate
Browse files Browse the repository at this point in the history
Add ipa-metrics crate
  • Loading branch information
akoshelev authored Oct 23, 2024
2 parents 09ceb05 + 7331a1c commit ebb5556
Show file tree
Hide file tree
Showing 13 changed files with 1,560 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
resolver = "2"
members = ["ipa-core", "ipa-step", "ipa-step-derive", "ipa-step-test"]
members = ["ipa-core", "ipa-step", "ipa-step-derive", "ipa-step-test", "ipa-metrics"]

[profile.release]
incremental = true
Expand Down
20 changes: 20 additions & 0 deletions ipa-metrics/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "ipa-metrics"
version = "0.1.0"
edition = "2021"

[features]
default = []
# support metric partitioning
partitions = []

[dependencies]
# crossbeam channels are faster than std
crossbeam-channel = "0.5"
# This crate uses raw entry API that is unstable in stdlib
hashbrown = "0.15"
# Fast non-collision-resistant hashing
rustc-hash = "2.0.0"
# logging
tracing = "0.1"

182 changes: 182 additions & 0 deletions ipa-metrics/src/collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use std::cell::RefCell;

use crossbeam_channel::{Receiver, Select};

use crate::{
controller::{Command, Status},
ControllerCommand, MetricsStore,
};

thread_local! {
/// Collector that is installed in a thread. It is responsible for receiving metrics from
/// all threads and aggregating them.
static COLLECTOR: RefCell<Option<MetricsCollector>> = const { RefCell::new(None) }
}

/// Convenience struct to block the current thread on metric collection
pub struct Installed;

impl Installed {
#[allow(clippy::unused_self)]
pub fn block_until_shutdown(&self) -> MetricsStore {
MetricsCollector::with_current_mut(|c| {
c.event_loop();

std::mem::take(&mut c.local_store)
})
}
}

pub struct MetricsCollector {
pub(super) rx: Receiver<MetricsStore>,
pub(super) local_store: MetricsStore,
pub(super) command_rx: Receiver<ControllerCommand>,
}

impl MetricsCollector {
/// This installs metrics collection mechanism to current thread.
///
/// ## Panics
/// It panics if there is another collector system already installed.
#[allow(clippy::must_use_candidate)]
pub fn install(self) -> Installed {
COLLECTOR.with_borrow_mut(|c| {
assert!(c.replace(self).is_none(), "Already initialized");
});

Installed
}

fn event_loop(&mut self) {
let mut select = Select::new();
let data_idx = select.recv(&self.rx);
let command_idx = select.recv(&self.command_rx);
let mut state = Status::Active;

loop {
let next_op = select.select();
match next_op.index() {
i if i == data_idx => match next_op.recv(&self.rx) {
Ok(store) => {
tracing::trace!("Collector received more data: {store:?}");
self.local_store.merge(store);
}
Err(e) => {
tracing::debug!("No more threads collecting metrics. Disconnected: {e}");
select.remove(data_idx);
state = Status::Disconnected;
}
},
i if i == command_idx => match next_op.recv(&self.command_rx) {
Ok(ControllerCommand::Snapshot(tx)) => {
tracing::trace!("Snapshot request received");
tx.send(self.local_store.clone()).unwrap();
}
Ok(ControllerCommand::Stop(tx)) => {
tracing::trace!("Stop signal received");
tx.send(()).unwrap();
break;
}
Ok(Command::Status(tx)) => {
tx.send(state).unwrap();
}
Err(e) => {
tracing::debug!("Metric controller is disconnected: {e}");
break;
}
},
_ => unreachable!(),
}
}
}

fn with_current_mut<F: FnOnce(&mut Self) -> T, T>(f: F) -> T {
COLLECTOR.with_borrow_mut(|c| {
let collector = c.as_mut().expect("Collector is installed");
f(collector)
})
}
}

impl Drop for MetricsCollector {
fn drop(&mut self) {
tracing::debug!("Collector is dropped");
}
}

#[cfg(test)]
mod tests {
use std::{
thread,
thread::{Scope, ScopedJoinHandle},
};

use crate::{controller::Status, counter, install, install_new_thread, producer::Producer};

struct MeteredScope<'scope, 'env: 'scope>(&'scope Scope<'scope, 'env>, Producer);

impl<'scope, 'env: 'scope> MeteredScope<'scope, 'env> {
fn spawn<F, T>(&self, f: F) -> ScopedJoinHandle<'scope, T>
where
F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope,
{
let producer = self.1.clone();

self.0.spawn(move || {
producer.install();
let r = f();
let _ = producer.drop_handle();

r
})
}
}

trait IntoMetered<'scope, 'env: 'scope> {
fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env>;
}

impl<'scope, 'env: 'scope> IntoMetered<'scope, 'env> for Scope<'scope, 'env> {
fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env> {
MeteredScope(self, meter)
}
}

#[test]
fn start_stop() {
let (collector, producer, controller) = install();
let handle = thread::spawn(|| {
let store = collector.install().block_until_shutdown();
store.counter_val(counter!("foo"))
});

thread::scope(move |s| {
let s = s.metered(producer);
s.spawn(|| counter!("foo", 3)).join().unwrap();
s.spawn(|| counter!("foo", 5)).join().unwrap();
drop(s); // this causes collector to eventually stop receiving signals
while controller.status().unwrap() == Status::Active {}
controller.stop().unwrap();
});

assert_eq!(8, handle.join().unwrap());
}

#[test]
fn with_thread() {
let (producer, controller, handle) = install_new_thread().unwrap();
thread::scope(move |s| {
let s = s.metered(producer);
s.spawn(|| counter!("baz", 4));
s.spawn(|| counter!("bar", 1));
s.spawn(|| {
let snapshot = controller.snapshot().unwrap();
println!("snapshot: {snapshot:?}");
controller.stop().unwrap();
});
});

handle.join().unwrap(); // Collector thread should be terminated by now
}
}
168 changes: 168 additions & 0 deletions ipa-metrics/src/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use std::{cell::RefCell, mem};

use crossbeam_channel::Sender;

use crate::MetricsStore;

thread_local! {
pub(crate) static METRICS_CTX: RefCell<MetricsContext> = const { RefCell::new(MetricsContext::new()) }
}

#[macro_export]
macro_rules! counter {
($metric:expr, $val:expr $(, $l:expr => $v:expr)*) => {{
let name = $crate::metric_name!($metric $(, $l => $v)*);
$crate::MetricsCurrentThreadContext::store_mut(|store| store.counter(&name).inc($val))
}};
($metric:expr $(, $l:expr => $v:expr)*) => {{
$crate::metric_name!($metric $(, $l => $v)*)
}};
}

/// Provides access to the metric store associated with the current thread.
/// If there is no store associated with the current thread, it will create a new one.
pub struct CurrentThreadContext;

impl CurrentThreadContext {
pub fn init(tx: Sender<MetricsStore>) {
METRICS_CTX.with_borrow_mut(|ctx| ctx.init(tx));
}

pub fn flush() {
METRICS_CTX.with_borrow_mut(MetricsContext::flush);
}

pub fn store<F: FnOnce(&MetricsStore) -> T, T>(f: F) -> T {
METRICS_CTX.with_borrow(|ctx| f(ctx.store()))
}

pub fn store_mut<F: FnOnce(&mut MetricsStore) -> T, T>(f: F) -> T {
METRICS_CTX.with_borrow_mut(|ctx| f(ctx.store_mut()))
}
}

/// This context is used inside thread-local storage,
/// so it must be wrapped inside [`std::cell::RefCell`].
///
/// For single-threaded applications, it is possible
/// to use it w/o connecting to the collector thread.
pub struct MetricsContext {
store: MetricsStore,
/// Handle to send metrics to the collector thread
tx: Option<Sender<MetricsStore>>,
}

impl Default for MetricsContext {
fn default() -> Self {
Self::new()
}
}

impl MetricsContext {
#[must_use]
pub const fn new() -> Self {
Self {
store: MetricsStore::new(),
tx: None,
}
}

/// Connects this context to the collector thread.
/// Sender will be used to send data from this thread
fn init(&mut self, tx: Sender<MetricsStore>) {
assert!(self.tx.is_none(), "Already connected");

self.tx = Some(tx);
}

#[must_use]
pub fn store(&self) -> &MetricsStore {
&self.store
}

pub fn store_mut(&mut self) -> &mut MetricsStore {
&mut self.store
}

fn flush(&mut self) {
if self.store.is_empty() {
return;
}

if let Some(tx) = self.tx.as_ref() {
let store = mem::take(&mut self.store);
match tx.send(store) {
Ok(()) => {}
Err(e) => {
// Note that the store is dropped at this point.
// If it becomes a problem with collector threads disconnecting
// somewhat randomly, we can keep the old store around
// and clone it when sending.
tracing::warn!("MetricsContext is disconnected from the collector: {e}");
}
}
} else {
tracing::warn!("MetricsContext is not connected");
}
}
}

impl Drop for MetricsContext {
fn drop(&mut self) {
if !self.store.is_empty() {
tracing::warn!(
"Non-empty metric store is dropped: {} metrics lost",
self.store.len()
);
}
}
}

#[cfg(test)]
mod tests {
use std::thread;

use crate::MetricsContext;

/// Each thread has its local store by default, and it is exclusive to it
#[test]
#[cfg(feature = "partitions")]
fn local_store() {
use crate::{context::CurrentThreadContext, CurrentThreadPartitionContext};

CurrentThreadPartitionContext::set(0xdead_beef);
counter!("foo", 7);

std::thread::spawn(|| {
counter!("foo", 1);
counter!("foo", 5);
assert_eq!(
5,
CurrentThreadContext::store(|store| store.counter_val(counter!("foo")))
);
});

assert_eq!(
7,
CurrentThreadContext::store(|store| store.counter_val(counter!("foo")))
);
}

#[test]
fn default() {
assert_eq!(0, MetricsContext::default().store().len());
}

#[test]
fn ignore_empty_store_on_flush() {
let (tx, rx) = crossbeam_channel::unbounded();
let mut ctx = MetricsContext::new();
ctx.init(tx);
let handle =
thread::spawn(move || assert!(rx.recv().is_err(), "Context sent non-empty store"));

ctx.flush();
drop(ctx);
handle.join().unwrap();
}
}
Loading

0 comments on commit ebb5556

Please sign in to comment.