Skip to content

Commit

Permalink
feat: Use optr for optional pointer handling (#29)
Browse files Browse the repository at this point in the history
Also resolves: #28

---------

Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal authored Jan 21, 2025
1 parent 0c0bade commit 52af617
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 29 deletions.
4 changes: 2 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ linters:
- nosprintfhostport
- prealloc
- predeclared
- protogetter
- reassign
- recvcheck
- revive
Expand All @@ -67,7 +66,8 @@ linters-settings:

issues:
exclude-rules:
- path: s2/example_test.go
- path: _test.go
linters:
- goconst
- err113
- forcetypeassert
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
module github.com/s2-streamstore/s2-sdk-go

go 1.23
go 1.23.4

require (
github.com/google/uuid v1.6.0
github.com/s2-streamstore/optr v1.0.0
github.com/stretchr/testify v1.10.0
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.36.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/s2-streamstore/optr v1.0.0 h1:OqJpOO2uYsG+iXW9mNTlsCePDvxTS8mh9EtGcnyq8w8=
github.com/s2-streamstore/optr v1.0.0/go.mod h1:x/wSUCzWgpnvWWQG4dZ4JBFW2Kw0pE0yhLGb+0xiN8g=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY=
Expand Down
22 changes: 9 additions & 13 deletions s2/batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"time"

"github.com/s2-streamstore/optr"
)

// Batching errors.
Expand Down Expand Up @@ -151,6 +153,8 @@ func appendRecordBatchingWorker(

recordsToFlush := newEmptyAppendRecordBatch(config.MaxBatchRecords, config.MaxBatchBytes)

nextMatchSeqNum := optr.Cloned(config.MatchSeqNum)

flush := func() error {
if recordsToFlush.IsEmpty() {
if peekedRecord != nil {
Expand All @@ -160,28 +164,20 @@ func appendRecordBatchingWorker(
return nil
}

var matchSeqNum *uint64

if config.MatchSeqNum != nil {
// Copy current match sequence number in another variable so we can
// mutate it later for the up-coming sequence number. Working with
// optionals as pointer in Go is hard.
currentMatchSeqNum := *config.MatchSeqNum
matchSeqNum = &currentMatchSeqNum
// Update the next matching sequence number.
*config.MatchSeqNum += uint64(recordsToFlush.Len())
}

input := &AppendInput{
Records: recordsToFlush,
MatchSeqNum: matchSeqNum,
MatchSeqNum: optr.Cloned(nextMatchSeqNum),
FencingToken: config.FencingToken,
}

if err := sender.Send(input); err != nil {
return err
}

nextMatchSeqNum = optr.Map(nextMatchSeqNum, func(m uint64) uint64 {
return m + uint64(recordsToFlush.Len())
})

recordsToFlush = newEmptyAppendRecordBatch(config.MaxBatchRecords, config.MaxBatchBytes)

if peekedRecord != nil {
Expand Down
19 changes: 19 additions & 0 deletions s2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/s2-streamstore/optr"
"github.com/s2-streamstore/s2-sdk-go/internal/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
Expand Down Expand Up @@ -589,6 +590,24 @@ func (r *readSessionReceiver) Recv() (ReadOutput, error) {
if err == nil {
if batch, ok := next.(ReadOutputBatch); ok && len(batch.Records) > 0 {
r.ServiceReq.Req.StartSeqNum = 1 + batch.Records[len(batch.Records)-1].SeqNum

r.ServiceReq.Req.Limit.Bytes = optr.Map(r.ServiceReq.Req.Limit.Bytes, func(b uint64) uint64 {
batchBytes := uint64(batch.SequencedRecordBatch.MeteredBytes())
if b < batchBytes {
return 0
}

return b - batchBytes
})

r.ServiceReq.Req.Limit.Count = optr.Map(r.ServiceReq.Req.Limit.Count, func(c uint64) uint64 {
batchSize := uint64(len(batch.Records))
if c < batchSize {
return 0
}

return c - batchSize
})
}

return next, nil
Expand Down
4 changes: 2 additions & 2 deletions s2/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ func TestSendRetryable(t *testing.T) {
testCases := []testCase{
{
idLevel: idempotencyLevelNoSideEffects,
sendErr: errors.New("hello"), //nolint:err113
sendErr: errors.New("hello"),
shouldRetry: false,
},
{
idLevel: idempotencyLevelIdempotent,
sendErr: errors.New("hello"), //nolint:err113
sendErr: errors.New("hello"),
shouldRetry: false,
},
{
Expand Down
8 changes: 3 additions & 5 deletions s2/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/s2-streamstore/optr"
"github.com/s2-streamstore/s2-sdk-go/s2"
)

Expand Down Expand Up @@ -133,11 +134,10 @@ func ExampleBasinClient_ListStreams() {
}

prefix := "my-"
limit := uint64(10)

listStreamsResponse, err := basinClient.ListStreams(context.TODO(), &s2.ListStreamsRequest{
Prefix: prefix,
Limit: &limit,
Limit: optr.Some(uint64(10)),
})
if err != nil {
panic(err)
Expand Down Expand Up @@ -350,11 +350,9 @@ func ExampleStreamClient_Read() {

latestSeqNum := tail - 1

count := uint64(1)

latestBatch, err := streamClient.Read(context.TODO(), &s2.ReadRequest{
StartSeqNum: latestSeqNum,
Limit: s2.ReadLimit{Count: &count},
Limit: s2.ReadLimit{Count: optr.Some(uint64(1))},
})
if err != nil {
panic(err)
Expand Down
20 changes: 14 additions & 6 deletions s2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand/v2"
"time"

"github.com/s2-streamstore/optr"
"github.com/s2-streamstore/s2-sdk-go/internal/pb"
)

Expand Down Expand Up @@ -60,6 +61,16 @@ func (r *SequencedRecord) MeteredBytes() uint {
return bytes
}

// Metered bytes for a SequencedRecordBatch.
func (b *SequencedRecordBatch) MeteredBytes() uint {
var bytes uint
for i := range len(b.Records) {
bytes += b.Records[i].MeteredBytes()
}

return bytes
}

// A collection of append records that can be sent together in a batch.
type AppendRecordBatch struct {
records []AppendRecord
Expand Down Expand Up @@ -340,12 +351,9 @@ func basinInfoFromProto(pbInfo *pb.BasinInfo) (BasinInfo, error) {
}

func streamInfoFromProto(pbInfo *pb.StreamInfo) StreamInfo {
var deletedAt *time.Time

if pbInfo.DeletedAt != nil {
deletedAtTime := time.Unix(int64(pbInfo.GetDeletedAt()), 0)
deletedAt = &deletedAtTime
}
deletedAt := optr.Map(pbInfo.DeletedAt, func(timestamp uint32) time.Time {
return time.Unix(int64(timestamp), 0)
})

return StreamInfo{
Name: pbInfo.GetName(),
Expand Down

0 comments on commit 52af617

Please sign in to comment.