Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 6, 2025
1 parent 4083a47 commit 0d278ab
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/stream/src/executor/asof_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,11 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
if left_post_commit
.post_yield_barrier(update_vnode_bitmap)
.await?
.unwrap_or(false)
{
self.watermark_buffers
.values_mut()
.for_each(|buffers| buffers.clear());
// self.inequality_watermarks.fill(None);
}

// Report metrics of cached join rows/entries
Expand Down
5 changes: 2 additions & 3 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,9 +673,8 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
post_commit.post_yield_barrier(update_vnode_bitmap.clone())
}))
.await?
.into_iter()
.flatten()
.next()
.pop()
.expect("should have at least one table")
.map(|(_, cache_may_stale)| cache_may_stale)
{
// Manipulate the cache if necessary.
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
if left_post_commit
.post_yield_barrier(update_vnode_bitmap)
.await?
.unwrap_or(false)
{
self.watermark_buffers
.values_mut()
Expand Down
8 changes: 3 additions & 5 deletions src/stream/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,15 +502,13 @@ impl<K: HashKey, S: StateStore> JoinHashMapPostCommit<'_, K, S> {
pub async fn post_yield_barrier(
self,
vnode_bitmap: Option<Arc<Bitmap>>,
) -> StreamExecutorResult<bool> {
) -> StreamExecutorResult<Option<bool>> {
let cache_may_stale = self.state.post_yield_barrier(vnode_bitmap.clone()).await?;
if let Some(degree_state) = self.degree_state {
let _ = degree_state.post_yield_barrier(vnode_bitmap).await?;
}
let cache_may_stale = cache_may_stale
.map(|(_, cache_may_stale)| cache_may_stale)
.unwrap_or(false);
if cache_may_stale {
let cache_may_stale = cache_may_stale.map(|(_, cache_may_stale)| cache_may_stale);
if cache_may_stale.unwrap_or(false) {
self.inner.clear();
}
Ok(cache_may_stale)
Expand Down
4 changes: 3 additions & 1 deletion src/stream/src/executor/top_n/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ pub trait TopNExecutorBase: Send + 'static {
/// Flush the buffered chunk to the storage backend.
fn try_flush_data(&mut self) -> impl Future<Output = StreamExecutorResult<()>> + Send;

fn clear_cache(&mut self) {}
fn clear_cache(&mut self) {
unreachable!()
}

fn evict(&mut self) {}

Expand Down

0 comments on commit 0d278ab

Please sign in to comment.