From 3ff5ed5d3af8cc4072cb66e60987be6d6da0af4a Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Tue, 17 Sep 2024 11:17:24 -0700 Subject: [PATCH] Controller for metrics collector --- ipa-metrics/src/collector.rs | 76 +++++++++++++++++++++++++++++++++--- ipa-metrics/src/context.rs | 4 ++ 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/ipa-metrics/src/collector.rs b/ipa-metrics/src/collector.rs index be5270da5..de8a929a4 100644 --- a/ipa-metrics/src/collector.rs +++ b/ipa-metrics/src/collector.rs @@ -10,19 +10,26 @@ thread_local! { static COLLECTOR: RefCell> = const { RefCell::new(None) } } -fn installer() -> (MetricsCollector, MetricsProducer) { +pub fn installer() -> ( + MetricsCollector, + MetricsProducer, + MetricsCollectorController, +) { + let (command_tx, command_rx) = crossbeam_channel::unbounded(); let (tx, rx) = crossbeam_channel::unbounded(); ( MetricsCollector { rx, local_store: MetricsStore::default(), + command_rx, }, MetricsProducer { tx }, + MetricsCollectorController { tx: command_tx }, ) } #[derive(Clone)] -struct MetricsProducer { +pub struct MetricsProducer { tx: Sender, } @@ -55,9 +62,28 @@ impl Drop for ProducerDropHandle { } } -struct MetricsCollector { +pub enum Command { + Snapshot(Sender), +} + +pub struct MetricsCollectorController { + tx: Sender, +} + +impl MetricsCollectorController { + pub fn snapshot(&self) -> Result { + let (tx, rx) = crossbeam_channel::bounded(0); + self.tx + .send(Command::Snapshot(tx)) + .map_err(|e| format!("An error occurred while requesting metrics snapshot: {e}"))?; + rx.recv().map_err(|e| format!("Disconnected channel: {e}")) + } +} + +pub struct MetricsCollector { rx: Receiver, local_store: MetricsStore, + command_rx: Receiver, } impl MetricsCollector { @@ -67,6 +93,32 @@ impl MetricsCollector { }); } + fn event_loop(&mut self) { + loop { + crossbeam_channel::select! { + recv(self.rx) -> msg => { + eprintln!("received new snapshot"); + self.local_store.merge(msg.unwrap()); + } + recv(self.command_rx) -> cmd => { + match cmd { + Ok(Command::Snapshot(tx)) => { + tx.send(self.local_store.clone()).unwrap(); + } + Err(_) => { + eprintln!("disconnected"); + break; + } + _ => { + eprintln!("unknown command"); + break; + } + } + } + } + } + } + pub fn recv_all(&mut self) { loop { match self.rx.recv() { @@ -77,14 +129,26 @@ impl MetricsCollector { } pub fn wait_for_all() -> MetricsStore { - COLLECTOR.with(|c| { - let mut c = c.borrow_mut(); + COLLECTOR.with_borrow_mut(|c| { let collector = c.as_mut().expect("Collector is installed"); collector.recv_all(); mem::take(&mut collector.local_store) }) } + + pub fn wait_for_shutdown() { + COLLECTOR.with_borrow_mut(|c| { + let collector = c.as_mut().expect("Collector is installed"); + collector.event_loop(); + }); + } +} + +impl Drop for MetricsCollector { + fn drop(&mut self) { + eprintln!("collector dropped"); + } } #[cfg(test)] @@ -131,7 +195,7 @@ mod tests { #[test] fn start_stop() { - let (collector, producer) = installer(); + let (collector, producer, _) = installer(); let handle = thread::spawn(|| { collector.install(); MetricsCollector::wait_for_all().counter_value(&metric_name!("foo")) diff --git a/ipa-metrics/src/context.rs b/ipa-metrics/src/context.rs index d7b0043b6..1fca51cf7 100644 --- a/ipa-metrics/src/context.rs +++ b/ipa-metrics/src/context.rs @@ -37,6 +37,10 @@ impl CurrentThreadContext { pub fn flush() { METRICS_CTX.with_borrow_mut(|ctx| ctx.flush()); } + + pub fn is_connected() -> bool { + METRICS_CTX.with_borrow(|ctx| ctx.is_connected()) + } } /// This context is used inside thread-local storage,