diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs
index ac498839ed67c..186ecd314c5fc 100644
--- a/src/connector/src/macros.rs
+++ b/src/connector/src/macros.rs
@@ -35,7 +35,7 @@ macro_rules! for_all_classified_sources {
                 { GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
                 { Mqtt, $crate::source::mqtt::MqttProperties, $crate::source::mqtt::split::MqttSplit },
                 { Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
-                { S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit },
+                { S3, $crate::source::filesystem::LegacyS3Properties, $crate::source::filesystem::LegacyFsSplit },
                 { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
                 { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
                 { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 406d678a65ee6..3b7c01c185c18 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -17,7 +17,6 @@ use std::sync::Arc;
 
 use anyhow::anyhow;
 use async_trait::async_trait;
-use aws_sdk_s3::types::Object;
 use bytes::Bytes;
 use enum_as_inner::EnumAsInner;
 use futures::future::try_join_all;
@@ -49,7 +48,6 @@ use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
 use crate::error::ConnectorResult as Result;
 use crate::parser::schema_change::SchemaChangeEnvelope;
 use crate::parser::ParserConfig;
-use crate::source::filesystem::FsPageItem;
 use crate::source::monitor::EnumeratorMetrics;
 use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
 use crate::with_options::WithOptions;
@@ -824,17 +822,6 @@ pub trait SplitMetaData: Sized {
 /// [`None`] and the created source stream will be a pending stream.
 pub type ConnectorState = Option<Vec<SplitImpl>>;
 
-#[derive(Debug, Clone, Default)]
-pub struct FsFilterCtrlCtx;
-pub type FsFilterCtrlCtxRef = Arc<FsFilterCtrlCtx>;
-
-#[async_trait]
-pub trait FsListInner: Sized {
-    // fixme: better to implement as an Iterator, but the last page still have some contents
-    async fn get_next_page<T: for<'a> From<&'a Object>>(&mut self) -> Result<(Vec<T>, bool)>;
-    fn filter_policy(&self, ctx: &FsFilterCtrlCtx, page_num: usize, item: &FsPageItem) -> bool;
-}
-
 #[cfg(test)]
 mod tests {
     use maplit::*;
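For context on why the macro table above is the only dispatch point that needs the rename: a minimal, hypothetical sketch of the table-consumption pattern behind `for_all_classified_sources!`. The macro and type names below are illustrative stand-ins, not code from this PR.

```rust
// Hypothetical sketch of the table-dispatch pattern: a consumer macro
// receives every { Variant, Properties, Split } row and generates code per
// row, so renaming a row's types updates all generated impls at once.
// `for_all_sources!` and `print_rows!` are made-up stand-ins.
macro_rules! for_all_sources {
    ($consumer:ident) => {
        $consumer! {
            { S3, LegacyS3Properties, LegacyFsSplit },
            { Gcs, GcsProperties, OpendalFsSplit }
        }
    };
}

macro_rules! print_rows {
    ($({ $variant:ident, $props:ident, $split:ident }),*) => {
        $(
            // One statement is generated per table row.
            println!(
                "{} -> ({}, {})",
                stringify!($variant),
                stringify!($props),
                stringify!($split)
            );
        )*
    };
}

fn main() {
    for_all_sources!(print_rows);
}
```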
diff --git a/src/connector/src/source/filesystem/file_common.rs b/src/connector/src/source/filesystem/file_common.rs
index f89758a0ef51c..50f56ea1f53fe 100644
--- a/src/connector/src/source/filesystem/file_common.rs
+++ b/src/connector/src/source/filesystem/file_common.rs
@@ -24,16 +24,16 @@ use super::opendal_source::OpendalSource;
 use crate::error::ConnectorResult;
 use crate::source::{SplitId, SplitMetaData};
 
-/// [`FsSplit`] Describes a file or a split of a file. A file is a generic concept,
-/// and can be a local file, a distributed file system, or am object in S3 bucket.
+/// [`LegacyFsSplit`] describes a file or a split of a file. A file is a generic concept:
+/// it can be a local file, a file in a distributed file system, or an object in an S3 bucket.
 #[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
-pub struct FsSplit {
+pub struct LegacyFsSplit {
     pub name: String,
     pub offset: usize,
     pub size: usize,
 }
 
-impl From<&Object> for FsSplit {
+impl From<&Object> for LegacyFsSplit {
     fn from(value: &Object) -> Self {
         Self {
             name: value.key().unwrap().to_owned(),
@@ -43,7 +43,7 @@ impl From<&Object> for FsSplit {
     }
 }
 
-impl SplitMetaData for FsSplit {
+impl SplitMetaData for LegacyFsSplit {
     fn id(&self) -> SplitId {
         self.name.as_str().into()
     }
@@ -63,7 +63,7 @@ impl SplitMetaData for FsSplit {
     }
 }
 
-impl FsSplit {
+impl LegacyFsSplit {
     pub fn new(name: String, start: usize, size: usize) -> Self {
         Self {
             name,
diff --git a/src/connector/src/source/filesystem/mod.rs b/src/connector/src/source/filesystem/mod.rs
index 22f2629be2d4b..b2f4e091cef2b 100644
--- a/src/connector/src/source/filesystem/mod.rs
+++ b/src/connector/src/source/filesystem/mod.rs
@@ -13,10 +13,11 @@
 // limitations under the License.
 
 pub use opendal_source::GcsProperties;
-pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, LEGACY_S3_CONNECTOR};
+pub use s3::{
+    LegacyS3FileReader, LegacyS3Properties, LegacyS3SplitEnumerator, LEGACY_S3_CONNECTOR,
+};
 pub mod file_common;
 pub mod nd_streaming;
-pub use file_common::{FsPage, FsPageItem, FsSplit, OpendalFsSplit};
+pub use file_common::{FsPage, FsPageItem, LegacyFsSplit, OpendalFsSplit};
 pub mod opendal_source;
 mod s3;
-pub mod s3_v2;
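A quick sanity check of the renamed split type's contract, written as a sketch assumed to live in a `#[cfg(test)]` module inside `src/connector`; the key and size are made up, and the final equality relies on the `From` fields elided in the hunk above mapping `offset` to 0 and `size` to the listed object size.

```rust
// Sketch (assumed test inside src/connector): LegacyFsSplit uses the
// object key as its split id, and From<&Object> builds a split from an
// S3 listing entry. Key and size below are made-up values.
use aws_sdk_s3::types::Object;

use crate::source::filesystem::LegacyFsSplit;
use crate::source::SplitMetaData;

#[test]
fn legacy_fs_split_id_is_object_key() {
    let split = LegacyFsSplit::new("dir/part-0001.csv".to_owned(), 0, 1024);
    assert_eq!(&*split.id(), "dir/part-0001.csv");

    let obj = Object::builder().key("dir/part-0001.csv").size(1024).build();
    // Assumes the elided From fields map offset -> 0 and size -> listed size.
    assert_eq!(LegacyFsSplit::from(&obj), split);
}
```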
{ name: "empty_split".to_owned(), offset: 0, size: 0, @@ -110,6 +112,55 @@ impl SplitEnumerator for S3SplitEnumerator { } } +#[async_trait] +pub trait FsListInner: Sized { + // fixme: better to implement as an Iterator, but the last page still have some contents + async fn get_next_page From<&'a aws_sdk_s3::types::Object>>( + &mut self, + ) -> ConnectorResult<(Vec, bool)>; +} + +#[async_trait] +impl FsListInner for LegacyS3SplitEnumerator { + async fn get_next_page From<&'a aws_sdk_s3::types::Object>>( + &mut self, + ) -> ConnectorResult<(Vec, bool)> { + let mut has_finished = false; + let mut req = self + .client + .list_objects_v2() + .bucket(&self.bucket_name) + .set_prefix(self.prefix.clone()); + if let Some(continuation_token) = self.next_continuation_token.take() { + req = req.continuation_token(continuation_token); + } + let mut res = req + .send() + .await + .with_context(|| format!("failed to list objects in bucket `{}`", self.bucket_name))?; + if res.is_truncated().unwrap_or_default() { + self.next_continuation_token + .clone_from(&res.next_continuation_token); + } else { + has_finished = true; + self.next_continuation_token = None; + } + let objects = res.contents.take().unwrap_or_default(); + let matched_objs: Vec = objects + .iter() + .filter(|obj| obj.key().is_some()) + .filter(|obj| { + self.matcher + .as_ref() + .map(|m| m.matches(obj.key().unwrap())) + .unwrap_or(true) + }) + .map(T::from) + .collect_vec(); + Ok((matched_objs, has_finished)) + } +} + #[cfg(test)] mod tests { use itertools::Itertools; @@ -141,7 +192,7 @@ mod tests { compression_format: CompressionFormat::None, }; let mut enumerator = - S3SplitEnumerator::new(props.into(), SourceEnumeratorContext::dummy().into()) + LegacyS3SplitEnumerator::new(props.into(), SourceEnumeratorContext::dummy().into()) .await .unwrap(); let splits = enumerator.list_splits().await.unwrap(); diff --git a/src/connector/src/source/filesystem/s3/mod.rs b/src/connector/src/source/filesystem/s3/mod.rs index 21250c8c81b09..0e7960789b2c3 100644 --- a/src/connector/src/source/filesystem/s3/mod.rs +++ b/src/connector/src/source/filesystem/s3/mod.rs @@ -14,15 +14,15 @@ pub mod enumerator; use std::collections::HashMap; -pub use enumerator::S3SplitEnumerator; +pub use enumerator::LegacyS3SplitEnumerator; use crate::source::filesystem::file_common::CompressionFormat; mod source; use serde::Deserialize; -pub use source::S3FileReader; +pub use source::LegacyS3FileReader; use crate::connector_common::AwsAuthProps; -use crate::source::filesystem::FsSplit; +use crate::source::filesystem::LegacyFsSplit; use crate::source::{SourceProperties, UnknownFields}; pub const LEGACY_S3_CONNECTOR: &str = "s3"; @@ -47,7 +47,7 @@ pub struct S3PropertiesCommon { } #[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)] -pub struct S3Properties { +pub struct LegacyS3Properties { #[serde(flatten)] pub common: S3PropertiesCommon, @@ -55,7 +55,7 @@ pub struct S3Properties { pub unknown_fields: HashMap, } -impl From for S3Properties { +impl From for LegacyS3Properties { fn from(common: S3PropertiesCommon) -> Self { Self { common, @@ -64,22 +64,22 @@ impl From for S3Properties { } } -impl SourceProperties for S3Properties { - type Split = FsSplit; - type SplitEnumerator = S3SplitEnumerator; - type SplitReader = S3FileReader; +impl SourceProperties for LegacyS3Properties { + type Split = LegacyFsSplit; + type SplitEnumerator = LegacyS3SplitEnumerator; + type SplitReader = LegacyS3FileReader; const SOURCE_NAME: &'static str = LEGACY_S3_CONNECTOR; } 
diff --git a/src/connector/src/source/filesystem/s3/mod.rs b/src/connector/src/source/filesystem/s3/mod.rs
index 21250c8c81b09..0e7960789b2c3 100644
--- a/src/connector/src/source/filesystem/s3/mod.rs
+++ b/src/connector/src/source/filesystem/s3/mod.rs
@@ -14,15 +14,15 @@
 pub mod enumerator;
 
 use std::collections::HashMap;
 
-pub use enumerator::S3SplitEnumerator;
+pub use enumerator::LegacyS3SplitEnumerator;
 use crate::source::filesystem::file_common::CompressionFormat;
 mod source;
 use serde::Deserialize;
-pub use source::S3FileReader;
+pub use source::LegacyS3FileReader;
 
 use crate::connector_common::AwsAuthProps;
-use crate::source::filesystem::FsSplit;
+use crate::source::filesystem::LegacyFsSplit;
 use crate::source::{SourceProperties, UnknownFields};
 
 pub const LEGACY_S3_CONNECTOR: &str = "s3";
@@ -47,7 +47,7 @@
 }
 
 #[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
-pub struct S3Properties {
+pub struct LegacyS3Properties {
     #[serde(flatten)]
     pub common: S3PropertiesCommon,
 
@@ -55,7 +55,7 @@
     pub unknown_fields: HashMap<String, String>,
 }
 
-impl From<S3PropertiesCommon> for S3Properties {
+impl From<S3PropertiesCommon> for LegacyS3Properties {
     fn from(common: S3PropertiesCommon) -> Self {
         Self {
             common,
@@ -64,22 +64,22 @@ }
 
-impl SourceProperties for S3Properties {
-    type Split = FsSplit;
-    type SplitEnumerator = S3SplitEnumerator;
-    type SplitReader = S3FileReader;
+impl SourceProperties for LegacyS3Properties {
+    type Split = LegacyFsSplit;
+    type SplitEnumerator = LegacyS3SplitEnumerator;
+    type SplitReader = LegacyS3FileReader;
 
     const SOURCE_NAME: &'static str = LEGACY_S3_CONNECTOR;
 }
 
-impl UnknownFields for S3Properties {
+impl UnknownFields for LegacyS3Properties {
     fn unknown_fields(&self) -> HashMap<String, String> {
         self.unknown_fields.clone()
     }
 }
 
-impl From<&S3Properties> for AwsAuthProps {
-    fn from(props: &S3Properties) -> Self {
+impl From<&LegacyS3Properties> for AwsAuthProps {
+    fn from(props: &LegacyS3Properties) -> Self {
         let props = &props.common;
         Self {
             region: Some(props.region_name.clone()),
diff --git a/src/connector/src/source/filesystem/s3/source/mod.rs b/src/connector/src/source/filesystem/s3/source/mod.rs
index e7e6c5db0daed..87e9067537ad7 100644
--- a/src/connector/src/source/filesystem/s3/source/mod.rs
+++ b/src/connector/src/source/filesystem/s3/source/mod.rs
@@ -14,4 +14,4 @@
 mod reader;
 mod split_stream;
 
-pub use reader::S3FileReader;
+pub use reader::LegacyS3FileReader;
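The `From<S3PropertiesCommon>` impl above keeps call sites free of the `unknown_fields` plumbing. A sketch of the conversion, with made-up values; the field set is taken from the tests elsewhere in this diff plus the with_options table, so treat the exact field names as assumptions.

```rust
// Sketch (assumed test inside src/connector): building the renamed
// properties from the common part alone; unknown_fields starts empty.
use crate::source::filesystem::file_common::CompressionFormat;
use crate::source::filesystem::s3::{LegacyS3Properties, S3PropertiesCommon};

#[test]
fn legacy_s3_properties_from_common() {
    let props: LegacyS3Properties = S3PropertiesCommon {
        region_name: "us-east-1".to_owned(),
        bucket_name: "example-bucket".to_owned(),
        match_pattern: Some("*.csv".to_owned()),
        access: None,
        secret: None,
        endpoint_url: None,
        compression_format: CompressionFormat::None,
    }
    .into();
    assert!(props.unknown_fields.is_empty());
}
```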
diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs
index 5f2631b913ab4..eeb613f52c48d 100644
--- a/src/connector/src/source/filesystem/s3/source/reader.rs
+++ b/src/connector/src/source/filesystem/s3/source/reader.rs
@@ -36,9 +36,9 @@ use crate::connector_common::AwsAuthProps;
 use crate::error::ConnectorResult;
 use crate::parser::ParserConfig;
 use crate::source::base::{SplitMetaData, SplitReader};
-use crate::source::filesystem::file_common::FsSplit;
+use crate::source::filesystem::file_common::LegacyFsSplit;
 use crate::source::filesystem::nd_streaming::need_nd_streaming;
-use crate::source::filesystem::s3::S3Properties;
+use crate::source::filesystem::s3::LegacyS3Properties;
 use crate::source::{
     into_chunk_stream, BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SourceMeta,
 };
@@ -46,22 +46,22 @@
 const STREAM_READER_CAPACITY: usize = 4096;
 
 #[derive(Debug)]
-pub struct S3FileReader {
+pub struct LegacyS3FileReader {
     #[expect(dead_code)]
     split_offset: HashMap<String, u64>,
     bucket_name: String,
     s3_client: s3_client::Client,
-    splits: Vec<FsSplit>,
+    splits: Vec<LegacyFsSplit>,
     parser_config: ParserConfig,
     source_ctx: SourceContextRef,
 }
 
-impl S3FileReader {
+impl LegacyS3FileReader {
     #[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
     pub async fn stream_read_object(
         client_for_s3: s3_client::Client,
         bucket_name: String,
-        split: FsSplit,
+        split: LegacyFsSplit,
         source_ctx: SourceContextRef,
     ) {
         let actor_id = source_ctx.actor_id.to_string();
@@ -73,7 +73,7 @@
 
         let object_name = split.name.clone();
 
-        let byte_stream = match S3FileReader::get_object(
+        let byte_stream = match LegacyS3FileReader::get_object(
             &client_for_s3,
             &bucket_name,
             &object_name,
@@ -171,13 +171,13 @@
 }
 
 #[async_trait]
-impl SplitReader for S3FileReader {
-    type Properties = S3Properties;
-    type Split = FsSplit;
+impl SplitReader for LegacyS3FileReader {
+    type Properties = LegacyS3Properties;
+    type Split = LegacyFsSplit;
 
     async fn new(
-        props: S3Properties,
-        splits: Vec<FsSplit>,
+        props: LegacyS3Properties,
+        splits: Vec<LegacyFsSplit>,
         parser_config: ParserConfig,
         source_ctx: SourceContextRef,
         _columns: Option<Vec<Column>>,
@@ -189,7 +189,7 @@
         let bucket_name = props.common.bucket_name;
         let s3_client = s3_client(&sdk_config, Some(default_conn_config()));
 
-        let s3_file_reader = S3FileReader {
+        let s3_file_reader = LegacyS3FileReader {
             split_offset: HashMap::new(),
             bucket_name,
             s3_client,
@@ -206,7 +206,7 @@
     }
 }
 
-impl S3FileReader {
+impl LegacyS3FileReader {
     #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
     async fn into_stream_inner(self) {
         for split in self.splits {
@@ -248,7 +248,7 @@
     };
     use crate::source::filesystem::file_common::CompressionFormat;
     use crate::source::filesystem::s3::S3PropertiesCommon;
-    use crate::source::filesystem::S3SplitEnumerator;
+    use crate::source::filesystem::LegacyS3SplitEnumerator;
     use crate::source::{
         SourceColumnDesc, SourceContext, SourceEnumeratorContext, SplitEnumerator,
     };
@@ -256,7 +256,7 @@
     #[tokio::test]
     #[ignore]
     async fn test_s3_split_reader() {
-        let props: S3Properties = S3PropertiesCommon {
+        let props: LegacyS3Properties = S3PropertiesCommon {
            region_name: "ap-southeast-1".to_owned(),
            bucket_name: "mingchao-s3-source".to_owned(),
            match_pattern: None,
@@ -267,7 +267,7 @@
        }
        .into();
        let mut enumerator =
-            S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::dummy().into())
+            LegacyS3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::dummy().into())
                .await
                .unwrap();
        let splits = enumerator.list_splits().await.unwrap();
@@ -292,9 +292,10 @@
            },
        };
-        let reader = S3FileReader::new(props, splits, config, SourceContext::dummy().into(), None)
-            .await
-            .unwrap();
+        let reader =
+            LegacyS3FileReader::new(props, splits, config, SourceContext::dummy().into(), None)
+                .await
+                .unwrap();
 
        let msg_stream = reader.into_stream_inner();
 
        #[for_await]
diff --git a/src/connector/src/source/filesystem/s3_v2/lister.rs b/src/connector/src/source/filesystem/s3_v2/lister.rs
deleted file mode 100644
index ee0132742a40d..0000000000000
--- a/src/connector/src/source/filesystem/s3_v2/lister.rs
+++ /dev/null
@@ -1,67 +0,0 @@
-// Copyright 2025 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use anyhow::Context;
-use async_trait::async_trait;
-use aws_sdk_s3::types::Object;
-use itertools::Itertools;
-
-use crate::error::ConnectorResult;
-use crate::source::filesystem::{FsPageItem, S3SplitEnumerator};
-use crate::source::{FsFilterCtrlCtx, FsListInner};
-
-#[async_trait]
-impl FsListInner for S3SplitEnumerator {
-    async fn get_next_page<T: for<'a> From<&'a Object>>(
-        &mut self,
-    ) -> ConnectorResult<(Vec<T>, bool)> {
-        let mut has_finished = false;
-        let mut req = self
-            .client
-            .list_objects_v2()
-            .bucket(&self.bucket_name)
-            .set_prefix(self.prefix.clone());
-        if let Some(continuation_token) = self.next_continuation_token.take() {
-            req = req.continuation_token(continuation_token);
-        }
-        let mut res = req
-            .send()
-            .await
-            .with_context(|| format!("failed to list objects in bucket `{}`", self.bucket_name))?;
-        if res.is_truncated().unwrap_or_default() {
-            self.next_continuation_token
-                .clone_from(&res.next_continuation_token);
-        } else {
-            has_finished = true;
-            self.next_continuation_token = None;
-        }
-        let objects = res.contents.take().unwrap_or_default();
-        let matched_objs: Vec<T> = objects
-            .iter()
-            .filter(|obj| obj.key().is_some())
-            .filter(|obj| {
-                self.matcher
-                    .as_ref()
-                    .map(|m| m.matches(obj.key().unwrap()))
-                    .unwrap_or(true)
-            })
-            .map(T::from)
-            .collect_vec();
-        Ok((matched_objs, has_finished))
-    }
-
-    fn filter_policy(&self, _ctx: &FsFilterCtrlCtx, _page_num: usize, _item: &FsPageItem) -> bool {
-        true
-    }
-}
diff --git a/src/connector/src/source/filesystem/s3_v2/mod.rs b/src/connector/src/source/filesystem/s3_v2/mod.rs
deleted file mode 100644
index ae95f6fedb186..0000000000000
--- a/src/connector/src/source/filesystem/s3_v2/mod.rs
+++ /dev/null
@@ -1,15 +0,0 @@
-// Copyright 2025 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-pub mod lister;
diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs
index 5585dc5378ae5..3ea03f7b4df19 100644
--- a/src/connector/src/source/mod.rs
+++ b/src/connector/src/source/mod.rs
@@ -16,7 +16,7 @@ pub mod prelude {
     // import all split enumerators
     pub use crate::source::datagen::DatagenSplitEnumerator;
     pub use crate::source::filesystem::opendal_source::OpendalEnumerator;
-    pub use crate::source::filesystem::S3SplitEnumerator;
+    pub use crate::source::filesystem::LegacyS3SplitEnumerator;
     pub use crate::source::google_pubsub::PubsubSplitEnumerator as GooglePubsubSplitEnumerator;
     pub use crate::source::iceberg::IcebergSplitEnumerator;
     pub use crate::source::kafka::KafkaSplitEnumerator;
diff --git a/src/connector/src/source/reader/desc.rs b/src/connector/src/source/reader/desc.rs
index 95fdf24b111aa..ed4692a2db2a5 100644
--- a/src/connector/src/source/reader/desc.rs
+++ b/src/connector/src/source/reader/desc.rs
@@ -21,7 +21,7 @@ use risingwave_pb::catalog::PbStreamSourceInfo;
 use risingwave_pb::plan_common::PbColumnCatalog;
 
 #[expect(deprecated)]
-use super::fs_reader::FsSourceReader;
+use super::fs_reader::LegacyFsSourceReader;
 use super::reader::SourceReader;
 use crate::error::ConnectorResult;
 use crate::parser::additional_columns::source_add_partition_offset_cols;
@@ -44,8 +44,8 @@ pub struct SourceDesc {
 #[deprecated = "will be replaced by new fs source (list + fetch)"]
 #[expect(deprecated)]
 #[derive(Debug)]
-pub struct FsSourceDesc {
-    pub source: FsSourceReader,
+pub struct LegacyFsSourceDesc {
+    pub source: LegacyFsSourceReader,
     pub columns: Vec<SourceColumnDesc>,
     pub metrics: Arc<SourceMetrics>,
 }
@@ -139,7 +139,7 @@
 
     #[deprecated = "will be replaced by new fs source (list + fetch)"]
     #[expect(deprecated)]
-    pub fn build_fs_source_desc(&self) -> ConnectorResult<FsSourceDesc> {
+    pub fn build_fs_source_desc(&self) -> ConnectorResult<LegacyFsSourceDesc> {
         let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;
 
         match (
@@ -161,10 +161,13 @@
 
         let columns = self.column_catalogs_to_source_column_descs();
 
-        let source =
-            FsSourceReader::new(self.with_properties.clone(), columns.clone(), parser_config)?;
+        let source = LegacyFsSourceReader::new(
+            self.with_properties.clone(),
+            columns.clone(),
+            parser_config,
+        )?;
 
-        Ok(FsSourceDesc {
+        Ok(LegacyFsSourceDesc {
             source,
             columns,
             metrics: self.metrics.clone(),
diff --git a/src/connector/src/source/reader/fs_reader.rs b/src/connector/src/source/reader/fs_reader.rs
index 41817c3a85a97..6a2ea3223e0a5 100644
--- a/src/connector/src/source/reader/fs_reader.rs
+++ b/src/connector/src/source/reader/fs_reader.rs
@@ -29,15 +29,14 @@ use crate::source::{
 use crate::WithOptionsSecResolved;
 
 #[derive(Clone, Debug)]
-pub struct FsSourceReader {
+pub struct LegacyFsSourceReader {
     pub config: ConnectorProperties,
     pub columns: Vec<SourceColumnDesc>,
     pub properties: WithOptionsSecResolved,
     pub parser_config: SpecificParserConfig,
 }
 
-impl FsSourceReader {
-    #[allow(clippy::too_many_arguments)]
+impl LegacyFsSourceReader {
     pub fn new(
         properties: WithOptionsSecResolved,
         columns: Vec<SourceColumnDesc>,
String + required: true + - name: match_pattern + field_type: String + required: false + default: Default::default + - name: s3.credentials.access + field_type: String + required: false + default: Default::default + - name: s3.credentials.secret + field_type: String + required: false + default: Default::default + - name: s3.endpoint_url + field_type: String + required: false + - name: compression_format + field_type: CompressionFormat + required: false + default: Default::default MongodbCommon: fields: - name: mongodb.url @@ -1065,33 +1092,6 @@ PulsarProperties: contains a generated suffix in the subscription name. The subscription name will be `{subscription_name_prefix}-{fragment_id}-{actor_id}`. required: false -S3Properties: - fields: - - name: s3.region_name - field_type: String - required: true - - name: s3.bucket_name - field_type: String - required: true - - name: match_pattern - field_type: String - required: false - default: Default::default - - name: s3.credentials.access - field_type: String - required: false - default: Default::default - - name: s3.credentials.secret - field_type: String - required: false - default: Default::default - - name: s3.endpoint_url - field_type: String - required: false - - name: compression_format - field_type: CompressionFormat - required: false - default: Default::default TestSourceProperties: fields: - name: properties diff --git a/src/stream/src/executor/source/legacy_fs_source_executor.rs b/src/stream/src/executor/source/legacy_fs_source_executor.rs index 5ac97261677a6..8774d5c468b7f 100644 --- a/src/stream/src/executor/source/legacy_fs_source_executor.rs +++ b/src/stream/src/executor/source/legacy_fs_source_executor.rs @@ -25,7 +25,7 @@ use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::EpochPair; use risingwave_connector::error::ConnectorError; -use risingwave_connector::source::reader::desc::{FsSourceDesc, SourceDescBuilder}; +use risingwave_connector::source::reader::desc::{LegacyFsSourceDesc, SourceDescBuilder}; use risingwave_connector::source::{ BoxSourceChunkStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData, @@ -89,7 +89,7 @@ impl LegacyFsSourceExecutor { async fn build_stream_source_reader( &mut self, - source_desc: &FsSourceDesc, + source_desc: &LegacyFsSourceDesc, state: ConnectorState, ) -> StreamExecutorResult { let column_ids = source_desc @@ -121,7 +121,7 @@ impl LegacyFsSourceExecutor { async fn rebuild_stream_reader( &mut self, - source_desc: &FsSourceDesc, + source_desc: &LegacyFsSourceDesc, stream: &mut StreamReaderWithPause, ) -> StreamExecutorResult<()> { let target_state: Vec = self @@ -141,7 +141,7 @@ impl LegacyFsSourceExecutor { async fn apply_split_change( &mut self, - source_desc: &FsSourceDesc, + source_desc: &LegacyFsSourceDesc, stream: &mut StreamReaderWithPause, mapping: &HashMap>, ) -> StreamExecutorResult<()> { @@ -207,7 +207,7 @@ impl LegacyFsSourceExecutor { async fn replace_stream_reader_with_target_state( &mut self, - source_desc: &FsSourceDesc, + source_desc: &LegacyFsSourceDesc, stream: &mut StreamReaderWithPause, target_state: Vec, ) -> StreamExecutorResult<()> {