Skip to content

Commit

Permalink
Managers.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Dec 15, 2023
1 parent ff63182 commit 3b5d20b
Show file tree
Hide file tree
Showing 8 changed files with 303 additions and 8 deletions.
3 changes: 3 additions & 0 deletions java/client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,19 @@ tasks.register('cleanProtobuf') {
tasks.register('buildRustRelease', Exec) {
commandLine 'cargo', 'build', '--release'
workingDir project.rootDir
environment 'CARGO_TERM_COLOR', 'always'
}

tasks.register('buildRustReleaseStrip', Exec) {
commandLine 'cargo', 'build', '--release', '--strip'
workingDir project.rootDir
environment 'CARGO_TERM_COLOR', 'always'
}

tasks.register('buildRust', Exec) {
commandLine 'cargo', 'build'
workingDir project.rootDir
environment 'CARGO_TERM_COLOR', 'always'
}

tasks.register('buildWithRust') {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package babushka.ffi.resolvers;

import response.ResponseOuterClass.Response;

public class RedisValueResolver {
/**
* Resolve a value received from Redis using given C-style pointer.
*
* @param pointer A memory pointer from {@link Response}
* @return A RESP2 value
*/
public static native Object valueFromPointer(long pointer);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package babushka.ffi.resolvers;

public class BabushkaCoreNativeDefinitions {
public static native String startSocketListenerExternal() throws Exception;
public class SocketListenerResolver {

public static native Object valueFromPointer(long pointer);
/** Make an FFI call to Babushka to open a UDS socket to connect to. */
private static native String startSocketListener() throws Exception;

static {
System.loadLibrary("javababushka");
Expand All @@ -16,7 +16,7 @@ public class BabushkaCoreNativeDefinitions {
*/
public static String getSocket() {
try {
return startSocketListenerExternal();
return startSocketListener();
} catch (Exception | UnsatisfiedLinkError e) {
System.err.printf("Failed to create a UDS connection: %s%n%n", e);
throw new RuntimeException(e);
Expand Down
62 changes: 62 additions & 0 deletions java/client/src/main/java/babushka/managers/ClientState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package babushka.managers;

public class ClientState {

private ClientState() {}

/**
* A read only client state. It is supposed that main Client class will have instance of the state
* of this type and won't be able to change the state directly.
*/
public static interface ReadOnlyClientState {
/** Check that connection established. This doesn't validate whether it is alive. */
boolean isConnected();
}

/** A client state which accepts switching to <em>Connected</em> or <em>Closed</em> states. */
public static interface OpenableAndClosableClientState extends ClosableClientState {
/** Report connection status. */
void connect(boolean successful);
}

/** A client state which accepts only one way switching - to <em>Closed</em> state only. */
public static interface ClosableClientState extends ReadOnlyClientState {
/** Report disconnection. */
void disconnect();
}

private enum InnerStates {
INITIALIZING,
CONNECTED,
CLOSED
}

/**
* Create an instance of {@link ReadOnlyClientState} which can be safely shipped to {@link
* CommandManager} and {@link ConnectionManager}. Only those classes are responsible to switch the
* state.
*/
public static ReadOnlyClientState create() {
return new OpenableAndClosableClientState() {
private InnerStates state = InnerStates.INITIALIZING;

@Override
public boolean isConnected() {
return state == InnerStates.CONNECTED;
}

@Override
public void connect(boolean successful) {
if (state != InnerStates.INITIALIZING) {
throw new IllegalStateException();
}
state = successful ? InnerStates.CONNECTED : InnerStates.CLOSED;
}

@Override
public void disconnect() {
state = InnerStates.CLOSED;
}
};
}
}
88 changes: 88 additions & 0 deletions java/client/src/main/java/babushka/managers/CommandManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package babushka.managers;

import babushka.connectors.handlers.ChannelHandler;
import babushka.ffi.resolvers.RedisValueResolver;
import babushka.models.RequestBuilder;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import redis_request.RedisRequestOuterClass.RequestType;
import response.ResponseOuterClass.Response;

@RequiredArgsConstructor
public class CommandManager {

/** UDS connection representation. */
private final ChannelHandler channel;

/**
* Client state, which {@link CommandManager} can flick to closed if corresponding error received.
*/
private final ClientState.ClosableClientState clientState;

/**
* Async (non-blocking) get.<br>
* See <a href="https://redis.io/commands/get/">REDIS docs for GET</a>.
*
* @param key The key name
*/
public CompletableFuture<String> get(String key) {
return submitNewCommand(RequestType.GetString, List.of(key));
}

/**
* Async (non-blocking) set.<br>
* See <a href="https://redis.io/commands/set/">REDIS docs for SET</a>.
*
* @param key The key name
* @param value The value to set
*/
public CompletableFuture<String> set(String key, String value) {
return submitNewCommand(RequestType.SetString, List.of(key, value));
}

/**
* Build a command and submit it Netty to send.
*
* @param command Command type
* @param args Command arguments
* @return A result promise
*/
private CompletableFuture<String> submitNewCommand(RequestType command, List<String> args) {
if (!clientState.isConnected()) {
throw new IllegalStateException("Connection is not open");
}

// TODO this explicitly uses ForkJoin thread pool. May be we should use another one.
return CompletableFuture.supplyAsync(
() -> channel.write(RequestBuilder.prepareRequest(command, args), true))
// TODO: is there a better way to execute this?
.thenComposeAsync(f -> f)
.thenApplyAsync(this::extractValueFromResponse);
}

/**
* Check response and extract data from it.
*
* @param response A response received from Babushka
* @return A String from the Redis RESP2 response, or Ok. Otherwise, returns null
*/
private String extractValueFromResponse(Response response) {
if (response.hasRequestError()) {
// TODO do we need to support different types of exceptions and distinguish them by type?
throw new RuntimeException(
String.format(
"%s: %s",
response.getRequestError().getType(), response.getRequestError().getMessage()));
} else if (response.hasClosingError()) {
CompletableFuture.runAsync(channel::close);
clientState.disconnect();
throw new RuntimeException("Connection closed: " + response.getClosingError());
} else if (response.hasConstantResponse()) {
return response.getConstantResponse().toString();
} else if (response.hasRespPointer()) {
return RedisValueResolver.valueFromPointer(response.getRespPointer()).toString();
}
return null;
}
}
71 changes: 71 additions & 0 deletions java/client/src/main/java/babushka/managers/ConnectionManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package babushka.managers;

import babushka.connectors.handlers.ChannelHandler;
import babushka.ffi.resolvers.RedisValueResolver;
import babushka.models.RequestBuilder;
import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import response.ResponseOuterClass.ConstantResponse;
import response.ResponseOuterClass.Response;

@RequiredArgsConstructor
public class ConnectionManager {

/** UDS connection representation. */
private final ChannelHandler channel;

/**
* Client state, which {@link ConnectionManager} can flick to closed or to open, according to user
* request or error received.
*/
private final ClientState.OpenableAndClosableClientState clientState;

/**
* Connect to Redis using a ProtoBuf connection request.
*
* @param host Server address
* @param port Server port
* @param useSsl true if communication with the server or cluster should use Transport Level
* Security
* @param clusterMode true if REDIS instance runs in the cluster mode
*/
// TODO support more parameters and/or configuration object
public CompletableFuture<Boolean> connectToRedis(
String host, int port, boolean useSsl, boolean clusterMode) {
ConnectionRequest request =
RequestBuilder.createConnectionRequest(host, port, useSsl, clusterMode);
return channel.connect(request).thenApplyAsync(this::checkBabushkaResponse);
}

/** Check a response received from Babushka. */
private boolean checkBabushkaResponse(Response response) {
// TODO do we need to check callback value? It could be -1 or 0
if (response.hasRequestError()) {
// TODO do we need to support different types of exceptions and distinguish them by type?
throw new RuntimeException(
String.format(
"%s: %s",
response.getRequestError().getType(), response.getRequestError().getMessage()));
} else if (response.hasClosingError()) {
throw new RuntimeException("Connection closed: " + response.getClosingError());
} else if (response.hasConstantResponse()) {
clientState.connect(response.getConstantResponse() == ConstantResponse.OK);
return clientState.isConnected();
} else if (response.hasRespPointer()) {
throw new RuntimeException(
"Unexpected response data: "
+ RedisValueResolver.valueFromPointer(response.getRespPointer()));
}
return false;
}

/**
* Close the connection and the corresponding channel.<br>
* TODO: provide feedback that the connection was properly closed
*/
public CompletableFuture<Void> closeConnection() {
clientState.disconnect();
return CompletableFuture.runAsync(channel::close);
}
}
60 changes: 60 additions & 0 deletions java/client/src/main/java/babushka/models/RequestBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package babushka.models;

import babushka.connectors.handlers.CallbackDispatcher;
import babushka.managers.CommandManager;
import babushka.managers.ConnectionManager;
import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import connection_request.ConnectionRequestOuterClass.NodeAddress;
import connection_request.ConnectionRequestOuterClass.ReadFrom;
import connection_request.ConnectionRequestOuterClass.TlsMode;
import java.util.List;
import redis_request.RedisRequestOuterClass.Command;
import redis_request.RedisRequestOuterClass.Command.ArgsArray;
import redis_request.RedisRequestOuterClass.RedisRequest;
import redis_request.RedisRequestOuterClass.RequestType;
import redis_request.RedisRequestOuterClass.Routes;
import redis_request.RedisRequestOuterClass.SimpleRoutes;

public class RequestBuilder {

/**
* Build a protobuf connection request.<br>
* Used by {@link ConnectionManager#connectToRedis}.
*/
// TODO support more parameters and/or configuration object
public static ConnectionRequest createConnectionRequest(
String host, int port, boolean useSsl, boolean clusterMode) {
return ConnectionRequest.newBuilder()
.addAddresses(NodeAddress.newBuilder().setHost(host).setPort(port).build())
.setTlsMode(useSsl ? TlsMode.SecureTls : TlsMode.NoTls)
.setClusterModeEnabled(clusterMode)
.setReadFrom(ReadFrom.Primary)
.setDatabaseId(0)
.build();
}

/**
* Build a protobuf command/transaction request draft.<br>
* Used by {@link CommandManager}.
*
* @return An uncompleted request. {@link CallbackDispatcher} is responsible to complete it by
* adding a callback id.
*/
public static RedisRequest.Builder prepareRequest(RequestType command, List<String> args) {
var commandArgs = ArgsArray.newBuilder();
for (var arg : args) {
commandArgs.addArgs(arg);
}

return RedisRequest.newBuilder()
.setSingleCommand( // set command
Command.newBuilder()
.setRequestType(command) // set command name
.setArgsArray(commandArgs.build()) // set arguments
.build())
.setRoute( // set route
Routes.newBuilder()
.setSimpleRoutes(SimpleRoutes.AllNodes) // set route type
.build());
}
}
6 changes: 2 additions & 4 deletions java/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ fn redis_value_to_java(mut env: JNIEnv, val: Value) -> JObject {
}

#[no_mangle]
pub extern "system" fn Java_babushka_ffi_resolvers_BabushkaCoreNativeDefinitions_valueFromPointer<
'local,
>(
pub extern "system" fn Java_babushka_ffi_resolvers_RedisValueResolver_valueFromPointer<'local>(
env: JNIEnv<'local>,
_class: JClass<'local>,
pointer: jlong,
Expand All @@ -54,7 +52,7 @@ pub extern "system" fn Java_babushka_ffi_resolvers_BabushkaCoreNativeDefinitions
}

#[no_mangle]
pub extern "system" fn Java_babushka_ffi_resolvers_BabushkaCoreNativeDefinitions_startSocketListenerExternal<
pub extern "system" fn Java_babushka_ffi_resolvers_SocketListenerResolver_startSocketListener<
'local,
>(
env: JNIEnv<'local>,
Expand Down

0 comments on commit 3b5d20b

Please sign in to comment.