From 12dcaba84cd5a848529593884d49a7aafc86d4c9 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 14 Dec 2023 20:37:57 -0800 Subject: [PATCH] Rename `CallbackManager` to `CallbackDispatcher`. Signed-off-by: Yury-Fridlyand --- .../handlers/CallbackDispatcher.java} | 17 +++++++++-------- .../connectors/handlers/ChannelHandler.java | 17 ++++++++--------- .../ProtobufSocketChannelInitializer.java | 5 ++--- .../connectors/handlers/ReadHandler.java | 7 +++---- 4 files changed, 22 insertions(+), 24 deletions(-) rename java/client/src/main/java/babushka/{managers/CallbackManager.java => connectors/handlers/CallbackDispatcher.java} (82%) diff --git a/java/client/src/main/java/babushka/managers/CallbackManager.java b/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java similarity index 82% rename from java/client/src/main/java/babushka/managers/CallbackManager.java rename to java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java index 34d95d607c..5ebaa03969 100644 --- a/java/client/src/main/java/babushka/managers/CallbackManager.java +++ b/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java @@ -1,16 +1,14 @@ -package babushka.managers; +package babushka.connectors.handlers; -import babushka.connectors.handlers.ReadHandler; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import lombok.Getter; import org.apache.commons.lang3.tuple.Pair; import response.ResponseOuterClass.Response; /** Holder for resources required to dispatch responses and used by {@link ReadHandler}. */ -public class CallbackManager { +public class CallbackDispatcher { /** Unique request ID (callback ID). Thread-safe. */ private final AtomicInteger requestId = new AtomicInteger(0); @@ -25,7 +23,7 @@ public class CallbackManager { * requests can't be stored in the same storage, because callback ID = 0 is hardcoded for * connection requests. */ - @Getter private final CompletableFuture connectionPromise = new CompletableFuture<>(); + private final CompletableFuture connectionPromise = new CompletableFuture<>(); /** * Register a new request to be sent. Once response received, the given future completes with it. @@ -40,6 +38,10 @@ public Pair> registerRequest() { return Pair.of(callbackId, future); } + public CompletableFuture registerConnection() { + return connectionPromise; + } + /** * Complete the corresponding client promise and free resources. * @@ -56,9 +58,8 @@ public void completeRequest(Response response) { } public void shutdownGracefully() { - connectionPromise.completeExceptionally(new InterruptedException()); - responses.forEach( - (callbackId, future) -> future.completeExceptionally(new InterruptedException())); + connectionPromise.cancel(false); + responses.values().forEach(future -> future.cancel(false)); responses.clear(); } } diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java index 80f04839c0..adfdcbbbcc 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java @@ -1,7 +1,6 @@ package babushka.connectors.handlers; import babushka.connectors.resources.Platform; -import babushka.managers.CallbackManager; import connection_request.ConnectionRequestOuterClass.ConnectionRequest; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -14,29 +13,29 @@ /** * Class responsible for manipulations with Netty's {@link Channel}.
- * Uses a {@link CallbackManager} to record callbacks of every request sent. + * Uses a {@link CallbackDispatcher} to record callbacks of every request sent. */ public class ChannelHandler { private final Channel channel; - private final CallbackManager callbackManager; + private final CallbackDispatcher callbackDispatcher; /** Open a new channel for a new client. */ - public ChannelHandler(CallbackManager callbackManager, String socketPath) { + public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) { channel = new Bootstrap() // TODO let user specify the thread pool or pool size as an option .group(Platform.createNettyThreadPool("babushka-channel", OptionalInt.empty())) .channel(Platform.getClientUdsNettyChannelType()) - .handler(new ProtobufSocketChannelInitializer(callbackManager)) + .handler(new ProtobufSocketChannelInitializer(callbackDispatcher)) .connect(new DomainSocketAddress(socketPath)) // TODO call here .sync() if needed or remove this comment .channel(); - this.callbackManager = callbackManager; + this.callbackDispatcher = callbackDispatcher; } /** Write a protobuf message to the socket. */ public CompletableFuture write(RedisRequest.Builder request, boolean flush) { - var commandId = callbackManager.registerRequest(); + var commandId = callbackDispatcher.registerRequest(); request.setCallbackIdx(commandId.getKey()); if (flush) { @@ -50,7 +49,7 @@ public CompletableFuture write(RedisRequest.Builder request, boolean f /** Write a protobuf message to the socket. */ public CompletableFuture connect(ConnectionRequest request) { channel.writeAndFlush(request); - return callbackManager.getConnectionPromise(); + return callbackDispatcher.registerConnection(); } private final AtomicBoolean closed = new AtomicBoolean(false); @@ -59,7 +58,7 @@ public CompletableFuture connect(ConnectionRequest request) { public void close() { if (closed.compareAndSet(false, true)) { channel.close(); - callbackManager.shutdownGracefully(); + callbackDispatcher.shutdownGracefully(); } } } diff --git a/java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java b/java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java index eb37b221cc..06c4c03f02 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java @@ -1,6 +1,5 @@ package babushka.connectors.handlers; -import babushka.managers.CallbackManager; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.unix.UnixChannel; @@ -16,7 +15,7 @@ @RequiredArgsConstructor public class ProtobufSocketChannelInitializer extends ChannelInitializer { - private final CallbackManager callbackManager; + private final CallbackDispatcher callbackDispatcher; @Override public void initChannel(@NonNull UnixChannel ch) { @@ -26,7 +25,7 @@ public void initChannel(@NonNull UnixChannel ch) { .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()) .addLast("protobufDecoder", new ProtobufDecoder(Response.getDefaultInstance())) .addLast("protobufEncoder", new ProtobufEncoder()) - .addLast(new ReadHandler(callbackManager)) + .addLast(new ReadHandler(callbackDispatcher)) .addLast(new ChannelOutboundHandlerAdapter()); } } diff --git a/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java index 711119aca8..63aedf001e 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java @@ -1,6 +1,5 @@ package babushka.connectors.handlers; -import babushka.managers.CallbackManager; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.NonNull; @@ -11,12 +10,12 @@ @RequiredArgsConstructor public class ReadHandler extends ChannelInboundHandlerAdapter { - private final CallbackManager callbackManager; + private final CallbackDispatcher callbackDispatcher; - /** Submit responses from babushka to an instance {@link CallbackManager} to handle them. */ + /** Submit responses from babushka to an instance {@link CallbackDispatcher} to handle them. */ @Override public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) { - callbackManager.completeRequest((Response) msg); + callbackDispatcher.completeRequest((Response) msg); } /** Handles uncaught exceptions from {@link #channelRead(ChannelHandlerContext, Object)}. */