diff --git a/pkg/udf/reduce_udf.go b/pkg/udf/reduce_udf.go index ea2bdc1031..d702a0f5e1 100644 --- a/pkg/udf/reduce_udf.go +++ b/pkg/udf/reduce_udf.go @@ -49,6 +49,7 @@ import ( "github.com/numaproj/numaflow/pkg/window" "github.com/numaproj/numaflow/pkg/window/strategy/fixed" "github.com/numaproj/numaflow/pkg/window/strategy/session" + "github.com/numaproj/numaflow/pkg/window/strategy/sliding" ) type ReduceUDFProcessor struct { @@ -142,7 +143,7 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error { if windowType.Fixed != nil { windower = fixed.NewWindower(windowType.Fixed.Length.Duration) } else if windowType.Sliding != nil { - windower = fixed.NewWindower(windowType.Sliding.Length.Duration) + windower = sliding.NewWindower(windowType.Sliding.Length.Duration, windowType.Sliding.Slide.Duration) } else if windowType.Session != nil { windower = session.NewWindower(windowType.Session.Timeout.Duration) } else { diff --git a/test/idle-source-e2e/idle_source_test.go b/test/idle-source-e2e/idle_source_test.go index 0bcaf9d818..d53d792327 100644 --- a/test/idle-source-e2e/idle_source_test.go +++ b/test/idle-source-e2e/idle_source_test.go @@ -45,7 +45,7 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipeline() { done := make(chan struct{}) go func() { // publish messages to source vertex, with event time starting from 60000 - startTime := 100 + startTime := 1000 for i := 0; true; i++ { select { case <-ctx.Done(): diff --git a/test/idle-source-e2e/testdata/idle-source-reduce-pipeline.yaml b/test/idle-source-e2e/testdata/idle-source-reduce-pipeline.yaml index b7410a1807..6a3147197d 100644 --- a/test/idle-source-e2e/testdata/idle-source-reduce-pipeline.yaml +++ b/test/idle-source-e2e/testdata/idle-source-reduce-pipeline.yaml @@ -8,8 +8,6 @@ spec: threshold: 5s # The pipeline will be considered idle if the source has not emitted any data for given threshold value. incrementBy: 3s # If source is found to be idle then increment the watermark by given incrementBy value. stepInterval: 2s # If source is idling then publish the watermark only when step interval has passed. - limits: - readBatchSize: 50 vertices: - name: in scale: diff --git a/test/reduce-e2e/testdata/complex-sliding-window-pipeline.yaml b/test/reduce-e2e/testdata/complex-sliding-window-pipeline.yaml index 202ee6eda1..16d8b7217a 100644 --- a/test/reduce-e2e/testdata/complex-sliding-window-pipeline.yaml +++ b/test/reduce-e2e/testdata/complex-sliding-window-pipeline.yaml @@ -57,7 +57,6 @@ spec: storage: emptyDir: {} - name: sink - partitions: 2 scale: min: 1 sink: diff --git a/test/sideinputs-e2e/testdata/reduce-sideinput-pipeline.yaml b/test/sideinputs-e2e/testdata/reduce-sideinput-pipeline.yaml index d45d335fe3..b5ac6e9fc0 100644 --- a/test/sideinputs-e2e/testdata/reduce-sideinput-pipeline.yaml +++ b/test/sideinputs-e2e/testdata/reduce-sideinput-pipeline.yaml @@ -27,7 +27,7 @@ spec: udf: container: # A reduce side input udf, see https://github.com/numaproj-contrib/e2e-tests-go/tree/main/reduce-side-input/udf - image: "quay.io/numaio/numaproj-contrib/e2e-reduce-sideinput-example:dev" + image: "quay.io/numaio/numaproj-contrib/e2e-reduce-sideinput-udf:dev" imagePullPolicy: Always groupBy: window: