From ad1e72b7a09d6034c5ec0009131f39aaef236a50 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Wed, 10 May 2023 06:26:19 -0400 Subject: [PATCH] feat: Add the ability to set gRPC compression in the CPS sink connector. (#259) * feat: Add the ability to set gRPC compression in the CPS sink connector. * Add units to docs --- README.md | 2 ++ .../kafka/sink/CloudPubSubSinkConnector.java | 18 ++++++++++++++++++ .../pubsub/kafka/sink/CloudPubSubSinkTask.java | 9 +++++++++ 3 files changed, 29 insertions(+) diff --git a/README.md b/README.md index 9dbdf07..b86b935 100644 --- a/README.md +++ b/README.md @@ -210,6 +210,8 @@ configurations: | headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Pub/Sub. | | orderingKeySource | String (none, key, partition) | none | When set to "none", do not set the ordering key. When set to "key", uses a message's key as the ordering key. If set to "partition", converts the partition number to a String and uses that as the ordering key. Note that using "partition" should only be used for low-throughput topics or topics with thousands of partitions. | | messageBodyName | String | "cps_message_body" | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. | +| enableCompression | Boolean | false | When true, enable [publish-side compression](https://cloud.google.com/pubsub/docs/publisher#compressing) in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. | +| compressionBytesThreshold | Long | 240 | When enableCompression is true, the minimum size of publish request (in bytes) to compress. ### Pub/Sub Lite connector configs diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java index eb7d681..e7c5611 100644 --- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java +++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java @@ -47,6 +47,8 @@ public class CloudPubSubSinkConnector extends SinkConnector { public static final String MAX_REQUEST_TIMEOUT_MS = "maxRequestTimeoutMs"; public static final String MAX_TOTAL_TIMEOUT_MS = "maxTotalTimeoutMs"; public static final String MAX_SHUTDOWN_TIMEOUT_MS = "maxShutdownTimeoutMs"; + public static final String ENABLE_COMPRESSION = "enableCompression"; + public static final String COMPRESSION_BYTES_THRESHOLD = "compressionBytesThreshold"; public static final int DEFAULT_MAX_BUFFER_SIZE = 100; public static final long DEFAULT_MAX_BUFFER_BYTES = 9500000L; public static final int DEFAULT_DELAY_THRESHOLD_MS = 100; @@ -61,6 +63,8 @@ public class CloudPubSubSinkConnector extends SinkConnector { public static final String PUBLISH_KAFKA_HEADERS = "headers.publish"; public static final String ORDERING_KEY_SOURCE = "orderingKeySource"; public static final String DEFAULT_ORDERING_KEY_SOURCE = "none"; + public static final boolean DEFAULT_ENABLE_COMPRESSION = false; + public static final long DEFAULT_COMPRESSION_BYTES_THRESHOLD = 240L; /** Defines the accepted values for the {@link #ORDERING_KEY_SOURCE}. */ public enum OrderingKeySource { @@ -250,6 +254,20 @@ public ConfigDef config() { Importance.MEDIUM, "What to use to populate the Pub/Sub message ordering key. Possible values are " + "\"none\", \"key\", or \"partition\".") + .define( + ENABLE_COMPRESSION, + Type.BOOLEAN, + DEFAULT_ENABLE_COMPRESSION, + Importance.MEDIUM, + "When \"true\", use gRPC Gzip compression on publish requests before sending them " + + "to Cloud Pub/Sub.") + .define( + COMPRESSION_BYTES_THRESHOLD, + Type.LONG, + DEFAULT_COMPRESSION_BYTES_THRESHOLD, + Importance.MEDIUM, + "The number of bytes at which to compress a request when publishing to " + + "Cloud Pub/Sub. Only takes effect if \"enableCompression\" is \"true\".") .define( ConnectorUtils.CPS_ENDPOINT, Type.STRING, diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java index 9faa791..28c74ce 100644 --- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java +++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java @@ -81,6 +81,8 @@ public class CloudPubSubSinkTask extends SinkTask { private boolean includeMetadata; private boolean includeHeaders; private OrderingKeySource orderingKeySource; + private boolean enableCompression; + private long compressionBytesThreshold; private ConnectorCredentialsProvider gcpCredentialsProvider; private com.google.cloud.pubsub.v1.Publisher publisher; @@ -135,6 +137,9 @@ public void start(Map props) { orderingKeySource = OrderingKeySource.getEnum( (String) validatedProps.get(CloudPubSubSinkConnector.ORDERING_KEY_SOURCE)); + enableCompression = (Boolean) validatedProps.get(CloudPubSubSinkConnector.ENABLE_COMPRESSION); + compressionBytesThreshold = + (Long) validatedProps.get(CloudPubSubSinkConnector.COMPRESSION_BYTES_THRESHOLD); gcpCredentialsProvider = ConnectorCredentialsProvider.fromConfig(validatedProps); if (publisher == null) { // Only do this if we did not use the constructor. @@ -413,6 +418,10 @@ private void createPublisher() { if (orderingKeySource != OrderingKeySource.NONE) { builder.setEnableMessageOrdering(true); } + if (enableCompression) { + builder.setEnableCompression(true); + builder.setCompressionBytesThreshold(compressionBytesThreshold); + } try { publisher = builder.build(); } catch (Exception e) {