Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Pub & Sub = <3 #1662

Merged
merged 13 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 126 additions & 21 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.PfAdd;
import static redis_request.RedisRequestOuterClass.RequestType.PfCount;
import static redis_request.RedisRequestOuterClass.RequestType.PfMerge;
import static redis_request.RedisRequestOuterClass.RequestType.Publish;
import static redis_request.RedisRequestOuterClass.RequestType.RPop;
import static redis_request.RedisRequestOuterClass.RequestType.RPush;
import static redis_request.RedisRequestOuterClass.RequestType.RPushX;
Expand Down Expand Up @@ -180,13 +181,15 @@
import glide.api.commands.HashBaseCommands;
import glide.api.commands.HyperLogLogBaseCommands;
import glide.api.commands.ListBaseCommands;
import glide.api.commands.PubSubBaseCommands;
import glide.api.commands.ScriptingAndFunctionsBaseCommands;
import glide.api.commands.SetBaseCommands;
import glide.api.commands.SortedSetBaseCommands;
import glide.api.commands.StreamBaseCommands;
import glide.api.commands.StringBaseCommands;
import glide.api.commands.TransactionsBaseCommands;
import glide.api.models.GlideString;
import glide.api.models.PubSubMessage;
import glide.api.models.Script;
import glide.api.models.commands.ExpireOptions;
import glide.api.models.commands.GetExOptions;
Expand Down Expand Up @@ -227,33 +230,38 @@
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
import glide.api.models.configuration.BaseClientConfiguration;
import glide.api.models.configuration.BaseSubscriptionConfiguration;
import glide.api.models.exceptions.ConfigurationError;
import glide.api.models.exceptions.RedisException;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.connectors.handlers.MessageHandler;
import glide.connectors.resources.Platform;
import glide.connectors.resources.ThreadPoolResource;
import glide.connectors.resources.ThreadPoolResourceAllocator;
import glide.ffi.resolvers.RedisValueResolver;
import glide.managers.BaseCommandResponseResolver;
import glide.managers.BaseResponseResolver;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.NotImplementedException;
import response.ResponseOuterClass.ConstantResponse;
import response.ResponseOuterClass.Response;

