Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use stream buffering in report collector #1504

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ipa-core/src/bin/report_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
cli::{
playbook::{
make_clients, make_sharded_clients, playbook_oprf_ipa, run_hybrid_query_and_validate,
run_query_and_validate, validate, validate_dp, HybridQueryResult, InputSource,
RoundRobinSubmission, StreamingSubmission,
run_query_and_validate, validate, validate_dp, BufferedRoundRobinSubmission,
HybridQueryResult, InputSource, StreamingSubmission,
},
CsvSerializer, IpaQueryResult, Verbosity,
},
Expand Down Expand Up @@ -370,7 +370,7 @@
]
.map(|path| {
let file = File::open(path).unwrap_or_else(|e| panic!("unable to open file {path:?}. {e}"));
RoundRobinSubmission::new(BufReader::new(file))
BufferedRoundRobinSubmission::new(BufReader::new(file))

Check warning on line 373 in ipa-core/src/bin/report_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/report_collector.rs#L373

Added line #L373 was not covered by tests
})
.map(|s| s.into_byte_streams(shard_count));

Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/cli/playbook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::time::sleep;
pub use self::{
hybrid::{run_hybrid_query_and_validate, HybridQueryResult},
ipa::{playbook_oprf_ipa, run_query_and_validate},
streaming::{RoundRobinSubmission, StreamingSubmission},
streaming::{BufferedRoundRobinSubmission, StreamingSubmission},
};
use crate::{
cli::config_parse::HelperNetworkConfigParseExt,
Expand Down
167 changes: 156 additions & 11 deletions ipa-core/src/cli/playbook/streaming.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
io::BufRead,
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll, Waker},
};
Expand All @@ -9,7 +10,7 @@

use crate::{
error::BoxError,
helpers::BytesStream,
helpers::{BufferedBytesStream, BytesStream},
sync::{Arc, Mutex},
};

Expand All @@ -20,6 +21,44 @@
fn into_byte_streams(self, count: usize) -> Vec<impl BytesStream>;
}

/// Same as [`RoundRobinSubmission`] but buffers the destination stream
/// until it accumulates at least `buf_size` bytes of data
pub struct BufferedRoundRobinSubmission<R> {
inner: R,
buf_size: NonZeroUsize,
}

impl<R: BufRead> BufferedRoundRobinSubmission<R> {
// Standard buffer size for file and network is 8Kb, so we are aligning this value with it.
// Tokio and standard bufer also use 8Kb buffers.
// If other value gives better performance, we should use it instead
const DEFAULT_BUF_SIZE: NonZeroUsize = NonZeroUsize::new(8192).unwrap();

/// Create a new instance with the default buffer size.
pub fn new(read_from: R) -> Self {
Self::new_with_buf_size(read_from, Self::DEFAULT_BUF_SIZE)
}

Check warning on line 40 in ipa-core/src/cli/playbook/streaming.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/cli/playbook/streaming.rs#L38-L40

Added lines #L38 - L40 were not covered by tests

/// Creates a new instance with the specified buffer size. All streams created
/// using [`StreamingSubmission::into_byte_streams`] will have their own buffer set.
fn new_with_buf_size(read_from: R, buf_size: NonZeroUsize) -> Self {
Self {
inner: read_from,
buf_size,
}
}
}

impl<R: BufRead + Send> StreamingSubmission for BufferedRoundRobinSubmission<R> {
fn into_byte_streams(self, count: usize) -> Vec<impl BytesStream> {
RoundRobinSubmission::new(self.inner)
.into_byte_streams(count)
.into_iter()
.map(|s| BufferedBytesStream::new(s, self.buf_size))
.collect()
}
}

