Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go: Fix XRange and XRevRange return data type #3052

Merged
merged 4 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6812,7 +6812,7 @@ func (client *baseClient) CopyWithOptions(
//
// Return value:
//
// A `map` of key to stream entry data, where entry data is an array of
// An `array` of stream entry data, where entry data is an array of
// pairings with format `[[field, entry], [field, entry], ...]`. Returns `nil` if `count` is non-positive.
//
// Example:
Expand All @@ -6823,22 +6823,22 @@ func (client *baseClient) CopyWithOptions(
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
// )
// fmt.Println(res) // map[key:[["field1", "entry1"], ["field2", "entry2"]]]
// fmt.Println(res) // [{streamId [["field1", "entry1"], ["field2", "entry2"]]}]
//
// // Retrieve exactly one stream entry by id
// res, err := client.XRange(
// "key",
// options.NewStreamBoundary(streamId, true),
// options.NewStreamBoundary(streamId, true),
// )
// fmt.Println(res) // map[key:[["field1", "entry1"]]
// fmt.Println(res) // [{streamId [["field1", "entry1"]]}]
//
// [valkey.io]: https://valkey.io/commands/xrange/
func (client *baseClient) XRange(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
) (map[string][][]string, error) {
) ([]XRangeResponse, error) {
return client.XRangeWithOptions(key, start, end, nil)
}

Expand All @@ -6859,7 +6859,7 @@ func (client *baseClient) XRange(
//
// Return value:
//
// A `map` of key to stream entry data, where entry data is an array of
// An `array` of stream entry data, where entry data is an array of
// pairings with format `[[field, entry], [field, entry], ...]`. Returns `nil` if `count` is non-positive.
//
// Example:
Expand All @@ -6871,7 +6871,7 @@ func (client *baseClient) XRange(
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
// options.NewStreamRangeOptions().SetCount(10),
// )
// fmt.Println(res) // map[key:[["field1", "entry1"], ["field2", "entry2"]]]
// fmt.Println(res) // [{streamId [["field1", "entry1"], ["field2", "entry2"]]}]
//
// // Retrieve exactly one stream entry by id
// res, err := client.XRangeWithOptions(
Expand All @@ -6880,15 +6880,15 @@ func (client *baseClient) XRange(
// options.NewStreamBoundary(streamId, true),
// options.NewStreamRangeOptions().SetCount(1),
// )
// fmt.Println(res) // map[key:[["field1", "entry1"]]
// fmt.Println(res) // [{streamId [["field1", "entry1"]]}]
//
// [valkey.io]: https://valkey.io/commands/xrange/
func (client *baseClient) XRangeWithOptions(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
opts *options.StreamRangeOptions,
) (map[string][][]string, error) {
) ([]XRangeResponse, error) {
args := []string{key, string(start), string(end)}
if opts != nil {
optionArgs, err := opts.ToArgs()
Expand All @@ -6901,7 +6901,7 @@ func (client *baseClient) XRangeWithOptions(
if err != nil {
return nil, err
}
return handleMapOfArrayOfStringArrayOrNilResponse(result)
return handleXRangeResponse(result)
}

// Returns stream entries matching a given range of IDs in reverse order.
Expand All @@ -6921,7 +6921,7 @@ func (client *baseClient) XRangeWithOptions(
//
// Return value:
//
// A `map` of key to stream entry data, where entry data is an array of
// An `array` of stream entry data, where entry data is an array of
// pairings with format `[[field, entry], [field, entry], ...]`.
//
// Example:
Expand All @@ -6932,14 +6932,14 @@ func (client *baseClient) XRangeWithOptions(
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
// )
// fmt.Println(res) // map[key:[["field2", "entry2"], ["field1", "entry1"]]]
// fmt.Println(res) // [{streamID ["field2", "entry2"], ["field1", "entry1"]]}]
//
// [valkey.io]: https://valkey.io/commands/xrevrange/
func (client *baseClient) XRevRange(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
) (map[string][][]string, error) {
) ([]XRangeResponse, error) {
return client.XRevRangeWithOptions(key, start, end, nil)
}

Expand All @@ -6961,7 +6961,7 @@ func (client *baseClient) XRevRange(
//
// Return value:
//
// A `map` of key to stream entry data, where entry data is an array of
// An `array` of stream entry data, where entry data is an array of
// pairings with format `[[field, entry], [field, entry], ...]`.
// Returns `nil` if `count` is non-positive.
//
Expand All @@ -6974,15 +6974,15 @@ func (client *baseClient) XRevRange(
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
// options.NewStreamRangeOptions().SetCount(10),
// )
// fmt.Println(res) // map[key:[["field2", "entry2"], ["field1", "entry1"]]]
// fmt.Println(res) // [{streamID [["field2", "entry2"], ["field1", "entry1"]]}]
//
// [valkey.io]: https://valkey.io/commands/xrevrange/
func (client *baseClient) XRevRangeWithOptions(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
opts *options.StreamRangeOptions,
) (map[string][][]string, error) {
) ([]XRangeResponse, error) {
args := []string{key, string(start), string(end)}
if opts != nil {
optionArgs, err := opts.ToArgs()
Expand All @@ -6995,7 +6995,7 @@ func (client *baseClient) XRevRangeWithOptions(
if err != nil {
return nil, err
}
return handleMapOfArrayOfStringArrayOrNilResponse(result)
edlng marked this conversation as resolved.
Show resolved Hide resolved
return handleXRevRangeResponse(result)
}

// Reads or modifies the array of bits representing the string that is held at key
Expand Down
87 changes: 85 additions & 2 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import "C"
import (
"fmt"
"reflect"
"sort"
"strconv"
"unsafe"

Expand Down Expand Up @@ -833,12 +834,94 @@ func handleMapOfArrayOfStringArrayResponse(response *C.struct_CommandResponse) (
return claimedEntries, nil
}

func handleMapOfArrayOfStringArrayOrNilResponse(response *C.struct_CommandResponse) (map[string][][]string, error) {
func handleXRangeResponse(response *C.struct_CommandResponse) ([]XRangeResponse, error) {
defer C.free_command_response(response)

if response.response_type == uint32(C.Null) {
return nil, nil
}

typeErr := checkResponseType(response, C.Map, false)
if typeErr != nil {
return nil, typeErr
}
mapData, err := parseMap(response)
if err != nil {
return nil, err
}
converted, err := mapConverter[[][]string]{
arrayConverter[[]string]{
arrayConverter[string]{
nil,
false,
},
false,
},
false,
}.convert(mapData)
if err != nil {
return nil, err
}
claimedEntries, ok := converted.(map[string][][]string)
if !ok {
return nil, &errors.RequestError{Msg: fmt.Sprintf("unexpected type of second element: %T", converted)}
}

xRangeResponseArray := make([]XRangeResponse, 0, len(claimedEntries))

for k, v := range claimedEntries {
xRangeResponseArray = append(xRangeResponseArray, XRangeResponse{k, v})
}

sort.Slice(xRangeResponseArray, func(i, j int) bool {
return xRangeResponseArray[i].StreamId < xRangeResponseArray[j].StreamId
})
Comment on lines +876 to +878
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need sort?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think since we get a map from the CommandResponse, the order of the entries are not guaranteed. Followed by that, the iteration I do to add these map entries to an array also would not guarantee the order is maintained. So I decided to add a sort to ensure the returned array is in order

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure that we need it, but I don't mind

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think adding sort would be helpful in the XRevRange results but let me know your thoughts @jbrinkman

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like there is a problem with the Glide Core implementation. I think using sort here is warranted to guarantee that the sorting is maintained.

return xRangeResponseArray, nil
}

func handleXRevRangeResponse(response *C.struct_CommandResponse) ([]XRangeResponse, error) {
defer C.free_command_response(response)

if response.response_type == uint32(C.Null) {
return nil, nil
}

return handleMapOfArrayOfStringArrayResponse(response)
typeErr := checkResponseType(response, C.Map, false)
if typeErr != nil {
return nil, typeErr
}
mapData, err := parseMap(response)
if err != nil {
return nil, err
}
converted, err := mapConverter[[][]string]{
arrayConverter[[]string]{
arrayConverter[string]{
nil,
false,
},
false,
},
false,
}.convert(mapData)
if err != nil {
return nil, err
}
claimedEntries, ok := converted.(map[string][][]string)
if !ok {
return nil, &errors.RequestError{Msg: fmt.Sprintf("unexpected type of second element: %T", converted)}
}

xRangeResponseArray := make([]XRangeResponse, 0, len(claimedEntries))

for k, v := range claimedEntries {
xRangeResponseArray = append(xRangeResponseArray, XRangeResponse{k, v})
}

sort.Slice(xRangeResponseArray, func(i, j int) bool {
return xRangeResponseArray[i].StreamId > xRangeResponseArray[j].StreamId
})
Comment on lines +921 to +923
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reasoning as above but this time we return the order in reverse

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine.

return xRangeResponseArray, nil
}

func handleXAutoClaimResponse(response *C.struct_CommandResponse) (XAutoClaimResponse, error) {
Expand Down
6 changes: 6 additions & 0 deletions go/api/response_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ type MemberAndScore struct {
Score float64
}

// Response type of [XRange] and [XRevRange] commands.
type XRangeResponse struct {
StreamId string
Entries [][]string
}

// Response type of [XAutoClaim] command.
type XAutoClaimResponse struct {
NextEntry string
Expand Down
8 changes: 4 additions & 4 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,21 @@ type StreamCommands interface {
options *options.StreamClaimOptions,
) ([]string, error)

XRange(key string, start options.StreamBoundary, end options.StreamBoundary) (map[string][][]string, error)
XRange(key string, start options.StreamBoundary, end options.StreamBoundary) ([]XRangeResponse, error)

XRangeWithOptions(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
options *options.StreamRangeOptions,
) (map[string][][]string, error)
) ([]XRangeResponse, error)

XRevRange(key string, start options.StreamBoundary, end options.StreamBoundary) (map[string][][]string, error)
XRevRange(key string, start options.StreamBoundary, end options.StreamBoundary) ([]XRangeResponse, error)

XRevRangeWithOptions(
key string,
start options.StreamBoundary,
end options.StreamBoundary,
options *options.StreamRangeOptions,
) (map[string][][]string, error)
) ([]XRangeResponse, error)
}
18 changes: 14 additions & 4 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7455,7 +7455,10 @@ func (suite *GlideTestSuite) TestXRangeAndXRevRange() {
assert.NoError(suite.T(), err)
assert.Equal(
suite.T(),
map[string][][]string{streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field2", "value2"}}},
[]api.XRangeResponse{
{StreamId: streamId1.Value(), Entries: [][]string{{"field1", "value1"}}},
{StreamId: streamId2.Value(), Entries: [][]string{{"field2", "value2"}}},
},
xrangeResult,
)

Expand All @@ -7468,7 +7471,10 @@ func (suite *GlideTestSuite) TestXRangeAndXRevRange() {
assert.NoError(suite.T(), err)
assert.Equal(
suite.T(),
map[string][][]string{streamId2.Value(): {{"field2", "value2"}}, streamId1.Value(): {{"field1", "value1"}}},
[]api.XRangeResponse{
{StreamId: streamId2.Value(), Entries: [][]string{{"field2", "value2"}}},
{StreamId: streamId1.Value(), Entries: [][]string{{"field1", "value1"}}},
},
xrevrangeResult,
)

Expand Down Expand Up @@ -7507,7 +7513,9 @@ func (suite *GlideTestSuite) TestXRangeAndXRevRange() {
assert.NoError(suite.T(), err)
assert.Equal(
suite.T(),
map[string][][]string{streamId3.Value(): {{"field3", "value3"}}},
[]api.XRangeResponse{
{StreamId: streamId3.Value(), Entries: [][]string{{"field3", "value3"}}},
},
xrangeResult,
)

Expand All @@ -7521,7 +7529,9 @@ func (suite *GlideTestSuite) TestXRangeAndXRevRange() {
assert.NoError(suite.T(), err)
assert.Equal(
suite.T(),
map[string][][]string{streamId3.Value(): {{"field3", "value3"}}},
[]api.XRangeResponse{
{StreamId: streamId3.Value(), Entries: [][]string{{"field3", "value3"}}},
},
xrevrangeResult,
)

Expand Down
Loading