diff --git a/ipa-core/src/bin/report_collector.rs b/ipa-core/src/bin/report_collector.rs
index fb41fcb9a..ef00e5471 100644
--- a/ipa-core/src/bin/report_collector.rs
+++ b/ipa-core/src/bin/report_collector.rs
@@ -5,6 +5,7 @@ use std::{
     fs::{File, OpenOptions},
     io,
     io::{stdout, BufReader, Write},
+    num::NonZeroUsize,
     ops::Deref,
     path::{Path, PathBuf},
 };
@@ -15,8 +16,8 @@ use ipa_core::{
     cli::{
         playbook::{
             make_clients, make_sharded_clients, playbook_oprf_ipa, run_hybrid_query_and_validate,
-            run_query_and_validate, validate, validate_dp, HybridQueryResult, InputSource,
-            RoundRobinSubmission, StreamingSubmission,
+            run_query_and_validate, validate, validate_dp, BufferedRoundRobinSubmission,
+            HybridQueryResult, InputSource, StreamingSubmission,
         },
         CsvSerializer, IpaQueryResult, Verbosity,
     },
@@ -430,6 +431,9 @@ async fn hybrid(
     count: usize,
     set_fixed_polling_ms: Option<u64>,
 ) -> Result<(), Box<dyn Error>> {
+    // Twice the default TCP MSS (536 bytes). TCP options, which are outside our control,
+    // may change the effective segment size, but hopefully fragmentation is not too bad.
+    const BUF_SIZE: NonZeroUsize = NonZeroUsize::new(1072).unwrap();
     let query_type = QueryType::MaliciousHybrid(hybrid_query_config);
 
     let [h1_streams, h2_streams, h3_streams] = [
@@ -439,7 +443,7 @@ async fn hybrid(
     ]
     .map(|path| {
         let file = File::open(path).unwrap_or_else(|e| panic!("unable to open file {path:?}. {e}"));
-        RoundRobinSubmission::new(BufReader::new(file))
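+        // each shard stream now yields BUF_SIZE chunks rather than one small chunk per
+        // record, keeping writes close to the TCP segment size chosen above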
+        BufferedRoundRobinSubmission::new(BufReader::new(file), BUF_SIZE)
     })
     .map(|s| s.into_byte_streams(args.shard_count));
 
diff --git a/ipa-core/src/cli/playbook/mod.rs b/ipa-core/src/cli/playbook/mod.rs
index fc8b46e9e..46b1645fb 100644
--- a/ipa-core/src/cli/playbook/mod.rs
+++ b/ipa-core/src/cli/playbook/mod.rs
@@ -22,7 +22,7 @@ use tokio::time::sleep;
 pub use self::{
     hybrid::{run_hybrid_query_and_validate, HybridQueryResult},
     ipa::{playbook_oprf_ipa, run_query_and_validate},
-    streaming::{RoundRobinSubmission, StreamingSubmission},
+    streaming::{BufferedRoundRobinSubmission, StreamingSubmission},
 };
 use crate::{
     cli::config_parse::HelperNetworkConfigParseExt,
diff --git a/ipa-core/src/cli/playbook/streaming.rs b/ipa-core/src/cli/playbook/streaming.rs
index f246582ee..f21770436 100644
--- a/ipa-core/src/cli/playbook/streaming.rs
+++ b/ipa-core/src/cli/playbook/streaming.rs
@@ -1,5 +1,6 @@
 use std::{
     io::BufRead,
+    num::NonZeroUsize,
     pin::Pin,
     task::{Context, Poll, Waker},
 };
@@ -9,7 +10,7 @@ use futures::Stream;
 
 use crate::{
     error::BoxError,
-    helpers::BytesStream,
+    helpers::{BufferedBytesStream, BytesStream},
     sync::{Arc, Mutex},
 };
 
@@ -20,6 +21,34 @@ pub trait StreamingSubmission {
     fn into_byte_streams(self, count: usize) -> Vec<impl BytesStream>;
 }
 
+/// Same as [`RoundRobinSubmission`], but each destination stream buffers its output
+/// and yields chunks of `buf_size` bytes (only the final chunk may be smaller).
+pub struct BufferedRoundRobinSubmission<R> {
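+    /// Source to read newline-delimited, hex-encoded records from.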
+    inner: R,
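+    /// Buffer size applied to every output stream.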
+    buf_size: NonZeroUsize,
+}
+
+impl<R: BufRead> BufferedRoundRobinSubmission<R> {
+    /// Creates a new instance with the specified buffer size. Every stream created
+    /// via [`StreamingSubmission::into_byte_streams`] gets its own buffer of that size.
+    pub fn new(read_from: R, buf_size: NonZeroUsize) -> Self {
+        Self {
+            inner: read_from,
+            buf_size,
+        }
+    }
+}
+
+impl<R: BufRead + Send> StreamingSubmission for BufferedRoundRobinSubmission<R> {
+    fn into_byte_streams(self, count: usize) -> Vec<impl BytesStream> {
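+        // split the input round-robin as before, then wrap each resulting stream so it
+        // yields chunks of exactly `buf_size` bytes (only the final chunk may be smaller)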
+        RoundRobinSubmission::new(self.inner)
+            .into_byte_streams(count)
+            .into_iter()
+            .map(|s| BufferedBytesStream::new(s, self.buf_size))
+            .collect()
+    }
+}
+
 /// Round-Robin strategy to read off the provided buffer
 /// and distribute them. Inputs is expected to be hex-encoded
 /// and delimited by newlines. The output streams will have
@@ -149,6 +178,7 @@ impl<R: BufRead> State<R> {
 #[cfg(all(test, unit_test))]
 mod tests {
     use std::{
+        collections::HashSet,
         fs::File,
         io::{BufReader, Write},
         iter,
@@ -159,24 +189,98 @@ mod tests {
     use tempfile::TempDir;
 
     use crate::{
-        cli::playbook::streaming::{RoundRobinSubmission, StreamingSubmission},
+        cli::playbook::streaming::{
+            BufferedRoundRobinSubmission, RoundRobinSubmission, StreamingSubmission,
+        },
         helpers::BytesStream,
         test_executor::run,
     };
 
-    async fn drain_all<S: BytesStream>(streams: Vec<S>) -> Vec<String> {
+    async fn drain_all_buffered<S: BytesStream>(
+        streams: Vec<S>,
+        buf_size: Option<usize>,
+    ) -> Vec<Vec<u8>> {
         let mut futs = FuturesOrdered::default();
         for s in streams {
-            futs.push_back(s.try_fold(String::new(), |mut acc, chunk| async move {
-                // remove RLE decoding
-                let len = usize::from(u16::from_le_bytes(chunk[..2].try_into().unwrap()));
-                assert_eq!(len, chunk.len() - 2);
-                acc.push_str(&String::from_utf8_lossy(&chunk[2..]));
-                Ok(acc)
-            }));
+            futs.push_back(s.try_fold(
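+                // accumulator: (decoded record bytes, distinct chunk sizes observed,
+                // payload bytes still owed by the current record, low byte of a length
+                // prefix that was split across chunks)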
+                (Vec::new(), HashSet::new(), 0, 0),
+                |(mut acc, mut sizes, mut leftover, mut pending_len), mut chunk| async move {
+                    // keep track of the chunk sizes we've seen from the stream. Only the
+                    // last chunk may have a size different from `buf_size`
+                    sizes.insert(chunk.len());
+
+                    // if we have a leftover from previous buffer, push it first
+                    if leftover > 0 {
+                        let next_chunk = std::cmp::min(leftover, chunk.len());
+                        leftover -= next_chunk;
+                        acc.extend(&chunk.split_to(next_chunk));
+                    }
+
+                    while !chunk.is_empty() {
+                        // decode the 2-byte little-endian length prefix of the next record;
+                        // the prefix itself may be split across buffered chunks
+                        let len = if pending_len > 0 {
+                            // second half of a length prefix that started in the previous chunk
+                            let next_byte =
+                                u8::from_le_bytes(chunk.split_to(1).as_ref().try_into().unwrap());
+                            let r = u16::from_le_bytes([pending_len, next_byte]);
+                            pending_len = 0;
+                            r
+                        } else if chunk.len() > 1 {
+                            u16::from_le_bytes(chunk.split_to(2).as_ref().try_into().unwrap())
+                        } else {
+                            pending_len =
+                                u8::from_le_bytes(chunk.split_to(1).as_ref().try_into().unwrap());
+                            assert!(chunk.is_empty());
+                            break;
+                        };
+
+                        let len = usize::from(len);
+
+                        // the next item may span across multiple buffers
+                        let take_len = if len > chunk.len() {
+                            leftover = len - chunk.len();
+                            chunk.len()
+                        } else {
+                            len
+                        };
+                        acc.extend(&chunk.split_to(take_len));
+                    }
+
+                    Ok((acc, sizes, leftover, pending_len))
+                },
+            ));
         }
+        futs.try_collect::<Vec<_>>()
+            .await
+            .unwrap()
+            .into_iter()
+            .map(|(s, sizes, leftover, pending_len)| {
+                assert_eq!(0, leftover);
+                assert_eq!(0, pending_len);
+
+                // every chunk except possibly the last must be exactly `buf_size` bytes,
+                // so we expect at most two distinct chunk sizes, and when there are two,
+                // one of them must equal `buf_size`
+                if let Some(buf_size) = buf_size {
+                    assert!(sizes.len() <= 2);
+                    if sizes.len() > 1 {
+                        assert!(sizes.contains(&buf_size));
+                    }
+                }
+
+                s
+            })
+            .collect()
+    }
 
-        futs.try_collect::<Vec<_>>().await.unwrap()
+    async fn drain_all<S: BytesStream>(streams: Vec<S>) -> Vec<String> {
+        drain_all_buffered(streams, None)
+            .await
+            .into_iter()
+            .map(|v| String::from_utf8_lossy(&v).to_string())
+            .collect()
     }
 
     fn encoded<I: IntoIterator<Item: AsRef<[u8]>>>(input: I) -> Vec<String> {
@@ -188,6 +292,12 @@ mod tests {
         run(|| verify_one(vec!["foo", "bar", "baz", "qux", "quux"], 3));
     }
 
+    #[test]
+    fn basic_buffered() {
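+        // cover a buffer smaller than a single record and one larger than any record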
+        run(|| verify_buffered(vec!["foo", "bar", "baz", "qux", "quux"], 1, 1));
+        run(|| verify_buffered(vec!["foo", "bar", "baz", "qux", "quux"], 3, 5));
+    }
+
     #[test]
     #[should_panic(expected = "InvalidHexCharacter")]
     fn non_hex() {
@@ -272,6 +382,22 @@ mod tests {
         assert_eq!(expected, drain_all(streams).await);
     }
 
+    /// We work with raw bytes here because a single character may span multiple bytes,
+    /// and buffering can split it across chunk boundaries, so decoding each chunk with
+    /// [`String::from_utf8`] would not be equivalent to decoding the whole stream.
+    async fn verify_buffered<R: AsRef<[u8]>>(input: Vec<R>, count: usize, buf_size: usize) {
+        assert!(count > 0);
+        let data = encoded(input.iter().map(AsRef::as_ref)).join("\n");
+        let streams =
+            BufferedRoundRobinSubmission::new(data.as_bytes(), buf_size.try_into().unwrap())
+                .into_byte_streams(count);
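+        // round-robin distribution: record `i` ends up on stream `i % count`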
+        let mut expected: Vec<Vec<u8>> = vec![vec![]; count];
+        for (i, next) in input.into_iter().enumerate() {
+            expected[i % count].extend(next.as_ref());
+        }
+        assert_eq!(expected, drain_all_buffered(streams, Some(buf_size)).await);
+    }
+
     proptest! {
         #[test]
         fn proptest_round_robin(input: Vec<String>, count in 1_usize..953) {
@@ -279,5 +405,12 @@ mod tests {
                 verify_one(input, count).await;
             });
         }
+
+        #[test]
+        fn proptest_round_robin_buffered(input: Vec<Vec<u8>>, count in 1_usize..953, buf_size in 1_usize..1024) {
+            run(move || async move {
+                verify_buffered(input, count, buf_size).await;
+            });
+        }
     }
 }
diff --git a/ipa-core/src/helpers/mod.rs b/ipa-core/src/helpers/mod.rs
index 2b8e27868..eae6d8439 100644
--- a/ipa-core/src/helpers/mod.rs
+++ b/ipa-core/src/helpers/mod.rs
@@ -75,11 +75,11 @@ pub use transport::{
     InMemoryTransportError,
 };
 pub use transport::{
-    make_owned_handler, query, routing, ApiError, BodyStream, BroadcastError, BytesStream,
-    HandlerBox, HandlerRef, HelperResponse, Identity as TransportIdentity, LengthDelimitedStream,
-    LogErrors, NoQueryId, NoResourceIdentifier, NoStep, QueryIdBinding, ReceiveRecords,
-    RecordsStream, RequestHandler, RouteParams, SingleRecordStream, StepBinding, StreamCollection,
-    StreamKey, Transport, WrappedBoxBodyStream,
+    make_owned_handler, query, routing, ApiError, BodyStream, BroadcastError, BufferedBytesStream,
+    BytesStream, HandlerBox, HandlerRef, HelperResponse, Identity as TransportIdentity,
+    LengthDelimitedStream, LogErrors, NoQueryId, NoResourceIdentifier, NoStep, QueryIdBinding,
+    ReceiveRecords, RecordsStream, RequestHandler, RouteParams, SingleRecordStream, StepBinding,
+    StreamCollection, StreamKey, Transport, WrappedBoxBodyStream,
 };
 use typenum::{Const, ToUInt, Unsigned, U8};
 use x25519_dalek::PublicKey;
diff --git a/ipa-core/src/helpers/transport/mod.rs b/ipa-core/src/helpers/transport/mod.rs
index 6b8341966..39ee749f2 100644
--- a/ipa-core/src/helpers/transport/mod.rs
+++ b/ipa-core/src/helpers/transport/mod.rs
@@ -35,8 +35,8 @@ pub use receive::{LogErrors, ReceiveRecords};
 #[cfg(feature = "web-app")]
 pub use stream::WrappedAxumBodyStream;
 pub use stream::{
-    BodyStream, BytesStream, LengthDelimitedStream, RecordsStream, SingleRecordStream,
-    StreamCollection, StreamKey, WrappedBoxBodyStream,
+    BodyStream, BufferedBytesStream, BytesStream, LengthDelimitedStream, RecordsStream,
+    SingleRecordStream, StreamCollection, StreamKey, WrappedBoxBodyStream,
 };
 
 /// An identity of a peer that can be communicated with using [`Transport`]. There are currently two
diff --git a/ipa-core/src/helpers/transport/stream/buffered.rs b/ipa-core/src/helpers/transport/stream/buffered.rs
index 7efc12112..b0b332fb1 100644
--- a/ipa-core/src/helpers/transport/stream/buffered.rs
+++ b/ipa-core/src/helpers/transport/stream/buffered.rs
@@ -6,7 +6,7 @@ use std::{
 };
 
 use bytes::Bytes;
-use futures::Stream;
+use futures::{stream::Fuse, Stream, StreamExt};
 use pin_project::pin_project;
 
 use crate::helpers::BytesStream;
@@ -19,7 +19,7 @@ use crate::helpers::BytesStream;
 pub struct BufferedBytesStream<S> {
     /// Inner stream to poll
     #[pin]
-    inner: S,
+    inner: Fuse<S>,
     /// Buffer of bytes pending release
     buffer: Vec<u8>,
     /// Number of bytes released per single poll.
@@ -28,10 +28,10 @@ pub struct BufferedBytesStream<S> {
     sz: usize,
 }
 
-impl<S> BufferedBytesStream<S> {
-    fn new(inner: S, buf_size: NonZeroUsize) -> Self {
+impl<S: BytesStream> BufferedBytesStream<S> {
+    pub fn new(inner: S, buf_size: NonZeroUsize) -> Self {
         Self {
-            inner,
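+            // fusing makes it safe to keep polling the inner stream after it finishes,
+            // e.g. while this wrapper drains whatever is left in its buffer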
+            inner: inner.fuse(),
             buffer: Vec::with_capacity(buf_size.get()),
             sz: buf_size.get(),
         }
diff --git a/ipa-core/src/helpers/transport/stream/mod.rs b/ipa-core/src/helpers/transport/stream/mod.rs
index ac39fbb22..9e6e32a63 100644
--- a/ipa-core/src/helpers/transport/stream/mod.rs
+++ b/ipa-core/src/helpers/transport/stream/mod.rs
@@ -1,7 +1,6 @@
 #[cfg(feature = "web-app")]
 mod axum_body;
 mod box_body;
-#[allow(dead_code)]
 mod buffered;
 mod collection;
 mod input;
@@ -14,6 +13,7 @@ use std::{
 #[cfg(feature = "web-app")]
 pub use axum_body::WrappedAxumBodyStream;
 pub use box_body::WrappedBoxBodyStream;
+pub use buffered::BufferedBytesStream;
 use bytes::Bytes;
 pub use collection::{StreamCollection, StreamKey};
 use futures::{stream::iter, Stream};