diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs index 46332eb20ee6..c874dd089ee9 100644 --- a/core/src/raw/oio/read/api.rs +++ b/core/src/raw/oio/read/api.rs @@ -84,14 +84,11 @@ pub trait Read: Unpin + Send + Sync { /// /// Storage services should try to read as much as possible, only return bytes less than the /// limit while reaching the end of the file. - #[cfg(not(target_arch = "wasm32"))] fn read_at( &self, offset: u64, limit: usize, ) -> impl Future> + MaybeSend; - #[cfg(target_arch = "wasm32")] - fn read_at(&self, offset: u64, limit: usize) -> impl Future>; } impl Read for () { @@ -124,31 +121,12 @@ pub trait ReadDyn: Unpin + Send + Sync { /// /// This function returns a boxed future to make it object safe. fn read_at_dyn(&self, offset: u64, limit: usize) -> BoxedFuture>; - - /// The static version of [`Read::read_at`]. - /// - /// This function returns a `'static` future by moving `self` into the - /// future. Caller can call `Box::pin` to build a static boxed future. - fn read_at_static( - self, - offset: u64, - limit: usize, - ) -> impl Future> + MaybeSend + 'static - where - Self: Sized + 'static; } impl ReadDyn for T { fn read_at_dyn(&self, offset: u64, limit: usize) -> BoxedFuture> { Box::pin(self.read_at(offset, limit)) } - - async fn read_at_static(self, offset: u64, limit: usize) -> Result - where - Self: Sized + 'static, - { - self.read_at(offset, limit).await - } } /// # NOTE diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs index 4031ac79a5ba..cc82a645e00b 100644 --- a/core/src/types/reader.rs +++ b/core/src/types/reader.rs @@ -127,7 +127,6 @@ pub mod into_stream { sync::{atomic::AtomicBool, Arc}, }; - use crate::raw::oio::ReadDyn; use crate::raw::*; use crate::*; @@ -183,7 +182,7 @@ pub mod into_stream { // Update self.offset before building future. self.offset += limit as u64; let fut = async move { - let buf = r.read_at_static(offset, limit).await?; + let buf = r.read_at_dyn(offset, limit).await?; if buf.len() < limit || limit == 0 { // Update finished marked if buf is less than limit. finished.store(true, Ordering::Relaxed); @@ -363,7 +362,6 @@ pub mod into_futures_stream { use std::task::Context; use std::task::Poll; - use bytes::Buf; use bytes::Bytes; use futures::Stream; @@ -376,6 +374,7 @@ pub mod into_futures_stream { /// /// FuturesStream also implements [`Unpin`], [`Send`] and [`Sync`]. pub struct FuturesBytesStream { + r: oio::Reader, state: State, offset: u64, size: u64, @@ -385,8 +384,8 @@ pub mod into_futures_stream { } enum State { - Idle(Option), - Next(BoxedStaticFuture<(oio::Reader, Result)>), + Idle(Buffer), + Next(BoxedStaticFuture>), } /// # Safety @@ -399,7 +398,8 @@ pub mod into_futures_stream { #[inline] pub(crate) fn new(r: oio::Reader, range: Range) -> Self { FuturesBytesStream { - state: State::Idle(Some(r)), + r, + state: State::Idle(Buffer::new()), offset: range.start, size: range.end - range.start, // TODO: should use services preferred io size. @@ -424,31 +424,30 @@ pub mod into_futures_stream { loop { match &mut this.state { - State::Idle(r) => { + State::Idle(buf) => { + // Consume current buffer + if let Some(bs) = buf.next() { + return Poll::Ready(Some(Ok(bs))); + } + // Make sure cur didn't exceed size. if this.cur >= this.size { return Poll::Ready(None); } - let r = r.take().expect("reader must be present"); + let r = this.r.clone(); let next_offset = this.offset + this.cur; let next_size = (this.size - this.cur).min(this.cap as u64) as usize; - let fut = async move { - let res = r.read_at_dyn(next_offset, next_size).await; - (r, res) - }; + let fut = async move { r.read_at_dyn(next_offset, next_size).await }; this.state = State::Next(Box::pin(fut)); } State::Next(fut) => { - let (r, res) = ready!(fut.as_mut().poll(cx)); - this.state = State::Idle(Some(r)); - return match res { - Ok(buf) if !buf.has_remaining() => Poll::Ready(None), - Ok(mut buf) => { - this.cur += buf.remaining() as u64; - Poll::Ready(Some(Ok(buf.copy_to_bytes(buf.remaining())))) + let res = ready!(fut.as_mut().poll(cx)); + match res { + Ok(buf) => { + this.state = State::Idle(buf); } - Err(err) => Poll::Ready(Some(Err(format_std_io_error(err)))), + Err(err) => return Poll::Ready(Some(Err(format_std_io_error(err)))), }; } }