Skip to content

Commit

Permalink
fix list
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Nov 23, 2023
1 parent 743226f commit 4c774eb
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ impl<C: OpenDalProperties> OpendalEnumerator<C>
where
C: Send + Clone + PartialEq + 'static + Sync,
{
/// create opendal gcs engine.
/// create opendal gcs source.
pub fn new_gcs_source(gcs_properties: GcsProperties) -> anyhow::Result<Self> {
// Create gcs backend builder.
// Create gcs builder.
let mut builder = Gcs::default();

builder.bucket(&gcs_properties.bucket_name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,13 @@ where
.op
.lister_with("/")
.delimiter("")
.metakey(Metakey::ContentLength | Metakey::ContentType)
.metakey(Metakey::ContentLength | Metakey::LastModified)
.await?;
let stream = stream::unfold(object_lister, |mut object_lister| async move {
match object_lister.next().await {
Some(Ok(object)) => {
// todo: manual filtering prefix
let name = object.path().to_string();

let om = object.metadata();

let t = match om.last_modified() {
Expand All @@ -109,11 +108,11 @@ where
Some((Ok(metadata), object_lister))
}
Some(Err(err)) => {
tracing::error!("list fail");
tracing::error!("list object fail, err {}", err);
Some((Err(err.into()), object_lister))
}
None => {
tracing::info!("list to the end");
tracing::info!("list object completed.");
None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ impl<C: OpenDalProperties> OpendalEnumerator<C>
where
C: Sized + Send + Clone + PartialEq + 'static + Sync,
{
/// create opendal s3 engine.
/// create opendal s3 source.
pub fn new_s3_source(s3_properties: S3Properties) -> anyhow::Result<Self> {
// Create s3 backend builder.
// Create s3 builder.
let mut builder = S3::default();
builder.bucket(&s3_properties.bucket_name);
builder.region(&s3_properties.region_name);
Expand Down
33 changes: 3 additions & 30 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,26 +97,12 @@ impl ConnectorSource {
ConnectorProperties::Gcs(prop) => {
let lister: OpendalEnumerator<GcsProperties> =
OpendalEnumerator::new_gcs_source(*prop)?;
Ok(build_opendal_fs_list_stream(
FsListCtrlContext {
interval: Duration::from_secs(60),
last_tick: None,
filter_ctx: FsFilterCtrlCtx,
},
lister,
))
Ok(build_opendal_fs_list_stream(lister))
}
ConnectorProperties::OpenDalS3(prop) => {
let lister: OpendalEnumerator<OpendalS3Properties> =
OpendalEnumerator::new_s3_source(prop.s3_properties)?;
Ok(build_opendal_fs_list_stream(
FsListCtrlContext {
interval: Duration::from_secs(60),
last_tick: None,
filter_ctx: FsFilterCtrlCtx,
},
lister,
))
Ok(build_opendal_fs_list_stream(lister))
}
other => Err(internal_error(format!("Unsupported source: {:?}", other))),
}
Expand Down Expand Up @@ -195,20 +181,7 @@ impl ConnectorSource {
}

#[try_stream(boxed, ok = FsPageItem, error = RwError)]
async fn build_opendal_fs_list_stream<C: OpenDalProperties>(
_ctrl_ctx: FsListCtrlContext,
lister: OpendalEnumerator<C>,
) {
// let mut interval = time::interval(ctrl_ctx.interval);
// interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

// let prefix = match lister.get_prefix().as_ref() {
// Some(prefix) => prefix,
// None => {
// let empty_prefix = "".to_string();
// &empty_prefix
// }
// };
async fn build_opendal_fs_list_stream<C: OpenDalProperties>(lister: OpendalEnumerator<C>) {
let prefix = lister
.get_prefix()
.as_ref()
Expand Down
4 changes: 3 additions & 1 deletion src/stream/src/executor/source/list_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::executor::monitor::StreamingMetrics;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::executor::*;

const CHUNK_SIZE: usize = 1024;

#[allow(dead_code)]
pub struct FsListExecutor<S: StateStore> {
actor_ctx: ActorContextRef,
Expand Down Expand Up @@ -103,7 +105,7 @@ impl<S: StateStore> FsListExecutor<S> {
.get_opendal_source_list()
.map_err(StreamExecutorError::connector_error)?;
let chunked_stream = stream
.chunks(1024) // Group FsPageItems into chunks of size 1024
.chunks(CHUNK_SIZE) // Group FsPageItems into chunks of size 1024
.map(|chunk| {
let rows = chunk
.into_iter()
Expand Down

0 comments on commit 4c774eb

Please sign in to comment.