Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add the XREADGROUP command #376

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,7 @@ fn convert_to_array_of_pairs(
value_expected_return_type: Option<ExpectedReturnType>,
) -> RedisResult<Value> {
match response {
Value::Nil => Ok(response),
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

XREADGROUP can/will return null values in the map

Value::Array(ref array) if array.is_empty() || matches!(array[0], Value::Array(_)) => {
// The server response is an empty array or a RESP3 array of pairs. In RESP3, the values in the pairs are
// already of the correct type, so we do not need to convert them and `response` is in the correct format.
Expand Down Expand Up @@ -852,7 +853,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
key_type: &Some(ExpectedReturnType::BulkString),
value_type: &Some(ExpectedReturnType::ArrayOfPairs),
}),
b"XREAD" => Some(ExpectedReturnType::Map {
b"XREAD" | b"XREADGROUP" => Some(ExpectedReturnType::Map {
key_type: &Some(ExpectedReturnType::BulkString),
value_type: &Some(ExpectedReturnType::Map {
key_type: &Some(ExpectedReturnType::BulkString),
Expand Down Expand Up @@ -1205,6 +1206,28 @@ mod tests {
));
}

#[test]
fn convert_xreadgroup() {
assert!(matches!(
expected_type_for_cmd(
redis::cmd("XREADGROUP")
.arg("GROUP")
.arg("group")
.arg("consumer")
.arg("streams")
.arg("key")
.arg("id")
),
Some(ExpectedReturnType::Map {
key_type: &Some(ExpectedReturnType::BulkString),
value_type: &Some(ExpectedReturnType::Map {
key_type: &Some(ExpectedReturnType::BulkString),
value_type: &Some(ExpectedReturnType::ArrayOfPairs),
}),
})
));
}

#[test]
fn test_convert_empty_array_to_map_is_nil() {
let mut cmd = redis::cmd("XREAD");
Expand Down
18 changes: 18 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XRead;
import static redis_request.RedisRequestOuterClass.RequestType.XReadGroup;
import static redis_request.RedisRequestOuterClass.RequestType.XRevRange;
import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
Expand Down Expand Up @@ -194,6 +195,7 @@
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamGroupOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamReadGroupOptions;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
import glide.api.models.configuration.BaseClientConfiguration;
Expand Down Expand Up @@ -1428,6 +1430,22 @@ public CompletableFuture<Long> xgroupDelConsumer(
XGroupDelConsumer, new String[] {key, group, consumer}, this::handleLongResponse);
}

@Override
public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
@NonNull Map<String, String> keysAndIds, @NonNull String group, @NonNull String consumer) {
return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build());
}

@Override
public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
@NonNull Map<String, String> keysAndIds,
@NonNull String group,
@NonNull String consumer,
StreamReadGroupOptions options) {
String[] arguments = options.toArgs(group, consumer, keysAndIds);
return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse);
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamRange.IdBound;
import glide.api.models.commands.stream.StreamRange.InfRangeBound;
import glide.api.models.commands.stream.StreamReadGroupOptions;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
import java.util.Map;
Expand Down Expand Up @@ -407,4 +408,87 @@ CompletableFuture<String> xgroupCreate(
* }</pre>
*/
CompletableFuture<Long> xgroupDelConsumer(String key, String group, String consumer);

/**
* Reads entries from the given streams owned by a consumer group.
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://valkey.io/commands/xreadgroup/">valkey.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read. Use the special id of <code>{@literal ">"}</code> to receive only new messages.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @return A <code>{@literal Map<String, Map<String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty.
* @example
* <pre>{@code
* // create a new stream at "mystream", with stream id "1-0"
* Map<String, String> xreadKeys = Map.of("myfield", "mydata");
* String streamId = client.xadd("mystream", Map.of("myfield", "mydata"), StreamAddOptions.builder().id("1-0").build()).get();
* assert client.xgroupCreate("mystream", "mygroup").get().equals("OK"); // create the consumer group "mygroup"
* Map<String, Map<String, String[][]>> streamReadResponse = client.xreadgroup("mygroup", "myconsumer", Map.of("mystream", ">")).get();
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* // Returns "mystream": "1-0": {{"myfield", "mydata"}}
* for (var keyEntry : streamReadResponse.entrySet()) {
* System.out.printf("Key: %s", keyEntry.getKey());
* for (var streamEntry : keyEntry.getValue().entrySet()) {
* Arrays.stream(streamEntry.getValue()).forEach(entity ->
* System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
* );
* }
* }
* assert client.xdel("mystream", "1-0").get() == 1L;
* client.xreadgroup("mygroup", "myconsumer", Map.of("mystream", "0")).get();
* // Returns "mystream": "1-0": null
* assert streamReadResponse.get("mystream").get("1-0") == null;
* </pre>
*/
CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
Map<String, String> keysAndIds, String group, String consumer);

/**
* Reads entries from the given streams owned by a consumer group.
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://valkey.io/commands/xreadgroup/">valkey.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read. Use the special id of <code>{@literal ">"}</code> to receive only new messages.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @param options Options detailing how to read the stream {@link StreamReadGroupOptions}.
* @return A <code>{@literal Map<String, Map<String[][]>>}</code> with stream
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty.
* @example
* <pre>{@code
* // create a new stream at "mystream", with stream id "1-0"
* Map<String, String> xreadKeys = Map.of("myfield", "mydata");
* String streamId = client.xadd("mystream", Map.of("myfield", "mydata"), StreamAddOptions.builder().id("1-0").build()).get();
* assert client.xgroupCreate("mystream", "mygroup").get().equals("OK"); // create the consumer group "mygroup"
* StreamReadGroupOptions op = StreamReadGroupOptions.builder().count(1).build(); // retrieves only a single message at a time
* Map<String, Map<String, String[][]>> streamReadResponse = client.xreadgroup("mygroup", "myconsumer", Map.of("mystream", ">"), op).get();
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* // Returns "mystream": "1-0": {{"myfield", "mydata"}}
* for (var keyEntry : streamReadResponse.entrySet()) {
* System.out.printf("Key: %s", keyEntry.getKey());
* for (var streamEntry : keyEntry.getValue().entrySet()) {
* Arrays.stream(streamEntry.getValue()).forEach(entity ->
* System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
* );
* }
* }
* assert client.xdel("mystream", "1-0").get() == 1L;
* streamReadResponse = client.xreadgroup("mygroup", "myconsumer", Map.of("mystream", "0"), op).get();
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* // Returns "mystream": "1-0": null
* assert streamReadResponse.get("mystream").get("1-0") == null;
* </pre>
*/
CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
Map<String, String> keysAndIds,
String group,
String consumer,
StreamReadGroupOptions options);
}
47 changes: 47 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XRead;
import static redis_request.RedisRequestOuterClass.RequestType.XReadGroup;
import static redis_request.RedisRequestOuterClass.RequestType.XRevRange;
import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
Expand Down Expand Up @@ -232,6 +233,7 @@
import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.stream.StreamGroupOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamReadGroupOptions;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
import glide.api.models.configuration.ReadFrom;
Expand Down Expand Up @@ -3047,6 +3049,51 @@ public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull
return getThis();
}

