Skip to content

Commit

Permalink
Go xack command new (valkey-io#2982)
Browse files Browse the repository at this point in the history
* Go: Add XAck command

Signed-off-by: Prateek Kumar <prateek.kumar@improving.com>
prateek-kumar-improving authored Jan 21, 2025
1 parent 502a8d7 commit 4050baf
Showing 3 changed files with 42 additions and 8 deletions.
31 changes: 31 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
@@ -2938,3 +2938,34 @@ func (client *baseClient) XGroupDelConsumer(
}
return handleIntResponse(result)
}

// Returns the number of messages that were successfully acknowledged by the consumer group member
// of a stream. This command should be called on a pending message so that such message does not
// get processed again.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - he consumer group name.
// ids - Stream entry IDs to acknowledge and purge messages.
//
// Return value:
//
// The number of messages that were successfully acknowledged.
//
// Example:
//
// // Assuming streamId1 and streamId2 already exist.
// xackResult, err := client.XAck("key", "groupName", []string{"streamId1", "streamId2"})
// fmt.Println(xackResult) // 2
//
// [valkey.io]: https://valkey.io/commands/xack/
func (client *baseClient) XAck(key string, group string, ids []string) (int64, error) {
result, err := client.executeCommand(C.XAck, append([]string{key, group}, ids...))
if err != nil {
return defaultIntResponse, err
}
return handleIntResponse(result)
}
2 changes: 2 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
@@ -156,4 +156,6 @@ type StreamCommands interface {
XGroupCreateConsumer(key string, group string, consumer string) (bool, error)

XGroupDelConsumer(key string, group string, consumer string) (int64, error)

XAck(key string, group string, ids []string) (int64, error)
}
17 changes: 9 additions & 8 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
@@ -6498,28 +6498,29 @@ func (suite *GlideTestSuite) TestXGroupStreamCommands() {
assert.NotNil(suite.T(), streamId3)

// xack that streamid1 and streamid2 have been processed
command := []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()}
sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command")
xackResult, err := client.XAck(key, groupName, []string{streamId1.Value(), streamId2.Value()})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(2), xackResult)

// Delete the consumer group and expect 0 pending messages
respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(0), respInt64)

// TODO: Use XAck when it is added to the Go client
// xack streamid_1, and streamid_2 already received returns 0L
command = []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()}
sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command")
xackResult, err = client.XAck(key, groupName, []string{streamId1.Value(), streamId2.Value()})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(0), xackResult)

// Consume the last message with the previously deleted consumer (creates the consumer anew)
resp, err = client.XReadGroup(groupName, consumerName, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), 1, len(resp[key]))

// TODO: Use XAck when it is added to the Go client
// Use non existent group, so xack streamid_3 returns 0
command = []string{"XAck", key, "non-existent-group", streamId3.Value()}
sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command")
xackResult, err = client.XAck(key, "non-existent-group", []string{streamId3.Value()})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(0), xackResult)

// Delete the consumer group and expect 1 pending message
respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName)

0 comments on commit 4050baf

Please sign in to comment.