diff --git a/go.mod b/go.mod index e0fb57ed3a..fc4a2f1640 100644 --- a/go.mod +++ b/go.mod @@ -34,8 +34,9 @@ require ( github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 github.com/spf13/cobra v1.2.1 github.com/spf13/viper v1.9.0 - github.com/stretchr/testify v1.7.2 + github.com/stretchr/testify v1.8.0 go.uber.org/atomic v1.9.0 + go.uber.org/goleak v1.2.1 go.uber.org/multierr v1.7.0 go.uber.org/zap v1.19.1 google.golang.org/genproto v0.0.0-20220118154757-00ab72f36ad5 @@ -76,7 +77,7 @@ require ( github.com/felixge/httpsnoop v1.0.2 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect - github.com/go-logr/logr v1.2.0 // indirect + github.com/go-logr/logr v1.2.3 // indirect github.com/go-openapi/analysis v0.20.1 // indirect github.com/go-openapi/errors v0.20.1 // indirect github.com/go-openapi/inflect v0.19.0 // indirect @@ -182,8 +183,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.23.0 // indirect k8s.io/component-base v0.23.0 // indirect - k8s.io/klog v0.2.0 // indirect - k8s.io/klog/v2 v2.30.0 // indirect + k8s.io/klog v1.0.0 // indirect + k8s.io/klog/v2 v2.90.1 // indirect moul.io/http2curl v1.0.1-0.20190925090545-5cd742060b0e // indirect sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect diff --git a/go.sum b/go.sum index 596d9fa20f..dac231e33b 100644 --- a/go.sum +++ b/go.sum @@ -233,8 +233,9 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= -github.com/go-logr/logr v1.2.0 h1:QK40JKJyMdUDz+h+xvCsru/bJhvG0UxvePV0ufL/AcE= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/zapr v1.2.0 h1:n4JnPI1T3Qq1SFEi/F8rwLrZERp2bso19PJZDB9dayk= github.com/go-logr/zapr v1.2.0/go.mod h1:Qa4Bsj2Vb+FAVeAKsLD8RLQ+YRJB8YDmOAKxaBQf7Ro= github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI= @@ -838,6 +839,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v0.0.0-20161117074351-18a02ba4a312/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -846,8 +848,9 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= @@ -938,7 +941,8 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= -go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= @@ -1497,12 +1501,14 @@ k8s.io/component-base v0.23.0/go.mod h1:DHH5uiFvLC1edCpvcTDV++NKULdYYU6pR9Tt3HIK k8s.io/gengo v0.0.0-20201203183100-97869a43a9d9/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c h1:GohjlNKauSai7gN4wsJkeZ3WAJx4Sh+oT/b5IYn5suA= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= -k8s.io/klog v0.2.0 h1:0ElL0OHzF3N+OhoJTL0uca20SxtYt4X4+bzHeqrB83c= k8s.io/klog v0.2.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= +k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= +k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= -k8s.io/klog/v2 v2.30.0 h1:bUO6drIvCIsvZ/XFgfxoGFQU/a4Qkh0iAlvUR7vlHJw= k8s.io/klog/v2 v2.30.0/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= +k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 h1:E3J9oCLlaobFUqsjG9DfKbP2BmgwBL2p7pn0A3dG9W4= k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65/go.mod h1:sX9MT8g7NVZM5lVL/j8QyCCJe8YSMW30QvGZWaCIDIk= k8s.io/metrics v0.23.3 h1:rX/RBOwqi0atFlTSlpbQ7CX5s/kfqGR9zEefCx9Rv1s= diff --git a/pkg/forward/forward.go b/pkg/forward/forward.go index 6e083b32c4..c0b6769409 100644 --- a/pkg/forward/forward.go +++ b/pkg/forward/forward.go @@ -332,16 +332,20 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { // let's figure out which vertex to send the results to. // update the toBuffer(s) with writeMessages. for _, m := range udfResults { - // look for errors in udf processing, if we see even 1 error let's return. handling partial retrying is not worth ATM. + // look for errors in udf processing, if we see even 1 error flag the error boolean and + // NoAck all messages then return. Handling partial retrying is not worth ATM. if m.udfError != nil { udfError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, "buffer": isdf.fromBuffer.GetName()}).Inc() isdf.opts.logger.Errorw("failed to applyUDF", zap.Error(err)) + // As there's no partial failure, non-ack all the readOffsets + isdf.fromBuffer.NoAck(ctx, readOffsets) return } // update toBuffers for _, message := range m.writeMessages { if err := isdf.whereToStep(message, messageToStep, m.readMessage); err != nil { isdf.opts.logger.Errorw("failed in whereToStep", zap.Error(err)) + isdf.fromBuffer.NoAck(ctx, readOffsets) return } } @@ -351,6 +355,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { writeOffsets, err := isdf.writeToBuffers(ctx, messageToStep) if err != nil { isdf.opts.logger.Errorw("failed to write to toBuffers", zap.Error(err)) + isdf.fromBuffer.NoAck(ctx, readOffsets) return } diff --git a/pkg/forward/forward_test.go b/pkg/forward/forward_test.go index 7aebaf4c7a..0ebface65d 100644 --- a/pkg/forward/forward_test.go +++ b/pkg/forward/forward_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "go.uber.org/goleak" + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" @@ -63,6 +65,10 @@ func (t *testForwardFetcher) Close() error { return nil } +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + // GetWatermark uses current time as the watermark because we want to make sure // the test publisher is publishing watermark func (t *testForwardFetcher) GetWatermark(_ isb.Offset) wmb.Watermark { @@ -1085,10 +1091,6 @@ func TestNewInterStepDataForward_UDFError(t *testing.T) { toSteps := map[string]isb.BufferWriter{ "to1": to1, } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - - writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime) vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{ PipelineName: "testPipeline", @@ -1097,16 +1099,21 @@ func TestNewInterStepDataForward_UDFError(t *testing.T) { }, }} + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime) + fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps) f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyUDFErrTest{}, myForwardApplyUDFErrTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2)) assert.NoError(t, err) + assert.False(t, to1.IsFull()) assert.True(t, to1.IsEmpty()) stopped := f.Start() // write some data _, errs := fromStep.Write(ctx, writeMessages[0:5]) assert.Equal(t, make([]error, 5), errs) - assert.True(t, to1.IsEmpty()) f.Stop() diff --git a/pkg/isb/interfaces.go b/pkg/isb/interfaces.go index 20a680f173..2391c9df80 100644 --- a/pkg/isb/interfaces.go +++ b/pkg/isb/interfaces.go @@ -65,6 +65,8 @@ type BufferReader interface { Read(context.Context, int64) ([]*ReadMessage, error) // Ack acknowledges an array of offset. Ack(context.Context, []Offset) []error + // NoAck cancel acknowledgement of an array of offset. + NoAck(context.Context, []Offset) } // BufferReaderInformation has information regarding the buffer we are reading from. @@ -92,6 +94,10 @@ type Offset interface { // This is often used when the BufferReader can not simply use the offset identifier to ack the message, // then the work can be done in this function, and call it in BufferReader Ack() function implementation. AckIt() error + // NoAck to indicate the offset no longer needs to be acknowledged + // It is used when error occur, and we want to reprocess the batch to indicate acknowledgement no + // longer needed. + NoAck() } // SimpleStringOffset is an Offset convenient function for implementations without needing AckIt() when offset is a string. @@ -109,6 +115,8 @@ func (so SimpleStringOffset) AckIt() error { return nil } +func (so SimpleStringOffset) NoAck() {} + // SimpleIntOffset is an Offset convenient function for implementations without needing AckIt() when offset is a int64. type SimpleIntOffset func() int64 @@ -123,3 +131,5 @@ func (si SimpleIntOffset) Sequence() (int64, error) { func (si SimpleIntOffset) AckIt() error { return nil } + +func (si SimpleIntOffset) NoAck() {} diff --git a/pkg/isb/stores/jetstream/reader.go b/pkg/isb/stores/jetstream/reader.go index 8442155648..a46e08a861 100644 --- a/pkg/isb/stores/jetstream/reader.go +++ b/pkg/isb/stores/jetstream/reader.go @@ -270,6 +270,23 @@ func (jr *jetStreamReader) Ack(_ context.Context, offsets []isb.Offset) []error return errs } +func (jr *jetStreamReader) NoAck(_ context.Context, offsets []isb.Offset) { + done := make(chan struct{}) + wg := &sync.WaitGroup{} + for _, o := range offsets { + wg.Add(1) + go func(o isb.Offset) { + defer wg.Done() + o.NoAck() + }(o) + } + go func() { + wg.Wait() + close(done) + }() + <-done +} + // offset implements ID interface for JetStream. type offset struct { seq uint64 @@ -283,7 +300,7 @@ func newOffset(msg *nats.Msg, tickDuration time.Duration, log *zap.SugaredLogger seq: metadata.Sequence.Stream, msg: msg, } - // If tickDuration is 1s, which means ackWait is 1s or 2s, it doesn not make much sense to do it, instead, increasing ackWait is recommended. + // If tickDuration is 1s, which means ackWait is 1s or 2s, it does not make much sense to do it, instead, increasing ackWait is recommended. if tickDuration.Seconds() > 1 { ctx, cancel := context.WithCancel(context.Background()) go o.workInProgess(ctx, msg, tickDuration, log) @@ -322,6 +339,15 @@ func (o *offset) AckIt() error { return nil } +func (o *offset) NoAck() { + if o.cancelFunc != nil { + o.cancelFunc() + } + // Ignore the returned error as the worst case the message will + // take longer to be redelivered. + _ = o.msg.Nak() +} + func (o *offset) Sequence() (int64, error) { return int64(o.seq), nil } diff --git a/pkg/isb/stores/jetstream/reader_test.go b/pkg/isb/stores/jetstream/reader_test.go index 4af1a35edd..97589983ae 100644 --- a/pkg/isb/stores/jetstream/reader_test.go +++ b/pkg/isb/stores/jetstream/reader_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "go.uber.org/goleak" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" @@ -30,6 +32,10 @@ import ( natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test" ) +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + func TestJetStreamBufferRead(t *testing.T) { s := natstest.RunJetStreamServer(t) defer natstest.ShutdownJetStreamServer(t, s) @@ -40,6 +46,7 @@ func TestJetStreamBufferRead(t *testing.T) { defaultJetStreamClient := natstest.JetStreamClient(t, s) conn, err := defaultJetStreamClient.Connect(ctx) assert.NoError(t, err) + defer conn.Close() js, err := conn.JetStream() assert.NoError(t, err) @@ -50,6 +57,7 @@ func TestJetStreamBufferRead(t *testing.T) { bw, err := NewJetStreamBufferWriter(ctx, defaultJetStreamClient, streamName, streamName, streamName) assert.NoError(t, err) jw, _ := bw.(*jetStreamWriter) + defer jw.Close() // Add some data startTime := time.Unix(1636470000, 0) messages := testutils.BuildTestWriteMessages(int64(20), startTime) @@ -77,6 +85,7 @@ func TestJetStreamBufferRead(t *testing.T) { assert.NoError(t, err) fromStep := bufferReader.(*jetStreamReader) + defer fromStep.Close() readMessages, err := fromStep.Read(ctx, 20) assert.NoError(t, err) @@ -133,6 +142,7 @@ func TestGetName(t *testing.T) { assert.NoError(t, err) js, err := conn.JetStream() assert.NoError(t, err) + defer conn.Close() streamName := "getName" addStream(t, js, streamName) @@ -142,6 +152,7 @@ func TestGetName(t *testing.T) { assert.NoError(t, err) br := bufferReader.(*jetStreamReader) assert.Equal(t, br.GetName(), streamName) + defer br.Close() } @@ -156,6 +167,7 @@ func TestClose(t *testing.T) { defaultJetStreamClient := natstest.JetStreamClient(t, s) conn, err := defaultJetStreamClient.Connect(ctx) assert.NoError(t, err) + defer conn.Close() js, err := conn.JetStream() assert.NoError(t, err) @@ -172,8 +184,6 @@ func TestClose(t *testing.T) { } func addStream(t *testing.T, js *jsclient.JetStreamContext, streamName string) { - s := natstest.RunJetStreamServer(t) - defer natstest.ShutdownJetStreamServer(t, s) _, err := js.AddStream(&nats.StreamConfig{ Name: streamName, diff --git a/pkg/isb/stores/jetstream/writer.go b/pkg/isb/stores/jetstream/writer.go index 2f09eb8036..fd0f618d08 100644 --- a/pkg/isb/stores/jetstream/writer.go +++ b/pkg/isb/stores/jetstream/writer.go @@ -327,3 +327,5 @@ func (w *writeOffset) Sequence() (int64, error) { func (w *writeOffset) AckIt() error { return fmt.Errorf("not supported") } + +func (w *writeOffset) NoAck() {} diff --git a/pkg/isb/stores/jetstream/writer_test.go b/pkg/isb/stores/jetstream/writer_test.go index 04c3a47708..a16f780f2a 100644 --- a/pkg/isb/stores/jetstream/writer_test.go +++ b/pkg/isb/stores/jetstream/writer_test.go @@ -53,6 +53,7 @@ func TestForwarderJetStreamBuffer(t *testing.T) { defaultJetStreamClient := natstest.JetStreamClient(t, s) conn, err := defaultJetStreamClient.Connect(ctx) assert.NoError(t, err) + defer conn.Close() js, err := conn.JetStream() assert.NoError(t, err) @@ -67,6 +68,7 @@ func TestForwarderJetStreamBuffer(t *testing.T) { bw, err := NewJetStreamBufferWriter(ctx, defaultJetStreamClient, streamName, streamName, streamName, WithMaxLength(10)) assert.NoError(t, err) jw, _ := bw.(*jetStreamWriter) + defer jw.Close() // Add some data startTime := time.Unix(1636470000, 0) messages := testutils.BuildTestWriteMessages(int64(10), startTime) @@ -103,13 +105,13 @@ func TestForwarderJetStreamBuffer(t *testing.T) { // Forwarder logic tested here with a jetstream read and write bufferReader, err := NewJetStreamBufferReader(ctx, defaultJetStreamClient, streamName, streamName, streamName) assert.NoError(t, err) - fromStep, _ := bufferReader.(*jetStreamReader) + defer fromStep.Close() bufferWriter, err := NewJetStreamBufferWriter(ctx, defaultJetStreamClient, toStreamName, toStreamName, toStreamName, WithMaxLength(10)) assert.NoError(t, err) - to1 := bufferWriter.(*jetStreamWriter) + defer to1.Close() toSteps := map[string]isb.BufferWriter{ "to1": to1, } @@ -128,7 +130,7 @@ func TestForwarderJetStreamBuffer(t *testing.T) { for streamInfo.State.Msgs != 10 { streamInfo, _ = to1js.StreamInfo(toStreamName) if streamInfo.State.Msgs == 10 { - return + break } select { case <-ctx.Done(): @@ -146,7 +148,7 @@ func TestForwarderJetStreamBuffer(t *testing.T) { assert.NoError(t, err) fromStepInfo, err := fromStepJs.StreamInfo(streamName) assert.NoError(t, err) - // Make sure all messages are cleared up from from buffer as DiscardOldPolicy is false + // Make sure all messages are cleared up from buffer as DiscardOldPolicy is false assert.Equal(t, uint64(0), fromStepInfo.State.Msgs) // Call stop to end the test as we have a blocking read. The forwarder is up and running with no messages written @@ -166,6 +168,7 @@ func TestJetStreamBufferWriterBufferFull(t *testing.T) { defaultJetStreamClient := natstest.JetStreamClient(t, s) conn, err := defaultJetStreamClient.Connect(ctx) assert.NoError(t, err) + defer conn.Close() js, err := conn.JetStream() assert.NoError(t, err) @@ -176,6 +179,7 @@ func TestJetStreamBufferWriterBufferFull(t *testing.T) { bw, err := NewJetStreamBufferWriter(ctx, defaultJetStreamClient, streamName, streamName, streamName, WithMaxLength(10), WithBufferUsageLimit(0.2)) assert.NoError(t, err) jw, _ := bw.(*jetStreamWriter) + defer jw.Close() timeout := time.After(10 * time.Second) for jw.isFull.Load() { select { @@ -223,6 +227,7 @@ func TestJetStreamBufferWriterBufferFull_DiscardLatest(t *testing.T) { defaultJetStreamClient := natstest.JetStreamClient(t, s) conn, err := defaultJetStreamClient.Connect(ctx) assert.NoError(t, err) + defer conn.Close() js, err := conn.JetStream() assert.NoError(t, err) @@ -233,6 +238,7 @@ func TestJetStreamBufferWriterBufferFull_DiscardLatest(t *testing.T) { bw, err := NewJetStreamBufferWriter(ctx, defaultJetStreamClient, streamName, streamName, streamName, WithMaxLength(10), WithBufferUsageLimit(0.2), WithBufferFullWritingStrategy(dfv1.DiscardLatest)) assert.NoError(t, err) jw, _ := bw.(*jetStreamWriter) + defer jw.Close() timeout := time.After(10 * time.Second) for jw.isFull.Load() { select { @@ -279,6 +285,7 @@ func TestWriteGetName(t *testing.T) { defaultJetStreamClient := natstest.JetStreamClient(t, s) conn, err := defaultJetStreamClient.Connect(ctx) assert.NoError(t, err) + defer conn.Close() js, err := conn.JetStream() assert.NoError(t, err) @@ -290,6 +297,7 @@ func TestWriteGetName(t *testing.T) { assert.NoError(t, err) bw := bufferWriter.(*jetStreamReader) + defer bw.Close() assert.Equal(t, bw.GetName(), streamName) } @@ -305,6 +313,7 @@ func TestWriteClose(t *testing.T) { defaultJetStreamClient := natstest.JetStreamClient(t, s) conn, err := defaultJetStreamClient.Connect(ctx) assert.NoError(t, err) + defer conn.Close() js, err := conn.JetStream() assert.NoError(t, err) @@ -316,6 +325,6 @@ func TestWriteClose(t *testing.T) { assert.NoError(t, err) bw := bufferWriter.(*jetStreamWriter) + defer bw.Close() assert.NoError(t, bw.Close()) - } diff --git a/pkg/isb/stores/redis/read.go b/pkg/isb/stores/redis/read.go index 7527240171..e6774737e1 100644 --- a/pkg/isb/stores/redis/read.go +++ b/pkg/isb/stores/redis/read.go @@ -261,6 +261,7 @@ func (br *BufferRead) setError(errMsg string, err error) { br.BufferReadInfo.refreshEmptyError.Inc() br.setIsEmptyFlag(false) } + func (br *BufferRead) Pending(_ context.Context) (int64, error) { // TODO: not implemented return isb.PendingNotAvailable, nil diff --git a/pkg/isb/stores/simplebuffer/buffer.go b/pkg/isb/stores/simplebuffer/buffer.go index dd147cf5d8..c409724c0f 100644 --- a/pkg/isb/stores/simplebuffer/buffer.go +++ b/pkg/isb/stores/simplebuffer/buffer.go @@ -264,6 +264,8 @@ func (b *InMemoryBuffer) Ack(_ context.Context, offsets []isb.Offset) []error { return errs } +func (b *InMemoryBuffer) NoAck(_ context.Context, _ []isb.Offset) {} + // GetMessages gets the first num messages in the in mem buffer // this function is for testing purpose func (b *InMemoryBuffer) GetMessages(num int) []*isb.Message { diff --git a/pkg/shared/clients/redis/redis_reader.go b/pkg/shared/clients/redis/redis_reader.go index 7de0aa6303..339998117e 100644 --- a/pkg/shared/clients/redis/redis_reader.go +++ b/pkg/shared/clients/redis/redis_reader.go @@ -137,6 +137,8 @@ func (br *RedisStreamsRead) Ack(_ context.Context, offsets []isb.Offset) []error return errs } +func (br *RedisStreamsRead) NoAck(_ context.Context, _ []isb.Offset) {} + // processXReadResult is used to process the results of XREADGROUP func (br *RedisStreamsRead) processXReadResult(startIndex string, count int64) ([]redis.XStream, error) { result := br.Client.XReadGroup(RedisContext, &redis.XReadGroupArgs{ diff --git a/pkg/sources/generator/tickgen.go b/pkg/sources/generator/tickgen.go index 6a8fcd9b33..8554a7b35b 100644 --- a/pkg/sources/generator/tickgen.go +++ b/pkg/sources/generator/tickgen.go @@ -284,6 +284,8 @@ func (mg *memgen) Ack(_ context.Context, offsets []isb.Offset) []error { return make([]error, len(offsets)) } +func (mg *memgen) NoAck(_ context.Context, _ []isb.Offset) {} + func (mg *memgen) Close() error { if err := mg.sourcePublishWM.Close(); err != nil { mg.logger.Errorw("Failed to close source vertex watermark publisher", zap.Error(err)) diff --git a/pkg/sources/http/http.go b/pkg/sources/http/http.go index 729c575dcd..64c2c50a08 100644 --- a/pkg/sources/http/http.go +++ b/pkg/sources/http/http.go @@ -255,6 +255,8 @@ func (h *httpSource) Ack(_ context.Context, offsets []isb.Offset) []error { return make([]error, len(offsets)) } +func (h *httpSource) NoAck(_ context.Context, _ []isb.Offset) {} + func (h *httpSource) Close() error { h.logger.Info("Shutting down http source server...") h.cancelFunc() diff --git a/pkg/sources/kafka/reader.go b/pkg/sources/kafka/reader.go index 54937910c8..aeba706260 100644 --- a/pkg/sources/kafka/reader.go +++ b/pkg/sources/kafka/reader.go @@ -193,6 +193,8 @@ func (r *KafkaSource) Ack(_ context.Context, offsets []isb.Offset) []error { return make([]error, len(offsets)) } +func (r *KafkaSource) NoAck(_ context.Context, _ []isb.Offset) {} + func (r *KafkaSource) Start() <-chan struct{} { go r.startConsumer() // wait for the consumer to setup. diff --git a/pkg/sources/nats/nats.go b/pkg/sources/nats/nats.go index a18d92f7bc..d381b95a00 100644 --- a/pkg/sources/nats/nats.go +++ b/pkg/sources/nats/nats.go @@ -252,6 +252,8 @@ func (ns *natsSource) Ack(_ context.Context, offsets []isb.Offset) []error { return make([]error, len(offsets)) } +func (ns *natsSource) NoAck(_ context.Context, _ []isb.Offset) {} + func (ns *natsSource) Close() error { ns.logger.Info("Shutting down nats source server...") ns.cancelfn()