Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Pub & Sub = <3 #1662

Merged
merged 13 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 124 additions & 22 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.PfAdd;
import static redis_request.RedisRequestOuterClass.RequestType.PfCount;
import static redis_request.RedisRequestOuterClass.RequestType.PfMerge;
import static redis_request.RedisRequestOuterClass.RequestType.Publish;
import static redis_request.RedisRequestOuterClass.RequestType.RPop;
import static redis_request.RedisRequestOuterClass.RequestType.RPush;
import static redis_request.RedisRequestOuterClass.RequestType.RPushX;
Expand Down Expand Up @@ -174,13 +175,15 @@
import glide.api.commands.HashBaseCommands;
import glide.api.commands.HyperLogLogBaseCommands;
import glide.api.commands.ListBaseCommands;
import glide.api.commands.PubSubBaseCommands;
import glide.api.commands.ScriptingAndFunctionsBaseCommands;
import glide.api.commands.SetBaseCommands;
import glide.api.commands.SortedSetBaseCommands;
import glide.api.commands.StreamBaseCommands;
import glide.api.commands.StringBaseCommands;
import glide.api.commands.TransactionsBaseCommands;
import glide.api.models.GlideString;
import glide.api.models.Message;
import glide.api.models.Script;
import glide.api.models.commands.ExpireOptions;
import glide.api.models.commands.GetExOptions;
Expand Down Expand Up @@ -212,33 +215,37 @@
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
import glide.api.models.configuration.BaseClientConfiguration;
import glide.api.models.configuration.BaseSubscriptionConfiguration;
import glide.api.models.exceptions.RedisException;
import glide.api.models.exceptions.WrongConfigurationException;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.connectors.handlers.MessageHandler;
import glide.connectors.resources.Platform;
import glide.connectors.resources.ThreadPoolResource;
import glide.connectors.resources.ThreadPoolResourceAllocator;
import glide.ffi.resolvers.RedisValueResolver;
import glide.managers.BaseCommandResponseResolver;
import glide.managers.BaseResponseResolver;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.NotImplementedException;
import response.ResponseOuterClass.ConstantResponse;
import response.ResponseOuterClass.Response;

