Skip to content

Commit

Permalink
rework
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Jan 20, 2024
1 parent eec4bc3 commit b7f9bbd
Show file tree
Hide file tree
Showing 16 changed files with 277 additions and 395 deletions.
23 changes: 20 additions & 3 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package glide.api;

import glide.ffi.resolvers.RedisValueResolver;
import glide.managers.BaseCommandResponseResolver;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import lombok.AllArgsConstructor;
import lombok.Getter;
import response.ResponseOuterClass.Response;

/**
* Base Client class for Redis
Expand All @@ -14,8 +17,22 @@
@AllArgsConstructor
public abstract class BaseClient<T> implements AutoCloseable {

protected ConnectionManager connectionManager;
protected CommandManager commandManager;
protected final ConnectionManager connectionManager;
protected final CommandManager commandManager;
protected final Function<Object, T> dataConverter;

/**
* Extracts the response from the Protobuf response and either throws an exception or returns the
* appropriate response has an Object
*
* @param response Redis protobuf message
* @return Response Object
*/
protected T handleObjectResponse(Response response) {
// convert protobuf response into Object and then Object into T
return dataConverter.apply(
new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response));
}

/**
* Closes this resource, relinquishing any underlying resources. This method is invoked
Expand Down
68 changes: 38 additions & 30 deletions java/client/src/main/java/glide/api/ClusterClient.java
Original file line number Diff line number Diff line change
@@ -1,50 +1,58 @@
package glide.api;

import static glide.api.RedisClient.buildChannelHandler;
import static glide.api.RedisClient.buildCommandManager;
import static glide.api.RedisClient.buildConnectionManager;

import glide.api.commands.ClusterBaseCommands;
import glide.api.commands.Command;
import glide.api.models.RedisValue;
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.Route;
import glide.connectors.handlers.ChannelHandler;
import glide.managers.ClusterCommandManager;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.concurrent.CompletableFuture;

/**
* Async (non-blocking) client for Redis in Cluster mode. Use {@link #CreateClient} to request a
* client to Redis.
*/
public class ClusterClient extends BaseClient<RedisValue> {
public class ClusterClient extends BaseClient<RedisValue>
implements ClusterBaseCommands<RedisValue> {

protected ClusterClient(
ConnectionManager connectionManager, ClusterCommandManager<RedisValue> commandManager) {
super(connectionManager, commandManager);
}
protected ClusterClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager, RedisValue::of);
}

/**
* Request an async (non-blocking) Redis client in Cluster mode.
*
* @param config - Redis Client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<ClusterClient> CreateClient(
RedisClusterClientConfiguration config) {
ChannelHandler channelHandler = buildChannelHandler();
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
ClusterCommandManager<RedisValue> commandManager = buildClusterCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApplyAsync(ignored -> new ClusterClient(connectionManager, commandManager));
}
/**
* Async request for an async (non-blocking) Redis client in Cluster mode.
*
* @param config Redis cluster client Configuration
* @return a Future to connect and return a ClusterClient
*/
public static CompletableFuture<ClusterClient> CreateClient(
RedisClusterClientConfiguration config) {
ChannelHandler channelHandler = buildChannelHandler();
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignored -> new ClusterClient(connectionManager, commandManager));
}

protected static ClusterCommandManager<RedisValue> buildClusterCommandManager(
ChannelHandler channelHandler) {
return new ClusterCommandManager<>(channelHandler, RedisValue::of);
}
@Override
public CompletableFuture<RedisValue> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, this::handleObjectResponse);
}

@Override
public ClusterCommandManager<RedisValue> getCommandManager() {
return (ClusterCommandManager<RedisValue>) commandManager;
}
@Override
public CompletableFuture<RedisValue> customCommand(String[] args, Route route) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, this::handleObjectResponse, route);
}
}
39 changes: 19 additions & 20 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
* Async (non-blocking) client for Redis in Standalone mode. Use {@link #CreateClient} to request a
* client to Redis.
*/
public class RedisClient extends BaseClient implements BaseCommands {
public class RedisClient extends BaseClient<Object> implements BaseCommands<Object> {

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager, obj -> obj);
}

/**
* Request an async (non-blocking) Redis client in Standalone mode.
* Async request for an async (non-blocking) Redis client in Standalone mode.
*
* @param config - Redis Client Configuration
* @param config Redis client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
Expand All @@ -30,31 +34,26 @@ public static CompletableFuture<RedisClient> CreateClient(RedisClientConfigurati
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
.thenApply(ignored -> new RedisClient(connectionManager, commandManager));
}

protected static CommandManager buildCommandManager(ChannelHandler channelHandler) {
return new CommandManager(channelHandler);
protected static ChannelHandler buildChannelHandler() {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
return new ChannelHandler(callbackDispatcher, getSocket());
}

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
protected static ConnectionManager buildConnectionManager(ChannelHandler channelHandler) {
return new ConnectionManager(channelHandler);
}

/**
* Executes a single custom command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
*
* @param args command and arguments for the custom command call
* @return CompletableFuture with the response
*/
protected static CommandManager buildCommandManager(ChannelHandler channelHandler) {
return new CommandManager(channelHandler);
}

@Override
public CompletableFuture<Object> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, BaseCommands::handleObjectResponse);
return commandManager.submitNewCommand(command, this::handleObjectResponse);
}

