Skip to content

Commit

Permalink
refactor: remove CodecPipelineStore
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Dec 10, 2024
1 parent 6e42799 commit db7c622
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 60 deletions.
9 changes: 5 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![warn(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]

use chunk_item::{ChunksItem, IntoItem};
use concurrency::ChunkConcurrentLimitAndCodecOptions;
Expand All @@ -12,7 +13,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon_iter_concurrent_limit::iter_concurrent_limit;
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
use store::{CodecPipelineStore, StoreConfigType};
use store::StoreConfigType;
use unsafe_cell_slice::UnsafeCellSlice;
use zarrs::array::codec::{
ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder, StoragePartialDecoder,
Expand All @@ -39,7 +40,7 @@ use utils::{PyErrExt, PyUntypedArrayExt};
#[pyclass]
pub struct CodecPipelineImpl {
pub(crate) codec_chain: Arc<CodecChain>,
pub(crate) store: Mutex<Option<Arc<dyn CodecPipelineStore>>>,
pub(crate) store: Mutex<Option<Arc<dyn ReadableWritableListableStorageTraits>>>,
pub(crate) codec_options: CodecOptions,
pub(crate) chunk_concurrent_minimum: usize,
pub(crate) chunk_concurrent_maximum: usize,
Expand All @@ -57,11 +58,11 @@ impl CodecPipelineImpl {

// TODO: Request upstream change to get store on codec pipeline initialisation, do not want to do all of this here
if let Some(gstore) = gstore.as_ref() {
Ok(gstore.store())
Ok(gstore.clone())
} else {
*gstore = Some(config.try_into()?);
let gstore = gstore.as_ref().expect("store was just initialised");
Ok(gstore.store())
Ok(gstore.clone())
}
}

Expand Down
41 changes: 28 additions & 13 deletions src/store.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
use std::{collections::HashMap, sync::Arc};

use opendal::Builder;
use pyo3::{
exceptions::PyNotImplementedError,
exceptions::{PyNotImplementedError, PyValueError},
pyclass,
types::{PyAnyMethods, PyStringMethods, PyTypeMethods},
Bound, FromPyObject, PyAny, PyErr, PyResult,
};
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pyclass_enum};

pub use filesystem::{CodecPipelineStoreFilesystem, FilesystemStoreConfig};
pub use http::{CodecPipelineStoreHTTP, HttpStoreConfig};
use zarrs::storage::ReadableWritableListableStorageTraits;
pub use filesystem::FilesystemStoreConfig;
pub use http::HttpStoreConfig;
use zarrs::storage::{
storage_adapter::async_to_sync::AsyncToSyncStorageAdapter,
ReadableWritableListableStorageTraits,
};
use zarrs_opendal::AsyncOpendalStore;

use crate::{
runtime::{tokio_block_on, TokioBlockOn},
utils::PyErrExt,
};

mod filesystem;
mod http;

pub trait CodecPipelineStore: Send + Sync {
fn store(&self) -> Arc<dyn ReadableWritableListableStorageTraits>;
}

#[gen_stub_pyclass]
#[pyclass(subclass)]
pub struct StoreConfig;
Expand Down Expand Up @@ -65,15 +71,24 @@ impl<'py> FromPyObject<'py> for StoreConfigType {
}
}

impl TryFrom<&StoreConfigType> for Arc<dyn CodecPipelineStore> {
impl TryFrom<&StoreConfigType> for Arc<dyn ReadableWritableListableStorageTraits> {
type Error = PyErr;

fn try_from(value: &StoreConfigType) -> Result<Self, Self::Error> {
match value {
StoreConfigType::Filesystem(config) => {
Ok(Arc::new(CodecPipelineStoreFilesystem::new(config)?))
}
StoreConfigType::Http(config) => Ok(Arc::new(CodecPipelineStoreHTTP::new(config)?)),
StoreConfigType::Filesystem(config) => config.try_into(),
StoreConfigType::Http(config) => config.try_into(),
}
}
}

type OpendalStoreSync = Arc<AsyncToSyncStorageAdapter<AsyncOpendalStore, TokioBlockOn>>;

fn opendal_builder_to_sync_store<B: Builder>(builder: B) -> PyResult<OpendalStoreSync> {
let operator = opendal::Operator::new(builder)
.map_py_err::<PyValueError>()?
.finish();
let store = Arc::new(zarrs_opendal::AsyncOpendalStore::new(operator));
let store = Arc::new(AsyncToSyncStorageAdapter::new(store, tokio_block_on()));
Ok(store)
}
24 changes: 8 additions & 16 deletions src/store/filesystem.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
use std::sync::Arc;

use pyo3::{exceptions::PyRuntimeError, pyclass, PyResult};
use pyo3::{exceptions::PyRuntimeError, pyclass, PyErr};
use pyo3_stub_gen::derive::gen_stub_pyclass;
use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorageTraits};

use crate::utils::PyErrExt;

use super::{CodecPipelineStore, StoreConfig};

pub struct CodecPipelineStoreFilesystem {
store: Arc<FilesystemStore>,
}
use super::StoreConfig;

#[gen_stub_pyclass]
#[pyclass(extends=StoreConfig)]
Expand All @@ -24,16 +20,12 @@ impl FilesystemStoreConfig {
}
}

