Skip to content

Commit

Permalink
Initial implementation of toggling explicit MV entry size for MVFixed…
Browse files Browse the repository at this point in the history
…ByteRawFwdIndex
  • Loading branch information
jackluo923 committed Sep 27, 2024
1 parent 3eef306 commit 84987ed
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {

private final VarByteChunkWriter _indexWriter;
private final DataType _valueType;
private boolean _explicitMVEntrySize = true;

/**
* Create a var-byte raw index creator for the given column
Expand Down Expand Up @@ -101,6 +102,14 @@ public boolean isSingleValue() {
return false;
}

/**
 * Returns whether the {@code putXxxMV} methods write the number of values as a leading int before the values of each
 * multi-value entry.
 *
 * @return the current value of the explicit-entry-size flag
 */
public boolean isExplicitMVEntrySize() {
return _explicitMVEntrySize;
}

/**
 * Sets whether the {@code putXxxMV} methods write the number of values as a leading int before the values of each
 * multi-value entry.
 *
 * <p>NOTE(review): the reader keeps its own flag of the same name; presumably both sides must agree for an index to be
 * read back correctly - confirm how the reader side is configured.
 *
 * @param explicitMVEntrySize {@code true} to write the leading value count, {@code false} to omit it
 */
public void setExplicitMVEntrySize(boolean explicitMVEntrySize) {
_explicitMVEntrySize = explicitMVEntrySize;
}

@Override
public DataType getValueType() {
return _valueType;
Expand All @@ -110,8 +119,10 @@ public DataType getValueType() {
public void putIntMV(int[] values) {
byte[] bytes = new byte[Integer.BYTES + values.length * Integer.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
if (_explicitMVEntrySize) {
//write the length
byteBuffer.putInt(values.length);
}
//write the content of each element
for (int value : values) {
byteBuffer.putInt(value);
Expand All @@ -123,8 +134,10 @@ public void putIntMV(int[] values) {
public void putLongMV(long[] values) {
byte[] bytes = new byte[Integer.BYTES + values.length * Long.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
if (_explicitMVEntrySize) {
//write the length
byteBuffer.putInt(values.length);
}
//write the content of each element
for (long value : values) {
byteBuffer.putLong(value);
Expand All @@ -136,8 +149,10 @@ public void putLongMV(long[] values) {
public void putFloatMV(float[] values) {
byte[] bytes = new byte[Integer.BYTES + values.length * Float.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
if (_explicitMVEntrySize) {
//write the length
byteBuffer.putInt(values.length);
}
//write the content of each element
for (float value : values) {
byteBuffer.putFloat(value);
Expand All @@ -149,8 +164,10 @@ public void putFloatMV(float[] values) {
public void putDoubleMV(double[] values) {
byte[] bytes = new byte[Integer.BYTES + values.length * Double.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
if (_explicitMVEntrySize) {
//write the length
byteBuffer.putInt(values.length);
}
//write the content of each element
for (double value : values) {
byteBuffer.putDouble(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public final class FixedByteChunkMVForwardIndexReader extends BaseChunkForwardIn
private static final int ROW_OFFSET_SIZE = VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;

private final int _maxChunkSize;
private boolean _explicitMVEntrySize = true;

public FixedByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType storedType) {
super(dataBuffer, storedType, false);
Expand All @@ -54,7 +55,7 @@ public ChunkReaderContext createContext() {
@Override
public int getIntMV(int docId, int[] valueBuffer, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
int numValues = _explicitMVEntrySize ? byteBuffer.getInt() : byteBuffer.remaining() / Integer.BYTES;
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getInt();
}
Expand All @@ -64,7 +65,7 @@ public int getIntMV(int docId, int[] valueBuffer, ChunkReaderContext context) {
@Override
public int[] getIntMV(int docId, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
int numValues = _explicitMVEntrySize ? byteBuffer.getInt() : byteBuffer.remaining() / Integer.BYTES;
int[] valueBuffer = new int[numValues];
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getInt();
Expand All @@ -75,7 +76,7 @@ public int[] getIntMV(int docId, ChunkReaderContext context) {
@Override
public int getLongMV(int docId, long[] valueBuffer, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
int numValues = _explicitMVEntrySize ? byteBuffer.getInt() : byteBuffer.remaining() / Long.BYTES;
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getLong();
}
Expand All @@ -85,7 +86,7 @@ public int getLongMV(int docId, long[] valueBuffer, ChunkReaderContext context)
@Override
public long[] getLongMV(int docId, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
int numValues = _explicitMVEntrySize ? byteBuffer.getInt() : byteBuffer.remaining() / Long.BYTES;
long[] valueBuffer = new long[numValues];
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getLong();
Expand All @@ -96,7 +97,7 @@ public long[] getLongMV(int docId, ChunkReaderContext context) {
@Override
public int getFloatMV(int docId, float[] valueBuffer, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
int numValues = _explicitMVEntrySize ? byteBuffer.getInt() : byteBuffer.remaining() / Float.BYTES;
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getFloat();
}
Expand All @@ -106,7 +107,7 @@ public int getFloatMV(int docId, float[] valueBuffer, ChunkReaderContext context
@Override
public float[] getFloatMV(int docId, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
int numValues = _explicitMVEntrySize ? byteBuffer.getInt() : byteBuffer.remaining() / Float.BYTES;
float[] valueBuffer = new float[numValues];
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getFloat();
Expand All @@ -117,7 +118,7 @@ public float[] getFloatMV(int docId, ChunkReaderContext context) {
@Override
public int getDoubleMV(int docId, double[] valueBuffer, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
int numValues = _explicitMVEntrySize ? byteBuffer.getInt() : byteBuffer.remaining() / Double.BYTES;
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getDouble();
}
Expand All @@ -127,7 +128,7 @@ public int getDoubleMV(int docId, double[] valueBuffer, ChunkReaderContext conte
@Override
public double[] getDoubleMV(int docId, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
int numValues = _explicitMVEntrySize ? byteBuffer.getInt() : byteBuffer.remaining() / Double.BYTES;
double[] valueBuffer = new double[numValues];
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getDouble();
Expand All @@ -138,7 +139,7 @@ public double[] getDoubleMV(int docId, ChunkReaderContext context) {
/**
 * Returns the number of values in the multi-value entry of the given document.
 *
 * <p>When entries carry an explicit size, the count is the leading int of the entry. Otherwise it is derived from the
 * entry's length in bytes divided by the fixed width of one stored value, matching how the typed {@code getXxxMV}
 * readers in this class compute it.
 *
 * @param docId document id whose entry is inspected
 * @param context chunk reader context passed through to {@code slice}
 * @return the number of values (not bytes) in the entry
 */
@Override
public int getNumValuesMV(int docId, ChunkReaderContext context) {
  ByteBuffer byteBuffer = slice(docId, context);
  if (_explicitMVEntrySize) {
    return byteBuffer.getInt();
  }
  // remaining() is a byte count; returning it directly would over-report the number of values by a factor of the
  // element width, so convert it to a value count.
  // NOTE(review): assumes getStoredType() is available on this reader and DataType.size() returns the fixed byte
  // width of the stored type - confirm against BaseChunkForwardIndexReader / FieldSpec.DataType.
  return byteBuffer.remaining() / getStoredType().size();
}

private ByteBuffer slice(int docId, ChunkReaderContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class MultiValueFixedByteRawIndexCreatorTest {

private static final Random RANDOM = new Random();

private static final boolean[] EXPLICIT_MV_ENTRY_SIZE_OPTIONS = {true, false};

@DataProvider(name = "compressionTypes")
public Object[][] compressionTypes() {
return Arrays.stream(ChunkCompressionType.values())
Expand All @@ -78,77 +80,86 @@ public void cleanup() {
@Test(dataProvider = "compressionTypes")
public void testMVInt(ChunkCompressionType compressionType, int writerVersion)
throws IOException {
// This tests varying lengths of MV rows
testMV(DataType.INT, ints(false), x -> x.length, int[]::new, MultiValueFixedByteRawIndexCreator::putIntMV,
(reader, context, docId, buffer) -> {
int length = reader.getIntMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion);

// This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk
testMV(DataType.INT, ints(true), x -> x.length, int[]::new, MultiValueFixedByteRawIndexCreator::putIntMV,
(reader, context, docId, buffer) -> {
int length = reader.getIntMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion);
for (boolean explicitMVEntrySizeOption : EXPLICIT_MV_ENTRY_SIZE_OPTIONS) {
// This tests varying lengths of MV rows
testMV(DataType.INT, ints(false), x -> x.length, int[]::new, MultiValueFixedByteRawIndexCreator::putIntMV,
(reader, context, docId, buffer) -> {
int length = reader.getIntMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion, explicitMVEntrySizeOption);

// This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk
testMV(DataType.INT, ints(true), x -> x.length, int[]::new, MultiValueFixedByteRawIndexCreator::putIntMV,
(reader, context, docId, buffer) -> {
int length = reader.getIntMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion, explicitMVEntrySizeOption);
}
}

@Test(dataProvider = "compressionTypes")
public void testMVLong(ChunkCompressionType compressionType, int writerVersion)
throws IOException {
// This tests varying lengths of MV rows
testMV(DataType.LONG, longs(false), x -> x.length, long[]::new, MultiValueFixedByteRawIndexCreator::putLongMV,
(reader, context, docId, buffer) -> {
int length = reader.getLongMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion);

// This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk
testMV(DataType.LONG, longs(true), x -> x.length, long[]::new, MultiValueFixedByteRawIndexCreator::putLongMV,
(reader, context, docId, buffer) -> {
int length = reader.getLongMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion);
for (boolean explicitMVEntrySizeOption : EXPLICIT_MV_ENTRY_SIZE_OPTIONS) {
// This tests varying lengths of MV rows
testMV(DataType.LONG, longs(false), x -> x.length, long[]::new, MultiValueFixedByteRawIndexCreator::putLongMV,
(reader, context, docId, buffer) -> {
int length = reader.getLongMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion, explicitMVEntrySizeOption);

// This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk
testMV(DataType.LONG, longs(true), x -> x.length, long[]::new, MultiValueFixedByteRawIndexCreator::putLongMV,
(reader, context, docId, buffer) -> {
int length = reader.getLongMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion, explicitMVEntrySizeOption);
}
}

@Test(dataProvider = "compressionTypes")
public void testMVFloat(ChunkCompressionType compressionType, int writerVersion)
throws IOException {
// This tests varying lengths of MV rows
testMV(DataType.FLOAT, floats(false), x -> x.length, float[]::new, MultiValueFixedByteRawIndexCreator::putFloatMV,
(reader, context, docId, buffer) -> {
int length = reader.getFloatMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion);

// This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk
testMV(DataType.FLOAT, floats(true), x -> x.length, float[]::new, MultiValueFixedByteRawIndexCreator::putFloatMV,
(reader, context, docId, buffer) -> {
int length = reader.getFloatMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion);
for (boolean explicitMVEntrySizeOption : EXPLICIT_MV_ENTRY_SIZE_OPTIONS) {
// This tests varying lengths of MV rows
testMV(DataType.FLOAT, floats(false), x -> x.length, float[]::new, MultiValueFixedByteRawIndexCreator::putFloatMV,
(reader, context, docId, buffer) -> {
int length = reader.getFloatMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion, explicitMVEntrySizeOption);

// This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk
testMV(DataType.FLOAT, floats(true), x -> x.length, float[]::new, MultiValueFixedByteRawIndexCreator::putFloatMV,
(reader, context, docId, buffer) -> {
int length = reader.getFloatMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion, explicitMVEntrySizeOption);
}
}

@Test(dataProvider = "compressionTypes")
public void testMVDouble(ChunkCompressionType compressionType, int writerVersion)
throws IOException {
// This tests varying lengths of MV rows
testMV(DataType.DOUBLE, doubles(false), x -> x.length, double[]::new,
MultiValueFixedByteRawIndexCreator::putDoubleMV, (reader, context, docId, buffer) -> {
int length = reader.getDoubleMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion);

// This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk
testMV(DataType.DOUBLE, doubles(true), x -> x.length, double[]::new,
MultiValueFixedByteRawIndexCreator::putDoubleMV, (reader, context, docId, buffer) -> {
int length = reader.getDoubleMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion);
for (boolean explicitMVEntrySizeOption : EXPLICIT_MV_ENTRY_SIZE_OPTIONS) {
// This tests varying lengths of MV rows
testMV(DataType.DOUBLE, doubles(false), x -> x.length, double[]::new,
MultiValueFixedByteRawIndexCreator::putDoubleMV, (reader, context, docId, buffer) -> {
int length = reader.getDoubleMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion, explicitMVEntrySizeOption);

// This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk
testMV(DataType.DOUBLE, doubles(true), x -> x.length, double[]::new,
MultiValueFixedByteRawIndexCreator::putDoubleMV, (reader, context, docId, buffer) -> {
int length = reader.getDoubleMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
}, compressionType, writerVersion, explicitMVEntrySizeOption);
}
}

public <T> void testMV(DataType dataType, List<T> inputs, ToIntFunction<T> sizeof, IntFunction<T> constructor,
Injector<T> injector, Extractor<T> extractor, ChunkCompressionType compressionType, int writerVersion)
Injector<T> injector, Extractor<T> extractor, ChunkCompressionType compressionType, int writerVersion,
boolean explicitMVEntrySize)
throws IOException {
String column = "testCol_" + dataType;
int numDocs = inputs.size();
Expand All @@ -158,6 +169,7 @@ public <T> void testMV(DataType dataType, List<T> inputs, ToIntFunction<T> sizeo
MultiValueFixedByteRawIndexCreator creator =
new MultiValueFixedByteRawIndexCreator(new File(OUTPUT_DIR), compressionType, column, numDocs, dataType,
maxElements, false, writerVersion, 1024 * 1024, 1000);
creator.setExplicitMVEntrySize(explicitMVEntrySize);
inputs.forEach(input -> injector.inject(creator, input));
creator.close();

Expand Down

0 comments on commit 84987ed

Please sign in to comment.