Skip to content

Commit

Permalink
Grab next event buffer
Browse files Browse the repository at this point in the history
Signed-off-by: Raymond Zhao <[email protected]>
  • Loading branch information
rayz committed Mar 3, 2025
1 parent 61175a9 commit 179e946
Showing 1 changed file with 6 additions and 39 deletions.
45 changes: 6 additions & 39 deletions lib/saluki-core/src/topology/interconnect/event_stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::poll_fn, num::NonZeroUsize};
use std::future::poll_fn;

use metrics::{Counter, Histogram};
use saluki_metrics::MetricsBuilder;
Expand All @@ -7,11 +7,6 @@ use tokio::sync::mpsc;
use super::FixedSizeEventBuffer;
use crate::{components::ComponentContext, observability::ComponentMetricsExt as _};

// Since we're dealing with event _buffers_, this becomes a multiplicative factor, so we might be receiving 128 (or
// whatever the number is) event buffers of 128 events each. This is good for batching/efficiency but we don't want
// wildly large batches, so this number is sized conservatively for now.
const NEXT_READY_RECV_LIMIT: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(128) };

/// A stream of events sent to a component.
///
/// For transforms and destinations, their events can only come from other components that have forwarded them onwards.
Expand Down Expand Up @@ -60,20 +55,13 @@ impl EventStream {
///
/// If the component (or components) connected to this event stream have stopped, `None` is returned.
pub async fn next_ready(&mut self) -> Option<Vec<FixedSizeEventBuffer>> {
let mut buffers = Vec::new();
poll_fn(|cx| self.inner.poll_recv_many(cx, &mut buffers, NEXT_READY_RECV_LIMIT.get())).await;

if buffers.is_empty() {
None
} else {
let mut total_events_received = 0;
for buffer in &buffers {
total_events_received += buffer.len() as u64;
match poll_fn(|cx| self.inner.poll_recv(cx)).await {
None => None,
Some(buffer) => {
self.events_received_size.record(buffer.len() as f64);
self.events_received.increment(1);
Some(vec![buffer])
}
self.events_received.increment(total_events_received);

Some(buffers)
}
}
}
Expand Down Expand Up @@ -131,25 +119,4 @@ mod tests {

assert!(event_stream.next_ready().await.is_none());
}

#[tokio::test]
async fn next_ready_obeys_limit() {
// We'll send a ton of event buffers, more than `NEXT_READY_RECV_LIMIT`, and make sure that when we call
// `next_ready`, we only get up to `NEXT_READY_RECV_LIMIT` event buffers.
let bufs_to_send = (NEXT_READY_RECV_LIMIT.get() as f64 * 1.75) as usize;
let (mut event_stream, tx) = create_event_stream(bufs_to_send);

for _ in 0..bufs_to_send {
let ebuf = FixedSizeEventBuffer::for_test(1);
tx.send(ebuf).await.expect("should not fail to send event buffer");
}

// Now call `next_ready` and make sure we only get `NEXT_READY_RECV_LIMIT` event buffers:
let received_ebufs1 = event_stream.next_ready().await.expect("should receive event buffers");
assert_eq!(received_ebufs1.len(), NEXT_READY_RECV_LIMIT.get());

// And one more time to get the rest:
let received_ebufs2 = event_stream.next_ready().await.expect("should receive event buffers");
assert_eq!(received_ebufs2.len(), bufs_to_send - received_ebufs1.len());
}
}

0 comments on commit 179e946

Please sign in to comment.