Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster client, request routes configuration and support for bulk response #59

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
@AllArgsConstructor
public abstract class BaseClient implements AutoCloseable {

protected ConnectionManager connectionManager;
protected CommandManager commandManager;
protected final ConnectionManager connectionManager;
protected final CommandManager commandManager;

/**
* Extracts the response from the Protobuf response and either throws an exception or returns the
Expand All @@ -22,10 +22,9 @@ public abstract class BaseClient implements AutoCloseable {
* @param response Redis protobuf message
* @return Response Object
*/
protected static Object handleObjectResponse(Response response) {
// return function to convert protobuf.Response into the response object by
// calling valueFromPointer
return (new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer)).apply(response);
protected Object handleObjectResponse(Response response) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not opposed, but why not static?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no idea

// convert protobuf response into Object and then Object into T
return new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response);
}

/**
Expand Down
35 changes: 10 additions & 25 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@
import java.util.concurrent.CompletableFuture;

/**
* Async (non-blocking) client for Redis in Standalone mode. Use {@link
* #CreateClient(RedisClientConfiguration)} to request a client to Redis.
* Async (non-blocking) client for Redis in Standalone mode. Use {@link #CreateClient} to request a
* client to Redis.
*/
public class RedisClient extends BaseClient implements BaseCommands {

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Request an async (non-blocking) Redis client in Standalone mode.
* Async request for an async (non-blocking) Redis client in Standalone mode.
*
* @param config - Redis Client Configuration
* @param config Redis client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
Expand Down Expand Up @@ -53,29 +57,10 @@ protected static CommandManager buildCommandManager(ChannelHandler channelHandle
return new CommandManager(channelHandler);
}

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as SUBSCRIBE), or that return potentially more than a single response
* (such as XREAD), or that change the client's behavior (such as entering pub/sub mode on
* RESP2 connections) shouldn't be called using this function.
* @example Returns a list of all pub/sub clients:
* <pre>
* Object result = client.customCommand(new String[]{"CLIENT","LIST","TYPE", "PUBSUB"}).get();
* </pre>
*
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
*/
@Override
public CompletableFuture<Object> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, BaseClient::handleObjectResponse);
return commandManager.submitNewCommand(command, this::handleObjectResponse);
}
}
66 changes: 66 additions & 0 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package glide.api;

import static glide.api.RedisClient.buildChannelHandler;
import static glide.api.RedisClient.buildCommandManager;
import static glide.api.RedisClient.buildConnectionManager;

import glide.api.commands.ClusterBaseCommands;
import glide.api.models.ClusterValue;
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.Route;
import glide.connectors.handlers.ChannelHandler;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import glide.managers.models.Command;
import java.util.concurrent.CompletableFuture;

/**
* Async (non-blocking) client for Redis in Cluster mode. Use {@link #CreateClient} to request a
* client to Redis.
*/
public class RedisClusterClient extends BaseClient implements ClusterBaseCommands {

protected RedisClusterClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Async request for an async (non-blocking) Redis client in Cluster mode.
*
* @param config Redis cluster client Configuration
* @return a Future to connect and return a ClusterClient
*/
public static CompletableFuture<RedisClusterClient> CreateClient(
RedisClusterClientConfiguration config) {
try {
ChannelHandler channelHandler = buildChannelHandler();
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignored -> new RedisClusterClient(connectionManager, commandManager));
} catch (InterruptedException e) {
// Something bad happened while we were establishing netty connection to UDS
var future = new CompletableFuture<RedisClusterClient>();
future.completeExceptionally(e);
return future;
}
}

@Override
public CompletableFuture<ClusterValue<Object>> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(
command, response -> ClusterValue.of(handleObjectResponse(response)));
}

