
Commit

fixes #3122
ConeyLiu committed Feb 6, 2025
1 parent 3306fd6 commit 08b7936
Showing 9 changed files with 169 additions and 45 deletions.
@@ -53,6 +53,7 @@ public static DataPageV2 uncompressed(
definitionLevels,
dataEncoding,
data,
0,
Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size()),
statistics,
false);
@@ -89,6 +90,7 @@ public static DataPageV2 uncompressed(
definitionLevels,
dataEncoding,
data,
0,
Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size()),
statistics,
false);
@@ -124,6 +126,7 @@ public static DataPageV2 compressed(
definitionLevels,
dataEncoding,
data,
Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size()),
uncompressedSize,
statistics,
true);
@@ -138,6 +141,10 @@ public static DataPageV2 compressed(
private final Statistics<?> statistics;
private final boolean isCompressed;

/**
* @deprecated will be removed in 2.0.0. Use {@link DataPageV2#DataPageV2(int, int, int, long, BytesInput, BytesInput, Encoding, BytesInput, int, int, Statistics, boolean)} instead
*/
@Deprecated
public DataPageV2(
int rowCount,
int nullCount,
@@ -163,6 +170,33 @@ public DataPageV2(
this.isCompressed = isCompressed;
}

public DataPageV2(
int rowCount,
int nullCount,
int valueCount,
BytesInput repetitionLevels,
BytesInput definitionLevels,
Encoding dataEncoding,
BytesInput data,
int compressedSize,
int uncompressedSize,
Statistics<?> statistics,
boolean isCompressed) {
super(compressedSize, uncompressedSize, valueCount);
if (!isCompressed && compressedSize != 0) {
throw new IllegalArgumentException("compressedSize must be 0 if page is not compressed");
}

this.rowCount = rowCount;
this.nullCount = nullCount;
this.repetitionLevels = repetitionLevels;
this.definitionLevels = definitionLevels;
this.dataEncoding = dataEncoding;
this.data = data;
this.statistics = statistics;
this.isCompressed = isCompressed;
}

private DataPageV2(
int rowCount,
int nullCount,
@@ -172,14 +206,11 @@ private DataPageV2(
BytesInput definitionLevels,
Encoding dataEncoding,
BytesInput data,
int compressedSize,
int uncompressedSize,
Statistics<?> statistics,
boolean isCompressed) {
- super(
- Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size()),
- uncompressedSize,
- valueCount,
- firstRowIndex);
+ super(compressedSize, uncompressedSize, valueCount, firstRowIndex);
this.rowCount = rowCount;
this.nullCount = nullCount;
this.repetitionLevels = repetitionLevels;
@@ -190,6 +221,11 @@ private DataPageV2(
this.isCompressed = isCompressed;
}

@Override
public int getCompressedSize() {
return isCompressed ? super.getCompressedSize() : 0;
}

public int getRowCount() {
return rowCount;
}
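
Note on the DataPageV2 changes above: repetition and definition levels in a V2 page are always stored uncompressed, so the page's compressed size now either covers the levels plus the compressed data, or is pinned to 0 when the page is not compressed. The new public constructor rejects any other combination, and the getCompressedSize() override keeps pages built through the deprecated constructor consistent. A minimal sketch of the invariant (the helper name compressedSizeOf is illustrative, not part of the patch; BytesInput is org.apache.parquet.bytes.BytesInput):

    // Illustrative only: the size rule DataPageV2 now enforces.
    static int compressedSizeOf(
        BytesInput repetitionLevels, BytesInput definitionLevels, BytesInput data, boolean isCompressed) {
      return isCompressed
          ? Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size())
          : 0; // a non-zero size with isCompressed == false throws IllegalArgumentException
    }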
@@ -34,6 +34,9 @@ public abstract class Page {
this.uncompressedSize = uncompressedSize;
}

/**
* @return the compressed size of the page if the bytes are compressed, otherwise 0
*/
public int getCompressedSize() {
return compressedSize;
}
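
A caller-facing consequence of the documented Page.getCompressedSize() contract, as a hedged sketch (assumes a DataPageV2 named page is in scope): a return value of 0 now means "no compressed bytes", so size-based logic should branch on the compression flag rather than assume a positive value.

    // Sketch: choosing the on-disk byte count for a V2 page under the new contract.
    int bytesOnDisk = page.isCompressed() ? page.getCompressedSize() : page.getUncompressedSize();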
@@ -19,6 +19,7 @@
package org.apache.parquet.column.impl;

import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;

import java.util.List;
@@ -135,4 +136,46 @@ public void testOptional() throws Exception {
}
assertEquals(0, converter.count);
}

@Test
public void testV2AllNullValues() throws Exception {
MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
ColumnDescriptor col = schema.getColumns().get(0);
MemPageWriter pageWriter = new MemPageWriter();
ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(
col,
pageWriter,
ParquetProperties.builder()
.withDictionaryPageSize(1024)
.withWriterVersion(PARQUET_2_0)
.withPageSize(2048)
.build());
for (int i = 0; i < rows; i++) {
columnWriterV2.writeNull(0, 0);
}
columnWriterV2.writePage();
columnWriterV2.finalizeColumnChunk();
List<DataPage> pages = pageWriter.getPages();
int valueCount = 0;
int rowCount = 0;
for (DataPage dataPage : pages) {
DataPageV2 page = (DataPageV2) dataPage;
valueCount += page.getValueCount();
rowCount += page.getRowCount();
assertFalse(page.isCompressed());
assertEquals(0, page.getCompressedSize());
}
assertEquals(rows, rowCount);
assertEquals(rows, valueCount);
MemPageReader pageReader = new MemPageReader(rows, pages.iterator(), pageWriter.getDictionaryPage());
ValidatingConverter converter = new ValidatingConverter();
ColumnReader columnReader =
new ColumnReaderImpl(col, pageReader, converter, VersionParser.parse(Version.FULL_VERSION));
for (int i = 0; i < rows; i++) {
assertEquals(0, columnReader.getCurrentRepetitionLevel());
assertEquals(0, columnReader.getCurrentDefinitionLevel());
columnReader.consume();
}
assertEquals(0, converter.count);
}
}
@@ -2123,6 +2123,9 @@ private PageHeader newDataPageV2Header(
int dlByteLength) {
DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
valueCount, nullCount, rowCount, getEncoding(dataEncoding), dlByteLength, rlByteLength);
if (compressedSize == 0) {
dataPageHeaderV2.setIs_compressed(false);
}
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
pageHeader.setData_page_header_v2(dataPageHeaderV2);
return pageHeader;
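
For context: in the parquet-format Thrift definition, DataPageHeaderV2.is_compressed is an optional field that defaults to true, so the builder only needs to clear it explicitly; a compressed size of 0 is the writer's signal that the page's data section was stored uncompressed. Condensed sketch of the resulting header for an all-null page with no data bytes:

    // Sketch: with compressedSize == 0 the V2 flag is cleared,
    // so readers will not try to decompress this page.
    DataPageHeaderV2 header = new DataPageHeaderV2(
        valueCount, nullCount, rowCount, getEncoding(dataEncoding), dlByteLength, rlByteLength);
    header.setIs_compressed(false);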
@@ -2142,38 +2145,18 @@ public void writeDataPageV2Header(
BlockCipher.Encryptor blockEncryptor,
byte[] pageHeaderAAD)
throws IOException {
- writePageHeader(
- newDataPageV2Header(
- uncompressedSize,
- compressedSize,
- valueCount,
- nullCount,
- rowCount,
- dataEncoding,
- rlByteLength,
- dlByteLength,
- crc),
- to,
- blockEncryptor,
- pageHeaderAAD);
- }
-
- private PageHeader newDataPageV2Header(
- int uncompressedSize,
- int compressedSize,
- int valueCount,
- int nullCount,
- int rowCount,
- org.apache.parquet.column.Encoding dataEncoding,
- int rlByteLength,
- int dlByteLength,
- int crc) {
- DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
- valueCount, nullCount, rowCount, getEncoding(dataEncoding), dlByteLength, rlByteLength);
- PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
- pageHeader.setData_page_header_v2(dataPageHeaderV2);
- pageHeader.setCrc(crc);
- return pageHeader;
+ PageHeader pageHeader = newDataPageV2Header(
+ uncompressedSize,
+ compressedSize,
+ valueCount,
+ nullCount,
+ rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength);
+ pageHeader.setCrc(crc);
+
+ writePageHeader(pageHeader, to, blockEncryptor, pageHeaderAAD);
}
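
Worth noting: the deleted private overload built its DataPageHeaderV2 from scratch and would have missed the new is_compressed handling; routing the checksummed path through the single flag-aware builder keeps the two paths from diverging. Sketch of the shared flow, with names as in this hunk:

    // Sketch: one builder for both paths; the CRC is an optional decoration.
    PageHeader pageHeader = newDataPageV2Header(
        uncompressedSize, compressedSize, valueCount, nullCount, rowCount,
        dataEncoding, rlByteLength, dlByteLength);
    pageHeader.setCrc(crc); // only on the checksummed path
    writePageHeader(pageHeader, to, blockEncryptor, pageHeaderAAD);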

public void writeDictionaryPageHeader(
@@ -295,14 +295,19 @@ public void writePageV2(
int rlByteLength = toIntWithCheck(repetitionLevels.size());
int dlByteLength = toIntWithCheck(definitionLevels.size());
int uncompressedSize = toIntWithCheck(data.size() + repetitionLevels.size() + definitionLevels.size());
- // TODO: decide if we compress
- BytesInput compressedData = compressor.compress(data);
- if (null != pageBlockEncryptor) {
- AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
- compressedData = BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD));
- }
- int compressedSize =
- toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size());
+ BytesInput compressedData = BytesInput.empty();
+ int compressedSize = 0;
+ if (data.size() > 0) {
+ // TODO: decide if we compress
+ compressedData = compressor.compress(data);
+ if (null != pageBlockEncryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+ compressedData =
+ BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD));
+ }
+ compressedSize =
+ toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size());
+ }
tempOutputStream.reset();
if (null != headerBlockEncryptor) {
AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
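
Why the data.size() > 0 guard matters (hedged reasoning, inferred from this diff rather than from the issue text): for an all-null column the data section is empty, yet most codecs still emit framing bytes for empty input, so "compressing" it would record a non-zero compressed size that exceeds the uncompressed size. For instance, an empty GZIP stream is still about 20 bytes:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    class EmptyGzipDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
          // intentionally write nothing
        }
        // Prints roughly 20: GZIP header and trailer bytes exist even for empty input.
        System.out.println(out.size());
      }
    }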
@@ -1969,6 +1969,7 @@ public ColumnChunkPageReader readAllPages(
definitionLevels,
converter.getEncoding(dataHeaderV2.getEncoding()),
values,
dataHeaderV2.isIs_compressed() ? compressedPageSize : 0,
uncompressedPageSize,
converter.fromParquetStatistics(
getFileMetaData().getCreatedBy(), dataHeaderV2.getStatistics(), type),
@@ -1150,8 +1150,11 @@ public void writeDataPageV2(
int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels");
int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels");

- int compressedSize =
- toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size(), "page");
+ int compressedSize = 0;
+ if (compressedData.size() > 0) {
+ compressedSize =
+ toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size(), "page");
+ }

int uncompressedSize =
toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page");
@@ -452,6 +452,7 @@ public DataPage visit(DataPageV2 data) {
BytesInput.from(data.getDefinitionLevels().toByteArray()),
data.getDataEncoding(),
BytesInput.from(data.getData().toByteArray()),
data.isCompressed() ? data.getCompressedSize() : 0,
data.getUncompressedSize(),
data.getStatistics(),
data.isCompressed());
@@ -33,9 +33,11 @@
import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

@@ -60,6 +62,8 @@
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
@@ -592,4 +596,49 @@ public void testSizeStatisticsControl() throws Exception {
}
}
}

@Test
public void testV2WriteAllNullValues() throws Exception {
MessageType schema = Types.buildMessage().optional(FLOAT).named("float").named("msg");

Configuration conf = new Configuration();
GroupWriteSupport.setSchema(schema, conf);

File file = temp.newFile();
temp.delete();
Path path = new Path(file.getAbsolutePath());

SimpleGroupFactory factory = new SimpleGroupFactory(schema);
Group nullValue = factory.newGroup();
int recordCount = 10;

try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
.withAllocator(allocator)
.withConf(conf)
.withWriterVersion(WriterVersion.PARQUET_2_0)
.withDictionaryEncoding(false)
.build()) {
for (int i = 0; i < recordCount; i++) {
writer.write(nullValue);
}
}

try (ParquetReader<Group> reader =
ParquetReader.builder(new GroupReadSupport(), path).build()) {
int readRecordCount = 0;
for (Group group = reader.read(); group != null; group = reader.read()) {
assertEquals(nullValue.toString(), group.toString());
++readRecordCount;
}
assertEquals("Number of written records should be equal to the read one", recordCount, readRecordCount);
}

try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
reader.f.seek(blockMetaData.getStartingPos());
PageHeader pageHeader = Util.readPageHeader(reader.f);
assertFalse(pageHeader.getData_page_header_v2().isIs_compressed());
assertEquals(0, pageHeader.getCompressed_page_size());
}
}
}
