From 7b85e89f307c2286c5b57daf3fb17d70a08107c0 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 16 Aug 2024 08:29:07 +0530 Subject: [PATCH] test: add mono vertex e2e tests (#1945) Signed-off-by: Yashash H L Signed-off-by: Keran Yang Co-authored-by: Keran Yang --- .github/workflows/ci.yaml | 2 +- pkg/reconciler/monovertex/controller.go | 20 ++-- test/diamond-e2e/diamond_test.go | 20 ++-- test/diamond-e2e/testdata/cycle-backward.yaml | 4 + test/diamond-e2e/testdata/cycle-to-self.yaml | 4 + ...-on-map-pipeline.yaml => join-on-map.yaml} | 4 + ...duce-pipeline.yaml => join-on-reduce.yaml} | 4 + test/diamond-e2e/testdata/join-on-sink.yaml | 5 +- test/e2e-api/redis.go | 6 +- test/e2e/functional_test.go | 31 +++-- test/e2e/testdata/even-odd.yaml | 14 +++ test/e2e/testdata/simple-fallback.yaml | 4 + test/e2e/testdata/udf-filtering.yaml | 4 + test/fixtures/e2e_suite.go | 27 +++-- test/fixtures/expect.go | 78 +++++++++---- test/fixtures/given.go | 70 +++++++++--- test/fixtures/redis.go | 7 +- test/fixtures/redis_check.go | 24 ++-- test/fixtures/util.go | 71 ++++++++++++ test/fixtures/when.go | 106 ++++++++++++++---- test/http-e2e/http_test.go | 10 +- .../testdata/http-source-with-auth.yaml | 4 + test/http-e2e/testdata/http-source.yaml | 4 + test/idle-source-e2e/idle_source_test.go | 8 +- .../testdata/idle-source-reduce-pipeline.yaml | 4 + .../testdata/kafka-pipeline.yaml | 4 + test/jetstream-e2e/jetstream_test.go | 5 +- .../testdata/jetstream-source-pipeline.yaml | 4 + test/monovertex-e2e/monovertex_test.go | 32 ++++++ .../mono-vertex-with-transformer.yaml | 24 ++++ test/nats-e2e/nats_test.go | 2 +- .../testdata/nats-source-pipeline.yaml | 4 + test/reduce-one-e2e/reduce_one_test.go | 28 ++--- .../testdata/complex-reduce-pipeline.yaml | 4 + .../complex-sliding-window-pipeline.yaml | 4 + .../simple-keyed-reduce-pipeline.yaml | 4 + .../simple-non-keyed-reduce-pipeline.yaml | 4 + .../testdata/simple-reduce-pipeline-wal.yaml | 4 + test/reduce-two-e2e/reduce_two_test.go | 28 ++--- .../reduce-stream/reduce-stream-go.yaml | 4 + .../reduce-stream/reduce-stream-java.yaml | 4 + ...ple-session-keyed-counter-pipeline-go.yaml | 4 + ...e-session-keyed-counter-pipeline-java.yaml | 4 + .../simple-session-sum-pipeline.yaml | 4 + .../sideinput-e2e_sink_source_test.go | 10 +- test/sideinputs-e2e/sideinput_test.go | 6 +- .../testdata/map-sideinput-pipeline.yaml | 4 + .../testdata/reduce-sideinput-pipeline.yaml | 4 + ...ideinput_sink.yaml => sideinput-sink.yaml} | 5 +- ...nput_source.yaml => sideinput-source.yaml} | 3 + .../testdata/source-filtering.yaml | 4 + test/transformer-e2e/transformer_test.go | 12 +- 52 files changed, 562 insertions(+), 192 deletions(-) rename test/diamond-e2e/testdata/{join-on-map-pipeline.yaml => join-on-map.yaml} (82%) rename test/diamond-e2e/testdata/{join-on-reduce-pipeline.yaml => join-on-reduce.yaml} (91%) create mode 100644 test/monovertex-e2e/monovertex_test.go create mode 100644 test/monovertex-e2e/testdata/mono-vertex-with-transformer.yaml rename test/sideinputs-e2e/testdata/{sideinput_sink.yaml => sideinput-sink.yaml} (89%) rename test/sideinputs-e2e/testdata/{sideinput_source.yaml => sideinput-source.yaml} (91%) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 064afdb38c..ba128ef042 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -151,7 +151,7 @@ jobs: fail-fast: false matrix: driver: [jetstream] - case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, jetstream-e2e, sdks-e2e, reduce-one-e2e, reduce-two-e2e, udsource-e2e, api-e2e, sideinputs-e2e, idle-source-e2e] + case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, jetstream-e2e, sdks-e2e, reduce-one-e2e, reduce-two-e2e, udsource-e2e, api-e2e, sideinputs-e2e, idle-source-e2e, monovertex-e2e] include: - driver: redis case: e2e diff --git a/pkg/reconciler/monovertex/controller.go b/pkg/reconciler/monovertex/controller.go index 108580f3dc..b345369c66 100644 --- a/pkg/reconciler/monovertex/controller.go +++ b/pkg/reconciler/monovertex/controller.go @@ -186,21 +186,21 @@ func (mr *monoVertexReconciler) reconcilePods(ctx context.Context, monoVtx *dfv1 } } if needToCreate { - labels := map[string]string{} + podLabels := map[string]string{} annotations := map[string]string{} if x := monoVtx.Spec.Metadata; x != nil { for k, v := range x.Annotations { annotations[k] = v } for k, v := range x.Labels { - labels[k] = v + podLabels[k] = v } } - labels[dfv1.KeyPartOf] = dfv1.Project - labels[dfv1.KeyManagedBy] = dfv1.ControllerMonoVertex - labels[dfv1.KeyComponent] = dfv1.ComponentMonoVertex - labels[dfv1.KeyAppName] = monoVtx.Name - labels[dfv1.KeyMonoVertexName] = monoVtx.Name + podLabels[dfv1.KeyPartOf] = dfv1.Project + podLabels[dfv1.KeyManagedBy] = dfv1.ControllerMonoVertex + podLabels[dfv1.KeyComponent] = dfv1.ComponentMonoVertex + podLabels[dfv1.KeyAppName] = monoVtx.Name + podLabels[dfv1.KeyMonoVertexName] = monoVtx.Name annotations[dfv1.KeyHash] = hash annotations[dfv1.KeyReplica] = strconv.Itoa(replica) // Defaults to udf @@ -209,7 +209,7 @@ func (mr *monoVertexReconciler) reconcilePods(ctx context.Context, monoVtx *dfv1 ObjectMeta: metav1.ObjectMeta{ Namespace: monoVtx.Namespace, Name: podNamePrefix + sharedutil.RandomLowerCaseString(5), - Labels: labels, + Labels: podLabels, Annotations: annotations, OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(monoVtx.GetObjectMeta(), dfv1.MonoVertexGroupVersionKind)}, }, @@ -392,10 +392,10 @@ func (mr *monoVertexReconciler) createOrUpdateDaemonDeployment(ctx context.Conte return nil } -func (r *monoVertexReconciler) findExistingPods(ctx context.Context, monoVtx *dfv1.MonoVertex) (map[string]corev1.Pod, error) { +func (mr *monoVertexReconciler) findExistingPods(ctx context.Context, monoVtx *dfv1.MonoVertex) (map[string]corev1.Pod, error) { pods := &corev1.PodList{} selector, _ := labels.Parse(dfv1.KeyComponent + "=" + dfv1.ComponentMonoVertex + "," + dfv1.KeyMonoVertexName + "=" + monoVtx.Name) - if err := r.client.List(ctx, pods, &client.ListOptions{Namespace: monoVtx.Namespace, LabelSelector: selector}); err != nil { + if err := mr.client.List(ctx, pods, &client.ListOptions{Namespace: monoVtx.Namespace, LabelSelector: selector}); err != nil { return nil, fmt.Errorf("failed to list mono vertex pods: %w", err) } result := make(map[string]corev1.Pod) diff --git a/test/diamond-e2e/diamond_test.go b/test/diamond-e2e/diamond_test.go index cdb98c7783..b30c72c7c0 100644 --- a/test/diamond-e2e/diamond_test.go +++ b/test/diamond-e2e/diamond_test.go @@ -45,7 +45,7 @@ func (s *DiamondSuite) TestJoinOnReducePipeline() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() - w := s.Given().Pipeline("@testdata/join-on-reduce-pipeline.yaml"). + w := s.Given().Pipeline("@testdata/join-on-reduce.yaml"). When(). CreatePipelineAndWait() defer w.DeletePipelineAndWait() @@ -74,13 +74,13 @@ func (s *DiamondSuite) TestJoinOnReducePipeline() { }() // todo: this only tests for one occurrence: ideally should verify all w.Expect(). - SinkContains("sink", "40"). // per 10-second window: (10 * 2) * 2 atoi vertices - SinkContains("sink", "80") // per 10-second window: 10 * (1 + 3) * 2 atoi vertices + RedisSinkContains("join-on-reduce-sink", "40"). // per 10-second window: (10 * 2) * 2 atoi vertices + RedisSinkContains("join-on-reduce-sink", "80") // per 10-second window: 10 * (1 + 3) * 2 atoi vertices done <- struct{}{} } func (s *DiamondSuite) TestJoinOnMapPipeline() { - w := s.Given().Pipeline("@testdata/join-on-map-pipeline.yaml"). + w := s.Given().Pipeline("@testdata/join-on-map.yaml"). When(). CreatePipelineAndWait() defer w.DeletePipelineAndWait() @@ -93,8 +93,8 @@ func (s *DiamondSuite) TestJoinOnMapPipeline() { w.SendMessageTo(pipelineName, "in-1", NewHttpPostRequest().WithBody([]byte("2"))) w.Expect(). - SinkContains("sink", "1"). - SinkContains("sink", "2") + RedisSinkContains("join-on-map-sink", "1"). + RedisSinkContains("join-on-map-sink", "2") } func (s *DiamondSuite) TestJoinOnSinkVertex() { @@ -110,8 +110,8 @@ func (s *DiamondSuite) TestJoinOnSinkVertex() { w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("888888"))). SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("888889"))) - w.Expect().SinkContains("out", "888888") - w.Expect().SinkContains("out", "888889") + w.Expect().RedisSinkContains("join-on-sink-out", "888888") + w.Expect().RedisSinkContains("join-on-sink-out", "888889") } func (s *DiamondSuite) TestCycleToSelf() { @@ -136,7 +136,7 @@ func (s *DiamondSuite) TestCycleToSelf() { } } for i := 0; i < 10; i++ { - w.Expect().SinkContains("out", msgs[i]) + w.Expect().RedisSinkContains("cycle-to-self-out", msgs[i]) } } @@ -162,7 +162,7 @@ func (s *DiamondSuite) TestCycleBackward() { } } for i := 0; i < 10; i++ { - w.Expect().SinkContains("out", msgs[i]) + w.Expect().RedisSinkContains("cycle-backward-out", msgs[i]) } } diff --git a/test/diamond-e2e/testdata/cycle-backward.yaml b/test/diamond-e2e/testdata/cycle-backward.yaml index 5505af8e0f..7312fc6cce 100644 --- a/test/diamond-e2e/testdata/cycle-backward.yaml +++ b/test/diamond-e2e/testdata/cycle-backward.yaml @@ -26,6 +26,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "cycle-backward-out" edges: - from: in to: cat diff --git a/test/diamond-e2e/testdata/cycle-to-self.yaml b/test/diamond-e2e/testdata/cycle-to-self.yaml index 063354d5ff..eaa7bf9cfa 100644 --- a/test/diamond-e2e/testdata/cycle-to-self.yaml +++ b/test/diamond-e2e/testdata/cycle-to-self.yaml @@ -22,6 +22,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "cycle-to-self-out" edges: - from: in to: retry diff --git a/test/diamond-e2e/testdata/join-on-map-pipeline.yaml b/test/diamond-e2e/testdata/join-on-map.yaml similarity index 82% rename from test/diamond-e2e/testdata/join-on-map-pipeline.yaml rename to test/diamond-e2e/testdata/join-on-map.yaml index 286a075d8c..6ca69cda24 100644 --- a/test/diamond-e2e/testdata/join-on-map-pipeline.yaml +++ b/test/diamond-e2e/testdata/join-on-map.yaml @@ -32,6 +32,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "join-on-map-sink" edges: - from: in-0 to: cat diff --git a/test/diamond-e2e/testdata/join-on-reduce-pipeline.yaml b/test/diamond-e2e/testdata/join-on-reduce.yaml similarity index 91% rename from test/diamond-e2e/testdata/join-on-reduce-pipeline.yaml rename to test/diamond-e2e/testdata/join-on-reduce.yaml index 2f2066a2f0..f0ec43e19f 100644 --- a/test/diamond-e2e/testdata/join-on-reduce-pipeline.yaml +++ b/test/diamond-e2e/testdata/join-on-reduce.yaml @@ -53,6 +53,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "join-on-reduce-sink" edges: - from: in to: atoi-0 diff --git a/test/diamond-e2e/testdata/join-on-sink.yaml b/test/diamond-e2e/testdata/join-on-sink.yaml index 46a7294922..1f1a442b24 100644 --- a/test/diamond-e2e/testdata/join-on-sink.yaml +++ b/test/diamond-e2e/testdata/join-on-sink.yaml @@ -28,7 +28,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always - + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "join-on-sink-out" edges: - from: in to: even-or-odd diff --git a/test/e2e-api/redis.go b/test/e2e-api/redis.go index ea373634ac..228da31dec 100644 --- a/test/e2e-api/redis.go +++ b/test/e2e-api/redis.go @@ -18,7 +18,6 @@ package main import ( "context" - "fmt" "log" "net/http" "net/url" @@ -62,8 +61,7 @@ func (h *RedisController) GetMsgCountContains(w http.ResponseWriter, r *http.Req redisClient := h.getRedisClient() - pipelineName := r.URL.Query().Get("pipelineName") - sinkName := r.URL.Query().Get("sinkName") + keyName := r.URL.Query().Get("keyName") targetStr, err := url.QueryUnescape(r.URL.Query().Get("targetStr")) if err != nil { log.Println(err) @@ -71,7 +69,7 @@ func (h *RedisController) GetMsgCountContains(w http.ResponseWriter, r *http.Req return } - count, err := redisClient.HGet(context.Background(), fmt.Sprintf("%s:%s", pipelineName, sinkName), targetStr).Result() + count, err := redisClient.HGet(context.Background(), keyName, targetStr).Result() if err != nil { log.Println(err) diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index 573e0d6f8f..3e9c1d70dd 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -169,15 +169,14 @@ func (s *FunctionalSuite) TestUDFFiltering() { SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(expect3))). SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(expect4))) - w.Expect().SinkContains("out", expect3) - w.Expect().SinkContains("out", expect4) - w.Expect().SinkNotContains("out", expect0) - w.Expect().SinkNotContains("out", expect1) - w.Expect().SinkNotContains("out", expect2) + w.Expect().RedisSinkContains("udf-filtering-out", expect3) + w.Expect().RedisSinkContains("udf-filtering-out", expect4) + w.Expect().RedisSinkNotContains("udf-filtering-out", expect0) + w.Expect().RedisSinkNotContains("udf-filtering-out", expect1) + w.Expect().RedisSinkNotContains("udf-filtering-out", expect2) } func (s *FunctionalSuite) TestConditionalForwarding() { - // FIXME: flaky when redis is used as isb if strings.ToUpper(os.Getenv("ISBSVC")) == "REDIS" { s.T().SkipNow() @@ -196,17 +195,17 @@ func (s *FunctionalSuite) TestConditionalForwarding() { SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("888889"))). SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("not an integer"))) - w.Expect().SinkContains("even-sink", "888888") - w.Expect().SinkNotContains("even-sink", "888889") - w.Expect().SinkNotContains("even-sink", "not an integer") + w.Expect().RedisSinkContains("even-odd-even-sink", "888888") + w.Expect().RedisSinkNotContains("even-odd-even-sink", "888889") + w.Expect().RedisSinkNotContains("even-odd-even-sink", "not an integer") - w.Expect().SinkContains("odd-sink", "888889") - w.Expect().SinkNotContains("odd-sink", "888888") - w.Expect().SinkNotContains("odd-sink", "not an integer") + w.Expect().RedisSinkContains("even-odd-odd-sink", "888889") + w.Expect().RedisSinkNotContains("even-odd-odd-sink", "888888") + w.Expect().RedisSinkNotContains("even-odd-odd-sink", "not an integer") - w.Expect().SinkContains("number-sink", "888888") - w.Expect().SinkContains("number-sink", "888889") - w.Expect().SinkNotContains("number-sink", "not an integer") + w.Expect().RedisSinkContains("even-odd-number-sink", "888888") + w.Expect().RedisSinkContains("even-odd-number-sink", "888889") + w.Expect().RedisSinkNotContains("even-odd-number-sink", "not an integer") } func (s *FunctionalSuite) TestDropOnFull() { @@ -354,7 +353,7 @@ func (s *FunctionalSuite) TestFallbackSink() { // wait for all the pods to come up w.Expect().VertexPodsRunning() - w.Expect().SinkContains("output", "fallback-message") + w.Expect().RedisSinkContains("simple-fallback-output", "fallback-message") } func TestFunctionalSuite(t *testing.T) { diff --git a/test/e2e/testdata/even-odd.yaml b/test/e2e/testdata/even-odd.yaml index 0c3da8cdd5..7cc6822243 100644 --- a/test/e2e/testdata/even-odd.yaml +++ b/test/e2e/testdata/even-odd.yaml @@ -22,18 +22,32 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "even-odd-even-sink" - name: odd-sink sink: udsink: container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "even-odd-odd-sink" - name: number-sink sink: udsink: container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "even-odd-number-sink" edges: - from: in to: even-or-odd diff --git a/test/e2e/testdata/simple-fallback.yaml b/test/e2e/testdata/simple-fallback.yaml index de76a06d59..0e39df2ddf 100644 --- a/test/e2e/testdata/simple-fallback.yaml +++ b/test/e2e/testdata/simple-fallback.yaml @@ -27,6 +27,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "simple-fallback-output" edges: - from: in to: cat diff --git a/test/e2e/testdata/udf-filtering.yaml b/test/e2e/testdata/udf-filtering.yaml index 8cc332ddfc..1c8c79057c 100644 --- a/test/e2e/testdata/udf-filtering.yaml +++ b/test/e2e/testdata/udf-filtering.yaml @@ -20,6 +20,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "udf-filtering-out" edges: - from: in to: p1 diff --git a/test/fixtures/e2e_suite.go b/test/fixtures/e2e_suite.go index 3012be8754..64b27e3eb9 100644 --- a/test/fixtures/e2e_suite.go +++ b/test/fixtures/e2e_suite.go @@ -80,12 +80,13 @@ spec: type E2ESuite struct { suite.Suite - restConfig *rest.Config - isbSvcClient flowpkg.InterStepBufferServiceInterface - pipelineClient flowpkg.PipelineInterface - vertexClient flowpkg.VertexInterface - kubeClient kubernetes.Interface - stopch chan struct{} + restConfig *rest.Config + isbSvcClient flowpkg.InterStepBufferServiceInterface + pipelineClient flowpkg.PipelineInterface + vertexClient flowpkg.VertexInterface + monoVertexClient flowpkg.MonoVertexInterface + kubeClient kubernetes.Interface + stopch chan struct{} } func (s *E2ESuite) SetupSuite() { @@ -98,6 +99,7 @@ func (s *E2ESuite) SetupSuite() { s.isbSvcClient = flowversiond.NewForConfigOrDie(s.restConfig).NumaflowV1alpha1().InterStepBufferServices(Namespace) s.pipelineClient = flowversiond.NewForConfigOrDie(s.restConfig).NumaflowV1alpha1().Pipelines(Namespace) s.vertexClient = flowversiond.NewForConfigOrDie(s.restConfig).NumaflowV1alpha1().Vertices(Namespace) + s.monoVertexClient = flowversiond.NewForConfigOrDie(s.restConfig).NumaflowV1alpha1().MonoVertices(Namespace) // Clean up resources if any s.deleteResources([]schema.GroupVersionResource{ @@ -182,12 +184,13 @@ func (s *E2ESuite) deleteResources(resources []schema.GroupVersionResource) { func (s *E2ESuite) Given() *Given { return &Given{ - t: s.T(), - isbSvcClient: s.isbSvcClient, - pipelineClient: s.pipelineClient, - vertexClient: s.vertexClient, - restConfig: s.restConfig, - kubeClient: s.kubeClient, + t: s.T(), + isbSvcClient: s.isbSvcClient, + pipelineClient: s.pipelineClient, + vertexClient: s.vertexClient, + monoVertexClient: s.monoVertexClient, + restConfig: s.restConfig, + kubeClient: s.kubeClient, } } diff --git a/test/fixtures/expect.go b/test/fixtures/expect.go index 6438959786..2e550653fe 100644 --- a/test/fixtures/expect.go +++ b/test/fixtures/expect.go @@ -32,32 +32,38 @@ import ( ) type Expect struct { - t *testing.T - isbSvcClient flowpkg.InterStepBufferServiceInterface - pipelineClient flowpkg.PipelineInterface - vertexClient flowpkg.VertexInterface - isbSvc *dfv1.InterStepBufferService - pipeline *dfv1.Pipeline - restConfig *rest.Config - kubeClient kubernetes.Interface + t *testing.T + isbSvcClient flowpkg.InterStepBufferServiceInterface + pipelineClient flowpkg.PipelineInterface + vertexClient flowpkg.VertexInterface + monoVertexClient flowpkg.MonoVertexInterface + isbSvc *dfv1.InterStepBufferService + pipeline *dfv1.Pipeline + monoVertex *dfv1.MonoVertex + restConfig *rest.Config + kubeClient kubernetes.Interface } -func (t *Expect) SinkContains(sinkName string, targetStr string, opts ...SinkCheckOption) *Expect { +// RedisSinkContains checks if the target string is written to the redis sink +// hashKey is the hash key environment variable set by the sink +// targetStr is the target string to check +func (t *Expect) RedisSinkContains(hashKey string, targetStr string, opts ...SinkCheckOption) *Expect { t.t.Helper() ctx := context.Background() - contains := RedisContains(ctx, t.pipeline.Name, sinkName, targetStr, opts...) - if !contains { - t.t.Fatalf("Expected redis contains target string %s written by pipeline %s, sink %s.", targetStr, t.pipeline.Name, sinkName) + if contains := redisContains(ctx, hashKey, targetStr, opts...); !contains { + t.t.Fatalf("Expected redis contains target string %s written under hash key %s.", targetStr, hashKey) } return t } -func (t *Expect) SinkNotContains(sinkName string, targetStr string, opts ...SinkCheckOption) *Expect { +// RedisSinkNotContains checks if the target string is not written to the redis sink +// hashKey is the hash key environment variable set by the sink +// targetStr is the target string to check +func (t *Expect) RedisSinkNotContains(hashKey string, targetStr string, opts ...SinkCheckOption) *Expect { t.t.Helper() ctx := context.Background() - notContains := RedisNotContains(ctx, t.pipeline.Name, sinkName, targetStr, opts...) - if !notContains { - t.t.Fatalf("Not expected redis contains target string %s written by pipeline %s, sink %s.", targetStr, t.pipeline.Name, sinkName) + if notContain := redisNotContains(ctx, hashKey, targetStr, opts...); !notContain { + t.t.Fatalf("Not expected redis contains target string %s written under hash key %s.", targetStr, hashKey) } return t } @@ -112,6 +118,14 @@ func (t *Expect) VertexPodsRunning() *Expect { return t } +func (t *Expect) MonoVertexPodsRunning() *Expect { + t.t.Helper() + if err := WaitForMonoVertexPodRunning(t.kubeClient, t.monoVertexClient, Namespace, t.monoVertex.Name, 2*time.Minute); err != nil { + t.t.Fatalf("Expected mono vertex %q pod running: %v", t.monoVertex.Name, err) + } + return t +} + func (t *Expect) VertexSizeScaledTo(v string, size int) *Expect { t.t.Helper() ctx := context.Background() @@ -141,6 +155,20 @@ func (t *Expect) VertexPodLogContains(vertexName, regex string, opts ...PodLogCh return t } +func (t *Expect) MonoVertexPodLogContains(regex string, opts ...PodLogCheckOption) *Expect { + t.t.Helper() + ctx := context.Background() + contains, err := MonoVertexPodLogContains(ctx, t.kubeClient, Namespace, t.monoVertex.Name, regex, opts...) + if err != nil { + t.t.Fatalf("Failed to check mono vertex %q pod logs: %v", t.monoVertex.Name, err) + } + if !contains { + t.t.Fatalf("Expected mono vertex [%q] pod log to contain [%q] but didn't.", t.monoVertex.Name, regex) + } + t.t.Logf("Expected mono vertex %q pod contains %q", t.monoVertex.Name, regex) + return t +} + func (t *Expect) VertexPodLogNotContains(vertexName, regex string, opts ...PodLogCheckOption) *Expect { t.t.Helper() ctx := context.Background() @@ -178,13 +206,15 @@ func (t *Expect) DaemonPodLogContains(pipelineName, regex string, opts ...PodLog func (t *Expect) When() *When { return &When{ - t: t.t, - isbSvcClient: t.isbSvcClient, - pipelineClient: t.pipelineClient, - vertexClient: t.vertexClient, - isbSvc: t.isbSvc, - pipeline: t.pipeline, - restConfig: t.restConfig, - kubeClient: t.kubeClient, + t: t.t, + isbSvcClient: t.isbSvcClient, + pipelineClient: t.pipelineClient, + vertexClient: t.vertexClient, + monoVertexClient: t.monoVertexClient, + isbSvc: t.isbSvc, + pipeline: t.pipeline, + monoVertex: t.monoVertex, + restConfig: t.restConfig, + kubeClient: t.kubeClient, } } diff --git a/test/fixtures/given.go b/test/fixtures/given.go index 448f8e10f6..30a4ab01a7 100644 --- a/test/fixtures/given.go +++ b/test/fixtures/given.go @@ -21,23 +21,26 @@ import ( "strings" "testing" - dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - flowpkg "github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "sigs.k8s.io/yaml" + + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + flowpkg "github.com/numaproj/numaflow/pkg/client/clientset/versioned/typed/numaflow/v1alpha1" ) type Given struct { - t *testing.T - isbSvcClient flowpkg.InterStepBufferServiceInterface - pipelineClient flowpkg.PipelineInterface - vertexClient flowpkg.VertexInterface - isbSvc *dfv1.InterStepBufferService - pipeline *dfv1.Pipeline - restConfig *rest.Config - kubeClient kubernetes.Interface + t *testing.T + isbSvcClient flowpkg.InterStepBufferServiceInterface + pipelineClient flowpkg.PipelineInterface + vertexClient flowpkg.VertexInterface + monoVertexClient flowpkg.MonoVertexInterface + isbSvc *dfv1.InterStepBufferService + pipeline *dfv1.Pipeline + monoVertex *dfv1.MonoVertex + restConfig *rest.Config + kubeClient kubernetes.Interface } // creates an ISBSvc based on the parameter, this may be: @@ -76,6 +79,23 @@ func (g *Given) Pipeline(text string) *Given { return g } +// / creates a MonoVertex based on the parameter, this may be: +// +// 1. A file name if it starts with "@" +// 2. Raw YAML. +func (g *Given) MonoVertex(text string) *Given { + g.t.Helper() + g.monoVertex = &dfv1.MonoVertex{} + g.readResource(text, g.monoVertex) + l := g.monoVertex.GetLabels() + if l == nil { + l = map[string]string{} + } + l[Label] = LabelValue + g.monoVertex.SetLabels(l) + return g +} + func (g *Given) WithPipeline(p *dfv1.Pipeline) *Given { g.t.Helper() g.pipeline = p @@ -89,6 +109,18 @@ func (g *Given) WithPipeline(p *dfv1.Pipeline) *Given { return g } +func (g *Given) WithMonoVertex(mv *dfv1.MonoVertex) *Given { + g.t.Helper() + g.monoVertex = mv + l := g.monoVertex.GetLabels() + if l == nil { + l = map[string]string{} + } + l[Label] = LabelValue + g.monoVertex.SetLabels(l) + return g +} + func (g *Given) readResource(text string, v metav1.Object) { g.t.Helper() var file string @@ -122,13 +154,15 @@ func (g *Given) readResource(text string, v metav1.Object) { func (g *Given) When() *When { return &When{ - t: g.t, - isbSvcClient: g.isbSvcClient, - pipelineClient: g.pipelineClient, - vertexClient: g.vertexClient, - isbSvc: g.isbSvc, - pipeline: g.pipeline, - restConfig: g.restConfig, - kubeClient: g.kubeClient, + t: g.t, + isbSvcClient: g.isbSvcClient, + pipelineClient: g.pipelineClient, + vertexClient: g.vertexClient, + monoVertexClient: g.monoVertexClient, + isbSvc: g.isbSvc, + pipeline: g.pipeline, + monoVertex: g.monoVertex, + restConfig: g.restConfig, + kubeClient: g.kubeClient, } } diff --git a/test/fixtures/redis.go b/test/fixtures/redis.go index 7f0b4c6e7e..93d5d4996f 100644 --- a/test/fixtures/redis.go +++ b/test/fixtures/redis.go @@ -22,9 +22,10 @@ import ( "strconv" ) -// GetMsgCountContains returns number of occurrences of the targetStr in redis that are written by pipelineName, sinkName. -func GetMsgCountContains(pipelineName, sinkName, targetStr string) int { - str := InvokeE2EAPI("/redis/get-msg-count-contains?pipelineName=%s&sinkName=%s&targetStr=%s", pipelineName, sinkName, url.QueryEscape(targetStr)) +// getMsgCountContains returns the number of occurrences of the targetStr in redis +// that are written by under hash key keyName. +func getMsgCountContains(keyName, targetStr string) int { + str := InvokeE2EAPI("/redis/get-msg-count-contains?keyName=%s&targetStr=%s", keyName, url.QueryEscape(targetStr)) count, err := strconv.Atoi(str) if err != nil { panic(fmt.Sprintf("Can't parse string %s to an integer.", str)) diff --git a/test/fixtures/redis_check.go b/test/fixtures/redis_check.go index b73078a3e5..b784aa2d61 100644 --- a/test/fixtures/redis_check.go +++ b/test/fixtures/redis_check.go @@ -24,8 +24,8 @@ import ( // Retry checking redis every 5 seconds. const retryInterval = time.Second * 5 -// RedisNotContains verifies that there is no occurrence of targetStr in redis that is written by pipelineName, sinkName. -func RedisNotContains(ctx context.Context, pipelineName, sinkName, targetStr string, opts ...SinkCheckOption) bool { +// redisNotContains verifies that there is no occurrence of targetStr in redis that is written under hashKey. +func redisNotContains(ctx context.Context, hashKey, targetStr string, opts ...SinkCheckOption) bool { o := defaultRedisCheckOptions() for _, opt := range opts { if opt != nil { @@ -34,14 +34,13 @@ func RedisNotContains(ctx context.Context, pipelineName, sinkName, targetStr str } ctx, cancel := context.WithTimeout(ctx, o.timeout) defer cancel() - return runChecks(ctx, func() bool { - return !redisContains(pipelineName, sinkName, targetStr, 1) + return !redisContainsCount(hashKey, targetStr, 1) }) } -// RedisContains verifies that there are targetStr in redis written by pipelineName, sinkName. -func RedisContains(ctx context.Context, pipelineName, sinkName, targetStr string, opts ...SinkCheckOption) bool { +// redisContains verifies that there are targetStr in redis written under hashKey. +func redisContains(ctx context.Context, hashKey, targetStr string, opts ...SinkCheckOption) bool { o := defaultRedisCheckOptions() for _, opt := range opts { if opt != nil { @@ -50,15 +49,14 @@ func RedisContains(ctx context.Context, pipelineName, sinkName, targetStr string } ctx, cancel := context.WithTimeout(ctx, o.timeout) defer cancel() - return runChecks(ctx, func() bool { - return redisContains(pipelineName, sinkName, targetStr, o.count) + return redisContainsCount(hashKey, targetStr, o.count) }) } -func redisContains(pipelineName, sinkName, targetStr string, expectedCount int) bool { - // If number of matches is higher than expected, we treat it as passing the check. - return GetMsgCountContains(pipelineName, sinkName, targetStr) >= expectedCount +func redisContainsCount(hashKey, targetStr string, expectedCount int) bool { + // If the number of matches is higher than expected, we treat it as passing the check. + return getMsgCountContains(hashKey, targetStr) >= expectedCount } type redisCheckOptions struct { @@ -96,8 +94,8 @@ type CheckFunc func() bool // runChecks executes a performChecks function with retry strategy (retryInterval with timeout). // If performChecks doesn't pass within timeout, runChecks returns false indicating the checks have failed. // This is to mitigate the problem that we don't know exactly when a numaflow pipeline finishes processing our test data. -// Please notice such approach is not strictly accurate as there can be case where runChecks passes before pipeline finishes processing data. -// Which could result in false positive test results. e.g. checking data doesn't exist can pass before data gets persisted to redis. +// Please notice such an approach is not strictly accurate as there can be a case where runChecks passes before the pipeline finishes processing data. +// Which could result in false positive test results. E.g., checking data doesn't exist can pass before data gets persisted to redis. func runChecks(ctx context.Context, performChecks CheckFunc) bool { ticker := time.NewTicker(retryInterval) defer ticker.Stop() diff --git a/test/fixtures/util.go b/test/fixtures/util.go index 73b42b9f88..2c8c8a0ae1 100644 --- a/test/fixtures/util.go +++ b/test/fixtures/util.go @@ -230,6 +230,65 @@ func WaitForPipelineRunning(ctx context.Context, pipelineClient flowpkg.Pipeline } } +func WaitForMonoVertexRunning(ctx context.Context, monoVertexClient flowpkg.MonoVertexInterface, monoVertexName string, timeout time.Duration) error { + fieldSelector := "metadata.name=" + monoVertexName + opts := metav1.ListOptions{FieldSelector: fieldSelector} + watch, err := monoVertexClient.Watch(ctx, opts) + if err != nil { + return err + } + defer watch.Stop() + timeoutCh := make(chan bool, 1) + go func() { + time.Sleep(timeout) + timeoutCh <- true + }() + for { + select { + case event := <-watch.ResultChan(): + i, ok := event.Object.(*dfv1.MonoVertex) + if ok { + if i.Status.Phase == dfv1.MonoVertexPhaseRunning { + return nil + } + } else { + return fmt.Errorf("not monovertex") + } + case <-timeoutCh: + return fmt.Errorf("timeout after %v waiting for MonoVertex running", timeout) + } + } +} + +func WaitForMonoVertexPodRunning(kubeClient kubernetes.Interface, monoVertexClient flowpkg.MonoVertexInterface, namespace, monoVertexName string, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyMonoVertexName, monoVertexName, dfv1.KeyComponent, dfv1.ComponentMonoVertex) + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout after %v waiting for monovertex pod running", timeout) + default: + } + monoVertex, err := monoVertexClient.Get(ctx, monoVertexName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("error getting the monovertex: %w", err) + } + podList, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + return fmt.Errorf("error getting monovertex pod list: %w", err) + } + ok := len(podList.Items) > 0 && len(podList.Items) == monoVertex.GetReplicas() // pod number should equal to desired replicas + for _, p := range podList.Items { + ok = ok && p.Status.Phase == corev1.PodRunning + } + if ok { + return nil + } + time.Sleep(2 * time.Second) + } +} + func WaitForVertexPodRunning(kubeClient kubernetes.Interface, vertexClient flowpkg.VertexInterface, namespace, pipelineName, vertexName string, timeout time.Duration) error { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() @@ -359,6 +418,18 @@ func VertexPodLogContains(ctx context.Context, kubeClient kubernetes.Interface, return PodsLogContains(ctx, kubeClient, namespace, regex, podList, opts...), nil } +func MonoVertexPodLogContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, mvName, regex string, opts ...PodLogCheckOption) (bool, error) { + labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyMonoVertexName, mvName, dfv1.KeyComponent, dfv1.ComponentMonoVertex) + podList, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + return false, fmt.Errorf("error getting monovertex pods: %w", err) + } + if len(podList.Items) == 0 { + return false, fmt.Errorf("no monovertex pods found") + } + return PodsLogContains(ctx, kubeClient, namespace, regex, podList, opts...), nil +} + func DaemonPodLogContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, pipelineName, regex string, opts ...PodLogCheckOption) (bool, error) { labelSelector := fmt.Sprintf("%s=%s,%s=%s", dfv1.KeyPipelineName, pipelineName, dfv1.KeyComponent, dfv1.ComponentDaemon) podList, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector, FieldSelector: "status.phase=Running"}) diff --git a/test/fixtures/when.go b/test/fixtures/when.go index 35295d6dfb..7ab85ae772 100644 --- a/test/fixtures/when.go +++ b/test/fixtures/when.go @@ -31,14 +31,16 @@ import ( ) type When struct { - t *testing.T - isbSvcClient flowpkg.InterStepBufferServiceInterface - pipelineClient flowpkg.PipelineInterface - vertexClient flowpkg.VertexInterface - isbSvc *dfv1.InterStepBufferService - pipeline *dfv1.Pipeline - restConfig *rest.Config - kubeClient kubernetes.Interface + t *testing.T + isbSvcClient flowpkg.InterStepBufferServiceInterface + pipelineClient flowpkg.PipelineInterface + vertexClient flowpkg.VertexInterface + monoVertexClient flowpkg.MonoVertexInterface + isbSvc *dfv1.InterStepBufferService + pipeline *dfv1.Pipeline + monoVertex *dfv1.MonoVertex + restConfig *rest.Config + kubeClient kubernetes.Interface portForwarderStopChannels map[string]chan struct{} streamLogsStopChannels map[string]chan struct{} @@ -101,6 +103,26 @@ func (w *When) CreatePipelineAndWait() *When { return w } +func (w *When) CreateMonoVertexAndWait() *When { + w.t.Helper() + if w.monoVertex == nil { + w.t.Fatal("No MonoVertex to create") + } + w.t.Log("Creating MonoVertex", w.monoVertex.Name) + ctx := context.Background() + i, err := w.monoVertexClient.Create(ctx, w.monoVertex, metav1.CreateOptions{}) + if err != nil { + w.t.Fatal(err) + } else { + w.monoVertex = i + } + // wait + if err := WaitForMonoVertexRunning(ctx, w.monoVertexClient, w.monoVertex.Name, defaultTimeout); err != nil { + w.t.Fatal(err) + } + return w +} + func (w *When) DeletePipelineAndWait() *When { w.t.Helper() if w.pipeline == nil { @@ -133,6 +155,38 @@ func (w *When) DeletePipelineAndWait() *When { } } +func (w *When) DeleteMonoVertexAndWait() *When { + w.t.Helper() + if w.monoVertex == nil { + w.t.Fatal("No MonoVertex to delete") + } + w.t.Log("Deleting MonoVertex", w.monoVertex.Name) + ctx := context.Background() + if err := w.monoVertexClient.Delete(ctx, w.monoVertex.Name, metav1.DeleteOptions{}); err != nil { + w.t.Fatal(err) + } + + timeout := defaultTimeout + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + labelSelector := fmt.Sprintf("%s=%s", dfv1.KeyMonoVertexName, w.monoVertex.Name) + for { + select { + case <-ctx.Done(): + w.t.Fatalf("Timeout after %v waiting for mono vertex pods terminating", timeout) + default: + } + podList, err := w.kubeClient.CoreV1().Pods(Namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + w.t.Fatalf("Error getting mono vertex pods: %v", err) + } + if len(podList.Items) == 0 { + return w + } + time.Sleep(2 * time.Second) + } +} + func (w *When) WaitForISBSvcReady() *When { w.t.Helper() ctx := context.Background() @@ -281,26 +335,30 @@ func (w *When) Exec(name string, args []string, block func(t *testing.T, output func (w *When) Given() *Given { return &Given{ - t: w.t, - isbSvcClient: w.isbSvcClient, - pipelineClient: w.pipelineClient, - vertexClient: w.vertexClient, - isbSvc: w.isbSvc, - pipeline: w.pipeline, - restConfig: w.restConfig, - kubeClient: w.kubeClient, + t: w.t, + isbSvcClient: w.isbSvcClient, + pipelineClient: w.pipelineClient, + vertexClient: w.vertexClient, + monoVertexClient: w.monoVertexClient, + isbSvc: w.isbSvc, + pipeline: w.pipeline, + monoVertex: w.monoVertex, + restConfig: w.restConfig, + kubeClient: w.kubeClient, } } func (w *When) Expect() *Expect { return &Expect{ - t: w.t, - isbSvcClient: w.isbSvcClient, - pipelineClient: w.pipelineClient, - vertexClient: w.vertexClient, - isbSvc: w.isbSvc, - pipeline: w.pipeline, - restConfig: w.restConfig, - kubeClient: w.kubeClient, + t: w.t, + isbSvcClient: w.isbSvcClient, + pipelineClient: w.pipelineClient, + vertexClient: w.vertexClient, + monoVertexClient: w.monoVertexClient, + isbSvc: w.isbSvc, + pipeline: w.pipeline, + monoVertex: w.monoVertex, + restConfig: w.restConfig, + kubeClient: w.kubeClient, } } diff --git a/test/http-e2e/http_test.go b/test/http-e2e/http_test.go index ce46e23aef..a5e7058dee 100644 --- a/test/http-e2e/http_test.go +++ b/test/http-e2e/http_test.go @@ -47,16 +47,16 @@ func (s *HTTPSuite) TestHTTPSourcePipeline() { w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("no-id"))). SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("no-id"))) // No x-numaflow-id, expect 2 outputs - w.Expect().SinkContains("out", "no-id", SinkCheckWithContainCount(2)) + w.Expect().RedisSinkContains("http-source-out", "no-id", SinkCheckWithContainCount(2)) w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("with-id")).WithHeader("x-numaflow-id", "101")). SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("with-id")).WithHeader("x-numaflow-id", "101")) // With same x-numaflow-id, expect 1 output - w.Expect().SinkContains("out", "with-id", SinkCheckWithContainCount(1)) + w.Expect().RedisSinkContains("http-source-out", "with-id", SinkCheckWithContainCount(1)) w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("with-id")).WithHeader("x-numaflow-id", "102")) // With a new x-numaflow-id, expect 2 outputs - w.Expect().SinkContains("out", "with-id", SinkCheckWithContainCount(2)) + w.Expect().RedisSinkContains("http-source-out", "with-id", SinkCheckWithContainCount(2)) } func (s *HTTPSuite) TestHTTPSourceAuthPipeline() { @@ -71,8 +71,8 @@ func (s *HTTPSuite) TestHTTPSourceAuthPipeline() { w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("no-auth"))). SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("with-auth")).WithHeader("Authorization", "Bearer faketoken")) - w.Expect().SinkContains("out", "with-auth") - w.Expect().SinkNotContains("out", "no-auth") + w.Expect().RedisSinkContains("http-auth-source-out", "with-auth") + w.Expect().RedisSinkNotContains("http-auth-source-out", "no-auth") } func TestHTTPSuite(t *testing.T) { diff --git a/test/http-e2e/testdata/http-source-with-auth.yaml b/test/http-e2e/testdata/http-source-with-auth.yaml index a96cd3b58e..4b27a58cee 100644 --- a/test/http-e2e/testdata/http-source-with-auth.yaml +++ b/test/http-e2e/testdata/http-source-with-auth.yaml @@ -18,6 +18,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "http-auth-source-out" edges: - from: in to: out diff --git a/test/http-e2e/testdata/http-source.yaml b/test/http-e2e/testdata/http-source.yaml index 3f81fc1902..c67d366ddb 100644 --- a/test/http-e2e/testdata/http-source.yaml +++ b/test/http-e2e/testdata/http-source.yaml @@ -17,6 +17,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "http-source-out" edges: - from: in to: out diff --git a/test/idle-source-e2e/idle_source_test.go b/test/idle-source-e2e/idle_source_test.go index faf5be1f00..ef5e3eff98 100644 --- a/test/idle-source-e2e/idle_source_test.go +++ b/test/idle-source-e2e/idle_source_test.go @@ -87,8 +87,8 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipelineWithHttpSource() { // since the key can be even or odd and the window duration is 10s // the sum should be 20(for even) and 40(for odd) w.Expect(). - SinkContains("sink", "20", SinkCheckWithTimeout(300*time.Second)). - SinkContains("sink", "40", SinkCheckWithTimeout(300*time.Second)) + RedisSinkContains("http-idle-source-sink", "20", SinkCheckWithTimeout(300*time.Second)). + RedisSinkContains("http-idle-source-sink", "40", SinkCheckWithTimeout(300*time.Second)) done <- struct{}{} } @@ -138,9 +138,9 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipelineWithKafkaSource() { }() // since the window duration is 10 second, so the count of event will be 20, when sending data to both partitions. - w.Expect().SinkContains("sink", "20", SinkCheckWithTimeout(300*time.Second)) + w.Expect().RedisSinkContains("kafka-idle-source-sink", "20", SinkCheckWithTimeout(300*time.Second)) // since the window duration is 10 second, so the count of event will be 10, when sending data to only one partition. - w.Expect().SinkContains("sink", "10", SinkCheckWithTimeout(300*time.Second)) + w.Expect().RedisSinkContains("kafka-idle-source-sink", "10", SinkCheckWithTimeout(300*time.Second)) done <- struct{}{} } diff --git a/test/idle-source-e2e/testdata/idle-source-reduce-pipeline.yaml b/test/idle-source-e2e/testdata/idle-source-reduce-pipeline.yaml index 68bd2f15b0..2fc2a0a89e 100644 --- a/test/idle-source-e2e/testdata/idle-source-reduce-pipeline.yaml +++ b/test/idle-source-e2e/testdata/idle-source-reduce-pipeline.yaml @@ -50,6 +50,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "http-idle-source-sink" edges: - from: in to: atoi diff --git a/test/idle-source-e2e/testdata/kafka-pipeline.yaml b/test/idle-source-e2e/testdata/kafka-pipeline.yaml index fdc4e33bf0..3af968f58d 100644 --- a/test/idle-source-e2e/testdata/kafka-pipeline.yaml +++ b/test/idle-source-e2e/testdata/kafka-pipeline.yaml @@ -53,6 +53,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "kafka-idle-source-sink" edges: - from: kafka-in to: count-event diff --git a/test/jetstream-e2e/jetstream_test.go b/test/jetstream-e2e/jetstream_test.go index 40687c1f17..06aa24d05a 100644 --- a/test/jetstream-e2e/jetstream_test.go +++ b/test/jetstream-e2e/jetstream_test.go @@ -21,8 +21,9 @@ package jetstream_e2e import ( "testing" - "github.com/numaproj/numaflow/test/fixtures" "github.com/stretchr/testify/suite" + + "github.com/numaproj/numaflow/test/fixtures" ) //go:generate kubectl -n numaflow-system delete statefulset nats --ignore-not-found=true @@ -47,7 +48,7 @@ func (ns *JetstreamSuite) TestJetstreamSource() { // wait for all the pods to come up w.Expect().VertexPodsRunning() - w.Expect().SinkContains("out", msgPayload, fixtures.SinkCheckWithContainCount(100)) + w.Expect().RedisSinkContains("jetstream-source-e2e-out", msgPayload, fixtures.SinkCheckWithContainCount(100)) } func TestJetstreamSuite(t *testing.T) { diff --git a/test/jetstream-e2e/testdata/jetstream-source-pipeline.yaml b/test/jetstream-e2e/testdata/jetstream-source-pipeline.yaml index 1f01be02db..c1a3766530 100644 --- a/test/jetstream-e2e/testdata/jetstream-source-pipeline.yaml +++ b/test/jetstream-e2e/testdata/jetstream-source-pipeline.yaml @@ -26,6 +26,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "jetstream-source-e2e-out" edges: - from: in to: p1 diff --git a/test/monovertex-e2e/monovertex_test.go b/test/monovertex-e2e/monovertex_test.go new file mode 100644 index 0000000000..51d9135c56 --- /dev/null +++ b/test/monovertex-e2e/monovertex_test.go @@ -0,0 +1,32 @@ +package monovertex_e2e + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + . "github.com/numaproj/numaflow/test/fixtures" +) + +type MonoVertexSuite struct { + E2ESuite +} + +func (s *MonoVertexSuite) TestMonoVertexWithTransformer() { + w := s.Given().MonoVertex("@testdata/mono-vertex-with-transformer.yaml"). + When().CreateMonoVertexAndWait() + defer w.DeleteMonoVertexAndWait() + + w.Expect().MonoVertexPodsRunning() + + // Expect the messages to be processed by the transformer. + w.Expect().MonoVertexPodLogContains("AssignEventTime", PodLogCheckOptionWithContainer("transformer")) + + // Expect the messages to reach the sink. + w.Expect().RedisSinkContains("transformer-mono-vertex", "199") + w.Expect().RedisSinkContains("transformer-mono-vertex", "200") +} + +func TestMonoVertexSuite(t *testing.T) { + suite.Run(t, new(MonoVertexSuite)) +} diff --git a/test/monovertex-e2e/testdata/mono-vertex-with-transformer.yaml b/test/monovertex-e2e/testdata/mono-vertex-with-transformer.yaml new file mode 100644 index 0000000000..66f1bb34bd --- /dev/null +++ b/test/monovertex-e2e/testdata/mono-vertex-with-transformer.yaml @@ -0,0 +1,24 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: MonoVertex +metadata: + name: transformer-mono-vertex +spec: + source: + udsource: + container: + image: quay.io/numaio/numaflow-go/source-simple-source:stable + imagePullPolicy: Always + transformer: + container: + image: quay.io/numaio/numaflow-go/mapt-assign-event-time:stable + imagePullPolicy: Always + sink: + udsink: + container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink + image: quay.io/numaio/numaflow-go/redis-sink:stable + imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # Use the name of the mono vertex as the key + value: "transformer-mono-vertex" \ No newline at end of file diff --git a/test/nats-e2e/nats_test.go b/test/nats-e2e/nats_test.go index 504432214e..72d2c70e99 100644 --- a/test/nats-e2e/nats_test.go +++ b/test/nats-e2e/nats_test.go @@ -45,7 +45,7 @@ func (ns *NatsSuite) TestNatsSource() { w.Expect().VertexPodsRunning() PumpNatsSubject(subject, 100, 20*time.Millisecond, 10, "test-message") - w.Expect().SinkContains("out", "test-message", SinkCheckWithContainCount(100)) + w.Expect().RedisSinkContains("nats-source-e2e-out", "test-message", SinkCheckWithContainCount(100)) } func TestNatsSuite(t *testing.T) { diff --git a/test/nats-e2e/testdata/nats-source-pipeline.yaml b/test/nats-e2e/testdata/nats-source-pipeline.yaml index bab98eeb52..3dd0d965df 100644 --- a/test/nats-e2e/testdata/nats-source-pipeline.yaml +++ b/test/nats-e2e/testdata/nats-source-pipeline.yaml @@ -27,6 +27,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "nats-source-e2e-out" edges: - from: in to: p1 diff --git a/test/reduce-one-e2e/reduce_one_test.go b/test/reduce-one-e2e/reduce_one_test.go index 6fa78f92f1..4cf592e1cf 100644 --- a/test/reduce-one-e2e/reduce_one_test.go +++ b/test/reduce-one-e2e/reduce_one_test.go @@ -72,8 +72,8 @@ func (r *ReduceSuite) TestSimpleKeyedReducePipeline() { // since the key can be even or odd and the window duration is 10s // the sum should be 20(for even) and 40(for odd) w.Expect(). - SinkContains("sink", "40"). - SinkContains("sink", "20") + RedisSinkContains("simple-sum-sink", "40"). + RedisSinkContains("simple-sum-sink", "20") done <- struct{}{} } @@ -117,7 +117,7 @@ func (r *ReduceSuite) TestSimpleNonKeyedReducePipeline() { // since there is no key, all the messages will be assigned to same window // the sum should be 60(since the window is 10s) - w.Expect().SinkContains("sink", "60") + w.Expect().RedisSinkContains("reduce-sum-sink", "60") done <- struct{}{} } @@ -161,7 +161,7 @@ func (r *ReduceSuite) TestComplexReducePipelineKeyedNonKeyed() { // since the key can be even or odd and the first window duration is 10s(which is keyed) // and the second window duration is 60s(non-keyed) // the sum should be 180(60 + 120) - w.Expect().SinkContains("sink", "180") + w.Expect().RedisSinkContains("complex-sum-sink", "180") done <- struct{}{} } @@ -219,10 +219,10 @@ func (r *ReduceSuite) TestSimpleReducePipelineFailOverUsingWAL() { }() w.Expect(). - SinkContains("sink", "38"). - SinkContains("sink", "76"). - SinkContains("sink", "120"). - SinkContains("sink", "240") + RedisSinkContains("even-odd-sum-sink", "38"). + RedisSinkContains("even-odd-sum-sink", "76"). + RedisSinkContains("even-odd-sum-sink", "120"). + RedisSinkContains("even-odd-sum-sink", "240") done <- struct{}{} } @@ -280,12 +280,12 @@ func (r *ReduceSuite) TestComplexSlidingWindowPipeline() { // we only have to extend the timeout for the first output to be produced. for the rest, // we just need to wait for the default timeout for the rest of the outputs since its synchronous w.Expect(). - SinkContains("sink", "30"). - SinkContains("sink", "60"). - SinkNotContains("sink", "80"). - SinkContains("sink", "90"). - SinkContains("sink", "180"). - SinkNotContains("sink", "210") + RedisSinkContains("complex-sliding-sum-sink", "30"). + RedisSinkContains("complex-sliding-sum-sink", "60"). + RedisSinkNotContains("complex-sliding-sum-sink", "80"). + RedisSinkContains("complex-sliding-sum-sink", "90"). + RedisSinkContains("complex-sliding-sum-sink", "180"). + RedisSinkNotContains("complex-sliding-sum-sink", "210") done <- struct{}{} } diff --git a/test/reduce-one-e2e/testdata/complex-reduce-pipeline.yaml b/test/reduce-one-e2e/testdata/complex-reduce-pipeline.yaml index 0c745be3b6..d7a56c5f0e 100644 --- a/test/reduce-one-e2e/testdata/complex-reduce-pipeline.yaml +++ b/test/reduce-one-e2e/testdata/complex-reduce-pipeline.yaml @@ -57,6 +57,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "complex-sum-sink" edges: - from: in to: atoi diff --git a/test/reduce-one-e2e/testdata/complex-sliding-window-pipeline.yaml b/test/reduce-one-e2e/testdata/complex-sliding-window-pipeline.yaml index 2eafdd6f85..8c19e22d72 100644 --- a/test/reduce-one-e2e/testdata/complex-sliding-window-pipeline.yaml +++ b/test/reduce-one-e2e/testdata/complex-sliding-window-pipeline.yaml @@ -89,6 +89,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "complex-sliding-sum-sink" edges: - from: in to: atoi diff --git a/test/reduce-one-e2e/testdata/simple-keyed-reduce-pipeline.yaml b/test/reduce-one-e2e/testdata/simple-keyed-reduce-pipeline.yaml index 51e62d0613..25e1976331 100644 --- a/test/reduce-one-e2e/testdata/simple-keyed-reduce-pipeline.yaml +++ b/test/reduce-one-e2e/testdata/simple-keyed-reduce-pipeline.yaml @@ -46,6 +46,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "simple-sum-sink" edges: - from: in to: atoi diff --git a/test/reduce-one-e2e/testdata/simple-non-keyed-reduce-pipeline.yaml b/test/reduce-one-e2e/testdata/simple-non-keyed-reduce-pipeline.yaml index 65b80c2dc0..d5a932a7c5 100644 --- a/test/reduce-one-e2e/testdata/simple-non-keyed-reduce-pipeline.yaml +++ b/test/reduce-one-e2e/testdata/simple-non-keyed-reduce-pipeline.yaml @@ -45,6 +45,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "reduce-sum-sink" edges: - from: in to: atoi diff --git a/test/reduce-one-e2e/testdata/simple-reduce-pipeline-wal.yaml b/test/reduce-one-e2e/testdata/simple-reduce-pipeline-wal.yaml index e77479eaad..25a6770a5d 100644 --- a/test/reduce-one-e2e/testdata/simple-reduce-pipeline-wal.yaml +++ b/test/reduce-one-e2e/testdata/simple-reduce-pipeline-wal.yaml @@ -44,6 +44,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "even-odd-sum-sink" edges: - from: in to: atoi diff --git a/test/reduce-two-e2e/reduce_two_test.go b/test/reduce-two-e2e/reduce_two_test.go index 3af75820fd..7a1815f2c2 100644 --- a/test/reduce-two-e2e/reduce_two_test.go +++ b/test/reduce-two-e2e/reduce_two_test.go @@ -83,9 +83,9 @@ func (r *ReduceSuite) testReduceStream(lang string) { // The reduce stream application summarizes the input messages and returns the sum when the sum is greater than 100. // Since we are sending 3s, the first returned message should be 102. // There should be no other values. - w.Expect().SinkContains("sink", "102") - w.Expect().SinkNotContains("sink", "99") - w.Expect().SinkNotContains("sink", "105") + w.Expect().RedisSinkContains(pipelineName+"-sink", "102") + w.Expect().RedisSinkNotContains(pipelineName+"-sink", "99") + w.Expect().RedisSinkNotContains(pipelineName+"sink", "105") done <- struct{}{} } @@ -132,7 +132,7 @@ func (r *ReduceSuite) TestSimpleSessionPipeline() { } }() - w.Expect().SinkContains("sink", "1000") + w.Expect().RedisSinkContains("simple-session-sum-sink", "1000") done <- struct{}{} } @@ -190,11 +190,11 @@ func (r *ReduceSuite) testSimpleSessionKeyedPipeline(lang string) { } }() - w.Expect().SinkContains("sink", "5") - w.Expect().SinkNotContains("sink", "4", SinkCheckWithTimeout(20*time.Second)) - w.Expect().SinkNotContains("sink", "3", SinkCheckWithTimeout(20*time.Second)) - w.Expect().SinkNotContains("sink", "2", SinkCheckWithTimeout(20*time.Second)) - w.Expect().SinkNotContains("sink", "1", SinkCheckWithTimeout(20*time.Second)) + w.Expect().RedisSinkContains(pipelineName+"-sink", "5") + w.Expect().RedisSinkNotContains(pipelineName+"-sink", "4", SinkCheckWithTimeout(20*time.Second)) + w.Expect().RedisSinkNotContains(pipelineName+"-sink", "3", SinkCheckWithTimeout(20*time.Second)) + w.Expect().RedisSinkNotContains(pipelineName+"-sink", "2", SinkCheckWithTimeout(20*time.Second)) + w.Expect().RedisSinkNotContains(pipelineName+"-sink", "1", SinkCheckWithTimeout(20*time.Second)) done <- struct{}{} } @@ -255,11 +255,11 @@ func (r *ReduceSuite) TestSimpleSessionPipelineFailOverUsingWAL() { }() w.Expect(). - SinkContains("sink", "5"). - SinkNotContains("sink", "4", SinkCheckWithTimeout(20*time.Second)). - SinkNotContains("sink", "3", SinkCheckWithTimeout(20*time.Second)). - SinkNotContains("sink", "2", SinkCheckWithTimeout(20*time.Second)). - SinkNotContains("sink", "1", SinkCheckWithTimeout(20*time.Second)) + RedisSinkContains("simple-session-counter-go-sink", "5"). + RedisSinkNotContains("simple-session-counter-go-sink", "4", SinkCheckWithTimeout(20*time.Second)). + RedisSinkNotContains("simple-session-counter-go-sink", "3", SinkCheckWithTimeout(20*time.Second)). + RedisSinkNotContains("simple-session-counter-go-sink", "2", SinkCheckWithTimeout(20*time.Second)). + RedisSinkNotContains("simple-session-counter-go-sink", "1", SinkCheckWithTimeout(20*time.Second)) done <- struct{}{} } diff --git a/test/reduce-two-e2e/testdata/reduce-stream/reduce-stream-go.yaml b/test/reduce-two-e2e/testdata/reduce-stream/reduce-stream-go.yaml index 250c6c126f..d80138366e 100644 --- a/test/reduce-two-e2e/testdata/reduce-stream/reduce-stream-go.yaml +++ b/test/reduce-two-e2e/testdata/reduce-stream/reduce-stream-go.yaml @@ -42,6 +42,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "reduce-stream-go-sink" edges: - from: in to: atoi diff --git a/test/reduce-two-e2e/testdata/reduce-stream/reduce-stream-java.yaml b/test/reduce-two-e2e/testdata/reduce-stream/reduce-stream-java.yaml index ebdd2b4c8a..b94e7fd649 100644 --- a/test/reduce-two-e2e/testdata/reduce-stream/reduce-stream-java.yaml +++ b/test/reduce-two-e2e/testdata/reduce-stream/reduce-stream-java.yaml @@ -42,6 +42,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "reduce-stream-java-sink" edges: - from: in to: atoi diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml index ff2ab4a2ee..a2d88efeb6 100644 --- a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-go.yaml @@ -43,6 +43,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "simple-session-counter-go-sink" edges: - from: in to: even-odd diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml index 91a9d168ed..f468710f03 100644 --- a/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-keyed-counter-pipeline-java.yaml @@ -48,6 +48,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "simple-session-counter-java-sink" edges: - from: in to: even-odd diff --git a/test/reduce-two-e2e/testdata/session-reduce/simple-session-sum-pipeline.yaml b/test/reduce-two-e2e/testdata/session-reduce/simple-session-sum-pipeline.yaml index 57f8cdadf4..6f1259e085 100644 --- a/test/reduce-two-e2e/testdata/session-reduce/simple-session-sum-pipeline.yaml +++ b/test/reduce-two-e2e/testdata/session-reduce/simple-session-sum-pipeline.yaml @@ -36,6 +36,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "simple-session-sum-sink" edges: - from: in to: compute-sum diff --git a/test/sideinputs-e2e/sideinput-e2e_sink_source_test.go b/test/sideinputs-e2e/sideinput-e2e_sink_source_test.go index dfec57977c..be95e4c3f5 100644 --- a/test/sideinputs-e2e/sideinput-e2e_sink_source_test.go +++ b/test/sideinputs-e2e/sideinput-e2e_sink_source_test.go @@ -40,16 +40,14 @@ func (s *SideInputUDSSuite) setUpTests(pipeLineFile string) *When { } func (s *SideInputUDSSuite) TestSinkWithSideInput() { - // the side inputs feature is not supported with redis ISBSVC if strings.ToUpper(os.Getenv("ISBSVC")) == "REDIS" { s.T().SkipNow() } - w := s.setUpTests("@testdata/sideinput_sink.yaml") + w := s.setUpTests("@testdata/sideinput-sink.yaml") defer w.DeletePipelineAndWait() - w.Expect().SinkContains("redis-uds", "e2e-even", SinkCheckWithTimeout(2*time.Minute)) - + w.Expect().RedisSinkContains("sideinput-sink-test-redis-uds", "e2e-even", SinkCheckWithTimeout(2*time.Minute)) } func (s *SideInputUDSSuite) TestSourceWithSideInput() { @@ -59,9 +57,9 @@ func (s *SideInputUDSSuite) TestSourceWithSideInput() { s.T().SkipNow() } - w := s.setUpTests("@testdata/sideinput_source.yaml") + w := s.setUpTests("@testdata/sideinput-source.yaml") defer w.DeletePipelineAndWait() - w.Expect().SinkContains("redis-uds", "e2e-even", SinkCheckWithTimeout(2*time.Minute)) + w.Expect().RedisSinkContains("sideinput-source-test-redis-uds", "e2e-even", SinkCheckWithTimeout(2*time.Minute)) } diff --git a/test/sideinputs-e2e/sideinput_test.go b/test/sideinputs-e2e/sideinput_test.go index 1b4c18b2ac..8f1e156926 100644 --- a/test/sideinputs-e2e/sideinput_test.go +++ b/test/sideinputs-e2e/sideinput_test.go @@ -70,8 +70,8 @@ func (s *SideInputSuite) TestSimpleMapSideInputPipeline() { }() // map-even-data and map-odd-data message is generated based on map and side input data. - w.Expect().SinkContains("sink", "map-even-data") - w.Expect().SinkContains("sink", "map-odd-data") + w.Expect().RedisSinkContains("map-sideinput-pipeline-sink", "map-even-data") + w.Expect().RedisSinkContains("map-sideinput-pipeline-sink", "map-odd-data") done <- struct{}{} } @@ -114,7 +114,7 @@ func (s *SideInputSuite) TestSimpleReduceSideInputPipeline() { }() // here reduce-side-input text is generated based on reduce and side input data. - w.Expect().SinkContains("sink", "reduce_sideinput") + w.Expect().RedisSinkContains("reduce-sideinput-pipeline-sink", "reduce_sideinput") done <- struct{}{} } diff --git a/test/sideinputs-e2e/testdata/map-sideinput-pipeline.yaml b/test/sideinputs-e2e/testdata/map-sideinput-pipeline.yaml index 30079de488..cb3ea152ed 100644 --- a/test/sideinputs-e2e/testdata/map-sideinput-pipeline.yaml +++ b/test/sideinputs-e2e/testdata/map-sideinput-pipeline.yaml @@ -32,6 +32,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "map-sideinput-pipeline-sink" edges: - from: in to: si-e2e diff --git a/test/sideinputs-e2e/testdata/reduce-sideinput-pipeline.yaml b/test/sideinputs-e2e/testdata/reduce-sideinput-pipeline.yaml index b5f253c0fe..028a20125e 100644 --- a/test/sideinputs-e2e/testdata/reduce-sideinput-pipeline.yaml +++ b/test/sideinputs-e2e/testdata/reduce-sideinput-pipeline.yaml @@ -49,6 +49,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "reduce-sideinput-pipeline-sink" edges: - from: in to: atoi diff --git a/test/sideinputs-e2e/testdata/sideinput_sink.yaml b/test/sideinputs-e2e/testdata/sideinput-sink.yaml similarity index 89% rename from test/sideinputs-e2e/testdata/sideinput_sink.yaml rename to test/sideinputs-e2e/testdata/sideinput-sink.yaml index a9cd439d4a..f172b4ec31 100644 --- a/test/sideinputs-e2e/testdata/sideinput_sink.yaml +++ b/test/sideinputs-e2e/testdata/sideinput-sink.yaml @@ -26,8 +26,11 @@ spec: udsink: container: # see https://github.com/numaproj/numaflow-go/tree/main/pkg/sideinput/examples/sink_sideinput - image: quay.io/numaio/numaflow-go/redis-sink-with-sideinput:stable + image: quay.io/numaio/numaflow-go/redis-sink-with-sideinput:hash imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + value: "sideinput-sink-test-redis-uds" sideInputs: - myticker diff --git a/test/sideinputs-e2e/testdata/sideinput_source.yaml b/test/sideinputs-e2e/testdata/sideinput-source.yaml similarity index 91% rename from test/sideinputs-e2e/testdata/sideinput_source.yaml rename to test/sideinputs-e2e/testdata/sideinput-source.yaml index 62754d05b1..6c21edd717 100644 --- a/test/sideinputs-e2e/testdata/sideinput_source.yaml +++ b/test/sideinputs-e2e/testdata/sideinput-source.yaml @@ -31,6 +31,9 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + value: "sideinput-source-test-redis-uds" sideInputs: - myticker edges: diff --git a/test/transformer-e2e/testdata/source-filtering.yaml b/test/transformer-e2e/testdata/source-filtering.yaml index 670e9f87fd..549fddec63 100644 --- a/test/transformer-e2e/testdata/source-filtering.yaml +++ b/test/transformer-e2e/testdata/source-filtering.yaml @@ -19,6 +19,10 @@ spec: # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink image: quay.io/numaio/numaflow-go/redis-sink:stable imagePullPolicy: Always + env: + - name: SINK_HASH_KEY + # The key is set in the format of "pipeline_name-vertex_name" + value: "source-filtering-out" edges: - from: in to: out diff --git a/test/transformer-e2e/transformer_test.go b/test/transformer-e2e/transformer_test.go index 8f0a226c79..888b3ee3fb 100644 --- a/test/transformer-e2e/transformer_test.go +++ b/test/transformer-e2e/transformer_test.go @@ -61,11 +61,11 @@ func (s *TransformerSuite) TestSourceFiltering() { SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(expect3))). SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(expect4))) - w.Expect().SinkContains("out", expect3) - w.Expect().SinkContains("out", expect4) - w.Expect().SinkNotContains("out", expect0) - w.Expect().SinkNotContains("out", expect1) - w.Expect().SinkNotContains("out", expect2) + w.Expect().RedisSinkContains("source-filtering-out", expect3) + w.Expect().RedisSinkContains("source-filtering-out", expect4) + w.Expect().RedisSinkNotContains("source-filtering-out", expect0) + w.Expect().RedisSinkNotContains("source-filtering-out", expect1) + w.Expect().RedisSinkNotContains("source-filtering-out", expect2) } func (s *TransformerSuite) TestTimeExtractionFilter() { @@ -84,7 +84,7 @@ func (s *TransformerSuite) TestTimeExtractionFilter() { testMsgTwo := `{"id": 101, "msg": "test", "time": "2021-01-18T21:54:42.123Z", "desc": "A bad ID."}` w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(testMsgTwo))) - w.Expect().SinkNotContains("out", testMsgTwo) + w.Expect().RedisSinkNotContains("time-extraction-filter-out", testMsgTwo) } func (s *TransformerSuite) TestBuiltinEventTimeExtractor() {