From 324aec01f95771211aaa4641bd970035bdfdceea Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Mon, 13 Jan 2025 16:15:35 -0500 Subject: [PATCH] address comments Signed-off-by: Keran Yang --- pkg/shared/util/struct.go | 15 ------------ pkg/shared/util/struct_test.go | 41 -------------------------------- pkg/sources/generator/tickgen.go | 5 ++-- 3 files changed, 2 insertions(+), 59 deletions(-) delete mode 100644 pkg/shared/util/struct.go delete mode 100644 pkg/shared/util/struct_test.go diff --git a/pkg/shared/util/struct.go b/pkg/shared/util/struct.go deleted file mode 100644 index 9c2098788..000000000 --- a/pkg/shared/util/struct.go +++ /dev/null @@ -1,15 +0,0 @@ -package util - -import "reflect" - -// IsZeroStruct checks if a struct is zero value. -func IsZeroStruct(v interface{}) bool { - val := reflect.ValueOf(v) - for i := 0; i < val.NumField(); i++ { - field := val.Field(i) - if !field.IsZero() { - return false - } - } - return true -} diff --git a/pkg/shared/util/struct_test.go b/pkg/shared/util/struct_test.go deleted file mode 100644 index d9e984246..000000000 --- a/pkg/shared/util/struct_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package util - -import "testing" - -func TestIsZeroStruct(t *testing.T) { - type args struct { - v interface{} - } - tests := []struct { - name string - args args - want bool - }{ - { - name: "zero_struct", - args: args{v: struct { - Name string - Age int - }{}}, - want: true, - }, - { - name: "non_zero_struct", - args: args{v: struct { - Name string - Age int - }{ - Name: "John", - Age: 25, - }}, - want: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := IsZeroStruct(tt.args.v); got != tt.want { - t.Errorf("IsZeroStruct() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/pkg/sources/generator/tickgen.go b/pkg/sources/generator/tickgen.go index 8a897c8c9..0c32f049c 100644 --- a/pkg/sources/generator/tickgen.go +++ b/pkg/sources/generator/tickgen.go @@ -32,7 +32,6 @@ import ( dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/shared/logging" - "github.com/numaproj/numaflow/pkg/shared/util" "github.com/numaproj/numaflow/pkg/sources/sourcer" ) @@ -198,11 +197,11 @@ loop: // since the Read call is blocking, and runs in an infinite loop, // we implement Read With Wait semantics select { - case r := <-mg.srcChan: + case r, ok := <-mg.srcChan: // when the channel is closed and empty, go will still read from the channel once and return the zero value. // exclude the zero value from being passed to the next vertex, // ensuring all messages produced by generator follow the same data schema. - if util.IsZeroStruct(r) { + if !ok { mg.logger.Info("Zero value read from the generator channel, skipping...") continue }