From 5cccf4242bd9648429861a7ce6a20054ae3e5595 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 27 Jan 2025 16:53:03 +0800 Subject: [PATCH] update read path --- src/stream/src/executor/sync_kv_log_store.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/stream/src/executor/sync_kv_log_store.rs b/src/stream/src/executor/sync_kv_log_store.rs index 417ae253f4cae..239733a86a5bb 100644 --- a/src/stream/src/executor/sync_kv_log_store.rs +++ b/src/stream/src/executor/sync_kv_log_store.rs @@ -268,6 +268,17 @@ impl SyncedKvLogStoreExecutor { buffer, actor_id, ).await?; + let should_update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id).is_some(); + if should_update_vnode_bitmap { + *state_store_stream = Some(read_persisted_log_store( + serde, + table_id, + metrics, + state_store.clone(), + barrier.epoch.prev, + None, + ).await?); + } Ok(Some(Message::Barrier(barrier))) } Message::Chunk(chunk) => {