Skip to content

Commit

Permalink
fix(transform): export python data to IGinX correctly (IGinX-THU#438)
Browse files Browse the repository at this point in the history
确保Transform作业能够正确导出Binary数据结果
  • Loading branch information
shinyano authored Sep 3, 2024
1 parent ce1baba commit 75f8361
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,21 @@

package cn.edu.tsinghua.iginx.transform.data;

import static cn.edu.tsinghua.iginx.constant.GlobalConstant.TRANSFORM_PREFIX;
import static cn.edu.tsinghua.iginx.utils.ByteUtils.getByteArrayFromLongArray;

import cn.edu.tsinghua.iginx.engine.ContextBuilder;
import cn.edu.tsinghua.iginx.engine.StatementExecutor;
import cn.edu.tsinghua.iginx.engine.shared.RequestContext;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.thrift.ExecuteStatementReq;
import java.time.Instant;
import cn.edu.tsinghua.iginx.thrift.*;
import cn.edu.tsinghua.iginx.utils.Bitmap;
import cn.edu.tsinghua.iginx.utils.ByteUtils;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,57 +51,76 @@ public IginXWriter(long sessionId) {

@Override
public void write(BatchData batchData) {
String insertSQL = buildSQL(batchData);
LOGGER.info("Insert statement: {}", insertSQL);

if (!insertSQL.equals("")) {
ExecuteStatementReq req = new ExecuteStatementReq(sessionId, insertSQL);
RequestContext context = contextBuilder.build(req);
executor.execute(context);
} else {
LOGGER.error("Fail to execute insert statement.");
}
InsertRowRecordsReq rowRecordsReq = buildInsertRowReq(batchData);
RequestContext ctx = contextBuilder.build(rowRecordsReq);
executor.execute(ctx);
}

private String buildSQL(BatchData batchData) {
StringBuilder builder = new StringBuilder();

// construct paths
builder.append("INSERT INTO transform(key, ");
private InsertRowRecordsReq buildInsertRowReq(BatchData batchData) {
Header header = batchData.getHeader();
header
.getFields()
.forEach(field -> builder.append(reformatPath(field.getFullName())).append(","));
builder.deleteCharAt(builder.length() - 1);

// construct values
builder.append(") VALUES");
// use System.nanoTime() to avoid timestamp mistake on windows runner in action
long index = System.nanoTime();
for (Row row : batchData.getRowList()) {
builder.append(" (");
builder.append(index).append(",");
for (Object value : row.getValues()) {
builder.append(value).append(",");
Object[] valuesList;

valuesList = batchData.getRowList().stream().map(Row::getValues).toArray();
long[] keys = new long[valuesList.length];
long startKey = System.nanoTime();
for (int i = 0; i < valuesList.length; i++) {
keys[i] = startKey + i;
}

List<Field> sortedFields = header.getFields();
Integer[] index = new Integer[sortedFields.size()];
for (int i = 0; i < sortedFields.size(); i++) {
index[i] = i;
}
Arrays.sort(index, Comparator.comparing(i -> sortedFields.get(i).getName()));
sortedFields.sort(Comparator.comparing(Field::getName));

Object[] sortedValuesList = Arrays.copyOf(valuesList, valuesList.length);
for (int i = 0; i < sortedValuesList.length; i++) {
Object[] values = new Object[index.length];
for (int j = 0; j < index.length; j++) {
values[j] = ((Object[]) sortedValuesList[i])[index[j]];
}
builder.deleteCharAt(builder.length() - 1);
builder.append("),");
index++;
sortedValuesList[i] = values;
}
List<String> sortedPaths =
sortedFields.stream()
.map(e -> TRANSFORM_PREFIX + "." + reformatPath(e.getName()))
.collect(Collectors.toList());
List<DataType> sortedDataTypeList =
sortedFields.stream().map(Field::getType).collect(Collectors.toList());
List<Map<String, String>> sortedTagsList =
sortedFields.stream().map(Field::getTags).collect(Collectors.toList());

List<ByteBuffer> valueBufferList = new ArrayList<>();
List<ByteBuffer> bitmapBufferList = new ArrayList<>();
for (int i = 0; i < keys.length; i++) {
Object[] values = (Object[]) sortedValuesList[i];
valueBufferList.add(ByteUtils.getRowByteBuffer(values, sortedDataTypeList));
Bitmap bitmap = new Bitmap(values.length);
for (int j = 0; j < values.length; j++) {
if (values[j] != null) {
bitmap.mark(j);
}
}
bitmapBufferList.add(ByteBuffer.wrap(bitmap.getBytes()));
}
builder.deleteCharAt(builder.length() - 1).append(";");
return builder.toString();
}

private long getCurrentTimeInNS() {
Instant now = Instant.now();
return now.getEpochSecond() * 1_000_000_000L + now.getNano();
InsertRowRecordsReq req = new InsertRowRecordsReq();
req.setSessionId(sessionId);
req.setPaths(sortedPaths);
req.setKeys(getByteArrayFromLongArray(keys));
req.setValuesList(valueBufferList);
req.setBitmapList(bitmapBufferList);
req.setDataTypeList(sortedDataTypeList);
req.setTagsList(sortedTagsList);
req.setTimePrecision(TimePrecision.NS);
return req;
}

private String reformatPath(String path) {
if (!path.contains("(") && !path.contains(")")) return path;
path = path.replaceAll("[{]", "[");
path = path.replaceAll("[}]", "]");
return path;
// iotdb cannot handle "\" in path
return path.replace("\\", "_");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@

package cn.edu.tsinghua.iginx.transform.data;

import static cn.edu.tsinghua.iginx.utils.TagKVUtils.fromFullName;
import static cn.edu.tsinghua.iginx.utils.TagKVUtils.tagMatchRegex;

import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.transform.api.Reader;
import cn.edu.tsinghua.iginx.transform.exception.ReadBatchException;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PemjaReader implements Reader {

private static final Logger LOGGER = LoggerFactory.getLogger(PemjaReader.class);

private final List<Object> data;

private final int batchSize;
Expand All @@ -38,25 +49,39 @@ public class PemjaReader implements Reader {

private int offset = 0;

public PemjaReader(List<Object> data, int batchSize) {
public PemjaReader(List<Object> data, int batchSize) throws ReadBatchException {
this.data = data;
this.batchSize = batchSize;

this.header = getHeaderFromData();
this.rowList = getRowListFromData();
}

private Header getHeaderFromData() {
private Header getHeaderFromData() throws ReadBatchException {
List<Object> firstRow;
List<DataType> typeList = new ArrayList<>();
List<Field> fieldList = new ArrayList<>();
if (isList(data.get(0))) {
firstRow = (List<Object>) data.get(0);
if (data.size() >= 2 && isList(data.get(1))) {
typeList = parseTypeList((List<Object>) data.get(1));
}
} else {
firstRow = data;
}

List<Field> fieldList = new ArrayList<>();
for (Object fieldName : firstRow) {
fieldList.add(new Field((String) fieldName, DataType.BINARY));
String fieldName;
Pair<String, Map<String, String>> p;
for (int i = 0; i < firstRow.size(); i++) {
fieldName = (String) firstRow.get(i);
Matcher matcher = tagMatchRegex.matcher(fieldName);
if (!matcher.find()) {
throw new ReadBatchException(
"Invalid path :" + fieldName + ". Tags should be wrapped around by '{' & '}'.");
} else {
p = fromFullName(fieldName);
fieldList.add(new Field(p.k, typeList.isEmpty() ? DataType.BINARY : typeList.get(i), p.v));
}
}
return new Header(fieldList);
}
Expand All @@ -80,6 +105,26 @@ private boolean isList(Object object) {
return object instanceof List<?>;
}

private List<DataType> parseTypeList(List<Object> dataList) {
List<DataType> res = new ArrayList<>();
for (Object value : dataList) {
// python won't pass integer, float, byte
if (value instanceof Long) {
res.add(DataType.LONG);
} else if (value instanceof Double) {
res.add(DataType.DOUBLE);
} else if (value instanceof Boolean) {
res.add(DataType.BOOLEAN);
} else if (value instanceof String || value instanceof byte[]) {
res.add(DataType.BINARY);
} else {
throw new IllegalArgumentException(
"Invalid datatype for " + value + " of class: " + value.getClass());
}
}
return res;
}

@Override
public boolean hasNextBatch() {
return offset < rowList.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import cn.edu.tsinghua.iginx.transform.api.Writer;
import cn.edu.tsinghua.iginx.transform.data.BatchData;
import cn.edu.tsinghua.iginx.transform.data.PemjaReader;
import cn.edu.tsinghua.iginx.transform.exception.ReadBatchException;
import cn.edu.tsinghua.iginx.transform.exception.WriteBatchException;
import java.util.*;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,15 +78,17 @@ public void process(BatchData batchData) {
});

List<Object> res = (List<Object>) interpreter.invokeMethod(UDF_CLASS, UDF_FUNC, data);
PemjaReader reader = new PemjaReader(res, config.getBatchSize());

try {
PemjaReader reader = new PemjaReader(res, config.getBatchSize());
while (reader.hasNextBatch()) {
BatchData nextBatchData = reader.loadNextBatch();
writer.writeBatch(nextBatchData);
}
} catch (WriteBatchException e) {
LOGGER.error("PemjaWorker identifier={} fail to writer data.", identifier, e);
} catch (ReadBatchException e) {
LOGGER.error("Failed to read data from python transformer.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* IGinX - the polystore system with high performance
* Copyright (C) Tsinghua University
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package cn.edu.tsinghua.iginx.transform.exception;

public class ReadBatchException extends TransformException {

private static final long serialVersionUID = -2342950195730341247L;

public ReadBatchException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ public class GlobalConstant {
public static final Long KEY_MIN_VAL = Long.MIN_VALUE + 1;

public static final Long KEY_MAX_VAL = Long.MAX_VALUE;

public static final String TRANSFORM_PREFIX = "transform";
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cn.edu.tsinghua.iginx.utils;

import java.util.*;
import java.util.regex.Pattern;

public class TagKVUtils {

Expand All @@ -41,6 +42,11 @@ public class TagKVUtils {
*/
public static final String tagSuffix = "A"; // "#tagSuffix";

/** 匹配类似 <string>({<string>=<string>(, <string>=<string>)*})? 的可能含tag的列名全名 */
public static final Pattern tagMatchRegex =
Pattern.compile(
"^[^\\{\\}]+(?:\\{[^\\{\\}=,]+=[^\\{\\}=,]+(?:, [^\\{\\}=,]+=[^\\{\\}=,]+)*\\})?$");

public static String toPhysicalPath(String name, Map<String, String> tags) {
StringBuilder builder = new StringBuilder();
builder.append(name);
Expand Down
Loading

0 comments on commit 75f8361

Please sign in to comment.