diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 39299a05c6..46b13dea32 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -148,6 +148,8 @@ import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer; import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy; import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId; +import static redis_request.RedisRequestOuterClass.RequestType.XInfoConsumers; +import static redis_request.RedisRequestOuterClass.RequestType.XInfoGroups; import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XPending; import static redis_request.RedisRequestOuterClass.RequestType.XRange; @@ -2709,6 +2711,40 @@ public CompletableFuture xclaimJustId( XClaim, args, response -> castArray(handleArrayResponse(response), GlideString.class)); } + @Override + public CompletableFuture[]> xinfoGroups(@NonNull String key) { + return commandManager.submitNewCommand( + XInfoGroups, + new String[] {key}, + response -> castArray(handleArrayResponse(response), Map.class)); + } + + @Override + public CompletableFuture[]> xinfoGroups(@NonNull GlideString key) { + return commandManager.submitNewCommand( + XInfoGroups, + new GlideString[] {key}, + response -> castArray(handleArrayResponseBinary(response), Map.class)); + } + + @Override + public CompletableFuture[]> xinfoConsumers( + @NonNull String key, @NonNull String groupName) { + return commandManager.submitNewCommand( + XInfoConsumers, + new String[] {key, groupName}, + response -> castArray(handleArrayResponse(response), Map.class)); + } + + @Override + public CompletableFuture[]> xinfoConsumers( + @NonNull GlideString key, @NonNull GlideString groupName) { + return commandManager.submitNewCommand( + XInfoConsumers, + new GlideString[] {key, groupName}, + response -> castArray(handleArrayResponseBinary(response), Map.class)); + } + @Override public CompletableFuture pttl(@NonNull String key) { return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse); diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index a1880d8ef9..bb2f2558dd 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -1277,4 +1277,99 @@ CompletableFuture xclaimJustId( long minIdleTime, GlideString[] ids, StreamClaimOptions options); + + /** + * Returns the list of all consumer groups and their attributes for the stream stored at key + * . + * + * @see valkey.io for details. + * @param key The key of the stream. + * @return An Array of Maps, where each mapping represents the + * attributes of a consumer group for the stream at key. + * @example + *
{@code
+     * Map[] groups = client.xinfoGroups("key").get();
+     * for (int i = 0; i < groups.length; i ++) {
+     *     System.out.println("Info of group: " + groups[0].get("name"));
+     *     System.out.println("\tname: " + groups[0].get("name"));
+     *     System.out.println("\tconsumers: " + groups[0].get("consumers"));
+     *     System.out.println("\tpending: " + groups[0].get("pending"));
+     *     System.out.println("\tlast-delivered-id: " + groups[0].get("last-delivered-id"));
+     *     System.out.println("\tentries-read: " + groups[0].get("entries-read"));
+     *     System.out.println("\tlag: " + groups[0].get("lag"));
+     * }
+     * }
+ */ + CompletableFuture[]> xinfoGroups(String key); + + /** + * Returns the list of all consumer groups and their attributes for the stream stored at key + * . + * + * @see valkey.io for details. + * @param key The key of the stream. + * @return An Array of Maps, where each mapping represents the + * attributes of a consumer group for the stream at key. + * @example + *
{@code
+     * Map[] groups = client.xinfoGroups(gs("key")).get();
+     * for (int i = 0; i < groups.length; i ++) {
+     *     System.out.println("Info of group: " + groups[0].get(gs("name")));
+     *     System.out.println("\tname: " + groups[0].get(gs("name")));
+     *     System.out.println("\tconsumers: " + groups[0].get(gs("consumers")));
+     *     System.out.println("\tpending: " + groups[0].get(gs("pending")));
+     *     System.out.println("\tlast-delivered-id: " + groups[0].get(gs("last-delivered-id")));
+     *     System.out.println("\tentries-read: " + groups[0].get(gs("entries-read")));
+     *     System.out.println("\tlag: " + groups[0].get(gs("lag")));
+     * }
+     * }
+ */ + CompletableFuture[]> xinfoGroups(GlideString key); + + /** + * Returns the list of all consumers and their attributes for the given consumer group of the + * stream stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupName The consumer group name. + * @return An Array of Maps, where each mapping contains the attributes + * of a consumer for the given consumer group of the stream at key. + * @example + *
{@code
+     * Map[] consumers = client.xinfoConsumers("key", "groupName").get();
+     * for (int i = 0; i < consumers.length; i ++) {
+     *     System.out.println("Info of consumer: " + consumers[0].get("name"));
+     *     System.out.println("\tname: " + consumers[0].get("name"));
+     *     System.out.println("\tpending: " + consumers[0].get("pending"));
+     *     System.out.println("\tidle: " + consumers[0].get("idle"));
+     *     System.out.println("\tinactive: " + consumers[0].get("inactive"));
+     * }
+     * }
+ */ + CompletableFuture[]> xinfoConsumers(String key, String groupName); + + /** + * Returns the list of all consumers and their attributes for the given consumer group of the + * stream stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupName The consumer group name. + * @return An Array of Maps, where each mapping contains the attributes + * of a consumer for the given consumer group of the stream at key. + * @example + *
{@code
+     * Map[] consumers = client.xinfoConsumers(gs("key"), gs("groupName")).get();
+     * for (int i = 0; i < consumers.length; i ++) {
+     *     System.out.println("Info of consumer: " + consumers[0].get(gs("name")));
+     *     System.out.println("\tname: " + consumers[0].get(gs("name")));
+     *     System.out.println("\tpending: " + consumers[0].get(gs("pending")));
+     *     System.out.println("\tidle: " + consumers[0].get(gs("idle")));
+     *     System.out.println("\tinactive: " + consumers[0].get(gs("inactive")));
+     * }
+     * }
+ */ + CompletableFuture[]> xinfoConsumers( + GlideString key, GlideString groupName); } diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index 3052ebcdf1..ca43eeed13 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -170,6 +170,8 @@ import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer; import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy; import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId; +import static redis_request.RedisRequestOuterClass.RequestType.XInfoConsumers; +import static redis_request.RedisRequestOuterClass.RequestType.XInfoGroups; import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XPending; import static redis_request.RedisRequestOuterClass.RequestType.XRange; @@ -4044,6 +4046,37 @@ public T xclaimJustId( return getThis(); } + /** + * Returns the list of all consumer groups and their attributes for the stream stored at key + * . + * + * @see valkey.io for details. + * @param key The key of the stream. + * @return Command Response - An Array of Maps, where each mapping + * represents the attributes of a consumer group for the stream at key. + */ + public T xinfoGroups(@NonNull ArgType key) { + protobufTransaction.addCommands(buildCommand(XInfoGroups, newArgsBuilder().add(key))); + return getThis(); + } + + /** + * Returns the list of all consumers and their attributes for the given consumer group of the + * stream stored at key. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param groupName The consumer group name. + * @return Command Response - An Array of Maps, where each mapping + * contains the attributes of a consumer for the given consumer group of the stream at + * key. + */ + public T xinfoConsumers(@NonNull ArgType key, @NonNull ArgType groupName) { + protobufTransaction.addCommands( + buildCommand(XInfoConsumers, newArgsBuilder().add(key).add(groupName))); + return getThis(); + } + /** * Returns the remaining time to live of key that has a timeout, in milliseconds. * diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index e16e8675cf..6a2b9ccc89 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -241,6 +241,8 @@ import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer; import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy; import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId; +import static redis_request.RedisRequestOuterClass.RequestType.XInfoConsumers; +import static redis_request.RedisRequestOuterClass.RequestType.XInfoGroups; import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XPending; import static redis_request.RedisRequestOuterClass.RequestType.XRange; @@ -13244,4 +13246,150 @@ public void geosearchstore_with_options_binary_returns_success( assertEquals(testResponse, response); assertEquals(expected, payload); } + + @SneakyThrows + @Test + public void xinfoGroups_returns_success() { + // setup + String key = "testKey"; + String[] arguments = {key}; + Map[] mockResult = + new Map[] { + Map.of( + "name", + "groupName", + "consumers", + 2, + "pending", + 2, + "last-delivered-id", + "1638126030001-0", + "entries-read", + 2, + "lag", + 2) + }; + + CompletableFuture[]> testResponse = new CompletableFuture<>(); + testResponse.complete(mockResult); + + // match on protobuf request + when(commandManager.[]>submitNewCommand( + eq(XInfoGroups), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture[]> response = service.xinfoGroups(key); + Map[] payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(mockResult, payload); + } + + @SneakyThrows + @Test + public void xinfoGroups_binary_returns_success() { + // setup + GlideString key = gs("testKey"); + GlideString[] arguments = {key}; + Map[] mockResult = + new Map[] { + Map.of( + gs("name"), + gs("groupName"), + gs("consumers"), + 2, + gs("pending"), + 2, + gs("last-delivered-id"), + gs("1638126030001-0"), + gs("entries-read"), + 2, + gs("lag"), + 2) + }; + + CompletableFuture[]> testResponse = new CompletableFuture<>(); + testResponse.complete(mockResult); + + // match on protobuf request + when(commandManager.[]>submitNewCommand( + eq(XInfoGroups), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture[]> response = service.xinfoGroups(key); + Map[] payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(mockResult, payload); + } + + @SneakyThrows + @Test + public void xinfoConsumers_returns_success() { + // setup + String key = "testKey"; + String groupName = "groupName"; + String[] arguments = {key, groupName}; + Map[] mockResult = + new Map[] { + Map.of("name", "groupName", "pending", 2, "idle", 9104628, "inactive", 18104698) + }; + + CompletableFuture[]> testResponse = new CompletableFuture<>(); + testResponse.complete(mockResult); + + // match on protobuf request + when(commandManager.[]>submitNewCommand( + eq(XInfoConsumers), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture[]> response = service.xinfoConsumers(key, groupName); + Map[] payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(mockResult, payload); + } + + @SneakyThrows + @Test + public void xinfoConsumers_binary_returns_success() { + // setup + GlideString key = gs("testKey"); + GlideString groupName = gs("groupName"); + GlideString[] arguments = {key, groupName}; + Map[] mockResult = + new Map[] { + Map.of( + gs("name"), + gs("groupName"), + gs("pending"), + 2, + gs("idle"), + 9104628, + gs("inactive"), + 18104698) + }; + + CompletableFuture[]> testResponse = new CompletableFuture<>(); + testResponse.complete(mockResult); + + // match on protobuf request + when(commandManager.[]>submitNewCommand( + eq(XInfoConsumers), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture[]> response = service.xinfoConsumers(key, groupName); + Map[] payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(mockResult, payload); + } } diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index f8a7b9651b..92f6ec2358 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -196,6 +196,8 @@ import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer; import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy; import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId; +import static redis_request.RedisRequestOuterClass.RequestType.XInfoConsumers; +import static redis_request.RedisRequestOuterClass.RequestType.XInfoGroups; import static redis_request.RedisRequestOuterClass.RequestType.XLen; import static redis_request.RedisRequestOuterClass.RequestType.XPending; import static redis_request.RedisRequestOuterClass.RequestType.XRange; @@ -952,6 +954,12 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), FORCE_REDIS_API, JUST_ID_REDIS_API))); + transaction.xinfoGroups("key"); + results.add(Pair.of(XInfoGroups, buildArgs("key"))); + + transaction.xinfoConsumers("key", "groupName"); + results.add(Pair.of(XInfoConsumers, buildArgs("key", "groupName"))); + transaction.time(); results.add(Pair.of(Time, buildArgs())); diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index d5372ffca3..0fd23ce940 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -5347,6 +5347,221 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) { assertInstanceOf(RequestException.class, executionException.getCause()); } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xinfoGroups_with_xinfoConsumers(BaseClient client) { + String key = UUID.randomUUID().toString(); + String groupName1 = "group1" + UUID.randomUUID(); + String groupName2 = "group2" + UUID.randomUUID(); + String consumer1 = "consumer1" + UUID.randomUUID(); + String consumer2 = "consumer2" + UUID.randomUUID(); + String streamId0_0 = "0-0"; + String streamId1_0 = "1-0"; + String streamId1_1 = "1-1"; + String streamId1_2 = "1-2"; + String streamId1_3 = "1-3"; + LinkedHashMap streamMap = new LinkedHashMap(); + streamMap.put("f1", "v1"); + streamMap.put("f2", "v2"); + + assertEquals( + streamId1_0, + client.xadd(key, streamMap, StreamAddOptions.builder().id(streamId1_0).build()).get()); + assertEquals( + streamId1_1, + client + .xadd(key, Map.of("f3", "v3"), StreamAddOptions.builder().id(streamId1_1).build()) + .get()); + assertEquals( + streamId1_2, + client + .xadd(key, Map.of("f4", "v4"), StreamAddOptions.builder().id(streamId1_2).build()) + .get()); + assertEquals(OK, client.xgroupCreate(key, groupName1, streamId0_0).get()); + + Map> result = + client + .xreadgroup( + Map.of(key, ">"), + groupName1, + consumer1, + StreamReadGroupOptions.builder().count(1L).build()) + .get(); + assertDeepEquals( + Map.of(key, Map.of(streamId1_0, new String[][] {{"f1", "v1"}, {"f2", "v2"}})), result); + + // Sleep to ensure the idle time value and inactive time value returned by xinfo_consumers is >0 + Thread.sleep(2000); + Map[] consumers = client.xinfoConsumers(key, groupName1).get(); + assertEquals(1, consumers.length); + Map consumerInfo = consumers[0]; + assertEquals(consumer1, consumerInfo.get("name")); + assertEquals(1L, consumerInfo.get("pending")); + assertTrue((Long) consumerInfo.get("idle") > 0L); + + if (REDIS_VERSION.isGreaterThanOrEqualTo("7.2.0")) { + assertTrue((Long) consumerInfo.get("inactive") > 0L); + } + + // Test with GlideString + Map[] binaryConsumers = + client.xinfoConsumers(gs(key), gs(groupName1)).get(); + assertEquals(1, binaryConsumers.length); + Map binaryConsumerInfo = binaryConsumers[0]; + assertEquals(gs(consumer1), binaryConsumerInfo.get(gs("name"))); + assertEquals(1L, binaryConsumerInfo.get(gs("pending"))); + assertTrue((Long) binaryConsumerInfo.get(gs("idle")) > 0L); + + if (REDIS_VERSION.isGreaterThanOrEqualTo("7.2.0")) { + assertTrue((Long) binaryConsumerInfo.get(gs("inactive")) > 0L); + } + + // Create consumer2 and read the rest of the entries with it + assertTrue(client.xgroupCreateConsumer(key, groupName1, consumer2).get()); + result = client.xreadgroup(Map.of(key, ">"), groupName1, consumer2).get(); + assertDeepEquals( + Map.of( + key, + Map.of( + streamId1_1, new String[][] {{"f3", "v3"}}, + streamId1_2, new String[][] {{"f4", "v4"}})), + result); + + // Verify that xinfo_consumers contains info for 2 consumers now + // Test with byte string args + consumers = client.xinfoConsumers(key, groupName1).get(); + assertEquals(2, consumers.length); + + // Add one more entry + assertEquals( + streamId1_3, + client + .xadd(key, Map.of("f5", "v5"), StreamAddOptions.builder().id(streamId1_3).build()) + .get()); + + Map[] groups = client.xinfoGroups(key).get(); + assertEquals(1, groups.length); + Map group1Info = groups[0]; + assertEquals(groupName1, group1Info.get("name")); + assertEquals(2L, group1Info.get("consumers")); + assertEquals(3L, group1Info.get("pending")); + assertEquals(streamId1_2, group1Info.get("last-delivered-id")); + if (REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0")) { + assertEquals( + 3L, group1Info.get("entries-read")); // We have read stream entries 1-0, 1-1, and 1-2 + assertEquals( + 1L, group1Info.get("lag")); // We still have not read one entry in the stream, entry 1-3 + } + + // Test with GlideString + Map[] binaryGroups = client.xinfoGroups(gs(key)).get(); + assertEquals(1, binaryGroups.length); + Map binaryGroup1Info = binaryGroups[0]; + assertEquals(gs(groupName1), binaryGroup1Info.get(gs("name"))); + assertEquals(2L, binaryGroup1Info.get(gs("consumers"))); + assertEquals(3L, binaryGroup1Info.get(gs("pending"))); + assertEquals(gs(streamId1_2), binaryGroup1Info.get(gs("last-delivered-id"))); + if (REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0")) { + assertEquals( + 3L, + binaryGroup1Info.get( + gs("entries-read"))); // We have read stream entries 1-0, 1-1, and 1-2 + assertEquals( + 1L, + binaryGroup1Info.get( + gs("lag"))); // We still have not read one entry in the stream, entry 1-3 + } + + // Verify xgroup_set_id effects the returned value from xinfo_groups + assertEquals(OK, client.xgroupSetId(key, groupName1, streamId1_1).get()); + groups = client.xinfoGroups(key).get(); + assertEquals(1, groups.length); + group1Info = groups[0]; + assertEquals(groupName1, group1Info.get("name")); + assertEquals(2L, group1Info.get("consumers")); + assertEquals(3L, group1Info.get("pending")); + assertEquals(streamId1_1, group1Info.get("last-delivered-id")); + if (REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0")) { + assertNull( + group1Info.get("entries-read")); // Gets set to None when we change the last delivered ID + assertNull(group1Info.get("lag")); // Gets set to None when we change the last delivered ID + + // Verify xgroup_set_id with entries_read_id effects the returned value from xinfo_groups + assertEquals(OK, client.xgroupSetId(key, groupName1, streamId1_1, 1L).get()); + groups = client.xinfoGroups(key).get(); + assertEquals(1, groups.length); + group1Info = groups[0]; + assertEquals(groupName1, group1Info.get("name")); + assertEquals(2L, group1Info.get("consumers")); + assertEquals(3L, group1Info.get("pending")); + assertEquals(streamId1_1, group1Info.get("last-delivered-id")); + assertEquals(1L, group1Info.get("entries-read")); + assertEquals(3L, group1Info.get("lag")); + } + + // Add one more consumer group + assertEquals(OK, client.xgroupCreate(key, groupName2, streamId0_0).get()); + + // Verify that xinfo_groups contains info for 2 consumer groups now + groups = client.xinfoGroups(key).get(); + assertEquals(2, groups.length); + } + + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xinfoGroups_with_xinfoConsumers_and_edge_cases(BaseClient client) { + String key = UUID.randomUUID().toString(); + String stringKey = UUID.randomUUID().toString(); + String nonExistentKey = UUID.randomUUID().toString(); + String groupName = "group1" + UUID.randomUUID(); + String streamId1_0 = "1-0"; + + // Passing a non-existing key raises an error + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.xinfoGroups(nonExistentKey).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + executionException = + assertThrows( + ExecutionException.class, () -> client.xinfoConsumers(nonExistentKey, groupName).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + assertEquals( + streamId1_0, + client + .xadd( + key, + Map.of("f1", "v1", "f2", "v2"), + StreamAddOptions.builder().id(streamId1_0).build()) + .get()); + + // Passing a non-existing group raises an error + executionException = + assertThrows( + ExecutionException.class, () -> client.xinfoConsumers(key, "nonExistentGroup").get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + // No groups exist yet + assertEquals(0, client.xinfoGroups(key).get().length); + + assertEquals(OK, client.xgroupCreate(key, groupName, streamId1_0).get()); + + // No consumers exist yet + assertEquals(0, client.xinfoConsumers(key, groupName).get().length); + + // Key exists, but it is not a stream + assertEquals(OK, client.set(stringKey, "foo").get()); + executionException = + assertThrows(ExecutionException.class, () -> client.xinfoGroups(stringKey).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + executionException = + assertThrows( + ExecutionException.class, () -> client.xinfoConsumers(stringKey, groupName).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java index caf461f30a..07c354f7ac 100644 --- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java +++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java @@ -813,11 +813,12 @@ private static Object[] hyperLogLogCommands(BaseTransaction transaction) { private static Object[] streamCommands(BaseTransaction transaction) { final String streamKey1 = "{streamKey}-1-" + UUID.randomUUID(); + final String streamKey2 = "{streamKey}-2-" + UUID.randomUUID(); + final String streamKey3 = "{streamKey}-3-" + UUID.randomUUID(); final String groupName1 = "{groupName}-1-" + UUID.randomUUID(); final String groupName2 = "{groupName}-2-" + UUID.randomUUID(); - final String consumer1 = "{consumer}-1-" + UUID.randomUUID(); - final String streamKey2 = "{streamKey}-2-" + UUID.randomUUID(); final String groupName3 = "{groupName}-2-" + UUID.randomUUID(); + final String consumer1 = "{consumer}-1-" + UUID.randomUUID(); transaction .xadd(streamKey1, Map.of("field1", "value1"), StreamAddOptions.builder().id("0-1").build()) @@ -832,6 +833,7 @@ private static Object[] streamCommands(BaseTransaction transaction) { .xrevrange(streamKey1, IdBound.of("0-1"), IdBound.of("0-1"), 1L) .xtrim(streamKey1, new MinId(true, "0-2")) .xgroupCreate(streamKey1, groupName1, "0-2") + .xinfoConsumers(streamKey1, groupName1) .xgroupCreate( streamKey1, groupName2, "0-0", StreamGroupOptions.builder().makeStream().build()) .xgroupCreateConsumer(streamKey1, groupName1, consumer1) @@ -869,7 +871,10 @@ private static Object[] streamCommands(BaseTransaction transaction) { .xgroupDelConsumer(streamKey1, groupName1, consumer1) .xgroupDestroy(streamKey1, groupName1) .xgroupDestroy(streamKey1, groupName2) - .xdel(streamKey1, new String[] {"0-3", "0-5"}); + .xdel(streamKey1, new String[] {"0-3", "0-5"}) + .xadd(streamKey3, Map.of("f0", "v0"), StreamAddOptions.builder().id("1-0").build()) + .xgroupCreate(streamKey3, groupName3, "0") + .xinfoGroups(streamKey1); if (REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0")) { transaction @@ -898,6 +903,7 @@ private static Object[] streamCommands(BaseTransaction transaction) { "0-1", new String[][] {{"field1", "value1"}}), // .xrevrange(streamKey1, "0-1", "0-1", 1l) 1L, // xtrim(streamKey1, new MinId(true, "0-2")) OK, // xgroupCreate(streamKey1, groupName1, "0-0") + new Map[] {}, // .xinfoConsumers(streamKey1, groupName1) OK, // xgroupCreate(streamKey1, groupName1, "0-0", options) true, // xgroupCreateConsumer(streamKey1, groupName1, consumer1) OK, // xgroupSetId(streamKey1, groupName1, "0-2") @@ -925,7 +931,10 @@ private static Object[] streamCommands(BaseTransaction transaction) { 0L, // xgroupDelConsumer(streamKey1, groupName1, consumer1) true, // xgroupDestroy(streamKey1, groupName1) true, // xgroupDestroy(streamKey1, groupName2) - 1L, // .xdel(streamKey1, new String[] {"0-1", "0-5"}); + 1L, // .xdel(streamKey1, new String[] {"0-1", "0-5"}) + "1-0", // xadd(streamKey3, Map.of("f0", "v0"), id("1-0")) + OK, // xgroupCreate(streamKey3, groupName3, "0") + new Map[] {}, // xinfoGroups(streamKey3) }; if (REDIS_VERSION.isGreaterThanOrEqualTo("7.0.0")) {