Skip to content

Commit

Permalink
empty stream
Browse files Browse the repository at this point in the history
  • Loading branch information
shinyano committed Feb 25, 2025
1 parent 068db5d commit 242df73
Showing 1 changed file with 46 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.antlr.v4.runtime.misc.ParseCancellationException;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.Types;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.slf4j.Logger;
Expand Down Expand Up @@ -780,15 +782,19 @@ private void processClearData(RequestContext ctx)
process(ctx);
}

private void setEmptyQueryResp(RequestContext ctx, List<String> paths) {
private void setEmptyQueryResp(RequestContext ctx) throws PhysicalException {
Result result = new Result(RpcUtils.SUCCESS);
// TODO: refactor this part
throw new UnsupportedOperationException("Not implemented yet");
// result.setKeys(new Long[0]);
// result.setValuesList(new ArrayList<>());
// result.setBitmapList(new ArrayList<>());
// result.setPaths(paths);
// ctx.setResult(result);
BigIntVector vector =
(BigIntVector)
org.apache.arrow.vector.types.pojo.Field.notNullable(
KEY_NAME, Types.MinorType.BIGINT.getType())
.createVector(ctx.getAllocator());
vector.allocateNew(1);
vector.setValueCount(1);
vector.setSafe(0, 0);
VectorSchemaRoot root = VectorSchemaRoots.create(Collections.singletonList(vector), 1);
result.setArrowData(getBytesFromVector(root, ctx.getAllocator()));
ctx.setResult(result);
}

private void setResult(RequestContext ctx, BatchStream stream)
Expand Down Expand Up @@ -833,37 +839,49 @@ private void setResultFromBatchStream(RequestContext ctx, BatchStream stream)
return;
}
if (stream == null) {
setEmptyQueryResp(ctx, new ArrayList<>());
setEmptyQueryResp(ctx);
return;
}

List<ByteBuffer> dataList = getBytesFromBatchStream(stream, ctx.getAllocator());
Status status = RpcUtils.SUCCESS;
if (ctx.getWarningMsg() != null && !ctx.getWarningMsg().isEmpty()) {
status = new Status(StatusCode.PARTIAL_SUCCESS.getStatusCode());
status.setMessage(ctx.getWarningMsg());
}
result = new Result(status);
result.setArrowData(dataList);
ctx.setResult(result);
}

private List<ByteBuffer> getBytesFromBatchStream(BatchStream stream, BufferAllocator allocator)
throws PhysicalException {
List<ByteBuffer> dataList = new ArrayList<>();
try (BatchStream batchStream = stream) {
while (batchStream.hasNext()) {
try (Batch batch = batchStream.getNext()) {
try (VectorSchemaRoot flattened = batch.flattened(ctx.getAllocator());
ByteArrayOutputStream out = new ByteArrayOutputStream();
ArrowStreamWriter writer = new ArrowStreamWriter(flattened, null, out)) {
writer.start();
writer.writeBatch();
writer.end();
ByteBuffer data = ByteBuffer.wrap(out.toByteArray());
dataList.add(data);
} catch (IOException e) {
throw new PhysicalException(e);
}
dataList.addAll(getBytesFromVector(batch.flattened(allocator), allocator));
}
}
}
return dataList;
}

Status status = RpcUtils.SUCCESS;
if (ctx.getWarningMsg() != null && !ctx.getWarningMsg().isEmpty()) {
status = new Status(StatusCode.PARTIAL_SUCCESS.getStatusCode());
status.setMessage(ctx.getWarningMsg());
}
result = new Result(status);
result.setArrowData(dataList);
ctx.setResult(result);
private List<ByteBuffer> getBytesFromVector(
VectorSchemaRoot vectorSchemaRoot, BufferAllocator allocator) throws PhysicalException {
List<ByteBuffer> dataList = new ArrayList<>();
try (VectorSchemaRoot root = vectorSchemaRoot;
ByteArrayOutputStream out = new ByteArrayOutputStream();
ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) {
writer.start();
writer.writeBatch();
writer.end();
ByteBuffer data = ByteBuffer.wrap(out.toByteArray());
dataList.add(data);
} catch (IOException e) {
throw new PhysicalException(e);
}
return dataList;
}

private void setShowColumnsResult(RequestContext ctx, BatchStream stream)
Expand Down

0 comments on commit 242df73

Please sign in to comment.