From e5281ca2db16d14c27397f1d0e8820843b597f00 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Mon, 18 Dec 2023 12:00:56 -0800 Subject: [PATCH] Split `ClientState` and `Platform`. Signed-off-by: Yury-Fridlyand --- .../java/babushka/connectors/ClientState.java | 30 ++++++++ .../handlers/CallbackDispatcher.java | 2 +- .../connectors/handlers/ChannelHandler.java | 5 +- .../connectors/resources/Platform.java | 66 ----------------- .../resources/ThreadPoolAllocator.java | 72 +++++++++++++++++++ .../java/babushka/managers/ClientState.java | 70 ------------------ .../babushka/managers/ClientStateImpl.java | 46 ++++++++++++ .../babushka/managers/CommandManager.java | 1 + .../babushka/managers/ConnectionManager.java | 1 + 9 files changed, 154 insertions(+), 139 deletions(-) create mode 100644 java/client/src/main/java/babushka/connectors/ClientState.java create mode 100644 java/client/src/main/java/babushka/connectors/resources/ThreadPoolAllocator.java delete mode 100644 java/client/src/main/java/babushka/managers/ClientState.java create mode 100644 java/client/src/main/java/babushka/managers/ClientStateImpl.java diff --git a/java/client/src/main/java/babushka/connectors/ClientState.java b/java/client/src/main/java/babushka/connectors/ClientState.java new file mode 100644 index 0000000000..e31ad23c3e --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/ClientState.java @@ -0,0 +1,30 @@ +package babushka.connectors; + +public class ClientState { + + private ClientState() {} + + /** + * A read only client state. It is supposed that main Client class will have instance of the state + * of this type and won't be able to change the state directly. + */ + public static interface ReadOnlyClientState { + /** Check that connection established. This doesn't validate whether it is alive. */ + boolean isConnected(); + + /** Check that connection is not yet established. */ + boolean isInitializing(); + } + + /** A client state which accepts switching to Connected or Closed states. */ + public static interface OpenableAndClosableClientState extends ClosableClientState { + /** Report connection status. */ + void connect(boolean successful); + } + + /** A client state which accepts only one way switching - to Closed state only. */ + public static interface ClosableClientState extends ReadOnlyClientState { + /** Report disconnection. */ + void disconnect(); + } +} diff --git a/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java index eec671bf27..ce78209bf1 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java +++ b/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java @@ -1,6 +1,6 @@ package babushka.connectors.handlers; -import babushka.managers.ClientState; +import babushka.connectors.ClientState; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java index 7387df9205..3a41a3e20c 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java @@ -1,11 +1,12 @@ package babushka.connectors.handlers; import babushka.connectors.resources.Platform; +import babushka.connectors.resources.ThreadPoolAllocator; import connection_request.ConnectionRequestOuterClass.ConnectionRequest; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.unix.DomainSocketAddress; -import java.util.OptionalInt; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import redis_request.RedisRequestOuterClass.RedisRequest; import response.ResponseOuterClass.Response; @@ -23,7 +24,7 @@ public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) channel = new Bootstrap() // TODO let user specify the thread pool or pool size as an option - .group(Platform.createNettyThreadPool("babushka-channel", OptionalInt.empty())) + .group(ThreadPoolAllocator.createNettyThreadPool("babushka-channel", Optional.empty())) .channel(Platform.getClientUdsNettyChannelType()) .handler(new ProtobufSocketChannelInitializer(callbackDispatcher)) .connect(new DomainSocketAddress(socketPath)) diff --git a/java/client/src/main/java/babushka/connectors/resources/Platform.java b/java/client/src/main/java/babushka/connectors/resources/Platform.java index b411f04f50..4967a9b9f0 100644 --- a/java/client/src/main/java/babushka/connectors/resources/Platform.java +++ b/java/client/src/main/java/babushka/connectors/resources/Platform.java @@ -1,18 +1,10 @@ package babushka.connectors.resources; -import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollDomainSocketChannel; -import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.kqueue.KQueue; import io.netty.channel.kqueue.KQueueDomainSocketChannel; -import io.netty.channel.kqueue.KQueueEventLoopGroup; import io.netty.channel.unix.DomainSocketChannel; -import io.netty.util.concurrent.DefaultThreadFactory; -import java.util.Map; -import java.util.OptionalInt; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Supplier; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -44,12 +36,6 @@ public static class Capabilities { private static final Capabilities capabilities = new Capabilities(isKQueueAvailable(), isEPollAvailable(), false, false); - /** - * Thread pools supplied to Netty to perform all async IO.
- * Map key is supposed to be pool name + thread count as a string concat product. - */ - private static final Map groups = new ConcurrentHashMap<>(); - /** Detect kqueue availability. */ private static boolean isKQueueAvailable() { try { @@ -70,42 +56,6 @@ private static boolean isEPollAvailable() { } } - /** - * Allocate Netty thread pool required to manage connection. A thread pool could be shared across - * multiple connections. - * - * @return A new thread pool. - */ - public static EventLoopGroup createNettyThreadPool(String prefix, OptionalInt threadLimit) { - int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors()); - if (capabilities.isKQueueAvailable()) { - var name = prefix + "-kqueue-elg"; - return getOrCreate( - name + threadCount, - () -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); - } else if (capabilities.isEPollAvailable()) { - var name = prefix + "-epoll-elg"; - return getOrCreate( - name + threadCount, - () -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); - } - // TODO support IO-Uring and NIO - - throw new RuntimeException("Current platform supports no known thread pool types"); - } - - /** - * Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache. - */ - private static EventLoopGroup getOrCreate(String name, Supplier supplier) { - if (groups.containsKey(name)) { - return groups.get(name); - } - var group = supplier.get(); - groups.put(name, group); - return group; - } - /** * Get a channel class required by Netty to open a client UDS channel. * @@ -120,20 +70,4 @@ public static Class getClientUdsNettyChannelType( } throw new RuntimeException("Current platform supports no known socket types"); } - - /** - * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing - * resources. It is recommended to use a class instead of lambda to ensure that it is called.
- * See {@link Runtime#addShutdownHook}. - */ - private static class ShutdownHook implements Runnable { - @Override - public void run() { - groups.values().forEach(EventLoopGroup::shutdownGracefully); - } - } - - static { - Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Babushka-shutdown-hook")); - } } diff --git a/java/client/src/main/java/babushka/connectors/resources/ThreadPoolAllocator.java b/java/client/src/main/java/babushka/connectors/resources/ThreadPoolAllocator.java new file mode 100644 index 0000000000..c3c1e2ef55 --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/resources/ThreadPoolAllocator.java @@ -0,0 +1,72 @@ +package babushka.connectors.resources; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** A class responsible to allocating and deallocating shared thread pools. */ +public class ThreadPoolAllocator { + + /** + * Thread pools supplied to Netty to perform all async IO.
+ * Map key is supposed to be pool name + thread count as a string concat product. + */ + private static final Map groups = new ConcurrentHashMap<>(); + + /** + * Allocate (create new or share existing) Netty thread pool required to manage connection. A + * thread pool could be shared across multiple connections. + * + * @return A new thread pool. + */ + public static EventLoopGroup createNettyThreadPool(String prefix, Optional threadLimit) { + int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors()); + if (Platform.getCapabilities().isKQueueAvailable()) { + var name = prefix + "-kqueue-elg"; + return getOrCreate( + name + threadCount, + () -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); + } else if (Platform.getCapabilities().isEPollAvailable()) { + var name = prefix + "-epoll-elg"; + return getOrCreate( + name + threadCount, + () -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); + } + // TODO support IO-Uring and NIO + + throw new RuntimeException("Current platform supports no known thread pool types"); + } + + /** + * Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache. + */ + private static EventLoopGroup getOrCreate(String name, Supplier supplier) { + if (groups.containsKey(name)) { + return groups.get(name); + } + EventLoopGroup group = supplier.get(); + groups.put(name, group); + return group; + } + + /** + * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing + * resources. It is recommended to use a class instead of lambda to ensure that it is called.
+ * See {@link Runtime#addShutdownHook}. + */ + private static class ShutdownHook implements Runnable { + @Override + public void run() { + groups.values().forEach(EventLoopGroup::shutdownGracefully); + } + } + + static { + Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Babushka-shutdown-hook")); + } +} diff --git a/java/client/src/main/java/babushka/managers/ClientState.java b/java/client/src/main/java/babushka/managers/ClientState.java deleted file mode 100644 index 08dd65dd6b..0000000000 --- a/java/client/src/main/java/babushka/managers/ClientState.java +++ /dev/null @@ -1,70 +0,0 @@ -package babushka.managers; - -public class ClientState { - - private ClientState() {} - - /** - * A read only client state. It is supposed that main Client class will have instance of the state - * of this type and won't be able to change the state directly. - */ - public static interface ReadOnlyClientState { - /** Check that connection established. This doesn't validate whether it is alive. */ - boolean isConnected(); - - /** Check that connection is not yet established. */ - boolean isInitializing(); - } - - /** A client state which accepts switching to Connected or Closed states. */ - public static interface OpenableAndClosableClientState extends ClosableClientState { - /** Report connection status. */ - void connect(boolean successful); - } - - /** A client state which accepts only one way switching - to Closed state only. */ - public static interface ClosableClientState extends ReadOnlyClientState { - /** Report disconnection. */ - void disconnect(); - } - - private enum InnerStates { - INITIALIZING, - CONNECTED, - CLOSED - } - - /** - * Create an instance of {@link ReadOnlyClientState} which can be safely shipped to {@link - * CommandManager} and {@link ConnectionManager}. Only those classes are responsible to switch the - * state. - */ - public static ReadOnlyClientState create() { - return new OpenableAndClosableClientState() { - private InnerStates state = InnerStates.INITIALIZING; - - @Override - public boolean isConnected() { - return state == InnerStates.CONNECTED; - } - - @Override - public boolean isInitializing() { - return state == InnerStates.INITIALIZING; - } - - @Override - public void connect(boolean successful) { - if (state != InnerStates.INITIALIZING) { - throw new IllegalStateException(); - } - state = successful ? InnerStates.CONNECTED : InnerStates.CLOSED; - } - - @Override - public void disconnect() { - state = InnerStates.CLOSED; - } - }; - } -} diff --git a/java/client/src/main/java/babushka/managers/ClientStateImpl.java b/java/client/src/main/java/babushka/managers/ClientStateImpl.java new file mode 100644 index 0000000000..7b100c1d23 --- /dev/null +++ b/java/client/src/main/java/babushka/managers/ClientStateImpl.java @@ -0,0 +1,46 @@ +package babushka.managers; + +import babushka.connectors.ClientState; + +public class ClientStateImpl { + private enum InnerStates { + INITIALIZING, + CONNECTED, + CLOSED + } + + /** + * Create an instance of {@link ClientState.ReadOnlyClientState} which can be safely shipped to + * {@link CommandManager} and {@link ConnectionManager}. Only those classes are responsible to + * switch the state. + */ + public static ClientState.ReadOnlyClientState create() { + return new ClientState.OpenableAndClosableClientState() { + private ClientStateImpl.InnerStates state = ClientStateImpl.InnerStates.INITIALIZING; + + @Override + public boolean isConnected() { + return state == ClientStateImpl.InnerStates.CONNECTED; + } + + @Override + public boolean isInitializing() { + return state == ClientStateImpl.InnerStates.INITIALIZING; + } + + @Override + public void connect(boolean successful) { + if (state != ClientStateImpl.InnerStates.INITIALIZING) { + throw new IllegalStateException(); + } + state = + successful ? ClientStateImpl.InnerStates.CONNECTED : ClientStateImpl.InnerStates.CLOSED; + } + + @Override + public void disconnect() { + state = ClientStateImpl.InnerStates.CLOSED; + } + }; + } +} diff --git a/java/client/src/main/java/babushka/managers/CommandManager.java b/java/client/src/main/java/babushka/managers/CommandManager.java index 78390288f9..dc6e517306 100644 --- a/java/client/src/main/java/babushka/managers/CommandManager.java +++ b/java/client/src/main/java/babushka/managers/CommandManager.java @@ -1,5 +1,6 @@ package babushka.managers; +import babushka.connectors.ClientState; import babushka.connectors.handlers.ChannelHandler; import babushka.ffi.resolvers.RedisValueResolver; import babushka.models.RequestBuilder; diff --git a/java/client/src/main/java/babushka/managers/ConnectionManager.java b/java/client/src/main/java/babushka/managers/ConnectionManager.java index 78b8e9ce2b..560ac65cf9 100644 --- a/java/client/src/main/java/babushka/managers/ConnectionManager.java +++ b/java/client/src/main/java/babushka/managers/ConnectionManager.java @@ -1,5 +1,6 @@ package babushka.managers; +import babushka.connectors.ClientState; import babushka.connectors.handlers.ChannelHandler; import babushka.ffi.resolvers.RedisValueResolver; import babushka.models.RequestBuilder;