diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java index 80e524e11f0e..e6850f26f9a7 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java @@ -73,6 +73,7 @@ private PinotQueryRuleSets() { // join and semi-join rules CoreRules.PROJECT_TO_SEMI_JOIN, + PinotSeminJoinDistinctProjectRule.INSTANCE, // convert non-all union into all-union + distinct CoreRules.UNION_TO_DISTINCT, diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSeminJoinDistinctProjectRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSeminJoinDistinctProjectRule.java new file mode 100644 index 000000000000..f724e6d7a10d --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSeminJoinDistinctProjectRule.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.calcite.rel.rules; + +import java.util.List; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; + + +/** + * Special rule for Pinot, this rule always append a distinct to the + * {@link org.apache.calcite.rel.logical.LogicalProject} on top of a Semi join + * {@link org.apache.calcite.rel.core.Join} to ensure the correctness of the query. + */ +public class PinotSeminJoinDistinctProjectRule extends RelOptRule { + public static final PinotSeminJoinDistinctProjectRule INSTANCE = + new PinotSeminJoinDistinctProjectRule(PinotRuleUtils.PINOT_REL_FACTORY); + + public PinotSeminJoinDistinctProjectRule(RelBuilderFactory factory) { + super(operand(LogicalJoin.class, some(operand(LogicalProject.class, any()), operand(LogicalProject.class, any()))), + factory, null); + } + + @Override + public void onMatch(RelOptRuleCall call) { + LogicalJoin join = call.rel(0); + if (join.getJoinType() != JoinRelType.SEMI) { + return; + } + LogicalProject leftProject = call.rel(1); + LogicalProject rightProject = call.rel(2); + + if (rightProject.getProjects().size() != 1) { + return; + } + RelNode newRightProject = insertDistinctToProject(call, rightProject); + call.transformTo(join.copy(join.getTraitSet(), List.of(leftProject, newRightProject))); + } + + private RelNode insertDistinctToProject(RelOptRuleCall call, LogicalProject project) { + RelBuilder relBuilder = call.builder(); + relBuilder.push(project); + relBuilder.distinct(); + return relBuilder.build(); + } +} diff --git a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json index 31db5ee99b2b..4295eedaa0b3 100644 --- a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json +++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json @@ -158,9 +158,15 @@ " ├── [3]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n", " ├── [3]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n", " └── [3]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]}\n", - " └── [3]@localhost:1|[1] PROJECT\n", - " └── [3]@localhost:1|[1] FILTER\n", - " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n", + " └── [3]@localhost:1|[1] AGGREGATE_FINAL\n", + " └── [3]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " ├── [4]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:2|[2]} (Subtree Omitted)\n", + " ├── [4]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:2|[3]} (Subtree Omitted)\n", + " ├── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:1|[0]} (Subtree Omitted)\n", + " └── [4]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:1|[1]}\n", + " └── [4]@localhost:1|[1] AGGREGATE_LEAF\n", + " └── [4]@localhost:1|[1] FILTER\n", + " └── [4]@localhost:1|[1] TABLE SCAN (b) null\n", "" ] }, @@ -186,9 +192,15 @@ " ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n", " ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[0]} (Subtree Omitted)\n", " └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n", - " └── [3]@localhost:1|[1] PROJECT\n", - " └── [3]@localhost:1|[1] FILTER\n", - " └── [3]@localhost:1|[1] TABLE SCAN (b) null\n", + " └── [3]@localhost:1|[1] AGGREGATE_FINAL\n", + " └── [3]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n", + " ├── [4]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:2|[2]} (Subtree Omitted)\n", + " ├── [4]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:2|[3]} (Subtree Omitted)\n", + " ├── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:1|[0]} (Subtree Omitted)\n", + " └── [4]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:1|[1]}\n", + " └── [4]@localhost:1|[1] AGGREGATE_LEAF\n", + " └── [4]@localhost:1|[1] FILTER\n", + " └── [4]@localhost:1|[1] TABLE SCAN (b) null\n", "" ] }, diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json index d48795dc30cd..6d04bd2e792f 100644 --- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json +++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json @@ -230,8 +230,10 @@ "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col3=[$2])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -246,8 +248,10 @@ "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[hash[0]])", - "\n LogicalProject(col3=[$2])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -280,8 +284,10 @@ "\n LogicalProject(col1=[$0], col3=[$2], col6=[$5])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col3=[$2])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -320,9 +326,11 @@ "\n LogicalFilter(condition=[=($1, _UTF-8'test')])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[hash[0]])", - "\n LogicalProject(col3=[$2])", - "\n LogicalFilter(condition=[=($0, _UTF-8'foo')])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])", + "\n LogicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n LogicalTableScan(table=[[default, b]])", "\n PinotLogicalExchange(distribution=[hash[0]])", "\n LogicalProject(col3=[$2])", "\n LogicalFilter(condition=[=($0, _UTF-8'bar')])", @@ -532,13 +540,17 @@ "\n LogicalProject(col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[SEARCH($1, Sarg[(-∞.._UTF-8'bar'), (_UTF-8'bar'.._UTF-8'foo'), (_UTF-8'foo'..+∞)]:CHAR(3) CHARACTER SET \"UTF-8\")])", - "\n LogicalTableScan(table=[[default, a]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[SEARCH($1, Sarg[(-∞.._UTF-8'bar'), (_UTF-8'bar'.._UTF-8'foo'), (_UTF-8'foo'..+∞)]:CHAR(3) CHARACTER SET \"UTF-8\")])", + "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col3=[$2])", - "\n LogicalFilter(condition=[<($2, 100)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])", + "\n LogicalFilter(condition=[<($2, 100)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -558,13 +570,17 @@ "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[SEARCH($1, Sarg[(-∞.._UTF-8'bar'), (_UTF-8'bar'.._UTF-8'foo'), (_UTF-8'foo'..+∞)]:CHAR(3) CHARACTER SET \"UTF-8\")])", - "\n LogicalTableScan(table=[[default, a]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[SEARCH($1, Sarg[(-∞.._UTF-8'bar'), (_UTF-8'bar'.._UTF-8'foo'), (_UTF-8'foo'..+∞)]:CHAR(3) CHARACTER SET \"UTF-8\")])", + "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col3=[$2])", - "\n LogicalFilter(condition=[<($2, 100)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])", + "\n LogicalFilter(condition=[<($2, 100)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -638,9 +654,11 @@ "\n LogicalFilter(condition=[=($0, _UTF-8'foobar')])", "\n LogicalTableScan(table=[[default, b]])", "\n PinotLogicalExchange(distribution=[hash[0]])", - "\n LogicalProject(col3=[$2])", - "\n LogicalFilter(condition=[=($0, _UTF-8'fork')])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF])", + "\n LogicalFilter(condition=[=($0, _UTF-8'fork')])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] } diff --git a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json index 998bf0560633..b87d2a4ab455 100644 --- a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json +++ b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json @@ -49,9 +49,11 @@ "\n LogicalProject(col1=[$0], col2=[$1])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col2=[$1])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{1}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -65,9 +67,11 @@ "\n LogicalProject(col1=[$0], col2=[$1])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col2=[$1])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{1}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", "\n LogicalProject(col1=[$0])", "\n LogicalFilter(condition=[>($2, 0)])", @@ -88,9 +92,11 @@ "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col2=[$1])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{1}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", "\n LogicalProject(col1=[$0])", "\n LogicalFilter(condition=[>($2, 0)])", @@ -108,9 +114,11 @@ "\n LogicalProject(col1=[$0], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col2=[$1])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{1}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -126,9 +134,11 @@ "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col2=[$1])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{1}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -287,9 +297,11 @@ "\n LogicalProject(col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -303,9 +315,11 @@ "\n LogicalProject(col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[hash[0]], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -357,9 +371,11 @@ "\n LogicalProject(col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -375,9 +391,11 @@ "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -417,9 +435,11 @@ "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -437,9 +457,11 @@ "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -453,9 +475,11 @@ "\n LogicalProject(col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -469,9 +493,11 @@ "\n LogicalProject(col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -487,9 +513,11 @@ "\n LogicalProject(col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -505,9 +533,11 @@ "\n LogicalProject(col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -522,9 +552,11 @@ "\n LogicalProject(col2=[$1])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }, @@ -540,9 +572,11 @@ "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n LogicalTableScan(table=[[default, a]])", "\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])", - "\n LogicalProject(col1=[$0])", - "\n LogicalFilter(condition=[>($2, 0)])", - "\n LogicalTableScan(table=[[default, b]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF])", + "\n LogicalFilter(condition=[>($2, 0)])", + "\n LogicalTableScan(table=[[default, b]])", "\n" ] }