Skip to content

Commit

Permalink
Java: Add XINFO GROUPS and XINFO CONSUMERS commands (valkey-io#1793)
Browse files Browse the repository at this point in the history
* Added xinfo groups and consumers

* Add support for GlideString

* Spotless

* Addressed PR comments and fixed CI

* Spotless

* Addressed PR comment

* Fixed binary return
  • Loading branch information
GumpacG authored Jul 4, 2024
1 parent b63d1ec commit a178468
Show file tree
Hide file tree
Showing 7 changed files with 548 additions and 4 deletions.
36 changes: 36 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId;
import static redis_request.RedisRequestOuterClass.RequestType.XInfoConsumers;
import static redis_request.RedisRequestOuterClass.RequestType.XInfoGroups;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XPending;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -2709,6 +2711,40 @@ public CompletableFuture<GlideString[]> xclaimJustId(
XClaim, args, response -> castArray(handleArrayResponse(response), GlideString.class));
}

@Override
public CompletableFuture<Map<String, Object>[]> xinfoGroups(@NonNull String key) {
return commandManager.submitNewCommand(
XInfoGroups,
new String[] {key},
response -> castArray(handleArrayResponse(response), Map.class));
}

@Override
public CompletableFuture<Map<GlideString, Object>[]> xinfoGroups(@NonNull GlideString key) {
return commandManager.submitNewCommand(
XInfoGroups,
new GlideString[] {key},
response -> castArray(handleArrayResponseBinary(response), Map.class));
}

@Override
public CompletableFuture<Map<String, Object>[]> xinfoConsumers(
@NonNull String key, @NonNull String groupName) {
return commandManager.submitNewCommand(
XInfoConsumers,
new String[] {key, groupName},
response -> castArray(handleArrayResponse(response), Map.class));
}

@Override
public CompletableFuture<Map<GlideString, Object>[]> xinfoConsumers(
@NonNull GlideString key, @NonNull GlideString groupName) {
return commandManager.submitNewCommand(
XInfoConsumers,
new GlideString[] {key, groupName},
response -> castArray(handleArrayResponseBinary(response), Map.class));
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1277,4 +1277,99 @@ CompletableFuture<GlideString[]> xclaimJustId(
long minIdleTime,
GlideString[] ids,
StreamClaimOptions options);

/**
* Returns the list of all consumer groups and their attributes for the stream stored at <code>key
* </code>.
*
* @see <a href="https://valkey.io/commands/xinfo-groups/">valkey.io</a> for details.
* @param key The key of the stream.
* @return An <code>Array</code> of <code>Maps</code>, where each mapping represents the
* attributes of a consumer group for the stream at <code>key</code>.
* @example
* <pre>{@code
* Map<String, Object>[] groups = client.xinfoGroups("key").get();
* for (int i = 0; i < groups.length; i ++) {
* System.out.println("Info of group: " + groups[0].get("name"));
* System.out.println("\tname: " + groups[0].get("name"));
* System.out.println("\tconsumers: " + groups[0].get("consumers"));
* System.out.println("\tpending: " + groups[0].get("pending"));
* System.out.println("\tlast-delivered-id: " + groups[0].get("last-delivered-id"));
* System.out.println("\tentries-read: " + groups[0].get("entries-read"));
* System.out.println("\tlag: " + groups[0].get("lag"));
* }
* }</pre>
*/
CompletableFuture<Map<String, Object>[]> xinfoGroups(String key);

/**
* Returns the list of all consumer groups and their attributes for the stream stored at <code>key
* </code>.
*
* @see <a href="https://valkey.io/commands/xinfo-groups/">valkey.io</a> for details.
* @param key The key of the stream.
* @return An <code>Array</code> of <code>Maps</code>, where each mapping represents the
* attributes of a consumer group for the stream at <code>key</code>.
* @example
* <pre>{@code
* Map<GlideString, Object>[] groups = client.xinfoGroups(gs("key")).get();
* for (int i = 0; i < groups.length; i ++) {
* System.out.println("Info of group: " + groups[0].get(gs("name")));
* System.out.println("\tname: " + groups[0].get(gs("name")));
* System.out.println("\tconsumers: " + groups[0].get(gs("consumers")));
* System.out.println("\tpending: " + groups[0].get(gs("pending")));
* System.out.println("\tlast-delivered-id: " + groups[0].get(gs("last-delivered-id")));
* System.out.println("\tentries-read: " + groups[0].get(gs("entries-read")));
* System.out.println("\tlag: " + groups[0].get(gs("lag")));
* }
* }</pre>
*/
CompletableFuture<Map<GlideString, Object>[]> xinfoGroups(GlideString key);

/**
* Returns the list of all consumers and their attributes for the given consumer group of the
* stream stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xinfo-consumers/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupName The consumer group name.
* @return An <code>Array</code> of <code>Maps</code>, where each mapping contains the attributes
* of a consumer for the given consumer group of the stream at <code>key</code>.
* @example
* <pre>{@code
* Map<String, Object>[] consumers = client.xinfoConsumers("key", "groupName").get();
* for (int i = 0; i < consumers.length; i ++) {
* System.out.println("Info of consumer: " + consumers[0].get("name"));
* System.out.println("\tname: " + consumers[0].get("name"));
* System.out.println("\tpending: " + consumers[0].get("pending"));
* System.out.println("\tidle: " + consumers[0].get("idle"));
* System.out.println("\tinactive: " + consumers[0].get("inactive"));
* }
* }</pre>
*/
CompletableFuture<Map<String, Object>[]> xinfoConsumers(String key, String groupName);

/**
* Returns the list of all consumers and their attributes for the given consumer group of the
* stream stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xinfo-consumers/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupName The consumer group name.
* @return An <code>Array</code> of <code>Maps</code>, where each mapping contains the attributes
* of a consumer for the given consumer group of the stream at <code>key</code>.
* @example
* <pre>{@code
* Map<GlideString, Object>[] consumers = client.xinfoConsumers(gs("key"), gs("groupName")).get();
* for (int i = 0; i < consumers.length; i ++) {
* System.out.println("Info of consumer: " + consumers[0].get(gs("name")));
* System.out.println("\tname: " + consumers[0].get(gs("name")));
* System.out.println("\tpending: " + consumers[0].get(gs("pending")));
* System.out.println("\tidle: " + consumers[0].get(gs("idle")));
* System.out.println("\tinactive: " + consumers[0].get(gs("inactive")));
* }
* }</pre>
*/
CompletableFuture<Map<GlideString, Object>[]> xinfoConsumers(
GlideString key, GlideString groupName);
}
33 changes: 33 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId;
import static redis_request.RedisRequestOuterClass.RequestType.XInfoConsumers;
import static redis_request.RedisRequestOuterClass.RequestType.XInfoGroups;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XPending;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -4044,6 +4046,37 @@ public <ArgType> T xclaimJustId(
return getThis();
}

/**
* Returns the list of all consumer groups and their attributes for the stream stored at <code>key
* </code>.
*
* @see <a href="https://valkey.io/commands/xinfo-groups/">valkey.io</a> for details.
* @param key The key of the stream.
* @return Command Response - An <code>Array</code> of <code>Maps</code>, where each mapping
* represents the attributes of a consumer group for the stream at <code>key</code>.
*/
public <ArgType> T xinfoGroups(@NonNull ArgType key) {
protobufTransaction.addCommands(buildCommand(XInfoGroups, newArgsBuilder().add(key)));
return getThis();
}

