Skip to content

Commit

Permalink
Optimize protobuf and netty integration.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Dec 13, 2023
1 parent 75c1977 commit 5223401
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

import babushka.managers.CallbackManager;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.unix.UnixChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import response.ResponseOuterClass.Response;

/** Builder for the channel used by {@link babushka.connectors.SocketConnection}. */
@RequiredArgsConstructor
Expand All @@ -18,9 +22,11 @@ public class ChannelBuilder extends ChannelInitializer<UnixChannel> {
public void initChannel(@NonNull UnixChannel ch) {
ch.pipeline()
// https://netty.io/4.1/api/io/netty/handler/codec/protobuf/ProtobufEncoder.html
.addLast("protobufDecoder", new ProtobufVarint32FrameDecoder())
.addLast("protobufEncoder", new ProtobufVarint32LengthFieldPrepender())
.addLast("frameDecoder", new ProtobufVarint32FrameDecoder())
.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender())
.addLast("protobufDecoder", new ProtobufDecoder(Response.getDefaultInstance()))
.addLast("protobufEncoder", new ProtobufEncoder())
.addLast(new ReadHandler(callbackManager))
.addLast(new WriteHandler());
.addLast(new ChannelOutboundHandlerAdapter());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ public CompletableFuture<Response> write(RedisRequest.Builder request, boolean f
request.setCallbackIdx(commandId.getKey());

if (flush) {
channel.writeAndFlush(request.build().toByteArray());
channel.writeAndFlush(request.build());
} else {
channel.write(request.build().toByteArray());
channel.write(request.build());
}
return commandId.getValue();
}

/** Write a protobuf message to the socket. */
public CompletableFuture<Response> connect(ConnectionRequest request) {
channel.writeAndFlush(request.toByteArray());
channel.writeAndFlush(request);
return callbackManager.getConnectionPromise();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,22 @@
package babushka.connectors.handlers;

import babushka.managers.CallbackManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import response.ResponseOuterClass;
import response.ResponseOuterClass.Response;

/** Handler for inbound traffic though UDS. Used by Netty. */
@RequiredArgsConstructor
public class ReadHandler extends ChannelInboundHandlerAdapter {

private final CallbackManager callbackManager;

/**
* Handles responses from babushka core:
*
* <ol>
* <li>Copy to a buffer;
* <li>Parse protobuf packet;
* <li>Find and resolve a corresponding future;
* </ol>
*/
/** Submit responses from babushka to an instance {@link CallbackManager} to handle them. */
@Override
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
throws Exception {
var buf = (ByteBuf) msg;
var bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
// TODO surround parsing with try-catch, set error to future if parsing failed.
var response = ResponseOuterClass.Response.parseFrom(bytes);
callbackManager.completeRequest(response);
buf.release();
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) {
callbackManager.completeRequest((Response) msg);
}

/** Handles uncaught exceptions from {@link #channelRead(ChannelHandlerContext, Object)}. */
Expand Down

This file was deleted.

0 comments on commit 5223401

Please sign in to comment.