From 5a23a1ee9a32e39209a1d73b99d6a51aa7745206 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Mon, 17 Jun 2024 14:58:05 -0700 Subject: [PATCH 1/6] Add XGROUP CreateConsumer, DelConsumer Signed-off-by: Andrew Carbonetto --- .../test/java/glide/SharedCommandTests.java | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index dd720dbc36..9b311b2d38 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -3528,6 +3528,62 @@ public void xgroupCreateConsumer_xgroupDelConsumer(BaseClient client) { assertInstanceOf(RequestException.class, executionException.getCause()); } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xgroupCreateConsumer_xgroupDelConsumer(BaseClient client) { + String key = UUID.randomUUID().toString(); + String stringKey = UUID.randomUUID().toString(); + String groupName = "group" + UUID.randomUUID(); + String zeroStreamId = "0"; + String consumerName = "consumer" + UUID.randomUUID(); + + // create group and consumer for the group + assertEquals( + OK, + client + .xgroupCreate( + key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) + .get()); + assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get()); + + // create consumer for group that does not exist results in a NOGROUP request error + ExecutionException executionException = + assertThrows( + ExecutionException.class, + () -> client.xgroupCreateConsumer(key, "not_a_group", consumerName).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + assertTrue(executionException.getMessage().contains("NOGROUP")); + + // create consumer for group again + assertFalse(client.xgroupCreateConsumer(key, groupName, consumerName).get()); + + // Deletes a consumer that is not created yet returns 0 + assertEquals(0L, client.xgroupDelConsumer(key, groupName, "not_a_consumer").get()); + + // String streamid_1 = client.xadd(key, Map.of("field1", "value1")).get(); + // assertNotNull(streamid_1); + // String streamid_2 = client.xadd(key, Map.of("field2", "value2")).get(); + // assertNotNull(streamid_2); + + // TODO use XREADGROUP to mark pending messages for the consumer so that we get non-zero return + assertEquals(0L, client.xgroupDelConsumer(key, groupName, consumerName).get()); + + // key is a string and cannot be created as a stream + assertEquals(OK, client.set(stringKey, "not_a_stream").get()); + executionException = + assertThrows( + ExecutionException.class, + () -> client.xgroupCreateConsumer(stringKey, groupName, consumerName).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + executionException = + assertThrows( + ExecutionException.class, + () -> client.xgroupDelConsumer(stringKey, groupName, consumerName).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") From 7680d68a7e6eb8e014edb04471333622fd6012dc Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Tue, 18 Jun 2024 10:53:47 -0700 Subject: [PATCH 2/6] Add XREADGROUP command Signed-off-by: Andrew Carbonetto --- glide-core/src/client/value_conversion.rs | 25 +++- .../src/main/java/glide/api/BaseClient.java | 18 +++ .../api/commands/StreamBaseCommands.java | 82 +++++++++++ .../glide/api/models/BaseTransaction.java | 45 ++++++ .../stream/StreamReadGroupOptions.java | 63 +++++++++ .../commands/stream/StreamReadOptions.java | 10 +- .../java/glide/utils/ArrayTransformUtils.java | 6 +- .../test/java/glide/api/RedisClientTest.java | 101 ++++++++++++++ .../glide/api/models/TransactionTests.java | 32 +++++ .../test/java/glide/SharedCommandTests.java | 132 +++++++++++++++++- .../java/glide/TransactionTestUtilities.java | 21 ++- 11 files changed, 522 insertions(+), 13 deletions(-) create mode 100644 java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index 43c2e74bd0..6b8e93a69a 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -791,6 +791,7 @@ fn convert_to_array_of_pairs( value_expected_return_type: Option, ) -> RedisResult { match response { + Value::Nil => Ok(response), Value::Array(ref array) if array.is_empty() || matches!(array[0], Value::Array(_)) => { // The server response is an empty array or a RESP3 array of pairs. In RESP3, the values in the pairs are // already of the correct type, so we do not need to convert them and `response` is in the correct format. @@ -852,7 +853,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option { key_type: &Some(ExpectedReturnType::BulkString), value_type: &Some(ExpectedReturnType::ArrayOfPairs), }), - b"XREAD" => Some(ExpectedReturnType::Map { + b"XREAD" | b"XREADGROUP" => Some(ExpectedReturnType::Map { key_type: &Some(ExpectedReturnType::BulkString), value_type: &Some(ExpectedReturnType::Map { key_type: &Some(ExpectedReturnType::BulkString), @@ -1205,6 +1206,28 @@ mod tests { )); } + #[test] + fn convert_xreadgroup() { + assert!(matches!( + expected_type_for_cmd( + redis::cmd("XREADGROUP") + .arg("GROUP") + .arg("group") + .arg("consumer") + .arg("streams") + .arg("key") + .arg("id") + ), + Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::BulkString), + value_type: &Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::BulkString), + value_type: &Some(ExpectedReturnType::ArrayOfPairs), + }), + }) + )); + } + #[test] fn test_convert_empty_array_to_map_is_nil() { let mut cmd = redis::cmd("XREAD"); diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 9912ecea8e..5c7506c0da 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -128,6 +128,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; import static redis_request.RedisRequestOuterClass.RequestType.XRead; +import static redis_request.RedisRequestOuterClass.RequestType.XReadGroup; import static redis_request.RedisRequestOuterClass.RequestType.XRevRange; import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZAdd; @@ -194,6 +195,7 @@ import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange; +import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; import glide.api.models.configuration.BaseClientConfiguration; @@ -1428,6 +1430,22 @@ public CompletableFuture xgroupDelConsumer( XGroupDelConsumer, new String[] {key, group, consumer}, this::handleLongResponse); } + @Override + public CompletableFuture>> xreadgroup( + @NonNull Map keysAndIds, @NonNull String group, @NonNull String consumer) { + return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build()); + } + + @Override + public CompletableFuture>> xreadgroup( + @NonNull Map keysAndIds, + @NonNull String group, + @NonNull String consumer, + StreamReadGroupOptions options) { + String[] arguments = options.toArgs(group, consumer, keysAndIds); + return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse); + } + @Override public CompletableFuture pttl(@NonNull String key) { return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse); diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index d51aeadd18..53899d7529 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -7,6 +7,7 @@ import glide.api.models.commands.stream.StreamRange; import glide.api.models.commands.stream.StreamRange.IdBound; import glide.api.models.commands.stream.StreamRange.InfRangeBound; +import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; import java.util.Map; @@ -407,4 +408,85 @@ CompletableFuture xgroupCreate( * } */ CompletableFuture xgroupDelConsumer(String key, String group, String consumer); + + /** + * Reads entries from the given streams owned by a consumer group. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see valkey.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. Use the special id of {@literal ">"} to receive only new messages. + * @param group The consumer group name. + * @param consumer The newly created consumer. + * @return A {@literal Map>} with stream + * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + * @example + *
{@code
+     * // create a new stream at "mystream", with stream id "1-0"
+     * Map xreadKeys = Map.of("myfield", "mydata");
+     * String streamId = client.xadd("mystream", Map.of("myfield", "mydata"), StreamAddOptions.builder().id("1-0").build()).get();
+     * assert client.xgroupCreate("mystream", "mygroup").get().equals("OK"); // create the consumer group "mygroup"
+     * Map> streamReadResponse = client.xreadgroup("mygroup", "myconsumer", Map.of("mystream", ">")).get();
+     * // Returns "mystream": "1-0": {{"myfield", "mydata"}}
+     * for (var keyEntry : streamReadResponse.entrySet()) {
+     *     System.out.printf("Key: %s", keyEntry.getKey());
+     *     for (var streamEntry : keyEntry.getValue().entrySet()) {
+     *         Arrays.stream(streamEntry.getValue()).forEach(entity ->
+     *             System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
+     *         );
+     *     }
+     * }
+     * assert client.xdel("mystream", "1-0").get() == 1L;
+     * client.xreadgroup("mygroup", "myconsumer", Map.of("mystream", "0")).get();
+     * // Returns "mystream": "1-0": null
+     * assert streamReadResponse.get("mystream").get("1-0") == null;
+     * 
+ */ + CompletableFuture>> xreadgroup( + Map keysAndIds, String group, String consumer); + + /** + * Reads entries from the given streams owned by a consumer group. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see valkey.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. Use the special id of {@literal ">"} to receive only new messages. + * @param group The consumer group name. + * @param consumer The newly created consumer. + * @param options Options detailing how to read the stream {@link StreamReadGroupOptions}. + * @return A {@literal Map>} with stream + * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + * @example + *
{@code
+     * // create a new stream at "mystream", with stream id "1-0"
+     * Map xreadKeys = Map.of("myfield", "mydata");
+     * String streamId = client.xadd("mystream", Map.of("myfield", "mydata"), StreamAddOptions.builder().id("1-0").build()).get();
+     * assert client.xgroupCreate("mystream", "mygroup").get().equals("OK"); // create the consumer group "mygroup"
+     * StreamReadGroupOptions op = StreamReadGroupOptions.builder().count(1).build(); // retrieves only a single message at a time
+     * Map> streamReadResponse = client.xreadgroup("mygroup", "myconsumer", Map.of("mystream", ">"), op).get();
+     * // Returns "mystream": "1-0": {{"myfield", "mydata"}}
+     * for (var keyEntry : streamReadResponse.entrySet()) {
+     *     System.out.printf("Key: %s", keyEntry.getKey());
+     *     for (var streamEntry : keyEntry.getValue().entrySet()) {
+     *         Arrays.stream(streamEntry.getValue()).forEach(entity ->
+     *             System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
+     *         );
+     *     }
+     * }
+     * assert client.xdel("mystream", "1-0").get() == 1L;
+     * streamReadResponse = client.xreadgroup("mygroup", "myconsumer", Map.of("mystream", "0"), op).get();
+     * // Returns "mystream": "1-0": null
+     * assert streamReadResponse.get("mystream").get("1-0") == null;
+     * 
+ */ + CompletableFuture>> xreadgroup( + Map keysAndIds, + String group, + String consumer, + StreamReadGroupOptions options); } diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index 63d49ab80e..8d22213d2a 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -156,6 +156,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; import static redis_request.RedisRequestOuterClass.RequestType.XRead; +import static redis_request.RedisRequestOuterClass.RequestType.XReadGroup; import static redis_request.RedisRequestOuterClass.RequestType.XRevRange; import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZAdd; @@ -232,6 +233,7 @@ import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange; +import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; import glide.api.models.configuration.ReadFrom; @@ -3047,6 +3049,49 @@ public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull return getThis(); } + /** + * Reads entries from the given streams owned by a consumer group. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see valkey.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. Use the special id of {@literal ">"} to receive only new messages. + * @param group The consumer group name. + * @param consumer The newly created consumer. + * @return Command Response - A {@literal Map>} with stream + * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + */ + public T xreadgroup(Map keysAndIds, String group, String consumer) { + return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build()); + } + + /** + * Reads entries from the given streams owned by a consumer group. + * + * @apiNote When in cluster mode, all keys in keysAndIds must map to the same hash + * slot. + * @see valkey.io for details. + * @param keysAndIds A Map of keys and entry ids to read from. The + * Map is composed of a stream's key and the id of the entry after which the stream + * will be read. Use the special id of {@literal ">"} to receive only new messages. + * @param group The consumer group name. + * @param consumer The newly created consumer. + * @param options Options detailing how to read the stream {@link StreamReadGroupOptions}. + * @return Command Response - A {@literal Map>} with stream + * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + */ + public T xreadgroup( + Map keysAndIds, + String group, + String consumer, + StreamReadGroupOptions options) { + protobufTransaction.addCommands( + buildCommand(XReadGroup, buildArgs(options.toArgs(group, consumer, keysAndIds)))); + return getThis(); + } + /** * Returns the remaining time to live of key that has a timeout, in milliseconds. * diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java new file mode 100644 index 0000000000..0942992c31 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java @@ -0,0 +1,63 @@ +/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.commands.stream; + +import glide.api.commands.StreamBaseCommands; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.experimental.SuperBuilder; + +/** + * Optional arguments for {@link StreamBaseCommands#xreadgroup(Map, String, String, + * StreamReadGroupOptions)} + * + * @see redis.io + */ +@SuperBuilder +public final class StreamReadGroupOptions extends StreamReadOptions { + + public static final String READ_GROUP_REDIS_API = "GROUP"; + public static final String READ_NOACK_REDIS_API = "NOACK"; + + /** + * If set, messages are not added to the Pending Entries List (PEL). This is equivalent to + * acknowledging the message when it is read. + */ + private Boolean noack; + + /** + * Converts options and the key-to-id input for {@link StreamBaseCommands#xreadgroup(Map, String, + * String, StreamReadGroupOptions)} into a String[]. + * + * @return String[] + */ + public String[] toArgs(String group, String consumer, Map streams) { + List optionArgs = new ArrayList<>(); + optionArgs.add(READ_GROUP_REDIS_API); + optionArgs.add(group); + optionArgs.add(consumer); + + if (this.count != null) { + optionArgs.add(READ_COUNT_REDIS_API); + optionArgs.add(count.toString()); + } + + if (this.block != null) { + optionArgs.add(READ_BLOCK_REDIS_API); + optionArgs.add(block.toString()); + } + + if (this.noack != null && this.noack) { + optionArgs.add(READ_NOACK_REDIS_API); + } + + optionArgs.add(READ_STREAMS_REDIS_API); + Set> entrySet = streams.entrySet(); + optionArgs.addAll(entrySet.stream().map(Map.Entry::getKey).collect(Collectors.toList())); + optionArgs.addAll(entrySet.stream().map(Map.Entry::getValue).collect(Collectors.toList())); + + return optionArgs.toArray(new String[0]); + } +} diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java index 7baad14121..ad3d0fe421 100644 --- a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java @@ -7,15 +7,15 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import lombok.Builder; +import lombok.experimental.SuperBuilder; /** * Optional arguments for {@link StreamBaseCommands#xread(Map, StreamReadOptions)} * * @see redis.io */ -@Builder -public final class StreamReadOptions { +@SuperBuilder +public class StreamReadOptions { public static final String READ_COUNT_REDIS_API = "COUNT"; public static final String READ_BLOCK_REDIS_API = "BLOCK"; @@ -25,12 +25,12 @@ public final class StreamReadOptions { * If set, the request will be blocked for the set amount of milliseconds or until the server has * the required number of entries. Equivalent to BLOCK in the Redis API. */ - Long block; + protected Long block; /** * The maximal number of elements requested. Equivalent to COUNT in the Redis API. */ - Long count; + protected Long count; /** * Converts options and the key-to-id input for {@link StreamBaseCommands#xread(Map, diff --git a/java/client/src/main/java/glide/utils/ArrayTransformUtils.java b/java/client/src/main/java/glide/utils/ArrayTransformUtils.java index a251693293..8fea005cae 100644 --- a/java/client/src/main/java/glide/utils/ArrayTransformUtils.java +++ b/java/client/src/main/java/glide/utils/ArrayTransformUtils.java @@ -5,6 +5,7 @@ import glide.api.models.commands.geospatial.GeospatialData; import java.lang.reflect.Array; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -128,7 +129,10 @@ public static Map castMapOf2DArray( return null; } return mapOfArrays.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> castArrayofArrays(e.getValue(), clazz))); + .collect( + HashMap::new, + (m, e) -> m.put(e.getKey(), castArrayofArrays(e.getValue(), clazz)), + HashMap::putAll); } /** diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index fa4bc7d36d..574b479b78 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -33,6 +33,8 @@ import static glide.api.models.commands.stream.StreamRange.MAXIMUM_RANGE_REDIS_API; import static glide.api.models.commands.stream.StreamRange.MINIMUM_RANGE_REDIS_API; import static glide.api.models.commands.stream.StreamRange.RANGE_COUNT_REDIS_API; +import static glide.api.models.commands.stream.StreamReadGroupOptions.READ_GROUP_REDIS_API; +import static glide.api.models.commands.stream.StreamReadGroupOptions.READ_NOACK_REDIS_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_BLOCK_REDIS_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_COUNT_REDIS_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_STREAMS_REDIS_API; @@ -194,6 +196,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; import static redis_request.RedisRequestOuterClass.RequestType.XRead; +import static redis_request.RedisRequestOuterClass.RequestType.XReadGroup; import static redis_request.RedisRequestOuterClass.RequestType.XRevRange; import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZAdd; @@ -267,6 +270,7 @@ import glide.api.models.commands.stream.StreamRange; import glide.api.models.commands.stream.StreamRange.IdBound; import glide.api.models.commands.stream.StreamRange.InfRangeBound; +import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions; import glide.api.models.commands.stream.StreamTrimOptions.MaxLen; @@ -4510,6 +4514,103 @@ public void xgroupDelConsumer() { assertEquals(result, payload); } + @SneakyThrows + @Test + public void xreadgroup_multiple_keys() { + // setup + String keyOne = "one"; + String streamIdOne = "id-one"; + String keyTwo = "two"; + String streamIdTwo = "id-two"; + String groupName = "testGroup"; + String consumerName = "consumerGroup"; + String[][] fieldValues = {{"field", "value"}}; + Map> completedResult = new LinkedHashMap<>(); + completedResult.put(keyOne, Map.of(streamIdOne, fieldValues)); + completedResult.put(keyTwo, Map.of(streamIdTwo, fieldValues)); + String[] arguments = { + READ_GROUP_REDIS_API, + groupName, + consumerName, + READ_STREAMS_REDIS_API, + keyOne, + keyTwo, + streamIdOne, + streamIdTwo + }; + + CompletableFuture>> testResponse = + new CompletableFuture<>(); + testResponse.complete(completedResult); + + // match on protobuf request + when(commandManager.>>submitNewCommand( + eq(XReadGroup), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + Map keysAndIds = new LinkedHashMap<>(); + keysAndIds.put(keyOne, streamIdOne); + keysAndIds.put(keyTwo, streamIdTwo); + CompletableFuture>> response = + service.xreadgroup(keysAndIds, groupName, consumerName); + Map> payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(completedResult, payload); + } + + @SneakyThrows + @Test + public void xreadgroup_with_options() { + // setup + String keyOne = "one"; + String streamIdOne = "id-one"; + Long block = 2L; + Long count = 10L; + String groupName = "testGroup"; + String consumerName = "consumerGroup"; + String[][] fieldValues = {{"field", "value"}}; + Map> completedResult = + Map.of(keyOne, Map.of(streamIdOne, fieldValues)); + String[] arguments = { + READ_GROUP_REDIS_API, + groupName, + consumerName, + READ_COUNT_REDIS_API, + count.toString(), + READ_BLOCK_REDIS_API, + block.toString(), + READ_NOACK_REDIS_API, + READ_STREAMS_REDIS_API, + keyOne, + streamIdOne + }; + + CompletableFuture>> testResponse = + new CompletableFuture<>(); + testResponse.complete(completedResult); + + // match on protobuf request + when(commandManager.>>submitNewCommand( + eq(XReadGroup), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture>> response = + service.xreadgroup( + Map.of(keyOne, streamIdOne), + groupName, + consumerName, + StreamReadGroupOptions.builder().block(block).count(count).noack(true).build()); + Map> payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(completedResult, payload); + } + @SneakyThrows @Test public void type_returns_success() { diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index 91cb876d0d..3cc7e6aaed 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -29,6 +29,8 @@ import static glide.api.models.commands.stream.StreamRange.MAXIMUM_RANGE_REDIS_API; import static glide.api.models.commands.stream.StreamRange.MINIMUM_RANGE_REDIS_API; import static glide.api.models.commands.stream.StreamRange.RANGE_COUNT_REDIS_API; +import static glide.api.models.commands.stream.StreamReadGroupOptions.READ_GROUP_REDIS_API; +import static glide.api.models.commands.stream.StreamReadGroupOptions.READ_NOACK_REDIS_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_BLOCK_REDIS_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_COUNT_REDIS_API; import static glide.api.models.commands.stream.StreamReadOptions.READ_STREAMS_REDIS_API; @@ -170,6 +172,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XRange; import static redis_request.RedisRequestOuterClass.RequestType.XRead; +import static redis_request.RedisRequestOuterClass.RequestType.XReadGroup; import static redis_request.RedisRequestOuterClass.RequestType.XRevRange; import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZAdd; @@ -234,6 +237,7 @@ import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange.InfRangeBound; +import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions.MinId; import java.util.ArrayList; @@ -785,6 +789,34 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), transaction.xgroupDelConsumer("key", "group", "consumer"); results.add(Pair.of(XGroupDelConsumer, buildArgs("key", "group", "consumer"))); + transaction.xreadgroup(Map.of("key", "id"), "group", "consumer"); + results.add( + Pair.of( + XReadGroup, + buildArgs( + READ_GROUP_REDIS_API, "group", "consumer", READ_STREAMS_REDIS_API, "key", "id"))); + + transaction.xreadgroup( + Map.of("key", "id"), + "group", + "consumer", + StreamReadGroupOptions.builder().block(1L).count(2L).noack(true).build()); + results.add( + Pair.of( + XReadGroup, + buildArgs( + READ_GROUP_REDIS_API, + "group", + "consumer", + READ_COUNT_REDIS_API, + "2", + READ_BLOCK_REDIS_API, + "1", + READ_NOACK_REDIS_API, + READ_STREAMS_REDIS_API, + "key", + "id"))); + transaction.time(); results.add(Pair.of(Time, buildArgs())); diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index 9b311b2d38..e7357fc7f6 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -69,6 +69,7 @@ import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange.IdBound; import glide.api.models.commands.stream.StreamRange.InfRangeBound; +import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions.MaxLen; import glide.api.models.commands.stream.StreamTrimOptions.MinId; @@ -3480,7 +3481,7 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) { @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") - public void xgroupCreateConsumer_xgroupDelConsumer(BaseClient client) { + public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client) { String key = UUID.randomUUID().toString(); String stringKey = UUID.randomUUID().toString(); String groupName = "group" + UUID.randomUUID(); @@ -3510,8 +3511,37 @@ public void xgroupCreateConsumer_xgroupDelConsumer(BaseClient client) { // Deletes a consumer that is not created yet returns 0 assertEquals(0L, client.xgroupDelConsumer(key, groupName, "not_a_consumer").get()); - // TODO use XREADGROUP to mark pending messages for the consumer so that we get non-zero return - assertEquals(0L, client.xgroupDelConsumer(key, groupName, consumerName).get()); + // Add two stream entries + String streamid_1 = client.xadd(key, Map.of("field1", "value1")).get(); + assertNotNull(streamid_1); + String streamid_2 = client.xadd(key, Map.of("field2", "value2")).get(); + assertNotNull(streamid_2); + + // read the entire stream for the consumer and mark messages as pending + var result_1 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get(); + assertEquals(2, result_1.get(key).size()); + + // delete one of the streams + assertEquals(1L, client.xdel(key, new String[] {streamid_1}).get()); + + // now xreadgroup yeilds one empty stream and one non-empty stream + var result_2 = client.xreadgroup(Map.of(key, "0"), groupName, consumerName).get(); + assertEquals(2, result_2.get(key).size()); + assertNull(result_2.get(key).get(streamid_1)); + assertArrayEquals(new String[][] {{"field2", "value2"}}, result_2.get(key).get(streamid_2)); + + String streamid_3 = client.xadd(key, Map.of("field3", "value3")).get(); + assertNotNull(streamid_3); + + // Delete the consumer group and expect 2 pending messages + assertEquals(2L, client.xgroupDelConsumer(key, groupName, consumerName).get()); + + // Consume the last message with the previously deleted consumer (creates the consumer anew) + var result_3 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get(); + assertEquals(1, result_3.get(key).size()); + + // Delete the consumer group and expect the pending message + assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get()); // key is a string and cannot be created as a stream assertEquals(OK, client.set(stringKey, "not_a_stream").get()); @@ -3584,6 +3614,102 @@ public void xgroupCreateConsumer_xgroupDelConsumer(BaseClient client) { assertInstanceOf(RequestException.class, executionException.getCause()); } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xreadgroup_return_failures(BaseClient client) { + String key = "{key}:1" + UUID.randomUUID(); + String nonStreamKey = "{key}:3" + UUID.randomUUID(); + String field1 = "f1_"; + + // setup first entries in streams key1 and key2 + Map timestamp_1_1_map = new LinkedHashMap<>(); + timestamp_1_1_map.put(field1, field1 + "1"); + String timestamp_1_1 = + client.xadd(key, timestamp_1_1_map, StreamAddOptions.builder().id("1-1").build()).get(); + assertNotNull(timestamp_1_1); + + String groupName = "group" + UUID.randomUUID(); + String zeroStreamId = "0"; + String consumerName = "consumer" + UUID.randomUUID(); + + // create group and consumer for the group + assertEquals( + OK, + client + .xgroupCreate( + key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) + .get()); + assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get()); + + // Key exists, but it is not a stream + assertEquals(OK, client.set(nonStreamKey, "bar").get()); + ExecutionException executionException = + assertThrows( + ExecutionException.class, + () -> + client + .xreadgroup( + Map.of(nonStreamKey, timestamp_1_1, key, timestamp_1_1), + groupName, + consumerName) + .get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + executionException = + assertThrows( + ExecutionException.class, + () -> + client + .xreadgroup( + Map.of(key, timestamp_1_1, nonStreamKey, timestamp_1_1), + groupName, + consumerName) + .get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + try (var testClient = + client instanceof RedisClient + ? RedisClient.CreateClient(commonClientConfig().build()).get() + : RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) { + String timeoutKey = "{key}:2" + UUID.randomUUID(); + String timeoutGroupName = "group" + UUID.randomUUID(); + String timeoutConsumerName = "consumer" + UUID.randomUUID(); + + assertEquals( + OK, + client + .xgroupCreate( + timeoutKey, timeoutGroupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) + .get()); + assertTrue(client.xgroupCreateConsumer(timeoutKey, timeoutGroupName, timeoutConsumerName).get()); + + // ensure that command doesn't time out even if timeout > request timeout + long oneSecondInMS = 1000L; + assertNull( + testClient + .xreadgroup( + Map.of(timeoutKey, zeroStreamId), + timeoutGroupName, + timeoutConsumerName, + StreamReadGroupOptions.builder().block(oneSecondInMS).build()) + .get()); + + // with 0 timeout (no timeout) should never time out, + // but we wrap the test with timeout to avoid test failing or stuck forever + assertThrows( + TimeoutException.class, // <- future timeout, not command timeout + () -> + testClient + .xreadgroup( + Map.of(timeoutKey, zeroStreamId), + timeoutGroupName, + timeoutConsumerName, + StreamReadGroupOptions.builder().block(0L).build()) + .get(3, TimeUnit.SECONDS)); + } + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java index 556d167700..11db12acb3 100644 --- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java +++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java @@ -38,6 +38,8 @@ import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange.IdBound; +import glide.api.models.commands.stream.StreamReadGroupOptions; +import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions.MinId; import java.util.HashMap; import java.util.Map; @@ -735,6 +737,7 @@ private static Object[] streamCommands(BaseTransaction transaction) { .xadd(streamKey1, Map.of("field3", "value3"), StreamAddOptions.builder().id("0-3").build()) .xlen(streamKey1) .xread(Map.of(streamKey1, "0-2")) + .xread(Map.of(streamKey1, "0-2"), StreamReadOptions.builder().count(1L).build()) .xrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1")) .xrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1"), 1L) .xrevrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1")) @@ -744,6 +747,9 @@ private static Object[] streamCommands(BaseTransaction transaction) { .xgroupCreate( streamKey1, groupName2, "0-0", StreamGroupOptions.builder().makeStream().build()) .xgroupCreateConsumer(streamKey1, groupName1, consumer1) +// .xreadgroup(Map.of(streamKey1, "0-2"), groupName1, consumer1) +// .xreadgroup(Map.of(streamKey1, "0-2"), groupName1, consumer1, +// StreamReadGroupOptions.builder().count(2L).build()) .xgroupDelConsumer(streamKey1, groupName1, consumer1) .xgroupDestroy(streamKey1, groupName1) .xgroupDestroy(streamKey1, groupName2) @@ -754,9 +760,12 @@ private static Object[] streamCommands(BaseTransaction transaction) { "0-2", // xadd(streamKey1, Map.of("field2", "value2"), ... .id("0-2").build()); "0-3", // xadd(streamKey1, Map.of("field3", "value3"), ... .id("0-3").build()); 3L, // xlen(streamKey1) - Map.of( - streamKey1, - Map.of("0-3", new String[][] {{"field3", "value3"}})), // xread(Map.of(key9, "0-2")); +// Map.of( +// streamKey1, +// Map.of("0-3", new String[][] {{"field3", "value3"}})), // xread(Map.of(key9, "0-2")); +// Map.of( +// streamKey1, +// Map.of("0-3", new String[][] {{"field3", "value3"}})), // xread(Map.of(key9, "0-2"), options); Map.of("0-1", new String[][] {{"field1", "value1"}}), // .xrange(streamKey1, "0-1", "0-1") Map.of("0-1", new String[][] {{"field1", "value1"}}), // .xrange(streamKey1, "0-1", "0-1", 1l) Map.of("0-1", new String[][] {{"field1", "value1"}}), // .xrevrange(streamKey1, "0-1", "0-1") @@ -766,6 +775,12 @@ private static Object[] streamCommands(BaseTransaction transaction) { OK, // xgroupCreate(streamKey1, groupName1, "0-0") OK, // xgroupCreate(streamKey1, groupName1, "0-0", options) true, // xgroupCreateConsumer(streamKey1, groupName1, consumer1) + Map.of( + streamKey1, + Map.of("0-3", new String[][] {{"field3", "value3"}})), // xreadgroup(Map.of(key9, "0-2"), groupName1, consumer1); + Map.of( + streamKey1, + Map.of("0-3", new String[][] {{"field3", "value3"}})), // xreadgroup(Map.of(key9, "0-2"), groupName1, consumer1, options); 0L, // xgroupDelConsumer(streamKey1, groupName1, consumer1) true, // xgroupDestroy(streamKey1, groupName1) true, // xgroupDestroy(streamKey1, groupName2) From f861626baac2d30a04bd2acefaeb85f01abcf914 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Tue, 18 Jun 2024 11:54:16 -0700 Subject: [PATCH 3/6] Udpate IT tests Signed-off-by: Andrew Carbonetto --- .../test/java/glide/SharedCommandTests.java | 33 ++++++++++++--- .../java/glide/TransactionTestUtilities.java | 40 +++++++++++-------- 2 files changed, 51 insertions(+), 22 deletions(-) diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index e7357fc7f6..e415fbafd4 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -3676,14 +3676,35 @@ public void xreadgroup_return_failures(BaseClient client) { String timeoutGroupName = "group" + UUID.randomUUID(); String timeoutConsumerName = "consumer" + UUID.randomUUID(); + // Create a group read with the test client + // add a single stream entry and consumer + // the first call to ">" will return an update consumer group + // the second call to ">" will block waiting for new entries + // using anything other than ">" won't block, but will return the empty consumer result + // see: https://github.com/redis/redis/issues/6587 assertEquals( - OK, - client - .xgroupCreate( - timeoutKey, timeoutGroupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) - .get()); - assertTrue(client.xgroupCreateConsumer(timeoutKey, timeoutGroupName, timeoutConsumerName).get()); + OK, + testClient + .xgroupCreate( + timeoutKey, + timeoutGroupName, + zeroStreamId, + StreamGroupOptions.builder().makeStream().build()) + .get()); + assertTrue( + testClient.xgroupCreateConsumer(timeoutKey, timeoutGroupName, timeoutConsumerName).get()); + String streamid_1 = testClient.xadd(timeoutKey, Map.of("field1", "value1")).get(); + assertNotNull(streamid_1); + + // read the entire stream for the consumer and mark messages as pending + var result_1 = + testClient + .xreadgroup(Map.of(timeoutKey, ">"), timeoutGroupName, timeoutConsumerName) + .get(); + // returns an null result on the key + assertNull(result_1.get(key)); + // subsequent calls to read ">" will block: // ensure that command doesn't time out even if timeout > request timeout long oneSecondInMS = 1000L; assertNull( diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java index 11db12acb3..2dd27a0254 100644 --- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java +++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java @@ -38,7 +38,6 @@ import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamRange.IdBound; -import glide.api.models.commands.stream.StreamReadGroupOptions; import glide.api.models.commands.stream.StreamReadOptions; import glide.api.models.commands.stream.StreamTrimOptions.MinId; import java.util.HashMap; @@ -747,9 +746,9 @@ private static Object[] streamCommands(BaseTransaction transaction) { .xgroupCreate( streamKey1, groupName2, "0-0", StreamGroupOptions.builder().makeStream().build()) .xgroupCreateConsumer(streamKey1, groupName1, consumer1) -// .xreadgroup(Map.of(streamKey1, "0-2"), groupName1, consumer1) -// .xreadgroup(Map.of(streamKey1, "0-2"), groupName1, consumer1, -// StreamReadGroupOptions.builder().count(2L).build()) + // .xreadgroup(Map.of(streamKey1, "0-2"), groupName1, consumer1) + // .xreadgroup(Map.of(streamKey1, "0-2"), groupName1, consumer1, + // StreamReadGroupOptions.builder().count(2L).build()) .xgroupDelConsumer(streamKey1, groupName1, consumer1) .xgroupDestroy(streamKey1, groupName1) .xgroupDestroy(streamKey1, groupName2) @@ -760,12 +759,12 @@ private static Object[] streamCommands(BaseTransaction transaction) { "0-2", // xadd(streamKey1, Map.of("field2", "value2"), ... .id("0-2").build()); "0-3", // xadd(streamKey1, Map.of("field3", "value3"), ... .id("0-3").build()); 3L, // xlen(streamKey1) -// Map.of( -// streamKey1, -// Map.of("0-3", new String[][] {{"field3", "value3"}})), // xread(Map.of(key9, "0-2")); -// Map.of( -// streamKey1, -// Map.of("0-3", new String[][] {{"field3", "value3"}})), // xread(Map.of(key9, "0-2"), options); + Map.of( + streamKey1, + Map.of("0-3", new String[][] {{"field3", "value3"}})), // xread(Map.of(key9, "0-2")); + Map.of( + streamKey1, Map.of("0-3", new String[][] {{"field3", "value3"}})), // xread(Map.of(key9, + // "0-2"), options); Map.of("0-1", new String[][] {{"field1", "value1"}}), // .xrange(streamKey1, "0-1", "0-1") Map.of("0-1", new String[][] {{"field1", "value1"}}), // .xrange(streamKey1, "0-1", "0-1", 1l) Map.of("0-1", new String[][] {{"field1", "value1"}}), // .xrevrange(streamKey1, "0-1", "0-1") @@ -775,12 +774,21 @@ private static Object[] streamCommands(BaseTransaction transaction) { OK, // xgroupCreate(streamKey1, groupName1, "0-0") OK, // xgroupCreate(streamKey1, groupName1, "0-0", options) true, // xgroupCreateConsumer(streamKey1, groupName1, consumer1) - Map.of( - streamKey1, - Map.of("0-3", new String[][] {{"field3", "value3"}})), // xreadgroup(Map.of(key9, "0-2"), groupName1, consumer1); - Map.of( - streamKey1, - Map.of("0-3", new String[][] {{"field3", "value3"}})), // xreadgroup(Map.of(key9, "0-2"), groupName1, consumer1, options); + // Map.of( + // streamKey1, + // Map.of( + // "0-3", + // new String[][] { + // {"field3", "value3"} + // })), // xreadgroup(Map.of(key9, "0-2"), groupName1, consumer1); + // Map.of( + // streamKey1, + // Map.of( + // "0-3", + // new String[][] { + // {"field3", "value3"} + // })), // xreadgroup(Map.of(key9, "0-2"), groupName1, consumer1, + // options); 0L, // xgroupDelConsumer(streamKey1, groupName1, consumer1) true, // xgroupDestroy(streamKey1, groupName1) true, // xgroupDestroy(streamKey1, groupName2) From 2b5abc08e476adcc5ed345477031d90e212a0385 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Tue, 18 Jun 2024 13:54:55 -0700 Subject: [PATCH 4/6] Fix IT tests Signed-off-by: Andrew Carbonetto --- .../api/commands/StreamBaseCommands.java | 2 + .../glide/api/models/BaseTransaction.java | 2 + .../java/glide/TransactionTestUtilities.java | 45 ++++++++++--------- .../cluster/ClusterTransactionTests.java | 4 ++ 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index 53899d7529..7f35b1df81 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -422,6 +422,7 @@ CompletableFuture xgroupCreate( * @param consumer The newly created consumer. * @return A {@literal Map>} with stream * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. + * Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty. * @example *
{@code
      * // create a new stream at "mystream", with stream id "1-0"
@@ -461,6 +462,7 @@ CompletableFuture>> xreadgroup(
      * @param options Options detailing how to read the stream {@link StreamReadGroupOptions}.
      * @return A {@literal Map>} with stream
      *      keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...].
+     *      Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty.
      * @example
      *     
{@code
      * // create a new stream at "mystream", with stream id "1-0"
diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java
index 8d22213d2a..e3d1b039a0 100644
--- a/java/client/src/main/java/glide/api/models/BaseTransaction.java
+++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java
@@ -3062,6 +3062,7 @@ public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull
      * @param consumer The newly created consumer.
      * @return Command Response - A {@literal Map>} with stream
      *      keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...].
+     *      Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty.
      */
     public T xreadgroup(Map keysAndIds, String group, String consumer) {
         return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build());
@@ -3081,6 +3082,7 @@ public T xreadgroup(Map keysAndIds, String group, String consume
      * @param options Options detailing how to read the stream {@link StreamReadGroupOptions}.
      * @return Command Response - A {@literal Map>} with stream
      *      keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...].
+     *      Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty.
      */
     public T xreadgroup(
             Map keysAndIds,
diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java
index 2dd27a0254..085c4f76a0 100644
--- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java
+++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java
@@ -38,6 +38,7 @@
 import glide.api.models.commands.stream.StreamAddOptions;
 import glide.api.models.commands.stream.StreamGroupOptions;
 import glide.api.models.commands.stream.StreamRange.IdBound;
+import glide.api.models.commands.stream.StreamReadGroupOptions;
 import glide.api.models.commands.stream.StreamReadOptions;
 import glide.api.models.commands.stream.StreamTrimOptions.MinId;
 import java.util.HashMap;
@@ -742,13 +743,16 @@ private static Object[] streamCommands(BaseTransaction transaction) {
                 .xrevrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1"))
                 .xrevrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1"), 1L)
                 .xtrim(streamKey1, new MinId(true, "0-2"))
-                .xgroupCreate(streamKey1, groupName1, "0-0")
+                .xgroupCreate(streamKey1, groupName1, "0-2")
                 .xgroupCreate(
                         streamKey1, groupName2, "0-0", StreamGroupOptions.builder().makeStream().build())
                 .xgroupCreateConsumer(streamKey1, groupName1, consumer1)
-                //                .xreadgroup(Map.of(streamKey1, "0-2"), groupName1, consumer1)
-                //                .xreadgroup(Map.of(streamKey1, "0-2"), groupName1, consumer1,
-                //                    StreamReadGroupOptions.builder().count(2L).build())
+                .xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1)
+                .xreadgroup(
+                        Map.of(streamKey1, "0-3"),
+                        groupName1,
+                        consumer1,
+                        StreamReadGroupOptions.builder().count(2L).build())
                 .xgroupDelConsumer(streamKey1, groupName1, consumer1)
                 .xgroupDestroy(streamKey1, groupName1)
                 .xgroupDestroy(streamKey1, groupName2)
@@ -763,8 +767,10 @@ private static Object[] streamCommands(BaseTransaction transaction) {
                     streamKey1,
                     Map.of("0-3", new String[][] {{"field3", "value3"}})), // xread(Map.of(key9, "0-2"));
             Map.of(
-                    streamKey1, Map.of("0-3", new String[][] {{"field3", "value3"}})), // xread(Map.of(key9,
-            // "0-2"), options);
+                    streamKey1,
+                    Map.of(
+                            "0-3",
+                            new String[][] {{"field3", "value3"}})), // xread(Map.of(key9, "0-2"), options);
             Map.of("0-1", new String[][] {{"field1", "value1"}}), // .xrange(streamKey1, "0-1", "0-1")
             Map.of("0-1", new String[][] {{"field1", "value1"}}), // .xrange(streamKey1, "0-1", "0-1", 1l)
             Map.of("0-1", new String[][] {{"field1", "value1"}}), // .xrevrange(streamKey1, "0-1", "0-1")
@@ -774,22 +780,17 @@ private static Object[] streamCommands(BaseTransaction transaction) {
             OK, // xgroupCreate(streamKey1, groupName1, "0-0")
             OK, // xgroupCreate(streamKey1, groupName1, "0-0", options)
             true, // xgroupCreateConsumer(streamKey1, groupName1, consumer1)
-            //            Map.of(
-            //                    streamKey1,
-            //                    Map.of(
-            //                            "0-3",
-            //                            new String[][] {
-            //                                {"field3", "value3"}
-            //                            })), // xreadgroup(Map.of(key9, "0-2"), groupName1, consumer1);
-            //            Map.of(
-            //                    streamKey1,
-            //                    Map.of(
-            //                            "0-3",
-            //                            new String[][] {
-            //                                {"field3", "value3"}
-            //                            })), // xreadgroup(Map.of(key9, "0-2"), groupName1, consumer1,
-            // options);
-            0L, // xgroupDelConsumer(streamKey1, groupName1, consumer1)
+            Map.of(
+                    streamKey1,
+                    Map.of(
+                            "0-3",
+                            new String[][] {
+                                {"field3", "value3"}
+                            })), // xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1);
+            Map.of(
+                    streamKey1,
+                    Map.of()), // xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1, options);
+            1L, // xgroupDelConsumer(streamKey1, groupName1, consumer1)
             true, // xgroupDestroy(streamKey1, groupName1)
             true, // xgroupDestroy(streamKey1, groupName2)
             1L, // .xdel(streamKey1, new String[] {"0-1", "0-5"});
diff --git a/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java b/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java
index d1f1eb4452..4b732746ae 100644
--- a/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java
+++ b/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java
@@ -81,6 +81,10 @@ public void transactions_with_group_of_commands(String testName, TransactionBuil
         Object[] expectedResult = builder.apply(transaction);
 
         Object[] results = clusterClient.exec(transaction).get();
+        if (testName.equals("Stream Commands")) {
+            System.out.println(expectedResult[14] + " <=> " + results[14]);
+            System.out.println(expectedResult[15] + " <=> " + results[15]);
+        }
         assertDeepEquals(expectedResult, results);
     }
 

From 957e012d1387b7ededec48f2113a86b48e1d725f Mon Sep 17 00:00:00 2001
From: Andrew Carbonetto 
Date: Wed, 19 Jun 2024 10:56:59 -0700
Subject: [PATCH 5/6] SPOTLESS & merge conflict fix

Signed-off-by: Andrew Carbonetto 
---
 .../test/java/glide/SharedCommandTests.java   | 86 +++++++++----------
 1 file changed, 43 insertions(+), 43 deletions(-)

diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java
index e415fbafd4..cec1d804f8 100644
--- a/java/integTest/src/test/java/glide/SharedCommandTests.java
+++ b/java/integTest/src/test/java/glide/SharedCommandTests.java
@@ -3626,7 +3626,7 @@ public void xreadgroup_return_failures(BaseClient client) {
         Map timestamp_1_1_map = new LinkedHashMap<>();
         timestamp_1_1_map.put(field1, field1 + "1");
         String timestamp_1_1 =
-            client.xadd(key, timestamp_1_1_map, StreamAddOptions.builder().id("1-1").build()).get();
+                client.xadd(key, timestamp_1_1_map, StreamAddOptions.builder().id("1-1").build()).get();
         assertNotNull(timestamp_1_1);
 
         String groupName = "group" + UUID.randomUUID();
@@ -3635,43 +3635,43 @@ public void xreadgroup_return_failures(BaseClient client) {
 
         // create group and consumer for the group
         assertEquals(
-            OK,
-            client
-                .xgroupCreate(
-                    key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build())
-                .get());
+                OK,
+                client
+                        .xgroupCreate(
+                                key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build())
+                        .get());
         assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get());
 
         // Key exists, but it is not a stream
         assertEquals(OK, client.set(nonStreamKey, "bar").get());
         ExecutionException executionException =
-            assertThrows(
-                ExecutionException.class,
-                () ->
-                    client
-                        .xreadgroup(
-                            Map.of(nonStreamKey, timestamp_1_1, key, timestamp_1_1),
-                            groupName,
-                            consumerName)
-                        .get());
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xreadgroup(
+                                                Map.of(nonStreamKey, timestamp_1_1, key, timestamp_1_1),
+                                                groupName,
+                                                consumerName)
+                                        .get());
         assertInstanceOf(RequestException.class, executionException.getCause());
 
         executionException =
-            assertThrows(
-                ExecutionException.class,
-                () ->
-                    client
-                        .xreadgroup(
-                            Map.of(key, timestamp_1_1, nonStreamKey, timestamp_1_1),
-                            groupName,
-                            consumerName)
-                        .get());
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xreadgroup(
+                                                Map.of(key, timestamp_1_1, nonStreamKey, timestamp_1_1),
+                                                groupName,
+                                                consumerName)
+                                        .get());
         assertInstanceOf(RequestException.class, executionException.getCause());
 
         try (var testClient =
-                 client instanceof RedisClient
-                     ? RedisClient.CreateClient(commonClientConfig().build()).get()
-                     : RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) {
+                client instanceof RedisClient
+                        ? RedisClient.CreateClient(commonClientConfig().build()).get()
+                        : RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) {
             String timeoutKey = "{key}:2" + UUID.randomUUID();
             String timeoutGroupName = "group" + UUID.randomUUID();
             String timeoutConsumerName = "consumer" + UUID.randomUUID();
@@ -3708,26 +3708,26 @@ public void xreadgroup_return_failures(BaseClient client) {
             // ensure that command doesn't time out even if timeout > request timeout
             long oneSecondInMS = 1000L;
             assertNull(
-                testClient
-                    .xreadgroup(
-                        Map.of(timeoutKey, zeroStreamId),
-                        timeoutGroupName,
-                        timeoutConsumerName,
-                        StreamReadGroupOptions.builder().block(oneSecondInMS).build())
-                    .get());
+                    testClient
+                            .xreadgroup(
+                                    Map.of(timeoutKey, ">"),
+                                    timeoutGroupName,
+                                    timeoutConsumerName,
+                                    StreamReadGroupOptions.builder().block(oneSecondInMS).build())
+                            .get());
 
             // with 0 timeout (no timeout) should never time out,
             // but we wrap the test with timeout to avoid test failing or stuck forever
             assertThrows(
-                TimeoutException.class, // <- future timeout, not command timeout
-                () ->
-                    testClient
-                        .xreadgroup(
-                            Map.of(timeoutKey, zeroStreamId),
-                            timeoutGroupName,
-                            timeoutConsumerName,
-                            StreamReadGroupOptions.builder().block(0L).build())
-                        .get(3, TimeUnit.SECONDS));
+                    TimeoutException.class, // <- future timeout, not command timeout
+                    () ->
+                            testClient
+                                    .xreadgroup(
+                                            Map.of(timeoutKey, ">"),
+                                            timeoutGroupName,
+                                            timeoutConsumerName,
+                                            StreamReadGroupOptions.builder().block(0L).build())
+                                    .get(3, TimeUnit.SECONDS));
         }
     }
 

