Skip to content

Commit

Permalink
remove global window, cleanup
Browse files (browse the repository at this point in the history)
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Dec 8, 2023
1 parent 1375953 commit c0f0055
Show file tree
Hide file tree
Showing 27 changed files with 494 additions and 1,272 deletions.
2 changes: 1 addition & 1 deletion api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -18127,7 +18127,7 @@
"properties": {
"allowedLateness": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "AllowedLateness allows late data to be included for the ReduceStream operation as long as the late data is not later than (Watermark - AllowedLateness)."
"description": "AllowedLateness allows late data to be included for the Reduce operation as long as the late data is not later than (Watermark - AllowedLateness)."
},
"keyed": {
"type": "boolean"
Expand Down
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -18134,7 +18134,7 @@
],
"properties": {
"allowedLateness": {
"description": "AllowedLateness allows late data to be included for the ReduceStream operation as long as the late data is not later than (Watermark - AllowedLateness).",
"description": "AllowedLateness allows late data to be included for the Reduce operation as long as the late data is not later than (Watermark - AllowedLateness).",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"keyed": {
Expand Down
4 changes: 2 additions & 2 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1894,8 +1894,8 @@ Kubernetes meta/v1.Duration </a> </em>
<td>
<em>(Optional)</em>
<p>
AllowedLateness allows late data to be included for the ReduceStream
operation as long as the late data is not later than (Watermark -
AllowedLateness allows late data to be included for the Reduce operation
as long as the late data is not later than (Watermark -
AllowedLateness).
</p>
</td>
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.4
github.com/nats-io/nats.go v1.31.0
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932
github.com/numaproj/numaflow-go v0.5.3-0.20231208071018-2c9517d09d79
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
github.com/redis/go-redis/v9 v9.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -679,8 +679,8 @@ github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADym
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932 h1:gAURJvmJv7nP8+Y7X+GGHGZ5sg7KatM4dhkWpFCsk+I=
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/numaproj/numaflow-go v0.5.3-0.20231208071018-2c9517d09d79 h1:4zLtkeVxgACERlsDa+MKGRunZvgbpFh6uz6sURMnSlw=
github.com/numaproj/numaflow-go v0.5.3-0.20231208071018-2c9517d09d79/go.mod h1:WoMt31+h3up202zTRI8c/qe42B8UbvwLe2mJH0MAlhI=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand Down
887 changes: 446 additions & 441 deletions pkg/apis/numaflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type GroupBy struct {
Window Window `json:"window" protobuf:"bytes,1,opt,name=window"`
// +optional
Keyed bool `json:"keyed" protobuf:"bytes,2,opt,name=keyed"`
// AllowedLateness allows late data to be included for the ReduceStream operation as long as the late data is not later
// AllowedLateness allows late data to be included for the Reduce operation as long as the late data is not later
// than (Watermark - AllowedLateness).
// +optional
AllowedLateness *metav1.Duration `json:"allowedLateness,omitempty" protobuf:"bytes,3,opt,name=allowedLateness"`
Expand Down
6 changes: 3 additions & 3 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ var (
}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex, LabelPartitionName})
)

// ReduceStream forwarder specific metrics
// Reduce forwarder specific metrics
var (
// ReduceDroppedMessagesCount is used to indicate the number of messages dropped
ReduceDroppedMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Expand Down Expand Up @@ -238,15 +238,15 @@ var (
ReduceProcessTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: "reduce_pnf",
Name: "process_time",
Help: "ReduceStream process time (1 to 1200000 milliseconds)",
Help: "Reduce process time (1 to 1200000 milliseconds)",
Buckets: prometheus.ExponentialBucketsRange(1, 1200000, 5),
}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex})

// ReduceForwardTime is used to indicate the time it took to forward the writeMessages
ReduceForwardTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: "reduce_pnf",
Name: "forward_time",
Help: "ReduceStream forward time (1 to 100000 microseconds)",
Help: "Reduce forward time (1 to 100000 microseconds)",
Buckets: prometheus.ExponentialBucketsRange(1, 100000, 5),
}, []string{LabelPipeline, LabelVertex, LabelVertexReplicaIndex})

Expand Down
12 changes: 6 additions & 6 deletions pkg/reconciler/pipeline/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
return fmt.Errorf("invalid pipeline, cannot be disjointed")
}

