From b8273a042b01a31b2f154187f00d1df2c127a1c1 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 19 Nov 2024 10:09:59 -0500 Subject: [PATCH 1/3] Start setting up IoObjectStore --- Cargo.lock | 4 +-- Cargo.toml | 2 +- src/lib.rs | 1 + src/object_store/io_object_store.rs | 44 +++++++++++++++++++++++++++++ src/object_store/mod.rs | 18 ++++++++++++ 5 files changed, 66 insertions(+), 3 deletions(-) create mode 100644 src/object_store/io_object_store.rs create mode 100644 src/object_store/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 403350e..e84b56d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -489,9 +489,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.82" +version = "0.1.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" +checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index c2c706d..39b2a2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ default-run = "dft" [dependencies] arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental"] , optional = true } -async-trait = "0.1.80" +async-trait = "0.1.83" clap = { version = "4.5.1", features = ["derive"] } color-eyre = "0.6.3" crossterm = { version = "0.28.1", features = ["event-stream"] } diff --git a/src/lib.rs b/src/lib.rs index 4b8c925..98d5b83 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ pub mod execution; pub mod extensions; #[cfg(feature = "experimental-flightsql-server")] pub mod flightsql_server; +pub mod object_store; pub mod telemetry; pub mod test_utils; pub mod tui; diff --git a/src/object_store/io_object_store.rs b/src/object_store/io_object_store.rs new file mode 100644 index 0000000..a2282d4 --- /dev/null +++ b/src/object_store/io_object_store.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use object_store::{path::Path, GetOptions, GetResult, ObjectStore, Result}; + +use crate::execution::executor::io::spawn_io; + +/// 'ObjectStore' that wraps an inner `ObjectStore` and wraps all the underlying methods with +/// [`spawn_io`] so that they are run on the Tokio Runtime dedicated to doing IO. +#[derive(Debug)] +pub struct IoObjectStore { + inner: Arc, +} + +#[async_trait] +impl ObjectStore for IoObjectStore { + async fn get(&self, location: &Path) -> Result { + let location = location.clone(); + let store = self.inner.clone(); + spawn_io(async move { store.get(&location).await }).await + } + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + let location = location.clone(); + let store = self.inner.clone(); + spawn_io(async move { store.get_opts(&location, options).await }).await + } +} diff --git a/src/object_store/mod.rs b/src/object_store/mod.rs new file mode 100644 index 0000000..b0ef30a --- /dev/null +++ b/src/object_store/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod io_object_store; From 1eac330660ed98b2e118bb483ec380de32109a84 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 20 Nov 2024 09:24:33 -0500 Subject: [PATCH 2/3] Finish --- "\\" | 100 ++++++++++++++++++++++++++++ src/extensions/s3.rs | 4 +- src/object_store/io_object_store.rs | 78 ++++++++++++++++++++-- 3 files changed, 174 insertions(+), 8 deletions(-) create mode 100644 "\\" diff --git "a/\\" "b/\\" new file mode 100644 index 0000000..b7bdd53 --- /dev/null +++ "b/\\" @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use futures::stream::BoxStream; +use object_store::{ + path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, +}; + +use crate::execution::executor::io::spawn_io; + +/// 'ObjectStore' that wraps an inner `ObjectStore` and wraps all the underlying methods with +/// [`spawn_io`] so that they are run on the Tokio Runtime dedicated to doing IO. +#[derive(Debug)] +pub struct IoObjectStore { + inner: Arc, +} + +impl std::fmt::Display for IoObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "IoObjectStore") + } +} + +#[async_trait] +impl ObjectStore for IoObjectStore { + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + let location = location.clone(); + let store = Arc::clone(&self.inner); + spawn_io(async move { store.get_opts(&location, options).await }).await + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + let from = from.clone(); + let to = to.clone(); + let store = Arc::clone(&self.inner); + spawn_io(async move { store.copy(&from, &to).await }).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let from = from.clone(); + let to = to.clone(); + let store = Arc::clone(&self.inner); + spawn_io(async move { store.copy(&from, &to).await }).await + } + + async fn delete(&self, location: &Path) -> Result<()> { + let location = location.clone(); + let store = self.inner.clone(); + spawn_io(async move { store.delete(&location).await }).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + self.inner.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + let prefix = prefix.cloned(); + let store = self.inner.clone(); + spawn_io(async move { store.list_with_delimiter(prefix.as_ref()).await }).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOpts, + ) -> Result> { + let location = location.clone(); + let store = self.inner.clone(); + spawn_io(async move { store.put_multipart_opts(&location, opts).await }).await + } + + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + let location = location.clone(); + let store = self.inner.clone(); + spawn_io(async move { store.put_opts(&location, payload, opts).await }).await + } +} diff --git a/src/extensions/s3.rs b/src/extensions/s3.rs index 377bd44..5d812ed 100644 --- a/src/extensions/s3.rs +++ b/src/extensions/s3.rs @@ -19,6 +19,7 @@ use crate::config::ExecutionConfig; use crate::extensions::{DftSessionStateBuilder, Extension}; +use crate::object_store::io_object_store::IoObjectStore; use log::info; use std::sync::Arc; @@ -56,9 +57,10 @@ impl Extension for AwsS3Extension { info!("Endpoint exists"); if let Ok(parsed_endpoint) = Url::parse(object_store_url) { info!("Parsed endpoint"); + let io_store = IoObjectStore::new(Arc::new(object_store)); builder .runtime_env() - .register_object_store(&parsed_endpoint, Arc::new(object_store)); + .register_object_store(&parsed_endpoint, Arc::new(io_store)); info!("Registered s3 object store"); } } diff --git a/src/object_store/io_object_store.rs b/src/object_store/io_object_store.rs index a2282d4..6efd693 100644 --- a/src/object_store/io_object_store.rs +++ b/src/object_store/io_object_store.rs @@ -18,7 +18,11 @@ use std::sync::Arc; use async_trait::async_trait; -use object_store::{path::Path, GetOptions, GetResult, ObjectStore, Result}; +use futures::stream::BoxStream; +use object_store::{ + path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, +}; use crate::execution::executor::io::spawn_io; @@ -29,16 +33,76 @@ pub struct IoObjectStore { inner: Arc, } +impl IoObjectStore { + pub fn new(object_store: Arc) -> Self { + Self { + inner: object_store, + } + } +} + +impl std::fmt::Display for IoObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "IoObjectStore") + } +} + #[async_trait] impl ObjectStore for IoObjectStore { - async fn get(&self, location: &Path) -> Result { - let location = location.clone(); - let store = self.inner.clone(); - spawn_io(async move { store.get(&location).await }).await - } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { let location = location.clone(); - let store = self.inner.clone(); + let store = Arc::clone(&self.inner); spawn_io(async move { store.get_opts(&location, options).await }).await } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + let from = from.clone(); + let to = to.clone(); + let store = Arc::clone(&self.inner); + spawn_io(async move { store.copy(&from, &to).await }).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let from = from.clone(); + let to = to.clone(); + let store = Arc::clone(&self.inner); + spawn_io(async move { store.copy(&from, &to).await }).await + } + + async fn delete(&self, location: &Path) -> Result<()> { + let location = location.clone(); + let store = Arc::clone(&self.inner); + spawn_io(async move { store.delete(&location).await }).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + self.inner.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + let prefix = prefix.cloned(); + let store = Arc::clone(&self.inner); + spawn_io(async move { store.list_with_delimiter(prefix.as_ref()).await }).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOpts, + ) -> Result> { + let location = location.clone(); + let store = Arc::clone(&self.inner); + spawn_io(async move { store.put_multipart_opts(&location, opts).await }).await + } + + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + let location = location.clone(); + let store = Arc::clone(&self.inner); + spawn_io(async move { store.put_opts(&location, payload, opts).await }).await + } } From 68af0e1552a84840cacf0c10d4afc831ba1495f9 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 20 Nov 2024 10:55:36 -0500 Subject: [PATCH 3/3] Fixes --- "\\" | 100 ------------------ src/extensions/s3.rs | 2 +- src/lib.rs | 2 +- .../io_object_store.rs | 0 src/{object_store => object_stores}/mod.rs | 2 +- 5 files changed, 3 insertions(+), 103 deletions(-) delete mode 100644 "\\" rename src/{object_store => object_stores}/io_object_store.rs (100%) rename src/{object_store => object_stores}/mod.rs (97%) diff --git "a/\\" "b/\\" deleted file mode 100644 index b7bdd53..0000000 --- "a/\\" +++ /dev/null @@ -1,100 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use async_trait::async_trait; -use futures::stream::BoxStream; -use object_store::{ - path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, -}; - -use crate::execution::executor::io::spawn_io; - -/// 'ObjectStore' that wraps an inner `ObjectStore` and wraps all the underlying methods with -/// [`spawn_io`] so that they are run on the Tokio Runtime dedicated to doing IO. -#[derive(Debug)] -pub struct IoObjectStore { - inner: Arc, -} - -impl std::fmt::Display for IoObjectStore { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(f, "IoObjectStore") - } -} - -#[async_trait] -impl ObjectStore for IoObjectStore { - async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { - let location = location.clone(); - let store = Arc::clone(&self.inner); - spawn_io(async move { store.get_opts(&location, options).await }).await - } - - async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - let from = from.clone(); - let to = to.clone(); - let store = Arc::clone(&self.inner); - spawn_io(async move { store.copy(&from, &to).await }).await - } - - async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - let from = from.clone(); - let to = to.clone(); - let store = Arc::clone(&self.inner); - spawn_io(async move { store.copy(&from, &to).await }).await - } - - async fn delete(&self, location: &Path) -> Result<()> { - let location = location.clone(); - let store = self.inner.clone(); - spawn_io(async move { store.delete(&location).await }).await - } - - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { - self.inner.list(prefix) - } - - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { - let prefix = prefix.cloned(); - let store = self.inner.clone(); - spawn_io(async move { store.list_with_delimiter(prefix.as_ref()).await }).await - } - - async fn put_multipart_opts( - &self, - location: &Path, - opts: PutMultipartOpts, - ) -> Result> { - let location = location.clone(); - let store = self.inner.clone(); - spawn_io(async move { store.put_multipart_opts(&location, opts).await }).await - } - - async fn put_opts( - &self, - location: &Path, - payload: PutPayload, - opts: PutOptions, - ) -> Result { - let location = location.clone(); - let store = self.inner.clone(); - spawn_io(async move { store.put_opts(&location, payload, opts).await }).await - } -} diff --git a/src/extensions/s3.rs b/src/extensions/s3.rs index 5d812ed..fa92bc8 100644 --- a/src/extensions/s3.rs +++ b/src/extensions/s3.rs @@ -19,7 +19,7 @@ use crate::config::ExecutionConfig; use crate::extensions::{DftSessionStateBuilder, Extension}; -use crate::object_store::io_object_store::IoObjectStore; +use crate::object_stores::io_object_store::IoObjectStore; use log::info; use std::sync::Arc; diff --git a/src/lib.rs b/src/lib.rs index 98d5b83..5f7c4a1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ pub mod execution; pub mod extensions; #[cfg(feature = "experimental-flightsql-server")] pub mod flightsql_server; -pub mod object_store; +pub mod object_stores; pub mod telemetry; pub mod test_utils; pub mod tui; diff --git a/src/object_store/io_object_store.rs b/src/object_stores/io_object_store.rs similarity index 100% rename from src/object_store/io_object_store.rs rename to src/object_stores/io_object_store.rs diff --git a/src/object_store/mod.rs b/src/object_stores/mod.rs similarity index 97% rename from src/object_store/mod.rs rename to src/object_stores/mod.rs index b0ef30a..705051c 100644 --- a/src/object_store/mod.rs +++ b/src/object_stores/mod.rs @@ -14,5 +14,5 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - +#[cfg(feature = "s3")] pub mod io_object_store;