Skip to content

Commit

Permalink
feat(optimizer): Agg Pushdown Rule (IGinX-THU#544)
Browse files Browse the repository at this point in the history
实现了一系列聚合下推规则,能将聚合函数下推到算子树更底层。
修改IStorage以使对接层具有支持聚合下推的接口,并实现了PG、MySQL的聚合下推。
  • Loading branch information
Yihao-Xu authored Feb 21, 2025
1 parent 00ad508 commit f2494ff
Show file tree
Hide file tree
Showing 47 changed files with 2,260 additions and 304 deletions.
26 changes: 26 additions & 0 deletions .github/actions/confWriter/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ inputs:
description: "close all optimizer"
required: false
default: "false"
Set-ColumnPruning-ON:
description: "set ColumnPruningRule ON"
required: false
default: "false"

runs:
using: "composite" # Mandatory parameter
Expand Down Expand Up @@ -118,6 +122,28 @@ runs:
paths: ${{ inputs.Root-Dir-Path }}/core/target/iginx-core-*/conf/config.properties
statements: s/policyClassName=.*$/policyClassName=cn.edu.tsinghua.iginx.policy.test.KeyRangeTestPolicy/g

- if: inputs.Set-Filter-Fragment-OFF=='true'
name: Set FragmentPruningByFilterRule OFF
uses: ./.github/actions/edit
with:
paths: ${{ inputs.Root-Dir-Path }}/core/target/iginx-core-*/conf/config.properties
statements: s/ruleBasedOptimizer=NotFilterRemoveRule=on,FragmentPruningByFilterRule=on/ruleBasedOptimizer=NotFilterRemoveRule=on/g

- if: inputs.Set-ColumnPruning-ON=='true'
name: Set ColumnPruningRule ON
uses: ./.github/actions/edit
with:
paths: ${{ inputs.Root-Dir-Path }}/core/target/iginx-core-*/conf/config.properties
statements: |
s/ColumnPruningRule=off/ColumnPruningRule=on/g
- if: inputs.Set-AggPushDown-OFF=='true'
name: Set AggPushDownRule OFF
uses: ./.github/actions/edit
with:
paths: ${{ inputs.Root-Dir-Path }}/core/target/iginx-core-*/conf/config.properties
statements: s/AggPushDownRule=on/AggPushDownRule=off/g

- name: Change Log Conf
uses: ./.github/actions/edit
with:
Expand Down
10 changes: 10 additions & 0 deletions .github/workflows/tpc-h.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ on:
type: number
required: false
default: 60
agg-pushdown-matrix:
description: "The database which support agg push down"
type: string
required: false
default: '["PostgreSQL", "MYSQL"]'

jobs:
TPC-H-Test:
Expand Down Expand Up @@ -120,16 +125,21 @@ jobs:
with:
DB-name: ${{ matrix.DB-name }}
Close-Optimizer: ${{ inputs.close-optimizer }}
Set-ColumnPruning-ON: ${{ toJSON(inputs.close-optimizer == 'true' && matrix.DB-name == 'InfluxDB') }}
Metadata: ${{ matrix.metadata }}
Set-AggPushDown-OFF: ${{ toJSON(!contains(inputs.agg-pushdown-matrix, matrix.DB-name)) }}

- name: Change Old IGinX Config
uses: ./.github/actions/confWriter
with:
DB-name: ${{ matrix.DB-name }}
Open-Optimizer: "true"
Close-Optimizer: ${{ inputs.close-optimizer }}
Set-ColumnPruning-ON: ${{ toJSON(inputs.close-optimizer == 'true' && matrix.DB-name == 'InfluxDB') }}
Metadata: ${{ matrix.metadata }}
zookeeper-port: 2182
Root-Dir-Path: "IGinX"
Set-AggPushDown-OFF: ${{ toJSON(!contains(inputs.agg-pushdown-matrix, matrix.DB-name)) }}

- name: Download TPC-H Data
shell: bash
Expand Down
2 changes: 1 addition & 1 deletion conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ queryOptimizer=rbo
# 优化器规则
ruleBasedOptimizer=NotFilterRemoveRule=on,ColumnPruningRule=on,ConstantPropagationRule=on,DistinctEliminateRule=on,\
ConstantFoldingRule=on,FilterPushDownRule=on,JoinFactorizationRule=on,SetTransformPushDownPathUnionJoinRule=on,InFilterTransformRule=on,\
OuterJoinEliminateRule=on
OuterJoinEliminateRule=on,AggPushDownRule=on

#ruleBasedOptimizer=AllowNullColumnRule=on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,4 +768,26 @@ public static boolean isProjectFromConstant(Operator operator) {
}
return false;
}

