diff --git a/ipa-core/src/cli/playbook/ipa.rs b/ipa-core/src/cli/playbook/ipa.rs index b688927426..8e7352f52b 100644 --- a/ipa-core/src/cli/playbook/ipa.rs +++ b/ipa-core/src/cli/playbook/ipa.rs @@ -94,7 +94,7 @@ where } tracing::info!("Starting query for OPRF, sending {} bytes towards each helper", buffers[0].len()); - let inputs = buffers.map(BodyStream::from); + let inputs = buffers.map(BodyStream::from_byte_vec); run_query_and_validate::(inputs, query_size, clients, query_id, query_config).await } diff --git a/ipa-core/src/helpers/transport/stream/axum_body.rs b/ipa-core/src/helpers/transport/stream/axum_body.rs index 4c90e10aee..78e7905f60 100644 --- a/ipa-core/src/helpers/transport/stream/axum_body.rs +++ b/ipa-core/src/helpers/transport/stream/axum_body.rs @@ -6,6 +6,7 @@ use std::{ use axum::body::{Body, BodyDataStream}; use bytes::Bytes; use futures::{Stream, StreamExt}; +use futures_util::stream; use pin_project::pin_project; use tokio_stream::wrappers::ReceiverStream; @@ -37,18 +38,50 @@ impl Stream for WrappedAxumBodyStream { } // Note that it is possible (although unlikely) that `from_body` panics. -#[cfg(any(test, feature = "test-fixture"))] -impl> From for WrappedAxumBodyStream { - fn from(buf: Buf) -> Self { - Self::new(Body::from(buf.into())) - } -} +// #[cfg(any(test, feature = "test-fixture"))] +// impl> From for WrappedAxumBodyStream { +// fn from(buf: Buf) -> Self { +// Self::new(Body::from(buf.into())) +// } +// } + +// #[cfg(any(test, feature = "test-fixture"))] +// impl From> for WrappedAxumBodyStream { +// fn from(buf: Vec) -> Self { +// const MAX_CHUNK_SIZE: usize = 1 << 16; // 64 KiB +// let mut segment = Bytes::from(buf); +// let mut segments = Vec::with_capacity(segment.len() / MAX_CHUNK_SIZE); +// while segment.len() > MAX_CHUNK_SIZE { +// segments.push(Ok::<_, BoxError>(segment.split_to(MAX_CHUNK_SIZE))); +// } +// segments.push(Ok::<_, BoxError>(segment)); +// +// tracing::info!("created body with {} chunks, each does not exceed {} size", segments.len(), MAX_CHUNK_SIZE); +// Self::new(Body::from_stream(stream::iter(segments))) +// // let stream = stream::iter(buf.chunks(MAX_CHUNK_SIZE).map(Ok::<_, BoxError>)); +// // Self::new(Body::from_stream(stream)) +// // Self::new(Body::from(buf.into())) +// } +// } impl WrappedAxumBodyStream { #[must_use] pub fn from_receiver_stream(receiver: Box>>) -> Self { Self::new(Body::from_stream(receiver)) } + + pub fn from_byte_vec(buf: Vec) -> Self { + const MAX_CHUNK_SIZE: usize = 1 << 16; // 64 KiB + let mut segment = Bytes::from(buf); + let mut segments = Vec::with_capacity(segment.len() / MAX_CHUNK_SIZE); + while segment.len() > MAX_CHUNK_SIZE { + segments.push(Ok::<_, BoxError>(segment.split_to(MAX_CHUNK_SIZE))); + } + segments.push(Ok::<_, BoxError>(segment)); + + tracing::info!("created body with {} chunks, each does not exceed {} size", segments.len(), MAX_CHUNK_SIZE); + Self::new(Body::from_stream(stream::iter(segments))) + } } #[cfg(feature = "real-world-infra")]