Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub)!: set default max ack extension period to 60 minutes #3501

Merged
merged 12 commits into from
Feb 4, 2025
2 changes: 1 addition & 1 deletion docs/src/main/asciidoc/_configprops.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
|spring.cloud.gcp.pubsub.subscriber.flow-control.limit-exceeded-behavior | | The behavior when the specified limits are exceeded.
|spring.cloud.gcp.pubsub.subscriber.flow-control.max-outstanding-element-count | | Maximum number of outstanding elements to keep in memory before enforcing flow control.
|spring.cloud.gcp.pubsub.subscriber.flow-control.max-outstanding-request-bytes | | Maximum number of outstanding bytes to keep in memory before enforcing flow control.
|spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period | 0 | The optional max ack extension period in seconds for the subscriber factory.
|spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period | Library default (60 minutes) | The optional max ack extension period in seconds for the subscriber factory.
|spring.cloud.gcp.pubsub.subscriber.max-acknowledgement-threads | 4 | Number of threads used for batch acknowledgement.
|spring.cloud.gcp.pubsub.subscriber.parallel-pull-count | | The optional parallel pull count setting for the subscriber factory.
|spring.cloud.gcp.pubsub.subscriber.pull-endpoint | | The optional pull endpoint setting for the subscriber factory.
Expand Down
4 changes: 2 additions & 2 deletions docs/src/main/asciidoc/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ However, if a per-subscription configuration is not set then the global or defau
|===
| Name | Description | Required | Default value
| `spring.cloud.gcp.pubsub.subscriber.parallel-pull-count` | The number of pull workers | No | 1
| `spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period` | The maximum period a message ack deadline will be extended, in seconds | No | 0
| `spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period` | The maximum period a message ack deadline will be extended, in seconds | No | Library default (60 minutes)
| `spring.cloud.gcp.pubsub.subscriber.min-duration-per-ack-extension` | The lower bound for a single mod ack extension period, in seconds | No | 0
| `spring.cloud.gcp.pubsub.subscriber.max-duration-per-ack-extension` | The upper bound for a single mod ack extension period, in seconds | No | 0
| `spring.cloud.gcp.pubsub.subscriber.pull-endpoint` | The endpoint for pulling messages | No | pubsub.googleapis.com:443
Expand Down Expand Up @@ -111,7 +111,7 @@ When true, replicates the default behavior before Spring 6.1.x. | No | false
| Name | Description | Required | Default value
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].fully-qualified-name` | The fully-qualified subscription name in the `projects/[PROJECT]/subscriptions/[SUBSCRIPTION]` format. When this property is present, the `[subscription-name]` key does not have to match any actual resources; it's used only for logical grouping. | No | 1
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].parallel-pull-count` | The number of pull workers. | No | 1
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].max-ack-extension-period` | The maximum period a message ack deadline will be extended, in seconds. | No | 0
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].max-ack-extension-period` | The maximum period a message ack deadline will be extended, in seconds. | No | Library default (60 minutes)
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].min-duration-per-ack-extension` | The lower bound for a single mod ack extension period, in seconds | No | 0
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].max-duration-per-ack-extension` | The upper bound for a single mod ack extension period, in seconds | No | 0
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].pull-endpoint` | The endpoint for pulling messages. | No | pubsub.googleapis.com:443
Expand Down
5 changes: 3 additions & 2 deletions docs/src/main/asciidoc/spring-integration-pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ When working with Cloud Pub/Sub, it is important to understand the concept of `a
Each subscription has a default `ackDeadline` applied to all messages sent to it.
Additionally, the Cloud Pub/Sub client library can extend each streamed message's `ackDeadline` until the message processing completes, fails or until the maximum extension period elapses.

NOTE: In the Pub/Sub client library, default maximum extension period is an hour. However, Spring Framework on Google Cloud disables this auto-extension behavior.
Use the `spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period` property to re-enable it.
NOTE: In the Pub/Sub client library, default maximum extension period is an hour.
The Spring integration delegates the default value resolution to the underlying client library.
If you wish to use a different value, use the `spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period` property.