public static List<FunctionCall> getFunctionCallList(Operator operator) {
switch (operator.getType()) {
case GroupBy:
return ((GroupBy) operator).getFunctionCallList();
case SetTransform:
return ((SetTransform) operator).getFunctionCallList();
case RowTransform:
return ((RowTransform) operator).getFunctionCallList();
case MappingTransform:
return ((MappingTransform) operator).getFunctionCallList();
default:
return new ArrayList<>();
}
}

public static Operator getUnaryChild(Operator operator) {
if (isUnaryOperator(operator.getType())) {
return ((OperatorSource) ((UnaryOperator) operator).getSource()).getOperator();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,7 @@
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.DataTypeUtils;
import java.security.Key;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -929,4 +922,48 @@ private static boolean hasCaseWhen(Expression expr) {
return false;
}
}

public static Expression replacePaths(Expression expr, Map<String, String> pathMap) {
Expression res = copy(expr);
res.accept(
new ExpressionVisitor() {
@Override
public void visit(BaseExpression expression) {
expression.setPathName(
pathMap.getOrDefault(expression.getColumnName(), expression.getColumnName()));
}

@Override
public void visit(BinaryExpression expression) {}

@Override
public void visit(BracketExpression expression) {}

@Override
public void visit(ConstantExpression expression) {}

@Override
public void visit(FromValueExpression expression) {}

@Override
public void visit(FuncExpression expression) {}

@Override
public void visit(MultipleExpression expression) {}

@Override
public void visit(UnaryExpression expression) {}

@Override
public void visit(CaseWhenExpression expression) {}

@Override
public void visit(KeyExpression expression) {}

@Override
public void visit(SequenceExpression expression) {}
});

return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -451,4 +451,57 @@ public void visit(InFilter inFilter) {}
});
return pathFilters;
}

