diff --git a/.github/services/s3/ceph_rados_s3/action.yml b/.github/services/s3/ceph_rados_s3/disable_action.yml similarity index 100% rename from .github/services/s3/ceph_rados_s3/action.yml rename to .github/services/s3/ceph_rados_s3/disable_action.yml diff --git a/bindings/cpp/src/async.rs b/bindings/cpp/src/async.rs index 595006f269e0..30f25e1b077e 100644 --- a/bindings/cpp/src/async.rs +++ b/bindings/cpp/src/async.rs @@ -103,6 +103,7 @@ unsafe fn operator_write(op: ffi::OperatorPtr, path: String, bs: Vec) -> Rus .0 .write(&path, bs) .await + .map(|_| ()) .map_err(|e| CxxAsyncException::new(e.to_string().into_boxed_str()))?) }) } diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 957134501cc7..5a02c2b971fc 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -124,7 +124,7 @@ impl Operator { // Safety: The bytes created from bs will be dropped after the function call. // So it's safe to declare its lifetime as 'static. fn write(&self, path: &str, bs: &'static [u8]) -> Result<()> { - Ok(self.0.write(path, bs)?) + Ok(self.0.write(path, bs).map(|_| ())?) 
} fn exists(&self, path: &str) -> Result { diff --git a/bindings/dotnet/src/lib.rs b/bindings/dotnet/src/lib.rs index acfd5834a2ab..617874d80501 100644 --- a/bindings/dotnet/src/lib.rs +++ b/bindings/dotnet/src/lib.rs @@ -61,7 +61,7 @@ pub unsafe extern "C" fn blocking_operator_write( let op = &*(op); let path = std::ffi::CStr::from_ptr(path).to_str().unwrap(); let content = std::ffi::CStr::from_ptr(content).to_str().unwrap(); - op.write(path, content.to_owned()).unwrap() + op.write(path, content.to_owned()).map(|_| ()).unwrap() } /// # Safety diff --git a/bindings/haskell/src/lib.rs b/bindings/haskell/src/lib.rs index c470699d09b8..da7ff35ed406 100644 --- a/bindings/haskell/src/lib.rs +++ b/bindings/haskell/src/lib.rs @@ -206,7 +206,7 @@ pub unsafe extern "C" fn blocking_write( let bytes = Vec::from_raw_parts(bytes as *mut u8, len, len); let res = match op.write(path_str, bytes.clone()) { - Ok(()) => FFIResult::ok(()), + Ok(_) => FFIResult::ok(()), Err(e) => FFIResult::err_with_source("Failed to write", e), }; diff --git a/bindings/java/src/async_operator.rs b/bindings/java/src/async_operator.rs index b30fdc8df84f..239b97b89e51 100644 --- a/bindings/java/src/async_operator.rs +++ b/bindings/java/src/async_operator.rs @@ -139,7 +139,7 @@ fn intern_write( } async fn do_write(op: &mut Operator, path: String, content: Vec) -> Result<()> { - Ok(op.write(&path, content).await?) + Ok(op.write(&path, content).await.map(|_| ())?) } /// # Safety @@ -182,7 +182,11 @@ fn intern_append( } async fn do_append(op: &mut Operator, path: String, content: Vec) -> Result<()> { - Ok(op.write_with(&path, content).append(true).await?) + Ok(op + .write_with(&path, content) + .append(true) + .await + .map(|_| ())?) 
} /// # Safety diff --git a/bindings/java/src/operator.rs b/bindings/java/src/operator.rs index 9dfe74b0c789..9e457feda862 100644 --- a/bindings/java/src/operator.rs +++ b/bindings/java/src/operator.rs @@ -104,7 +104,7 @@ fn intern_write( ) -> Result<()> { let path = jstring_to_string(env, &path)?; let content = env.convert_byte_array(content)?; - Ok(op.write(&path, content)?) + Ok(op.write(&path, content).map(|_| ())?) } /// # Safety diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index bd6c688099fd..76fceda07b26 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -269,7 +269,7 @@ impl Operator { writer = writer.cache_control(cache_control); } } - writer.await.map_err(format_napi_error) + writer.await.map(|_| ()).map_err(format_napi_error) } //noinspection DuplicatedCode @@ -371,7 +371,7 @@ impl Operator { writer = writer.cache_control(cache_control); } } - writer.call().map_err(format_napi_error) + writer.call().map(|_| ()).map_err(format_napi_error) } /// Copy file according to given `from` and `to` path. 
@@ -809,7 +809,7 @@ impl BlockingWriter { /// ``` #[napi] pub unsafe fn close(&mut self) -> Result<()> { - self.0.close().map_err(format_napi_error) + self.0.close().map(|_| ()).map_err(format_napi_error) } } @@ -855,7 +855,7 @@ impl Writer { /// ``` #[napi] pub async unsafe fn close(&mut self) -> Result<()> { - self.0.close().await.map_err(format_napi_error) + self.0.close().await.map(|_| ()).map_err(format_napi_error) } } diff --git a/bindings/ocaml/src/operator/mod.rs b/bindings/ocaml/src/operator/mod.rs index a2dc06a7fd2f..0bbd32ff46ed 100644 --- a/bindings/ocaml/src/operator/mod.rs +++ b/bindings/ocaml/src/operator/mod.rs @@ -91,7 +91,7 @@ pub fn blocking_write( path: String, bs: &'static [u8], ) -> Result<(), String> { - map_res_error(operator.0.write(path.as_str(), bs)) + map_res_error(operator.0.write(path.as_str(), bs).map(|_| ())) } #[ocaml::func] diff --git a/bindings/python/src/operator.rs b/bindings/python/src/operator.rs index b3c14b7c5d0a..33e4a6f9bc4f 100644 --- a/bindings/python/src/operator.rs +++ b/bindings/python/src/operator.rs @@ -135,7 +135,7 @@ impl Operator { write = write.cache_control(cache_control); } - write.call().map_err(format_pyerr) + write.call().map(|_| ()).map_err(format_pyerr) } /// Get current path's metadata **without cache** directly. 
@@ -351,7 +351,7 @@ impl AsyncOperator { if let Some(cache_control) = &kwargs.cache_control { write = write.cache_control(cache_control); } - write.await.map_err(format_pyerr) + write.await.map(|_| ()).map_err(format_pyerr) }) } diff --git a/bindings/ruby/src/operator.rs b/bindings/ruby/src/operator.rs index 45b11392a4c9..7aae6d6c0b57 100644 --- a/bindings/ruby/src/operator.rs +++ b/bindings/ruby/src/operator.rs @@ -69,6 +69,7 @@ impl Operator { rb_self .0 .write(&path, bs.to_bytes()) + .map(|_| ()) .map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string())) } diff --git a/core/src/layers/async_backtrace.rs b/core/src/layers/async_backtrace.rs index f9dc06b47bf1..f22172c290a7 100644 --- a/core/src/layers/async_backtrace.rs +++ b/core/src/layers/async_backtrace.rs @@ -179,7 +179,7 @@ impl oio::Write for AsyncBacktraceWrapper { } #[async_backtrace::framed] - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { self.inner.close().await } @@ -194,7 +194,7 @@ impl oio::BlockingWrite for AsyncBacktraceWrapper { self.inner.write(bs) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { self.inner.close() } } diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs index 1cf988af6ce7..238616aed4b2 100644 --- a/core/src/layers/await_tree.rs +++ b/core/src/layers/await_tree.rs @@ -206,7 +206,7 @@ impl oio::Write for AwaitTreeWrapper { .instrument_await(format!("opendal::{}", Operation::WriterAbort.into_static())) } - fn close(&mut self) -> impl Future> + MaybeSend { + fn close(&mut self) -> impl Future> + MaybeSend { self.inner .close() .instrument_await(format!("opendal::{}", Operation::WriterClose.into_static())) @@ -218,7 +218,7 @@ impl oio::BlockingWrite for AwaitTreeWrapper { self.inner.write(bs) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { self.inner.close() } } diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 
fc97a33703af..18288e9e2d15 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -289,7 +289,7 @@ impl oio::BlockingWrite for BlockingWrapper { self.handle.block_on(self.inner.write(bs)) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { self.handle.block_on(self.inner.close()) } } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index cdabe3fcc035..6960d97f88c4 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -350,6 +350,8 @@ impl LayeredAccess for CompleteAccessor { if cap.list && cap.write_can_empty { cap.create_dir = true; } + // write operations should always return content length + cap.write_has_content_length = true; meta.into() } @@ -517,15 +519,17 @@ where w.write(bs).await } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - w.close().await?; + // we must return `Err` before setting inner to None; otherwise, + // we won't be able to retry `close` in `RetryLayer`. 
+ let ret = w.close().await?; self.inner = None; - Ok(()) + Ok(ret) } async fn abort(&mut self) -> Result<()> { @@ -552,13 +556,14 @@ where w.write(bs) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - w.close()?; + let ret = w.close()?; self.inner = None; - Ok(()) + + Ok(ret) } } diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 2fca982aacd9..7152a977dc8a 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -267,7 +267,7 @@ impl oio::Write for ConcurrentLimitWrapper { self.inner.write(bs).await } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { self.inner.close().await } @@ -281,7 +281,7 @@ impl oio::BlockingWrite for ConcurrentLimitWrapper { self.inner.write(bs) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { self.inner.close() } } diff --git a/core/src/layers/correctness_check.rs b/core/src/layers/correctness_check.rs index 27383473cde2..b50adf40f295 100644 --- a/core/src/layers/correctness_check.rs +++ b/core/src/layers/correctness_check.rs @@ -383,8 +383,8 @@ mod tests { Ok(()) } - async fn close(&mut self) -> Result<()> { - Ok(()) + async fn close(&mut self) -> Result { + Ok(Metadata::default()) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/layers/dtrace.rs b/core/src/layers/dtrace.rs index b3c181ce1e04..35465ad145f1 100644 --- a/core/src/layers/dtrace.rs +++ b/core/src/layers/dtrace.rs @@ -382,14 +382,15 @@ impl oio::Write for DtraceLayerWrapper { }) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let c_path = CString::new(self.path.clone()).unwrap(); probe_lazy!(opendal, writer_close_start, c_path.as_ptr()); self.inner .close() .await - .map(|_| { + .map(|meta| { probe_lazy!(opendal, writer_close_ok, 
c_path.as_ptr()); + meta }) .map_err(|err| { probe_lazy!(opendal, writer_close_error, c_path.as_ptr()); @@ -413,13 +414,14 @@ impl oio::BlockingWrite for DtraceLayerWrapper { }) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { let c_path = CString::new(self.path.clone()).unwrap(); probe_lazy!(opendal, blocking_writer_close_start, c_path.as_ptr()); self.inner .close() - .map(|_| { + .map(|meta| { probe_lazy!(opendal, blocking_writer_close_ok, c_path.as_ptr()); + meta }) .map_err(|err| { probe_lazy!(opendal, blocking_writer_close_error, c_path.as_ptr()); diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 305fc82d35f3..c0a816ea9bb6 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -375,7 +375,7 @@ impl oio::Write for ErrorContextWrapper { }) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { self.inner.close().await.map_err(|err| { err.with_operation(Operation::WriterClose) .with_context("service", self.scheme) @@ -411,7 +411,7 @@ impl oio::BlockingWrite for ErrorContextWrapper { }) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { self.inner.close().map_err(|err| { err.with_operation(Operation::BlockingWriterClose) .with_context("service", self.scheme) diff --git a/core/src/layers/fastrace.rs b/core/src/layers/fastrace.rs index 56470c0f87d9..8b0371b23add 100644 --- a/core/src/layers/fastrace.rs +++ b/core/src/layers/fastrace.rs @@ -327,7 +327,7 @@ impl oio::Write for FastraceWrapper { self.inner.abort() } - fn close(&mut self) -> impl Future> + MaybeSend { + fn close(&mut self) -> impl Future> + MaybeSend { let _g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(Operation::WriterClose.into_static()); self.inner.close() @@ -342,7 +342,7 @@ impl oio::BlockingWrite for FastraceWrapper { self.inner.write(bs) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { let 
_g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(Operation::BlockingWriterClose.into_static()); diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 3dfaea31a625..054131f84ec2 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1047,7 +1047,7 @@ impl oio::Write for LoggingWriter { } } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { self.logger.log( &self.info, Operation::WriterClose, @@ -1057,7 +1057,7 @@ impl oio::Write for LoggingWriter { ); match self.inner.close().await { - Ok(_) => { + Ok(meta) => { self.logger.log( &self.info, Operation::WriterClose, @@ -1065,7 +1065,7 @@ impl oio::Write for LoggingWriter { "succeeded", None, ); - Ok(()) + Ok(meta) } Err(err) => { self.logger.log( @@ -1129,7 +1129,7 @@ impl oio::BlockingWrite for Loggin } } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { self.logger.log( &self.info, Operation::BlockingWriterClose, @@ -1139,7 +1139,7 @@ impl oio::BlockingWrite for Loggin ); match self.inner.close() { - Ok(_) => { + Ok(meta) => { self.logger.log( &self.info, Operation::BlockingWriterWrite, @@ -1147,7 +1147,7 @@ impl oio::BlockingWrite for Loggin "succeeded", None, ); - Ok(()) + Ok(meta) } Err(err) => { self.logger.log( diff --git a/core/src/layers/observe/metrics.rs b/core/src/layers/observe/metrics.rs index a6562734743b..7855427e3676 100644 --- a/core/src/layers/observe/metrics.rs +++ b/core/src/layers/observe/metrics.rs @@ -966,13 +966,13 @@ impl oio::Write for MetricsWrapper { res } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let op = Operation::WriterClose; let start = Instant::now(); let res = match self.inner.close().await { - Ok(()) => Ok(()), + Ok(meta) => Ok(meta), Err(err) => { self.interceptor.observe_operation_errors_total( self.scheme, @@ -1069,13 +1069,13 @@ impl oio::BlockingWrite for MetricsW res } - fn close(&mut self) -> Result<()> { 
+ fn close(&mut self) -> Result { let op = Operation::BlockingWriterClose; let start = Instant::now(); let res = match self.inner.close() { - Ok(()) => Ok(()), + Ok(meta) => Ok(meta), Err(err) => { self.interceptor.observe_operation_errors_total( self.scheme, diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index bdbc80279d8e..1dd15c0efd57 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -277,7 +277,7 @@ impl oio::Write for OtelTraceWrapper { self.inner.abort() } - fn close(&mut self) -> impl Future> + MaybeSend { + fn close(&mut self) -> impl Future> + MaybeSend { self.inner.close() } } @@ -287,7 +287,7 @@ impl oio::BlockingWrite for OtelTraceWrapper { self.inner.write(bs) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { self.inner.close() } } diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index ee8b925b9cb0..c9152225d2e6 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -649,7 +649,7 @@ impl oio::Write for RetryWrapper { res.map_err(|err| err.set_persistent()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { use backon::RetryableWithContext; let inner = self.take_inner()?; @@ -684,7 +684,7 @@ impl oio::BlockingWrite for RetryWra .map_err(|e| e.set_persistent()) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { { || self.inner.as_mut().unwrap().close() } .retry(self.builder) .when(|e| e.is_temporary()) @@ -838,17 +838,18 @@ mod tests { fn info(&self) -> Arc { let mut am = AccessorInfo::default(); - am.set_native_capability(Capability { - read: true, - write: true, - write_can_multi: true, - delete: true, - delete_max_size: Some(10), - stat: true, - list: true, - list_with_recursive: true, - ..Default::default() - }); + am.set_scheme(Scheme::Custom("mock")) + .set_native_capability(Capability { + read: true, + write: true, + write_can_multi: true, + delete: true, + delete_max_size: 
Some(10), + stat: true, + list: true, + list_with_recursive: true, + ..Default::default() + }); am.into() } @@ -932,7 +933,7 @@ mod tests { Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { Err(Error::new(ErrorKind::Unexpected, "always close failed").set_temporary()) } diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index 5b7154c92aa8..4a5e674e1c2e 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -233,7 +233,7 @@ impl oio::Write for ThrottleWrapper { self.inner.abort().await } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { self.inner.close().await } } @@ -261,7 +261,7 @@ impl oio::BlockingWrite for ThrottleWrapper { } } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { self.inner.close() } } diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index af8e298145f0..f8182aa66155 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -367,7 +367,7 @@ impl oio::Write for TimeoutWrapper { Self::io_timeout(self.timeout, Operation::WriterWrite.into_static(), fut).await } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let fut = self.inner.close(); Self::io_timeout(self.timeout, Operation::WriterClose.into_static(), fut).await } diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index efc1380db420..64b49e816936 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -328,7 +328,7 @@ impl oio::Write for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - fn close(&mut self) -> impl Future> + MaybeSend { + fn close(&mut self) -> impl Future> + MaybeSend { self.inner.close() } } @@ -346,7 +346,7 @@ impl oio::BlockingWrite for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { self.inner.close() } } diff --git 
a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 56146aef0a70..6a2d2fbc4e5f 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -297,9 +297,13 @@ impl oio::Write for KvWriter { Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let buf = self.buffer.clone().collect(); - self.kv.set(&self.path, buf).await + let length = buf.len() as u64; + self.kv.set(&self.path, buf).await?; + + let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length); + Ok(meta) } async fn abort(&mut self) -> Result<()> { @@ -314,10 +318,13 @@ impl oio::BlockingWrite for KvWriter { Ok(()) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { let buf = self.buffer.clone().collect(); + let length = buf.len() as u64; self.kv.blocking_set(&self.path, buf)?; - Ok(()) + + let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length); + Ok(meta) } } diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 1005811c9443..a002b20e2e59 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -291,7 +291,7 @@ impl oio::Write for KvWriter { Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let value = match &self.value { Some(value) => value.clone(), None => { @@ -300,8 +300,10 @@ impl oio::Write for KvWriter { value } }; + let meta = value.metadata.clone(); self.kv.set(&self.path, value).await?; - Ok(()) + + Ok(meta) } async fn abort(&mut self) -> Result<()> { @@ -318,7 +320,7 @@ impl oio::BlockingWrite for KvWriter { Ok(()) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { let kv = self.kv.clone(); let value = match &self.value { Some(value) => value.clone(), @@ -329,8 +331,9 @@ impl oio::BlockingWrite for KvWriter { } }; + let meta = value.metadata.clone(); 
kv.blocking_set(&self.path, value)?; - Ok(()) + Ok(meta) } } diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs index e42bc9be4de1..e95059975048 100644 --- a/core/src/raw/enum_utils.rs +++ b/core/src/raw/enum_utils.rs @@ -77,7 +77,7 @@ impl oio::Write for TwoWays { } } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { match self { Self::One(v) => v.close().await, Self::Two(v) => v.close().await, @@ -146,7 +146,7 @@ impl oio::Write } } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { match self { Self::One(v) => v.close().await, Self::Two(v) => v.close().await, diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index 91d2b3cad9a9..0d8c55fbd8e0 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -35,7 +35,7 @@ pub trait Write: Unpin + Send + Sync { fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend; /// Close the writer and make sure all data has been flushed. - fn close(&mut self) -> impl Future> + MaybeSend; + fn close(&mut self) -> impl Future> + MaybeSend; /// Abort the pending writer. 
fn abort(&mut self) -> impl Future> + MaybeSend; @@ -46,7 +46,7 @@ impl Write for () { unimplemented!("write is required to be implemented for oio::Write") } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support close", @@ -64,7 +64,7 @@ impl Write for () { pub trait WriteDyn: Unpin + Send + Sync { fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture>; - fn close_dyn(&mut self) -> BoxedFuture>; + fn close_dyn(&mut self) -> BoxedFuture>; fn abort_dyn(&mut self) -> BoxedFuture>; } @@ -74,7 +74,7 @@ impl WriteDyn for T { Box::pin(self.write(bs)) } - fn close_dyn(&mut self) -> BoxedFuture> { + fn close_dyn(&mut self) -> BoxedFuture> { Box::pin(self.close()) } @@ -88,7 +88,7 @@ impl Write for Box { self.deref_mut().write_dyn(bs).await } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { self.deref_mut().close_dyn().await } @@ -114,7 +114,7 @@ pub trait BlockingWrite: Send + Sync + 'static { fn write(&mut self, bs: Buffer) -> Result<()>; /// Close the writer and make sure all data has been flushed. 
- fn close(&mut self) -> Result<()>; + fn close(&mut self) -> Result; } impl BlockingWrite for () { @@ -124,7 +124,7 @@ impl BlockingWrite for () { unimplemented!("write is required to be implemented for oio::BlockingWrite") } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support close", @@ -140,7 +140,7 @@ impl BlockingWrite for Box { (**self).write(bs) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { (**self).close() } } diff --git a/core/src/raw/oio/write/append_write.rs b/core/src/raw/oio/write/append_write.rs index 9ff3d06b7c1a..fe06f6105058 100644 --- a/core/src/raw/oio/write/append_write.rs +++ b/core/src/raw/oio/write/append_write.rs @@ -48,7 +48,7 @@ pub trait AppendWrite: Send + Sync + Unpin + 'static { offset: u64, size: u64, body: Buffer, - ) -> impl Future> + MaybeSend; + ) -> impl Future> + MaybeSend; } /// AppendWriter will implements [`oio::Write`] based on append object. @@ -60,6 +60,8 @@ pub struct AppendWriter { inner: W, offset: Option, + + meta: Metadata, } /// # Safety @@ -71,6 +73,7 @@ impl AppendWriter { Self { inner, offset: None, + meta: Metadata::default(), } } } @@ -90,14 +93,16 @@ where }; let size = bs.len(); - self.inner.append(offset, size as u64, bs).await?; + self.meta = self.inner.append(offset, size as u64, bs).await?; // Update offset after succeed. 
self.offset = Some(offset + size as u64); Ok(()) } - async fn close(&mut self) -> Result<()> { - Ok(()) + async fn close(&mut self) -> Result { + self.meta + .set_content_length(self.offset.unwrap_or_default()); + Ok(self.meta.clone()) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/block_write.rs b/core/src/raw/oio/write/block_write.rs index b8debb11aec3..145d7c3ff885 100644 --- a/core/src/raw/oio/write/block_write.rs +++ b/core/src/raw/oio/write/block_write.rs @@ -63,7 +63,11 @@ pub trait BlockWrite: Send + Sync + Unpin + 'static { /// BlockWriter will call this API when: /// /// - All the data has been written to the buffer and we can perform the upload at once. - fn write_once(&self, size: u64, body: Buffer) -> impl Future> + MaybeSend; + fn write_once( + &self, + size: u64, + body: Buffer, + ) -> impl Future> + MaybeSend; /// write_block will write a block of the data. /// @@ -80,7 +84,10 @@ pub trait BlockWrite: Send + Sync + Unpin + 'static { /// complete_block will complete the block upload to build the final /// file. - fn complete_block(&self, block_ids: Vec) -> impl Future> + MaybeSend; + fn complete_block( + &self, + block_ids: Vec, + ) -> impl Future> + MaybeSend; /// abort_block will cancel the block upload and purge all data. 
fn abort_block(&self, block_ids: Vec) -> impl Future> + MaybeSend; @@ -103,6 +110,8 @@ pub struct BlockWriter { block_ids: Vec, cache: Option, tasks: ConcurrentTasks, Uuid>, + + write_bytes_count: u64, } impl BlockWriter { @@ -149,6 +158,7 @@ impl BlockWriter { } }) }), + write_bytes_count: 0, } } @@ -165,6 +175,8 @@ where W: BlockWrite, { async fn write(&mut self, bs: Buffer) -> Result<()> { + self.write_bytes_count += bs.len() as u64; + if !self.started && self.cache.is_none() { self.fill_cache(bs); return Ok(()); @@ -187,16 +199,16 @@ where Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { if !self.started { let (size, body) = match self.cache.clone() { Some(cache) => (cache.len(), cache), None => (0, Buffer::new()), }; - self.w.write_once(size as u64, body).await?; self.cache = None; - return Ok(()); + let meta = self.w.write_once(size as u64, body).await?; + return Ok(meta.with_content_length(self.write_bytes_count)); } if let Some(cache) = self.cache.clone() { @@ -219,7 +231,8 @@ where } let block_ids = self.block_ids.clone(); - self.w.complete_block(block_ids).await + let meta = self.w.complete_block(block_ids).await?; + Ok(meta.with_content_length(self.write_bytes_count)) } async fn abort(&mut self) -> Result<()> { @@ -268,8 +281,8 @@ mod tests { } impl BlockWrite for Arc> { - async fn write_once(&self, _: u64, _: Buffer) -> Result<()> { - Ok(()) + async fn write_once(&self, _: u64, _: Buffer) -> Result { + Ok(Metadata::default()) } async fn write_block(&self, block_id: Uuid, size: u64, body: Buffer) -> Result<()> { @@ -290,7 +303,7 @@ mod tests { Ok(()) } - async fn complete_block(&self, block_ids: Vec) -> Result<()> { + async fn complete_block(&self, block_ids: Vec) -> Result { let mut this = self.lock().unwrap(); let mut bs = Vec::new(); for id in block_ids { @@ -298,7 +311,7 @@ mod tests { } this.content = Some(bs.into_iter().flatten().collect()); - Ok(()) + Ok(Metadata::default()) } async fn abort_block(&self, 
_: Vec) -> Result<()> { diff --git a/core/src/raw/oio/write/multipart_write.rs b/core/src/raw/oio/write/multipart_write.rs index b692303c5b60..4e445a7ff831 100644 --- a/core/src/raw/oio/write/multipart_write.rs +++ b/core/src/raw/oio/write/multipart_write.rs @@ -61,7 +61,11 @@ pub trait MultipartWrite: Send + Sync + Unpin + 'static { /// MultipartWriter will call this API when: /// /// - All the data has been written to the buffer and we can perform the upload at once. - fn write_once(&self, size: u64, body: Buffer) -> impl Future> + MaybeSend; + fn write_once( + &self, + size: u64, + body: Buffer, + ) -> impl Future> + MaybeSend; /// initiate_part will call start a multipart upload and return the upload id. /// @@ -69,7 +73,7 @@ pub trait MultipartWrite: Send + Sync + Unpin + 'static { /// /// - the total size of data is unknown. /// - the total size of data is known, but the size of current write - /// is less then the total size. + /// is less than the total size. fn initiate_part(&self) -> impl Future> + MaybeSend; /// write_part will write a part of the data and returns the result @@ -93,7 +97,7 @@ pub trait MultipartWrite: Send + Sync + Unpin + 'static { &self, upload_id: &str, parts: &[MultipartPart], - ) -> impl Future> + MaybeSend; + ) -> impl Future> + MaybeSend; /// abort_part will cancel the multipart upload and purge all data. 
fn abort_part(&self, upload_id: &str) -> impl Future> + MaybeSend; @@ -136,6 +140,8 @@ pub struct MultipartWriter { next_part_number: usize, tasks: ConcurrentTasks, MultipartPart>, + + write_bytes_count: u64, } /// # Safety @@ -187,6 +193,7 @@ impl MultipartWriter { } }) }), + write_bytes_count: 0, } } @@ -203,6 +210,7 @@ where W: MultipartWrite, { async fn write(&mut self, bs: Buffer) -> Result<()> { + self.write_bytes_count += bs.len() as u64; let upload_id = match self.upload_id.clone() { Some(v) => v, None => { @@ -237,7 +245,7 @@ where Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let upload_id = match self.upload_id.clone() { Some(v) => v, None => { @@ -245,10 +253,12 @@ where Some(cache) => (cache.len(), cache), None => (0, Buffer::new()), }; - // Call write_once if there is no upload_id. - self.w.write_once(size as u64, body).await?; + self.cache = None; - return Ok(()); + // Call write_once if there is no upload_id. + let mut meta = self.w.write_once(size as u64, body).await?; + meta.set_content_length(self.write_bytes_count); + return Ok(meta); } }; @@ -284,7 +294,9 @@ where .with_context("actual", self.parts.len()) .with_context("upload_id", upload_id)); } - self.w.complete_part(&upload_id, &self.parts).await + let mut meta = self.w.complete_part(&upload_id, &self.parts).await?; + meta.set_content_length(self.write_bytes_count); + Ok(meta) } async fn abort(&mut self) -> Result<()> { @@ -333,9 +345,9 @@ mod tests { } impl MultipartWrite for Arc> { - async fn write_once(&self, size: u64, _: Buffer) -> Result<()> { + async fn write_once(&self, size: u64, _: Buffer) -> Result { self.lock().await.length += size; - Ok(()) + Ok(Metadata::default().with_content_length(size)) } async fn initiate_part(&self) -> Result { @@ -378,12 +390,16 @@ mod tests { }) } - async fn complete_part(&self, upload_id: &str, parts: &[MultipartPart]) -> Result<()> { + async fn complete_part( + &self, + upload_id: &str, + parts: 
&[MultipartPart], + ) -> Result { let test = self.lock().await; assert_eq!(upload_id, test.upload_id); assert_eq!(parts.len(), test.part_numbers.len()); - Ok(()) + Ok(Metadata::default().with_content_length(test.length)) } async fn abort_part(&self, upload_id: &str) -> Result<()> { diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index 938973c33a71..2d19d96d427a 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -30,7 +30,7 @@ pub trait OneShotWrite: Send + Sync + Unpin + 'static { /// write_once write all data at once. /// /// Implementations should make sure that the data is written correctly at once. - fn write_once(&self, bs: Buffer) -> impl Future> + MaybeSend; + fn write_once(&self, bs: Buffer) -> impl Future> + MaybeSend; } /// OneShotWrite is used to implement [`oio::Write`] based on one shot. @@ -63,11 +63,17 @@ impl oio::Write for OneShotWriter { } } - async fn close(&mut self) -> Result<()> { - match self.buffer.clone() { - Some(bs) => self.inner.write_once(bs).await, + async fn close(&mut self) -> Result { + let mut length = 0; + let meta = match self.buffer.clone() { + Some(bs) => { + length = bs.len(); + self.inner.write_once(bs).await + } None => self.inner.write_once(Buffer::new()).await, - } + }?; + + Ok(meta.with_content_length(length as u64)) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/position_write.rs b/core/src/raw/oio/write/position_write.rs index e31c8ae0ad31..f5d9d325806b 100644 --- a/core/src/raw/oio/write/position_write.rs +++ b/core/src/raw/oio/write/position_write.rs @@ -52,7 +52,7 @@ pub trait PositionWrite: Send + Sync + Unpin + 'static { ) -> impl Future> + MaybeSend; /// close is used to close the underlying file. - fn close(&self) -> impl Future> + MaybeSend; + fn close(&self) -> impl Future> + MaybeSend; /// abort is used to abort the underlying abort. 
fn abort(&self) -> impl Future> + MaybeSend; @@ -149,7 +149,7 @@ impl oio::Write for PositionWriter { Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { // Make sure all tasks are finished. while self.tasks.next().await.transpose()?.is_some() {} @@ -158,8 +158,7 @@ impl oio::Write for PositionWriter { self.w.write_all_at(offset, buffer).await?; self.cache = None; } - self.w.close().await?; - Ok(()) + self.w.close().await } async fn abort(&mut self) -> Result<()> { @@ -228,8 +227,8 @@ mod tests { Ok(()) } - async fn close(&self) -> Result<()> { - Ok(()) + async fn close(&self) -> Result { + Ok(Metadata::default()) } async fn abort(&self) -> Result<()> { diff --git a/core/src/services/aliyun_drive/writer.rs b/core/src/services/aliyun_drive/writer.rs index 31bf74fad8c7..35afaaade7f4 100644 --- a/core/src/services/aliyun_drive/writer.rs +++ b/core/src/services/aliyun_drive/writer.rs @@ -37,6 +37,8 @@ pub struct AliyunDriveWriter { file_id: Option, upload_id: Option, part_number: usize, + + write_bytes_count: u64, } impl AliyunDriveWriter { @@ -49,12 +51,14 @@ impl AliyunDriveWriter { file_id: None, upload_id: None, part_number: 1, // must start from 1 + write_bytes_count: 0, } } } impl oio::Write for AliyunDriveWriter { async fn write(&mut self, bs: Buffer) -> Result<()> { + self.write_bytes_count += bs.len() as u64; let (upload_id, file_id) = match (self.upload_id.as_ref(), self.file_id.as_ref()) { (Some(upload_id), Some(file_id)) => (upload_id, file_id), _ => { @@ -108,13 +112,14 @@ impl oio::Write for AliyunDriveWriter { Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let (Some(upload_id), Some(file_id)) = (self.upload_id.as_ref(), self.file_id.as_ref()) else { - return Ok(()); + return Ok(Metadata::default().with_content_length(self.write_bytes_count)); }; + self.core.complete(file_id, upload_id).await?; - Ok(()) + 
Ok(Metadata::default().with_content_length(self.write_bytes_count)) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/alluxio/writer.rs b/core/src/services/alluxio/writer.rs index f452b6b10bbd..4409d99cdf66 100644 --- a/core/src/services/alluxio/writer.rs +++ b/core/src/services/alluxio/writer.rs @@ -29,6 +29,8 @@ pub struct AlluxioWriter { _op: OpWrite, path: String, stream_id: Option, + + write_bytes_count: u64, } impl AlluxioWriter { @@ -38,12 +40,15 @@ impl AlluxioWriter { _op, path, stream_id: None, + write_bytes_count: 0, } } } impl oio::Write for AlluxioWriter { async fn write(&mut self, bs: Buffer) -> Result<()> { + self.write_bytes_count += bs.len() as u64; + let stream_id = match self.stream_id { Some(stream_id) => stream_id, None => { @@ -56,11 +61,13 @@ impl oio::Write for AlluxioWriter { Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let Some(stream_id) = self.stream_id else { - return Ok(()); + return Ok(Metadata::default().with_content_length(self.write_bytes_count)); }; - self.core.close(stream_id).await + self.core.close(stream_id).await?; + + Ok(Metadata::default().with_content_length(self.write_bytes_count)) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 7659d564bbf6..24116c364b59 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -88,7 +88,7 @@ impl oio::AppendWrite for AzblobWriter { } } - async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<()> { + async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result { let mut req = self .core .azblob_append_blob_request(&self.path, offset, size, body)?; @@ -99,14 +99,14 @@ impl oio::AppendWrite for AzblobWriter { let status = resp.status(); match status { - StatusCode::CREATED => Ok(()), + StatusCode::CREATED => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } } impl 
oio::BlockWrite for AzblobWriter { - async fn write_once(&self, size: u64, body: Buffer) -> Result<()> { + async fn write_once(&self, size: u64, body: Buffer) -> Result { let mut req: http::Request = self.core .azblob_put_blob_request(&self.path, Some(size), &self.op, body)?; @@ -117,7 +117,7 @@ impl oio::BlockWrite for AzblobWriter { let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -135,7 +135,7 @@ impl oio::BlockWrite for AzblobWriter { } } - async fn complete_block(&self, block_ids: Vec) -> Result<()> { + async fn complete_block(&self, block_ids: Vec) -> Result { let resp = self .core .azblob_complete_put_block_list(&self.path, block_ids, &self.op) @@ -143,7 +143,7 @@ impl oio::BlockWrite for AzblobWriter { let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/azdls/writer.rs b/core/src/services/azdls/writer.rs index dc1bc7605022..384023cbb961 100644 --- a/core/src/services/azdls/writer.rs +++ b/core/src/services/azdls/writer.rs @@ -40,7 +40,7 @@ impl AzdlsWriter { } impl oio::OneShotWrite for AzdlsWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let mut req = self.core .azdls_create_request(&self.path, "file", &self.op, Buffer::new())?; @@ -67,7 +67,7 @@ impl oio::OneShotWrite for AzdlsWriter { let status = resp.status(); match status { - StatusCode::OK | StatusCode::ACCEPTED => Ok(()), + StatusCode::OK | StatusCode::ACCEPTED => Ok(Metadata::default()), _ => Err(parse_error(resp).with_operation("Backend::azdls_update_request")), } } @@ -87,7 +87,7 @@ impl oio::AppendWrite for AzdlsWriter { } } - async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<()> { + async fn 
append(&self, offset: u64, size: u64, body: Buffer) -> Result { if offset == 0 { let mut req = self.core @@ -116,7 +116,7 @@ impl oio::AppendWrite for AzdlsWriter { let status = resp.status(); match status { - StatusCode::OK | StatusCode::ACCEPTED => Ok(()), + StatusCode::OK | StatusCode::ACCEPTED => Ok(Metadata::default()), _ => Err(parse_error(resp).with_operation("Backend::azdls_update_request")), } } diff --git a/core/src/services/azfile/writer.rs b/core/src/services/azfile/writer.rs index 9a5323c7efe7..b8686c6251c3 100644 --- a/core/src/services/azfile/writer.rs +++ b/core/src/services/azfile/writer.rs @@ -39,7 +39,7 @@ impl AzfileWriter { } impl oio::OneShotWrite for AzfileWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let resp = self .core .azfile_create_file(&self.path, bs.len(), &self.op) @@ -59,7 +59,7 @@ impl oio::OneShotWrite for AzfileWriter { .await?; let status = resp.status(); match status { - StatusCode::OK | StatusCode::CREATED => Ok(()), + StatusCode::OK | StatusCode::CREATED => Ok(Metadata::default()), _ => Err(parse_error(resp).with_operation("Backend::azfile_update")), } } @@ -77,7 +77,7 @@ impl oio::AppendWrite for AzfileWriter { } } - async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<()> { + async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result { let resp = self .core .azfile_update(&self.path, size, offset, body) @@ -85,7 +85,7 @@ impl oio::AppendWrite for AzfileWriter { let status = resp.status(); match status { - StatusCode::OK | StatusCode::CREATED => Ok(()), + StatusCode::OK | StatusCode::CREATED => Ok(Metadata::default()), _ => Err(parse_error(resp).with_operation("Backend::azfile_update")), } } diff --git a/core/src/services/b2/writer.rs b/core/src/services/b2/writer.rs index dcbabc81f8b0..5512c24ad06e 100644 --- a/core/src/services/b2/writer.rs +++ b/core/src/services/b2/writer.rs @@ -47,7 +47,7 @@ impl B2Writer { } impl 
oio::MultipartWrite for B2Writer { - async fn write_once(&self, size: u64, body: Buffer) -> Result<()> { + async fn write_once(&self, size: u64, body: Buffer) -> Result { let resp = self .core .upload_file(&self.path, Some(size), &self.op, body) @@ -56,7 +56,7 @@ impl oio::MultipartWrite for B2Writer { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -113,7 +113,11 @@ impl oio::MultipartWrite for B2Writer { } } - async fn complete_part(&self, upload_id: &str, parts: &[oio::MultipartPart]) -> Result<()> { + async fn complete_part( + &self, + upload_id: &str, + parts: &[oio::MultipartPart], + ) -> Result { let part_sha1_array = parts .iter() .map(|p| { @@ -134,7 +138,7 @@ impl oio::MultipartWrite for B2Writer { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/chainsafe/writer.rs b/core/src/services/chainsafe/writer.rs index b1c01cc94e1a..e1b8181b77b9 100644 --- a/core/src/services/chainsafe/writer.rs +++ b/core/src/services/chainsafe/writer.rs @@ -43,13 +43,13 @@ impl ChainsafeWriter { } impl oio::OneShotWrite for ChainsafeWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let resp = self.core.upload_object(&self.path, bs).await?; let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/compfs/writer.rs b/core/src/services/compfs/writer.rs index aa05f787f52e..c89fcc7c1ef1 100644 --- a/core/src/services/compfs/writer.rs +++ b/core/src/services/compfs/writer.rs @@ -30,11 +30,17 @@ use crate::*; pub struct CompfsWriter { core: Arc, file: Cursor, + + write_bytes_count: u64, } impl CompfsWriter { pub(super) fn new(core: Arc, file: Cursor) -> Self { - Self 
{ core, file } + Self { + core, + file, + write_bytes_count: 0, + } } } @@ -45,6 +51,8 @@ impl oio::Write for CompfsWriter { /// /// The IoBuf::buf_len() only returns the length of the current buffer. async fn write(&mut self, bs: Buffer) -> Result<()> { + self.write_bytes_count += bs.len() as u64; + let mut file = self.file.clone(); self.core @@ -57,7 +65,7 @@ impl oio::Write for CompfsWriter { Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let f = self.file.clone(); self.core @@ -67,7 +75,9 @@ impl oio::Write for CompfsWriter { let f = self.file.clone(); self.core .exec(move || async move { f.into_inner().close().await }) - .await + .await?; + + Ok(Metadata::default().with_content_length(self.write_bytes_count)) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index dd6e27a4f917..f0d8c064ef43 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -44,7 +44,7 @@ impl CosWriter { } impl oio::MultipartWrite for CosWriter { - async fn write_once(&self, size: u64, body: Buffer) -> Result<()> { + async fn write_once(&self, size: u64, body: Buffer) -> Result { let mut req = self .core .cos_put_object_request(&self.path, Some(size), &self.op, body)?; @@ -56,7 +56,7 @@ impl oio::MultipartWrite for CosWriter { let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -121,7 +121,11 @@ impl oio::MultipartWrite for CosWriter { } } - async fn complete_part(&self, upload_id: &str, parts: &[oio::MultipartPart]) -> Result<()> { + async fn complete_part( + &self, + upload_id: &str, + parts: &[oio::MultipartPart], + ) -> Result { let parts = parts .iter() .map(|p| CompleteMultipartUploadRequestPart { @@ -138,7 +142,7 @@ impl oio::MultipartWrite for CosWriter { let status = resp.status(); match 
status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -180,7 +184,7 @@ impl oio::AppendWrite for CosWriter { } } - async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<()> { + async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result { let mut req = self .core .cos_append_object_request(&self.path, offset, size, &self.op, body)?; @@ -192,7 +196,7 @@ impl oio::AppendWrite for CosWriter { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/dbfs/writer.rs b/core/src/services/dbfs/writer.rs index 1a3963b8c712..48aa9ae22bb0 100644 --- a/core/src/services/dbfs/writer.rs +++ b/core/src/services/dbfs/writer.rs @@ -38,7 +38,7 @@ impl DbfsWriter { } impl oio::OneShotWrite for DbfsWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let size = bs.len(); // MAX_BLOCK_SIZE_EXCEEDED will be thrown if this limit(1MB) is exceeded. 
@@ -57,7 +57,7 @@ impl oio::OneShotWrite for DbfsWriter { let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index b4eaf2aa7820..b277d9fb1af0 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -37,14 +37,14 @@ impl DropboxWriter { } impl oio::OneShotWrite for DropboxWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let resp = self .core .dropbox_update(&self.path, Some(bs.len()), &self.op, bs) .await?; let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 1a0afb9c2e86..0a6e6f85813d 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -187,6 +187,8 @@ impl Access for FsBackend { write_can_empty: true, write_can_append: true, write_can_multi: true, + write_has_last_modified: true, + create_dir: true, delete: true, diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 761f8fcf9e9d..d3a641f7c73d 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -63,7 +63,7 @@ impl oio::Write for FsWriter { Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let f = self.f.as_mut().expect("FsWriter must be initialized"); f.flush().await.map_err(new_std_io_error)?; f.sync_all().await.map_err(new_std_io_error)?; @@ -73,7 +73,19 @@ impl oio::Write for FsWriter { .await .map_err(new_std_io_error)?; } - Ok(()) + + let file_meta = f.metadata().await.map_err(new_std_io_error)?; + let mode = if file_meta.is_file() { + EntryMode::FILE + } else if 
file_meta.is_dir() { + EntryMode::DIR + } else { + EntryMode::Unknown + }; + let meta = Metadata::new(mode) + .with_content_length(file_meta.len()) + .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into()); + Ok(meta) } async fn abort(&mut self) -> Result<()> { @@ -102,16 +114,26 @@ impl oio::BlockingWrite for FsWriter { Ok(()) } - fn close(&mut self) -> Result<()> { - if let Some(f) = self.f.take() { - f.sync_all().map_err(new_std_io_error)?; + fn close(&mut self) -> Result { + let f = self.f.as_mut().expect("FsWriter must be initialized"); + f.sync_all().map_err(new_std_io_error)?; - if let Some(tmp_path) = &self.tmp_path { - std::fs::rename(tmp_path, &self.target_path).map_err(new_std_io_error)?; - } + if let Some(tmp_path) = &self.tmp_path { + std::fs::rename(tmp_path, &self.target_path).map_err(new_std_io_error)?; } - Ok(()) + let file_meta = f.metadata().map_err(new_std_io_error)?; + let mode = if file_meta.is_file() { + EntryMode::FILE + } else if file_meta.is_dir() { + EntryMode::DIR + } else { + EntryMode::Unknown + }; + let meta = Metadata::new(mode) + .with_content_length(file_meta.len()) + .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into()); + Ok(meta) } } @@ -144,7 +166,7 @@ impl oio::PositionWrite for FsWriter { .map_err(new_task_join_error)? 
} - async fn close(&self) -> Result<()> { + async fn close(&self) -> Result { let f = self.f.as_ref().expect("FsWriter must be initialized"); let mut f = f @@ -162,7 +184,19 @@ impl oio::PositionWrite for FsWriter { .await .map_err(new_std_io_error)?; } - Ok(()) + + let file_meta = f.metadata().map_err(new_std_io_error)?; + let mode = if file_meta.is_file() { + EntryMode::FILE + } else if file_meta.is_dir() { + EntryMode::DIR + } else { + EntryMode::Unknown + }; + let meta = Metadata::new(mode) + .with_content_length(file_meta.len()) + .with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into()); + Ok(meta) } async fn abort(&self) -> Result<()> { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index fd2549b04655..c1908e8e5adc 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -30,6 +30,7 @@ pub struct FtpWriter { tmp_path: Option, ftp_stream: PooledConnection<'static, Manager>, data_stream: Option>, + write_bytes_count: u64, } /// # Safety @@ -53,12 +54,15 @@ impl FtpWriter { tmp_path, ftp_stream, data_stream: None, + write_bytes_count: 0, } } } impl oio::Write for FtpWriter { async fn write(&mut self, mut bs: Buffer) -> Result<()> { + self.write_bytes_count += bs.len() as u64; + let path = if let Some(tmp_path) = &self.tmp_path { tmp_path } else { @@ -90,7 +94,7 @@ impl oio::Write for FtpWriter { Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let data_stream = self.data_stream.take(); if let Some(mut data_stream) = data_stream { data_stream.flush().await.map_err(|err| { @@ -110,7 +114,7 @@ impl oio::Write for FtpWriter { } } - Ok(()) + Ok(Metadata::default().with_content_length(self.write_bytes_count)) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index ae620df4623c..afb8adbd257b 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ 
-46,7 +46,7 @@ impl GcsWriter { } impl oio::MultipartWrite for GcsWriter { - async fn write_once(&self, _: u64, body: Buffer) -> Result<()> { + async fn write_once(&self, _: u64, body: Buffer) -> Result { let size = body.len() as u64; let mut req = self.core.gcs_insert_object_request( &percent_encode_path(&self.path), @@ -62,7 +62,7 @@ impl oio::MultipartWrite for GcsWriter { let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -118,7 +118,11 @@ impl oio::MultipartWrite for GcsWriter { }) } - async fn complete_part(&self, upload_id: &str, parts: &[oio::MultipartPart]) -> Result<()> { + async fn complete_part( + &self, + upload_id: &str, + parts: &[oio::MultipartPart], + ) -> Result { let parts = parts .iter() .map(|p| CompleteMultipartUploadRequestPart { @@ -135,7 +139,7 @@ impl oio::MultipartWrite for GcsWriter { if !resp.status().is_success() { return Err(parse_error(resp)); } - Ok(()) + Ok(Metadata::default()) } async fn abort_part(&self, upload_id: &str) -> Result<()> { diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index bfabff97a147..89b8024aad5e 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -46,7 +46,7 @@ impl GdriveWriter { } impl oio::OneShotWrite for GdriveWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let size = bs.len(); let resp = if let Some(file_id) = &self.file_id { @@ -69,7 +69,7 @@ impl oio::OneShotWrite for GdriveWriter { serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?; self.core.path_cache.insert(&self.path, &file.id).await; } - Ok(()) + Ok(Metadata::default()) } _ => Err(parse_error(resp)), } diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index c5d49a17af01..777c8ada7233 100644 --- 
a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -63,12 +63,12 @@ impl oio::Write for GhacWriter { Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let req = self.backend.ghac_commit(self.cache_id, self.size)?; let resp = self.backend.client.send(req).await?; if resp.status().is_success() { - Ok(()) + Ok(Metadata::default().with_content_length(self.size)) } else { Err(parse_error(resp).map(|err| err.with_operation("Backend::ghac_commit"))) } diff --git a/core/src/services/github/writer.rs b/core/src/services/github/writer.rs index 381a497d2057..652a4e483683 100644 --- a/core/src/services/github/writer.rs +++ b/core/src/services/github/writer.rs @@ -38,13 +38,13 @@ impl GithubWriter { } impl oio::OneShotWrite for GithubWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let resp = self.core.upload(&self.path, bs).await?; let status = resp.status(); match status { - StatusCode::OK | StatusCode::CREATED => Ok(()), + StatusCode::OK | StatusCode::CREATED => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index c4497ce0c1e0..ee4041975129 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -299,8 +299,12 @@ impl Access for HdfsBackend { async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { let target_path = build_rooted_abs_path(&self.root, path); + let mut initial_size = 0; let target_exists = match self.client.metadata(&target_path) { - Ok(_) => true, + Ok(meta) => { + initial_size = meta.len(); + true + } Err(err) => { if err.kind() != io::ErrorKind::NotFound { return Err(new_std_io_error(err)); @@ -323,6 +327,9 @@ impl Access for HdfsBackend { let parent = get_parent(&target_path); self.client.create_dir(parent).map_err(new_std_io_error)?; } + if !should_append { + 
initial_size = 0; + } let mut open_options = self.client.open_file(); open_options.create(true); @@ -345,6 +352,7 @@ impl Access for HdfsBackend { f, Arc::clone(&self.client), target_exists, + initial_size, ), )) } @@ -474,8 +482,12 @@ impl Access for HdfsBackend { fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { let target_path = build_rooted_abs_path(&self.root, path); + let mut initial_size = 0; let target_exists = match self.client.metadata(&target_path) { - Ok(_) => true, + Ok(meta) => { + initial_size = meta.len(); + true + } Err(err) => { if err.kind() != io::ErrorKind::NotFound { return Err(new_std_io_error(err)); @@ -498,6 +510,9 @@ impl Access for HdfsBackend { let parent = get_parent(&target_path); self.client.create_dir(parent).map_err(new_std_io_error)?; } + if !should_append { + initial_size = 0; + } let mut open_options = self.client.open_file(); open_options.create(true); @@ -519,6 +534,7 @@ impl Access for HdfsBackend { f, Arc::clone(&self.client), target_exists, + initial_size, ), )) } diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index e4123861a0f4..463e93e30c72 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -30,6 +30,7 @@ pub struct HdfsWriter { f: Option, client: Arc, target_path_exists: bool, + size: u64, } /// # Safety @@ -44,6 +45,7 @@ impl HdfsWriter { f: F, client: Arc, target_path_exists: bool, + initial_size: u64, ) -> Self { Self { target_path, @@ -51,12 +53,14 @@ impl HdfsWriter { f: Some(f), client, target_path_exists, + size: initial_size, } } } impl oio::Write for HdfsWriter { async fn write(&mut self, mut bs: Buffer) -> Result<()> { + self.size += bs.len() as u64; let f = self.f.as_mut().expect("HdfsWriter must be initialized"); while bs.has_remaining() { @@ -67,7 +71,7 @@ impl oio::Write for HdfsWriter { Ok(()) } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { let f = 
self.f.as_mut().expect("HdfsWriter must be initialized"); f.close().await.map_err(new_std_io_error)?; @@ -84,7 +88,7 @@ impl oio::Write for HdfsWriter { .map_err(new_std_io_error)? } - Ok(()) + Ok(Metadata::default().with_content_length(self.size)) } async fn abort(&mut self) -> Result<()> { @@ -97,6 +101,8 @@ impl oio::Write for HdfsWriter { impl oio::BlockingWrite for HdfsWriter { fn write(&mut self, mut bs: Buffer) -> Result<()> { + self.size += bs.len() as u64; + let f = self.f.as_mut().expect("HdfsWriter must be initialized"); while bs.has_remaining() { let n = f.write(bs.chunk()).map_err(new_std_io_error)?; @@ -106,7 +112,7 @@ impl oio::BlockingWrite for HdfsWriter { Ok(()) } - fn close(&mut self) -> Result<()> { + fn close(&mut self) -> Result { let f = self.f.as_mut().expect("HdfsWriter must be initialized"); f.flush().map_err(new_std_io_error)?; @@ -122,6 +128,6 @@ impl oio::BlockingWrite for HdfsWriter { .map_err(new_std_io_error)?; } - Ok(()) + Ok(Metadata::default().with_content_length(self.size)) } } diff --git a/core/src/services/hdfs_native/writer.rs b/core/src/services/hdfs_native/writer.rs index 4cab45b3be46..49e84be09425 100644 --- a/core/src/services/hdfs_native/writer.rs +++ b/core/src/services/hdfs_native/writer.rs @@ -35,7 +35,7 @@ impl oio::Write for HdfsNativeWriter { todo!() } - async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { todo!() } diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index e04eafcf7360..6254a67d8a41 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -35,13 +35,13 @@ impl IpmfsWriter { } impl oio::OneShotWrite for IpmfsWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let resp = self.backend.ipmfs_write(&self.path, bs).await?; let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CREATED | 
StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/koofr/writer.rs b/core/src/services/koofr/writer.rs index 1c391b224733..eb0956f3ac2a 100644 --- a/core/src/services/koofr/writer.rs +++ b/core/src/services/koofr/writer.rs @@ -38,7 +38,7 @@ impl KoofrWriter { } impl oio::OneShotWrite for KoofrWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { self.core.ensure_dir_exists(&self.path).await?; let resp = self.core.put(&self.path, bs).await?; @@ -46,7 +46,7 @@ impl oio::OneShotWrite for KoofrWriter { let status = resp.status(); match status { - StatusCode::OK | StatusCode::CREATED => Ok(()), + StatusCode::OK | StatusCode::CREATED => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/lakefs/writer.rs b/core/src/services/lakefs/writer.rs index 2aa087a36b66..c45805756a6d 100644 --- a/core/src/services/lakefs/writer.rs +++ b/core/src/services/lakefs/writer.rs @@ -37,13 +37,13 @@ impl LakefsWriter { } impl oio::OneShotWrite for LakefsWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let resp = self.core.upload_object(&self.path, &self.op, bs).await?; let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/monoiofs/backend.rs b/core/src/services/monoiofs/backend.rs index afdca2d7d67a..27d609b005da 100644 --- a/core/src/services/monoiofs/backend.rs +++ b/core/src/services/monoiofs/backend.rs @@ -121,8 +121,11 @@ impl Access for MonoiofsBackend { stat_has_last_modified: true, read: true, + write: true, write_can_append: true, + write_has_last_modified: true, + delete: true, rename: true, create_dir: true, diff --git a/core/src/services/monoiofs/core.rs 
b/core/src/services/monoiofs/core.rs index 05c501530b0f..ce72802f9523 100644 --- a/core/src/services/monoiofs/core.rs +++ b/core/src/services/monoiofs/core.rs @@ -106,7 +106,7 @@ impl MonoiofsCore { .with_entries(io_uring_entries) .build() .expect("monoio runtime initialize should success"); - // run a infinite loop that receives TaskSpawner and calls + // run an infinite loop that receives TaskSpawner and calls // them in a context of monoio rt.block_on(async { while let Ok(spawner) = rx.recv_async().await { diff --git a/core/src/services/monoiofs/writer.rs b/core/src/services/monoiofs/writer.rs index 24ae3ab37216..7f9fe96ba98d 100644 --- a/core/src/services/monoiofs/writer.rs +++ b/core/src/services/monoiofs/writer.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use bytes::Buf; use bytes::Bytes; +use chrono::DateTime; use futures::channel::mpsc; use futures::channel::oneshot; use futures::SinkExt; @@ -36,6 +37,9 @@ enum WriterRequest { buf: Bytes, tx: oneshot::Sender>, }, + Stat { + tx: oneshot::Sender>, + }, Close { tx: oneshot::Sender>, }, @@ -101,6 +105,10 @@ impl MonoiofsWriter { // MonoiofsWriter::write cancelled let _ = tx.send(result.map_err(new_std_io_error)); } + WriterRequest::Stat { tx } => { + let result = file.metadata().await; + let _ = tx.send(result.map_err(new_std_io_error)); + } WriterRequest::Close { tx } => { let result = file.sync_all().await; // discard the result if send failed due to @@ -143,11 +151,33 @@ impl oio::Write for MonoiofsWriter { /// Send close request to worker thread and wait for result. Actual /// close happens in [`MonoiofsWriter::worker_entrypoint`] running /// on worker thread. 
- async fn close(&mut self) -> Result<()> { + async fn close(&mut self) -> Result { + let (tx, rx) = oneshot::channel(); + self.core + .unwrap(self.tx.send(WriterRequest::Stat { tx }).await); + let file_meta = self.core.unwrap(rx.await)?; + let (tx, rx) = oneshot::channel(); self.core .unwrap(self.tx.send(WriterRequest::Close { tx }).await); - self.core.unwrap(rx.await) + self.core.unwrap(rx.await)?; + + let mode = if file_meta.is_dir() { + EntryMode::DIR + } else if file_meta.is_file() { + EntryMode::FILE + } else { + EntryMode::Unknown + }; + let meta = Metadata::new(mode) + .with_content_length(file_meta.len()) + .with_last_modified( + file_meta + .modified() + .map(DateTime::from) + .map_err(new_std_io_error)?, + ); + Ok(meta) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index fb502aa0f1ff..cbb523864b4d 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -45,7 +45,7 @@ impl ObsWriter { } impl oio::MultipartWrite for ObsWriter { - async fn write_once(&self, size: u64, body: Buffer) -> Result<()> { + async fn write_once(&self, size: u64, body: Buffer) -> Result { let mut req = self .core .obs_put_object_request(&self.path, Some(size), &self.op, body)?; @@ -57,7 +57,7 @@ impl oio::MultipartWrite for ObsWriter { let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -122,7 +122,7 @@ impl oio::MultipartWrite for ObsWriter { } } - async fn complete_part(&self, upload_id: &str, parts: &[MultipartPart]) -> Result<()> { + async fn complete_part(&self, upload_id: &str, parts: &[MultipartPart]) -> Result { let parts = parts .iter() .map(|p| CompleteMultipartUploadRequestPart { @@ -139,7 +139,7 @@ impl oio::MultipartWrite for ObsWriter { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK 
=> Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -181,7 +181,7 @@ impl oio::AppendWrite for ObsWriter { } } - async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<()> { + async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result { let mut req = self .core .obs_append_object_request(&self.path, offset, size, &self.op, body)?; @@ -193,7 +193,7 @@ impl oio::AppendWrite for ObsWriter { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index c6ed61f22c4f..4a29d90d2192 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -44,7 +44,7 @@ impl OneDriveWriter { } impl oio::OneShotWrite for OneDriveWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let size = bs.len(); if size <= Self::MAX_SIMPLE_SIZE { @@ -53,7 +53,7 @@ impl oio::OneShotWrite for OneDriveWriter { self.write_chunked(bs.to_bytes()).await?; } - Ok(()) + Ok(Metadata::default()) } } diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index d13f401f37b5..847a93c0f98c 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -44,7 +44,7 @@ impl OssWriter { } impl oio::MultipartWrite for OssWriter { - async fn write_once(&self, size: u64, body: Buffer) -> Result<()> { + async fn write_once(&self, size: u64, body: Buffer) -> Result { let mut req = self.core .oss_put_object_request(&self.path, Some(size), &self.op, body, false)?; @@ -56,7 +56,7 @@ impl oio::MultipartWrite for OssWriter { let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -127,7 +127,11 @@ impl oio::MultipartWrite for 
OssWriter { } } - async fn complete_part(&self, upload_id: &str, parts: &[oio::MultipartPart]) -> Result<()> { + async fn complete_part( + &self, + upload_id: &str, + parts: &[oio::MultipartPart], + ) -> Result { let parts = parts .iter() .map(|p| MultipartUploadPart { @@ -144,7 +148,7 @@ impl oio::MultipartWrite for OssWriter { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -185,7 +189,7 @@ impl oio::AppendWrite for OssWriter { } } - async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<()> { + async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result { let mut req = self .core .oss_append_object_request(&self.path, offset, size, &self.op, body)?; @@ -197,7 +201,7 @@ impl oio::AppendWrite for OssWriter { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/pcloud/writer.rs b/core/src/services/pcloud/writer.rs index 5b4948d6e2ea..ffafc1e14f12 100644 --- a/core/src/services/pcloud/writer.rs +++ b/core/src/services/pcloud/writer.rs @@ -40,7 +40,7 @@ impl PcloudWriter { } impl oio::OneShotWrite for PcloudWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { self.core.ensure_dir_exists(&self.path).await?; let resp = self.core.upload_file(&self.path, bs).await?; @@ -58,7 +58,7 @@ impl oio::OneShotWrite for PcloudWriter { return Err(Error::new(ErrorKind::Unexpected, format!("{resp:?}"))); } - Ok(()) + Ok(Metadata::default()) } _ => Err(parse_error(resp)), } diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 9145f22a7c8b..609db520eb6d 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -50,6 +50,7 @@ use crate::raw::oio::PageLister; use crate::raw::*; use crate::services::S3Config; 
use crate::*; +use constants::X_AMZ_VERSION_ID; /// Allow constructing correct region endpoint if user gives a global endpoint. static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new(|| { @@ -962,6 +963,9 @@ impl Access for S3Backend { write_with_if_match: !self.core.disable_write_with_if_match, write_with_if_not_exists: true, write_with_user_metadata: true, + write_has_content_length: true, + write_has_etag: true, + write_has_version: self.core.enable_versioning, // The min multipart size of S3 is 5 MiB. // @@ -1021,7 +1025,7 @@ impl Access for S3Backend { meta.with_user_metadata(user_meta); } - if let Some(v) = parse_header_to_str(headers, "x-amz-version-id")? { + if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? { meta.set_version(v); } diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 69cab8f2b93d..c58c29fc9a9d 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -72,6 +72,9 @@ pub mod constants { pub const X_AMZ_META_PREFIX: &str = "x-amz-meta-"; + pub const X_AMZ_VERSION_ID: &str = "x-amz-version-id"; + pub const X_AMZ_OBJECT_SIZE: &str = "x-amz-object-size"; + pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition"; pub const RESPONSE_CONTENT_TYPE: &str = "response-content-type"; pub const RESPONSE_CACHE_CONTROL: &str = "response-cache-control"; @@ -940,6 +943,20 @@ pub struct CompleteMultipartUploadRequestPart { pub checksum_crc32c: Option<String>, } +/// Output of `CompleteMultipartUpload` operation +#[derive(Debug, Default, Deserialize)] +#[serde(default, rename_all = "PascalCase")] +pub struct CompleteMultipartUploadResult { + pub bucket: String, + pub key: String, + pub location: String, + #[serde(rename = "ETag")] + pub etag: String, + pub code: String, + pub message: String, + pub request_id: String, +} + /// Request of DeleteObjects.
#[derive(Default, Debug, Serialize)] #[serde(default, rename = "Delete", rename_all = "PascalCase")] @@ -1140,6 +1157,55 @@ mod tests { ) } + /// this example is from: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html + #[test] + fn test_deserialize_complete_multipart_upload_result() { + let bs = Bytes::from( + r#"<?xml version="1.0" encoding="UTF-8"?> + <CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> + <Location>http://Example-Bucket.s3.region.amazonaws.com/Example-Object</Location> + <Bucket>Example-Bucket</Bucket> + <Key>Example-Object</Key> + <ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag> + </CompleteMultipartUploadResult>"#, + ); + + let out: CompleteMultipartUploadResult = + quick_xml::de::from_reader(bs.reader()).expect("must success"); + + assert_eq!(out.bucket, "Example-Bucket"); + assert_eq!(out.key, "Example-Object"); + assert_eq!( + out.location, + "http://Example-Bucket.s3.region.amazonaws.com/Example-Object" + ); + assert_eq!(out.etag, "\"3858f62230ac3c915f300c664312c11f-9\""); + } + + #[test] + fn test_deserialize_complete_multipart_upload_result_when_return_error() { + let bs = Bytes::from( + r#"<?xml version="1.0" encoding="UTF-8"?> + + <Error> + <Code>InternalError</Code> + <Message>We encountered an internal error. Please try again.</Message> + <RequestId>656c76696e6727732072657175657374</RequestId> + <HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId> + </Error>"#, + ); + + let out: CompleteMultipartUploadResult = + quick_xml::de::from_reader(bs.reader()).expect("must success"); + + assert_eq!(out.code, "InternalError"); + assert_eq!( + out.message, + "We encountered an internal error. Please try again."
+ ); + assert_eq!(out.request_id, "656c76696e6727732072657175657374"); + } + /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples #[test] fn test_serialize_delete_objects_request() { diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index e0c7c5084b59..34f5ce99faa7 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -17,15 +17,15 @@ use std::sync::Arc; -use bytes::Buf; -use http::StatusCode; - use super::core::*; use super::error::from_s3_error; use super::error::parse_error; use super::error::S3Error; use crate::raw::*; use crate::*; +use bytes::Buf; +use constants::{X_AMZ_OBJECT_SIZE, X_AMZ_VERSION_ID}; +use http::StatusCode; pub type S3Writers = oio::MultipartWriter; @@ -44,10 +44,26 @@ impl S3Writer { op, } } + + fn parse_header_into_meta(path: &str, headers: &http::HeaderMap) -> Result { + let mut meta = Metadata::new(EntryMode::from_path(path)); + if let Some(etag) = parse_etag(headers)? { + meta.set_etag(etag); + } + if let Some(version) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? { + meta.set_version(version); + } + if let Some(size) = parse_header_to_str(headers, X_AMZ_OBJECT_SIZE)? 
{ + if let Ok(value) = size.parse() { + meta.set_content_length(value); + } + } + Ok(meta) + } } impl oio::MultipartWrite for S3Writer { - async fn write_once(&self, size: u64, body: Buffer) -> Result<()> { + async fn write_once(&self, size: u64, body: Buffer) -> Result { let mut req = self .core .s3_put_object_request(&self.path, Some(size), &self.op, body)?; @@ -58,8 +74,10 @@ impl oio::MultipartWrite for S3Writer { let status = resp.status(); + let meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?; + match status { - StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CREATED | StatusCode::OK => Ok(meta), _ => Err(parse_error(resp)), } } @@ -133,7 +151,11 @@ impl oio::MultipartWrite for S3Writer { } } - async fn complete_part(&self, upload_id: &str, parts: &[oio::MultipartPart]) -> Result<()> { + async fn complete_part( + &self, + upload_id: &str, + parts: &[oio::MultipartPart], + ) -> Result { let parts = parts .iter() .map(|p| match &self.core.checksum_algorithm { @@ -159,18 +181,30 @@ impl oio::MultipartWrite for S3Writer { let status = resp.status(); + let mut meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?; + match status { StatusCode::OK => { // still check if there is any error because S3 might return error for status code 200 // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Example_4 let (parts, body) = resp.into_parts(); - let maybe_error: S3Error = + + let ret: CompleteMultipartUploadResult = quick_xml::de::from_reader(body.reader()).map_err(new_xml_deserialize_error)?; - if !maybe_error.code.is_empty() { - return Err(from_s3_error(maybe_error, parts)); + if !ret.code.is_empty() { + return Err(from_s3_error( + S3Error { + code: ret.code, + message: ret.message, + resource: "".to_string(), + request_id: ret.request_id, + }, + parts, + )); } + meta.set_etag(&ret.etag); - Ok(()) + Ok(meta) } _ => Err(parse_error(resp)), } diff --git 
a/core/src/services/seafile/writer.rs b/core/src/services/seafile/writer.rs index f187b75c478b..98705a1bdcfb 100644 --- a/core/src/services/seafile/writer.rs +++ b/core/src/services/seafile/writer.rs @@ -45,7 +45,7 @@ impl SeafileWriter { } impl oio::OneShotWrite for SeafileWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let upload_url = self.core.get_upload_url().await?; let req = Request::post(upload_url); @@ -79,7 +79,7 @@ impl oio::OneShotWrite for SeafileWriter { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 9b86c789577a..19474fe3c69c 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -28,18 +28,21 @@ use crate::*; pub struct SftpWriter { /// TODO: maybe we can use `File` directly? file: Pin>, + write_bytes_count: u64, } impl SftpWriter { pub fn new(file: File) -> Self { SftpWriter { file: Box::pin(TokioCompatFile::new(file)), + write_bytes_count: 0, } } } impl oio::Write for SftpWriter { async fn write(&mut self, mut bs: Buffer) -> Result<()> { + self.write_bytes_count += bs.len() as u64; while bs.has_remaining() { let n = self .file @@ -52,8 +55,10 @@ impl oio::Write for SftpWriter { Ok(()) } - async fn close(&mut self) -> Result<()> { - self.file.shutdown().await.map_err(new_std_io_error) + async fn close(&mut self) -> Result { + self.file.shutdown().await.map_err(new_std_io_error)?; + + Ok(Metadata::default().with_content_length(self.write_bytes_count)) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index 2a33d520b30a..fd31acd77a74 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -42,7 +42,7 @@ impl SupabaseWriter { } impl 
oio::OneShotWrite for SupabaseWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let mut req = self.core.supabase_upload_object_request( &self.path, Some(bs.len()), @@ -55,7 +55,7 @@ impl oio::OneShotWrite for SupabaseWriter { let resp = self.core.send(req).await?; match resp.status() { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/swift/writer.rs b/core/src/services/swift/writer.rs index a6590ee2120c..39db365ba172 100644 --- a/core/src/services/swift/writer.rs +++ b/core/src/services/swift/writer.rs @@ -36,7 +36,7 @@ impl SwiftWriter { } impl oio::OneShotWrite for SwiftWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let resp = self .core .swift_create_object(&self.path, bs.len() as u64, bs) @@ -45,7 +45,7 @@ impl oio::OneShotWrite for SwiftWriter { let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/upyun/writer.rs b/core/src/services/upyun/writer.rs index b0a921b66ea2..cfef21cca84c 100644 --- a/core/src/services/upyun/writer.rs +++ b/core/src/services/upyun/writer.rs @@ -40,7 +40,7 @@ impl UpyunWriter { } impl oio::MultipartWrite for UpyunWriter { - async fn write_once(&self, size: u64, body: Buffer) -> Result<()> { + async fn write_once(&self, size: u64, body: Buffer) -> Result { let req = self.core.upload(&self.path, Some(size), &self.op, body)?; let resp = self.core.send(req).await?; @@ -48,7 +48,7 @@ impl oio::MultipartWrite for UpyunWriter { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -100,7 +100,11 @@ impl oio::MultipartWrite for UpyunWriter { } } - async 
fn complete_part(&self, upload_id: &str, _parts: &[oio::MultipartPart]) -> Result<()> { + async fn complete_part( + &self, + upload_id: &str, + _parts: &[oio::MultipartPart], + ) -> Result { let resp = self .core .complete_multipart_upload(&self.path, upload_id) @@ -109,7 +113,7 @@ impl oio::MultipartWrite for UpyunWriter { let status = resp.status(); match status { - StatusCode::NO_CONTENT => Ok(()), + StatusCode::NO_CONTENT => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 64fe6846be1e..66dfa976f611 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -40,7 +40,7 @@ impl VercelArtifactsWriter { } impl oio::OneShotWrite for VercelArtifactsWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let resp = self .backend .vercel_artifacts_put(self.path.as_str(), bs.len() as u64, bs) @@ -49,7 +49,7 @@ impl oio::OneShotWrite for VercelArtifactsWriter { let status = resp.status(); match status { - StatusCode::OK | StatusCode::ACCEPTED => Ok(()), + StatusCode::OK | StatusCode::ACCEPTED => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/vercel_blob/writer.rs b/core/src/services/vercel_blob/writer.rs index 8f2542280ea2..bcb4acb4fc3a 100644 --- a/core/src/services/vercel_blob/writer.rs +++ b/core/src/services/vercel_blob/writer.rs @@ -43,7 +43,7 @@ impl VercelBlobWriter { } impl oio::MultipartWrite for VercelBlobWriter { - async fn write_once(&self, size: u64, body: Buffer) -> Result<()> { + async fn write_once(&self, size: u64, body: Buffer) -> Result { let req = self .core .get_put_request(&self.path, Some(size), &self.op, body)?; @@ -53,7 +53,7 @@ impl oio::MultipartWrite for VercelBlobWriter { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => 
Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -112,7 +112,11 @@ impl oio::MultipartWrite for VercelBlobWriter { } } - async fn complete_part(&self, upload_id: &str, parts: &[oio::MultipartPart]) -> Result<()> { + async fn complete_part( + &self, + upload_id: &str, + parts: &[oio::MultipartPart], + ) -> Result { let parts = parts .iter() .map(|p| Part { @@ -129,7 +133,7 @@ impl oio::MultipartWrite for VercelBlobWriter { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index ea7c56f556ff..fee1534597f5 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -38,7 +38,7 @@ impl WebdavWriter { } impl oio::OneShotWrite for WebdavWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { let resp = self .core .webdav_put(&self.path, Some(bs.len() as u64), &self.op, bs) @@ -47,7 +47,9 @@ impl oio::OneShotWrite for WebdavWriter { let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK | StatusCode::NO_CONTENT => Ok(()), + StatusCode::CREATED | StatusCode::OK | StatusCode::NO_CONTENT => { + Ok(Metadata::default()) + } _ => Err(parse_error(resp)), } } diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index e8876793e627..2f63d50a05de 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. 
+use bytes::Buf; use http::StatusCode; use uuid::Uuid; use super::backend::WebhdfsBackend; use super::error::parse_error; use crate::raw::*; +use crate::services::webhdfs::message::FileStatusWrapper; use crate::*; pub type WebhdfsWriters = @@ -40,7 +42,7 @@ impl WebhdfsWriter { } impl oio::BlockWrite for WebhdfsWriter { - async fn write_once(&self, size: u64, body: Buffer) -> Result<()> { + async fn write_once(&self, size: u64, body: Buffer) -> Result { let req = self .backend .webhdfs_create_object_request(&self.path, Some(size), &self.op, body) @@ -50,7 +52,7 @@ impl oio::BlockWrite for WebhdfsWriter { let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -81,7 +83,7 @@ impl oio::BlockWrite for WebhdfsWriter { } } - async fn complete_block(&self, block_ids: Vec) -> Result<()> { + async fn complete_block(&self, block_ids: Vec) -> Result { let Some(ref atomic_write_dir) = self.backend.atomic_write_dir else { return Err(Error::new( ErrorKind::Unsupported, @@ -123,7 +125,7 @@ impl oio::BlockWrite for WebhdfsWriter { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } @@ -142,19 +144,16 @@ impl oio::BlockWrite for WebhdfsWriter { impl oio::AppendWrite for WebhdfsWriter { async fn offset(&self) -> Result { - Ok(0) - } - - async fn append(&self, _offset: u64, size: u64, body: Buffer) -> Result<()> { let resp = self.backend.webhdfs_get_file_status(&self.path).await?; - let status = resp.status(); - - let location; - match status { StatusCode::OK => { - location = self.backend.webhdfs_init_append_request(&self.path).await?; + let bs = resp.into_body(); + let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader()) + .map_err(new_json_deserialize_error)? 
+ .file_status; + + Ok(file_status.length) } StatusCode::NOT_FOUND => { let req = self @@ -163,26 +162,25 @@ impl oio::AppendWrite for WebhdfsWriter { .await?; let resp = self.backend.client.send(req).await?; - let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => { - location = self.backend.webhdfs_init_append_request(&self.path).await?; - } - _ => return Err(parse_error(resp)), + StatusCode::CREATED | StatusCode::OK => Ok(0), + _ => Err(parse_error(resp)), } } - _ => return Err(parse_error(resp)), + _ => Err(parse_error(resp)), } + } + async fn append(&self, _offset: u64, size: u64, body: Buffer) -> Result { + let location = self.backend.webhdfs_init_append_request(&self.path).await?; let req = self.backend.webhdfs_append_request(&location, size, body)?; - let resp = self.backend.client.send(req).await?; let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/services/yandex_disk/writer.rs b/core/src/services/yandex_disk/writer.rs index 709b265c9327..dc8079b215f8 100644 --- a/core/src/services/yandex_disk/writer.rs +++ b/core/src/services/yandex_disk/writer.rs @@ -39,7 +39,7 @@ impl YandexDiskWriter { } impl oio::OneShotWrite for YandexDiskWriter { - async fn write_once(&self, bs: Buffer) -> Result<()> { + async fn write_once(&self, bs: Buffer) -> Result { self.core.ensure_dir_exists(&self.path).await?; let upload_url = self.core.get_upload_url(&self.path).await?; @@ -53,7 +53,7 @@ impl oio::OneShotWrite for YandexDiskWriter { let status = resp.status(); match status { - StatusCode::CREATED => Ok(()), + StatusCode::CREATED => Ok(Metadata::default()), _ => Err(parse_error(resp)), } } diff --git a/core/src/types/blocking_write/blocking_writer.rs b/core/src/types/blocking_write/blocking_writer.rs index b27c4df6ee1b..5d59d9ca5851 100644 --- a/core/src/types/blocking_write/blocking_writer.rs +++ 
b/core/src/types/blocking_write/blocking_writer.rs @@ -83,7 +83,7 @@ impl BlockingWriter { /// /// Close should only be called when the writer is not closed or /// aborted, otherwise an unexpected error could be returned. - pub fn close(&mut self) -> Result<()> { + pub fn close(&mut self) -> Result { self.inner.close() } diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs index b9e60d305088..c74fe3628f19 100644 --- a/core/src/types/capability.rs +++ b/core/src/types/capability.rs @@ -157,6 +157,15 @@ pub struct Capability { /// Maximum total size supported for write operations. /// For example, Cloudflare D1 has a 1MB total size limit. pub write_total_max_size: Option, + /// Indicates whether content length information is available in write response. + /// by default, it should be `true` for all write operations. + pub write_has_content_length: bool, + /// Indicates whether last modified timestamp is available in write response + pub write_has_last_modified: bool, + /// Indicates whether entity tag is available in write response + pub write_has_etag: bool, + /// Indicates whether version information is available in write response + pub write_has_version: bool, /// Indicates if directory creation is supported. pub create_dir: bool, diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs index d9fafba501d2..d426db5a8b25 100644 --- a/core/src/types/context/write.rs +++ b/core/src/types/context/write.rs @@ -184,7 +184,7 @@ impl WriteGenerator { } /// Finish the write process. - pub async fn close(&mut self) -> Result<()> { + pub async fn close(&mut self) -> Result { loop { if self.buffer.is_empty() { break; @@ -271,7 +271,7 @@ impl WriteGenerator { } /// Finish the write process. 
- pub fn close(&mut self) -> Result<()> { + pub fn close(&mut self) -> Result { loop { if self.buffer.is_empty() { break; @@ -315,8 +315,8 @@ mod tests { Ok(()) } - async fn close(&mut self) -> Result<()> { - Ok(()) + async fn close(&mut self) -> Result { + Ok(Metadata::default()) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/types/metadata.rs b/core/src/types/metadata.rs index e6e6130aa195..0508f1988c14 100644 --- a/core/src/types/metadata.rs +++ b/core/src/types/metadata.rs @@ -47,7 +47,7 @@ use crate::*; /// | `Some(false)` | `true` | **The metadata's associated version is not the latest version and is deleted.** This represents a historical version that has been marked for deletion. Users will need to specify the version ID to access it, and accessing it may be subject to specific delete marker behavior (e.g., in S3, it might not return actual data but a specific delete marker response). | /// | `None` | `false` | **The metadata's associated file is not deleted, but its version status is either unknown or it is not the latest version.** This likely indicates that versioning is not enabled for this file, or versioning information is unavailable. | /// | `None` | `true` | **The metadata's associated file is deleted, but its version status is either unknown or it is not the latest version.** This typically means the file was deleted without versioning enabled, or its versioning information is unavailable. This may represent an actual data deletion operation rather than an S3 delete marker. 
| -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, Clone, Eq, PartialEq, Default)] pub struct Metadata { mode: EntryMode, diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 1730ead88755..b7d4e24b2a64 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -470,7 +470,7 @@ impl BlockingOperator { /// # Ok(()) /// # } /// ``` - pub fn write(&self, path: &str, bs: impl Into) -> Result<()> { + pub fn write(&self, path: &str, bs: impl Into) -> Result { self.write_with(path, bs).call() } @@ -633,8 +633,7 @@ impl BlockingOperator { let context = WriteContext::new(inner, path, args, options); let mut w = BlockingWriter::new(context)?; w.write(bs)?; - w.close()?; - Ok(()) + w.close() }, )) } diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index a65e572fbbd9..9f23f4ed9d21 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -727,7 +727,7 @@ impl Operator { /// # Ok(()) /// # } /// ``` - pub async fn write(&self, path: &str, bs: impl Into) -> Result<()> { + pub async fn write(&self, path: &str, bs: impl Into) -> Result { let bs = bs.into(); self.write_with(path, bs).await } @@ -1007,7 +1007,7 @@ impl Operator { &self, path: &str, bs: impl Into, - ) -> FutureWrite>> { + ) -> FutureWrite>> { let path = normalize_path(path); let bs = bs.into(); @@ -1032,8 +1032,7 @@ impl Operator { let context = WriteContext::new(inner, path, args, options); let mut w = Writer::new(context).await?; w.write(bs).await?; - w.close().await?; - Ok(()) + w.close().await }, ) } diff --git a/core/src/types/operator/operator_functions.rs b/core/src/types/operator/operator_functions.rs index cdfcc555da3e..b856e83eb072 100644 --- a/core/src/types/operator/operator_functions.rs +++ b/core/src/types/operator/operator_functions.rs @@ -69,7 +69,7 @@ impl OperatorFunction { pub struct FunctionWrite( /// The 
args for FunctionWrite is a bit special because we also /// need to move the bytes input this function. - pub(crate) OperatorFunction<(OpWrite, OpWriter, Buffer), ()>, + pub(crate) OperatorFunction<(OpWrite, OpWriter, Buffer), Metadata>, ); impl FunctionWrite { @@ -135,7 +135,7 @@ impl FunctionWrite { /// Call the function to consume all the input and generate a /// result. - pub fn call(self) -> Result<()> { + pub fn call(self) -> Result { self.0.call() } } diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index 4c1b90657df2..41f68ed9f8ec 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -565,9 +565,9 @@ impl>> FutureReader { /// Future that generated by [`Operator::write_with`]. /// /// Users can add more options by public functions provided by this struct. -pub type FutureWrite = OperatorFuture<(OpWrite, OpWriter, Buffer), (), F>; +pub type FutureWrite = OperatorFuture<(OpWrite, OpWriter, Buffer), Metadata, F>; -impl>> FutureWrite { +impl>> FutureWrite { /// Set the executor for this operation. 
pub fn executor(self, executor: Executor) -> Self { self.map(|(args, options, bs)| (args.with_executor(executor), options, bs)) diff --git a/core/src/types/write/buffer_sink.rs b/core/src/types/write/buffer_sink.rs index 61a8be457c69..f993455d0541 100644 --- a/core/src/types/write/buffer_sink.rs +++ b/core/src/types/write/buffer_sink.rs @@ -36,7 +36,7 @@ pub struct BufferSink { enum State { Idle(Option>), Writing(BoxedStaticFuture<(WriteGenerator, Result)>), - Closing(BoxedStaticFuture<(WriteGenerator, Result<()>)>), + Closing(BoxedStaticFuture<(WriteGenerator, Result)>), } /// # Safety diff --git a/core/src/types/write/writer.rs b/core/src/types/write/writer.rs index ca2f31ac6b7b..66f7c948eb28 100644 --- a/core/src/types/write/writer.rs +++ b/core/src/types/write/writer.rs @@ -175,7 +175,7 @@ impl Writer { /// /// Close should only be called when the writer is not closed or /// aborted, otherwise an unexpected error could be returned. - pub async fn close(&mut self) -> Result<()> { + pub async fn close(&mut self) -> Result { self.inner.close().await } diff --git a/core/tests/behavior/async_write.rs b/core/tests/behavior/async_write.rs index 8e577450f232..3ed3bf00f355 100644 --- a/core/tests/behavior/async_write.rs +++ b/core/tests/behavior/async_write.rs @@ -49,6 +49,7 @@ pub fn tests(op: &Operator, tests: &mut Vec) { test_write_with_if_not_exists, test_write_with_if_match, test_write_with_user_metadata, + test_write_returns_metadata, test_writer_write, test_writer_write_with_overwrite, test_writer_write_with_concurrent, @@ -57,7 +58,8 @@ pub fn tests(op: &Operator, tests: &mut Vec) { test_writer_abort, test_writer_abort_with_concurrent, test_writer_futures_copy, - test_writer_futures_copy_with_concurrent + test_writer_futures_copy_with_concurrent, + test_writer_return_metadata )) } @@ -65,6 +67,7 @@ pub fn tests(op: &Operator, tests: &mut Vec) { tests.extend(async_trials!( op, test_write_with_append, + test_write_with_append_returns_metadata, 
test_writer_with_append )) } @@ -257,6 +260,28 @@ pub async fn test_write_with_user_metadata(op: Operator) -> Result<()> { Ok(()) } +pub async fn test_write_returns_metadata(op: Operator) -> Result<()> { + let cap = op.info().full_capability(); + + let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); + + let meta = op.write(&path, content).await?; + let stat_meta = op.stat(&path).await?; + + assert_eq!(stat_meta.content_length(), meta.content_length()); + if cap.write_has_etag { + assert_eq!(stat_meta.etag(), meta.etag()); + } + if cap.write_has_last_modified { + assert_eq!(stat_meta.last_modified(), meta.last_modified()); + } + if cap.write_has_version { + assert_eq!(stat_meta.version(), meta.version()); + } + + Ok(()) +} + /// Delete existing file should succeed. pub async fn test_writer_abort(op: Operator) -> Result<()> { let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); @@ -551,6 +576,38 @@ pub async fn test_writer_futures_copy_with_concurrent(op: Operator) -> Result<() Ok(()) } +pub async fn test_writer_return_metadata(op: Operator) -> Result<()> { + let cap = op.info().full_capability(); + if !cap.write_can_multi { + return Ok(()); + } + + let path = TEST_FIXTURE.new_file_path(); + let size = 5 * 1024 * 1024; // write file with 5 MiB + let content_a = gen_fixed_bytes(size); + let content_b = gen_fixed_bytes(size); + + let mut w = op.writer(&path).await?; + w.write(content_a.clone()).await?; + w.write(content_b.clone()).await?; + let meta = w.close().await?; + + let stat_meta = op.stat(&path).await.expect("stat must succeed"); + + assert_eq!(stat_meta.content_length(), meta.content_length()); + if cap.write_has_last_modified { + assert_eq!(stat_meta.last_modified(), meta.last_modified()); + } + if cap.write_has_etag { + assert_eq!(stat_meta.etag(), meta.etag()); + } + if cap.write_has_version { + assert_eq!(stat_meta.version(), meta.version()); + } + + Ok(()) +} + /// Test append to a file must success. 
pub async fn test_write_with_append(op: Operator) -> Result<()> { let path = TEST_FIXTURE.new_file_path(); @@ -583,6 +640,40 @@ pub async fn test_write_with_append(op: Operator) -> Result<()> { Ok(()) } +pub async fn test_write_with_append_returns_metadata(op: Operator) -> Result<()> { + let cap = op.info().full_capability(); + + let path = TEST_FIXTURE.new_file_path(); + let (content_one, _) = gen_bytes(cap); + let (content_two, _) = gen_bytes(cap); + + op.write_with(&path, content_one.clone()) + .append(true) + .await + .expect("append file first time must success"); + + let meta = op + .write_with(&path, content_two.clone()) + .append(true) + .await + .expect("append to an existing file must success"); + + let stat_meta = op.stat(&path).await.expect("stat must succeed"); + + assert_eq!(stat_meta.content_length(), meta.content_length()); + if cap.write_has_last_modified { + assert_eq!(stat_meta.last_modified(), meta.last_modified()); + } + if cap.write_has_etag { + assert_eq!(stat_meta.etag(), meta.etag()); + } + if cap.write_has_version { + assert_eq!(stat_meta.version(), meta.version()); + } + + Ok(()) +} + /// Copy data from reader to writer pub async fn test_writer_with_append(op: Operator) -> Result<()> { let path = uuid::Uuid::new_v4().to_string(); diff --git a/core/tests/behavior/blocking_write.rs b/core/tests/behavior/blocking_write.rs index a63b828a782f..4765d63b15eb 100644 --- a/core/tests/behavior/blocking_write.rs +++ b/core/tests/behavior/blocking_write.rs @@ -34,7 +34,8 @@ pub fn tests(op: &Operator, tests: &mut Vec) { op, test_blocking_write_file, test_blocking_write_with_dir_path, - test_blocking_write_with_special_chars + test_blocking_write_with_special_chars, + test_blocking_write_returns_metadata )) } @@ -42,6 +43,7 @@ pub fn tests(op: &Operator, tests: &mut Vec) { tests.extend(blocking_trials!( op, test_blocking_write_with_append, + test_blocking_write_with_append_returns_metadata, test_blocking_writer_with_append )) } @@ -100,6 +102,28 @@ 
pub fn test_blocking_write_with_special_chars(op: BlockingOperator) -> Result<() Ok(()) } +pub fn test_blocking_write_returns_metadata(op: BlockingOperator) -> Result<()> { + let cap = op.info().full_capability(); + + let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); + let meta = op.write(&path, content)?; + + let stat_meta = op.stat(&path).expect("stat must succeed"); + + assert_eq!(meta.content_length(), stat_meta.content_length()); + if cap.write_has_last_modified { + assert_eq!(meta.last_modified(), stat_meta.last_modified()); + } + if cap.write_has_etag { + assert_eq!(meta.etag(), stat_meta.etag()); + } + if cap.write_has_version { + assert_eq!(meta.version(), stat_meta.version()); + } + + Ok(()) +} + /// Test append to a file must success. pub fn test_blocking_write_with_append(op: BlockingOperator) -> Result<()> { let path = uuid::Uuid::new_v4().to_string(); @@ -127,6 +151,39 @@ pub fn test_blocking_write_with_append(op: BlockingOperator) -> Result<()> { Ok(()) } +pub fn test_blocking_write_with_append_returns_metadata(op: BlockingOperator) -> Result<()> { + let cap = op.info().full_capability(); + + let (path, content_one, _) = TEST_FIXTURE.new_file(op.clone()); + + op.write_with(&path, content_one) + .append(true) + .call() + .expect("append file first time must success"); + + let (data, _) = gen_bytes(op.info().full_capability()); + let meta = op + .write_with(&path, data) + .append(true) + .call() + .expect("append to an existing file must success"); + + let stat_meta = op.stat(&path).expect("stat must succeed"); + + assert_eq!(meta.content_length(), stat_meta.content_length()); + if cap.write_has_last_modified { + assert_eq!(meta.last_modified(), stat_meta.last_modified()); + } + if cap.write_has_etag { + assert_eq!(meta.etag(), stat_meta.etag()); + } + if cap.write_has_version { + assert_eq!(meta.version(), stat_meta.version()); + } + + Ok(()) +} + /// Copy data from reader to writer pub fn test_blocking_writer_with_append(op: 
BlockingOperator) -> Result<()> { let path = uuid::Uuid::new_v4().to_string(); diff --git a/integrations/fuse3/src/file_system.rs b/integrations/fuse3/src/file_system.rs index 2811149b2177..d49803669af0 100644 --- a/integrations/fuse3/src/file_system.rs +++ b/integrations/fuse3/src/file_system.rs @@ -557,7 +557,7 @@ impl PathFilesystem for Filesystem { if let Some(inner_writer) = file.inner_writer { let mut lock = inner_writer.lock().await; let res = lock.writer.close().await.map_err(opendal_error2errno); - return res; + return res.map(|_| ()); } if matches!(path, Some(ref p) if p != &file.path) { diff --git a/integrations/parquet/src/async_writer.rs b/integrations/parquet/src/async_writer.rs index 7f4ab16e5c3d..7f92fd6b69b4 100644 --- a/integrations/parquet/src/async_writer.rs +++ b/integrations/parquet/src/async_writer.rs @@ -97,6 +97,7 @@ impl AsyncFileWriter for AsyncWriter { self.inner .close() .await + .map(|_| ()) .map_err(|err| ParquetError::External(Box::new(err))) }) }