diff --git a/java/client/src/main/java/babushka/connectors/ClientState.java b/java/client/src/main/java/babushka/connectors/ClientState.java
new file mode 100644
index 0000000000..e31ad23c3e
--- /dev/null
+++ b/java/client/src/main/java/babushka/connectors/ClientState.java
@@ -0,0 +1,30 @@
+package babushka.connectors;
+
+public class ClientState {
+
+ private ClientState() {}
+
+ /**
+ * A read only client state. It is supposed that main Client class will have instance of the state
+ * of this type and won't be able to change the state directly.
+ */
+ public static interface ReadOnlyClientState {
+ /** Check that connection established. This doesn't validate whether it is alive. */
+ boolean isConnected();
+
+ /** Check that connection is not yet established. */
+ boolean isInitializing();
+ }
+
+ /** A client state which accepts switching to Connected or Closed states. */
+ public static interface OpenableAndClosableClientState extends ClosableClientState {
+ /** Report connection status. */
+ void connect(boolean successful);
+ }
+
+ /** A client state which accepts only one way switching - to Closed state only. */
+ public static interface ClosableClientState extends ReadOnlyClientState {
+ /** Report disconnection. */
+ void disconnect();
+ }
+}
diff --git a/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java
index eec671bf27..ce78209bf1 100644
--- a/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java
+++ b/java/client/src/main/java/babushka/connectors/handlers/CallbackDispatcher.java
@@ -1,6 +1,6 @@
package babushka.connectors.handlers;
-import babushka.managers.ClientState;
+import babushka.connectors.ClientState;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java
index 7387df9205..3a41a3e20c 100644
--- a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java
+++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java
@@ -1,11 +1,12 @@
package babushka.connectors.handlers;
import babushka.connectors.resources.Platform;
+import babushka.connectors.resources.ThreadPoolAllocator;
import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.unix.DomainSocketAddress;
-import java.util.OptionalInt;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import redis_request.RedisRequestOuterClass.RedisRequest;
import response.ResponseOuterClass.Response;
@@ -23,7 +24,7 @@ public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath)
channel =
new Bootstrap()
// TODO let user specify the thread pool or pool size as an option
- .group(Platform.createNettyThreadPool("babushka-channel", OptionalInt.empty()))
+ .group(ThreadPoolAllocator.createNettyThreadPool("babushka-channel", Optional.empty()))
.channel(Platform.getClientUdsNettyChannelType())
.handler(new ProtobufSocketChannelInitializer(callbackDispatcher))
.connect(new DomainSocketAddress(socketPath))
diff --git a/java/client/src/main/java/babushka/connectors/resources/Platform.java b/java/client/src/main/java/babushka/connectors/resources/Platform.java
index b411f04f50..4967a9b9f0 100644
--- a/java/client/src/main/java/babushka/connectors/resources/Platform.java
+++ b/java/client/src/main/java/babushka/connectors/resources/Platform.java
@@ -1,18 +1,10 @@
package babushka.connectors.resources;
-import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDomainSocketChannel;
-import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
-import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.unix.DomainSocketChannel;
-import io.netty.util.concurrent.DefaultThreadFactory;
-import java.util.Map;
-import java.util.OptionalInt;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -44,12 +36,6 @@ public static class Capabilities {
private static final Capabilities capabilities =
new Capabilities(isKQueueAvailable(), isEPollAvailable(), false, false);
- /**
- * Thread pools supplied to Netty to perform all async IO.
- * Map key is supposed to be pool name + thread count as a string concat product.
- */
- private static final Map groups = new ConcurrentHashMap<>();
-
/** Detect kqueue availability. */
private static boolean isKQueueAvailable() {
try {
@@ -70,42 +56,6 @@ private static boolean isEPollAvailable() {
}
}
- /**
- * Allocate Netty thread pool required to manage connection. A thread pool could be shared across
- * multiple connections.
- *
- * @return A new thread pool.
- */
- public static EventLoopGroup createNettyThreadPool(String prefix, OptionalInt threadLimit) {
- int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors());
- if (capabilities.isKQueueAvailable()) {
- var name = prefix + "-kqueue-elg";
- return getOrCreate(
- name + threadCount,
- () -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
- } else if (capabilities.isEPollAvailable()) {
- var name = prefix + "-epoll-elg";
- return getOrCreate(
- name + threadCount,
- () -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
- }
- // TODO support IO-Uring and NIO
-
- throw new RuntimeException("Current platform supports no known thread pool types");
- }
-
- /**
- * Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache.
- */
- private static EventLoopGroup getOrCreate(String name, Supplier supplier) {
- if (groups.containsKey(name)) {
- return groups.get(name);
- }
- var group = supplier.get();
- groups.put(name, group);
- return group;
- }
-
/**
* Get a channel class required by Netty to open a client UDS channel.
*
@@ -120,20 +70,4 @@ public static Class extends DomainSocketChannel> getClientUdsNettyChannelType(
}
throw new RuntimeException("Current platform supports no known socket types");
}
-
- /**
- * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing
- * resources. It is recommended to use a class instead of lambda to ensure that it is called.
- * See {@link Runtime#addShutdownHook}.
- */
- private static class ShutdownHook implements Runnable {
- @Override
- public void run() {
- groups.values().forEach(EventLoopGroup::shutdownGracefully);
- }
- }
-
- static {
- Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Babushka-shutdown-hook"));
- }
}
diff --git a/java/client/src/main/java/babushka/connectors/resources/ThreadPoolAllocator.java b/java/client/src/main/java/babushka/connectors/resources/ThreadPoolAllocator.java
new file mode 100644
index 0000000000..c3c1e2ef55
--- /dev/null
+++ b/java/client/src/main/java/babushka/connectors/resources/ThreadPoolAllocator.java
@@ -0,0 +1,72 @@
+package babushka.connectors.resources;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+/** A class responsible to allocating and deallocating shared thread pools. */
+public class ThreadPoolAllocator {
+
+ /**
+ * Thread pools supplied to Netty to perform all async IO.
+ * Map key is supposed to be pool name + thread count as a string concat product.
+ */
+ private static final Map groups = new ConcurrentHashMap<>();
+
+ /**
+ * Allocate (create new or share existing) Netty thread pool required to manage connection. A
+ * thread pool could be shared across multiple connections.
+ *
+ * @return A new thread pool.
+ */
+ public static EventLoopGroup createNettyThreadPool(String prefix, Optional threadLimit) {
+ int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors());
+ if (Platform.getCapabilities().isKQueueAvailable()) {
+ var name = prefix + "-kqueue-elg";
+ return getOrCreate(
+ name + threadCount,
+ () -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
+ } else if (Platform.getCapabilities().isEPollAvailable()) {
+ var name = prefix + "-epoll-elg";
+ return getOrCreate(
+ name + threadCount,
+ () -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
+ }
+ // TODO support IO-Uring and NIO
+
+ throw new RuntimeException("Current platform supports no known thread pool types");
+ }
+
+ /**
+ * Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache.
+ */
+ private static EventLoopGroup getOrCreate(String name, Supplier supplier) {
+ if (groups.containsKey(name)) {
+ return groups.get(name);
+ }
+ EventLoopGroup group = supplier.get();
+ groups.put(name, group);
+ return group;
+ }
+
+ /**
+ * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing
+ * resources. It is recommended to use a class instead of lambda to ensure that it is called.
+ * See {@link Runtime#addShutdownHook}.
+ */
+ private static class ShutdownHook implements Runnable {
+ @Override
+ public void run() {
+ groups.values().forEach(EventLoopGroup::shutdownGracefully);
+ }
+ }
+
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Babushka-shutdown-hook"));
+ }
+}
diff --git a/java/client/src/main/java/babushka/managers/ClientState.java b/java/client/src/main/java/babushka/managers/ClientState.java
deleted file mode 100644
index 08dd65dd6b..0000000000
--- a/java/client/src/main/java/babushka/managers/ClientState.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package babushka.managers;
-
-public class ClientState {
-
- private ClientState() {}
-
- /**
- * A read only client state. It is supposed that main Client class will have instance of the state
- * of this type and won't be able to change the state directly.
- */
- public static interface ReadOnlyClientState {
- /** Check that connection established. This doesn't validate whether it is alive. */
- boolean isConnected();
-
- /** Check that connection is not yet established. */
- boolean isInitializing();
- }
-
- /** A client state which accepts switching to Connected or Closed states. */
- public static interface OpenableAndClosableClientState extends ClosableClientState {
- /** Report connection status. */
- void connect(boolean successful);
- }
-
- /** A client state which accepts only one way switching - to Closed state only. */
- public static interface ClosableClientState extends ReadOnlyClientState {
- /** Report disconnection. */
- void disconnect();
- }
-
- private enum InnerStates {
- INITIALIZING,
- CONNECTED,
- CLOSED
- }
-
- /**
- * Create an instance of {@link ReadOnlyClientState} which can be safely shipped to {@link
- * CommandManager} and {@link ConnectionManager}. Only those classes are responsible to switch the
- * state.
- */
- public static ReadOnlyClientState create() {
- return new OpenableAndClosableClientState() {
- private InnerStates state = InnerStates.INITIALIZING;
-
- @Override
- public boolean isConnected() {
- return state == InnerStates.CONNECTED;
- }
-
- @Override
- public boolean isInitializing() {
- return state == InnerStates.INITIALIZING;
- }
-
- @Override
- public void connect(boolean successful) {
- if (state != InnerStates.INITIALIZING) {
- throw new IllegalStateException();
- }
- state = successful ? InnerStates.CONNECTED : InnerStates.CLOSED;
- }
-
- @Override
- public void disconnect() {
- state = InnerStates.CLOSED;
- }
- };
- }
-}
diff --git a/java/client/src/main/java/babushka/managers/ClientStateImpl.java b/java/client/src/main/java/babushka/managers/ClientStateImpl.java
new file mode 100644
index 0000000000..7b100c1d23
--- /dev/null
+++ b/java/client/src/main/java/babushka/managers/ClientStateImpl.java
@@ -0,0 +1,46 @@
+package babushka.managers;
+
+import babushka.connectors.ClientState;
+
+public class ClientStateImpl {
+ private enum InnerStates {
+ INITIALIZING,
+ CONNECTED,
+ CLOSED
+ }
+
+ /**
+ * Create an instance of {@link ClientState.ReadOnlyClientState} which can be safely shipped to
+ * {@link CommandManager} and {@link ConnectionManager}. Only those classes are responsible to
+ * switch the state.
+ */
+ public static ClientState.ReadOnlyClientState create() {
+ return new ClientState.OpenableAndClosableClientState() {
+ private ClientStateImpl.InnerStates state = ClientStateImpl.InnerStates.INITIALIZING;
+
+ @Override
+ public boolean isConnected() {
+ return state == ClientStateImpl.InnerStates.CONNECTED;
+ }
+
+ @Override
+ public boolean isInitializing() {
+ return state == ClientStateImpl.InnerStates.INITIALIZING;
+ }
+
+ @Override
+ public void connect(boolean successful) {
+ if (state != ClientStateImpl.InnerStates.INITIALIZING) {
+ throw new IllegalStateException();
+ }
+ state =
+ successful ? ClientStateImpl.InnerStates.CONNECTED : ClientStateImpl.InnerStates.CLOSED;
+ }
+
+ @Override
+ public void disconnect() {
+ state = ClientStateImpl.InnerStates.CLOSED;
+ }
+ };
+ }
+}
diff --git a/java/client/src/main/java/babushka/managers/CommandManager.java b/java/client/src/main/java/babushka/managers/CommandManager.java
index 78390288f9..dc6e517306 100644
--- a/java/client/src/main/java/babushka/managers/CommandManager.java
+++ b/java/client/src/main/java/babushka/managers/CommandManager.java
@@ -1,5 +1,6 @@
package babushka.managers;
+import babushka.connectors.ClientState;
import babushka.connectors.handlers.ChannelHandler;
import babushka.ffi.resolvers.RedisValueResolver;
import babushka.models.RequestBuilder;
diff --git a/java/client/src/main/java/babushka/managers/ConnectionManager.java b/java/client/src/main/java/babushka/managers/ConnectionManager.java
index 78b8e9ce2b..560ac65cf9 100644
--- a/java/client/src/main/java/babushka/managers/ConnectionManager.java
+++ b/java/client/src/main/java/babushka/managers/ConnectionManager.java
@@ -1,5 +1,6 @@
package babushka.managers;
+import babushka.connectors.ClientState;
import babushka.connectors.handlers.ChannelHandler;
import babushka.ffi.resolvers.RedisValueResolver;
import babushka.models.RequestBuilder;