From 484a5f809c7f4780c69c65792142d95f90203e22 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 27 Jan 2025 17:42:29 +0800 Subject: [PATCH] no need lock --- src/stream/src/executor/sync_kv_log_store.rs | 27 ++++++++------------ 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/src/stream/src/executor/sync_kv_log_store.rs b/src/stream/src/executor/sync_kv_log_store.rs index 239733a86a5bb..107c3f33065e6 100644 --- a/src/stream/src/executor/sync_kv_log_store.rs +++ b/src/stream/src/executor/sync_kv_log_store.rs @@ -67,7 +67,6 @@ use futures::future::BoxFuture; use futures::{FutureExt, TryStreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; -use parking_lot::Mutex; use risingwave_common::array::StreamChunk; use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::{TableId, TableOption}; @@ -117,7 +116,7 @@ struct SyncedKvLogStoreExecutor { flushed_chunk_future: Option, state_store: S, local_state_store: S::Local, - buffer: Mutex, + buffer: SyncedLogStoreBuffer, } // Stream interface impl SyncedKvLogStoreExecutor { @@ -155,12 +154,12 @@ impl SyncedKvLogStoreExecutor { flushed_chunk_future: None, state_store, local_state_store, - buffer: Mutex::new(SyncedLogStoreBuffer { + buffer: SyncedLogStoreBuffer { buffer: VecDeque::new(), max_size: buffer_max_size, next_chunk_id: 0, metrics, - }), + }, upstream, } } @@ -242,7 +241,7 @@ impl SyncedKvLogStoreExecutor { state_store_stream: &mut Option>, flushed_chunk_future: &mut Option, state_store: &S, - buffer: &mut Mutex, + buffer: &mut SyncedLogStoreBuffer, local_state_store: &mut S::Local, metrics: &mut KvLogStoreMetrics, @@ -335,7 +334,7 @@ impl SyncedKvLogStoreExecutor { log_store_state: &mut Option>, read_flushed_chunk_future: &mut Option, state_store: &S, - buffer: &mut Mutex, + buffer: &mut SyncedLogStoreBuffer, ) -> StreamExecutorResult> { // 1. read state store if let Some(chunk) = Self::try_next_state_store_item(log_store_state).await? { @@ -412,11 +411,11 @@ impl SyncedKvLogStoreExecutor { read_flushed_chunk_future: &mut Option, serde: &LogStoreRowSerde, state_store: &S, - buffer: &mut Mutex, + buffer: &mut SyncedLogStoreBuffer, table_id: TableId, read_metrics: &KvLogStoreReadMetrics, ) -> StreamExecutorResult> { - let Some((item_epoch, item)) = buffer.lock().pop_front() else { + let Some((item_epoch, item)) = buffer.pop_front() else { return Ok(None); }; match item { @@ -470,7 +469,7 @@ impl SyncedKvLogStoreExecutor { metrics: &mut KvLogStoreMetrics, truncation_offset: Option, seq_id: &mut SeqIdType, - buffer: &mut Mutex, + buffer: &mut SyncedLogStoreBuffer, actor_id: ActorId, ) -> StreamExecutorResult<()> { let epoch = state_store.epoch(); @@ -507,7 +506,6 @@ impl SyncedKvLogStoreExecutor { ); // Add to buffer - let mut buffer = buffer.lock(); buffer.buffer.push_back(( epoch, LogStoreBufferItem::Barrier { @@ -536,14 +534,12 @@ impl SyncedKvLogStoreExecutor { serde: &LogStoreRowSerde, start_seq_id: SeqIdType, end_seq_id: SeqIdType, - buffer: &mut Mutex, + buffer: &mut SyncedLogStoreBuffer, chunk: StreamChunk, state_store: &mut S::Local, ) -> StreamExecutorResult<()> { - let chunk_to_flush = { - let mut buffer = buffer.lock(); - buffer.add_or_flush_chunk(start_seq_id, end_seq_id, chunk, state_store) - }; + let chunk_to_flush = + { buffer.add_or_flush_chunk(start_seq_id, end_seq_id, chunk, state_store) }; match chunk_to_flush { None => {} Some(chunk_to_flush) => { @@ -557,7 +553,6 @@ impl SyncedKvLogStoreExecutor { ) .await?; { - let mut buffer = buffer.lock(); buffer.add_flushed_item_to_buffer( start_seq_id, end_seq_id,