diff --git a/.github/actions/confWriter/action.yml b/.github/actions/confWriter/action.yml index 2d5fbaec88..e8540622b0 100644 --- a/.github/actions/confWriter/action.yml +++ b/.github/actions/confWriter/action.yml @@ -45,6 +45,10 @@ inputs: description: "close all optimizer" required: false default: "false" + Set-ColumnPruning-ON: + description: "set ColumnPruningRule ON" + required: false + default: "false" runs: using: "composite" # Mandatory parameter @@ -118,6 +122,28 @@ runs: paths: ${{ inputs.Root-Dir-Path }}/core/target/iginx-core-*/conf/config.properties statements: s/policyClassName=.*$/policyClassName=cn.edu.tsinghua.iginx.policy.test.KeyRangeTestPolicy/g + - if: inputs.Set-Filter-Fragment-OFF=='true' + name: Set FragmentPruningByFilterRule OFF + uses: ./.github/actions/edit + with: + paths: ${{ inputs.Root-Dir-Path }}/core/target/iginx-core-*/conf/config.properties + statements: s/ruleBasedOptimizer=NotFilterRemoveRule=on,FragmentPruningByFilterRule=on/ruleBasedOptimizer=NotFilterRemoveRule=on/g + + - if: inputs.Set-ColumnPruning-ON=='true' + name: Set ColumnPruningRule ON + uses: ./.github/actions/edit + with: + paths: ${{ inputs.Root-Dir-Path }}/core/target/iginx-core-*/conf/config.properties + statements: | + s/ColumnPruningRule=off/ColumnPruningRule=on/g + + - if: inputs.Set-AggPushDown-OFF=='true' + name: Set AggPushDownRule OFF + uses: ./.github/actions/edit + with: + paths: ${{ inputs.Root-Dir-Path }}/core/target/iginx-core-*/conf/config.properties + statements: s/AggPushDownRule=on/AggPushDownRule=off/g + - name: Change Log Conf uses: ./.github/actions/edit with: diff --git a/.github/workflows/tpc-h.yml b/.github/workflows/tpc-h.yml index 08a8242fcb..88759a9d3d 100644 --- a/.github/workflows/tpc-h.yml +++ b/.github/workflows/tpc-h.yml @@ -38,6 +38,11 @@ on: type: number required: false default: 60 + agg-pushdown-matrix: + description: "The database which support agg push down" + type: string + required: false + default: '["PostgreSQL", "MYSQL"]' jobs: TPC-H-Test: @@ -120,16 +125,21 @@ jobs: with: DB-name: ${{ matrix.DB-name }} Close-Optimizer: ${{ inputs.close-optimizer }} + Set-ColumnPruning-ON: ${{ toJSON(inputs.close-optimizer == 'true' && matrix.DB-name == 'InfluxDB') }} Metadata: ${{ matrix.metadata }} + Set-AggPushDown-OFF: ${{ toJSON(!contains(inputs.agg-pushdown-matrix, matrix.DB-name)) }} - name: Change Old IGinX Config uses: ./.github/actions/confWriter with: DB-name: ${{ matrix.DB-name }} + Open-Optimizer: "true" Close-Optimizer: ${{ inputs.close-optimizer }} + Set-ColumnPruning-ON: ${{ toJSON(inputs.close-optimizer == 'true' && matrix.DB-name == 'InfluxDB') }} Metadata: ${{ matrix.metadata }} zookeeper-port: 2182 Root-Dir-Path: "IGinX" + Set-AggPushDown-OFF: ${{ toJSON(!contains(inputs.agg-pushdown-matrix, matrix.DB-name)) }} - name: Download TPC-H Data shell: bash diff --git a/conf/config.properties b/conf/config.properties index 5376600fd2..ac99609b0e 100644 --- a/conf/config.properties +++ b/conf/config.properties @@ -97,7 +97,7 @@ queryOptimizer=rbo # 优化器规则 ruleBasedOptimizer=NotFilterRemoveRule=on,ColumnPruningRule=on,ConstantPropagationRule=on,DistinctEliminateRule=on,\ ConstantFoldingRule=on,FilterPushDownRule=on,JoinFactorizationRule=on,SetTransformPushDownPathUnionJoinRule=on,InFilterTransformRule=on,\ - OuterJoinEliminateRule=on + OuterJoinEliminateRule=on,AggPushDownRule=on #ruleBasedOptimizer=AllowNullColumnRule=on diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/utils/OperatorUtils.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/utils/OperatorUtils.java index 5d84483afe..3eb84d93c9 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/utils/OperatorUtils.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/utils/OperatorUtils.java @@ -768,4 +768,26 @@ public static boolean isProjectFromConstant(Operator operator) { } return false; } + + public static List getFunctionCallList(Operator operator) { + switch (operator.getType()) { + case GroupBy: + return ((GroupBy) operator).getFunctionCallList(); + case SetTransform: + return ((SetTransform) operator).getFunctionCallList(); + case RowTransform: + return ((RowTransform) operator).getFunctionCallList(); + case MappingTransform: + return ((MappingTransform) operator).getFunctionCallList(); + default: + return new ArrayList<>(); + } + } + + public static Operator getUnaryChild(Operator operator) { + if (isUnaryOperator(operator.getType())) { + return ((OperatorSource) ((UnaryOperator) operator).getSource()).getOperator(); + } + return null; + } } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/ExprUtils.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/ExprUtils.java index 6c2b2c2ebb..3bd1db0f92 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/ExprUtils.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/ExprUtils.java @@ -36,14 +36,7 @@ import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.DataTypeUtils; import java.security.Key; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.Set; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -929,4 +922,48 @@ private static boolean hasCaseWhen(Expression expr) { return false; } } + + public static Expression replacePaths(Expression expr, Map pathMap) { + Expression res = copy(expr); + res.accept( + new ExpressionVisitor() { + @Override + public void visit(BaseExpression expression) { + expression.setPathName( + pathMap.getOrDefault(expression.getColumnName(), expression.getColumnName())); + } + + @Override + public void visit(BinaryExpression expression) {} + + @Override + public void visit(BracketExpression expression) {} + + @Override + public void visit(ConstantExpression expression) {} + + @Override + public void visit(FromValueExpression expression) {} + + @Override + public void visit(FuncExpression expression) {} + + @Override + public void visit(MultipleExpression expression) {} + + @Override + public void visit(UnaryExpression expression) {} + + @Override + public void visit(CaseWhenExpression expression) {} + + @Override + public void visit(KeyExpression expression) {} + + @Override + public void visit(SequenceExpression expression) {} + }); + + return res; + } } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/FilterUtils.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/FilterUtils.java index c34d771bbe..542748c1d9 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/FilterUtils.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/utils/FilterUtils.java @@ -451,4 +451,57 @@ public void visit(InFilter inFilter) {} }); return pathFilters; } + + /** 获取filter以及所有子filter的类型 */ + public static Set getFilterType(Filter filter) { + Set filterTypes = new HashSet<>(); + filter.accept( + new FilterVisitor() { + @Override + public void visit(AndFilter filter) { + filterTypes.add(FilterType.And); + } + + @Override + public void visit(OrFilter filter) { + filterTypes.add(FilterType.Or); + } + + @Override + public void visit(NotFilter filter) { + filterTypes.add(FilterType.Not); + } + + @Override + public void visit(KeyFilter filter) { + filterTypes.add(FilterType.Key); + } + + @Override + public void visit(ValueFilter filter) { + filterTypes.add(FilterType.Value); + } + + @Override + public void visit(PathFilter filter) { + filterTypes.add(FilterType.Path); + } + + @Override + public void visit(BoolFilter filter) { + filterTypes.add(FilterType.Bool); + } + + @Override + public void visit(ExprFilter filter) { + filterTypes.add(FilterType.Expr); + } + + @Override + public void visit(InFilter inFilter) { + filterTypes.add(FilterType.In); + } + }); + return filterTypes; + } } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java index 84a62a7c3e..e240bddfc6 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java @@ -23,11 +23,7 @@ import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column; import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult; -import cn.edu.tsinghua.iginx.engine.shared.operator.Delete; -import cn.edu.tsinghua.iginx.engine.shared.operator.Insert; -import cn.edu.tsinghua.iginx.engine.shared.operator.Project; -import cn.edu.tsinghua.iginx.engine.shared.operator.Select; -import cn.edu.tsinghua.iginx.engine.shared.operator.SetTransform; +import cn.edu.tsinghua.iginx.engine.shared.operator.*; import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval; import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval; @@ -56,15 +52,30 @@ public interface IStorage { TaskExecuteResult executeProjectDummyWithSelect( Project project, Select select, DataArea dataArea); - default boolean isSupportProjectWithSetTransform(SetTransform setTransform, DataArea dataArea) { + /** 询问底层是否支持仅带AGG的查询 */ + default boolean isSupportProjectWithAgg(Operator agg, DataArea dataArea, boolean isDummy) { return false; } - default TaskExecuteResult executeProjectWithSetTransform( - Project project, SetTransform setTransform, DataArea dataArea) { - throw new UnsupportedOperationException(); + default boolean isSupportProjectWithAggSelect( + Operator agg, Select select, DataArea dataArea, boolean isDummy) { + return false; } + /** 对非叠加分片带Agg带谓词下推的查询 */ + TaskExecuteResult executeProjectWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea); + + /** 对叠加分片带Agg带谓词下推的查询 */ + TaskExecuteResult executeProjectDummyWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea); + + /** 对非叠加分片带Agg的查询 */ + TaskExecuteResult executeProjectWithAgg(Project project, Operator agg, DataArea dataArea); + + /** 对叠加分片带Agg的查询 */ + TaskExecuteResult executeProjectDummyWithAgg(Project project, Operator agg, DataArea dataArea); + /** 对非叠加分片删除数据 */ TaskExecuteResult executeDelete(Delete delete, DataArea dataArea); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java index e64c1de7d4..4508835f31 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java @@ -26,21 +26,19 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.TooManyPhysicalTasksException; import cn.edu.tsinghua.iginx.engine.physical.exception.UnexpectedOperatorException; import cn.edu.tsinghua.iginx.engine.physical.memory.MemoryPhysicalTaskDispatcher; -import cn.edu.tsinghua.iginx.engine.physical.memory.execute.OperatorMemoryExecutor; -import cn.edu.tsinghua.iginx.engine.physical.memory.execute.OperatorMemoryExecutorFactory; import cn.edu.tsinghua.iginx.engine.physical.optimizer.ReplicaDispatcher; import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage; import cn.edu.tsinghua.iginx.engine.physical.storage.StorageManager; import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column; import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; +import cn.edu.tsinghua.iginx.engine.physical.storage.execute.pushdown.strategy.PushDownStrategy; +import cn.edu.tsinghua.iginx.engine.physical.storage.execute.pushdown.strategy.PushDownStrategyFactory; import cn.edu.tsinghua.iginx.engine.physical.storage.queue.StoragePhysicalTaskQueue; import cn.edu.tsinghua.iginx.engine.physical.task.GlobalPhysicalTask; import cn.edu.tsinghua.iginx.engine.physical.task.MemoryPhysicalTask; import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask; import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult; -import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; import cn.edu.tsinghua.iginx.engine.shared.operator.*; -import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType; import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager; import cn.edu.tsinghua.iginx.metadata.IMetaManager; import cn.edu.tsinghua.iginx.metadata.entity.FragmentMeta; @@ -148,70 +146,19 @@ private StoragePhysicalTaskExecutor() { boolean isDummyStorageUnit = task.isDummyStorageUnit(); DataArea dataArea = new DataArea(storageUnit, fragmentMeta.getKeyInterval()); - switch (op.getType()) { case Project: - boolean needSelectPushDown = - pair.k.isSupportProjectWithSelect() - && operators.size() == 2 - && operators.get(1).getType() == OperatorType.Select; - boolean needSetTransformPushDown = - operators.size() == 2 - && operators.get(1).getType() - == OperatorType.SetTransform; - boolean canSetTransformPushDown = - needSetTransformPushDown - && pair.k.isSupportProjectWithSetTransform( - (SetTransform) operators.get(1), dataArea); - if (isDummyStorageUnit) { - if (needSelectPushDown) { - result = - pair.k.executeProjectDummyWithSelect( - (Project) op, (Select) operators.get(1), dataArea); - } else if (needSetTransformPushDown) { - throw new IllegalStateException(); - } else { - result = pair.k.executeProjectDummy((Project) op, dataArea); - } - } else { - if (needSelectPushDown) { - result = - pair.k.executeProjectWithSelect( - (Project) op, (Select) operators.get(1), dataArea); - } else if (needSetTransformPushDown) { - if (canSetTransformPushDown) { - result = - pair.k.executeProjectWithSetTransform( - (Project) op, - (SetTransform) operators.get(1), - dataArea); - } else { - TaskExecuteResult tempResult = - pair.k.executeProject((Project) op, dataArea); - if (tempResult.getException() != null) { - result = tempResult; - } else { - // set transform push down is not supported, execute set - // transform in memory - OperatorMemoryExecutor executor = - OperatorMemoryExecutorFactory.getInstance() - .getMemoryExecutor(); - try { - RowStream rowStream = - executor.executeUnaryOperator( - (SetTransform) operators.get(1), - tempResult.getRowStream(), - task.getContext()); - result = new TaskExecuteResult(rowStream); - } catch (PhysicalException e) { - result = new TaskExecuteResult(e); - } - } - } - } else { - result = pair.k.executeProject((Project) op, dataArea); - } - } + PushDownStrategy strategy = + PushDownStrategyFactory.getStrategy( + operators, pair.k, dataArea, isDummyStorageUnit); + result = + strategy.execute( + (Project) op, + operators, + dataArea, + pair.k, + isDummyStorageUnit, + task.getContext()); break; case Insert: result = pair.k.executeInsert((Insert) op, dataArea); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/AggPushDownStrategy.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/AggPushDownStrategy.java new file mode 100644 index 0000000000..22f9451c91 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/AggPushDownStrategy.java @@ -0,0 +1,47 @@ +/* + * IGinX - the polystore system with high performance + * Copyright (C) Tsinghua University + * TSIGinX@gmail.com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package cn.edu.tsinghua.iginx.engine.physical.storage.execute.pushdown.strategy; + +import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage; +import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; +import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult; +import cn.edu.tsinghua.iginx.engine.shared.RequestContext; +import cn.edu.tsinghua.iginx.engine.shared.operator.Operator; +import cn.edu.tsinghua.iginx.engine.shared.operator.Project; +import java.util.List; + +public class AggPushDownStrategy extends PushDownStrategy { + @Override + public TaskExecuteResult execute( + Project project, + List operators, + DataArea dataArea, + IStorage storage, + boolean isDummyStorageUnit, + RequestContext requestContext) { + TaskExecuteResult result = + isDummyStorageUnit + ? storage.executeProjectDummyWithAgg(project, operators.get(1), dataArea) + : storage.executeProjectWithAgg(project, operators.get(1), dataArea); + + List remainingOperators = getRemainingOperators(operators, 2); + return executeRemainingOperators(remainingOperators, result, requestContext); + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/AggSelectPushDownStrategy.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/AggSelectPushDownStrategy.java new file mode 100644 index 0000000000..3c2443c2a3 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/AggSelectPushDownStrategy.java @@ -0,0 +1,50 @@ +/* + * IGinX - the polystore system with high performance + * Copyright (C) Tsinghua University + * TSIGinX@gmail.com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package cn.edu.tsinghua.iginx.engine.physical.storage.execute.pushdown.strategy; + +import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage; +import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; +import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult; +import cn.edu.tsinghua.iginx.engine.shared.RequestContext; +import cn.edu.tsinghua.iginx.engine.shared.operator.Operator; +import cn.edu.tsinghua.iginx.engine.shared.operator.Project; +import cn.edu.tsinghua.iginx.engine.shared.operator.Select; +import java.util.List; + +public class AggSelectPushDownStrategy extends PushDownStrategy { + @Override + public TaskExecuteResult execute( + Project project, + List operators, + DataArea dataArea, + IStorage storage, + boolean isDummyStorageUnit, + RequestContext requestContext) { + TaskExecuteResult result = + isDummyStorageUnit + ? storage.executeProjectDummyWithAggSelect( + project, (Select) operators.get(1), operators.get(2), dataArea) + : storage.executeProjectWithAggSelect( + project, (Select) operators.get(1), operators.get(2), dataArea); + + List remainingOperators = getRemainingOperators(operators, 3); + return executeRemainingOperators(remainingOperators, result, requestContext); + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/NoPushDownStrategy.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/NoPushDownStrategy.java new file mode 100644 index 0000000000..486f019c41 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/NoPushDownStrategy.java @@ -0,0 +1,48 @@ +/* + * IGinX - the polystore system with high performance + * Copyright (C) Tsinghua University + * TSIGinX@gmail.com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package cn.edu.tsinghua.iginx.engine.physical.storage.execute.pushdown.strategy; + +import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage; +import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; +import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult; +import cn.edu.tsinghua.iginx.engine.shared.RequestContext; +import cn.edu.tsinghua.iginx.engine.shared.operator.Operator; +import cn.edu.tsinghua.iginx.engine.shared.operator.Project; +import java.util.List; + +public class NoPushDownStrategy extends PushDownStrategy { + + @Override + public TaskExecuteResult execute( + Project project, + List operators, + DataArea dataArea, + IStorage storage, + boolean isDummyStorageUnit, + RequestContext requestContext) { + TaskExecuteResult result = + isDummyStorageUnit + ? storage.executeProjectDummy(project, dataArea) + : storage.executeProject(project, dataArea); + + List remainingOperators = getRemainingOperators(operators, 1); + return executeRemainingOperators(remainingOperators, result, requestContext); + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/PushDownStrategy.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/PushDownStrategy.java new file mode 100644 index 0000000000..25d57693b0 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/PushDownStrategy.java @@ -0,0 +1,76 @@ +/* + * IGinX - the polystore system with high performance + * Copyright (C) Tsinghua University + * TSIGinX@gmail.com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package cn.edu.tsinghua.iginx.engine.physical.storage.execute.pushdown.strategy; + +import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; +import cn.edu.tsinghua.iginx.engine.physical.memory.execute.OperatorMemoryExecutor; +import cn.edu.tsinghua.iginx.engine.physical.memory.execute.OperatorMemoryExecutorFactory; +import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage; +import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; +import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult; +import cn.edu.tsinghua.iginx.engine.shared.RequestContext; +import cn.edu.tsinghua.iginx.engine.shared.operator.Operator; +import cn.edu.tsinghua.iginx.engine.shared.operator.Project; +import cn.edu.tsinghua.iginx.engine.shared.operator.UnaryOperator; +import java.util.Collections; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class PushDownStrategy { + Logger logger = LoggerFactory.getLogger(PushDownStrategy.class); + + public abstract TaskExecuteResult execute( + Project project, + List operators, + DataArea dataArea, + IStorage storage, + boolean isDummyStorageUnit, + RequestContext context); + + protected TaskExecuteResult executeRemainingOperators( + List remainingOperators, TaskExecuteResult result, RequestContext context) { + OperatorMemoryExecutor executor = + OperatorMemoryExecutorFactory.getInstance().getMemoryExecutor(); + + if (result.getException() == null) { + for (Operator operator : remainingOperators) { + try { + result = + new TaskExecuteResult( + executor.executeUnaryOperator( + (UnaryOperator) operator, result.getRowStream(), context)); + } catch (Exception e) { + logger.error("Execute remaining operator error: ", e); + result = new TaskExecuteResult(new PhysicalException(e)); + } + } + } + + return result; + } + + protected static List getRemainingOperators(List operators, int startIndex) { + if (operators.size() <= startIndex) { + return Collections.emptyList(); + } + return operators.subList(startIndex, operators.size()); + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/PushDownStrategyFactory.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/PushDownStrategyFactory.java new file mode 100644 index 0000000000..af196f4914 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/PushDownStrategyFactory.java @@ -0,0 +1,69 @@ +/* + * IGinX - the polystore system with high performance + * Copyright (C) Tsinghua University + * TSIGinX@gmail.com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package cn.edu.tsinghua.iginx.engine.physical.storage.execute.pushdown.strategy; + +import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage; +import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; +import cn.edu.tsinghua.iginx.engine.shared.operator.Operator; +import cn.edu.tsinghua.iginx.engine.shared.operator.Select; +import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType; +import java.util.List; + +public class PushDownStrategyFactory { + private static PushDownType determinePushDownType( + List operators, IStorage storage, DataArea dataArea, boolean isDummyStorageUnit) { + if (operators.size() >= 2) { + OperatorType secondOpType = operators.get(1).getType(); + if (storage.isSupportProjectWithSelect() && secondOpType == OperatorType.Select) { + return PushDownType.SelectPushDown; + } + if ((secondOpType == OperatorType.GroupBy || secondOpType == OperatorType.SetTransform) + && storage.isSupportProjectWithAgg(operators.get(1), dataArea, isDummyStorageUnit)) { + return PushDownType.AggPushDown; + } + } + if (operators.size() >= 3) { + OperatorType thirdOpType = operators.get(2).getType(); + if (operators.get(1).getType() == OperatorType.Select + && (thirdOpType == OperatorType.GroupBy || thirdOpType == OperatorType.SetTransform) + && storage.isSupportProjectWithAggSelect( + operators.get(2), (Select) operators.get(1), dataArea, isDummyStorageUnit)) { + return PushDownType.AggSelectPushDown; + } + } + return PushDownType.NoPushDown; + } + + public static PushDownStrategy getStrategy( + List operators, IStorage storage, DataArea dataArea, boolean isDummyStorageUnit) { + PushDownType pushDownType = + determinePushDownType(operators, storage, dataArea, isDummyStorageUnit); + switch (pushDownType) { + case SelectPushDown: + return new SelectPushDownStrategy(); + case AggPushDown: + return new AggPushDownStrategy(); + case AggSelectPushDown: + return new AggSelectPushDownStrategy(); + default: + return new NoPushDownStrategy(); + } + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/PushDownType.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/PushDownType.java new file mode 100644 index 0000000000..24507b0b52 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/PushDownType.java @@ -0,0 +1,27 @@ +/* + * IGinX - the polystore system with high performance + * Copyright (C) Tsinghua University + * TSIGinX@gmail.com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package cn.edu.tsinghua.iginx.engine.physical.storage.execute.pushdown.strategy; + +public enum PushDownType { + SelectPushDown, + AggPushDown, + AggSelectPushDown, + NoPushDown +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/SelectPushDownStrategy.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/SelectPushDownStrategy.java new file mode 100644 index 0000000000..5a82544834 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/pushdown/strategy/SelectPushDownStrategy.java @@ -0,0 +1,49 @@ +/* + * IGinX - the polystore system with high performance + * Copyright (C) Tsinghua University + * TSIGinX@gmail.com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package cn.edu.tsinghua.iginx.engine.physical.storage.execute.pushdown.strategy; + +import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage; +import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; +import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult; +import cn.edu.tsinghua.iginx.engine.shared.RequestContext; +import cn.edu.tsinghua.iginx.engine.shared.operator.Operator; +import cn.edu.tsinghua.iginx.engine.shared.operator.Project; +import cn.edu.tsinghua.iginx.engine.shared.operator.Select; +import java.util.List; + +public class SelectPushDownStrategy extends PushDownStrategy { + + @Override + public TaskExecuteResult execute( + Project project, + List operators, + DataArea dataArea, + IStorage storage, + boolean isDummyStorageUnit, + RequestContext requestContext) { + TaskExecuteResult result = + isDummyStorageUnit + ? storage.executeProjectDummyWithSelect(project, (Select) operators.get(1), dataArea) + : storage.executeProjectWithSelect(project, (Select) operators.get(1), dataArea); + + List remainingOperators = getRemainingOperators(operators, 2); + return executeRemainingOperators(remainingOperators, result, requestContext); + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/data/read/Field.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/data/read/Field.java index 310516f869..9d01ff4861 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/data/read/Field.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/data/read/Field.java @@ -52,7 +52,7 @@ public Field(String name, String fullName, DataType type) { this(name, fullName, type, Collections.emptyMap()); } - private Field(String name, String fullName, DataType type, Map tags) { + public Field(String name, String fullName, DataType type, Map tags) { this.name = Objects.requireNonNull(name); this.fullName = Objects.requireNonNull(fullName); this.type = Objects.requireNonNull(type); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/BinaryExpression.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/BinaryExpression.java index dad19db3df..96353099f4 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/BinaryExpression.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/BinaryExpression.java @@ -71,6 +71,15 @@ public String getColumnName() { + rightExpression.getColumnName(); } + @Override + public String getCalColumnName() { + return leftExpression.getCalColumnName() + + " " + + Operator.operatorToCalString(op) + + " " + + rightExpression.getCalColumnName(); + } + @Override public ExpressionType getType() { return ExpressionType.Binary; diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/BracketExpression.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/BracketExpression.java index 65c102f0d7..d77de9f7d8 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/BracketExpression.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/BracketExpression.java @@ -46,6 +46,11 @@ public String getColumnName() { return "(" + expression.getColumnName() + ")"; } + @Override + public String getCalColumnName() { + return "(" + expression.getCalColumnName() + ")"; + } + @Override public ExpressionType getType() { return ExpressionType.Bracket; diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/Expression.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/Expression.java index 50c762c5ef..499cec0cfa 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/Expression.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/Expression.java @@ -23,6 +23,10 @@ public interface Expression { String getColumnName(); + default String getCalColumnName() { + return getColumnName(); + } + ExpressionType getType(); boolean hasAlias(); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/MultipleExpression.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/MultipleExpression.java index 579c881695..37bfd92b93 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/MultipleExpression.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/MultipleExpression.java @@ -86,6 +86,23 @@ public String getColumnName() { return sb.toString(); } + @Override + public String getCalColumnName() { + StringBuilder sb = new StringBuilder(); + if (ops.get(0) == Operator.MINUS) { + sb.append("-"); + } + for (int i = 0; i < children.size(); i++) { + sb.append(children.get(i).getColumnName()); + if (i + 1 < ops.size()) { + sb.append(" "); + sb.append(Operator.operatorToCalString(ops.get(i + 1))); + sb.append(" "); + } + } + return sb.toString(); + } + @Override public ExpressionType getType() { return ExpressionType.Multiple; diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/Operator.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/Operator.java index 5a296cc826..2540e32dbc 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/Operator.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/Operator.java @@ -22,8 +22,8 @@ public enum Operator { PLUS, MINUS, - STAR, - DIV, + STAR, // 乘法展示, × + DIV, // 除法展示, ÷ MOD; public static String operatorToString(Operator operator) { @@ -43,6 +43,17 @@ public static String operatorToString(Operator operator) { } } + public static String operatorToCalString(Operator operator) { + switch (operator) { + case STAR: + return "*"; + case DIV: + return "/"; + default: + return operatorToString(operator); + } + } + /** 判断两个op是否有相同的优先级 */ public static boolean hasSamePriority(Operator op1, Operator op2) { return ((op1 == PLUS || op1 == MINUS) && (op2 == PLUS || op2 == MINUS)) diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/UnaryExpression.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/UnaryExpression.java index 9fc8699386..d57a713441 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/UnaryExpression.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/expr/UnaryExpression.java @@ -52,6 +52,11 @@ public String getColumnName() { return Operator.operatorToString(operator) + " " + expression.getColumnName(); } + @Override + public String getCalColumnName() { + return Operator.operatorToCalString(operator) + " " + expression.getColumnName(); + } + @Override public ExpressionType getType() { return ExpressionType.Unary; diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/FunctionParams.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/FunctionParams.java index 3513b30e69..0c2837b9a3 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/FunctionParams.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/FunctionParams.java @@ -30,7 +30,7 @@ public class FunctionParams { - private final List paths; + private List paths; private final List expressions; @@ -75,12 +75,17 @@ public Expression getExpression(int i) { public void setExpression(int i, Expression expression) { expressions.set(i, expression); + paths.set(i, expression.getColumnName()); } public List getPaths() { return paths; } + public void updatePaths() { + this.paths = expressions.stream().map(Expression::getColumnName).collect(Collectors.toList()); + } + public List getArgs() { return args; } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/GroupBy.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/GroupBy.java index b1e7840424..075ec234bf 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/GroupBy.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/GroupBy.java @@ -56,6 +56,17 @@ public List getGroupByCols() { return groupByCols; } + public void setGroupByExpression(int i, Expression expression) { + groupByExpressions.set(i, expression); + groupByCols.set(i, expression.getColumnName()); + } + + public void updateGroupByCols() { + groupByCols.clear(); + groupByCols.addAll( + groupByExpressions.stream().map(Expression::getColumnName).collect(Collectors.toList())); + } + public List getFunctionCallList() { return functionCallList; } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/Sort.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/Sort.java index 7243e39a23..d466265579 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/Sort.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/Sort.java @@ -43,7 +43,10 @@ public Sort(Source source, List sortByExpressions, List so if (sortTypes == null || sortTypes.isEmpty()) { throw new IllegalArgumentException("sortType shouldn't be null"); } - this.sortByExpressions = sortByExpressions; + this.sortByExpressions = new ArrayList<>(); + for (Expression expression : sortByExpressions) { + this.sortByExpressions.add(ExprUtils.copy(expression)); + } this.sortByCols = sortByExpressions.stream().map(Expression::getColumnName).collect(Collectors.toList()); this.sortTypes = sortTypes; diff --git a/dataSource/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/FileSystemStorage.java b/dataSource/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/FileSystemStorage.java index 882c5131e4..ea11ac4d08 100644 --- a/dataSource/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/FileSystemStorage.java +++ b/dataSource/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/FileSystemStorage.java @@ -39,6 +39,8 @@ import cn.edu.tsinghua.iginx.engine.shared.operator.filter.BoolFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; +import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType; +import cn.edu.tsinghua.iginx.engine.shared.source.OperatorSource; import cn.edu.tsinghua.iginx.filesystem.common.AbstractConfig; import cn.edu.tsinghua.iginx.filesystem.common.Configs; import cn.edu.tsinghua.iginx.filesystem.common.FileSystemException; @@ -228,11 +230,39 @@ public TaskExecuteResult executeProjectDummyWithSelect( } @Override - public boolean isSupportProjectWithSetTransform(SetTransform setTransform, DataArea dataArea) { + public TaskExecuteResult executeProjectWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectDummyWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectWithAgg(Project project, Operator agg, DataArea dataArea) { + DataArea reshapedDataArea = + new DataArea(dataArea.getStorageUnit(), KeyInterval.getDefaultKeyInterval()); + + return executeQuery( + unitOf(dataArea), getDataTargetOf(project, reshapedDataArea), AggregateType.COUNT); + } + + @Override + public boolean isSupportProjectWithAgg(Operator agg, DataArea dataArea, boolean isDummy) { if (!isLegacyParquet) { return false; } + if (isDummy) return false; + if (agg.getType() != OperatorType.SetTransform) return false; + if (((OperatorSource) ((UnaryOperator) agg).getSource()).getOperator().getType() + == OperatorType.Select) return false; + + SetTransform setTransform = (SetTransform) agg; + // just push down in full column fragment KeyInterval keyInterval = dataArea.getKeyInterval(); if (keyInterval.getStartKey() > 0 || keyInterval.getEndKey() < Long.MAX_VALUE) { @@ -261,17 +291,9 @@ public boolean isSupportProjectWithSetTransform(SetTransform setTransform, DataA } @Override - public TaskExecuteResult executeProjectWithSetTransform( - Project project, SetTransform setTransform, DataArea dataArea) { - if (!isSupportProjectWithSetTransform(setTransform, dataArea)) { - throw new IllegalArgumentException("unsupported set transform"); - } - - DataArea reshapedDataArea = - new DataArea(dataArea.getStorageUnit(), KeyInterval.getDefaultKeyInterval()); - - return executeQuery( - unitOf(dataArea), getDataTargetOf(project, reshapedDataArea), AggregateType.COUNT); + public TaskExecuteResult executeProjectDummyWithAgg( + Project project, Operator agg, DataArea dataArea) { + return null; } @Override diff --git a/dataSource/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java b/dataSource/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java index 64f1d12101..c1b8df8880 100644 --- a/dataSource/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java +++ b/dataSource/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java @@ -37,10 +37,8 @@ import cn.edu.tsinghua.iginx.engine.shared.data.write.DataView; import cn.edu.tsinghua.iginx.engine.shared.data.write.RowDataView; import cn.edu.tsinghua.iginx.engine.shared.expr.*; -import cn.edu.tsinghua.iginx.engine.shared.operator.Delete; -import cn.edu.tsinghua.iginx.engine.shared.operator.Insert; -import cn.edu.tsinghua.iginx.engine.shared.operator.Project; -import cn.edu.tsinghua.iginx.engine.shared.operator.Select; +import cn.edu.tsinghua.iginx.engine.shared.operator.*; +import cn.edu.tsinghua.iginx.engine.shared.operator.Operator; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.*; import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilterType; @@ -471,6 +469,29 @@ public TaskExecuteResult executeProjectDummyWithSelect( return new TaskExecuteResult(rowStream); } + @Override + public TaskExecuteResult executeProjectWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectDummyWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectWithAgg(Project project, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectDummyWithAgg( + Project project, Operator agg, DataArea dataArea) { + return null; + } + @Override public TaskExecuteResult executeProject(Project project, DataArea dataArea) { String storageUnit = dataArea.getStorageUnit(); diff --git a/dataSource/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java b/dataSource/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java index 5bd8a0b21c..fc384b6770 100644 --- a/dataSource/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java +++ b/dataSource/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java @@ -36,10 +36,7 @@ import cn.edu.tsinghua.iginx.engine.shared.data.write.ColumnDataView; import cn.edu.tsinghua.iginx.engine.shared.data.write.DataView; import cn.edu.tsinghua.iginx.engine.shared.data.write.RowDataView; -import cn.edu.tsinghua.iginx.engine.shared.operator.Delete; -import cn.edu.tsinghua.iginx.engine.shared.operator.Insert; -import cn.edu.tsinghua.iginx.engine.shared.operator.Project; -import cn.edu.tsinghua.iginx.engine.shared.operator.Select; +import cn.edu.tsinghua.iginx.engine.shared.operator.*; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.*; import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.iotdb.exception.IoTDBException; @@ -394,6 +391,29 @@ public TaskExecuteResult executeProjectDummyWithSelect( return executeProjectDummyWithFilter(project, select.getFilter()); } + @Override + public TaskExecuteResult executeProjectWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectDummyWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectWithAgg(Project project, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectDummyWithAgg( + Project project, Operator agg, DataArea dataArea) { + return null; + } + private TaskExecuteResult executeProjectDummyWithFilter(Project project, Filter filter) { try { StringBuilder builder = new StringBuilder(); diff --git a/dataSource/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/MongoDBStorage.java b/dataSource/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/MongoDBStorage.java index 5f1fe9fcf8..fd3c0c4b04 100644 --- a/dataSource/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/MongoDBStorage.java +++ b/dataSource/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/MongoDBStorage.java @@ -31,10 +31,7 @@ import cn.edu.tsinghua.iginx.engine.shared.data.read.FilterRowStreamWrapper; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; import cn.edu.tsinghua.iginx.engine.shared.data.write.DataView; -import cn.edu.tsinghua.iginx.engine.shared.operator.Delete; -import cn.edu.tsinghua.iginx.engine.shared.operator.Insert; -import cn.edu.tsinghua.iginx.engine.shared.operator.Project; -import cn.edu.tsinghua.iginx.engine.shared.operator.Select; +import cn.edu.tsinghua.iginx.engine.shared.operator.*; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.AndFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.KeyFilter; @@ -184,6 +181,29 @@ public TaskExecuteResult executeProjectDummyWithSelect( return queryDummy(project, area, select.getFilter()); } + @Override + public TaskExecuteResult executeProjectWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectDummyWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectWithAgg(Project project, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectDummyWithAgg( + Project project, Operator agg, DataArea dataArea) { + return null; + } + private TaskExecuteResult queryDummy(Project project, DataArea area, Filter filter) { KeyInterval range = area.getKeyInterval(); List patterns = project.getPatterns(); diff --git a/dataSource/redis/src/main/java/cn/edu/tsinghua/iginx/redis/RedisStorage.java b/dataSource/redis/src/main/java/cn/edu/tsinghua/iginx/redis/RedisStorage.java index 366cf4bbd1..fb46d88217 100644 --- a/dataSource/redis/src/main/java/cn/edu/tsinghua/iginx/redis/RedisStorage.java +++ b/dataSource/redis/src/main/java/cn/edu/tsinghua/iginx/redis/RedisStorage.java @@ -27,10 +27,7 @@ import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult; import cn.edu.tsinghua.iginx.engine.shared.KeyRange; -import cn.edu.tsinghua.iginx.engine.shared.operator.Delete; -import cn.edu.tsinghua.iginx.engine.shared.operator.Insert; -import cn.edu.tsinghua.iginx.engine.shared.operator.Project; -import cn.edu.tsinghua.iginx.engine.shared.operator.Select; +import cn.edu.tsinghua.iginx.engine.shared.operator.*; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval; @@ -277,6 +274,29 @@ public TaskExecuteResult executeProjectDummyWithSelect( return new TaskExecuteResult(new RedisQueryRowStream(columns, filter), null); } + @Override + public TaskExecuteResult executeProjectWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectDummyWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectWithAgg(Project project, Operator agg, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectDummyWithAgg( + Project project, Operator agg, DataArea dataArea) { + return null; + } + @Override public TaskExecuteResult executeProject(Project project, DataArea dataArea) { String storageUnit = dataArea.getStorageUnit(); diff --git a/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/RelationalStorage.java b/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/RelationalStorage.java index 38d4d73a43..d95f21d6b5 100644 --- a/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/RelationalStorage.java +++ b/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/RelationalStorage.java @@ -25,26 +25,33 @@ import static cn.edu.tsinghua.iginx.relational.tools.TagKVUtils.toFullName; import cn.edu.tsinghua.iginx.engine.logical.utils.LogicalFilterUtils; +import cn.edu.tsinghua.iginx.engine.logical.utils.OperatorUtils; import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.exception.StorageInitializationException; +import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.ExprUtils; +import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils; import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage; import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column; import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils; import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult; import cn.edu.tsinghua.iginx.engine.shared.KeyRange; +import cn.edu.tsinghua.iginx.engine.shared.data.Value; import cn.edu.tsinghua.iginx.engine.shared.data.read.ClearEmptyRowStreamWrapper; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; import cn.edu.tsinghua.iginx.engine.shared.data.write.BitmapView; import cn.edu.tsinghua.iginx.engine.shared.data.write.ColumnDataView; import cn.edu.tsinghua.iginx.engine.shared.data.write.DataView; import cn.edu.tsinghua.iginx.engine.shared.data.write.RowDataView; -import cn.edu.tsinghua.iginx.engine.shared.operator.Delete; -import cn.edu.tsinghua.iginx.engine.shared.operator.Insert; -import cn.edu.tsinghua.iginx.engine.shared.operator.Project; -import cn.edu.tsinghua.iginx.engine.shared.operator.Select; +import cn.edu.tsinghua.iginx.engine.shared.expr.*; +import cn.edu.tsinghua.iginx.engine.shared.function.FunctionCall; +import cn.edu.tsinghua.iginx.engine.shared.function.system.*; +import cn.edu.tsinghua.iginx.engine.shared.operator.*; +import cn.edu.tsinghua.iginx.engine.shared.operator.Operator; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.*; import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; +import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType; +import cn.edu.tsinghua.iginx.engine.shared.source.OperatorSource; import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval; import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval; import cn.edu.tsinghua.iginx.metadata.entity.StorageEngineMeta; @@ -55,6 +62,7 @@ import cn.edu.tsinghua.iginx.relational.query.entity.RelationQueryRowStream; import cn.edu.tsinghua.iginx.relational.tools.ColumnField; import cn.edu.tsinghua.iginx.relational.tools.FilterTransformer; +import cn.edu.tsinghua.iginx.relational.tools.QuoteBaseExpressionDecorator; import cn.edu.tsinghua.iginx.relational.tools.RelationSchema; import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; @@ -96,6 +104,9 @@ public class RelationalStorage implements IStorage { private final FilterTransformer filterTransformer; + private static final Set SUPPORTED_AGGREGATE_FUNCTIONS = + new HashSet<>(Arrays.asList(Count.COUNT, Sum.SUM, Avg.AVG, Max.MAX, Min.MIN)); + private Connection getConnection(String databaseName) { if (databaseName.startsWith("dummy")) { return null; @@ -518,6 +529,173 @@ public TaskExecuteResult executeProjectWithSelect( return executeProjectWithFilter(project, select.getFilter(), dataArea); } + /** 获取ProjectWithFilter中将所有table join到一起进行查询的SQL语句 */ + private String getProjectWithFilterSQL( + Filter filter, Map tableNameToColumnNames, boolean isAgg) { + List tableNames = new ArrayList<>(); + List> fullColumnNamesList = new ArrayList<>(); + List> fullColumnNamesListForExpandFilter = new ArrayList<>(); + String firstTable = ""; + char quote = relationalMeta.getQuote(); + for (Map.Entry entry : tableNameToColumnNames.entrySet()) { + String tableName = entry.getKey(); + if (firstTable.isEmpty()) { + firstTable = tableName; + } + tableNames.add(tableName); + List fullColumnNames = new ArrayList<>(Arrays.asList(entry.getValue().split(", "))); + + // 将columnNames中的列名加上tableName前缀 + if (isAgg) { + fullColumnNamesList.add( + fullColumnNames.stream() + .map( + s -> + RelationSchema.getQuoteFullName(tableName, s, quote) + + " AS " + + quote + + RelationSchema.getFullName(tableName, s) + + quote) + .collect(Collectors.toList())); + } else { + fullColumnNamesList.add( + fullColumnNames.stream() + .map(s -> RelationSchema.getQuoteFullName(tableName, s, quote)) + .collect(Collectors.toList())); + } + fullColumnNamesListForExpandFilter.add( + fullColumnNames.stream() + .map(s -> RelationSchema.getFullName(tableName, s)) + .collect(Collectors.toList())); + fullColumnNamesListForExpandFilter + .get(fullColumnNamesListForExpandFilter.size() - 1) + .add(RelationSchema.getFullName(tableName, KEY_NAME)); + } + + StringBuilder fullColumnNames = new StringBuilder(); + fullColumnNames.append( + RelationSchema.getQuoteFullName(firstTable, KEY_NAME, relationalMeta.getQuote())); + for (List columnNames : fullColumnNamesList) { + for (String columnName : columnNames) { + fullColumnNames.append(", ").append(columnName); + } + } + + // 将Filter中的keyFilter替换成带tablename的value filter + keyFilterAddTableName(filter, firstTable); + + // 将所有表进行full join + String fullTableName = getFullJoinTables(tableNames, fullColumnNamesList); + + // 对通配符做处理,将通配符替换成对应的列名 + if (filterTransformer.toString(filter).contains("*")) { + // 把fullColumnNamesList中的列名全部用removeFullColumnNameQuote去掉引号 + fullColumnNamesList.replaceAll( + columnNames -> { + List newColumnNames = new ArrayList<>(); + for (String columnName : columnNames) { + newColumnNames.add(removeFullColumnNameQuote(columnName)); + } + return newColumnNames; + }); + filter = expandFilter(filter, fullColumnNamesListForExpandFilter); + filter = LogicalFilterUtils.mergeTrue(filter); + } + + String fullColumnNamesStr = fullColumnNames.toString(); + String filterStr = filterTransformer.toString(filter); + String orderByKey = + RelationSchema.getQuoteFullName(tableNames.get(0), KEY_NAME, relationalMeta.getQuote()); + if (!relationalMeta.isSupportFullJoin()) { + // 如果不支持full join,需要为left join + union模拟的full join表起别名,同时select、where、order by的部分都要调整 + fullColumnNamesStr = fullColumnNamesStr.replaceAll("`\\.`", "."); + filterStr = filterStr.replaceAll("`\\.`", "."); + filterStr = + filterStr.replace( + getQuotName(KEY_NAME), getQuotName(tableNames.get(0) + SEPARATOR + KEY_NAME)); + orderByKey = orderByKey.replaceAll("`\\.`", "."); + } + return String.format( + QUERY_STATEMENT_WITHOUT_KEYNAME, + fullColumnNamesStr, + fullTableName, + filterStr.isEmpty() ? "" : "WHERE " + filterStr, + orderByKey); + } + + private String getProjectDummyWithSQL( + Filter filter, String databaseName, Map tableNameToColumnNames) + throws SQLException { + List tableNames = new ArrayList<>(); + List> fullColumnNamesList = new ArrayList<>(); + List> fullQuoteColumnNamesList = new ArrayList<>(); + + // 这里获取所有table的所有列名,用于concat时生成key列。 + Map> allColumnNameForTable = + getAllColumnNameForTable(databaseName, tableNameToColumnNames); + + // 将columnNames中的列名加上tableName前缀,带JOIN的查询语句中需要用到 + for (Map.Entry entry : tableNameToColumnNames.entrySet()) { + String tableName = entry.getKey(); + tableNames.add(tableName); + List columnNames = new ArrayList<>(Arrays.asList(entry.getValue().split(", "))); + columnNames.replaceAll(s -> RelationSchema.getFullName(tableName, s)); + fullColumnNamesList.add(columnNames); + + List fullColumnNames = new ArrayList<>(Arrays.asList(entry.getValue().split(", "))); + fullColumnNames.replaceAll( + s -> RelationSchema.getQuoteFullName(tableName, s, relationalMeta.getQuote())); + fullQuoteColumnNamesList.add(fullColumnNames); + } + + String fullTableName = getDummyFullJoinTables(tableNames, allColumnNameForTable); + + Filter copyFilter = + dummyFilterSetTrueByColumnNames( + cutFilterDatabaseNameForDummy(filter.copy(), databaseName), + fullColumnNamesList.stream().flatMap(List::stream).collect(Collectors.toList())); + + // 对通配符做处理,将通配符替换成对应的列名 + if (filterTransformer.toString(copyFilter).contains("*")) { + // 把fullColumnNamesList中的列名全部用removeFullColumnNameQuote去掉引号 + fullColumnNamesList.replaceAll( + columnNames -> { + List newColumnNames = new ArrayList<>(); + for (String columnName : columnNames) { + newColumnNames.add(removeFullColumnNameQuote(columnName)); + } + return newColumnNames; + }); + copyFilter = expandFilter(copyFilter, fullColumnNamesList); + copyFilter = LogicalFilterUtils.mergeTrue(copyFilter); + } + + String filterStr = filterTransformer.toString(copyFilter); + String orderByKey = + buildConcat( + fullQuoteColumnNamesList.stream().flatMap(List::stream).collect(Collectors.toList())); + if (!relationalMeta.isSupportFullJoin()) { + // 如果不支持full join,需要为left join + union模拟的full join表起别名,同时select、where、order by的部分都要调整 + char quote = relationalMeta.getQuote(); + fullQuoteColumnNamesList.forEach( + columnNames -> columnNames.replaceAll(s -> s.replaceAll(quote + "\\." + quote, "."))); + filterStr = filterStr.replaceAll("`\\.`", "."); + filterStr = + filterStr.replace( + getQuotName(KEY_NAME), getQuotName(tableNames.get(0) + SEPARATOR + KEY_NAME)); + orderByKey = getQuotName(tableNames.get(0) + SEPARATOR + KEY_NAME); + } + + return String.format( + relationalMeta.getConcatQueryStatement(), + buildConcat( + fullQuoteColumnNamesList.stream().flatMap(List::stream).collect(Collectors.toList())), + fullQuoteColumnNamesList.stream().flatMap(List::stream).collect(Collectors.joining(", ")), + fullTableName, + filterStr.isEmpty() ? "" : "WHERE " + filterStr, + orderByKey); + } + private TaskExecuteResult executeProjectWithFilter( Project project, Filter filter, DataArea dataArea) { try { @@ -540,7 +718,7 @@ private TaskExecuteResult executeProjectWithFilter( String statement; // 如果table>1的情况下存在Value或Path Filter,说明filter的匹配需要跨table,此时需要将所有table join到一起进行查询 - if (!filter.toString().contains("*") + if (FilterUtils.getAllPathsFromFilter(filter).stream().noneMatch(s -> s.contains("*")) && !(tableNameToColumnNames.size() > 1 && filterContainsType(Arrays.asList(FilterType.Value, FilterType.Path), filter))) { for (Map.Entry entry : tableNameToColumnNames.entrySet()) { @@ -571,53 +749,7 @@ && filterContainsType(Arrays.asList(FilterType.Value, FilterType.Path), filter)) } // table中带有了通配符,将所有table都join到一起进行查询,以便输入filter. else if (!tableNameToColumnNames.isEmpty()) { - List tableNames = new ArrayList<>(); - List> fullColumnNamesList = new ArrayList<>(); - for (Map.Entry entry : tableNameToColumnNames.entrySet()) { - String tableName = entry.getKey(); - tableNames.add(tableName); - List fullColumnNames = - new ArrayList<>(Arrays.asList(entry.getValue().split(", "))); - - // 将columnNames中的列名加上tableName前缀 - fullColumnNames.replaceAll( - s -> RelationSchema.getQuoteFullName(tableName, s, relationalMeta.getQuote())); - fullColumnNamesList.add(fullColumnNames); - } - - StringBuilder fullColumnNames = new StringBuilder(); - fullColumnNames.append( - RelationSchema.getQuoteFullName( - tableNames.get(0), KEY_NAME, relationalMeta.getQuote())); - for (List columnNames : fullColumnNamesList) { - for (String columnName : columnNames) { - fullColumnNames.append(", ").append(columnName); - } - } - - // 将所有表进行full join - String fullTableName = getFullJoinTables(tableNames, fullColumnNamesList); - - String fullColumnNamesStr = fullColumnNames.toString(); - String filterStr = filterTransformer.toString(expandFilter); - String orderByKey = - RelationSchema.getQuoteFullName(tableNames.get(0), KEY_NAME, relationalMeta.getQuote()); - if (!relationalMeta.isSupportFullJoin()) { - // 如果不支持full join,需要为left join + union模拟的full join表起别名,同时select、where、order by的部分都要调整 - fullColumnNamesStr = fullColumnNamesStr.replaceAll("`\\.`", "."); - filterStr = filterStr.replaceAll("`\\.`", "."); - filterStr = - filterStr.replace( - getQuotName(KEY_NAME), getQuotName(tableNames.get(0) + SEPARATOR + KEY_NAME)); - orderByKey = orderByKey.replaceAll("`\\.`", "."); - } - statement = - String.format( - QUERY_STATEMENT_WITHOUT_KEYNAME, - fullColumnNamesStr, - fullTableName, - filterStr.isEmpty() ? "" : "WHERE " + filterStr, - orderByKey); + statement = getProjectWithFilterSQL(filter.copy(), tableNameToColumnNames, false); ResultSet rs = null; try { @@ -682,12 +814,14 @@ private String getFullJoinTables(List tableNames, List> ful } else { // 不支持全连接,就要用Left Join+Union来模拟全连接 StringBuilder allColumns = new StringBuilder(); - fullColumnList.forEach( - columnList -> - columnList.forEach( - column -> - allColumns.append( - String.format("%s AS %s, ", column, column.replaceAll("`\\.`", "."))))); + for (List columnList : fullColumnList) { + for (String column : columnList) { + if (!column.contains(" AS ")) { + column = String.format("%s AS %s", column, column.replaceAll("`\\.`", ".")); + } + allColumns.append(column).append(", "); + } + } allColumns.delete(allColumns.length() - 2, allColumns.length()); fullTableName.append("("); @@ -1085,6 +1219,515 @@ public TaskExecuteResult executeProjectDummyWithSelect( return executeProjectDummyWithFilter(project, filter); } + @Override + public boolean isSupportProjectWithAgg(Operator agg, DataArea dataArea, boolean isDummy) { + if (agg.getType() != OperatorType.GroupBy && agg.getType() != OperatorType.SetTransform) { + return false; + } + List functionCalls = OperatorUtils.getFunctionCallList(agg); + for (FunctionCall functionCall : functionCalls) { + if (!SUPPORTED_AGGREGATE_FUNCTIONS.contains(functionCall.getFunction().getIdentifier())) { + return false; + } + if (functionCall.getParams().isDistinct()) return false; + } + + if (isDummy) { + List patterns = new ArrayList<>(); + for (FunctionCall fc : functionCalls) { + patterns.addAll(fc.getParams().getPaths()); + } + patterns = patterns.stream().distinct().collect(Collectors.toList()); + try { + Map> splitResults = splitAndMergeHistoryQueryPatterns(patterns); + if (splitResults.size() != 1) { + return false; + } + } catch (SQLException e) { + return false; + } + } + List exprList = new ArrayList<>(); + functionCalls.forEach(fc -> exprList.addAll(fc.getParams().getExpressions())); + // Group By Column和Function参数中不能带有函数,只能有四则运算表达式 + if (agg.getType() == OperatorType.GroupBy) { + List gbc = ((GroupBy) agg).getGroupByExpressions(); + exprList.addAll(gbc); + } + + for (Expression expr : exprList) { + final boolean[] isValid = {true}; + expr.accept( + new ExpressionVisitor() { + @Override + public void visit(BaseExpression expression) { + if (expression.getColumnName().contains("*")) { + isValid[0] = false; + } + } + + @Override + public void visit(BinaryExpression expression) {} + + @Override + public void visit(BracketExpression expression) {} + + @Override + public void visit(ConstantExpression expression) {} + + @Override + public void visit(FromValueExpression expression) { + isValid[0] = false; + } + + @Override + public void visit(FuncExpression expression) { + isValid[0] = false; + } + + @Override + public void visit(MultipleExpression expression) {} + + @Override + public void visit(UnaryExpression expression) {} + + @Override + public void visit(CaseWhenExpression expression) { + isValid[0] = false; + } + + @Override + public void visit(KeyExpression expression) {} + + @Override + public void visit(SequenceExpression expression) { + isValid[0] = false; + } + }); + + if (!isValid[0]) { + return false; + } + } + + return true; + } + + @Override + public boolean isSupportProjectWithAggSelect( + Operator agg, Select select, DataArea dataArea, boolean isDummy) { + return isSupportProjectWithAgg(agg, dataArea, isDummy) && isSupportProjectWithSelect(); + } + + @Override + public TaskExecuteResult executeProjectWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea) { + List functionCalls = OperatorUtils.getFunctionCallList(agg); + List gbc = new ArrayList<>(); + if (agg.getType() == OperatorType.GroupBy) { + gbc = ((GroupBy) agg).getGroupByExpressions(); + } + try { + String databaseName = dataArea.getStorageUnit(); + Connection conn = getConnection(databaseName); + if (conn == null) { + return new TaskExecuteResult( + new RelationalTaskExecuteFailureException( + String.format("cannot connect to database %s", databaseName))); + } + Map tableNameToColumnNames = + splitAndMergeQueryPatterns(databaseName, project.getPatterns()); + + String statement = + getProjectWithFilterSQL(select.getFilter().copy(), tableNameToColumnNames, true); + statement = statement.substring(0, statement.length() - 1); // 去掉最后的分号 + Map fullName2Name = new HashMap<>(); + statement = + generateAggSql(functionCalls, gbc, statement, tableNameToColumnNames, fullName2Name); + + ResultSet rs = null; + try { + Statement stmt = conn.createStatement(); + rs = stmt.executeQuery(statement); + LOGGER.info("[Query] execute query: {}", statement); + } catch (SQLException e) { + LOGGER.error("meet error when executing query {}: ", statement, e); + } + + if (rs == null) { + return new TaskExecuteResult( + new RelationalTaskExecuteFailureException("execute query failure")); + } + + Map columnTypeMap = getSumDataType(functionCalls); + + RowStream rowStream = + new ClearEmptyRowStreamWrapper( + new RelationQueryRowStream( + Collections.singletonList(databaseName), + Collections.singletonList(rs), + false, + select.getFilter(), + project.getTagFilter(), + Collections.singletonList(conn), + relationalMeta, + columnTypeMap, + fullName2Name, + true)); + return new TaskExecuteResult(rowStream); + + } catch (SQLException | RelationalTaskExecuteFailureException e) { + LOGGER.error("unexpected error: ", e); + return new TaskExecuteResult( + new RelationalTaskExecuteFailureException( + String.format("execute project task in %s failure", engineName), e)); + } + } + + private Map getSumDataType(List functionCalls) + throws RelationalTaskExecuteFailureException { + // 如果下推的函数有sum,需要判断结果是小数还是整数 + List columns = null; + Map columnTypeMap = new HashMap<>(); + for (FunctionCall fc : functionCalls) { + if (fc.getFunction().getIdentifier().equalsIgnoreCase(Sum.SUM)) { + if (columns == null) { + columns = getColumns(null, null); + } + if (isSumResultDouble(fc.getParams().getExpression(0), columns)) { + columnTypeMap.put(fc.getFunctionStr(), DataType.DOUBLE); + } else { + columnTypeMap.put(fc.getFunctionStr(), DataType.LONG); + } + } + } + return columnTypeMap; + } + + private String generateAggSql( + List functionCalls, + List gbc, + String statement, + Map table2Column, + Map fullName2Name) { + char quote = relationalMeta.getQuote(); + List fullColumnNames = new ArrayList<>(); + for (Map.Entry entry : table2Column.entrySet()) { + String tableName = entry.getKey(); + List columnNames = new ArrayList<>(Arrays.asList(entry.getValue().split(", "))); + for (String columnName : columnNames) { + fullColumnNames.add(tableName + SEPARATOR + columnName); + } + } + // 在statement的基础上添加group by和函数内容 + // 这里的处理形式是生成一个形如 + // SELECT max(derived."a") AS "max(test.a)", sum(derived."b") AS "sum(test.b)", derived."c" AS + // "test.c" + // FROM (SELECT a, b, c, d FROM test) AS derived GROUP BY c; + // 的SQL语句 + // 几个注意的点,1. 嵌套子查询必须重命名,否则会报错,这里重命名为derived。 + // 2. 查询出来的结果也需要重命名来适配原始的列名,这里重命名为"max(test.a)" + StringBuilder sqlColumnsStr = new StringBuilder(); + for (FunctionCall functionCall : functionCalls) { + String functionName = functionCall.getFunction().getIdentifier(); + Expression param = functionCall.getParams().getExpressions().get(0); + + List expandExprs = expandExpression(param, fullColumnNames); + for (Expression expr : expandExprs) { + String IGinXTagKVName = + String.format( + "%s(%s)", functionName, exprToIGinX(ExprUtils.copy(expr)).getColumnName()); + fullName2Name.put(IGinXTagKVName, functionCall.getFunctionStr()); + + String format = "%s(%s)"; + // 如果是avg函数,且参数是base类型,在mysql下小数位数仅有5位,需要转换为decimal来补齐 + // 仅在mysql下这么做,pg也可以用,但会出现一些误差,例如3.200000和3.1999999的区别,测试不好通过 + if (functionName.equalsIgnoreCase(Avg.AVG) + && param.getType() == Expression.ExpressionType.Base + && engineName.equalsIgnoreCase("mysql")) { + format = "%s(CAST(%s AS DECIMAL(34, 16)))"; + } + + sqlColumnsStr.append( + String.format( + format, functionName, exprAdapt(ExprUtils.copy(expr)).getCalColumnName())); + sqlColumnsStr.append(" AS "); + sqlColumnsStr.append(quote).append(IGinXTagKVName).append(quote); + sqlColumnsStr.append(", "); + } + } + + for (Expression expr : gbc) { + String originColumnStr = quote + expr.getColumnName() + quote; + sqlColumnsStr + .append(exprAdapt(ExprUtils.copy(expr)).getCalColumnName()) + .append(" AS ") + .append(originColumnStr) + .append(", "); + } + sqlColumnsStr.delete(sqlColumnsStr.length() - 2, sqlColumnsStr.length()); + + statement = "SELECT " + sqlColumnsStr + " FROM (" + statement + ") AS derived"; + if (!gbc.isEmpty()) { + statement += + " GROUP BY " + + gbc.stream() + .map(e -> exprAdapt(ExprUtils.copy(e)).getCalColumnName()) + .collect(Collectors.joining(", ")); + } + statement += ";"; + return statement; + } + + /** + * 表达式适配下推到PG的形式 1.将baseExpression转换为QuoteBaseExpression,以让其在SQL中被引号包裹 + * 如果SQL使用了JOIN,那列名形如`table.column`,如果没有,则形如`table`.`column` + */ + private Expression exprAdapt(Expression expr) { + if (expr instanceof BaseExpression) { + return new QuoteBaseExpressionDecorator((BaseExpression) expr, relationalMeta.getQuote()); + } + expr.accept( + new ExpressionVisitor() { + @Override + public void visit(BaseExpression expression) {} + + @Override + public void visit(BinaryExpression expression) { + if (expression.getLeftExpression() instanceof BaseExpression) { + expression.setLeftExpression( + new QuoteBaseExpressionDecorator( + (BaseExpression) expression.getLeftExpression(), relationalMeta.getQuote())); + } + if (expression.getRightExpression() instanceof BaseExpression) { + expression.setRightExpression( + new QuoteBaseExpressionDecorator( + (BaseExpression) expression.getRightExpression(), relationalMeta.getQuote())); + } + } + + @Override + public void visit(BracketExpression expression) { + if (expression.getExpression() instanceof BaseExpression) { + expression.setExpression( + new QuoteBaseExpressionDecorator( + (BaseExpression) expression.getExpression(), relationalMeta.getQuote())); + } + } + + @Override + public void visit(ConstantExpression expression) {} + + @Override + public void visit(FromValueExpression expression) {} + + @Override + public void visit(FuncExpression expression) { + for (int i = 0; i < expression.getExpressions().size(); i++) { + if (expression.getExpressions().get(i) instanceof BaseExpression) { + expression + .getExpressions() + .set( + i, + new QuoteBaseExpressionDecorator( + (BaseExpression) expression.getExpressions().get(i), + relationalMeta.getQuote())); + } + } + } + + @Override + public void visit(MultipleExpression expression) { + for (int i = 0; i < expression.getChildren().size(); i++) { + if (expression.getChildren().get(i) instanceof BaseExpression) { + expression + .getChildren() + .set( + i, + new QuoteBaseExpressionDecorator( + (BaseExpression) expression.getChildren().get(i), + relationalMeta.getQuote())); + } + } + } + + @Override + public void visit(UnaryExpression expression) { + if (expression.getExpression() instanceof BaseExpression) { + expression.setExpression( + new QuoteBaseExpressionDecorator( + (BaseExpression) expression.getExpression(), relationalMeta.getQuote())); + } + } + + @Override + public void visit(CaseWhenExpression expression) {} + + @Override + public void visit(KeyExpression expression) {} + + @Override + public void visit(SequenceExpression expression) {} + }); + + return expr; + } + + /** 表达式修改成取回到IGinX的形式,TagKV的形式要是{t=v1, t2=v2} */ + private Expression exprToIGinX(Expression expr) { + expr.accept( + new ExpressionVisitor() { + @Override + public void visit(BaseExpression expression) { + String fullColumnName = expression.getColumnName(); + Pair> split = splitFullName(fullColumnName); + if (split.v == null || split.v.isEmpty()) { + return; + } + StringBuilder sb = new StringBuilder(); + sb.append(split.k); + sb.append("{"); + Map tagKV = split.v; + for (Map.Entry entry : tagKV.entrySet()) { + sb.append(entry.getKey()); + sb.append("="); + sb.append(entry.getValue()); + sb.append(","); + } + sb.delete(sb.length() - 1, sb.length()); + sb.append("}"); + expression.setPathName(sb.toString()); + } + + @Override + public void visit(BinaryExpression expression) {} + + @Override + public void visit(BracketExpression expression) {} + + @Override + public void visit(ConstantExpression expression) {} + + @Override + public void visit(FromValueExpression expression) {} + + @Override + public void visit(FuncExpression expression) {} + + @Override + public void visit(MultipleExpression expression) {} + + @Override + public void visit(UnaryExpression expression) {} + + @Override + public void visit(CaseWhenExpression expression) {} + + @Override + public void visit(KeyExpression expression) {} + + @Override + public void visit(SequenceExpression expression) {} + }); + return expr; + } + + @Override + public TaskExecuteResult executeProjectDummyWithAggSelect( + Project project, Select select, Operator agg, DataArea dataArea) { + List connList = new ArrayList<>(); + Filter filter = select.getFilter(); + List functionCalls = OperatorUtils.getFunctionCallList(agg); + List gbc = new ArrayList<>(); + if (agg.getType() == OperatorType.GroupBy) { + gbc = ((GroupBy) agg).getGroupByExpressions(); + } + try { + List databaseNameList = new ArrayList<>(); + List resultSets = new ArrayList<>(); + ResultSet rs = null; + Connection conn; + Statement stmt; + String statement; + + // 如果下推的函数有sum,需要判断结果是小数还是整数 + Map columnTypeMap = getSumDataType(functionCalls); + + Map> splitResults = + splitAndMergeHistoryQueryPatterns(project.getPatterns()); + Map fullName2Name = new HashMap<>(); + for (Map.Entry> splitEntry : splitResults.entrySet()) { + Map tableNameToColumnNames = splitEntry.getValue(); + String databaseName = splitEntry.getKey(); + conn = getConnection(databaseName); + if (conn == null) { + continue; + } + connList.add(conn); + + // 如果table没有带通配符,那直接简单构建起查询语句即可 + statement = getProjectDummyWithSQL(filter, databaseName, tableNameToColumnNames); + gbc.forEach(this::cutExprDatabaseNameForDummy); + functionCalls.forEach( + fc -> fc.getParams().getExpressions().forEach(this::cutExprDatabaseNameForDummy)); + + statement = + generateAggSql(functionCalls, gbc, statement, tableNameToColumnNames, fullName2Name); + + try { + stmt = conn.createStatement(); + rs = stmt.executeQuery(statement); + LOGGER.info("[Query] execute query: {}", statement); + } catch (SQLException e) { + LOGGER.error("meet error when executing query {}: ", statement, e); + } + if (rs != null) { + databaseNameList.add(databaseName); + resultSets.add(rs); + } + } + + RowStream rowStream = + new ClearEmptyRowStreamWrapper( + new RelationQueryRowStream( + databaseNameList, + resultSets, + true, + filter, + project.getTagFilter(), + connList, + relationalMeta, + columnTypeMap, + fullName2Name, + true)); + return new TaskExecuteResult(rowStream); + } catch (SQLException | RelationalTaskExecuteFailureException e) { + LOGGER.error("unexpected error: ", e); + return new TaskExecuteResult( + new RelationalTaskExecuteFailureException( + String.format("execute project task in %s failure", engineName), e)); + } + } + + @Override + public TaskExecuteResult executeProjectWithAgg(Project project, Operator agg, DataArea dataArea) { + return executeProjectWithAggSelect( + project, + new Select(new OperatorSource(project), new BoolFilter(true), null), + agg, + dataArea); + } + + @Override + public TaskExecuteResult executeProjectDummyWithAgg( + Project project, Operator agg, DataArea dataArea) { + return executeProjectDummyWithAggSelect( + project, + new Select(new OperatorSource(project), new BoolFilter(true), null), + agg, + dataArea); + } + private TaskExecuteResult executeProjectDummyWithFilter(Project project, Filter filter) { List connList = new ArrayList<>(); try { @@ -1106,27 +1749,27 @@ private TaskExecuteResult executeProjectDummyWithFilter(Project project, Filter } connList.add(conn); - // 这里获取所有table的所有列名,用于concat时生成key列。 - Map> allColumnNameForTable = - getAllColumnNameForTable(databaseName, tableNameToColumnNames); - // 如果table没有带通配符,那直接简单构建起查询语句即可 if (!filter.toString().contains("*") && !(tableNameToColumnNames.size() > 1 && filterContainsType(Arrays.asList(FilterType.Value, FilterType.Path), filter))) { - Filter expandFilter = expandFilter(filter.copy(), tableNameToColumnNames); + Filter expandFilter = + expandFilter( + cutFilterDatabaseNameForDummy(filter.copy(), databaseName), + tableNameToColumnNames); for (Map.Entry entry : splitEntry.getValue().entrySet()) { String tableName = entry.getKey(); String fullQuotColumnNames = getQuotColumnNames(entry.getValue()); List fullPathList = Arrays.asList(entry.getValue().split(", ")); - fullPathList.replaceAll( + fullPathList.replaceAll(s -> RelationSchema.getFullName(tableName, s)); + List fullQuotePathList = Arrays.asList(entry.getValue().split(", ")); + fullQuotePathList.replaceAll( s -> RelationSchema.getQuoteFullName(tableName, s, relationalMeta.getQuote())); + String filterStr = filterTransformer.toString( - dummyFilterSetTrueByColumnNames( - cutFilterDatabaseNameForDummy(expandFilter.copy(), databaseName), - fullPathList)); - String concatKey = buildConcat(fullPathList); + dummyFilterSetTrueByColumnNames(expandFilter.copy(), fullPathList)); + String concatKey = buildConcat(fullQuotePathList); statement = String.format( relationalMeta.getConcatQueryStatement(), @@ -1150,66 +1793,7 @@ && filterContainsType(Arrays.asList(FilterType.Value, FilterType.Path), filter)) } // table中带有了通配符,将所有table都join到一起进行查询,以便输入filter. else if (!tableNameToColumnNames.isEmpty()) { - List tableNames = new ArrayList<>(); - List> fullColumnNamesList = new ArrayList<>(); - List> fullQuoteColumnNamesList = new ArrayList<>(); - - // 将columnNames中的列名加上tableName前缀,带JOIN的查询语句中需要用到 - for (Map.Entry entry : tableNameToColumnNames.entrySet()) { - String tableName = entry.getKey(); - tableNames.add(tableName); - List columnNames = new ArrayList<>(Arrays.asList(entry.getValue().split(", "))); - columnNames.replaceAll(s -> RelationSchema.getFullName(tableName, s)); - fullColumnNamesList.add(columnNames); - - List fullColumnNames = - new ArrayList<>(Arrays.asList(entry.getValue().split(", "))); - fullColumnNames.replaceAll( - s -> RelationSchema.getQuoteFullName(tableName, s, relationalMeta.getQuote())); - fullQuoteColumnNamesList.add(fullColumnNames); - } - - String fullTableName = getDummyFullJoinTables(tableNames, allColumnNameForTable); - - Filter copyFilter = - dummyFilterSetTrueByColumnNames( - cutFilterDatabaseNameForDummy(filter.copy(), databaseName), - fullColumnNamesList.stream().flatMap(List::stream).collect(Collectors.toList())); - - copyFilter = expandFilter(copyFilter, tableNameToColumnNames); - - String filterStr = filterTransformer.toString(copyFilter); - String orderByKey = - buildConcat( - fullQuoteColumnNamesList.stream() - .flatMap(List::stream) - .collect(Collectors.toList())); - if (!relationalMeta.isSupportFullJoin()) { - // 如果不支持full join,需要为left join + union模拟的full join表起别名,同时select、where、order by的部分都要调整 - char quote = relationalMeta.getQuote(); - fullQuoteColumnNamesList.forEach( - columnNames -> - columnNames.replaceAll(s -> s.replaceAll(quote + "\\." + quote, "."))); - filterStr = filterStr.replaceAll("`\\.`", "."); - filterStr = - filterStr.replace( - getQuotName(KEY_NAME), getQuotName(tableNames.get(0) + SEPARATOR + KEY_NAME)); - orderByKey = getQuotName(tableNames.get(0) + SEPARATOR + KEY_NAME); - } - - statement = - String.format( - relationalMeta.getConcatQueryStatement(), - buildConcat( - fullQuoteColumnNamesList.stream() - .flatMap(List::stream) - .collect(Collectors.toList())), - fullQuoteColumnNamesList.stream() - .flatMap(List::stream) - .collect(Collectors.joining(", ")), - fullTableName, - filterStr.isEmpty() ? "" : "WHERE " + filterStr, - orderByKey); + statement = getProjectDummyWithSQL(filter, databaseName, tableNameToColumnNames); try { stmt = conn.createStatement(); @@ -2049,4 +2633,212 @@ public void release() throws PhysicalException { throw new RelationalException(e); } } + + private boolean isSumResultDouble(Expression expr, List columns) { + Map columnTypeMap = new HashMap<>(); + for (Column column : columns) { + columnTypeMap.put(splitFullName(column.getPath()).k, column.getDataType()); + } + boolean[] isDouble = {false}; + expr.accept( + new ExpressionVisitor() { + @Override + public void visit(BaseExpression expression) { + String path = expression.getColumnName(); + if (columnTypeMap.containsKey(path)) { + isDouble[0] |= columnTypeMap.get(path) == DataType.DOUBLE; + } + } + + @Override + public void visit(BinaryExpression expression) { + isDouble[0] |= + expression.getOp() == cn.edu.tsinghua.iginx.engine.shared.expr.Operator.DIV; + } + + @Override + public void visit(BracketExpression expression) {} + + @Override + public void visit(ConstantExpression expression) { + isDouble[0] |= + expression.getValue() instanceof Double || expression.getValue() instanceof Float; + } + + @Override + public void visit(FromValueExpression expression) {} + + @Override + public void visit(FuncExpression expression) {} + + @Override + public void visit(MultipleExpression expression) { + isDouble[0] |= + expression.getOps().stream() + .anyMatch(op -> op == cn.edu.tsinghua.iginx.engine.shared.expr.Operator.DIV); + } + + @Override + public void visit(UnaryExpression expression) {} + + @Override + public void visit(CaseWhenExpression expression) {} + + @Override + public void visit(KeyExpression expression) {} + + @Override + public void visit(SequenceExpression expression) {} + }); + + return isDouble[0]; + } + + private void cutExprDatabaseNameForDummy(Expression expr) { + expr.accept( + new ExpressionVisitor() { + @Override + public void visit(BaseExpression expression) { + expression.setPathName(expression.getColumnName().split("\\.")[1]); + } + + @Override + public void visit(BinaryExpression expression) {} + + @Override + public void visit(BracketExpression expression) {} + + @Override + public void visit(ConstantExpression expression) {} + + @Override + public void visit(FromValueExpression expression) {} + + @Override + public void visit(FuncExpression expression) {} + + @Override + public void visit(MultipleExpression expression) {} + + @Override + public void visit(UnaryExpression expression) {} + + @Override + public void visit(CaseWhenExpression expression) {} + + @Override + public void visit(KeyExpression expression) {} + + @Override + public void visit(SequenceExpression expression) {} + }); + } + + /** + * 获取替换了*和tag kv的expression + * + * @return + */ + private List expandExpression(Expression expression, List fullColumnNames) { + Queue queue = new LinkedList<>(); + List result = new ArrayList<>(); + queue.add(expression); + while (!queue.isEmpty()) { + Expression expr = queue.poll(); + BaseExpression[] be = {null}; + List matchesColumns = new ArrayList<>(); + expr.accept( + new ExpressionVisitor() { + @Override + public void visit(BaseExpression expression) { + if (be[0] != null) return; + if (expression.getColumnName().contains("*")) { + be[0] = expression; + } + String pattern = StringUtils.reformatPath(expression.getColumnName()); + for (String fullColumnName : fullColumnNames) { + String columnNameWithoutTag = splitFullName(fullColumnName).k; + if (Pattern.matches(pattern, columnNameWithoutTag)) { + matchesColumns.add(fullColumnName); + } + } + if (matchesColumns.size() > 1) { + be[0] = expression; + } else { + matchesColumns.clear(); + } + } + + @Override + public void visit(BinaryExpression expression) {} + + @Override + public void visit(BracketExpression expression) {} + + @Override + public void visit(ConstantExpression expression) {} + + @Override + public void visit(FromValueExpression expression) {} + + @Override + public void visit(FuncExpression expression) {} + + @Override + public void visit(MultipleExpression expression) {} + + @Override + public void visit(UnaryExpression expression) {} + + @Override + public void visit(CaseWhenExpression expression) {} + + @Override + public void visit(KeyExpression expression) {} + + @Override + public void visit(SequenceExpression expression) {} + }); + + if (be[0] == null) { + result.add(expr); + continue; + } + for (String columnName : matchesColumns) { + be[0].setPathName(columnName); + queue.add(ExprUtils.copy(expr)); + } + } + + return result; + } + + private void keyFilterAddTableName(Filter filter, String tableName) { + switch (filter.getType()) { + case Or: + List orChildren = ((OrFilter) filter).getChildren(); + orChildren.replaceAll(child -> keyFilter2ValueFilter(child, tableName)); + orChildren.forEach(child -> keyFilterAddTableName(child, tableName)); + break; + case And: + List andChildren = ((AndFilter) filter).getChildren(); + andChildren.replaceAll(child -> keyFilter2ValueFilter(child, tableName)); + andChildren.forEach(child -> keyFilterAddTableName(child, tableName)); + break; + case Not: + NotFilter notFilter = (NotFilter) filter; + notFilter.setChild(keyFilter2ValueFilter(notFilter.getChild(), tableName)); + keyFilterAddTableName(notFilter.getChild(), tableName); + break; + } + } + + private Filter keyFilter2ValueFilter(Filter filter, String tableName) { + if (filter.getType() != FilterType.Key) { + return filter; + } + KeyFilter keyFilter = (KeyFilter) filter; + return new ValueFilter( + tableName + SEPARATOR + KEY_NAME, keyFilter.getOp(), new Value(keyFilter.getValue())); + } } diff --git a/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/query/entity/RelationQueryRowStream.java b/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/query/entity/RelationQueryRowStream.java index 0c7f45aeea..494735e605 100644 --- a/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/query/entity/RelationQueryRowStream.java +++ b/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/query/entity/RelationQueryRowStream.java @@ -27,17 +27,20 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.exception.RowFetchException; +import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils; import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils; import cn.edu.tsinghua.iginx.engine.shared.data.read.Field; import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; +import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType; import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.relational.meta.AbstractRelationalMeta; import cn.edu.tsinghua.iginx.relational.tools.RelationSchema; import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; +import java.math.BigDecimal; import java.sql.Connection; import java.sql.ResultSet; import java.sql.ResultSetMetaData; @@ -56,6 +59,8 @@ public class RelationQueryRowStream implements RowStream { private final boolean isDummy; + private final boolean isAgg; + private final Filter filter; private boolean[] gotNext; // 标记每个结果集是否已经获取到下一行,如果是,则在下次调用 next() 时无需再调用该结果集的 next() @@ -78,10 +83,16 @@ public class RelationQueryRowStream implements RowStream { private AbstractRelationalMeta relationalMeta; + private Map fullName2Name; // 记录带tagkv的fullname到不带tagkv的name的映射 + private String fullKeyName = KEY_NAME; private boolean isPushDown = false; + private Map sumResType; // 记录聚合下推的sum的返回类型(需要提前计算,因为PG会统一返回小数) + + private boolean needFilter = false; + public RelationQueryRowStream( List databaseNameList, List resultSets, @@ -91,11 +102,38 @@ public RelationQueryRowStream( List connList, AbstractRelationalMeta relationalMeta) throws SQLException { + this( + databaseNameList, + resultSets, + isDummy, + filter, + tagFilter, + connList, + relationalMeta, + null, + null, + false); + } + + public RelationQueryRowStream( + List databaseNameList, + List resultSets, + boolean isDummy, + Filter filter, + TagFilter tagFilter, + List connList, + AbstractRelationalMeta relationalMeta, + Map sumResType, + Map fullName2Name, + boolean isAgg) + throws SQLException { this.resultSets = resultSets; this.isDummy = isDummy; this.filter = filter; this.connList = connList; this.relationalMeta = relationalMeta; + this.isAgg = isAgg; + this.sumResType = sumResType; if (resultSets.isEmpty()) { this.header = new Header(Field.KEY, Collections.emptyList()); @@ -110,6 +148,10 @@ public RelationQueryRowStream( this.fieldToColumnName = new HashMap<>(); this.resultSetHasColumnWithTheSameName = new ArrayList<>(); + needFilter = (!isAgg && resultSets.size() != 1) || isDummy; + Set filterTypes = FilterUtils.getFilterType(filter); + needFilter |= filterTypes.contains(FilterType.Expr); + for (int i = 0; i < resultSets.size(); i++) { ResultSetMetaData resultSetMetaData = resultSets.get(i).getMetaData(); @@ -143,18 +185,27 @@ public RelationQueryRowStream( Pair> namesAndTags = splitFullName(columnName); Field field; + DataType type = relationalMeta.getDataTypeTransformer().fromEngineType(typeName); + if (isAgg + && sumResType != null + && sumResType.containsKey(fullName2Name.getOrDefault(columnName, columnName))) { + type = sumResType.get(fullName2Name.getOrDefault(columnName, columnName)); + } + String path; if (isDummy) { - field = - new Field( - databaseNameList.get(i) + SEPARATOR + tableName + SEPARATOR + namesAndTags.k, - relationalMeta.getDataTypeTransformer().fromEngineType(typeName), - namesAndTags.v); + path = + databaseNameList.get(i) + + SEPARATOR + + (isAgg ? "" : tableName + SEPARATOR) + + namesAndTags.k; + } else { + path = (isAgg ? "" : tableName + SEPARATOR) + namesAndTags.k; + } + + if (isAgg && fullName2Name.containsKey(path)) { + field = new Field(fullName2Name.get(path), path, type, namesAndTags.v); } else { - field = - new Field( - tableName + SEPARATOR + namesAndTags.k, - relationalMeta.getDataTypeTransformer().fromEngineType(typeName), - namesAndTags.v); + field = new Field(path, type, namesAndTags.v); } if (filterByTags && !TagKVUtils.match(namesAndTags.v, tagFilter)) { @@ -274,6 +325,13 @@ private void cacheOneRow() throws SQLException, PhysicalException { tableNameSet.add(tableName); Object value = getResultSetObject(resultSet, columnName, tableName); + if (value instanceof BigDecimal) { + if (header.getField(startIndex + j).getType() == DataType.LONG) { + value = ((BigDecimal) value).longValue(); + } else { + value = ((BigDecimal) value).doubleValue(); + } + } if (header.getField(startIndex + j).getType() == DataType.BINARY && value != null) { tempValue = value.toString().getBytes(); } else if (header.getField(startIndex + j).getType() == DataType.BOOLEAN @@ -288,18 +346,19 @@ private void cacheOneRow() throws SQLException, PhysicalException { } cachedValues[startIndex + j] = tempValue; } - - if (isDummy) { - // 在Dummy查询的Join操作中,key列的值是由多个Join表的所有列的值拼接而成的,但实际上的Key列仅由一个表的所有列的值拼接而成 - // 所以在这里需要将key列的值截断为一个表的所有列的值,因为能合并在一行里的不同表的数据一定是key相同的 - // 所以查询出来的KEY值一定是(我们需要的KEY值 * 表的数量),因此只需要裁剪取第一个表的key列的值即可 - String keyString = resultSet.getString(fullKeyName); - keyString = keyString.substring(0, keyString.length() / tableNameSet.size()); - tempKey = toHash(keyString); - } else { - tempKey = resultSet.getLong(fullKeyName); + if (!isAgg) { + if (isDummy) { + // 在Dummy查询的Join操作中,key列的值是由多个Join表的所有列的值拼接而成的,但实际上的Key列仅由一个表的所有列的值拼接而成 + // 所以在这里需要将key列的值截断为一个表的所有列的值,因为能合并在一行里的不同表的数据一定是key相同的 + // 所以查询出来的KEY值一定是(我们需要的KEY值 * 表的数量),因此只需要裁剪取第一个表的key列的值即可 + String keyString = resultSet.getString(fullKeyName); + keyString = keyString.substring(0, keyString.length() / tableNameSet.size()); + tempKey = toHash(keyString); + } else { + tempKey = resultSet.getLong(fullKeyName); + } + cachedKeys[i] = tempKey; } - cachedKeys[i] = tempKey; } else { cachedKeys[i] = Long.MAX_VALUE; @@ -333,7 +392,8 @@ private void cacheOneRow() throws SQLException, PhysicalException { startIndex = endIndex; } cachedRow = new Row(header, key, values); - if (!validate(filter, cachedRow)) { + // Agg状态下,所有表join在一起,不需要后过滤 + if (needFilter && !validate(filter, cachedRow)) { continue; } } else { @@ -362,7 +422,8 @@ private Object getResultSetObject(ResultSet resultSet, String columnName, String for (int j = 1; j <= resultSetMetaData.getColumnCount(); j++) { String tempColumnName = resultSetMetaData.getColumnName(j); String tempTableName = resultSetMetaData.getTableName(j); - if (tempColumnName.equals(columnName) && tempTableName.equals(tableName)) { + if (tempColumnName.equals(columnName) + && (tempTableName.isEmpty() || tempTableName.equals(tableName))) { return resultSet.getObject(j); } } diff --git a/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/FilterTransformer.java b/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/FilterTransformer.java index 5d7ddb131d..e6af64e854 100644 --- a/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/FilterTransformer.java +++ b/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/FilterTransformer.java @@ -102,12 +102,12 @@ private String toString(ValueFilter filter) { case LIKE: case LIKE_AND: op = relationalMeta.getRegexpOp(); - value = "'" + filter.getValue().getBinaryVAsString() + "$" + "'"; + value = "'^" + filter.getValue().getBinaryVAsString() + "$" + "'"; break; case NOT_LIKE: case NOT_LIKE_AND: op = relationalMeta.getNotRegexpOp(); - value = "'" + filter.getValue().getBinaryVAsString() + "$" + "'"; + value = "'^" + filter.getValue().getBinaryVAsString() + "$" + "'"; break; default: // postgresql does not support "==" but uses "=" instead diff --git a/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/QuoteBaseExpressionDecorator.java b/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/QuoteBaseExpressionDecorator.java new file mode 100644 index 0000000000..487f89fc2f --- /dev/null +++ b/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/QuoteBaseExpressionDecorator.java @@ -0,0 +1,71 @@ +/* + * IGinX - the polystore system with high performance + * Copyright (C) Tsinghua University + * TSIGinX@gmail.com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package cn.edu.tsinghua.iginx.relational.tools; + +import cn.edu.tsinghua.iginx.engine.shared.expr.BaseExpression; +import cn.edu.tsinghua.iginx.engine.shared.expr.Expression; +import cn.edu.tsinghua.iginx.engine.shared.expr.ExpressionVisitor; + +public class QuoteBaseExpressionDecorator implements Expression { + private final BaseExpression baseExpression; + private final char quote; + + private static String DERIVED = "derived"; + + public QuoteBaseExpressionDecorator(BaseExpression baseExpression, char quote) { + this.baseExpression = baseExpression; + this.quote = quote; + } + + @Override + public String getColumnName() { + return DERIVED + "." + quote + baseExpression.getColumnName() + quote; + } + + @Override + public ExpressionType getType() { + return baseExpression.getType(); + } + + @Override + public boolean hasAlias() { + return baseExpression.hasAlias(); + } + + @Override + public String getAlias() { + return baseExpression.getAlias(); + } + + @Override + public void setAlias(String alias) { + baseExpression.setAlias(alias); + } + + @Override + public void accept(ExpressionVisitor visitor) { + visitor.visit(baseExpression); + } + + @Override + public boolean equalExceptAlias(Expression expr) { + return baseExpression.equalExceptAlias(expr); + } +} diff --git a/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/RelationSchema.java b/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/RelationSchema.java index 706c9ed73c..ab1217bf61 100644 --- a/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/RelationSchema.java +++ b/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/RelationSchema.java @@ -37,14 +37,16 @@ public RelationSchema(String path, char quote) { public RelationSchema(String path, boolean isDummy, char quote) { int firstSeparator = path.indexOf("."); + if (firstSeparator < 0) firstSeparator = 0; if (isDummy) { databaseName = path.substring(0, firstSeparator); path = path.substring(firstSeparator + 1); } else { databaseName = ""; } + int lastSeparator = path.lastIndexOf("."); - tableName = path.substring(0, lastSeparator); + tableName = lastSeparator < 0 ? "" : path.substring(0, lastSeparator); columnName = path.substring(lastSeparator + 1); this.quote = quote; } diff --git a/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/TagKVUtils.java b/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/TagKVUtils.java index 166f1ef28b..973b9581a7 100644 --- a/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/TagKVUtils.java +++ b/dataSource/relational/src/main/java/cn/edu/tsinghua/iginx/relational/tools/TagKVUtils.java @@ -30,7 +30,7 @@ public class TagKVUtils { public static Pair> splitFullName(String fullName) { - if (!fullName.contains(TAGKV_SEPARATOR)) { + if (!fullName.contains(TAGKV_SEPARATOR) || !fullName.contains(TAGKV_EQUAL)) { return new Pair<>(fullName, null); } diff --git a/dataSource/relational/src/main/resources/mysql-meta.properties b/dataSource/relational/src/main/resources/mysql-meta.properties index f7b84a6ee2..679b5ddcdd 100644 --- a/dataSource/relational/src/main/resources/mysql-meta.properties +++ b/dataSource/relational/src/main/resources/mysql-meta.properties @@ -18,6 +18,20 @@ # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # +2.upsert_conflict_statement= ??? +3.regex_like_symbol=REGEXP_LIKE,REGEXP_SUBSTR,REGEXP_INSTR,REGEXP_REPLACE ??? https://eco.dameng.com/document/dm/zh-cn/faq/faq-sql-gramm +4.not_regex_like_symbol=NOT REGEXP, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# + # 配置MySQL META以供JDBCMeta读取 # 驱动类 diff --git a/dataSource/relational/src/main/resources/postgresql-meta.properties b/dataSource/relational/src/main/resources/postgresql-meta.properties index d14c7647a4..4c0ebab528 100644 --- a/dataSource/relational/src/main/resources/postgresql-meta.properties +++ b/dataSource/relational/src/main/resources/postgresql-meta.properties @@ -62,6 +62,7 @@ SERIAL8=IGinX-LONG FLOAT4=IGinX-FLOAT DECIMAL=IGinX-DOUBLE FLOAT8=IGinX-DOUBLE +NUMERIC=IGinX-DOUBLE # IGinX types 2 pg types IGinX-INTEGER=INTEGER diff --git a/dataSource/vectordb/src/main/java/cn/edu/tsinghua/iginx/vectordb/MilvusStorage.java b/dataSource/vectordb/src/main/java/cn/edu/tsinghua/iginx/vectordb/MilvusStorage.java index ee7f2dd057..a004f3a1dd 100644 --- a/dataSource/vectordb/src/main/java/cn/edu/tsinghua/iginx/vectordb/MilvusStorage.java +++ b/dataSource/vectordb/src/main/java/cn/edu/tsinghua/iginx/vectordb/MilvusStorage.java @@ -35,10 +35,7 @@ import cn.edu.tsinghua.iginx.engine.shared.KeyRange; import cn.edu.tsinghua.iginx.engine.shared.data.write.DataView; import cn.edu.tsinghua.iginx.engine.shared.data.write.RowDataView; -import cn.edu.tsinghua.iginx.engine.shared.operator.Delete; -import cn.edu.tsinghua.iginx.engine.shared.operator.Insert; -import cn.edu.tsinghua.iginx.engine.shared.operator.Project; -import cn.edu.tsinghua.iginx.engine.shared.operator.Select; +import cn.edu.tsinghua.iginx.engine.shared.operator.*; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.*; import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval; @@ -389,6 +386,30 @@ public TaskExecuteResult executeProjectDummyWithSelect( return executeProjectDummyWithFilter(project, select.getFilter()); } + @Override + public TaskExecuteResult executeProjectWithAggSelect( + Project project, Select select, Operator operator, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectDummyWithAggSelect( + Project project, Select select, Operator operator, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectWithAgg( + Project project, Operator operator, DataArea dataArea) { + return null; + } + + @Override + public TaskExecuteResult executeProjectDummyWithAgg( + Project project, Operator operator, DataArea dataArea) { + return null; + } + @Override public TaskExecuteResult executeDelete(Delete delete, DataArea dataArea) { String databaseName = dataArea.getStorageUnit(); diff --git a/dependency/src/main/resources/iginx-optimizer-extension-0.8.0-SNAPSHOT.jar b/dependency/src/main/resources/iginx-optimizer-extension-0.8.0-SNAPSHOT.jar index dbeff30323..00e110761e 100644 Binary files a/dependency/src/main/resources/iginx-optimizer-extension-0.8.0-SNAPSHOT.jar and b/dependency/src/main/resources/iginx-optimizer-extension-0.8.0-SNAPSHOT.jar differ diff --git a/optimizer/src/main/java/cn/edu/tsinghua/iginx/logical/optimizer/OptimizerUtils.java b/optimizer/src/main/java/cn/edu/tsinghua/iginx/logical/optimizer/OptimizerUtils.java new file mode 100644 index 0000000000..e7d20e80d8 --- /dev/null +++ b/optimizer/src/main/java/cn/edu/tsinghua/iginx/logical/optimizer/OptimizerUtils.java @@ -0,0 +1,116 @@ +/* + * IGinX - the polystore system with high performance + * Copyright (C) Tsinghua University + * TSIGinX@gmail.com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package cn.edu.tsinghua.iginx.logical.optimizer; + +import cn.edu.tsinghua.iginx.engine.shared.expr.*; +import cn.edu.tsinghua.iginx.engine.shared.function.FunctionCall; +import cn.edu.tsinghua.iginx.engine.shared.operator.GroupBy; +import cn.edu.tsinghua.iginx.engine.shared.operator.Operator; +import cn.edu.tsinghua.iginx.engine.shared.operator.SetTransform; +import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class OptimizerUtils { + public static boolean validateAggPushDown(Operator operator) { + if (operator == null) { + return false; + } + if (operator.getType() != OperatorType.GroupBy + && operator.getType() != OperatorType.SetTransform) { + return false; + } + + List functionCallList = new ArrayList<>(); + List expressions = new ArrayList<>(); + + if (operator.getType() == OperatorType.GroupBy) { + GroupBy groupBy = (GroupBy) operator; + functionCallList = groupBy.getFunctionCallList(); + expressions.addAll(groupBy.getGroupByExpressions()); + } else { + // SetTransform + SetTransform setTransform = (SetTransform) operator; + functionCallList = setTransform.getFunctionCallList(); + } + + for (FunctionCall fc : functionCallList) { + if (!Arrays.asList("AVG", "MAX", "MIN", "SUM", "COUNT") + .contains(fc.getFunction().getIdentifier().toUpperCase())) { + return false; + } + + expressions.addAll(fc.getParams().getExpressions()); + } + + for (Expression expression : expressions) { + final boolean[] isValid = {true}; + expression.accept( + new ExpressionVisitor() { + @Override + public void visit(BaseExpression expression) {} + + @Override + public void visit(BinaryExpression expression) {} + + @Override + public void visit(BracketExpression expression) {} + + @Override + public void visit(ConstantExpression expression) {} + + @Override + public void visit(FromValueExpression expression) { + isValid[0] = false; + } + + @Override + public void visit(FuncExpression expression) {} + + @Override + public void visit(MultipleExpression expression) {} + + @Override + public void visit(UnaryExpression expression) {} + + @Override + public void visit(CaseWhenExpression expression) { + isValid[0] = false; + } + + @Override + public void visit(KeyExpression expression) { + isValid[0] = false; + } + + @Override + public void visit(SequenceExpression expression) { + isValid[0] = false; + } + }); + if (!isValid[0]) { + return false; + } + } + + return true; + } +} diff --git a/optimizer/src/main/java/cn/edu/tsinghua/iginx/logical/optimizer/rules/ColumnPruningRule.java b/optimizer/src/main/java/cn/edu/tsinghua/iginx/logical/optimizer/rules/ColumnPruningRule.java index 425fd22492..925450067d 100644 --- a/optimizer/src/main/java/cn/edu/tsinghua/iginx/logical/optimizer/rules/ColumnPruningRule.java +++ b/optimizer/src/main/java/cn/edu/tsinghua/iginx/logical/optimizer/rules/ColumnPruningRule.java @@ -72,7 +72,7 @@ public ColumnPruningRule() { "ColumnPruningRule", "ColumnPruningRule", operand(AbstractOperator.class, any()), - 0, + -1, RuleStrategy.ONCE); } diff --git a/optimizer/src/main/java/cn/edu/tsinghua/iginx/physical/optimizer/naive/NaivePhysicalOptimizer.java b/optimizer/src/main/java/cn/edu/tsinghua/iginx/physical/optimizer/naive/NaivePhysicalOptimizer.java index 5109387c85..1848a1ffd5 100644 --- a/optimizer/src/main/java/cn/edu/tsinghua/iginx/physical/optimizer/naive/NaivePhysicalOptimizer.java +++ b/optimizer/src/main/java/cn/edu/tsinghua/iginx/physical/optimizer/naive/NaivePhysicalOptimizer.java @@ -20,6 +20,7 @@ package cn.edu.tsinghua.iginx.physical.optimizer.naive; import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; +import cn.edu.tsinghua.iginx.engine.logical.utils.OperatorUtils; import cn.edu.tsinghua.iginx.engine.physical.memory.MemoryPhysicalTaskDispatcher; import cn.edu.tsinghua.iginx.engine.physical.optimizer.PhysicalOptimizer; import cn.edu.tsinghua.iginx.engine.physical.optimizer.ReplicaDispatcher; @@ -93,22 +94,33 @@ private PhysicalTask constructUnaryTask(UnaryOperator operator, RequestContext c PhysicalTask sourceTask = constructTask(operatorSource.getOperator(), context); // push down Select operator if (ConfigDescriptor.getInstance().getConfig().isEnablePushDown() - && sourceTask instanceof StoragePhysicalTask - && sourceOperator.getType() == OperatorType.Project - && ((Project) sourceOperator).getTagFilter() == null - && ((UnaryOperator) sourceOperator).getSource().getType() == SourceType.Fragment) { - switch (operator.getType()) { - case Select: - if (((Select) operator).getTagFilter() == null) { + && sourceTask instanceof StoragePhysicalTask) { + if (sourceOperator.getType() == OperatorType.Project + && ((Project) sourceOperator).getTagFilter() == null + && ((UnaryOperator) sourceOperator).getSource().getType() == SourceType.Fragment) { + switch (operator.getType()) { + case Select: + if (((Select) operator).getTagFilter() == null) { + sourceTask.getOperators().add(operator); + return sourceTask; + } + break; + case SetTransform: + case GroupBy: sourceTask.getOperators().add(operator); return sourceTask; - } - break; - case SetTransform: - sourceTask.getOperators().add(operator); - return sourceTask; - default: - break; + default: + break; + } + } else if ((operator.getType() == OperatorType.GroupBy + || operator.getType() == OperatorType.SetTransform) + && sourceOperator.getType() == OperatorType.Select + && OperatorUtils.getUnaryChild(sourceOperator) != null + && OperatorUtils.getUnaryChild(sourceOperator).getType() == OperatorType.Project + && ((UnaryOperator) OperatorUtils.getUnaryChild(sourceOperator)).getSource().getType() + == SourceType.Fragment) { + sourceTask.getOperators().add(operator); + return sourceTask; } } operators.add(operator); diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/datasource/DataSourceIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/datasource/DataSourceIT.java index ad0f0c15ab..898a04537d 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/datasource/DataSourceIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/datasource/DataSourceIT.java @@ -280,7 +280,7 @@ public void frequentCountAndInsertOverlapData() throws PhysicalException { SetTransform countOperator = new SetTransform(projectSource, Collections.singletonList(countCall)); - Assume.assumeTrue(storage.isSupportProjectWithSetTransform(countOperator, dataArea)); + Assume.assumeTrue(storage.isSupportProjectWithAgg(countOperator, dataArea, false)); for (int seed = 0; seed < 10; seed++) { Random random = new Random(seed); @@ -291,8 +291,7 @@ public void frequentCountAndInsertOverlapData() throws PhysicalException { keys.add(key + i); } insertData(key, rows, "us.d1.s1", "us.d1.s2", "us.d1.s3", "us.d1.s4"); - TaskExecuteResult result = - storage.executeProjectWithSetTransform(project, countOperator, dataArea); + TaskExecuteResult result = storage.executeProjectWithAgg(project, countOperator, dataArea); checkResult(result); RowStream rowStream = result.getRowStream(); Assert.assertTrue(rowStream.hasNext()); diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/postgresql/PostgreSQLCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/postgresql/PostgreSQLCapacityExpansionIT.java index d3c0f3a270..1ca78529a7 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/postgresql/PostgreSQLCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/postgresql/PostgreSQLCapacityExpansionIT.java @@ -155,7 +155,8 @@ private void testFloatData() { List pathList = Constant.READ_ONLY_FLOAT_PATH_LIST; List> valuesList = Constant.READ_ONLY_FLOAT_VALUES_LIST; SQLTestTools.executeAndCompare(session, statement, pathList, valuesList); - statement = "select wt02.float from tm.wf05 where wt02.float = 44.55;"; + statement = + "select wt02.float from tm.wf05 where wt02.float >= 44.54;"; // 浮点数的=在下推时产生的浮点数误差会使结果不一致,这里改用一个较为宽松的条件 valuesList = Arrays.asList(Arrays.asList(44.55F)); SQLTestTools.executeAndCompare(session, statement, pathList, valuesList); } diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/optimizer/OptimizerIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/optimizer/OptimizerIT.java index 82e7beb9c0..0c61b918bf 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/optimizer/OptimizerIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/optimizer/OptimizerIT.java @@ -1365,7 +1365,7 @@ public void testDistinctEliminate() { assertTrue(executor.execute("EXPLAIN " + statement).contains("isDistinct: false")); assertEquals(closeResult, executor.execute(statement)); - statement = "SELECT max(distinct s1) FROM us.d1 GROUP BY s2;"; + statement = "SELECT max(distinct s1) FROM us.d1 GROUP BY s2 ORDER BY s2;"; executor.execute(closeRule); assertTrue(executor.execute("EXPLAIN " + statement).contains("isDistinct: true")); closeResult = executor.execute(statement); @@ -1382,7 +1382,8 @@ public void testDistinctEliminate() { assertTrue(executor.execute("EXPLAIN " + statement).contains("isDistinct: true")); assertEquals(closeResult, executor.execute(statement)); - statement = "SELECT avg(distinct s1), count(distinct s2) FROM us.d1 GROUP BY s2, s3;"; + statement = + "SELECT avg(distinct s1), count(distinct s2) FROM us.d1 GROUP BY s2, s3 ORDER BY s2, s3;"; executor.execute(closeRule); assertTrue(executor.execute("EXPLAIN " + statement).contains("isDistinct: true")); closeResult = executor.execute(statement); @@ -1726,6 +1727,225 @@ public void testInFilterTransformRule() { closeExplain.contains("us.*.s1 &!= 1 && us.*.s1 &!= 2 && us.*.s1 != 3 && us.*.s1 != 4")); } + @Test + public void testAggPushdown() { + if (isScaling) return; + String insert = "INSERT INTO test(key, a.a, a.b, a.c, b.a, b.b, b.c)"; + insert += + " VALUES (1, 1, 1.1, true, 1, 1.1, true), (2, 2, 2.2, false, 2, 2.2, false), (3, 1, 2.2, true, 1, 1.1, true), (4, 2, 1.1, false, 2, 2.2, false);"; + executor.execute(insert); + + String openRule = "SET RULES AggPushDownRule=on;"; + String closeRule = "SET RULES AggPushDownRule=off;"; + String openRes = "", closeRes = ""; + + String statement = + "SELECT sum(test.a.b) from test.a JOIN test.b on test.a.a = test.b.a group by test.a.a;"; + String explain = "EXPLAIN " + statement; + String expected = + "ResultSets:\n" + + "+----------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "| Logical Tree|Operator Type| Operator Info|\n" + + "+----------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "|Reorder | Reorder| Order: sum(test.a.b)|\n" + + "| +--Rename | Rename| AliasList: (sum(sum_test_a_b), sum(test.a.b))|\n" + + "| +--GroupBy | GroupBy|GroupByCols: test.a.a, FuncList(Name, FuncType): (sum, System), MappingType: SetMapping isDistinct: false|\n" + + "| +--InnerJoin | InnerJoin| PrefixA: test.a, PrefixB: test.b, IsNatural: false, Filter: test.a.a == test.b.a|\n" + + "| +--Rename | Rename| AliasList: (sum(test.a.b), sum_test_a_b)|\n" + + "| +--GroupBy | GroupBy|GroupByCols: test.a.a, FuncList(Name, FuncType): (sum, System), MappingType: SetMapping isDistinct: false|\n" + + "| +--Project| Project| Patterns: test.a.a,test.a.b, Target DU: unit0000000002|\n" + + "| +--Project | Project| Patterns: test.b.a, Target DU: unit0000000002|\n" + + "+----------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "Total line number = 8\n"; + + executor.execute(openRule); + openRes = executor.execute(statement); + executor.executeAndCompare(explain, expected); + expected = + "ResultSets:\n" + + "+----------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "| Logical Tree|Operator Type| Operator Info|\n" + + "+----------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "|Reorder | Reorder| Order: sum(test.a.b)|\n" + + "| +--GroupBy | GroupBy|GroupByCols: test.a.a, FuncList(Name, FuncType): (sum, System), MappingType: SetMapping isDistinct: false|\n" + + "| +--InnerJoin| InnerJoin| PrefixA: test.a, PrefixB: test.b, IsNatural: false, Filter: test.a.a == test.b.a|\n" + + "| +--Project| Project| Patterns: test.a.a,test.a.b, Target DU: unit0000000002|\n" + + "| +--Project| Project| Patterns: test.b.a, Target DU: unit0000000002|\n" + + "+----------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "Total line number = 5\n"; + executor.execute(closeRule); + executor.executeAndCompare(explain, expected); + + statement = "SELECT sum(aaa) FROM (SELECT a as aaa, b as bbb FROM test.a) GROUP BY bbb;"; + explain = "EXPLAIN " + statement; + expected = + "ResultSets:\n" + + "+--------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "| Logical Tree|Operator Type| Operator Info|\n" + + "+--------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "|Reorder | Reorder| Order: sum(aaa)|\n" + + "| +--Rename | Rename| AliasList: (test.a.a, aaa),(test.a.b, bbb),(sum(test.a.a), sum(aaa))|\n" + + "| +--Reorder | Reorder| Order: test.a.b,sum(test.a.a)|\n" + + "| +--Project | Project| Patterns: test.a.b,sum(test.a.a)|\n" + + "| +--GroupBy | GroupBy|GroupByCols: test.a.b, FuncList(Name, FuncType): (sum, System), MappingType: SetMapping isDistinct: false|\n" + + "| +--Project| Project| Patterns: test.a.a,test.a.b, Target DU: unit0000000002|\n" + + "+--------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "Total line number = 6\n"; + executor.execute(openRule); + openRes = executor.execute(statement); + executor.executeAndCompare(explain, expected); + expected = + "ResultSets:\n" + + "+--------------------+-------------+----------------------------------------------------------------------------------------------------+\n" + + "| Logical Tree|Operator Type| Operator Info|\n" + + "+--------------------+-------------+----------------------------------------------------------------------------------------------------+\n" + + "|Reorder | Reorder| Order: sum(aaa)|\n" + + "| +--GroupBy | GroupBy|GroupByCols: bbb, FuncList(Name, FuncType): (sum, System), MappingType: SetMapping isDistinct: false|\n" + + "| +--Rename | Rename| AliasList: (test.a.a, aaa),(test.a.b, bbb)|\n" + + "| +--Reorder | Reorder| Order: test.a.a,test.a.b|\n" + + "| +--Project | Project| Patterns: test.a.a,test.a.b|\n" + + "| +--Project| Project| Patterns: test.a.a,test.a.b, Target DU: unit0000000002|\n" + + "+--------------------+-------------+----------------------------------------------------------------------------------------------------+\n" + + "Total line number = 6\n"; + executor.execute(closeRule); + executor.executeAndCompare(explain, expected); + closeRes = executor.execute(statement); + assertEquals(openRes, closeRes); + + statement = + "SELECT sum(test.a.a) FROM (SELECT a , b FROM test.a ORDER BY b) GROUP BY test.a.b;"; + explain = "EXPLAIN " + statement; + expected = + "ResultSets:\n" + + "+------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "| Logical Tree|Operator Type| Operator Info|\n" + + "+------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "|Reorder | Reorder| Order: sum(test.a.a)|\n" + + "| +--Reorder | Reorder| Order: test.a.b,sum(test.a.a)|\n" + + "| +--Project | Project| Patterns: test.a.b,sum(test.a.a)|\n" + + "| +--GroupBy | GroupBy|GroupByCols: test.a.b, FuncList(Name, FuncType): (sum, System), MappingType: SetMapping isDistinct: false|\n" + + "| +--Project| Project| Patterns: test.a.a,test.a.b, Target DU: unit0000000002|\n" + + "+------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "Total line number = 5\n"; + executor.execute(openRule); + executor.executeAndCompare(explain, expected); + openRes = executor.execute(statement); + expected = + "ResultSets:\n" + + "+--------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "| Logical Tree|Operator Type| Operator Info|\n" + + "+--------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "|Reorder | Reorder| Order: sum(test.a.a)|\n" + + "| +--GroupBy | GroupBy|GroupByCols: test.a.b, FuncList(Name, FuncType): (sum, System), MappingType: SetMapping isDistinct: false|\n" + + "| +--Reorder | Reorder| Order: test.a.a,test.a.b|\n" + + "| +--Sort | Sort| SortBy: test.a.b, SortType: ASC|\n" + + "| +--Project | Project| Patterns: test.a.a,test.a.b|\n" + + "| +--Project| Project| Patterns: test.a.a,test.a.b, Target DU: unit0000000002|\n" + + "+--------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "Total line number = 6\n"; + executor.execute(closeRule); + executor.executeAndCompare(explain, expected); + closeRes = executor.execute(statement); + assertEquals(openRes, closeRes); + + statement = + "SELECT sum(test.a.a) FROM (SELECT a, b, c FROM test.a UNION ALL SELECT a, b, c FROM test.b) GROUP BY test.a.b ORDER BY test.a.b;"; + explain = "EXPLAIN " + statement; + expected = + "ResultSets:\n" + + "+--------------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "| Logical Tree|Operator Type| Operator Info|\n" + + "+--------------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "|Reorder | Reorder| Order: sum(test.a.a)|\n" + + "| +--Sort | Sort| SortBy: test.a.b, SortType: ASC|\n" + + "| +--Rename | Rename| AliasList: (sum(sum(test.a.a)), sum(test.a.a))|\n" + + "| +--GroupBy | GroupBy|GroupByCols: test.a.b, FuncList(Name, FuncType): (sum, System), MappingType: SetMapping isDistinct: false|\n" + + "| +--Union | Union| LeftOrder: test.a.b,sum(test.a.a), RightOrder: test.b.b,sum(test.b.a), isDistinct: false|\n" + + "| +--Reorder | Reorder| Order: test.a.b,sum(test.a.a)|\n" + + "| +--Project | Project| Patterns: test.a.b,sum(test.a.a)|\n" + + "| +--GroupBy | GroupBy|GroupByCols: test.a.b, FuncList(Name, FuncType): (sum, System), MappingType: SetMapping isDistinct: false|\n" + + "| +--Project| Project| Patterns: test.a.a,test.a.b,test.a.c, Target DU: unit0000000002|\n" + + "| +--Reorder | Reorder| Order: test.b.b,sum(test.b.a)|\n" + + "| +--Project | Project| Patterns: test.b.b,sum(test.b.a)|\n" + + "| +--GroupBy | GroupBy|GroupByCols: test.b.b, FuncList(Name, FuncType): (sum, System), MappingType: SetMapping isDistinct: false|\n" + + "| +--Project| Project| Patterns: test.b.a,test.b.b,test.b.c, Target DU: unit0000000002|\n" + + "+--------------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "Total line number = 13\n"; + executor.execute(openRule); + executor.executeAndCompare(explain, expected); + openRes = executor.execute(statement); + expected = + "ResultSets:\n" + + "+----------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "| Logical Tree|Operator Type| Operator Info|\n" + + "+----------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "|Reorder | Reorder| Order: sum(test.a.a)|\n" + + "| +--Sort | Sort| SortBy: test.a.b, SortType: ASC|\n" + + "| +--GroupBy | GroupBy|GroupByCols: test.a.b, FuncList(Name, FuncType): (sum, System), MappingType: SetMapping isDistinct: false|\n" + + "| +--Union | Union| LeftOrder: test.a.a,test.a.b,test.a.c, RightOrder: test.b.a,test.b.b,test.b.c, isDistinct: false|\n" + + "| +--Reorder | Reorder| Order: test.a.a,test.a.b,test.a.c|\n" + + "| +--Project | Project| Patterns: test.a.a,test.a.b,test.a.c|\n" + + "| +--Project| Project| Patterns: test.a.a,test.a.b,test.a.c, Target DU: unit0000000002|\n" + + "| +--Reorder | Reorder| Order: test.b.a,test.b.b,test.b.c|\n" + + "| +--Project | Project| Patterns: test.b.a,test.b.b,test.b.c|\n" + + "| +--Project| Project| Patterns: test.b.a,test.b.b,test.b.c, Target DU: unit0000000002|\n" + + "+----------------------+-------------+---------------------------------------------------------------------------------------------------------+\n" + + "Total line number = 10\n"; + executor.execute(closeRule); + executor.executeAndCompare(explain, expected); + closeRes = executor.execute(statement); + assertEquals(openRes, closeRes); + + statement = + "SELECT count(DISTINCT test.a.b), count(DISTINCT test.b.b) FROM (SELECT * FROM test.a JOIN test.b ON test.a.a = test.b.a) GROUP BY test.a.a;"; + explain = "EXPLAIN " + statement; + expected = + "ResultSets:\n" + + "+--------------------------+-------------+--------------------------------------------------------------------------------------------------------------------------------+\n" + + "| Logical Tree|Operator Type| Operator Info|\n" + + "+--------------------------+-------------+--------------------------------------------------------------------------------------------------------------------------------+\n" + + "|Reorder | Reorder| Order: count(distinct test.a.b),count(distinct test.b.b)|\n" + + "| +--Reorder | Reorder| Order: test.a.a,count(distinct test.a.b),count(distinct test.b.b)|\n" + + "| +--Project | Project| Patterns: test.a.a,count(distinct test.a.b),count(distinct test.b.b)|\n" + + "| +--Rename | Rename|AliasList: (count(distinct count_test_a_b), count(distinct test.a.b)),(count(distinct count_test_b_b), count(distinct test.b.b))|\n" + + "| +--GroupBy | GroupBy| GroupByCols: test.a.a, FuncList(Name, FuncType): (count, System),(count, System), MappingType: SetMapping isDistinct: true|\n" + + "| +--InnerJoin | InnerJoin| PrefixA: test.a, PrefixB: test.b, IsNatural: false, Filter: test.a.a == test.b.a|\n" + + "| +--Rename | Rename| AliasList: (count(distinct test.a.b), count_test_a_b)|\n" + + "| +--GroupBy | GroupBy| GroupByCols: test.a.a, FuncList(Name, FuncType): (count, System), MappingType: SetMapping isDistinct: true|\n" + + "| +--Project| Project| Patterns: test.a.*, Target DU: unit0000000002|\n" + + "| +--Rename | Rename| AliasList: (count(distinct test.b.b), count_test_b_b)|\n" + + "| +--GroupBy | GroupBy| GroupByCols: test.b.a, FuncList(Name, FuncType): (count, System), MappingType: SetMapping isDistinct: true|\n" + + "| +--Project| Project| Patterns: test.b.*, Target DU: unit0000000002|\n" + + "+--------------------------+-------------+--------------------------------------------------------------------------------------------------------------------------------+\n" + + "Total line number = 12\n"; + executor.execute(openRule); + executor.executeAndCompare(explain, expected); + openRes = executor.execute(statement); + expected = + "ResultSets:\n" + + "+--------------------------+-------------+--------------------------------------------------------------------------------------------------------------------------------+\n" + + "| Logical Tree|Operator Type| Operator Info|\n" + + "+--------------------------+-------------+--------------------------------------------------------------------------------------------------------------------------------+\n" + + "|Reorder | Reorder| Order: count(distinct test.a.b),count(distinct test.b.b)|\n" + + "| +--Reorder | Reorder| Order: test.a.a,count(distinct test.a.b),count(distinct test.b.b)|\n" + + "| +--Project | Project| Patterns: test.a.a,count(distinct test.a.b),count(distinct test.b.b)|\n" + + "| +--Rename | Rename|AliasList: (count(distinct count_test_a_b), count(distinct test.a.b)),(count(distinct count_test_b_b), count(distinct test.b.b))|\n" + + "| +--GroupBy | GroupBy| GroupByCols: test.a.a, FuncList(Name, FuncType): (count, System),(count, System), MappingType: SetMapping isDistinct: true|\n" + + "| +--InnerJoin | InnerJoin| PrefixA: test.a, PrefixB: test.b, IsNatural: false, Filter: test.a.a == test.b.a|\n" + + "| +--Rename | Rename| AliasList: (count(distinct test.a.b), count_test_a_b)|\n" + + "| +--GroupBy | GroupBy| GroupByCols: test.a.a, FuncList(Name, FuncType): (count, System), MappingType: SetMapping isDistinct: true|\n" + + "| +--Project| Project| Patterns: test.a.*, Target DU: unit0000000002|\n" + + "| +--Rename | Rename| AliasList: (count(distinct test.b.b), count_test_b_b)|\n" + + "| +--GroupBy | GroupBy| GroupByCols: test.b.a, FuncList(Name, FuncType): (count, System), MappingType: SetMapping isDistinct: true|\n" + + "| +--Project| Project| Patterns: test.b.*, Target DU: unit0000000002|\n" + + "+--------------------------+-------------+--------------------------------------------------------------------------------------------------------------------------------+\n" + + "Total line number = 12\n"; + executor.executeAndCompare(explain, expected); + closeRes = executor.execute(statement); + executor.execute(closeRule); + assertEquals(openRes, closeRes); + } + @Test public void testAllowNullColumnRule() { String openRule = "SET RULES AllowNullColumnRule=on;"; diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/sql/SQLSessionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/sql/SQLSessionIT.java index 944def765e..9cfa2a2e44 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/sql/SQLSessionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/sql/SQLSessionIT.java @@ -1034,7 +1034,8 @@ public void testDistinct() { + "Total line number = 1\n"; executor.executeAndCompare(statement, expected); - statement = "SELECT a, COUNT(b), AVG(b), SUM(b), MIN(b), MAX(b) FROM test GROUP BY a;"; + statement = + "SELECT a, COUNT(b), AVG(b), SUM(b), MIN(b), MAX(b) FROM test GROUP BY a ORDER BY a;"; expected = "ResultSets:\n" + "+------+-------------+------------------+-----------+-----------+-----------+\n" @@ -1049,7 +1050,7 @@ public void testDistinct() { executor.executeAndCompare(statement, expected); statement = - "SELECT a, COUNT(DISTINCT b), AVG(DISTINCT b), SUM(DISTINCT b), MIN(DISTINCT b), MAX(DISTINCT b) FROM test GROUP BY a;"; + "SELECT a, COUNT(DISTINCT b), AVG(DISTINCT b), SUM(DISTINCT b), MIN(DISTINCT b), MAX(DISTINCT b) FROM test GROUP BY a ORDER BY a;"; expected = "ResultSets:\n" + "+------+----------------------+--------------------+--------------------+--------------------+--------------------+\n" @@ -5319,7 +5320,7 @@ public void testHavingSubQuery() { + "(3, 2, 2, 1.1, \"val3\"), (4, 3, 2, 2.1, \"val2\"), (5, 1, 2, 3.1, \"val2\"), (6, 2, 2, 5.1, \"val3\");"; executor.execute(insert); - String statement = "SELECT AVG(a), b FROM test.a GROUP BY b;"; + String statement = "SELECT AVG(a), b FROM test.a GROUP BY b ORDER BY b;"; String expected = "ResultSets:\n" + "+-------------+--------+\n" @@ -5354,7 +5355,7 @@ public void testHavingSubQuery() { + "Total line number = 1\n"; executor.executeAndCompare(statement, expected); - statement = "SELECT AVG(a + c), b FROM test.a GROUP BY b;"; + statement = "SELECT AVG(a + c), b FROM test.a GROUP BY b ORDER BY b;"; expected = "ResultSets:\n" + "+------------------------+--------+\n" @@ -5708,15 +5709,15 @@ public void testCTE() { + "SELECT ao.outlet, ao.average_bonus_for_outlet, min.min_avg_bonus_for_outlet, max.max_avg_bonus_for_outlet " + "FROM avg_per_outlet AS ao " + "CROSS JOIN min_bonus_outlet AS min " - + "CROSS JOIN max_bonus_outlet AS max;"; + + "CROSS JOIN max_bonus_outlet AS max ORDER BY ao.outlet;"; expected = "ResultSets:\n" + "+---------+---------------------------+----------------------------+----------------------------+\n" + "|ao.outlet|ao.average_bonus_for_outlet|min.min_avg_bonus_for_outlet|max.max_avg_bonus_for_outlet|\n" + "+---------+---------------------------+----------------------------+----------------------------+\n" - + "| 211| 1897.5| 1716.0| 2020.0|\n" + "| 105| 2020.0| 1716.0| 2020.0|\n" + "| 123| 1716.0| 1716.0| 2020.0|\n" + + "| 211| 1897.5| 1716.0| 2020.0|\n" + "| 224| 1968.0| 1716.0| 2020.0|\n" + "+---------+---------------------------+----------------------------+----------------------------+\n" + "Total line number = 4\n"; @@ -5858,7 +5859,7 @@ public void testValueToMeta() { executor.executeAndCompare(statement, expected); statement = - "SELECT value2meta(SELECT suffix FROM prefix_test WHERE type = \"boolean\") FROM test GROUP BY c.b;"; + "SELECT value2meta(SELECT suffix FROM prefix_test WHERE type = \"boolean\") FROM test GROUP BY c.b ORDER BY c.b;"; expected = "ResultSets:\n" + "+--------+\n"