From ffca9755601ee3a977668b5935b42152e52ad5ce Mon Sep 17 00:00:00 2001
From: Andrew Carbonetto 
Date: Wed, 19 Jun 2024 14:20:09 -0700
Subject: [PATCH 6/6] Update for review comments

Signed-off-by: Andrew Carbonetto 
---
 .../src/main/java/glide/api/BaseClient.java   |  2 +-
 .../api/commands/StreamBaseCommands.java      | 20 ++++++++--------
 .../glide/api/models/BaseTransaction.java     | 23 ++++++++++---------
 .../stream/StreamReadGroupOptions.java        | 13 +++++++++--
 .../test/java/glide/api/RedisClientTest.java  |  2 +-
 .../glide/api/models/TransactionTests.java    |  2 +-
 .../test/java/glide/SharedCommandTests.java   | 11 +++++++--
 .../cluster/ClusterTransactionTests.java      |  4 ----
 8 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java
index 5c7506c0da..d338691f14 100644
--- a/java/client/src/main/java/glide/api/BaseClient.java
+++ b/java/client/src/main/java/glide/api/BaseClient.java
@@ -1441,7 +1441,7 @@ public CompletableFuture>> xreadgroup(
             @NonNull Map keysAndIds,
             @NonNull String group,
             @NonNull String consumer,
-            StreamReadGroupOptions options) {
+            @NonNull StreamReadGroupOptions options) {
         String[] arguments = options.toArgs(group, consumer, keysAndIds);
         return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse);
     }
diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java
index 7f35b1df81..1f19eddf46 100644
--- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java
+++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java
@@ -69,7 +69,7 @@ public interface StreamBaseCommands {
      * @param keysAndIds A Map of keys and entry ids to read from. The 
      *     Map is composed of a stream's key and the id of the entry after which the stream
      *     will be read.
-     * @return A {@literal Map>} with stream
+     * @return A {@literal Map>} with stream
      *      keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...].
      * @example
      *     
{@code
@@ -96,7 +96,7 @@ public interface StreamBaseCommands {
      *     Map is composed of a stream's key and the id of the entry after which the stream
      *     will be read.
      * @param options Options detailing how to read the stream {@link StreamReadOptions}.
-     * @return A {@literal Map>} with stream
+     * @return A {@literal Map>} with stream
      *     keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...].
      * @example
      *     
{@code
@@ -420,7 +420,7 @@ CompletableFuture xgroupCreate(
      *     will be read. Use the special id of {@literal ">"} to receive only new messages.
      * @param group The consumer group name.
      * @param consumer The newly created consumer.
-     * @return A {@literal Map>} with stream
+     * @return A {@literal Map>} with stream
      *      keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...].
      *      Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty.
      * @example
@@ -429,7 +429,7 @@ CompletableFuture xgroupCreate(
      * Map xreadKeys = Map.of("myfield", "mydata");
      * String streamId = client.xadd("mystream", Map.of("myfield", "mydata"), StreamAddOptions.builder().id("1-0").build()).get();
      * assert client.xgroupCreate("mystream", "mygroup").get().equals("OK"); // create the consumer group "mygroup"
-     * Map> streamReadResponse = client.xreadgroup("mygroup", "myconsumer", Map.of("mystream", ">")).get();
+     * Map> streamReadResponse = client.xreadgroup(Map.of("mystream", ">"), "mygroup", "myconsumer").get();
      * // Returns "mystream": "1-0": {{"myfield", "mydata"}}
      * for (var keyEntry : streamReadResponse.entrySet()) {
      *     System.out.printf("Key: %s", keyEntry.getKey());
@@ -440,7 +440,7 @@ CompletableFuture xgroupCreate(
      *     }
      * }
      * assert client.xdel("mystream", "1-0").get() == 1L;
-     * client.xreadgroup("mygroup", "myconsumer", Map.of("mystream", "0")).get();
+     * client.xreadgroup(Map.of("mystream", "0"), "mygroup", "myconsumer").get();
      * // Returns "mystream": "1-0": null
      * assert streamReadResponse.get("mystream").get("1-0") == null;
      * 
@@ -460,7 +460,7 @@ CompletableFuture>> xreadgroup( * @param group The consumer group name. * @param consumer The newly created consumer. * @param options Options detailing how to read the stream {@link StreamReadGroupOptions}. - * @return A {@literal Map>} with stream + * @return A {@literal Map>} with stream * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. * Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty. * @example @@ -469,8 +469,8 @@ CompletableFuture>> xreadgroup( * Map xreadKeys = Map.of("myfield", "mydata"); * String streamId = client.xadd("mystream", Map.of("myfield", "mydata"), StreamAddOptions.builder().id("1-0").build()).get(); * assert client.xgroupCreate("mystream", "mygroup").get().equals("OK"); // create the consumer group "mygroup" - * StreamReadGroupOptions op = StreamReadGroupOptions.builder().count(1).build(); // retrieves only a single message at a time - * Map> streamReadResponse = client.xreadgroup("mygroup", "myconsumer", Map.of("mystream", ">"), op).get(); + * StreamReadGroupOptions options = StreamReadGroupOptions.builder().count(1).build(); // retrieves only a single message at a time + * Map> streamReadResponse = client.xreadgroup(Map.of("mystream", ">"), "mygroup", "myconsumer", options).get(); * // Returns "mystream": "1-0": {{"myfield", "mydata"}} * for (var keyEntry : streamReadResponse.entrySet()) { * System.out.printf("Key: %s", keyEntry.getKey()); @@ -481,7 +481,9 @@ CompletableFuture>> xreadgroup( * } * } * assert client.xdel("mystream", "1-0").get() == 1L; - * streamReadResponse = client.xreadgroup("mygroup", "myconsumer", Map.of("mystream", "0"), op).get(); + * // read the first 10 items and acknowledge (ACK) them: + * StreamReadGroupOptions options = StreamReadGroupOptions.builder().count(10L).noack().build(); + * streamReadResponse = client.xreadgroup(Map.of("mystream", "0"), "mygroup", "myconsumer", options).get(); * // Returns "mystream": "1-0": null * assert streamReadResponse.get("mystream").get("1-0") == null; *
diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index e3d1b039a0..41180c933a 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -2768,7 +2768,7 @@ public T xadd( * @param keysAndIds An array of Pairs of keys and entry ids to read from. A * pair is composed of a stream's key and the id of the entry after which the stream * will be read. - * @return Command Response - A {@literal Map>} with stream + * @return Command Response - A {@literal Map>} with stream * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. */ public T xread(@NonNull Map keysAndIds) { @@ -2783,7 +2783,7 @@ public T xread(@NonNull Map keysAndIds) { * pair is composed of a stream's key and the id of the entry after which the stream * will be read. * @param options options detailing how to read the stream {@link StreamReadOptions}. - * @return Command Response - A {@literal Map>} with stream + * @return Command Response - A {@literal Map>} with stream * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. */ public T xread(@NonNull Map keysAndIds, @NonNull StreamReadOptions options) { @@ -3057,14 +3057,15 @@ public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull * @see valkey.io for details. * @param keysAndIds A Map of keys and entry ids to read from. The * Map is composed of a stream's key and the id of the entry after which the stream - * will be read. Use the special id of {@literal ">"} to receive only new messages. + * will be read. Use the special id of {@literal Map>} to receive only new messages. * @param group The consumer group name. * @param consumer The newly created consumer. - * @return Command Response - A {@literal Map>} with stream + * @return Command Response - A {@literal Map>>} with stream * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. * Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty. */ - public T xreadgroup(Map keysAndIds, String group, String consumer) { + public T xreadgroup( + @NonNull Map keysAndIds, @NonNull String group, @NonNull String consumer) { return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build()); } @@ -3076,19 +3077,19 @@ public T xreadgroup(Map keysAndIds, String group, String consume * @see valkey.io for details. * @param keysAndIds A Map of keys and entry ids to read from. The * Map is composed of a stream's key and the id of the entry after which the stream - * will be read. Use the special id of {@literal ">"} to receive only new messages. + * will be read. Use the special id of {@literal Map>} to receive only new messages. * @param group The consumer group name. * @param consumer The newly created consumer. * @param options Options detailing how to read the stream {@link StreamReadGroupOptions}. - * @return Command Response - A {@literal Map>} with stream + * @return Command Response - A {@literal Map>>} with stream * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. * Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty. */ public T xreadgroup( - Map keysAndIds, - String group, - String consumer, - StreamReadGroupOptions options) { + @NonNull Map keysAndIds, + @NonNull String group, + @NonNull String consumer, + @NonNull StreamReadGroupOptions options) { protobufTransaction.addCommands( buildCommand(XReadGroup, buildArgs(options.toArgs(group, consumer, keysAndIds)))); return getThis(); diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java index 0942992c31..246b4e1128 100644 --- a/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamReadGroupOptions.java @@ -25,7 +25,16 @@ public final class StreamReadGroupOptions extends StreamReadOptions { * If set, messages are not added to the Pending Entries List (PEL). This is equivalent to * acknowledging the message when it is read. */ - private Boolean noack; + private boolean noack; + + public abstract static class StreamReadGroupOptionsBuilder< + C extends StreamReadGroupOptions, B extends StreamReadGroupOptionsBuilder> + extends StreamReadOptions.StreamReadOptionsBuilder { + public B noack() { + this.noack = true; + return self(); + } + } /** * Converts options and the key-to-id input for {@link StreamBaseCommands#xreadgroup(Map, String, @@ -49,7 +58,7 @@ public String[] toArgs(String group, String consumer, Map stream optionArgs.add(block.toString()); } - if (this.noack != null && this.noack) { + if (this.noack) { optionArgs.add(READ_NOACK_REDIS_API); } diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index 574b479b78..e1a1d81812 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -4603,7 +4603,7 @@ public void xreadgroup_with_options() { Map.of(keyOne, streamIdOne), groupName, consumerName, - StreamReadGroupOptions.builder().block(block).count(count).noack(true).build()); + StreamReadGroupOptions.builder().block(block).count(count).noack().build()); Map> payload = response.get(); // verify diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index 3cc7e6aaed..a94a466453 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -800,7 +800,7 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), Map.of("key", "id"), "group", "consumer", - StreamReadGroupOptions.builder().block(1L).count(2L).noack(true).build()); + StreamReadGroupOptions.builder().block(1L).count(2L).noack().build()); results.add( Pair.of( XReadGroup, diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index cec1d804f8..21804e4a6e 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -3519,7 +3519,13 @@ public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client) // read the entire stream for the consumer and mark messages as pending var result_1 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get(); - assertEquals(2, result_1.get(key).size()); + assertDeepEquals( + Map.of( + key, + Map.of( + streamid_1, new String[][] {{"field1", "value1"}}, + streamid_2, new String[][] {{"field2", "value2"}})), + result_1); // delete one of the streams assertEquals(1L, client.xdel(key, new String[] {streamid_1}).get()); @@ -3642,7 +3648,7 @@ public void xreadgroup_return_failures(BaseClient client) { .get()); assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get()); - // Key exists, but it is not a stream + // First key exists, but it is not a stream assertEquals(OK, client.set(nonStreamKey, "bar").get()); ExecutionException executionException = assertThrows( @@ -3656,6 +3662,7 @@ public void xreadgroup_return_failures(BaseClient client) { .get()); assertInstanceOf(RequestException.class, executionException.getCause()); + // Second key exists, but it is not a stream executionException = assertThrows( ExecutionException.class, diff --git a/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java b/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java index 4b732746ae..d1f1eb4452 100644 --- a/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java +++ b/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java @@ -81,10 +81,6 @@ public void transactions_with_group_of_commands(String testName, TransactionBuil Object[] expectedResult = builder.apply(transaction); Object[] results = clusterClient.exec(transaction).get(); - if (testName.equals("Stream Commands")) { - System.out.println(expectedResult[14] + " <=> " + results[14]); - System.out.println(expectedResult[15] + " <=> " + results[15]); - } assertDeepEquals(expectedResult, results); }