diff --git a/Makefile b/Makefile index 5f27d4fad2..e345e6337c 100644 --- a/Makefile +++ b/Makefile @@ -117,7 +117,7 @@ test-udsource-e2e: test-transformer-e2e: test-diamond-e2e: test-sideinput-e2e: -test-%: +test-%: $(MAKE) cleanup-e2e $(MAKE) image e2eapi-image kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=controller-manager,app.kubernetes.io/part-of=numaflow diff --git a/test/sideinput-e2e/sideinput-e2e_sink_source_test.go b/test/sideinput-e2e/sideinput-e2e_sink_source_test.go new file mode 100644 index 0000000000..37bf3959fb --- /dev/null +++ b/test/sideinput-e2e/sideinput-e2e_sink_source_test.go @@ -0,0 +1,54 @@ +//go:build test + +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sideinput_e2e + +import ( + . "github.com/numaproj/numaflow/test/fixtures" + "github.com/stretchr/testify/suite" + "testing" + "time" +) + +type SideInputUDSSuite struct { + E2ESuite +} + +func (s *SideInputUDSSuite) setUpTests(pipeLineFile string) *When { + w := s.Given().Pipeline(pipeLineFile).When().CreatePipelineAndWait() + w.Expect().VertexPodsRunning() + return w +} + +func (s *SideInputUDSSuite) TestSinkWithSideInput() { + w := s.setUpTests("@testdata/sideinput_sink.yaml") + defer w.DeletePipelineAndWait() + w.Expect().SinkContains("redis-uds", "e2e-even", WithTimeout(2*time.Minute)) + +} + +func (s *SideInputUDSSuite) TestSourceWithSideInput() { + w := s.setUpTests("@testdata/sideinput_source.yaml") + defer w.DeletePipelineAndWait() + w.Expect().SinkContains("redis-uds", "e2e-even", WithTimeout(2*time.Minute)) + +} + +func TestSideInputUDSSuite(t *testing.T) { + suite.Run(t, new(SideInputUDSSuite)) +} diff --git a/test/sideinput-e2e/testdata/sideinput_sink.yaml b/test/sideinput-e2e/testdata/sideinput_sink.yaml new file mode 100644 index 0000000000..0919f4d779 --- /dev/null +++ b/test/sideinput-e2e/testdata/sideinput_sink.yaml @@ -0,0 +1,33 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: sideinput-sink-test +spec: + sideInputs: + - name: myticker + container: + image: "quay.io/numaio/numaflow-sideinput/sideinput-sink-e2e-test:latest" + imagePullPolicy: Always + trigger: + schedule: "@every 5s" + # timezone: America/Los_Angeles + vertices: + - name: in + source: + generator: + # How many messages to generate in the duration. + rpu: 10 + duration: 1s + # Optional, size of each generated message, defaults to 10. + msgSize: 1024 + - name: redis-uds + sink: + udsink: + container: + image: "quay.io/numaio/numaflow-sink/redis-e2e-test-sink-with-sideinput:latest" + sideInputs: + - myticker + + edges: + - from: in + to: redis-uds diff --git a/test/sideinput-e2e/testdata/sideinput_source.yaml b/test/sideinput-e2e/testdata/sideinput_source.yaml new file mode 100644 index 0000000000..982fca2a96 --- /dev/null +++ b/test/sideinput-e2e/testdata/sideinput_source.yaml @@ -0,0 +1,35 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: sideinput-source-test +spec: + sideInputs: + - name: myticker + container: + image: "quay.io/numaio/numaflow-sideinput/sideinput-sink-e2e-test:latest" + imagePullPolicy: Always + trigger: + schedule: "@every 5s" + # timezone: America/Los_Angeles + vertices: + - name: in + scale: + min: 1 + source: + udsource: + container: + image: "quay.io/numaio/numaflow-source/simple_source_with_sideinput:latest" + sideInputs: + - myticker + - name: redis-uds + sink: + udsink: + container: + image: "quay.io/numaio/numaflow-sink/redis-e2e-test-sink:v0.5.0" + sideInputs: + - myticker + edges: + - from: in + to: redis-uds + +