Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into go/xgroupdestroy
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Jan 21, 2025
2 parents bedac71 + cf88d66 commit 5b536c9
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 5 deletions.
67 changes: 66 additions & 1 deletion go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1561,7 +1561,7 @@ func (client *baseClient) XReadWithOptions(
//
// Return value:
// A `map[string]map[string][][]string` of stream keys to a map of stream entry IDs mapped to an array entries or `nil` if
// a key does not exist or does not contain requiested entries.
// a key does not exist or does not contain requested entries.
//
// For example:
//
Expand Down Expand Up @@ -2665,6 +2665,71 @@ func (client *baseClient) XGroupDestroy(key string, group string) (bool, error)
return handleBoolResponse(result)
}

// Sets the last delivered ID for a consumer group.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// id - The stream entry ID that should be set as the last delivered ID for the consumer group.
//
// Return value:
//
// `"OK"`.
//
// Example:
//
// ok, err := client.XGroupSetId("mystream", "mygroup", "0-0")
// if ok != "OK" || err != nil {
// // handle error
// }
//
// [valkey.io]: https://valkey.io/commands/xgroup-create/
func (client *baseClient) XGroupSetId(key string, group string, id string) (string, error) {
return client.XGroupSetIdWithOptions(key, group, id, options.NewXGroupSetIdOptionsOptions())
}

// Sets the last delivered ID for a consumer group.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// id - The stream entry ID that should be set as the last delivered ID for the consumer group.
// opts - The options for the command. See [options.XGroupSetIdOptions] for details.
//
// Return value:
//
// `"OK"`.
//
// Example:
//
// opts := options.NewXGroupSetIdOptionsOptions().SetEntriesRead(42)
// ok, err := client.XGroupSetIdWithOptions("mystream", "mygroup", "0-0", opts)
// if ok != "OK" || err != nil {
// // handle error
// }
//
// [valkey.io]: https://valkey.io/commands/xgroup-create/
func (client *baseClient) XGroupSetIdWithOptions(
key string,
group string,
id string,
opts *options.XGroupSetIdOptions,
) (string, error) {
optionArgs, _ := opts.ToArgs()
args := append([]string{key, group, id}, optionArgs...)
result, err := client.executeCommand(C.XGroupSetId, args)
if err != nil {
return defaultStringResponse, err
}
return handleStringResponse(result)
}

