From 0ff8111a46afe7b85e157ae17fea5203411e3cdd Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Tue, 28 Jan 2025 19:52:10 +0530 Subject: [PATCH] fix(bento): Handle acks without restarting the read session (#39) --- go.mod | 12 +- go.sum | 44 ++--- s2-bentobox/bentobox.go | 7 - s2-bentobox/input.go | 364 ++++++++++++++++++++++++++++++---------- 4 files changed, 302 insertions(+), 125 deletions(-) diff --git a/go.mod b/go.mod index a863ea1..5313724 100644 --- a/go.mod +++ b/go.mod @@ -4,18 +4,20 @@ go 1.23.4 require ( github.com/google/uuid v1.6.0 + github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/s2-streamstore/optr v1.0.0 github.com/stretchr/testify v1.10.0 - google.golang.org/grpc v1.69.2 - google.golang.org/protobuf v1.36.1 + github.com/tidwall/btree v1.7.0 + google.golang.org/grpc v1.70.0 + google.golang.org/protobuf v1.36.4 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - golang.org/x/net v0.33.0 // indirect - golang.org/x/sys v0.28.0 // indirect + golang.org/x/net v0.34.0 // indirect + golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index eab1a46..38ab5b7 100644 --- a/go.sum +++ b/go.sum @@ -10,34 +10,38 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= +github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/s2-streamstore/optr v1.0.0 h1:OqJpOO2uYsG+iXW9mNTlsCePDvxTS8mh9EtGcnyq8w8= github.com/s2-streamstore/optr v1.0.0/go.mod h1:x/wSUCzWgpnvWWQG4dZ4JBFW2Kw0pE0yhLGb+0xiN8g= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= -go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= -go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= -go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= -go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= -go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= -go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= -go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= -go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= -go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod 
h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= +github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 h1:X58yt85/IXCx0Y3ZwN6sEIKZzQtDEYaBWrDvErdXrRE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= -google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= -google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= -google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 h1:J1H9f+LEdWAfHcez/4cvaVBox7cOYT+IU6rgqj5x++8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= +google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= +google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/s2-bentobox/bentobox.go b/s2-bentobox/bentobox.go index 10fe561..461ac9b 100644 --- a/s2-bentobox/bentobox.go +++ b/s2-bentobox/bentobox.go @@ -35,13 +35,6 @@ func waitForClosers(ctx context.Context, closers ...<-chan struct{}) error { return nil } -func notifyOnce(ch chan<- struct{}) { - select { - case ch 
<- struct{}{}: - default: - } -} - type Logger interface { Tracef(template string, args ...any) Trace(message string) diff --git a/s2-bentobox/input.go b/s2-bentobox/input.go index ec0f201..28ce064 100644 --- a/s2-bentobox/input.go +++ b/s2-bentobox/input.go @@ -3,9 +3,13 @@ package s2bentobox import ( "context" "errors" + "sync" "time" + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/s2-streamstore/optr" "github.com/s2-streamstore/s2-sdk-go/s2" + "github.com/tidwall/btree" ) var ErrInputClosed = errors.New("input closed") @@ -15,6 +19,46 @@ type SeqNumCache interface { Set(ctx context.Context, stream string, seqNum uint64) error } +type seqNumCache struct { + inner SeqNumCache + // We don't trust the user to provide a valid cache so we have our own layer + // on top of the one provided by the user. + mem cmap.ConcurrentMap[string, uint64] +} + +func newSeqNumCache(inner SeqNumCache) *seqNumCache { + return &seqNumCache{ + inner: inner, + mem: cmap.New[uint64](), + } +} + +func (s *seqNumCache) Get(ctx context.Context, stream string) (uint64, error) { + seqNum, ok := s.mem.Get(stream) + if ok { + return seqNum, nil + } + + cached, err := s.inner.Get(ctx, stream) + if err != nil { + return 0, err + } + + s.mem.Set(stream, cached) + + return cached, nil +} + +func (s *seqNumCache) Set(ctx context.Context, stream string, seqNum uint64) error { + if err := s.inner.Set(ctx, stream, seqNum); err != nil { + return err + } + + s.mem.Set(stream, seqNum) + + return nil +} + type InputStreams interface { list(ctx context.Context, client *s2.BasinClient) ([]string, error) } @@ -62,11 +106,11 @@ func (p PrefixedInputStreams) list(ctx context.Context, client *s2.BasinClient) type InputConfig struct { *Config - Streams InputStreams - MaxInFlight int - UpdateListInterval time.Duration - Cache SeqNumCache - Logger Logger + Streams InputStreams + MaxInFlight int + UpdateStreamsInterval time.Duration + Cache SeqNumCache + Logger Logger } type recvOutput struct { @@ -77,10 +121,14 @@ type recvOutput struct { type Input struct { config *InputConfig - recvCh <-chan recvOutput + inputStream chan recvOutput streamsManagerCloser <-chan struct{} cancelSession context.CancelFunc - closeWorker chan<- string + + cache *seqNumCache + + ackMu sync.Mutex + nacks map[string]*btree.Set[uint64] } func ConnectInput(ctx context.Context, config *InputConfig) (*Input, error) { @@ -92,25 +140,28 @@ func ConnectInput(ctx context.Context, config *InputConfig) (*Input, error) { sessionCtx, cancelSession := context.WithCancel(ctx) streamsManagerCloser := make(chan struct{}) - closeWorker := make(chan string, config.MaxInFlight) - recvCh := make(chan recvOutput, config.MaxInFlight) + // Twice the inflight requests since we can have this blocked by nacks otherwise. + // TODO: Maybe this number can be improved somehow? 
+ inputStream := make(chan recvOutput, config.MaxInFlight*2) + + cache := newSeqNumCache(config.Cache) go streamsManagerWorker( sessionCtx, client, config, - config.UpdateListInterval, - recvCh, - closeWorker, + cache, + inputStream, streamsManagerCloser, ) return &Input{ config: config, - recvCh: recvCh, + inputStream: inputStream, streamsManagerCloser: streamsManagerCloser, cancelSession: cancelSession, - closeWorker: closeWorker, + cache: cache, + nacks: make(map[string]*btree.Set[uint64]), }, nil } @@ -119,15 +170,6 @@ type streamWorker struct { closer <-chan struct{} } -func (sw *streamWorker) IsClosed() bool { - select { - case <-sw.closer: - return true - default: - return false - } -} - func (sw *streamWorker) Close() { sw.cancel() } @@ -140,23 +182,47 @@ func streamsManagerWorker( ctx context.Context, client *s2.BasinClient, config *InputConfig, - updateListInterval time.Duration, - recvCh chan<- recvOutput, - closeWorker <-chan string, + cache *seqNumCache, + inputStream chan<- recvOutput, streamsManagerCloser chan<- struct{}, ) { defer close(streamsManagerCloser) + updateStreamsInterval := time.Minute + if config.UpdateStreamsInterval != 0 { + updateStreamsInterval = config.UpdateStreamsInterval + } + var ( existingWorkers = make(map[string]streamWorker) - ticker = time.NewTicker(updateListInterval) - exitNotifier = make(chan struct{}, 1) + ticker = time.NewTicker(updateStreamsInterval) updateListNotifier = make(chan struct{}, 1) ) defer ticker.Stop() - notifyOnce(updateListNotifier) + // Fetch the list once immediately at startup. + updateListNotifier <- struct{}{} + + spawnWorker := func(stream string) { + workerCtx, cancelWorker := context.WithCancel(ctx) + workerCloser := make(chan struct{}) + worker := streamWorker{ + cancel: cancelWorker, + closer: workerCloser, + } + existingWorkers[stream] = worker + + go receiverWorker( + workerCtx, + client, + config, + cache, + stream, + inputStream, + workerCloser, + ) + } outerLoop: for { @@ -164,30 +230,27 @@ outerLoop: case <-ctx.Done(): break outerLoop - case <-exitNotifier: - for stream, worker := range existingWorkers { - if worker.IsClosed() { - delete(existingWorkers, stream) - // Maybe the worker exited due to an unexpected error. Let's list immediately. - notifyOnce(updateListNotifier) - } - } - - case stream := <-closeWorker: - if worker, found := existingWorkers[stream]; found { - worker.Close() - } - case <-updateListNotifier: case <-ticker.C: } + // Clear the update list notifier. + select { + case <-updateListNotifier: + default: + } + newStreams, err := config.Streams.list(ctx, client) if err != nil { config.Logger. With("error", err). - Warn("Failed to list streams") - notifyOnce(updateListNotifier) + Error("Failed to list streams") + + // Try updating the update list notifier. We need a retry here. 
+ select { + case updateListNotifier <- struct{}{}: + default: + } continue } @@ -197,14 +260,7 @@ outerLoop: newStreamsSet[stream] = struct{}{} if _, found := existingWorkers[stream]; !found { - workerCtx, cancelWorker := context.WithCancel(ctx) - workerCloser := make(chan struct{}) - worker := streamWorker{ - cancel: cancelWorker, - closer: workerCloser, - } - existingWorkers[stream] = worker - go receiverWorker(workerCtx, client, config, stream, recvCh, workerCloser, exitNotifier) + spawnWorker(stream) } } @@ -229,67 +285,119 @@ func receiverWorker( ctx context.Context, client *s2.BasinClient, config *InputConfig, + cache *seqNumCache, stream string, recvCh chan<- recvOutput, closer chan<- struct{}, - exitNotifier chan<- struct{}, ) { - defer notifyOnce(exitNotifier) defer close(closer) - startSeqNum, err := config.Cache.Get(ctx, stream) - if err != nil { - config.Logger. - With( - "error", err, - "stream", stream, - ). - Warn("Failed to get last sequence number from cache. Setting start seq num to 0") - - startSeqNum = 0 - } - - receiver, err := client.StreamClient(stream).ReadSession(ctx, &s2.ReadSessionRequest{ - StartSeqNum: startSeqNum, - }) - if err != nil { - config.Logger. - With( - "error", err, - "stream", stream, - ). - Error("Failed to initialize receiver") - - return - } - config.Logger.With("stream", stream).Info("Reading from S2 source") defer config.Logger.With("stream", stream).Debug("Exiting S2 source worker") + var ( + receiver s2.Receiver[s2.ReadOutput] + backoff time.Duration + startSeqNumOpt *uint64 + ) + + initReceiver := func() error { + select { + case <-time.After(backoff): + backoff = time.Second + + case <-ctx.Done(): + return ctx.Err() + } + + startSeqNum := optr.UnwrapOr(startSeqNumOpt, func() uint64 { + startSeqNum, err := cache.Get(ctx, stream) + if err != nil { + config.Logger. + With( + "error", err, + "stream", stream, + ). + Warn("Failed to get last sequence number from cache") + + // Set it to 0 and let the loop try and get the latest sequence number. + return 0 + } + + return startSeqNum + }) + + var err error + + receiver, err = client.StreamClient(stream).ReadSession(ctx, &s2.ReadSessionRequest{ + StartSeqNum: startSeqNum, + }) + if err != nil { + return err + } + + // Reset so we can hit cache the next time unless otherwise asked to. + startSeqNumOpt = nil + + return nil + } + for { + select { + case <-ctx.Done(): + return + default: + } + + if receiver == nil { + if err := initReceiver(); err != nil { + config.Logger. + With( + "error", err, + "stream", stream, + ). + Error("Failed to initialize receiver") + + continue + } + } + output, err := receiver.Recv() if err != nil { select { case recvCh <- recvOutput{Stream: stream, Err: err}: + // Reset the receiver. 
+				receiver = nil
+
+				continue
 			case <-ctx.Done():
+				return
 			}
-
-			return
 		}
 
-		if batch, ok := output.(s2.ReadOutputBatch); ok {
+		switch o := output.(type) {
+		case s2.ReadOutputBatch:
 			select {
-			case recvCh <- recvOutput{Stream: stream, Batch: batch.SequencedRecordBatch}:
+			case recvCh <- recvOutput{Stream: stream, Batch: o.SequencedRecordBatch}:
+				// OK
 			case <-ctx.Done():
 				return
 			}
+
+		case s2.ReadOutputFirstSeqNum:
+			startSeqNumOpt = optr.Some(uint64(o))
+			receiver = nil
+
+		case s2.ReadOutputNextSeqNum:
+			startSeqNumOpt = optr.Some(uint64(o))
+			receiver = nil
 		}
 	}
 }
 
 func (i *Input) ReadBatch(ctx context.Context) (*s2.SequencedRecordBatch, string, error) {
 	select {
-	case output := <-i.recvCh:
+	case output := <-i.inputStream:
 		if output.Err != nil {
 			return nil, output.Stream, output.Err
 		}
@@ -310,17 +418,87 @@ func (i *Input) ReadBatch(ctx context.Context) (*s2.SequencedRecordBatch, string
 
 func (i *Input) AckFunc(stream string, batch *s2.SequencedRecordBatch) func(context.Context, error) error {
 	return func(ctx context.Context, err error) error {
-		if err == nil && batch != nil && len(batch.Records) > 0 {
-			// Update the cache with the last sequence number
-			lastSeqNum := batch.Records[len(batch.Records)-1].SeqNum
+		i.config.Logger.With("stream", stream).Debug("Acknowledging batch")
 
-			return i.config.Cache.Set(ctx, stream, lastSeqNum)
+		if len(batch.Records) == 0 {
+			// Nothing to acknowledge for an empty batch.
+			return nil
 		}
 
-		// Close a specific worker and let it restart.
-		i.closeWorker <- stream
+		if err == nil {
+			// Set the cache only if there are no outstanding nacks.
+			return func() error {
+				i.ackMu.Lock()
+				defer i.ackMu.Unlock()
 
-		return nil
+				streamNacks, ok := i.nacks[stream]
+				if ok && streamNacks.Len() > 0 {
+					// If the smallest outstanding nack is greater than this batch's
+					// seq num, the batch has already been considered acknowledged;
+					// otherwise it may be (or may already have been) redelivered.
+					// Either way, only drop it from the nack set here.
+					streamNacks.Delete(batch.Records[0].SeqNum)
+
+					return nil
+				}
+
+				// Ack this batch by storing the next sequence number in the cache.
+				lastSeqNum := batch.Records[len(batch.Records)-1].SeqNum
+				nextSeqNum := lastSeqNum + 1
+
+				// Only cache the value if it's greater than the value that's already cached,
+				// otherwise this could result in duplication of messages.
+				//
+				// Duplication is not an error, but we try our best to avoid it.
+				cachedSeqNum, cErr := i.cache.Get(ctx, stream)
+				if cErr != nil || cachedSeqNum < nextSeqNum {
+					return i.cache.Set(ctx, stream, nextSeqNum)
+				}
+
+				return nil
+			}()
+		}
+
+		if cErr := func() error {
+			i.ackMu.Lock()
+			defer i.ackMu.Unlock()
+
+			streamNacks, ok := i.nacks[stream]
+			if !ok {
+				streamNacks = &btree.Set[uint64]{}
+				i.nacks[stream] = streamNacks
+			}
+
+			nackSeqNum := batch.Records[0].SeqNum
+
+			streamNacks.Insert(nackSeqNum)
+
+			// The seq num is now in the nack set; check whether it is the minimum.
+			if minSeqNum, ok := streamNacks.Min(); ok && minSeqNum == nackSeqNum {
+				// It is the smallest outstanding nack, so rewind the cache to it.
+				return i.cache.Set(ctx, stream, nackSeqNum)
+			}
+
+			return nil
+		}(); cErr != nil {
+			return cErr
+		}
+
+		// Requeue the batch so it can be delivered and acknowledged again.
+		select {
+		case i.inputStream <- recvOutput{
+			Stream: stream,
+			Batch:  batch,
+		}:
+			return nil
+
+		// Ack functions can also be called while the input is shutting down.
+		// Return an error if the input has closed.
+		case <-i.streamsManagerCloser:
+			return context.Canceled
+
+		case <-ctx.Done():
+			return ctx.Err()
+		}
 	}
 }
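
For reference, the SeqNumCache interface that the new write-through seqNumCache layer wraps only needs Get and Set. Below is a minimal sketch of a backing implementation, assuming an in-memory, mutex-guarded map is acceptable for the pipeline's durability needs; the memoryCache type, newMemoryCache constructor, and errCacheMiss sentinel are illustrative names, not part of this patch.

package cacheexample

import (
	"context"
	"errors"
	"sync"
)

// memoryCache is a hypothetical SeqNumCache backed by a mutex-guarded map.
// Get reports an error for streams it has never seen, which the input treats
// as "start reading from sequence number 0".
type memoryCache struct {
	mu   sync.Mutex
	seqs map[string]uint64
}

var errCacheMiss = errors.New("sequence number not cached")

func newMemoryCache() *memoryCache {
	return &memoryCache{seqs: make(map[string]uint64)}
}

func (c *memoryCache) Get(_ context.Context, stream string) (uint64, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	seqNum, ok := c.seqs[stream]
	if !ok {
		return 0, errCacheMiss
	}

	return seqNum, nil
}

func (c *memoryCache) Set(_ context.Context, stream string, seqNum uint64) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.seqs[stream] = seqNum

	return nil
}

The patch's own seqNumCache keeps a concurrent map in front of whatever is supplied here, so the user-provided cache is only read on a miss and is written through on every Set.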
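
Likewise, a rough sketch of how a consumer might drive the new ack path, in which passing a non-nil error to the ack function nacks the batch and requeues it instead of tearing down the read session. The consume helper, its process callback, and the s2-bentobox import path are assumptions for illustration only.

package consumerexample

import (
	"context"

	"github.com/s2-streamstore/s2-sdk-go/s2"
	s2bentobox "github.com/s2-streamstore/s2-sdk-go/s2-bentobox" // assumed import path
)

// consume drains one batch at a time from an already-connected Input and
// acks or nacks it based on the caller-supplied process callback.
func consume(
	ctx context.Context,
	in *s2bentobox.Input,
	process func(*s2.SequencedRecordBatch) error,
) error {
	for {
		batch, stream, err := in.ReadBatch(ctx)
		if err != nil {
			// Receive errors are surfaced here while the per-stream worker
			// restarts its own read session, so a caller could also choose
			// to log the error and keep reading.
			return err
		}

		ack := in.AckFunc(stream, batch)

		// A nil error acknowledges the batch, advancing the cached sequence
		// number once no nacks are outstanding for the stream. A non-nil
		// error nacks it: the cache is rewound to the batch's first seq num
		// and the batch is requeued on the input channel for redelivery.
		if ackErr := ack(ctx, process(batch)); ackErr != nil {
			return ackErr
		}
	}
}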