Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3122: Correct V2 page header compression fields for zero-size data pages #3148

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) {
public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD) {

int plainTextLength = cipherTextLength - NONCE_LENGTH;
if (plainTextLength < 1) {
if (plainTextLength < 0) {
throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
}

Expand Down Expand Up @@ -91,7 +91,7 @@ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
int cipherTextLength = ciphertext.limit() - ciphertext.position() - SIZE_LENGTH;

int plainTextLength = cipherTextLength - NONCE_LENGTH;
if (plainTextLength < 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

if (plainTextLength < 0) {
throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) {
public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD) {

int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
if (plainTextLength < 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AES GCM decryption is able to handle an empty plaintext. It will also use the key to check the IV/tag integrity.
So the fix can be just replacement of if (plainTextLength < 1) { with if (plainTextLength < 0) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestions. Updated.

if (plainTextLength < 0) {
throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
}

Expand Down Expand Up @@ -81,7 +81,7 @@ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
int cipherTextOffset = SIZE_LENGTH;
int cipherTextLength = ciphertext.limit() - ciphertext.position() - SIZE_LENGTH;
int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
if (plainTextLength < 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

if (plainTextLength < 0) {
throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1981,7 +1981,8 @@ public void writeDataPageV2Header(
rowCount,
dataEncoding,
rlByteLength,
dlByteLength),
dlByteLength,
true /* compressed by default */),
to);
}

Expand Down Expand Up @@ -2059,6 +2060,10 @@ public void writeDataPageV1Header(
pageHeaderAAD);
}

/**
* @deprecated will be removed in 2.0.0. Use {@link ParquetMetadataConverter#writeDataPageV2Header(int, int, int, int, int, org.apache.parquet.column.Encoding, int, int, boolean, OutputStream)} instead
*/
@Deprecated
public void writeDataPageV2Header(
int uncompressedSize,
int compressedSize,
Expand All @@ -2079,11 +2084,16 @@ public void writeDataPageV2Header(
dataEncoding,
rlByteLength,
dlByteLength,
true, /* compressed by default */
to,
null,
null);
}

/**
* @deprecated will be removed in 2.0.0. Use {@link ParquetMetadataConverter#writeDataPageV2Header(int, int, int, int, int, org.apache.parquet.column.Encoding, int, int, boolean, OutputStream, BlockCipher.Encryptor, byte[])} instead
*/
@Deprecated
public void writeDataPageV2Header(
int uncompressedSize,
int compressedSize,
Expand All @@ -2097,35 +2107,53 @@ public void writeDataPageV2Header(
BlockCipher.Encryptor blockEncryptor,
byte[] pageHeaderAAD)
throws IOException {
writePageHeader(
newDataPageV2Header(
uncompressedSize,
compressedSize,
valueCount,
nullCount,
rowCount,
dataEncoding,
rlByteLength,
dlByteLength),
writeDataPageV2Header(
uncompressedSize,
compressedSize,
valueCount,
nullCount,
rowCount,
dataEncoding,
rlByteLength,
dlByteLength,
true, /* compressed by default */
to,
blockEncryptor,
pageHeaderAAD);
}

private PageHeader newDataPageV2Header(
/**
* @deprecated will be removed in 2.0.0. Use {@link ParquetMetadataConverter#writeDataPageV2Header(int, int, int, int, int, org.apache.parquet.column.Encoding, int, int, boolean, int, OutputStream, BlockCipher.Encryptor, byte[])} instead
*/
@Deprecated
public void writeDataPageV2Header(
int uncompressedSize,
int compressedSize,
int valueCount,
int nullCount,
int rowCount,
org.apache.parquet.column.Encoding dataEncoding,
int rlByteLength,
int dlByteLength) {
DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
valueCount, nullCount, rowCount, getEncoding(dataEncoding), dlByteLength, rlByteLength);
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
pageHeader.setData_page_header_v2(dataPageHeaderV2);
return pageHeader;
int dlByteLength,
int crc,
OutputStream to,
BlockCipher.Encryptor blockEncryptor,
byte[] pageHeaderAAD)
throws IOException {
writeDataPageV2Header(
uncompressedSize,
compressedSize,
valueCount,
nullCount,
rowCount,
dataEncoding,
rlByteLength,
dlByteLength,
true, /* compressed by default */
crc,
to,
blockEncryptor,
pageHeaderAAD);
}

public void writeDataPageV2Header(
Expand All @@ -2137,7 +2165,34 @@ public void writeDataPageV2Header(
org.apache.parquet.column.Encoding dataEncoding,
int rlByteLength,
int dlByteLength,
int crc,
boolean compressed,
OutputStream to)
throws IOException {
writeDataPageV2Header(
uncompressedSize,
compressedSize,
valueCount,
nullCount,
rowCount,
dataEncoding,
rlByteLength,
dlByteLength,
compressed,
to,
null,
null);
}

public void writeDataPageV2Header(
int uncompressedSize,
int compressedSize,
int valueCount,
int nullCount,
int rowCount,
org.apache.parquet.column.Encoding dataEncoding,
int rlByteLength,
int dlByteLength,
boolean compressed,
OutputStream to,
BlockCipher.Encryptor blockEncryptor,
byte[] pageHeaderAAD)
Expand All @@ -2152,12 +2207,43 @@ public void writeDataPageV2Header(
dataEncoding,
rlByteLength,
dlByteLength,
crc),
compressed),
to,
blockEncryptor,
pageHeaderAAD);
}

public void writeDataPageV2Header(
int uncompressedSize,
int compressedSize,
int valueCount,
int nullCount,
int rowCount,
org.apache.parquet.column.Encoding dataEncoding,
int rlByteLength,
int dlByteLength,
boolean compressed,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pitrou @wgtmac changed to an explicit parameter.

int crc,
OutputStream to,
BlockCipher.Encryptor blockEncryptor,
byte[] pageHeaderAAD)
throws IOException {
PageHeader pageHeader = newDataPageV2Header(
uncompressedSize,
compressedSize,
valueCount,
nullCount,
rowCount,
dataEncoding,
rlByteLength,
dlByteLength,
compressed);

pageHeader.setCrc(crc);

writePageHeader(pageHeader, to, blockEncryptor, pageHeaderAAD);
}

private PageHeader newDataPageV2Header(
int uncompressedSize,
int compressedSize,
Expand All @@ -2167,12 +2253,13 @@ private PageHeader newDataPageV2Header(
org.apache.parquet.column.Encoding dataEncoding,
int rlByteLength,
int dlByteLength,
int crc) {
boolean compressed) {
DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
valueCount, nullCount, rowCount, getEncoding(dataEncoding), dlByteLength, rlByteLength);
dataPageHeaderV2.setIs_compressed(compressed);

PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
pageHeader.setData_page_header_v2(dataPageHeaderV2);
pageHeader.setCrc(crc);
return pageHeader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,13 @@ public void writePageV2(
int rlByteLength = toIntWithCheck(repetitionLevels.size());
int dlByteLength = toIntWithCheck(definitionLevels.size());
int uncompressedSize = toIntWithCheck(data.size() + repetitionLevels.size() + definitionLevels.size());
// TODO: decide if we compress
BytesInput compressedData = compressor.compress(data);
boolean compressed = false;
BytesInput compressedData = BytesInput.empty();
if (data.size() > 0) {
// TODO: decide if we compress
compressedData = compressor.compress(data);
compressed = true;
}
if (null != pageBlockEncryptor) {
AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
compressedData = BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD));
Expand Down Expand Up @@ -327,6 +332,7 @@ public void writePageV2(
dataEncoding,
rlByteLength,
dlByteLength,
compressed,
(int) crc.getValue(),
tempOutputStream,
headerBlockEncryptor,
Expand All @@ -341,6 +347,7 @@ public void writePageV2(
dataEncoding,
rlByteLength,
dlByteLength,
compressed,
tempOutputStream,
headerBlockEncryptor,
dataPageHeaderAAD);
Expand Down
Loading