From d1aaa024c67d120d8dc4b458b6b24c35cd9fb1d2 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Mon, 13 Jan 2025 17:11:24 -0500 Subject: [PATCH] . Signed-off-by: Keran Yang --- pkg/sources/generator/tickgen.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/sources/generator/tickgen.go b/pkg/sources/generator/tickgen.go index 0c32f049c..a922e6557 100644 --- a/pkg/sources/generator/tickgen.go +++ b/pkg/sources/generator/tickgen.go @@ -198,12 +198,9 @@ loop: // we implement Read With Wait semantics select { case r, ok := <-mg.srcChan: - // when the channel is closed and empty, go will still read from the channel once and return the zero value. - // exclude the zero value from being passed to the next vertex, - // ensuring all messages produced by generator follow the same data schema. if !ok { - mg.logger.Info("Zero value read from the generator channel, skipping...") - continue + mg.logger.Info("All the messages have been read. returning.") + break loop } msgs = append(msgs, mg.newReadMessage(r.key, r.data, r.offset, r.ts)) case <-timeout: