Skip to content

Commit

Permalink
Add a couple more test cases
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Jun 30, 2024
1 parent 1ae76dc commit 8c3ff62
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -682,9 +682,9 @@ CompletableFuture<Object[][]> xpending(
* @param group The consumer group name.
* @param consumer The group consumer.
* @param minIdleTime The minimum idle time for the message to be claimed.
* @param ids An array of entry ids.
* @return An <code>array</code> of message entries with the format <code>
* [[id, ["entry", "data"]], ...]</code> that are claimed by the consumer.
* @param ids A array of entry ids.
* @return A <code>Map</code> of message entries with the format <code>
* {"entryId": ["entry", "data"], ...}</code> that are claimed by the consumer.
* @example
* <pre>
* </pre>
Expand All @@ -702,8 +702,8 @@ CompletableFuture<Map<String, String[]>> xclaim(
* @param minIdleTime The minimum idle time for the message to be claimed.
* @param ids An array of entry ids.
* @param options Stream claim options {@link StreamClaimOptions}.
* @return An <code>array</code> of message entries with the format <code>
* [[id, ["entry", "data"]], ...]</code> that are claimed by the consumer.
* @return A <code>Map</code> of message entries with the format <code>
* {"entryId": ["entry", "data"], ...}</code> that are claimed by the consumer.
*/
CompletableFuture<Map<String, String[]>> xclaim(
String key,
Expand All @@ -714,8 +714,8 @@ CompletableFuture<Map<String, String[]>> xclaim(
StreamClaimOptions options);

/**
* Changes the ownership of a pending message. This command uses the JUSTID optional argument to
* return a list of stream message ids.
* Changes the ownership of a pending message. This function returns an <code>array</code> with
* only the message/entry IDs, and is equivalent to using <code>JUSTID</code> in the Redis API.
*
* @see <a href="https://valkey.io/commands/xclaim/">valkey.io</a> for details.
* @param key The key of the stream.
Expand All @@ -729,8 +729,8 @@ CompletableFuture<String[]> xclaimJustId(
String key, String group, String consumer, long minIdleTime, String[] ids);

/**
* Changes the ownership of a pending message.This command uses the JUSTID optional argument to
* return a list of stream message ids.
* Changes the ownership of a pending message. This function returns an <code>array</code> with
* only the message/entry IDs, and is equivalent to using <code>JUSTID</code> in the Redis API.
*
* @see <a href="https://valkey.io/commands/xclaim/">valkey.io</a> for details.
* @param key The key of the stream.
Expand Down
16 changes: 8 additions & 8 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -3288,8 +3288,8 @@ public T xpending(
* @param consumer The group consumer.
* @param minIdleTime The minimum idle time for the message to be claimed.
* @param ids An array of entry ids.
* @return Command Response - An <code>array</code> of message entries with the format <code>
* [[id, ["entry", "data"]], ...]</code> that are claimed by the consumer.
* @return Command Response - A <code>Map</code> of message entries with the format <code>
* {"entryId": ["entry", "data"], ...}</code> that are claimed by the consumer.
*/
public T xclaim(
@NonNull String key,
Expand All @@ -3313,8 +3313,8 @@ public T xclaim(
* @param minIdleTime The minimum idle time for the message to be claimed.
* @param ids An array of entry ids.
* @param options Stream claim options {@link StreamClaimOptions}.
* @return Command Response - An <code>array</code> of message entries with the format <code>
* [[id, ["entry", "data"]], ...]</code> that are claimed by the consumer.
* @return Command Response - A <code>Map</code> of message entries with the format <code>
* {"entryId": ["entry", "data"], ...}</code> that are claimed by the consumer.
*/
public T xclaim(
@NonNull String key,
Expand All @@ -3331,8 +3331,8 @@ public T xclaim(
}

/**
* Changes the ownership of a pending message. This command uses the JUSTID optional argument to
* return a list of stream message ids.
* Changes the ownership of a pending message. This function returns an <code>array</code> with
* only the message/entry IDs, and is equivalent to using <code>JUSTID</code> in the Redis API.
*
* @see <a href="https://valkey.io/commands/xclaim/">valkey.io</a> for details.
* @param key The key of the stream.
Expand All @@ -3358,8 +3358,8 @@ public T xclaimJustId(
}

/**
* Changes the ownership of a pending message.This command uses the JUSTID optional argument to
* return a list of stream message ids.
* Changes the ownership of a pending message. This function returns an <code>array</code> with
* only the message/entry IDs, and is equivalent to using <code>JUSTID</code> in the Redis API.
*
* @see <a href="https://valkey.io/commands/xclaim/">valkey.io</a> for details.
* @param key The key of the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,46 @@
* Optional arguments to {@link StreamBaseCommands#xclaim(String, String, String, long, String[],
* StreamClaimOptions)}
*
* @see <a href="https://redis.io/commands/xclaim/">redis.io</a>
* @see <a href="https://valkey.io/commands/xclaim/">valkey.io</a>
*/
@Builder
public class StreamClaimOptions {

/** Redis api string to designate IDLE time in milliseconds */
/** ValKey API string to designate IDLE time in milliseconds */
public static final String IDLE_REDIS_API = "IDLE";

/** Redis api string to designate TIME time in unix-milliseconds */
/** ValKey API string to designate TIME time in unix-milliseconds */
public static final String TIME_REDIS_API = "TIME";

/** Redis api string to designate RETRYCOUNT */
/** ValKey API string to designate RETRYCOUNT */
public static final String RETRY_COUNT_REDIS_API = "RETRYCOUNT";

/** Redis api string to designate FORCE */
/** ValKey API string to designate FORCE */
public static final String FORCE_REDIS_API = "FORCE";

/** Redis api string to designate JUSTID */
/** ValKey API string to designate JUSTID */
public static final String JUST_ID_REDIS_API = "JUSTID";

/**
* Set the idle time (last time it was delivered) of the message. If <code>idle</code> is not
* specified, an <code>idle</code> of <code>0</code> is assumed, that is, the time count is reset
* because the message has now a new owner trying to process it.
* Set the idle time (last time it was delivered) of the message in milliseconds. If <code>idle
* </code> is not specified, an <code>idle</code> of <code>0</code> is assumed, that is, the time
* count is reset because the message now has a new owner trying to process it.
*/
private final Long idle; // in milliseconds

/**
* This is the same as idle but instead of a relative amount of milliseconds, it sets the idle
* time to a specific Unix time (in milliseconds). This is useful in order to rewrite the AOF file
* generating <code>XCLAIM</code> commands.
* This is the same as {@link #idle} but instead of a relative amount of milliseconds, it sets the
* idle time to a specific Unix time (in milliseconds). This is useful in order to rewrite the AOF
* file generating <code>XCLAIM</code> commands.
*/
private final Long idleUnixTime; // in unix-time milliseconds

/**
* Set the retry counter to the specified value. This counter is incremented every time a message
* is delivered again. Normally <code>XCLAIM</code> does not alter this counter, which is just
* served to clients when the <code>XPENDING</code> command is called: this way clients can detect
* anomalies, like messages that are never processed for some reason after a big number of
* delivery attempts.
* is delivered again. Normally {@link StreamBaseCommands#xclaim} does not alter this counter,
* which is just served to clients when the {@link StreamBaseCommands#xpending} command is called:
* this way clients can detect anomalies, like messages that are never processed for some reason
* after a big number of delivery attempts.
*/
private final Long retryCount;

Expand Down
71 changes: 5 additions & 66 deletions java/integTest/src/test/java/glide/SharedCommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -4393,7 +4393,7 @@ public void xpending_xclaim(BaseClient client) {
ArrayUtils.remove(pending_results_extended[4], 2));
assertTrue((Long) pending_results_extended[4][2] >= 0L);

// use claim to claim stream 3 and 4 for consumer 1
// use claim to claim stream 3 and 5 for consumer 1
var claimResults =
client.xclaim(key, groupName, consumer1, 0L, new String[] {streamid_3, streamid_5}).get();
assertDeepEquals(
Expand Down Expand Up @@ -4470,71 +4470,6 @@ public void xpending_xclaim(BaseClient client) {
assertEquals(2, pending_results_extended.length);
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void xclaim_options(BaseClient client) {

String key = UUID.randomUUID().toString();
String groupName = "group" + UUID.randomUUID();
String zeroStreamId = "0";
String consumer1 = "consumer-1-" + UUID.randomUUID();
String consumer2 = "consumer-2-" + UUID.randomUUID();

// create group and consumer for the group
assertEquals(
OK,
client
.xgroupCreate(
key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build())
.get());
assertTrue(client.xgroupCreateConsumer(key, groupName, consumer1).get());
assertTrue(client.xgroupCreateConsumer(key, groupName, consumer2).get());

// Add two stream entries for consumer 1
String streamid_1 = client.xadd(key, Map.of("field1", "value1")).get();
assertNotNull(streamid_1);
String streamid_2 = client.xadd(key, Map.of("field2", "value2")).get();
assertNotNull(streamid_2);
String streamid_4 = client.xadd(key, Map.of("field4", "value4")).get();
assertNotNull(streamid_4);
String streamid_5 = client.xadd(key, Map.of("field5", "value5")).get();
assertNotNull(streamid_5);

// read the entire stream for the consumer and mark messages as pending
var result_1 = client.xreadgroup(Map.of(key, ">"), groupName, consumer1).get();
assertDeepEquals(
Map.of(
key,
Map.of(
streamid_1, new String[][] {{"field1", "value1"}},
streamid_2, new String[][] {{"field2", "value2"}},
// streamid_3, new String[][] {{"field3", "value3"}},
streamid_4, new String[][] {{"field4", "value4"}},
streamid_5, new String[][] {{"field5", "value5"}})),
result_1);

String streamid_3 = client.xadd(key, Map.of("field3", "value3")).get();
assertNotNull(streamid_3);

StreamClaimOptions options = StreamClaimOptions.builder().retryCount(100L).force().build();
var claim_results =
client
.xclaim(
key,
groupName,
consumer2,
0,
new String[] {streamid_1, streamid_3, streamid_5},
options)
.get();
System.out.println(claim_results);

var pending_results =
client.xpending(key, groupName, IdBound.of(streamid_1), IdBound.of(streamid_3), 10L).get();
System.out.println(pending_results);
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
Expand Down Expand Up @@ -4728,6 +4663,10 @@ public void xclaim_return_failures(BaseClient client) {
client.xclaimJustId(key, groupName, consumer1, 1L, new String[] {"invalid"}).get());
assertInstanceOf(RequestException.class, executionException.getCause());

// claim with empty stream entry IDs returns no results
var emptyClaim = client.xclaimJustId(key, groupName, consumer1, 1L, new String[0]).get();
assertEquals(0L, emptyClaim.length);

// non-existent key throws a RequestError (NOGROUP)
executionException =
assertThrows(
Expand Down
13 changes: 3 additions & 10 deletions java/integTest/src/test/java/glide/TransactionTestUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -868,16 +868,9 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
streamKey1,
Map.of()), // xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1, options);
Map.of(), // xclaim(streamKey1, groupName1, consumer1, 0L, new String[] {"0-1"})
Map.of(
"0-3",
new String[] {
"field3", "value3"
}), // xclaim(streamKey1, groupName1, consumer1, 0L, {"0-3"}, options)
new String[] {
"0-3"
}, // xclaimJustId(streamKey1, groupName1, consumer1, 0L, new String[] {"0-3"})
new String
[0], // xclaimJustId(streamKey1, groupName1, consumer1, 0L, new String[] {"0-4"}, options)
Map.of("0-3", new String[] {"field3", "value3"}), // xclaim(streamKey1, ..., {"0-3"}, options)
new String[] {"0-3"}, // xclaimJustId(streamKey1, ..., new String[] {"0-3"})
new String[0], // xclaimJustId(streamKey1, ..., new String[] {"0-4"}, options)
new Object[] {
1L, "0-3", "0-3", new Object[][] {{consumer1, "1"}}
}, // xpending(streamKey1, groupName1)
Expand Down

0 comments on commit 8c3ff62

Please sign in to comment.