Skip to content

Commit

Permalink
feat(sql): sql support const arithmetic expression (IGinX-THU#447)
Browse files Browse the repository at this point in the history
sql support const arithmetic expression
  • Loading branch information
jzl18thu authored Sep 24, 2024
1 parent 6eda6e5 commit bc31041
Show file tree
Hide file tree
Showing 23 changed files with 412 additions and 88 deletions.
2 changes: 1 addition & 1 deletion antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ queryClause
;

select
: selectClause fromClause whereClause? withClause? specialClause?
: selectClause (fromClause whereClause? withClause? specialClause?)?
;

selectClause
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import cn.edu.tsinghua.iginx.engine.shared.operator.type.JoinAlgType;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
import cn.edu.tsinghua.iginx.engine.shared.source.ConstantSource;
import cn.edu.tsinghua.iginx.engine.shared.source.GlobalSource;
import cn.edu.tsinghua.iginx.engine.shared.source.OperatorSource;
import cn.edu.tsinghua.iginx.engine.shared.source.Source;
Expand Down Expand Up @@ -72,6 +73,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -175,6 +177,7 @@ private Operator generateRoot(UnarySelectStatement selectStatement) {
root = initProjectWaitingForPath(selectStatement);
root = root != null ? root : initFilterAndMergeFragmentsWithJoin(selectStatement);
root = root != null ? root : initFromPart(selectStatement);
root = root != null ? root : initSelectConstArith(selectStatement);
root = root != null ? root : initFilterAndMergeFragments(selectStatement);

if (!checkRoot(root) && !checkIsMetaWritable()) {
Expand Down Expand Up @@ -382,6 +385,23 @@ private Operator initFromPart(UnarySelectStatement selectStatement) {
return root;
}

/**
* 如果SelectStatement的select部分全为常数表达式且from部分为空,构造以ConstantSource为输入Project操作符来初始化操作符树
*
* @param selectStatement select语句上下文
* @return 以ConstantSource为输入Project操作符的操作符树;
*/
private Operator initSelectConstArith(UnarySelectStatement selectStatement) {
if (!selectStatement.isAllConstArith() || !selectStatement.getFromParts().isEmpty()) {
return null;
}
List<String> columnNames =
selectStatement.getExpressions().stream()
.map(Expression::getColumnName)
.collect(Collectors.toList());
return new Project(new ConstantSource(selectStatement.getExpressions()), columnNames, null);
}

/**
* 如果SelectStatement有Join部分,根据Tag Filter和Path Prefix过滤选择Fragments,并将它们Join成一个操作符树
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import cn.edu.tsinghua.iginx.engine.shared.operator.type.JoinAlgType;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
import cn.edu.tsinghua.iginx.engine.shared.source.ConstantSource;
import cn.edu.tsinghua.iginx.engine.shared.source.FragmentSource;
import cn.edu.tsinghua.iginx.engine.shared.source.OperatorSource;
import cn.edu.tsinghua.iginx.engine.shared.source.Source;
Expand Down Expand Up @@ -95,7 +96,8 @@ public static List<String> findPathList(Operator operator) {

if (OperatorType.isUnaryOperator(operator.getType())) {
AbstractUnaryOperator unaryOperator = (AbstractUnaryOperator) operator;
if (unaryOperator.getSource().getType() != SourceType.Fragment) {
if (unaryOperator.getSource().getType() != SourceType.Fragment
&& unaryOperator.getSource().getType() != SourceType.Constant) {
pathList.addAll(findPathList(((OperatorSource) unaryOperator.getSource()).getOperator()));
}
} else if (OperatorType.isBinaryOperator(operator.getType())) {
Expand Down Expand Up @@ -123,7 +125,7 @@ public static void findProjectOperators(List<Project> projectOperatorList, Opera
if (OperatorType.isUnaryOperator(operator.getType())) {
UnaryOperator unaryOp = (UnaryOperator) operator;
Source source = unaryOp.getSource();
if (source.getType() != SourceType.Fragment) {
if (source.getType() != SourceType.Fragment && source.getType() != SourceType.Constant) {
findProjectOperators(projectOperatorList, ((OperatorSource) source).getOperator());
}
} else if (OperatorType.isBinaryOperator(operator.getType())) {
Expand Down Expand Up @@ -710,4 +712,11 @@ public static boolean covers(String a, String b) {
}
return true;
}

public static boolean isProjectFromConstant(Operator operator) {
if (operator instanceof Project) {
return ((Project) operator).getSource() instanceof ConstantSource;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static cn.edu.tsinghua.iginx.engine.physical.task.utils.TaskUtils.getBottomTasks;

import cn.edu.tsinghua.iginx.conf.ConfigDescriptor;
import cn.edu.tsinghua.iginx.engine.logical.utils.OperatorUtils;
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.MemoryPhysicalTaskDispatcher;
import cn.edu.tsinghua.iginx.engine.physical.optimizer.PhysicalOptimizer;
Expand Down Expand Up @@ -83,9 +84,14 @@ public RowStream execute(RequestContext ctx, Operator root) throws PhysicalExcep
}
PhysicalTask task = optimizer.optimize(root, ctx);
ctx.setPhysicalTree(task);
List<PhysicalTask> bottomTasks = new ArrayList<>();
getBottomTasks(bottomTasks, task);
commitBottomTasks(bottomTasks);

// 空表查询特殊处理,不需要getBottomTasks
if (!OperatorUtils.isProjectFromConstant(root)) {
List<PhysicalTask> bottomTasks = new ArrayList<>();
getBottomTasks(bottomTasks, task);
commitBottomTasks(bottomTasks);
}

TaskExecuteResult result = task.getResult();
if (result.getException() != null) {
throw result.getException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ public class Table implements RowStream {

private RequestContext context;

public Table(Row row) {
this.header = row.getHeader();
this.rows = Collections.singletonList(row);
this.index = 0;
}

public Table(Header header, List<Row> rows) {
this.header = header;
this.rows = rows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@
import cn.edu.tsinghua.iginx.engine.shared.operator.ValueToSelectedPath;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
import cn.edu.tsinghua.iginx.engine.shared.source.ConstantSource;
import cn.edu.tsinghua.iginx.engine.shared.source.EmptySource;
import cn.edu.tsinghua.iginx.engine.shared.source.Source;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Bitmap;
import cn.edu.tsinghua.iginx.utils.Pair;
Expand Down Expand Up @@ -191,7 +193,22 @@ private Table transformToTable(RowStream stream) throws PhysicalException {
return new Table(header, rows);
}

private RowStream executeProject(Project project, Table table) {
private RowStream executeProject(Project project, Table table) throws PhysicalException {
Source source = project.getSource();
switch (source.getType()) {
case Operator:
case Empty:
return executeProjectFromOperator(project, table);
case Constant:
ConstantSource constantSource = (ConstantSource) source;
return new Table(RowUtils.buildConstRow(constantSource.getExpressionList()));
default:
throw new PhysicalException(
"Unexpected project source type in memory task: " + source.getType());
}
}

private RowStream executeProjectFromOperator(Project project, Table table) {
List<String> patterns = project.getPatterns();
Header header = table.getHeader();
List<Field> targetFields = new ArrayList<>();
Expand Down Expand Up @@ -358,7 +375,6 @@ private RowStream executeSetTransform(SetTransform setTransform, Table table)
SetMappingFunction function = (SetMappingFunction) functionCall.getFunction();
FunctionParams params = functionCall.getParams();
Table functable = RowUtils.preRowTransform(table, rowTransformMap, functionCall);
;
if (setTransform.isDistinct()) {
// min和max无需去重
if (!function.getIdentifier().equals(Max.MAX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.exception.UnexpectedOperatorException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.OperatorMemoryExecutor;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.Table;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
import cn.edu.tsinghua.iginx.engine.shared.Constants;
import cn.edu.tsinghua.iginx.engine.shared.RequestContext;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
Expand Down Expand Up @@ -58,7 +60,9 @@
import cn.edu.tsinghua.iginx.engine.shared.operator.UnaryOperator;
import cn.edu.tsinghua.iginx.engine.shared.operator.Union;
import cn.edu.tsinghua.iginx.engine.shared.operator.ValueToSelectedPath;
import cn.edu.tsinghua.iginx.engine.shared.source.ConstantSource;
import cn.edu.tsinghua.iginx.engine.shared.source.EmptySource;
import cn.edu.tsinghua.iginx.engine.shared.source.Source;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -168,8 +172,19 @@ public RowStream executeBinaryOperator(
return result;
}

private RowStream executeProject(Project project, RowStream stream) {
return new ProjectLazyStream(project, stream);
private RowStream executeProject(Project project, RowStream stream) throws PhysicalException {
Source source = project.getSource();
switch (source.getType()) {
case Operator:
case Empty:
return new ProjectLazyStream(project, stream);
case Constant:
ConstantSource constantSource = (ConstantSource) source;
return new Table(RowUtils.buildConstRow(constantSource.getExpressionList()));
default:
throw new PhysicalException(
"Unexpected project source type in memory task: " + source.getType());
}
}

private RowStream executeSelect(Select select, RowStream stream) {
Expand Down Expand Up @@ -197,7 +212,8 @@ private RowStream executeRowTransform(RowTransform rowTransform, RowStream strea
return new RowTransformLazyStream(rowTransform, stream);
}

private RowStream executeSetTransform(SetTransform setTransform, RowStream stream) {
private RowStream executeSetTransform(SetTransform setTransform, RowStream stream)
throws PhysicalException {
List<FunctionCall> functionCallList = setTransform.getFunctionCallList();
Map<List<String>, RowStream> distinctStreamMap = new HashMap<>();
Map<List<String>, RowStream> rowTransformStreamMap = new HashMap<>();
Expand Down Expand Up @@ -252,7 +268,7 @@ private RowStream executeGroupBy(GroupBy groupBy, RowStream stream) {
return new GroupByLazyStream(groupBy, stream);
}

private RowStream executeDistinct(Distinct distinct, RowStream stream) {
private RowStream executeDistinct(Distinct distinct, RowStream stream) throws PhysicalException {
Project project = new Project(EmptySource.EMPTY_SOURCE, distinct.getPatterns(), null);
stream = executeProject(project, stream);
return new DistinctLazyStream(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ private static Value calculateBaseExpr(Row row, BaseExpression baseExpr) {

private static Value calculateFuncExpr(Row row, FuncExpression funcExpr)
throws PhysicalException {
if (row == null) {
row = RowUtils.buildConstRow(funcExpr.getExpressions());
}
String colName = funcExpr.getColumnName();
int index = row.getHeader().indexOf(colName);
if (index == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.expr.Expression;
import cn.edu.tsinghua.iginx.engine.shared.function.FunctionCall;
import cn.edu.tsinghua.iginx.engine.shared.function.FunctionParams;
import cn.edu.tsinghua.iginx.engine.shared.function.FunctionUtils;
Expand All @@ -46,6 +47,7 @@
import cn.edu.tsinghua.iginx.engine.shared.operator.Downsample;
import cn.edu.tsinghua.iginx.engine.shared.operator.GroupBy;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.sql.utils.ExpressionUtils;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.*;
Expand Down Expand Up @@ -1350,4 +1352,22 @@ public static Table preRowTransform(
}
return ret;
}

public static Row buildConstRow(List<Expression> expressions) throws PhysicalException {
List<Field> fields = new ArrayList<>();
Object[] values = new Object[expressions.size()];

for (int i = 0; i < expressions.size(); i++) {
Expression expression = expressions.get(i);
if (!ExpressionUtils.isConstantArithmeticExpr(expression)) {
throw new PhysicalException("expression is not constant: " + expression.getColumnName());
}
Value value = ExprUtils.calculateExpr(null, expression);
values[i] = value.getValue();
fields.add(new Field(expression.getColumnName(), value.getDataType()));
}

Header header = new Header(fields);
return new Row(header, values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

package cn.edu.tsinghua.iginx.engine.physical.task;

import cn.edu.tsinghua.iginx.engine.logical.utils.OperatorUtils;
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.exception.UnexpectedOperatorException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.OperatorMemoryExecutor;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.OperatorMemoryExecutorFactory;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.stream.EmptyRowStream;
import cn.edu.tsinghua.iginx.engine.physical.task.visitor.TaskVisitor;
import cn.edu.tsinghua.iginx.engine.shared.RequestContext;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
Expand Down Expand Up @@ -52,18 +54,25 @@ public PhysicalTask getParentTask() {
return parentTask;
}

public boolean isProjectFromConstant() {
return !getOperators().isEmpty() && OperatorUtils.isProjectFromConstant(getOperators().get(0));
}

@Override
public TaskExecuteResult execute() {
TaskExecuteResult parentResult = parentTask.getResult();
if (parentResult == null) {
return new TaskExecuteResult(
new PhysicalException("unexpected parent task execute result for " + this + ": null"));
}
if (parentResult.getException() != null) {
return parentResult;
RowStream stream = new EmptyRowStream();
if (!isProjectFromConstant()) {
TaskExecuteResult parentResult = parentTask.getResult();
if (parentResult == null) {
return new TaskExecuteResult(
new PhysicalException("unexpected parent task execute result for " + this + ": null"));
}
if (parentResult.getException() != null) {
return parentResult;
}
stream = parentResult.getRowStream();
}
List<Operator> operators = getOperators();
RowStream stream = parentResult.getRowStream();
OperatorMemoryExecutor executor =
OperatorMemoryExecutorFactory.getInstance().getMemoryExecutor();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ public static void getBottomTasks(List<PhysicalTask> tasks, PhysicalTask root) {
break;
case UnaryMemory:
UnaryMemoryPhysicalTask unaryMemoryPhysicalTask = (UnaryMemoryPhysicalTask) root;
getBottomTasks(tasks, unaryMemoryPhysicalTask.getParentTask());
if (!unaryMemoryPhysicalTask.isProjectFromConstant()) {
getBottomTasks(tasks, unaryMemoryPhysicalTask.getParentTask());
}
break;
case MultipleMemory:
MultipleMemoryPhysicalTask multipleMemoryPhysicalTask = (MultipleMemoryPhysicalTask) root;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,25 @@ public static String getFunctionName(Function function) {
"sum", (dataType) -> isWholeNumber(dataType) ? DataType.LONG : DataType.DOUBLE);
}

static Map<String, Integer> expectedParamNumMap = new HashMap<>(); // 此Map用于存储function期望的参数个数

static {
expectedParamNumMap.put("avg", 1);
expectedParamNumMap.put("sum", 1);
expectedParamNumMap.put("count", 1);
expectedParamNumMap.put("max", 1);
expectedParamNumMap.put("min", 1);
expectedParamNumMap.put("first_value", 1);
expectedParamNumMap.put("last_value", 1);
expectedParamNumMap.put("first", 1);
expectedParamNumMap.put("last", 1);
expectedParamNumMap.put("ratio", 2);
}

public static int getExpectedParamNum(String identifier) {
return expectedParamNumMap.get(identifier.toLowerCase());
}

/**
* 用于提取table中的表头字段和对应的索引
*
Expand Down
Loading

0 comments on commit bc31041

Please sign in to comment.