Skip to content

Commit

Permalink
refactor(core): Polish FuturesBytesStream by avoiding extra copy (#4474)
Browse files Browse the repository at this point in the history
* refactor(core): Polish FuturesBytesStream by avoiding extra copy

Signed-off-by: Xuanwo <[email protected]>

* remove not needed read at static

Signed-off-by: Xuanwo <[email protected]>

* cleanup

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Apr 13, 2024
1 parent 275224d commit 8e84d81
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 42 deletions.
22 changes: 0 additions & 22 deletions core/src/raw/oio/read/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,11 @@ pub trait Read: Unpin + Send + Sync {
///
/// Storage services should try to read as much as possible, only return bytes less than the
/// limit while reaching the end of the file.
#[cfg(not(target_arch = "wasm32"))]
fn read_at(
&self,
offset: u64,
limit: usize,
) -> impl Future<Output = Result<Buffer>> + MaybeSend;
#[cfg(target_arch = "wasm32")]
fn read_at(&self, offset: u64, limit: usize) -> impl Future<Output = Result<Buffer>>;
}

impl Read for () {
Expand Down Expand Up @@ -124,31 +121,12 @@ pub trait ReadDyn: Unpin + Send + Sync {
///
/// This function returns a boxed future to make it object safe.
fn read_at_dyn(&self, offset: u64, limit: usize) -> BoxedFuture<Result<Buffer>>;

/// The static version of [`Read::read_at`].
///
/// This function returns a `'static` future by moving `self` into the
/// future. Caller can call `Box::pin` to build a static boxed future.
fn read_at_static(
self,
offset: u64,
limit: usize,
) -> impl Future<Output = Result<Buffer>> + MaybeSend + 'static
where
Self: Sized + 'static;
}

impl<T: Read + ?Sized> ReadDyn for T {
fn read_at_dyn(&self, offset: u64, limit: usize) -> BoxedFuture<Result<Buffer>> {
Box::pin(self.read_at(offset, limit))
}

async fn read_at_static(self, offset: u64, limit: usize) -> Result<Buffer>
where
Self: Sized + 'static,
{
self.read_at(offset, limit).await
}
}

/// # NOTE
Expand Down
39 changes: 19 additions & 20 deletions core/src/types/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ pub mod into_stream {
sync::{atomic::AtomicBool, Arc},
};

use crate::raw::oio::ReadDyn;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -183,7 +182,7 @@ pub mod into_stream {
// Update self.offset before building future.
self.offset += limit as u64;
let fut = async move {
let buf = r.read_at_static(offset, limit).await?;
let buf = r.read_at_dyn(offset, limit).await?;
if buf.len() < limit || limit == 0 {
// Update finished marked if buf is less than limit.
finished.store(true, Ordering::Relaxed);
Expand Down Expand Up @@ -363,7 +362,6 @@ pub mod into_futures_stream {
use std::task::Context;
use std::task::Poll;

use bytes::Buf;
use bytes::Bytes;
use futures::Stream;

Expand All @@ -376,6 +374,7 @@ pub mod into_futures_stream {
///
/// FuturesStream also implements [`Unpin`], [`Send`] and [`Sync`].
pub struct FuturesBytesStream {
r: oio::Reader,
state: State,
offset: u64,
size: u64,
Expand All @@ -385,8 +384,8 @@ pub mod into_futures_stream {
}

enum State {
Idle(Option<oio::Reader>),
Next(BoxedStaticFuture<(oio::Reader, Result<Buffer>)>),
Idle(Buffer),
Next(BoxedStaticFuture<Result<Buffer>>),
}

/// # Safety
Expand All @@ -399,7 +398,8 @@ pub mod into_futures_stream {
#[inline]
pub(crate) fn new(r: oio::Reader, range: Range<u64>) -> Self {
FuturesBytesStream {
state: State::Idle(Some(r)),
r,
state: State::Idle(Buffer::new()),
offset: range.start,
size: range.end - range.start,
// TODO: should use services preferred io size.
Expand All @@ -424,31 +424,30 @@ pub mod into_futures_stream {

loop {
match &mut this.state {
State::Idle(r) => {
State::Idle(buf) => {
// Consume current buffer
if let Some(bs) = buf.next() {
return Poll::Ready(Some(Ok(bs)));
}

// Make sure cur didn't exceed size.
if this.cur >= this.size {
return Poll::Ready(None);
}

let r = r.take().expect("reader must be present");
let r = this.r.clone();
let next_offset = this.offset + this.cur;
let next_size = (this.size - this.cur).min(this.cap as u64) as usize;
let fut = async move {
let res = r.read_at_dyn(next_offset, next_size).await;
(r, res)
};
let fut = async move { r.read_at_dyn(next_offset, next_size).await };
this.state = State::Next(Box::pin(fut));
}
State::Next(fut) => {
let (r, res) = ready!(fut.as_mut().poll(cx));
this.state = State::Idle(Some(r));
return match res {
Ok(buf) if !buf.has_remaining() => Poll::Ready(None),
Ok(mut buf) => {
this.cur += buf.remaining() as u64;
Poll::Ready(Some(Ok(buf.copy_to_bytes(buf.remaining()))))
let res = ready!(fut.as_mut().poll(cx));
match res {
Ok(buf) => {
this.state = State::Idle(buf);
}
Err(err) => Poll::Ready(Some(Err(format_std_io_error(err)))),
Err(err) => return Poll::Ready(Some(Err(format_std_io_error(err)))),
};
}
}
Expand Down

0 comments on commit 8e84d81

Please sign in to comment.