diff --git a/go/api/base_client.go b/go/api/base_client.go index a3c93f97ee..9c08a9fc64 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -1874,6 +1874,257 @@ func (client *baseClient) XLen(key string) (int64, error) { return handleIntResponse(result) } +// Transfers ownership of pending stream entries that match the specified criteria. +// +// Since: +// +// Valkey 6.2.0 and above. +// +// See [valkey.io] for more details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// consumer - The group consumer. +// minIdleTime - The minimum idle time for the message to be claimed. +// start - Filters the claimed entries to those that have an ID equal or greater than the specified value. +// +// Return value: +// +// An object containing the following elements: +// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is +// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if +// the entire stream was scanned. +// - A map of the claimed entries. +// - If you are using Valkey 7.0.0 or above, the response will also include an array containing +// the message IDs that were in the Pending Entries List but no longer exist in the stream. +// These IDs are deleted from the Pending Entries List. +// +// Example: +// +// result, err := client.XAutoClaim("myStream", "myGroup", "myConsumer", 42, "0-0") +// result: +// // &{ +// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call +// // map[ +// // "1609338752495-0": [ // claimed entries +// // ["field 1", "value 1"] +// // ["field 2", "value 2"] +// // ] +// // ] +// // [ +// // "1594324506465-0", // array of IDs of deleted messages, +// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above +// // ] +// // } +// +// [valkey.io]: https://valkey.io/commands/xautoclaim/ +func (client *baseClient) XAutoClaim( + key string, + group string, + consumer string, + minIdleTime int64, + start string, +) (XAutoClaimResponse, error) { + return client.XAutoClaimWithOptions(key, group, consumer, minIdleTime, start, nil) +} + +// Transfers ownership of pending stream entries that match the specified criteria. +// +// Since: +// +// Valkey 6.2.0 and above. +// +// See [valkey.io] for more details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// consumer - The group consumer. +// minIdleTime - The minimum idle time for the message to be claimed. +// start - Filters the claimed entries to those that have an ID equal or greater than the specified value. +// options - Options detailing how to read the stream. +// +// Return value: +// +// An object containing the following elements: +// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is +// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if +// the entire stream was scanned. +// - A map of the claimed entries. +// - If you are using Valkey 7.0.0 or above, the response will also include an array containing +// the message IDs that were in the Pending Entries List but no longer exist in the stream. +// These IDs are deleted from the Pending Entries List. +// +// Example: +// +// opts := options.NewXAutoClaimOptionsWithCount(1) +// result, err := client.XAutoClaimWithOptions("myStream", "myGroup", "myConsumer", 42, "0-0", opts) +// result: +// // &{ +// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call +// // map[ +// // "1609338752495-0": [ // claimed entries +// // ["field 1", "value 1"] +// // ["field 2", "value 2"] +// // ] +// // ] +// // [ +// // "1594324506465-0", // array of IDs of deleted messages, +// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above +// // ] +// // } +// +// [valkey.io]: https://valkey.io/commands/xautoclaim/ +func (client *baseClient) XAutoClaimWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + start string, + options *options.XAutoClaimOptions, +) (XAutoClaimResponse, error) { + args := []string{key, group, consumer, utils.IntToString(minIdleTime), start} + if options != nil { + optArgs, err := options.ToArgs() + if err != nil { + return XAutoClaimResponse{}, err + } + args = append(args, optArgs...) + } + result, err := client.executeCommand(C.XAutoClaim, args) + if err != nil { + return XAutoClaimResponse{}, err + } + return handleXAutoClaimResponse(result) +} + +// Transfers ownership of pending stream entries that match the specified criteria. +// +// Since: +// +// Valkey 6.2.0 and above. +// +// See [valkey.io] for more details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// consumer - The group consumer. +// minIdleTime - The minimum idle time for the message to be claimed. +// start - Filters the claimed entries to those that have an ID equal or greater than the specified value. +// +// Return value: +// +// An object containing the following elements: +// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is +// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if +// the entire stream was scanned. +// - An array of IDs for the claimed entries. +// - If you are using Valkey 7.0.0 or above, the response will also include an array containing +// the message IDs that were in the Pending Entries List but no longer exist in the stream. +// These IDs are deleted from the Pending Entries List. +// +// Example: +// +// result, err := client.XAutoClaimJustId("myStream", "myGroup", "myConsumer", 42, "0-0") +// result: +// // &{ +// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call +// // [ +// // "1609338752495-0", // claimed entries +// // "1609338752495-1" +// // ] +// // [ +// // "1594324506465-0", // array of IDs of deleted messages, +// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above +// // ] +// // } +// +// [valkey.io]: https://valkey.io/commands/xautoclaim/ +func (client *baseClient) XAutoClaimJustId( + key string, + group string, + consumer string, + minIdleTime int64, + start string, +) (XAutoClaimJustIdResponse, error) { + return client.XAutoClaimJustIdWithOptions(key, group, consumer, minIdleTime, start, nil) +} + +// Transfers ownership of pending stream entries that match the specified criteria. +// +// Since: +// +// Valkey 6.2.0 and above. +// +// See [valkey.io] for more details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// consumer - The group consumer. +// minIdleTime - The minimum idle time for the message to be claimed. +// start - Filters the claimed entries to those that have an ID equal or greater than the specified value. +// options - Options detailing how to read the stream. +// +// Return value: +// +// An object containing the following elements: +// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is +// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if +// the entire stream was scanned. +// - An array of IDs for the claimed entries. +// - If you are using Valkey 7.0.0 or above, the response will also include an array containing +// the message IDs that were in the Pending Entries List but no longer exist in the stream. +// These IDs are deleted from the Pending Entries List. +// +// Example: +// +// opts := options.NewXAutoClaimOptionsWithCount(1) +// result, err := client.XAutoClaimJustIdWithOptions("myStream", "myGroup", "myConsumer", 42, "0-0", opts) +// result: +// // &{ +// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call +// // [ +// // "1609338752495-0", // claimed entries +// // "1609338752495-1" +// // ] +// // [ +// // "1594324506465-0", // array of IDs of deleted messages, +// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above +// // ] +// // } +// +// [valkey.io]: https://valkey.io/commands/xautoclaim/ +func (client *baseClient) XAutoClaimJustIdWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + start string, + options *options.XAutoClaimOptions, +) (XAutoClaimJustIdResponse, error) { + args := []string{key, group, consumer, utils.IntToString(minIdleTime), start} + if options != nil { + optArgs, err := options.ToArgs() + if err != nil { + return XAutoClaimJustIdResponse{}, err + } + args = append(args, optArgs...) + } + args = append(args, "JUSTID") + result, err := client.executeCommand(C.XAutoClaim, args) + if err != nil { + return XAutoClaimJustIdResponse{}, err + } + return handleXAutoClaimJustIdResponse(result) +} + // Removes the specified entries by id from a stream, and returns the number of entries deleted. // // See [valkey.io] for details. diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go index ff40c224ac..4507b0478c 100644 --- a/go/api/options/stream_options.go +++ b/go/api/options/stream_options.go @@ -116,6 +116,20 @@ func (xTrimOptions *XTrimOptions) ToArgs() ([]string, error) { return args, nil } +// Optional arguments for `XAutoClaim` in [StreamCommands] +type XAutoClaimOptions struct { + count int64 +} + +// Option to trim the stream according to minimum ID. +func NewXAutoClaimOptionsWithCount(count int64) *XAutoClaimOptions { + return &XAutoClaimOptions{count} +} + +func (xacp *XAutoClaimOptions) ToArgs() ([]string, error) { + return []string{"COUNT", utils.IntToString(xacp.count)}, nil +} + // Optional arguments for `XRead` in [StreamCommands] type XReadOptions struct { count, block int64 diff --git a/go/api/response_handlers.go b/go/api/response_handlers.go index c848cdc57e..d8b3a734e2 100644 --- a/go/api/response_handlers.go +++ b/go/api/response_handlers.go @@ -603,6 +603,102 @@ func (node arrayConverter[T]) convert(data interface{}) (interface{}, error) { // TODO: convert sets +func handleXAutoClaimResponse(response *C.struct_CommandResponse) (XAutoClaimResponse, error) { + defer C.free_command_response(response) + var null XAutoClaimResponse // default response + typeErr := checkResponseType(response, C.Array, false) + if typeErr != nil { + return null, typeErr + } + slice, err := parseArray(response) + if err != nil { + return null, err + } + arr := slice.([]interface{}) + len := len(arr) + if len < 2 || len > 3 { + return null, &RequestError{fmt.Sprintf("Unexpected response array length: %d", len)} + } + converted, err := mapConverter[[][]string]{ + arrayConverter[[]string]{ + arrayConverter[string]{ + nil, + false, + }, + false, + }, + false, + }.convert(arr[1]) + if err != nil { + return null, err + } + claimedEntries, ok := converted.(map[string][][]string) + if !ok { + return null, &RequestError{fmt.Sprintf("unexpected type of second element: %T", converted)} + } + var deletedMessages []string + deletedMessages = nil + if len == 3 { + converted, err = arrayConverter[string]{ + nil, + false, + }.convert(arr[2]) + if err != nil { + return null, err + } + deletedMessages, ok = converted.([]string) + if !ok { + return null, &RequestError{fmt.Sprintf("unexpected type of third element: %T", converted)} + } + } + return XAutoClaimResponse{arr[0].(string), claimedEntries, deletedMessages}, nil +} + +func handleXAutoClaimJustIdResponse(response *C.struct_CommandResponse) (XAutoClaimJustIdResponse, error) { + defer C.free_command_response(response) + var null XAutoClaimJustIdResponse // default response + typeErr := checkResponseType(response, C.Array, false) + if typeErr != nil { + return null, typeErr + } + slice, err := parseArray(response) + if err != nil { + return null, err + } + arr := slice.([]interface{}) + len := len(arr) + if len < 2 || len > 3 { + return null, &RequestError{fmt.Sprintf("Unexpected response array length: %d", len)} + } + converted, err := arrayConverter[string]{ + nil, + false, + }.convert(arr[1]) + if err != nil { + return null, err + } + claimedEntries, ok := converted.([]string) + if !ok { + return null, &RequestError{fmt.Sprintf("unexpected type of second element: %T", converted)} + } + var deletedMessages []string + deletedMessages = nil + if len == 3 { + converted, err = arrayConverter[string]{ + nil, + false, + }.convert(arr[2]) + if err != nil { + return null, err + } + deletedMessages, ok = converted.([]string) + if !ok { + return null, &RequestError{fmt.Sprintf("unexpected type of third element: %T", converted)} + } + } + return XAutoClaimJustIdResponse{arr[0].(string), claimedEntries, deletedMessages}, nil +} + func handleXReadResponse(response *C.struct_CommandResponse) (map[string]map[string][][]string, error) { defer C.free_command_response(response) data, err := parseMap(response) diff --git a/go/api/response_types.go b/go/api/response_types.go index 2e6e527c43..84de6aed7f 100644 --- a/go/api/response_types.go +++ b/go/api/response_types.go @@ -23,6 +23,20 @@ type KeyWithMemberAndScore struct { Score float64 } +// Response type of [XAutoClaim] command. +type XAutoClaimResponse struct { + NextEntry string + ClaimedEntries map[string][][]string + DeletedMessages []string +} + +// Response type of [XAutoClaimJustId] command. +type XAutoClaimJustIdResponse struct { + NextEntry string + ClaimedEntries []string + DeletedMessages []string +} + func (result Result[T]) IsNil() bool { return result.isNil } diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 211d27cdaa..5005c47373 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -102,6 +102,34 @@ type StreamCommands interface { // [valkey.io]: https://valkey.io/commands/xlen/ XLen(key string) (int64, error) + XAutoClaim(key string, group string, consumer string, minIdleTime int64, start string) (XAutoClaimResponse, error) + + XAutoClaimWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + start string, + options *options.XAutoClaimOptions, + ) (XAutoClaimResponse, error) + + XAutoClaimJustId( + key string, + group string, + consumer string, + minIdleTime int64, + start string, + ) (XAutoClaimJustIdResponse, error) + + XAutoClaimJustIdWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + start string, + options *options.XAutoClaimOptions, + ) (XAutoClaimJustIdResponse, error) + XReadGroup(group string, consumer string, keysAndIds map[string]string) (map[string]map[string][][]string, error) XReadGroupWithOptions( diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 32dcb6a055..507df5e959 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -4122,6 +4122,122 @@ func sendWithCustomCommand(suite *GlideTestSuite, client api.BaseClient, args [] return res } +func (suite *GlideTestSuite) TestXAutoClaim() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.NewString() + group := uuid.NewString() + consumer := uuid.NewString() + + sendWithCustomCommand( + suite, + client, + []string{"xgroup", "create", key, group, "0", "MKSTREAM"}, + "Can't send XGROUP CREATE as a custom command", + ) + sendWithCustomCommand( + suite, + client, + []string{"xgroup", "createconsumer", key, group, consumer}, + "Can't send XGROUP CREATECONSUMER as a custom command", + ) + + xadd, err := client.XAddWithOptions( + key, + [][]string{{"entry1_field1", "entry1_value1"}, {"entry1_field2", "entry1_value2"}}, + options.NewXAddOptions().SetId("0-1"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "0-1", xadd.Value()) + xadd, err = client.XAddWithOptions( + key, + [][]string{{"entry2_field1", "entry2_value1"}}, + options.NewXAddOptions().SetId("0-2"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "0-2", xadd.Value()) + + xreadgroup, err := client.XReadGroup(group, consumer, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), map[string]map[string][][]string{ + key: { + "0-1": {{"entry1_field1", "entry1_value1"}, {"entry1_field2", "entry1_value2"}}, + "0-2": {{"entry2_field1", "entry2_value1"}}, + }, + }, xreadgroup) + + opts := options.NewXAutoClaimOptionsWithCount(1) + xautoclaim, err := client.XAutoClaimWithOptions(key, group, consumer, 0, "0-0", opts) + assert.NoError(suite.T(), err) + var deletedEntries []string + if suite.serverVersion >= "7.0.0" { + deletedEntries = []string{} + } + assert.Equal( + suite.T(), + api.XAutoClaimResponse{ + NextEntry: "0-2", + ClaimedEntries: map[string][][]string{ + "0-1": {{"entry1_field1", "entry1_value1"}, {"entry1_field2", "entry1_value2"}}, + }, + DeletedMessages: deletedEntries, + }, + xautoclaim, + ) + + justId, err := client.XAutoClaimJustId(key, group, consumer, 0, "0-0") + assert.NoError(suite.T(), err) + assert.Equal( + suite.T(), + api.XAutoClaimJustIdResponse{ + NextEntry: "0-0", + ClaimedEntries: []string{"0-1", "0-2"}, + DeletedMessages: deletedEntries, + }, + justId, + ) + + // add one more entry + xadd, err = client.XAddWithOptions( + key, + [][]string{{"entry3_field1", "entry3_value1"}}, + options.NewXAddOptions().SetId("0-3"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "0-3", xadd.Value()) + + // incorrect IDs - response is empty + xautoclaim, err = client.XAutoClaim(key, group, consumer, 0, "5-0") + assert.NoError(suite.T(), err) + assert.Equal( + suite.T(), + api.XAutoClaimResponse{ + NextEntry: "0-0", + ClaimedEntries: map[string][][]string{}, + DeletedMessages: deletedEntries, + }, + xautoclaim, + ) + + justId, err = client.XAutoClaimJustId(key, group, consumer, 0, "5-0") + assert.NoError(suite.T(), err) + assert.Equal( + suite.T(), + api.XAutoClaimJustIdResponse{ + NextEntry: "0-0", + ClaimedEntries: []string{}, + DeletedMessages: deletedEntries, + }, + justId, + ) + + // key exists, but it is not a stream + key2 := uuid.New().String() + suite.verifyOK(client.Set(key2, key2)) + _, err = client.XAutoClaim(key2, "_", "_", 0, "_") + assert.IsType(suite.T(), &api.RequestError{}, err) + }) +} + func (suite *GlideTestSuite) TestXReadGroup() { suite.runWithDefaultClients(func(client api.BaseClient) { key1 := "{xreadgroup}-1-" + uuid.NewString() @@ -5389,8 +5505,7 @@ func (suite *GlideTestSuite) TestXPending() { streamid_2, err := client.XAdd(key, [][]string{{"field2", "value2"}}) assert.NoError(suite.T(), err) - command = []string{"XReadGroup", "GROUP", groupName, consumer1, "STREAMS", key, ">"} - _, err = client.CustomCommand(command) + _, err = client.XReadGroup(groupName, consumer1, map[string]string{key: ">"}) assert.NoError(suite.T(), err) _, err = client.XAdd(key, [][]string{{"field3", "value3"}}) @@ -5400,8 +5515,7 @@ func (suite *GlideTestSuite) TestXPending() { streamid_5, err := client.XAdd(key, [][]string{{"field5", "value5"}}) assert.NoError(suite.T(), err) - command = []string{"XReadGroup", "GROUP", groupName, consumer2, "STREAMS", key, ">"} - _, err = client.CustomCommand(command) + _, err = client.XReadGroup(groupName, consumer2, map[string]string{key: ">"}) assert.NoError(suite.T(), err) expectedSummary := api.XPendingSummary{ @@ -5465,8 +5579,7 @@ func (suite *GlideTestSuite) TestXPending() { streamid_2, err := client.XAdd(key, [][]string{{"field2", "value2"}}) assert.NoError(suite.T(), err) - command = []string{"XReadGroup", "GROUP", groupName, consumer1, "STREAMS", key, ">"} - _, err = client.CustomCommand(command) + _, err = client.XReadGroup(groupName, consumer1, map[string]string{key: ">"}) assert.NoError(suite.T(), err) _, err = client.XAdd(key, [][]string{{"field3", "value3"}}) @@ -5476,8 +5589,7 @@ func (suite *GlideTestSuite) TestXPending() { streamid_5, err := client.XAdd(key, [][]string{{"field5", "value5"}}) assert.NoError(suite.T(), err) - command = []string{"XReadGroup", "GROUP", groupName, consumer2, "STREAMS", key, ">"} - _, err = client.CustomCommand(command) + _, err = client.XReadGroup(groupName, consumer2, map[string]string{key: ">"}) assert.NoError(suite.T(), err) expectedSummary := api.XPendingSummary{ @@ -5574,8 +5686,7 @@ func (suite *GlideTestSuite) TestXPendingFailures() { assert.Equal(suite.T(), 0, len(detailResult)) // read the entire stream for the consumer and mark messages as pending - command = []string{"XReadGroup", "GROUP", groupName, consumer1, "STREAMS", key, ">"} - _, err = client.CustomCommand(command) + _, err = client.XReadGroup(groupName, consumer1, map[string]string{key: ">"}) assert.NoError(suite.T(), err) // sanity check - expect some results: @@ -5727,8 +5838,7 @@ func (suite *GlideTestSuite) TestXPendingFailures() { assert.Equal(suite.T(), 0, len(detailResult)) // read the entire stream for the consumer and mark messages as pending - command = []string{"XReadGroup", "GROUP", groupName, consumer1, "STREAMS", key, ">"} - _, err = client.CustomCommand(command) + _, err = client.XReadGroup(groupName, consumer1, map[string]string{key: ">"}) assert.NoError(suite.T(), err) // sanity check - expect some results: