From 1f98bb91dfb7e761ccbde6f796c47f724cfd8c3b Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Fri, 17 Jan 2025 14:49:07 -0800 Subject: [PATCH] Go: `XGROUP DESTROY`. Signed-off-by: Yury-Fridlyand --- go/api/base_client.go | 30 ++++++++++++++++++++++++++-- go/api/stream_commands.go | 2 ++ go/integTest/shared_commands_test.go | 26 ++++++++++++++---------- 3 files changed, 46 insertions(+), 12 deletions(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 1686efdc2f..f0c64979d2 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -1996,7 +1996,7 @@ func (client *baseClient) XPendingWithOptions( return handleXPendingDetailResponse(result) } -// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`. +// Creates a new consumer group uniquely identified by `group` for the stream stored at `key`. // // See [valkey.io] for details. // @@ -2020,7 +2020,7 @@ func (client *baseClient) XGroupCreate(key string, group string, id string) (str return client.XGroupCreateWithOptions(key, group, id, options.NewXGroupCreateOptions()) } -// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`. +// Creates a new consumer group uniquely identified by `group` for the stream stored at `key`. // // See [valkey.io] for details. // @@ -2056,3 +2056,29 @@ func (client *baseClient) XGroupCreateWithOptions( } return handleStringResponse(result) } + +// Destroys the consumer group `group` for the stream stored at `key`. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name to delete. +// +// Return value: +// +// `true` if the consumer group is destroyed. Otherwise, `false`. +// +// Example: +// +// client.XGroupDestroy("mystream", "mygroup") +// +// [valkey.io]: https://valkey.io/commands/xgroup-destroy/ +func (client *baseClient) XGroupDestroy(key string, group string) (bool, error) { + result, err := client.executeCommand(C.XGroupDestroy, []string{key, group}) + if err != nil { + return defaultBoolResponse, err + } + return handleBoolResponse(result) +} diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index e8e71ef74a..293813b4b6 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -115,4 +115,6 @@ type StreamCommands interface { XGroupCreate(key string, group string, id string) (string, error) XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error) + + XGroupDestroy(key string, group string) (bool, error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 4a6e561da2..0d2158d203 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -5713,36 +5713,42 @@ func (suite *GlideTestSuite) TestXPendingFailures() { }) } -// TODO add XGroupDestroy tests there func (suite *GlideTestSuite) TestXGroupCreate_XGroupDestroy() { suite.runWithDefaultClients(func(client api.BaseClient) { key := uuid.NewString() - group1 := uuid.NewString() - group2 := uuid.NewString() + group := uuid.NewString() id := "0-1" // Stream not created results in error - _, err := client.XGroupCreate(key, group1, id) + _, err := client.XGroupCreate(key, group, id) assert.Error(suite.T(), err) assert.IsType(suite.T(), &api.RequestError{}, err) // Stream with option to create creates stream & Group opts := options.NewXGroupCreateOptions().SetMakeStream() - suite.verifyOK(client.XGroupCreateWithOptions(key, group1, id, opts)) + suite.verifyOK(client.XGroupCreateWithOptions(key, group, id, opts)) // ...and again results in BUSYGROUP error, because group names must be unique - _, err = client.XGroupCreate(key, group1, id) + _, err = client.XGroupCreate(key, group, id) assert.ErrorContains(suite.T(), err, "BUSYGROUP") assert.IsType(suite.T(), &api.RequestError{}, err) - // TODO add XGroupDestroy tests there + // Stream Group can be destroyed returns: true + destroyed, err := client.XGroupDestroy(key, group) + assert.NoError(suite.T(), err) + assert.True(suite.T(), destroyed) + + // ...and again results in: false + destroyed, err = client.XGroupDestroy(key, group) + assert.NoError(suite.T(), err) + assert.False(suite.T(), destroyed) // ENTRIESREAD option was added in valkey 7.0.0 opts = options.NewXGroupCreateOptions().SetEntriesRead(100) if suite.serverVersion >= "7.0.0" { - suite.verifyOK(client.XGroupCreateWithOptions(key, group2, id, opts)) + suite.verifyOK(client.XGroupCreateWithOptions(key, group, id, opts)) } else { - _, err = client.XGroupCreateWithOptions(key, group2, id, opts) + _, err = client.XGroupCreateWithOptions(key, group, id, opts) assert.Error(suite.T(), err) assert.IsType(suite.T(), &api.RequestError{}, err) } @@ -5750,7 +5756,7 @@ func (suite *GlideTestSuite) TestXGroupCreate_XGroupDestroy() { // key is not a stream key = uuid.NewString() suite.verifyOK(client.Set(key, id)) - _, err = client.XGroupCreate(key, group1, id) + _, err = client.XGroupCreate(key, group, id) assert.Error(suite.T(), err) assert.IsType(suite.T(), &api.RequestError{}, err) })