Skip to content

Commit

Permalink
Java: Add PFMERGE command. (valkey-io#1224)
Browse files Browse the repository at this point in the history
* Add `PFMERGE` command. (#168)

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored Apr 5, 2024
1 parent 63ab900 commit ab4b884
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 2 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,4 @@

Preview release of **GLIDE for Redis** a Polyglot Redis client.

See the [README](README.md) for additional information.
See the [README](README.md) for additional information.
1 change: 1 addition & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ enum RequestType {
Hkeys = 94;
PfAdd = 96;
PfCount = 97;
PfMerge = 98;
Blpop = 100;
RPushX = 102;
LPushX = 103;
Expand Down
1 change: 1 addition & 0 deletions glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ fn get_command(request: &Command) -> Option<Cmd> {
RequestType::Hkeys => Some(cmd("HKEYS")),
RequestType::PfAdd => Some(cmd("PFADD")),
RequestType::PfCount => Some(cmd("PFCOUNT")),
RequestType::PfMerge => Some(cmd("PFMERGE")),
RequestType::RPushX => Some(cmd("RPUSHX")),
RequestType::LPushX => Some(cmd("LPUSHX")),
RequestType::Blpop => Some(cmd("BLPOP")),
Expand Down
8 changes: 8 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Persist;
import static redis_request.RedisRequestOuterClass.RequestType.PfAdd;
import static redis_request.RedisRequestOuterClass.RequestType.PfCount;
import static redis_request.RedisRequestOuterClass.RequestType.PfMerge;
import static redis_request.RedisRequestOuterClass.RequestType.RPop;
import static redis_request.RedisRequestOuterClass.RequestType.RPush;
import static redis_request.RedisRequestOuterClass.RequestType.RPushX;
Expand Down Expand Up @@ -795,4 +796,11 @@ public CompletableFuture<Long> pfadd(@NonNull String key, @NonNull String[] elem
public CompletableFuture<Long> pfcount(@NonNull String[] keys) {
return commandManager.submitNewCommand(PfCount, keys, this::handleLongResponse);
}

@Override
public CompletableFuture<String> pfmerge(
@NonNull String destination, @NonNull String[] sourceKeys) {
String[] arguments = ArrayUtils.addFirst(sourceKeys, destination);
return commandManager.submitNewCommand(PfMerge, arguments, this::handleStringResponse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,25 @@ public interface HyperLogLogBaseCommands {
* }</pre>
*/
CompletableFuture<Long> pfcount(String[] keys);

/**
* Merges multiple HyperLogLog values into a unique value.<br>
* If the destination variable exists, it is treated as one of the source HyperLogLog data sets,
* otherwise a new HyperLogLog is created.
*
* @see <a href="https://redis.io/commands/pfmerge/">redis.io</a> for details.
* @param destination The key of the destination HyperLogLog where the merged data sets will be
* stored.
* @param sourceKeys The keys of the HyperLogLog structures to be merged.
* @return <code>OK</code>.
* @example
* <pre>{@code
* String response = client.pfmerge("new_HLL", "old_HLL_1", "old_HLL_2").get();
* assert response.equals("OK"); // new HyperLogLog data set was created with merged content of old ones
*
* String response = client.pfmerge("old_HLL_1", "old_HLL_2", "old_HLL_3").get();
* assert response.equals("OK"); // content of existing HyperLogLogs was merged into existing variable
* }</pre>
*/
CompletableFuture<String> pfmerge(String destination, String[] sourceKeys);
}
18 changes: 18 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Persist;
import static redis_request.RedisRequestOuterClass.RequestType.PfAdd;
import static redis_request.RedisRequestOuterClass.RequestType.PfCount;
import static redis_request.RedisRequestOuterClass.RequestType.PfMerge;
import static redis_request.RedisRequestOuterClass.RequestType.Ping;
import static redis_request.RedisRequestOuterClass.RequestType.RPop;
import static redis_request.RedisRequestOuterClass.RequestType.RPush;
Expand Down Expand Up @@ -1774,6 +1775,23 @@ public T pfcount(@NonNull String[] keys) {
return getThis();
}

/**
* Merges multiple HyperLogLog values into a unique value.<br>
* If the destination variable exists, it is treated as one of the source HyperLogLog data sets,
* otherwise a new HyperLogLog is created.
*
* @see <a href="https://redis.io/commands/pfmerge/">redis.io</a> for details.
* @param destination The key of the destination HyperLogLog where the merged data sets will be
* stored.
* @param sourceKeys The keys of the HyperLogLog structures to be merged.
* @return Command Response - <code>OK</code>.
*/
public T pfmerge(@NonNull String destination, @NonNull String[] sourceKeys) {
ArgsArray commandArgs = buildArgs(ArrayUtils.addFirst(sourceKeys, destination));
protobufTransaction.addCommands(buildCommand(PfMerge, commandArgs));
return getThis();
}

/** Build protobuf {@link Command} object for given command and arguments. */
protected Command buildCommand(RequestType requestType) {
return buildCommand(requestType, buildArgs());
Expand Down
24 changes: 24 additions & 0 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Persist;
import static redis_request.RedisRequestOuterClass.RequestType.PfAdd;
import static redis_request.RedisRequestOuterClass.RequestType.PfCount;
import static redis_request.RedisRequestOuterClass.RequestType.PfMerge;
import static redis_request.RedisRequestOuterClass.RequestType.Ping;
import static redis_request.RedisRequestOuterClass.RequestType.RPop;
import static redis_request.RedisRequestOuterClass.RequestType.RPush;
Expand Down Expand Up @@ -2580,4 +2581,27 @@ public void pfcount_returns_success() {
assertEquals(value, payload);
assertEquals(payload, response.get());
}

@SneakyThrows
@Test
public void pfmerge_returns_success() {
// setup
String destKey = "testKey";
String[] sourceKeys = new String[] {"a", "b", "c"};
String[] arguments = new String[] {destKey, "a", "b", "c"};

CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(PfMerge), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.pfmerge(destKey, sourceKeys);

// verify
assertEquals(testResponse, response);
assertEquals(OK, response.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Persist;
import static redis_request.RedisRequestOuterClass.RequestType.PfAdd;
import static redis_request.RedisRequestOuterClass.RequestType.PfCount;
import static redis_request.RedisRequestOuterClass.RequestType.PfMerge;
import static redis_request.RedisRequestOuterClass.RequestType.Ping;
import static redis_request.RedisRequestOuterClass.RequestType.RPop;
import static redis_request.RedisRequestOuterClass.RequestType.RPush;
Expand Down Expand Up @@ -547,6 +548,11 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),

transaction.pfcount(new String[] {"hll1", "hll2"});
results.add(Pair.of(PfCount, ArgsArray.newBuilder().addArgs("hll1").addArgs("hll2").build()));
transaction.pfmerge("hll", new String[] {"hll1", "hll2"});
results.add(
Pair.of(
PfMerge,
ArgsArray.newBuilder().addArgs("hll").addArgs("hll1").addArgs("hll2").build()));

var protobufTransaction = transaction.getProtobufTransaction().build();

Expand Down
30 changes: 30 additions & 0 deletions java/integTest/src/test/java/glide/SharedCommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1665,4 +1665,34 @@ public void pfcount(BaseClient client) {
assertThrows(ExecutionException.class, () -> client.pfcount(new String[] {"foo"}).get());
assertTrue(executionException.getCause() instanceof RequestException);
}

@SneakyThrows
@ParameterizedTest
@MethodSource("getClients")
public void pfmerge(BaseClient client) {
String key1 = "{test}-hll1-" + UUID.randomUUID();
String key2 = "{test}-hll2-" + UUID.randomUUID();
String key3 = "{test}-hll3-" + UUID.randomUUID();
assertEquals(1, client.pfadd(key1, new String[] {"a", "b", "c"}).get());
assertEquals(1, client.pfadd(key2, new String[] {"b", "c", "d"}).get());
// new HyperLogLog data set
assertEquals(OK, client.pfmerge(key3, new String[] {key1, key2}).get());
assertEquals(
client.pfcount(new String[] {key1, key2}).get(), client.pfcount(new String[] {key3}).get());
// existing HyperLogLog data set
assertEquals(OK, client.pfmerge(key1, new String[] {key2}).get());
assertEquals(
client.pfcount(new String[] {key1, key2}).get(), client.pfcount(new String[] {key1}).get());

// Key exists, but it is not a HyperLogLog
assertEquals(OK, client.set("foo", "bar").get());
ExecutionException executionException =
assertThrows(
ExecutionException.class, () -> client.pfmerge("foo", new String[] {key1}).get());
assertTrue(executionException.getCause() instanceof RequestException);
executionException =
assertThrows(
ExecutionException.class, () -> client.pfmerge(key1, new String[] {"foo"}).get());
assertTrue(executionException.getCause() instanceof RequestException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class TransactionTestUtilities {
private static final String key9 = "{key}" + UUID.randomUUID();
private static final String hllKey1 = "{key}:hllKey1-" + UUID.randomUUID();
private static final String hllKey2 = "{key}:hllKey2-" + UUID.randomUUID();
private static final String hllKey3 = "{key}:hllKey3-" + UUID.randomUUID();
private static final String value1 = UUID.randomUUID().toString();
private static final String value2 = UUID.randomUUID().toString();
private static final String value3 = UUID.randomUUID().toString();
Expand Down Expand Up @@ -128,6 +129,9 @@ public static BaseTransaction<?> transactionTest(BaseTransaction<?> baseTransact

baseTransaction.pfadd(hllKey1, new String[] {"a", "b", "c"});
baseTransaction.pfcount(new String[] {hllKey1, hllKey2});
baseTransaction
.pfmerge(hllKey3, new String[] {hllKey1, hllKey2})
.pfcount(new String[] {hllKey3});

return baseTransaction;
}
Expand Down Expand Up @@ -205,7 +209,9 @@ public static Object[] transactionTestResult() {
new String[] {listKey3, value3}, // blpop(new String[] { listKey3 }, 0.01)
new String[] {listKey3, value1}, // brpop(new String[] { listKey3 }, 0.01);
1L, // pfadd(hllKey1, new String[] {"a", "b", "c"})
3L, // pfcount(new String[] { hllKey1, hllKey2 });
3L, // pfcount(new String[] { hllKey1, hllKey2 });;
OK, // pfmerge(hllKey3, new String[] {hllKey1, hllKey2})
3L, // pfcount(new String[] { hllKey3 })
};
}
}

0 comments on commit ab4b884

Please sign in to comment.