Skip to content

Commit

Permalink
chore: support for multiple serving sources (#1841)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Co-authored-by: Derek Wang <[email protected]>
  • Loading branch information
yhl25 and whynowy authored Jul 17, 2024
1 parent ccfb8c2 commit ba581c7
Show file tree
Hide file tree
Showing 15 changed files with 132 additions and 70 deletions.
14 changes: 7 additions & 7 deletions cmd/commands/isbsvc_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ import (
func NewISBSvcCreateCommand() *cobra.Command {

var (
isbSvcType string
buffers []string
buckets []string
sideInputsStore string
servingSourceStream string
isbSvcType string
buffers []string
buckets []string
sideInputsStore string
servingSourceStreams []string
)

command := &cobra.Command{
Expand Down Expand Up @@ -81,7 +81,7 @@ func NewISBSvcCreateCommand() *cobra.Command {
return fmt.Errorf("unsupported isb service type %q", isbSvcType)
}

if err = isbsClient.CreateBuffersAndBuckets(ctx, buffers, buckets, sideInputsStore, servingSourceStream, opts...); err != nil {
if err = isbsClient.CreateBuffersAndBuckets(ctx, buffers, buckets, sideInputsStore, servingSourceStreams, opts...); err != nil {
logger.Errorw("Failed to create buffers, buckets and side inputs store.", zap.Error(err))
return err
}
Expand All @@ -93,6 +93,6 @@ func NewISBSvcCreateCommand() *cobra.Command {
command.Flags().StringSliceVar(&buffers, "buffers", []string{}, "Buffers to create") // --buffers=a,b, --buffers=c
command.Flags().StringSliceVar(&buckets, "buckets", []string{}, "Buckets to create") // --buckets=xxa,xxb --buckets=xxc
command.Flags().StringVar(&sideInputsStore, "side-inputs-store", "", "Name of the side inputs store")
command.Flags().StringVar(&servingSourceStream, "serving-source-stream", "", "Name of the serving source stream")
command.Flags().StringSliceVar(&servingSourceStreams, "serving-source-streams", []string{}, "Serving source streams to create") // --serving-source-streams=a,b, --serving-source-streams=c
return command
}
14 changes: 7 additions & 7 deletions cmd/commands/isbsvc_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import (

func NewISBSvcDeleteCommand() *cobra.Command {
var (
isbSvcType string
buffers []string
buckets []string
sideInputsStore string
servingSourceStream string
isbSvcType string
buffers []string
buckets []string
sideInputsStore string
servingSourceStreams []string
)

command := &cobra.Command{
Expand Down Expand Up @@ -64,7 +64,7 @@ func NewISBSvcDeleteCommand() *cobra.Command {
cmd.HelpFunc()(cmd, args)
return fmt.Errorf("unsupported isb service type %q", isbSvcType)
}
if err = isbsClient.DeleteBuffersAndBuckets(ctx, buffers, buckets, sideInputsStore, servingSourceStream); err != nil {
if err = isbsClient.DeleteBuffersAndBuckets(ctx, buffers, buckets, sideInputsStore, servingSourceStreams); err != nil {
logger.Errorw("Failed on buffers, buckets and side inputs store deletion.", zap.Error(err))
return err
}
Expand All @@ -76,6 +76,6 @@ func NewISBSvcDeleteCommand() *cobra.Command {
command.Flags().StringSliceVar(&buffers, "buffers", []string{}, "Buffers to delete") // --buffers=a,b, --buffers=c
command.Flags().StringSliceVar(&buckets, "buckets", []string{}, "Buckets to delete") // --buckets=xxa,xxb --buckets=xxc return command
command.Flags().StringVar(&sideInputsStore, "side-inputs-store", "", "Name of the side inputs store")
command.Flags().StringVar(&servingSourceStream, "serving-source-stream", "", "Name of the serving source stream")
command.Flags().StringSliceVar(&servingSourceStreams, "serving-source-streams", []string{}, "Serving source streams to delete") // --serving-source-streams=a,b, --serving-source-streams=c
return command
}
14 changes: 7 additions & 7 deletions cmd/commands/isbsvc_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ import (
func NewISBSvcValidateCommand() *cobra.Command {

var (
isbSvcType string
buffers []string
buckets []string
sideInputsStore string
servingSourceStream string
isbSvcType string
buffers []string
buckets []string
sideInputsStore string
servingSourceStreams []string
)

command := &cobra.Command{
Expand Down Expand Up @@ -68,7 +68,7 @@ func NewISBSvcValidateCommand() *cobra.Command {
return fmt.Errorf("unsupported isb service type")
}
_ = wait.ExponentialBackoffWithContext(ctx, sharedutil.DefaultRetryBackoff, func(_ context.Context) (bool, error) {
if err = isbsClient.ValidateBuffersAndBuckets(ctx, buffers, buckets, sideInputsStore, servingSourceStream); err != nil {
if err = isbsClient.ValidateBuffersAndBuckets(ctx, buffers, buckets, sideInputsStore, servingSourceStreams); err != nil {
logger.Infow("Buffers, buckets and side inputs store might have not been created yet, will retry if the limit is not reached", zap.Error(err))
return false, nil
}
Expand All @@ -86,7 +86,7 @@ func NewISBSvcValidateCommand() *cobra.Command {
command.Flags().StringSliceVar(&buffers, "buffers", []string{}, "Buffers to validate") // --buffers=a,b, --buffers=c
command.Flags().StringSliceVar(&buckets, "buckets", []string{}, "Buckets to validate") // --buckets=xxa,xxb --buckets=xxc
command.Flags().StringVar(&sideInputsStore, "side-inputs-store", "", "Name of the side inputs store")
command.Flags().StringVar(&servingSourceStream, "serving-source-stream", "", "Name of the serving source stream")
command.Flags().StringSliceVar(&servingSourceStreams, "serving-source-streams", []string{}, "Serving source streams to validate") // --serving-source-streams=a,b, --serving-source-streams=c

return command
}
4 changes: 0 additions & 4 deletions examples/15-serving-source-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ spec:
msgIDHeaderKey: "X-Request-ID"
store:
url: "redis://redis:6379"
auth:
token:
name: http-source-token
key: my-token
- name: cat
scale:
min: 1
Expand Down
10 changes: 8 additions & 2 deletions pkg/apis/numaflow/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,14 @@ func (p Pipeline) GetSideInputsStoreName() string {
return fmt.Sprintf("%s-%s", p.Namespace, p.Name)
}

func (p Pipeline) GetServingSourceStreamName() string {
return fmt.Sprintf("%s-serving-source", p.Name)
func (p Pipeline) GetServingSourceStreamNames() []string {
var servingSourceNames []string
for _, srcVertex := range p.Spec.Vertices {
if srcVertex.IsASource() && srcVertex.Source.Serving != nil {
servingSourceNames = append(servingSourceNames, fmt.Sprintf("%s-%s-serving-source", p.Name, srcVertex.Name))
}
}
return servingSourceNames
}

func (p Pipeline) GetSideInputsManagerDeployments(req GetSideInputDeploymentReq) ([]*appv1.Deployment, error) {
Expand Down
37 changes: 37 additions & 0 deletions pkg/apis/numaflow/v1alpha1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,3 +431,40 @@ func Test_GetSideInputManagerDeployments(t *testing.T) {
assert.Equal(t, 2, len(deployments[0].Spec.Template.Spec.Containers))
})
}

func TestGetServingSourceStreamNames(t *testing.T) {
t.Run("no serving sources", func(t *testing.T) {
p := &Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pipeline",
},
Spec: PipelineSpec{
Vertices: []AbstractVertex{
{Name: "v1", Source: &Source{}},
{Name: "v2", UDF: &UDF{}},
{Name: "v3", Sink: &Sink{}},
},
},
}
var expected []string
assert.Equal(t, expected, p.GetServingSourceStreamNames())
})

t.Run("with serving sources", func(t *testing.T) {
p := &Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pipeline",
},
Spec: PipelineSpec{
Vertices: []AbstractVertex{
{Name: "v1", Source: &Source{Serving: &ServingSource{}}},
{Name: "v2", Source: &Source{Serving: &ServingSource{}}},
{Name: "v3", UDF: &UDF{}},
{Name: "v4", Sink: &Sink{}},
},
},
}
expected := []string{"test-pipeline-v1-serving-source", "test-pipeline-v2-serving-source"}
assert.Equal(t, expected, p.GetServingSourceStreamNames())
})
}
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ func (v Vertex) GetServiceObjs() []*corev1.Service {
return svcs
}

func (v Vertex) GetServingSourceStreamName() string {
return fmt.Sprintf("%s-%s-serving-source", v.Spec.PipelineName, v.Spec.Name)
}

func (v Vertex) getServiceObj(name string, headless bool, port int32, servicePortName string) *corev1.Service {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand Down
13 changes: 13 additions & 0 deletions pkg/apis/numaflow/v1alpha1/vertex_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,3 +752,16 @@ func Test_GetToBuckets(t *testing.T) {
assert.Len(t, buckets, 0)
})
}

func TestGetServingSourceStreamName(t *testing.T) {
v := Vertex{
Spec: VertexSpec{
PipelineName: "test-pipeline",
AbstractVertex: AbstractVertex{
Name: "test-vertex",
},
},
}
expected := "test-pipeline-test-vertex-serving-source"
assert.Equal(t, expected, v.GetServingSourceStreamName())
}
6 changes: 3 additions & 3 deletions pkg/daemon/server/service/pipeline_metrics_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ func (ms *mockIsbSvcClient) GetBufferInfo(ctx context.Context, buffer string) (*
}, nil
}

func (ms *mockIsbSvcClient) CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStream string, opts ...isbsvc.CreateOption) error {
func (ms *mockIsbSvcClient) CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string, opts ...isbsvc.CreateOption) error {
return nil
}

func (ms *mockIsbSvcClient) DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStream string) error {
func (ms *mockIsbSvcClient) DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string) error {
return nil
}

func (ms *mockIsbSvcClient) ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStream string) error {
func (ms *mockIsbSvcClient) ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/isbsvc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
// ISBService is an interface used to do the operations on ISBSvc
type ISBService interface {
// CreateBuffersAndBuckets creates buffers and buckets
CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStream string, opts ...CreateOption) error
CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string, opts ...CreateOption) error
// DeleteBuffersAndBuckets deletes buffers and buckets
DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStream string) error
DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string) error
// ValidateBuffersAndBuckets validates buffers and buckets
ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceSTream string) error
ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceSTreams []string) error
// GetBufferInfo returns buffer info for the given buffer
GetBufferInfo(ctx context.Context, buffer string) (*BufferInfo, error)
// CreateWatermarkStores creates watermark stores
Expand Down
60 changes: 33 additions & 27 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func WithJetStreamClient(jsClient *jsclient.Client) JSServiceOption {
}
}

func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStream string, opts ...CreateOption) error {
func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string, opts ...CreateOption) error {
if len(buffers) == 0 && len(buckets) == 0 {
return nil
}
Expand Down Expand Up @@ -104,23 +104,25 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b
}
}

