From 7ee82afeb6716cfa81c6e38a51cdfc29e6594ae7 Mon Sep 17 00:00:00 2001 From: tjzhang-BQ <111323543+tjzhang-BQ@users.noreply.github.com> Date: Fri, 28 Jun 2024 19:12:59 -0700 Subject: [PATCH] Java: Adding command WAIT Java: Adding command WAIT --- glide-core/src/protobuf/redis_request.proto | 1 + glide-core/src/request_type.rs | 3 +++ .../src/main/java/glide/api/BaseClient.java | 9 +++++++ .../api/commands/GenericBaseCommands.java | 17 +++++++++++++ .../glide/api/models/BaseTransaction.java | 17 +++++++++++++ .../test/java/glide/api/RedisClientTest.java | 25 +++++++++++++++++++ .../glide/api/models/TransactionTests.java | 4 +++ .../cluster/ClusterTransactionTests.java | 20 +++++++++++++++ .../test/java/glide/cluster/CommandTests.java | 17 +++++++++++++ .../java/glide/standalone/CommandTests.java | 17 +++++++++++++ .../glide/standalone/TransactionTests.java | 22 ++++++++++++++++ 11 files changed, 152 insertions(+) diff --git a/glide-core/src/protobuf/redis_request.proto b/glide-core/src/protobuf/redis_request.proto index b3330959ec..3719c379dc 100644 --- a/glide-core/src/protobuf/redis_request.proto +++ b/glide-core/src/protobuf/redis_request.proto @@ -238,6 +238,7 @@ enum RequestType { FunctionRestore = 197; XPending = 198; XGroupSetId = 199; + Wait = 200; } message Command { diff --git a/glide-core/src/request_type.rs b/glide-core/src/request_type.rs index 943721bfd3..c4977f47a1 100644 --- a/glide-core/src/request_type.rs +++ b/glide-core/src/request_type.rs @@ -208,6 +208,7 @@ pub enum RequestType { FunctionRestore = 197, XPending = 198, XGroupSetId = 199, + Wait = 200, } fn get_two_word_command(first: &str, second: &str) -> Cmd { @@ -419,6 +420,7 @@ impl From<::protobuf::EnumOrUnknown> for RequestType { ProtobufRequestType::FunctionRestore => RequestType::FunctionRestore, ProtobufRequestType::XPending => RequestType::XPending, ProtobufRequestType::XGroupSetId => RequestType::XGroupSetId, + ProtobufRequestType::Wait => RequestType::Wait, } } } @@ -628,6 +630,7 @@ impl RequestType { RequestType::FunctionRestore => Some(get_two_word_command("FUNCTION", "RESTORE")), RequestType::XPending => Some(cmd("XPENDING")), RequestType::XGroupSetId => Some(get_two_word_command("XGROUP", "SETID")), + RequestType::Wait => Some(cmd("WAIT")), } } } diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index c777e40c30..4fb4ff4b96 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -128,6 +128,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Touch; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.Wait; import static redis_request.RedisRequestOuterClass.RequestType.Watch; import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; @@ -2784,4 +2785,12 @@ public CompletableFuture sortStore(@NonNull String key, @NonNull String de return commandManager.submitNewCommand( Sort, new String[] {key, STORE_COMMAND_STRING, destination}, this::handleLongResponse); } + + @Override + public CompletableFuture wait(long numreplicas, long timeout) { + return commandManager.submitNewCommand( + Wait, + new String[] {Long.toString(numreplicas), Long.toString(timeout)}, + this::handleLongResponse); + } } diff --git a/java/client/src/main/java/glide/api/commands/GenericBaseCommands.java b/java/client/src/main/java/glide/api/commands/GenericBaseCommands.java index e440b969d7..399530fe71 100644 --- a/java/client/src/main/java/glide/api/commands/GenericBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/GenericBaseCommands.java @@ -1207,4 +1207,21 @@ CompletableFuture restore( * } */ CompletableFuture sortStore(String key, String destination); + + /** + * Blocks the current client until all the previous write commands are successfully transferred + * and acknowledged by at least numreplicas of replicas. If timeout is + * reached, the command returns even if the specified number of replicas were not yet reached. + * + * @param numreplicas The number of replicas to reach. + * @param timeout The timeout value specified in milliseconds. + * @return The number of replicas reached by all the writes performed in the context of the + * current connection. + * @example + *
{@code
+     * client.set("key", "value).get();
+     * assert client.wait(1L, 1000L).get() == 1L;
+     * }
+ */ + CompletableFuture wait(long numreplicas, long timeout); } diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index 3164ecf7a9..a8e3ac74df 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -155,6 +155,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Touch; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.Wait; import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; @@ -5126,6 +5127,22 @@ public T lcsIdxWithMatchLen(@NonNull String key1, @NonNull String key2, long min return getThis(); } + /** + * Blocks the current client until all the previous write commands are successfully transferred + * and acknowledged by at least numreplicas of replicas. If timeout is + * reached, the command returns even if the specified number of replicas were not yet reached. + * + * @param numreplicas The number of replicas to reach. + * @param timeout The timeout value specified in milliseconds. + * @return Command Response - The number of replicas reached by all the writes performed in the + * context of the current connection. + */ + public T wait(long numreplicas, long timeout) { + ArgsArray args = buildArgs(Long.toString(numreplicas), Long.toString(timeout)); + protobufTransaction.addCommands(buildCommand(Wait, args)); + return getThis(); + } + /** Build protobuf {@link Command} object for given command and arguments. */ protected Command buildCommand(RequestType requestType) { return buildCommand(requestType, buildArgs()); diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index e254d364ba..4fb889fc6d 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -207,6 +207,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.UnWatch; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.Wait; import static redis_request.RedisRequestOuterClass.RequestType.Watch; import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; @@ -8957,4 +8958,28 @@ public void sortStore_with_options_returns_success() { assertEquals(testResponse, response); assertEquals(result, payload); } + + @SneakyThrows + @Test + public void wait_returns_success() { + // setup + long numreplicas = 1L; + long timeout = 1000L; + Long result = 5L; + String[] args = new String[] {"1", "1000"}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(result); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(Wait), eq(args), any())).thenReturn(testResponse); + + // exercise + CompletableFuture response = service.wait(numreplicas, timeout); + Long payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(result, payload); + } } diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index a0512764ec..306a3d4937 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -173,6 +173,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Touch; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.Wait; import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; @@ -1168,6 +1169,9 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), transaction.sortStore("key1", "key2"); results.add(Pair.of(Sort, buildArgs("key1", STORE_COMMAND_STRING, "key2"))); + transaction.wait(1L, 1000L); + results.add(Pair.of(Wait, buildArgs("1", "1000"))); + var protobufTransaction = transaction.getProtobufTransaction().build(); for (int idx = 0; idx < protobufTransaction.getCommandsCount(); idx++) { diff --git a/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java b/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java index 5aacc3c65a..c54800d3be 100644 --- a/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java +++ b/java/integTest/src/test/java/glide/cluster/ClusterTransactionTests.java @@ -287,4 +287,24 @@ public void sort() { assertDeepEquals(expectedResult, results); } + + @SneakyThrows + @Test + public void waitTest() { + // setup + String key = UUID.randomUUID().toString(); + long numreplicas = 1L; + long timeout = 1000L; + ClusterTransaction transaction = new ClusterTransaction(); + + transaction.set(key, "value").wait(numreplicas, timeout); + Object[] results = clusterClient.exec(transaction).get(); + Object[] expectedResult = + new Object[] { + OK, // set(key, "value") + 0L, // wait(numreplicas, timeout) + }; + assertEquals(expectedResult[0], results[0]); + assertTrue((Long) expectedResult[1] <= (Long) results[1]); + } } diff --git a/java/integTest/src/test/java/glide/cluster/CommandTests.java b/java/integTest/src/test/java/glide/cluster/CommandTests.java index 80cb941d21..0bbd9aad69 100644 --- a/java/integTest/src/test/java/glide/cluster/CommandTests.java +++ b/java/integTest/src/test/java/glide/cluster/CommandTests.java @@ -1831,4 +1831,21 @@ public void sort() { .get()); assertArrayEquals(key2DescendingListSubset, clusterClient.lrange(key3, 0, -1).get()); } + + @SneakyThrows + @Test + public void waitTest() { + // setup + String key = UUID.randomUUID().toString(); + long numreplicas = 1L; + long timeout = 1000L; + + assertEquals(OK, clusterClient.set(key, "value").get()); + assertTrue(clusterClient.wait(numreplicas, timeout).get() >= 1); + + // command should fail on a negative timeout value + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> clusterClient.wait(1L, -1L).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + } } diff --git a/java/integTest/src/test/java/glide/standalone/CommandTests.java b/java/integTest/src/test/java/glide/standalone/CommandTests.java index d69cb817e8..37eb331bd9 100644 --- a/java/integTest/src/test/java/glide/standalone/CommandTests.java +++ b/java/integTest/src/test/java/glide/standalone/CommandTests.java @@ -1032,4 +1032,21 @@ public void sort() { .get()); assertArrayEquals(namesSortedByAge, regularClient.lrange(storeKey, 0, -1).get()); } + + @SneakyThrows + @Test + public void waitTest() { + // setup + String key = UUID.randomUUID().toString(); + long numreplicas = 1L; + long timeout = 1000L; + + assertEquals(OK, regularClient.set(key, "value").get()); + assertTrue(regularClient.wait(numreplicas, timeout).get() >= 0); + + // command should fail on a negative timeout value + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> regularClient.wait(1L, -1L).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + } } diff --git a/java/integTest/src/test/java/glide/standalone/TransactionTests.java b/java/integTest/src/test/java/glide/standalone/TransactionTests.java index fa82d16f42..c76fadf2a4 100644 --- a/java/integTest/src/test/java/glide/standalone/TransactionTests.java +++ b/java/integTest/src/test/java/glide/standalone/TransactionTests.java @@ -471,4 +471,26 @@ public void sort_and_sortReadOnly() { assertArrayEquals(expectedResults, client.exec(transaction2).get()); } } + + @SneakyThrows + @Test + public void waitTest() { + // setup + String key = UUID.randomUUID().toString(); + long numreplicas = 1L; + long timeout = 1000L; + Transaction transaction = new Transaction(); + + transaction.set(key, "value"); + transaction.wait(numreplicas, timeout); + + Object[] results = client.exec(transaction).get(); + Object[] expectedResult = + new Object[] { + OK, // set(key, "value") + 0L, // wait(numreplicas, timeout) + }; + assertEquals(expectedResult[0], results[0]); + assertTrue((long) expectedResult[1] <= (long) results[1]); + } }