/** 获取filter以及所有子filter的类型 */
public static Set<FilterType> getFilterType(Filter filter) {
Set<FilterType> filterTypes = new HashSet<>();
filter.accept(
new FilterVisitor() {
@Override
public void visit(AndFilter filter) {
filterTypes.add(FilterType.And);
}

@Override
public void visit(OrFilter filter) {
filterTypes.add(FilterType.Or);
}

@Override
public void visit(NotFilter filter) {
filterTypes.add(FilterType.Not);
}

@Override
public void visit(KeyFilter filter) {
filterTypes.add(FilterType.Key);
}

@Override
public void visit(ValueFilter filter) {
filterTypes.add(FilterType.Value);
}

@Override
public void visit(PathFilter filter) {
filterTypes.add(FilterType.Path);
}

@Override
public void visit(BoolFilter filter) {
filterTypes.add(FilterType.Bool);
}

@Override
public void visit(ExprFilter filter) {
filterTypes.add(FilterType.Expr);
}

@Override
public void visit(InFilter inFilter) {
filterTypes.add(FilterType.In);
}
});
return filterTypes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.operator.Delete;
import cn.edu.tsinghua.iginx.engine.shared.operator.Insert;
import cn.edu.tsinghua.iginx.engine.shared.operator.Project;
import cn.edu.tsinghua.iginx.engine.shared.operator.Select;
import cn.edu.tsinghua.iginx.engine.shared.operator.SetTransform;
import cn.edu.tsinghua.iginx.engine.shared.operator.*;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval;
import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval;
Expand Down Expand Up @@ -56,15 +52,30 @@ public interface IStorage {
TaskExecuteResult executeProjectDummyWithSelect(
Project project, Select select, DataArea dataArea);

default boolean isSupportProjectWithSetTransform(SetTransform setTransform, DataArea dataArea) {
/** 询问底层是否支持仅带AGG的查询 */
default boolean isSupportProjectWithAgg(Operator agg, DataArea dataArea, boolean isDummy) {
return false;
}

default TaskExecuteResult executeProjectWithSetTransform(
Project project, SetTransform setTransform, DataArea dataArea) {
throw new UnsupportedOperationException();
default boolean isSupportProjectWithAggSelect(
Operator agg, Select select, DataArea dataArea, boolean isDummy) {
return false;
}

/** 对非叠加分片带Agg带谓词下推的查询 */
TaskExecuteResult executeProjectWithAggSelect(
Project project, Select select, Operator agg, DataArea dataArea);

/** 对叠加分片带Agg带谓词下推的查询 */
TaskExecuteResult executeProjectDummyWithAggSelect(
Project project, Select select, Operator agg, DataArea dataArea);

/** 对非叠加分片带Agg的查询 */
TaskExecuteResult executeProjectWithAgg(Project project, Operator agg, DataArea dataArea);

/** 对叠加分片带Agg的查询 */
TaskExecuteResult executeProjectDummyWithAgg(Project project, Operator agg, DataArea dataArea);

/** 对非叠加分片删除数据 */
TaskExecuteResult executeDelete(Delete delete, DataArea dataArea);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,19 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.TooManyPhysicalTasksException;
import cn.edu.tsinghua.iginx.engine.physical.exception.UnexpectedOperatorException;
import cn.edu.tsinghua.iginx.engine.physical.memory.MemoryPhysicalTaskDispatcher;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.OperatorMemoryExecutor;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.OperatorMemoryExecutorFactory;
import cn.edu.tsinghua.iginx.engine.physical.optimizer.ReplicaDispatcher;
import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage;
import cn.edu.tsinghua.iginx.engine.physical.storage.StorageManager;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea;
import cn.edu.tsinghua.iginx.engine.physical.storage.execute.pushdown.strategy.PushDownStrategy;
import cn.edu.tsinghua.iginx.engine.physical.storage.execute.pushdown.strategy.PushDownStrategyFactory;
import cn.edu.tsinghua.iginx.engine.physical.storage.queue.StoragePhysicalTaskQueue;
import cn.edu.tsinghua.iginx.engine.physical.task.GlobalPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.MemoryPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.operator.*;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType;
import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
import cn.edu.tsinghua.iginx.metadata.IMetaManager;
import cn.edu.tsinghua.iginx.metadata.entity.FragmentMeta;
Expand Down Expand Up @@ -148,70 +146,19 @@ private StoragePhysicalTaskExecutor() {
boolean isDummyStorageUnit = task.isDummyStorageUnit();
DataArea dataArea =
new DataArea(storageUnit, fragmentMeta.getKeyInterval());

switch (op.getType()) {
case Project:
boolean needSelectPushDown =
pair.k.isSupportProjectWithSelect()
&& operators.size() == 2
&& operators.get(1).getType() == OperatorType.Select;
boolean needSetTransformPushDown =
operators.size() == 2
&& operators.get(1).getType()
== OperatorType.SetTransform;
boolean canSetTransformPushDown =
needSetTransformPushDown
&& pair.k.isSupportProjectWithSetTransform(
(SetTransform) operators.get(1), dataArea);
if (isDummyStorageUnit) {
if (needSelectPushDown) {
result =
pair.k.executeProjectDummyWithSelect(
(Project) op, (Select) operators.get(1), dataArea);
} else if (needSetTransformPushDown) {
throw new IllegalStateException();
} else {
result = pair.k.executeProjectDummy((Project) op, dataArea);
}
} else {
if (needSelectPushDown) {
result =
pair.k.executeProjectWithSelect(
(Project) op, (Select) operators.get(1), dataArea);
} else if (needSetTransformPushDown) {
if (canSetTransformPushDown) {
result =
pair.k.executeProjectWithSetTransform(
(Project) op,
(SetTransform) operators.get(1),
dataArea);
} else {
TaskExecuteResult tempResult =
pair.k.executeProject((Project) op, dataArea);
if (tempResult.getException() != null) {
result = tempResult;
} else {
// set transform push down is not supported, execute set
// transform in memory
OperatorMemoryExecutor executor =
OperatorMemoryExecutorFactory.getInstance()
.getMemoryExecutor();
try {
RowStream rowStream =
executor.executeUnaryOperator(
(SetTransform) operators.get(1),
tempResult.getRowStream(),
task.getContext());
result = new TaskExecuteResult(rowStream);
} catch (PhysicalException e) {
result = new TaskExecuteResult(e);
}
}
}
} else {
result = pair.k.executeProject((Project) op, dataArea);
}
}
PushDownStrategy strategy =
PushDownStrategyFactory.getStrategy(
operators, pair.k, dataArea, isDummyStorageUnit);
result =
strategy.execute(
(Project) op,
operators,
dataArea,
pair.k,
isDummyStorageUnit,
task.getContext());
break;
case Insert:
result = pair.k.executeInsert((Insert) op, dataArea);
Expand Down
Loading

0 comments on commit f2494ff

Please sign in to comment.