if servingSourceStream != "" {
_, err := js.StreamInfo(servingSourceStream)
if err != nil {
if !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to query information of stream %q during buffer creating, %w", servingSourceStream, err)
}
if _, err := js.AddStream(&nats.StreamConfig{
Name: servingSourceStream,
Subjects: []string{servingSourceStream}, // Use the stream name as the only subject
Storage: nats.StorageType(v.GetInt("stream.storage")),
Replicas: v.GetInt("stream.replicas"),
Retention: nats.WorkQueuePolicy, // we can delete the message immediately after it's consumed and acked
MaxMsgs: -1, // unlimited messages
MaxBytes: -1, // unlimited bytes
Duplicates: v.GetDuration("stream.duplicates"),
}); err != nil {
return fmt.Errorf("failed to create serving source stream %q, %w", servingSourceStream, err)
if len(servingSourceStreams) > 0 {
for _, servingSourceStream := range servingSourceStreams {
_, err := js.StreamInfo(servingSourceStream)
if err != nil {
if !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to query information of stream %q during buffer creating, %w", servingSourceStream, err)
}
if _, err := js.AddStream(&nats.StreamConfig{
Name: servingSourceStream,
Subjects: []string{servingSourceStream}, // Use the stream name as the only subject
Storage: nats.StorageType(v.GetInt("stream.storage")),
Replicas: v.GetInt("stream.replicas"),
Retention: nats.WorkQueuePolicy, // we can delete the message immediately after it's consumed and acked
MaxMsgs: -1, // unlimited messages
MaxBytes: -1, // unlimited bytes
Duplicates: v.GetDuration("stream.duplicates"),
}); err != nil {
return fmt.Errorf("failed to create serving source stream %q, %w", servingSourceStream, err)
}
}
}
}
Expand Down Expand Up @@ -204,7 +206,7 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b
return nil
}

