Skip to content

Commit

Permalink
feat(sql): auto increment sequence & alter key (IGinX-THU#458)
Browse files Browse the repository at this point in the history
1. 增加生成自增列的语法
2. 增加KEY列的操作语法,包括:将特定列升级成KEY列,将KEY列降级成普通列
详见https://oxlh5mrwi0.feishu.cn/docx/Gd8pd1aMDoNMIexXBPJcaea6n0g
  • Loading branch information
jzl18thu authored Oct 11, 2024
1 parent bc31041 commit 8080e89
Show file tree
Hide file tree
Showing 27 changed files with 859 additions and 39 deletions.
15 changes: 14 additions & 1 deletion antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,16 @@ selectClause
;

selectSublist
: expression asClause?
: KEY (asClause | asKeyClause)
| (expression | sequence) (asClause | asKeyClause)?
;

sequence
: SEQUENCE LR_BRACKET (start = constant COMMA increment = constant)? RR_BRACKET
;

asKeyClause
: AS KEY
;

expression
Expand Down Expand Up @@ -1126,6 +1135,10 @@ ELSE
END
: E N D
;

SEQUENCE
: S E Q U E N C E
;
//============================

// End of the keywords list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import cn.edu.tsinghua.iginx.engine.shared.expr.Expression;
import cn.edu.tsinghua.iginx.engine.shared.expr.FromValueExpression;
import cn.edu.tsinghua.iginx.engine.shared.expr.FuncExpression;
import cn.edu.tsinghua.iginx.engine.shared.expr.SequenceExpression;
import cn.edu.tsinghua.iginx.engine.shared.function.Function;
import cn.edu.tsinghua.iginx.engine.shared.function.FunctionCall;
import cn.edu.tsinghua.iginx.engine.shared.function.FunctionParams;
Expand Down Expand Up @@ -213,6 +214,8 @@ private Operator generateRoot(UnarySelectStatement selectStatement) {

root = buildLimit(selectStatement, root);

root = buildAddSequence(selectStatement, root);

root = buildReorder(selectStatement, root);

root = buildRename(selectStatement, root);
Expand Down Expand Up @@ -682,6 +685,30 @@ private Operator buildDownSampleQuery(UnarySelectStatement selectStatement, Oper
new KeyRange(selectStatement.getStartKey(), selectStatement.getEndKey()));
}

/**
* 根据SelectStatement构建查询树的AddSequence操作符
*
* @param selectStatement Select上下文
* @param root 当前根节点
* @return 添加了AddSequence操作符的根节点;如果没有Sequence,返回原根节点
*/
private Operator buildAddSequence(UnarySelectStatement selectStatement, Operator root) {
List<SequenceExpression> sequences = selectStatement.getSequenceExpressionList();
if (!sequences.isEmpty()) {
List<Long> startList = new ArrayList<>();
List<Long> incrementList = new ArrayList<>();
List<String> columns = new ArrayList<>();
sequences.forEach(
sequence -> {
startList.add(sequence.getStart());
incrementList.add(sequence.getIncrement());
columns.add(sequence.getColumnName());
});
root = new AddSequence(new OperatorSource(root), startList, incrementList, columns);
}
return root;
}

/**
* 根据SelectStatement构建查询树的Reorder操作符
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import cn.edu.tsinghua.iginx.engine.shared.function.system.Max;
import cn.edu.tsinghua.iginx.engine.shared.function.system.Min;
import cn.edu.tsinghua.iginx.engine.shared.operator.AddSchemaPrefix;
import cn.edu.tsinghua.iginx.engine.shared.operator.AddSequence;
import cn.edu.tsinghua.iginx.engine.shared.operator.BinaryOperator;
import cn.edu.tsinghua.iginx.engine.shared.operator.CrossJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.Distinct;
Expand Down Expand Up @@ -137,6 +138,8 @@ public RowStream executeUnaryOperator(
return executeAddSchemaPrefix((AddSchemaPrefix) operator, table);
case GroupBy:
return executeGroupBy((GroupBy) operator, table);
case AddSequence:
return executeAddSequence((AddSequence) operator, table);
case Distinct:
return executeDistinct((Distinct) operator, table);
case ValueToSelectedPath:
Expand Down Expand Up @@ -417,22 +420,27 @@ private RowStream executeMappingTransform(MappingTransform mappingTransform, Tab
return RowUtils.calMappingTransform(table, functionCallList);
}

private RowStream executeRename(Rename rename, Table table) {
private RowStream executeRename(Rename rename, Table table) throws PhysicalException {
Header header = table.getHeader();
List<Pair<String, String>> aliasList = rename.getAliasList();
Header newHeader = header.renamedHeader(aliasList, rename.getIgnorePatterns());
Pair<Header, Integer> pair = header.renamedHeader(aliasList, rename.getIgnorePatterns());
Header newHeader = pair.k;
int colIndex = pair.v;

List<Row> rows = new ArrayList<>();
table
.getRows()
.forEach(
row -> {
if (newHeader.hasKey()) {
rows.add(new Row(newHeader, row.getKey(), row.getValues()));
} else {
rows.add(new Row(newHeader, row.getValues()));
}
});
if (colIndex == -1) {
table.getRows().forEach(row -> rows.add(new Row(newHeader, row.getKey(), row.getValues())));
} else {
HashSet<Long> keySet = new HashSet<>();
for (Row row : table.getRows()) {
Row newRow = RowUtils.transformColumnToKey(newHeader, row, colIndex);
if (keySet.contains(newRow.getKey())) {
throw new PhysicalTaskExecuteFailureException("duplicated key found: " + newRow.getKey());
}
keySet.add(newRow.getKey());
rows.add(newRow);
}
}

return new Table(newHeader, rows);
}
Expand Down Expand Up @@ -481,6 +489,33 @@ private RowStream executeGroupBy(GroupBy groupBy, Table table) throws PhysicalEx
return new Table(header, rows);
}

private RowStream executeAddSequence(AddSequence addSequence, Table table) {
Header header = table.getHeader();
List<Field> targetFields = new ArrayList<>(header.getFields());
addSequence.getColumns().forEach(column -> targetFields.add(new Field(column, DataType.LONG)));
Header newHeader = new Header(header.getKey(), targetFields);

List<Row> rows = new ArrayList<>();
int oldSize = header.getFieldSize();
int newSize = targetFields.size();
int sequenceSize = newSize - oldSize;
List<Long> cur = new ArrayList<>(addSequence.getStartList());
List<Long> increments = new ArrayList<>(addSequence.getIncrementList());
table
.getRows()
.forEach(
row -> {
Object[] values = new Object[newSize];
System.arraycopy(row.getValues(), 0, values, 0, oldSize);
for (int i = 0; i < sequenceSize; i++) {
values[oldSize + i] = cur.get(i);
cur.set(i, cur.get(i) + increments.get(i));
}
rows.add(new Row(newHeader, row.getKey(), values));
});
return new Table(newHeader, rows);
}

private RowStream executeReorder(Reorder reorder, Table table) {
Header header = table.getHeader();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* IGinX - the polystore system with high performance
* Copyright (C) Tsinghua University
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package cn.edu.tsinghua.iginx.engine.physical.memory.execute.stream;

import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.operator.AddSequence;
import cn.edu.tsinghua.iginx.thrift.DataType;
import java.util.ArrayList;
import java.util.List;

public class AddSequenceLazyStream extends UnaryLazyStream {

private Header header;

private final AddSequence addSequence;

private final List<Long> cur;

private final List<Long> increments;

private final int oldSize;

private final int newSize;

private final int sequenceSize;

public AddSequenceLazyStream(AddSequence addSequence, RowStream stream) throws PhysicalException {
super(stream);
this.addSequence = addSequence;
this.cur = new ArrayList<>(addSequence.getStartList());
this.increments = new ArrayList<>(addSequence.getIncrementList());
this.header = getHeader();
this.oldSize = stream.getHeader().getFieldSize();
this.newSize = header.getFieldSize();
this.sequenceSize = newSize - oldSize;
}

@Override
public Header getHeader() throws PhysicalException {
if (header == null) {
Header header = stream.getHeader();
List<Field> targetFields = new ArrayList<>(stream.getHeader().getFields());
addSequence
.getColumns()
.forEach(column -> targetFields.add(new Field(column, DataType.LONG)));
this.header = new Header(header.getKey(), targetFields);
}
return header;
}

@Override
public boolean hasNext() throws PhysicalException {
return stream.hasNext();
}

@Override
public Row next() throws PhysicalException {
if (!hasNext()) {
throw new IllegalStateException("row stream doesn't have more data!");
}

Row row = stream.next();
Object[] values = new Object[newSize];
System.arraycopy(row.getValues(), 0, values, 0, oldSize);
for (int i = 0; i < sequenceSize; i++) {
values[oldSize + i] = cur.get(i);
cur.set(i, cur.get(i) + increments.get(i));
}
return new Row(header, row.getKey(), values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,39 @@
package cn.edu.tsinghua.iginx.engine.physical.memory.execute.stream;

import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalTaskExecuteFailureException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
import cn.edu.tsinghua.iginx.engine.shared.operator.Rename;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.HashSet;

public class RenameLazyStream extends UnaryLazyStream {

private final Rename rename;

private Header header;

private int colIndex;

private final HashSet<Long> keySet;

public RenameLazyStream(Rename rename, RowStream stream) {
super(stream);
this.rename = rename;
this.keySet = new HashSet<>();
}

@Override
public Header getHeader() throws PhysicalException {
if (header == null) {
Header header = stream.getHeader();
this.header = header.renamedHeader(rename.getAliasList(), rename.getIgnorePatterns());
Pair<Header, Integer> pair =
header.renamedHeader(rename.getAliasList(), rename.getIgnorePatterns());
this.header = pair.k;
this.colIndex = pair.v;
}
return header;
}
Expand All @@ -56,10 +68,15 @@ public Row next() throws PhysicalException {
}

Row row = stream.next();
if (header.hasKey()) {
if (colIndex == -1) {
return new Row(header, row.getKey(), row.getValues());
} else {
return new Row(header, row.getValues());
Row newRow = RowUtils.transformColumnToKey(header, row, colIndex);
if (keySet.contains(newRow.getKey())) {
throw new PhysicalTaskExecuteFailureException("duplicated key found: " + newRow.getKey());
}
keySet.add(newRow.getKey());
return newRow;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import cn.edu.tsinghua.iginx.engine.shared.function.system.Max;
import cn.edu.tsinghua.iginx.engine.shared.function.system.Min;
import cn.edu.tsinghua.iginx.engine.shared.operator.AddSchemaPrefix;
import cn.edu.tsinghua.iginx.engine.shared.operator.AddSequence;
import cn.edu.tsinghua.iginx.engine.shared.operator.BinaryOperator;
import cn.edu.tsinghua.iginx.engine.shared.operator.CrossJoin;
import cn.edu.tsinghua.iginx.engine.shared.operator.Distinct;
Expand Down Expand Up @@ -119,6 +120,9 @@ public RowStream executeUnaryOperator(
case Distinct:
result = executeDistinct((Distinct) operator, stream);
break;
case AddSequence:
result = executeAddSequence((AddSequence) operator, stream);
break;
case ValueToSelectedPath:
result = executeValueToSelectedPath((ValueToSelectedPath) operator, stream);
break;
Expand Down Expand Up @@ -274,6 +278,11 @@ private RowStream executeDistinct(Distinct distinct, RowStream stream) throws Ph
return new DistinctLazyStream(stream);
}

private RowStream executeAddSequence(AddSequence addSequence, RowStream stream)
throws PhysicalException {
return new AddSequenceLazyStream(addSequence, stream);
}

private RowStream executeValueToSelectedPath(ValueToSelectedPath operator, RowStream stream) {
return new ValueToSelectedPathLazyStream(operator, stream);
}
Expand Down
Loading

0 comments on commit 8080e89

Please sign in to comment.