diff --git a/go.mod b/go.mod index 12f6a9bbc0..bc14e846b1 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe github.com/nats-io/nats-server/v2 v2.10.4 github.com/nats-io/nats.go v1.31.0 - github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932 + github.com/numaproj/numaflow-go v0.5.3-0.20231208052731-3d4d17004cc9 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/common v0.37.0 github.com/redis/go-redis/v9 v9.0.3 diff --git a/go.sum b/go.sum index 02a2a5a686..a09f273f95 100644 --- a/go.sum +++ b/go.sum @@ -679,8 +679,8 @@ github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADym github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932 h1:gAURJvmJv7nP8+Y7X+GGHGZ5sg7KatM4dhkWpFCsk+I= -github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U= +github.com/numaproj/numaflow-go v0.5.3-0.20231208052731-3d4d17004cc9 h1:U/9e+ZENDVmWOURe7iXaK2RFJIANYg/HJZGeahErJQI= +github.com/numaproj/numaflow-go v0.5.3-0.20231208052731-3d4d17004cc9/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= diff --git a/pkg/sdkclient/source/client/client.go b/pkg/sdkclient/source/client/client.go index e3055d4e14..0cae049d5f 100644 --- a/pkg/sdkclient/source/client/client.go +++ b/pkg/sdkclient/source/client/client.go @@ -130,3 +130,8 @@ func (c *client) AckFn(ctx context.Context, req *sourcepb.AckRequest) (*sourcepb func (c *client) PendingFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PendingResponse, error) { return c.grpcClt.PendingFn(ctx, req) } + +// PartitionsFn returns the number of partitions from the source. +func (c *client) PartitionsFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PartitionsResponse, error) { + return c.grpcClt.PartitionsFn(ctx, req) +} diff --git a/pkg/sdkclient/source/client/client_test.go b/pkg/sdkclient/source/client/client_test.go index 73000129e1..c3619708ed 100644 --- a/pkg/sdkclient/source/client/client_test.go +++ b/pkg/sdkclient/source/client/client_test.go @@ -90,7 +90,7 @@ func TestReadFn(t *testing.T) { expectedResp := &sourcepb.ReadResponse{ Result: &sourcepb.ReadResponse_Result{ Payload: []byte(`test_payload`), - Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: "0"}, + Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: 0}, EventTime: timestamppb.New(TestEventTime), Keys: []string{"test_key"}, }, diff --git a/pkg/sdkclient/source/client/interface.go b/pkg/sdkclient/source/client/interface.go index 4b7dbfc522..9a9c9815d5 100644 --- a/pkg/sdkclient/source/client/interface.go +++ b/pkg/sdkclient/source/client/interface.go @@ -35,4 +35,6 @@ type Client interface { AckFn(ctx context.Context, req *sourcepb.AckRequest) (*sourcepb.AckResponse, error) // PendingFn returns the number of pending messages from the udsource. PendingFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PendingResponse, error) + // PartitionsFn returns the list of partitions from the udsource. + PartitionsFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PartitionsResponse, error) } diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index ccc350d09a..912c0b3763 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -218,7 +218,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { // if the source is idling, we will publish idle watermark to the source and all the toBuffers // we will not publish idle watermark if the source is not idling. // publish idle watermark for the source - df.srcIdleHandler.PublishSourceIdleWatermark(df.reader.Partitions()) + df.srcIdleHandler.PublishSourceIdleWatermark(df.reader.Partitions(df.ctx)) // if we have published idle watermark to source, we need to publish idle watermark to all the toBuffers // it might not get the latest watermark because of publishing delay, but we will get in the subsequent @@ -230,7 +230,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { for index := range toVertexBuffers { // publish idle watermark to all the source partitions owned by this reader. // it is 1:1 for many (HTTP, tickgen, etc.) but for e.g., for Kafka it is 1:N and the list of partitions in the N could keep changing. - for _, sp := range df.reader.Partitions() { + for _, sp := range df.reader.Partitions(df.ctx) { if vertexPublishers, ok := df.toVertexWMPublishers[toVertexName]; ok { var publisher, ok = vertexPublishers[sp] if !ok { diff --git a/pkg/sources/forward/data_forward_test.go b/pkg/sources/forward/data_forward_test.go index 9ab1519011..c2c7894042 100644 --- a/pkg/sources/forward/data_forward_test.go +++ b/pkg/sources/forward/data_forward_test.go @@ -77,7 +77,7 @@ func (s *SimpleSource) Ack(ctx context.Context, offsets []isb.Offset) []error { return s.buffer.Ack(ctx, offsets) } -func (s *SimpleSource) Partitions() []int32 { +func (s *SimpleSource) Partitions(context.Context) []int32 { return []int32{0} } diff --git a/pkg/sources/generator/tickgen.go b/pkg/sources/generator/tickgen.go index 930ae15d86..ced3ed748f 100644 --- a/pkg/sources/generator/tickgen.go +++ b/pkg/sources/generator/tickgen.go @@ -217,7 +217,7 @@ func (mg *memGen) GetName() string { } // Partitions returns the partitions for the source. -func (mg *memGen) Partitions() []int32 { +func (mg *memGen) Partitions(context.Context) []int32 { return []int32{mg.vertexInstance.Replica} } diff --git a/pkg/sources/http/http.go b/pkg/sources/http/http.go index e1b8b8b102..6c066cdf66 100644 --- a/pkg/sources/http/http.go +++ b/pkg/sources/http/http.go @@ -219,7 +219,7 @@ func (h *httpSource) GetName() string { } // Partitions returns the partitions for the source. -func (h *httpSource) Partitions() []int32 { +func (h *httpSource) Partitions(context.Context) []int32 { return []int32{h.vertexReplica} } diff --git a/pkg/sources/kafka/reader.go b/pkg/sources/kafka/reader.go index f3de1ae6d4..2777f89d82 100644 --- a/pkg/sources/kafka/reader.go +++ b/pkg/sources/kafka/reader.go @@ -132,7 +132,7 @@ func (r *kafkaSource) GetName() string { } // Partitions returns the partitions from which the source is reading. -func (r *kafkaSource) Partitions() []int32 { +func (r *kafkaSource) Partitions(context.Context) []int32 { for topic, partitions := range r.handler.sess.Claims() { if topic == r.topic { return partitions diff --git a/pkg/sources/nats/nats.go b/pkg/sources/nats/nats.go index d0e2beec5e..1ab71536eb 100644 --- a/pkg/sources/nats/nats.go +++ b/pkg/sources/nats/nats.go @@ -216,7 +216,7 @@ func (ns *natsSource) GetName() string { } // Partitions returns the partitions associated with this source. -func (ns *natsSource) Partitions() []int32 { +func (ns *natsSource) Partitions(context.Context) []int32 { return []int32{ns.vertexReplica} } diff --git a/pkg/sources/sourcer/sourcer.go b/pkg/sources/sourcer/sourcer.go index 89983f965d..193af2c343 100644 --- a/pkg/sources/sourcer/sourcer.go +++ b/pkg/sources/sourcer/sourcer.go @@ -39,7 +39,7 @@ type SourceReader interface { // Partitions returns the partitions of the source. This is used by the forwarder to determine to which partition // idle watermarks should be published. Partition assignment to a pod is dynamic, so this method may return different // partitions at different times. (Example - Kafka, every time topic rebalancing happens, the partitions gets updated) - Partitions() []int32 + Partitions(ctx context.Context) []int32 } // Sourcer interface provides an isb.BufferReader abstraction over the underlying data source. diff --git a/pkg/sources/udsource/grpc_udsource.go b/pkg/sources/udsource/grpc_udsource.go index e168c88baa..b87ede0a27 100644 --- a/pkg/sources/udsource/grpc_udsource.go +++ b/pkg/sources/udsource/grpc_udsource.go @@ -161,7 +161,17 @@ func (u *GRPCBasedUDSource) ApplyAckFn(ctx context.Context, offsets []isb.Offset return err } +// ApplyPartitionFn returns the partitions associated with the source. +func (u *GRPCBasedUDSource) ApplyPartitionFn(ctx context.Context) ([]int32, error) { + resp, err := u.client.PartitionsFn(ctx, &emptypb.Empty{}) + if err != nil { + return nil, err + } + + return resp.GetResult().GetPartitions(), nil +} + func constructMessageID(r *sourcepb.ReadResponse_Result) string { // For a user-defined source, the partition ID plus the offset should be able to uniquely identify a message - return r.Offset.GetPartitionId() + "-" + string(r.Offset.GetOffset()) + return fmt.Sprintf("%d-%s", r.GetOffset().GetPartitionId(), string(r.GetOffset().GetOffset())) } diff --git a/pkg/sources/udsource/grpc_udsource_test.go b/pkg/sources/udsource/grpc_udsource_test.go index 22f3ff34c5..fca52d4696 100644 --- a/pkg/sources/udsource/grpc_udsource_test.go +++ b/pkg/sources/udsource/grpc_udsource_test.go @@ -191,7 +191,7 @@ func Test_gRPCBasedUDSource_ApplyReadWithMockClient(t *testing.T) { expectedResponse := &sourcepb.ReadResponse{ Result: &sourcepb.ReadResponse_Result{ Payload: []byte(`test_payload`), - Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: "0"}, + Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: 0}, EventTime: timestamppb.New(TestEventTime), Keys: []string{"test_key"}, }, @@ -238,7 +238,7 @@ func Test_gRPCBasedUDSource_ApplyReadWithMockClient(t *testing.T) { expectedResponse := &sourcepb.ReadResponse{ Result: &sourcepb.ReadResponse_Result{ Payload: []byte(`test_payload`), - Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: "0"}, + Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: 0}, EventTime: timestamppb.New(TestEventTime), Keys: []string{"test_key"}, }, @@ -272,7 +272,7 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) { req := &sourcepb.AckRequest{ Request: &sourcepb.AckRequest_Request{ Offsets: []*sourcepb.Offset{ - {Offset: []byte("test-offset-1"), PartitionId: "0"}, {Offset: []byte("test-offset-2"), PartitionId: "0"}, + {Offset: []byte("test-offset-1"), PartitionId: 0}, {Offset: []byte("test-offset-2"), PartitionId: 0}, }, }, } @@ -304,7 +304,7 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) { req := &sourcepb.AckRequest{ Request: &sourcepb.AckRequest_Request{ Offsets: []*sourcepb.Offset{ - {Offset: []byte("test-offset-1"), PartitionId: "0"}, {Offset: []byte("test-offset-2"), PartitionId: "0"}, + {Offset: []byte("test-offset-1"), PartitionId: 0}, {Offset: []byte("test-offset-2"), PartitionId: 0}, }, }, } diff --git a/pkg/sources/udsource/user_defined_source.go b/pkg/sources/udsource/user_defined_source.go index f2fadaceaf..ea2be3cad8 100644 --- a/pkg/sources/udsource/user_defined_source.go +++ b/pkg/sources/udsource/user_defined_source.go @@ -62,7 +62,6 @@ type userDefinedSource struct { srcPublishWMStores store.WatermarkStore // source watermark publisher stores lifecycleCtx context.Context // lifecycleCtx context is used to control the lifecycle of this source. readTimeout time.Duration // read timeout for the source - partitions map[int32]struct{} // partitions of the source logger *zap.SugaredLogger } @@ -85,7 +84,6 @@ func New( pipelineName: vertexInstance.Vertex.Spec.PipelineName, sourceApplier: sourceApplier, srcPublishWMStores: publishWMStores, - partitions: make(map[int32]struct{}), logger: logging.NewLogger(), // default logger } for _, opt := range opts { @@ -122,27 +120,18 @@ func (u *userDefinedSource) GetName() string { } // Partitions returns the partitions of the user-defined source -func (u *userDefinedSource) Partitions() []int32 { - partitions := make([]int32, 0, len(u.partitions)) - for partition := range u.partitions { - partitions = append(partitions, partition) +func (u *userDefinedSource) Partitions(ctx context.Context) []int32 { + partitions, err := u.sourceApplier.ApplyPartitionFn(ctx) + if err != nil { + u.logger.Errorw("Error getting partitions", zap.Error(err)) + return nil } return partitions } -// Read reads the messages from the user-defined source, tracks the partitions from which the messages are read -// tracked partitions are used to determine the partitions to which the watermarks should be published +// Read reads the messages from the user-defined source. func (u *userDefinedSource) Read(ctx context.Context, count int64) ([]*isb.ReadMessage, error) { - readMessages, err := u.sourceApplier.ApplyReadFn(ctx, count, u.readTimeout) - if err != nil { - return nil, err - } - for _, msg := range readMessages { - if _, ok := u.partitions[msg.ReadOffset.PartitionIdx()]; !ok { - u.partitions[msg.ReadOffset.PartitionIdx()] = struct{}{} - } - } - return readMessages, nil + return u.sourceApplier.ApplyReadFn(ctx, count, u.readTimeout) } // Ack acknowledges the messages from the user-defined source diff --git a/pkg/sources/udsource/utils/offset.go b/pkg/sources/udsource/utils/offset.go index e83a3adeca..b7d53029c5 100644 --- a/pkg/sources/udsource/utils/offset.go +++ b/pkg/sources/udsource/utils/offset.go @@ -17,16 +17,11 @@ limitations under the License. package utils import ( - "strconv" - sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1" "github.com/numaproj/numaflow/pkg/isb" ) -// DefaultPartitionIdx Default partition index -var DefaultPartitionIdx = int32(0) - // simpleSourceOffset is a simple implementation of isb.Offset from the source side. type simpleSourceOffset struct { offset string @@ -62,17 +57,11 @@ func (s *simpleSourceOffset) NoAck() error { func ConvertToSourceOffset(offset isb.Offset) *sourcepb.Offset { return &sourcepb.Offset{ - PartitionId: strconv.Itoa(int(offset.PartitionIdx())), + PartitionId: offset.PartitionIdx(), Offset: []byte(offset.String()), } } func ConvertToIsbOffset(offset *sourcepb.Offset) isb.Offset { - if partitionIdx, err := strconv.Atoi(offset.GetPartitionId()); err != nil { - // If the partition ID is not a number, use the default partition index - // TODO - should we require UDSource users to return us a number instead of string as partition ID? - return NewSimpleSourceOffset(string(offset.Offset), DefaultPartitionIdx) - } else { - return NewSimpleSourceOffset(string(offset.Offset), int32(partitionIdx)) - } + return NewSimpleSourceOffset(string(offset.Offset), offset.GetPartitionId()) } diff --git a/pkg/sources/udsource/utils/offset_test.go b/pkg/sources/udsource/utils/offset_test.go index 9586d9b6b5..64dae33918 100644 --- a/pkg/sources/udsource/utils/offset_test.go +++ b/pkg/sources/udsource/utils/offset_test.go @@ -30,7 +30,7 @@ func TestOffsetConversion(t *testing.T) { assert.Equal(t, testIsbOffset.PartitionIdx(), convertedBackIsbOffset.PartitionIdx()) assert.Equal(t, testIsbOffset.String(), convertedBackIsbOffset.String()) testSrcOffset := &sourcepb.Offset{ - PartitionId: "0", + PartitionId: 0, Offset: []byte("test"), } convertedIsbOffset := ConvertToIsbOffset(testSrcOffset) diff --git a/test/idle-source-e2e/idle_source_test.go b/test/idle-source-e2e/idle_source_test.go index 0bcaf9d818..a644912b48 100644 --- a/test/idle-source-e2e/idle_source_test.go +++ b/test/idle-source-e2e/idle_source_test.go @@ -44,8 +44,8 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipeline() { done := make(chan struct{}) go func() { - // publish messages to source vertex, with event time starting from 60000 - startTime := 100 + // publish messages to source vertex, with event time starting from 1000 + startTime := 1000 for i := 0; true; i++ { select { case <-ctx.Done(): diff --git a/test/idle-source-e2e/testdata/idle-source-reduce-pipeline.yaml b/test/idle-source-e2e/testdata/idle-source-reduce-pipeline.yaml index b7410a1807..6a3147197d 100644 --- a/test/idle-source-e2e/testdata/idle-source-reduce-pipeline.yaml +++ b/test/idle-source-e2e/testdata/idle-source-reduce-pipeline.yaml @@ -8,8 +8,6 @@ spec: threshold: 5s # The pipeline will be considered idle if the source has not emitted any data for given threshold value. incrementBy: 3s # If source is found to be idle then increment the watermark by given incrementBy value. stepInterval: 2s # If source is idling then publish the watermark only when step interval has passed. - limits: - readBatchSize: 50 vertices: - name: in scale: