Skip to content

Commit

Permalink
fix e2e
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Dec 8, 2023
1 parent da3d38c commit 26b8eee
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 6 deletions.
3 changes: 2 additions & 1 deletion pkg/udf/reduce_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/numaproj/numaflow/pkg/window"
"github.com/numaproj/numaflow/pkg/window/strategy/fixed"
"github.com/numaproj/numaflow/pkg/window/strategy/session"
"github.com/numaproj/numaflow/pkg/window/strategy/sliding"
)

type ReduceUDFProcessor struct {
Expand Down Expand Up @@ -142,7 +143,7 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error {
if windowType.Fixed != nil {
windower = fixed.NewWindower(windowType.Fixed.Length.Duration)
} else if windowType.Sliding != nil {
windower = fixed.NewWindower(windowType.Sliding.Length.Duration)
windower = sliding.NewWindower(windowType.Sliding.Length.Duration, windowType.Sliding.Slide.Duration)
} else if windowType.Session != nil {
windower = session.NewWindower(windowType.Session.Timeout.Duration)
} else {
Expand Down
2 changes: 1 addition & 1 deletion test/idle-source-e2e/idle_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipeline() {
done := make(chan struct{})
go func() {
// publish messages to source vertex, with event time starting from 60000
startTime := 100
startTime := 1000
for i := 0; true; i++ {
select {
case <-ctx.Done():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ spec:
threshold: 5s # The pipeline will be considered idle if the source has not emitted any data for given threshold value.
incrementBy: 3s # If source is found to be idle then increment the watermark by given incrementBy value.
stepInterval: 2s # If source is idling then publish the watermark only when step interval has passed.
limits:
readBatchSize: 50
vertices:
- name: in
scale:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ spec:
storage:
emptyDir: {}
- name: sink
partitions: 2
scale:
min: 1
sink:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ spec:
udf:
container:
# A reduce side input udf, see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/reduce-side-input/udf
image: "quay.io/numaio/numaproj-contrib/e2e-reduce-sideinput-example:dev"
image: "quay.io/numaio/numaproj-contrib/e2e-reduce-sideinput-udf:dev"
imagePullPolicy: Always
groupBy:
window:
Expand Down

0 comments on commit 26b8eee

Please sign in to comment.