diff --git a/Cargo.lock b/Cargo.lock
index 16350db6b46e4..8180f2affa12e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9619,6 +9619,7 @@ dependencies = [
  "risingwave_common",
  "risingwave_common_estimate_size",
  "risingwave_jni_core",
+ "risingwave_object_store",
  "risingwave_pb",
  "risingwave_rpc_client",
  "rumqttc",
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 51437f82e3ec5..5f759ff1612cf 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -108,6 +108,7 @@ reqwest = { version = "0.12.2", features = ["json", "stream"] }
 risingwave_common = { workspace = true }
 risingwave_common_estimate_size = { workspace = true }
 risingwave_jni_core = { workspace = true }
+risingwave_object_store = { workspace = true }
 risingwave_pb = { workspace = true }
 risingwave_rpc_client = { workspace = true }
 rumqttc = { version = "0.24.0", features = ["url"] }
diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs
index f4901b025effc..e4dbbfa59f17b 100644
--- a/src/connector/src/sink/snowflake.rs
+++ b/src/connector/src/sink/snowflake.rs
@@ -13,13 +13,16 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::fmt::Write;
 use std::sync::Arc;
 
 use anyhow::anyhow;
 use async_trait::async_trait;
+use bytes::{Bytes, BytesMut};
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
+use risingwave_object_store::object::{ObjectStore, StreamingUploader};
 use serde::Deserialize;
 use serde_json::Value;
 use serde_with::serde_as;
@@ -29,13 +32,14 @@ use with_options::WithOptions;
 use super::encoder::{
     JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
 };
-use super::snowflake_connector::{SnowflakeHttpClient, SnowflakeS3Client};
+use super::snowflake_connector::{generate_s3_file_name, SnowflakeHttpClient, SnowflakeS3Client};
 use super::writer::LogSinkerOf;
 use super::{SinkError, SinkParam};
 use crate::sink::writer::SinkWriterExt;
 use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};
 
 pub const SNOWFLAKE_SINK: &str = "snowflake";
+const INITIAL_ROW_CAPACITY: usize = 1024;
 
 #[derive(Deserialize, Debug, Clone, WithOptions)]
 pub struct SnowflakeCommon {
@@ -77,9 +81,9 @@ pub struct SnowflakeCommon {
     pub s3_bucket: String,
 
     /// The optional s3 path to be specified
-    /// the actual file location would be `<s3_bucket>://<s3_path>/<file_name>`
+    /// the actual file location would be `s3://<s3_bucket>/<s3_path>/<file_name>`
     /// if this field is specified by user(s)
-    /// otherwise it would be `<s3_bucket>://<file_name>`
+    /// otherwise it would be `s3://<s3_bucket>/<file_name>`
     #[serde(rename = "snowflake.s3_path")]
     pub s3_path: Option<String>,
 
@@ -94,11 +98,6 @@ pub struct SnowflakeCommon {
     /// The s3 region, e.g., us-east-2
     #[serde(rename = "snowflake.aws_region")]
     pub aws_region: String,
-
-    /// The configurable max row(s) to batch,
-    /// which should be *explicitly* specified by user(s)
-    #[serde(rename = "snowflake.max_batch_row_num")]
-    pub max_batch_row_num: String,
 }
 
 #[serde_as]
@@ -137,8 +136,7 @@ impl Sink for SnowflakeSink {
             self.schema.clone(),
             self.pk_indices.clone(),
             self.is_append_only,
-        )
-        .await
+        )?
         .into_log_sinker(writer_param.sink_metrics))
     }
 
@@ -177,22 +175,25 @@ pub struct SnowflakeSinkWriter {
     /// the client to insert file to external storage (i.e., s3)
     s3_client: SnowflakeS3Client,
     row_encoder: JsonEncoder,
-    row_counter: u32,
-    payload: String,
-    /// the threshold for sinking to s3
-    max_batch_row_num: u32,
     /// The current epoch, used in naming the sink files
     /// mainly used for debugging purpose
     epoch: u64,
+    /// streaming uploader to upload data to the intermediate (s3) storage.
+    /// this also contains the file suffix *unique* to the *local* sink writer per epoch,
+    /// i.e., the streaming uploader (backed by the opendal s3 engine) plus the intermediate s3 file suffix.
+    /// note: the `Option` here *implicitly* indicates whether `streaming_upload`
+    /// has been called at least once during this epoch,
+    /// which is mainly used to prevent uploading empty data.
+    streaming_uploader: Option<(Box<dyn StreamingUploader>, String)>,
 }
 
 impl SnowflakeSinkWriter {
-    pub async fn new(
+    pub fn new(
         config: SnowflakeConfig,
         schema: Schema,
         pk_indices: Vec<usize>,
         is_append_only: bool,
-    ) -> Self {
+    ) -> Result<Self> {
         let http_client = SnowflakeHttpClient::new(
             config.common.account_identifier.clone(),
             config.common.user.clone(),
@@ -211,17 +212,9 @@ impl SnowflakeSinkWriter {
             config.common.aws_access_key_id.clone(),
             config.common.aws_secret_access_key.clone(),
             config.common.aws_region.clone(),
-        )
-        .await;
-
-        let max_batch_row_num = config
-            .common
-            .max_batch_row_num
-            .clone()
-            .parse::<u32>()
-            .expect("failed to parse `snowflake.max_batch_row_num` as a `u32`");
+        )?;
 
-        Self {
+        Ok(Self {
             config,
             schema: schema.clone(),
             pk_indices,
@@ -236,32 +229,102 @@ impl SnowflakeSinkWriter {
                 TimestamptzHandlingMode::UtcString,
                 TimeHandlingMode::String,
             ),
-            row_counter: 0,
-            payload: String::new(),
-            max_batch_row_num,
-            // initial value of `epoch` will start from 0
+            // initial value of `epoch` will be set to 0
             epoch: 0,
-        }
+            // will be (lazily) initialized at the beginning of each epoch
+            // when some data is ready to be uploaded
+            streaming_uploader: None,
+        })
     }
 
-    /// reset the `payload` and `row_counter`.
-    /// shall *only* be called after a successful sink to s3.
-    fn reset(&mut self) {
-        self.payload.clear();
-        self.row_counter = 0;
+    /// return a brand new streaming uploader as well as the file suffix.
+    /// note: should *only* be called after a new epoch begins
+    /// and `streaming_upload` is called for the first time,
+    /// i.e., lazy initialization of the internal `streaming_uploader`.
+    /// plus, this function is *pure*; the `&mut self` here is only to make rustc (and tokio) happy.
+    async fn new_streaming_uploader(&mut self) -> Result<(Box<dyn StreamingUploader>, String)> {
+        let file_suffix = self.file_suffix();
+        let path = generate_s3_file_name(self.s3_client.s3_path(), &file_suffix);
+        let uploader = self
+            .s3_client
+            .opendal_s3_engine
+            .streaming_upload(&path)
+            .await
+            .map_err(|err| {
+                SinkError::Snowflake(format!(
+                    "failed to create the streaming uploader of opendal s3 engine for epoch {}, error: {}",
+                    self.epoch,
+                    err
+                ))
+            })?;
+        Ok((uploader, file_suffix))
+    }
+
+    /// write data to the current streaming uploader for this epoch.
+    async fn streaming_upload(&mut self, data: Bytes) -> Result<()> {
+        let (uploader, _) = match self.streaming_uploader.as_mut() {
+            Some(s) => s,
+            None => {
+                assert!(
+                    self.streaming_uploader.is_none(),
+                    "expect `streaming_uploader` to be None"
+                );
+                let uploader = self.new_streaming_uploader().await?;
+                self.streaming_uploader.insert(uploader)
+            }
+        };
+        uploader
+            .write_bytes(data)
+            .await
+            .map_err(|err| {
+                SinkError::Snowflake(format!(
+                    "failed to write bytes when streaming uploading to s3 for snowflake sink, error: {}",
+                    err
+                ))
+            })?;
+        Ok(())
     }
 
-    fn at_sink_threshold(&self) -> bool {
-        self.row_counter >= self.max_batch_row_num
+    /// finalize streaming upload for this epoch.
+    /// ensure all the data has been properly uploaded to intermediate s3.
+    async fn finish_streaming_upload(&mut self) -> Result<Option<String>> {
+        let uploader = std::mem::take(&mut self.streaming_uploader);
+        let Some((uploader, file_suffix)) = uploader else {
+            // there is no data to be uploaded for this epoch
+            return Ok(None);
+        };
+        uploader.finish().await.map_err(|err| {
+            SinkError::Snowflake(format!(
+                "failed to finish streaming upload to s3 for snowflake sink, error: {}",
+                err
+            ))
+        })?;
+        Ok(Some(file_suffix))
     }
 
-    fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
+    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
+        let mut chunk_buf = BytesMut::new();
+
+        // write the json representations of the row(s) in the current chunk to `chunk_buf`
         for (op, row) in chunk.rows() {
             assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
-            let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
-            self.payload.push_str(&row_json_string);
-            self.row_counter += 1;
+            // to avoid a temporary string allocation,
+            // we write directly to `chunk_buf` via `write_fmt`.
+            write!(
+                chunk_buf,
+                "{}",
+                Value::Object(self.row_encoder.encode(row)?)
+            )
+            .map_err(|err| {
+                SinkError::Snowflake(format!(
+                    "failed to write json object to `chunk_buf`, error: {}",
+                    err
+                ))
+            })?;
         }
+
+        // streaming upload in a chunk-by-chunk manner
+        self.streaming_upload(chunk_buf.freeze()).await?;
         Ok(())
     }
 
@@ -284,20 +347,16 @@ impl SnowflakeSinkWriter {
     /// sink `payload` to s3, then trigger corresponding `insertFiles` post request
     /// to snowflake, to finish the overall sinking pipeline.
-    async fn sink_payload(&mut self) -> Result<()> {
-        if self.payload.is_empty() {
+    async fn commit(&mut self) -> Result<()> {
+        // note that after `finish_streaming_upload`, do *not* interact with
+        // `streaming_uploader` until new data comes in at the next epoch,
+        // since the ownership has been taken by this method, leaving `None` behind.
+        let Some(file_suffix) = self.finish_streaming_upload().await? else {
+            // there is no data to be uploaded for this epoch
             return Ok(());
-        }
-        let file_suffix = self.file_suffix();
-        // todo: change this to streaming upload
-        // first sink to the external stage provided by user (i.e., s3)
-        self.s3_client
-            .sink_to_s3(self.payload.clone().into(), file_suffix.clone())
-            .await?;
-        // then trigger `insertFiles` post request to snowflake
-        self.http_client.send_request(file_suffix).await?;
-        // reset `payload` & `row_counter`
-        self.reset();
+        };
+        // trigger `insertFiles` post request to snowflake
+        self.http_client.send_request(&file_suffix).await?;
         Ok(())
     }
 }
 
@@ -319,20 +378,16 @@ impl SinkWriter for SnowflakeSinkWriter {
 
     async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata> {
         if is_checkpoint {
-            // sink all the row(s) currently batched in `self.payload`
-            self.sink_payload().await?;
+            // finalize the current streaming upload, plus notify snowflake to sink
+            // the corresponding data via the snowflake pipe.
+            // note: if no data needs to be committed, then `commit` is simply a no-op.
+            self.commit().await?;
         }
         Ok(())
     }
 
     async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
-        self.append_only(chunk)?;
-
-        // When the number of row exceeds `MAX_BATCH_ROW_NUM`
-        if self.at_sink_threshold() {
-            self.sink_payload().await?;
-        }
-
+        self.append_only(chunk).await?;
         Ok(())
     }
 }
diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs
index 30c74045441a2..432d222a8b426 100644
--- a/src/connector/src/sink/snowflake_connector.rs
+++ b/src/connector/src/sink/snowflake_connector.rs
@@ -15,15 +15,10 @@
 use std::collections::HashMap;
 use std::time::{SystemTime, UNIX_EPOCH};
 
-use aws_config;
-use aws_config::meta::region::RegionProviderChain;
-use aws_sdk_s3::config::Credentials;
-use aws_sdk_s3::primitives::ByteStream;
-use aws_sdk_s3::Client as S3Client;
-use aws_types::region::Region;
-use bytes::Bytes;
 use jsonwebtoken::{encode, Algorithm, EncodingKey, Header};
 use reqwest::{header, Client, RequestBuilder, StatusCode};
+use risingwave_common::config::ObjectStoreConfig;
+use risingwave_object_store::object::*;
 use serde::{Deserialize, Serialize};
 use thiserror_ext::AsReport;
 
@@ -35,7 +30,7 @@ const SNOWFLAKE_REQUEST_ID: &str = "RW_SNOWFLAKE_SINK";
 const S3_INTERMEDIATE_FILE_NAME: &str = "RW_SNOWFLAKE_S3_SINK_FILE";
 
 /// The helper function to generate the *global unique* s3 file name.
-fn generate_s3_file_name(s3_path: Option<String>, suffix: String) -> String {
+pub(crate) fn generate_s3_file_name(s3_path: Option<&str>, suffix: &str) -> String {
     match s3_path {
         Some(path) => format!("{}/{}_{}", path, S3_INTERMEDIATE_FILE_NAME, suffix),
         None => format!("{}_{}", S3_INTERMEDIATE_FILE_NAME, suffix),
     }
@@ -155,7 +150,7 @@ impl SnowflakeHttpClient {
 
     /// NOTE: this function should ONLY be called *after*
     /// uploading files to remote external staged storage, i.e., AWS S3
-    pub async fn send_request(&self, file_suffix: String) -> Result<()> {
+    pub async fn send_request(&self, file_suffix: &str) -> Result<()> {
         let builder = self.build_request_and_client();
 
         // Generate the jwt_token
@@ -167,7 +162,7 @@ impl SnowflakeHttpClient {
                 "X-Snowflake-Authorization-Token-Type".to_string(),
                 "KEYPAIR_JWT",
             )
-            .body(generate_s3_file_name(self.s3_path.clone(), file_suffix));
+            .body(generate_s3_file_name(self.s3_path.as_deref(), file_suffix));
 
         let response = builder
             .send()
@@ -190,56 +185,43 @@ impl SnowflakeHttpClient {
 pub struct SnowflakeS3Client {
     s3_bucket: String,
     s3_path: Option<String>,
-    s3_client: S3Client,
+    pub opendal_s3_engine: OpendalObjectStore,
 }
 
 impl SnowflakeS3Client {
-    pub async fn new(
+    pub fn new(
         s3_bucket: String,
         s3_path: Option<String>,
         aws_access_key_id: String,
         aws_secret_access_key: String,
         aws_region: String,
-    ) -> Self {
-        let credentials = Credentials::new(
-            aws_access_key_id,
-            aws_secret_access_key,
-            // we don't allow temporary credentials
-            None,
-            None,
-            "rw_sink_to_s3_credentials",
-        );
-
-        let region = RegionProviderChain::first_try(Region::new(aws_region)).or_default_provider();
-
-        let config = aws_config::from_env()
-            .credentials_provider(credentials)
-            .region(region)
-            .load()
-            .await;
-
-        // create the brand new s3 client used to sink files to s3
-        let s3_client = S3Client::new(&config);
+    ) -> Result<Self> {
+        // just use default configuration here for opendal s3 engine
+        let config = ObjectStoreConfig::default();
+
+        // create the s3 engine for streaming upload to the intermediate s3 bucket
+        let opendal_s3_engine = OpendalObjectStore::new_s3_engine_with_credentials(
+            &s3_bucket,
+            config,
+            &aws_access_key_id,
+            &aws_secret_access_key,
+            &aws_region,
+        )
+        .map_err(|err| {
+            SinkError::Snowflake(format!(
+                "failed to create opendal s3 engine, error: {}",
+                err
+            ))
+        })?;
 
-        Self {
+        Ok(Self {
             s3_bucket,
             s3_path,
-            s3_client,
-        }
+            opendal_s3_engine,
+        })
     }
 
-    pub async fn sink_to_s3(&self, data: Bytes, file_suffix: String) -> Result<()> {
-        self.s3_client
-            .put_object()
-            .bucket(self.s3_bucket.clone())
-            .key(generate_s3_file_name(self.s3_path.clone(), file_suffix))
-            .body(ByteStream::from(data))
-            .send()
-            .await
-            .map_err(|err| {
-                SinkError::Snowflake(format!("failed to sink data to S3, error: {}", err))
-            })?;
-
-        Ok(())
+    pub fn s3_path(&self) -> Option<&str> {
+        self.s3_path.as_deref()
     }
 }
diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml
index 07da6a36a0e3a..27daa718b64f9 100644
--- a/src/connector/with_options_sink.yaml
+++ b/src/connector/with_options_sink.yaml
@@ -564,7 +564,7 @@ SnowflakeConfig:
     required: true
   - name: snowflake.s3_path
     field_type: String
-    comments: The optional s3 path to be specified the actual file location would be `<s3_bucket>://<s3_path>/<file_name>` if this field is specified by user(s) otherwise it would be `<s3_bucket>://<file_name>`
+    comments: The optional s3 path to be specified the actual file location would be `s3://<s3_bucket>/<s3_path>/<file_name>` if this field is specified by user(s) otherwise it would be `s3://<s3_bucket>/<file_name>`
     required: false
   - name: snowflake.aws_access_key_id
    field_type: String
@@ -578,10 +578,6 @@ SnowflakeConfig:
     field_type: String
     comments: The s3 region, e.g., us-east-2
     required: true
-  - name: snowflake.max_batch_row_num
-    field_type: String
-    comments: The configurable max row(s) to batch, which should be *explicitly* specified by user(s)
-    required: true
 StarrocksConfig:
   fields:
   - name: starrocks.host
diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
index 2aa4bd458806e..d7ba829dda632 100644
--- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -33,6 +33,7 @@ pub struct OpendalObjectStore {
     pub(crate) op: Operator,
     pub(crate) engine_type: EngineType,
 }
+
 #[derive(Clone)]
 pub enum EngineType {
     Memory,
@@ -219,6 +220,7 @@ impl ObjectStore for OpendalObjectStore {
 pub struct OpendalStreamingUploader {
     writer: Writer,
 }
+
 impl OpendalStreamingUploader {
     pub async fn new(op: Operator, path: String) -> ObjectResult<Self> {
         let writer = op
diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs
index 28f90a48e9ae0..c83498305d8e5 100644
--- a/src/object_store/src/object/opendal_engine/opendal_s3.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -59,6 +59,7 @@ impl OpendalObjectStore {
                     .with_jitter(),
             )
             .finish();
+
         Ok(Self {
             op,
             engine_type: EngineType::S3,
@@ -128,4 +129,47 @@ impl OpendalObjectStore {
 
         Ok(HttpClient::build(client_builder)?)
     }
+
+    /// currently used by the snowflake sink,
+    /// especially when sinking to the intermediate s3 bucket.
+    pub fn new_s3_engine_with_credentials(
+        bucket: &str,
+        object_store_config: ObjectStoreConfig,
+        aws_access_key_id: &str,
+        aws_secret_access_key: &str,
+        aws_region: &str,
+    ) -> ObjectResult<Self> {
+        // Create s3 builder with credentials.
+        let mut builder = S3::default();
+
+        // set credentials for s3 sink
+        builder.bucket(bucket);
+        builder.access_key_id(aws_access_key_id);
+        builder.secret_access_key(aws_secret_access_key);
+        builder.region(aws_region);
+
+        let http_client = Self::new_http_client(&object_store_config)?;
+        builder.http_client(http_client);
+
+        let op: Operator = Operator::new(builder)?
+            .layer(LoggingLayer::default())
+            .layer(
+                RetryLayer::new()
+                    .with_min_delay(Duration::from_millis(
+                        object_store_config.s3.object_store_req_retry_interval_ms,
+                    ))
+                    .with_max_delay(Duration::from_millis(
+                        object_store_config.s3.object_store_req_retry_max_delay_ms,
+                    ))
+                    .with_max_times(object_store_config.s3.object_store_req_retry_max_attempts)
+                    .with_factor(1.1)
+                    .with_jitter(),
+            )
+            .finish();
+
+        Ok(Self {
+            op,
+            engine_type: EngineType::S3,
+        })
+    }
 }
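
For reference, below is a minimal, hypothetical sketch (not part of the patch) of the streaming-upload flow this change moves the Snowflake sink onto. It only uses calls that appear in the diff above (`new_s3_engine_with_credentials`, `streaming_upload`, `write_bytes`, `finish`); the bucket name, credentials, region, and object path are placeholder values, and in the real sink the object path is produced by `generate_s3_file_name` with a per-epoch file suffix.

```rust
// Sketch only: exercises the same object-store APIs the Snowflake sink now uses.
use bytes::Bytes;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_object_store::object::{
    ObjectResult, ObjectStore, OpendalObjectStore, StreamingUploader,
};

async fn streaming_upload_sketch() -> ObjectResult<()> {
    // Build an opendal s3 engine with static credentials, mirroring `SnowflakeS3Client::new`
    // (bucket / credentials / region here are hypothetical placeholders).
    let engine = OpendalObjectStore::new_s3_engine_with_credentials(
        "example-bucket",
        ObjectStoreConfig::default(),
        "example-access-key-id",
        "example-secret-access-key",
        "us-east-2",
    )?;

    // One streaming uploader per intermediate file, as `new_streaming_uploader` creates per epoch.
    let mut uploader = engine
        .streaming_upload("RW_SNOWFLAKE_S3_SINK_FILE_example_suffix")
        .await?;

    // Data is pushed incrementally instead of being buffered into one big `payload` string.
    uploader.write_bytes(Bytes::from_static(br#"{"id":1}"#)).await?;
    uploader.write_bytes(Bytes::from_static(br#"{"id":2}"#)).await?;

    // Finalize the upload; only after this would the sink send the `insertFiles` request.
    uploader.finish().await?;
    Ok(())
}
```

Compared with the previous `sink_to_s3` bulk upload, rows are now pushed to the intermediate s3 object chunk by chunk and finalized only at the checkpoint barrier, which removes both the in-memory `payload` buffer and the `snowflake.max_batch_row_num` option.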