Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go routine leak #1

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ require (
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
github.com/spf13/cobra v1.2.1
github.com/spf13/viper v1.9.0
github.com/stretchr/testify v1.7.2
github.com/stretchr/testify v1.8.0
go.uber.org/atomic v1.9.0
go.uber.org/goleak v1.2.1
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.19.1
google.golang.org/genproto v0.0.0-20220118154757-00ab72f36ad5
Expand Down Expand Up @@ -76,7 +77,7 @@ require (
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/analysis v0.20.1 // indirect
github.com/go-openapi/errors v0.20.1 // indirect
github.com/go-openapi/inflect v0.19.0 // indirect
Expand Down Expand Up @@ -182,8 +183,8 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.23.0 // indirect
k8s.io/component-base v0.23.0 // indirect
k8s.io/klog v0.2.0 // indirect
k8s.io/klog/v2 v2.30.0 // indirect
k8s.io/klog v1.0.0 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
moul.io/http2curl v1.0.1-0.20190925090545-5cd742060b0e // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
Expand Down
16 changes: 11 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,9 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v1.2.0 h1:QK40JKJyMdUDz+h+xvCsru/bJhvG0UxvePV0ufL/AcE=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/zapr v1.2.0 h1:n4JnPI1T3Qq1SFEi/F8rwLrZERp2bso19PJZDB9dayk=
github.com/go-logr/zapr v1.2.0/go.mod h1:Qa4Bsj2Vb+FAVeAKsLD8RLQ+YRJB8YDmOAKxaBQf7Ro=
github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI=
Expand Down Expand Up @@ -838,6 +839,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v0.0.0-20161117074351-18a02ba4a312/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand All @@ -846,8 +848,9 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
Expand Down Expand Up @@ -938,7 +941,8 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
Expand Down Expand Up @@ -1497,12 +1501,14 @@ k8s.io/component-base v0.23.0/go.mod h1:DHH5uiFvLC1edCpvcTDV++NKULdYYU6pR9Tt3HIK
k8s.io/gengo v0.0.0-20201203183100-97869a43a9d9/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c h1:GohjlNKauSai7gN4wsJkeZ3WAJx4Sh+oT/b5IYn5suA=
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/klog v0.2.0 h1:0ElL0OHzF3N+OhoJTL0uca20SxtYt4X4+bzHeqrB83c=
k8s.io/klog v0.2.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.30.0 h1:bUO6drIvCIsvZ/XFgfxoGFQU/a4Qkh0iAlvUR7vlHJw=
k8s.io/klog/v2 v2.30.0/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=
k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 h1:E3J9oCLlaobFUqsjG9DfKbP2BmgwBL2p7pn0A3dG9W4=
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65/go.mod h1:sX9MT8g7NVZM5lVL/j8QyCCJe8YSMW30QvGZWaCIDIk=
k8s.io/metrics v0.23.3 h1:rX/RBOwqi0atFlTSlpbQ7CX5s/kfqGR9zEefCx9Rv1s=
Expand Down
7 changes: 6 additions & 1 deletion pkg/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,16 +332,20 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
// let's figure out which vertex to send the results to.
// update the toBuffer(s) with writeMessages.
for _, m := range udfResults {
// look for errors in udf processing, if we see even 1 error let's return. handling partial retrying is not worth ATM.
// look for errors in udf processing, if we see even 1 error flag the error boolean and
// NoAck all messages then return. Handling partial retrying is not worth ATM.
if m.udfError != nil {
udfError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, "buffer": isdf.fromBuffer.GetName()}).Inc()
isdf.opts.logger.Errorw("failed to applyUDF", zap.Error(err))
// As there's no partial failure, non-ack all the readOffsets
isdf.fromBuffer.NoAck(ctx, readOffsets)
return
}
// update toBuffers
for _, message := range m.writeMessages {
if err := isdf.whereToStep(message, messageToStep, m.readMessage); err != nil {
isdf.opts.logger.Errorw("failed in whereToStep", zap.Error(err))
isdf.fromBuffer.NoAck(ctx, readOffsets)
return
}
}
Expand All @@ -351,6 +355,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
writeOffsets, err := isdf.writeToBuffers(ctx, messageToStep)
if err != nil {
isdf.opts.logger.Errorw("failed to write to toBuffers", zap.Error(err))
isdf.fromBuffer.NoAck(ctx, readOffsets)
return
}

Expand Down
17 changes: 12 additions & 5 deletions pkg/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"
"time"

"go.uber.org/goleak"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
Expand Down Expand Up @@ -63,6 +65,10 @@ func (t *testForwardFetcher) Close() error {
return nil
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

// GetWatermark uses current time as the watermark because we want to make sure
// the test publisher is publishing watermark
func (t *testForwardFetcher) GetWatermark(_ isb.Offset) wmb.Watermark {
Expand Down Expand Up @@ -1085,10 +1091,6 @@ func TestNewInterStepDataForward_UDFError(t *testing.T) {
toSteps := map[string]isb.BufferWriter{
"to1": to1,
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime)

vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{
PipelineName: "testPipeline",
Expand All @@ -1097,16 +1099,21 @@ func TestNewInterStepDataForward_UDFError(t *testing.T) {
},
}}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime)

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardApplyUDFErrTest{}, myForwardApplyUDFErrTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())

stopped := f.Start()
// write some data
_, errs := fromStep.Write(ctx, writeMessages[0:5])
assert.Equal(t, make([]error, 5), errs)

assert.True(t, to1.IsEmpty())

f.Stop()
Expand Down
10 changes: 10 additions & 0 deletions pkg/isb/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type BufferReader interface {
Read(context.Context, int64) ([]*ReadMessage, error)
// Ack acknowledges an array of offset.
Ack(context.Context, []Offset) []error
// NoAck cancel acknowledgement of an array of offset.
NoAck(context.Context, []Offset)
}

// BufferReaderInformation has information regarding the buffer we are reading from.
Expand Down Expand Up @@ -92,6 +94,10 @@ type Offset interface {
// This is often used when the BufferReader can not simply use the offset identifier to ack the message,
// then the work can be done in this function, and call it in BufferReader Ack() function implementation.
AckIt() error
// NoAck to indicate the offset no longer needs to be acknowledged
// It is used when error occur, and we want to reprocess the batch to indicate acknowledgement no
// longer needed.
NoAck()
}

// SimpleStringOffset is an Offset convenient function for implementations without needing AckIt() when offset is a string.
Expand All @@ -109,6 +115,8 @@ func (so SimpleStringOffset) AckIt() error {
return nil
}

func (so SimpleStringOffset) NoAck() {}

// SimpleIntOffset is an Offset convenient function for implementations without needing AckIt() when offset is a int64.
type SimpleIntOffset func() int64

Expand All @@ -123,3 +131,5 @@ func (si SimpleIntOffset) Sequence() (int64, error) {
func (si SimpleIntOffset) AckIt() error {
return nil
}

func (si SimpleIntOffset) NoAck() {}
28 changes: 27 additions & 1 deletion pkg/isb/stores/jetstream/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,23 @@ func (jr *jetStreamReader) Ack(_ context.Context, offsets []isb.Offset) []error
return errs
}

func (jr *jetStreamReader) NoAck(_ context.Context, offsets []isb.Offset) {
done := make(chan struct{})
wg := &sync.WaitGroup{}
for _, o := range offsets {
wg.Add(1)
go func(o isb.Offset) {
defer wg.Done()
o.NoAck()
}(o)
}
go func() {
wg.Wait()
close(done)
}()
<-done
}

// offset implements ID interface for JetStream.
type offset struct {
seq uint64
Expand All @@ -283,7 +300,7 @@ func newOffset(msg *nats.Msg, tickDuration time.Duration, log *zap.SugaredLogger
seq: metadata.Sequence.Stream,
msg: msg,
}
// If tickDuration is 1s, which means ackWait is 1s or 2s, it doesn not make much sense to do it, instead, increasing ackWait is recommended.
// If tickDuration is 1s, which means ackWait is 1s or 2s, it does not make much sense to do it, instead, increasing ackWait is recommended.
if tickDuration.Seconds() > 1 {
ctx, cancel := context.WithCancel(context.Background())
go o.workInProgess(ctx, msg, tickDuration, log)
Expand Down Expand Up @@ -322,6 +339,15 @@ func (o *offset) AckIt() error {
return nil
}

func (o *offset) NoAck() {
if o.cancelFunc != nil {
o.cancelFunc()
}
// Ignore the returned error as the worst case the message will
// take longer to be redelivered.
_ = o.msg.Nak()
}

func (o *offset) Sequence() (int64, error) {
return int64(o.seq), nil
}
14 changes: 12 additions & 2 deletions pkg/isb/stores/jetstream/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"go.uber.org/goleak"

"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"

Expand All @@ -30,6 +32,10 @@ import (
natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

func TestJetStreamBufferRead(t *testing.T) {
s := natstest.RunJetStreamServer(t)
defer natstest.ShutdownJetStreamServer(t, s)
Expand All @@ -40,6 +46,7 @@ func TestJetStreamBufferRead(t *testing.T) {
defaultJetStreamClient := natstest.JetStreamClient(t, s)
conn, err := defaultJetStreamClient.Connect(ctx)
assert.NoError(t, err)
defer conn.Close()
js, err := conn.JetStream()
assert.NoError(t, err)

Expand All @@ -50,6 +57,7 @@ func TestJetStreamBufferRead(t *testing.T) {
bw, err := NewJetStreamBufferWriter(ctx, defaultJetStreamClient, streamName, streamName, streamName)
assert.NoError(t, err)
jw, _ := bw.(*jetStreamWriter)
defer jw.Close()
// Add some data
startTime := time.Unix(1636470000, 0)
messages := testutils.BuildTestWriteMessages(int64(20), startTime)
Expand Down Expand Up @@ -77,6 +85,7 @@ func TestJetStreamBufferRead(t *testing.T) {
assert.NoError(t, err)

fromStep := bufferReader.(*jetStreamReader)
defer fromStep.Close()

readMessages, err := fromStep.Read(ctx, 20)
assert.NoError(t, err)
Expand Down Expand Up @@ -133,6 +142,7 @@ func TestGetName(t *testing.T) {
assert.NoError(t, err)
js, err := conn.JetStream()
assert.NoError(t, err)
defer conn.Close()

streamName := "getName"
addStream(t, js, streamName)
Expand All @@ -142,6 +152,7 @@ func TestGetName(t *testing.T) {
assert.NoError(t, err)
br := bufferReader.(*jetStreamReader)
assert.Equal(t, br.GetName(), streamName)
defer br.Close()

}

Expand All @@ -156,6 +167,7 @@ func TestClose(t *testing.T) {
defaultJetStreamClient := natstest.JetStreamClient(t, s)
conn, err := defaultJetStreamClient.Connect(ctx)
assert.NoError(t, err)
defer conn.Close()
js, err := conn.JetStream()
assert.NoError(t, err)

Expand All @@ -172,8 +184,6 @@ func TestClose(t *testing.T) {
}

func addStream(t *testing.T, js *jsclient.JetStreamContext, streamName string) {
s := natstest.RunJetStreamServer(t)
defer natstest.ShutdownJetStreamServer(t, s)

_, err := js.AddStream(&nats.StreamConfig{
Name: streamName,
Expand Down
2 changes: 2 additions & 0 deletions pkg/isb/stores/jetstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,5 @@ func (w *writeOffset) Sequence() (int64, error) {
func (w *writeOffset) AckIt() error {
return fmt.Errorf("not supported")
}

func (w *writeOffset) NoAck() {}
Loading