Skip to content

Commit

Permalink
add http client
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Apr 2, 2024
1 parent bc4f766 commit 5644fbf
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ itertools = "0.12"
madsim = "0.2.22"
opendal = "0.45.1"
prometheus = { version = "0.13", features = ["process"] }
reqwest = "0.11"
risingwave_common = { workspace = true }
rustls = "0.23.4"
spin = "0.9"
Expand Down
40 changes: 28 additions & 12 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,28 +832,40 @@ pub async fn build_remote_object_store(
let gcs = gcs.strip_prefix("gcs://").unwrap();
let (bucket, root) = gcs.split_once('@').unwrap_or((gcs, ""));
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_gcs_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics, config),
OpendalObjectStore::new_gcs_engine(
bucket.to_string(),
root.to_string(),
config.clone(),
)
.unwrap()
.monitored(metrics, config),
)
}
obs if obs.starts_with("obs://") => {
let obs = obs.strip_prefix("obs://").unwrap();
let (bucket, root) = obs.split_once('@').unwrap_or((obs, ""));
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_obs_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics, config),
OpendalObjectStore::new_obs_engine(
bucket.to_string(),
root.to_string(),
config.clone(),
)
.unwrap()
.monitored(metrics, config),
)
}

oss if oss.starts_with("oss://") => {
let oss = oss.strip_prefix("oss://").unwrap();
let (bucket, root) = oss.split_once('@').unwrap_or((oss, ""));
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_oss_engine(bucket.to_string(), root.to_string())
.unwrap()
.monitored(metrics, config),
OpendalObjectStore::new_oss_engine(
bucket.to_string(),
root.to_string(),
config.clone(),
)
.unwrap()
.monitored(metrics, config),
)
}
webhdfs if webhdfs.starts_with("webhdfs://") => {
Expand All @@ -869,9 +881,13 @@ pub async fn build_remote_object_store(
let azblob = azblob.strip_prefix("azblob://").unwrap();
let (container_name, root) = azblob.split_once('@').unwrap_or((azblob, ""));
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_azblob_engine(container_name.to_string(), root.to_string())
.unwrap()
.monitored(metrics, config),
OpendalObjectStore::new_azblob_engine(
container_name.to_string(),
root.to_string(),
config.clone(),
)
.unwrap()
.monitored(metrics, config),
)
}
fs if fs.starts_with("fs://") => {
Expand Down
13 changes: 11 additions & 2 deletions src/object_store/src/object/opendal_engine/azblob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Azblob;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use super::{new_http_client, EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

const AZBLOB_ENDPOINT: &str = "AZBLOB_ENDPOINT";
impl OpendalObjectStore {
/// create opendal azblob engine.
pub fn new_azblob_engine(container_name: String, root: String) -> ObjectResult<Self> {
pub fn new_azblob_engine(
container_name: String,
root: String,
object_store_config: ObjectStoreConfig,
) -> ObjectResult<Self> {
// Create azblob backend builder.
let mut builder = Azblob::default();
builder.root(&root);
Expand All @@ -32,6 +37,10 @@ impl OpendalObjectStore {
.unwrap_or_else(|_| panic!("AZBLOB_ENDPOINT not found from environment variables"));

builder.endpoint(&endpoint);

let http_client = new_http_client(&object_store_config)?;
builder.http_client(http_client);

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down
13 changes: 11 additions & 2 deletions src/object_store/src/object/opendal_engine/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Gcs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use super::{new_http_client, EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

impl OpendalObjectStore {
/// create opendal gcs engine.
pub fn new_gcs_engine(bucket: String, root: String) -> ObjectResult<Self> {
pub fn new_gcs_engine(
bucket: String,
root: String,
object_store_config: ObjectStoreConfig,
) -> ObjectResult<Self> {
// Create gcs backend builder.
let mut builder = Gcs::default();

Expand All @@ -34,6 +39,10 @@ impl OpendalObjectStore {
if let Ok(cred) = cred {
builder.credential(&cred);
}

let http_client = new_http_client(&object_store_config)?;
builder.http_client(http_client);

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down
12 changes: 10 additions & 2 deletions src/object_store/src/object/opendal_engine/obs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Obs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use super::{new_http_client, EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

impl OpendalObjectStore {
/// create opendal obs engine.
pub fn new_obs_engine(bucket: String, root: String) -> ObjectResult<Self> {
pub fn new_obs_engine(
bucket: String,
root: String,
object_store_config: ObjectStoreConfig,
) -> ObjectResult<Self> {
// Create obs backend builder.
let mut builder = Obs::default();

Expand All @@ -40,6 +45,9 @@ impl OpendalObjectStore {
builder.endpoint(&endpoint);
builder.access_key_id(&access_key_id);
builder.secret_access_key(&secret_access_key);

let http_client = new_http_client(&object_store_config)?;
builder.http_client(http_client);
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down
20 changes: 20 additions & 0 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
// limitations under the License.

use std::ops::Range;
use std::time::Duration;

use bytes::Bytes;
use fail::fail_point;
use futures::{stream, StreamExt, TryStreamExt};
use opendal::raw::HttpClient;
use opendal::services::Memory;
use opendal::{Metakey, Operator, Writer};
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::range::RangeBoundsExt;
use thiserror_ext::AsReport;

Expand Down Expand Up @@ -59,6 +62,23 @@ impl OpendalObjectStore {
}
}

pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
let mut client_builder = reqwest::ClientBuilder::new();

if let Some(keepalive_ms) = config.s3.object_store_keepalive_ms.as_ref() {
client_builder =
client_builder.http2_keep_alive_timeout(Duration::from_millis(*keepalive_ms));
}

if let Some(nodelay) = config.s3.object_store_nodelay.as_ref() {
client_builder = client_builder.tcp_nodelay(*nodelay);
}

client_builder = client_builder.https_only(false);

Ok(HttpClient::build(client_builder)?)
}

#[async_trait::async_trait]
impl ObjectStore for OpendalObjectStore {
fn get_object_prefix(&self, obj_id: u64) -> String {
Expand Down
5 changes: 4 additions & 1 deletion src/object_store/src/object/opendal_engine/opendal_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use opendal::services::S3;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use super::{new_http_client, EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

impl OpendalObjectStore {
Expand All @@ -40,6 +40,9 @@ impl OpendalObjectStore {
builder.enable_virtual_host_style();
}

let http_client = new_http_client(&object_store_config)?;
builder.http_client(http_client);

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(
Expand Down
13 changes: 11 additions & 2 deletions src/object_store/src/object/opendal_engine/oss.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Oss;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use super::{new_http_client, EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

impl OpendalObjectStore {
/// create opendal oss engine.
pub fn new_oss_engine(bucket: String, root: String) -> ObjectResult<Self> {
pub fn new_oss_engine(
bucket: String,
root: String,
object_store_config: ObjectStoreConfig,
) -> ObjectResult<Self> {
// Create oss backend builder.
let mut builder = Oss::default();

Expand All @@ -40,6 +45,10 @@ impl OpendalObjectStore {
builder.endpoint(&endpoint);
builder.access_key_id(&access_key_id);
builder.access_key_secret(&access_key_secret);

let http_client = new_http_client(&object_store_config)?;
builder.http_client(http_client);

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down

0 comments on commit 5644fbf

Please sign in to comment.