Skip to content

Commit

Permalink
Minor fixes.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Dec 5, 2023
1 parent 701da44 commit 5bf6672
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 18 deletions.
4 changes: 1 addition & 3 deletions java/client/src/main/java/babushka/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ public class Client {

public Client() {
var callBackManager = new CallbackManager();
channelHolder =
new ChannelHolder(
SocketManager.getInstance().openNewChannel(callBackManager), callBackManager);
channelHolder = SocketManager.getInstance().openNewChannel(callBackManager);
connection = new Connection(channelHolder);
commands = new Commands(channelHolder);
}
Expand Down
4 changes: 4 additions & 0 deletions java/client/src/main/java/babushka/client/ChannelHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
import redis_request.RedisRequestOuterClass.RedisRequest;
import response.ResponseOuterClass.Response;

/**
* Class responsible for manipulations with Netty's {@link Channel}.<br>
* Uses a {@link CallbackManager} to record callbacks of every request sent.
*/
@RequiredArgsConstructor
public class ChannelHolder {
private final Channel channel;
Expand Down
7 changes: 7 additions & 0 deletions java/client/src/main/java/babushka/client/RequestBuilder.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package babushka.client;

import babushka.connection.CallbackManager;
import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import connection_request.ConnectionRequestOuterClass.NodeAddress;
import connection_request.ConnectionRequestOuterClass.ReadFrom;
Expand Down Expand Up @@ -29,6 +30,12 @@ public static ConnectionRequest createConnectionRequest(
.build();
}

/**
* Build a protobuf command/transaction request draft.
*
* @return An uncompleted request. {@link CallbackManager} is responsible to complete it by adding
* a callback id.
*/
public static RedisRequest.Builder prepareRequest(RequestType command, List<String> args) {
var commandArgs = ArgsArray.newBuilder();
for (var arg : args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

/** Builder for the channel used by {@link SocketManager}. */
/** Builder for the channel. */
@RequiredArgsConstructor
public class ChannelBuilder extends ChannelInitializer<UnixChannel> {

Expand Down
29 changes: 15 additions & 14 deletions java/client/src/main/java/babushka/connection/SocketManager.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package babushka.connection;

import babushka.FFI.BabushkaCoreNativeDefinitions;
import babushka.client.ChannelHolder;
import babushka.client.Commands;
import babushka.client.Connection;
import io.netty.bootstrap.Bootstrap;
Expand All @@ -21,10 +22,7 @@
* <li>opening a connection (channel) though the UDS;
* <li>allocating the corresponding resources, e.g. thread pools (see also {@link
* CallbackManager});
* <li>handling connection requests;
* <li>providing unique request ID (callback ID);
* <li>handling REDIS requests;
* <li>closing connection;
* <li>freeing shared resources;
* </ul>
*
* Note: should not be used outside of {@link Commands} or {@link Connection}!
Expand Down Expand Up @@ -101,15 +99,18 @@ cpuCount, new DefaultThreadFactory("SocketManager-kqueue-elg", true))
}
}

public Channel openNewChannel(CallbackManager callbackManager) {
/** Open a new channel for a new client. */
public ChannelHolder openNewChannel(CallbackManager callbackManager) {
try {
return new Bootstrap()
.group(group)
.channel(isMacOs ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class)
.handler(new ChannelBuilder(callbackManager))
.connect(new DomainSocketAddress(socketPath))
.sync()
.channel();
Channel channel =
new Bootstrap()
.group(group)
.channel(isMacOs ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class)
.handler(new ChannelBuilder(callbackManager))
.connect(new DomainSocketAddress(socketPath))
.sync()
.channel();
return new ChannelHolder(channel, callbackManager);
} catch (InterruptedException e) {
System.err.printf(
"Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage());
Expand All @@ -119,8 +120,8 @@ public Channel openNewChannel(CallbackManager callbackManager) {
}

/**
* Closes the UDS connection and frees corresponding resources. A consecutive call to {@link
* #getInstance()} will create a new connection with new resource pool.
* Close the UDS connection and frees corresponding resources. A consecutive call to {@link
* #getInstance()} will create a new resource pool.
*/
public void close() {
group.shutdownGracefully();
Expand Down

0 comments on commit 5bf6672

Please sign in to comment.