Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
tabversion committed Jan 27, 2025
1 parent b3c891a commit 4ff021e
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,12 +666,12 @@ impl<S: StateStore> SourceExecutor<S> {
latest_splits = init_reader_builder
.fetch_latest_splits(
recover_state.clone(),
is_uninitialized && self.is_shared_non_cdc,
self.is_shared_non_cdc,
)
.await?;
}
let reader_stream =
init_reader_builder.into_retry_stream(recover_state.clone(), is_uninitialized);
init_reader_builder.into_retry_stream(recover_state.clone(), is_uninitialized && self.is_shared_non_cdc);

if let Some(latest_splits) = latest_splits {
// make sure it is written to state table later.
Expand Down

0 comments on commit 4ff021e

Please sign in to comment.