Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: introducing partitions for udsource #1410

Merged
merged 6 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.4
github.com/nats-io/nats.go v1.31.0
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932
github.com/numaproj/numaflow-go v0.5.3-0.20231208052731-3d4d17004cc9
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
github.com/redis/go-redis/v9 v9.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -679,8 +679,8 @@ github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADym
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932 h1:gAURJvmJv7nP8+Y7X+GGHGZ5sg7KatM4dhkWpFCsk+I=
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/numaproj/numaflow-go v0.5.3-0.20231208052731-3d4d17004cc9 h1:U/9e+ZENDVmWOURe7iXaK2RFJIANYg/HJZGeahErJQI=
github.com/numaproj/numaflow-go v0.5.3-0.20231208052731-3d4d17004cc9/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand Down
5 changes: 5 additions & 0 deletions pkg/sdkclient/source/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,8 @@ func (c *client) AckFn(ctx context.Context, req *sourcepb.AckRequest) (*sourcepb
func (c *client) PendingFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PendingResponse, error) {
return c.grpcClt.PendingFn(ctx, req)
}

// PartitionsFn returns the partitions of the source (not a count) by forwarding
// the request to the underlying gRPC client and returning its response and error unchanged.
func (c *client) PartitionsFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PartitionsResponse, error) {
return c.grpcClt.PartitionsFn(ctx, req)
}
2 changes: 1 addition & 1 deletion pkg/sdkclient/source/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestReadFn(t *testing.T) {
expectedResp := &sourcepb.ReadResponse{
Result: &sourcepb.ReadResponse_Result{
Payload: []byte(`test_payload`),
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: "0"},
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: 0},
EventTime: timestamppb.New(TestEventTime),
Keys: []string{"test_key"},
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/sdkclient/source/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ type Client interface {
AckFn(ctx context.Context, req *sourcepb.AckRequest) (*sourcepb.AckResponse, error)
// PendingFn returns the number of pending messages from the udsource.
PendingFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PendingResponse, error)
// PartitionsFn returns the list of partitions from the udsource.
PartitionsFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PartitionsResponse, error)
}
4 changes: 2 additions & 2 deletions pkg/sources/forward/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
// if the source is idling, we will publish idle watermark to the source and all the toBuffers
// we will not publish idle watermark if the source is not idling.
// publish idle watermark for the source
df.srcIdleHandler.PublishSourceIdleWatermark(df.reader.Partitions())
df.srcIdleHandler.PublishSourceIdleWatermark(df.reader.Partitions(df.ctx))

