Skip to content

Commit

Permalink
feat(optimizer): Filter Pushdown Rule (#339)
Browse files Browse the repository at this point in the history
本次PR在RBO框架下实现了一个谓词下推规则,会将SELECT算子逐步往下推,在无法完全下推时会使用超集下推。测试结果见 【PR文档】feat(optimizer): Filter Pushdown Rule

比起原本的谓词下推,本次PR实现的谓词下推的优点有:

SELECT算子逐步下推,当确实无法下推到最底下时,SELECT算子可以下推到查询树较底下的位置,减少上方算子处理的数据量。而原本的下推在无法下推到最底下时,会将SELECT算子保留在原位置。
实现了Filter下推过CrossJoin、InnerJoin、OuterJoin的逻辑,当Filter下推过CrossJoin或OuterJoin时,可能可以改变Join算子,将其转换成更高效的算子。例如当Filter下推过CrossJoin时,可以将作为Join的ON条件,这样可以转换成InnerJoin,测试表明这种转换能带来极大的性能提升。当Filter下推过OuterJoin时,也可以根据情况将其转换成InnerJoin,或者将Full Outer Join转换成Left/Right Outer Join。
实现了SELECT算子的合并,当SELECT算子下推碰到SELECT算子时,会将两个算子合并,以减少数据处理量。当SELECT算子碰到INNER JOIN的ON条件时,也推入Inner Join中进行合并。
实现了ON条件的下推,可以将InnerJoin、OuterJoin中的ON条件进行下推。
实现了Filter下推过GroupBy和Distinct的逻辑,当Filter下推过这两种算子时,只能将关于GroupBy列或Distinct列的Filter下推。
  • Loading branch information
Yihao-Xu authored May 26, 2024
1 parent 960557b commit 641ec82
Show file tree
Hide file tree
Showing 41 changed files with 2,247 additions and 320 deletions.
12 changes: 9 additions & 3 deletions .github/actions/confWriter/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,19 @@ runs:
run: |
if [ "$RUNNER_OS" == "Linux" ]; then
sudo sed -i 's/enablePushDown=false/enablePushDown=true/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
sed -i 's/queryOptimizer=rbo/queryOptimizer=rbo,filter_push_down/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
sed -i 's/FilterPushDownAddSchemaPrefixRule=off,FilterPushDownAddSchemaPrefixRule=off/FilterPushDownAddSchemaPrefixRule=on,FilterPushDownAddSchemaPrefixRule=on/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
sed -i 's/FilterPushDownPathUnionJoinRule=off,FilterPushDownProjectReorderSortRule=off,FilterPushDownRenameRule=off,FilterPushDownSelectRule=off/FilterPushDownPathUnionJoinRule=on,FilterPushDownProjectReorderSortRule=on,FilterPushDownRenameRule=on,FilterPushDownSelectRule=on/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
sed -i 's/FilterPushDownSetOpRule=off,FilterPushDownTransformRule=off,FilterPushIntoJoinConditionRule=off,FilterPushOutJoinConditionRule=off,FilterPushDownGroupByRule=off/FilterPushDownSetOpRule=on,FilterPushDownTransformRule=on,FilterPushIntoJoinConditionRule=on,FilterPushOutJoinConditionRule=on,FilterPushDownGroupByRule=on/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
elif [ "$RUNNER_OS" == "Windows" ]; then
sed -i 's/enablePushDown=false/enablePushDown=true/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
sed -i 's/queryOptimizer=rbo/queryOptimizer=rbo,filter_push_down/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
sed -i 's/FilterPushDownAddSchemaPrefixRule=off,FilterPushDownAddSchemaPrefixRule=off/FilterPushDownAddSchemaPrefixRule=on,FilterPushDownAddSchemaPrefixRule=on/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
sed -i 's/FilterPushDownPathUnionJoinRule=off,FilterPushDownProjectReorderSortRule=off,FilterPushDownRenameRule=off,FilterPushDownSelectRule=off/FilterPushDownPathUnionJoinRule=on,FilterPushDownProjectReorderSortRule=on,FilterPushDownRenameRule=on,FilterPushDownSelectRule=on/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
sed -i 's/FilterPushDownSetOpRule=off,FilterPushDownTransformRule=off,FilterPushIntoJoinConditionRule=off,FilterPushOutJoinConditionRule=off,FilterPushDownGroupByRule=off/FilterPushDownSetOpRule=on,FilterPushDownTransformRule=on,FilterPushIntoJoinConditionRule=on,FilterPushOutJoinConditionRule=on,FilterPushDownGroupByRule=on/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
elif [ "$RUNNER_OS" == "macOS" ]; then
sudo sed -i '' 's/enablePushDown=false/enablePushDown=true/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
sed -i '' 's/queryOptimizer=rbo/queryOptimizer=rbo,filter_push_down/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
sed -i '' 's/FilterPushDownAddSchemaPrefixRule=off,FilterPushDownAddSchemaPrefixRule=off/FilterPushDownAddSchemaPrefixRule=on,FilterPushDownAddSchemaPrefixRule=on/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
sed -i '' 's/FilterPushDownPathUnionJoinRule=off,FilterPushDownProjectReorderSortRule=off,FilterPushDownRenameRule=off,FilterPushDownSelectRule=off/FilterPushDownPathUnionJoinRule=on,FilterPushDownProjectReorderSortRule=on,FilterPushDownRenameRule=on,FilterPushDownSelectRule=on/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
sed -i '' 's/FilterPushDownSetOpRule=off,FilterPushDownTransformRule=off,FilterPushIntoJoinConditionRule=off,FilterPushOutJoinConditionRule=off,FilterPushDownGroupByRule=off/FilterPushDownSetOpRule=on,FilterPushDownTransformRule=on,FilterPushIntoJoinConditionRule=on,FilterPushOutJoinConditionRule=on,FilterPushDownGroupByRule=on/g' ${GITHUB_WORKSPACE}/core/target/iginx-core-${VERSION}/conf/config.properties
else
echo "$RUNNER_OS is not supported"
exit 1
Expand Down
6 changes: 5 additions & 1 deletion conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ maxCachedPhysicalTaskPerStorage=500
queryOptimizer=rbo

# 优化器规则
ruleBasedOptimizer=NotFilterRemoveRule=on,FragmentPruningByFilterRule=on,ColumnPruningRule=on,FragmentPruningByPatternRule=on,ConstantPropagationRule=on,FilterConstantFoldingRule=on,RowTransformConstantFoldingRule=on
ruleBasedOptimizer=NotFilterRemoveRule=on,FragmentPruningByFilterRule=on,ColumnPruningRule=on,FragmentPruningByPatternRule=on,ConstantPropagationRule=on,\
FilterConstantFoldingRule=on,RowTransformConstantFoldingRule=on,FilterPushDownAddSchemaPrefixRule=off,FilterPushDownAddSchemaPrefixRule=off,\
FilterPushDownPathUnionJoinRule=off,FilterPushDownProjectReorderSortRule=off,FilterPushDownRenameRule=off,FilterPushDownSelectRule=off,\
FilterPushDownSetOpRule=off,FilterPushDownTransformRule=off,FilterPushIntoJoinConditionRule=off,FilterPushOutJoinConditionRule=off,FilterPushDownGroupByRule=off


# ParallelFilter触发行数
parallelFilterThreshold=10000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public abstract class RuleCall {

private final Map<Operator, List<Operator>> childrenIndex;

private Object context;

public RuleCall(
Operator matchedRoot,
Map<Operator, Operator> parentIndexMap,
Expand Down Expand Up @@ -81,4 +83,12 @@ public void transformTo(Operator newRoot) {
}
}
}

public void setContext(Object context) {
this.context = context;
}

public Object getContext() {
return context;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cn.edu.tsinghua.iginx.engine.logical.optimizer.rules;

import static cn.edu.tsinghua.iginx.engine.logical.utils.OperatorUtils.covers;
import static cn.edu.tsinghua.iginx.engine.logical.utils.PathUtils.*;

import cn.edu.tsinghua.iginx.engine.logical.optimizer.core.RuleCall;
import cn.edu.tsinghua.iginx.engine.logical.utils.OperatorUtils;
Expand Down Expand Up @@ -384,65 +385,6 @@ private void changeColumnsFromFunctionCallList(
}
}

// 检查第一组模式是否完全包含第二组模式
public static boolean checkCoverage(Collection<String> groupA, Collection<String> groupB) {
for (String patternB : groupB) {
boolean covered = false;
for (String patternA : groupA) {
if (covers(patternA, patternB)) {
covered = true;
break;
}
}
if (!covered) {
return false; // 如果找不到覆盖patternB的patternA,则第一组不完全包含第二组
}
}
return true; // 所有的patternB都被至少一个patternA覆盖
}

/**
* 反向重命名模式列表中的模式
*
* @param aliasMap 重命名规则, key为旧模式,value为新模式,在这里我们要将新模式恢复为旧模式
* @param patterns 要重命名的模式列表
* @return 重命名后的模式列表
*/
private static List<String> recoverRenamedPatterns(
Map<String, String> aliasMap, List<String> patterns) {
List<String> renamedPatterns = new ArrayList<>();
for (String pattern : patterns) {
boolean matched = false;
for (Map.Entry<String, String> entry : aliasMap.entrySet()) {
String oldPattern = entry.getKey().replace("*", "$1"); // 通配符转换为正则的捕获组
String newPattern = entry.getValue().replace("*", "(.*)"); // 使用反向引用保留原始匹配的部分
if (pattern.matches(newPattern)) {
if (oldPattern.contains("$1") && !newPattern.contains("*")) {
// 如果旧模式中有通配符,但是新模式中没有,我们需要将新模式中的捕获组替换为通配符
oldPattern = oldPattern.replace("$1", "*");
}
String p = pattern.replaceAll(newPattern, oldPattern);
renamedPatterns.add(p);
matched = true;
break;
} else if (newPattern.equals(pattern)) {
renamedPatterns.add(entry.getKey());
matched = true;
break;
} else if (pattern.contains(".*")
&& newPattern.matches(StringUtils.reformatPath(pattern))) {
renamedPatterns.add(entry.getKey());
matched = true;
break;
}
}
if (!matched) { // 如果没有匹配的规则,添加原始模式
renamedPatterns.add(pattern);
}
}
return renamedPatterns;
}

/**
* 检查列中是否有通配符
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package cn.edu.tsinghua.iginx.engine.logical.optimizer.rules;

import cn.edu.tsinghua.iginx.engine.logical.optimizer.core.RuleCall;
import cn.edu.tsinghua.iginx.engine.shared.expr.*;
import cn.edu.tsinghua.iginx.engine.shared.operator.AddSchemaPrefix;
import cn.edu.tsinghua.iginx.engine.shared.operator.Select;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.*;
import cn.edu.tsinghua.iginx.engine.shared.source.OperatorSource;

public class FilterPushDownAddSchemaPrefixRule extends Rule {

private static class InstanceHolder {
private static final FilterPushDownAddSchemaPrefixRule INSTANCE =
new FilterPushDownAddSchemaPrefixRule();
}

public static FilterPushDownAddSchemaPrefixRule getInstance() {
return InstanceHolder.INSTANCE;
}

protected FilterPushDownAddSchemaPrefixRule() {
/*
* we want to match the topology like:
* Select
* |
* AddSchemaPrefix
*/
super(
"FilterPushDownAddSchemaPrefixRule",
operand(Select.class, operand(AddSchemaPrefix.class, any())));
}

