From 2549c5f244ae43d9991ea36b78c3fcc0bf4c2db5 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Thu, 6 Feb 2025 23:28:38 +0530 Subject: [PATCH 1/3] feat: Enable GZIP compression --- s2-sdk/src/main/java/s2/client/StreamClient.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/s2-sdk/src/main/java/s2/client/StreamClient.java b/s2-sdk/src/main/java/s2/client/StreamClient.java index 11929f1..d38f8c8 100644 --- a/s2-sdk/src/main/java/s2/client/StreamClient.java +++ b/s2-sdk/src/main/java/s2/client/StreamClient.java @@ -35,6 +35,8 @@ public class StreamClient extends BasinClient { /** Name of stream associated with this client. */ final String streamName; + private static final String compressionCodec = "gzip"; + private final StreamServiceFutureStub futureStub; final StreamServiceStub asyncStub; @@ -64,11 +66,13 @@ public StreamClient( this.futureStub = StreamServiceGrpc.newFutureStub(channel) .withCallCredentials(new BearerTokenCallCredentials(config.token)) - .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)); + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)) + .withCompression(compressionCodec); this.asyncStub = StreamServiceGrpc.newStub(channel) .withCallCredentials(new BearerTokenCallCredentials(config.token)) - .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)); + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)) + .withCompression(compressionCodec); } /** From 45799b8919dca8196db713b33efd1476fac6f106 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Sat, 8 Feb 2025 12:19:21 +0530 Subject: [PATCH 2/3] configurable option --- .../src/main/java/s2/client/StreamClient.java | 19 +++++++++++++------ s2-sdk/src/main/java/s2/config/Config.java | 14 ++++++++++++-- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/s2-sdk/src/main/java/s2/client/StreamClient.java b/s2-sdk/src/main/java/s2/client/StreamClient.java index d38f8c8..3245839 100644 --- a/s2-sdk/src/main/java/s2/client/StreamClient.java +++ b/s2-sdk/src/main/java/s2/client/StreamClient.java @@ -63,16 +63,23 @@ public StreamClient( var meta = new Metadata(); meta.put(Key.of("s2-basin", Metadata.ASCII_STRING_MARSHALLER), basin); this.streamName = streamName; - this.futureStub = + + StreamServiceFutureStub futureStub = StreamServiceGrpc.newFutureStub(channel) .withCallCredentials(new BearerTokenCallCredentials(config.token)) - .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)) - .withCompression(compressionCodec); - this.asyncStub = + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)); + StreamServiceStub asyncStub = StreamServiceGrpc.newStub(channel) .withCallCredentials(new BearerTokenCallCredentials(config.token)) - .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)) - .withCompression(compressionCodec); + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(meta)); + + if (config.compression) { + futureStub = futureStub.withCompression(compressionCodec); + asyncStub = asyncStub.withCompression(compressionCodec); + } + + this.futureStub = futureStub; + this.asyncStub = asyncStub; } /** diff --git a/s2-sdk/src/main/java/s2/config/Config.java b/s2-sdk/src/main/java/s2/config/Config.java index 78f1286..cd9e482 100644 --- a/s2-sdk/src/main/java/s2/config/Config.java +++ b/s2-sdk/src/main/java/s2/config/Config.java @@ -13,6 +13,7 @@ public class Config { public final Duration requestTimeout; public final Duration retryDelay; public final String userAgent; + public final Boolean compression; private Config( String token, @@ -22,7 +23,8 @@ private Config( Integer maxRetries, Duration requestTimeout, Duration retryDelay, - String userAgent) { + String userAgent, + Boolean compression) { this.token = token; this.appendRetryPolicy = appendRetryPolicy; this.endpoints = endpoints; @@ -31,6 +33,7 @@ private Config( this.requestTimeout = requestTimeout; this.retryDelay = retryDelay; this.userAgent = userAgent; + this.compression = compression; } public static ConfigBuilder newBuilder(String token) { @@ -46,6 +49,7 @@ public static final class ConfigBuilder { private Optional requestTimeout = Optional.empty(); private Optional retryDelay = Optional.empty(); private Optional userAgent = Optional.empty(); + private Optional compression = Optional.empty(); ConfigBuilder(String token) { this.token = token; @@ -86,6 +90,11 @@ public ConfigBuilder withUserAgent(String userAgent) { return this; } + public ConfigBuilder withCompression(Boolean compression) { + this.compression = Optional.of(compression); + return this; + } + public Config build() { validate(); return new Config( @@ -96,7 +105,8 @@ public Config build() { this.maxRetries.orElse(3), this.requestTimeout.orElse(Duration.ofSeconds(10)), this.retryDelay.orElse(Duration.ofMillis(50)), - this.userAgent.orElse("s2-sdk-java")); + this.userAgent.orElse("s2-sdk-java"), + this.compression.orElse(false)); } private void validate() { From 4ed258b0b9918d1cead4f99800bd498a7408374d Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Wed, 12 Feb 2025 14:35:53 +0530 Subject: [PATCH 3/3] alphabetic order --- s2-sdk/src/main/java/s2/config/Config.java | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/s2-sdk/src/main/java/s2/config/Config.java b/s2-sdk/src/main/java/s2/config/Config.java index cd9e482..4386b98 100644 --- a/s2-sdk/src/main/java/s2/config/Config.java +++ b/s2-sdk/src/main/java/s2/config/Config.java @@ -7,33 +7,33 @@ public class Config { public final String token; public final AppendRetryPolicy appendRetryPolicy; + public final Boolean compression; public final Endpoints endpoints; public final Integer maxAppendInflightBytes; public final Integer maxRetries; public final Duration requestTimeout; public final Duration retryDelay; public final String userAgent; - public final Boolean compression; private Config( String token, AppendRetryPolicy appendRetryPolicy, + Boolean compression, Endpoints endpoints, Integer maxAppendInflightBytes, Integer maxRetries, Duration requestTimeout, Duration retryDelay, - String userAgent, - Boolean compression) { + String userAgent) { this.token = token; this.appendRetryPolicy = appendRetryPolicy; + this.compression = compression; this.endpoints = endpoints; this.maxAppendInflightBytes = maxAppendInflightBytes; this.maxRetries = maxRetries; this.requestTimeout = requestTimeout; this.retryDelay = retryDelay; this.userAgent = userAgent; - this.compression = compression; } public static ConfigBuilder newBuilder(String token) { @@ -60,6 +60,11 @@ public ConfigBuilder withAppendRetryPolicy(AppendRetryPolicy appendRetryPolicy) return this; } + public ConfigBuilder withCompression(Boolean compression) { + this.compression = Optional.of(compression); + return this; + } + public ConfigBuilder withEndpoints(Endpoints endpoints) { this.endpoints = Optional.of(endpoints); return this; @@ -90,23 +95,18 @@ public ConfigBuilder withUserAgent(String userAgent) { return this; } - public ConfigBuilder withCompression(Boolean compression) { - this.compression = Optional.of(compression); - return this; - } - public Config build() { validate(); return new Config( this.token, this.appendRetryPolicy.orElse(AppendRetryPolicy.ALL), + this.compression.orElse(false), this.endpoints.orElse(Endpoints.forCloud(Cloud.AWS)), this.maxAppendInflightBytes.orElse(Integer.MAX_VALUE), this.maxRetries.orElse(3), this.requestTimeout.orElse(Duration.ofSeconds(10)), this.retryDelay.orElse(Duration.ofMillis(50)), - this.userAgent.orElse("s2-sdk-java"), - this.compression.orElse(false)); + this.userAgent.orElse("s2-sdk-java")); } private void validate() {