Skip to content

Commit

Permalink
ARROW-6078: [Java] Implement dictionary-encoded subfields for List type
Browse files Browse the repository at this point in the history
Related to [ARROW-6078](https://issues.apache.org/jira/browse/ARROW-6078).
For example, an int-typed List vector (valueCount = 5) holding the following data:
10, 20
10, 20
30, 40, 50
30, 40, 50
10, 20
could be encoded to:
0, 1
0, 1
2, 3, 4
2, 3, 4
0, 1
using either of the following list-type dictionaries:
10, 20, 30, 40, 50
or
10,
20,
30,
40,
50

Closes apache#4972 from tianchen92/ARROW-1175 and squashes the following commits:

5d2f751 <tianchen92> Update java/vector/src/main/java/org/apache/arrow/vector/dictionary/ListSubfieldEncoder.java
fbd122b <tianchen> fix
c51ec00 <tianchen> add replaceDataVector in BaseListVector
658958b <tianchen> make BaseListVector extend FieldVector
6c9d95d <tianchen> refactor BaseListVector
0b6cec5 <tianchen> resolve conflict
a54ecd1 <tianchen> ARROW-6078:  Implement dictionary-encoded subfields for List type

Lead-authored-by: tianchen <[email protected]>
Co-authored-by: tianchen92 <[email protected]>
Signed-off-by: Micah Kornfield <[email protected]>
  • Loading branch information
2 people authored and emkornfield committed Aug 31, 2019
1 parent beea8f9 commit 157b179
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.complex;

import org.apache.arrow.vector.FieldVector;

/**
* Abstraction for all list type vectors.
*
* <p>Each list slot {@code index} maps to a range of positions in the vector that holds
* the list elements. The range is start-inclusive and end-exclusive: callers iterate it
* as {@code for (i = start; i < end; i++)}.
*/
public interface BaseListVector extends FieldVector {

/**
* Get data vector start index with the given list index.
*
* @param index position of the list within this vector
* @return index in the data vector of the first element belonging to that list
*/
int getElementStartIndex(int index);

/**
* Get data vector end index with the given list index.
*
* @param index position of the list within this vector
* @return index in the data vector one past the last element belonging to that list
*/
int getElementEndIndex(int index);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import io.netty.buffer.ArrowBuf;

/** Base class for Vectors that contain repeated values. */
public abstract class BaseRepeatedValueVector extends BaseValueVector implements RepeatedValueVector {
public abstract class BaseRepeatedValueVector extends BaseValueVector implements RepeatedValueVector, BaseListVector {

public static final FieldVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;
public static final String DATA_VECTOR_NAME = "$data$";
Expand Down Expand Up @@ -305,7 +305,6 @@ protected void replaceDataVector(FieldVector v) {
vector = v;
}


@Override
public int getValueCount() {
return valueCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import io.netty.buffer.ArrowBuf;

/** A ListVector where every list value is of the same size. */
public class FixedSizeListVector extends BaseValueVector implements FieldVector, PromotableVector {
public class FixedSizeListVector extends BaseValueVector implements BaseListVector, PromotableVector {

public static FixedSizeListVector empty(String name, int size, BufferAllocator allocator) {
FieldType fieldType = FieldType.nullable(new ArrowType.FixedSizeList(size));
Expand Down Expand Up @@ -543,6 +543,16 @@ public <OUT, IN> OUT accept(VectorVisitor<OUT, IN> visitor, IN value) {
return visitor.visit(this, value);
}

/**
 * Returns the data vector position at which the list stored at {@code index} begins,
 * computed as {@code index * listSize}.
 */
@Override
public int getElementStartIndex(int index) {
  final int firstElement = index * listSize;
  return firstElement;
}

/**
 * Returns the data vector position one past the last element of the list stored at
 * {@code index}, computed as {@code (index + 1) * listSize}.
 */
@Override
public int getElementEndIndex(int index) {
  final int followingList = index + 1;
  return followingList * listSize;
}

private class TransferImpl implements TransferPair {

FixedSizeListVector to;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
* </ol>
* The latter two are managed by its superclass.
*/
public class ListVector extends BaseRepeatedValueVector implements FieldVector, PromotableVector {
public class ListVector extends BaseRepeatedValueVector implements PromotableVector {

public static ListVector empty(String name, BufferAllocator allocator) {
return new ListVector(name, allocator, FieldType.nullable(ArrowType.List.INSTANCE), null);
Expand Down Expand Up @@ -829,4 +829,14 @@ public void setLastSet(int value) {
/**
* Returns the current value of the {@code lastSet} field.
*/
public int getLastSet() {
return lastSet;
}

/**
 * Reads the start position of the list at {@code index} from the offset buffer,
 * i.e. the int stored at byte position {@code index * OFFSET_WIDTH}.
 */
@Override
public int getElementStartIndex(int index) {
  final int bytePosition = index * OFFSET_WIDTH;
  return offsetBuffer.getInt(bytePosition);
}

/**
 * Reads the end position of the list at {@code index} from the offset buffer,
 * i.e. the int stored at byte position {@code (index + 1) * OFFSET_WIDTH}.
 */
@Override
public int getElementEndIndex(int index) {
  final int bytePosition = (index + 1) * OFFSET_WIDTH;
  return offsetBuffer.getInt(bytePosition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,59 @@ public static ValueVector decode(ValueVector indices, Dictionary dictionary) {
return encoder.decode(indices);
}

/**
 * Writes dictionary indices for the values of {@code vector} located in
 * {@code [start, end)} into the same positions of {@code indices}.
 * Positions holding a null value are skipped, so nothing is written for them.
 *
 * @param vector the vector whose values are encoded
 * @param indices the index vector receiving the encoded values
 * @param encoding the hash table used to look up the dictionary index of each value
 * @param start the first position to encode (inclusive)
 * @param end the position at which encoding stops (exclusive)
 * @throws IllegalArgumentException if a non-null value has no entry in {@code encoding}
 */
static void buildIndexVector(
    ValueVector vector,
    BaseIntVector indices,
    DictionaryHashTable encoding,
    int start,
    int end) {

  for (int position = start; position < end; position++) {
    if (vector.isNull(position)) {
      // a null value stays null in the index vector
      continue;
    }

    // the lookup yields -1 when the value is not part of the dictionary
    final int dictionaryIndex = encoding.getIndex(position, vector);
    if (dictionaryIndex == -1) {
      throw new IllegalArgumentException("Dictionary encoding not defined for value:" + vector.getObject(position));
    }
    indices.setWithPossibleTruncate(position, dictionaryIndex);
  }
}

/**
 * Retrieve values to target vector from index vector.
 *
 * <p>For every non-null position {@code i} in {@code [start, end)}, the dictionary value
 * addressed by {@code indices[i]} is copied to position {@code i} of the transfer target.
 * Null positions are skipped.
 *
 * @param indices the index vector
 * @param transfer the {@link TransferPair} to copy dictionary data into target vector.
 * @param dictionaryCount the value count of dictionary vector; valid indices are
 *     {@code 0 .. dictionaryCount - 1}
 * @param start the start index (inclusive)
 * @param end the end index (exclusive)
 * @throws IllegalArgumentException if a non-null index is negative or not smaller than
 *     {@code dictionaryCount}
 */
static void retrieveIndexVector(
    BaseIntVector indices,
    TransferPair transfer,
    int dictionaryCount,
    int start,
    int end) {
  for (int i = start; i < end; i++) {
    if (indices.isNull(i)) {
      continue;
    }

    // Validate the long value before narrowing, so the int cast cannot turn an
    // out-of-range index into one that looks valid.
    final long index = indices.getValueAsLong(i);

    // An index equal to dictionaryCount is already past the last dictionary entry,
    // hence ">=" rather than ">"; negative indices are never valid either.
    if (index < 0 || index >= dictionaryCount) {
      throw new IllegalArgumentException("Provided dictionary does not contain value for index " + index);
    }
    transfer.copyValueSafe((int) index, i);
  }
}

/**
* Encodes a vector with the built hash table in this encoder.
*/
Expand All @@ -91,22 +144,8 @@ public ValueVector encode(ValueVector vector) {
BaseIntVector indices = (BaseIntVector) createdVector;
indices.allocateNew();

int count = vector.getValueCount();

for (int i = 0; i < count; i++) {
if (!vector.isNull(i)) { // if it's null leave it null
// note: this may fail if value was not included in the dictionary
//int encoded = lookUps.get(value);
int encoded = hashTable.getIndex(i, vector);
if (encoded == -1) {
throw new IllegalArgumentException("Dictionary encoding not defined for value:" + vector.getObject(i));
}
indices.setWithPossibleTruncate(i, encoded);
}
}

indices.setValueCount(count);

buildIndexVector(vector, indices, hashTable, 0, vector.getValueCount());
indices.setValueCount(vector.getValueCount());
return indices;
}

Expand All @@ -122,15 +161,7 @@ public ValueVector decode(ValueVector indices) {
transfer.getTo().allocateNewSafe();

BaseIntVector baseIntVector = (BaseIntVector) indices;
for (int i = 0; i < count; i++) {
if (!baseIntVector.isNull(i)) {
int indexAsInt = (int) baseIntVector.getValueAsLong(i);
if (indexAsInt > dictionaryCount) {
throw new IllegalArgumentException("Provided dictionary does not contain value for index " + indexAsInt);
}
transfer.copyValueSafe(indexAsInt, i);
}
}
retrieveIndexVector(baseIntVector, transfer, dictionaryCount, 0, count);
ValueVector decoded = transfer.getTo();
decoded.setValueCount(count);
return decoded;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.dictionary;

import java.util.Collections;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BaseIntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.BaseListVector;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.TransferPair;

/**
 * Encoder/decoder that dictionary-encodes the sub field (the first child vector)
 * of a {@link BaseListVector}.
 */
public class ListSubfieldEncoder {

  private final DictionaryHashTable hashTable;
  private final Dictionary dictionary;
  private final BufferAllocator allocator;

  /**
   * Construct an instance.
   *
   * @param dictionary the dictionary to encode against; its vector is cast to
   *     {@link BaseListVector} and the hash table is built from that vector's first child
   * @param allocator the allocator used when creating the encoded and decoded vectors
   */
  public ListSubfieldEncoder(Dictionary dictionary, BufferAllocator allocator) {
    this.dictionary = dictionary;
    this.allocator = allocator;

    final BaseListVector dictionaryList = (BaseListVector) dictionary.getVector();
    final FieldVector dictionaryValues = getDataVector(dictionaryList);
    this.hashTable = new DictionaryHashTable(dictionaryValues);
  }

  /** Returns the first child of the given list vector, which holds the list elements. */
  private FieldVector getDataVector(BaseListVector vector) {
    return vector.getChildrenFromFields().get(0);
  }

  /**
   * Creates a new list vector with the same name and field type as {@code vector} and
   * loads the field buffers of {@code vector} into it. The child vector is set up by the
   * callers afterwards.
   */
  private BaseListVector cloneVector(BaseListVector vector) {
    final Field sourceField = vector.getField();
    final FieldType sourceType = sourceField.getFieldType();

    final BaseListVector copy = (BaseListVector) sourceType.createNewSingleVector(
        sourceField.getName(), allocator, /*schemaCallBack=*/null);

    final ArrowFieldNode node = new ArrowFieldNode(vector.getValueCount(), vector.getNullCount());
    copy.loadFieldBuffers(node, vector.getFieldBuffers());
    return copy;
  }

  /**
   * Dictionary encodes subfields for complex vector with a provided dictionary.
   * The dictionary must contain all values in the sub fields vector.
   *
   * @param vector vector to encode
   * @return a new list vector whose child vector holds dictionary indices
   */
  public BaseListVector encodeListSubField(BaseListVector vector) {
    final int listCount = vector.getValueCount();
    final Field listField = vector.getField();

    // the child of the encoded vector uses the dictionary's index type
    final FieldType indexType = new FieldType(
        listField.isNullable(),
        dictionary.getEncoding().getIndexType(),
        dictionary.getEncoding(),
        listField.getMetadata());
    final Field indexField = new Field(listField.getName(), indexType, null);

    // clone the list vector, then give it an index-typed child
    final BaseListVector encoded = cloneVector(vector);
    encoded.initializeChildrenFromFields(Collections.singletonList(indexField));
    final BaseIntVector indices = (BaseIntVector) getDataVector(encoded);

    final ValueVector values = getDataVector(vector);
    for (int listIndex = 0; listIndex < listCount; listIndex++) {
      if (vector.isNull(listIndex)) {
        continue;
      }
      final int from = vector.getElementStartIndex(listIndex);
      final int to = vector.getElementEndIndex(listIndex);
      DictionaryEncoder.buildIndexVector(values, indices, hashTable, from, to);
    }

    return encoded;
  }

  /**
   * Decodes a dictionary subfields encoded vector using the provided dictionary.
   *
   * @param vector dictionary encoded vector, its data vector must be int type
   * @return vector with values restored from dictionary
   */
  public BaseListVector decodeListSubField(BaseListVector vector) {
    final int listCount = vector.getValueCount();

    final BaseListVector dictionaryList = (BaseListVector) dictionary.getVector();
    final FieldVector dictionaryValues = getDataVector(dictionaryList);
    final int dictionarySize = dictionaryValues.getValueCount();

    // clone the list vector, then give it a child typed like the dictionary's values
    final BaseListVector decoded = cloneVector(vector);
    final Field valueField = dictionaryValues.getField();
    decoded.initializeChildrenFromFields(Collections.singletonList(valueField));

    // values are copied from the dictionary's child into the decoded vector's child
    final ValueVector target = getDataVector(decoded);
    final TransferPair transfer = dictionaryValues.makeTransferPair(target);
    final BaseIntVector indices = (BaseIntVector) getDataVector(vector);

    for (int listIndex = 0; listIndex < listCount; listIndex++) {
      if (vector.isNull(listIndex)) {
        continue;
      }
      final int from = vector.getElementStartIndex(listIndex);
      final int to = vector.getElementEndIndex(listIndex);
      DictionaryEncoder.retrieveIndexVector(indices, transfer, dictionarySize, from, to);
    }
    return decoded;
  }
}
Loading

0 comments on commit 157b179

Please sign in to comment.