/**
* Reads entries from the given streams owned by a consumer group.
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://valkey.io/commands/xreadgroup/">valkey.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read. Use the special id of <code>{@literal ">"}</code> to receive only new messages.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @return Command Response - A <code>{@literal Map<String, Map<String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty.
*/
public T xreadgroup(Map<String, String> keysAndIds, String group, String consumer) {
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build());
}

/**
* Reads entries from the given streams owned by a consumer group.
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://valkey.io/commands/xreadgroup/">valkey.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read. Use the special id of <code>{@literal ">"}</code> to receive only new messages.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @param options Options detailing how to read the stream {@link StreamReadGroupOptions}.
* @return Command Response - A <code>{@literal Map<String, Map<String[][]>>}</code> with stream
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty.
*/
public T xreadgroup(
Map<String, String> keysAndIds,
String group,
String consumer,
StreamReadGroupOptions options) {
protobufTransaction.addCommands(
buildCommand(XReadGroup, buildArgs(options.toArgs(group, consumer, keysAndIds))));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.commands.stream;

import glide.api.commands.StreamBaseCommands;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.experimental.SuperBuilder;

/**
* Optional arguments for {@link StreamBaseCommands#xreadgroup(Map, String, String,
* StreamReadGroupOptions)}
*
* @see <a href="https://valkey.io/commands/xreadgroup/">redis.io</a>
*/
@SuperBuilder
public final class StreamReadGroupOptions extends StreamReadOptions {

public static final String READ_GROUP_REDIS_API = "GROUP";
public static final String READ_NOACK_REDIS_API = "NOACK";

/**
* If set, messages are not added to the Pending Entries List (PEL). This is equivalent to
* acknowledging the message when it is read.
*/
private Boolean noack;

/**
* Converts options and the key-to-id input for {@link StreamBaseCommands#xreadgroup(Map, String,
* String, StreamReadGroupOptions)} into a String[].
*
* @return String[]
*/
public String[] toArgs(String group, String consumer, Map<String, String> streams) {
List<String> optionArgs = new ArrayList<>();
optionArgs.add(READ_GROUP_REDIS_API);
optionArgs.add(group);
optionArgs.add(consumer);

if (this.count != null) {
optionArgs.add(READ_COUNT_REDIS_API);
optionArgs.add(count.toString());
}

if (this.block != null) {
optionArgs.add(READ_BLOCK_REDIS_API);
optionArgs.add(block.toString());
}

if (this.noack != null && this.noack) {
optionArgs.add(READ_NOACK_REDIS_API);
}

optionArgs.add(READ_STREAMS_REDIS_API);
Set<Map.Entry<String, String>> entrySet = streams.entrySet();
optionArgs.addAll(entrySet.stream().map(Map.Entry::getKey).collect(Collectors.toList()));
optionArgs.addAll(entrySet.stream().map(Map.Entry::getValue).collect(Collectors.toList()));

return optionArgs.toArray(new String[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.experimental.SuperBuilder;

/**
* Optional arguments for {@link StreamBaseCommands#xread(Map, StreamReadOptions)}
*
* @see <a href="https://redis.io/commands/xread/">redis.io</a>
*/
@Builder
public final class StreamReadOptions {
@SuperBuilder
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
public class StreamReadOptions {

public static final String READ_COUNT_REDIS_API = "COUNT";
public static final String READ_BLOCK_REDIS_API = "BLOCK";
Expand All @@ -25,12 +25,12 @@ public final class StreamReadOptions {
* If set, the request will be blocked for the set amount of milliseconds or until the server has
* the required number of entries. Equivalent to <code>BLOCK</code> in the Redis API.
*/
Long block;
protected Long block;

/**
* The maximal number of elements requested. Equivalent to <code>COUNT</code> in the Redis API.
*/
Long count;
protected Long count;

/**
* Converts options and the key-to-id input for {@link StreamBaseCommands#xread(Map,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import glide.api.models.commands.geospatial.GeospatialData;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -128,7 +129,10 @@ public static <T> Map<String, T[][]> castMapOf2DArray(
return null;
}
return mapOfArrays.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> castArrayofArrays(e.getValue(), clazz)));
.collect(
HashMap::new,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change? to accept nulls?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. XREADGROUP returns maps will null values (gross!)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a test for that case?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do

(m, e) -> m.put(e.getKey(), castArrayofArrays(e.getValue(), clazz)),
HashMap::putAll);
}

/**
Expand Down
Loading
Loading