Skip to content

Commit

Permalink
feat: Session Window and Reduce Streaming (#1384)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Dec 19, 2023
1 parent 38b44e6 commit 45c8594
Show file tree
Hide file tree
Showing 91 changed files with 6,931 additions and 4,483 deletions.
30 changes: 27 additions & 3 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -17731,7 +17731,12 @@
"description": "FixedWindow describes a fixed window",
"properties": {
"length": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "Length is the duration of the fixed window."
},
"streaming": {
"description": "Streaming should be set to true if the reduce udf is streaming.",
"type": "boolean"
}
},
"type": "object"
Expand Down Expand Up @@ -19094,6 +19099,16 @@
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.SessionWindow": {
"description": "SessionWindow describes a session window",
"properties": {
"timeout": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "Timeout is the duration of inactivity after which a session window closes."
}
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.SideInput": {
"description": "SideInput defines information of a Side Input",
"properties": {
Expand Down Expand Up @@ -19235,10 +19250,16 @@
"description": "SlidingWindow describes a sliding window",
"properties": {
"length": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "Length is the duration of the sliding window."
},
"slide": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "Slide is the slide parameter that controls the frequency at which the sliding window is created."
},
"streaming": {
"description": "Streaming should be set to true if the reduce udf is streaming.",
"type": "boolean"
}
},
"type": "object"
Expand Down Expand Up @@ -19803,6 +19824,9 @@
"fixed": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.FixedWindow"
},
"session": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SessionWindow"
},
"sliding": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SlidingWindow"
}
Expand Down
24 changes: 24 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -17736,7 +17736,12 @@
"type": "object",
"properties": {
"length": {
"description": "Length is the duration of the fixed window.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"streaming": {
"description": "Streaming should be set to true if the reduce udf is streaming.",
"type": "boolean"
}
}
},
Expand Down Expand Up @@ -19080,6 +19085,16 @@
}
}
},
"io.numaproj.numaflow.v1alpha1.SessionWindow": {
"description": "SessionWindow describes a session window",
"type": "object",
"properties": {
"timeout": {
"description": "Timeout is the duration of inactivity after which a session window closes.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
}
}
},
"io.numaproj.numaflow.v1alpha1.SideInput": {
"description": "SideInput defines information of a Side Input",
"type": "object",
Expand Down Expand Up @@ -19222,10 +19237,16 @@
"type": "object",
"properties": {
"length": {
"description": "Length is the duration of the sliding window.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"slide": {
"description": "Slide is the slide parameter that controls the frequency at which the sliding window is created.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"streaming": {
"description": "Streaming should be set to true if the reduce udf is streaming.",
"type": "boolean"
}
}
},
Expand Down Expand Up @@ -19781,6 +19802,9 @@
"fixed": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.FixedWindow"
},
"session": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SessionWindow"
},
"sliding": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SlidingWindow"
}
Expand Down
9 changes: 9 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7938,13 +7938,22 @@ spec:
properties:
length:
type: string
streaming:
type: boolean
type: object
session:
properties:
timeout:
type: string
type: object
sliding:
properties:
length:
type: string
slide:
type: string
streaming:
type: boolean
type: object
type: object
required:
Expand Down
9 changes: 9 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3853,13 +3853,22 @@ spec:
properties:
length:
type: string
streaming:
type: boolean
type: object
session:
properties:
timeout:
type: string
type: object
sliding:
properties:
length:
type: string
slide:
type: string
streaming:
type: boolean
type: object
type: object
required:
Expand Down
18 changes: 18 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10467,13 +10467,22 @@ spec:
properties:
length:
type: string
streaming:
type: boolean
type: object
session:
properties:
timeout:
type: string
type: object
sliding:
properties:
length:
type: string
slide:
type: string
streaming:
type: boolean
type: object
type: object
required:
Expand Down Expand Up @@ -15134,13 +15143,22 @@ spec:
properties:
length:
type: string
streaming:
type: boolean
type: object
session:
properties:
timeout:
type: string
type: object
sliding:
properties:
length:
type: string
slide:
type: string
streaming:
type: boolean
type: object
type: object
required:
Expand Down
18 changes: 18 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10467,13 +10467,22 @@ spec:
properties:
length:
type: string
streaming:
type: boolean
type: object
session:
properties:
timeout:
type: string
type: object
sliding:
properties:
length:
type: string
slide:
type: string
streaming:
type: boolean
type: object
type: object
required:
Expand Down Expand Up @@ -15134,13 +15143,22 @@ spec:
properties:
length:
type: string
streaming:
type: boolean
type: object
session:
properties:
timeout:
type: string
type: object
sliding:
properties:
length:
type: string
slide:
type: string
streaming:
type: boolean
type: object
type: object
required:
Expand Down
81 changes: 81 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,20 @@ Description
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<p>
Length is the duration of the fixed window.
</p>
</td>
</tr>
<tr>
<td>
<code>streaming</code></br> <em> bool </em>
</td>
<td>
<em>(Optional)</em>
<p>
Streaming should be set to true if the reduce udf is streaming.
</p>
</td>
</tr>
</tbody>
Expand Down Expand Up @@ -4146,6 +4160,45 @@ CooldownSeconds if not set.
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.SessionWindow">
SessionWindow
</h3>
<p>
(<em>Appears on:</em>
<a href="#numaflow.numaproj.io/v1alpha1.Window">Window</a>)
</p>
<p>
<p>
SessionWindow describes a session window
</p>
</p>
<table>
<thead>
<tr>
<th>
Field
</th>
<th>
Description
</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>timeout</code></br> <em>
<a href="https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<p>
Timeout is the duration of inactivity after which a session window
closes.
</p>
</td>
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.SideInput">
SideInput
</h3>
Expand Down Expand Up @@ -4398,6 +4451,9 @@ Description
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<p>
Length is the duration of the sliding window.
</p>
</td>
</tr>
<tr>
Expand All @@ -4407,6 +4463,21 @@ Kubernetes meta/v1.Duration </a> </em>
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<p>
Slide is the slide parameter that controls the frequency at which the
sliding window is created.
</p>
</td>
</tr>
<tr>
<td>
<code>streaming</code></br> <em> bool </em>
</td>
<td>
<em>(Optional)</em>
<p>
Streaming should be set to true if the reduce udf is streaming.
</p>
</td>
</tr>
</tbody>
Expand Down Expand Up @@ -5523,6 +5594,16 @@ Description
<em>(Optional)</em>
</td>
</tr>
<tr>
<td>
<code>session</code></br> <em>
<a href="#numaflow.numaproj.io/v1alpha1.SessionWindow"> SessionWindow
</a> </em>
</td>
<td>
<em>(Optional)</em>
</td>
</tr>
</tbody>
</table>
<hr/>
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.4
github.com/nats-io/nats.go v1.31.0
github.com/numaproj/numaflow-go v0.5.3-0.20231208052731-3d4d17004cc9
github.com/numaproj/numaflow-go v0.5.3-0.20231213060340-dbd9016bbcb6
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
github.com/redis/go-redis/v9 v9.0.3
Expand All @@ -40,7 +40,7 @@ require (
github.com/spf13/cobra v1.6.0
github.com/spf13/viper v1.9.0
github.com/stretchr/testify v1.8.4
go.uber.org/atomic v1.9.0
go.uber.org/atomic v1.11.0
go.uber.org/goleak v1.2.1
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.24.0
Expand Down
Loading

0 comments on commit 45c8594

Please sign in to comment.