From b9a6a671ef9b2d478bc62729cefc53a17b85aa64 Mon Sep 17 00:00:00 2001 From: SanHalacogluImproving Date: Wed, 28 Feb 2024 16:07:22 -0800 Subject: [PATCH 1/4] Ensure resources complete with Closing Error when client closes. New command submissions on closed clients now also return a future with a ClosingException Set. (Consistent with Py and Node clients) --- .../glide/connectors/handlers/CallbackDispatcher.java | 9 ++++++++- .../java/glide/connectors/handlers/ChannelHandler.java | 4 ++++ .../src/main/java/glide/managers/CommandManager.java | 7 +++++++ .../glide/connection/ConnectionWithGlideMockTests.java | 7 +------ 4 files changed, 20 insertions(+), 7 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java index dfaf01bbe7..b67b0fff32 100644 --- a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java +++ b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java @@ -129,7 +129,14 @@ public void distributeClosingException(String message) { } public void shutdownGracefully() { - responses.values().forEach(future -> future.cancel(false)); + responses + .values() + .forEach( + future -> + future.completeExceptionally( + new ClosingException( + "Operation terminated: The closing process has been initiated for the" + + " resource."))); responses.clear(); } } diff --git a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java index 32637219d1..68319af51c 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java @@ -23,6 +23,10 @@ public class ChannelHandler { protected final Channel channel; protected final CallbackDispatcher callbackDispatcher; + public boolean isClosed() { + return !channel.isOpen(); + } + /** * Open a new channel for a new client and running it on the provided EventLoopGroup. * diff --git a/java/client/src/main/java/glide/managers/CommandManager.java b/java/client/src/main/java/glide/managers/CommandManager.java index 770830ce2f..253a83f317 100644 --- a/java/client/src/main/java/glide/managers/CommandManager.java +++ b/java/client/src/main/java/glide/managers/CommandManager.java @@ -109,6 +109,13 @@ public CompletableFuture submitNewCommand( */ protected CompletableFuture submitCommandToChannel( RedisRequest.Builder command, RedisExceptionCheckedFunction responseHandler) { + if (channel.isClosed()) { + var errorFuture = new CompletableFuture(); + errorFuture.completeExceptionally( + new ClosingException("Channel closed: Unable to submit command.")); + return errorFuture; + } + // write command request to channel // when complete, convert the response to our expected type T using the given responseHandler return channel diff --git a/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java b/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java index 52be39bacc..b430510834 100644 --- a/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java +++ b/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java @@ -175,12 +175,7 @@ public void rethrow_error_if_UDS_channel_closed() { var exception = assertThrows( ExecutionException.class, () -> client.customCommand(new String[0]).get(1, SECONDS)); - assertTrue(exception.getCause() instanceof RuntimeException); - - // Not a public class, can't import - assertEquals( - "io.netty.channel.StacklessClosedChannelException", - exception.getCause().getCause().getClass().getName()); + assertTrue(exception.getCause() instanceof ClosingException); } finally { // restart mock to let other tests pass if this one failed startRustCoreLibMock(null); From 611238e52b2d96537aea1348a918c97ae2bea272 Mon Sep 17 00:00:00 2001 From: SanHalacogluImproving Date: Thu, 29 Feb 2024 15:24:29 -0800 Subject: [PATCH 2/4] Added a private boolean that saves the state of channel handler. --- .../glide/connectors/handlers/CallbackDispatcher.java | 10 ++-------- .../java/glide/connectors/handlers/ChannelHandler.java | 4 +++- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java index b67b0fff32..6c5e86e2d2 100644 --- a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java +++ b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java @@ -129,14 +129,8 @@ public void distributeClosingException(String message) { } public void shutdownGracefully() { - responses - .values() - .forEach( - future -> - future.completeExceptionally( - new ClosingException( - "Operation terminated: The closing process has been initiated for the" - + " resource."))); + String msg = "Operation terminated: The closing process has been initiated for the resource."; + responses.values().forEach(future -> future.completeExceptionally(new ClosingException(msg))); responses.clear(); } } diff --git a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java index 68319af51c..740a17a3bd 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java @@ -22,9 +22,10 @@ public class ChannelHandler { protected final Channel channel; protected final CallbackDispatcher callbackDispatcher; + private boolean isClosed = false; public boolean isClosed() { - return !channel.isOpen(); + return this.isClosed; } /** @@ -88,6 +89,7 @@ public CompletableFuture connect(ConnectionRequest request) { /** Closes the UDS connection and frees corresponding resources. */ public ChannelFuture close() { + this.isClosed = true; callbackDispatcher.shutdownGracefully(); return channel.close(); } From 0230151c7345c3c28fc6fb9005a4f8717fde8702 Mon Sep 17 00:00:00 2001 From: SanHalacogluImproving Date: Fri, 1 Mar 2024 12:35:48 -0800 Subject: [PATCH 3/4] Changed Boolean to Atomic Boolean added IT tests and modified UT tests. --- .../connectors/handlers/ChannelHandler.java | 7 ++-- .../ConnectionWithGlideMockTests.java | 3 +- .../glide/managers/CommandManagerTest.java | 8 +++++ .../test/java/glide/cluster/ClientTests.java | 33 +++++++++++++++++++ .../java/glide/standalone/ClientTests.java | 32 ++++++++++++++++++ 5 files changed, 78 insertions(+), 5 deletions(-) create mode 100644 java/integTest/src/test/java/glide/cluster/ClientTests.java create mode 100644 java/integTest/src/test/java/glide/standalone/ClientTests.java diff --git a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java index 740a17a3bd..4800316803 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java @@ -9,6 +9,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.unix.DomainSocketAddress; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.NonNull; import lombok.RequiredArgsConstructor; import redis_request.RedisRequestOuterClass.RedisRequest; @@ -22,10 +23,10 @@ public class ChannelHandler { protected final Channel channel; protected final CallbackDispatcher callbackDispatcher; - private boolean isClosed = false; + private AtomicBoolean isClosed = new AtomicBoolean(false); public boolean isClosed() { - return this.isClosed; + return this.isClosed.get() || !this.channel.isOpen(); } /** @@ -89,7 +90,7 @@ public CompletableFuture connect(ConnectionRequest request) { /** Closes the UDS connection and frees corresponding resources. */ public ChannelFuture close() { - this.isClosed = true; + this.isClosed.set(true); callbackDispatcher.shutdownGracefully(); return channel.close(); } diff --git a/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java b/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java index b430510834..331af6fa39 100644 --- a/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java +++ b/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java @@ -173,8 +173,7 @@ public void rethrow_error_if_UDS_channel_closed() { stopRustCoreLibMock(); try { var exception = - assertThrows( - ExecutionException.class, () -> client.customCommand(new String[0]).get(1, SECONDS)); + assertThrows(ExecutionException.class, () -> client.customCommand(new String[0]).get()); assertTrue(exception.getCause() instanceof ClosingException); } finally { // restart mock to let other tests pass if this one failed diff --git a/java/client/src/test/java/glide/managers/CommandManagerTest.java b/java/client/src/test/java/glide/managers/CommandManagerTest.java index 0f7f539ebb..a64c6499ad 100644 --- a/java/client/src/test/java/glide/managers/CommandManagerTest.java +++ b/java/client/src/test/java/glide/managers/CommandManagerTest.java @@ -60,6 +60,7 @@ public void submitNewCommand_return_Object_result() { CompletableFuture future = new CompletableFuture<>(); future.complete(respPointerResponse); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); // exercise CompletableFuture result = @@ -81,6 +82,7 @@ public void submitNewCommand_return_Null_result() { CompletableFuture future = new CompletableFuture<>(); future.complete(respPointerResponse); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); // exercise CompletableFuture result = @@ -107,6 +109,7 @@ public void submitNewCommand_return_String_result() { CompletableFuture future = new CompletableFuture<>(); future.complete(respPointerResponse); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); // exercise CompletableFuture result = @@ -126,6 +129,7 @@ public void submitNewCommand_return_String_result() { public void prepare_request_with_simple_routes(SimpleRoute routeType) { CompletableFuture future = new CompletableFuture<>(); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); ArgumentCaptor captor = ArgumentCaptor.forClass(RedisRequest.Builder.class); @@ -156,6 +160,7 @@ public void prepare_request_with_simple_routes(SimpleRoute routeType) { public void prepare_request_with_slot_id_routes(SlotType slotType) { CompletableFuture future = new CompletableFuture<>(); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); ArgumentCaptor captor = ArgumentCaptor.forClass(RedisRequest.Builder.class); @@ -188,6 +193,7 @@ public void prepare_request_with_slot_id_routes(SlotType slotType) { public void prepare_request_with_slot_key_routes(SlotType slotType) { CompletableFuture future = new CompletableFuture<>(); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); ArgumentCaptor captor = ArgumentCaptor.forClass(RedisRequest.Builder.class); @@ -239,6 +245,7 @@ public void submitNewCommand_with_Transaction_sends_protobuf_request() { CompletableFuture future = new CompletableFuture<>(); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); ArgumentCaptor captor = ArgumentCaptor.forClass(RedisRequest.Builder.class); @@ -279,6 +286,7 @@ public void submitNewCommand_with_ClusterTransaction_with_route_sends_protobuf_r CompletableFuture future = new CompletableFuture<>(); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); ArgumentCaptor captor = ArgumentCaptor.forClass(RedisRequest.Builder.class); diff --git a/java/integTest/src/test/java/glide/cluster/ClientTests.java b/java/integTest/src/test/java/glide/cluster/ClientTests.java new file mode 100644 index 0000000000..ffebe09c45 --- /dev/null +++ b/java/integTest/src/test/java/glide/cluster/ClientTests.java @@ -0,0 +1,33 @@ +/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.cluster; + +import static glide.TestConfiguration.CLUSTER_PORTS; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import glide.api.RedisClusterClient; +import glide.api.models.configuration.NodeAddress; +import glide.api.models.configuration.RedisClusterClientConfiguration; +import glide.api.models.exceptions.ClosingException; +import java.util.concurrent.ExecutionException; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; + +public class ClientTests { + @Test + @SneakyThrows + public void close_client_throws_ExecutionException_with_ClosingException_cause() { + RedisClusterClient client = + RedisClusterClient.CreateClient( + RedisClusterClientConfiguration.builder() + .address(NodeAddress.builder().port(CLUSTER_PORTS[0]).build()) + .requestTimeout(5000) + .build()) + .get(); + + client.close(); + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.set("foo", "bar")); + assertTrue(executionException.getCause() instanceof ClosingException); + } +} diff --git a/java/integTest/src/test/java/glide/standalone/ClientTests.java b/java/integTest/src/test/java/glide/standalone/ClientTests.java new file mode 100644 index 0000000000..db248b2bce --- /dev/null +++ b/java/integTest/src/test/java/glide/standalone/ClientTests.java @@ -0,0 +1,32 @@ +/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.standalone; + +import static glide.TestConfiguration.STANDALONE_PORTS; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import glide.api.RedisClient; +import glide.api.models.configuration.NodeAddress; +import glide.api.models.configuration.RedisClientConfiguration; +import glide.api.models.exceptions.ClosingException; +import java.util.concurrent.ExecutionException; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; + +public class ClientTests { + @Test + @SneakyThrows + public void close_client_throws_ExecutionException_with_ClosingException_cause() { + RedisClient client = + RedisClient.CreateClient( + RedisClientConfiguration.builder() + .address(NodeAddress.builder().port(STANDALONE_PORTS[0]).build()) + .build()) + .get(); + + client.close(); + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.set("key", "value").get()); + assertTrue(executionException.getCause() instanceof ClosingException); + } +} From e60574a8be3e5b72d7d65f10d9d716c1d5edf7e9 Mon Sep 17 00:00:00 2001 From: SanHalacogluImproving Date: Fri, 1 Mar 2024 15:15:16 -0800 Subject: [PATCH 4/4] Fix IT --- java/integTest/src/test/java/glide/cluster/ClientTests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/java/integTest/src/test/java/glide/cluster/ClientTests.java b/java/integTest/src/test/java/glide/cluster/ClientTests.java index ffebe09c45..3ad92f59ce 100644 --- a/java/integTest/src/test/java/glide/cluster/ClientTests.java +++ b/java/integTest/src/test/java/glide/cluster/ClientTests.java @@ -21,13 +21,12 @@ public void close_client_throws_ExecutionException_with_ClosingException_cause() RedisClusterClient.CreateClient( RedisClusterClientConfiguration.builder() .address(NodeAddress.builder().port(CLUSTER_PORTS[0]).build()) - .requestTimeout(5000) .build()) .get(); client.close(); ExecutionException executionException = - assertThrows(ExecutionException.class, () -> client.set("foo", "bar")); + assertThrows(ExecutionException.class, () -> client.set("foo", "bar").get()); assertTrue(executionException.getCause() instanceof ClosingException); } }