Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): add AddLogStore rule #20386

Open
wants to merge 6 commits into
base: kwannoel/unaligned-join
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions e2e_test/streaming/unaligned-join.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# End-to-end check that a streaming join can be created and then have one side
# fully deleted while `streaming_unaligned_join` is enabled.
# The script only asserts that every statement succeeds; it does not inspect
# the contents of the materialized view.

# Start from a clean slate in case an earlier run left the tables behind.
statement ok
DROP TABLE IF EXISTS fact;

statement ok
DROP TABLE IF EXISTS dim;

# Turn on the session setting under test.
statement ok
set streaming_unaligned_join = true;

statement ok
create table fact(v0 int primary key, v1 int, v2 varchar, v3 varchar);

# Rows with v0 in 1..=100000 carry v1 = 1, matching the value 1 inserted into dim below.
statement ok
INSERT INTO fact
SELECT
x as v0,
1 as v1,
'abcdefgakjandjkw' as v2,
'jkb1ku1bu' as v3
FROM generate_series(1, 100000) t(x);

# Rows with v0 in 100001..=200000 carry v1 = 2, matching the value 2 inserted into dim below.
statement ok
INSERT INTO fact
SELECT
x as v0,
2 as v1,
'abcdefgakjandjkw' as v2,
'jkb1ku1bu' as v3
FROM generate_series(100001, 200000) t(x);

statement ok
create table dim(v1 int);

# One dim row per distinct fact.v1 value, so every fact row has a join partner.
statement ok
INSERT INTO dim VALUES(1), (2);

# Join fact with dim on v1 and aggregate per v0.
statement ok
create materialized view m1 as
select v0, count(*)
from fact join dim on fact.v1 = dim.v1
group by v0;

# Remove every dim row, so no fact row has a join partner any more.
statement ok
DELETE FROM dim;

statement ok
flush;

# Clean up; `cascade` is needed because m1 depends on both tables.
statement ok
DROP TABLE fact cascade;

statement ok
DROP TABLE dim cascade;
6 changes: 6 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ pub fn visit_stream_node_tables_inner<F>(
always!(node.table, "Materialize")
}

// Global Approx Percentile
NodeBody::GlobalApproxPercentile(node) => {
always!(node.bucket_state_table, "GlobalApproxPercentileBucketState");
always!(node.count_state_table, "GlobalApproxPercentileCountState");
Expand All @@ -276,6 +277,11 @@ pub fn visit_stream_node_tables_inner<F>(
always!(node.left_table, "AsOfJoinLeft");
always!(node.right_table, "AsOfJoinRight");
}

// Synced Log Store
NodeBody::SyncLogStore(node) => {
always!(node.log_store_table, "StreamSyncLogStore");
}
_ => {}
}
};
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,16 @@ impl PlanRoot {
))?;
}

if ctx.session_ctx().config().streaming_unaligned_join() {
// TODO: make it a logical optimization.
// Wrap each stream hash join in a synced log store.
plan = plan.optimize_by_rules(&OptimizationStage::new(
"Add Logstore for Unaligned join",
vec![AddLogstoreRule::create()],
ApplyOrder::BottomUp,
))?;
}

// Inline session timezone
plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

Expand Down
33 changes: 33 additions & 0 deletions src/frontend/src/optimizer/rule/add_logstore_rule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::optimizer::plan_node::StreamSyncLogStore;
use crate::optimizer::rule::{BoxedRule, Rule};
use crate::PlanRef;

/// Optimizer rule that wraps a stream hash join plan node in a
/// [`StreamSyncLogStore`] node.
pub struct AddLogstoreRule {}

impl Rule for AddLogstoreRule {
    /// Wraps `plan` in a [`StreamSyncLogStore`] when it is a stream hash join.
    ///
    /// Returns `None` for every other kind of plan node, which tells the
    /// optimizer that the rule does not apply.
    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
        // Resolve the node-kind check to a plain bool first so the borrow of
        // `plan` has ended before `plan` is moved into the new node.
        let is_hash_join = plan.as_stream_hash_join().is_some();
        is_hash_join.then(|| StreamSyncLogStore::new(plan).into())
    }
}

impl AddLogstoreRule {
pub fn create() -> BoxedRule {
Box::new(Self {})
}
}
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ mod apply_hop_window_transpose_rule;
pub use apply_hop_window_transpose_rule::*;
mod agg_call_merge_rule;
pub use agg_call_merge_rule::*;
mod add_logstore_rule;
mod pull_up_correlated_predicate_agg_rule;
mod source_to_iceberg_scan_rule;
mod source_to_kafka_scan_rule;
Expand All @@ -248,6 +249,7 @@ mod table_function_to_mysql_query_rule;
mod table_function_to_postgres_query_rule;
mod values_extract_project_rule;

pub use add_logstore_rule::*;
pub use batch::batch_iceberg_predicate_pushdown::*;
pub use batch::batch_push_limit_to_scan_rule::*;
pub use pull_up_correlated_predicate_agg_rule::*;
Expand Down Expand Up @@ -336,6 +338,7 @@ macro_rules! for_all_rules {
, { PullUpCorrelatedPredicateAggRule }
, { SourceToKafkaScanRule }
, { SourceToIcebergScanRule }
, { AddLogstoreRule }
}
};
}
Expand Down