diff --git a/pkg/sources/generator/tickgen.go b/pkg/sources/generator/tickgen.go index e2e96fec9..a922e6557 100644 --- a/pkg/sources/generator/tickgen.go +++ b/pkg/sources/generator/tickgen.go @@ -197,7 +197,11 @@ loop: // since the Read call is blocking, and runs in an infinite loop, // we implement Read With Wait semantics select { - case r := <-mg.srcChan: + case r, ok := <-mg.srcChan: + if !ok { + mg.logger.Info("All the messages have been read. returning.") + break loop + } msgs = append(msgs, mg.newReadMessage(r.key, r.data, r.offset, r.ts)) case <-timeout: break loop