@Override
public boolean matches(RuleCall call) {
return super.matches(call);
}

@Override
public void onMatch(RuleCall call) {
Select select = (Select) call.getMatchedRoot();
AddSchemaPrefix addSchemaPrefix =
(AddSchemaPrefix) ((OperatorSource) select.getSource()).getOperator();
removeFilterPrefix(select.getFilter(), addSchemaPrefix.getSchemaPrefix());

select.setSource(addSchemaPrefix.getSource());
addSchemaPrefix.setSource(new OperatorSource(select));
call.transformTo(addSchemaPrefix);
}

private void removeFilterPrefix(Filter filter, String prefix) {
filter.accept(
new FilterVisitor() {
@Override
public void visit(AndFilter filter) {}

@Override
public void visit(OrFilter filter) {}

@Override
public void visit(NotFilter filter) {}

@Override
public void visit(KeyFilter filter) {}

@Override
public void visit(ValueFilter filter) {
filter.setPath(removePrefix(filter.getPath(), prefix));
}

@Override
public void visit(PathFilter filter) {
filter.setPathA(removePrefix(filter.getPathA(), prefix));
filter.setPathB(removePrefix(filter.getPathB(), prefix));
}

@Override
public void visit(BoolFilter filter) {}

@Override
public void visit(ExprFilter filter) {
Expression exprA = filter.getExpressionA();
Expression exprB = filter.getExpressionB();
removeExpressionPrefix(exprA, prefix);
removeExpressionPrefix(exprB, prefix);
}
});
}

