Skip to content

Commit

Permalink
feat: Add e2e test for map sideinput, Fixes #1192 (#1211)
Browse files Browse the repository at this point in the history
Signed-off-by: Chandan Kumar <[email protected]>
  • Loading branch information
chandankumar4 authored Oct 24, 2023
1 parent 7b32af3 commit 639a936
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 2 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ jobs:
timeout-minutes: 20
strategy:
fail-fast: false
max-parallel: 10
max-parallel: 11
matrix:
driver: [jetstream]
case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, udsource-e2e, api-e2e]
case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, udsource-e2e, api-e2e, sideinput-e2e]
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ test-api-e2e:
test-udsource-e2e:
test-transformer-e2e:
test-diamond-e2e:
test-sideinput-e2e:
test-%:
$(MAKE) cleanup-e2e
$(MAKE) image e2eapi-image
Expand Down
110 changes: 110 additions & 0 deletions test/sideinput-e2e/sideinput_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
//go:build test

/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sideinput_e2e

import (
"context"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/suite"

. "github.com/numaproj/numaflow/test/fixtures"
)

type SideInputSuite struct {
E2ESuite
}

func (s *SideInputSuite) TestSimpleMapSideInputPipeline() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
w := s.Given().Pipeline("@testdata/map-sideinput-pipeline.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "map-sideinput-pipeline"

// wait for all the pods to come up
w.Expect().VertexPodsRunning()

done := make(chan struct{})
go func() {
// publish messages to source vertex, with event time starting from 60000
startTime := 60000
for i := 0; true; i++ {
select {
case <-ctx.Done():
return
case <-done:
return
default:
eventTime := strconv.Itoa(startTime + i*1000)
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("map")).WithHeader("X-Numaflow-Event-Time", eventTime))
}
}
}()

// map-even-data and map-odd-data message is generated based on map and side input data.
w.Expect().SinkContains("sink", "map-even-data")
w.Expect().SinkContains("sink", "map-odd-data")

done <- struct{}{}
}

func (s *SideInputSuite) TestSimpleReduceSideInputPipeline() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
w := s.Given().Pipeline("@testdata/reduce-sideinput-pipeline.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "reduce-sideinput-pipeline"

// wait for all the pods to come up
w.Expect().VertexPodsRunning()

done := make(chan struct{})
go func() {
// publish messages to source vertex, with event time starting from 60000
startTime := 60000
for i := 0; true; i++ {
select {
case <-ctx.Done():
return
case <-done:
return
default:
eventTime := strconv.Itoa(startTime + i*1000)
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("1")).WithHeader("x-numaflow-event-time", eventTime)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("2")).WithHeader("x-numaflow-event-time", eventTime)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("3")).WithHeader("x-numaflow-event-time", eventTime))
}
}
}()

// here reduce-side-input text is generated based on reduce and side input data.
w.Expect().SinkContains("sink", "reduce-side-input")

done <- struct{}{}
}

func TestSideInputSuite(t *testing.T) {
suite.Run(t, new(SideInputSuite))
}
39 changes: 39 additions & 0 deletions test/sideinput-e2e/testdata/map-sideinput-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: map-sideinput-pipeline
spec:
sideInputs:
- name: myticker
container:
# A map side input example , see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/map-side-input
image: "quay.io/numaio/numaproj-contrib/e2e-map-sideinput-example:v0.0.2"
imagePullPolicy: Always
trigger:
schedule: "@every 5s"
vertices:
- name: in
source:
http: {}
- name: si-e2e
udf:
container:
# A map side input udf , see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/map-side-input/udf
image: "quay.io/numaio/numaproj-contrib/e2e-map-sideinput-udf:v0.0.2"
imagePullPolicy: Always
sideInputs:
- myticker
- name: sink
scale:
min: 1
sink:
udsink:
container:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink
image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:v0.5.0
imagePullPolicy: Always
edges:
- from: in
to: si-e2e
- from: si-e2e
to: sink
57 changes: 57 additions & 0 deletions test/sideinput-e2e/testdata/reduce-sideinput-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: reduce-sideinput-pipeline
spec:
sideInputs:
- name: myticker
container:
# A reduce side input, see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/reduce-side-input
image: "quay.io/numaio/numaproj-contrib/e2e-reduce-sideinput-example:v0.0.2"
imagePullPolicy: Always
trigger:
schedule: "@every 5s"
vertices:
- name: in
source:
http: {}
- name: atoi
scale:
min: 1
udf:
container:
# Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/mapper/examples/even_odd
image: quay.io/numaio/numaflow-go/map-even-odd:v0.5.0
imagePullPolicy: Always
- name: si-e2e
udf:
container:
# A reduce side input udf, see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/reduce-side-input/udf
image: "quay.io/numaio/numaproj-contrib/e2e-reduce-sideinput-udf:v0.0.2"
imagePullPolicy: Always
groupBy:
window:
fixed:
length: 10s
keyed: true
storage:
persistentVolumeClaim:
volumeSize: 10Gi
accessMode: ReadWriteOnce
sideInputs:
- myticker
- name: sink
scale:
min: 1
sink:
udsink:
container:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink
image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:v0.5.0
edges:
- from: in
to: atoi
- from: atoi
to: si-e2e
- from: si-e2e
to: sink

0 comments on commit 639a936

Please sign in to comment.