From 0d278ab7494c9caeb0de497c5ac058faff6bad32 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 6 Feb 2025 16:19:53 +0800 Subject: [PATCH] refine --- src/stream/src/executor/asof_join.rs | 2 +- src/stream/src/executor/hash_agg.rs | 5 ++--- src/stream/src/executor/hash_join.rs | 1 + src/stream/src/executor/join/hash_join.rs | 8 +++----- src/stream/src/executor/top_n/utils.rs | 4 +++- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/stream/src/executor/asof_join.rs b/src/stream/src/executor/asof_join.rs index 3e46acf94056a..49f8b3cb4f2d3 100644 --- a/src/stream/src/executor/asof_join.rs +++ b/src/stream/src/executor/asof_join.rs @@ -461,11 +461,11 @@ impl AsOfJoinExecutor if left_post_commit .post_yield_barrier(update_vnode_bitmap) .await? + .unwrap_or(false) { self.watermark_buffers .values_mut() .for_each(|buffers| buffers.clear()); - // self.inequality_watermarks.fill(None); } // Report metrics of cached join rows/entries diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index a08837609defe..493a2968fcddc 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -673,9 +673,8 @@ impl HashAggExecutor { post_commit.post_yield_barrier(update_vnode_bitmap.clone()) })) .await? - .into_iter() - .flatten() - .next() + .pop() + .expect("should have at least one table") .map(|(_, cache_may_stale)| cache_may_stale) { // Manipulate the cache if necessary. diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 98d890bfc2a21..2656af4890783 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -660,6 +660,7 @@ impl HashJoinExecutor JoinHashMapPostCommit<'_, K, S> { pub async fn post_yield_barrier( self, vnode_bitmap: Option>, - ) -> StreamExecutorResult { + ) -> StreamExecutorResult> { let cache_may_stale = self.state.post_yield_barrier(vnode_bitmap.clone()).await?; if let Some(degree_state) = self.degree_state { let _ = degree_state.post_yield_barrier(vnode_bitmap).await?; } - let cache_may_stale = cache_may_stale - .map(|(_, cache_may_stale)| cache_may_stale) - .unwrap_or(false); - if cache_may_stale { + let cache_may_stale = cache_may_stale.map(|(_, cache_may_stale)| cache_may_stale); + if cache_may_stale.unwrap_or(false) { self.inner.clear(); } Ok(cache_may_stale) diff --git a/src/stream/src/executor/top_n/utils.rs b/src/stream/src/executor/top_n/utils.rs index 105edf524a2d1..a0058836ee954 100644 --- a/src/stream/src/executor/top_n/utils.rs +++ b/src/stream/src/executor/top_n/utils.rs @@ -40,7 +40,9 @@ pub trait TopNExecutorBase: Send + 'static { /// Flush the buffered chunk to the storage backend. fn try_flush_data(&mut self) -> impl Future> + Send; - fn clear_cache(&mut self) {} + fn clear_cache(&mut self) { + unreachable!() + } fn evict(&mut self) {}