Skip to content

Commit

Permalink
Add java cluster client, request routes configuration and support for…
Browse files Browse the repository at this point in the history
… bulk response (valkey-io#864)

* Add cluster client, request routes configuration and support for bulk response (#59)


Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored Feb 1, 2024
1 parent 07e6d43 commit 39335cb
Show file tree
Hide file tree
Showing 16 changed files with 784 additions and 101 deletions.
58 changes: 52 additions & 6 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api;

import static glide.ffi.resolvers.SocketListenerResolver.getSocket;

import glide.api.models.configuration.BaseClientConfiguration;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.ffi.resolvers.RedisValueResolver;
import glide.managers.BaseCommandResponseResolver;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import lombok.AllArgsConstructor;
import response.ResponseOuterClass.Response;

/** Base Client class for Redis */
@AllArgsConstructor
public abstract class BaseClient implements AutoCloseable {

protected ConnectionManager connectionManager;
protected CommandManager commandManager;
protected final ConnectionManager connectionManager;
protected final CommandManager commandManager;

/**
* Extracts the response from the Protobuf response and either throws an exception or returns the
Expand All @@ -23,10 +30,36 @@ public abstract class BaseClient implements AutoCloseable {
* @param response Redis protobuf message
* @return Response Object
*/
protected static Object handleObjectResponse(Response response) {
// return function to convert protobuf.Response into the response object by
// calling valueFromPointer
return (new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer)).apply(response);
protected Object handleObjectResponse(Response response) {
// convert protobuf response into Object and then Object into T
return new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response);
}

/**
* Async request for an async (non-blocking) Redis client.
*
* @param config Redis client Configuration
* @param constructor Redis client constructor reference
* @param <T> Client type
* @return a Future to connect and return a RedisClient
*/
protected static <T> CompletableFuture<T> CreateClient(
BaseClientConfiguration config,
BiFunction<ConnectionManager, CommandManager, T> constructor) {
try {
ChannelHandler channelHandler = buildChannelHandler();
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> constructor.apply(connectionManager, commandManager));
} catch (InterruptedException e) {
// Something bad happened while we were establishing netty connection to UDS
var future = new CompletableFuture<T>();
future.completeExceptionally(e);
return future;
}
}

/**
Expand All @@ -45,4 +78,17 @@ public void close() throws ExecutionException {
throw new RuntimeException(e);
}
}

protected static ChannelHandler buildChannelHandler() throws InterruptedException {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
return new ChannelHandler(callbackDispatcher, getSocket());
}

protected static ConnectionManager buildConnectionManager(ChannelHandler channelHandler) {
return new ConnectionManager(channelHandler);
}

protected static CommandManager buildCommandManager(ChannelHandler channelHandler) {
return new CommandManager(channelHandler);
}
}
67 changes: 11 additions & 56 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
@@ -1,82 +1,37 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api;

import static glide.ffi.resolvers.SocketListenerResolver.getSocket;

import glide.api.commands.BaseCommands;
import glide.api.models.configuration.RedisClientConfiguration;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import glide.managers.models.Command;
import java.util.concurrent.CompletableFuture;

/**
* Async (non-blocking) client for Redis in Standalone mode. Use {@link
* #CreateClient(RedisClientConfiguration)} to request a client to Redis.
* Async (non-blocking) client for Redis in Standalone mode. Use {@link #CreateClient} to request a
* client to Redis.
*/
public class RedisClient extends BaseClient implements BaseCommands {

/**
* Request an async (non-blocking) Redis client in Standalone mode.
*
* @param config - Redis Client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
try {
ChannelHandler channelHandler = buildChannelHandler();
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
} catch (InterruptedException e) {
// Something bad happened while we were establishing netty connection to UDS
var future = new CompletableFuture<RedisClient>();
future.completeExceptionally(e);
return future;
}
}

protected static ChannelHandler buildChannelHandler() throws InterruptedException {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
return new ChannelHandler(callbackDispatcher, getSocket());
}

protected static ConnectionManager buildConnectionManager(ChannelHandler channelHandler) {
return new ConnectionManager(channelHandler);
}

protected static CommandManager buildCommandManager(ChannelHandler channelHandler) {
return new CommandManager(channelHandler);
}

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as SUBSCRIBE), or that return potentially more than a single response
* (such as XREAD), or that change the client's behavior (such as entering pub/sub mode on
* RESP2 connections) shouldn't be called using this function.
* @example Returns a list of all pub/sub clients:
* <pre>
* Object result = client.customCommand(new String[]{"CLIENT","LIST","TYPE", "PUBSUB"}).get();
* </pre>
* Async request for an async (non-blocking) Redis client in Standalone mode.
*
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
* @param config Redis client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
return CreateClient(config, RedisClient::new);
}

@Override
public CompletableFuture<Object> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, BaseClient::handleObjectResponse);
return commandManager.submitNewCommand(command, this::handleObjectResponse);
}
}
61 changes: 61 additions & 0 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api;

import glide.api.commands.ClusterBaseCommands;
import glide.api.models.ClusterValue;
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import glide.managers.models.Command;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Async (non-blocking) client for Redis in Cluster mode. Use {@link #CreateClient} to request a
* client to Redis.
*/
public class RedisClusterClient extends BaseClient implements ClusterBaseCommands {

protected RedisClusterClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Async request for an async (non-blocking) Redis client in Cluster mode.
*
* @param config Redis cluster client Configuration
* @return a Future to connect and return a ClusterClient
*/
public static CompletableFuture<RedisClusterClient> CreateClient(
RedisClusterClientConfiguration config) {
return CreateClient(config, RedisClusterClient::new);
}

@Override
public CompletableFuture<ClusterValue<Object>> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
// TODO if a command returns a map as a single value, ClusterValue misleads user
return commandManager.submitNewCommand(
command, response -> ClusterValue.of(handleObjectResponse(response)));
}

@Override
@SuppressWarnings("unchecked")
public CompletableFuture<ClusterValue<Object>> customCommand(String[] args, Route route) {
Command command =
Command.builder()
.requestType(Command.RequestType.CUSTOM_COMMAND)
.arguments(args)
.route(route)
.build();

return commandManager.submitNewCommand(
command,
response ->
route.isSingleNodeRoute()
? ClusterValue.ofSingleValue(handleObjectResponse(response))
: ClusterValue.ofMultiValue((Map<String, Object>) handleObjectResponse(response)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import glide.api.models.ClusterValue;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import java.util.concurrent.CompletableFuture;

/**
* Base Commands interface to handle generic command and transaction requests with routing options.
*/
public interface ClusterBaseCommands {

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in {@code args}.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as <em>SUBSCRIBE</em>), or that return potentially more than a single
* response (such as <em>XREAD</em>), or that change the client's behavior (such as entering
* <em>pub</em>/<em>sub</em> mode on <em>RESP2</em> connections) shouldn't be called using
* this function.
* @example Returns a list of all <em>pub</em>/<em>sub</em> clients:
* <p><code>
* Object result = client.customCommand(new String[]{ "CLIENT", "LIST", "TYPE", "PUBSUB" }).get();
* </code>
* @param args Arguments for the custom command including the command name
* @return A <em>CompletableFuture</em> with response result from Redis
*/
CompletableFuture<ClusterValue<Object>> customCommand(String[] args);

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in {@code args}.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as <em>SUBSCRIBE</em>), or that return potentially more than a single
* response (such as <em>XREAD</em>), or that change the client's behavior (such as entering
* <em>pub</em>/<em>sub</em> mode on <em>RESP2</em> connections) shouldn't be called using
* this function.
* @example Returns a list of all <em>pub</em>/<em>sub</em> clients:
* <p><code>
* Object result = client.customCommand(new String[]{ "CLIENT", "LIST", "TYPE", "PUBSUB" }).get();
* </code>
* @param args Arguments for the custom command including the command name
* @param route Routing configuration for the command
* @return A <em>CompletableFuture</em> with response result from Redis
*/
CompletableFuture<ClusterValue<Object>> customCommand(String[] args, Route route);
}
73 changes: 73 additions & 0 deletions java/client/src/main/java/glide/api/models/ClusterValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models;

import java.util.Map;

/**
* union-like type which can store single-value or multi-value retrieved from Redis. The
* multi-value, if defined, contains the routed value as a Map<String, Object> containing a cluster
* node address to cluster node value.
*
* @param <T> The wrapped data type
*/
public class ClusterValue<T> {
private Map<String, T> multiValue = null;

private T singleValue = null;

private ClusterValue() {}

/**
* Get per-node value.<br>
* Check with {@link #hasMultiData()} prior to accessing the data.
*/
public Map<String, T> getMultiValue() {
assert hasMultiData() : "No multi value stored";
return multiValue;
}

/**
* Get the single value.<br>
* Check with {@link #hasSingleData()} ()} prior to accessing the data.
*/
public T getSingleValue() {
assert hasSingleData() : "No single value stored";
return singleValue;
}

/** A constructor for the value with type auto-detection. */
@SuppressWarnings("unchecked")
public static <T> ClusterValue<T> of(Object data) {
var res = new ClusterValue<T>();
if (data instanceof Map) {
res.multiValue = (Map<String, T>) data;
} else {
res.singleValue = (T) data;
}
return res;
}

/** A constructor for the value. */
public static <T> ClusterValue<T> ofSingleValue(T data) {
var res = new ClusterValue<T>();
res.singleValue = data;
return res;
}

/** A constructor for the value. */
public static <T> ClusterValue<T> ofMultiValue(Map<String, T> data) {
var res = new ClusterValue<T>();
res.multiValue = data;
return res;
}

/** Check that multi-value is stored in this object. Use it prior to accessing the data. */
public boolean hasMultiData() {
return multiValue != null;
}

/** Check that single-value is stored in this object. Use it prior to accessing the data. */
public boolean hasSingleData() {
return !hasMultiData();
}
}
Loading

0 comments on commit 39335cb

Please sign in to comment.