Skip to content

Commit

Permalink
Add a new rule PinotSeminJoinDistinctProjectRule to apply a distinct …
Browse files Browse the repository at this point in the history
…to a semi join right side project
  • Loading branch information
xiangfu0 committed Jan 6, 2025
1 parent 26ad816 commit 18efb4f
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ private PinotQueryRuleSets() {

// join and semi-join rules
CoreRules.PROJECT_TO_SEMI_JOIN,
PinotSeminJoinDistinctProjectRule.INSTANCE,

// convert non-all union into all-union + distinct
CoreRules.UNION_TO_DISTINCT,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.calcite.rel.rules;

import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilderFactory;


/**
* Special rule for Pinot, this rule always append a distinct to the
* {@link org.apache.calcite.rel.logical.LogicalProject} on top of a Semi join
* {@link org.apache.calcite.rel.core.Join} to ensure the correctness of the query.
*/
public class PinotSeminJoinDistinctProjectRule extends RelOptRule {
public static final PinotSeminJoinDistinctProjectRule INSTANCE =
new PinotSeminJoinDistinctProjectRule(PinotRuleUtils.PINOT_REL_FACTORY);

public PinotSeminJoinDistinctProjectRule(RelBuilderFactory factory) {
super(operand(LogicalJoin.class, some(operand(LogicalProject.class, any()), operand(LogicalProject.class, any()))),
factory, null);
}

@Override
public void onMatch(RelOptRuleCall call) {
LogicalJoin join = call.rel(0);
if (join.getJoinType() != JoinRelType.SEMI) {
return;
}
LogicalProject leftProject = call.rel(1);
LogicalProject rightProject = call.rel(2);

if (rightProject.getProjects().size() != 1) {
return;
}
RelNode newRightProject = insertDistinctToProject(call, rightProject);
call.transformTo(join.copy(join.getTraitSet(), List.of(leftProject, newRightProject)));
}

private RelNode insertDistinctToProject(RelOptRuleCall call, LogicalProject project) {
RelBuilder relBuilder = call.builder();
relBuilder.push(project);
relBuilder.distinct();
return relBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,15 @@
" ├── [3]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
" ├── [3]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
" └── [3]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]}\n",
" └── [3]@localhost:1|[1] PROJECT\n",
" └── [3]@localhost:1|[1] FILTER\n",
" └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
" └── [3]@localhost:1|[1] AGGREGATE_FINAL\n",
" └── [3]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" ├── [4]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:2|[2]} (Subtree Omitted)\n",
" ├── [4]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:2|[3]} (Subtree Omitted)\n",
" ├── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:1|[0]} (Subtree Omitted)\n",
" └── [4]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:1|[1]}\n",
" └── [4]@localhost:1|[1] AGGREGATE_LEAF\n",
" └── [4]@localhost:1|[1] FILTER\n",
" └── [4]@localhost:1|[1] TABLE SCAN (b) null\n",
""
]
},
Expand All @@ -186,9 +192,15 @@
" ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n",
" ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[0]} (Subtree Omitted)\n",
" └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
" └── [3]@localhost:1|[1] PROJECT\n",
" └── [3]@localhost:1|[1] FILTER\n",
" └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
" └── [3]@localhost:1|[1] AGGREGATE_FINAL\n",
" └── [3]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
" ├── [4]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:2|[2]} (Subtree Omitted)\n",
" ├── [4]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:2|[3]} (Subtree Omitted)\n",
" ├── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:1|[0]} (Subtree Omitted)\n",
" └── [4]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:1|[1]}\n",
" └── [4]@localhost:1|[1] AGGREGATE_LEAF\n",
" └── [4]@localhost:1|[1] FILTER\n",
" └── [4]@localhost:1|[1] TABLE SCAN (b) null\n",
""
]
},
Expand Down
66 changes: 42 additions & 24 deletions pinot-query-planner/src/test/resources/queries/JoinPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,10 @@
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col3=[$2])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
Expand All @@ -246,8 +248,10 @@
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$2])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
Expand Down Expand Up @@ -280,8 +284,10 @@
"\n LogicalProject(col1=[$0], col3=[$2], col6=[$5])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col3=[$2])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
Expand Down Expand Up @@ -320,9 +326,11 @@
"\n LogicalFilter(condition=[=($1, _UTF-8'test')])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$2])",
"\n LogicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])",
"\n LogicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$2])",
"\n LogicalFilter(condition=[=($0, _UTF-8'bar')])",
Expand Down Expand Up @@ -532,13 +540,17 @@
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[SEARCH($1, Sarg[(-∞.._UTF-8'bar'), (_UTF-8'bar'.._UTF-8'foo'), (_UTF-8'foo'..+∞)]:CHAR(3) CHARACTER SET \"UTF-8\")])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])",
"\n LogicalFilter(condition=[SEARCH($1, Sarg[(-∞.._UTF-8'bar'), (_UTF-8'bar'.._UTF-8'foo'), (_UTF-8'foo'..+∞)]:CHAR(3) CHARACTER SET \"UTF-8\")])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col3=[$2])",
"\n LogicalFilter(condition=[<($2, 100)])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])",
"\n LogicalFilter(condition=[<($2, 100)])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
Expand All @@ -558,13 +570,17 @@
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col1=[$0])",
"\n LogicalFilter(condition=[SEARCH($1, Sarg[(-∞.._UTF-8'bar'), (_UTF-8'bar'.._UTF-8'foo'), (_UTF-8'foo'..+∞)]:CHAR(3) CHARACTER SET \"UTF-8\")])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])",
"\n LogicalFilter(condition=[SEARCH($1, Sarg[(-∞.._UTF-8'bar'), (_UTF-8'bar'.._UTF-8'foo'), (_UTF-8'foo'..+∞)]:CHAR(3) CHARACTER SET \"UTF-8\")])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col3=[$2])",
"\n LogicalFilter(condition=[<($2, 100)])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])",
"\n LogicalFilter(condition=[<($2, 100)])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
Expand Down Expand Up @@ -638,9 +654,11 @@
"\n LogicalFilter(condition=[=($0, _UTF-8'foobar')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$2])",
"\n LogicalFilter(condition=[=($0, _UTF-8'fork')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])",
"\n LogicalFilter(condition=[=($0, _UTF-8'fork')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
}
Expand Down
Loading

0 comments on commit 18efb4f

Please sign in to comment.