Allow setting `none` for avroCodec and parquetCodec
shibd committed Nov 28, 2023
1 parent 6437d1e commit f4e67a1
Showing 5 changed files with 45 additions and 8 deletions.
4 changes: 2 additions & 2 deletions docs/aws-s3-sink.md
@@ -129,8 +129,8 @@ Before using the AWS S3 sink connector, you need to configure it. This table out
| `useHumanReadableSchemaVersion` | Boolean | False | false | Use a human-readable format string for the schema version in the message metadata. If it is set to `true`, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format. |
| `skipFailedMessages` | Boolean | False | false | Configure whether to skip a message which it fails to be processed. If it is set to `true`, the connector will skip the failed messages by `ack` it. Otherwise, the connector will `fail` the message. |
| `pathPrefix` | String | False | false | If it is set, the output files are stored in a folder under the given bucket path. The `pathPrefix` must be in the format of `xx/xxx/`. |
| `avroCodec` | String | False | snappy | Compression codec used when formatType=`avro`. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy. |
| `parquetCodec` | String | False | gzip | Compression codec used when formatType=`parquet`. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
| `avroCodec` | String | False | snappy | Compression codec used when formatType=`avro`. Available compression types are: none (no compression), deflate, bzip2, xz, zstandard, snappy. |
| `parquetCodec` | String | False | gzip | Compression codec used when formatType=`parquet`. Available compression types are: none (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
| `jsonAllowNaN` | Boolean | False | false | Recognize 'NaN', 'INF', '-INF' as legal floating number values when formatType=`json`. Since JSON specification does not allow such values this is a non-standard feature and disabled by default. |

## Advanced features
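As a usage sketch of the new option (the bucket name, provider string, and prefix below are illustrative placeholders, not values taken from this commit), a sink configuration that disables output compression might look like:

```yaml
# Hypothetical cloud storage sink config fragment.
# Setting avroCodec to "none" (or leaving it empty) disables compression;
# the defaults remain snappy for Avro and gzip for Parquet.
configs:
  provider: "aws-s3"        # illustrative provider value
  bucket: "my-bucket"       # placeholder bucket name
  formatType: "avro"
  avroCodec: "none"         # no compression for the Avro output files
  pathPrefix: "data/raw/"   # must match the xx/xxx/ format
```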
4 changes: 2 additions & 2 deletions docs/azure-blob-storage-sink.md
@@ -120,8 +120,8 @@ Before using the Azure Blob Storage sink connector, you need to configure it. Th
| `useHumanReadableSchemaVersion` | Boolean | False | false | Use a human-readable format string for the schema version in the message metadata. If it is set to `true`, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format. |
| `skipFailedMessages` | Boolean | False | false | Configure whether to skip a message which it fails to be processed. If it is set to `true`, the connector will skip the failed messages by `ack` it. Otherwise, the connector will `fail` the message. |
| `pathPrefix` | String | False | false | If it is set, the output files are stored in a folder under the given bucket path. The `pathPrefix` must be in the format of `xx/xxx/`. |
| `avroCodec` | String | False | snappy | Compression codec used when formatType=`avro`. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy. |
| `parquetCodec` | String | False | gzip | Compression codec used when formatType=`parquet`. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
| `avroCodec` | String | False | snappy | Compression codec used when formatType=`avro`. Available compression types are: none (no compression), deflate, bzip2, xz, zstandard, snappy. |
| `parquetCodec` | String | False | gzip | Compression codec used when formatType=`parquet`. Available compression types are: none (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
| `jsonAllowNaN` | Boolean | False | false | Recognize 'NaN', 'INF', '-INF' as legal floating number values when formatType=`json`. Since JSON specification does not allow such values this is a non-standard feature and disabled by default. |

There are three methods to authenticate with Azure Blob Storage:
4 changes: 2 additions & 2 deletions docs/google-cloud-storage-sink.md
@@ -124,8 +124,8 @@ Before using the Google Cloud Storage sink connector, you need to configure it.
| `useHumanReadableSchemaVersion` | Boolean | False | false | Use a human-readable format string for the schema version in the message metadata. If it is set to `true`, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format. |
| `skipFailedMessages` | Boolean | False | false | Configure whether to skip a message which it fails to be processed. If it is set to `true`, the connector will skip the failed messages by `ack` it. Otherwise, the connector will `fail` the message. |
| `pathPrefix` | String | False | false | If it is set, the output files are stored in a folder under the given bucket path. The `pathPrefix` must be in the format of `xx/xxx/`. |
| `avroCodec` | String | False | snappy | Compression codec used when formatType=`avro`. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy. |
| `parquetCodec` | String | False | gzip | Compression codec used when formatType=`parquet`. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
| `avroCodec` | String | False | snappy | Compression codec used when formatType=`avro`. Available compression types are: none (no compression), deflate, bzip2, xz, zstandard, snappy. |
| `parquetCodec` | String | False | gzip | Compression codec used when formatType=`parquet`. Available compression types are: none (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
| `jsonAllowNaN` | Boolean | False | false | Recognize 'NaN', 'INF', '-INF' as legal floating number values when formatType=`json`. Since JSON specification does not allow such values this is a non-standard feature and disabled by default. |

## Advanced features
@@ -86,11 +86,11 @@ public class BlobStoreAbstractConfig implements Serializable {
private boolean partitionerUseIndexAsOffset;

// The AVRO codec.
// Options: null, deflate, bzip2, xz, zstandard, snappy
// Options: none, deflate, bzip2, xz, zstandard, snappy
private String avroCodec = "snappy";

// The Parquet codec.
// Options: null, snappy, gzip, lzo, brotli, lz4, zstd
// Options: none, snappy, gzip, lzo, brotli, lz4, zstd
private String parquetCodec = "gzip";

private String timePartitionPattern;
@@ -183,6 +183,13 @@ public void validate() {
checkArgument(pendingQueueSize > 0, "pendingQueueSize must be a positive integer.");
checkArgument(pendingQueueSize >= batchSize, "pendingQueueSize must be larger than or "
+ "equal to batchSize");

if (avroCodec != null && (avroCodec.isEmpty() || avroCodec.equals("none"))) {
avroCodec = null;
}
if (parquetCodec != null && (parquetCodec.isEmpty() || parquetCodec.equals("none"))) {
parquetCodec = null;
}
}

private static boolean hasURIScheme(String endpoint) {
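The change to `validate()` can be sketched as a standalone helper (the class and method names below are illustrative, not part of the connector): an empty string or the literal `none` is canonicalized to `null`, which the format writers already treat as "no compression", while real codec names pass through unchanged.

```java
// Sketch of the codec normalization introduced in validate().
// CodecNormalizationSketch and normalizeCodec are illustrative names only.
public class CodecNormalizationSketch {

    static String normalizeCodec(String codec) {
        if (codec != null && (codec.isEmpty() || codec.equals("none"))) {
            return null; // null signals "no compression" downstream
        }
        return codec;    // e.g. "snappy", "gzip" pass through unchanged
    }

    public static void main(String[] args) {
        System.out.println(normalizeCodec("snappy")); // snappy
        System.out.println(normalizeCodec("none"));   // null
        System.out.println(normalizeCodec(""));       // null
    }
}
```

Normalizing once during validation keeps the rest of the writer code free of special cases: every later consumer only has to check for `null` rather than three different "disabled" spellings.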
30 changes: 30 additions & 0 deletions src/test/java/org/apache/pulsar/io/jcloud/ConnectorConfigTest.java
@@ -329,4 +329,34 @@ public void testNotAllowEndpointEmptyWithAzure() throws IOException {
}
}

@Test
public void testCodec() throws IOException {
Map<String, Object> config = new HashMap<>();
config.put("provider", PROVIDER_AZURE);
config.put("azureStorageAccountConnectionString", "test-connection-string");
config.put("bucket", "test-container-name");
config.put("formatType", "bytes");
config.put("partitionerType", "PARTITION");
config.put("avroCodec", "snappy");
config.put("parquetCodec", "snappy");
CloudStorageSinkConfig cloudStorageSinkConfig = CloudStorageSinkConfig.load(config);
cloudStorageSinkConfig.validate();
Assert.assertEquals("snappy", cloudStorageSinkConfig.getAvroCodec());
Assert.assertEquals("snappy", cloudStorageSinkConfig.getParquetCodec());

config.put("avroCodec", "");
config.put("parquetCodec", "");
cloudStorageSinkConfig = CloudStorageSinkConfig.load(config);
cloudStorageSinkConfig.validate();
Assert.assertNull(cloudStorageSinkConfig.getAvroCodec());
Assert.assertNull(cloudStorageSinkConfig.getParquetCodec());

config.put("avroCodec", "none");
config.put("parquetCodec", "none");
cloudStorageSinkConfig = CloudStorageSinkConfig.load(config);
cloudStorageSinkConfig.validate();
Assert.assertNull(cloudStorageSinkConfig.getAvroCodec());
Assert.assertNull(cloudStorageSinkConfig.getParquetCodec());
}

}
