Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): make "insane" mode more insane, and tolerate inconsistency in some more places #16083

Merged
merged 8 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions src/stream/src/common/compact_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,45 @@ use risingwave_common::row::{Project, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::util::hash_util::Crc32FastBuilder;

use crate::consistency::consistency_panic;

/// A helper to compact the stream chunks by modifying the `Ops` and visibility of the chunk.
pub struct StreamChunkCompactor {
chunks: Vec<StreamChunk>,
key: Vec<usize>,
}

struct OpRowMutRefTuple<'a> {
previous: Option<OpRowMutRef<'a>>,
latest: OpRowMutRef<'a>,
before_prev: Option<OpRowMutRef<'a>>,
prev: OpRowMutRef<'a>,
}

impl<'a> OpRowMutRefTuple<'a> {
/// Returns `true` if no row is left.
fn push(&mut self, mut op_row: OpRowMutRef<'a>) -> bool {
debug_assert!(self.latest.vis());
match (self.latest.op(), op_row.op()) {
(Op::Insert, Op::Insert) => panic!("receive duplicated insert on the stream"),
(Op::Delete, Op::Delete) => panic!("receive duplicated delete on the stream"),
fn push(&mut self, mut curr: OpRowMutRef<'a>) -> bool {
debug_assert!(self.prev.vis());
match (self.prev.op(), curr.op()) {
(Op::Insert, Op::Insert) => {
consistency_panic!("receive duplicated insert on the stream");
// If need to tolerate inconsistency, override the previous insert.
// Note that because the primary key constraint has been violated, we
// don't mind losing some data here.
self.prev.set_vis(false);
self.prev = curr;
}
(Op::Delete, Op::Delete) => {
consistency_panic!("receive duplicated delete on the stream");
// If need to tolerate inconsistency, override the previous delete.
// Note that because the primary key constraint has been violated, we
// don't mind losing some data here.
self.prev.set_vis(false);
self.prev = curr;
}
(Op::Insert, Op::Delete) => {
self.latest.set_vis(false);
op_row.set_vis(false);
self.latest = if let Some(prev) = self.previous.take() {
// Delete a row that has been inserted, just hide the two ops.
self.prev.set_vis(false);
curr.set_vis(false);
self.prev = if let Some(prev) = self.before_prev.take() {
prev
} else {
return true;
Expand All @@ -57,8 +74,11 @@ impl<'a> OpRowMutRefTuple<'a> {
(Op::Delete, Op::Insert) => {
// The operation for the key must be (+, -, +) or (-, +). And the (+, -) must have
// been filtered.
debug_assert!(self.previous.is_none());
self.previous = Some(mem::replace(&mut self.latest, op_row));
debug_assert!(
self.before_prev.is_none(),
"should have been taken in the above match arm"
);
self.before_prev = Some(mem::replace(&mut self.prev, curr));
}
// All the `updateDelete` and `updateInsert` ops should be normalized to `delete`
// and `insert`.
Expand All @@ -68,10 +88,10 @@ impl<'a> OpRowMutRefTuple<'a> {
}

fn as_update_op(&mut self) -> Option<(&mut OpRowMutRef<'a>, &mut OpRowMutRef<'a>)> {
self.previous.as_mut().map(|prev| {
self.before_prev.as_mut().map(|prev| {
debug_assert_eq!(prev.op(), Op::Delete);
debug_assert_eq!(self.latest.op(), Op::Insert);
(prev, &mut self.latest)
debug_assert_eq!(self.prev.op(), Op::Insert);
(prev, &mut self.prev)
})
}
}
Expand Down Expand Up @@ -219,8 +239,8 @@ impl StreamChunkCompactor {
match op_row_map.entry(Prehashed::new(key, hash)) {
Entry::Vacant(v) => {
v.insert(OpRowMutRefTuple {
previous: None,
latest: op_row,
before_prev: None,
prev: op_row,
});
}
Entry::Occupied(mut o) => {
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy};
use crate::cache::cache_may_stale;
use crate::common::cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::consistency::insane;
use crate::executor::{StreamExecutorError, StreamExecutorResult};

/// This num is arbitrary and we may want to improve this choice in the future.
Expand Down Expand Up @@ -363,7 +362,7 @@ where
)
};

let is_consistent_op = if insane() {
let is_consistent_op = if crate::consistency::insane() {
// In insane mode, we will have inconsistent operations applied on the table, even if
// our executor code do not expect that.
false
Expand Down
15 changes: 13 additions & 2 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tokio::time::Instant;
use tracing::{event, Instrument};

use super::exchange::output::{new_output, BoxedOutput};
use super::{AddMutation, Executor, UpdateMutation, Watermark};
use super::{AddMutation, Executor, TroublemakerExecutor, UpdateMutation, Watermark};
use crate::error::StreamResult;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{Barrier, Message, Mutation, StreamConsumer};
Expand Down Expand Up @@ -341,13 +341,22 @@ impl DispatchExecutorInner {

impl DispatchExecutor {
pub fn new(
input: Executor,
mut input: Executor,
dispatchers: Vec<DispatcherImpl>,
actor_id: u32,
fragment_id: u32,
context: Arc<SharedContext>,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
) -> Self {
if crate::consistency::insane() {
// Make some trouble before dispatching to avoid generating invalid dist keys.
let mut info = input.info().clone();
info.identity = format!("{} (embedded trouble)", info.identity);
let troublemaker = TroublemakerExecutor::new(input, chunk_size);
input = (info, troublemaker).into();
}

let actor_id_str = actor_id.to_string();
let fragment_id_str = fragment_id.to_string();
let actor_out_record_cnt = metrics
Expand Down Expand Up @@ -1033,6 +1042,7 @@ mod tests {
use risingwave_common::array::stream_chunk::StreamChunkTestExt;
use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder, Op};
use risingwave_common::catalog::Schema;
use risingwave_common::config;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::test_epoch;
use risingwave_common::util::hash_util::Crc32FastBuilder;
Expand Down Expand Up @@ -1204,6 +1214,7 @@ mod tests {
fragment_id,
ctx.clone(),
metrics,
config::default::developer::stream_chunk_size(),
))
.execute();
pin_mut!(executor);
Expand Down
9 changes: 5 additions & 4 deletions src/stream/src/executor/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use super::monitor::StreamingMetrics;
use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message};
use crate::common::table::state_table::{StateTable, WatermarkCacheParameterizedStateTable};
use crate::common::StreamChunkBuilder;
use crate::consistency::consistency_panic;
use crate::executor::expect_first_barrier_from_aligned_stream;
use crate::task::ActorEvalErrorReport;

Expand Down Expand Up @@ -362,10 +363,10 @@ impl<S: StateStore, const USE_WATERMARK_CACHE: bool> DynamicFilterExecutor<S, US
if Some(row.datum_at(0))
!= current_epoch_value.as_ref().map(ToDatumRef::to_datum_ref)
{
bail!(
"Inconsistent Delete - current: {:?}, delete: {:?}",
current_epoch_value,
row
consistency_panic!(
current = ?current_epoch_value,
to_delete = ?row,
"inconsistent delete",
);
}
current_epoch_value = None;
Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use futures_async_stream::try_stream;
use multimap::MultiMap;
use risingwave_common::array::*;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::config;
use risingwave_common::types::*;
use risingwave_common::util::epoch::{test_epoch, EpochExt};
use risingwave_expr::aggregate::AggCall;
Expand Down Expand Up @@ -127,6 +128,7 @@ async fn test_merger_sum_aggr() {
0,
ctx,
metrics,
config::default::developer::stream_chunk_size(),
);
let actor = Actor::new(
dispatcher,
Expand Down
5 changes: 3 additions & 2 deletions src/stream/src/executor/lookup/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_common_estimate_size::collections::{EstimatedHashSet, EstimatedVe

use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::consistency::consistency_panic;
use crate::task::AtomicU64Ref;

pub type LookupEntryState = EstimatedHashSet<OwnedRow>;
Expand Down Expand Up @@ -48,12 +49,12 @@ impl LookupCache {
match op {
Op::Insert | Op::UpdateInsert => {
if !values.insert(row) {
panic!("inserting a duplicated value");
consistency_panic!("inserting a duplicated value");
}
}
Op::Delete | Op::UpdateDelete => {
if !values.remove(&row) {
panic!("row {:?} should be in the cache", row);
consistency_panic!("row {:?} should be in the cache", row);
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/stream/src/executor/top_n/top_n_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_storage::StateStore;

use super::topn_cache_state::TopNCacheState;
use super::{CacheKey, GroupKey, ManagedTopNState};
use crate::consistency::{consistency_error, enable_strict_consistency};
use crate::executor::error::StreamExecutorResult;

const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
Expand Down Expand Up @@ -350,6 +351,12 @@ impl TopNCacheTrait for TopNCache<false> {
res_ops: &mut Vec<Op>,
res_rows: &mut Vec<CompactedRow>,
) -> StreamExecutorResult<()> {
if !enable_strict_consistency() && self.table_row_count == Some(0) {
// If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we
// should not panic. Instead, we pretend that we don't know the actual row count.
consistency_error!("table row count is 0, but we receive a DELETE operation");
self.table_row_count = None;
}
if let Some(row_count) = self.table_row_count.as_mut() {
*row_count -= 1;
}
Expand Down Expand Up @@ -524,6 +531,11 @@ impl TopNCacheTrait for TopNCache<true> {
res_ops: &mut Vec<Op>,
res_rows: &mut Vec<CompactedRow>,
) -> StreamExecutorResult<()> {
if !enable_strict_consistency() && self.table_row_count == Some(0) {
// If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we
// should not panic. Instead, we pretend that we don't know the actual row count.
self.table_row_count = None;
}
if let Some(row_count) = self.table_row_count.as_mut() {
*row_count -= 1;
}
Expand Down
5 changes: 4 additions & 1 deletion src/stream/src/executor/troublemaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ impl TroublemakerExecutor {
}
}
Message::Barrier(barrier) => {
assert!(vars.chunk_builder.take().is_none(), "we don't merge chunks");
assert!(
vars.chunk_builder.take().is_none(),
"we don't merge chunks across barriers"
);
yield Message::Barrier(barrier);
}
_ => yield msg,
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ impl StreamActorManager {
/// Create dispatchers with downstream information registered before
fn create_dispatcher(
&self,
env: StreamEnvironment,
input: Executor,
dispatchers: &[stream_plan::Dispatcher],
actor_id: ActorId,
Expand All @@ -403,6 +404,7 @@ impl StreamActorManager {
fragment_id,
shared_context.clone(),
self.streaming_metrics.clone(),
env.config().developer.chunk_size,
))
}

Expand Down Expand Up @@ -597,6 +599,7 @@ impl StreamActorManager {
.await?;

let dispatcher = self.create_dispatcher(
self.env.clone(),
executor,
&actor.dispatcher,
actor_id,
Expand Down
Loading