impl CodecPipelineStoreFilesystem {
pub fn new(config: &FilesystemStoreConfig) -> PyResult<Self> {
let store =
Arc::new(FilesystemStore::new(config.root.clone()).map_py_err::<PyRuntimeError>()?);
Ok(Self { store })
}
}
impl TryInto<Arc<dyn ReadableWritableListableStorageTraits>> for &FilesystemStoreConfig {
type Error = PyErr;

impl CodecPipelineStore for CodecPipelineStoreFilesystem {
fn store(&self) -> Arc<dyn ReadableWritableListableStorageTraits> {
self.store.clone()
fn try_into(self) -> Result<Arc<dyn ReadableWritableListableStorageTraits>, Self::Error> {
let store: Arc<dyn ReadableWritableListableStorageTraits> =
Arc::new(FilesystemStore::new(self.root.clone()).map_py_err::<PyRuntimeError>()?);
Ok(store)
}
}
36 changes: 9 additions & 27 deletions src/store/http.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,10 @@
use std::{collections::HashMap, sync::Arc};

use pyo3::{exceptions::PyValueError, pyclass, Bound, PyAny, PyResult};
use pyo3::{exceptions::PyValueError, pyclass, Bound, PyAny, PyErr, PyResult};
use pyo3_stub_gen::derive::gen_stub_pyclass;
use zarrs::storage::storage_adapter::async_to_sync::AsyncToSyncStorageAdapter;
use zarrs::storage::ReadableWritableListableStorageTraits;
use zarrs_opendal::AsyncOpendalStore;

use crate::{
runtime::{tokio_block_on, TokioBlockOn},
utils::PyErrExt,
};

use super::{CodecPipelineStore, StoreConfig};

pub struct CodecPipelineStoreHTTP {
store: Arc<AsyncToSyncStorageAdapter<AsyncOpendalStore, TokioBlockOn>>,
}
use super::{opendal_builder_to_sync_store, StoreConfig};

#[gen_stub_pyclass]
#[pyclass(extends=StoreConfig)]
Expand Down Expand Up @@ -45,20 +34,13 @@ impl HttpStoreConfig {
}
}

impl CodecPipelineStoreHTTP {
pub fn new(config: &HttpStoreConfig) -> PyResult<Self> {
let builder = opendal::services::Http::default().endpoint(&config.root);
let operator = opendal::Operator::new(builder)
.map_py_err::<PyValueError>()?
.finish();
let store = Arc::new(zarrs_opendal::AsyncOpendalStore::new(operator));
let store = Arc::new(AsyncToSyncStorageAdapter::new(store, tokio_block_on()));
Ok(Self { store })
}
}
impl TryInto<Arc<dyn ReadableWritableListableStorageTraits>> for &HttpStoreConfig {
type Error = PyErr;

impl CodecPipelineStore for CodecPipelineStoreHTTP {
fn store(&self) -> Arc<dyn ReadableWritableListableStorageTraits> {
self.store.clone()
fn try_into(self) -> Result<Arc<dyn ReadableWritableListableStorageTraits>, Self::Error> {
let builder = opendal::services::Http::default().endpoint(&self.root);
let store: Arc<dyn ReadableWritableListableStorageTraits> =
opendal_builder_to_sync_store(builder)?;
Ok(store)
}
}

0 comments on commit db7c622

Please sign in to comment.