Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor channel closure handling #118

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void distributeClosingException(String message) {
}

public void shutdownGracefully() {
responses.values().forEach(future -> future.cancel(false));
String msg = "Operation terminated: The closing process has been initiated for the resource.";
responses.values().forEach(future -> future.completeExceptionally(new ClosingException(msg)));
responses.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.unix.DomainSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import redis_request.RedisRequestOuterClass.RedisRequest;
Expand All @@ -22,6 +23,11 @@ public class ChannelHandler {

protected final Channel channel;
protected final CallbackDispatcher callbackDispatcher;
private AtomicBoolean isClosed = new AtomicBoolean(false);

public boolean isClosed() {
return this.isClosed.get() || !this.channel.isOpen();
}

/**
* Open a new channel for a new client and running it on the provided EventLoopGroup.
Expand Down Expand Up @@ -84,6 +90,7 @@ public CompletableFuture<Response> connect(ConnectionRequest request) {

/** Closes the UDS connection and frees corresponding resources. */
public ChannelFuture close() {
this.isClosed.set(true);
callbackDispatcher.shutdownGracefully();
return channel.close();
}
Expand Down
7 changes: 7 additions & 0 deletions java/client/src/main/java/glide/managers/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ public <T> CompletableFuture<T> submitNewCommand(
*/
protected <T> CompletableFuture<T> submitCommandToChannel(
RedisRequest.Builder command, RedisExceptionCheckedFunction<Response, T> responseHandler) {
if (channel.isClosed()) {
var errorFuture = new CompletableFuture<T>();
errorFuture.completeExceptionally(
new ClosingException("Channel closed: Unable to submit command."));
return errorFuture;
}

// write command request to channel
// when complete, convert the response to our expected type T using the given responseHandler
return channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,8 @@ public void rethrow_error_if_UDS_channel_closed() {
stopRustCoreLibMock();
try {
var exception =
assertThrows(
ExecutionException.class, () -> client.customCommand(new String[0]).get(1, SECONDS));
assertTrue(exception.getCause() instanceof RuntimeException);

// Not a public class, can't import
assertEquals(
"io.netty.channel.StacklessClosedChannelException",
exception.getCause().getCause().getClass().getName());
assertThrows(ExecutionException.class, () -> client.customCommand(new String[0]).get());
assertTrue(exception.getCause() instanceof ClosingException);
} finally {
// restart mock to let other tests pass if this one failed
startRustCoreLibMock(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void submitNewCommand_return_Object_result() {
CompletableFuture<Response> future = new CompletableFuture<>();
future.complete(respPointerResponse);
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

// exercise
CompletableFuture<Object> result =
Expand All @@ -81,6 +82,7 @@ public void submitNewCommand_return_Null_result() {
CompletableFuture<Response> future = new CompletableFuture<>();
future.complete(respPointerResponse);
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

// exercise
CompletableFuture<Object> result =
Expand All @@ -107,6 +109,7 @@ public void submitNewCommand_return_String_result() {
CompletableFuture<Response> future = new CompletableFuture<>();
future.complete(respPointerResponse);
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

// exercise
CompletableFuture<Object> result =
Expand All @@ -126,6 +129,7 @@ public void submitNewCommand_return_String_result() {
public void prepare_request_with_simple_routes(SimpleRoute routeType) {
CompletableFuture<Response> future = new CompletableFuture<>();
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

ArgumentCaptor<RedisRequest.Builder> captor =
ArgumentCaptor.forClass(RedisRequest.Builder.class);
Expand Down Expand Up @@ -156,6 +160,7 @@ public void prepare_request_with_simple_routes(SimpleRoute routeType) {
public void prepare_request_with_slot_id_routes(SlotType slotType) {
CompletableFuture<Response> future = new CompletableFuture<>();
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

ArgumentCaptor<RedisRequest.Builder> captor =
ArgumentCaptor.forClass(RedisRequest.Builder.class);
Expand Down Expand Up @@ -188,6 +193,7 @@ public void prepare_request_with_slot_id_routes(SlotType slotType) {
public void prepare_request_with_slot_key_routes(SlotType slotType) {
CompletableFuture<Response> future = new CompletableFuture<>();
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

ArgumentCaptor<RedisRequest.Builder> captor =
ArgumentCaptor.forClass(RedisRequest.Builder.class);
Expand Down Expand Up @@ -239,6 +245,7 @@ public void submitNewCommand_with_Transaction_sends_protobuf_request() {

CompletableFuture<Response> future = new CompletableFuture<>();
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

ArgumentCaptor<RedisRequest.Builder> captor =
ArgumentCaptor.forClass(RedisRequest.Builder.class);
Expand Down Expand Up @@ -279,6 +286,7 @@ public void submitNewCommand_with_ClusterTransaction_with_route_sends_protobuf_r

CompletableFuture<Response> future = new CompletableFuture<>();
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

ArgumentCaptor<RedisRequest.Builder> captor =
ArgumentCaptor.forClass(RedisRequest.Builder.class);
Expand Down
33 changes: 33 additions & 0 deletions java/integTest/src/test/java/glide/cluster/ClientTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.cluster;

import static glide.TestConfiguration.CLUSTER_PORTS;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import glide.api.RedisClusterClient;
import glide.api.models.configuration.NodeAddress;
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.exceptions.ClosingException;
import java.util.concurrent.ExecutionException;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;

public class ClientTests {
@Test
@SneakyThrows
public void close_client_throws_ExecutionException_with_ClosingException_cause() {
RedisClusterClient client =
RedisClusterClient.CreateClient(
RedisClusterClientConfiguration.builder()
.address(NodeAddress.builder().port(CLUSTER_PORTS[0]).build())
.requestTimeout(5000)
.build())
.get();

client.close();
ExecutionException executionException =
assertThrows(ExecutionException.class, () -> client.set("foo", "bar"));
assertTrue(executionException.getCause() instanceof ClosingException);
}
}
32 changes: 32 additions & 0 deletions java/integTest/src/test/java/glide/standalone/ClientTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.standalone;

import static glide.TestConfiguration.STANDALONE_PORTS;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import glide.api.RedisClient;
import glide.api.models.configuration.NodeAddress;
import glide.api.models.configuration.RedisClientConfiguration;
import glide.api.models.exceptions.ClosingException;
import java.util.concurrent.ExecutionException;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;

public class ClientTests {
@Test
@SneakyThrows
public void close_client_throws_ExecutionException_with_ClosingException_cause() {
RedisClient client =
RedisClient.CreateClient(
RedisClientConfiguration.builder()
.address(NodeAddress.builder().port(STANDALONE_PORTS[0]).build())
.build())
.get();

client.close();
ExecutionException executionException =
assertThrows(ExecutionException.class, () -> client.set("key", "value").get());
assertTrue(executionException.getCause() instanceof ClosingException);
}
}
Loading