Skip to content

Commit

Permalink
Feat/side input tests (#1257)
Browse files Browse the repository at this point in the history
Signed-off-by: shubham dixit <[email protected]>
Co-authored-by: Keran Yang <[email protected]>
  • Loading branch information
shubhamdixit863 and KeranYang authored Oct 26, 2023
1 parent 5554bd6 commit c1725b1
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ test-udsource-e2e:
test-transformer-e2e:
test-diamond-e2e:
test-sideinput-e2e:
test-%:
test-%:
$(MAKE) cleanup-e2e
$(MAKE) image e2eapi-image
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=controller-manager,app.kubernetes.io/part-of=numaflow
Expand Down
54 changes: 54 additions & 0 deletions test/sideinput-e2e/sideinput-e2e_sink_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//go:build test

/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sideinput_e2e

import (
. "github.com/numaproj/numaflow/test/fixtures"
"github.com/stretchr/testify/suite"
"testing"
"time"
)

type SideInputUDSSuite struct {
E2ESuite
}

func (s *SideInputUDSSuite) setUpTests(pipeLineFile string) *When {
w := s.Given().Pipeline(pipeLineFile).When().CreatePipelineAndWait()
w.Expect().VertexPodsRunning()
return w
}

func (s *SideInputUDSSuite) TestSinkWithSideInput() {
w := s.setUpTests("@testdata/sideinput_sink.yaml")
defer w.DeletePipelineAndWait()
w.Expect().SinkContains("redis-uds", "e2e-even", WithTimeout(2*time.Minute))

}

func (s *SideInputUDSSuite) TestSourceWithSideInput() {
w := s.setUpTests("@testdata/sideinput_source.yaml")
defer w.DeletePipelineAndWait()
w.Expect().SinkContains("redis-uds", "e2e-even", WithTimeout(2*time.Minute))

}

func TestSideInputUDSSuite(t *testing.T) {
suite.Run(t, new(SideInputUDSSuite))
}
33 changes: 33 additions & 0 deletions test/sideinput-e2e/testdata/sideinput_sink.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: sideinput-sink-test
spec:
sideInputs:
- name: myticker
container:
image: "quay.io/numaio/numaflow-sideinput/sideinput-sink-e2e-test:latest"
imagePullPolicy: Always
trigger:
schedule: "@every 5s"
# timezone: America/Los_Angeles
vertices:
- name: in
source:
generator:
# How many messages to generate in the duration.
rpu: 10
duration: 1s
# Optional, size of each generated message, defaults to 10.
msgSize: 1024
- name: redis-uds
sink:
udsink:
container:
image: "quay.io/numaio/numaflow-sink/redis-e2e-test-sink-with-sideinput:latest"
sideInputs:
- myticker

edges:
- from: in
to: redis-uds
35 changes: 35 additions & 0 deletions test/sideinput-e2e/testdata/sideinput_source.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: sideinput-source-test
spec:
sideInputs:
- name: myticker
container:
image: "quay.io/numaio/numaflow-sideinput/sideinput-sink-e2e-test:latest"
imagePullPolicy: Always
trigger:
schedule: "@every 5s"
# timezone: America/Los_Angeles
vertices:
- name: in
scale:
min: 1
source:
udsource:
container:
image: "quay.io/numaio/numaflow-source/simple_source_with_sideinput:latest"
sideInputs:
- myticker
- name: redis-uds
sink:
udsink:
container:
image: "quay.io/numaio/numaflow-sink/redis-e2e-test-sink:v0.5.0"
sideInputs:
- myticker
edges:
- from: in
to: redis-uds


0 comments on commit c1725b1

Please sign in to comment.