Skip to content

Commit

Permalink
Go: BZMPop (#3007)
Browse files Browse the repository at this point in the history
* Added functionality of bzmpop

Signed-off-by: Edward Liang <[email protected]>
  • Loading branch information
edlng authored Jan 30, 2025
1 parent b4e28e2 commit 41f423e
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 0 deletions.
141 changes: 141 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4471,6 +4471,147 @@ func (client *baseClient) BZPopMin(keys []string, timeoutSecs float64) (Result[K
return handleKeyWithMemberAndScoreResponse(result)
}

// Blocks the connection until it pops and returns a member-score pair from the first non-empty sorted set, with the
// given keys being checked in the order they are provided.
// BZMPop is the blocking variant of [baseClient.ZMPop].
//
// Note:
// - When in cluster mode, all keys must map to the same hash slot.
// - BZMPop is a client blocking command, see [Blocking Commands] for more details and best practices.
//
// Since:
//
// Valkey 7.0 and above.
//
// See [valkey.io] for details.
//
// Parameters:
//
// keys - An array of keys to lists.
// scoreFilter - The element pop criteria - either [api.MIN] or [api.MAX] to pop members with the lowest/highest
// scores accordingly.
// timeoutSecs - The number of seconds to wait for a blocking operation to complete. A value of `0` will block
// indefinitely.
//
// Return value:
//
// An object containing the following elements:
// - The key name of the set from which the element was popped.
// - An array of member scores of the popped elements.
// Returns `nil` if no member could be popped and the timeout expired.
//
// For example:
//
// result, err := client.ZAdd("my_list", map[string]float64{"five": 5.0, "six": 6.0})
// result, err := client.BZMPop([]string{"my_list"}, api.MAX, float64(0.1))
// result["my_list"] = []MemberAndScore{{Member: "six", Score: 6.0}}
//
// [valkey.io]: https://valkey.io/commands/bzmpop/
// [Blocking Commands]: https://github.com/valkey-io/valkey-glide/wiki/General-Concepts#blocking-commands
func (client *baseClient) BZMPop(
keys []string,
scoreFilter ScoreFilter,
timeoutSecs float64,
) (Result[KeyWithArrayOfMembersAndScores], error) {
scoreFilterStr, err := scoreFilter.toString()
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}

// Check for potential length overflow.
if len(keys) > math.MaxInt-3 {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), &errors.RequestError{
Msg: "Length overflow for the provided keys",
}
}

// args slice will have 3 more arguments with the keys provided.
args := make([]string, 0, len(keys)+3)
args = append(args, utils.FloatToString(timeoutSecs), strconv.Itoa(len(keys)))
args = append(args, keys...)
args = append(args, scoreFilterStr)
result, err := client.executeCommand(C.BZMPop, args)
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}
return handleKeyWithArrayOfMembersAndScoresResponse(result)
}

// Blocks the connection until it pops and returns a member-score pair from the first non-empty sorted set, with the
// given keys being checked in the order they are provided.
// BZMPop is the blocking variant of [baseClient.ZMPop].
//
// Note:
// - When in cluster mode, all keys must map to the same hash slot.
// - BZMPop is a client blocking command, see [Blocking Commands] for more details and best practices.
//
// Since:
//
// Valkey 7.0 and above.
//
// See [valkey.io] for details.
//
// Parameters:
//
// keys - An array of keys to lists.
// scoreFilter - The element pop criteria - either [api.MIN] or [api.MAX] to pop members with the lowest/highest
// scores accordingly.
// count - The maximum number of popped elements.
// timeoutSecs - The number of seconds to wait for a blocking operation to complete. A value of `0` will block indefinitely.
//
// Return value:
//
// An object containing the following elements:
// - The key name of the set from which the element was popped.
// - An array of member scores of the popped elements.
// Returns `nil` if no member could be popped and the timeout expired.
//
// For example:
//
// result, err := client.ZAdd("my_list", map[string]float64{"five": 5.0, "six": 6.0})
// result, err := client.BZMPopWithOptions([]string{"my_list"}, api.MAX, 0.1, options.NewZMPopOptions().SetCount(2))
// result["my_list"] = []MemberAndScore{{Member: "six", Score: 6.0}, {Member: "five", Score 5.0}}
//
// [valkey.io]: https://valkey.io/commands/bzmpop/
// [Blocking Commands]: https://github.com/valkey-io/valkey-glide/wiki/General-Concepts#blocking-commands
func (client *baseClient) BZMPopWithOptions(
keys []string,
scoreFilter ScoreFilter,
timeoutSecs float64,
opts *options.ZMPopOptions,
) (Result[KeyWithArrayOfMembersAndScores], error) {
scoreFilterStr, err := scoreFilter.toString()
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}

// Check for potential length overflow.
if len(keys) > math.MaxInt-5 {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), &errors.RequestError{
Msg: "Length overflow for the provided keys",
}
}

// args slice will have 5 more arguments with the keys provided.
args := make([]string, 0, len(keys)+5)
args = append(args, utils.FloatToString(timeoutSecs), strconv.Itoa(len(keys)))
args = append(args, keys...)
args = append(args, scoreFilterStr)
if opts != nil {
optionArgs, err := opts.ToArgs()
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}
args = append(args, optionArgs...)
}
result, err := client.executeCommand(C.BZMPop, args)
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}

return handleKeyWithArrayOfMembersAndScoresResponse(result)
}

