From 52af617e7c60ea4fc9e5a0fbc0a998e06799c34b Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Tue, 21 Jan 2025 23:39:10 +0530 Subject: [PATCH] feat: Use `optr` for optional pointer handling (#29) Also resolves: #28 --------- Signed-off-by: Vaibhav Rabber --- .golangci.yml | 4 ++-- go.mod | 3 ++- go.sum | 2 ++ s2/batching.go | 22 +++++++++------------- s2/client.go | 19 +++++++++++++++++++ s2/client_test.go | 4 ++-- s2/example_test.go | 8 +++----- s2/types.go | 20 ++++++++++++++------ 8 files changed, 53 insertions(+), 29 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index ee1aadb..fbf1274 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -45,7 +45,6 @@ linters: - nosprintfhostport - prealloc - predeclared - - protogetter - reassign - recvcheck - revive @@ -67,7 +66,8 @@ linters-settings: issues: exclude-rules: - - path: s2/example_test.go + - path: _test.go linters: - goconst + - err113 - forcetypeassert diff --git a/go.mod b/go.mod index 6829525..a863ea1 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,10 @@ module github.com/s2-streamstore/s2-sdk-go -go 1.23 +go 1.23.4 require ( github.com/google/uuid v1.6.0 + github.com/s2-streamstore/optr v1.0.0 github.com/stretchr/testify v1.10.0 google.golang.org/grpc v1.69.2 google.golang.org/protobuf v1.36.1 diff --git a/go.sum b/go.sum index 91aa9a7..eab1a46 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/s2-streamstore/optr v1.0.0 h1:OqJpOO2uYsG+iXW9mNTlsCePDvxTS8mh9EtGcnyq8w8= +github.com/s2-streamstore/optr v1.0.0/go.mod h1:x/wSUCzWgpnvWWQG4dZ4JBFW2Kw0pE0yhLGb+0xiN8g= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= diff --git a/s2/batching.go b/s2/batching.go index 0334d8c..7062eff 100644 --- a/s2/batching.go +++ b/s2/batching.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" "time" + + "github.com/s2-streamstore/optr" ) // Batching errors. @@ -151,6 +153,8 @@ func appendRecordBatchingWorker( recordsToFlush := newEmptyAppendRecordBatch(config.MaxBatchRecords, config.MaxBatchBytes) + nextMatchSeqNum := optr.Cloned(config.MatchSeqNum) + flush := func() error { if recordsToFlush.IsEmpty() { if peekedRecord != nil { @@ -160,21 +164,9 @@ func appendRecordBatchingWorker( return nil } - var matchSeqNum *uint64 - - if config.MatchSeqNum != nil { - // Copy current match sequence number in another variable so we can - // mutate it later for the up-coming sequence number. Working with - // optionals as pointer in Go is hard. - currentMatchSeqNum := *config.MatchSeqNum - matchSeqNum = ¤tMatchSeqNum - // Update the next matching sequence number. - *config.MatchSeqNum += uint64(recordsToFlush.Len()) - } - input := &AppendInput{ Records: recordsToFlush, - MatchSeqNum: matchSeqNum, + MatchSeqNum: optr.Cloned(nextMatchSeqNum), FencingToken: config.FencingToken, } @@ -182,6 +174,10 @@ func appendRecordBatchingWorker( return err } + nextMatchSeqNum = optr.Map(nextMatchSeqNum, func(m uint64) uint64 { + return m + uint64(recordsToFlush.Len()) + }) + recordsToFlush = newEmptyAppendRecordBatch(config.MaxBatchRecords, config.MaxBatchBytes) if peekedRecord != nil { diff --git a/s2/client.go b/s2/client.go index 340095f..f583d23 100644 --- a/s2/client.go +++ b/s2/client.go @@ -10,6 +10,7 @@ import ( "time" "github.com/google/uuid" + "github.com/s2-streamstore/optr" "github.com/s2-streamstore/s2-sdk-go/internal/pb" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -589,6 +590,24 @@ func (r *readSessionReceiver) Recv() (ReadOutput, error) { if err == nil { if batch, ok := next.(ReadOutputBatch); ok && len(batch.Records) > 0 { r.ServiceReq.Req.StartSeqNum = 1 + batch.Records[len(batch.Records)-1].SeqNum + + r.ServiceReq.Req.Limit.Bytes = optr.Map(r.ServiceReq.Req.Limit.Bytes, func(b uint64) uint64 { + batchBytes := uint64(batch.SequencedRecordBatch.MeteredBytes()) + if b < batchBytes { + return 0 + } + + return b - batchBytes + }) + + r.ServiceReq.Req.Limit.Count = optr.Map(r.ServiceReq.Req.Limit.Count, func(c uint64) uint64 { + batchSize := uint64(len(batch.Records)) + if c < batchSize { + return 0 + } + + return c - batchSize + }) } return next, nil diff --git a/s2/client_test.go b/s2/client_test.go index 00c90ed..99d9b60 100644 --- a/s2/client_test.go +++ b/s2/client_test.go @@ -40,12 +40,12 @@ func TestSendRetryable(t *testing.T) { testCases := []testCase{ { idLevel: idempotencyLevelNoSideEffects, - sendErr: errors.New("hello"), //nolint:err113 + sendErr: errors.New("hello"), shouldRetry: false, }, { idLevel: idempotencyLevelIdempotent, - sendErr: errors.New("hello"), //nolint:err113 + sendErr: errors.New("hello"), shouldRetry: false, }, { diff --git a/s2/example_test.go b/s2/example_test.go index fc18e87..9a77455 100644 --- a/s2/example_test.go +++ b/s2/example_test.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/s2-streamstore/optr" "github.com/s2-streamstore/s2-sdk-go/s2" ) @@ -133,11 +134,10 @@ func ExampleBasinClient_ListStreams() { } prefix := "my-" - limit := uint64(10) listStreamsResponse, err := basinClient.ListStreams(context.TODO(), &s2.ListStreamsRequest{ Prefix: prefix, - Limit: &limit, + Limit: optr.Some(uint64(10)), }) if err != nil { panic(err) @@ -350,11 +350,9 @@ func ExampleStreamClient_Read() { latestSeqNum := tail - 1 - count := uint64(1) - latestBatch, err := streamClient.Read(context.TODO(), &s2.ReadRequest{ StartSeqNum: latestSeqNum, - Limit: s2.ReadLimit{Count: &count}, + Limit: s2.ReadLimit{Count: optr.Some(uint64(1))}, }) if err != nil { panic(err) diff --git a/s2/types.go b/s2/types.go index 4852de0..43e1fe6 100644 --- a/s2/types.go +++ b/s2/types.go @@ -7,6 +7,7 @@ import ( "math/rand/v2" "time" + "github.com/s2-streamstore/optr" "github.com/s2-streamstore/s2-sdk-go/internal/pb" ) @@ -60,6 +61,16 @@ func (r *SequencedRecord) MeteredBytes() uint { return bytes } +// Metered bytes for a SequencedRecordBatch. +func (b *SequencedRecordBatch) MeteredBytes() uint { + var bytes uint + for i := range len(b.Records) { + bytes += b.Records[i].MeteredBytes() + } + + return bytes +} + // A collection of append records that can be sent together in a batch. type AppendRecordBatch struct { records []AppendRecord @@ -340,12 +351,9 @@ func basinInfoFromProto(pbInfo *pb.BasinInfo) (BasinInfo, error) { } func streamInfoFromProto(pbInfo *pb.StreamInfo) StreamInfo { - var deletedAt *time.Time - - if pbInfo.DeletedAt != nil { - deletedAtTime := time.Unix(int64(pbInfo.GetDeletedAt()), 0) - deletedAt = &deletedAtTime - } + deletedAt := optr.Map(pbInfo.DeletedAt, func(timestamp uint32) time.Time { + return time.Unix(int64(timestamp), 0) + }) return StreamInfo{ Name: pbInfo.GetName(),