From 5844f616faa13d2e90593a2c6a4f2b8997e87cd3 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 9 Aug 2024 19:19:15 +0800 Subject: [PATCH 1/2] refactor(core): Use kv based context to avoid allocations Signed-off-by: Xuanwo --- core/src/layers/logging.rs | 1021 ++++++++++++++++++++++------------- core/src/raw/operation.rs | 17 + core/src/raw/tests/utils.rs | 68 +-- 3 files changed, 675 insertions(+), 431 deletions(-) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 1dfdba85ef0d..277c9dfa0c4d 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -16,11 +16,8 @@ // under the License. use std::fmt::Debug; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; use std::sync::Arc; -use bytes::Buf; use futures::FutureExt; use futures::TryFutureExt; use log::log; @@ -36,18 +33,12 @@ use crate::*; /// - OpenDAL will log in structural way. /// - Every operation will start with a `started` log entry. /// - Every operation will finish with the following status: -/// - `finished`: the operation is successful. -/// - `errored`: the operation returns an expected error like `NotFound`. +/// - `succeeded`: the operation is successful, but might have more to take. +/// - `finished`: the whole operation is finished. /// - `failed`: the operation returns an unexpected error. /// - The default log level while expected error happened is `Warn`. /// - The default log level while unexpected failure happened is `Error`. /// -/// # Todo -/// -/// We should migrate to log's kv api after it's ready. -/// -/// Tracking issue: -/// /// # Examples /// /// ```no_run @@ -86,7 +77,7 @@ use crate::*; /// ```no_run /// use opendal::layers::LoggingInterceptor; /// use opendal::layers::LoggingLayer; -/// use opendal::raw::Operation; +/// use opendal::raw; /// use opendal::services; /// use opendal::Error; /// use opendal::Operator; @@ -98,9 +89,9 @@ use crate::*; /// impl LoggingInterceptor for MyLoggingInterceptor { /// fn log( /// &self, -/// scheme: Scheme, -/// operation: Operation, -/// context: &str, +/// info: &raw::AccessorInfo, +/// operation: raw::Operation, +/// context: &[(&str, &str)], /// message: &str, /// err: Option<&Error>, /// ) { @@ -115,31 +106,21 @@ use crate::*; /// ``` #[derive(Debug)] pub struct LoggingLayer { - notify: Arc, -} - -impl Clone for LoggingLayer { - fn clone(&self) -> Self { - Self { - notify: self.notify.clone(), - } - } + logger: I, } impl Default for LoggingLayer { fn default() -> Self { Self { - notify: Arc::new(DefaultLoggingInterceptor), + logger: DefaultLoggingInterceptor, } } } impl LoggingLayer { /// Create the layer with specific logging interceptor. - pub fn new(notify: I) -> LoggingLayer { - LoggingLayer { - notify: Arc::new(notify), - } + pub fn new(logger: I) -> LoggingLayer { + LoggingLayer { logger } } } @@ -147,59 +128,25 @@ impl Layer for LoggingLayer { type LayeredAccess = LoggingAccessor; fn layer(&self, inner: A) -> Self::LayeredAccess { - let meta = inner.info(); + let info = inner.info(); LoggingAccessor { inner, - ctx: LoggingContext { - scheme: meta.scheme(), - notify: self.notify.clone(), - }, - } - } -} - -#[derive(Debug)] -pub struct LoggingContext { - scheme: Scheme, - notify: Arc, -} - -impl Clone for LoggingContext { - fn clone(&self) -> Self { - Self { - scheme: self.scheme, - notify: self.notify.clone(), + info, + logger: self.logger.clone(), } } } -impl LoggingContext { - fn log(&self, operation: Operation, context: &str, message: &str, err: Option<&Error>) { - self.notify - .log(self.scheme, operation, context, message, err) - } - - fn log_with_path(&self, operation: Operation, path: &str, message: &str, err: Option<&Error>) { - self.notify.log( - self.scheme, - operation, - &format!("path={path}"), - message, - err, - ) - } -} - /// LoggingInterceptor is used to intercept the log. -pub trait LoggingInterceptor: Debug + Send + Sync + 'static { +pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static { /// Everytime there is a log, this function will be called. /// /// # Inputs /// - /// - scheme: The service generates the log. + /// - info: The service's access info. /// - operation: The operation to log. - /// - context: Additional context of the log. + /// - context: Additional context of the log like path, etc. /// - message: The log message. /// - err: The error to log. /// @@ -210,81 +157,97 @@ pub trait LoggingInterceptor: Debug + Send + Sync + 'static { /// could perform unexpectedly slow. fn log( &self, - scheme: Scheme, + info: &AccessorInfo, operation: Operation, - context: &str, + context: &[(&str, &str)], message: &str, err: Option<&Error>, ); } /// The DefaultLoggingInterceptor will log the message by the standard logging macro. -#[derive(Debug)] +#[derive(Debug, Copy, Clone, Default)] pub struct DefaultLoggingInterceptor; impl LoggingInterceptor for DefaultLoggingInterceptor { + #[inline] fn log( &self, - scheme: Scheme, + info: &AccessorInfo, operation: Operation, - context: &str, + context: &[(&str, &str)], message: &str, err: Option<&Error>, ) { - let Some(err) = err else { - let lvl = self.operation_level(operation); + if let Some(err) = err { + // Print error if it's unexpected, otherwise in warn. + let lvl = if err.kind() == ErrorKind::Unexpected { + Level::Error + } else { + Level::Warn + }; + log!( target: LOGGING_TARGET, lvl, - "service={} operation={} {} -> {}", - scheme, - operation, - context, - message, + "service={} name={} {}: {operation} {message} {}", + info.scheme(), + info.name(), + format_args!( + "{}", + context.iter().enumerate().map(|(i, (k, v))| { + if i > 0 { + format!(" {}={}", k, v) + } else { + format!("{}={}", k, v) + } + }).collect::() + ), + // Print error message with debug output while unexpected happened. + // + // It's super sad that we can't bind `format_args!()` here. + // See: https://github.com/rust-lang/rust/issues/92698 + if err.kind() != ErrorKind::Unexpected { + format!("{err}") + } else { + format!("{err:?}") + } ); - return; + } + + // Print debug message if operation is oneshot, otherwise in trace. + let lvl = if operation.is_oneshot() { + Level::Debug + } else { + Level::Trace }; - let lvl = self.error_level(err); log!( target: LOGGING_TARGET, lvl, - "service={} operation={} {} -> {} {}", - scheme, - operation, - context, - message, - err, + "service={} name={} {}: {operation} {message}", + info.scheme(), + info.name(), + format_args!( + "{}", + context.iter().enumerate().map(|(i, (k, v))| { + if i > 0 { + format!(" {}={}", k, v) + } else { + format!("{}={}", k, v) + } + }).collect::() + ), ); } } -impl DefaultLoggingInterceptor { - fn operation_level(&self, operation: Operation) -> Level { - match operation { - Operation::ReaderRead - | Operation::BlockingReaderRead - | Operation::WriterWrite - | Operation::BlockingWriterWrite => Level::Trace, - _ => Level::Debug, - } - } - - #[inline] - fn error_level(&self, err: &Error) -> Level { - if err.kind() == ErrorKind::Unexpected { - Level::Error - } else { - Level::Warn - } - } -} - #[derive(Clone, Debug)] pub struct LoggingAccessor { inner: A, - ctx: LoggingContext, + info: Arc, + logger: I, } static LOGGING_TARGET: &str = "opendal::services"; @@ -303,83 +266,128 @@ impl LayeredAccess for LoggingAccessor { } fn metadata(&self) -> Arc { - self.ctx.log(Operation::Info, "", "started", None); - let result = self.inner.info(); - self.ctx.log( - Operation::Info, - "", - &format!("finished: {:?}", result), - None, - ); + self.logger + .log(&self.info, Operation::Info, &[], "started", None); + + let info = self.info.clone(); + + self.logger + .log(&self.info, Operation::Info, &[], "finished", None); - result + info } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.ctx - .log_with_path(Operation::CreateDir, path, "started", None); + self.logger.log( + &self.info, + Operation::CreateDir, + &[("path", path)], + "started", + None, + ); self.inner .create_dir(path, args) .await .map(|v| { - self.ctx - .log_with_path(Operation::CreateDir, path, "finished", None); + self.logger.log( + &self.info, + Operation::CreateDir, + &[("path", path)], + "finished", + None, + ); v }) .map_err(|err| { - self.ctx - .log_with_path(Operation::CreateDir, path, "", Some(&err)); + self.logger.log( + &self.info, + Operation::CreateDir, + &[("path", path)], + "failed", + Some(&err), + ); err }) } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - self.ctx - .log_with_path(Operation::Read, path, "started", None); + self.logger.log( + &self.info, + Operation::Read, + &[("path", path)], + "started", + None, + ); self.inner .read(path, args) .await .map(|(rp, r)| { - self.ctx - .log_with_path(Operation::Read, path, "got reader", None); + self.logger.log( + &self.info, + Operation::Read, + &[("path", path)], + "created reader", + None, + ); ( rp, - LoggingReader::new(self.ctx.clone(), Operation::Read, path, r), + LoggingReader::new(self.info.clone(), self.logger.clone(), path, r), ) }) .map_err(|err| { - self.ctx - .log_with_path(Operation::Read, path, "", Some(&err)); + self.logger.log( + &self.info, + Operation::Read, + &[("path", path)], + "failed", + Some(&err), + ); err }) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - self.ctx - .log_with_path(Operation::Write, path, "started", None); + self.logger.log( + &self.info, + Operation::Write, + &[("path", path)], + "started", + None, + ); self.inner .write(path, args) .await .map(|(rp, w)| { - self.ctx - .log_with_path(Operation::Write, path, "start writing", None); - let w = LoggingWriter::new(self.ctx.clone(), Operation::Write, path, w); + self.logger.log( + &self.info, + Operation::Write, + &[("path", path)], + "created writer", + None, + ); + let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w); (rp, w) }) .map_err(|err| { - self.ctx - .log_with_path(Operation::Write, path, "", Some(&err)); + self.logger.log( + &self.info, + Operation::Write, + &[("path", path)], + "failed", + Some(&err), + ); err }) } async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { - self.ctx.log( + self.logger.log( + &self.info, Operation::Copy, - &format!("from={from} to={to}"), + &[("from", from), ("to", to)], "started", None, ); @@ -388,19 +396,21 @@ impl LayeredAccess for LoggingAccessor { .copy(from, to, args) .await .map(|v| { - self.ctx.log( + self.logger.log( + &self.info, Operation::Copy, - &format!("from={from} to={to}"), + &[("from", from), ("to", to)], "finished", None, ); v }) .map_err(|err| { - self.ctx.log( + self.logger.log( + &self.info, Operation::Copy, - &format!("from={from} to={to}"), - "", + &[("from", from), ("to", to)], + "failed", Some(&err), ); err @@ -408,9 +418,10 @@ impl LayeredAccess for LoggingAccessor { } async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result { - self.ctx.log( + self.logger.log( + &self.info, Operation::Rename, - &format!("from={from} to={to}"), + &[("from", from), ("to", to)], "started", None, ); @@ -419,19 +430,21 @@ impl LayeredAccess for LoggingAccessor { .rename(from, to, args) .await .map(|v| { - self.ctx.log( + self.logger.log( + &self.info, Operation::Rename, - &format!("from={from} to={to}"), + &[("from", from), ("to", to)], "finished", None, ); v }) .map_err(|err| { - self.ctx.log( + self.logger.log( + &self.info, Operation::Rename, - &format!("from={from} to={to}"), - "", + &[("from", from), ("to", to)], + "failed", Some(&err), ); err @@ -439,56 +452,105 @@ impl LayeredAccess for LoggingAccessor { } async fn stat(&self, path: &str, args: OpStat) -> Result { - self.ctx - .log_with_path(Operation::Stat, path, "started", None); + self.logger.log( + &self.info, + Operation::Stat, + &[("path", path)], + "started", + None, + ); self.inner .stat(path, args) .await .map(|v| { - self.ctx - .log_with_path(Operation::Stat, path, "finished", None); + self.logger.log( + &self.info, + Operation::Stat, + &[("path", path)], + "finished", + None, + ); v }) .map_err(|err| { - self.ctx - .log_with_path(Operation::Stat, path, "", Some(&err)); + self.logger.log( + &self.info, + Operation::Stat, + &[("path", path)], + "failed", + Some(&err), + ); err }) } async fn delete(&self, path: &str, args: OpDelete) -> Result { - self.ctx - .log_with_path(Operation::Delete, path, "started", None); + self.logger.log( + &self.info, + Operation::Delete, + &[("path", path)], + "started", + None, + ); self.inner .delete(path, args.clone()) .inspect(|v| match v { Ok(_) => { - self.ctx - .log_with_path(Operation::Delete, path, "finished", None); + self.logger.log( + &self.info, + Operation::Delete, + &[("path", path)], + "finished", + None, + ); } Err(err) => { - self.ctx - .log_with_path(Operation::Delete, path, "", Some(err)); + self.logger.log( + &self.info, + Operation::Delete, + &[("path", path)], + "failed", + Some(err), + ); } }) .await } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + self.logger.log( + &self.info, + Operation::List, + &[("path", path)], + "started", + None, + ); + self.inner .list(path, args) .map(|v| match v { Ok((rp, v)) => { - self.ctx - .log_with_path(Operation::List, path, "start listing dir", None); - let streamer = LoggingLister::new(self.ctx.clone(), path, Operation::List, v); + self.logger.log( + &self.info, + Operation::List, + &[("path", path)], + "created lister", + None, + ); + let streamer = + LoggingLister::new(self.info.clone(), self.logger.clone(), path, v); Ok((rp, streamer)) } Err(err) => { - self.ctx - .log_with_path(Operation::List, path, "", Some(&err)); + self.logger.log( + &self.info, + Operation::List, + &[("path", path)], + "failed", + Some(&err), + ); Err(err) } }) @@ -496,20 +558,35 @@ impl LayeredAccess for LoggingAccessor { } async fn presign(&self, path: &str, args: OpPresign) -> Result { - self.ctx - .log_with_path(Operation::Presign, path, "started", None); + self.logger.log( + &self.info, + Operation::Presign, + &[("path", path)], + "started", + None, + ); self.inner .presign(path, args) .await .map(|v| { - self.ctx - .log_with_path(Operation::Presign, path, "finished", None); + self.logger.log( + &self.info, + Operation::Presign, + &[("path", path)], + "finished", + None, + ); v }) .map_err(|err| { - self.ctx - .log_with_path(Operation::Presign, path, "", Some(&err)); + self.logger.log( + &self.info, + Operation::Presign, + &[("path", path)], + "failed", + Some(&err), + ); err }) } @@ -517,9 +594,10 @@ impl LayeredAccess for LoggingAccessor { async fn batch(&self, args: OpBatch) -> Result { let (op, count) = (args.operation()[0].1.operation(), args.operation().len()); - self.ctx.log( + self.logger.log( + &self.info, Operation::Batch, - &format!("op={op} count={count} -> started"), + &[("op", op.into_static()), ("count", &count.to_string())], "started", None, ); @@ -527,9 +605,10 @@ impl LayeredAccess for LoggingAccessor { self.inner .batch(args) .map_ok(|v| { - self.ctx.log( + self.logger.log( + &self.info, Operation::Batch, - &format!("op={op} count={count}"), + &[("op", op.into_static()), ("count", &count.to_string())], &format!( "finished: {}, succeed: {}, failed: {}", v.results().len(), @@ -541,10 +620,11 @@ impl LayeredAccess for LoggingAccessor { v }) .map_err(|err| { - self.ctx.log( + self.logger.log( + &self.info, Operation::Batch, - &format!("op={op} count={count}"), - "", + &[("op", op.into_static()), ("count", &count.to_string())], + "failed", Some(&err), ); err @@ -553,65 +633,111 @@ impl LayeredAccess for LoggingAccessor { } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.ctx - .log_with_path(Operation::BlockingCreateDir, path, "started", None); + self.logger.log( + &self.info, + Operation::BlockingCreateDir, + &[("path", path)], + "started", + None, + ); self.inner .blocking_create_dir(path, args) .map(|v| { - self.ctx - .log_with_path(Operation::BlockingCreateDir, path, "finished", None); + self.logger.log( + &self.info, + Operation::BlockingCreateDir, + &[("path", path)], + "finished", + None, + ); v }) .map_err(|err| { - self.ctx - .log_with_path(Operation::BlockingCreateDir, path, "", Some(&err)); + self.logger.log( + &self.info, + Operation::BlockingCreateDir, + &[("path", path)], + "failed", + Some(&err), + ); err }) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.ctx - .log_with_path(Operation::BlockingRead, path, "started", None); + self.logger.log( + &self.info, + Operation::BlockingRead, + &[("path", path)], + "started", + None, + ); self.inner .blocking_read(path, args.clone()) .map(|(rp, r)| { - self.ctx - .log_with_path(Operation::BlockingRead, path, "got reader", None); - let r = LoggingReader::new(self.ctx.clone(), Operation::BlockingRead, path, r); + self.logger.log( + &self.info, + Operation::BlockingRead, + &[("path", path)], + "created reader", + None, + ); + let r = LoggingReader::new(self.info.clone(), self.logger.clone(), path, r); (rp, r) }) .map_err(|err| { - self.ctx - .log_with_path(Operation::BlockingRead, path, "", Some(&err)); + self.logger.log( + &self.info, + Operation::BlockingRead, + &[("path", path)], + "failed", + Some(&err), + ); err }) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.ctx - .log_with_path(Operation::BlockingWrite, path, "started", None); + self.logger.log( + &self.info, + Operation::BlockingWrite, + &[("path", path)], + "started", + None, + ); self.inner .blocking_write(path, args) .map(|(rp, w)| { - self.ctx - .log_with_path(Operation::BlockingWrite, path, "start writing", None); - let w = LoggingWriter::new(self.ctx.clone(), Operation::BlockingWrite, path, w); + self.logger.log( + &self.info, + Operation::BlockingWrite, + &[("path", path)], + "created writer", + None, + ); + let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w); (rp, w) }) .map_err(|err| { - self.ctx - .log_with_path(Operation::BlockingWrite, path, "", Some(&err)); + self.logger.log( + &self.info, + Operation::BlockingWrite, + &[("path", path)], + "failed", + Some(&err), + ); err }) } fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result { - self.ctx.log( + self.logger.log( + &self.info, Operation::BlockingCopy, - &format!("from={from} to={to}"), + &[("from", from), ("to", to)], "started", None, ); @@ -619,18 +745,20 @@ impl LayeredAccess for LoggingAccessor { self.inner .blocking_copy(from, to, args) .map(|v| { - self.ctx.log( + self.logger.log( + &self.info, Operation::BlockingCopy, - &format!("from={from} to={to}"), + &[("from", from), ("to", to)], "finished", None, ); v }) .map_err(|err| { - self.ctx.log( + self.logger.log( + &self.info, Operation::BlockingCopy, - &format!("from={from} to={to}"), + &[("from", from), ("to", to)], "", Some(&err), ); @@ -639,9 +767,10 @@ impl LayeredAccess for LoggingAccessor { } fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result { - self.ctx.log( + self.logger.log( + &self.info, Operation::BlockingRename, - &format!("from={from} to={to}"), + &[("from", from), ("to", to)], "started", None, ); @@ -649,19 +778,21 @@ impl LayeredAccess for LoggingAccessor { self.inner .blocking_rename(from, to, args) .map(|v| { - self.ctx.log( + self.logger.log( + &self.info, Operation::BlockingRename, - &format!("from={from} to={to}"), + &[("from", from), ("to", to)], "finished", None, ); v }) .map_err(|err| { - self.ctx.log( + self.logger.log( + &self.info, Operation::BlockingRename, - &format!("from={from} to={to}"), - "", + &[("from", from), ("to", to)], + "failed", Some(&err), ); err @@ -669,56 +800,101 @@ impl LayeredAccess for LoggingAccessor { } fn blocking_stat(&self, path: &str, args: OpStat) -> Result { - self.ctx - .log_with_path(Operation::BlockingStat, path, "started", None); + self.logger.log( + &self.info, + Operation::BlockingStat, + &[("path", path)], + "started", + None, + ); self.inner .blocking_stat(path, args) .map(|v| { - self.ctx - .log_with_path(Operation::BlockingStat, path, "finished", None); + self.logger.log( + &self.info, + Operation::BlockingStat, + &[("path", path)], + "finished", + None, + ); v }) .map_err(|err| { - self.ctx - .log_with_path(Operation::BlockingStat, path, "", Some(&err)); + self.logger.log( + &self.info, + Operation::BlockingStat, + &[("path", path)], + "failed", + Some(&err), + ); err }) } fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - self.ctx - .log_with_path(Operation::BlockingDelete, path, "started", None); + self.logger.log( + &self.info, + Operation::BlockingDelete, + &[("path", path)], + "started", + None, + ); self.inner .blocking_delete(path, args) .map(|v| { - self.ctx - .log_with_path(Operation::BlockingDelete, path, "finished", None); + self.logger.log( + &self.info, + Operation::BlockingDelete, + &[("path", path)], + "finished", + None, + ); v }) .map_err(|err| { - self.ctx - .log_with_path(Operation::BlockingDelete, path, "", Some(&err)); + self.logger.log( + &self.info, + Operation::BlockingDelete, + &[("path", path)], + "failed", + Some(&err), + ); err }) } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - self.ctx - .log_with_path(Operation::BlockingList, path, "started", None); + self.logger.log( + &self.info, + Operation::BlockingList, + &[("path", path)], + "started", + None, + ); self.inner .blocking_list(path, args) .map(|(rp, v)| { - self.ctx - .log_with_path(Operation::BlockingList, path, "got dir", None); - let li = LoggingLister::new(self.ctx.clone(), path, Operation::BlockingList, v); + self.logger.log( + &self.info, + Operation::BlockingList, + &[("path", path)], + "created lister", + None, + ); + let li = LoggingLister::new(self.info.clone(), self.logger.clone(), path, v); (rp, li) }) .map_err(|err| { - self.ctx - .log_with_path(Operation::BlockingList, path, "", Some(&err)); + self.logger.log( + &self.info, + Operation::BlockingList, + &[("path", path)], + "", + Some(&err), + ); err }) } @@ -726,69 +902,63 @@ impl LayeredAccess for LoggingAccessor { /// `LoggingReader` is a wrapper of `BytesReader`, with logging functionality. pub struct LoggingReader { - ctx: LoggingContext, + info: Arc, + logger: I, path: String, - op: Operation, - read: AtomicU64, + read: u64, inner: R, } impl LoggingReader { - fn new(ctx: LoggingContext, op: Operation, path: &str, reader: R) -> Self { + fn new(info: Arc, logger: I, path: &str, reader: R) -> Self { Self { - ctx, - op, + info, + logger, path: path.to_string(), - read: AtomicU64::new(0), + read: 0, inner: reader, } } } -impl Drop for LoggingReader { - fn drop(&mut self) { - self.ctx.log( - self.op, - &format!( - "path={} read={}", - self.path, - self.read.load(Ordering::Relaxed) - ), - "data read finished", +impl oio::Read for LoggingReader { + async fn read(&mut self) -> Result { + self.logger.log( + &self.info, + Operation::ReaderRead, + &[("path", &self.path), ("read", &self.read.to_string())], + "started", None, ); - } -} -impl oio::Read for LoggingReader { - async fn read(&mut self) -> Result { match self.inner.read().await { Ok(bs) => { - self.read - .fetch_add(bs.remaining() as u64, Ordering::Relaxed); - self.ctx.log( + self.read += bs.len() as u64; + self.logger.log( + &self.info, Operation::ReaderRead, - &format!( - "path={} read={}", - self.path, - self.read.load(Ordering::Relaxed), - ), - &format!("read returns {}B", bs.remaining()), + &[ + ("path", &self.path), + ("read", &self.read.to_string()), + ("size", &bs.len().to_string()), + ], + if bs.is_empty() { + "finished" + } else { + "succeeded" + }, None, ); Ok(bs) } Err(err) => { - self.ctx.log( + self.logger.log( + &self.info, Operation::ReaderRead, - &format!( - "path={} read={}", - self.path, - self.read.load(Ordering::Relaxed) - ), - "read failed:", + &[("path", &self.path), ("read", &self.read.to_string())], + "failed", Some(&err), ); Err(err) @@ -799,31 +969,40 @@ impl oio::Read for LoggingReader { impl oio::BlockingRead for LoggingReader { fn read(&mut self) -> Result { + self.logger.log( + &self.info, + Operation::BlockingReaderRead, + &[("path", &self.path), ("read", &self.read.to_string())], + "started", + None, + ); + match self.inner.read() { Ok(bs) => { - self.read - .fetch_add(bs.remaining() as u64, Ordering::Relaxed); - self.ctx.log( + self.read += bs.len() as u64; + self.logger.log( + &self.info, Operation::BlockingReaderRead, - &format!( - "path={} read={}", - self.path, - self.read.load(Ordering::Relaxed), - ), - &format!("read returns {}B", bs.remaining()), + &[ + ("path", &self.path), + ("read", &self.read.to_string()), + ("size", &bs.len().to_string()), + ], + if bs.is_empty() { + "finished" + } else { + "succeeded" + }, None, ); Ok(bs) } Err(err) => { - self.ctx.log( + self.logger.log( + &self.info, Operation::BlockingReaderRead, - &format!( - "path={} read={}", - self.path, - self.read.load(Ordering::Relaxed) - ), - "read failed:", + &[("path", &self.path), ("read", &self.read.to_string())], + "failed", Some(&err), ); Err(err) @@ -833,8 +1012,8 @@ impl oio::BlockingRead for LoggingR } pub struct LoggingWriter { - ctx: LoggingContext, - op: Operation, + info: Arc, + logger: I, path: String, written: u64, @@ -842,10 +1021,10 @@ pub struct LoggingWriter { } impl LoggingWriter { - fn new(ctx: LoggingContext, op: Operation, path: &str, writer: W) -> Self { + fn new(info: Arc, logger: I, path: &str, writer: W) -> Self { Self { - ctx, - op, + info, + logger, path: path.to_string(), written: 0, @@ -857,21 +1036,45 @@ impl LoggingWriter { impl oio::Write for LoggingWriter { async fn write(&mut self, bs: Buffer) -> Result<()> { let size = bs.len(); + + self.logger.log( + &self.info, + Operation::WriterWrite, + &[ + ("path", &self.path), + ("written", &self.written.to_string()), + ("size", &size.to_string()), + ], + "started", + None, + ); + match self.inner.write(bs).await { Ok(_) => { - self.ctx.log( + self.written += size as u64; + self.logger.log( + &self.info, Operation::WriterWrite, - &format!("path={} written={}B", self.path, self.written), - &format!("data write {}B", size), + &[ + ("path", &self.path), + ("written", &self.written.to_string()), + ("size", &size.to_string()), + ], + "succeeded", None, ); Ok(()) } Err(err) => { - self.ctx.log( + self.logger.log( + &self.info, Operation::WriterWrite, - &format!("path={} written={}B", self.path, self.written), - "data write failed:", + &[ + ("path", &self.path), + ("written", &self.written.to_string()), + ("size", &size.to_string()), + ], + "failed", Some(&err), ); Err(err) @@ -880,21 +1083,31 @@ impl oio::Write for LoggingWriter { } async fn abort(&mut self) -> Result<()> { + self.logger.log( + &self.info, + Operation::WriterAbort, + &[("path", &self.path), ("written", &self.written.to_string())], + "started", + None, + ); + match self.inner.abort().await { Ok(_) => { - self.ctx.log( + self.logger.log( + &self.info, Operation::WriterAbort, - &format!("path={} written={}B", self.path, self.written), - "abort writer", + &[("path", &self.path), ("written", &self.written.to_string())], + "succeeded", None, ); Ok(()) } Err(err) => { - self.ctx.log( + self.logger.log( + &self.info, Operation::WriterAbort, - &format!("path={} written={}B", self.path, self.written), - "abort writer failed:", + &[("path", &self.path), ("written", &self.written.to_string())], + "failed", Some(&err), ); Err(err) @@ -903,21 +1116,31 @@ impl oio::Write for LoggingWriter { } async fn close(&mut self) -> Result<()> { + self.logger.log( + &self.info, + Operation::WriterClose, + &[("path", &self.path), ("written", &self.written.to_string())], + "started", + None, + ); + match self.inner.close().await { Ok(_) => { - self.ctx.log( - self.op, - &format!("path={} written={}B", self.path, self.written), - "data written finished", + self.logger.log( + &self.info, + Operation::WriterClose, + &[("path", &self.path), ("written", &self.written.to_string())], + "succeeded", None, ); Ok(()) } Err(err) => { - self.ctx.log( + self.logger.log( + &self.info, Operation::WriterClose, - &format!("path={} written={}B", self.path, self.written), - "data close failed:", + &[("path", &self.path), ("written", &self.written.to_string())], + "failed", Some(&err), ); Err(err) @@ -928,21 +1151,45 @@ impl oio::Write for LoggingWriter { impl oio::BlockingWrite for LoggingWriter { fn write(&mut self, bs: Buffer) -> Result<()> { - match self.inner.write(bs.clone()) { + let size = bs.len(); + + self.logger.log( + &self.info, + Operation::BlockingWriterWrite, + &[ + ("path", &self.path), + ("written", &self.written.to_string()), + ("size", &size.to_string()), + ], + "started", + None, + ); + + match self.inner.write(bs) { Ok(_) => { - self.ctx.log( + self.logger.log( + &self.info, Operation::BlockingWriterWrite, - &format!("path={} written={}B", self.path, self.written), - &format!("data write {}B", bs.len()), + &[ + ("path", &self.path), + ("written", &self.written.to_string()), + ("size", &size.to_string()), + ], + "succeeded", None, ); Ok(()) } Err(err) => { - self.ctx.log( + self.logger.log( + &self.info, Operation::BlockingWriterWrite, - &format!("path={} written={}B", self.path, self.written), - "data write failed:", + &[ + ("path", &self.path), + ("written", &self.written.to_string()), + ("size", &size.to_string()), + ], + "failed", Some(&err), ); Err(err) @@ -951,21 +1198,31 @@ impl oio::BlockingWrite for Loggin } fn close(&mut self) -> Result<()> { + self.logger.log( + &self.info, + Operation::BlockingWriterClose, + &[("path", &self.path), ("written", &self.written.to_string())], + "started", + None, + ); + match self.inner.close() { Ok(_) => { - self.ctx.log( - self.op, - &format!("path={} written={}B", self.path, self.written), - "data written finished", + self.logger.log( + &self.info, + Operation::BlockingWriterWrite, + &[("path", &self.path), ("written", &self.written.to_string())], + "succeeded", None, ); Ok(()) } Err(err) => { - self.ctx.log( + self.logger.log( + &self.info, Operation::BlockingWriterClose, - &format!("path={} written={}B", self.path, self.written), - "data close failed:", + &[("path", &self.path), ("written", &self.written.to_string())], + "failed", Some(&err), ); Err(err) @@ -975,58 +1232,71 @@ impl oio::BlockingWrite for Loggin } pub struct LoggingLister { - ctx: LoggingContext, + info: Arc, + logger: I, path: String, - op: Operation, - finished: bool, + listed: usize, inner: P, } impl LoggingLister { - fn new(ctx: LoggingContext, path: &str, op: Operation, inner: P) -> Self { + fn new(info: Arc, logger: I, path: &str, inner: P) -> Self { Self { - ctx, + info, + logger, path: path.to_string(), - op, - finished: false, - inner, - } - } -} -impl Drop for LoggingLister { - fn drop(&mut self) { - if self.finished { - self.ctx - .log_with_path(self.op, &self.path, "all entries read finished", None); - } else { - self.ctx - .log_with_path(self.op, &self.path, "partial entries read finished", None); + listed: 0, + inner, } } } impl oio::List for LoggingLister { async fn next(&mut self) -> Result> { + self.logger.log( + &self.info, + Operation::ListerNext, + &[("path", &self.path), ("listed", &self.listed.to_string())], + "started", + None, + ); + let res = self.inner.next().await; match &res { Ok(Some(de)) => { - self.ctx.log_with_path( - self.op, - &self.path, - &format!("listed entry: {}", de.path()), + self.listed += 1; + self.logger.log( + &self.info, + Operation::ListerNext, + &[ + ("path", &self.path), + ("listed", &self.listed.to_string()), + ("entry", de.path()), + ], + "succeeded", None, ); } Ok(None) => { - self.ctx - .log_with_path(self.op, &self.path, "finished", None); - self.finished = true; + self.logger.log( + &self.info, + Operation::ListerNext, + &[("path", &self.path), ("listed", &self.listed.to_string())], + "finished", + None, + ); } Err(err) => { - self.ctx.log_with_path(self.op, &self.path, "", Some(err)); + self.logger.log( + &self.info, + Operation::ListerNext, + &[("path", &self.path), ("listed", &self.listed.to_string())], + "failed", + Some(err), + ); } }; @@ -1036,24 +1306,47 @@ impl oio::List for LoggingLister { impl oio::BlockingList for LoggingLister { fn next(&mut self) -> Result> { - let res = self.inner.next(); + self.logger.log( + &self.info, + Operation::BlockingListerNext, + &[("path", &self.path), ("listed", &self.listed.to_string())], + "started", + None, + ); + let res = self.inner.next(); match &res { - Ok(Some(des)) => { - self.ctx.log_with_path( - self.op, - &self.path, - &format!("listed entry: {}", des.path()), + Ok(Some(de)) => { + self.listed += 1; + self.logger.log( + &self.info, + Operation::BlockingListerNext, + &[ + ("path", &self.path), + ("listed", &self.listed.to_string()), + ("entry", de.path()), + ], + "succeeded", None, ); } Ok(None) => { - self.ctx - .log_with_path(self.op, &self.path, "finished", None); - self.finished = true; + self.logger.log( + &self.info, + Operation::BlockingListerNext, + &[("path", &self.path), ("listed", &self.listed.to_string())], + "finished", + None, + ); } Err(err) => { - self.ctx.log_with_path(self.op, &self.path, "", Some(err)); + self.logger.log( + &self.info, + Operation::BlockingListerNext, + &[("path", &self.path), ("listed", &self.listed.to_string())], + "failed", + Some(err), + ); } }; diff --git a/core/src/raw/operation.rs b/core/src/raw/operation.rs index d195cb57ac05..c5ef76047404 100644 --- a/core/src/raw/operation.rs +++ b/core/src/raw/operation.rs @@ -86,6 +86,23 @@ impl Operation { pub fn into_static(self) -> &'static str { self.into() } + + /// Check if given operation is oneshot or not. + /// + /// For example, `Stat` is oneshot but `ReaderRead` could happen multiple times. + /// + /// This function can be used to decide take actions based on operations like logging. + pub fn is_oneshot(&self) -> bool { + !matches!( + self, + Operation::ReaderRead + | Operation::WriterWrite + | Operation::ListerNext + | Operation::BlockingReaderRead + | Operation::BlockingWriterWrite + | Operation::BlockingListerNext + ) + } } impl Display for Operation { diff --git a/core/src/raw/tests/utils.rs b/core/src/raw/tests/utils.rs index 18545764281e..808788f710cc 100644 --- a/core/src/raw/tests/utils.rs +++ b/core/src/raw/tests/utils.rs @@ -19,11 +19,8 @@ use std::collections::HashMap; use std::env; use std::str::FromStr; -use log::{log, Level}; use once_cell::sync::Lazy; -use crate::layers::LoggingInterceptor; -use crate::raw::Operation; use crate::*; /// TEST_RUNTIME is the runtime used for running tests. @@ -76,7 +73,7 @@ pub fn init_test_service() -> Result> { let op = { op.layer(layers::ChaosLayer::new(0.1)) }; let mut op = op - .layer(layers::LoggingLayer::new(BacktraceLoggingInterceptor)) + .layer(layers::LoggingLayer::default()) .layer(layers::TimeoutLayer::new()) .layer(layers::RetryLayer::new().with_max_times(4)); @@ -91,66 +88,3 @@ pub fn init_test_service() -> Result> { Ok(Some(op)) } - -/// A logging interceptor that logs the backtrace. -#[derive(Debug, Clone)] -struct BacktraceLoggingInterceptor; - -impl LoggingInterceptor for BacktraceLoggingInterceptor { - fn log( - &self, - scheme: Scheme, - operation: raw::Operation, - context: &str, - message: &str, - err: Option<&Error>, - ) { - let Some(err) = err else { - let lvl = self.operation_level(operation); - log!( - target: "opendal::services", - lvl, - "service={} operation={} {} -> {}", - scheme, - operation, - context, - message, - ); - return; - }; - - let err_msg = if err.kind() != ErrorKind::Unexpected { - format!("{err}") - } else { - format!("{err:?}") - }; - let lvl = if err.kind() == ErrorKind::Unexpected { - Level::Error - } else { - Level::Warn - }; - - log!( - target: "opendal::services", - lvl, - "service={} operation={} {} -> {} {}", - scheme, - operation, - context, - message, - err_msg, - ); - } -} - -impl BacktraceLoggingInterceptor { - fn operation_level(&self, operation: Operation) -> Level { - match operation { - Operation::ReaderRead - | Operation::BlockingReaderRead - | Operation::WriterWrite - | Operation::BlockingWriterWrite => Level::Trace, - _ => Level::Debug, - } - } -} From 311d6f2bfc66efd5c0339a41cc062edf68e9451a Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 9 Aug 2024 19:26:57 +0800 Subject: [PATCH 2/2] Remove logger test Signed-off-by: Xuanwo --- bindings/haskell/test/BasicTest.hs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/bindings/haskell/test/BasicTest.hs b/bindings/haskell/test/BasicTest.hs index 6c817d046e53..9733f7a12886 100644 --- a/bindings/haskell/test/BasicTest.hs +++ b/bindings/haskell/test/BasicTest.hs @@ -31,8 +31,7 @@ basicTests = "Basic Tests" [ testCase "testBasicOperation" testRawOperation, testCase "testMonad" testMonad, - testCase "testError" testError, - testCase "testLogger" testLogger + testCase "testError" testError ] testRawOperation :: Assertion @@ -102,15 +101,6 @@ testError = do where operation = readOp "non-exist-path" -testLogger :: Assertion -testLogger = do - state <- newIORef "" - let logger initStr msg = modifyIORef' initStr (<> msgText msg) - let logFn = LogAction $ logger state - Right _ <- newOperator "memory" {ocLogAction = Just logFn} - logStr <- readIORef state - T.take 78 logStr @?= "service=memory operation=metadata -> startedservice=memory operation=metadata" - -- helper function (?=) :: (MonadIO m, Eq a, Show a) => m a -> a -> m ()