From 4f947aef59fe5d05148e17d1ca614e47e6ae4760 Mon Sep 17 00:00:00 2001 From: Predrag Gruevski Date: Thu, 9 Jan 2025 20:42:59 +0000 Subject: [PATCH] Expose a `Form::into_stream()` method on async multipart forms. The async equivalent of the change in #2524. An example use case is compressing multipart form data with zstd. The entire contents of the payload have to be compressed together, which requires accessing the contents of the Form. The existing stream functionality mostly implements what we need, but just isn't publicly accessible. This PR adds a pub method for it. --- src/async_impl/multipart.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/async_impl/multipart.rs b/src/async_impl/multipart.rs index 494bd5d9a..75f3c1dbf 100644 --- a/src/async_impl/multipart.rs +++ b/src/async_impl/multipart.rs @@ -140,11 +140,23 @@ impl Form { } /// Consume this instance and transform into an instance of Body for use in a request. - pub(crate) fn stream(mut self) -> Body { + pub(crate) fn stream(self) -> Body { if self.inner.fields.is_empty() { return Body::empty(); } + Body::stream(self.into_stream()) + } + + /// Produce a stream of the bytes in this `Form`, consuming it. + pub fn into_stream(mut self) -> impl Stream> + Send + Sync { + if self.inner.fields.is_empty() { + let empty_stream: Pin< + Box> + Send + Sync>, + > = Box::pin(futures_util::stream::empty()); + return empty_stream; + } + // create initial part to init reduce chain let (name, part) = self.inner.fields.remove(0); let start = Box::pin(self.part_stream(name, part)) @@ -161,7 +173,7 @@ impl Form { let last = stream::once(future::ready(Ok( format!("--{}--\r\n", self.boundary()).into() ))); - Body::stream(stream.chain(last)) + Box::pin(stream.chain(last)) } /// Generate a hyper::Body stream for a single Part instance of a Form request.