diff --git a/ipa-core/src/cli/playbook/ipa.rs b/ipa-core/src/cli/playbook/ipa.rs index 777eb7276..1b5a511ce 100644 --- a/ipa-core/src/cli/playbook/ipa.rs +++ b/ipa-core/src/cli/playbook/ipa.rs @@ -94,7 +94,7 @@ where ) } - let inputs = buffers.map(BodyStream::from); + let inputs = buffers.map(BodyStream::from_byte_vec); tracing::info!("Starting query for OPRF"); run_query_and_validate::(inputs, query_size, clients, query_id, query_config).await diff --git a/ipa-core/src/helpers/transport/stream/box_body.rs b/ipa-core/src/helpers/transport/stream/box_body.rs index d43c04640..7d68ab159 100644 --- a/ipa-core/src/helpers/transport/stream/box_body.rs +++ b/ipa-core/src/helpers/transport/stream/box_body.rs @@ -31,6 +31,19 @@ impl WrappedBoxBodyStream { pub fn empty() -> Self { WrappedBoxBodyStream(Box::pin(futures::stream::empty())) } + + pub fn from_byte_vec(buf: Vec) -> Self { + const MAX_CHUNK_SIZE: usize = 1 << 16; // 64 KiB + let mut segment = Bytes::from(buf); + let mut segments = Vec::with_capacity(segment.len() / MAX_CHUNK_SIZE); + while segment.len() > MAX_CHUNK_SIZE { + segments.push(Ok(segment.split_to(MAX_CHUNK_SIZE))); + } + segments.push(Ok(segment)); + + tracing::info!("[in-memory-infra] created body with {} chunks, each does not exceed {} size", segments.len(), MAX_CHUNK_SIZE); + Self::from_bytes_stream(futures::stream::iter(segments)) + } } impl Stream for WrappedBoxBodyStream {