Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core)!: implement write returns metadata #5562

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions bindings/cpp/src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ unsafe fn operator_write(op: ffi::OperatorPtr, path: String, bs: Vec<u8>) -> Rus
.0
.write(&path, bs)
.await
.map(|_| ())
.map_err(|e| CxxAsyncException::new(e.to_string().into_boxed_str()))?)
})
}
2 changes: 1 addition & 1 deletion bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl Operator {
// Safety: The bytes created from bs will be dropped after the function call.
// So it's safe to declare its lifetime as 'static.
fn write(&self, path: &str, bs: &'static [u8]) -> Result<()> {
Ok(self.0.write(path, bs)?)
Ok(self.0.write(path, bs).map(|_| ())?)
}

fn exists(&self, path: &str) -> Result<bool> {
Expand Down
2 changes: 1 addition & 1 deletion bindings/dotnet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub unsafe extern "C" fn blocking_operator_write(
let op = &*(op);
let path = std::ffi::CStr::from_ptr(path).to_str().unwrap();
let content = std::ffi::CStr::from_ptr(content).to_str().unwrap();
op.write(path, content.to_owned()).unwrap()
op.write(path, content.to_owned()).map(|_| ()).unwrap()
}

/// # Safety
Expand Down
2 changes: 1 addition & 1 deletion bindings/haskell/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ pub unsafe extern "C" fn blocking_write(
let bytes = Vec::from_raw_parts(bytes as *mut u8, len, len);

let res = match op.write(path_str, bytes.clone()) {
Ok(()) => FFIResult::ok(()),
Ok(_) => FFIResult::ok(()),
Err(e) => FFIResult::err_with_source("Failed to write", e),
};

Expand Down
8 changes: 6 additions & 2 deletions bindings/java/src/async_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ fn intern_write(
}

async fn do_write(op: &mut Operator, path: String, content: Vec<u8>) -> Result<()> {
Ok(op.write(&path, content).await?)
Ok(op.write(&path, content).await.map(|_| ())?)
}

/// # Safety
Expand Down Expand Up @@ -182,7 +182,11 @@ fn intern_append(
}

async fn do_append(op: &mut Operator, path: String, content: Vec<u8>) -> Result<()> {
Ok(op.write_with(&path, content).append(true).await?)
Ok(op
.write_with(&path, content)
.append(true)
.await
.map(|_| ())?)
}

/// # Safety
Expand Down
2 changes: 1 addition & 1 deletion bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ fn intern_write(
) -> Result<()> {
let path = jstring_to_string(env, &path)?;
let content = env.convert_byte_array(content)?;
Ok(op.write(&path, content)?)
Ok(op.write(&path, content).map(|_| ())?)
}

/// # Safety
Expand Down
8 changes: 4 additions & 4 deletions bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl Operator {
writer = writer.cache_control(cache_control);
}
}
writer.await.map_err(format_napi_error)
writer.await.map(|_| ()).map_err(format_napi_error)
}

//noinspection DuplicatedCode
Expand Down Expand Up @@ -371,7 +371,7 @@ impl Operator {
writer = writer.cache_control(cache_control);
}
}
writer.call().map_err(format_napi_error)
writer.call().map(|_| ()).map_err(format_napi_error)
}

/// Copy file according to given `from` and `to` path.
Expand Down Expand Up @@ -809,7 +809,7 @@ impl BlockingWriter {
/// ```
#[napi]
pub unsafe fn close(&mut self) -> Result<()> {
self.0.close().map_err(format_napi_error)
self.0.close().map(|_| ()).map_err(format_napi_error)
}
}

Expand Down Expand Up @@ -855,7 +855,7 @@ impl Writer {
/// ```
#[napi]
pub async unsafe fn close(&mut self) -> Result<()> {
self.0.close().await.map_err(format_napi_error)
self.0.close().await.map(|_| ()).map_err(format_napi_error)
}
}

Expand Down
2 changes: 1 addition & 1 deletion bindings/ocaml/src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub fn blocking_write(
path: String,
bs: &'static [u8],
) -> Result<(), String> {
map_res_error(operator.0.write(path.as_str(), bs))
map_res_error(operator.0.write(path.as_str(), bs).map(|_| ()))
}

#[ocaml::func]
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl Operator {
write = write.cache_control(cache_control);
}

write.call().map_err(format_pyerr)
write.call().map(|_| ()).map_err(format_pyerr)
}

/// Get current path's metadata **without cache** directly.
Expand Down Expand Up @@ -351,7 +351,7 @@ impl AsyncOperator {
if let Some(cache_control) = &kwargs.cache_control {
write = write.cache_control(cache_control);
}
write.await.map_err(format_pyerr)
write.await.map(|_| ()).map_err(format_pyerr)
})
}

