Skip to content

Commit

Permalink
feat(stream): make "insane" mode more insane, and tolerate inconsiste…
Browse files Browse the repository at this point in the history
…ncy in some more places (#16083)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Apr 3, 2024
1 parent 0f0e3e7 commit 6e35a50
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 28 deletions.
54 changes: 37 additions & 17 deletions src/stream/src/common/compact_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,45 @@ use risingwave_common::row::{Project, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::util::hash_util::Crc32FastBuilder;

use crate::consistency::consistency_panic;

/// A helper to compact the stream chunks by modifying the `Ops` and visibility of the chunk.
pub struct StreamChunkCompactor {
chunks: Vec<StreamChunk>,
key: Vec<usize>,
}

struct OpRowMutRefTuple<'a> {
previous: Option<OpRowMutRef<'a>>,
latest: OpRowMutRef<'a>,
before_prev: Option<OpRowMutRef<'a>>,
prev: OpRowMutRef<'a>,
}

impl<'a> OpRowMutRefTuple<'a> {
/// return true if no row left
fn push(&mut self, mut op_row: OpRowMutRef<'a>) -> bool {
debug_assert!(self.latest.vis());
match (self.latest.op(), op_row.op()) {
(Op::Insert, Op::Insert) => panic!("receive duplicated insert on the stream"),
(Op::Delete, Op::Delete) => panic!("receive duplicated delete on the stream"),
fn push(&mut self, mut curr: OpRowMutRef<'a>) -> bool {
debug_assert!(self.prev.vis());
match (self.prev.op(), curr.op()) {
(Op::Insert, Op::Insert) => {
consistency_panic!("receive duplicated insert on the stream");
// If need to tolerate inconsistency, override the previous insert.
// Note that because the primary key constraint has been violated, we
// don't mind losing some data here.
self.prev.set_vis(false);
self.prev = curr;
}
(Op::Delete, Op::Delete) => {
consistency_panic!("receive duplicated delete on the stream");
// If need to tolerate inconsistency, override the previous delete.
// Note that because the primary key constraint has been violated, we
// don't mind losing some data here.
self.prev.set_vis(false);
self.prev = curr;
}
(Op::Insert, Op::Delete) => {
self.latest.set_vis(false);
op_row.set_vis(false);
self.latest = if let Some(prev) = self.previous.take() {
// Delete a row that has been inserted, just hide the two ops.
self.prev.set_vis(false);
curr.set_vis(false);
self.prev = if let Some(prev) = self.before_prev.take() {
prev
} else {
return true;
Expand All @@ -57,8 +74,11 @@ impl<'a> OpRowMutRefTuple<'a> {
(Op::Delete, Op::Insert) => {
// The operation for the key must be (+, -, +) or (-, +). And the (+, -) must has
// been filtered.
debug_assert!(self.previous.is_none());
self.previous = Some(mem::replace(&mut self.latest, op_row));
debug_assert!(
self.before_prev.is_none(),
"should have been taken in the above match arm"
);
self.before_prev = Some(mem::replace(&mut self.prev, curr));
}
// `all the updateDelete` and `updateInsert` should be normalized to `delete`
// and`insert`
Expand All @@ -68,10 +88,10 @@ impl<'a> OpRowMutRefTuple<'a> {
}

fn as_update_op(&mut self) -> Option<(&mut OpRowMutRef<'a>, &mut OpRowMutRef<'a>)> {
self.previous.as_mut().map(|prev| {
self.before_prev.as_mut().map(|prev| {
debug_assert_eq!(prev.op(), Op::Delete);
debug_assert_eq!(self.latest.op(), Op::Insert);
(prev, &mut self.latest)
debug_assert_eq!(self.prev.op(), Op::Insert);
(prev, &mut self.prev)
})
}
}
Expand Down Expand Up @@ -219,8 +239,8 @@ impl StreamChunkCompactor {
match op_row_map.entry(Prehashed::new(key, hash)) {
Entry::Vacant(v) => {
v.insert(OpRowMutRefTuple {
previous: None,
latest: op_row,
before_prev: None,
prev: op_row,
});
}
Entry::Occupied(mut o) => {
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy};
use crate::cache::cache_may_stale;
use crate::common::cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::consistency::insane;
use crate::executor::{StreamExecutorError, StreamExecutorResult};

/// This num is arbitrary and we may want to improve this choice in the future.
Expand Down Expand Up @@ -363,7 +362,7 @@ where
)
};

let is_consistent_op = if insane() {
let is_consistent_op = if crate::consistency::insane() {
// In insane mode, we will have inconsistent operations applied on the table, even if
// our executor code do not expect that.
false
Expand Down
15 changes: 13 additions & 2 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tokio::time::Instant;
use tracing::{event, Instrument};

use super::exchange::output::{new_output, BoxedOutput};
use super::{AddMutation, Executor, UpdateMutation, Watermark};
use super::{AddMutation, Executor, TroublemakerExecutor, UpdateMutation, Watermark};
use crate::error::StreamResult;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{Barrier, Message, Mutation, StreamConsumer};
Expand Down Expand Up @@ -341,13 +341,22 @@ impl DispatchExecutorInner {

impl DispatchExecutor {
pub fn new(
input: Executor,
mut input: Executor,
dispatchers: Vec<DispatcherImpl>,
actor_id: u32,
fragment_id: u32,
context: Arc<SharedContext>,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
) -> Self {
if crate::consistency::insane() {
// make some trouble before dispatching to avoid generating invalid dist key.
let mut info = input.info().clone();
info.identity = format!("{} (embedded trouble)", info.identity);
let troublemaker = TroublemakerExecutor::new(input, chunk_size);
input = (info, troublemaker).into();
}

let actor_id_str = actor_id.to_string();
let fragment_id_str = fragment_id.to_string();
let actor_out_record_cnt = metrics
Expand Down Expand Up @@ -1033,6 +1042,7 @@ mod tests {
use risingwave_common::array::stream_chunk::StreamChunkTestExt;
use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder, Op};
use risingwave_common::catalog::Schema;
use risingwave_common::config;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::test_epoch;
use risingwave_common::util::hash_util::Crc32FastBuilder;
Expand Down Expand Up @@ -1204,6 +1214,7 @@ mod tests {
fragment_id,
ctx.clone(),
metrics,
config::default::developer::stream_chunk_size(),
))
.execute();
pin_mut!(executor);
Expand Down
9 changes: 5 additions & 4 deletions src/stream/src/executor/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use super::monitor::StreamingMetrics;
use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message};
use crate::common::table::state_table::{StateTable, WatermarkCacheParameterizedStateTable};
use crate::common::StreamChunkBuilder;
use crate::consistency::consistency_panic;
use crate::executor::expect_first_barrier_from_aligned_stream;
use crate::task::ActorEvalErrorReport;

Expand Down Expand Up @@ -362,10 +363,10 @@ impl<S: StateStore, const USE_WATERMARK_CACHE: bool> DynamicFilterExecutor<S, US
if Some(row.datum_at(0))
!= current_epoch_value.as_ref().map(ToDatumRef::to_datum_ref)
{
bail!(
"Inconsistent Delete - current: {:?}, delete: {:?}",
current_epoch_value,
row
consistency_panic!(
current = ?current_epoch_value,
to_delete = ?row,
"inconsistent delete",
);
}
current_epoch_value = None;
Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use futures_async_stream::try_stream;
use multimap::MultiMap;
use risingwave_common::array::*;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::config;
use risingwave_common::types::*;
use risingwave_common::util::epoch::{test_epoch, EpochExt};
use risingwave_expr::aggregate::AggCall;
Expand Down Expand Up @@ -127,6 +128,7 @@ async fn test_merger_sum_aggr() {
0,
ctx,
metrics,
config::default::developer::stream_chunk_size(),
);
let actor = Actor::new(
dispatcher,
Expand Down
5 changes: 3 additions & 2 deletions src/stream/src/executor/lookup/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_common_estimate_size::collections::{EstimatedHashSet, EstimatedVe

use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::consistency::consistency_panic;
use crate::task::AtomicU64Ref;

pub type LookupEntryState = EstimatedHashSet<OwnedRow>;
Expand Down Expand Up @@ -48,12 +49,12 @@ impl LookupCache {
match op {
Op::Insert | Op::UpdateInsert => {
if !values.insert(row) {
panic!("inserting a duplicated value");
consistency_panic!("inserting a duplicated value");
}
}
Op::Delete | Op::UpdateDelete => {
if !values.remove(&row) {
panic!("row {:?} should be in the cache", row);
consistency_panic!("row {:?} should be in the cache", row);
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/stream/src/executor/top_n/top_n_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_storage::StateStore;

use super::topn_cache_state::TopNCacheState;
use super::{CacheKey, GroupKey, ManagedTopNState};
use crate::consistency::{consistency_error, enable_strict_consistency};
use crate::executor::error::StreamExecutorResult;

const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
Expand Down Expand Up @@ -350,6 +351,12 @@ impl TopNCacheTrait for TopNCache<false> {
res_ops: &mut Vec<Op>,
res_rows: &mut Vec<CompactedRow>,
) -> StreamExecutorResult<()> {
if !enable_strict_consistency() && self.table_row_count == Some(0) {
// If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we
// should not panic. Instead, we pretend that we don't know about the actually row count.
consistency_error!("table row count is 0, but we receive a DELETE operation");
self.table_row_count = None;
}
if let Some(row_count) = self.table_row_count.as_mut() {
*row_count -= 1;
}
Expand Down Expand Up @@ -524,6 +531,11 @@ impl TopNCacheTrait for TopNCache<true> {
res_ops: &mut Vec<Op>,
res_rows: &mut Vec<CompactedRow>,
) -> StreamExecutorResult<()> {
if !enable_strict_consistency() && self.table_row_count == Some(0) {
// If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we
// should not panic. Instead, we pretend that we don't know about the actually row count.
self.table_row_count = None;
}
if let Some(row_count) = self.table_row_count.as_mut() {
*row_count -= 1;
}
Expand Down
5 changes: 4 additions & 1 deletion src/stream/src/executor/troublemaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ impl TroublemakerExecutor {
}
}
Message::Barrier(barrier) => {
assert!(vars.chunk_builder.take().is_none(), "we don't merge chunks");
assert!(
vars.chunk_builder.take().is_none(),
"we don't merge chunks across barriers"
);
yield Message::Barrier(barrier);
}
_ => yield msg,
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ impl StreamActorManager {
/// Create dispatchers with downstream information registered before
fn create_dispatcher(
&self,
env: StreamEnvironment,
input: Executor,
dispatchers: &[stream_plan::Dispatcher],
actor_id: ActorId,
Expand All @@ -403,6 +404,7 @@ impl StreamActorManager {
fragment_id,
shared_context.clone(),
self.streaming_metrics.clone(),
env.config().developer.chunk_size,
))
}

Expand Down Expand Up @@ -597,6 +599,7 @@ impl StreamActorManager {
.await?;

let dispatcher = self.create_dispatcher(
self.env.clone(),
executor,
&actor.dispatcher,
actor_id,
Expand Down

0 comments on commit 6e35a50

Please sign in to comment.