Skip to content

Commit

Permalink
Controller for metrics collector
Browse files Browse the repository at this point in the history
  • Loading branch information
akoshelev committed Sep 17, 2024
1 parent 78910bf commit 3ff5ed5
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 6 deletions.
76 changes: 70 additions & 6 deletions ipa-metrics/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,26 @@ thread_local! {
static COLLECTOR: RefCell<Option<MetricsCollector>> = const { RefCell::new(None) }
}

fn installer() -> (MetricsCollector, MetricsProducer) {
pub fn installer() -> (
MetricsCollector,
MetricsProducer,
MetricsCollectorController,
) {
let (command_tx, command_rx) = crossbeam_channel::unbounded();
let (tx, rx) = crossbeam_channel::unbounded();
(
MetricsCollector {
rx,
local_store: MetricsStore::default(),
command_rx,
},
MetricsProducer { tx },
MetricsCollectorController { tx: command_tx },
)
}

#[derive(Clone)]
struct MetricsProducer {
pub struct MetricsProducer {
tx: Sender<MetricsStore>,
}

Expand Down Expand Up @@ -55,9 +62,28 @@ impl Drop for ProducerDropHandle {
}
}

struct MetricsCollector {
pub enum Command {
Snapshot(Sender<MetricsStore>),
}

pub struct MetricsCollectorController {
tx: Sender<Command>,
}

impl MetricsCollectorController {
pub fn snapshot(&self) -> Result<MetricsStore, String> {
let (tx, rx) = crossbeam_channel::bounded(0);
self.tx
.send(Command::Snapshot(tx))
.map_err(|e| format!("An error occurred while requesting metrics snapshot: {e}"))?;
rx.recv().map_err(|e| format!("Disconnected channel: {e}"))
}
}

pub struct MetricsCollector {
rx: Receiver<MetricsStore>,
local_store: MetricsStore,
command_rx: Receiver<Command>,
}

impl MetricsCollector {
Expand All @@ -67,6 +93,32 @@ impl MetricsCollector {
});
}

fn event_loop(&mut self) {
loop {
crossbeam_channel::select! {
recv(self.rx) -> msg => {
eprintln!("received new snapshot");
self.local_store.merge(msg.unwrap());
}
recv(self.command_rx) -> cmd => {
match cmd {
Ok(Command::Snapshot(tx)) => {
tx.send(self.local_store.clone()).unwrap();
}
Err(_) => {
eprintln!("disconnected");
break;
}
_ => {
eprintln!("unknown command");
break;
}
}
}
}
}
}

pub fn recv_all(&mut self) {
loop {
match self.rx.recv() {
Expand All @@ -77,14 +129,26 @@ impl MetricsCollector {
}

pub fn wait_for_all() -> MetricsStore {
COLLECTOR.with(|c| {
let mut c = c.borrow_mut();
COLLECTOR.with_borrow_mut(|c| {
let collector = c.as_mut().expect("Collector is installed");
collector.recv_all();

mem::take(&mut collector.local_store)
})
}

pub fn wait_for_shutdown() {
COLLECTOR.with_borrow_mut(|c| {
let collector = c.as_mut().expect("Collector is installed");
collector.event_loop();
});
}
}

impl Drop for MetricsCollector {
fn drop(&mut self) {
eprintln!("collector dropped");
}
}

#[cfg(test)]
Expand Down Expand Up @@ -131,7 +195,7 @@ mod tests {

#[test]
fn start_stop() {
let (collector, producer) = installer();
let (collector, producer, _) = installer();
let handle = thread::spawn(|| {
collector.install();
MetricsCollector::wait_for_all().counter_value(&metric_name!("foo"))
Expand Down
4 changes: 4 additions & 0 deletions ipa-metrics/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ impl CurrentThreadContext {
pub fn flush() {
METRICS_CTX.with_borrow_mut(|ctx| ctx.flush());
}

pub fn is_connected() -> bool {
METRICS_CTX.with_borrow(|ctx| ctx.is_connected())
}
}

/// This context is used inside thread-local storage,
Expand Down

0 comments on commit 3ff5ed5

Please sign in to comment.