Skip to content

Commit

Permalink
Split ClientState and Platform.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
Yury-Fridlyand committed Dec 18, 2023
1 parent e8fe498 commit e5281ca
Showing 9 changed files with 154 additions and 139 deletions.
30 changes: 30 additions & 0 deletions java/client/src/main/java/babushka/connectors/ClientState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package babushka.connectors;

public class ClientState {

private ClientState() {}

/**
* A read only client state. It is supposed that main Client class will have instance of the state
* of this type and won't be able to change the state directly.
*/
public static interface ReadOnlyClientState {
/** Check that connection established. This doesn't validate whether it is alive. */
boolean isConnected();

/** Check that connection is not yet established. */
boolean isInitializing();
}

/** A client state which accepts switching to <em>Connected</em> or <em>Closed</em> states. */
public static interface OpenableAndClosableClientState extends ClosableClientState {
/** Report connection status. */
void connect(boolean successful);
}

/** A client state which accepts only one way switching - to <em>Closed</em> state only. */
public static interface ClosableClientState extends ReadOnlyClientState {
/** Report disconnection. */
void disconnect();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package babushka.connectors.handlers;

import babushka.managers.ClientState;
import babushka.connectors.ClientState;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package babushka.connectors.handlers;

import babushka.connectors.resources.Platform;
import babushka.connectors.resources.ThreadPoolAllocator;
import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.unix.DomainSocketAddress;
import java.util.OptionalInt;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import redis_request.RedisRequestOuterClass.RedisRequest;
import response.ResponseOuterClass.Response;
@@ -23,7 +24,7 @@ public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath)
channel =
new Bootstrap()
// TODO let user specify the thread pool or pool size as an option
.group(Platform.createNettyThreadPool("babushka-channel", OptionalInt.empty()))
.group(ThreadPoolAllocator.createNettyThreadPool("babushka-channel", Optional.empty()))
.channel(Platform.getClientUdsNettyChannelType())
.handler(new ProtobufSocketChannelInitializer(callbackDispatcher))
.connect(new DomainSocketAddress(socketPath))
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
package babushka.connectors.resources;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -44,12 +36,6 @@ public static class Capabilities {
private static final Capabilities capabilities =
new Capabilities(isKQueueAvailable(), isEPollAvailable(), false, false);

/**
* Thread pools supplied to <em>Netty</em> to perform all async IO.<br>
* Map key is supposed to be pool name + thread count as a string concat product.
*/
private static final Map<String, EventLoopGroup> groups = new ConcurrentHashMap<>();

/** Detect <em>kqueue</em> availability. */
private static boolean isKQueueAvailable() {
try {
@@ -70,42 +56,6 @@ private static boolean isEPollAvailable() {
}
}

/**
* Allocate Netty thread pool required to manage connection. A thread pool could be shared across
* multiple connections.
*
* @return A new thread pool.
*/
public static EventLoopGroup createNettyThreadPool(String prefix, OptionalInt threadLimit) {
int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors());
if (capabilities.isKQueueAvailable()) {
var name = prefix + "-kqueue-elg";
return getOrCreate(
name + threadCount,
() -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
} else if (capabilities.isEPollAvailable()) {
var name = prefix + "-epoll-elg";
return getOrCreate(
name + threadCount,
() -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
}
// TODO support IO-Uring and NIO

throw new RuntimeException("Current platform supports no known thread pool types");
}

/**
* Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache.
*/
private static EventLoopGroup getOrCreate(String name, Supplier<EventLoopGroup> supplier) {
if (groups.containsKey(name)) {
return groups.get(name);
}
var group = supplier.get();
groups.put(name, group);
return group;
}

/**
* Get a channel class required by Netty to open a client UDS channel.
*
@@ -120,20 +70,4 @@ public static Class<? extends DomainSocketChannel> getClientUdsNettyChannelType(
}
throw new RuntimeException("Current platform supports no known socket types");
}

/**
* A JVM shutdown hook to be registered. It is responsible for closing connection and freeing
* resources. It is recommended to use a class instead of lambda to ensure that it is called.<br>
* See {@link Runtime#addShutdownHook}.
*/
private static class ShutdownHook implements Runnable {
@Override
public void run() {
groups.values().forEach(EventLoopGroup::shutdownGracefully);
}
}

static {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Babushka-shutdown-hook"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package babushka.connectors.resources;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** A class responsible to allocating and deallocating shared thread pools. */
public class ThreadPoolAllocator {

/**
* Thread pools supplied to <em>Netty</em> to perform all async IO.<br>
* Map key is supposed to be pool name + thread count as a string concat product.
*/
private static final Map<String, EventLoopGroup> groups = new ConcurrentHashMap<>();

/**
* Allocate (create new or share existing) Netty thread pool required to manage connection. A
* thread pool could be shared across multiple connections.
*
* @return A new thread pool.
*/
public static EventLoopGroup createNettyThreadPool(String prefix, Optional<Integer> threadLimit) {
int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors());
if (Platform.getCapabilities().isKQueueAvailable()) {
var name = prefix + "-kqueue-elg";
return getOrCreate(
name + threadCount,
() -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
} else if (Platform.getCapabilities().isEPollAvailable()) {
var name = prefix + "-epoll-elg";
return getOrCreate(
name + threadCount,
() -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
}
// TODO support IO-Uring and NIO

throw new RuntimeException("Current platform supports no known thread pool types");
}

/**
* Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache.
*/
private static EventLoopGroup getOrCreate(String name, Supplier<EventLoopGroup> supplier) {
if (groups.containsKey(name)) {
return groups.get(name);
}
EventLoopGroup group = supplier.get();
groups.put(name, group);
return group;
}

/**
* A JVM shutdown hook to be registered. It is responsible for closing connection and freeing
* resources. It is recommended to use a class instead of lambda to ensure that it is called.<br>
* See {@link Runtime#addShutdownHook}.
*/
private static class ShutdownHook implements Runnable {
@Override
public void run() {
groups.values().forEach(EventLoopGroup::shutdownGracefully);
}
}

static {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Babushka-shutdown-hook"));
}
}
70 changes: 0 additions & 70 deletions java/client/src/main/java/babushka/managers/ClientState.java

This file was deleted.

46 changes: 46 additions & 0 deletions java/client/src/main/java/babushka/managers/ClientStateImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package babushka.managers;

import babushka.connectors.ClientState;

public class ClientStateImpl {
private enum InnerStates {
INITIALIZING,
CONNECTED,
CLOSED
}

/**
* Create an instance of {@link ClientState.ReadOnlyClientState} which can be safely shipped to
* {@link CommandManager} and {@link ConnectionManager}. Only those classes are responsible to
* switch the state.
*/
public static ClientState.ReadOnlyClientState create() {
return new ClientState.OpenableAndClosableClientState() {
private ClientStateImpl.InnerStates state = ClientStateImpl.InnerStates.INITIALIZING;

@Override
public boolean isConnected() {
return state == ClientStateImpl.InnerStates.CONNECTED;
}

@Override
public boolean isInitializing() {
return state == ClientStateImpl.InnerStates.INITIALIZING;
}

@Override
public void connect(boolean successful) {
if (state != ClientStateImpl.InnerStates.INITIALIZING) {
throw new IllegalStateException();
}
state =
successful ? ClientStateImpl.InnerStates.CONNECTED : ClientStateImpl.InnerStates.CLOSED;
}

@Override
public void disconnect() {
state = ClientStateImpl.InnerStates.CLOSED;
}
};
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package babushka.managers;

import babushka.connectors.ClientState;
import babushka.connectors.handlers.ChannelHandler;
import babushka.ffi.resolvers.RedisValueResolver;
import babushka.models.RequestBuilder;
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package babushka.managers;

import babushka.connectors.ClientState;
import babushka.connectors.handlers.ChannelHandler;
import babushka.ffi.resolvers.RedisValueResolver;
import babushka.models.RequestBuilder;

0 comments on commit e5281ca

Please sign in to comment.