Skip to content

Commit

Permalink
return pending if no results from the logstore side
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Feb 6, 2025
1 parent 30848e8 commit 1c4dd36
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions src/stream/src/executor/sync_kv_log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
//! - [] Handle paused stream
use std::collections::VecDeque;
use std::future::pending;
use std::pin::Pin;

use await_tree::InstrumentAwait;
Expand Down Expand Up @@ -294,7 +295,7 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
buffer
) => {
let logstore_item = logstore_item?;
Ok(logstore_item.map(Message::Chunk))
Ok(Some(Message::Chunk(logstore_item)))
}
}
}
Expand All @@ -314,15 +315,15 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
read_flushed_chunk_future: &mut Option<ReadFlushedChunkFuture>,
state_store: &S,
buffer: &mut SyncedLogStoreBuffer,
) -> StreamExecutorResult<Option<StreamChunk>> {
) -> StreamExecutorResult<StreamChunk> {
// 1. read state store
if let Some(chunk) = Self::try_next_state_store_item(log_store_state).await? {
return Ok(Some(chunk));
return Ok(chunk);
}

// 2. read existing flushed chunk future
if let Some(chunk) = Self::try_next_flushed_chunk_future(read_flushed_chunk_future).await? {
return Ok(Some(chunk));
return Ok(chunk);
}

// 3. read buffer
Expand All @@ -337,9 +338,9 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
)
.await?
{
return Ok(Some(chunk));
return Ok(chunk);
}
Ok(None)
pending().await
}

async fn try_next_state_store_item(
Expand Down

0 comments on commit 1c4dd36

Please sign in to comment.