Acknowledging (acking) a message removes it from Pub/Sub's known outstanding messages. Nacking a message resets its acknowledgement deadline to 0, forcing immediate redelivery.
This could be useful in a load balanced architecture, where one of the subscribers is having issues but others are available to process messages.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ void pullConfig_defaultConfigurationSet() {
assertThat(
gcpPubSubProperties.computeMaxAckExtensionPeriod(
"subscription-name", projectIdProvider.getProjectId()))
.isZero();
.isNull();
assertThat(
gcpPubSubProperties.computeMinDurationPerAckExtension(
"subscription-name", projectIdProvider.getProjectId()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public class PubSubConfiguration {
/** Default number of executor threads. */
public static final int DEFAULT_EXECUTOR_THREADS = 4;

private static final Long DEFAULT_MAX_ACK_EXTENSION_PERIOD = 0L;

/**
* Automatically extracted user-provided properties. Contains only short subscription keys
* user-provided properties, therefore do not use except in initialize().
Expand Down Expand Up @@ -225,10 +223,7 @@ public Long computeMaxAckExtensionPeriod(String subscriptionName, String project
if (maxAckExtensionPeriod != null) {
return maxAckExtensionPeriod;
}
Long globalMaxAckExtensionPeriod = this.globalSubscriber.getMaxAckExtensionPeriod();
return globalMaxAckExtensionPeriod != null
? globalMaxAckExtensionPeriod
: DEFAULT_MAX_ACK_EXTENSION_PERIOD;
return this.globalSubscriber.getMaxAckExtensionPeriod();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,11 @@ Duration getMaxAckExtensionPeriod(String subscriptionName) {
if (this.maxAckExtensionPeriod != null) {
return this.maxAckExtensionPeriod;
}
return Duration.ofSeconds(
this.pubSubConfiguration.computeMaxAckExtensionPeriod(subscriptionName, projectId));
Long maxAckExtensionPeriod = this.pubSubConfiguration.computeMaxAckExtensionPeriod(subscriptionName, projectId);
if (maxAckExtensionPeriod != null) {
return Duration.ofSeconds(maxAckExtensionPeriod);
}
return null;
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void testComputeMaxAckExtensionPeriod_returnDefault() {
Long result =
pubSubConfiguration.computeMaxAckExtensionPeriod("subscription-name", "projectId");

assertThat(result).isZero();
assertThat(result).isNull();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,23 @@ void testNewSubscriber() {
.isEqualTo("projects/angeldust/subscriptions/midnight cowboy");
}

@Test
void testNewSubscriber_noMaxAckExtensionPeriodSet_usesClientDefault()
throws NoSuchFieldException, IllegalAccessException {
DefaultSubscriberFactory factory = new DefaultSubscriberFactory(() -> "sealion", pubSubConfig);
String subscriptionName = "deadbeef";

// By default, we build a subscriber with null max ack extension period
Duration maxAckExtensionPeriodArgument = factory.getMaxAckExtensionPeriod(subscriptionName);
assertThat(maxAckExtensionPeriodArgument).isNull();

// Now we check that the default in google-cloud-pubsub was used
Subscriber subscriber = factory.createSubscriber(subscriptionName, (message, consumer) -> {});
java.time.Duration effectiveMaxAckExtensionPeriod = (java.time.Duration) FieldUtils.readField(subscriber, "maxAckExtensionPeriod", true);
java.time.Duration clientDefaultMaxAckExtensionPeriod = (java.time.Duration) FieldUtils.readField(subscriber, "DEFAULT_MAX_ACK_EXTENSION_PERIOD", true);
assertThat(effectiveMaxAckExtensionPeriod).isEqualTo(clientDefaultMaxAckExtensionPeriod);
}

@Test
void testNewSubscriber_constructorWithPubSubConfiguration() {
GcpProjectIdProvider projectIdProvider = () -> "angeldust";
Expand Down Expand Up @@ -534,7 +551,7 @@ void testGetMaxAckExtensionPeriod_newConfiguration() {
new DefaultSubscriberFactory(projectIdProvider, this.pubSubConfig);

assertThat(factory.getMaxAckExtensionPeriod("subscription-name"))
.isEqualTo(Duration.ofSeconds(0L));
.isNull();
}

@Test
Expand Down
Loading