From 54a5f778d285840568e1ed79d5c69e1bb78737fd Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Fri, 12 Jul 2024 17:26:12 -0400 Subject: [PATCH] docs: conditional forwarding from source is allowed (#1823) Signed-off-by: Keran Yang --- api/json-schema/schema.json | 4 ++-- api/openapi-spec/swagger.json | 4 ++-- docs/APIs.md | 2 +- pkg/apis/numaflow/v1alpha1/edge_types.go | 2 +- pkg/apis/numaflow/v1alpha1/generated.proto | 2 +- pkg/apis/numaflow/v1alpha1/openapi_generated.go | 4 ++-- pkg/reconciler/pipeline/validate_test.go | 5 ++++- 7 files changed, 13 insertions(+), 10 deletions(-) diff --git a/api/json-schema/schema.json b/api/json-schema/schema.json index 8ee933ff01..e6d3eae27c 100644 --- a/api/json-schema/schema.json +++ b/api/json-schema/schema.json @@ -17812,7 +17812,7 @@ "properties": { "conditions": { "$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ForwardConditions", - "description": "Conditional forwarding, only allowed when \"From\" is a Sink or UDF." + "description": "Conditional forwarding, only allowed when \"From\" is a Source or UDF." }, "from": { "type": "string" @@ -18016,7 +18016,7 @@ "properties": { "conditions": { "$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ForwardConditions", - "description": "Conditional forwarding, only allowed when \"From\" is a Sink or UDF." + "description": "Conditional forwarding, only allowed when \"From\" is a Source or UDF." }, "from": { "type": "string" diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 34cba78d08..febe476246 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -17822,7 +17822,7 @@ ], "properties": { "conditions": { - "description": "Conditional forwarding, only allowed when \"From\" is a Sink or UDF.", + "description": "Conditional forwarding, only allowed when \"From\" is a Source or UDF.", "$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ForwardConditions" }, "from": { @@ -18024,7 +18024,7 @@ ], "properties": { "conditions": { - "description": "Conditional forwarding, only allowed when \"From\" is a Sink or UDF.", + "description": "Conditional forwarding, only allowed when \"From\" is a Source or UDF.", "$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.ForwardConditions" }, "from": { diff --git a/docs/APIs.md b/docs/APIs.md index 2c1cd99dba..255d1dbb3c 100644 --- a/docs/APIs.md +++ b/docs/APIs.md @@ -1794,7 +1794,7 @@ ForwardConditions (Optional)

-Conditional forwarding, only allowed when “From” is a Sink or UDF. +Conditional forwarding, only allowed when “From” is a Source or UDF.

diff --git a/pkg/apis/numaflow/v1alpha1/edge_types.go b/pkg/apis/numaflow/v1alpha1/edge_types.go index 6767f310b8..113eae1299 100644 --- a/pkg/apis/numaflow/v1alpha1/edge_types.go +++ b/pkg/apis/numaflow/v1alpha1/edge_types.go @@ -21,7 +21,7 @@ import "fmt" type Edge struct { From string `json:"from" protobuf:"bytes,1,opt,name=from"` To string `json:"to" protobuf:"bytes,2,opt,name=to"` - // Conditional forwarding, only allowed when "From" is a Sink or UDF. + // Conditional forwarding, only allowed when "From" is a Source or UDF. // +optional Conditions *ForwardConditions `json:"conditions" protobuf:"bytes,3,opt,name=conditions"` // OnFull specifies the behaviour for the write actions when the inter step buffer is full. diff --git a/pkg/apis/numaflow/v1alpha1/generated.proto b/pkg/apis/numaflow/v1alpha1/generated.proto index 542ad9962f..d75c264db2 100644 --- a/pkg/apis/numaflow/v1alpha1/generated.proto +++ b/pkg/apis/numaflow/v1alpha1/generated.proto @@ -321,7 +321,7 @@ message Edge { optional string to = 2; - // Conditional forwarding, only allowed when "From" is a Sink or UDF. + // Conditional forwarding, only allowed when "From" is a Source or UDF. // +optional optional ForwardConditions conditions = 3; diff --git a/pkg/apis/numaflow/v1alpha1/openapi_generated.go b/pkg/apis/numaflow/v1alpha1/openapi_generated.go index 50c02f22dd..c42850d55f 100644 --- a/pkg/apis/numaflow/v1alpha1/openapi_generated.go +++ b/pkg/apis/numaflow/v1alpha1/openapi_generated.go @@ -644,7 +644,7 @@ func schema_pkg_apis_numaflow_v1alpha1_CombinedEdge(ref common.ReferenceCallback }, "conditions": { SchemaProps: spec.SchemaProps{ - Description: "Conditional forwarding, only allowed when \"From\" is a Sink or UDF.", + Description: "Conditional forwarding, only allowed when \"From\" is a Source or UDF.", Ref: ref("github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1.ForwardConditions"), }, }, @@ -1037,7 +1037,7 @@ func schema_pkg_apis_numaflow_v1alpha1_Edge(ref common.ReferenceCallback) common }, "conditions": { SchemaProps: spec.SchemaProps{ - Description: "Conditional forwarding, only allowed when \"From\" is a Sink or UDF.", + Description: "Conditional forwarding, only allowed when \"From\" is a Source or UDF.", Ref: ref("github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1.ForwardConditions"), }, }, diff --git a/pkg/reconciler/pipeline/validate_test.go b/pkg/reconciler/pipeline/validate_test.go index 5ae85db3bb..486b36c9be 100644 --- a/pkg/reconciler/pipeline/validate_test.go +++ b/pkg/reconciler/pipeline/validate_test.go @@ -511,9 +511,12 @@ func TestValidatePipeline(t *testing.T) { assert.NoError(t, err) }) - t.Run("allow conditional forwarding from source vertex", func(t *testing.T) { + t.Run("allow conditional forwarding from source vertex or udf vertex", func(t *testing.T) { testObj := testPipeline.DeepCopy() operatorOr := dfv1.LogicOperatorOr + testObj.Spec.Edges[0].Conditions = &dfv1.ForwardConditions{Tags: &dfv1.TagConditions{ + Operator: &operatorOr, + Values: []string{"hello"}}} testObj.Spec.Edges[1].Conditions = &dfv1.ForwardConditions{Tags: &dfv1.TagConditions{ Operator: &operatorOr, Values: []string{"hello"}}}