Skip to content

Commit

Permalink
refactor: more rename for legacy fs source
Browse files Browse the repository at this point in the history
follow up #20322, but in connector crate
  • Loading branch information
xxchan committed Jan 27, 2025
1 parent cdfdb97 commit aea6d8f
Show file tree
Hide file tree
Showing 15 changed files with 151 additions and 191 deletions.
2 changes: 1 addition & 1 deletion src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ macro_rules! for_all_classified_sources {
{ GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
{ Mqtt, $crate::source::mqtt::MqttProperties, $crate::source::mqtt::split::MqttSplit },
{ Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit },
{ S3, $crate::source::filesystem::LegacyS3Properties, $crate::source::filesystem::LegacyFsSplit },
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
{ PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
Expand Down
13 changes: 0 additions & 13 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use aws_sdk_s3::types::Object;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
Expand Down Expand Up @@ -49,7 +48,6 @@ use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONN
use crate::error::ConnectorResult as Result;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::parser::ParserConfig;
use crate::source::filesystem::FsPageItem;
use crate::source::monitor::EnumeratorMetrics;
use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc, SqlServerCdc};
use crate::with_options::WithOptions;
Expand Down Expand Up @@ -824,17 +822,6 @@ pub trait SplitMetaData: Sized {
/// [`None`] and the created source stream will be a pending stream.
pub type ConnectorState = Option<Vec<SplitImpl>>;

#[derive(Debug, Clone, Default)]
pub struct FsFilterCtrlCtx;
pub type FsFilterCtrlCtxRef = Arc<FsFilterCtrlCtx>;

#[async_trait]
pub trait FsListInner: Sized {
// fixme: better to implement as an Iterator, but the last page still have some contents
async fn get_next_page<T: for<'a> From<&'a Object>>(&mut self) -> Result<(Vec<T>, bool)>;
fn filter_policy(&self, ctx: &FsFilterCtrlCtx, page_num: usize, item: &FsPageItem) -> bool;
}

#[cfg(test)]
mod tests {
use maplit::*;
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/source/filesystem/file_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ use super::opendal_source::OpendalSource;
use crate::error::ConnectorResult;
use crate::source::{SplitId, SplitMetaData};

/// [`FsSplit`] Describes a file or a split of a file. A file is a generic concept,
/// [`LegacyFsSplit`] Describes a file or a split of a file. A file is a generic concept,
/// and can be a local file, a distributed file system, or am object in S3 bucket.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct FsSplit {
pub struct LegacyFsSplit {
pub name: String,
pub offset: usize,
pub size: usize,
}

impl From<&Object> for FsSplit {
impl From<&Object> for LegacyFsSplit {
fn from(value: &Object) -> Self {
Self {
name: value.key().unwrap().to_owned(),
Expand All @@ -43,7 +43,7 @@ impl From<&Object> for FsSplit {
}
}

impl SplitMetaData for FsSplit {
impl SplitMetaData for LegacyFsSplit {
fn id(&self) -> SplitId {
self.name.as_str().into()
}
Expand All @@ -63,7 +63,7 @@ impl SplitMetaData for FsSplit {
}
}

impl FsSplit {
impl LegacyFsSplit {
pub fn new(name: String, start: usize, size: usize) -> Self {
Self {
name,
Expand Down
7 changes: 4 additions & 3 deletions src/connector/src/source/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

pub use opendal_source::GcsProperties;
pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, LEGACY_S3_CONNECTOR};
pub use s3::{
LegacyS3FileReader, LegacyS3Properties, LegacyS3SplitEnumerator, LEGACY_S3_CONNECTOR,
};
pub mod file_common;
pub mod nd_streaming;
pub use file_common::{FsPage, FsPageItem, FsSplit, OpendalFsSplit};
pub use file_common::{FsPage, FsPageItem, LegacyFsSplit, OpendalFsSplit};
pub mod opendal_source;
mod s3;
pub mod s3_v2;
73 changes: 62 additions & 11 deletions src/connector/src/source/filesystem/s3/enumerator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
use anyhow::Context;
use async_trait::async_trait;
use aws_sdk_s3::client::Client;
use itertools::Itertools;

use crate::aws_utils::{default_conn_config, s3_client};
use crate::connector_common::AwsAuthProps;
use crate::source::filesystem::file_common::FsSplit;
use crate::source::filesystem::s3::S3Properties;
use crate::source::{FsListInner, SourceEnumeratorContextRef, SplitEnumerator};
use crate::error::ConnectorResult;
use crate::source::filesystem::file_common::LegacyFsSplit;
use crate::source::filesystem::s3::LegacyS3Properties;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

/// Get the prefix from a glob
pub fn get_prefix(glob: &str) -> String {
Expand Down Expand Up @@ -56,7 +58,7 @@ pub fn get_prefix(glob: &str) -> String {
}

#[derive(Debug, Clone)]
pub struct S3SplitEnumerator {
pub struct LegacyS3SplitEnumerator {
pub(crate) bucket_name: String,
// prefix is used to reduce the number of objects to be listed
pub(crate) prefix: Option<String>,
Expand All @@ -68,9 +70,9 @@ pub struct S3SplitEnumerator {
}

#[async_trait]
impl SplitEnumerator for S3SplitEnumerator {
type Properties = S3Properties;
type Split = FsSplit;
impl SplitEnumerator for LegacyS3SplitEnumerator {
type Properties = LegacyS3Properties;
type Split = LegacyFsSplit;

async fn new(
properties: Self::Properties,
Expand All @@ -89,7 +91,7 @@ impl SplitEnumerator for S3SplitEnumerator {
(None, None)
};

Ok(S3SplitEnumerator {
Ok(LegacyS3SplitEnumerator {
bucket_name: properties.bucket_name,
matcher,
prefix,
Expand All @@ -100,16 +102,65 @@ impl SplitEnumerator for S3SplitEnumerator {

async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<Self::Split>> {
// fetch one page as validation, no need to get all pages
let (_, _) = self.get_next_page::<FsSplit>().await?;
let (_, _) = self.get_next_page::<LegacyFsSplit>().await?;

Ok(vec![FsSplit {
Ok(vec![LegacyFsSplit {
name: "empty_split".to_owned(),
offset: 0,
size: 0,
}])
}
}

#[async_trait]
pub trait FsListInner: Sized {
// fixme: better to implement as an Iterator, but the last page still have some contents
async fn get_next_page<T: for<'a> From<&'a aws_sdk_s3::types::Object>>(
&mut self,
) -> ConnectorResult<(Vec<T>, bool)>;
}

#[async_trait]
impl FsListInner for LegacyS3SplitEnumerator {
async fn get_next_page<T: for<'a> From<&'a aws_sdk_s3::types::Object>>(
&mut self,
) -> ConnectorResult<(Vec<T>, bool)> {
let mut has_finished = false;
let mut req = self
.client
.list_objects_v2()
.bucket(&self.bucket_name)
.set_prefix(self.prefix.clone());
if let Some(continuation_token) = self.next_continuation_token.take() {
req = req.continuation_token(continuation_token);
}
let mut res = req
.send()
.await
.with_context(|| format!("failed to list objects in bucket `{}`", self.bucket_name))?;
if res.is_truncated().unwrap_or_default() {
self.next_continuation_token
.clone_from(&res.next_continuation_token);
} else {
has_finished = true;
self.next_continuation_token = None;
}
let objects = res.contents.take().unwrap_or_default();
let matched_objs: Vec<T> = objects
.iter()
.filter(|obj| obj.key().is_some())
.filter(|obj| {
self.matcher
.as_ref()
.map(|m| m.matches(obj.key().unwrap()))
.unwrap_or(true)
})
.map(T::from)
.collect_vec();
Ok((matched_objs, has_finished))
}
}

#[cfg(test)]
mod tests {
use itertools::Itertools;
Expand Down Expand Up @@ -141,7 +192,7 @@ mod tests {
compression_format: CompressionFormat::None,
};
let mut enumerator =
S3SplitEnumerator::new(props.into(), SourceEnumeratorContext::dummy().into())
LegacyS3SplitEnumerator::new(props.into(), SourceEnumeratorContext::dummy().into())
.await
.unwrap();
let splits = enumerator.list_splits().await.unwrap();
Expand Down
24 changes: 12 additions & 12 deletions src/connector/src/source/filesystem/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
pub mod enumerator;
use std::collections::HashMap;

pub use enumerator::S3SplitEnumerator;
pub use enumerator::LegacyS3SplitEnumerator;

use crate::source::filesystem::file_common::CompressionFormat;
mod source;
use serde::Deserialize;
pub use source::S3FileReader;
pub use source::LegacyS3FileReader;

use crate::connector_common::AwsAuthProps;
use crate::source::filesystem::FsSplit;
use crate::source::filesystem::LegacyFsSplit;
use crate::source::{SourceProperties, UnknownFields};

pub const LEGACY_S3_CONNECTOR: &str = "s3";
Expand All @@ -47,15 +47,15 @@ pub struct S3PropertiesCommon {
}

#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct S3Properties {
pub struct LegacyS3Properties {
#[serde(flatten)]
pub common: S3PropertiesCommon,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}

impl From<S3PropertiesCommon> for S3Properties {
impl From<S3PropertiesCommon> for LegacyS3Properties {
fn from(common: S3PropertiesCommon) -> Self {
Self {
common,
Expand All @@ -64,22 +64,22 @@ impl From<S3PropertiesCommon> for S3Properties {
}
}

impl SourceProperties for S3Properties {
type Split = FsSplit;
type SplitEnumerator = S3SplitEnumerator;
type SplitReader = S3FileReader;
impl SourceProperties for LegacyS3Properties {
type Split = LegacyFsSplit;
type SplitEnumerator = LegacyS3SplitEnumerator;
type SplitReader = LegacyS3FileReader;

const SOURCE_NAME: &'static str = LEGACY_S3_CONNECTOR;
}

impl UnknownFields for S3Properties {
impl UnknownFields for LegacyS3Properties {
fn unknown_fields(&self) -> HashMap<String, String> {
self.unknown_fields.clone()
}
}

impl From<&S3Properties> for AwsAuthProps {
fn from(props: &S3Properties) -> Self {
impl From<&LegacyS3Properties> for AwsAuthProps {
fn from(props: &LegacyS3Properties) -> Self {
let props = &props.common;
Self {
region: Some(props.region_name.clone()),
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/s3/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

mod reader;
mod split_stream;
pub use reader::S3FileReader;
pub use reader::LegacyS3FileReader;
Loading

0 comments on commit aea6d8f

Please sign in to comment.