/**
* Returns the list of all consumers and their attributes for the given consumer group of the
* stream stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xinfo-consumers/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupName The consumer group name.
* @return Command Response - An <code>Array</code> of <code>Maps</code>, where each mapping
* contains the attributes of a consumer for the given consumer group of the stream at <code>
* key</code>.
*/
public <ArgType> T xinfoConsumers(@NonNull ArgType key, @NonNull ArgType groupName) {
protobufTransaction.addCommands(
buildCommand(XInfoConsumers, newArgsBuilder().add(key).add(groupName)));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
148 changes: 148 additions & 0 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId;
import static redis_request.RedisRequestOuterClass.RequestType.XInfoConsumers;
import static redis_request.RedisRequestOuterClass.RequestType.XInfoGroups;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XPending;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -13244,4 +13246,150 @@ public void geosearchstore_with_options_binary_returns_success(
assertEquals(testResponse, response);
assertEquals(expected, payload);
}

@SneakyThrows
@Test
public void xinfoGroups_returns_success() {
// setup
String key = "testKey";
String[] arguments = {key};
Map<String, Object>[] mockResult =
new Map[] {
Map.of(
"name",
"groupName",
"consumers",
2,
"pending",
2,
"last-delivered-id",
"1638126030001-0",
"entries-read",
2,
"lag",
2)
};

CompletableFuture<Map<String, Object>[]> testResponse = new CompletableFuture<>();
testResponse.complete(mockResult);

// match on protobuf request
when(commandManager.<Map<String, Object>[]>submitNewCommand(
eq(XInfoGroups), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Map<String, Object>[]> response = service.xinfoGroups(key);
Map<String, Object>[] payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(mockResult, payload);
}

@SneakyThrows
@Test
public void xinfoGroups_binary_returns_success() {
// setup
GlideString key = gs("testKey");
GlideString[] arguments = {key};
Map<GlideString, Object>[] mockResult =
new Map[] {
Map.of(
gs("name"),
gs("groupName"),
gs("consumers"),
2,
gs("pending"),
2,
gs("last-delivered-id"),
gs("1638126030001-0"),
gs("entries-read"),
2,
gs("lag"),
2)
};

CompletableFuture<Map<GlideString, Object>[]> testResponse = new CompletableFuture<>();
testResponse.complete(mockResult);

// match on protobuf request
when(commandManager.<Map<GlideString, Object>[]>submitNewCommand(
eq(XInfoGroups), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Map<GlideString, Object>[]> response = service.xinfoGroups(key);
Map<GlideString, Object>[] payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(mockResult, payload);
}

@SneakyThrows
@Test
public void xinfoConsumers_returns_success() {
// setup
String key = "testKey";
String groupName = "groupName";
String[] arguments = {key, groupName};
Map<String, Object>[] mockResult =
new Map[] {
Map.of("name", "groupName", "pending", 2, "idle", 9104628, "inactive", 18104698)
};

CompletableFuture<Map<String, Object>[]> testResponse = new CompletableFuture<>();
testResponse.complete(mockResult);

// match on protobuf request
when(commandManager.<Map<String, Object>[]>submitNewCommand(
eq(XInfoConsumers), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Map<String, Object>[]> response = service.xinfoConsumers(key, groupName);
Map<String, Object>[] payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(mockResult, payload);
}

@SneakyThrows
@Test
public void xinfoConsumers_binary_returns_success() {
// setup
GlideString key = gs("testKey");
GlideString groupName = gs("groupName");
GlideString[] arguments = {key, groupName};
Map<GlideString, Object>[] mockResult =
new Map[] {
Map.of(
gs("name"),
gs("groupName"),
gs("pending"),
2,
gs("idle"),
9104628,
gs("inactive"),
18104698)
};

CompletableFuture<Map<GlideString, Object>[]> testResponse = new CompletableFuture<>();
testResponse.complete(mockResult);

// match on protobuf request
when(commandManager.<Map<GlideString, Object>[]>submitNewCommand(
eq(XInfoConsumers), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Map<GlideString, Object>[]> response = service.xinfoConsumers(key, groupName);
Map<GlideString, Object>[] payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(mockResult, payload);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId;
import static redis_request.RedisRequestOuterClass.RequestType.XInfoConsumers;
import static redis_request.RedisRequestOuterClass.RequestType.XInfoGroups;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XPending;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -952,6 +954,12 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
FORCE_REDIS_API,
JUST_ID_REDIS_API)));

transaction.xinfoGroups("key");
results.add(Pair.of(XInfoGroups, buildArgs("key")));

transaction.xinfoConsumers("key", "groupName");
results.add(Pair.of(XInfoConsumers, buildArgs("key", "groupName")));

transaction.time();
results.add(Pair.of(Time, buildArgs()));

Expand Down
Loading

0 comments on commit a178468

Please sign in to comment.