diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index c75447323c30e..1eee846144483 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -889,11 +889,22 @@ pub async fn build_remote_object_store( set your endpoint to the environment variable RW_S3_ENDPOINT."); panic!("Passing s3-compatible is not supported, please modify the environment variable and pass in s3."); } - minio if minio.starts_with("minio://") => ObjectStoreImpl::S3( - S3ObjectStore::with_minio(minio, metrics.clone(), config.clone()) - .await - .monitored(metrics, config), - ), + minio if minio.starts_with("minio://") => { + if config.s3.developer.use_opendal { + tracing::info!("Using OpenDAL to access minio."); + ObjectStoreImpl::Opendal( + OpendalObjectStore::with_minio(minio, config.clone()) + .unwrap() + .monitored(metrics, config), + ) + } else { + ObjectStoreImpl::S3( + S3ObjectStore::with_minio(minio, metrics.clone(), config.clone()) + .await + .monitored(metrics, config), + ) + } + } "memory" => { if ident == "Meta Backup" { tracing::warn!("You're using in-memory remote object store for {}. This is not recommended for production environment.", ident); diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index d50422f015c7a..2aa4bd458806e 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -38,6 +38,7 @@ pub enum EngineType { Memory, Hdfs, Gcs, + Minio, S3, Obs, Oss, @@ -64,6 +65,7 @@ impl ObjectStore for OpendalObjectStore { fn get_object_prefix(&self, obj_id: u64) -> String { match self.engine_type { EngineType::S3 => prefix::s3::get_object_prefix(obj_id), + EngineType::Minio => prefix::s3::get_object_prefix(obj_id), EngineType::Memory => String::default(), EngineType::Hdfs => String::default(), EngineType::Gcs => String::default(), @@ -201,6 +203,7 @@ impl ObjectStore for OpendalObjectStore { match self.engine_type { EngineType::Memory => "Memory", EngineType::Hdfs => "Hdfs", + EngineType::Minio => "Minio", EngineType::S3 => "S3", EngineType::Gcs => "Gcs", EngineType::Obs => "Obs", diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index db2c7732d8fbf..28f90a48e9ae0 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -65,6 +65,56 @@ impl OpendalObjectStore { }) } + /// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`. + pub fn with_minio(server: &str, object_store_config: ObjectStoreConfig) -> ObjectResult { + let server = server.strip_prefix("minio://").unwrap(); + let (access_key_id, rest) = server.split_once(':').unwrap(); + let (secret_access_key, mut rest) = rest.split_once('@').unwrap(); + + let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") { + rest = rest_stripped; + "https://" + } else if let Some(rest_stripped) = rest.strip_prefix("http://") { + rest = rest_stripped; + "http://" + } else { + "http://" + }; + let (address, bucket) = rest.split_once('/').unwrap(); + + let mut builder = S3::default(); + builder + .bucket(bucket) + .region("custom") + .access_key_id(access_key_id) + .secret_access_key(secret_access_key) + .endpoint(&format!("{}{}", endpoint_prefix, address)); + + builder.disable_config_load(); + let http_client = Self::new_http_client(&object_store_config)?; + builder.http_client(http_client); + let op: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer( + RetryLayer::new() + .with_min_delay(Duration::from_millis( + object_store_config.s3.object_store_req_retry_interval_ms, + )) + .with_max_delay(Duration::from_millis( + object_store_config.s3.object_store_req_retry_max_delay_ms, + )) + .with_max_times(object_store_config.s3.object_store_req_retry_max_attempts) + .with_factor(1.0) + .with_jitter(), + ) + .finish(); + + Ok(Self { + op, + engine_type: EngineType::Minio, + }) + } + pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult { let mut client_builder = reqwest::ClientBuilder::new(); diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 196638f1e3689..a9f6c9ae1db07 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -645,6 +645,7 @@ impl S3ObjectStore { let server = server.strip_prefix("minio://").unwrap(); let (access_key_id, rest) = server.split_once(':').unwrap(); let (secret_access_key, mut rest) = rest.split_once('@').unwrap(); + let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") { rest = rest_stripped; "https://"