// Removes all elements in the sorted set stored at `key` with a lexicographical order
// between `rangeQuery.Start` and `rangeQuery.End`.
//
Expand Down
32 changes: 28 additions & 4 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ func (xpo *XPendingOptions) SetConsumer(consumer string) *XPendingOptions {
func (xpo *XPendingOptions) ToArgs() ([]string, error) {
args := []string{}

// if minIdleTime is set, we need to add an `IDLE` argument along with the minIdleTime
if xpo.minIdleTime > 0 {
args = append(args, "IDLE")
args = append(args, utils.IntToString(xpo.minIdleTime))
Expand Down Expand Up @@ -280,9 +279,6 @@ func (xgco *XGroupCreateOptions) SetMakeStream() *XGroupCreateOptions {
return xgco
}

// A value representing the number of stream entries already read by the group.
//
// Since Valkey version 7.0.0.
func (xgco *XGroupCreateOptions) SetEntriesRead(entriesRead int64) *XGroupCreateOptions {
xgco.entriesRead = entriesRead
return xgco
Expand All @@ -302,3 +298,31 @@ func (xgco *XGroupCreateOptions) ToArgs() ([]string, error) {

return args, nil
}

// Optional arguments for `XGroupSetId` in [StreamCommands]
type XGroupSetIdOptions struct {
entriesRead int64
}

// Create new empty `XGroupSetIdOptions`
func NewXGroupSetIdOptionsOptions() *XGroupSetIdOptions {
return &XGroupSetIdOptions{-1}
}

// A value representing the number of stream entries already read by the group.
//
// Since Valkey version 7.0.0.
func (xgsio *XGroupSetIdOptions) SetEntriesRead(entriesRead int64) *XGroupSetIdOptions {
xgsio.entriesRead = entriesRead
return xgsio
}

func (xgsio *XGroupSetIdOptions) ToArgs() ([]string, error) {
var args []string

if xgsio.entriesRead > -1 {
args = append(args, "ENTRIESREAD", utils.IntToString(xgsio.entriesRead))
}

return args, nil
}
4 changes: 4 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ type StreamCommands interface {

XPendingWithOptions(key string, group string, options *options.XPendingOptions) ([]XPendingDetail, error)

XGroupSetId(key string, group string, id string) (string, error)

XGroupSetIdWithOptions(key string, group string, id string, opts *options.XGroupSetIdOptions) (string, error)

XGroupCreate(key string, group string, id string) (string, error)

XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error)
Expand Down
88 changes: 88 additions & 0 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4425,6 +4425,94 @@ func (suite *GlideTestSuite) TestXRead() {
})
}

func (suite *GlideTestSuite) TestXGroupSetId() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.NewString()
group := uuid.NewString()
consumer := uuid.NewString()

// Setup: Create stream with 3 entries, create consumer group, read entries to add them to the Pending Entries List
xadd, err := client.XAddWithOptions(
key,
[][]string{{"f0", "v0"}},
options.NewXAddOptions().SetId("1-0"),
)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "1-0", xadd.Value())
xadd, err = client.XAddWithOptions(
key,
[][]string{{"f1", "v1"}},
options.NewXAddOptions().SetId("1-1"),
)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "1-1", xadd.Value())
xadd, err = client.XAddWithOptions(
key,
[][]string{{"f2", "v2"}},
options.NewXAddOptions().SetId("1-2"),
)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "1-2", xadd.Value())

sendWithCustomCommand(
suite,
client,
[]string{"xgroup", "create", key, group, "0"},
"Can't send XGROUP CREATE as a custom command",
)

xreadgroup, err := client.XReadGroup(group, consumer, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), map[string]map[string][][]string{
key: {
"1-0": {{"f0", "v0"}},
"1-1": {{"f1", "v1"}},
"1-2": {{"f2", "v2"}},
},
}, xreadgroup)

// Sanity check: xreadgroup should not return more entries since they're all already in the
// Pending Entries List.
xreadgroup, err = client.XReadGroup(group, consumer, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
assert.Nil(suite.T(), xreadgroup)

// Reset the last delivered ID for the consumer group to "1-1"
if suite.serverVersion < "7.0.0" {
suite.verifyOK(client.XGroupSetId(key, group, "1-1"))
} else {
opts := options.NewXGroupSetIdOptionsOptions().SetEntriesRead(42)
suite.verifyOK(client.XGroupSetIdWithOptions(key, group, "1-1", opts))
}

// xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1
xreadgroup, err = client.XReadGroup(group, consumer, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), map[string]map[string][][]string{
key: {
"1-2": {{"f2", "v2"}},
},
}, xreadgroup)

// An error is raised if XGROUP SETID is called with a non-existing key
_, err = client.XGroupSetId(uuid.NewString(), group, "1-1")
assert.IsType(suite.T(), &api.RequestError{}, err)

// An error is raised if XGROUP SETID is called with a non-existing group
_, err = client.XGroupSetId(key, uuid.NewString(), "1-1")
assert.IsType(suite.T(), &api.RequestError{}, err)

// Setting the ID to a non-existing ID is allowed
suite.verifyOK(client.XGroupSetId(key, group, "99-99"))

// key exists, but is not a stream
key = uuid.NewString()
suite.verifyOK(client.Set(key, "xgroup setid"))
_, err = client.XGroupSetId(key, group, "1-1")
assert.IsType(suite.T(), &api.RequestError{}, err)
})
}

func (suite *GlideTestSuite) TestZAddAndZAddIncr() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.New().String()
Expand Down

0 comments on commit 5b536c9

Please sign in to comment.