// Returns the specified range of elements in the sorted set stored at `key`.
// `ZRANGE` can perform different types of range queries: by index (rank), by the score, or by lexicographical order.
//
Expand Down
22 changes: 22 additions & 0 deletions go/api/command_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,28 @@ func (listDirection ListDirection) toString() (string, error) {
}
}

// Mandatory option for [ZMPop] and for [BZMPop].
// Defines which elements to pop from the sorted set.
type ScoreFilter string

const (
// Pop elements with the highest scores.
MAX ScoreFilter = "MAX"
// Pop elements with the lowest scores.
MIN ScoreFilter = "MIN"
)

func (scoreFilter ScoreFilter) toString() (string, error) {
switch scoreFilter {
case MAX:
return string(MAX), nil
case MIN:
return string(MIN), nil
default:
return "", &errors.RequestError{Msg: "Invalid score filter"}
}
}

// Optional arguments to Restore(key string, ttl int64, value string, option *RestoreOptions)
//
// Note IDLETIME and FREQ modifiers cannot be set at the same time.
Expand Down
34 changes: 34 additions & 0 deletions go/api/options/zmpop_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

package options

import (
"github.com/valkey-io/valkey-glide/go/glide/utils"
)

// Optional arguments for `ZMPop` and `BZMPop` in [SortedSetCommands]
type ZMPopOptions struct {
count int64
countIsSet bool
}

func NewZMPopOptions() *ZMPopOptions {
return &ZMPopOptions{}
}

// Set the count.
func (zmpo *ZMPopOptions) SetCount(count int64) *ZMPopOptions {
zmpo.count = count
zmpo.countIsSet = true
return zmpo
}

func (zmpo *ZMPopOptions) ToArgs() ([]string, error) {
var args []string

if zmpo.countIsSet {
args = append(args, "COUNT", utils.IntToString(zmpo.count))
}

return args, nil
}
44 changes: 44 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,50 @@ func handleKeyWithMemberAndScoreResponse(response *C.struct_CommandResponse) (Re
return CreateKeyWithMemberAndScoreResult(KeyWithMemberAndScore{key, member, score}), nil
}

func handleKeyWithArrayOfMembersAndScoresResponse(
response *C.struct_CommandResponse,
) (Result[KeyWithArrayOfMembersAndScores], error) {
defer C.free_command_response(response)

if response.response_type == uint32(C.Null) {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), nil
}

typeErr := checkResponseType(response, C.Array, true)
if typeErr != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), typeErr
}

slice, err := parseArray(response)
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}

arr := slice.([]interface{})
key := arr[0].(string)
converted, err := mapConverter[float64]{
nil,
false,
}.convert(arr[1])
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}
res, ok := converted.(map[string]float64)

if !ok {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), &errors.RequestError{
Msg: fmt.Sprintf("unexpected type of second element: %T", converted),
}
}
memberAndScoreArray := make([]MemberAndScore, 0, len(res))

for k, v := range res {
memberAndScoreArray = append(memberAndScoreArray, MemberAndScore{k, v})
}

return CreateKeyWithArrayOfMembersAndScoresResult(KeyWithArrayOfMembersAndScores{key, memberAndScoreArray}), nil
}

func handleScanResponse(response *C.struct_CommandResponse) (string, []string, error) {
defer C.free_command_response(response)

Expand Down
21 changes: 21 additions & 0 deletions go/api/response_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ type KeyWithMemberAndScore struct {
Score float64
}

// Response of the [ZMPop] and [BZMPop] command.
type KeyWithArrayOfMembersAndScores struct {
Key string
MembersAndScores []MemberAndScore
}

type MemberAndScore struct {
Member string
Score float64
}

// Response type of [XAutoClaim] command.
type XAutoClaimResponse struct {
NextEntry string
Expand Down Expand Up @@ -77,6 +88,16 @@ func CreateNilKeyWithMemberAndScoreResult() Result[KeyWithMemberAndScore] {
return Result[KeyWithMemberAndScore]{val: KeyWithMemberAndScore{"", "", 0.0}, isNil: true}
}

func CreateKeyWithArrayOfMembersAndScoresResult(
kmsVals KeyWithArrayOfMembersAndScores,
) Result[KeyWithArrayOfMembersAndScores] {
return Result[KeyWithArrayOfMembersAndScores]{val: kmsVals, isNil: false}
}

func CreateNilKeyWithArrayOfMembersAndScoresResult() Result[KeyWithArrayOfMembersAndScores] {
return Result[KeyWithArrayOfMembersAndScores]{val: KeyWithArrayOfMembersAndScores{"", nil}, isNil: true}
}

// Enum to distinguish value types stored in `ClusterValue`
type ValueType int

Expand Down
9 changes: 9 additions & 0 deletions go/api/sorted_set_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ type SortedSetCommands interface {

BZPopMin(keys []string, timeoutSecs float64) (Result[KeyWithMemberAndScore], error)

BZMPop(keys []string, scoreFilter ScoreFilter, timeoutSecs float64) (Result[KeyWithArrayOfMembersAndScores], error)

BZMPopWithOptions(
keys []string,
scoreFilter ScoreFilter,
timeoutSecs float64,
options *options.ZMPopOptions,
) (Result[KeyWithArrayOfMembersAndScores], error)

ZRange(key string, rangeQuery options.ZRangeQuery) ([]string, error)

ZRangeWithScores(key string, rangeQuery options.ZRangeQueryWithScores) (map[string]float64, error)
Expand Down
Loading

0 comments on commit 41f423e

Please sign in to comment.