@Override
public CompletableFuture<ClusterValue<Object>> customCommand(String[] args, Route route) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(
command, route, response -> ClusterValue.of(handleObjectResponse(response)));
}
}
22 changes: 11 additions & 11 deletions java/client/src/main/java/glide/api/commands/BaseCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ public interface BaseCommands {

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
* subcommands, should be added as a separate value in {@code args}.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as SUBSCRIBE), or that return potentially more than a single response
* (such as XREAD), or that change the client's behavior (such as entering pub/sub mode on
* RESP2 connections) shouldn't be called using this function.
* @example Returns a list of all pub/sub clients:
* <pre>
* Object result = client.customCommand(new String[]{"CLIENT","LIST","TYPE", "PUBSUB"}).get();
* </pre>
*
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
* return response (such as <em>SUBSCRIBE</em>), or that return potentially more than a single
* response (such as <em>XREAD</em>), or that change the client's behavior (such as entering
* <em>pub</em>/<em>sub</em> mode on <em>RESP2</em> connections) shouldn't be called using
* this function.
* @example Returns a list of all <em>pub</em>/<em>sub</em> clients:
* <p><code>
* Object result = client.customCommand(new String[]{ "CLIENT", "LIST", "TYPE", "PUBSUB" }).get();
* </code>
* @param args Arguments for the custom command including the command name
* @return A <em>CompletableFuture</em> with response result from Redis
*/
CompletableFuture<Object> customCommand(String[] args);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package glide.api.commands;

import glide.api.models.ClusterValue;
import glide.api.models.configuration.Route;
import java.util.concurrent.CompletableFuture;

/**
* Base Commands interface to handle generic command and transaction requests with routing options.
*/
public interface ClusterBaseCommands {

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in {@code args}.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as <em>SUBSCRIBE</em>), or that return potentially more than a single
* response (such as <em>XREAD</em>), or that change the client's behavior (such as entering
* <em>pub</em>/<em>sub</em> mode on <em>RESP2</em> connections) shouldn't be called using
* this function.
* @example Returns a list of all <em>pub</em>/<em>sub</em> clients:
* <p><code>
* Object result = client.customCommand(new String[]{ "CLIENT", "LIST", "TYPE", "PUBSUB" }).get();
* </code>
* @param args Arguments for the custom command including the command name
* @return A <em>CompletableFuture</em> with response result from Redis
*/
CompletableFuture<ClusterValue<Object>> customCommand(String[] args);

/**
* Executes a single command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in {@code args}.
*
* @remarks This function should only be used for single-response commands. Commands that don't
* return response (such as <em>SUBSCRIBE</em>), or that return potentially more than a single
* response (such as <em>XREAD</em>), or that change the client's behavior (such as entering
* <em>pub</em>/<em>sub</em> mode on <em>RESP2</em> connections) shouldn't be called using
* this function.
* @example Returns a list of all <em>pub</em>/<em>sub</em> clients:
* <p><code>
* Object result = client.customCommand(new String[]{ "CLIENT", "LIST", "TYPE", "PUBSUB" }).get();
* </code>
* @param args Arguments for the custom command including the command name
* @param route Routing configuration for the command
* @return A <em>CompletableFuture</em> with response result from Redis
*/
CompletableFuture<ClusterValue<Object>> customCommand(String[] args, Route route);
}
44 changes: 44 additions & 0 deletions java/client/src/main/java/glide/api/models/ClusterValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package glide.api.models;

import java.util.Map;

/**
* A union-like type which can store single or bulk value retrieved from Redis.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* A union-like type which can store single or bulk value retrieved from Redis.
* A union-like type which can store single-value or multi-value retrieved from Redis. The multi-value, if defined, contains the routed value as a Map<String, Object> containing a cluster node address to cluster node value.

*
* @param <T> The wrapped data type
*/
public class ClusterValue<T> {
private Map<String, T> multiValue = null;

private T singleValue = null;

private ClusterValue() {}

/** Get per-node value. */

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need a more explicit javadoc if we're going to expose this to the user.

Please mention that hasMultiValue() should be called first to avoid exceptions

public Map<String, T> getMultiValue() {
assert hasMultiData();
return multiValue;
}

/** Get the single value. */
public T getSingleValue() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc - please mention that hasSingleValue() should be called first to avoid exceptions

assert !hasMultiData();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is it worth creating a hasSingleData() method?

return singleValue;
}

@SuppressWarnings("unchecked")
public static <T> ClusterValue<T> of(Object data) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc

var res = new ClusterValue<T>();
if (data instanceof Map) {
res.multiValue = (Map<String, T>) data;
} else {
res.singleValue = (T) data;
}
return res;
}

/** Get the value type. Use it prior to accessing the data. */
public boolean hasMultiData() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc - please be more specific since this is exposed to the user.

return multiValue != null;
}
}
126 changes: 126 additions & 0 deletions java/client/src/main/java/glide/api/models/configuration/Route.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package glide.api.models.configuration;

import java.util.Optional;
import lombok.Builder;
import lombok.Getter;

/** Request routing configuration. */
public class Route {

public enum RouteType {
/** Route request to all nodes. */
ALL_NODES,
/** Route request to all primary nodes. */
ALL_PRIMARIES,
/** Route request to a random node. */
RANDOM,
/** Route request to the primary node that contains the slot with the given id. */
PRIMARY_SLOT_ID,
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
/** Route request to the replica node that contains the slot with the given id. */
REPLICA_SLOT_ID,
/** Route request to the primary node that contains the slot that the given key matches. */
PRIMARY_SLOT_KEY,
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
/** Route request to the replica node that contains the slot that the given key matches. */
REPLICA_SLOT_KEY,
}

@Getter private final RouteType routeType;

private final Optional<Integer> slotId;

private final Optional<String> slotKey;

public Integer getSlotId() {
assert slotId.isPresent();
return slotId.get();
}

public String getSlotKey() {
assert slotKey.isPresent();
return slotKey.get();
}

private Route(RouteType routeType, Integer slotId) {
this.routeType = routeType;
this.slotId = Optional.of(slotId);
this.slotKey = Optional.empty();
}

private Route(RouteType routeType, String slotKey) {
this.routeType = routeType;
this.slotId = Optional.empty();
this.slotKey = Optional.of(slotKey);
}

private Route(RouteType routeType) {
this.routeType = routeType;
this.slotId = Optional.empty();
this.slotKey = Optional.empty();
}

public static class Builder {
private final RouteType routeType;
private int slotId;
private boolean slotIdSet = false;
private String slotKey;
private boolean slotKeySet = false;

/**
* Request routing configuration overrides the {@link ReadFrom} connection configuration.<br>
* If {@link RouteType#REPLICA_SLOT_ID} or {@link RouteType#REPLICA_SLOT_KEY} is used, the
* request will be routed to a replica, even if the strategy is {@link ReadFrom#PRIMARY}.
*/
public Builder(RouteType routeType) {
this.routeType = routeType;
}

/**
* Slot number. There are 16384 slots in a redis cluster, and each shard manages a slot range.
* Unless the slot is known, it's better to route using {@link RouteType#PRIMARY_SLOT_KEY} or
* {@link RouteType#REPLICA_SLOT_KEY}.<br>
* Could be used with {@link RouteType#PRIMARY_SLOT_ID} or {@link RouteType#REPLICA_SLOT_ID}
* only.
*/
public Builder setSlotId(int slotId) {
if (!(routeType == RouteType.PRIMARY_SLOT_ID || routeType == RouteType.REPLICA_SLOT_ID)) {
throw new IllegalArgumentException(
"Slot ID could be set for corresponding types of route only");
}
this.slotId = slotId;
slotIdSet = true;
return this;
}

/**
* The request will be sent to nodes managing this key.<br>
* Could be used with {@link RouteType#PRIMARY_SLOT_KEY} or {@link RouteType#REPLICA_SLOT_KEY}
* only.
*/
public Builder setSlotKey(String slotKey) {
if (!(routeType == RouteType.PRIMARY_SLOT_KEY || routeType == RouteType.REPLICA_SLOT_KEY)) {
throw new IllegalArgumentException(
"Slot key could be set for corresponding types of route only");
}
this.slotKey = slotKey;
slotKeySet = true;
return this;
}

public Route build() {
if (routeType == RouteType.PRIMARY_SLOT_ID || routeType == RouteType.REPLICA_SLOT_ID) {
if (!slotIdSet) {
throw new IllegalArgumentException("Slot ID is missing");
}
return new Route(routeType, slotId);
}
if (routeType == RouteType.PRIMARY_SLOT_KEY || routeType == RouteType.REPLICA_SLOT_KEY) {
if (!slotKeySet) {
throw new IllegalArgumentException("Slot key is missing");
}
return new Route(routeType, slotKey);
}

return new Route(routeType);
}
}
}
Loading
Loading