Skip to content

Commit

Permalink
Go: XGROUP DESTROY.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Jan 17, 2025
1 parent bc88940 commit 1f98bb9
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 12 deletions.
30 changes: 28 additions & 2 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1996,7 +1996,7 @@ func (client *baseClient) XPendingWithOptions(
return handleXPendingDetailResponse(result)
}

// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
// Creates a new consumer group uniquely identified by `group` for the stream stored at `key`.
//
// See [valkey.io] for details.
//
Expand All @@ -2020,7 +2020,7 @@ func (client *baseClient) XGroupCreate(key string, group string, id string) (str
return client.XGroupCreateWithOptions(key, group, id, options.NewXGroupCreateOptions())
}

// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
// Creates a new consumer group uniquely identified by `group` for the stream stored at `key`.
//
// See [valkey.io] for details.
//
Expand Down Expand Up @@ -2056,3 +2056,29 @@ func (client *baseClient) XGroupCreateWithOptions(
}
return handleStringResponse(result)
}

// Destroys the consumer group `group` for the stream stored at `key`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name to delete.
//
// Return value:
//
// `true` if the consumer group is destroyed. Otherwise, `false`.
//
// Example:
//
// client.XGroupDestroy("mystream", "mygroup")
//
// [valkey.io]: https://valkey.io/commands/xgroup-destroy/
func (client *baseClient) XGroupDestroy(key string, group string) (bool, error) {
result, err := client.executeCommand(C.XGroupDestroy, []string{key, group})
if err != nil {
return defaultBoolResponse, err
}
return handleBoolResponse(result)
}
2 changes: 2 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,6 @@ type StreamCommands interface {
XGroupCreate(key string, group string, id string) (string, error)

XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error)

XGroupDestroy(key string, group string) (bool, error)
}
26 changes: 16 additions & 10 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5713,44 +5713,50 @@ func (suite *GlideTestSuite) TestXPendingFailures() {
})
}

// TODO add XGroupDestroy tests there
func (suite *GlideTestSuite) TestXGroupCreate_XGroupDestroy() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.NewString()
group1 := uuid.NewString()
group2 := uuid.NewString()
group := uuid.NewString()
id := "0-1"

// Stream not created results in error
_, err := client.XGroupCreate(key, group1, id)
_, err := client.XGroupCreate(key, group, id)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)

// Stream with option to create creates stream & Group
opts := options.NewXGroupCreateOptions().SetMakeStream()
suite.verifyOK(client.XGroupCreateWithOptions(key, group1, id, opts))
suite.verifyOK(client.XGroupCreateWithOptions(key, group, id, opts))

// ...and again results in BUSYGROUP error, because group names must be unique
_, err = client.XGroupCreate(key, group1, id)
_, err = client.XGroupCreate(key, group, id)
assert.ErrorContains(suite.T(), err, "BUSYGROUP")
assert.IsType(suite.T(), &api.RequestError{}, err)

// TODO add XGroupDestroy tests there
// Stream Group can be destroyed returns: true
destroyed, err := client.XGroupDestroy(key, group)
assert.NoError(suite.T(), err)
assert.True(suite.T(), destroyed)

// ...and again results in: false
destroyed, err = client.XGroupDestroy(key, group)
assert.NoError(suite.T(), err)
assert.False(suite.T(), destroyed)

// ENTRIESREAD option was added in valkey 7.0.0
opts = options.NewXGroupCreateOptions().SetEntriesRead(100)
if suite.serverVersion >= "7.0.0" {
suite.verifyOK(client.XGroupCreateWithOptions(key, group2, id, opts))
suite.verifyOK(client.XGroupCreateWithOptions(key, group, id, opts))
} else {
_, err = client.XGroupCreateWithOptions(key, group2, id, opts)
_, err = client.XGroupCreateWithOptions(key, group, id, opts)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)
}

// key is not a stream
key = uuid.NewString()
suite.verifyOK(client.Set(key, id))
_, err = client.XGroupCreate(key, group1, id)
_, err = client.XGroupCreate(key, group, id)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)
})
Expand Down

0 comments on commit 1f98bb9

Please sign in to comment.