// Prevent pipelines with Cycles in the case that there is a ReduceStream Vertex at the point of the cycle or to the right of it.
// Whenever there's a cycle, there will inherently be "late data", and we don't want late data for a ReduceStream Vertex, which may
// Prevent pipelines with Cycles in the case that there is a Reduce Vertex at the point of the cycle or to the right of it.
// Whenever there's a cycle, there will inherently be "late data", and we don't want late data for a Reduce Vertex, which may
// have already "closed the book" on the data's time window.
if err := validateCycles(&pl.Spec); err != nil {
return err
Expand Down Expand Up @@ -366,8 +366,8 @@ func isReservedContainerName(name string) bool {
}

// validateCycles verifies that there are no invalid cycles in the pipeline.
// An invalid cycle has a ReduceStream Vertex at or to the right of the cycle. Whenever there's a cycle,
// there will inherently be "late data", and we don't want late data for a ReduceStream Vertex, which may
// An invalid cycle has a Reduce Vertex at or to the right of the cycle. Whenever there's a cycle,
// there will inherently be "late data", and we don't want late data for a Reduce Vertex, which may
// have already "closed the book" on the data's time window.
func validateCycles(pipelineSpec *dfv1.PipelineSpec) error {
verticesByName := pipelineSpec.GetVerticesByName()
Expand All @@ -381,7 +381,7 @@ func validateCycles(pipelineSpec *dfv1.PipelineSpec) error {
if err != nil {
return err
}
// need to make sure none of the cycles have a ReduceStream Vertex at or to the right of the cycle
// need to make sure none of the cycles have a Reduce Vertex at or to the right of the cycle
for cycleVertexName := range cycles {
cycleVertex, found := verticesByName[cycleVertexName]
if !found {
Expand All @@ -391,7 +391,7 @@ func validateCycles(pipelineSpec *dfv1.PipelineSpec) error {
return v.IsReduceUDF()
})
if invalidReduce {
return fmt.Errorf("there's a ReduceStream Vertex at or to the right of a Cycle occurring at Vertex %q", cycleVertexName)
return fmt.Errorf("there's a Reduce Vertex at or to the right of a Cycle occurring at Vertex %q", cycleVertexName)
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/reconciler/pipeline/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ func Test_validateCycles(t *testing.T) {
pipelineSpec: &dfv1.PipelineSpec{
Vertices: []dfv1.AbstractVertex{
{Name: "A", Source: &dfv1.Source{}},
{Name: "B", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{}}}, //ReduceStream vertex
{Name: "B", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{}}}, //Reduce vertex
{Name: "C", UDF: &dfv1.UDF{}},
{Name: "D", UDF: &dfv1.UDF{}},
{Name: "E", UDF: &dfv1.UDF{}},
Expand All @@ -910,7 +910,7 @@ func Test_validateCycles(t *testing.T) {
pipelineSpec: &dfv1.PipelineSpec{
Vertices: []dfv1.AbstractVertex{
{Name: "A", Source: &dfv1.Source{}},
{Name: "B", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{}}}, //ReduceStream vertex
{Name: "B", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{}}}, //Reduce vertex
{Name: "C", UDF: &dfv1.UDF{}},
{Name: "D", UDF: &dfv1.UDF{}},
{Name: "E", UDF: &dfv1.UDF{}},
Expand All @@ -935,7 +935,7 @@ func Test_validateCycles(t *testing.T) {
Vertices: []dfv1.AbstractVertex{
{Name: "A", Source: &dfv1.Source{}},
{Name: "B", UDF: &dfv1.UDF{}},
{Name: "C", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{}}}, //ReduceStream vertex
{Name: "C", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{}}}, //Reduce vertex
{Name: "D", UDF: &dfv1.UDF{}},
{Name: "E", UDF: &dfv1.UDF{}},
{Name: "F", Source: &dfv1.Source{}},
Expand All @@ -958,10 +958,10 @@ func Test_validateCycles(t *testing.T) {
pipelineSpec: &dfv1.PipelineSpec{
Vertices: []dfv1.AbstractVertex{
{Name: "A", Source: &dfv1.Source{}},
{Name: "B", UDF: &dfv1.UDF{}}, //ReduceStream vertex
{Name: "B", UDF: &dfv1.UDF{}}, // plain UDF (no GroupBy), not a Reduce vertex
{Name: "C", UDF: &dfv1.UDF{}},
{Name: "D", UDF: &dfv1.UDF{}},
{Name: "E", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{}}}, //ReduceStream vertex
{Name: "E", UDF: &dfv1.UDF{GroupBy: &dfv1.GroupBy{}}}, //Reduce vertex
{Name: "F", Source: &dfv1.Source{}},
{Name: "G", UDF: &dfv1.UDF{}},
{Name: "H", UDF: &dfv1.UDF{}},
Expand Down
2 changes: 1 addition & 1 deletion pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (df *DataForward) process(ctx context.Context, messages []*isb.ReadMessage)
}
}

// solve ReduceStream withholding of watermark where we do not send WM until the window is closed.
// solve Reduce withholding of watermark where we do not send WM until the window is closed.
oldestWindowEndTime := df.windower.OldestWindowEndTime()
if oldestWindowEndTime != time.UnixMilli(-1) {
// minus 1 ms because if it's the same as the end time the window would have already been closed
Expand Down
156 changes: 0 additions & 156 deletions pkg/sdkclient/globalreducer/client.go

This file was deleted.

Loading

0 comments on commit c0f0055

Please sign in to comment.