Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: more rename for legacy fs source #20323

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ macro_rules! for_all_classified_sources {
{ GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
{ Mqtt, $crate::source::mqtt::MqttProperties, $crate::source::mqtt::split::MqttSplit },
{ Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit },
{ S3, $crate::source::filesystem::LegacyS3Properties, $crate::source::filesystem::LegacyFsSplit },
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
{ PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
Expand Down
13 changes: 0 additions & 13 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use aws_sdk_s3::types::Object;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
Expand Down Expand Up @@ -49,7 +48,6 @@ use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONN
use crate::error::ConnectorResult as Result;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::parser::ParserConfig;
use crate::source::filesystem::FsPageItem;
use crate::source::monitor::EnumeratorMetrics;
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
use crate::with_options::WithOptions;
Expand Down Expand Up @@ -824,17 +822,6 @@ pub trait SplitMetaData: Sized {
/// [`None`] and the created source stream will be a pending stream.
pub type ConnectorState = Option<Vec<SplitImpl>>;

#[derive(Debug, Clone, Default)]
pub struct FsFilterCtrlCtx;
pub type FsFilterCtrlCtxRef = Arc<FsFilterCtrlCtx>;

#[async_trait]
pub trait FsListInner: Sized {
// fixme: better to implement as an Iterator, but the last page still have some contents
async fn get_next_page<T: for<'a> From<&'a Object>>(&mut self) -> Result<(Vec<T>, bool)>;
fn filter_policy(&self, ctx: &FsFilterCtrlCtx, page_num: usize, item: &FsPageItem) -> bool;
}

#[cfg(test)]
mod tests {
use maplit::*;
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/source/filesystem/file_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ use super::opendal_source::OpendalSource;
use crate::error::ConnectorResult;
use crate::source::{SplitId, SplitMetaData};

/// [`FsSplit`] Describes a file or a split of a file. A file is a generic concept,
/// [`LegacyFsSplit`] Describes a file or a split of a file. A file is a generic concept,
/// and can be a local file, a distributed file system, or am object in S3 bucket.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct FsSplit {
pub struct LegacyFsSplit {
pub name: String,
pub offset: usize,
pub size: usize,
}

impl From<&Object> for FsSplit {
impl From<&Object> for LegacyFsSplit {
fn from(value: &Object) -> Self {
Self {
name: value.key().unwrap().to_owned(),
Expand All @@ -43,7 +43,7 @@ impl From<&Object> for FsSplit {
}
}

impl SplitMetaData for FsSplit {
impl SplitMetaData for LegacyFsSplit {
fn id(&self) -> SplitId {
self.name.as_str().into()
}
Expand All @@ -63,7 +63,7 @@ impl SplitMetaData for FsSplit {
}
}

impl FsSplit {
impl LegacyFsSplit {
pub fn new(name: String, start: usize, size: usize) -> Self {
Self {
name,
Expand Down
7 changes: 4 additions & 3 deletions src/connector/src/source/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

pub use opendal_source::GcsProperties;
pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, LEGACY_S3_CONNECTOR};
pub use s3::{
LegacyS3FileReader, LegacyS3Properties, LegacyS3SplitEnumerator, LEGACY_S3_CONNECTOR,
};
pub mod file_common;
pub mod nd_streaming;
pub use file_common::{FsPage, FsPageItem, FsSplit, OpendalFsSplit};
pub use file_common::{FsPage, FsPageItem, LegacyFsSplit, OpendalFsSplit};
pub mod opendal_source;
mod s3;
pub mod s3_v2;
73 changes: 62 additions & 11 deletions src/connector/src/source/filesystem/s3/enumerator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
use anyhow::Context;
use async_trait::async_trait;
use aws_sdk_s3::client::Client;
use itertools::Itertools;

use crate::aws_utils::{default_conn_config, s3_client};
use crate::connector_common::AwsAuthProps;
use crate::source::filesystem::file_common::FsSplit;
use crate::source::filesystem::s3::S3Properties;
use crate::source::{FsListInner, SourceEnumeratorContextRef, SplitEnumerator};
use crate::error::ConnectorResult;
use crate::source::filesystem::file_common::LegacyFsSplit;
use crate::source::filesystem::s3::LegacyS3Properties;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

/// Get the prefix from a glob
pub fn get_prefix(glob: &str) -> String {
Expand Down Expand Up @@ -56,7 +58,7 @@ pub fn get_prefix(glob: &str) -> String {
}

#[derive(Debug, Clone)]
pub struct S3SplitEnumerator {
pub struct LegacyS3SplitEnumerator {
pub(crate) bucket_name: String,
// prefix is used to reduce the number of objects to be listed
pub(crate) prefix: Option<String>,
Expand All @@ -68,9 +70,9 @@ pub struct S3SplitEnumerator {
}

#[async_trait]
impl SplitEnumerator for S3SplitEnumerator {
type Properties = S3Properties;
type Split = FsSplit;
impl SplitEnumerator for LegacyS3SplitEnumerator {
type Properties = LegacyS3Properties;
type Split = LegacyFsSplit;

async fn new(
properties: Self::Properties,
Expand All @@ -89,7 +91,7 @@ impl SplitEnumerator for S3SplitEnumerator {
(None, None)
};

Ok(S3SplitEnumerator {
Ok(LegacyS3SplitEnumerator {
bucket_name: properties.bucket_name,
matcher,
prefix,
Expand All @@ -100,16 +102,65 @@ impl SplitEnumerator for S3SplitEnumerator {

async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<Self::Split>> {
// fetch one page as validation, no need to get all pages
let (_, _) = self.get_next_page::<FsSplit>().await?;
let (_, _) = self.get_next_page::<LegacyFsSplit>().await?;

Ok(vec![FsSplit {
Ok(vec![LegacyFsSplit {
name: "empty_split".to_owned(),
offset: 0,
size: 0,
}])
}
}

#[async_trait]
pub trait FsListInner: Sized {
// fixme: better to implement as an Iterator, but the last page still have some contents
async fn get_next_page<T: for<'a> From<&'a aws_sdk_s3::types::Object>>(
&mut self,
) -> ConnectorResult<(Vec<T>, bool)>;
}

#[async_trait]
impl FsListInner for LegacyS3SplitEnumerator {
async fn get_next_page<T: for<'a> From<&'a aws_sdk_s3::types::Object>>(
&mut self,
) -> ConnectorResult<(Vec<T>, bool)> {
let mut has_finished = false;
let mut req = self
.client
.list_objects_v2()
.bucket(&self.bucket_name)
.set_prefix(self.prefix.clone());
if let Some(continuation_token) = self.next_continuation_token.take() {
req = req.continuation_token(continuation_token);
}
let mut res = req
.send()
.await
.with_context(|| format!("failed to list objects in bucket `{}`", self.bucket_name))?;
if res.is_truncated().unwrap_or_default() {
self.next_continuation_token
.clone_from(&res.next_continuation_token);
} else {
has_finished = true;
self.next_continuation_token = None;
}
let objects = res.contents.take().unwrap_or_default();
let matched_objs: Vec<T> = objects
.iter()
.filter(|obj| obj.key().is_some())
.filter(|obj| {
self.matcher
.as_ref()
.map(|m| m.matches(obj.key().unwrap()))
.unwrap_or(true)
})
.map(T::from)
.collect_vec();
Ok((matched_objs, has_finished))
}
}

#[cfg(test)]
mod tests {
use itertools::Itertools;
Expand Down Expand Up @@ -141,7 +192,7 @@ mod tests {
compression_format: CompressionFormat::None,
};
let mut enumerator =
S3SplitEnumerator::new(props.into(), SourceEnumeratorContext::dummy().into())
LegacyS3SplitEnumerator::new(props.into(), SourceEnumeratorContext::dummy().into())
.await
.unwrap();
let splits = enumerator.list_splits().await.unwrap();
Expand Down
24 changes: 12 additions & 12 deletions src/connector/src/source/filesystem/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
pub mod enumerator;
use std::collections::HashMap;

pub use enumerator::S3SplitEnumerator;
pub use enumerator::LegacyS3SplitEnumerator;

use crate::source::filesystem::file_common::CompressionFormat;
mod source;
use serde::Deserialize;
pub use source::S3FileReader;
pub use source::LegacyS3FileReader;

use crate::connector_common::AwsAuthProps;
use crate::source::filesystem::FsSplit;
use crate::source::filesystem::LegacyFsSplit;
use crate::source::{SourceProperties, UnknownFields};

pub const LEGACY_S3_CONNECTOR: &str = "s3";
Expand All @@ -47,15 +47,15 @@ pub struct S3PropertiesCommon {
}

#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct S3Properties {
pub struct LegacyS3Properties {
#[serde(flatten)]
pub common: S3PropertiesCommon,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}

impl From<S3PropertiesCommon> for S3Properties {
impl From<S3PropertiesCommon> for LegacyS3Properties {
fn from(common: S3PropertiesCommon) -> Self {
Self {
common,
Expand All @@ -64,22 +64,22 @@ impl From<S3PropertiesCommon> for S3Properties {
}
}

impl SourceProperties for S3Properties {
type Split = FsSplit;
type SplitEnumerator = S3SplitEnumerator;
type SplitReader = S3FileReader;
impl SourceProperties for LegacyS3Properties {
type Split = LegacyFsSplit;
type SplitEnumerator = LegacyS3SplitEnumerator;
type SplitReader = LegacyS3FileReader;

const SOURCE_NAME: &'static str = LEGACY_S3_CONNECTOR;
}

impl UnknownFields for S3Properties {
impl UnknownFields for LegacyS3Properties {
fn unknown_fields(&self) -> HashMap<String, String> {
self.unknown_fields.clone()
}
}

impl From<&S3Properties> for AwsAuthProps {
fn from(props: &S3Properties) -> Self {
impl From<&LegacyS3Properties> for AwsAuthProps {
fn from(props: &LegacyS3Properties) -> Self {
let props = &props.common;
Self {
region: Some(props.region_name.clone()),
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/s3/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

mod reader;
mod split_stream;
pub use reader::S3FileReader;
pub use reader::LegacyS3FileReader;
Loading
Loading