From 4c774eb4bcd1cf79d7727aa78b7935b6f9d4f508 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 23 Nov 2023 17:59:39 +0800 Subject: [PATCH] fix list --- .../filesystem/opendal_source/gcs_source.rs | 4 +-- .../opendal_source/opendal_enumerator.rs | 7 ++-- .../filesystem/opendal_source/s3_source.rs | 4 +-- src/source/src/connector_source.rs | 33 ++----------------- .../src/executor/source/list_executor.rs | 4 ++- 5 files changed, 13 insertions(+), 39 deletions(-) diff --git a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs index 3fa91d9a541a1..c3b49c1eecb0a 100644 --- a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs @@ -26,9 +26,9 @@ impl OpendalEnumerator where C: Send + Clone + PartialEq + 'static + Sync, { - /// create opendal gcs engine. + /// create opendal gcs source. pub fn new_gcs_source(gcs_properties: GcsProperties) -> anyhow::Result { - // Create gcs backend builder. + // Create gcs builder. let mut builder = Gcs::default(); builder.bucket(&gcs_properties.bucket_name); diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs index 1d259fd90a6b4..cf9586bfb2a5e 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs @@ -82,14 +82,13 @@ where .op .lister_with("/") .delimiter("") - .metakey(Metakey::ContentLength | Metakey::ContentType) + .metakey(Metakey::ContentLength | Metakey::LastModified) .await?; let stream = stream::unfold(object_lister, |mut object_lister| async move { match object_lister.next().await { Some(Ok(object)) => { // todo: manual filtering prefix let name = object.path().to_string(); - let om = object.metadata(); let t = match om.last_modified() { @@ -109,11 +108,11 @@ where Some((Ok(metadata), object_lister)) } Some(Err(err)) => { - tracing::error!("list fail"); + tracing::error!("list object fail, err {}", err); Some((Err(err.into()), object_lister)) } None => { - tracing::info!("list to the end"); + tracing::info!("list object completed."); None } } diff --git a/src/connector/src/source/filesystem/opendal_source/s3_source.rs b/src/connector/src/source/filesystem/opendal_source/s3_source.rs index 17fefb0623fdc..0681bbd0c3448 100644 --- a/src/connector/src/source/filesystem/opendal_source/s3_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/s3_source.rs @@ -28,9 +28,9 @@ impl OpendalEnumerator where C: Sized + Send + Clone + PartialEq + 'static + Sync, { - /// create opendal s3 engine. + /// create opendal s3 source. pub fn new_s3_source(s3_properties: S3Properties) -> anyhow::Result { - // Create s3 backend builder. + // Create s3 builder. let mut builder = S3::default(); builder.bucket(&s3_properties.bucket_name); builder.region(&s3_properties.region_name); diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index f3e1560c55022..d0003c32b8921 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -97,26 +97,12 @@ impl ConnectorSource { ConnectorProperties::Gcs(prop) => { let lister: OpendalEnumerator = OpendalEnumerator::new_gcs_source(*prop)?; - Ok(build_opendal_fs_list_stream( - FsListCtrlContext { - interval: Duration::from_secs(60), - last_tick: None, - filter_ctx: FsFilterCtrlCtx, - }, - lister, - )) + Ok(build_opendal_fs_list_stream(lister)) } ConnectorProperties::OpenDalS3(prop) => { let lister: OpendalEnumerator = OpendalEnumerator::new_s3_source(prop.s3_properties)?; - Ok(build_opendal_fs_list_stream( - FsListCtrlContext { - interval: Duration::from_secs(60), - last_tick: None, - filter_ctx: FsFilterCtrlCtx, - }, - lister, - )) + Ok(build_opendal_fs_list_stream(lister)) } other => Err(internal_error(format!("Unsupported source: {:?}", other))), } @@ -195,20 +181,7 @@ impl ConnectorSource { } #[try_stream(boxed, ok = FsPageItem, error = RwError)] -async fn build_opendal_fs_list_stream( - _ctrl_ctx: FsListCtrlContext, - lister: OpendalEnumerator, -) { - // let mut interval = time::interval(ctrl_ctx.interval); - // interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - // let prefix = match lister.get_prefix().as_ref() { - // Some(prefix) => prefix, - // None => { - // let empty_prefix = "".to_string(); - // &empty_prefix - // } - // }; +async fn build_opendal_fs_list_stream(lister: OpendalEnumerator) { let prefix = lister .get_prefix() .as_ref() diff --git a/src/stream/src/executor/source/list_executor.rs b/src/stream/src/executor/source/list_executor.rs index 0a142569fbdf2..931fec5c2c0b3 100644 --- a/src/stream/src/executor/source/list_executor.rs +++ b/src/stream/src/executor/source/list_executor.rs @@ -34,6 +34,8 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::*; +const CHUNK_SIZE: usize = 1024; + #[allow(dead_code)] pub struct FsListExecutor { actor_ctx: ActorContextRef, @@ -103,7 +105,7 @@ impl FsListExecutor { .get_opendal_source_list() .map_err(StreamExecutorError::connector_error)?; let chunked_stream = stream - .chunks(1024) // Group FsPageItems into chunks of size 1024 + .chunks(CHUNK_SIZE) // Group FsPageItems into chunks of size 1024 .map(|chunk| { let rows = chunk .into_iter()