Skip to content

Commit

Permalink
fix: prevent long stall during reconnection/disconnection by providin…
Browse files Browse the repository at this point in the history
…g LinkedHashSetQueue instead of ArrayDeque for CommandHandler#stack
  • Loading branch information
okg-cxf committed Aug 1, 2024
1 parent dee8020 commit d300391
Show file tree
Hide file tree
Showing 4 changed files with 564 additions and 9 deletions.
39 changes: 34 additions & 5 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class ClientOptions implements Serializable {

public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.create();

public static final boolean DEFAULT_USE_HASH_INDEX_QUEUE = false;

private final boolean autoReconnect;

private final boolean cancelCommandsOnReconnectFailure;
Expand Down Expand Up @@ -97,6 +99,8 @@ public class ClientOptions implements Serializable {

private final TimeoutOptions timeoutOptions;

private final boolean useHashIndexedQueue;

protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
Expand All @@ -112,6 +116,7 @@ protected ClientOptions(Builder builder) {
this.sslOptions = builder.sslOptions;
this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure;
this.timeoutOptions = builder.timeoutOptions;
this.useHashIndexedQueue = builder.useHashIndexedQueue;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -129,6 +134,7 @@ protected ClientOptions(ClientOptions original) {
this.sslOptions = original.getSslOptions();
this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure();
this.timeoutOptions = original.getTimeoutOptions();
this.useHashIndexedQueue = original.isUseHashIndexedQueue();
}

/**
Expand Down Expand Up @@ -192,6 +198,8 @@ public static class Builder {

private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;

private boolean useHashIndexedQueue = DEFAULT_USE_HASH_INDEX_QUEUE;

protected Builder() {
}

Expand Down Expand Up @@ -247,8 +255,8 @@ public Builder bufferUsageRatio(int bufferUsageRatio) {
*
* @param policy the policy to use in {@link io.lettuce.core.protocol.CommandHandler}
* @return {@code this}
* @since 6.0
* @see DecodeBufferPolicies
* @since 6.0
*/
public Builder decodeBufferPolicy(DecodeBufferPolicy policy) {

Expand Down Expand Up @@ -295,8 +303,8 @@ public Builder pingBeforeActivateConnection(boolean pingBeforeActivateConnection
*
* @param protocolVersion version to use.
* @return {@code this}
* @since 6.0
* @see ProtocolVersion#newestSupported()
* @since 6.0
*/
public Builder protocolVersion(ProtocolVersion protocolVersion) {

Expand All @@ -315,9 +323,9 @@ public Builder protocolVersion(ProtocolVersion protocolVersion) {
*
* @param publishOnScheduler true/false
* @return {@code this}
* @since 5.2
* @see org.reactivestreams.Subscriber#onNext(Object)
* @see ClientResources#eventExecutorGroup()
* @since 5.2
*/
public Builder publishOnScheduler(boolean publishOnScheduler) {
this.publishOnScheduler = publishOnScheduler;
Expand Down Expand Up @@ -422,6 +430,20 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) {
return this;
}

/**
* Use hash indexed queue which provides O(1) remove(Object) thus won't cause blocking issues.
*
* @param useHashIndexedQueue true/false
* @return {@code this}
* @see io.lettuce.core.protocol.CommandHandler.AddToStack
* @since 5.1
*/
@SuppressWarnings("JavadocReference")
public Builder useHashIndexQueue(boolean useHashIndexedQueue) {
this.useHashIndexedQueue = useHashIndexedQueue;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand All @@ -439,7 +461,6 @@ public ClientOptions build() {
*
* @return a {@link ClientOptions.Builder} to create new {@link ClientOptions} whose settings are replicated from the
* current {@link ClientOptions}.
*
* @since 5.1
*/
public ClientOptions.Builder mutate() {
Expand Down Expand Up @@ -498,7 +519,6 @@ public DecodeBufferPolicy getDecodeBufferPolicy() {
*
* @return zero.
* @since 5.2
*
* @deprecated since 6.0 in favor of {@link DecodeBufferPolicy}.
*/
@Deprecated
Expand Down Expand Up @@ -637,6 +657,15 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

/**
* Whether we should use hash indexed queue, which provides O(1) remove(Object)
*
* @return if hash indexed queue should be used
*/
public boolean isUseHashIndexedQueue() {
return useHashIndexedQueue;
}

/**
* Behavior of connections in disconnected state.
*/
Expand Down
Loading

0 comments on commit d300391

Please sign in to comment.