Skip to content

Commit

Permalink
Add cluster client and routes support for cluster client.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Jan 25, 2024
1 parent 9345634 commit 2639c41
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 40 deletions.
11 changes: 5 additions & 6 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
@AllArgsConstructor
public abstract class BaseClient implements AutoCloseable {

protected ConnectionManager connectionManager;
protected CommandManager commandManager;
protected final ConnectionManager connectionManager;
protected final CommandManager commandManager;

/**
* Extracts the response from the Protobuf response and either throws an exception or returns the
Expand All @@ -22,10 +22,9 @@ public abstract class BaseClient implements AutoCloseable {
* @param response Redis protobuf message
* @return Response Object
*/
protected static Object handleObjectResponse(Response response) {
// return function to convert protobuf.Response into the response object by
// calling valueFromPointer
return (new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer)).apply(response);
protected Object handleObjectResponse(Response response) {
// convert protobuf response into Object and then Object into T
return new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response);
}

/**
Expand Down
59 changes: 59 additions & 0 deletions java/client/src/main/java/glide/api/ClusterClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package glide.api;

import static glide.api.RedisClient.buildChannelHandler;
import static glide.api.RedisClient.buildCommandManager;
import static glide.api.RedisClient.buildConnectionManager;

import glide.api.commands.ClusterBaseCommands;
import glide.api.commands.Command;
import glide.api.models.ClusterValue;
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.Route;
import glide.connectors.handlers.ChannelHandler;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.concurrent.CompletableFuture;

/**
* Async (non-blocking) client for Redis in Cluster mode. Use {@link #CreateClient} to request a
* client to Redis.
*/
public class ClusterClient extends BaseClient implements ClusterBaseCommands<ClusterValue<Object>> {

protected ClusterClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Async request for an async (non-blocking) Redis client in Cluster mode.
*
* @param config Redis cluster client Configuration
* @return a Future to connect and return a ClusterClient
*/
public static CompletableFuture<ClusterClient> CreateClient(
RedisClusterClientConfiguration config) {
ChannelHandler channelHandler = buildChannelHandler();
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignored -> new ClusterClient(connectionManager, commandManager));
}

@Override
public CompletableFuture<ClusterValue<Object>> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(
command, response -> ClusterValue.of(handleObjectResponse(response)));
}

@Override
public CompletableFuture<ClusterValue<Object>> customCommand(String[] args, Route route) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(
command, route, response -> ClusterValue.of(handleObjectResponse(response)));
}
}
25 changes: 13 additions & 12 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@
import java.util.concurrent.CompletableFuture;

/**
* Async (non-blocking) client for Redis in Standalone mode. Use {@link
* #CreateClient(RedisClientConfiguration)} to request a client to Redis.
* Async (non-blocking) client for Redis in Standalone mode. Use {@link #CreateClient} to request a
* client to Redis.
*/
public class RedisClient extends BaseClient implements BaseCommands {
public class RedisClient extends BaseClient implements BaseCommands<Object> {

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Request an async (non-blocking) Redis client in Standalone mode.
* Async request for an async (non-blocking) Redis client in Standalone mode.
*
* @param config - Redis Client Configuration
* @param config Redis client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
Expand Down Expand Up @@ -53,10 +57,6 @@ protected static CommandManager buildCommandManager(ChannelHandler channelHandle
return new CommandManager(channelHandler);
}

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
Expand All @@ -73,9 +73,10 @@ protected RedisClient(ConnectionManager connectionManager, CommandManager comman
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
*/
@Override
public CompletableFuture<Object> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, BaseClient::handleObjectResponse);
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, BaseClient::handleObjectResponse);
}
}
10 changes: 7 additions & 3 deletions java/client/src/main/java/glide/api/commands/BaseCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

import java.util.concurrent.CompletableFuture;

