Skip to content

Commit

Permalink
chore: Align with chunk instead of confusing buffer (#4528)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Apr 24, 2024
1 parent 4134e1b commit 329d7c5
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 70 deletions.
4 changes: 2 additions & 2 deletions core/benches/oio/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use bytes::Buf;
use criterion::Criterion;
use once_cell::sync::Lazy;
use opendal::raw::oio::ExactBufWriter;
use opendal::raw::oio::ChunkedWriter;
use opendal::raw::oio::Write;
use rand::thread_rng;
use size::Size;
Expand All @@ -44,7 +44,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
group.bench_with_input(size.to_string(), &content, |b, content| {
b.to_async(&*TOKIO).iter(|| async {
let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024);
let mut w = ChunkedWriter::new(BlackHoleWriter, 256 * 1024);

let mut bs = content.clone();
while !bs.is_empty() {
Expand Down
4 changes: 2 additions & 2 deletions core/fuzz/fuzz_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ async fn fuzz_writer(op: Operator, input: FuzzInput) -> Result<()> {

let mut writer = op.writer_with(&path);
if let Some(buffer) = input.buffer {
writer = writer.buffer(buffer);
writer = writer.chunk(buffer);
} else if let Some(min_size) = op.info().full_capability().write_multi_min_size {
writer = writer.buffer(min_size);
writer = writer.chunk(min_size);
}
if let Some(concurrent) = input.concurrent {
writer = writer.concurrent(concurrent);
Expand Down
17 changes: 7 additions & 10 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::cmp;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
Expand Down Expand Up @@ -376,8 +375,7 @@ impl<A: Accessor> LayeredAccessor for CompleteAccessor<A> {
type Inner = A;
type Reader = CompleteReader<A::Reader>;
type BlockingReader = CompleteReader<A::BlockingReader>;
type Writer =
TwoWays<CompleteWriter<A::Writer>, oio::ExactBufWriter<CompleteWriter<A::Writer>>>;
type Writer = TwoWays<CompleteWriter<A::Writer>, oio::ChunkedWriter<CompleteWriter<A::Writer>>>;
type BlockingWriter = CompleteWriter<A::BlockingWriter>;
type Lister = CompleteLister<A, A::Lister>;
type BlockingLister = CompleteLister<A, A::BlockingLister>;
Expand Down Expand Up @@ -426,16 +424,16 @@ impl<A: Accessor> LayeredAccessor for CompleteAccessor<A> {
}

// Calculate chunk size.
let buffer_size = args.buffer().map(|mut size| {
let chunk_size = args.chunk().map(|mut size| {
if let Some(v) = capability.write_multi_max_size {
size = cmp::min(v, size);
size = size.min(v);
}
if let Some(v) = capability.write_multi_min_size {
size = cmp::max(v, size);
size = size.max(v);
}
if let Some(v) = capability.write_multi_align_size {
// Make sure size >= align size first.
size = cmp::max(v, size);
size = size.max(v);
size -= size % v;
}

Expand All @@ -445,9 +443,9 @@ impl<A: Accessor> LayeredAccessor for CompleteAccessor<A> {
let (rp, w) = self.inner.write(path, args.clone()).await?;
let w = CompleteWriter::new(w);

let w = match buffer_size {
let w = match chunk_size {
None => TwoWays::One(w),
Some(size) => TwoWays::Two(oio::ExactBufWriter::new(w, size)),
Some(size) => TwoWays::Two(oio::ChunkedWriter::new(w, size)),
};

Ok((rp, w))
Expand Down Expand Up @@ -628,7 +626,6 @@ impl<W> CompleteWriter<W> {
impl<W> Drop for CompleteWriter<W> {
fn drop(&mut self) {
if self.inner.is_some() {
// Do we need to panic here?
log::warn!("writer has not been closed or aborted, must be a bug")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,38 @@
use crate::raw::*;
use crate::*;

/// ExactBufWriter is used to implement [`oio::Write`] based on exact buffer strategy: flush the
/// underlying storage when the buffered size is exactly the same as the buffer size.
/// ChunkedWriter is used to implement [`oio::Write`] based on chunk:
/// flush the underlying storage at the `chunk` size.
///
/// ExactBufWriter makes sure that the size of the data written to the underlying storage is exactly
/// `buffer_size` bytes. It's useful when the underlying storage requires the size to be written.
///
/// For example, R2 requires all parts must be the same size except the last part.
pub struct ExactBufWriter<W: oio::Write> {
/// ChunkedWriter makes sure that the size of the data written to the
/// underlying storage is exactly `chunk` bytes.
pub struct ChunkedWriter<W: oio::Write> {
inner: W,

/// The chunk size: buffered data is flushed to the underlying storage once it reaches this size.
buffer_size: usize,
chunk_size: usize,
buffer: oio::QueueBuf,
}

impl<W: oio::Write> ExactBufWriter<W> {
impl<W: oio::Write> ChunkedWriter<W> {
/// Create a new chunked writer.
pub fn new(inner: W, buffer_size: usize) -> Self {
pub fn new(inner: W, chunk_size: usize) -> Self {
Self {
inner,
buffer_size,
chunk_size,
buffer: oio::QueueBuf::new(),
}
}
}

impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
impl<W: oio::Write> oio::Write for ChunkedWriter<W> {
async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
if self.buffer.len() >= self.buffer_size {
if self.buffer.len() >= self.chunk_size {
let written = self.inner.write(self.buffer.clone().collect()).await?;
self.buffer.advance(written);
}

let remaining = self.buffer_size - self.buffer.len();
let remaining = self.chunk_size - self.buffer.len();
bs.truncate(remaining);
let n = bs.len();
self.buffer.push(bs);
Expand Down Expand Up @@ -126,7 +124,7 @@ mod tests {
let mut expected = vec![0; 5];
rng.fill_bytes(&mut expected);

let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10);
let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10);

let mut bs = Bytes::from(expected.clone());
while !bs.is_empty() {
Expand Down Expand Up @@ -156,7 +154,7 @@ mod tests {
let mut expected = vec![];

let buffer_size = rng.gen_range(1..10);
let mut writer = ExactBufWriter::new(MockWriter { buf: vec![] }, buffer_size);
let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, buffer_size);
debug!("test_fuzz_exact_buf_writer: buffer size: {buffer_size}");

for _ in 0..1000 {
Expand Down
4 changes: 2 additions & 2 deletions core/src/raw/oio/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ mod one_shot_write;
pub use one_shot_write::OneShotWrite;
pub use one_shot_write::OneShotWriter;

mod exact_buf_write;
pub use exact_buf_write::ExactBufWriter;
mod chunked_write;
pub use chunked_write::ChunkedWriter;

mod range_write;
pub use range_write::RangeWrite;
Expand Down
23 changes: 12 additions & 11 deletions core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ impl OpStat {
#[derive(Debug, Clone, Default)]
pub struct OpWrite {
append: bool,
buffer: Option<usize>,
chunk: Option<usize>,
concurrent: usize,

content_type: Option<String>,
Expand Down Expand Up @@ -583,23 +583,24 @@ impl OpWrite {
self
}

/// Get the buffer from op.
/// Get the chunk from op.
///
/// The buffer is used by service to decide the buffer size of the underlying writer.
pub fn buffer(&self) -> Option<usize> {
self.buffer
/// The chunk is used by service to decide the chunk size of the underlying writer.
pub fn chunk(&self) -> Option<usize> {
self.chunk
}

/// Set the buffer of op.
/// Set the chunk of op.
///
/// If buffer is set, the data will be buffered by the underlying writer.
/// If chunk is set, the data will be chunked by the underlying writer.
///
/// ## NOTE
///
/// Service could have their own minimum buffer size while perform write operations like
/// multipart uploads. So the buffer size may be larger than the given buffer size.
pub fn with_buffer(mut self, buffer: usize) -> Self {
self.buffer = Some(buffer);
/// Services could have their own minimum chunk size while performing write
/// operations like multipart uploads, so the chunk size may be larger than
/// the given chunk size.
pub fn with_chunk(mut self, chunk: usize) -> Self {
self.chunk = Some(chunk);
self
}

Expand Down
22 changes: 11 additions & 11 deletions core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ impl Operator {
///
/// OpenDAL abstracts the multipart uploads into [`Writer`]. It will automatically
/// handle the multipart uploads for you. You can control the behavior of multipart uploads
/// by setting `buffer`, `concurrent` via [`Operator::writer_with`]
/// by setting `chunk`, `concurrent` via [`Operator::writer_with`]
///
/// # Examples
///
Expand Down Expand Up @@ -826,16 +826,16 @@ impl Operator {
/// extra options like `content_type` and `cache_control`, please use [`Operator::write_with`]
/// instead.
///
/// ## Buffer
/// ## Chunk
///
/// OpenDAL is designed to write files directly without buffering by default, giving users
/// OpenDAL is designed to write files directly without chunking by default, giving users
/// control over the exact size of their writes and helping avoid unnecessary costs.
///
/// This is not efficient for cases when users write small chunks of data. Some storage services
/// like `s3` could even return hard errors like `EntityTooSmall`. Besides, cloud storage services
/// will cost more money if we write data in small chunks.
///
/// Users can use [`Operator::write_with`] to set a good buffer size might improve the performance,
/// Users can use [`Operator::write_with`] to set a good chunk size, which might improve the performance.
///
/// # Examples
///
Expand Down Expand Up @@ -885,20 +885,20 @@ impl Operator {
/// # }
/// ```
///
/// ## `buffer`
/// ## `chunk`
///
/// Set `buffer` for the writer.
/// Set `chunk` for the writer.
///
/// OpenDAL is designed to write files directly without buffering by default, giving users
/// OpenDAL is designed to write files directly without chunking by default, giving users
/// control over the exact size of their writes and helping avoid unnecessary costs.
///
/// This is not efficient for cases when users write small chunks of data. Some storage services
/// like `s3` could even return hard errors like `EntityTooSmall`. Besides, cloud storage services
/// will cost more money if we write data in small chunks.
///
/// Set a good buffer size might improve the performance, reduce the API calls and save money.
/// Setting a good chunk size might improve the performance, reduce the API calls and save money.
///
/// The following example will set the writer buffer to 8MiB. Only one API call will be sent at
/// The following example will set the writer chunk to 8MiB. Only one API call will be sent at
/// `close` instead.
///
/// ```no_run
Expand All @@ -911,7 +911,7 @@ impl Operator {
/// # async fn test(op: Operator) -> Result<()> {
/// let mut w = op
/// .writer_with("path/to/file")
/// .buffer(8 * 1024 * 1024)
/// .chunk(8 * 1024 * 1024)
/// .await?;
/// w.write(vec![0; 4096]).await?;
/// w.write(vec![1; 4096]).await?;
Expand Down Expand Up @@ -1078,7 +1078,7 @@ impl Operator {
///
/// OpenDAL abstracts the multipart uploads into [`Writer`]. It will automatically
/// handle the multipart uploads for you. You can control the behavior of multipart uploads
/// by setting `buffer`, `concurrent` via [`Operator::writer_with`]
/// by setting `chunk`, `concurrent` via [`Operator::writer_with`]
///
/// # Options
///
Expand Down
4 changes: 2 additions & 2 deletions core/src/types/operator/operator_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl FunctionWrite {
/// Service could have their own minimum buffer size while perform write operations like
/// multipart uploads. So the buffer size may be larger than the given buffer size.
pub fn buffer(mut self, v: usize) -> Self {
self.0 = self.0.map_args(|(args, bs)| (args.with_buffer(v), bs));
self.0 = self.0.map_args(|(args, bs)| (args.with_chunk(v), bs));
self
}

Expand Down Expand Up @@ -163,7 +163,7 @@ impl FunctionWriter {
/// Service could have their own minimum buffer size while perform write operations like
/// multipart uploads. So the buffer size may be larger than the given buffer size.
pub fn buffer(mut self, v: usize) -> Self {
self.0 = self.0.map_args(|args| args.with_buffer(v));
self.0 = self.0.map_args(|args| args.with_chunk(v));
self
}

Expand Down
20 changes: 10 additions & 10 deletions core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,13 @@ impl<F: Future<Output = Result<()>>> FutureWrite<F> {
///
/// Services could have their own minimum chunk size while performing write operations like
/// multipart uploads, so the chunk size may be larger than the given chunk size.
pub fn buffer(self, v: usize) -> Self {
self.map(|(args, bs)| (args.with_buffer(v), bs))
pub fn chunk(self, v: usize) -> Self {
self.map(|(args, bs)| (args.with_chunk(v), bs))
}

/// Set the maximum concurrent write task amount.
///
/// `v` is forwarded to the write arguments' concurrency setting; it does not
/// affect the chunk size.
pub fn concurrent(self, v: usize) -> Self {
    // Forward to `with_concurrent`: calling `with_chunk` here would silently
    // overwrite the chunk size and leave the concurrency setting untouched.
    self.map(|(args, bs)| (args.with_concurrent(v), bs))
}

/// Set the content type of option
Expand Down Expand Up @@ -331,23 +331,23 @@ impl<F: Future<Output = Result<Writer>>> FutureWriter<F> {
self.map(|args| args.with_append(v))
}

/// Set the buffer size of op.
/// Set the chunk size of op.
///
/// If buffer size is set, the data will be buffered by the underlying writer.
/// If chunk size is set, the data will be chunked by the underlying writer.
///
/// ## NOTE
///
/// Service could have their own limitation for buffer size. It's possible that buffer size
/// is not equal to the given buffer size.
/// Service could have their own limitation for chunk size. It's possible that chunk size
/// is not equal to the given chunk size.
///
/// For example:
///
/// - AWS S3 requires the part size to be in [5MiB, 5GiB].
/// - GCS requires the part size to be aligned with 256 KiB.
///
/// The services will alter the buffer size to meet their requirements.
pub fn buffer(self, v: usize) -> Self {
self.map(|args| args.with_buffer(v))
/// The services will alter the chunk size to meet their requirements.
pub fn chunk(self, v: usize) -> Self {
self.map(|args| args.with_chunk(v))
}

/// Set the maximum concurrent write task amount.
Expand Down
8 changes: 4 additions & 4 deletions core/tests/behavior/async_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> {
let content_b = gen_fixed_bytes(size);
let stream = stream::iter(vec![content_a.clone(), content_b.clone()]).map(Ok);

let mut w = op.writer_with(&path).buffer(5 * 1024 * 1024).await?;
let mut w = op.writer_with(&path).chunk(5 * 1024 * 1024).await?;
w.sink(stream).await?;
w.close().await?;

Expand Down Expand Up @@ -377,7 +377,7 @@ pub async fn test_writer_sink_with_concurrent(op: Operator) -> Result<()> {

let mut w = op
.writer_with(&path)
.buffer(5 * 1024 * 1024)
.chunk(5 * 1024 * 1024)
.concurrent(4)
.await?;
w.sink(stream).await?;
Expand Down Expand Up @@ -414,7 +414,7 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {

let mut w = op
.writer_with(&path)
.buffer(8 * 1024 * 1024)
.chunk(8 * 1024 * 1024)
.await?
.into_futures_async_write();

Expand Down Expand Up @@ -449,7 +449,7 @@ pub async fn test_writer_futures_copy_with_concurrent(op: Operator) -> Result<()

let mut w = op
.writer_with(&path)
.buffer(8 * 1024 * 1024)
.chunk(8 * 1024 * 1024)
.concurrent(4)
.await?
.into_futures_async_write();
Expand Down

0 comments on commit 329d7c5

Please sign in to comment.