diff --git a/src/stream/src/common/compact_chunk.rs b/src/stream/src/common/compact_chunk.rs index 1583ffc47f723..24e194bdb0c82 100644 --- a/src/stream/src/common/compact_chunk.rs +++ b/src/stream/src/common/compact_chunk.rs @@ -27,6 +27,8 @@ use risingwave_common::row::{Project, RowExt}; use risingwave_common::types::DataType; use risingwave_common::util::hash_util::Crc32FastBuilder; +use crate::consistency::consistency_panic; + /// A helper to compact the stream chunks by modifying the `Ops` and visibility of the chunk. pub struct StreamChunkCompactor { chunks: Vec, @@ -34,21 +36,36 @@ pub struct StreamChunkCompactor { } struct OpRowMutRefTuple<'a> { - previous: Option>, - latest: OpRowMutRef<'a>, + before_prev: Option>, + prev: OpRowMutRef<'a>, } impl<'a> OpRowMutRefTuple<'a> { /// return true if no row left - fn push(&mut self, mut op_row: OpRowMutRef<'a>) -> bool { - debug_assert!(self.latest.vis()); - match (self.latest.op(), op_row.op()) { - (Op::Insert, Op::Insert) => panic!("receive duplicated insert on the stream"), - (Op::Delete, Op::Delete) => panic!("receive duplicated delete on the stream"), + fn push(&mut self, mut curr: OpRowMutRef<'a>) -> bool { + debug_assert!(self.prev.vis()); + match (self.prev.op(), curr.op()) { + (Op::Insert, Op::Insert) => { + consistency_panic!("receive duplicated insert on the stream"); + // If need to tolerate inconsistency, override the previous insert. + // Note that because the primary key constraint has been violated, we + // don't mind losing some data here. + self.prev.set_vis(false); + self.prev = curr; + } + (Op::Delete, Op::Delete) => { + consistency_panic!("receive duplicated delete on the stream"); + // If need to tolerate inconsistency, override the previous delete. + // Note that because the primary key constraint has been violated, we + // don't mind losing some data here. 
+ self.prev.set_vis(false); + self.prev = curr; + } (Op::Insert, Op::Delete) => { - self.latest.set_vis(false); - op_row.set_vis(false); - self.latest = if let Some(prev) = self.previous.take() { + // Delete a row that has been inserted, just hide the two ops. + self.prev.set_vis(false); + curr.set_vis(false); + self.prev = if let Some(prev) = self.before_prev.take() { prev } else { return true; @@ -57,8 +74,11 @@ impl<'a> OpRowMutRefTuple<'a> { (Op::Delete, Op::Insert) => { // The operation for the key must be (+, -, +) or (-, +). And the (+, -) must has // been filtered. - debug_assert!(self.previous.is_none()); - self.previous = Some(mem::replace(&mut self.latest, op_row)); + debug_assert!( + self.before_prev.is_none(), + "should have been taken in the above match arm" + ); + self.before_prev = Some(mem::replace(&mut self.prev, curr)); } // `all the updateDelete` and `updateInsert` should be normalized to `delete` // and`insert` @@ -68,10 +88,10 @@ impl<'a> OpRowMutRefTuple<'a> { } fn as_update_op(&mut self) -> Option<(&mut OpRowMutRef<'a>, &mut OpRowMutRef<'a>)> { - self.previous.as_mut().map(|prev| { + self.before_prev.as_mut().map(|prev| { debug_assert_eq!(prev.op(), Op::Delete); - debug_assert_eq!(self.latest.op(), Op::Insert); - (prev, &mut self.latest) + debug_assert_eq!(self.prev.op(), Op::Insert); + (prev, &mut self.prev) }) } } @@ -219,8 +239,8 @@ impl StreamChunkCompactor { match op_row_map.entry(Prehashed::new(key, hash)) { Entry::Vacant(v) => { v.insert(OpRowMutRefTuple { - previous: None, - latest: op_row, + before_prev: None, + prev: op_row, }); } Entry::Occupied(mut o) => { diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 8e19e35fd5bab..d05cd891565e6 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -67,7 +67,6 @@ use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy}; use crate::cache::cache_may_stale; use 
crate::common::cache::{StateCache, StateCacheFiller}; use crate::common::table::state_table_cache::StateTableWatermarkCache; -use crate::consistency::insane; use crate::executor::{StreamExecutorError, StreamExecutorResult}; /// This num is arbitrary and we may want to improve this choice in the future. @@ -363,7 +362,7 @@ where ) }; - let is_consistent_op = if insane() { + let is_consistent_op = if crate::consistency::insane() { // In insane mode, we will have inconsistent operations applied on the table, even if // our executor code do not expect that. false diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index a2b229d118ed9..28c5edd57ead8 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -35,7 +35,7 @@ use tokio::time::Instant; use tracing::{event, Instrument}; use super::exchange::output::{new_output, BoxedOutput}; -use super::{AddMutation, Executor, UpdateMutation, Watermark}; +use super::{AddMutation, Executor, TroublemakerExecutor, UpdateMutation, Watermark}; use crate::error::StreamResult; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, Message, Mutation, StreamConsumer}; @@ -341,13 +341,22 @@ impl DispatchExecutorInner { impl DispatchExecutor { pub fn new( - input: Executor, + mut input: Executor, dispatchers: Vec, actor_id: u32, fragment_id: u32, context: Arc, metrics: Arc, + chunk_size: usize, ) -> Self { + if crate::consistency::insane() { + // make some trouble before dispatching to avoid generating invalid dist key. 
+ let mut info = input.info().clone(); + info.identity = format!("{} (embedded trouble)", info.identity); + let troublemaker = TroublemakerExecutor::new(input, chunk_size); + input = (info, troublemaker).into(); + } + let actor_id_str = actor_id.to_string(); let fragment_id_str = fragment_id.to_string(); let actor_out_record_cnt = metrics @@ -1033,6 +1042,7 @@ mod tests { use risingwave_common::array::stream_chunk::StreamChunkTestExt; use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder, Op}; use risingwave_common::catalog::Schema; + use risingwave_common::config; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::test_epoch; use risingwave_common::util::hash_util::Crc32FastBuilder; @@ -1204,6 +1214,7 @@ mod tests { fragment_id, ctx.clone(), metrics, + config::default::developer::stream_chunk_size(), )) .execute(); pin_mut!(executor); diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index f6ca469fb1623..fe94c6449ac49 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -41,6 +41,7 @@ use super::monitor::StreamingMetrics; use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message}; use crate::common::table::state_table::{StateTable, WatermarkCacheParameterizedStateTable}; use crate::common::StreamChunkBuilder; +use crate::consistency::consistency_panic; use crate::executor::expect_first_barrier_from_aligned_stream; use crate::task::ActorEvalErrorReport; @@ -362,10 +363,10 @@ impl DynamicFilterExecutor; @@ -48,12 +49,12 @@ impl LookupCache { match op { Op::Insert | Op::UpdateInsert => { if !values.insert(row) { - panic!("inserting a duplicated value"); + consistency_panic!("inserting a duplicated value"); } } Op::Delete | Op::UpdateDelete => { if !values.remove(&row) { - panic!("row {:?} should be in the cache", row); + consistency_panic!("row {:?} should be in the cache", row); } } } diff --git 
a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs index 333173053a861..3ce58e4e6017d 100644 --- a/src/stream/src/executor/top_n/top_n_cache.rs +++ b/src/stream/src/executor/top_n/top_n_cache.rs @@ -25,6 +25,7 @@ use risingwave_storage::StateStore; use super::topn_cache_state::TopNCacheState; use super::{CacheKey, GroupKey, ManagedTopNState}; +use crate::consistency::{consistency_error, enable_strict_consistency}; use crate::executor::error::StreamExecutorResult; const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2; @@ -350,6 +351,12 @@ impl TopNCacheTrait for TopNCache { res_ops: &mut Vec, res_rows: &mut Vec, ) -> StreamExecutorResult<()> { + if !enable_strict_consistency() && self.table_row_count == Some(0) { + // If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we + // should not panic. Instead, we pretend that we don't know about the actual row count. + consistency_error!("table row count is 0, but we receive a DELETE operation"); + self.table_row_count = None; + } if let Some(row_count) = self.table_row_count.as_mut() { *row_count -= 1; } @@ -524,6 +531,11 @@ impl TopNCacheTrait for TopNCache { res_ops: &mut Vec, res_rows: &mut Vec, ) -> StreamExecutorResult<()> { + if !enable_strict_consistency() && self.table_row_count == Some(0) { + // If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we + // should not panic. Instead, we pretend that we don't know about the actual row count. 
+ self.table_row_count = None; + } if let Some(row_count) = self.table_row_count.as_mut() { *row_count -= 1; } diff --git a/src/stream/src/executor/troublemaker.rs b/src/stream/src/executor/troublemaker.rs index 655efba6bf6c1..72a257f331450 100644 --- a/src/stream/src/executor/troublemaker.rs +++ b/src/stream/src/executor/troublemaker.rs @@ -110,7 +110,10 @@ impl TroublemakerExecutor { } } Message::Barrier(barrier) => { - assert!(vars.chunk_builder.take().is_none(), "we don't merge chunks"); + assert!( + vars.chunk_builder.take().is_none(), + "we don't merge chunks across barriers" + ); yield Message::Barrier(barrier); } _ => yield msg, diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index a628b3a841d7b..28bd0a9e220ab 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -385,6 +385,7 @@ impl StreamActorManager { /// Create dispatchers with downstream information registered before fn create_dispatcher( &self, + env: StreamEnvironment, input: Executor, dispatchers: &[stream_plan::Dispatcher], actor_id: ActorId, @@ -403,6 +404,7 @@ impl StreamActorManager { fragment_id, shared_context.clone(), self.streaming_metrics.clone(), + env.config().developer.chunk_size, )) } @@ -597,6 +599,7 @@ impl StreamActorManager { .await?; let dispatcher = self.create_dispatcher( + self.env.clone(), executor, &actor.dispatcher, actor_id,