Expand Down
1 change: 1 addition & 0 deletions bindings/ruby/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl Operator {
rb_self
.0
.write(&path, bs.to_bytes())
.map(|_| ())
.map_err(|err| Error::new(ruby.exception_runtime_error(), err.to_string()))
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/async_backtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> {
}

#[async_backtrace::framed]
async fn close(&mut self) -> Result<()> {
async fn close(&mut self) -> Result<Metadata> {
self.inner.close().await
}

Expand All @@ -194,7 +194,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for AsyncBacktraceWrapper<R> {
self.inner.write(bs)
}

fn close(&mut self) -> Result<()> {
fn close(&mut self) -> Result<Metadata> {
self.inner.close()
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
.instrument_await(format!("opendal::{}", Operation::WriterAbort.into_static()))
}

fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend {
self.inner
.close()
.instrument_await(format!("opendal::{}", Operation::WriterClose.into_static()))
Expand All @@ -218,7 +218,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for AwaitTreeWrapper<R> {
self.inner.write(bs)
}

fn close(&mut self) -> Result<()> {
fn close(&mut self) -> Result<Metadata> {
self.inner.close()
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
self.handle.block_on(self.inner.write(bs))
}

fn close(&mut self) -> Result<()> {
fn close(&mut self) -> Result<Metadata> {
self.handle.block_on(self.inner.close())
}
}
Expand Down
17 changes: 11 additions & 6 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
if cap.list && cap.write_can_empty {
cap.create_dir = true;
}
// write operations should always return content length
cap.write_has_content_length = true;
meta.into()
}

Expand Down Expand Up @@ -517,15 +519,17 @@ where
w.write(bs).await
}

async fn close(&mut self) -> Result<()> {
async fn close(&mut self) -> Result<Metadata> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

w.close().await?;
// we must return `Err` before setting inner to None; otherwise,
// we won't be able to retry `close` in `RetryLayer`.
let ret = w.close().await?;
self.inner = None;

Ok(())
Ok(ret)
}

async fn abort(&mut self) -> Result<()> {
Expand All @@ -552,13 +556,14 @@ where
w.write(bs)
}

fn close(&mut self) -> Result<()> {
fn close(&mut self) -> Result<Metadata> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

w.close()?;
let ret = w.close()?;
self.inner = None;
Ok(())

Ok(ret)
}
}
4 changes: 2 additions & 2 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
self.inner.write(bs).await
}

async fn close(&mut self) -> Result<()> {
async fn close(&mut self) -> Result<Metadata> {
self.inner.close().await
}

Expand All @@ -281,7 +281,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
self.inner.write(bs)
}

fn close(&mut self) -> Result<()> {
fn close(&mut self) -> Result<Metadata> {
self.inner.close()
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/correctness_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,8 @@ mod tests {
Ok(())
}

async fn close(&mut self) -> Result<()> {
Ok(())
async fn close(&mut self) -> Result<Metadata> {
Ok(Metadata::default())
}

async fn abort(&mut self) -> Result<()> {
Expand Down
10 changes: 6 additions & 4 deletions core/src/layers/dtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,14 +382,15 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
})
}

async fn close(&mut self) -> Result<()> {
async fn close(&mut self) -> Result<Metadata> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_close_start, c_path.as_ptr());
self.inner
.close()
.await
.map(|_| {
.map(|meta| {
probe_lazy!(opendal, writer_close_ok, c_path.as_ptr());
meta
})
.map_err(|err| {
probe_lazy!(opendal, writer_close_error, c_path.as_ptr());
Expand All @@ -413,13 +414,14 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for DtraceLayerWrapper<R> {
})
}

fn close(&mut self) -> Result<()> {
fn close(&mut self) -> Result<Metadata> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, blocking_writer_close_start, c_path.as_ptr());
self.inner
.close()
.map(|_| {
.map(|meta| {
probe_lazy!(opendal, blocking_writer_close_ok, c_path.as_ptr());
meta
})
.map_err(|err| {
probe_lazy!(opendal, blocking_writer_close_error, c_path.as_ptr());
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
})
}

async fn close(&mut self) -> Result<()> {
async fn close(&mut self) -> Result<Metadata> {
self.inner.close().await.map_err(|err| {
err.with_operation(Operation::WriterClose)
.with_context("service", self.scheme)
Expand Down Expand Up @@ -411,7 +411,7 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
})
}

fn close(&mut self) -> Result<()> {
fn close(&mut self) -> Result<Metadata> {
self.inner.close().map_err(|err| {
err.with_operation(Operation::BlockingWriterClose)
.with_context("service", self.scheme)
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/fastrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ impl<R: oio::Write> oio::Write for FastraceWrapper<R> {
self.inner.abort()
}

fn close(&mut self) -> impl Future<Output = Result<()>> + MaybeSend {
fn close(&mut self) -> impl Future<Output = Result<Metadata>> + MaybeSend {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(Operation::WriterClose.into_static());
self.inner.close()
Expand All @@ -342,7 +342,7 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for FastraceWrapper<R> {
self.inner.write(bs)
}

fn close(&mut self) -> Result<()> {
fn close(&mut self) -> Result<Metadata> {
let _g = self.span.set_local_parent();
let _span =
LocalSpan::enter_with_local_parent(Operation::BlockingWriterClose.into_static());
Expand Down
12 changes: 6 additions & 6 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
}
}

async fn close(&mut self) -> Result<()> {
async fn close(&mut self) -> Result<Metadata> {
self.logger.log(
&self.info,
Operation::WriterClose,
Expand All @@ -1057,15 +1057,15 @@ impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
);

match self.inner.close().await {
Ok(_) => {
Ok(meta) => {
self.logger.log(
&self.info,
Operation::WriterClose,
&[("path", &self.path), ("written", &self.written.to_string())],
"succeeded",
None,
);
Ok(())
Ok(meta)
}
Err(err) => {
self.logger.log(
Expand Down Expand Up @@ -1129,7 +1129,7 @@ impl<W: oio::BlockingWrite, I: LoggingInterceptor> oio::BlockingWrite for Loggin
}
}

fn close(&mut self) -> Result<()> {
fn close(&mut self) -> Result<Metadata> {
self.logger.log(
&self.info,
Operation::BlockingWriterClose,
Expand All @@ -1139,15 +1139,15 @@ impl<W: oio::BlockingWrite, I: LoggingInterceptor> oio::BlockingWrite for Loggin
);

match self.inner.close() {
Ok(_) => {
Ok(meta) => {
self.logger.log(
&self.info,
Operation::BlockingWriterWrite,
&[("path", &self.path), ("written", &self.written.to_string())],
"succeeded",
None,
);
Ok(())
Ok(meta)
}
Err(err) => {
self.logger.log(
Expand Down
Loading
Loading