Remove next_ready usage in metrics destination
Signed-off-by: Raymond Zhao <[email protected]>
rayz committed Mar 3, 2025
1 parent 179e946 commit 114d80a
Showing 2 changed files with 82 additions and 54 deletions.
91 changes: 43 additions & 48 deletions lib/saluki-components/src/destinations/datadog/metrics/mod.rs
@@ -206,65 +206,60 @@ where
        loop {
            select! {
                _ = health.live() => continue,
-               maybe_events = context.events().next_ready() => match maybe_events {
-                   Some(event_buffers) => {
-                       debug!(event_buffers_len = event_buffers.len(), "Received event buffers.");
-
-                       for event_buffer in event_buffers {
-                           debug!(events_len = event_buffer.len(), "Processing event buffer.");
-
+               maybe_events = context.events().next() => match maybe_events {
+                   Some(event_buffer) => {
                        for event in event_buffer {
                            if let Some(metric) = event.try_into_metric() {
                                let request_builder = match MetricsEndpoint::from_metric(&metric) {
                                    MetricsEndpoint::Series => &mut series_request_builder,
                                    MetricsEndpoint::Sketches => &mut sketches_request_builder,
                                };

                                // Encode the metric. If we get it back, that means the current request is full, and we need to
                                // flush it before we can try to encode the metric again... so we'll hold on to it in that case
                                // before flushing and trying to encode it again.
                                let metric_to_retry = match request_builder.encode(metric).await {
                                    Ok(None) => continue,
                                    Ok(Some(metric)) => metric,
                                    Err(e) => {
                                        error!(error = %e, "Failed to encode metric.");
                                        telemetry.events_dropped_encoder().increment(1);
                                        continue;
                                    }
                                };

                                let maybe_requests = request_builder.flush().await;
                                if maybe_requests.is_empty() {
                                    panic!("builder told us to flush, but gave us nothing");
                                }

                                for maybe_request in maybe_requests {
                                    match maybe_request {
                                        Ok((events, request)) => forwarder_handle.send_request(events, request).await?,
                                        Err(e) => {
                                            // TODO: Increment a counter here that metrics were dropped due to a flush failure.
                                            if e.is_recoverable() {
                                                // If the error is recoverable, we'll hold on to the metric to retry it later.
                                                continue;
                                            } else {
                                                return Err(GenericError::from(e).context("Failed to flush request."));
                                            }
                                        }
                                    }
                                }

                                // Now try to encode the metric again. If it fails again, we'll just log it because it shouldn't
                                // be possible to fail at this point, otherwise we would have already caught that the first
                                // time.
                                if let Err(e) = request_builder.encode(metric_to_retry).await {
                                    error!(error = %e, "Failed to encode metric.");
                                    telemetry.events_dropped_encoder().increment(1);
                                }
                            }
                        }
-
-                       debug!("All event buffers processed.");

                        // If we're not already pending a flush, we'll start the countdown.
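Note on the shape of the change above: the destination loop now awaits a single event buffer per `select!` wakeup via `next()`, rather than draining a batch of buffers via `next_ready()`. A minimal, standalone sketch of that pattern is below; the tokio mpsc channel, the `EventBuffer` alias, and the interval are illustrative stand-ins rather than saluki's actual `EventStream` and `FixedSizeEventBuffer` types.

```rust
// Standalone sketch of the consume-loop shape after this change: one event buffer
// per `select!` wakeup, with no inner "for each buffer in the batch" loop. The
// channel, the `EventBuffer` alias, and the interval are stand-ins, not saluki types.
use tokio::{select, sync::mpsc, time};

type EventBuffer = Vec<u32>; // stand-in for FixedSizeEventBuffer

async fn run_consumer(mut rx: mpsc::Receiver<EventBuffer>) {
    let mut flush = time::interval(time::Duration::from_millis(100));
    loop {
        select! {
            _ = flush.tick() => {
                // Periodic work; in the real component this is where liveness
                // reporting and the flush countdown live.
            }
            maybe_buffer = rx.recv() => match maybe_buffer {
                // One buffer per wakeup: process its events, then go back to waiting.
                Some(buffer) => {
                    for event in buffer {
                        // Encoding/flushing would happen here.
                        let _ = event;
                    }
                }
                // Channel closed: all upstream senders are gone, so shut down.
                None => break,
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<EventBuffer>(8);
    let consumer = tokio::spawn(run_consumer(rx));

    tx.send(vec![1, 2, 3]).await.expect("consumer is still running");
    drop(tx); // dropping the last sender lets the consumer loop exit

    consumer.await.expect("consumer task should not panic");
}
```

As in the real loop, the consumer treats a closed channel (`recv()` returning `None`) as the signal that upstream components have stopped.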
45 changes: 39 additions & 6 deletions lib/saluki-core/src/topology/interconnect/event_stream.rs
@@ -1,4 +1,4 @@
-use std::future::poll_fn;
+use std::{future::poll_fn, num::NonZeroUsize};

use metrics::{Counter, Histogram};
use saluki_metrics::MetricsBuilder;
@@ -7,6 +7,11 @@ use tokio::sync::mpsc;
use super::FixedSizeEventBuffer;
use crate::{components::ComponentContext, observability::ComponentMetricsExt as _};

+// Since we're dealing with event _buffers_, this becomes a multiplicative factor, so we might be receiving 128 (or
+// whatever the number is) event buffers of 128 events each. This is good for batching/efficiency but we don't want
+// wildly large batches, so this number is sized conservatively for now.
+const NEXT_READY_RECV_LIMIT: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(128) };

/// A stream of events sent to a component.
///
/// For transforms and destinations, their events can only come from other components that have forwarded them onwards.
@@ -55,13 +60,20 @@ impl EventStream {
///
/// If the component (or components) connected to this event stream have stopped, `None` is returned.
pub async fn next_ready(&mut self) -> Option<Vec<FixedSizeEventBuffer>> {
-        match poll_fn(|cx| self.inner.poll_recv(cx)).await {
-            None => None,
-            Some(buffer) => {
+        let mut buffers = Vec::new();
+        poll_fn(|cx| self.inner.poll_recv_many(cx, &mut buffers, NEXT_READY_RECV_LIMIT.get())).await;
+
+        if buffers.is_empty() {
+            None
+        } else {
+            let mut total_events_received = 0;
+            for buffer in &buffers {
+                total_events_received += buffer.len() as u64;
                 self.events_received_size.record(buffer.len() as f64);
-                self.events_received.increment(1);
-                Some(vec![buffer])
             }
+            self.events_received.increment(total_events_received);
+
+            Some(buffers)
         }
    }
@@ -119,4 +131,25 @@ mod tests {

assert!(event_stream.next_ready().await.is_none());
}

+    #[tokio::test]
+    async fn next_ready_obeys_limit() {
+        // We'll send a ton of event buffers, more than `NEXT_READY_RECV_LIMIT`, and make sure that when we call
+        // `next_ready`, we only get up to `NEXT_READY_RECV_LIMIT` event buffers.
+        let bufs_to_send = (NEXT_READY_RECV_LIMIT.get() as f64 * 1.75) as usize;
+        let (mut event_stream, tx) = create_event_stream(bufs_to_send);
+
+        for _ in 0..bufs_to_send {
+            let ebuf = FixedSizeEventBuffer::for_test(1);
+            tx.send(ebuf).await.expect("should not fail to send event buffer");
+        }
+
+        // Now call `next_ready` and make sure we only get `NEXT_READY_RECV_LIMIT` event buffers:
+        let received_ebufs1 = event_stream.next_ready().await.expect("should receive event buffers");
+        assert_eq!(received_ebufs1.len(), NEXT_READY_RECV_LIMIT.get());
+
+        // And one more time to get the rest:
+        let received_ebufs2 = event_stream.next_ready().await.expect("should receive event buffers");
+        assert_eq!(received_ebufs2.len(), bufs_to_send - received_ebufs1.len());
+    }
}
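For context on the `poll_recv_many` call introduced above: tokio's `mpsc::Receiver` also exposes `recv_many`, the async counterpart, which appends up to `limit` already-queued messages into a caller-provided `Vec` and returns how many were added (zero once the channel is closed and empty). The sketch below demonstrates that limit behavior in isolation; `RECV_LIMIT` and the `Vec<u32>` stand-in buffers are hypothetical substitutes for `NEXT_READY_RECV_LIMIT` and `FixedSizeEventBuffer`.

```rust
// Standalone demo of the batched-receive behavior behind `poll_recv_many`: tokio's
// `Receiver::recv_many` appends up to `limit` ready messages into a caller-provided
// Vec. `RECV_LIMIT` and the Vec<u32> "buffers" are illustrative stand-ins.
use tokio::sync::mpsc;

const RECV_LIMIT: usize = 128;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Vec<u32>>(256);

    // Queue more "event buffers" than a single receive is allowed to return.
    for i in 0..200u32 {
        tx.send(vec![i]).await.expect("channel has capacity");
    }

    // The first call hands back at most RECV_LIMIT buffers...
    let mut batch = Vec::new();
    let received = rx.recv_many(&mut batch, RECV_LIMIT).await;
    assert_eq!(received, RECV_LIMIT);

    // ...and a second call drains the remainder.
    let mut rest = Vec::new();
    let remainder = rx.recv_many(&mut rest, RECV_LIMIT).await;
    assert_eq!(remainder, 200 - RECV_LIMIT);

    // Once the channel is closed and empty, `recv_many` returns 0, which is how a
    // caller like `next_ready` can tell that upstream components have stopped.
    drop(tx);
    assert_eq!(rx.recv_many(&mut Vec::new(), RECV_LIMIT).await, 0);
}
```

Capping each receive keeps a single `next_ready()` call from returning an unbounded number of buffers, which is the rationale spelled out in the comment on `NEXT_READY_RECV_LIMIT`.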
