Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: include dropped messages in source watermark calculation #1404

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions docs/user-guide/sources/transformer/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ package main
import (
"context"
"encoding/json"
"log"
"time"

functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/function/server"
"github.com/numaproj/numaflow-go/pkg/sourcetransformer"
)

func Handle(_ context.Context, keys []string, data functionsdk.Datum) functionsdk.MessageTs {
func transform(_ context.Context, keys []string, data sourcetransformer.Datum) sourcetransformer.Messages {
/*
Input messages are in JSON format. Sample: {"timestamp": "1673239888", "filterOut": "true"}.
Field "timestamp" shows the real event time of the message, in format of epoch.
Field "filterOut" indicates whether the message should be filtered out, in format of boolean.
Field "timestamp" shows the real event time of the message, in the format of epoch.
Field "filterOut" indicates whether the message should be filtered out, in the format of boolean.
*/
var jsonObject map[string]interface{}
json.Unmarshal(data.Value(), &jsonObject)
Expand All @@ -53,14 +53,17 @@ func Handle(_ context.Context, keys []string, data functionsdk.Datum) functionsd
filterOut = f.(bool)
}
if filterOut {
return functionsdk.MessageTsBuilder().Append(functionsdk.MessageTToDrop())
return sourcetransformer.MessagesBuilder().Append(sourcetransformer.MessageToDrop(eventTime))
} else {
return functionsdk.MessageTsBuilder().Append(functionsdk.NewMessageT(data.Value(), eventTime).WithKeys(keys))
return sourcetransformer.MessagesBuilder().Append(sourcetransformer.NewMessage(data.Value(), eventTime).WithKeys(keys))
}
}

func main() {
server.New().RegisterMapperT(functionsdk.MapTFunc(Handle)).Start(context.Background())
err := sourcetransformer.NewServer(sourcetransformer.SourceTransformFunc(transform)).Start(context.Background())
if err != nil {
log.Panic("Failed to start source transform server: ", err)
}
}
```

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.4
github.com/nats-io/nats.go v1.31.0
github.com/numaproj/numaflow-go v0.5.2
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
github.com/redis/go-redis/v9 v9.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -679,8 +679,8 @@ github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADym
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.5.2 h1:U/57eDqodVVpzLQqMR/iql8eQf6HsgFMbnWCUU69HZA=
github.com/numaproj/numaflow-go v0.5.2/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932 h1:gAURJvmJv7nP8+Y7X+GGHGZ5sg7KatM4dhkWpFCsk+I=
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand Down
7 changes: 1 addition & 6 deletions pkg/sources/forward/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,7 @@ func (isdf *DataForward) forwardAChunk(ctx context.Context) {
// since we use message event time instead of the watermark to determine and publish source watermarks,
// time.UnixMilli(-1) is assigned to the message watermark. transformedReadMessages are immediately
// used below for publishing source watermarks.

// if message.EventTime is -1, it means that the message was dropped by the transformer
// so we should exclude it from watermark computation
if message.EventTime != time.UnixMilli(-1) {
transformedReadMessages = append(transformedReadMessages, message.ToReadMessage(m.readMessage.ReadOffset, time.UnixMilli(-1)))
}
transformedReadMessages = append(transformedReadMessages, message.ToReadMessage(m.readMessage.ReadOffset, time.UnixMilli(-1)))
}
}
// publish source watermark
Expand Down
4 changes: 2 additions & 2 deletions pkg/sources/transformer/builtin/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error)
func (f filter) apply(et time.Time, msg []byte) (sourcetransformer.Message, error) {
result, err := expr.EvalBool(f.expression, msg)
if err != nil {
return sourcetransformer.MessageToDrop(), err
return sourcetransformer.MessageToDrop(et), err
}
if result {
return sourcetransformer.NewMessage(msg, et), nil
}
return sourcetransformer.MessageToDrop(), nil
return sourcetransformer.MessageToDrop(et), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error)
func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Message, error) {
result, err := expr.EvalBool(e.filterExpr, payload)
if err != nil {
return sourcetransformer.MessageToDrop(), err
return sourcetransformer.MessageToDrop(et), err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means the dropped messages use the default event time coming from the original datum?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Thanks for raising this question. It seems using the extracted event time should be more accurate. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

et is "almost" accurate because we subtract maxDelay before publishing the watermark.

One could also argue that, since this is an "error", perhaps we should stick to -1 watermark.

Let's change this in the future based on the feedback we receive.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use default et (or -1), then why don't we use the same in the original MessageToDrop()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just for the builtin extractEventTimeFilter, where we choose to assign the original event time to dropped messages. For other transformers, users have to provide a new event time.

For extractEventTimeFilter, I think we should also assign the new event time if it has been successfully extracted from the payload. But I am also ok with making the change in the future after feedbacks.

}
if result {
timeStr, err := expr.EvalStr(e.eventTimeExpr, payload)
Expand All @@ -91,5 +91,5 @@ func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Mess
return sourcetransformer.NewMessage(payload, newEventTime), nil
}
}
return sourcetransformer.MessageToDrop(), nil
return sourcetransformer.MessageToDrop(et), nil
}
Loading