
feat(snowflake-sink): change to streaming upload instead of batched bulk load #16269

Merged: 31 commits, Apr 19, 2024
Changes from 3 commits
Commits
08f8ec6
use streaming upload instead of bulk load of aws_sdk_s3
xzhseh Apr 11, 2024
7091cd3
update fmt
xzhseh Apr 11, 2024
f9a354d
update comments
xzhseh Apr 11, 2024
a6e3392
remove S3Client from aws_s3_sdk
xzhseh Apr 12, 2024
463131f
remove useless env var reading
xzhseh Apr 12, 2024
a097dd8
remove async for SnowflakeS3Client::new
xzhseh Apr 12, 2024
db709fd
fix check
xzhseh Apr 13, 2024
6e45411
update comment
xzhseh Apr 13, 2024
f084070
graceful error handling for opendal s3 engine
xzhseh Apr 15, 2024
ca4a86b
update fmt
xzhseh Apr 15, 2024
e9ab227
refactor SnowflakeSinkWriter; introduce streaming_uploader & remove p…
xzhseh Apr 15, 2024
c63e930
update fmt
xzhseh Apr 15, 2024
5e35d06
remove redundant comment regarding error handling; update comment for…
xzhseh Apr 15, 2024
f55a311
remove assertion & gracefully handle the case if streaming uploader h…
xzhseh Apr 15, 2024
a04e29e
update comment for intermediate s3 file path
xzhseh Apr 16, 2024
a2657da
lazily initialize when actual data is fed in
xzhseh Apr 16, 2024
ad09874
fix check
xzhseh Apr 16, 2024
8b3f25e
update fmt
xzhseh Apr 16, 2024
98f4eb7
update with_options_sink.yaml accordingly
xzhseh Apr 16, 2024
1239135
update with_options_sink.yaml; incorporate file_suffix in streaming_u…
xzhseh Apr 17, 2024
c8d1735
Merge branch 'main' into xzhseh/snowflake-sink-streaming-upload
xzhseh Apr 17, 2024
fc4182a
use BytesMut to perform chunk-by-chunk upload
xzhseh Apr 17, 2024
da56015
update fmt
xzhseh Apr 17, 2024
44e2ed7
use row_buf as intermediate buffer to prevent temporary string alloca…
xzhseh Apr 18, 2024
3ca9b49
refactor finish_streaming_upload
xzhseh Apr 18, 2024
6ae3261
update fmt
xzhseh Apr 18, 2024
f9025c5
directly write to chunk_buf
xzhseh Apr 18, 2024
2498746
avoid unwrap; make new_streaming_uploader pure
xzhseh Apr 18, 2024
1fc8996
fix check
xzhseh Apr 18, 2024
bd71443
update comment
xzhseh Apr 18, 2024
342d1a4
fix check
xzhseh Apr 19, 2024
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -108,6 +108,7 @@ reqwest = { version = "0.12.2", features = ["json", "stream"] }
 risingwave_common = { workspace = true }
 risingwave_common_estimate_size = { workspace = true }
 risingwave_jni_core = { workspace = true }
+risingwave_object_store = { workspace = true }
 risingwave_pb = { workspace = true }
 risingwave_rpc_client = { workspace = true }
 rumqttc = { version = "0.24.0", features = ["url"] }
51 changes: 39 additions & 12 deletions src/connector/src/sink/snowflake_connector.rs
@@ -18,12 +18,13 @@
 use std::time::{SystemTime, UNIX_EPOCH};
 use aws_config;
 use aws_config::meta::region::RegionProviderChain;
 use aws_sdk_s3::config::Credentials;
-use aws_sdk_s3::primitives::ByteStream;
 use aws_sdk_s3::Client as S3Client;
 use aws_types::region::Region;
 use bytes::Bytes;
 use jsonwebtoken::{encode, Algorithm, EncodingKey, Header};
 use reqwest::{header, Client, RequestBuilder, StatusCode};
+use risingwave_common::config::ObjectStoreConfig;
+use risingwave_object_store::object::*;
 use serde::{Deserialize, Serialize};
 use thiserror_ext::AsReport;

@@ -191,6 +192,7 @@
 pub struct SnowflakeS3Client {
     s3_bucket: String,
     s3_path: Option<String>,
     s3_client: S3Client,
+    opendal_s3_engine: OpendalObjectStore,
 }

@@ -202,15 +204,16 @@ impl SnowflakeS3Client {
         aws_region: String,
     ) -> Self {
         let credentials = Credentials::new(
-            aws_access_key_id,
-            aws_secret_access_key,
+            aws_access_key_id.clone(),
+            aws_secret_access_key.clone(),
             // we don't allow temporary credentials
             None,
             None,
             "rw_sink_to_s3_credentials",
         );

-        let region = RegionProviderChain::first_try(Region::new(aws_region)).or_default_provider();
+        let region =
+            RegionProviderChain::first_try(Region::new(aws_region.clone())).or_default_provider();

         let config = aws_config::from_env()
             .credentials_provider(credentials)
@@ -221,25 +224,49 @@
         // create the brand new s3 client used to sink files to s3
         let s3_client = S3Client::new(&config);

+        // just use default here
+        let config = ObjectStoreConfig::default();
Review comment from @xzhseh (author), Apr 11, 2024:
Plus, do we need special parameter tuning for the configurations?

Reply from a project member:
Not necessarily, but it would be better to allow users to pass parameters to the S3 client (OpenDAL) if possible. Just for illustration, any parameter of the form opendal.param_name = param_val would be passed through to OpenDAL.

Reply from @xzhseh:
> but it would be better to allow users to pass parameters to the S3 client (OpenDAL) if possible.

Agree, I can add support for extra s3 client configurations in a subsequent PR.
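A minimal sketch of what that pass-through could look like (purely illustrative and not part of this PR; the helper name and the eventual forwarding mechanism are assumptions):

use std::collections::HashMap;

/// Hypothetical helper: collect every `opendal.<param_name> = <param_val>` entry
/// from the sink's WITH options so it can later be handed to the OpenDAL S3 builder.
fn extract_opendal_params(with_options: &HashMap<String, String>) -> HashMap<String, String> {
    with_options
        .iter()
        .filter_map(|(key, value)| {
            key.strip_prefix("opendal.")
                .map(|name| (name.to_string(), value.clone()))
        })
        .collect()
}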


+        // create the s3 engine for streaming upload to the intermediate s3 bucket
+        // note: this will lead to an internal panic if any credential / intermediate creation
+        // process has error, which may not be acceptable...
+        // but it's hard to gracefully handle the error without modifying downstream return type(s)...
+        let opendal_s3_engine = OpendalObjectStore::new_s3_engine_with_credentials(
+            &s3_bucket,
+            config,
+            &aws_access_key_id,
+            &aws_secret_access_key,
+            &aws_region,
+        )
+        .unwrap();

         Self {
             s3_bucket,
             s3_path,
             s3_client,
+            opendal_s3_engine,
         }
     }

     pub async fn sink_to_s3(&self, data: Bytes, file_suffix: String) -> Result<()> {
-        self.s3_client
-            .put_object()
-            .bucket(self.s3_bucket.clone())
-            .key(generate_s3_file_name(self.s3_path.clone(), file_suffix))
-            .body(ByteStream::from(data))
-            .send()
+        let path = generate_s3_file_name(self.s3_path.clone(), file_suffix);
+        let mut uploader = self
+            .opendal_s3_engine
+            .streaming_upload(&path)
             .await
             .map_err(|err| {
-                SinkError::Snowflake(format!("failed to sink data to S3, error: {}", err))
+                SinkError::Snowflake(format!(
+                    "failed to create the streaming uploader of opendal s3 engine, error: {}",
+                    err
+                ))
             })?;
+
+        uploader.write_bytes(data).await.map_err(|err| SinkError::Snowflake(format!("failed to write bytes when streaming uploading to s3 for snowflake sink, error: {}", err)))?;
Review comment from a contributor:
Actually the idea of using streaming upload is not to simply change the way a complete Vec<u8> buffer is uploaded, but to replace the buffer with the streaming uploader itself. The ideal implementation would be (a rough sketch follows the list):

  1. at the very beginning, create a streaming uploader.
  2. when new data comes, do some conversion and write the data to the streaming uploader.
  3. when a checkpoint barrier comes, close the writer and commit the file to Snowflake.
  4. when the first data after that arrives, initialize a new streaming uploader.
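
A minimal sketch of that lifecycle (illustrative only, not the code in this PR; Uploader is a placeholder for the opendal streaming uploader, and the call that asks Snowflake to load the finished file is elided):

use anyhow::Result;
use bytes::Bytes;

/// Stand-in for the real streaming uploader (the actual one wraps an opendal Writer).
struct Uploader;

impl Uploader {
    async fn write_bytes(&mut self, _data: Bytes) -> Result<()> {
        Ok(())
    }

    async fn finish(self) -> Result<()> {
        Ok(())
    }
}

/// Hypothetical sink writer following the four steps above.
struct WriterSketch {
    uploader: Option<Uploader>,
}

impl WriterSketch {
    /// Steps 1/4 and 2: lazily create the uploader on the first chunk after a
    /// checkpoint, then append converted rows chunk by chunk instead of
    /// buffering the whole payload in memory.
    async fn write_chunk(&mut self, chunk: Bytes) -> Result<()> {
        if self.uploader.is_none() {
            // in the real sink this is where the streaming upload would be created
            self.uploader = Some(Uploader);
        }
        self.uploader.as_mut().unwrap().write_bytes(chunk).await
    }

    /// Step 3: on a checkpoint barrier, close the file so it becomes visible in
    /// the intermediate S3 bucket, then ask Snowflake to load it.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
        if is_checkpoint {
            if let Some(uploader) = self.uploader.take() {
                uploader.finish().await?;
                // ...followed by the REST call that triggers the Snowflake load
            }
        }
        Ok(())
    }
}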

+        uploader.finish().await.map_err(|err| {
+            SinkError::Snowflake(format!(
+                "failed to finish streaming upload to s3 for snowflake sink, error: {}",
+                err
+            ))
+        })?;
         Ok(())
     }
 }
@@ -33,6 +33,7 @@ pub struct OpendalObjectStore {
     pub(crate) op: Operator,
     pub(crate) engine_type: EngineType,
 }
+
 #[derive(Clone)]
 pub enum EngineType {
     Memory,
@@ -216,6 +217,7 @@ impl ObjectStore for OpendalObjectStore {
 pub struct OpendalStreamingUploader {
     writer: Writer,
 }
+
 impl OpendalStreamingUploader {
     pub async fn new(op: Operator, path: String) -> ObjectResult<Self> {
         let writer = op
53 changes: 53 additions & 0 deletions src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -59,6 +59,7 @@ impl OpendalObjectStore {
                     .with_jitter(),
             )
             .finish();
+
         Ok(Self {
             op,
             engine_type: EngineType::S3,
Expand All @@ -78,4 +79,56 @@ impl OpendalObjectStore {

Ok(HttpClient::build(client_builder)?)
}

/// currently used by snowflake sink,
/// especially when sinking to the intermediate s3 bucket.
pub fn new_s3_engine_with_credentials(
xzhseh marked this conversation as resolved.
Show resolved Hide resolved
bucket: &str,
object_store_config: ObjectStoreConfig,
aws_access_key_id: &str,
aws_secret_access_key: &str,
aws_region: &str,
) -> ObjectResult<Self> {
// Create s3 builder with credentials.
let mut builder = S3::default();

// set credentials for s3 sink
builder.bucket(bucket);
builder.access_key_id(aws_access_key_id);
builder.secret_access_key(aws_secret_access_key);
builder.region(aws_region);

// For AWS S3, there is no need to set an endpoint; for other S3 compatible object stores, it is necessary to set this field.
if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") {
builder.endpoint(&endpoint_url);
}

if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() {
builder.enable_virtual_host_style();
}

let http_client = Self::new_http_client(&object_store_config)?;
builder.http_client(http_client);

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(
RetryLayer::new()
.with_min_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_interval_ms,
))
.with_max_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_max_delay_ms,
))
.with_max_times(object_store_config.s3.object_store_req_retry_max_attempts)
.with_factor(1.1)
xzhseh marked this conversation as resolved.
Show resolved Hide resolved
.with_jitter(),
)
.finish();

Ok(Self {
op,
engine_type: EngineType::S3,
})
}
}