
Client optimizations. #37

Merged (10 commits), Nov 16, 2023
@@ -45,13 +45,10 @@
import javababushka.benchmarks.utils.ConnectionSettings;
import io.netty.channel.unix.DomainSocketAddress;
import javababushka.client.RedisClient;
import org.apache.commons.lang3.tuple.Pair;
It would be worth setting up the checkstyle plugin to group together packages (eg everything in org in one group), alphabetize, etc.

Author:
We have spotless in the project; it should do all of this work. Code cleanup is planned, thank you.


import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.*;
Generally prefer explicit imports over * imports, to avoid over-importing.

Author:
spotless should fix this as well.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -81,12 +78,15 @@ public class JniNettyClient implements SyncClient, AsyncClient<Response>, AutoCloseable
public static int PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS = 1000;

// Futures to handle responses. Index is callback id, starting from 1 (0 index is for connection request always).
// It is not a concurrent or synchronized collection, but additions are synchronized. No removals.
// TODO clean up completed futures
private final List<CompletableFuture<Response>> responses = Collections.synchronizedList(new ArrayList<>());
private final List<CompletableFuture<Response>> responses = new ArrayList<>();
acarbonetto marked this conversation as resolved.
// Unique offset for every client to avoid having multiple commands with the same id at a time.
// For debugging replace with: new Random().nextInt(1000) * 1000
private final int callbackOffset = new Random().nextInt();
I don't think this is guaranteed to be a unique offset?

  1. Every instance is getting its own Random() object instead of sharing a generator.
  2. I don't know if it's guaranteed that a Random can't generate the same number multiple times?

Could use a static AtomicInteger counter. It'd be unique but predictable.
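A minimal sketch of the suggested counter (the class and method names here are illustrative, not from the PR): one process-wide AtomicInteger hands out offsets, so no two clients can ever receive the same value, at the cost of being predictable.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CallbackOffsets {
    // One shared counter for every client instance in this process.
    private static final AtomicInteger NEXT_OFFSET = new AtomicInteger();

    // Each new client calls this once. Unlike Random, values cannot repeat
    // until the counter wraps after 2^32 allocations.
    public static int nextOffset() {
        return NEXT_OFFSET.getAndIncrement();
    }
}
```

Each JniNettyClient constructor would then call `CallbackOffsets.nextOffset()` instead of `new Random().nextInt()`.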

Design question -- are we creating several JniNettyClients and funneling a single task or a small number of tasks through each one, or creating a limited number of JniNettyClients and funneling a large number of tasks through them?

Netty (and async I/O in general) is more efficient for the latter case. It better handles resources than creating a bunch of threads and having one socket per thread.


private final String unixSocket = getSocket();

// TODO static or move to constructor?
private static String getSocket() {
try {
return RedisClient.startSocketListenerExternal();
@@ -136,21 +136,11 @@ public void connectToRedis() {

@Override
public void connectToRedis(ConnectionSettings connectionSettings) {

Response connected = null;
try {
connected = waitForResult(asyncConnectToRedis(connectionSettings));
//System.out.printf("Connection %s%n", connected != null ? connected.getConstantResponse() : null);
} catch (Exception e) {
System.err.println("Connection time out");
}

int a = 5;
waitForResult(asyncConnectToRedis(connectionSettings));
}

private void createChannel() {
// TODO maybe move to constructor or to static?
// ======
try {
channel = new Bootstrap()
.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
acarbonetto marked this conversation as resolved.
@@ -175,10 +165,17 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
var buf = (ByteBuf) msg;
var bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
// TODO surround parsing with try-catch
// TODO surround parsing with try-catch, set error to future if parsing failed.
The inability to parse a protobuf object directly from a ByteBuf is annoying. There are some solutions here you could use to avoid having to allocate the byte array, copy into it, and then throw it away:
https://stackoverflow.com/questions/29690118/how-to-parse-google-protocol-buffers-that-in-the-direct-memory-without-allocatin

Actually, maybe you can use Netty's built-in Protobuf encoder/decoder instead of dealing with message serialization yourself:
https://netty.io/4.0/api/io/netty/handler/codec/protobuf/ProtobufDecoder.html
Then you don't need to mess around with ByteBufs or byte arrays.

Author:
It is already used in lines 158-159
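For reference, the codec wiring the thread points at could look roughly like this (a sketch, assuming the wire format is varint-length-prefixed protobuf, which is what these Netty codecs expect; the class name is illustrative):

```java
// Sketch only: assumes netty-codec and the generated Response class on the classpath.
import io.netty.channel.ChannelInitializer;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

class ProtobufPipelineInitializer extends ChannelInitializer<DomainSocketChannel> {
    @Override
    protected void initChannel(DomainSocketChannel ch) {
        ch.pipeline()
            // Inbound: split the byte stream into frames, then decode each frame.
            .addLast(new ProtobufVarint32FrameDecoder())
            .addLast(new ProtobufDecoder(Response.getDefaultInstance()))
            // Outbound: length-prefix and serialize outgoing messages.
            .addLast(new ProtobufVarint32LengthFieldPrepender())
            .addLast(new ProtobufEncoder());
        // A handler added after the decoder receives ready Response objects,
        // so channelRead never has to touch ByteBufs or byte arrays directly.
    }
}
```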

var response = Response.parseFrom(bytes);
int callbackId = response.getCallbackIdx();
if (callbackId != 0) {
// connection request has hardcoded callback id = 0
// https://github.com/aws/babushka/issues/600
callbackId -= callbackOffset;
}
//System.out.printf("== Received response with callback %d%n", response.getCallbackIdx());
responses.get(response.getCallbackIdx()).complete(response);
responses.get(callbackId).complete(response);
responses.set(callbackId, null);
super.channelRead(ctx, bytes);
}

@@ -190,18 +187,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
}
})
.addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
//System.out.printf("=== bind %s %s %s %n", ctx, localAddress, promise);
super.bind(ctx, localAddress, promise);
}

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
//System.out.printf("=== connect %s %s %s %s %n", ctx, remoteAddress, localAddress, promise);
super.connect(ctx, remoteAddress, localAddress, promise);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
//System.out.printf("=== write %s %s %s %n", ctx, msg, promise);
@@ -226,21 +211,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
//System.out.printf("=== flush %s %n", ctx);
super.flush(ctx);
}
});
/*
.addLast(new SimpleUserEventChannelHandler<String>() {
@Override
protected void eventReceived(ChannelHandlerContext ctx, String evt) throws Exception {

}
});
*/
//.addLast(new CombinedChannelDuplexHandler(new ChannelInboundHandler(), new ChannelOutboundHandler()));
}
})
.connect(new DomainSocketAddress(unixSocket)).sync().channel();
@@ -271,7 +242,7 @@ public void closeConnection() {
long waitStarted = System.nanoTime();
long waitUntil = waitStarted + PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS * 1_000_000L; // millis to nanos
for (var future : responses) {
if (future.isDone()) {
if (future == null || future.isDone()) {
continue;
}
try {
Expand All @@ -283,9 +254,6 @@ public void closeConnection() {
break;
}
}

// channel.closeFuture().sync();
// } catch (InterruptedException ignored) {
} finally {
group.shutdownGracefully();
}
@@ -300,26 +268,13 @@ public void set(String key, String value) {
@Override
public String get(String key) {
return waitForResult(asyncGet(key));
/*
try {
var response = responses.get(callbackId).get(DEFAULT_FUTURE_TIMEOUT_SEC, TimeUnit.SECONDS);
return response.hasRespPointer()
? RedisClient.valueFromPointer(response.getRespPointer()).toString()
: null;
} catch (Exception e) {
System.err.printf("Failed to process `get` response, callback = %d: %s %s%n",
callbackId, e.getClass().getSimpleName(), e.getMessage());
e.printStackTrace(System.err);
return null;
}
*/
// TODO support non-strings
}

// TODO use reentrant lock
// https://www.geeksforgeeks.org/reentrant-lock-java/
private synchronized int getNextCallbackId() {
responses.add(new CompletableFuture<>());
return responses.size() - 1;
private synchronized Pair<Integer, CompletableFuture<Response>> getNextCallbackId() {
Yury-Fridlyand marked this conversation as resolved.
var future = new CompletableFuture<Response>();
responses.add(future);
return Pair.of(responses.size() - 1, future);
}

public static void main(String[] args) {
@@ -435,27 +390,30 @@ public Future<Response> asyncConnectToRedis(ConnectionSettings connectionSettings)
}

private CompletableFuture<Response> submitNewCommand(RequestType command, List<String> args) {
int callbackId = getNextCallbackId();
//System.out.printf("== %s(%s), callback %d%n", command, String.join(", ", args), callbackId);
RedisRequest request =
RedisRequest.newBuilder()
.setCallbackIdx(callbackId)
.setSingleCommand(
Command.newBuilder()
.setRequestType(command)
.setArgsArray(ArgsArray.newBuilder().addAllArgs(args).build())
.build())
.setRoute(
Routes.newBuilder()
.setSimpleRoutes(SimpleRoutes.AllNodes)
.build())
.build();
if (ALWAYS_FLUSH_ON_WRITE) {
channel.writeAndFlush(request.toByteArray());
return responses.get(callbackId);
}
channel.write(request.toByteArray());
return autoFlushFutureWrapper(responses.get(callbackId));
var commandId = getNextCallbackId();
//System.out.printf("== %s(%s), callback %d%n", command, String.join(", ", args), commandId);

return CompletableFuture.supplyAsync(() -> {
RedisRequest request =
RedisRequest.newBuilder()
.setCallbackIdx(commandId.getKey() + callbackOffset)
.setSingleCommand(
Command.newBuilder()
.setRequestType(command)
.setArgsArray(ArgsArray.newBuilder().addAllArgs(args).build())
.build())
.setRoute(
Routes.newBuilder()
.setSimpleRoutes(SimpleRoutes.AllNodes)
.build())
.build();
if (ALWAYS_FLUSH_ON_WRITE) {
channel.writeAndFlush(request.toByteArray());
return commandId.getRight();
}
channel.write(request.toByteArray());
return autoFlushFutureWrapper(commandId.getRight());
}).thenCompose(f -> f);
}

private <T> CompletableFuture<T> autoFlushFutureWrapper(Future<T> future) {