From a8b793b7a0ee603dfbc6f9b872231e46de8ca68f Mon Sep 17 00:00:00 2001 From: meteorgan Date: Wed, 15 Jan 2025 10:32:58 +0800 Subject: [PATCH 01/15] docs(integration/object_store): add example for datafusion (#5543) --- integrations/object_store/Cargo.toml | 2 + integrations/object_store/README.md | 83 ++++++++++++++++++- .../object_store/examples/datafusion.rs | 52 ++++++++++++ 3 files changed, 135 insertions(+), 2 deletions(-) create mode 100644 integrations/object_store/examples/datafusion.rs diff --git a/integrations/object_store/Cargo.toml b/integrations/object_store/Cargo.toml index d7c78cd1598e..47e02079d8be 100644 --- a/integrations/object_store/Cargo.toml +++ b/integrations/object_store/Cargo.toml @@ -58,3 +58,5 @@ tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] } anyhow = "1.0.86" libtest-mimic = "0.7.3" uuid = "1.11.0" +datafusion = "44.0.0" +url = "2.5.2" \ No newline at end of file diff --git a/integrations/object_store/README.md b/integrations/object_store/README.md index 115488f3b8b2..6f30e457bd69 100644 --- a/integrations/object_store/README.md +++ b/integrations/object_store/README.md @@ -21,13 +21,21 @@ This crate can help you to access 30 more storage services with the same object_ ## Examples +`opendal_store_opendal` depends on the `opendal` crate. Please make sure to always use the latest versions of both. + +latest `object_store_opendal` ![Crate](https://img.shields.io/crates/v/object_store_opendal.svg) + +latest `opendal` ![Crate](https://img.shields.io/crates/v/opendal.svg) + +### 1. using `object_store` API to access S3 + Add the following dependencies to your `Cargo.toml` with correct version: ```toml [dependencies] object_store = "0.11.0" -object_store_opendal = "0.47.0" -opendal = { version = "0.49.0", features = ["services-s3"] } +object_store_opendal = "xxx" # see the latest version above +opendal = { version = "xxx", features = ["services-s3"] } # see the latest version above ``` Build `OpendalStore` via `opendal::Operator`: @@ -78,6 +86,77 @@ async fn main() { } ``` +### 2. querying data in a S3 bucket using DataFusion + +Add the following dependencies to your `Cargo.toml` with correct version: + +```toml +[dependencies] +object_store = "0.11.0" +object_store_opendal = "xxx" # see the latest version above +opendal = { version = "xxx", features = ["services-s3"] } # see the latest version above +datafusion = "44.0.0" +url = "2.5.2" +``` + +Build `OpendalStore` via `opendal::Operator` and register it to `DataFusion`: + +```rust +use datafusion::error::DataFusionError; +use datafusion::error::Result; +use datafusion::prelude::*; +use opendal::services::S3; +use opendal::Operator; +use std::sync::Arc; +use url::Url; + + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + + // Configure OpenDAL for S3 + let region = "my_region"; + let bucket_name = "my_bucket"; + let builder = S3::default() + .endpoint("my_endpoint") + .bucket(bucket_name) + .region(region) + .access_key_id("my_access_key") + .secret_access_key("my_secret_key"); + let op = Operator::new(builder) + .map_err(|err| DataFusionError::External(Box::new(err)))? 
+ .finish(); + let store = object_store_opendal::OpendalStore::new(op); + + // Register the object store + let path = format!("s3://{bucket_name}"); + let s3_url = Url::parse(&path).unwrap(); + ctx.register_object_store(&s3_url, Arc::new(store)); + + // Register CSV file as a table + let path = format!("s3://{bucket_name}/csv/data.csv"); + ctx.register_csv("trips", &path, CsvReadOptions::default()) + .await?; + + // Execute the query + let df = ctx.sql("SELECT * FROM trips LIMIT 10").await?; + // Print the results + df.show().await?; + + // Dynamic query using the file path directly + let ctx = ctx.enable_url_table(); + let df = ctx + .sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str()) + .await?; + // Print the results + df.show().await?; + + Ok(()) +} +``` + + ## WASM support To build with `wasm32-unknown-unknown` target, you need to enable the `send_wrapper` feature: diff --git a/integrations/object_store/examples/datafusion.rs b/integrations/object_store/examples/datafusion.rs new file mode 100644 index 000000000000..e12a3f6477ca --- /dev/null +++ b/integrations/object_store/examples/datafusion.rs @@ -0,0 +1,52 @@ +use datafusion::error::DataFusionError; +use datafusion::error::Result; +use datafusion::prelude::*; +use opendal::services::S3; +use opendal::Operator; +use std::sync::Arc; +use url::Url; + +/// This example demonstrates querying data in a S3 bucket using DataFusion and `object_store_opendal` +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + + // Configure OpenDAL for S3 + let region = "my_region"; + let bucket_name = "my_bucket"; + let builder = S3::default() + .endpoint("my_endpoint") + .bucket(bucket_name) + .region(region) + .access_key_id("my_access_key") + .secret_access_key("my_secret_key"); + let op = Operator::new(builder) + .map_err(|err| DataFusionError::External(Box::new(err)))? + .finish(); + let store = object_store_opendal::OpendalStore::new(op); + + // Register the object store + let path = format!("s3://{bucket_name}"); + let s3_url = Url::parse(&path).unwrap(); + ctx.register_object_store(&s3_url, Arc::new(store)); + + // Register CSV file as a table + let path = format!("s3://{bucket_name}/csv/data.csv"); + ctx.register_csv("trips", &path, CsvReadOptions::default()) + .await?; + + // Execute the query + let df = ctx.sql("SELECT * FROM trips LIMIT 10").await?; + // Print the results + df.show().await?; + + // Dynamic query using the file path directly + let ctx = ctx.enable_url_table(); + let df = ctx + .sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str()) + .await?; + // Print the results + df.show().await?; + + Ok(()) +} From 8dacff2efbbc37b50d4a64e04be27e4ed13aed84 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 15 Jan 2025 14:02:51 +0800 Subject: [PATCH 02/15] ci: Pin the nightly version to rust 1.84 for fuzz (#5546) --- .github/actions/fuzz_test/action.yaml | 4 ++-- .github/workflows/test_fuzz.yml | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/actions/fuzz_test/action.yaml b/.github/actions/fuzz_test/action.yaml index bad0a89fa242..9d161cb2dd3a 100644 --- a/.github/actions/fuzz_test/action.yaml +++ b/.github/actions/fuzz_test/action.yaml @@ -16,7 +16,7 @@ # under the License. 
name: Fuzz Test -description: 'Fuzz test given setup and service' +description: "Fuzz test given setup and service" inputs: setup: description: "The setup action for test" @@ -41,7 +41,7 @@ runs: - name: Run Fuzz Test shell: bash working-directory: core/fuzz - run: cargo +nightly fuzz run ${{ inputs.target }} --features services-${{ inputs.service }} -- -max_total_time=120 + run: cargo +nightly-2024-11-22 fuzz run ${{ inputs.target }} --features services-${{ inputs.service }} -- -max_total_time=120 env: OPENDAL_TEST: ${{ inputs.service }} EOF diff --git a/.github/workflows/test_fuzz.yml b/.github/workflows/test_fuzz.yml index 448f6f6679a5..03d4041f2c2c 100644 --- a/.github/workflows/test_fuzz.yml +++ b/.github/workflows/test_fuzz.yml @@ -40,7 +40,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - target: [ "fuzz_reader", "fuzz_writer" ] + target: ["fuzz_reader", "fuzz_writer"] cases: - { service: "memory", setup: "memory" } - { service: "fs", setup: "local_fs" } @@ -54,8 +54,8 @@ jobs: shell: bash run: | sudo apt-get install -y libfuzzer-14-dev - rustup install nightly - cargo +nightly install cargo-fuzz + rustup install nightly-2024-11-22 + cargo +nightly-2024-11-22 install cargo-fuzz - name: Fuzz uses: ./.github/actions/fuzz_test env: From 67347874ff8d79c2932c2e1197cb3a2d76897b84 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Wed, 15 Jan 2025 15:47:07 +0800 Subject: [PATCH 03/15] feat(layer/otelmetrics): take meter when register (#5547) --- core/src/layers/otelmetrics.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/core/src/layers/otelmetrics.rs b/core/src/layers/otelmetrics.rs index 5b3d083a74a4..55702928e95c 100644 --- a/core/src/layers/otelmetrics.rs +++ b/core/src/layers/otelmetrics.rs @@ -18,9 +18,9 @@ use std::sync::Arc; use std::time::Duration; -use opentelemetry::global; use opentelemetry::metrics::Counter; use opentelemetry::metrics::Histogram; +use opentelemetry::metrics::Meter; use opentelemetry::KeyValue; use crate::layers::observe; @@ -38,8 +38,9 @@ use crate::*; /// # use opendal::Result; /// /// # fn main() -> Result<()> { +/// let meter = opentelemetry::global::meter("opendal"); /// let _ = Operator::new(services::Memory::default())? -/// .layer(OtelMetricsLayer::builder().register()) +/// .layer(OtelMetricsLayer::builder().register(meter)) /// .finish(); /// Ok(()) /// # } @@ -59,7 +60,6 @@ impl OtelMetricsLayer { /// # Examples /// /// ```no_run - /// # use log::debug; /// # use opendal::layers::OtelMetricsLayer; /// # use opendal::services; /// # use opendal::Operator; @@ -67,10 +67,10 @@ impl OtelMetricsLayer { /// /// # #[tokio::main] /// # async fn main() -> Result<()> { + /// let meter = opentelemetry::global::meter("opendal"); /// let op = Operator::new(services::Memory::default())? - /// .layer(OtelMetricsLayer::builder().path_label(1).register()) + /// .layer(OtelMetricsLayer::builder().path_label(1).register(meter)) /// .finish(); - /// debug!("operator: {op:?}"); /// /// Ok(()) /// # } @@ -113,8 +113,9 @@ impl OtelMetricsLayerBuilder { /// /// # #[tokio::main] /// # async fn main() -> Result<()> { + /// let meter = opentelemetry::global::meter("opendal"); /// let op = Operator::new(services::Memory::default())? 
- /// .layer(OtelMetricsLayer::builder().path_label(1).register()) + /// .layer(OtelMetricsLayer::builder().path_label(1).register(meter)) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -139,11 +140,12 @@ impl OtelMetricsLayerBuilder { /// /// # #[tokio::main] /// # async fn main() -> Result<()> { + /// let meter = opentelemetry::global::meter("opendal"); /// let op = Operator::new(services::Memory::default())? /// .layer( /// OtelMetricsLayer::builder() /// .operation_duration_seconds_boundaries(vec![0.01, 0.02, 0.05, 0.1, 0.2, 0.5]) - /// .register() + /// .register(meter) /// ) /// .finish(); /// debug!("operator: {op:?}"); @@ -171,11 +173,12 @@ impl OtelMetricsLayerBuilder { /// /// # #[tokio::main] /// # async fn main() -> Result<()> { + /// let meter = opentelemetry::global::meter("opendal"); /// let op = Operator::new(services::Memory::default())? /// .layer( /// OtelMetricsLayer::builder() /// .operation_bytes_boundaries(vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0]) - /// .register() + /// .register(meter) /// ) /// .finish(); /// debug!("operator: {op:?}"); @@ -195,7 +198,6 @@ impl OtelMetricsLayerBuilder { /// # Examples /// /// ```no_run - /// # use log::debug; /// # use opendal::layers::OtelMetricsLayer; /// # use opendal::services; /// # use opendal::Operator; @@ -203,16 +205,15 @@ impl OtelMetricsLayerBuilder { /// /// # #[tokio::main] /// # async fn main() -> Result<()> { + /// let meter = opentelemetry::global::meter("opendal"); /// let op = Operator::new(services::Memory::default())? - /// .layer(OtelMetricsLayer::builder().register()) + /// .layer(OtelMetricsLayer::builder().register(meter)) /// .finish(); - /// debug!("operator: {op:?}"); /// /// Ok(()) /// # } /// ``` - pub fn register(self) -> OtelMetricsLayer { - let meter = global::meter("opendal"); + pub fn register(self, meter: Meter) -> OtelMetricsLayer { let duration_seconds = meter .f64_histogram("opendal.operation.duration") .with_description("Duration of operations") From 78b6a9f26dbeb5118990d3140f1c636d8e54e719 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 15 Jan 2025 17:28:30 +0800 Subject: [PATCH 04/15] feat(gcs): Convert TOO_MANY_REQUESTS to retryable Ratelimited (#5551) --- core/src/services/gcs/error.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/services/gcs/error.rs b/core/src/services/gcs/error.rs index 1410c4fa0d3c..187f0de5be1d 100644 --- a/core/src/services/gcs/error.rs +++ b/core/src/services/gcs/error.rs @@ -58,6 +58,7 @@ pub(super) fn parse_error(resp: Response) -> Error { StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => { (ErrorKind::ConditionNotMatch, false) } + StatusCode::TOO_MANY_REQUESTS => (ErrorKind::RateLimited, true), StatusCode::INTERNAL_SERVER_ERROR | StatusCode::BAD_GATEWAY | StatusCode::SERVICE_UNAVAILABLE From abfbf1587bfe6dbeea1659c0392b61190c154287 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 15 Jan 2025 17:53:57 +0800 Subject: [PATCH 05/15] docs: Add docs on how to pronounce opendal (#5552) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index db39c5e230eb..ac820226dbfe 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![](https://img.shields.io/github/discussions/apache/opendal)](https://github.com/apache/opendal/discussions) [![](https://img.shields.io/discord/1081052318650339399?logo=discord&label=discord)](https://opendal.apache.org/discord) -OpenDAL is an Open Data Access Layer that enables seamless interaction with diverse storage services. 
+OpenDAL (`/ˈoʊ.pən.dæl/`, pronounced "OH-puhn-dal") is an Open Data Access Layer that enables seamless interaction with diverse storage services. OpenDAL's development is guided by its vision of **One Layer, All Storage** and its core principles: **Open Community**, **Solid Foundation**, **Fast Access**, **Object Storage First**, and **Extensible Architecture**. Read the explained vision at [OpenDAL Vision](https://opendal.apache.org/vision). From ef94a688408d00ce2bda99f2beab669e57063c9c Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Thu, 16 Jan 2025 00:59:36 +0800 Subject: [PATCH 06/15] chore(layer/otelmetrics): take meter by reference (#5553) --- core/src/layers/otelmetrics.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/layers/otelmetrics.rs b/core/src/layers/otelmetrics.rs index 55702928e95c..2cfea4e87a5c 100644 --- a/core/src/layers/otelmetrics.rs +++ b/core/src/layers/otelmetrics.rs @@ -40,7 +40,7 @@ use crate::*; /// # fn main() -> Result<()> { /// let meter = opentelemetry::global::meter("opendal"); /// let _ = Operator::new(services::Memory::default())? -/// .layer(OtelMetricsLayer::builder().register(meter)) +/// .layer(OtelMetricsLayer::builder().register(&meter)) /// .finish(); /// Ok(()) /// # } @@ -69,7 +69,7 @@ impl OtelMetricsLayer { /// # async fn main() -> Result<()> { /// let meter = opentelemetry::global::meter("opendal"); /// let op = Operator::new(services::Memory::default())? - /// .layer(OtelMetricsLayer::builder().path_label(1).register(meter)) + /// .layer(OtelMetricsLayer::builder().path_label(1).register(&meter)) /// .finish(); /// /// Ok(()) @@ -115,7 +115,7 @@ impl OtelMetricsLayerBuilder { /// # async fn main() -> Result<()> { /// let meter = opentelemetry::global::meter("opendal"); /// let op = Operator::new(services::Memory::default())? - /// .layer(OtelMetricsLayer::builder().path_label(1).register(meter)) + /// .layer(OtelMetricsLayer::builder().path_label(1).register(&meter)) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -145,7 +145,7 @@ impl OtelMetricsLayerBuilder { /// .layer( /// OtelMetricsLayer::builder() /// .operation_duration_seconds_boundaries(vec![0.01, 0.02, 0.05, 0.1, 0.2, 0.5]) - /// .register(meter) + /// .register(&meter) /// ) /// .finish(); /// debug!("operator: {op:?}"); @@ -178,7 +178,7 @@ impl OtelMetricsLayerBuilder { /// .layer( /// OtelMetricsLayer::builder() /// .operation_bytes_boundaries(vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0]) - /// .register(meter) + /// .register(&meter) /// ) /// .finish(); /// debug!("operator: {op:?}"); @@ -207,13 +207,13 @@ impl OtelMetricsLayerBuilder { /// # async fn main() -> Result<()> { /// let meter = opentelemetry::global::meter("opendal"); /// let op = Operator::new(services::Memory::default())? 
- /// .layer(OtelMetricsLayer::builder().register(meter)) + /// .layer(OtelMetricsLayer::builder().register(&meter)) /// .finish(); /// /// Ok(()) /// # } /// ``` - pub fn register(self, meter: Meter) -> OtelMetricsLayer { + pub fn register(self, meter: &Meter) -> OtelMetricsLayer { let duration_seconds = meter .f64_histogram("opendal.operation.duration") .with_description("Duration of operations") From 8aac21fbc4f6aec3623edf2a1846fc9ee53f5cdb Mon Sep 17 00:00:00 2001 From: meteorgan Date: Thu, 16 Jan 2025 20:28:32 +0800 Subject: [PATCH 07/15] ci: skip running behavior tests when adding or modifying documentation (#5558) --- .github/scripts/test_behavior/plan.py | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/scripts/test_behavior/plan.py b/.github/scripts/test_behavior/plan.py index 0f32ec6e4f53..013f7d89882e 100755 --- a/.github/scripts/test_behavior/plan.py +++ b/.github/scripts/test_behavior/plan.py @@ -139,6 +139,7 @@ def calculate_hint(changed_files: list[str]) -> Hint: and not p.startswith("core/edge/") and not p.startswith("core/fuzz/") and not p.startswith("core/src/services/") + and not p.startswith("core/src/docs/") ): hint.core = True hint.binding_java = True From da155eeedfb59f74b37e1c4eebfa441dcf25ddc7 Mon Sep 17 00:00:00 2001 From: meteorgan Date: Thu, 16 Jan 2025 23:09:30 +0800 Subject: [PATCH 08/15] RFC-5556: Write Returns Metadata (#5556) --- .../docs/rfcs/5556_write_returns_metadata.md | 121 ++++++++++++++++++ core/src/docs/rfcs/mod.rs | 4 + 2 files changed, 125 insertions(+) create mode 100644 core/src/docs/rfcs/5556_write_returns_metadata.md diff --git a/core/src/docs/rfcs/5556_write_returns_metadata.md b/core/src/docs/rfcs/5556_write_returns_metadata.md new file mode 100644 index 000000000000..a381580e516a --- /dev/null +++ b/core/src/docs/rfcs/5556_write_returns_metadata.md @@ -0,0 +1,121 @@ +- Proposal Name: `write_returns_metadata` +- Start Date: 2025-01-16 +- RFC PR: [apache/opendal#5556](https://github.com/apache/opendal/pull/5556) +- Tracking Issue: [apache/opendal#5557](https://github.com/apache/opendal/issues/5557) + +# Summary + +Enhance write operations by returning metadata after successful writes. + +# Motivation + +Currently, write operations (`write`, `write_with`, `writer`, `writer_with`) only return `Result<()>` or `Result`. +Users who need metadata after writing (like `ETag` or `version_id`) must make an additional `stat()` call. This is inefficient +and can lead to race conditions if the file is modified between the write and stat operations. + +Many storage services (like S3, GCS, Azure Blob) return metadata in their write responses. We should expose this information +to users directly after write operations. 
+ +# Guide-level explanation + +The write operations will be enhanced to return metadata: + +```rust +// Before +op.write("path/to/file", data).await?; +let meta = op.stat("path/to/file").await?; +if Some(etag) = meta.etag() { + println!("File ETag: {}", etag); +} + +// After +let meta = op.write("path/to/file", data).await?; +if Some(etag) = meta.etag() { + println!("File ETag: {}", etag); +} +``` + +For writer operations: + +```rust +// Before +let mut writer = op.writer("path/to/file").await?; +writer.write(data).await?; +writer.close().await?; +let meta = op.stat("path/to/file").await?; +if Some(etag) = meta.etag() { + println!("File ETag: {}", etag); +} + +// After +let mut writer = op.writer("path/to/file").await?; +writer.write(data).await?; +let meta = writer.close().await?; +if Some(etag) = meta.etag() { + println!("File ETag: {}", etag); +} +``` + +The behavior remains unchanged if users don't need the metadata - they can simply ignore the return value. + +# Reference-level explanation + +## Changes to `Operator` API + +The following functions will be modified to return `Result` instead of `Result<()>`: + +- `write()` +- `write_with()` + +The `writer()` and `writer_with()` return types remain unchanged as they return `Result`. + +## Changes to struct `Writer` + +The `Writer` struct will be modified to return `Result` instead of `Result<()>` for the `close()` function. + +## Changes to trait `oio::Write` and trait `oio::MultipartWrite` + +The `Write` trait will be modified to return `Result` instead of `Result<()>` for the `close()` function. + +The `MultipartWrite` trait will be modified to return `Result` instead of `Result<()>` for the `complete_part()` +and `write_once` functions. + +## Implementation Details + +For services that return metadata in their write responses: +- The metadata will be captured from the service response +- All available fields (etag, version_id, etc.) will be populated + +For services that don't return metadata in write responses: +- for `fs`: we can use `stat` to retrieve the metadata before returning. since the metadata is cached by the kernel, +this won't cause a performance issue. +- for other services: A default metadata object will be returned. 
+ + +# Drawbacks + +- Minor breaking change for users who explicitly type the return value of write operations +- Additional complexity in the Writer implementation + +# Rationale and alternatives + +- Provides a clean, consistent API +- Maintains backward compatibility for users who ignore the return value +- Improves performance by avoiding additional stat calls when possible + +# Prior art + +Similar patterns exist in other storage SDKs: + +- `object_store` crate returns metadata in `PutResult` after calling `put_opts` +- AWS SDK returns metadata in `PutObjectOutput` +- Azure SDK returns `UploadFileResponse` after uploads + +# Unresolved questions + +- None + + +# Future possibilities + +- None \ No newline at end of file diff --git a/core/src/docs/rfcs/mod.rs b/core/src/docs/rfcs/mod.rs index 7dcb2bf8a4b0..4d4980e81782 100644 --- a/core/src/docs/rfcs/mod.rs +++ b/core/src/docs/rfcs/mod.rs @@ -256,3 +256,7 @@ pub mod rfc_5485_conditional_reader {} /// List With Deleted #[doc = include_str!("5495_list_with_deleted.md")] pub mod rfc_5495_list_with_deleted {} + +/// Write Returns Metadata +#[doc = include_str!("5556_write_returns_metadata.md")] +pub mod rfc_5556_write_returns_metadata {} From 66bf9db1a41b58afc03d877509e45b555067cfd6 Mon Sep 17 00:00:00 2001 From: yihong Date: Fri, 17 Jan 2025 09:58:17 +0800 Subject: [PATCH 09/15] refactor: refactor some unnecessary clone and use next_back to make clippy happy (#5554) --- core/benches/vs_s3/src/main.rs | 6 +++--- core/src/raw/http_util/header.rs | 2 +- core/src/raw/path.rs | 2 +- core/src/services/azblob/error.rs | 7 ++++--- core/src/services/s3/error.rs | 6 ++++-- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/core/benches/vs_s3/src/main.rs b/core/benches/vs_s3/src/main.rs index 91b1be90495f..324b2bbad98c 100644 --- a/core/benches/vs_s3/src/main.rs +++ b/core/benches/vs_s3/src/main.rs @@ -69,7 +69,7 @@ fn bench_read(c: &mut Criterion, op: Operator, s3_client: aws_sdk_s3::Client, bu let mut group = c.benchmark_group("read"); group.throughput(criterion::Throughput::Bytes(16 * 1024 * 1024)); - TEST_RUNTIME.block_on(prepare(op.clone())); + TEST_RUNTIME.block_on(prepare(&op)); group.bench_function("opendal_s3_reader", |b| { b.to_async(&*TEST_RUNTIME).iter(|| async { @@ -118,10 +118,10 @@ fn bench_read(c: &mut Criterion, op: Operator, s3_client: aws_sdk_s3::Client, bu group.finish() } -async fn prepare(op: Operator) { +async fn prepare(op: &Operator) { let mut rng = thread_rng(); let mut content = vec![0; 16 * 1024 * 1024]; rng.fill_bytes(&mut content); - op.write("file", content.clone()).await.unwrap(); + op.write("file", content).await.unwrap(); } diff --git a/core/src/raw/http_util/header.rs b/core/src/raw/http_util/header.rs index 2da77a4b08dc..2db92e9efcad 100644 --- a/core/src/raw/http_util/header.rs +++ b/core/src/raw/http_util/header.rs @@ -129,7 +129,7 @@ where .with_operation("http_util::parse_header_to_str") })?; - let value = if let Some(v) = headers.get(name.clone()) { + let value = if let Some(v) = headers.get(&name) { v } else { return Ok(None); diff --git a/core/src/raw/path.rs b/core/src/raw/path.rs index fb3c4ad07abc..79b44951495b 100644 --- a/core/src/raw/path.rs +++ b/core/src/raw/path.rs @@ -157,7 +157,7 @@ pub fn get_basename(path: &str) -> &str { if !path.ends_with('/') { return path .split('/') - .last() + .next_back() .expect("file path without name is invalid"); } diff --git a/core/src/services/azblob/error.rs b/core/src/services/azblob/error.rs index 1ea38ad8755e..c67d805c035e 100644 --- 
a/core/src/services/azblob/error.rs +++ b/core/src/services/azblob/error.rs @@ -60,8 +60,8 @@ impl Debug for AzblobError { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), @@ -76,7 +76,8 @@ pub(super) fn parse_error(resp: Response) -> Error { _ => (ErrorKind::Unexpected, false), }; - let mut message = match de::from_reader::<_, AzblobError>(bs.clone().reader()) { + let bs_content = bs.chunk(); + let mut message = match de::from_reader::<_, AzblobError>(bs_content.reader()) { Ok(azblob_err) => format!("{azblob_err:?}"), Err(_) => String::from_utf8_lossy(&bs).into_owned(), }; diff --git a/core/src/services/s3/error.rs b/core/src/services/s3/error.rs index 503ca4ecbd77..385b75fac109 100644 --- a/core/src/services/s3/error.rs +++ b/core/src/services/s3/error.rs @@ -37,6 +37,7 @@ pub(crate) struct S3Error { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (mut kind, mut retryable) = match parts.status.as_u16() { 403 => (ErrorKind::PermissionDenied, false), @@ -49,9 +50,10 @@ pub(super) fn parse_error(resp: Response) -> Error { _ => (ErrorKind::Unexpected, false), }; - let (message, s3_err) = de::from_reader::<_, S3Error>(body.clone().reader()) + let body_content = bs.chunk(); + let (message, s3_err) = de::from_reader::<_, S3Error>(body_content.reader()) .map(|s3_err| (format!("{s3_err:?}"), Some(s3_err))) - .unwrap_or_else(|_| (String::from_utf8_lossy(body.chunk()).into_owned(), None)); + .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None)); if let Some(s3_err) = s3_err { (kind, retryable) = parse_s3_error_code(s3_err.code.as_str()).unwrap_or((kind, retryable)); From 71dd536c2641b4cca7de76a5079e7bd59b75a053 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 17 Jan 2025 13:12:40 +0800 Subject: [PATCH 10/15] chore(core): Avoid using mongodb 3.2.0 (#5560) --- core/Cargo.toml | 138 ++++++++++++++++++++++++------------------------ 1 file changed, 70 insertions(+), 68 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index f75fb8d8cb58..600b51303931 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -56,16 +56,16 @@ default = ["reqwest/rustls-tls", "executors-tokio", "services-memory"] # # You should never enable this feature unless you are developing opendal. tests = [ - "dep:rand", - "dep:sha2", - "dep:dotenvy", - "layers-blocking", - "services-azblob", - "services-fs", - "services-http", - "services-memory", - "internal-tokio-rt", - "services-s3", + "dep:rand", + "dep:sha2", + "dep:dotenvy", + "layers-blocking", + "services-azblob", + "services-fs", + "services-http", + "services-memory", + "internal-tokio-rt", + "services-s3", ] # Enable path cache. 
@@ -109,20 +109,20 @@ services-aliyun-drive = [] services-alluxio = [] services-atomicserver = ["dep:atomic_lib"] services-azblob = [ - "dep:sha2", - "dep:reqsign", - "reqsign?/services-azblob", - "reqsign?/reqwest_request", + "dep:sha2", + "dep:reqsign", + "reqsign?/services-azblob", + "reqsign?/reqwest_request", ] services-azdls = [ - "dep:reqsign", - "reqsign?/services-azblob", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-azblob", + "reqsign?/reqwest_request", ] services-azfile = [ - "dep:reqsign", - "reqsign?/services-azblob", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-azblob", + "reqsign?/reqwest_request", ] services-b2 = [] services-cacache = ["dep:cacache"] @@ -130,9 +130,9 @@ services-chainsafe = [] services-cloudflare-kv = [] services-compfs = ["dep:compio"] services-cos = [ - "dep:reqsign", - "reqsign?/services-tencent", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-tencent", + "reqsign?/reqwest_request", ] services-d1 = [] services-dashmap = ["dep:dashmap"] @@ -143,14 +143,14 @@ services-foundationdb = ["dep:foundationdb"] services-fs = ["tokio/fs", "internal-tokio-rt"] services-ftp = ["dep:suppaftp", "dep:bb8", "dep:async-tls"] services-gcs = [ - "dep:reqsign", - "reqsign?/services-google", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-google", + "reqsign?/reqwest_request", ] services-gdrive = ["internal-path-cache"] services-ghac = [] services-github = [] -services-gridfs = ["dep:mongodb"] +services-gridfs = ["dep:mongodb", "dep:mongodb-internal-macros"] services-hdfs = ["dep:hdrs"] services-hdfs-native = ["hdfs-native"] services-http = [] @@ -165,20 +165,20 @@ services-memcached = ["dep:bb8"] services-memory = [] services-mini-moka = ["dep:mini-moka"] services-moka = ["dep:moka"] -services-mongodb = ["dep:mongodb"] +services-mongodb = ["dep:mongodb", "dep:mongodb-internal-macros"] services-monoiofs = ["dep:monoio", "dep:flume"] services-mysql = ["dep:sqlx", "sqlx?/mysql"] services-nebula-graph = ["dep:rust-nebula", "dep:bb8", "dep:snowflaked"] services-obs = [ - "dep:reqsign", - "reqsign?/services-huaweicloud", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-huaweicloud", + "reqsign?/reqwest_request", ] services-onedrive = [] services-oss = [ - "dep:reqsign", - "reqsign?/services-aliyun", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-aliyun", + "reqsign?/reqwest_request", ] services-pcloud = [] services-persy = ["dep:persy", "internal-tokio-rt"] @@ -188,10 +188,10 @@ services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"] services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"] services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"] services-s3 = [ - "dep:reqsign", - "reqsign?/services-aws", - "reqsign?/reqwest_request", - "dep:crc32c", + "dep:reqsign", + "reqsign?/services-aws", + "reqsign?/reqwest_request", + "dep:crc32c", ] services-seafile = [] services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"] @@ -237,12 +237,12 @@ backon = { version = "1.2", features = ["tokio-sleep"] } base64 = "0.22" bytes = "1.6" chrono = { version = "0.4.28", default-features = false, features = [ - "clock", - "std", + "clock", + "std", ] } futures = { version = "0.3", default-features = false, features = [ - "std", - "async-await", + "std", + "async-await", ] } http = "1.1" log = "0.4" @@ -252,7 +252,7 @@ once_cell = "1" percent-encoding = "2" quick-xml = { version = "0.36", features = ["serialize", 
"overlapped-lists"] } reqwest = { version = "0.12.2", features = [ - "stream", + "stream", ], default-features = false } serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -272,7 +272,7 @@ prost = { version = "0.13", optional = true } sha1 = { version = "0.10.6", optional = true } sha2 = { version = "0.10", optional = true } sqlx = { version = "0.8.0", features = [ - "runtime-tokio-rustls", + "runtime-tokio-rustls", ], optional = true } # For http based services. @@ -285,8 +285,8 @@ ouroboros = { version = "0.18.4", optional = true } atomic_lib = { version = "0.39.0", optional = true } # for services-cacache cacache = { version = "13.0", default-features = false, features = [ - "tokio-runtime", - "mmap", + "tokio-runtime", + "mmap", ], optional = true } # for services-dashmap dashmap = { version = "6", optional = true } @@ -294,8 +294,8 @@ dashmap = { version = "6", optional = true } etcd-client = { version = "0.14", optional = true, features = ["tls"] } # for services-foundationdb foundationdb = { version = "0.9.0", features = [ - "embedded-fdb-include", - "fdb-7_3", + "embedded-fdb-include", + "fdb-7_3", ], optional = true } # for services-hdfs hdrs = { version = "0.3.2", optional = true, features = ["async_file"] } @@ -308,12 +308,14 @@ mini-moka = { version = "0.10", optional = true } # for services-moka moka = { version = "0.12", optional = true, features = ["future", "sync"] } # for services-mongodb -mongodb = { version = "3", optional = true } +# mongodb has known issues on 3.2.0: https://github.com/mongodb/mongo-rust-driver/issues/1287 +mongodb = { version = ">=3,<3.2.0", optional = true } +mongodb-internal-macros = { version = ">=3,<3.2.0", optional = true } # for services-sftp openssh = { version = "0.11.0", optional = true } openssh-sftp-client = { version = "0.15.2", optional = true, features = [ - "openssh", - "tracing", + "openssh", + "tracing", ] } # for services-persy persy = { version = "1.4.6", optional = true } @@ -321,9 +323,9 @@ persy = { version = "1.4.6", optional = true } redb = { version = "2", optional = true } # for services-redis redis = { version = "0.27", features = [ - "cluster-async", - "tokio-comp", - "connection-manager", + "cluster-async", + "tokio-comp", + "connection-manager", ], optional = true } # for services-rocksdb rocksdb = { version = "0.21.0", default-features = false, optional = true } @@ -331,9 +333,9 @@ rocksdb = { version = "0.21.0", default-features = false, optional = true } sled = { version = "0.34.7", optional = true } # for services-ftp suppaftp = { version = "6.0.3", default-features = false, features = [ - "async-secure", - "rustls", - "async-rustls", + "async-secure", + "rustls", + "async-rustls", ], optional = true } # for services-tikv tikv-client = { version = "0.3.0", optional = true, default-features = false } @@ -343,10 +345,10 @@ hdfs-native = { version = "0.10", optional = true } surrealdb = { version = "2", optional = true, features = ["protocol-http"] } # for services-compfs compio = { version = "0.12.0", optional = true, features = [ - "runtime", - "bytes", - "polling", - "dispatcher", + "runtime", + "bytes", + "polling", + "dispatcher", ] } # for services-s3 crc32c = { version = "0.6.6", optional = true } @@ -356,10 +358,10 @@ snowflaked = { version = "1", optional = true, features = ["sync"] } # for services-monoiofs flume = { version = "0.11", optional = true } monoio = { version = "0.2.4", optional = true, features = [ - "sync", - "mkdirat", - "unlinkat", - "renameat", + "sync", + "mkdirat", + 
"unlinkat", + "renameat", ] } # Layers @@ -398,7 +400,7 @@ fastrace = { version = "0.7", features = ["enable"] } fastrace-jaeger = "0.7" libtest-mimic = "0.8" opentelemetry = { version = "0.27", default-features = false, features = [ - "trace", + "trace", ] } opentelemetry-otlp = "0.27" opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] } @@ -409,6 +411,6 @@ size = "0.4" tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] } tracing-opentelemetry = "0.28.0" tracing-subscriber = { version = "0.3", features = [ - "env-filter", - "tracing-log", + "env-filter", + "tracing-log", ] } From 753d4a83cc9e2401414e3aaa65d3eee487ab5f6c Mon Sep 17 00:00:00 2001 From: yihong Date: Fri, 17 Jan 2025 14:33:33 +0800 Subject: [PATCH 11/15] refactor: refactor all body.copy_to_bytes(body.remaining()) (#5561) Co-authored-by: Xuanwo --- core/src/services/aliyun_drive/error.rs | 4 ++-- core/src/services/alluxio/error.rs | 4 ++-- core/src/services/azdls/error.rs | 4 ++-- core/src/services/azfile/error.rs | 4 ++-- core/src/services/b2/error.rs | 4 ++-- core/src/services/chainsafe/error.rs | 4 ++-- core/src/services/cloudflare_kv/error.rs | 4 ++-- core/src/services/cos/error.rs | 4 ++-- core/src/services/d1/backend.rs | 5 ++--- core/src/services/d1/error.rs | 4 ++-- core/src/services/dbfs/error.rs | 5 ++--- core/src/services/dropbox/error.rs | 5 ++--- core/src/services/gdrive/error.rs | 5 ++--- core/src/services/ghac/error.rs | 5 ++--- core/src/services/github/error.rs | 4 ++-- core/src/services/http/error.rs | 5 ++--- core/src/services/huggingface/error.rs | 5 ++--- core/src/services/icloud/core.rs | 4 ++-- core/src/services/ipfs/error.rs | 5 ++--- core/src/services/ipmfs/error.rs | 5 ++--- core/src/services/koofr/error.rs | 5 ++--- core/src/services/lakefs/error.rs | 5 ++--- core/src/services/libsql/error.rs | 5 ++--- core/src/services/obs/error.rs | 4 ++-- core/src/services/onedrive/error.rs | 5 ++--- core/src/services/onedrive/writer.rs | 2 +- core/src/services/oss/error.rs | 4 ++-- core/src/services/pcloud/error.rs | 5 ++--- core/src/services/s3/backend.rs | 4 ++-- core/src/services/seafile/error.rs | 4 ++-- core/src/services/supabase/error.rs | 5 ++--- core/src/services/swift/error.rs | 4 ++-- core/src/services/upyun/error.rs | 4 ++-- core/src/services/vercel_artifacts/error.rs | 5 ++--- core/src/services/vercel_blob/error.rs | 4 ++-- core/src/services/webdav/error.rs | 5 ++--- core/src/services/webhdfs/error.rs | 5 ++--- core/src/services/yandex_disk/error.rs | 4 ++-- 38 files changed, 75 insertions(+), 93 deletions(-) diff --git a/core/src/services/aliyun_drive/error.rs b/core/src/services/aliyun_drive/error.rs index 8c06b988a757..fc90cfcd814b 100644 --- a/core/src/services/aliyun_drive/error.rs +++ b/core/src/services/aliyun_drive/error.rs @@ -28,8 +28,8 @@ struct AliyunDriveError { } pub(super) fn parse_error(res: Response) -> Error { - let (parts, mut body) = res.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = res.into_parts(); + let bs = body.to_bytes(); let (code, message) = serde_json::from_reader::<_, AliyunDriveError>(bs.clone().reader()) .map(|err| (Some(err.code), err.message)) .unwrap_or((None, String::from_utf8_lossy(&bs).into_owned())); diff --git a/core/src/services/alluxio/error.rs b/core/src/services/alluxio/error.rs index 6b7da11d2ae6..71935cee799e 100644 --- a/core/src/services/alluxio/error.rs +++ b/core/src/services/alluxio/error.rs @@ -32,8 +32,8 @@ struct AlluxioError { } pub(super) fn parse_error(resp: Response) -> 
Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let mut kind = match parts.status.as_u16() { 500 => ErrorKind::Unexpected, diff --git a/core/src/services/azdls/error.rs b/core/src/services/azdls/error.rs index 95e09bf88985..7ba598160930 100644 --- a/core/src/services/azdls/error.rs +++ b/core/src/services/azdls/error.rs @@ -60,8 +60,8 @@ impl Debug for AzdlsError { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/azfile/error.rs b/core/src/services/azfile/error.rs index 63f8b933fff8..bd0cf9042f01 100644 --- a/core/src/services/azfile/error.rs +++ b/core/src/services/azfile/error.rs @@ -60,8 +60,8 @@ impl Debug for AzfileError { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/b2/error.rs b/core/src/services/b2/error.rs index 678b5bb8386e..4c23282693d7 100644 --- a/core/src/services/b2/error.rs +++ b/core/src/services/b2/error.rs @@ -33,8 +33,8 @@ struct B2Error { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (mut kind, mut retryable) = match parts.status.as_u16() { 403 => (ErrorKind::PermissionDenied, false), diff --git a/core/src/services/chainsafe/error.rs b/core/src/services/chainsafe/error.rs index 2a546c314bb2..87f78e40ca4f 100644 --- a/core/src/services/chainsafe/error.rs +++ b/core/src/services/chainsafe/error.rs @@ -37,8 +37,8 @@ struct ChainsafeSubError { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status.as_u16() { 401 | 403 => (ErrorKind::PermissionDenied, false), diff --git a/core/src/services/cloudflare_kv/error.rs b/core/src/services/cloudflare_kv/error.rs index 79a7b0dfcc0a..b04eb3e57e46 100644 --- a/core/src/services/cloudflare_kv/error.rs +++ b/core/src/services/cloudflare_kv/error.rs @@ -27,8 +27,8 @@ use crate::*; /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (mut kind, mut retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/cos/error.rs b/core/src/services/cos/error.rs index df5f95df5cca..4d767e2771cd 100644 --- a/core/src/services/cos/error.rs +++ b/core/src/services/cos/error.rs @@ -37,8 +37,8 @@ struct CosError { /// Parse error response into Error. 
pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/d1/backend.rs b/core/src/services/d1/backend.rs index a86d47ab8862..b6897aa3ef51 100644 --- a/core/src/services/d1/backend.rs +++ b/core/src/services/d1/backend.rs @@ -18,7 +18,6 @@ use std::fmt::Debug; use std::fmt::Formatter; -use bytes::Buf; use http::header; use http::Request; use http::StatusCode; @@ -287,8 +286,8 @@ impl kv::Adapter for Adapter { let status = resp.status(); match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let mut body = resp.into_body(); - let bs = body.copy_to_bytes(body.remaining()); + let body = resp.into_body(); + let bs = body.to_bytes(); let d1_response = D1Response::parse(&bs)?; Ok(d1_response.get_result(&self.value_field)) } diff --git a/core/src/services/d1/error.rs b/core/src/services/d1/error.rs index f9cdf66b23e8..37f6b95564e9 100644 --- a/core/src/services/d1/error.rs +++ b/core/src/services/d1/error.rs @@ -27,8 +27,8 @@ use crate::*; /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (mut kind, mut retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/dbfs/error.rs b/core/src/services/dbfs/error.rs index ca5b3a2c20b8..666466b727ac 100644 --- a/core/src/services/dbfs/error.rs +++ b/core/src/services/dbfs/error.rs @@ -17,7 +17,6 @@ use std::fmt::Debug; -use bytes::Buf; use http::Response; use http::StatusCode; use serde::Deserialize; @@ -44,8 +43,8 @@ impl Debug for DbfsError { } pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/dropbox/error.rs b/core/src/services/dropbox/error.rs index ec242c6f7f96..715d029ed8bf 100644 --- a/core/src/services/dropbox/error.rs +++ b/core/src/services/dropbox/error.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use http::Response; use http::StatusCode; use serde::Deserialize; @@ -31,8 +30,8 @@ pub struct DropboxErrorResponse { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (mut kind, mut retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/gdrive/error.rs b/core/src/services/gdrive/error.rs index 94e00789f5ca..199e82e26a8a 100644 --- a/core/src/services/gdrive/error.rs +++ b/core/src/services/gdrive/error.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. 
-use bytes::Buf; use http::Response; use http::StatusCode; use serde::Deserialize; @@ -35,8 +34,8 @@ struct GdriveInnerError { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (mut kind, mut retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/ghac/error.rs b/core/src/services/ghac/error.rs index a2736e05b657..7efafe1f6dd7 100644 --- a/core/src/services/ghac/error.rs +++ b/core/src/services/ghac/error.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use http::Response; use http::StatusCode; @@ -24,7 +23,7 @@ use crate::*; /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); + let (parts, body) = resp.into_parts(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND | StatusCode::NO_CONTENT => (ErrorKind::NotFound, false), @@ -38,7 +37,7 @@ pub(super) fn parse_error(resp: Response) -> Error { _ => (ErrorKind::Unexpected, false), }; - let bs = body.copy_to_bytes(body.remaining()); + let bs = body.to_bytes(); let message = String::from_utf8_lossy(&bs); let mut err = Error::new(kind, message); diff --git a/core/src/services/github/error.rs b/core/src/services/github/error.rs index 3109370ad3df..cfc259c111e7 100644 --- a/core/src/services/github/error.rs +++ b/core/src/services/github/error.rs @@ -37,8 +37,8 @@ struct GithubSubError { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status.as_u16() { 401 | 403 => (ErrorKind::PermissionDenied, false), diff --git a/core/src/services/http/error.rs b/core/src/services/http/error.rs index e2c2947f8932..09150089232b 100644 --- a/core/src/services/http/error.rs +++ b/core/src/services/http/error.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use http::Response; use http::StatusCode; @@ -24,8 +23,8 @@ use crate::*; /// Parse error response into Error. 
pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/huggingface/error.rs b/core/src/services/huggingface/error.rs index 61abc2c25d1a..e69a70a257ed 100644 --- a/core/src/services/huggingface/error.rs +++ b/core/src/services/huggingface/error.rs @@ -17,7 +17,6 @@ use std::fmt::Debug; -use bytes::Buf; use http::Response; use http::StatusCode; use serde::Deserialize; @@ -41,8 +40,8 @@ impl Debug for HuggingfaceError { } pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/icloud/core.rs b/core/src/services/icloud/core.rs index 1028ba9f657e..328a30af3127 100644 --- a/core/src/services/icloud/core.rs +++ b/core/src/services/icloud/core.rs @@ -569,8 +569,8 @@ impl PathQuery for IcloudPathQuery { } pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let mut kind = match parts.status.as_u16() { 421 | 450 | 500 => ErrorKind::NotFound, diff --git a/core/src/services/ipfs/error.rs b/core/src/services/ipfs/error.rs index 9f7e183160b0..e47eda369d77 100644 --- a/core/src/services/ipfs/error.rs +++ b/core/src/services/ipfs/error.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use http::Response; use http::StatusCode; @@ -24,8 +23,8 @@ use crate::*; /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/ipmfs/error.rs b/core/src/services/ipmfs/error.rs index 780e3388a4bd..395b4e710447 100644 --- a/core/src/services/ipmfs/error.rs +++ b/core/src/services/ipmfs/error.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use http::Response; use http::StatusCode; use serde::Deserialize; @@ -45,8 +44,8 @@ struct IpfsError { /// /// ref: https://docs.ipfs.tech/reference/kubo/rpc/#http-status-codes pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let ipfs_error = de::from_slice::(&bs).ok(); diff --git a/core/src/services/koofr/error.rs b/core/src/services/koofr/error.rs index 7929c2de3775..760f201194a0 100644 --- a/core/src/services/koofr/error.rs +++ b/core/src/services/koofr/error.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use http::Response; use crate::raw::*; @@ -23,8 +22,8 @@ use crate::*; /// Parse error response into Error. 
pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status.as_u16() { 403 => (ErrorKind::PermissionDenied, false), diff --git a/core/src/services/lakefs/error.rs b/core/src/services/lakefs/error.rs index 32c75d888408..69934d747b8a 100644 --- a/core/src/services/lakefs/error.rs +++ b/core/src/services/lakefs/error.rs @@ -17,7 +17,6 @@ use std::fmt::Debug; -use bytes::Buf; use http::Response; use http::StatusCode; use serde::Deserialize; @@ -41,8 +40,8 @@ impl Debug for LakefsError { } pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/libsql/error.rs b/core/src/services/libsql/error.rs index 8019ce29a801..54af90a53702 100644 --- a/core/src/services/libsql/error.rs +++ b/core/src/services/libsql/error.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use http::Response; use http::StatusCode; @@ -24,8 +23,8 @@ use crate::*; /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/obs/error.rs b/core/src/services/obs/error.rs index f67afdae5c7a..21ddd5f97d7b 100644 --- a/core/src/services/obs/error.rs +++ b/core/src/services/obs/error.rs @@ -37,8 +37,8 @@ struct ObsError { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/onedrive/error.rs b/core/src/services/onedrive/error.rs index ab396a499d3f..8564eedc606e 100644 --- a/core/src/services/onedrive/error.rs +++ b/core/src/services/onedrive/error.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use http::Response; use http::StatusCode; @@ -24,8 +23,8 @@ use crate::*; /// Parse error response into Error. 
pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index f2a35d1e57dd..c6ed61f22c4f 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -122,7 +122,7 @@ impl OneDriveWriter { } async fn create_upload_session(&self) -> Result { - let file_name_from_path = self.path.split('/').last().ok_or_else(|| { + let file_name_from_path = self.path.split('/').next_back().ok_or_else(|| { Error::new( ErrorKind::Unexpected, "connection string must have AccountName", diff --git a/core/src/services/oss/error.rs b/core/src/services/oss/error.rs index 3cfeb66b0c25..9173d5026430 100644 --- a/core/src/services/oss/error.rs +++ b/core/src/services/oss/error.rs @@ -36,8 +36,8 @@ struct OssError { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/pcloud/error.rs b/core/src/services/pcloud/error.rs index 2b1f3edbe3ee..1234df1e2f7d 100644 --- a/core/src/services/pcloud/error.rs +++ b/core/src/services/pcloud/error.rs @@ -18,7 +18,6 @@ use std::fmt::Debug; use std::fmt::Formatter; -use bytes::Buf; use http::Response; use serde::Deserialize; @@ -43,8 +42,8 @@ impl Debug for PcloudError { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let message = String::from_utf8_lossy(&bs).into_owned(); let mut err = Error::new(ErrorKind::Unexpected, message); diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 3f3f00521aae..9145f22a7c8b 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -751,10 +751,10 @@ impl Builder for S3Builder { let checksum_algorithm = match self.config.checksum_algorithm.as_deref() { Some("crc32c") => Some(ChecksumAlgorithm::Crc32c), None => None, - _ => { + v => { return Err(Error::new( ErrorKind::ConfigInvalid, - "{v} is not a supported checksum_algorithm.", + format!("{:?} is not a supported checksum_algorithm.", v), )) } }; diff --git a/core/src/services/seafile/error.rs b/core/src/services/seafile/error.rs index ab0bf6588c0a..47a7bde5358a 100644 --- a/core/src/services/seafile/error.rs +++ b/core/src/services/seafile/error.rs @@ -31,8 +31,8 @@ struct SeafileError { /// Parse error response into Error. 
pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, _retryable) = match parts.status.as_u16() { 403 => (ErrorKind::PermissionDenied, false), diff --git a/core/src/services/supabase/error.rs b/core/src/services/supabase/error.rs index c2316eab8c71..273addb90415 100644 --- a/core/src/services/supabase/error.rs +++ b/core/src/services/supabase/error.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use http::Response; use http::StatusCode; use serde::Deserialize; @@ -35,8 +34,8 @@ struct SupabaseError { /// Parse the supabase error type to the OpenDAL error type pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); // Check HTTP status code first/ let (mut kind, mut retryable) = match parts.status.as_u16() { diff --git a/core/src/services/swift/error.rs b/core/src/services/swift/error.rs index 73afa41645c7..3bbb25f88154 100644 --- a/core/src/services/swift/error.rs +++ b/core/src/services/swift/error.rs @@ -33,8 +33,8 @@ struct ErrorResponse { } pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/upyun/error.rs b/core/src/services/upyun/error.rs index 317ea061dfdc..3a8567cd72e0 100644 --- a/core/src/services/upyun/error.rs +++ b/core/src/services/upyun/error.rs @@ -34,8 +34,8 @@ struct UpyunError { /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status.as_u16() { 403 => (ErrorKind::PermissionDenied, false), diff --git a/core/src/services/vercel_artifacts/error.rs b/core/src/services/vercel_artifacts/error.rs index ab396a499d3f..8564eedc606e 100644 --- a/core/src/services/vercel_artifacts/error.rs +++ b/core/src/services/vercel_artifacts/error.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use http::Response; use http::StatusCode; @@ -24,8 +23,8 @@ use crate::*; /// Parse error response into Error. pub(super) fn parse_error(resp: Response) -> Error { - let (parts, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); + let (parts, body) = resp.into_parts(); + let bs = body.to_bytes(); let (kind, retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), diff --git a/core/src/services/vercel_blob/error.rs b/core/src/services/vercel_blob/error.rs index 29ecced517f3..58f9420a1ac5 100644 --- a/core/src/services/vercel_blob/error.rs +++ b/core/src/services/vercel_blob/error.rs @@ -39,8 +39,8 @@ struct VercelBlobErrorDetail { /// Parse error response into Error. 
 pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
-    let (parts, mut body) = resp.into_parts();
-    let bs = body.copy_to_bytes(body.remaining());
+    let (parts, body) = resp.into_parts();
+    let bs = body.to_bytes();
 
     let (kind, retryable) = match parts.status.as_u16() {
         403 => (ErrorKind::PermissionDenied, false),
diff --git a/core/src/services/webdav/error.rs b/core/src/services/webdav/error.rs
index 0fa32efe814f..bc3f2f4b3bfd 100644
--- a/core/src/services/webdav/error.rs
+++ b/core/src/services/webdav/error.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::Buf;
 use http::Response;
 use http::StatusCode;
 
@@ -24,8 +23,8 @@ use crate::*;
 
 /// Parse error response into Error.
 pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
-    let (parts, mut body) = resp.into_parts();
-    let bs = body.copy_to_bytes(body.remaining());
+    let (parts, body) = resp.into_parts();
+    let bs = body.to_bytes();
 
     let (kind, retryable) = match parts.status {
         StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
diff --git a/core/src/services/webhdfs/error.rs b/core/src/services/webhdfs/error.rs
index 1a3a486f7e6e..2dd8ecb664d4 100644
--- a/core/src/services/webhdfs/error.rs
+++ b/core/src/services/webhdfs/error.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::Buf;
 use http::response::Parts;
 use http::Response;
 use http::StatusCode;
@@ -41,8 +40,8 @@ struct WebHdfsError {
 }
 
 pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
-    let (parts, mut body) = resp.into_parts();
-    let bs = body.copy_to_bytes(body.remaining());
+    let (parts, body) = resp.into_parts();
+    let bs = body.to_bytes();
     let s = String::from_utf8_lossy(&bs);
     parse_error_msg(parts, &s)
 }
diff --git a/core/src/services/yandex_disk/error.rs b/core/src/services/yandex_disk/error.rs
index 1f36284453ad..78c397548127 100644
--- a/core/src/services/yandex_disk/error.rs
+++ b/core/src/services/yandex_disk/error.rs
@@ -34,8 +34,8 @@ struct YandexDiskError {
 
 /// Parse error response into Error.
 pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
-    let (parts, mut body) = resp.into_parts();
-    let bs = body.copy_to_bytes(body.remaining());
+    let (parts, body) = resp.into_parts();
+    let bs = body.to_bytes();
 
     let (kind, retryable) = match parts.status.as_u16() {
         410 | 403 => (ErrorKind::PermissionDenied, false),

From c4da0505d8bc473da8996d19a9c7335b7d91339a Mon Sep 17 00:00:00 2001
From: xxchan
Date: Mon, 20 Jan 2025 17:08:23 +0800
Subject: [PATCH 12/15] build: fix Cargo.lock and pass --locked in CI (#5565)

---
 .github/workflows/ci_core.yml | 8 ++++----
 core/Cargo.lock               | 1 +
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/ci_core.yml b/.github/workflows/ci_core.yml
index 1317b1b6590d..0d0137309cb6 100644
--- a/.github/workflows/ci_core.yml
+++ b/.github/workflows/ci_core.yml
@@ -108,7 +108,7 @@ jobs:
           github-token: ${{ secrets.GITHUB_TOKEN }}
       - name: Build
         working-directory: core
-        run: cargo build
+        run: cargo build --locked
 
   build_all_features:
     runs-on: ubuntu-latest
@@ -133,7 +133,7 @@ jobs:
 
       - name: Build
         working-directory: core
-        run: cargo build --all-features
+        run: cargo build --all-features --locked
 
   build_all_platforms:
     runs-on: ${{ matrix.os }}
@@ -213,7 +213,7 @@ jobs:
             services-webdav
             services-webhdfs
           )
-          cargo build --features "${FEATURES[*]}"
+          cargo build --features "${FEATURES[*]}" --locked
 
   # We only support some services(see `available_services` below) for now.
   build_under_wasm:
@@ -233,7 +233,7 @@ jobs:
             services-s3
           )
           rustup target add wasm32-unknown-unknown
-          cargo build --target wasm32-unknown-unknown --no-default-features --features="${FEATURES[*]}"
+          cargo build --target wasm32-unknown-unknown --no-default-features --features="${FEATURES[*]}" --locked
 
   unit:
     runs-on: ubuntu-latest
diff --git a/core/Cargo.lock b/core/Cargo.lock
index c409e9d6d3c2..dacd86958436 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -5118,6 +5118,7 @@ dependencies = [
  "mini-moka",
  "moka",
  "mongodb",
+ "mongodb-internal-macros",
  "monoio",
  "once_cell",
  "openssh",

From cca3a875144352b9335ced8afda1f13d03bfc81f Mon Sep 17 00:00:00 2001
From: xxchan
Date: Mon, 20 Jan 2025 17:19:52 +0800
Subject: [PATCH 13/15] chore: add oli/oay/ofs to rust-analyzer.linkedProjects (#5564)

---
 .vscode/settings.json | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/.vscode/settings.json b/.vscode/settings.json
index 8a903fd1a084..e1b3178f354c 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -6,6 +6,9 @@
         "${workspaceFolder}/bindings/python/Cargo.toml",
         "${workspaceFolder}/bindings/java/Cargo.toml",
         "${workspaceFolder}/bindings/nodejs/Cargo.toml",
+        "${workspaceFolder}/bin/oli/Cargo.toml",
+        "${workspaceFolder}/bin/oay/Cargo.toml",
+        "${workspaceFolder}/bin/ofs/Cargo.toml"
     ],
     "java.compile.nullAnalysis.mode": "automatic"
 }

From b8a3b7aa093d8695d89d579e146907850bb6565c Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Wed, 22 Jan 2025 12:09:22 +0800
Subject: [PATCH 14/15] feat(services/webhdfs): Add user.name support for webhdfs (#5567)

---
 .../webhdfs/webhdfs_with_user_name/action.yml | 37 ++++++++++++++
 core/src/services/webhdfs/backend.rs          | 50 +++++++++++++++++--
 core/src/services/webhdfs/config.rs           |  3 ++
 3 files changed, 86 insertions(+), 4 deletions(-)
 create mode 100644 .github/services/webhdfs/webhdfs_with_user_name/action.yml

diff --git a/.github/services/webhdfs/webhdfs_with_user_name/action.yml b/.github/services/webhdfs/webhdfs_with_user_name/action.yml
new file mode 100644
index 000000000000..842ec48cb1fa
--- /dev/null
+++ b/.github/services/webhdfs/webhdfs_with_user_name/action.yml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: webhdfs
+description: "Behavior test for webhdfs with user name specified"
+
+runs:
+  using: "composite"
+  steps:
+    - name: Setup webhdfs
+      shell: bash
+      working-directory: fixtures/webhdfs
+      run: |
+        docker compose -f docker-compose-webhdfs.yml up -d --wait
+    - name: Setup
+      shell: bash
+      run: |
+        cat << EOF >> $GITHUB_ENV
+        OPENDAL_WEBHDFS_ROOT=/
+        OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870
+        OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/
+        OPENDAL_WEBHDFS_USER_NAME=root
+        EOF
diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs
index 75d2240b1e5c..afd165afd045 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -101,6 +101,16 @@ impl WebhdfsBuilder {
         self
     }
 
+    /// Set the username of this backend,
+    /// used for authentication
+    ///
+    pub fn user_name(mut self, user_name: &str) -> Self {
+        if !user_name.is_empty() {
+            self.config.user_name = Some(user_name.to_string());
+        }
+        self
+    }
+
     /// Set the delegation token of this backend,
     /// used for authentication
     ///
@@ -179,6 +189,7 @@ impl Builder for WebhdfsBuilder {
         let backend = WebhdfsBackend {
             root,
             endpoint,
+            user_name: self.config.user_name,
             auth,
             client,
             root_checker: OnceCell::new(),
@@ -195,6 +206,7 @@ impl Builder for WebhdfsBuilder {
 pub struct WebhdfsBackend {
     root: String,
     endpoint: String,
+    user_name: Option<String>,
     auth: Option<String>,
 
     root_checker: OnceCell<()>,
@@ -212,6 +224,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
 
         req.body(Buffer::new()).map_err(new_request_build_error)
     }
+
     /// create object
     pub async fn webhdfs_create_object_request(
         &self,
@@ -235,6 +251,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -277,6 +296,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -311,7 +333,9 @@ impl WebhdfsBackend {
             percent_encode_path(&from),
             percent_encode_path(&to)
         );
-
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -330,7 +354,9 @@ impl WebhdfsBackend {
         body: Buffer,
     ) -> Result<Request<Buffer>> {
         let mut url = location.to_string();
-
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -362,7 +388,9 @@ impl WebhdfsBackend {
             percent_encode_path(&p),
             percent_encode_path(&sources),
         );
-
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -379,6 +407,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += &format!("&{auth}");
         }
@@ -404,6 +435,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -429,6 +463,9 @@ impl WebhdfsBackend {
         if !start_after.is_empty() {
             url += format!("&startAfter={}", start_after).as_str();
         }
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -455,7 +492,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
-
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -474,6 +513,9 @@ impl WebhdfsBackend {
             self.endpoint,
             percent_encode_path(&p),
         );
+        if let Some(user) = &self.user_name {
+            url += format!("&user.name={user}").as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
diff --git a/core/src/services/webhdfs/config.rs b/core/src/services/webhdfs/config.rs
index 168ea7d0f082..03dadfcaf609 100644
--- a/core/src/services/webhdfs/config.rs
+++ b/core/src/services/webhdfs/config.rs
@@ -30,6 +30,8 @@ pub struct WebhdfsConfig {
     pub root: Option<String>,
     /// Endpoint for webhdfs.
     pub endpoint: Option<String>,
+    /// Name of the user for webhdfs.
+    pub user_name: Option<String>,
     /// Delegation token for webhdfs.
     pub delegation: Option<String>,
     /// Disable batch listing
@@ -43,6 +45,7 @@ impl Debug for WebhdfsConfig {
         f.debug_struct("WebhdfsConfig")
             .field("root", &self.root)
             .field("endpoint", &self.endpoint)
+            .field("user_name", &self.user_name)
             .field("atomic_write_dir", &self.atomic_write_dir)
             .finish_non_exhaustive()
     }

From 1026d8a1d1dca7cf5f4182c8398260af27f7a511 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Fri, 24 Jan 2025 15:14:21 +0800
Subject: [PATCH 15/15] fix(logging): remove additional space (#5568)

---
 core/src/layers/logging.rs | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index f7723bc90290..3dfaea31a625 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -194,7 +194,7 @@ impl LoggingInterceptor for DefaultLoggingInterceptor {
         log!(
             target: LOGGING_TARGET,
             lvl,
-            "service={} name={} {}: {operation} {message} {}",
+            "service={} name={}{}: {operation} {message} {}",
             info.scheme(),
             info.name(),
             LoggingContext(context),
@@ -220,7 +220,7 @@ impl LoggingInterceptor for DefaultLoggingInterceptor {
         log!(
             target: LOGGING_TARGET,
             lvl,
-            "service={} name={} {}: {operation} {message}",
+            "service={} name={}{}: {operation} {message}",
             info.scheme(),
             info.name(),
             LoggingContext(context),
@@ -232,12 +232,8 @@ struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);
 
 impl Display for LoggingContext<'_> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        for (i, (k, v)) in self.0.iter().enumerate() {
-            if i > 0 {
-                write!(f, " {}={}", k, v)?;
-            } else {
-                write!(f, "{}={}", k, v)?;
-            }
+        for (k, v) in self.0.iter() {
+            write!(f, " {}={}", k, v)?;
         }
         Ok(())
     }
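
A note on the recurring `parse_error` hunks in this series: the old code had to take the body as `mut` and drain it with `copy_to_bytes(body.remaining())` from the `bytes::Buf` trait, which is why several files also drop a `use bytes::Buf;` import. The sketch below shows the difference in miniature; the `Buffer` type here is only a stand-in for OpenDAL's real `Buffer`, and its `to_bytes` method is assumed to hand back the contiguous bytes without needing a mutable binding.

use bytes::{Buf, Bytes};

// Stand-in for OpenDAL's `Buffer`; only the part relevant here.
struct Buffer(Bytes);

impl Buffer {
    // Hypothetical equivalent of `Buffer::to_bytes`: return the contiguous
    // bytes without a `mut` binding at the call site.
    fn to_bytes(&self) -> Bytes {
        self.0.clone()
    }
}

fn main() {
    // Old pattern: `copy_to_bytes` comes from the `bytes::Buf` trait and
    // advances the buffer's internal cursor, so `body` must be `mut`.
    let mut body = Bytes::from_static(b"<Error>NoSuchKey</Error>");
    let bs_old = body.copy_to_bytes(body.remaining());

    // New pattern: no `mut`, and callers can drop the `bytes::Buf` import.
    let body = Buffer(Bytes::from_static(b"<Error>NoSuchKey</Error>"));
    let bs_new = body.to_bytes();

    assert_eq!(bs_old, bs_new);
}

Each call site thus loses one `mut` and, usually, one import, which is the whole point of the refactor.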
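The OneDrive writer hunk swaps `.last()` for `.next_back()` on the result of `split('/')`. Both yield the final path segment; `next_back()` simply uses the fact that `Split` is a `DoubleEndedIterator` and takes the element from the back instead of driving the iterator all the way forward, which is the kind of pattern newer clippy releases flag. A two-line check:

fn main() {
    let path = "dir/sub/file.txt";
    // `.last()` is the generic `Iterator` method and walks from the front;
    // `.next_back()` pulls the final segment directly from the back.
    assert_eq!(path.split('/').last(), Some("file.txt"));
    assert_eq!(path.split('/').next_back(), Some("file.txt"));
}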
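For the `user_name` support added in PATCH 14, a usage sketch follows. The endpoint, root, and user are taken from the `webhdfs_with_user_name` CI action above; treat the exact builder chain as illustrative rather than canonical.

use opendal::services::Webhdfs;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // Mirrors the env vars set in webhdfs_with_user_name/action.yml.
    let builder = Webhdfs::default()
        .root("/")
        .endpoint("http://127.0.0.1:9870")
        .user_name("root")
        .atomic_write_dir(".opendal_tmp/");

    let op = Operator::new(builder)?.finish();

    // With user_name set, every URL the backend builds carries an extra
    // `&user.name=root` query parameter, alongside any delegation token.
    op.create_dir("test/").await?;
    Ok(())
}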
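Finally, the logging fix in PATCH 15 moves the separating space into `LoggingContext`'s `Display` impl, so an empty context no longer leaves a stray double space before the colon. A stripped-down sketch of the before/after behavior, using a simplified copy of the layer's private wrapper type:

use std::fmt::Display;

// Simplified copy of the logging layer's `LoggingContext` wrapper.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Each pair brings its own leading space, so an empty slice
        // contributes nothing at all to the log line.
        for (k, v) in self.0.iter() {
            write!(f, " {}={}", k, v)?;
        }
        Ok(())
    }
}

fn main() {
    // Old format string "service={} name={} {}: ..." printed
    // "service=s3 name=test : read" when the context was empty.
    // The new "service={} name={}{}: ..." stays clean:
    let empty = format!("service={} name={}{}: read", "s3", "test", LoggingContext(&[]));
    assert_eq!(empty, "service=s3 name=test: read");

    let with_ctx = format!(
        "service={} name={}{}: read",
        "s3",
        "test",
        LoggingContext(&[("path", "x/")])
    );
    assert_eq!(with_ctx, "service=s3 name=test path=x/: read");
}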