diff --git a/core/src/raw/oio/buf/buffer.rs b/core/src/raw/oio/buf/buffer.rs index 7eae6bbf5484..b547463ca72c 100644 --- a/core/src/raw/oio/buf/buffer.rs +++ b/core/src/raw/oio/buf/buffer.rs @@ -97,7 +97,6 @@ impl From for Buffer { } } -/// Transform `VecDeque` to `Arc<[Bytes]>`. impl From> for Buffer { fn from(bs: VecDeque) -> Self { let size = bs.iter().map(|b| b.len()).sum(); @@ -110,7 +109,6 @@ impl From> for Buffer { } } -/// Transform `Vec` to `Arc<[Bytes]>`. impl From> for Buffer { fn from(bs: Vec) -> Self { let size = bs.iter().map(|b| b.len()).sum(); @@ -123,6 +121,18 @@ impl From> for Buffer { } } +impl From> for Buffer { + fn from(bs: Arc<[Bytes]>) -> Self { + let size = bs.iter().map(|b| b.len()).sum(); + Self(Inner::NonContiguous { + parts: bs, + size, + idx: 0, + offset: 0, + }) + } +} + impl Buf for Buffer { #[inline] fn remaining(&self) -> usize { @@ -238,80 +248,6 @@ impl Iterator for Buffer { } } -/// BufferQueue is a queue of [`Buffer`]. -/// -/// It's works like a `Vec` but with more efficient `advance` operation. -#[derive(Default)] -pub struct BufferQueue(VecDeque); - -impl BufferQueue { - /// Create a new buffer queue. - #[inline] - pub fn new() -> Self { - Self::default() - } - - /// Push new [`Buffer`] into the queue. - #[inline] - pub fn push(&mut self, buf: Buffer) { - self.0.push_back(buf); - } - - /// Total bytes size inside the buffer queue. - #[inline] - pub fn len(&self) -> usize { - self.0.iter().map(|b| b.len()).sum() - } - - /// Is the buffer queue empty. - #[inline] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Build a Buffer from the queue. - #[inline] - pub fn to_buffer(&self) -> Buffer { - if self.0.is_empty() { - Buffer::new() - } else if self.0.len() == 1 { - self.0.clone().pop_front().unwrap() - } else { - let mut bytes = Vec::with_capacity(self.0.iter().map(|b| b.size_hint().0).sum()); - for buf in self.0.clone() { - for bs in buf { - bytes.push(bs); - } - } - Buffer::from(bytes) - } - } - - /// Advance the buffer queue by `cnt` bytes. - #[inline] - pub fn advance(&mut self, cnt: usize) { - assert!(cnt <= self.len(), "cannot advance past {cnt} bytes"); - - let mut new_cnt = cnt; - while new_cnt > 0 { - let buf = self.0.front_mut().expect("buffer must be valid"); - if new_cnt < buf.remaining() { - buf.advance(new_cnt); - break; - } else { - new_cnt -= buf.remaining(); - self.0.pop_front(); - } - } - } - - /// Clear the buffer queue. - #[inline] - pub fn clear(&mut self) { - self.0.clear() - } -} - #[cfg(test)] mod tests { use pretty_assertions::assert_eq; diff --git a/core/src/raw/oio/buf/mod.rs b/core/src/raw/oio/buf/mod.rs index 48a4ff8477ca..bd8e354c3dda 100644 --- a/core/src/raw/oio/buf/mod.rs +++ b/core/src/raw/oio/buf/mod.rs @@ -17,7 +17,9 @@ mod buffer; pub use buffer::Buffer; -pub use buffer::BufferQueue; mod flex_buf; pub use flex_buf::FlexBuf; + +mod queue_buf; +pub use queue_buf::QueueBuf; diff --git a/core/src/raw/oio/buf/queue_buf.rs b/core/src/raw/oio/buf/queue_buf.rs new file mode 100644 index 000000000000..b6114c1002bc --- /dev/null +++ b/core/src/raw/oio/buf/queue_buf.rs @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::oio::Buffer; +use bytes::{Buf, Bytes}; +use std::collections::VecDeque; +use std::sync::Arc; + +/// QueueBuf is a queue of [`Buffer`]. +/// +/// It's designed to allow storing multiple buffers without copying underlying bytes and consume them +/// in order. +/// +/// QueueBuf mainly provides the following operations: +/// +/// - `push`: Push a new buffer in the queue. +/// - `collect`: Collect all buffer in the queue as a new [`Buffer`] +/// - `advance`: Advance the queue by `cnt` bytes. +#[derive(Default)] +pub struct QueueBuf(VecDeque); + +impl QueueBuf { + /// Create a new buffer queue. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Push new [`Buffer`] into the queue. + #[inline] + pub fn push(&mut self, buf: Buffer) { + self.0.push_back(buf); + } + + /// Total bytes size inside the buffer queue. + #[inline] + pub fn len(&self) -> usize { + self.0.iter().map(|b| b.len()).sum() + } + + /// Is the buffer queue empty. + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Build a new [`Buffer`] from the queue. + /// + /// If the queue is empty, it will return an empty buffer. Otherwise, it will iterate over all + /// buffers and collect them into a new buffer. + /// + /// # Notes + /// + /// There are allocation overheads when collecting multiple buffers into a new buffer. But + /// most of them should be acceptable since we can expect the item length of buffers are slower + /// than 4k. + #[inline] + pub fn collect(&self) -> Buffer { + if self.0.is_empty() { + Buffer::new() + } else if self.0.len() == 1 { + self.0.clone().pop_front().unwrap() + } else { + let iter = self.0.clone().into_iter().flatten(); + // This operation only needs one allocation from iterator to `Arc<[Bytes]>` instead + // of iterator -> `Vec` -> `Arc<[Bytes]>`. + let bs: Arc<[Bytes]> = Arc::from_iter(iter); + Buffer::from(bs) + } + } + + /// Advance the buffer queue by `cnt` bytes. + #[inline] + pub fn advance(&mut self, cnt: usize) { + assert!(cnt <= self.len(), "cannot advance past {cnt} bytes"); + + let mut new_cnt = cnt; + while new_cnt > 0 { + let buf = self.0.front_mut().expect("buffer must be valid"); + if new_cnt < buf.remaining() { + buf.advance(new_cnt); + break; + } else { + new_cnt -= buf.remaining(); + self.0.pop_front(); + } + } + } + + /// Clear the buffer queue. + #[inline] + pub fn clear(&mut self) { + self.0.clear() + } +} diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 183253e9b7c0..c15b60c3f1d1 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -30,7 +30,7 @@ pub struct ExactBufWriter { /// The size for buffer, we will flush the underlying storage at the size of this buffer. buffer_size: usize, - buffer: oio::BufferQueue, + buffer: oio::QueueBuf, } impl ExactBufWriter { @@ -39,7 +39,7 @@ impl ExactBufWriter { Self { inner, buffer_size, - buffer: oio::BufferQueue::new(), + buffer: oio::QueueBuf::new(), } } } @@ -47,7 +47,7 @@ impl ExactBufWriter { impl oio::Write for ExactBufWriter { async fn write(&mut self, mut bs: oio::Buffer) -> Result { if self.buffer.len() >= self.buffer_size { - let written = self.inner.write(self.buffer.to_buffer()).await?; + let written = self.inner.write(self.buffer.collect()).await?; self.buffer.advance(written); } @@ -64,7 +64,7 @@ impl oio::Write for ExactBufWriter { break; } - let written = self.inner.write(self.buffer.to_buffer()).await?; + let written = self.inner.write(self.buffer.collect()).await?; self.buffer.advance(written); }