diff --git a/e2e_test/streaming/unaligned-join.slt b/e2e_test/streaming/unaligned-join.slt new file mode 100644 index 0000000000000..9bcee7ba81f99 --- /dev/null +++ b/e2e_test/streaming/unaligned-join.slt @@ -0,0 +1,55 @@ +statement ok +DROP TABLE IF EXISTS fact; + +statement ok +DROP TABLE IF EXISTS dim; + +statement ok +set streaming_unaligned_join = true; + +statement ok +create table fact(v0 int primary key, v1 int, v2 varchar, v3 varchar); + +# fact rows with v1 = 1, matching dim row 1 +statement ok +INSERT INTO fact + SELECT + x as v0, + 1 as v1, + 'abcdefgakjandjkw' as v2, + 'jkb1ku1bu' as v3 + FROM generate_series(1, 100000) t(x); + +# fact rows with v1 = 2, matching dim row 2 +statement ok +INSERT INTO fact + SELECT + x as v0, + 2 as v1, + 'abcdefgakjandjkw' as v2, + 'jkb1ku1bu' as v3 + FROM generate_series(100001, 200000) t(x); + +statement ok +create table dim(v1 int); + +statement ok +INSERT INTO dim VALUES(1), (2); + +statement ok +create materialized view m1 as + select v0, count(*) + from fact join dim on fact.v1 = dim.v1 + group by v0; + +statement ok +DELETE FROM dim; + +statement ok +flush; + +statement ok +DROP TABLE fact cascade; + +statement ok +DROP TABLE dim cascade; \ No newline at end of file diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs index f14ceadc09c42..03738744fa4a9 100644 --- a/src/common/src/util/stream_graph_visitor.rs +++ b/src/common/src/util/stream_graph_visitor.rs @@ -266,6 +266,7 @@ pub fn visit_stream_node_tables_inner( always!(node.table, "Materialize") } + // Global Approx Percentile NodeBody::GlobalApproxPercentile(node) => { always!(node.bucket_state_table, "GlobalApproxPercentileBucketState"); always!(node.count_state_table, "GlobalApproxPercentileCountState"); @@ -276,6 +277,11 @@ pub fn visit_stream_node_tables_inner( always!(node.left_table, "AsOfJoinLeft"); always!(node.right_table, "AsOfJoinRight"); } + + // Synced Log Store + NodeBody::SyncLogStore(node) => { + always!(node.log_store_table, 
"StreamSyncLogStore"); + } _ => {} } }; diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index d60f435648b3a..99f304ed9f188 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -514,6 +514,16 @@ impl PlanRoot { ))?; } + if ctx.session_ctx().config().streaming_unaligned_join() { + // TODO: make it a logical optimization. + // Wrap each stream hash join in a sync log store + plan = plan.optimize_by_rules(&OptimizationStage::new( + "Add Logstore for Unaligned join", + vec![AddLogstoreRule::create()], + ApplyOrder::BottomUp, + ))?; + } + // Inline session timezone plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?; diff --git a/src/frontend/src/optimizer/rule/add_logstore_rule.rs b/src/frontend/src/optimizer/rule/add_logstore_rule.rs new file mode 100644 index 0000000000000..53364c56b0370 --- /dev/null +++ b/src/frontend/src/optimizer/rule/add_logstore_rule.rs @@ -0,0 +1,33 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::optimizer::plan_node::StreamSyncLogStore; +use crate::optimizer::rule::{BoxedRule, Rule}; +use crate::PlanRef; + +pub struct AddLogstoreRule {} + +impl Rule for AddLogstoreRule { + fn apply(&self, plan: PlanRef) -> Option<PlanRef> { + plan.as_stream_hash_join()?; + let log_store_plan = StreamSyncLogStore::new(plan); + Some(log_store_plan.into()) + } +} + +impl AddLogstoreRule { + pub fn create() -> BoxedRule { + Box::new(Self {}) + } +} diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index a90d381bcbf34..d5ee68d1c9ddd 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -240,6 +240,7 @@ mod apply_hop_window_transpose_rule; pub use apply_hop_window_transpose_rule::*; mod agg_call_merge_rule; pub use agg_call_merge_rule::*; +mod add_logstore_rule; mod pull_up_correlated_predicate_agg_rule; mod source_to_iceberg_scan_rule; mod source_to_kafka_scan_rule; @@ -248,6 +249,7 @@ mod table_function_to_mysql_query_rule; mod table_function_to_postgres_query_rule; mod values_extract_project_rule; +pub use add_logstore_rule::*; pub use batch::batch_iceberg_predicate_pushdown::*; pub use batch::batch_push_limit_to_scan_rule::*; pub use pull_up_correlated_predicate_agg_rule::*; @@ -336,6 +338,7 @@ macro_rules! for_all_rules { , { PullUpCorrelatedPredicateAggRule } , { SourceToKafkaScanRule } , { SourceToIcebergScanRule } + , { AddLogstoreRule } } }; }