Skip to content

Commit

Permalink
Rename CallbackManager to CallbackDispatcher.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Dec 16, 2023
1 parent 708293f commit 12dcaba
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package babushka.managers;
package babushka.connectors.handlers;

import babushka.connectors.handlers.ReadHandler;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
import response.ResponseOuterClass.Response;

/** Holder for resources required to dispatch responses and used by {@link ReadHandler}. */
public class CallbackManager {
public class CallbackDispatcher {
/** Unique request ID (callback ID). Thread-safe. */
private final AtomicInteger requestId = new AtomicInteger(0);

Expand All @@ -25,7 +23,7 @@ public class CallbackManager {
* requests can't be stored in the same storage, because callback ID = 0 is hardcoded for
* connection requests.
*/
@Getter private final CompletableFuture<Response> connectionPromise = new CompletableFuture<>();
private final CompletableFuture<Response> connectionPromise = new CompletableFuture<>();

/**
* Register a new request to be sent. Once response received, the given future completes with it.
Expand All @@ -40,6 +38,10 @@ public Pair<Integer, CompletableFuture<Response>> registerRequest() {
return Pair.of(callbackId, future);
}

public CompletableFuture<Response> registerConnection() {
return connectionPromise;
}

/**
* Complete the corresponding client promise and free resources.
*
Expand All @@ -56,9 +58,8 @@ public void completeRequest(Response response) {
}

public void shutdownGracefully() {
connectionPromise.completeExceptionally(new InterruptedException());
responses.forEach(
(callbackId, future) -> future.completeExceptionally(new InterruptedException()));
connectionPromise.cancel(false);
responses.values().forEach(future -> future.cancel(false));
responses.clear();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package babushka.connectors.handlers;

import babushka.connectors.resources.Platform;
import babushka.managers.CallbackManager;
import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
Expand All @@ -14,29 +13,29 @@

/**
* Class responsible for manipulations with Netty's {@link Channel}.<br>
* Uses a {@link CallbackManager} to record callbacks of every request sent.
* Uses a {@link CallbackDispatcher} to record callbacks of every request sent.
*/
public class ChannelHandler {
private final Channel channel;
private final CallbackManager callbackManager;
private final CallbackDispatcher callbackDispatcher;

/** Open a new channel for a new client. */
public ChannelHandler(CallbackManager callbackManager, String socketPath) {
public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) {
channel =
new Bootstrap()
// TODO let user specify the thread pool or pool size as an option
.group(Platform.createNettyThreadPool("babushka-channel", OptionalInt.empty()))
.channel(Platform.getClientUdsNettyChannelType())
.handler(new ProtobufSocketChannelInitializer(callbackManager))
.handler(new ProtobufSocketChannelInitializer(callbackDispatcher))
.connect(new DomainSocketAddress(socketPath))
// TODO call here .sync() if needed or remove this comment
.channel();
this.callbackManager = callbackManager;
this.callbackDispatcher = callbackDispatcher;
}

/** Write a protobuf message to the socket. */
public CompletableFuture<Response> write(RedisRequest.Builder request, boolean flush) {
var commandId = callbackManager.registerRequest();
var commandId = callbackDispatcher.registerRequest();
request.setCallbackIdx(commandId.getKey());

if (flush) {
Expand All @@ -50,7 +49,7 @@ public CompletableFuture<Response> write(RedisRequest.Builder request, boolean f
/** Write a protobuf message to the socket. */
public CompletableFuture<Response> connect(ConnectionRequest request) {
channel.writeAndFlush(request);
return callbackManager.getConnectionPromise();
return callbackDispatcher.registerConnection();
}

private final AtomicBoolean closed = new AtomicBoolean(false);
Expand All @@ -59,7 +58,7 @@ public CompletableFuture<Response> connect(ConnectionRequest request) {
public void close() {
if (closed.compareAndSet(false, true)) {
channel.close();
callbackManager.shutdownGracefully();
callbackDispatcher.shutdownGracefully();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package babushka.connectors.handlers;

import babushka.managers.CallbackManager;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.unix.UnixChannel;
Expand All @@ -16,7 +15,7 @@
@RequiredArgsConstructor
public class ProtobufSocketChannelInitializer extends ChannelInitializer<UnixChannel> {

private final CallbackManager callbackManager;
private final CallbackDispatcher callbackDispatcher;

@Override
public void initChannel(@NonNull UnixChannel ch) {
Expand All @@ -26,7 +25,7 @@ public void initChannel(@NonNull UnixChannel ch) {
.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender())
.addLast("protobufDecoder", new ProtobufDecoder(Response.getDefaultInstance()))
.addLast("protobufEncoder", new ProtobufEncoder())
.addLast(new ReadHandler(callbackManager))
.addLast(new ReadHandler(callbackDispatcher))
.addLast(new ChannelOutboundHandlerAdapter());
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package babushka.connectors.handlers;

import babushka.managers.CallbackManager;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.NonNull;
Expand All @@ -11,12 +10,12 @@
@RequiredArgsConstructor
public class ReadHandler extends ChannelInboundHandlerAdapter {

private final CallbackManager callbackManager;
private final CallbackDispatcher callbackDispatcher;

/** Submit responses from babushka to an instance {@link CallbackManager} to handle them. */
/** Submit responses from babushka to an instance {@link CallbackDispatcher} to handle them. */
@Override
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) {
callbackManager.completeRequest((Response) msg);
callbackDispatcher.completeRequest((Response) msg);
}

/** Handles uncaught exceptions from {@link #channelRead(ChannelHandlerContext, Object)}. */
Expand Down

0 comments on commit 12dcaba

Please sign in to comment.