private void removeExpressionPrefix(Expression expression, String prefix) {
expression.accept(
new ExpressionVisitor() {
@Override
public void visit(BaseExpression expression) {
expression.setPathName(removePrefix(expression.getPathName(), prefix));
}

@Override
public void visit(BinaryExpression expression) {}

@Override
public void visit(BracketExpression expression) {}

@Override
public void visit(ConstantExpression expression) {}

@Override
public void visit(FromValueExpression expression) {}

@Override
public void visit(FuncExpression expression) {
expression.getColumns().replaceAll(column -> removePrefix(column, prefix));
}

@Override
public void visit(MultipleExpression expression) {}

@Override
public void visit(UnaryExpression expression) {}
});
}

private String removePrefix(String path, String prefix) {
if (prefix != null && path.startsWith(prefix + ".")) {
return path.substring(prefix.length() + 1);
}
return path;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package cn.edu.tsinghua.iginx.engine.logical.optimizer.rules;

import cn.edu.tsinghua.iginx.engine.logical.optimizer.core.RuleCall;
import cn.edu.tsinghua.iginx.engine.logical.utils.LogicalFilterUtils;
import cn.edu.tsinghua.iginx.engine.shared.operator.*;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.AndFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType;
import cn.edu.tsinghua.iginx.engine.shared.source.OperatorSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class FilterPushDownGroupByRule extends Rule {
private static List<OperatorType> validOps =
Arrays.asList(OperatorType.GroupBy, OperatorType.Distinct);

private static class InstanceHolder {
private static final FilterPushDownGroupByRule INSTANCE = new FilterPushDownGroupByRule();
}

public static FilterPushDownGroupByRule getInstance() {
return InstanceHolder.INSTANCE;
}

protected FilterPushDownGroupByRule() {
/*
* we want to match the topology like:
* Select
* |
* GroupBy/Distinct
*/
super(
"FilterPushDownGroupByRule",
operand(Select.class, operand(AbstractUnaryOperator.class, any())));
}

@Override
public boolean matches(RuleCall call) {
Select select = (Select) call.getMatchedRoot();
AbstractUnaryOperator operator =
(AbstractUnaryOperator) call.getChildrenIndex().get(select).get(0);
if (!validOps.contains(operator.getType())) {
return false;
}

// 如果没有GroupBy Key,不能下推
List<String> groupByCols = getGroupByCols(operator);
if (groupByCols.isEmpty()) {
return false;
}

// 分解Filter为一系列AND连接的子条件
List<Filter> splitFilters = LogicalFilterUtils.splitFilter(select.getFilter());
List<Filter> pushFilters = new ArrayList<>(), remainFilters = new ArrayList<>();
for (Filter filter : splitFilters) {
// 如果Filter中的列仅包含GroupBy Key,可以下推,否则不行
if (new HashSet<>(groupByCols).containsAll(LogicalFilterUtils.getPathsFromFilter(filter))) {
pushFilters.add(filter);
} else {
remainFilters.add(filter);
}
}

if (pushFilters.isEmpty()) {
return false;
}

call.setContext(new Object[] {pushFilters, remainFilters});
return true;
}

@Override
public void onMatch(RuleCall call) {
List<Filter> pushFilters = (List<Filter>) ((Object[]) call.getContext())[0];
List<Filter> remainFilters = (List<Filter>) ((Object[]) call.getContext())[1];

Select select = (Select) call.getMatchedRoot();
AbstractUnaryOperator operator =
(AbstractUnaryOperator) call.getChildrenIndex().get(select).get(0);
Select newSelect =
new Select(operator.getSource(), new AndFilter(pushFilters), select.getTagFilter());
operator.setSource(new OperatorSource(newSelect));

if (remainFilters.isEmpty()) {
call.transformTo(operator);
} else {
select.setFilter(new AndFilter(remainFilters));
call.transformTo(select);
}
}

private List<String> getGroupByCols(AbstractUnaryOperator operator) {
if (operator.getType() == OperatorType.GroupBy) {
return ((GroupBy) operator).getGroupByCols();
} else if (operator.getType() == OperatorType.Distinct) {
return ((Distinct) operator).getPatterns();
}
throw new IllegalArgumentException("OperatorType is not GroupBy or Distinct");
}
}
Loading

0 comments on commit 641ec82

Please sign in to comment.