// if we have published idle watermark to source, we need to publish idle watermark to all the toBuffers
// it might not get the latest watermark because of publishing delay, but we will get in the subsequent
Expand All @@ -230,7 +230,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
for index := range toVertexBuffers {
// publish idle watermark to all the source partitions owned by this reader.
// it is 1:1 for many sources (HTTP, tickgen, etc.), but for others, e.g., Kafka, it is 1:N and the list of N partitions could keep changing.
for _, sp := range df.reader.Partitions() {
for _, sp := range df.reader.Partitions(df.ctx) {
if vertexPublishers, ok := df.toVertexWMPublishers[toVertexName]; ok {
var publisher, ok = vertexPublishers[sp]
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/forward/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *SimpleSource) Ack(ctx context.Context, offsets []isb.Offset) []error {
return s.buffer.Ack(ctx, offsets)
}

func (s *SimpleSource) Partitions() []int32 {
func (s *SimpleSource) Partitions(context.Context) []int32 {
return []int32{0}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/generator/tickgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (mg *memGen) GetName() string {
}

// Partitions returns the partitions for the source.
func (mg *memGen) Partitions() []int32 {
func (mg *memGen) Partitions(context.Context) []int32 {
return []int32{mg.vertexInstance.Replica}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (h *httpSource) GetName() string {
}

// Partitions returns the partitions for the source.
func (h *httpSource) Partitions() []int32 {
func (h *httpSource) Partitions(context.Context) []int32 {
return []int32{h.vertexReplica}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/kafka/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (r *kafkaSource) GetName() string {
}

// Partitions returns the partitions from which the source is reading.
func (r *kafkaSource) Partitions() []int32 {
func (r *kafkaSource) Partitions(context.Context) []int32 {
for topic, partitions := range r.handler.sess.Claims() {
if topic == r.topic {
return partitions
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (ns *natsSource) GetName() string {
}

// Partitions returns the partitions associated with this source.
func (ns *natsSource) Partitions() []int32 {
func (ns *natsSource) Partitions(context.Context) []int32 {
return []int32{ns.vertexReplica}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/sourcer/sourcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type SourceReader interface {
// Partitions returns the partitions of the source. This is used by the forwarder to determine to which partition
// idle watermarks should be published. Partition assignment to a pod is dynamic, so this method may return different
// partitions at different times. (Example - Kafka: every time topic rebalancing happens, the partitions get updated.)
Partitions() []int32
Partitions(ctx context.Context) []int32
}

// Sourcer interface provides an isb.BufferReader abstraction over the underlying data source.
Expand Down
12 changes: 11 additions & 1 deletion pkg/sources/udsource/grpc_udsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,17 @@ func (u *GRPCBasedUDSource) ApplyAckFn(ctx context.Context, offsets []isb.Offset
return err
}

// ApplyPartitionFn asks the user-defined source for its partitions through the
// client's PartitionsFn call. On success it returns the partition list carried in
// the response; if the call fails it returns a nil slice together with that error.
func (u *GRPCBasedUDSource) ApplyPartitionFn(ctx context.Context) ([]int32, error) {
	partitionsResp, rpcErr := u.client.PartitionsFn(ctx, &emptypb.Empty{})
	if rpcErr != nil {
		return nil, rpcErr
	}

	result := partitionsResp.GetResult()
	return result.GetPartitions(), nil
}

func constructMessageID(r *sourcepb.ReadResponse_Result) string {
// For a user-defined source, the partition ID plus the offset should be able to uniquely identify a message
return r.Offset.GetPartitionId() + "-" + string(r.Offset.GetOffset())
return fmt.Sprintf("%d-%s", r.GetOffset().GetPartitionId(), string(r.GetOffset().GetOffset()))
}
8 changes: 4 additions & 4 deletions pkg/sources/udsource/grpc_udsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func Test_gRPCBasedUDSource_ApplyReadWithMockClient(t *testing.T) {
expectedResponse := &sourcepb.ReadResponse{
Result: &sourcepb.ReadResponse_Result{
Payload: []byte(`test_payload`),
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: "0"},
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: 0},
EventTime: timestamppb.New(TestEventTime),
Keys: []string{"test_key"},
},
Expand Down Expand Up @@ -238,7 +238,7 @@ func Test_gRPCBasedUDSource_ApplyReadWithMockClient(t *testing.T) {
expectedResponse := &sourcepb.ReadResponse{
Result: &sourcepb.ReadResponse_Result{
Payload: []byte(`test_payload`),
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: "0"},
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: 0},
EventTime: timestamppb.New(TestEventTime),
Keys: []string{"test_key"},
},
Expand Down Expand Up @@ -272,7 +272,7 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) {
req := &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Offsets: []*sourcepb.Offset{
{Offset: []byte("test-offset-1"), PartitionId: "0"}, {Offset: []byte("test-offset-2"), PartitionId: "0"},
{Offset: []byte("test-offset-1"), PartitionId: 0}, {Offset: []byte("test-offset-2"), PartitionId: 0},
},
},
}
Expand Down Expand Up @@ -304,7 +304,7 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) {
req := &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Offsets: []*sourcepb.Offset{
{Offset: []byte("test-offset-1"), PartitionId: "0"}, {Offset: []byte("test-offset-2"), PartitionId: "0"},
{Offset: []byte("test-offset-1"), PartitionId: 0}, {Offset: []byte("test-offset-2"), PartitionId: 0},
},
},
}
Expand Down
25 changes: 7 additions & 18 deletions pkg/sources/udsource/user_defined_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ type userDefinedSource struct {
srcPublishWMStores store.WatermarkStore // source watermark publisher stores
lifecycleCtx context.Context // lifecycleCtx context is used to control the lifecycle of this source.
readTimeout time.Duration // read timeout for the source
partitions map[int32]struct{} // partitions of the source
logger *zap.SugaredLogger
}

Expand All @@ -85,7 +84,6 @@ func New(
pipelineName: vertexInstance.Vertex.Spec.PipelineName,
sourceApplier: sourceApplier,
srcPublishWMStores: publishWMStores,
partitions: make(map[int32]struct{}),
logger: logging.NewLogger(), // default logger
}
for _, opt := range opts {
Expand Down Expand Up @@ -122,27 +120,18 @@ func (u *userDefinedSource) GetName() string {
}

// Partitions returns the partitions of the user-defined source
func (u *userDefinedSource) Partitions() []int32 {
partitions := make([]int32, 0, len(u.partitions))
for partition := range u.partitions {
partitions = append(partitions, partition)
func (u *userDefinedSource) Partitions(ctx context.Context) []int32 {
partitions, err := u.sourceApplier.ApplyPartitionFn(ctx)
if err != nil {
u.logger.Errorw("Error getting partitions", zap.Error(err))
return nil
}
return partitions
}

// Read reads the messages from the user-defined source, tracks the partitions from which the messages are read
// tracked partitions are used to determine the partitions to which the watermarks should be published
// Read reads the messages from the user-defined source.
func (u *userDefinedSource) Read(ctx context.Context, count int64) ([]*isb.ReadMessage, error) {
readMessages, err := u.sourceApplier.ApplyReadFn(ctx, count, u.readTimeout)
if err != nil {
return nil, err
}
for _, msg := range readMessages {
if _, ok := u.partitions[msg.ReadOffset.PartitionIdx()]; !ok {
u.partitions[msg.ReadOffset.PartitionIdx()] = struct{}{}
}
}
return readMessages, nil
return u.sourceApplier.ApplyReadFn(ctx, count, u.readTimeout)
}

// Ack acknowledges the messages from the user-defined source
Expand Down
15 changes: 2 additions & 13 deletions pkg/sources/udsource/utils/offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,11 @@ limitations under the License.
package utils

import (
"strconv"

sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1"

"github.com/numaproj/numaflow/pkg/isb"
)

// DefaultPartitionIdx Default partition index
var DefaultPartitionIdx = int32(0)

// simpleSourceOffset is a simple implementation of isb.Offset from the source side.
type simpleSourceOffset struct {
offset string
Expand Down Expand Up @@ -62,17 +57,11 @@ func (s *simpleSourceOffset) NoAck() error {

func ConvertToSourceOffset(offset isb.Offset) *sourcepb.Offset {
return &sourcepb.Offset{
PartitionId: strconv.Itoa(int(offset.PartitionIdx())),
PartitionId: offset.PartitionIdx(),
Offset: []byte(offset.String()),
}
}

func ConvertToIsbOffset(offset *sourcepb.Offset) isb.Offset {
if partitionIdx, err := strconv.Atoi(offset.GetPartitionId()); err != nil {
// If the partition ID is not a number, use the default partition index
// TODO - should we require UDSource users to return us a number instead of string as partition ID?
return NewSimpleSourceOffset(string(offset.Offset), DefaultPartitionIdx)
} else {
return NewSimpleSourceOffset(string(offset.Offset), int32(partitionIdx))
}
return NewSimpleSourceOffset(string(offset.Offset), offset.GetPartitionId())
}
2 changes: 1 addition & 1 deletion pkg/sources/udsource/utils/offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestOffsetConversion(t *testing.T) {
assert.Equal(t, testIsbOffset.PartitionIdx(), convertedBackIsbOffset.PartitionIdx())
assert.Equal(t, testIsbOffset.String(), convertedBackIsbOffset.String())
testSrcOffset := &sourcepb.Offset{
PartitionId: "0",
PartitionId: 0,
Offset: []byte("test"),
}
convertedIsbOffset := ConvertToIsbOffset(testSrcOffset)
Expand Down
4 changes: 2 additions & 2 deletions test/idle-source-e2e/idle_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipeline() {

done := make(chan struct{})
go func() {
// publish messages to source vertex, with event time starting from 60000
startTime := 100
// publish messages to source vertex, with event time starting from 1000
startTime := 1000
for i := 0; true; i++ {
select {
case <-ctx.Done():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ spec:
threshold: 5s # The pipeline will be considered idle if the source has not emitted any data for given threshold value.
incrementBy: 3s # If source is found to be idle then increment the watermark by given incrementBy value.
stepInterval: 2s # If source is idling then publish the watermark only when step interval has passed.
limits:
readBatchSize: 50
vertices:
- name: in
scale:
Expand Down
Loading