func (jss *jetStreamSvc) DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStream string) error {
func (jss *jetStreamSvc) DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string) error {
if len(buffers) == 0 && len(buckets) == 0 {
return nil
}
Expand Down Expand Up @@ -246,16 +248,18 @@ func (jss *jetStreamSvc) DeleteBuffersAndBuckets(ctx context.Context, buffers, b
log.Infow("Succeeded to delete a side inputs KV", zap.String("kvName", sideInputsKVName))
}

if servingSourceStream != "" {
if err := js.DeleteStream(servingSourceStream); err != nil && !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to delete serving source stream %q, %w", servingSourceStream, err)
if len(servingSourceStreams) > 0 {
for _, servingSourceStream := range servingSourceStreams {
if err := js.DeleteStream(servingSourceStream); err != nil && !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to delete serving source stream %q, %w", servingSourceStream, err)
}
log.Infow("Succeeded to delete the serving source stream", zap.String("stream", servingSourceStream))
}
log.Infow("Succeeded to delete the serving source stream", zap.String("stream", servingSourceStream))
}
return nil
}

func (jss *jetStreamSvc) ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStream string) error {
func (jss *jetStreamSvc) ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string) error {
if len(buffers) == 0 && len(buckets) == 0 {
return nil
}
Expand Down Expand Up @@ -291,9 +295,11 @@ func (jss *jetStreamSvc) ValidateBuffersAndBuckets(ctx context.Context, buffers,
return fmt.Errorf("failed to query side inputs store KV %q, %w", sideInputsKVName, err)
}
}
if servingSourceStream != "" {
if _, err := js.StreamInfo(servingSourceStream); err != nil {
return fmt.Errorf("failed to query information of stream %q, %w", servingSourceStream, err)
if len(servingSourceStreams) > 0 {
for _, servingSourceStream := range servingSourceStreams {
if _, err := js.StreamInfo(servingSourceStream); err != nil {
return fmt.Errorf("failed to query information of stream %q, %w", servingSourceStream, err)
}
}
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/isbsvc/redis_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewISBRedisSvc(client *redisclient.RedisClient) ISBService {
}

// CreateBuffersAndBuckets is used to create the inter-step redis buffers.
func (r *isbsRedisSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStream string, opts ...CreateOption) error {
func (r *isbsRedisSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string, opts ...CreateOption) error {
if len(buffers) == 0 && len(buckets) == 0 {
return nil
}
Expand Down Expand Up @@ -67,7 +67,7 @@ func (r *isbsRedisSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, buc
}

// DeleteBuffersAndBuckets is used to delete the inter-step redis buffers.
func (r *isbsRedisSvc) DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStream string) error {
func (r *isbsRedisSvc) DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string) error {
if len(buffers) == 0 && len(buckets) == 0 {
return nil
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func (r *isbsRedisSvc) DeleteBuffersAndBuckets(ctx context.Context, buffers, buc
}

// ValidateBuffersAndBuckets is used to validate inter-step redis buffers to see if the stream/stream group exist
func (r *isbsRedisSvc) ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStream string) error {
func (r *isbsRedisSvc) ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string) error {
if len(buffers) == 0 && len(buckets) == 0 {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/isbsvc/redis_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func TestIsbsRedisSvc_Buffers(t *testing.T) {
buffers := []string{buffer}
redisClient := redisclient.NewRedisClient(redisOptions)
isbsRedisSvc := NewISBRedisSvc(redisClient)
assert.NoError(t, isbsRedisSvc.CreateBuffersAndBuckets(ctx, buffers, nil, "", ""))
assert.NoError(t, isbsRedisSvc.CreateBuffersAndBuckets(ctx, buffers, nil, "", []string{}))

// validate buffer
assert.NoError(t, isbsRedisSvc.ValidateBuffersAndBuckets(ctx, buffers, nil, "", ""))
assert.NoError(t, isbsRedisSvc.ValidateBuffersAndBuckets(ctx, buffers, nil, "", []string{}))

// Verify
// Add some data
Expand Down Expand Up @@ -79,5 +79,5 @@ func TestIsbsRedisSvc_Buffers(t *testing.T) {
}

// delete buffer
assert.NoError(t, isbsRedisSvc.DeleteBuffersAndBuckets(ctx, buffers, nil, "", ""))
assert.NoError(t, isbsRedisSvc.DeleteBuffersAndBuckets(ctx, buffers, nil, "", []string{}))
}
Loading

0 comments on commit ba581c7

Please sign in to comment.