From 0d520b1de7ea2c1f24f230310fd9cb20a9073e13 Mon Sep 17 00:00:00 2001
From: xianyangliu
Date: Fri, 7 Feb 2025 19:08:22 +0800
Subject: [PATCH 1/4] fixes #3122

---
 .../parquet/crypto/AesCtrDecryptor.java       |  12 +-
 .../parquet/crypto/AesGcmDecryptor.java       |  13 +-
 .../converter/ParquetMetadataConverter.java   | 131 ++++++++++++---
 .../hadoop/ColumnChunkPageWriteStore.java     |  11 +-
 .../parquet/hadoop/ParquetFileWriter.java     | 158 +++++++++++++++++-
 .../hadoop/rewrite/ParquetRewriter.java       |   2 +
 .../parquet/hadoop/TestParquetFileWriter.java |   8 +-
 .../parquet/hadoop/TestParquetWriter.java     | 115 ++++++++++++-
 8 files changed, 412 insertions(+), 38 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
index 2735f63bfb..d16be490c2 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
@@ -55,7 +55,11 @@ public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) {
 
   public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD) {
     int plainTextLength = cipherTextLength - NONCE_LENGTH;
-    if (plainTextLength < 1) {
+    if (plainTextLength == 0) {
+      return new byte[0];
+    }
+
+    if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
     }
 
@@ -91,7 +95,11 @@ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
 
     int cipherTextLength = ciphertext.limit() - ciphertext.position() - SIZE_LENGTH;
     int plainTextLength = cipherTextLength - NONCE_LENGTH;
-    if (plainTextLength < 1) {
+    if (plainTextLength == 0) {
+      return ByteBuffer.allocate(0);
+    }
+
+    if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
     }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
index dc378effcf..0cbb57fea1 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
@@ -51,7 +51,11 @@ public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) {
 
   public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD) {
     int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
-    if (plainTextLength < 1) {
+    if (plainTextLength == 0) {
+      return new byte[0];
+    }
+
+    if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
     }
 
@@ -81,7 +85,12 @@ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
     int cipherTextOffset = SIZE_LENGTH;
     int cipherTextLength = ciphertext.limit() - ciphertext.position() - SIZE_LENGTH;
     int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
-    if (plainTextLength < 1) {
+
+    if (plainTextLength == 0) {
+      return ByteBuffer.allocate(0);
+    }
+
+    if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
     }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index e72f2c33a2..46baf848e8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1981,7 +1981,8 @@ public void writeDataPageV2Header(
             rowCount,
             dataEncoding,
             rlByteLength,
-            dlByteLength),
+            dlByteLength,
+            true /* compressed by default */),
         to);
   }
 
@@ -2059,6 +2060,10 @@ public void writeDataPageV1Header(
         pageHeaderAAD);
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link ParquetMetadataConverter#writeDataPageV2Header(int, int, int, int, int, org.apache.parquet.column.Encoding, int, int, boolean, OutputStream)} instead
+   */
+  @Deprecated
   public void writeDataPageV2Header(
       int uncompressedSize,
       int compressedSize,
@@ -2079,11 +2084,16 @@ public void writeDataPageV2Header(
         dataEncoding,
         rlByteLength,
         dlByteLength,
+        true, /* compressed by default */
         to,
         null,
         null);
   }
 
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link ParquetMetadataConverter#writeDataPageV2Header(int, int, int, int, int, org.apache.parquet.column.Encoding, int, int, boolean, OutputStream, BlockCipher.Encryptor, byte[])} instead
+   */
+  @Deprecated
   public void writeDataPageV2Header(
       int uncompressedSize,
       int compressedSize,
@@ -2097,22 +2107,26 @@ public void writeDataPageV2Header(
       BlockCipher.Encryptor blockEncryptor,
       byte[] pageHeaderAAD)
       throws IOException {
-    writePageHeader(
-        newDataPageV2Header(
-            uncompressedSize,
-            compressedSize,
-            valueCount,
-            nullCount,
-            rowCount,
-            dataEncoding,
-            rlByteLength,
-            dlByteLength),
+    writeDataPageV2Header(
+        uncompressedSize,
+        compressedSize,
+        valueCount,
+        nullCount,
+        rowCount,
+        dataEncoding,
+        rlByteLength,
+        dlByteLength,
+        true, /* compressed by default */
         to,
         blockEncryptor,
        pageHeaderAAD);
   }
 
-  private PageHeader newDataPageV2Header(
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link ParquetMetadataConverter#writeDataPageV2Header(int, int, int, int, int, org.apache.parquet.column.Encoding, int, int, boolean, int, OutputStream, BlockCipher.Encryptor, byte[])} instead
+   */
+  @Deprecated
+  public void writeDataPageV2Header(
       int uncompressedSize,
       int compressedSize,
       int valueCount,
@@ -2120,12 +2134,26 @@ private PageHeader newDataPageV2Header(
       int rowCount,
       org.apache.parquet.column.Encoding dataEncoding,
       int rlByteLength,
-      int dlByteLength) {
-    DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
-        valueCount, nullCount, rowCount, getEncoding(dataEncoding), dlByteLength, rlByteLength);
-    PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
-    pageHeader.setData_page_header_v2(dataPageHeaderV2);
-    return pageHeader;
+      int dlByteLength,
+      int crc,
+      OutputStream to,
+      BlockCipher.Encryptor blockEncryptor,
+      byte[] pageHeaderAAD)
+      throws IOException {
+    writeDataPageV2Header(
+        uncompressedSize,
+        compressedSize,
+        valueCount,
+        nullCount,
+        rowCount,
+        dataEncoding,
+        rlByteLength,
+        dlByteLength,
+        true, /* compressed by default */
+        crc,
+        to,
+        blockEncryptor,
+        pageHeaderAAD);
   }
 
   public void writeDataPageV2Header(
       int uncompressedSize,
       int compressedSize,
       int valueCount,
       int nullCount,
       int rowCount,
@@ -2137,7 +2165,34 @@ public void writeDataPageV2Header(
       org.apache.parquet.column.Encoding dataEncoding,
       int rlByteLength,
       int dlByteLength,
-      int crc,
+      boolean compressed,
+      OutputStream to)
+      throws IOException {
+    writeDataPageV2Header(
+        uncompressedSize,
+        compressedSize,
+        valueCount,
+        nullCount,
+        rowCount,
+        dataEncoding,
+        rlByteLength,
+        dlByteLength,
+        compressed,
+        to,
+        null,
+        null);
+  }
+
+  public void writeDataPageV2Header(
+      int uncompressedSize,
+      int compressedSize,
+      int valueCount,
+      int nullCount,
+      int rowCount,
+      org.apache.parquet.column.Encoding dataEncoding,
+      int rlByteLength,
+      int dlByteLength,
+      boolean compressed,
       OutputStream to,
       BlockCipher.Encryptor blockEncryptor,
       byte[] pageHeaderAAD)
@@ -2152,12 +2207,43 @@ public void writeDataPageV2Header(
             dataEncoding,
             rlByteLength,
             dlByteLength,
-            crc),
+            compressed),
         to,
         blockEncryptor,
         pageHeaderAAD);
   }
 
+  public void writeDataPageV2Header(
+      int uncompressedSize,
+      int compressedSize,
+      int valueCount,
+      int nullCount,
+      int rowCount,
+      org.apache.parquet.column.Encoding dataEncoding,
+      int rlByteLength,
+      int dlByteLength,
+      boolean compressed,
+      int crc,
+      OutputStream to,
+      BlockCipher.Encryptor blockEncryptor,
+      byte[] pageHeaderAAD)
+      throws IOException {
+    PageHeader pageHeader = newDataPageV2Header(
+        uncompressedSize,
+        compressedSize,
+        valueCount,
+        nullCount,
+        rowCount,
+        dataEncoding,
+        rlByteLength,
+        dlByteLength,
+        compressed);
+
+    pageHeader.setCrc(crc);
+
+    writePageHeader(pageHeader, to, blockEncryptor, pageHeaderAAD);
+  }
+
   private PageHeader newDataPageV2Header(
       int uncompressedSize,
       int compressedSize,
@@ -2167,12 +2253,13 @@ private PageHeader newDataPageV2Header(
       org.apache.parquet.column.Encoding dataEncoding,
       int rlByteLength,
      int dlByteLength,
-      int crc) {
+      boolean compressed) {
     DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
         valueCount, nullCount, rowCount, getEncoding(dataEncoding), dlByteLength, rlByteLength);
+    dataPageHeaderV2.setIs_compressed(compressed);
+
     PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
     pageHeader.setData_page_header_v2(dataPageHeaderV2);
-    pageHeader.setCrc(crc);
     return pageHeader;
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 795063e5c8..566ab76cc5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -295,8 +295,13 @@ public void writePageV2(
       int rlByteLength = toIntWithCheck(repetitionLevels.size());
       int dlByteLength = toIntWithCheck(definitionLevels.size());
       int uncompressedSize = toIntWithCheck(data.size() + repetitionLevels.size() + definitionLevels.size());
-      // TODO: decide if we compress
-      BytesInput compressedData = compressor.compress(data);
+      boolean compressed = false;
+      BytesInput compressedData = BytesInput.empty();
+      if (data.size() > 0) {
+        // TODO: decide if we compress
+        compressedData = compressor.compress(data);
+        compressed = true;
+      }
       if (null != pageBlockEncryptor) {
         AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
         compressedData = BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD));
@@ -327,6 +332,7 @@ public void writePageV2(
             dataEncoding,
             rlByteLength,
             dlByteLength,
+            compressed,
             (int) crc.getValue(),
             tempOutputStream,
             headerBlockEncryptor,
@@ -341,6 +347,7 @@ public void writePageV2(
             dataEncoding,
             rlByteLength,
             dlByteLength,
+            compressed,
             tempOutputStream,
             headerBlockEncryptor,
             dataPageHeaderAAD);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index f0a912f599..9b2f7f4450 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1044,7 +1044,9 @@ public void addBloomFilter(String column, BloomFilter bloomFilter) {
    * @param uncompressedDataSize the size of uncompressed data
    * @param statistics the statistics of the page
    * @throws IOException if any I/O error occurs during writing the file
+   * @deprecated will be removed in 2.0.0. Use {@link ParquetFileWriter#writeDataPageV2(int, int, int, BytesInput, BytesInput, Encoding, BytesInput, boolean, int, Statistics)} instead
    */
+  @Deprecated
   public void writeDataPageV2(
       int rowCount,
       int nullCount,
@@ -1064,6 +1066,50 @@ public void writeDataPageV2(
         definitionLevels,
         dataEncoding,
         compressedData,
+        true, /* compressed by default */
+        uncompressedDataSize,
+        statistics,
+        null,
+        null,
+        null);
+  }
+
+  /**
+   * Writes a single v2 data page
+   *
+   * @param rowCount count of rows
+   * @param nullCount count of nulls
+   * @param valueCount count of values
+   * @param repetitionLevels repetition level bytes
+   * @param definitionLevels definition level bytes
+   * @param dataEncoding encoding for data
+   * @param bytes data bytes
+   * @param compressed whether the data bytes is compressed
+   * @param uncompressedDataSize the size of uncompressed data
+   * @param statistics the statistics of the page
+   * @throws IOException if any I/O error occurs during writing the file
+   */
+  public void writeDataPageV2(
+      int rowCount,
+      int nullCount,
+      int valueCount,
+      BytesInput repetitionLevels,
+      BytesInput definitionLevels,
+      Encoding dataEncoding,
+      BytesInput bytes,
+      boolean compressed,
+      int uncompressedDataSize,
+      Statistics<?> statistics)
+      throws IOException {
+    writeDataPageV2(
+        rowCount,
+        nullCount,
+        valueCount,
+        repetitionLevels,
+        definitionLevels,
+        dataEncoding,
+        bytes,
+        compressed,
         uncompressedDataSize,
         statistics,
         null,
@@ -1086,7 +1132,9 @@ public void writeDataPageV2(
    * @param metadataBlockEncryptor encryptor for block data
    * @param pageHeaderAAD pageHeader AAD
    * @throws IOException if any I/O error occurs during writing the file
+   * @deprecated will be removed in 2.0.0. Use {@link ParquetFileWriter#writeDataPageV2(int, int, int, BytesInput, BytesInput, Encoding, BytesInput, boolean, int, Statistics, BlockCipher.Encryptor, byte[])} instead
    */
+  @Deprecated
   public void writeDataPageV2(
       int rowCount,
       int nullCount,
@@ -1108,6 +1156,54 @@ public void writeDataPageV2(
         definitionLevels,
         dataEncoding,
         compressedData,
+        true, /* compressed by default */
+        uncompressedDataSize,
+        statistics,
+        metadataBlockEncryptor,
+        pageHeaderAAD,
+        null);
+  }
+
+  /**
+   * Writes a single v2 data page
+   *
+   * @param rowCount count of rows
+   * @param nullCount count of nulls
+   * @param valueCount count of values
+   * @param repetitionLevels repetition level bytes
+   * @param definitionLevels definition level bytes
+   * @param dataEncoding encoding for data
+   * @param bytes data bytes
+   * @param compressed whether the data bytes is compressed
+   * @param uncompressedDataSize the size of uncompressed data
+   * @param statistics the statistics of the page
+   * @param metadataBlockEncryptor encryptor for block data
+   * @param pageHeaderAAD pageHeader AAD
+   * @throws IOException if any I/O error occurs during writing the file
+   */
+  public void writeDataPageV2(
+      int rowCount,
+      int nullCount,
+      int valueCount,
+      BytesInput repetitionLevels,
+      BytesInput definitionLevels,
+      Encoding dataEncoding,
+      BytesInput bytes,
+      boolean compressed,
+      int uncompressedDataSize,
+      Statistics<?> statistics,
+      BlockCipher.Encryptor metadataBlockEncryptor,
+      byte[] pageHeaderAAD)
+      throws IOException {
+    writeDataPageV2(
+        rowCount,
+        nullCount,
+        valueCount,
+        repetitionLevels,
+        definitionLevels,
+        dataEncoding,
+        bytes,
+        compressed,
         uncompressedDataSize,
         statistics,
         metadataBlockEncryptor,
@@ -1131,7 +1227,9 @@ public void writeDataPageV2(
    * @param pageHeaderAAD pageHeader AAD
    * @param sizeStatistics size statistics for the page
    * @throws IOException if any I/O error occurs during writing the file
+   * @deprecated will be removed in 2.0.0. Use {@link ParquetFileWriter#writeDataPageV2(int, int, int, BytesInput, BytesInput, Encoding, BytesInput, boolean, int, Statistics, BlockCipher.Encryptor, byte[], SizeStatistics)} instead
    */
+  @Deprecated
   public void writeDataPageV2(
       int rowCount,
       int nullCount,
@@ -1146,12 +1244,60 @@ public void writeDataPageV2(
       byte[] pageHeaderAAD,
       SizeStatistics sizeStatistics)
       throws IOException {
+    writeDataPageV2(
+        rowCount,
+        nullCount,
+        valueCount,
+        repetitionLevels,
+        definitionLevels,
+        dataEncoding,
+        compressedData,
+        true, /* compressed by default */
+        uncompressedDataSize,
+        statistics,
+        metadataBlockEncryptor,
+        pageHeaderAAD,
+        sizeStatistics);
+  }
+
+  /**
+   * Writes a single v2 data page
+   *
+   * @param rowCount count of rows
+   * @param nullCount count of nulls
+   * @param valueCount count of values
+   * @param repetitionLevels repetition level bytes
+   * @param definitionLevels definition level bytes
+   * @param dataEncoding encoding for data
+   * @param bytes data bytes
+   * @param compressed whether the data bytes is compressed
+   * @param uncompressedDataSize the size of uncompressed data
+   * @param statistics the statistics of the page
+   * @param metadataBlockEncryptor encryptor for block data
+   * @param pageHeaderAAD pageHeader AAD
+   * @param sizeStatistics size statistics for the page
+   * @throws IOException if any I/O error occurs during writing the file
+   */
+  public void writeDataPageV2(
+      int rowCount,
+      int nullCount,
+      int valueCount,
+      BytesInput repetitionLevels,
+      BytesInput definitionLevels,
+      Encoding dataEncoding,
+      BytesInput bytes,
+      boolean compressed,
+      int uncompressedDataSize,
+      Statistics<?> statistics,
+      BlockCipher.Encryptor metadataBlockEncryptor,
+      byte[] pageHeaderAAD,
+      SizeStatistics sizeStatistics)
+      throws IOException {
     state = state.write();
 
     int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels");
     int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels");
-    int compressedSize =
-        toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size(), "page");
+    int compressedSize = toIntWithCheck(bytes.size() + repetitionLevels.size() + definitionLevels.size(), "page");
     int uncompressedSize =
         toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page");
 
@@ -1169,8 +1315,8 @@ public void writeDataPageV2(
     if (definitionLevels.size() > 0) {
       crcUpdate(definitionLevels);
     }
-    if (compressedData.size() > 0) {
-      crcUpdate(compressedData);
+    if (bytes.size() > 0) {
+      crcUpdate(bytes);
     }
     metadataConverter.writeDataPageV2Header(
         uncompressedSize,
@@ -1181,6 +1327,7 @@ public void writeDataPageV2(
           dataEncoding,
           rlByteLength,
           dlByteLength,
+          compressed,
           (int) crc.getValue(),
           out,
           metadataBlockEncryptor,
@@ -1195,6 +1342,7 @@ public void writeDataPageV2(
           dataEncoding,
          rlByteLength,
           dlByteLength,
+          compressed,
           out,
           metadataBlockEncryptor,
           pageHeaderAAD);
@@ -1209,7 +1357,7 @@ public void writeDataPageV2(
     currentEncodings.add(dataEncoding);
     encodingStatsBuilder.addDataEncoding(dataEncoding);
 
-    BytesInput.concat(repetitionLevels, definitionLevels, compressedData).writeAllTo(out);
+    BytesInput.concat(repetitionLevels, definitionLevels, bytes).writeAllTo(out);
 
     offsetIndexBuilder.add(
         toIntWithCheck(out.getPos() - beforeHeader, "page"),
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 9535b4335d..c8c25f3c66 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -737,6 +737,7 @@ private void processChunk(
                         encryptColumn,
                         dataEncryptor,
                         dataPageAAD);
+                boolean compressed = compressor != null;
                 statistics = convertStatistics(
                     originalCreatedBy,
                     normalizeNameInType(chunk.getPrimitiveType()),
@@ -762,6 +763,7 @@ private void processChunk(
                     dlLevels,
                     converter.getEncoding(headerV2.getEncoding()),
                     BytesInput.from(pageLoad),
+                    compressed,
                     rawDataLength,
                     statistics,
                     metaEncryptor,
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index c6be72ff70..3126e1746f 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -513,15 +513,15 @@ public void testWriteReadDataPageV2() throws Exception {
 
     w.startColumn(C1, 6, CODEC);
     long c1Starts = w.getPos();
-    w.writeDataPageV2(4, 1, 3, repLevels, defLevels, PLAIN, data, 4, statsC1P1);
-    w.writeDataPageV2(3, 0, 3, repLevels, defLevels, PLAIN, data, 4, statsC1P2);
+    w.writeDataPageV2(4, 1, 3, repLevels, defLevels, PLAIN, data, false, 4, statsC1P1);
+    w.writeDataPageV2(3, 0, 3, repLevels, defLevels, PLAIN, data, false, 4, statsC1P2);
     w.endColumn();
     long c1Ends = w.getPos();
 
     w.startColumn(C2, 5, CODEC);
     long c2Starts = w.getPos();
-    w.writeDataPageV2(5, 2, 3, repLevels, defLevels, PLAIN, data2, 4, EMPTY_STATS);
-    w.writeDataPageV2(2, 0, 2, repLevels, defLevels, PLAIN, data2, 4, EMPTY_STATS);
+    w.writeDataPageV2(5, 2, 3, repLevels, defLevels, PLAIN, data2, false, 4, EMPTY_STATS);
+    w.writeDataPageV2(2, 0, 2, repLevels, defLevels, PLAIN, data2, false, 4, EMPTY_STATS);
     w.endColumn();
     long c2Ends = w.getPos();
 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 64001bcaf2..c8e8f71a91 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -33,9 +33,11 @@
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -51,20 +53,32 @@
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.bytes.TrackingByteBufferAllocator;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnDecryptionSetup;
+import org.apache.parquet.crypto.InternalFileDecryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.GroupFactory;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
@@ -232,7 +246,7 @@ public void testBadWriteSchema() throws IOException {
       return null;
     });
 
-    Assert.assertFalse("Should not create a file when schema is rejected", file.exists());
+    assertFalse("Should not create a file when schema is rejected", file.exists());
   }
 
   // Testing the issue of PARQUET-1531 where writing null nested rows leads to empty pages if the page row count limit
@@ -592,4 +606,103 @@ public void testSizeStatisticsControl() throws Exception {
       }
     }
   }
+
+  @Test
+  public void testV2WriteAllNullValues() throws Exception {
+    testV2WriteAllNullValues(null, null);
+  }
+
+  @Test
+  public void testV2WriteAllNullValuesWithEncrypted() throws Exception {
+    byte[] footerEncryptionKey = "0123456789012345".getBytes();
+    byte[] columnEncryptionKey = "1234567890123450".getBytes();
+
+    String footerEncryptionKeyID = "kf";
+    String columnEncryptionKeyID = "kc";
+
+    ColumnEncryptionProperties columnProperties = ColumnEncryptionProperties.builder("float")
+        .withKey(columnEncryptionKey)
+        .withKeyID(columnEncryptionKeyID)
+        .build();
+
+    Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
+    columnPropertiesMap.put(columnProperties.getPath(), columnProperties);
+
+    FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(footerEncryptionKey)
+        .withFooterKeyID(footerEncryptionKeyID)
+        .withEncryptedColumns(columnPropertiesMap)
+        .build();
+
+    DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock()
+        .putKey(footerEncryptionKeyID, footerEncryptionKey)
+        .putKey(columnEncryptionKeyID, columnEncryptionKey);
+    FileDecryptionProperties decryptionProperties = FileDecryptionProperties.builder()
+        .withKeyRetriever(decryptionKeyRetrieverMock)
+        .build();
+
+    testV2WriteAllNullValues(encryptionProperties, decryptionProperties);
+  }
+
+  private void testV2WriteAllNullValues(
+      FileEncryptionProperties encryptionProperties, FileDecryptionProperties decryptionProperties)
+      throws Exception {
+    MessageType schema = Types.buildMessage().optional(FLOAT).named("float").named("msg");
+
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    File file = temp.newFile();
+    temp.delete();
+    Path path = new Path(file.getAbsolutePath());
+
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+    Group nullValue = factory.newGroup();
+    int recordCount = 10;
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+        .withAllocator(allocator)
+        .withConf(conf)
+        .withWriterVersion(WriterVersion.PARQUET_2_0)
+        .withDictionaryEncoding(false)
+        .withEncryption(encryptionProperties)
+        .build()) {
+      for (int i = 0; i < recordCount; i++) {
+        writer.write(nullValue);
+      }
+    }
+
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
+        .withDecryption(decryptionProperties)
+        .build()) {
+      int readRecordCount = 0;
+      for (Group group = reader.read(); group != null; group = reader.read()) {
+        assertEquals(nullValue.toString(), group.toString());
+        ++readRecordCount;
+      }
+      assertEquals("Number of written records should be equal to the read one", recordCount, readRecordCount);
+    }
+
+    ParquetReadOptions options = ParquetReadOptions.builder()
+        .withDecryption(decryptionProperties)
+        .build();
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf), options)) {
+      BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
+      reader.f.seek(blockMetaData.getStartingPos());
+
+      if (decryptionProperties != null) {
+        InternalFileDecryptor fileDecryptor =
+            reader.getFooter().getFileMetaData().getFileDecryptor();
+        InternalColumnDecryptionSetup columnDecryptionSetup =
+            fileDecryptor.getColumnSetup(ColumnPath.fromDotString("float"));
+        byte[] dataPageHeaderAAD = AesCipher.createModuleAAD(
+            fileDecryptor.getFileAAD(), ModuleCipherFactory.ModuleType.DataPageHeader, 0, 0, 0);
+        PageHeader pageHeader =
+            Util.readPageHeader(reader.f, columnDecryptionSetup.getMetaDataDecryptor(), dataPageHeaderAAD);
+        assertFalse(pageHeader.getData_page_header_v2().isIs_compressed());
+      } else {
+        PageHeader pageHeader = Util.readPageHeader(reader.f);
+        assertFalse(pageHeader.getData_page_header_v2().isIs_compressed());
+      }
+    }
+  }
 }

From 1c16f92afa92bfe6d05239bcdc4b52e32ffac8e9 Mon Sep 17 00:00:00 2001
From: xianyangliu
Date: Mon, 10 Feb 2025 14:52:12 +0800
Subject: [PATCH 2/4] address comments & fixes

---
 .../java/org/apache/parquet/crypto/AesGcmDecryptor.java    | 7 -------
 .../org/apache/parquet/hadoop/rewrite/ParquetRewriter.java | 2 +-
 2 files changed, 1 insertion(+), 8 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
index 0cbb57fea1..b0da1572a5 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
@@ -51,9 +51,6 @@ public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) {
 
   public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD) {
     int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
-    if (plainTextLength == 0) {
-      return new byte[0];
-    }
 
     if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
@@ -86,10 +83,6 @@ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
     int cipherTextLength = ciphertext.limit() - ciphertext.position() - SIZE_LENGTH;
     int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
 
-    if (plainTextLength == 0) {
-      return ByteBuffer.allocate(0);
-    }
-
     if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
     }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index c8c25f3c66..3dceeee827 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -737,7 +737,7 @@ private void processChunk(
                         encryptColumn,
                         dataEncryptor,
                         dataPageAAD);
-                boolean compressed = compressor != null;
+                boolean compressed = compressor != null || headerV2.is_compressed;
                 statistics = convertStatistics(
                     originalCreatedBy,
                     normalizeNameInType(chunk.getPrimitiveType()),

From d6d00fa611a7f922c3668a969b3c436fd4189bbb Mon Sep 17 00:00:00 2001
From: xianyangliu
Date: Mon, 10 Feb 2025 14:55:04 +0800
Subject: [PATCH 3/4] address comments

---
 .../org/apache/parquet/hadoop/rewrite/ParquetRewriter.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 3dceeee827..10c84731f0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -737,7 +737,6 @@ private void processChunk(
                         encryptColumn,
                         dataEncryptor,
                         dataPageAAD);
-                boolean compressed = compressor != null || headerV2.is_compressed;
                 statistics = convertStatistics(
                     originalCreatedBy,
                     normalizeNameInType(chunk.getPrimitiveType()),
@@ -763,7 +762,7 @@ private void processChunk(
                     dlLevels,
                     converter.getEncoding(headerV2.getEncoding()),
                     BytesInput.from(pageLoad),
-                    compressed,
+                    headerV2.is_compressed,
                     rawDataLength,
                     statistics,
                     metaEncryptor,

From 8b342f1fa7c7ed8ece544f6ab8f2a9eaa0308f89 Mon Sep 17 00:00:00 2001
From: xianyangliu
Date: Tue, 11 Feb 2025 19:58:09 +0800
Subject: [PATCH 4/4] address comments

---
 .../java/org/apache/parquet/crypto/AesCtrDecryptor.java | 8 --------
 .../java/org/apache/parquet/crypto/AesGcmDecryptor.java | 2 --
 2 files changed, 10 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
index d16be490c2..afc5054b1c 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
@@ -55,10 +55,6 @@ public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) {
 
   public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD) {
     int plainTextLength = cipherTextLength - NONCE_LENGTH;
-    if (plainTextLength == 0) {
-      return new byte[0];
-    }
-
     if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
     }
@@ -95,10 +91,6 @@ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
 
     int cipherTextLength = ciphertext.limit() - ciphertext.position() - SIZE_LENGTH;
     int plainTextLength = cipherTextLength - NONCE_LENGTH;
-    if (plainTextLength == 0) {
-      return ByteBuffer.allocate(0);
-    }
-
     if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
     }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
index b0da1572a5..dc75d7e2ac 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
@@ -51,7 +51,6 @@ public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) {
 
   public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD) {
     int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
-
     if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
     }
@@ -82,7 +81,6 @@ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
     int cipherTextOffset = SIZE_LENGTH;
     int cipherTextLength = ciphertext.limit() - ciphertext.position() - SIZE_LENGTH;
     int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
-
    if (plainTextLength < 0) {
       throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
     }
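
For reviewers: a minimal caller-side sketch of the new ParquetFileWriter.writeDataPageV2 overload that PATCH 1/4 adds. Everything below (the wrapper class, the writer instance, the level bytes, the statistics object, and the counts) is a hypothetical illustration, not code from the patches; only the overload signature and the is_compressed semantics come from the series above.

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileWriter;

class WriteEmptyV2PageSketch {
  // Writes one all-null DATA_PAGE_V2 page; `w` must be an open writer with a
  // started column chunk and `stats` a statistics object for the page (both
  // assumed to be set up by the caller, not shown here).
  static void writeAllNullPage(ParquetFileWriter w, Statistics<?> stats) throws java.io.IOException {
    BytesInput repLevels = BytesInput.empty(); // flat schema: no repetition levels
    BytesInput defLevels = BytesInput.from(new byte[] {2, 0, 0, 0, 0}); // placeholder level bytes
    BytesInput data = BytesInput.empty(); // an all-null page carries no data bytes

    // Nothing was compressed, so pass compressed=false: the writer records
    // is_compressed=false in the DataPageHeaderV2, and readers then skip
    // decompression instead of handing a zero-length buffer to the
    // decompressor or decryptor (the failure mode behind #3122).
    w.writeDataPageV2(
        /* rowCount */ 10,
        /* nullCount */ 10,
        /* valueCount */ 10,
        repLevels,
        defLevels,
        Encoding.PLAIN,
        data,
        /* compressed */ false,
        /* uncompressedDataSize */ 0,
        stats);
  }
}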