/** Base Client class for Redis */
@AllArgsConstructor
public abstract class BaseClient
implements AutoCloseable,
BitmapBaseCommands,
Expand All @@ -267,13 +275,41 @@ public abstract class BaseClient
HyperLogLogBaseCommands,
GeospatialIndicesBaseCommands,
ScriptingAndFunctionsBaseCommands,
TransactionsBaseCommands {
TransactionsBaseCommands,
PubSubBaseCommands {

/** Redis simple string response with "OK" */
public static final String OK = ConstantResponse.OK.toString();

protected final ConnectionManager connectionManager;
protected final CommandManager commandManager;
protected final ConnectionManager connectionManager;
protected final ConcurrentLinkedDeque<PubSubMessage> messageQueue;
protected final Optional<BaseSubscriptionConfiguration> subscriptionConfiguration;

/** Helper which extracts data from received {@link Response}s from GLIDE. */
private static final BaseResponseResolver responseResolver =
new BaseResponseResolver(RedisValueResolver::valueFromPointer);

/** Helper which extracts data with binary strings from received {@link Response}s from GLIDE. */
private static final BaseResponseResolver binaryResponseResolver =
new BaseResponseResolver(RedisValueResolver::valueFromPointerBinary);

/** A constructor. */
protected BaseClient(ClientBuilder builder) {
this.connectionManager = builder.connectionManager;
this.commandManager = builder.commandManager;
this.messageQueue = builder.messageQueue;
this.subscriptionConfiguration = builder.subscriptionConfiguration;
}

/** Auxiliary builder which wraps all fields to be initialized in the constructor. */
@RequiredArgsConstructor
protected static class ClientBuilder {
private final ConnectionManager connectionManager;
private final CommandManager commandManager;
private final ConcurrentLinkedDeque<PubSubMessage> messageQueue;
private final Optional<BaseSubscriptionConfiguration> subscriptionConfiguration;
}

/**
* Async request for an async (non-blocking) Redis client.
Expand All @@ -283,31 +319,81 @@ public abstract class BaseClient
* @param <T> Client type.
* @return a Future to connect and return a RedisClient.
*/
protected static <T> CompletableFuture<T> CreateClient(
BaseClientConfiguration config,
BiFunction<ConnectionManager, CommandManager, T> constructor) {
protected static <T extends BaseClient> CompletableFuture<T> CreateClient(
@NonNull BaseClientConfiguration config, Function<ClientBuilder, T> constructor) {
try {
ThreadPoolResource threadPoolResource = config.getThreadPoolResource();
if (threadPoolResource == null) {
threadPoolResource =
ThreadPoolResourceAllocator.getOrCreate(Platform.getThreadPoolResourceSupplier());
}
ChannelHandler channelHandler = buildChannelHandler(threadPoolResource);
MessageHandler messageHandler = buildMessageHandler(config);
ChannelHandler channelHandler = buildChannelHandler(threadPoolResource, messageHandler);
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> constructor.apply(connectionManager, commandManager));
.thenApply(
ignored ->
constructor.apply(
new ClientBuilder(
connectionManager,
commandManager,
messageHandler.getQueue(),
Optional.ofNullable(config.getSubscriptionConfiguration()))));
} catch (InterruptedException e) {
// Something bad happened while we were establishing netty connection to
// UDS
// Something bad happened while we were establishing netty connection to UDS
var future = new CompletableFuture<T>();
future.completeExceptionally(e);
return future;
}
}

/**
* Tries to return a next pubsub message.
*
* @throws ConfigurationError If client is not subscribed to any channel or if client configured
* with a callback.
* @return A message if any or <code>null</code> if there are no unread messages.
*/
public PubSubMessage tryGetPubSubMessage() {
if (subscriptionConfiguration.isEmpty()) {
throw new ConfigurationError(
"The operation will never complete since there was no pubsub subscriptions applied to the"
+ " client.");
}
if (subscriptionConfiguration.get().getCallback().isPresent()) {
throw new ConfigurationError(
"The operation will never complete since messages will be passed to the configured"
+ " callback.");
}
return messageQueue.poll();
}

/**
* Returns a promise for a next pubsub message.
*
* @apiNote <b>Not implemented!</b>
* @throws ConfigurationError If client is not subscribed to any channel or if client configured
* with a callback.
* @return A <code>Future</code> which resolved with the next incoming message.
*/
public CompletableFuture<PubSubMessage> getPubSubMessage() {
if (subscriptionConfiguration.isEmpty()) {
throw new ConfigurationError(
"The operation will never complete since there was no pubsub subscriptions applied to the"
+ " client.");
}
if (subscriptionConfiguration.get().getCallback().isPresent()) {
throw new ConfigurationError(
"The operation will never complete since messages will be passed to the configured"
+ " callback.");
}
throw new NotImplementedException(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has to be implemented to be on a par with python implementation

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see #1770

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this cover the ask @ikolomi ?

"This feature will be supported in a future release of the GLIDE java client");
}

/**
* Closes this resource, relinquishing any underlying resources. This method is invoked
* automatically on objects managed by the try-with-resources statement.
Expand All @@ -320,15 +406,25 @@ public void close() throws ExecutionException {
try {
connectionManager.closeConnection().get();
} catch (InterruptedException e) {
// suppressing the interrupted exception - it is already suppressed in the
// future
// suppressing the interrupted exception - it is already suppressed in the future
throw new RuntimeException(e);
}
}

protected static ChannelHandler buildChannelHandler(ThreadPoolResource threadPoolResource)
protected static MessageHandler buildMessageHandler(BaseClientConfiguration config) {
if (config.getSubscriptionConfiguration() == null) {
return new MessageHandler(Optional.empty(), Optional.empty(), responseResolver);
}
return new MessageHandler(
config.getSubscriptionConfiguration().getCallback(),
config.getSubscriptionConfiguration().getContext(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have a validation that context cannot be set w/o the callback and the subscriptions cannot be used with resp2, do we have it here?
https://github.com/aws/glide-for-redis/blob/e2a804c2f1d6355a797a60f28e95c2d16826a856/python/python/glide/config.py#L451

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java client does not support RESP2 yet. I put a todo.

This comment was marked as resolved.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ikolomi , does this cover your concern?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though this takes in both callback and context in one method, the caller could supply null to either of these. I've added an explict check for this in #1773.

responseResolver);
}

protected static ChannelHandler buildChannelHandler(
ThreadPoolResource threadPoolResource, MessageHandler messageHandler)
throws InterruptedException {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
CallbackDispatcher callbackDispatcher = new CallbackDispatcher(messageHandler);
return new ChannelHandler(callbackDispatcher, getSocket(), threadPoolResource);
}

Expand Down Expand Up @@ -364,10 +460,7 @@ protected <T> T handleRedisResponse(
boolean encodingUtf8 = flags.contains(ResponseFlags.ENCODING_UTF8);
boolean isNullable = flags.contains(ResponseFlags.IS_NULLABLE);
Object value =
encodingUtf8
? new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response)
: new BaseCommandResponseResolver(RedisValueResolver::valueFromPointerBinary)
.apply(response);
encodingUtf8 ? responseResolver.apply(response) : binaryResponseResolver.apply(response);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how its applies to pubsub?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageHandler uses responseResolver to get the message from received push.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this answer your question @ikolomi ?

if (isNullable && (value == null)) {
return null;
}
Expand Down Expand Up @@ -2707,6 +2800,18 @@ public CompletableFuture<Map<String, Object>> lcsIdxWithMatchLen(
return commandManager.submitNewCommand(LCS, arguments, this::handleMapResponse);
}

@Override
public CompletableFuture<String> publish(@NonNull String channel, @NonNull String message) {
ikolomi marked this conversation as resolved.
Show resolved Hide resolved
return commandManager.submitNewCommand(
Publish,
new String[] {channel, message},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we still using Strings?

response -> {
// Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO
handleLongResponse(response);
return OK;
});
}

@Override
public CompletableFuture<String> watch(@NonNull String[] keys) {
return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse);
Expand Down
13 changes: 7 additions & 6 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
import glide.api.models.commands.SortOptions;
import glide.api.models.commands.function.FunctionRestorePolicy;
import glide.api.models.configuration.RedisClientConfiguration;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -73,15 +71,18 @@ public class RedisClient extends BaseClient
ScriptingAndFunctionsCommands,
TransactionsCommands {

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
/**
* A constructor. Use {@link #CreateClient} to get a client. Made protected to simplify testing.
*/
protected RedisClient(ClientBuilder builder) {
super(builder);
}

/**
* Async request for an async (non-blocking) Redis client in Standalone mode.
*
* @param config Redis client Configuration
* @return A Future to connect and return a RedisClient
* @param config Redis client Configuration.
* @return A Future to connect and return a RedisClient.
*/
public static CompletableFuture<RedisClient> CreateClient(
@NonNull RedisClientConfiguration config) {
Expand Down
28 changes: 21 additions & 7 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
import static redis_request.RedisRequestOuterClass.RequestType.Lolwut;
import static redis_request.RedisRequestOuterClass.RequestType.Ping;
import static redis_request.RedisRequestOuterClass.RequestType.RandomKey;
import static redis_request.RedisRequestOuterClass.RequestType.SPublish;
import static redis_request.RedisRequestOuterClass.RequestType.Sort;
import static redis_request.RedisRequestOuterClass.RequestType.SortReadOnly;
import static redis_request.RedisRequestOuterClass.RequestType.Time;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;

import glide.api.commands.ConnectionManagementClusterCommands;
import glide.api.commands.GenericClusterCommands;
import glide.api.commands.PubSubClusterCommands;
import glide.api.commands.ScriptingAndFunctionsClusterCommands;
import glide.api.commands.ServerManagementClusterCommands;
import glide.api.commands.TransactionsClusterCommands;
Expand All @@ -57,8 +59,6 @@
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -77,17 +77,19 @@ public class RedisClusterClient extends BaseClient
GenericClusterCommands,
ServerManagementClusterCommands,
ScriptingAndFunctionsClusterCommands,
TransactionsClusterCommands {
TransactionsClusterCommands,
PubSubClusterCommands {

protected RedisClusterClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
/** A private constructor. Use {@link #CreateClient} to get a client. */
RedisClusterClient(ClientBuilder builder) {
super(builder);
}

/**
* Async request for an async (non-blocking) Redis client in Cluster mode.
*
* @param config Redis cluster client Configuration
* @return A Future to connect and return a RedisClusterClient
* @param config Redis cluster client Configuration.
* @return A Future to connect and return a RedisClusterClient.
*/
public static CompletableFuture<RedisClusterClient> CreateClient(
@NonNull RedisClusterClientConfiguration config) {
Expand Down Expand Up @@ -780,6 +782,18 @@ public CompletableFuture<String> randomKey() {
RandomKey, new String[0], this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<String> spublish(@NonNull String channel, @NonNull String message) {
ikolomi marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a GlideString variant for this function

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eifrah-aws , I've removed spublish and made two overloads of publish(). They both use ArgType now. Please see: #1773.

return commandManager.submitNewCommand(
SPublish,
new String[] {channel, message},
response -> {
// Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO
handleLongResponse(response);
return OK;
});
}

@Override
public CompletableFuture<String[]> sort(
@NonNull String key, @NonNull SortClusterOptions sortClusterOptions) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import java.util.concurrent.CompletableFuture;

/**
* Supports commands for the "Pub/Sub" group for standalone and cluster clients.
*
* @see <a href="https://redis.io/docs/latest/commands/?group=pubsub">Pub/Sub Commands</a>
*/
public interface PubSubBaseCommands {

/**
* Publishes message on pubsub channel.
*
* @see <a href="https://valkey.io/commands/publish/">redis.io</a> for details.
* @param channel The channel to publish the message on.
* @param message The message to publish.
* @return <code>OK</code>.
* @example
* <pre>{@code
* String response = client.publish("announcements", "The cat said 'meow'!").get();
* assert response.equals("OK");
* }</pre>
*/
CompletableFuture<String> publish(String channel, String message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GlideString variant

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #1773

}
Loading
Loading