Skip to content

Commit

Permalink
Node: add ZUNIONSTORE (valkey-io#2145)
Browse files Browse the repository at this point in the history
* implement zunionstore

Signed-off-by: Chloe Yip <[email protected]>

* add changelog

Signed-off-by: Chloe Yip <[email protected]>

* implement zunionstore

Signed-off-by: Chloe Yip <[email protected]>

* add changelog

Signed-off-by: Chloe Yip <[email protected]>

* address comments

Signed-off-by: Chloe Yip <[email protected]>

* delete glide-for-redis submodule package

Signed-off-by: Chloe Yip <[email protected]>

* add cluster example

Signed-off-by: Chloe Yip <[email protected]>

* add remarks to base client

Signed-off-by: Chloe Yip <[email protected]>

* fix test

Signed-off-by: Chloe Yip <[email protected]>

* Apply suggestions from code review

Co-authored-by: Yury-Fridlyand <[email protected]>
Signed-off-by: jonathanl-bq <[email protected]>

---------

Signed-off-by: Chloe Yip <[email protected]>
Signed-off-by: Chloe Yip <[email protected]>
Signed-off-by: jonathanl-bq <[email protected]>
Co-authored-by: jonathanl-bq <[email protected]>
Co-authored-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
3 people authored Aug 19, 2024
1 parent 6d7658d commit ddb5af9
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added ZUNIONSTORE command ([#2145](https://github.com/valkey-io/valkey-glide/pull/2145))
* Node: Added XREADGROUP command ([#2124](https://github.com/valkey-io/valkey-glide/pull/2124))
* Node: Added XINFO GROUPS command ([#2122](https://github.com/valkey-io/valkey-glide/pull/2122))
* Java: Added PUBSUB CHANNELS, NUMPAT and NUMSUB commands ([#2105](https://github.com/valkey-io/valkey-glide/pull/2105))
Expand Down
37 changes: 37 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ import {
createZRevRankWithScore,
createZScan,
createZScore,
createZUnionStore,
} from "./Commands";
import {
ClosingError,
Expand Down Expand Up @@ -3579,6 +3580,42 @@ export class BaseClient {
return this.createWritePromise(createZScore(key, member));
}

/**
* Computes the union of sorted sets given by the specified `keys` and stores the result in `destination`.
* If `destination` already exists, it is overwritten. Otherwise, a new sorted set will be created.
* To get the result directly, see {@link zunionWithScores}.
*
* @see {@link https://valkey.io/commands/zunionstore/|valkey.io} for details.
* @remarks When in cluster mode, `destination` and all keys in `keys` both must map to the same hash slot.
* @param destination - The key of the destination sorted set.
* @param keys - The keys of the sorted sets with possible formats:
* string[] - for keys only.
* KeyWeight[] - for weighted keys with score multipliers.
* @param aggregationType - Specifies the aggregation strategy to apply when combining the scores of elements. See {@link AggregationType}.
* @returns The number of elements in the resulting sorted set stored at `destination`.
*
* * @example
* ```typescript
* // Example usage of zunionstore command with an existing key
* await client.zadd("key1", {"member1": 10.5, "member2": 8.2})
* await client.zadd("key2", {"member1": 9.5})
* await client.zunionstore("my_sorted_set", ["key1", "key2"]) // Output: 2 - Indicates that the sorted set "my_sorted_set" contains two elements.
* await client.zrangeWithScores("my_sorted_set", RangeByIndex(0, -1)) // Output: {'member1': 20, 'member2': 8.2} - "member1" is now stored in "my_sorted_set" with score of 20 and "member2" with score of 8.2.
* await client.zunionstore("my_sorted_set", ["key1", "key2"] , AggregationType.MAX ) // Output: 2 - Indicates that the sorted set "my_sorted_set" contains two elements, and each score is the maximum score between the sets.
* await client.zrangeWithScores("my_sorted_set", RangeByIndex(0, -1)) // Output: {'member1': 10.5, 'member2': 8.2} - "member1" is now stored in "my_sorted_set" with score of 10.5 and "member2" with score of 8.2.
* await client.zunionstore("my_sorted_set", ["key1, "key2], {weights: [2, 1]}) // Output: 46
* ```
*/
public async zunionstore(
destination: string,
keys: string[] | KeyWeight[],
aggregationType?: AggregationType,
): Promise<number> {
return this.createWritePromise(
createZUnionStore(destination, keys, aggregationType),
);
}

/**
* Returns the scores associated with the specified `members` in the sorted set stored at `key`.
*
Expand Down
12 changes: 12 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,18 @@ export function createZScore(
return createCommand(RequestType.ZScore, [key, member]);
}

/**
* @internal
*/
export function createZUnionStore(
destination: string,
keys: string[] | KeyWeight[],
aggregationType?: AggregationType,
): command_request.Command {
const args = createZCmdStoreArgs(destination, keys, aggregationType);
return createCommand(RequestType.ZUnionStore, args);
}

/**
* @internal
*/
Expand Down
25 changes: 25 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ import {
createZRevRankWithScore,
createZScan,
createZScore,
createZUnionStore,
} from "./Commands";
import { command_request } from "./ProtobufMessage";

Expand Down Expand Up @@ -1769,6 +1770,30 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createZScore(key, member));
}

/**
* Computes the union of sorted sets given by the specified `keys` and stores the result in `destination`.
* If `destination` already exists, it is overwritten. Otherwise, a new sorted set will be created.
* To get the result directly, see {@link zunionWithScores}.
*
* @see {@link https://valkey.io/commands/zunionstore/|valkey.io} for details.
* @param destination - The key of the destination sorted set.
* @param keys - The keys of the sorted sets with possible formats:
* string[] - for keys only.
* KeyWeight[] - for weighted keys with score multipliers.
* @param aggregationType - Specifies the aggregation strategy to apply when combining the scores of elements. See {@link AggregationType}.
*
* Command Response - The number of elements in the resulting sorted set stored at `destination`.
*/
public zunionstore(
destination: string,
keys: string[] | KeyWeight[],
aggregationType?: AggregationType,
): T {
return this.addAndReturn(
createZUnionStore(destination, keys, aggregationType),
);
}

/**
* Returns the scores associated with the specified `members` in the sorted set stored at `key`.
*
Expand Down
1 change: 1 addition & 0 deletions node/tests/GlideClusterClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ describe("GlideClusterClient", () => {
client.sinter(["abc", "zxy", "lkn"]),
client.sinterstore("abc", ["zxy", "lkn"]),
client.zinterstore("abc", ["zxy", "lkn"]),
client.zunionstore("abc", ["zxy", "lkn"]),
client.sunionstore("abc", ["zxy", "lkn"]),
client.sunion(["abc", "zxy", "lkn"]),
client.pfcount(["abc", "zxy", "lkn"]),
Expand Down
190 changes: 190 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3858,6 +3858,196 @@ export function runBaseTests<Context>(config: {
config.timeout,
);

// ZUnionStore command tests
async function zunionStoreWithMaxAggregation(client: BaseClient) {
const key1 = "{testKey}:1-" + uuidv4();
const key2 = "{testKey}:2-" + uuidv4();
const key3 = "{testKey}:3-" + uuidv4();
const range = {
start: 0,
stop: -1,
};

const membersScores1 = { one: 1.0, two: 2.0 };
const membersScores2 = { one: 1.5, two: 2.5, three: 3.5 };

expect(await client.zadd(key1, membersScores1)).toEqual(2);
expect(await client.zadd(key2, membersScores2)).toEqual(3);

// Union results are aggregated by the MAX score of elements
expect(await client.zunionstore(key3, [key1, key2], "MAX")).toEqual(3);
const zunionstoreMapMax = await client.zrangeWithScores(key3, range);
const expectedMapMax = {
one: 1.5,
two: 2.5,
three: 3.5,
};
expect(zunionstoreMapMax).toEqual(expectedMapMax);
}

async function zunionStoreWithMinAggregation(client: BaseClient) {
const key1 = "{testKey}:1-" + uuidv4();
const key2 = "{testKey}:2-" + uuidv4();
const key3 = "{testKey}:3-" + uuidv4();
const range = {
start: 0,
stop: -1,
};

const membersScores1 = { one: 1.0, two: 2.0 };
const membersScores2 = { one: 1.5, two: 2.5, three: 3.5 };

expect(await client.zadd(key1, membersScores1)).toEqual(2);
expect(await client.zadd(key2, membersScores2)).toEqual(3);

// Union results are aggregated by the MIN score of elements
expect(await client.zunionstore(key3, [key1, key2], "MIN")).toEqual(3);
const zunionstoreMapMin = await client.zrangeWithScores(key3, range);
const expectedMapMin = {
one: 1.0,
two: 2.0,
three: 3.5,
};
expect(zunionstoreMapMin).toEqual(expectedMapMin);
}

async function zunionStoreWithSumAggregation(client: BaseClient) {
const key1 = "{testKey}:1-" + uuidv4();
const key2 = "{testKey}:2-" + uuidv4();
const key3 = "{testKey}:3-" + uuidv4();
const range = {
start: 0,
stop: -1,
};

const membersScores1 = { one: 1.0, two: 2.0 };
const membersScores2 = { one: 1.5, two: 2.5, three: 3.5 };

expect(await client.zadd(key1, membersScores1)).toEqual(2);
expect(await client.zadd(key2, membersScores2)).toEqual(3);

// Union results are aggregated by the SUM score of elements
expect(await client.zunionstore(key3, [key1, key2], "SUM")).toEqual(3);
const zunionstoreMapSum = await client.zrangeWithScores(key3, range);
const expectedMapSum = {
one: 2.5,
two: 4.5,
three: 3.5,
};
expect(zunionstoreMapSum).toEqual(expectedMapSum);
}

async function zunionStoreBasicTest(client: BaseClient) {
const key1 = "{testKey}:1-" + uuidv4();
const key2 = "{testKey}:2-" + uuidv4();
const key3 = "{testKey}:3-" + uuidv4();
const range = {
start: 0,
stop: -1,
};

const membersScores1 = { one: 1.0, two: 2.0 };
const membersScores2 = { one: 2.0, two: 3.0, three: 4.0 };

expect(await client.zadd(key1, membersScores1)).toEqual(2);
expect(await client.zadd(key2, membersScores2)).toEqual(3);

expect(await client.zunionstore(key3, [key1, key2])).toEqual(3);
const zunionstoreMap = await client.zrangeWithScores(key3, range);
const expectedMap = {
one: 3.0,
three: 4.0,
two: 5.0,
};
expect(zunionstoreMap).toEqual(expectedMap);
}

async function zunionStoreWithWeightsAndAggregation(client: BaseClient) {
const key1 = "{testKey}:1-" + uuidv4();
const key2 = "{testKey}:2-" + uuidv4();
const key3 = "{testKey}:3-" + uuidv4();
const range = {
start: 0,
stop: -1,
};
const membersScores1 = { one: 1.0, two: 2.0 };
const membersScores2 = { one: 1.5, two: 2.5, three: 3.5 };

expect(await client.zadd(key1, membersScores1)).toEqual(2);
expect(await client.zadd(key2, membersScores2)).toEqual(3);

// Scores are multiplied by 2.0 for key1 and key2 during aggregation.
expect(
await client.zunionstore(
key3,
[
[key1, 2.0],
[key2, 2.0],
],
"SUM",
),
).toEqual(3);
const zunionstoreMapMultiplied = await client.zrangeWithScores(
key3,
range,
);
const expectedMapMultiplied = {
one: 5.0,
three: 7.0,
two: 9.0,
};
expect(zunionstoreMapMultiplied).toEqual(expectedMapMultiplied);
}

async function zunionStoreEmptyCases(client: BaseClient) {
const key1 = "{testKey}:1-" + uuidv4();
const key2 = "{testKey}:2-" + uuidv4();
const range = {
start: 0,
stop: -1,
};
const membersScores1 = { one: 1.0, two: 2.0 };

expect(await client.zadd(key1, membersScores1)).toEqual(2);

// Non existing key
expect(
await client.zunionstore(key2, [
key1,
"{testKey}-non_existing_key",
]),
).toEqual(2);

const zunionstore_map_nonexistingkey = await client.zrangeWithScores(
key2,
range,
);

const expectedMapMultiplied = {
one: 1.0,
two: 2.0,
};
expect(zunionstore_map_nonexistingkey).toEqual(expectedMapMultiplied);

// Empty list check
await expect(client.zunionstore("{xyz}", [])).rejects.toThrow();
}

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`zunionstore test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient) => {
await zunionStoreBasicTest(client);
await zunionStoreWithMaxAggregation(client);
await zunionStoreWithMinAggregation(client);
await zunionStoreWithSumAggregation(client);
await zunionStoreWithWeightsAndAggregation(client);
await zunionStoreEmptyCases(client);
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`zmscore test_%p`,
async (protocol) => {
Expand Down
2 changes: 2 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,8 @@ export async function transactionTest(
responseData.push(["zdiffWithScores([key13, key12])", { three: 3.5 }]);
baseTransaction.zdiffstore(key13, [key13, key13]);
responseData.push(["zdiffstore(key13, [key13, key13])", 0]);
baseTransaction.zunionstore(key5, [key12, key13]);
responseData.push(["zunionstore(key5, [key12, key13])", 2]);
baseTransaction.zmscore(key12, ["two", "one"]);
responseData.push(['zmscore(key12, ["two", "one"]', [2.0, 1.0]]);
baseTransaction.zinterstore(key12, [key12, key13]);
Expand Down

0 comments on commit ddb5af9

Please sign in to comment.