Skip to content

Commit

Permalink
no need lock
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jan 27, 2025
1 parent 5cccf42 commit 484a5f8
Showing 1 changed file with 11 additions and 16 deletions.
27 changes: 11 additions & 16 deletions src/stream/src/executor/sync_kv_log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ use futures::future::BoxFuture;
use futures::{FutureExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use parking_lot::Mutex;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{TableId, TableOption};
Expand Down Expand Up @@ -117,7 +116,7 @@ struct SyncedKvLogStoreExecutor<S: StateStore> {
flushed_chunk_future: Option<ReadFlushedChunkFuture>,
state_store: S,
local_state_store: S::Local,
buffer: Mutex<SyncedLogStoreBuffer>,
buffer: SyncedLogStoreBuffer,
}
// Stream interface
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
Expand Down Expand Up @@ -155,12 +154,12 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
flushed_chunk_future: None,
state_store,
local_state_store,
buffer: Mutex::new(SyncedLogStoreBuffer {
buffer: SyncedLogStoreBuffer {
buffer: VecDeque::new(),
max_size: buffer_max_size,
next_chunk_id: 0,
metrics,
}),
},
upstream,
}
}
Expand Down Expand Up @@ -242,7 +241,7 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
state_store_stream: &mut Option<StateStoreStream<S>>,
flushed_chunk_future: &mut Option<ReadFlushedChunkFuture>,
state_store: &S,
buffer: &mut Mutex<SyncedLogStoreBuffer>,
buffer: &mut SyncedLogStoreBuffer,

local_state_store: &mut S::Local,
metrics: &mut KvLogStoreMetrics,
Expand Down Expand Up @@ -335,7 +334,7 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
log_store_state: &mut Option<StateStoreStream<S>>,
read_flushed_chunk_future: &mut Option<ReadFlushedChunkFuture>,
state_store: &S,
buffer: &mut Mutex<SyncedLogStoreBuffer>,
buffer: &mut SyncedLogStoreBuffer,
) -> StreamExecutorResult<Option<StreamChunk>> {
// 1. read state store
if let Some(chunk) = Self::try_next_state_store_item(log_store_state).await? {
Expand Down Expand Up @@ -412,11 +411,11 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
read_flushed_chunk_future: &mut Option<ReadFlushedChunkFuture>,
serde: &LogStoreRowSerde,
state_store: &S,
buffer: &mut Mutex<SyncedLogStoreBuffer>,
buffer: &mut SyncedLogStoreBuffer,
table_id: TableId,
read_metrics: &KvLogStoreReadMetrics,
) -> StreamExecutorResult<Option<StreamChunk>> {
let Some((item_epoch, item)) = buffer.lock().pop_front() else {
let Some((item_epoch, item)) = buffer.pop_front() else {
return Ok(None);
};
match item {
Expand Down Expand Up @@ -470,7 +469,7 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
metrics: &mut KvLogStoreMetrics,
truncation_offset: Option<ReaderTruncationOffsetType>,
seq_id: &mut SeqIdType,
buffer: &mut Mutex<SyncedLogStoreBuffer>,
buffer: &mut SyncedLogStoreBuffer,
actor_id: ActorId,
) -> StreamExecutorResult<()> {
let epoch = state_store.epoch();
Expand Down Expand Up @@ -507,7 +506,6 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
);

// Add to buffer
let mut buffer = buffer.lock();
buffer.buffer.push_back((
epoch,
LogStoreBufferItem::Barrier {
Expand Down Expand Up @@ -536,14 +534,12 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
serde: &LogStoreRowSerde,
start_seq_id: SeqIdType,
end_seq_id: SeqIdType,
buffer: &mut Mutex<SyncedLogStoreBuffer>,
buffer: &mut SyncedLogStoreBuffer,
chunk: StreamChunk,
state_store: &mut S::Local,
) -> StreamExecutorResult<()> {
let chunk_to_flush = {
let mut buffer = buffer.lock();
buffer.add_or_flush_chunk(start_seq_id, end_seq_id, chunk, state_store)
};
let chunk_to_flush =
{ buffer.add_or_flush_chunk(start_seq_id, end_seq_id, chunk, state_store) };
match chunk_to_flush {
None => {}
Some(chunk_to_flush) => {
Expand All @@ -557,7 +553,6 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
)
.await?;
{
let mut buffer = buffer.lock();
buffer.add_flushed_item_to_buffer(
start_seq_id,
end_seq_id,
Expand Down

0 comments on commit 484a5f8

Please sign in to comment.