From 7d5fa4bf618f14412f79e66e39f7f0585c5abe7a Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 13 Dec 2023 17:02:38 -0800 Subject: [PATCH] Rework how we open `Channel`. Signed-off-by: Yury-Fridlyand --- .../babushka/connectors/SocketConnection.java | 134 ----------------- .../connectors/handlers/ChannelHandler.java | 28 +++- ... => ProtobufSocketChannelInitializer.java} | 4 +- .../connectors/resources/Platform.java | 139 ++++++++++++++++++ 4 files changed, 165 insertions(+), 140 deletions(-) delete mode 100644 java/client/src/main/java/babushka/connectors/SocketConnection.java rename java/client/src/main/java/babushka/connectors/handlers/{ChannelBuilder.java => ProtobufSocketChannelInitializer.java} (88%) create mode 100644 java/client/src/main/java/babushka/connectors/resources/Platform.java diff --git a/java/client/src/main/java/babushka/connectors/SocketConnection.java b/java/client/src/main/java/babushka/connectors/SocketConnection.java deleted file mode 100644 index cd41e00ced..0000000000 --- a/java/client/src/main/java/babushka/connectors/SocketConnection.java +++ /dev/null @@ -1,134 +0,0 @@ -package babushka.connectors; - -import babushka.connectors.handlers.ChannelBuilder; -import babushka.connectors.handlers.ChannelHandler; -import babushka.managers.CallbackManager; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollDomainSocketChannel; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.kqueue.KQueue; -import io.netty.channel.kqueue.KQueueDomainSocketChannel; -import io.netty.channel.kqueue.KQueueEventLoopGroup; -import io.netty.channel.unix.DomainSocketAddress; -import io.netty.util.concurrent.DefaultThreadFactory; - -public class SocketConnection { - - /** Thread pool supplied to Netty to perform all async IO. */ - private EventLoopGroup group; - - /** The singleton instance. */ - private static SocketConnection INSTANCE = null; - - private static String socketPath; - - public static void setSocketPath(String socketPath) { - if (SocketConnection.socketPath == null) { - SocketConnection.socketPath = socketPath; - return; - } - throw new RuntimeException("socket path can only be declared once"); - } - - /** - * Creates (if not yet created) and returns the singleton instance of the {@link - * SocketConnection}. - * - * @return a {@link SocketConnection} instance. - */ - public static synchronized SocketConnection getInstance() { - if (INSTANCE == null) { - assert socketPath != null : "socket path must be defined"; - INSTANCE = new SocketConnection(); - } - return INSTANCE; - } - - // At the moment, Windows is not supported - // Probably we should use NIO (NioEventLoopGroup) for Windows. - private static final boolean isMacOs = isKQueueAvailable(); - - // TODO support IO-Uring and NIO - /** - * Detect platform to identify which native implementation to use for UDS interaction. Currently - * supported platforms are: Linux and macOS.
- * Subject to change in future to support more platforms and implementations. - */ - private static boolean isKQueueAvailable() { - try { - Class.forName("io.netty.channel.kqueue.KQueue"); - return KQueue.isAvailable(); - } catch (ClassNotFoundException e) { - return false; - } - } - - /** Constructor for the single instance. */ - private SocketConnection() { - try { - int cpuCount = Runtime.getRuntime().availableProcessors(); - group = - isMacOs - ? new KQueueEventLoopGroup( - cpuCount, new DefaultThreadFactory("SocketConnection-kqueue-elg", true)) - : new EpollEventLoopGroup( - cpuCount, new DefaultThreadFactory("SocketConnection-epoll-elg", true)); - } catch (Exception e) { - System.err.printf( - "Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); - e.printStackTrace(System.err); - } - } - - /** Open a new channel for a new client. */ - public ChannelHandler openNewChannel(CallbackManager callbackManager) { - try { - Channel channel = - new Bootstrap() - .group(group) - .channel(isMacOs ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class) - .handler(new ChannelBuilder(callbackManager)) - .connect(new DomainSocketAddress(socketPath)) - .sync() - .channel(); - return new ChannelHandler(channel, callbackManager); - } catch (InterruptedException e) { - System.err.printf( - "Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); - e.printStackTrace(System.err); - throw new RuntimeException(e); - } - } - - /** - * Closes the UDS connection and frees corresponding resources. A consecutive call to {@link - * #getInstance()} will create a new connection with new resource pool. - */ - public void close() { - group.shutdownGracefully(); - INSTANCE = null; - } - - /** - * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing - * resources by calling {@link #close()}. It is recommended to use a class instead of lambda to - * ensure that it is called.
- * See {@link Runtime#addShutdownHook}. - */ - private static class ShutdownHook implements Runnable { - @Override - public void run() { - if (INSTANCE != null) { - INSTANCE.close(); - INSTANCE = null; - } - } - } - - static { - Runtime.getRuntime() - .addShutdownHook(new Thread(new ShutdownHook(), "SocketConnection-shutdown-hook")); - } -} diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java index 1942b3033b..1cd11f12e8 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java @@ -1,10 +1,14 @@ package babushka.connectors.handlers; +import babushka.connectors.resources.Platform; import babushka.managers.CallbackManager; import connection_request.ConnectionRequestOuterClass.ConnectionRequest; +import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; +import io.netty.channel.unix.DomainSocketAddress; +import java.util.OptionalInt; import java.util.concurrent.CompletableFuture; -import lombok.RequiredArgsConstructor; +import java.util.concurrent.atomic.AtomicBoolean; import redis_request.RedisRequestOuterClass.RedisRequest; import response.ResponseOuterClass.Response; @@ -12,11 +16,23 @@ * Class responsible for manipulations with Netty's {@link Channel}.
* Uses a {@link CallbackManager} to record callbacks of every request sent. */ -@RequiredArgsConstructor public class ChannelHandler { private final Channel channel; private final CallbackManager callbackManager; + /** Open a new channel for a new client. */ + public ChannelHandler(CallbackManager callbackManager, String socketPath) { + channel = + new Bootstrap() + .group(Platform.createNettyThreadPool("babushka-channel", OptionalInt.empty())) + .channel(Platform.getClientUdsNettyChannelType()) + .handler(new ProtobufSocketChannelInitializer(callbackManager)) + .connect(new DomainSocketAddress(socketPath)) + // TODO call here .sync() if needed or remove this comment + .channel(); + this.callbackManager = callbackManager; + } + /** Write a protobuf message to the socket. */ public CompletableFuture write(RedisRequest.Builder request, boolean flush) { var commandId = callbackManager.registerRequest(); @@ -36,9 +52,13 @@ public CompletableFuture connect(ConnectionRequest request) { return callbackManager.getConnectionPromise(); } + private final AtomicBoolean closed = new AtomicBoolean(false); + /** Closes the UDS connection and frees corresponding resources. */ public void close() { - channel.close(); - callbackManager.shutdownGracefully(); + if (closed.compareAndSet(false, true)) { + channel.close(); + callbackManager.shutdownGracefully(); + } } } diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java b/java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java similarity index 88% rename from java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java rename to java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java index 157aa4f632..eb37b221cc 100644 --- a/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java +++ b/java/client/src/main/java/babushka/connectors/handlers/ProtobufSocketChannelInitializer.java @@ -12,9 +12,9 @@ import lombok.RequiredArgsConstructor; import response.ResponseOuterClass.Response; -/** Builder for the channel used by {@link babushka.connectors.SocketConnection}. */ +/** Builder for the channel used by {@link ChannelHandler}. */ @RequiredArgsConstructor -public class ChannelBuilder extends ChannelInitializer { +public class ProtobufSocketChannelInitializer extends ChannelInitializer { private final CallbackManager callbackManager; diff --git a/java/client/src/main/java/babushka/connectors/resources/Platform.java b/java/client/src/main/java/babushka/connectors/resources/Platform.java new file mode 100644 index 0000000000..b411f04f50 --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/resources/Platform.java @@ -0,0 +1,139 @@ +package babushka.connectors.resources; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueDomainSocketChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.unix.DomainSocketChannel; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.Map; +import java.util.OptionalInt; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; +import lombok.experimental.UtilityClass; + +/** + * An auxiliary class purposed to detect platform (OS + JVM) {@link Capabilities} and allocate + * corresponding resources. + */ +@UtilityClass +public class Platform { + + @Getter + @AllArgsConstructor(access = AccessLevel.PRIVATE) + @ToString + public static class Capabilities { + private final boolean isKQueueAvailable; + private final boolean isEPollAvailable; + // TODO support IO-Uring and NIO + private final boolean isIOUringAvailable; + // At the moment, Windows is not supported + // Probably we should use NIO (NioEventLoopGroup) for Windows. + private final boolean isNIOAvailable; + } + + /** Detected platform (OS + JVM) capabilities. Not supposed to be changed in runtime. */ + @Getter + private static final Capabilities capabilities = + new Capabilities(isKQueueAvailable(), isEPollAvailable(), false, false); + + /** + * Thread pools supplied to Netty to perform all async IO.
+ * Map key is supposed to be pool name + thread count as a string concat product. + */ + private static final Map groups = new ConcurrentHashMap<>(); + + /** Detect kqueue availability. */ + private static boolean isKQueueAvailable() { + try { + Class.forName("io.netty.channel.kqueue.KQueue"); + return KQueue.isAvailable(); + } catch (ClassNotFoundException e) { + return false; + } + } + + /** Detect epoll availability. */ + private static boolean isEPollAvailable() { + try { + Class.forName("io.netty.channel.epoll.Epoll"); + return Epoll.isAvailable(); + } catch (ClassNotFoundException e) { + return false; + } + } + + /** + * Allocate Netty thread pool required to manage connection. A thread pool could be shared across + * multiple connections. + * + * @return A new thread pool. + */ + public static EventLoopGroup createNettyThreadPool(String prefix, OptionalInt threadLimit) { + int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors()); + if (capabilities.isKQueueAvailable()) { + var name = prefix + "-kqueue-elg"; + return getOrCreate( + name + threadCount, + () -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); + } else if (capabilities.isEPollAvailable()) { + var name = prefix + "-epoll-elg"; + return getOrCreate( + name + threadCount, + () -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); + } + // TODO support IO-Uring and NIO + + throw new RuntimeException("Current platform supports no known thread pool types"); + } + + /** + * Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache. + */ + private static EventLoopGroup getOrCreate(String name, Supplier supplier) { + if (groups.containsKey(name)) { + return groups.get(name); + } + var group = supplier.get(); + groups.put(name, group); + return group; + } + + /** + * Get a channel class required by Netty to open a client UDS channel. + * + * @return Return a class supported by the current platform. + */ + public static Class getClientUdsNettyChannelType() { + if (capabilities.isKQueueAvailable()) { + return KQueueDomainSocketChannel.class; + } + if (capabilities.isEPollAvailable()) { + return EpollDomainSocketChannel.class; + } + throw new RuntimeException("Current platform supports no known socket types"); + } + + /** + * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing + * resources. It is recommended to use a class instead of lambda to ensure that it is called.
+ * See {@link Runtime#addShutdownHook}. + */ + private static class ShutdownHook implements Runnable { + @Override + public void run() { + groups.values().forEach(EventLoopGroup::shutdownGracefully); + } + } + + static { + Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Babushka-shutdown-hook")); + } +}