From 0c70a63e06d6519c2c3e1b9a6d7678b06bf5b272 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Tue, 4 Feb 2025 18:49:57 +0800 Subject: [PATCH 1/6] add optimizer rule --- .../src/optimizer/rule/add_logstore_rule.rs | 25 +++++++++++++++++++ src/frontend/src/optimizer/rule/mod.rs | 3 +++ 2 files changed, 28 insertions(+) create mode 100644 src/frontend/src/optimizer/rule/add_logstore_rule.rs diff --git a/src/frontend/src/optimizer/rule/add_logstore_rule.rs b/src/frontend/src/optimizer/rule/add_logstore_rule.rs new file mode 100644 index 0000000000000..6d938de5915ca --- /dev/null +++ b/src/frontend/src/optimizer/rule/add_logstore_rule.rs @@ -0,0 +1,25 @@ +// Copyright 2025 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::optimizer::rule::Rule; +use crate::PlanRef; + +pub struct AddLogstoreRule {} + +impl Rule for AddLogstoreRule { + fn apply(&self, plan: PlanRef) -> Option { + let stream_hash_join = plan.as_stream_hash_join()?; + None + } +} diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index a90d381bcbf34..d5ee68d1c9ddd 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -240,6 +240,7 @@ mod apply_hop_window_transpose_rule; pub use apply_hop_window_transpose_rule::*; mod agg_call_merge_rule; pub use agg_call_merge_rule::*; +mod add_logstore_rule; mod pull_up_correlated_predicate_agg_rule; mod source_to_iceberg_scan_rule; mod source_to_kafka_scan_rule; @@ -248,6 +249,7 @@ mod table_function_to_mysql_query_rule; mod table_function_to_postgres_query_rule; mod values_extract_project_rule; +pub use add_logstore_rule::*; pub use batch::batch_iceberg_predicate_pushdown::*; pub use batch::batch_push_limit_to_scan_rule::*; pub use pull_up_correlated_predicate_agg_rule::*; @@ -336,6 +338,7 @@ macro_rules! for_all_rules { , { PullUpCorrelatedPredicateAggRule } , { SourceToKafkaScanRule } , { SourceToIcebergScanRule } + , { AddLogstoreRule } } }; } From a46b96ba67d53fe02d8c40549a0c20fb7940fe5e Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Tue, 4 Feb 2025 19:00:21 +0800 Subject: [PATCH 2/6] define apply --- src/frontend/src/optimizer/rule/add_logstore_rule.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/optimizer/rule/add_logstore_rule.rs b/src/frontend/src/optimizer/rule/add_logstore_rule.rs index 6d938de5915ca..6e21cd8a41021 100644 --- a/src/frontend/src/optimizer/rule/add_logstore_rule.rs +++ b/src/frontend/src/optimizer/rule/add_logstore_rule.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use crate::optimizer::plan_node::StreamSyncLogStore; use crate::optimizer::rule::Rule; use crate::PlanRef; @@ -19,7 +20,8 @@ pub struct AddLogstoreRule {} impl Rule for AddLogstoreRule { fn apply(&self, plan: PlanRef) -> Option { - let stream_hash_join = plan.as_stream_hash_join()?; - None + plan.as_stream_hash_join()?; + let log_store_plan = StreamSyncLogStore::new(plan); + Some(log_store_plan.into()) } } From b209a87811e8436a0f4863b338d28f154c5b897f Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Tue, 4 Feb 2025 19:07:25 +0800 Subject: [PATCH 3/6] create and apply rule --- src/frontend/src/optimizer/mod.rs | 10 ++++++++++ src/frontend/src/optimizer/rule/add_logstore_rule.rs | 8 +++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index d60f435648b3a..99f304ed9f188 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -514,6 +514,16 @@ impl PlanRoot { ))?; } + if ctx.session_ctx().config().streaming_unaligned_join() { + // TODO: make it a logical optimization. + // Wrap each stream hash join in a sync log store + plan = plan.optimize_by_rules(&OptimizationStage::new( + "Add Logstore for Unaligned join", + vec![AddLogstoreRule::create()], + ApplyOrder::BottomUp, + ))?; + } + // Inline session timezone plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?; diff --git a/src/frontend/src/optimizer/rule/add_logstore_rule.rs b/src/frontend/src/optimizer/rule/add_logstore_rule.rs index 6e21cd8a41021..53364c56b0370 100644 --- a/src/frontend/src/optimizer/rule/add_logstore_rule.rs +++ b/src/frontend/src/optimizer/rule/add_logstore_rule.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use crate::optimizer::plan_node::StreamSyncLogStore; -use crate::optimizer::rule::Rule; +use crate::optimizer::rule::{BoxedRule, Rule}; use crate::PlanRef; pub struct AddLogstoreRule {} @@ -25,3 +25,9 @@ impl Rule for AddLogstoreRule { Some(log_store_plan.into()) } } + +impl AddLogstoreRule { + pub fn create() -> BoxedRule { + Box::new(Self {}) + } +} From ebf71ad2e99be21482198cc818891df1512fd2f6 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 5 Feb 2025 12:53:47 +0800 Subject: [PATCH 4/6] add internal tables --- src/common/src/util/stream_graph_visitor.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs index f14ceadc09c42..03738744fa4a9 100644 --- a/src/common/src/util/stream_graph_visitor.rs +++ b/src/common/src/util/stream_graph_visitor.rs @@ -266,6 +266,7 @@ pub fn visit_stream_node_tables_inner( always!(node.table, "Materialize") } + // Global Approx Percentile NodeBody::GlobalApproxPercentile(node) => { always!(node.bucket_state_table, "GlobalApproxPercentileBucketState"); always!(node.count_state_table, "GlobalApproxPercentileCountState"); @@ -276,6 +277,11 @@ pub fn visit_stream_node_tables_inner( always!(node.left_table, "AsOfJoinLeft"); always!(node.right_table, "AsOfJoinRight"); } + + // Synced Log Store + NodeBody::SyncLogStore(node) => { + always!(node.log_store_table, "StreamSyncLogStore"); + } _ => {} } }; From df02066f9951314081e620271b5a5b05eca690c0 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 5 Feb 2025 12:54:04 +0800 Subject: [PATCH 5/6] add slt test --- e2e_test/streaming/unaligned-join.slt | 49 +++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 e2e_test/streaming/unaligned-join.slt diff --git a/e2e_test/streaming/unaligned-join.slt b/e2e_test/streaming/unaligned-join.slt new file mode 100644 index 0000000000000..91da4d1796994 --- /dev/null +++ b/e2e_test/streaming/unaligned-join.slt @@ -0,0 +1,49 @@ 
+statement ok +DROP TABLE IF EXISTS fact; + +statement ok +DROP TABLE IF EXISTS dim; + +statement ok +set streaming_unaligned_join = true; + +statement ok +create table fact(v0 int primary key, v1 int, v2 varchar, v3 varchar); + +# fact rows with v1 = 1 (join with dim value 1) +statement ok +INSERT INTO fact + SELECT + x as v0, + 1 as v1, + 'abcdefgakjandjkw' as v2, + 'jkb1ku1bu' as v3 + FROM generate_series(1, 100000) t(x); + +# fact rows with v1 = 2 (join with dim value 2) +statement ok +INSERT INTO fact + SELECT + x as v0, + 2 as v1, + 'abcdefgakjandjkw' as v2, + 'jkb1ku1bu' as v3 + FROM generate_series(100001, 200000) t(x); + +statement ok +create table dim(v1 int); + +statement ok +INSERT INTO dim VALUES(1), (2); + +statement ok +create materialized view m1 as + select v0, count(*) + from fact join dim on fact.v1 = dim.v1 + group by v0; + +statement ok +DELETE FROM dim; + +statement ok +flush; \ No newline at end of file From b3dfe9e0a771e6d54eae0ec56529349dcd1c8229 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 6 Feb 2025 20:12:54 +0800 Subject: [PATCH 6/6] clean --- e2e_test/streaming/unaligned-join.slt | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/e2e_test/streaming/unaligned-join.slt b/e2e_test/streaming/unaligned-join.slt index 91da4d1796994..9bcee7ba81f99 100644 --- a/e2e_test/streaming/unaligned-join.slt +++ b/e2e_test/streaming/unaligned-join.slt @@ -46,4 +46,10 @@ statement ok DELETE FROM dim; statement ok -flush; \ No newline at end of file +flush; + +statement ok +DROP TABLE fact cascade; + +statement ok +DROP TABLE dim cascade; \ No newline at end of file