protected static CommandManager<String> buildCommandManager2(ChannelHandler channelHandler) {
return new CommandManager<>(channelHandler, Object::toString);
}
}
33 changes: 11 additions & 22 deletions java/client/src/main/java/glide/api/commands/BaseCommands.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,20 @@
package glide.api.commands;

import glide.ffi.resolvers.RedisValueResolver;
import glide.managers.BaseCommandResponseResolver;
import java.util.concurrent.CompletableFuture;
import response.ResponseOuterClass.Response;

/** Base Commands interface to handle generic command and transaction requests. */
public interface BaseCommands {
/**
* Base Commands interface to handle generic command and transaction requests.
*
* @param <T> The data return type.
*/
public interface BaseCommands<T> {

/**
* Extracts the response from the Protobuf response and either throws an exception or returns the
* appropriate response has an Object
* Executes a single custom command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
*
* @param response Redis protobuf message
* @return Response Object
* @param args command and arguments for the custom command call
* @return CompletableFuture with the response
*/
static Object handleObjectResponse(Response response) {
// return function to convert protobuf.Response into the response object by
// calling valueFromPointer
return (new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer)).apply(response);
}

/**
* Execute a @see{Command} by sending command via socket manager
*
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
*/
CompletableFuture<Object> customCommand(String[] args);
CompletableFuture<T> customCommand(String[] args);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package glide.api.commands;

import glide.api.models.configuration.Route;
import java.util.concurrent.CompletableFuture;

/**
* Base Commands interface to handle generic command and transaction requests with routing options.
*
* @param <T> The data return type.
*/
public interface ClusterBaseCommands<T> extends BaseCommands<T> {

/**
* Executes a single custom command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
*
* @param args command and arguments for the custom command call
* @param route node routing configuration for the command
* @return CompletableFuture with the response
*/
CompletableFuture<T> customCommand(String[] args, Route route);
}
35 changes: 19 additions & 16 deletions java/client/src/main/java/glide/api/models/RedisValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,27 @@
/** A union-like type which can store single or bulk value retrieved from Redis. */
@Getter
public class RedisValue {
private Map<String, String> bulkValue = null;
private String singleValue = null;
/** Get per-node value. */
private Map<String, Object> multiValue = null;

private RedisValue() {}
/** Get the single value. */
private Object singleValue = null;

@SuppressWarnings("unchecked")
public static RedisValue of(Object data) {
var res = new RedisValue();
if (data instanceof Map) {
res.bulkValue = (Map<String, String>) data;
} else {
res.singleValue = data.toString();
private RedisValue() {}

@SuppressWarnings("unchecked")
public static RedisValue of(Object data) {
var res = new RedisValue();
if (data instanceof Map) {
res.multiValue = (Map<String, Object>) data;
} else {
res.singleValue = data;
}
return res;
}
return res;
}

/** Get the value type. Use it prior to accessing the data. */
public boolean hasBulkData() {
return bulkValue != null;
}
/** Get the value type. Use it prior to accessing the data. */
public boolean hasMultiData() {
return multiValue != null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package glide.api.models.configuration;

import lombok.Builder;
import lombok.Getter;

/** Request routing configuration. */
@Builder
@Getter
public class Route {

public enum RouteType {
/** Route request to all nodes. */
AllNodes,
/** Route request to all primary nodes. */
AllPrimaries,
/** Route request to a random node. */
Random,
/** Route request to the primary node that contains the slot with the given id. */
PrimarySlotId,
/** Route request to the replica node that contains the slot with the given id. */
ReplicaSlotId,
/** Route request to the primary node that contains the slot that the given key matches. */
PrimarySlotKey,
/** Route request to the replica node that contains the slot that the given key matches. */
ReplicaSlotKey,
}

/**
* Request routing configuration overrides the {@link ReadFrom} connection configuration.<br>
* If {@link RouteType#ReplicaSlotId} or {@link RouteType#ReplicaSlotKey} is used, the request
* will be routed to a replica, even if the strategy is {@link ReadFrom#PRIMARY}.
*/
private final RouteType routeType;

/**
* Slot number. There are 16384 slots in a redis cluster, and each shard manages a slot range.
* Unless the slot is known, it's better to route using {@link RouteType#PrimarySlotKey} or {@link
* RouteType#ReplicaSlotKey}.<br>
* Could be used with {@link RouteType#PrimarySlotId} or {@link RouteType#ReplicaSlotId} only.
*/
private final int slotId;

/**
* The request will be sent to nodes managing this key.<br>
* Could be used with {@link RouteType#PrimarySlotKey} or {@link RouteType#ReplicaSlotKey} only.
*/
private final String slotKey;
}

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit b7f9bbd

Please sign in to comment.