/** Base Commands interface to handle generic command and transaction requests. */
public interface BaseCommands {
/**
* Base Commands interface to handle generic command and transaction requests.
*
* @param <T> The data return type.
*/
public interface BaseCommands<T> {

/**
* Executes a single command, without checking inputs. Every part of the command, including
Expand All @@ -21,5 +25,5 @@ public interface BaseCommands {
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
*/
CompletableFuture<Object> customCommand(String[] args);
CompletableFuture<T> customCommand(String[] args);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package glide.api.commands;

import glide.api.models.configuration.Route;
import java.util.concurrent.CompletableFuture;

/**
* Base Commands interface to handle generic command and transaction requests with routing options.
*
* @param <T> The data return type.
*/
public interface ClusterBaseCommands<T> extends BaseCommands<T> {

/**
* Executes a single custom command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
*
* @param args command and arguments for the custom command call
* @param route node routing configuration for the command
* @return CompletableFuture with the response
*/
CompletableFuture<T> customCommand(String[] args, Route route);
}
44 changes: 44 additions & 0 deletions java/client/src/main/java/glide/api/models/ClusterValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package glide.api.models;

import java.util.Map;

/**
* A union-like type which can store single or bulk value retrieved from Redis.
*
* @param <T> The wrapped data type
*/
public class ClusterValue<T> {
/** Get per-node value. */
private Map<String, T> multiValue = null;

/** Get the single value. */
private T singleValue = null;

private ClusterValue() {}

public Map<String, T> getMultiValue() {
assert hasMultiData();
return multiValue;
}

public T getSingleValue() {
assert !hasMultiData();
return singleValue;
}

@SuppressWarnings("unchecked")
public static <T> ClusterValue<T> of(Object data) {
var res = new ClusterValue<T>();
if (data instanceof Map) {
res.multiValue = (Map<String, T>) data;
} else {
res.singleValue = (T) data;
}
return res;
}

/** Get the value type. Use it prior to accessing the data. */
public boolean hasMultiData() {
return multiValue != null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package glide.api.models.configuration;

import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/** Request routing configuration. */
@Getter
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class Route {

public enum RouteType {
/** Route request to all nodes. */
ALL_NODES,
/** Route request to all primary nodes. */
ALL_PRIMARIES,
/** Route request to a random node. */
RANDOM,
/** Route request to the primary node that contains the slot with the given id. */
PRIMARY_SLOT_ID,
/** Route request to the replica node that contains the slot with the given id. */
REPLICA_SLOT_ID,
/** Route request to the primary node that contains the slot that the given key matches. */
PRIMARY_SLOT_KEY,
/** Route request to the replica node that contains the slot that the given key matches. */
REPLICA_SLOT_KEY,
}

/**
* Request routing configuration overrides the {@link ReadFrom} connection configuration.<br>
* If {@link RouteType#REPLICA_SLOT_ID} or {@link RouteType#REPLICA_SLOT_KEY} is used, the request
* will be routed to a replica, even if the strategy is {@link ReadFrom#PRIMARY}.
*/
private final RouteType routeType;

/**
* Slot number. There are 16384 slots in a redis cluster, and each shard manages a slot range.
* Unless the slot is known, it's better to route using {@link RouteType#PRIMARY_SLOT_KEY} or
* {@link RouteType#REPLICA_SLOT_KEY}.<br>
* Could be used with {@link RouteType#PRIMARY_SLOT_ID} or {@link RouteType#REPLICA_SLOT_ID} only.
*/
private final int slotId;

/**
* The request will be sent to nodes managing this key.<br>
* Could be used with {@link RouteType#PRIMARY_SLOT_KEY} or {@link RouteType#REPLICA_SLOT_KEY}
* only.
*/
private final String slotKey;

@RequiredArgsConstructor
public static class Builder {
private final RouteType routeType;
private int slotId;
private boolean slotIdSet = false;
private String slotKey;
private boolean slotKeySet = false;

public Builder setSlotId(int slotId) {
if (!(routeType == RouteType.PRIMARY_SLOT_ID || routeType == RouteType.REPLICA_SLOT_ID)) {
throw new IllegalArgumentException(
"Slot ID could be set for corresponding types of route only");
}
this.slotId = slotId;
slotIdSet = true;
return this;
}

public Builder setSlotKey(String slotKey) {
if (!(routeType == RouteType.PRIMARY_SLOT_KEY || routeType == RouteType.REPLICA_SLOT_KEY)) {
throw new IllegalArgumentException(
"Slot key could be set for corresponding types of route only");
}
this.slotKey = slotKey;
slotKeySet = true;
return this;
}

public Route build() {
if ((routeType == RouteType.PRIMARY_SLOT_ID || routeType == RouteType.REPLICA_SLOT_ID)
&& !slotIdSet) {
throw new IllegalArgumentException("Slot ID is missing");
}
if ((routeType == RouteType.PRIMARY_SLOT_KEY || routeType == RouteType.REPLICA_SLOT_KEY)
&& !slotKeySet) {
throw new IllegalArgumentException("Slot key is missing");
}

return new Route(routeType, slotId, slotKey);
}
}
}
Loading

0 comments on commit 2639c41

Please sign in to comment.