Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add second optimizer. #311

Draft
wants to merge 1 commit into
base: dev-optimizer-rework
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.sql.executor;

import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.opensearch.sql.analysis.AnalysisContext;
import org.opensearch.sql.analysis.Analyzer;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
Expand All @@ -28,7 +29,8 @@ public class QueryService {

private final ExecutionEngine executionEngine;

private final Planner planner;
@Setter
private Planner planner;

/**
* Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.pagination.CanPaginateVisitor;
import org.opensearch.sql.planner.Planner;

/**
* QueryExecution Factory.
Expand All @@ -43,6 +44,10 @@ public class QueryPlanFactory
*/
private final QueryService queryService;

public void setPlanner(Planner planner) {
queryService.setPlanner(planner);
}

/**
* NO_CONSUMER_RESPONSE_LISTENER should never be called. It is only used as constructor
* parameter of {@link QueryPlan}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

package org.opensearch.sql.planner.optimizer;

import static com.facebook.presto.matching.DefaultMatcher.DEFAULT_MATCHER;

import com.facebook.presto.matching.Match;
import java.util.List;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
Expand All @@ -19,14 +23,24 @@
* 1> Optimize the current node with all the rules.
* 2> Optimize the all the child nodes with all the rules.
* 3) In case the child node could change, Optimize the current node again.
* TODO update comment ^
*/
@RequiredArgsConstructor
public class LogicalPlanOptimizer {

private final List<Rule<? extends LogicalPlan>> rules;

private final OptimizingMode optimizingMode;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest removing the Mode. It's not needed.
Instead let's create a LogicalPlanOptimizer interface, and a separate SQLLogicalPlanOptimizer (that contains the create2() function) and PPLLogicalPlanOptimizer (that contains the old create() function. The Planner can be created with the appropriate PlanOptimizer.
I might consider also checking for aggregation in the plan and consider using the 'old' pushdown method when aggregation is present in the plan... that would be safer. But if you're confident that this pushdown works with aggregation, then we can push.


// TODO comment
public enum OptimizingMode {
PRESERVE_TREE_ORDER,
PRESERVE_RULE_ORDER
}

/**
* Create {@link LogicalPlanOptimizer} with pre-defined rules.
* TODO comment
*/
public static LogicalPlanOptimizer create() {
return new LogicalPlanOptimizer(List.of(
Expand All @@ -35,22 +49,99 @@ public static LogicalPlanOptimizer create() {
*/
new CreateTableScanBuilder(),
TableScanPushDown.PUSH_DOWN_PAGE_SIZE,
TableScanPushDown.PUSH_DOWN_FILTER_DEEP,
TableScanPushDown.PUSH_DOWN_AGGREGATION_DEEP,
TableScanPushDown.PUSH_DOWN_FILTER_DEEP,
TableScanPushDown.PUSH_DOWN_SORT_DEEP,
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_NESTED,
TableScanPushDown.PUSH_DOWN_PROJECT_DEEP,
TableScanPushDown.PUSH_DOWN_LIMIT_DEEP,
new CreateTableWriteBuilder()
), OptimizingMode.PRESERVE_RULE_ORDER);
}

/**
* TODO comment
* TODO rename
*/
public static LogicalPlanOptimizer create2() {
return new LogicalPlanOptimizer(List.of(
new CreateTableScanBuilder(),
TableScanPushDown.PUSH_DOWN_PAGE_SIZE,
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_SORT,
TableScanPushDown.PUSH_DOWN_LIMIT,
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_NESTED,
TableScanPushDown.PUSH_DOWN_PROJECT,
TableScanPushDown.PUSH_DOWN_LIMIT,
new CreateTableWriteBuilder()
));
), OptimizingMode.PRESERVE_TREE_ORDER);
}

/**
* Optimize {@link LogicalPlan}.
*/
public LogicalPlan optimize(LogicalPlan plan) {
return new LogicalPlanOptimizerVisitor(rules).optimize(plan);
log(plan);
var optimized = plan;
if (optimizingMode == OptimizingMode.PRESERVE_RULE_ORDER) {
optimized = new LogicalPlanOptimizerVisitor(rules).optimize(plan);
} else { //PRESERVE_TREE_ORDER
optimized = optimize2(plan);
}
log(optimized);
return optimized;
}


// TODO remove debugging
public void log(LogicalPlan plan) {
var node = plan;
System.out.println("==============");
System.out.println(node.getClass().getSimpleName());
while (node.getChild().size() > 0) {
node = node.getChild().get(0);
System.out.println(" |");
System.out.println(node.getClass().getSimpleName());
}
System.out.println("==============");
}

// TODO merge classes, reuse code and make rename functions

/**
* Optimize {@link LogicalPlan}.
*/
public LogicalPlan optimize2(LogicalPlan plan) {
LogicalPlan optimized = internalOptimize(plan);
optimized.replaceChildPlans(
optimized.getChild().stream().map(this::optimize2).collect(
Collectors.toList()));
return internalOptimize(optimized);
}

private LogicalPlan internalOptimize(LogicalPlan plan) {
LogicalPlan node = plan;
boolean done = false;
while (!done) {
done = true;
for (Rule rule : rules) {
Match match = DEFAULT_MATCHER.match(rule.pattern(), node);
if (match.isPresent()) {
node = rule.apply(match.value(), match.captures());

// For new TableScanPushDown impl, pattern match doesn't necessarily cause
// push down to happen. So reiterate all rules against the node only if the node
// is actually replaced by any rule.
// TODO: may need to introduce fixed point or maximum iteration limit in future
if (node != match.value()) {
done = false;
}
}
}
}
return node;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public LogicalPlanOptimizerVisitor(List<Rule<? extends LogicalPlan>> rules) {
}

public LogicalPlan optimize(LogicalPlan planTree) {
log(planTree);
var node = planTree;
for (int i = 0; i < rules.size(); i++) {
// TODO how to avoid unchecked cast
Expand All @@ -43,23 +42,9 @@ public LogicalPlan optimize(LogicalPlan planTree) {
i--;
}
}
log(node);
return node;
}

// TODO remove debugging
public void log(LogicalPlan plan) {
var node = plan;
System.out.println("==============");
System.out.println(node.getClass().getSimpleName());
while (node.getChild().size() > 0) {
node = node.getChild().get(0);
System.out.println(" |");
System.out.println(node.getClass().getSimpleName());
}
System.out.println("==============");
}

private boolean isCurrentRuleAppliedToNode(LogicalPlan node) {
for (var logEntry : log) {
// A rule could be applied to the exactly equal, but not same tree node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -37,26 +39,34 @@
public class PushDownRule<T extends LogicalPlan> extends Rule<T> {

private final Class<T> clazz;
private final BiFunction<T, TableScanBuilder, Boolean> pushDownFunction;
private final List<Function<LogicalPlan, Boolean>> exceptions;
private BiFunction<T, TableScanBuilder, Boolean> pushDownFunction;
private List<Function<LogicalPlan, Boolean>> exceptions;
private boolean deepTreeTraverse;

public PushDownRule(Class<T> clazz,
boolean canBeAppliedMultipleTimes,
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction,
Function<LogicalPlan, Boolean> exception) {
super(canBeAppliedMultipleTimes);
public PushDownRule(Class<T> clazz) {
this.clazz = clazz;
}

@SafeVarargs
public final PushDownRule<T> configureDeepTraverseRule(
boolean canBeAppliedMultipleTimes,
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction,
Function<LogicalPlan, Boolean>... exceptions) {
this.canBeAppliedMultipleTimes = canBeAppliedMultipleTimes;
this.deepTreeTraverse = true;
this.pushDownFunction = pushDownFunction;
this.exceptions = List.of(exception, getDefaultException());
this.exceptions = new ArrayList<>(Arrays.asList(exceptions));
this.exceptions.add(0, getDefaultException());
return this;
}

public PushDownRule(Class<T> clazz,
boolean canBeAppliedMultipleTimes,
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
super(canBeAppliedMultipleTimes);
this.clazz = clazz;
public final PushDownRule<T> configureRegularRule(
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
this.canBeAppliedMultipleTimes = true;
this.deepTreeTraverse = false;
this.pushDownFunction = pushDownFunction;
this.exceptions = List.of(getDefaultException());
this.exceptions = List.of();
return this;
}

/**
Expand Down Expand Up @@ -99,7 +109,9 @@ private Optional<TableScanBuilder> findTableScanBuilder(LogicalPlan node) {
}
return Optional.of((TableScanBuilder) children.get(0));
}
plans.addAll(children);
if (deepTreeTraverse) {
plans.addAll(children);
}
} while (!plans.isEmpty());
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,18 @@
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalPlan;

/**
* Optimization Rule.
* @param <T> LogicalPlan.
*/
@RequiredArgsConstructor
public abstract class Rule<T> {

@Accessors(fluent = true)
@Getter
protected final boolean canBeAppliedMultipleTimes;

public Rule() {
this.canBeAppliedMultipleTimes = false;
}
protected boolean canBeAppliedMultipleTimes;

/**
* Get the {@link Pattern}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,48 +29,65 @@
public class TableScanPushDown {

/** Push down optimize rule for filtering condition. */
public static final Rule<? extends LogicalPlan> PUSH_DOWN_FILTER =
new PushDownRule<>(LogicalFilter.class, true,
public static final Rule<? extends LogicalPlan> PUSH_DOWN_FILTER_DEEP =
new PushDownRule<>(LogicalFilter.class).configureDeepTraverseRule(true,
(filter, scanBuilder) -> scanBuilder.pushDownFilter(filter),
(plan) -> plan instanceof LogicalAggregation || plan instanceof LogicalProject);
(plan) -> plan instanceof LogicalAggregation,
(plan) -> plan instanceof LogicalProject);
public static final Rule<? extends LogicalPlan> PUSH_DOWN_FILTER =
new PushDownRule<>(LogicalFilter.class).configureRegularRule(
(filter, scanBuilder) -> scanBuilder.pushDownFilter(filter));

/** Push down optimize rule for aggregate operator. */
public static final Rule<? extends LogicalPlan> PUSH_DOWN_AGGREGATION =
new PushDownRule<>(LogicalAggregation.class, false,
public static final Rule<? extends LogicalPlan> PUSH_DOWN_AGGREGATION_DEEP =
new PushDownRule<>(LogicalAggregation.class).configureDeepTraverseRule(true,
(agg, scanBuilder) -> scanBuilder.pushDownAggregation(agg),
(plan) -> plan instanceof LogicalProject);
public static final Rule<? extends LogicalPlan> PUSH_DOWN_AGGREGATION =
new PushDownRule<>(LogicalAggregation.class).configureRegularRule(
(agg, scanBuilder) -> scanBuilder.pushDownAggregation(agg));

/** Push down optimize rule for sort operator. */
public static final Rule<? extends LogicalPlan> PUSH_DOWN_SORT =
new PushDownRule<>(LogicalSort.class, true,
public static final Rule<? extends LogicalPlan> PUSH_DOWN_SORT_DEEP =
new PushDownRule<>(LogicalSort.class).configureDeepTraverseRule(true,
(sort, scanBuilder) -> scanBuilder.pushDownSort(sort),
(plan) -> plan instanceof LogicalProject);
public static final Rule<? extends LogicalPlan> PUSH_DOWN_SORT =
new PushDownRule<>(LogicalSort.class).configureRegularRule(
(sort, scanBuilder) -> scanBuilder.pushDownSort(sort));

/** Push down optimize rule for limit operator. */
public static final Rule<? extends LogicalPlan> PUSH_DOWN_LIMIT =
new PushDownRule<>(LogicalLimit.class, false,
public static final Rule<? extends LogicalPlan> PUSH_DOWN_LIMIT_DEEP =
new PushDownRule<>(LogicalLimit.class).configureDeepTraverseRule(false,
(limit, scanBuilder) -> scanBuilder.pushDownLimit(limit),
(plan) -> plan instanceof LogicalSort || plan instanceof LogicalFilter
|| plan instanceof LogicalProject);
(plan) -> plan instanceof LogicalSort,
(plan) -> plan instanceof LogicalFilter,
(plan) -> plan instanceof LogicalProject);
public static final Rule<? extends LogicalPlan> PUSH_DOWN_LIMIT =
new PushDownRule<>(LogicalLimit.class).configureRegularRule(
(limit, scanBuilder) -> scanBuilder.pushDownLimit(limit));

/** Push down optimize rule for Project operator. */
public static final Rule<? extends LogicalPlan> PUSH_DOWN_PROJECT =
new PushDownRule<>(LogicalProject.class, false,
public static final Rule<? extends LogicalPlan> PUSH_DOWN_PROJECT_DEEP =
new PushDownRule<>(LogicalProject.class).configureDeepTraverseRule(false,
(project, scanBuilder) -> scanBuilder.pushDownProject(project),
(plan) -> plan instanceof LogicalEval || plan instanceof LogicalWindow);
(plan) -> plan instanceof LogicalWindow);
public static final Rule<? extends LogicalPlan> PUSH_DOWN_PROJECT =
new PushDownRule<>(LogicalProject.class).configureRegularRule(
(project, scanBuilder) -> scanBuilder.pushDownProject(project));

/** Push down optimize rule for highlight operator. */
public static final Rule<? extends LogicalPlan> PUSH_DOWN_HIGHLIGHT =
new PushDownRule<>(LogicalHighlight.class, true,
new PushDownRule<>(LogicalHighlight.class).configureDeepTraverseRule(true,
(highlight, scanBuilder) -> scanBuilder.pushDownHighlight(highlight));

/** Push down optimize rule for nested operator. */
public static final Rule<? extends LogicalPlan> PUSH_DOWN_NESTED =
new PushDownRule<>(LogicalNested.class, false,
new PushDownRule<>(LogicalNested.class).configureDeepTraverseRule(false,
(nested, scanBuilder) -> scanBuilder.pushDownNested(nested));

/** Push down optimize rule for paginate operator. */
public static final Rule<? extends LogicalPlan> PUSH_DOWN_PAGE_SIZE =
new PushDownRule<>(LogicalPaginate.class, false,
new PushDownRule<>(LogicalPaginate.class).configureDeepTraverseRule(false,
(paginate, scanBuilder) -> scanBuilder.pushDownPageSize(paginate));
}
Loading