From 639a936438e0ee8312d5c0534c5c3ddd46ecd24c Mon Sep 17 00:00:00 2001 From: Chandan Kumar Date: Wed, 25 Oct 2023 04:33:04 +0530 Subject: [PATCH] feat: Add e2e test for map sideinput, Fixes #1192 (#1211) Signed-off-by: Chandan Kumar --- .github/workflows/ci.yaml | 4 +- Makefile | 1 + test/sideinput-e2e/sideinput_test.go | 110 ++++++++++++++++++ .../testdata/map-sideinput-pipeline.yaml | 39 +++++++ .../testdata/reduce-sideinput-pipeline.yaml | 57 +++++++++ 5 files changed, 209 insertions(+), 2 deletions(-) create mode 100644 test/sideinput-e2e/sideinput_test.go create mode 100644 test/sideinput-e2e/testdata/map-sideinput-pipeline.yaml create mode 100644 test/sideinput-e2e/testdata/reduce-sideinput-pipeline.yaml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index c39d8d170f..578352ed55 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -137,10 +137,10 @@ jobs: timeout-minutes: 20 strategy: fail-fast: false - max-parallel: 10 + max-parallel: 11 matrix: driver: [jetstream] - case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, udsource-e2e, api-e2e] + case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, udsource-e2e, api-e2e, sideinput-e2e] steps: - name: Checkout code uses: actions/checkout@v3 diff --git a/Makefile b/Makefile index d0dd17df3f..5f27d4fad2 100644 --- a/Makefile +++ b/Makefile @@ -116,6 +116,7 @@ test-api-e2e: test-udsource-e2e: test-transformer-e2e: test-diamond-e2e: +test-sideinput-e2e: test-%: $(MAKE) cleanup-e2e $(MAKE) image e2eapi-image diff --git a/test/sideinput-e2e/sideinput_test.go b/test/sideinput-e2e/sideinput_test.go new file mode 100644 index 0000000000..98d96d43ad --- /dev/null +++ b/test/sideinput-e2e/sideinput_test.go @@ -0,0 +1,110 @@ +//go:build test + +/* +Copyright 2022 The Numaproj Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sideinput_e2e + +import ( + "context" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/suite" + + . "github.com/numaproj/numaflow/test/fixtures" +) + +type SideInputSuite struct { + E2ESuite +} + +func (s *SideInputSuite) TestSimpleMapSideInputPipeline() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + w := s.Given().Pipeline("@testdata/map-sideinput-pipeline.yaml"). + When(). + CreatePipelineAndWait() + defer w.DeletePipelineAndWait() + pipelineName := "map-sideinput-pipeline" + + // wait for all the pods to come up + w.Expect().VertexPodsRunning() + + done := make(chan struct{}) + go func() { + // publish messages to source vertex, with event time starting from 60000 + startTime := 60000 + for i := 0; true; i++ { + select { + case <-ctx.Done(): + return + case <-done: + return + default: + eventTime := strconv.Itoa(startTime + i*1000) + w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("map")).WithHeader("X-Numaflow-Event-Time", eventTime)) + } + } + }() + + // map-even-data and map-odd-data message is generated based on map and side input data. + w.Expect().SinkContains("sink", "map-even-data") + w.Expect().SinkContains("sink", "map-odd-data") + + done <- struct{}{} +} + +func (s *SideInputSuite) TestSimpleReduceSideInputPipeline() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + w := s.Given().Pipeline("@testdata/reduce-sideinput-pipeline.yaml"). + When(). + CreatePipelineAndWait() + defer w.DeletePipelineAndWait() + pipelineName := "reduce-sideinput-pipeline" + + // wait for all the pods to come up + w.Expect().VertexPodsRunning() + + done := make(chan struct{}) + go func() { + // publish messages to source vertex, with event time starting from 60000 + startTime := 60000 + for i := 0; true; i++ { + select { + case <-ctx.Done(): + return + case <-done: + return + default: + eventTime := strconv.Itoa(startTime + i*1000) + w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("1")).WithHeader("x-numaflow-event-time", eventTime)). + SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("2")).WithHeader("x-numaflow-event-time", eventTime)). + SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("3")).WithHeader("x-numaflow-event-time", eventTime)) + } + } + }() + + // here reduce-side-input text is generated based on reduce and side input data. + w.Expect().SinkContains("sink", "reduce-side-input") + + done <- struct{}{} +} + +func TestSideInputSuite(t *testing.T) { + suite.Run(t, new(SideInputSuite)) +} diff --git a/test/sideinput-e2e/testdata/map-sideinput-pipeline.yaml b/test/sideinput-e2e/testdata/map-sideinput-pipeline.yaml new file mode 100644 index 0000000000..ba1a14d379 --- /dev/null +++ b/test/sideinput-e2e/testdata/map-sideinput-pipeline.yaml @@ -0,0 +1,39 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: map-sideinput-pipeline +spec: + sideInputs: + - name: myticker + container: + # A map side input example , see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/map-side-input + image: "quay.io/numaio/numaproj-contrib/e2e-map-sideinput-example:v0.0.2" + imagePullPolicy: Always + trigger: + schedule: "@every 5s" + vertices: + - name: in + source: + http: {} + - name: si-e2e + udf: + container: + # A map side input udf , see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/map-side-input/udf + image: "quay.io/numaio/numaproj-contrib/e2e-map-sideinput-udf:v0.0.2" + imagePullPolicy: Always + sideInputs: + - myticker + - name: sink + scale: + min: 1 + sink: + udsink: + container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink + image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:v0.5.0 + imagePullPolicy: Always + edges: + - from: in + to: si-e2e + - from: si-e2e + to: sink \ No newline at end of file diff --git a/test/sideinput-e2e/testdata/reduce-sideinput-pipeline.yaml b/test/sideinput-e2e/testdata/reduce-sideinput-pipeline.yaml new file mode 100644 index 0000000000..c3c8281307 --- /dev/null +++ b/test/sideinput-e2e/testdata/reduce-sideinput-pipeline.yaml @@ -0,0 +1,57 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: reduce-sideinput-pipeline +spec: + sideInputs: + - name: myticker + container: + # A reduce side input, see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/reduce-side-input + image: "quay.io/numaio/numaproj-contrib/e2e-reduce-sideinput-example:v0.0.2" + imagePullPolicy: Always + trigger: + schedule: "@every 5s" + vertices: + - name: in + source: + http: {} + - name: atoi + scale: + min: 1 + udf: + container: + # Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/mapper/examples/even_odd + image: quay.io/numaio/numaflow-go/map-even-odd:v0.5.0 + imagePullPolicy: Always + - name: si-e2e + udf: + container: + # A reduce side input udf, see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/reduce-side-input/udf + image: "quay.io/numaio/numaproj-contrib/e2e-reduce-sideinput-udf:v0.0.2" + imagePullPolicy: Always + groupBy: + window: + fixed: + length: 10s + keyed: true + storage: + persistentVolumeClaim: + volumeSize: 10Gi + accessMode: ReadWriteOnce + sideInputs: + - myticker + - name: sink + scale: + min: 1 + sink: + udsink: + container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink + image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:v0.5.0 + edges: + - from: in + to: atoi + - from: atoi + to: si-e2e + - from: si-e2e + to: sink \ No newline at end of file