Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Apr 2, 2024
1 parent 8427eca commit b097869
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 88 deletions.
40 changes: 12 additions & 28 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,40 +832,28 @@ pub async fn build_remote_object_store(
let gcs = gcs.strip_prefix("gcs://").unwrap();
let (bucket, root) = gcs.split_once('@').unwrap_or((gcs, ""));
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_gcs_engine(
bucket.to_string(),
root.to_string(),
config.clone(),
)
.unwrap()
.monitored(metrics, config),
OpendalObjectStore::new_gcs_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics, config),
)
}
obs if obs.starts_with("obs://") => {
let obs = obs.strip_prefix("obs://").unwrap();
let (bucket, root) = obs.split_once('@').unwrap_or((obs, ""));
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_obs_engine(
bucket.to_string(),
root.to_string(),
config.clone(),
)
.unwrap()
.monitored(metrics, config),
OpendalObjectStore::new_obs_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics, config),
)
}

oss if oss.starts_with("oss://") => {
let oss = oss.strip_prefix("oss://").unwrap();
let (bucket, root) = oss.split_once('@').unwrap_or((oss, ""));
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_oss_engine(
bucket.to_string(),
root.to_string(),
config.clone(),
)
.unwrap()
.monitored(metrics, config),
OpendalObjectStore::new_oss_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics, config),
)
}
webhdfs if webhdfs.starts_with("webhdfs://") => {
Expand All @@ -881,13 +869,9 @@ pub async fn build_remote_object_store(
let azblob = azblob.strip_prefix("azblob://").unwrap();
let (container_name, root) = azblob.split_once('@').unwrap_or((azblob, ""));
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_azblob_engine(
container_name.to_string(),
root.to_string(),
config.clone(),
)
.unwrap()
.monitored(metrics, config),
OpendalObjectStore::new_azblob_engine(container_name.to_string(), root.to_string())
.unwrap()
.monitored(metrics, config),
)
}
fs if fs.starts_with("fs://") => {
Expand Down
12 changes: 2 additions & 10 deletions src/object_store/src/object/opendal_engine/azblob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,14 @@
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Azblob;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{new_http_client, EngineType, OpendalObjectStore};
use super::{EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

const AZBLOB_ENDPOINT: &str = "AZBLOB_ENDPOINT";
impl OpendalObjectStore {
/// create opendal azblob engine.
pub fn new_azblob_engine(
container_name: String,
root: String,
object_store_config: ObjectStoreConfig,
) -> ObjectResult<Self> {
pub fn new_azblob_engine(container_name: String, root: String) -> ObjectResult<Self> {
// Create azblob backend builder.
let mut builder = Azblob::default();
builder.root(&root);
Expand All @@ -38,9 +33,6 @@ impl OpendalObjectStore {

builder.endpoint(&endpoint);

let http_client = new_http_client(&object_store_config)?;
builder.http_client(http_client);

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down
12 changes: 2 additions & 10 deletions src/object_store/src/object/opendal_engine/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,13 @@
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Gcs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{new_http_client, EngineType, OpendalObjectStore};
use super::{EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

impl OpendalObjectStore {
/// create opendal gcs engine.
pub fn new_gcs_engine(
bucket: String,
root: String,
object_store_config: ObjectStoreConfig,
) -> ObjectResult<Self> {
pub fn new_gcs_engine(bucket: String, root: String) -> ObjectResult<Self> {
// Create gcs backend builder.
let mut builder = Gcs::default();

Expand All @@ -40,9 +35,6 @@ impl OpendalObjectStore {
builder.credential(&cred);
}

let http_client = new_http_client(&object_store_config)?;
builder.http_client(http_client);

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down
11 changes: 2 additions & 9 deletions src/object_store/src/object/opendal_engine/obs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,13 @@
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Obs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{new_http_client, EngineType, OpendalObjectStore};
use super::{EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

impl OpendalObjectStore {
/// create opendal obs engine.
pub fn new_obs_engine(
bucket: String,
root: String,
object_store_config: ObjectStoreConfig,
) -> ObjectResult<Self> {
pub fn new_obs_engine(bucket: String, root: String) -> ObjectResult<Self> {
// Create obs backend builder.
let mut builder = Obs::default();

Expand All @@ -46,8 +41,6 @@ impl OpendalObjectStore {
builder.access_key_id(&access_key_id);
builder.secret_access_key(&secret_access_key);

let http_client = new_http_client(&object_store_config)?;
builder.http_client(http_client);
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down
19 changes: 0 additions & 19 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@
// limitations under the License.

use std::ops::Range;
use std::time::Duration;

use bytes::Bytes;
use fail::fail_point;
use futures::{stream, StreamExt, TryStreamExt};
use opendal::raw::HttpClient;
use opendal::services::Memory;
use opendal::{Metakey, Operator, Writer};
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::range::RangeBoundsExt;
use thiserror_ext::AsReport;

Expand Down Expand Up @@ -62,22 +59,6 @@ impl OpendalObjectStore {
}
}

pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
let mut client_builder = reqwest::ClientBuilder::new();

if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() {
client_builder = client_builder.tcp_keepalive(Duration::from_millis(*keepalive_ms));
}

if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() {
client_builder = client_builder.tcp_nodelay(*nodelay);
}

client_builder = client_builder.https_only(false);

Ok(HttpClient::build(client_builder)?)
}

#[async_trait::async_trait]
impl ObjectStore for OpendalObjectStore {
fn get_object_prefix(&self, obj_id: u64) -> String {
Expand Down
19 changes: 17 additions & 2 deletions src/object_store/src/object/opendal_engine/opendal_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
use std::time::Duration;

use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::raw::HttpClient;
use opendal::services::S3;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{new_http_client, EngineType, OpendalObjectStore};
use super::{EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

impl OpendalObjectStore {
Expand All @@ -40,7 +41,7 @@ impl OpendalObjectStore {
builder.enable_virtual_host_style();
}

let http_client = new_http_client(&object_store_config)?;
let http_client = Self::new_http_client(&object_store_config)?;
builder.http_client(http_client);

let op: Operator = Operator::new(builder)?
Expand All @@ -63,4 +64,18 @@ impl OpendalObjectStore {
engine_type: EngineType::S3,
})
}

pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
let mut client_builder = reqwest::ClientBuilder::new();

if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() {
client_builder = client_builder.tcp_keepalive(Duration::from_millis(*keepalive_ms));
}

if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() {
client_builder = client_builder.tcp_nodelay(*nodelay);
}

Ok(HttpClient::build(client_builder)?)
}
}
12 changes: 2 additions & 10 deletions src/object_store/src/object/opendal_engine/oss.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,13 @@
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Oss;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{new_http_client, EngineType, OpendalObjectStore};
use super::{EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

impl OpendalObjectStore {
/// create opendal oss engine.
pub fn new_oss_engine(
bucket: String,
root: String,
object_store_config: ObjectStoreConfig,
) -> ObjectResult<Self> {
pub fn new_oss_engine(bucket: String, root: String) -> ObjectResult<Self> {
// Create oss backend builder.
let mut builder = Oss::default();

Expand All @@ -46,9 +41,6 @@ impl OpendalObjectStore {
builder.access_key_id(&access_key_id);
builder.access_key_secret(&access_key_secret);

let http_client = new_http_client(&object_store_config)?;
builder.http_client(http_client);

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down

0 comments on commit b097869

Please sign in to comment.