/** Base Client class for Redis */
@AllArgsConstructor
public abstract class BaseClient
implements AutoCloseable,
BitmapBaseCommands,
Expand All @@ -252,13 +259,44 @@ public abstract class BaseClient
HyperLogLogBaseCommands,
GeospatialIndicesBaseCommands,
ScriptingAndFunctionsBaseCommands,
TransactionsBaseCommands {
TransactionsBaseCommands,
PubSubBaseCommands {

/** Redis simple string response with "OK" */
public static final String OK = ConstantResponse.OK.toString();

protected final ConnectionManager connectionManager;
protected final CommandManager commandManager;
// Members below are effectively-final, but since we use a static async method as a constructor,
// we can't set them in a standard constructor. They all set in `buildClient` method below.
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
// All made protected to simplify testing.
protected CommandManager commandManager;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
protected ConnectionManager connectionManager;
protected ConcurrentLinkedDeque<Message> messageQueue;
protected Optional<BaseSubscriptionConfiguration> subscriptionConfiguration = Optional.empty();

/** Helper which extracts data from received {@link Response}s from GLIDE. */
private static final BaseResponseResolver responseResolver =
new BaseResponseResolver(RedisValueResolver::valueFromPointer);

/** Helper which extracts data with binary strings from received {@link Response}s from GLIDE. */
private static final BaseResponseResolver binaryResponseResolver =
new BaseResponseResolver(RedisValueResolver::valueFromPointerBinary);

/** Create a client instance and init private fields. */
protected static <T extends BaseClient> T buildClient(
Supplier<T> constructor,
ConnectionManager connectionManager,
CommandManager commandManager,
MessageHandler messageHandler,
BaseSubscriptionConfiguration subscriptionConfiguration) {
T client = constructor.get();
client.connectionManager = connectionManager;
client.commandManager = commandManager;
client.messageQueue = messageHandler.getQueue();
if (subscriptionConfiguration != null) {
client.subscriptionConfiguration = Optional.of(subscriptionConfiguration);
}
return client;
}

/**
* Async request for an async (non-blocking) Redis client.
Expand All @@ -268,31 +306,82 @@ public abstract class BaseClient
* @param <T> Client type.
* @return a Future to connect and return a RedisClient.
*/
protected static <T> CompletableFuture<T> CreateClient(
BaseClientConfiguration config,
BiFunction<ConnectionManager, CommandManager, T> constructor) {
protected static <T extends BaseClient> CompletableFuture<T> CreateClient(
@NonNull BaseClientConfiguration config, Supplier<T> constructor) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how it is related to the pubsub

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to rework it since BaseClient got more private fields to store.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this answer your question @ikolomi ?

try {
ThreadPoolResource threadPoolResource = config.getThreadPoolResource();
if (threadPoolResource == null) {
threadPoolResource =
ThreadPoolResourceAllocator.getOrCreate(Platform.getThreadPoolResourceSupplier());
}
ChannelHandler channelHandler = buildChannelHandler(threadPoolResource);
MessageHandler messageHandler = buildMessageHandler(config);
ChannelHandler channelHandler = buildChannelHandler(threadPoolResource, messageHandler);
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> constructor.apply(connectionManager, commandManager));
.thenApply(
ignored ->
buildClient(
constructor,
connectionManager,
commandManager,
messageHandler,
config.getSubscriptionConfiguration()));
} catch (InterruptedException e) {
// Something bad happened while we were establishing netty connection to
// UDS
// Something bad happened while we were establishing netty connection to UDS
var future = new CompletableFuture<T>();
future.completeExceptionally(e);
return future;
}
}

/**
* Tries to return a next pubsub message.
*
* @throws WrongConfigurationException If client is not subscribed to any channel or if client
* configured with a callback.
* @return A message if any or <code>null</code> if there are no unread messages.
*/
public Message tryGetPubSubMessage() {
if (subscriptionConfiguration.isEmpty()) {
throw new WrongConfigurationException(
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
"The operation will never complete since there was no pubsub subscriptions applied to the"
+ " client.");
}
if (subscriptionConfiguration.get().getCallback().isPresent()) {
throw new WrongConfigurationException(
"The operation will never complete since messages will be passed to the configured"
+ " callback.");
}
return messageQueue.poll();
}

/**
* Returns a promise for a next pubsub message.
*
* @throws WrongConfigurationException If client is not subscribed to any channel or if client
* configured with a callback.
* @return A <code>Future</code> which resolved with the next incoming message.
*/
public CompletableFuture<Message> getPubSubMessage() {
if (subscriptionConfiguration.isEmpty()) {
throw new WrongConfigurationException(
"The operation will never complete since there was no pubsub subscriptions applied to the"
+ " client.");
}
if (subscriptionConfiguration.get().getCallback().isPresent()) {
throw new WrongConfigurationException(
"The operation will never complete since messages will be passed to the configured"
+ " callback.");
}
// TODO
throw new NotImplementedException("oh no");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not implemented and won't be in the foreseeable future.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a more meaningful error message here, along with the TODO link to documentation (perhaps explaining the limitation?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, WIP

Copy link
Collaborator Author

@Yury-Fridlyand Yury-Fridlyand Jun 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 5a11ff4

}

// TODO return a message queue

/**
* Closes this resource, relinquishing any underlying resources. This method is invoked
* automatically on objects managed by the try-with-resources statement.
Expand All @@ -305,15 +394,25 @@ public void close() throws ExecutionException {
try {
connectionManager.closeConnection().get();
} catch (InterruptedException e) {
// suppressing the interrupted exception - it is already suppressed in the
// future
// suppressing the interrupted exception - it is already suppressed in the future
throw new RuntimeException(e);
}
}

protected static ChannelHandler buildChannelHandler(ThreadPoolResource threadPoolResource)
protected static MessageHandler buildMessageHandler(BaseClientConfiguration config) {
if (config.getSubscriptionConfiguration() == null) {
return new MessageHandler(Optional.empty(), Optional.empty(), responseResolver);
}
return new MessageHandler(
config.getSubscriptionConfiguration().getCallback(),
config.getSubscriptionConfiguration().getContext(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have a validation that context cannot be set w/o the callback and the subscriptions cannot be used with resp2, do we have it here?
https://github.com/aws/glide-for-redis/blob/e2a804c2f1d6355a797a60f28e95c2d16826a856/python/python/glide/config.py#L451

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java client does not support RESP2 yet. I put a todo.

This comment was marked as resolved.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ikolomi , does this cover your concern?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though this takes in both callback and context in one method, the caller could supply null to either of these. I've added an explict check for this in #1773.

responseResolver);
}

protected static ChannelHandler buildChannelHandler(
ThreadPoolResource threadPoolResource, MessageHandler messageHandler)
throws InterruptedException {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
CallbackDispatcher callbackDispatcher = new CallbackDispatcher(messageHandler);
return new ChannelHandler(callbackDispatcher, getSocket(), threadPoolResource);
}

Expand Down Expand Up @@ -349,10 +448,7 @@ protected <T> T handleRedisResponse(
boolean encodingUtf8 = flags.contains(ResponseFlags.ENCODING_UTF8);
boolean isNullable = flags.contains(ResponseFlags.IS_NULLABLE);
Object value =
encodingUtf8
? new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response)
: new BaseCommandResponseResolver(RedisValueResolver::valueFromPointerBinary)
.apply(response);
encodingUtf8 ? responseResolver.apply(response) : binaryResponseResolver.apply(response);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how its applies to pubsub?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageHandler uses responseResolver to get the message from received push.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this answer your question @ikolomi ?

if (isNullable && (value == null)) {
return null;
}
Expand Down Expand Up @@ -2525,6 +2621,12 @@ public CompletableFuture<String> watch(@NonNull String[] keys) {
return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse);
}

@Override
public CompletableFuture<Long> publish(@NonNull String channel, @NonNull String message) {
return commandManager.submitNewCommand(
Publish, new String[] {channel, message}, this::handleLongResponse);
}

@Override
public CompletableFuture<Set<String>> sunion(@NonNull String[] keys) {
return commandManager.submitNewCommand(SUnion, keys, this::handleSetResponse);
Expand Down
6 changes: 0 additions & 6 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
import glide.api.models.commands.SortOptions;
import glide.api.models.commands.function.FunctionRestorePolicy;
import glide.api.models.configuration.RedisClientConfiguration;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -73,10 +71,6 @@ public class RedisClient extends BaseClient
ScriptingAndFunctionsCommands,
TransactionsCommands {

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Async request for an async (non-blocking) Redis client in Standalone mode.
*
Expand Down
17 changes: 10 additions & 7 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
import static redis_request.RedisRequestOuterClass.RequestType.Lolwut;
import static redis_request.RedisRequestOuterClass.RequestType.Ping;
import static redis_request.RedisRequestOuterClass.RequestType.RandomKey;
import static redis_request.RedisRequestOuterClass.RequestType.SPublish;
import static redis_request.RedisRequestOuterClass.RequestType.Sort;
import static redis_request.RedisRequestOuterClass.RequestType.SortReadOnly;
import static redis_request.RedisRequestOuterClass.RequestType.Time;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;

import glide.api.commands.ConnectionManagementClusterCommands;
import glide.api.commands.GenericClusterCommands;
import glide.api.commands.PubSubClusterCommands;
import glide.api.commands.ScriptingAndFunctionsClusterCommands;
import glide.api.commands.ServerManagementClusterCommands;
import glide.api.commands.TransactionsClusterCommands;
Expand All @@ -57,8 +59,6 @@
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -77,11 +77,8 @@ public class RedisClusterClient extends BaseClient
GenericClusterCommands,
ServerManagementClusterCommands,
ScriptingAndFunctionsClusterCommands,
TransactionsClusterCommands {

protected RedisClusterClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}
TransactionsClusterCommands,
PubSubClusterCommands {

/**
* Async request for an async (non-blocking) Redis client in Cluster mode.
Expand Down Expand Up @@ -761,6 +758,12 @@ public CompletableFuture<String> randomKey() {
RandomKey, new String[0], this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<Long> spublish(@NonNull String channel, @NonNull String message) {
return commandManager.submitNewCommand(
SPublish, new String[] {channel, message}, this::handleLongResponse);
}

@Override
public CompletableFuture<String[]> sort(
@NonNull String key, @NonNull SortClusterOptions sortClusterOptions) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import java.util.concurrent.CompletableFuture;

/**
* Supports commands for the "Pub/Sub" group for standalone and cluster clients.
*
* @see <a href="https://redis.io/docs/latest/commands/?group=pubsub">Pub/Sub Commands</a>
*/
public interface PubSubBaseCommands {

/**
* Publishes message on pubsub channel.
*
* @see <a href="https://redis.io/docs/latest/commands/publish/">redis.io</a> for details.
* @param channel The Channel to publish the message on.
* @param message The message to publish.
* @return The number of clients that received the message.
* @example
* <pre>{@code
* Long receivers = client.publish("announcements", "The cat said 'meow'!").get();
* assert receivers > 0L;
* }</pre>
*/
CompletableFuture<Long> publish(String channel, String message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import java.util.concurrent.CompletableFuture;

/**
* Supports commands for the "Pub/Sub" group for a cluster client.
*
* @see <a href="https://redis.io/docs/latest/commands/publish/">Pub/Sub Commands</a>
*/
public interface PubSubClusterCommands {

/**
* Publishes message on pubsub channel in sharded mode.
*
* @since Redis 7.0 and above.
* @see <a href="https://redis.io/docs/latest/commands/spublish/">redis.io</a> for details.
* @param channel The Channel to publish the message on.
* @param message The message to publish.
* @return The number of clients that received the message.
* @example
* <pre>{@code
* Long receivers = client.spublish("announcements", "The cat said 'meow'!").get();
* assert receivers > 0L;
* }</pre>
*/
CompletableFuture<Long> spublish(String channel, String message);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is the sharded flavor

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spublish is sharded pubsub only

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this answer your question @ikolomi ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, you want to have publish have two overloads instead.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done in #1773

}
Loading
Loading