Skip to content

Commit

Permalink
feat: Add the ability to set gRPC compression in the CPS sink connect…
Browse files Browse the repository at this point in the history
…or. (#259)

* feat: Add the ability to set gRPC compression in the CPS sink connector.

* Add units to docs
  • Loading branch information
kamalaboulhosn authored May 10, 2023
1 parent b7eea57 commit ad1e72b
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ configurations:
| headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Pub/Sub. |
| orderingKeySource | String (none, key, partition) | none | When set to "none", do not set the ordering key. When set to "key", uses a message's key as the ordering key. If set to "partition", converts the partition number to a String and uses that as the ordering key. Note that using "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
| messageBodyName | String | "cps_message_body" | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. |
| enableCompression | Boolean | false | When true, enable [publish-side compression](https://cloud.google.com/pubsub/docs/publisher#compressing) in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
| compressionBytesThreshold | Long | 240 | When enableCompression is true, the minimum size (in bytes) of a publish request to compress. |

### Pub/Sub Lite connector configs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class CloudPubSubSinkConnector extends SinkConnector {
public static final String MAX_REQUEST_TIMEOUT_MS = "maxRequestTimeoutMs";
public static final String MAX_TOTAL_TIMEOUT_MS = "maxTotalTimeoutMs";
public static final String MAX_SHUTDOWN_TIMEOUT_MS = "maxShutdownTimeoutMs";
public static final String ENABLE_COMPRESSION = "enableCompression";
public static final String COMPRESSION_BYTES_THRESHOLD = "compressionBytesThreshold";
public static final int DEFAULT_MAX_BUFFER_SIZE = 100;
public static final long DEFAULT_MAX_BUFFER_BYTES = 9500000L;
public static final int DEFAULT_DELAY_THRESHOLD_MS = 100;
Expand All @@ -61,6 +63,8 @@ public class CloudPubSubSinkConnector extends SinkConnector {
public static final String PUBLISH_KAFKA_HEADERS = "headers.publish";
public static final String ORDERING_KEY_SOURCE = "orderingKeySource";
public static final String DEFAULT_ORDERING_KEY_SOURCE = "none";
public static final boolean DEFAULT_ENABLE_COMPRESSION = false;
public static final long DEFAULT_COMPRESSION_BYTES_THRESHOLD = 240L;

/** Defines the accepted values for the {@link #ORDERING_KEY_SOURCE}. */
public enum OrderingKeySource {
Expand Down Expand Up @@ -250,6 +254,20 @@ public ConfigDef config() {
Importance.MEDIUM,
"What to use to populate the Pub/Sub message ordering key. Possible values are "
+ "\"none\", \"key\", or \"partition\".")
.define(
ENABLE_COMPRESSION,
Type.BOOLEAN,
DEFAULT_ENABLE_COMPRESSION,
Importance.MEDIUM,
"When \"true\", use gRPC Gzip compression on publish requests before sending them "
+ "to Cloud Pub/Sub.")
.define(
COMPRESSION_BYTES_THRESHOLD,
Type.LONG,
DEFAULT_COMPRESSION_BYTES_THRESHOLD,
Importance.MEDIUM,
"The number of bytes at which to compress a request when publishing to "
+ "Cloud Pub/Sub. Only takes effect if \"enableCompression\" is \"true\".")
.define(
ConnectorUtils.CPS_ENDPOINT,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class CloudPubSubSinkTask extends SinkTask {
private boolean includeMetadata;
private boolean includeHeaders;
private OrderingKeySource orderingKeySource;
private boolean enableCompression;
private long compressionBytesThreshold;
private ConnectorCredentialsProvider gcpCredentialsProvider;
private com.google.cloud.pubsub.v1.Publisher publisher;

Expand Down Expand Up @@ -135,6 +137,9 @@ public void start(Map<String, String> props) {
orderingKeySource =
OrderingKeySource.getEnum(
(String) validatedProps.get(CloudPubSubSinkConnector.ORDERING_KEY_SOURCE));
enableCompression = (Boolean) validatedProps.get(CloudPubSubSinkConnector.ENABLE_COMPRESSION);
compressionBytesThreshold =
(Long) validatedProps.get(CloudPubSubSinkConnector.COMPRESSION_BYTES_THRESHOLD);
gcpCredentialsProvider = ConnectorCredentialsProvider.fromConfig(validatedProps);
if (publisher == null) {
// Only do this if we did not use the constructor.
Expand Down Expand Up @@ -413,6 +418,10 @@ private void createPublisher() {
if (orderingKeySource != OrderingKeySource.NONE) {
builder.setEnableMessageOrdering(true);
}
if (enableCompression) {
builder.setEnableCompression(true);
builder.setCompressionBytesThreshold(compressionBytesThreshold);
}
try {
publisher = builder.build();
} catch (Exception e) {
Expand Down

0 comments on commit ad1e72b

Please sign in to comment.