/// Round-Robin strategy to read off the provided buffer
/// and distribute them. Inputs is expected to be hex-encoded
/// and delimited by newlines. The output streams will have
Expand Down Expand Up @@ -149,6 +188,7 @@
#[cfg(all(test, unit_test))]
mod tests {
use std::{
collections::HashSet,
fs::File,
io::{BufReader, Write},
iter,
Expand All @@ -159,24 +199,98 @@
use tempfile::TempDir;

use crate::{
cli::playbook::streaming::{RoundRobinSubmission, StreamingSubmission},
cli::playbook::streaming::{
BufferedRoundRobinSubmission, RoundRobinSubmission, StreamingSubmission,
},
helpers::BytesStream,
test_executor::run,
};

async fn drain_all<S: BytesStream>(streams: Vec<S>) -> Vec<String> {
async fn drain_all_buffered<S: BytesStream>(
streams: Vec<S>,
buf_size: Option<usize>,
) -> Vec<Vec<u8>> {
let mut futs = FuturesOrdered::default();
for s in streams {
futs.push_back(s.try_fold(String::new(), |mut acc, chunk| async move {
// remove RLE decoding
let len = usize::from(u16::from_le_bytes(chunk[..2].try_into().unwrap()));
assert_eq!(len, chunk.len() - 2);
acc.push_str(&String::from_utf8_lossy(&chunk[2..]));
Ok(acc)
}));
futs.push_back(s.try_fold(
(Vec::new(), HashSet::new(), 0, 0),
|(mut acc, mut sizes, mut leftover, mut pending_len), mut chunk| async move {
// keep track of chunk sizes we've seen from the stream. Only the last chunk
// can have size that is not equal to `buf_size`
sizes.insert(chunk.len());

// if we have a leftover from previous buffer, push it first
if leftover > 0 {
let next_chunk = std::cmp::min(leftover, chunk.len());
leftover -= next_chunk;
acc.extend(&chunk.split_to(next_chunk));
}

while !chunk.is_empty() {
// remove RLE decoding
let len = if pending_len > 0 {
// len (2 byte value) can be fragmented as well
let next_byte =
u8::from_le_bytes(chunk.split_to(1).as_ref().try_into().unwrap());
let r = u16::from_le_bytes([pending_len, next_byte]);
pending_len = 0;
r
} else if chunk.len() > 1 {
let len =
u16::from_le_bytes(chunk.split_to(2).as_ref().try_into().unwrap());
len
} else {
pending_len =
u8::from_le_bytes(chunk.split_to(1).as_ref().try_into().unwrap());
assert!(chunk.is_empty());
break;
};

let len = usize::from(len);

// the next item may span across multiple buffers
let take_len = if len > chunk.len() {
leftover = len - chunk.len();
chunk.len()
} else {
len
};
acc.extend(&chunk.split_to(take_len));
}

Ok((acc, sizes, leftover, pending_len))
},
));
}
futs.try_collect::<Vec<_>>()
.await
.unwrap()
.into_iter()
.map(|(s, sizes, leftover, pending_len)| {
assert_eq!(0, leftover);
assert_eq!(0, pending_len);

// We can have only one chunk that can be at or less than `buf_size`.
// If there are multiple chunks, then at least one must have `buf_size` and there
// can be at most two chunks.
if let Some(buf_size) = buf_size {
assert!(sizes.len() <= 2);
if sizes.len() > 1 {
assert!(sizes.contains(&buf_size));
}
}

futs.try_collect::<Vec<_>>().await.unwrap()
s
})
.collect()
}

async fn drain_all<S: BytesStream>(streams: Vec<S>) -> Vec<String> {
drain_all_buffered(streams, None)
.await
.into_iter()
.map(|v| String::from_utf8_lossy(&v).to_string())
.collect()
}

fn encoded<I: IntoIterator<Item: AsRef<[u8]>>>(input: I) -> Vec<String> {
Expand All @@ -188,6 +302,12 @@
run(|| verify_one(vec!["foo", "bar", "baz", "qux", "quux"], 3));
}

#[test]
fn basic_buffered() {
run(|| verify_buffered(vec!["foo", "bar", "baz", "qux", "quux"], 1, 1));
run(|| verify_buffered(vec!["foo", "bar", "baz", "qux", "quux"], 3, 5));
}

#[test]
#[should_panic(expected = "InvalidHexCharacter")]
fn non_hex() {
Expand Down Expand Up @@ -272,12 +392,37 @@
assert_eq!(expected, drain_all(streams).await);
}

/// The reason we work with bytes is that string character may span multiple bytes,
/// making [`String::from_utf8`] method work incorrectly as it is not commutative with
/// buffering.
async fn verify_buffered<R: AsRef<[u8]>>(input: Vec<R>, count: usize, buf_size: usize) {
assert!(count > 0);
let data = encoded(input.iter().map(AsRef::as_ref)).join("\n");
let streams = BufferedRoundRobinSubmission::new_with_buf_size(
data.as_bytes(),
buf_size.try_into().unwrap(),
)
.into_byte_streams(count);
let mut expected: Vec<Vec<u8>> = vec![vec![]; count];
for (i, next) in input.into_iter().enumerate() {
expected[i % count].extend(next.as_ref());
}
assert_eq!(expected, drain_all_buffered(streams, Some(buf_size)).await);
}

proptest! {
#[test]
fn proptest_round_robin(input: Vec<String>, count in 1_usize..953) {
run(move || async move {
verify_one(input, count).await;
});
}

#[test]
fn proptest_round_robin_buffered(input: Vec<Vec<u8>>, count in 1_usize..953, buf_size in 1_usize..1024) {
run(move || async move {
verify_buffered(input, count, buf_size).await;
});
}
}
}
10 changes: 5 additions & 5 deletions ipa-core/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ pub use transport::{
InMemoryTransportError,
};
pub use transport::{
make_owned_handler, query, routing, ApiError, BodyStream, BroadcastError, BytesStream,
HandlerBox, HandlerRef, HelperResponse, Identity as TransportIdentity, LengthDelimitedStream,
LogErrors, NoQueryId, NoResourceIdentifier, NoStep, QueryIdBinding, ReceiveRecords,
RecordsStream, RequestHandler, RouteParams, SingleRecordStream, StepBinding, StreamCollection,
StreamKey, Transport, WrappedBoxBodyStream,
make_owned_handler, query, routing, ApiError, BodyStream, BroadcastError, BufferedBytesStream,
BytesStream, HandlerBox, HandlerRef, HelperResponse, Identity as TransportIdentity,
LengthDelimitedStream, LogErrors, NoQueryId, NoResourceIdentifier, NoStep, QueryIdBinding,
ReceiveRecords, RecordsStream, RequestHandler, RouteParams, SingleRecordStream, StepBinding,
StreamCollection, StreamKey, Transport, WrappedBoxBodyStream,
};
use typenum::{Const, ToUInt, Unsigned, U8};
use x25519_dalek::PublicKey;
Expand Down
4 changes: 2 additions & 2 deletions ipa-core/src/helpers/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub use receive::{LogErrors, ReceiveRecords};
#[cfg(feature = "web-app")]
pub use stream::WrappedAxumBodyStream;
pub use stream::{
BodyStream, BytesStream, LengthDelimitedStream, RecordsStream, SingleRecordStream,
StreamCollection, StreamKey, WrappedBoxBodyStream,
BodyStream, BufferedBytesStream, BytesStream, LengthDelimitedStream, RecordsStream,
SingleRecordStream, StreamCollection, StreamKey, WrappedBoxBodyStream,
};

/// An identity of a peer that can be communicated with using [`Transport`]. There are currently two
Expand Down
10 changes: 5 additions & 5 deletions ipa-core/src/helpers/transport/stream/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use bytes::Bytes;
use futures::Stream;
use futures::{stream::Fuse, Stream, StreamExt};
use pin_project::pin_project;

use crate::helpers::BytesStream;
Expand All @@ -19,7 +19,7 @@ use crate::helpers::BytesStream;
pub struct BufferedBytesStream<S> {
/// Inner stream to poll
#[pin]
inner: S,
inner: Fuse<S>,
/// Buffer of bytes pending release
buffer: Vec<u8>,
/// Number of bytes released per single poll.
Expand All @@ -28,10 +28,10 @@ pub struct BufferedBytesStream<S> {
sz: usize,
}

impl<S> BufferedBytesStream<S> {
fn new(inner: S, buf_size: NonZeroUsize) -> Self {
impl<S: BytesStream> BufferedBytesStream<S> {
pub fn new(inner: S, buf_size: NonZeroUsize) -> Self {
Self {
inner,
inner: inner.fuse(),
buffer: Vec::with_capacity(buf_size.get()),
sz: buf_size.get(),
}
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/helpers/transport/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#[cfg(feature = "web-app")]
mod axum_body;
mod box_body;
#[allow(dead_code)]
mod buffered;
mod collection;
mod input;
Expand All @@ -14,6 +13,7 @@ use std::{
#[cfg(feature = "web-app")]
pub use axum_body::WrappedAxumBodyStream;
pub use box_body::WrappedBoxBodyStream;
pub use buffered::BufferedBytesStream;
use bytes::Bytes;
pub use collection::{StreamCollection, StreamKey};
use futures::{stream::iter, Stream};
Expand Down
Loading