From 88ab707d5c95e3be0a2e8a220a2d81b2da731374 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Sun, 12 Jan 2025 12:19:21 -0500 Subject: [PATCH 1/3] chore: update generator to skip publishing empty struct during termination Signed-off-by: Keran Yang --- pkg/shared/util/struct.go | 15 ++++++++++++ pkg/shared/util/struct_test.go | 41 ++++++++++++++++++++++++++++++++ pkg/sources/generator/tickgen.go | 8 +++++++ 3 files changed, 64 insertions(+) create mode 100644 pkg/shared/util/struct.go create mode 100644 pkg/shared/util/struct_test.go diff --git a/pkg/shared/util/struct.go b/pkg/shared/util/struct.go new file mode 100644 index 000000000..9c2098788 --- /dev/null +++ b/pkg/shared/util/struct.go @@ -0,0 +1,15 @@ +package util + +import "reflect" + +// IsZeroStruct checks if a struct is zero value. +func IsZeroStruct(v interface{}) bool { + val := reflect.ValueOf(v) + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + if !field.IsZero() { + return false + } + } + return true +} diff --git a/pkg/shared/util/struct_test.go b/pkg/shared/util/struct_test.go new file mode 100644 index 000000000..d9e984246 --- /dev/null +++ b/pkg/shared/util/struct_test.go @@ -0,0 +1,41 @@ +package util + +import "testing" + +func TestIsZeroStruct(t *testing.T) { + type args struct { + v interface{} + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "zero_struct", + args: args{v: struct { + Name string + Age int + }{}}, + want: true, + }, + { + name: "non_zero_struct", + args: args{v: struct { + Name string + Age int + }{ + Name: "John", + Age: 25, + }}, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := IsZeroStruct(tt.args.v); got != tt.want { + t.Errorf("IsZeroStruct() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/sources/generator/tickgen.go b/pkg/sources/generator/tickgen.go index e2e96fec9..8a897c8c9 100644 --- a/pkg/sources/generator/tickgen.go +++ b/pkg/sources/generator/tickgen.go @@ -32,6 +32,7 @@ import ( dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/shared/logging" + "github.com/numaproj/numaflow/pkg/shared/util" "github.com/numaproj/numaflow/pkg/sources/sourcer" ) @@ -198,6 +199,13 @@ loop: // we implement Read With Wait semantics select { case r := <-mg.srcChan: + // when the channel is closed and empty, go will still read from the channel once and return the zero value. + // exclude the zero value from being passed to the next vertex, + // ensuring all messages produced by generator follow the same data schema. + if util.IsZeroStruct(r) { + mg.logger.Info("Zero value read from the generator channel, skipping...") + continue + } msgs = append(msgs, mg.newReadMessage(r.key, r.data, r.offset, r.ts)) case <-timeout: break loop From 324aec01f95771211aaa4641bd970035bdfdceea Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Mon, 13 Jan 2025 16:15:35 -0500 Subject: [PATCH 2/3] address comments Signed-off-by: Keran Yang --- pkg/shared/util/struct.go | 15 ------------ pkg/shared/util/struct_test.go | 41 -------------------------------- pkg/sources/generator/tickgen.go | 5 ++-- 3 files changed, 2 insertions(+), 59 deletions(-) delete mode 100644 pkg/shared/util/struct.go delete mode 100644 pkg/shared/util/struct_test.go diff --git a/pkg/shared/util/struct.go b/pkg/shared/util/struct.go deleted file mode 100644 index 9c2098788..000000000 --- a/pkg/shared/util/struct.go +++ /dev/null @@ -1,15 +0,0 @@ -package util - -import "reflect" - -// IsZeroStruct checks if a struct is zero value. -func IsZeroStruct(v interface{}) bool { - val := reflect.ValueOf(v) - for i := 0; i < val.NumField(); i++ { - field := val.Field(i) - if !field.IsZero() { - return false - } - } - return true -} diff --git a/pkg/shared/util/struct_test.go b/pkg/shared/util/struct_test.go deleted file mode 100644 index d9e984246..000000000 --- a/pkg/shared/util/struct_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package util - -import "testing" - -func TestIsZeroStruct(t *testing.T) { - type args struct { - v interface{} - } - tests := []struct { - name string - args args - want bool - }{ - { - name: "zero_struct", - args: args{v: struct { - Name string - Age int - }{}}, - want: true, - }, - { - name: "non_zero_struct", - args: args{v: struct { - Name string - Age int - }{ - Name: "John", - Age: 25, - }}, - want: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := IsZeroStruct(tt.args.v); got != tt.want { - t.Errorf("IsZeroStruct() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/pkg/sources/generator/tickgen.go b/pkg/sources/generator/tickgen.go index 8a897c8c9..0c32f049c 100644 --- a/pkg/sources/generator/tickgen.go +++ b/pkg/sources/generator/tickgen.go @@ -32,7 +32,6 @@ import ( dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/shared/logging" - "github.com/numaproj/numaflow/pkg/shared/util" "github.com/numaproj/numaflow/pkg/sources/sourcer" ) @@ -198,11 +197,11 @@ loop: // since the Read call is blocking, and runs in an infinite loop, // we implement Read With Wait semantics select { - case r := <-mg.srcChan: + case r, ok := <-mg.srcChan: // when the channel is closed and empty, go will still read from the channel once and return the zero value. // exclude the zero value from being passed to the next vertex, // ensuring all messages produced by generator follow the same data schema. - if util.IsZeroStruct(r) { + if !ok { mg.logger.Info("Zero value read from the generator channel, skipping...") continue } From d1aaa024c67d120d8dc4b458b6b24c35cd9fb1d2 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Mon, 13 Jan 2025 17:11:24 -0500 Subject: [PATCH 3/3] . Signed-off-by: Keran Yang --- pkg/sources/generator/tickgen.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/sources/generator/tickgen.go b/pkg/sources/generator/tickgen.go index 0c32f049c..a922e6557 100644 --- a/pkg/sources/generator/tickgen.go +++ b/pkg/sources/generator/tickgen.go @@ -198,12 +198,9 @@ loop: // we implement Read With Wait semantics select { case r, ok := <-mg.srcChan: - // when the channel is closed and empty, go will still read from the channel once and return the zero value. - // exclude the zero value from being passed to the next vertex, - // ensuring all messages produced by generator follow the same data schema. if !ok { - mg.logger.Info("Zero value read from the generator channel, skipping...") - continue + mg.logger.Info("All the messages have been read. returning.") + break loop } msgs = append(msgs, mg.newReadMessage(r.key, r.data, r.offset, r.ts)) case <-timeout: