Skip to content

Commit

Permalink
chore: enable reduce java sdk e2e test (#1454)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Jan 12, 2024
1 parent 85e76c7 commit 645bcc5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 38 deletions.
74 changes: 37 additions & 37 deletions test/sdks-e2e/sdks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
package sdks_e2e

import (
"context"
"fmt"
"strconv"
"sync"
Expand Down Expand Up @@ -86,43 +87,42 @@ func (s *SDKsSuite) TestMapStreamUDFunctionAndSink() {
VertexPodLogContains("java-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4))
}

// TODO(session) fix after updating java sdk
//func (s *SDKsSuite) TestReduceSDK() {
// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
// defer cancel()
// w := s.Given().Pipeline("@testdata/simple-keyed-reduce-pipeline.yaml").
// When().
// CreatePipelineAndWait()
// defer w.DeletePipelineAndWait()
// pipelineName := "even-odd-sum"
//
// // wait for all the pods to come up
// w.Expect().VertexPodsRunning()
//
// done := make(chan struct{})
// go func() {
// // publish messages to source vertex, with event time starting from 60000
// startTime := 60000
// for i := 0; true; i++ {
// select {
// case <-ctx.Done():
// return
// case <-done:
// return
// default:
// eventTime := strconv.Itoa(startTime + i*1000)
// w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("1")).WithHeader("X-Numaflow-Event-Time", eventTime)).
// SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("2")).WithHeader("X-Numaflow-Event-Time", eventTime)).
// SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("3")).WithHeader("X-Numaflow-Event-Time", eventTime))
// }
// }
// }()
//
// w.Expect().
// VertexPodLogContains("java-udsink", "120", PodLogCheckOptionWithContainer("udsink")).
// VertexPodLogContains("java-udsink", "240", PodLogCheckOptionWithContainer("udsink"))
// done <- struct{}{}
//}
func (s *SDKsSuite) TestReduceSDK() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
w := s.Given().Pipeline("@testdata/simple-keyed-reduce-pipeline.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "even-odd-sum"

// wait for all the pods to come up
w.Expect().VertexPodsRunning()

done := make(chan struct{})
go func() {
// publish messages to source vertex, with event time starting from 60000
startTime := 60000
for i := 0; true; i++ {
select {
case <-ctx.Done():
return
case <-done:
return
default:
eventTime := strconv.Itoa(startTime + i*1000)
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("1")).WithHeader("X-Numaflow-Event-Time", eventTime)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("2")).WithHeader("X-Numaflow-Event-Time", eventTime)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("3")).WithHeader("X-Numaflow-Event-Time", eventTime))
}
}
}()

w.Expect().
VertexPodLogContains("java-udsink", "120", PodLogCheckOptionWithContainer("udsink")).
VertexPodLogContains("java-udsink", "240", PodLogCheckOptionWithContainer("udsink"))
done <- struct{}{}
}

func (s *SDKsSuite) TestSourceTransformer() {
var wg sync.WaitGroup
Expand Down
2 changes: 1 addition & 1 deletion test/sdks-e2e/testdata/simple-keyed-reduce-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ spec:
udf:
container:
# compute the sum, see https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reduce/sum
image: quay.io/numaio/numaflow-java/reduce-sum:v0.6.0
image: quay.io/numaio/numaflow-java/reduce-sum:v0.6.1
groupBy:
window:
fixed:
Expand Down

0 comments on commit 645bcc5

Please sign in to comment.