Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: builtin transformer should keep the keys #2047

Merged
merged 5 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error)

return func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages {
log := logging.FromContext(ctx)
resultMsg, err := e.apply(datum.Value(), datum.EventTime())
resultMsg, err := e.apply(datum.Value(), datum.EventTime(), keys)
if err != nil {
log.Warnf("event time extractor got an error: %v, skip updating event time...", err)
}
Expand All @@ -66,10 +66,10 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error)

// apply compiles the payload to extract the new event time. If there is any error during extraction,
// we pass on the original input event time. Otherwise, we assign the new event time to the message.
func (e eventTimeExtractor) apply(payload []byte, et time.Time) (sourcetransformer.Message, error) {
func (e eventTimeExtractor) apply(payload []byte, et time.Time, keys []string) (sourcetransformer.Message, error) {
timeStr, err := expr.EvalStr(e.expression, payload)
if err != nil {
return sourcetransformer.NewMessage(payload, et), err
return sourcetransformer.NewMessage(payload, et).WithKeys(keys), err
}

var newEventTime time.Time
Expand All @@ -80,8 +80,8 @@ func (e eventTimeExtractor) apply(payload []byte, et time.Time) (sourcetransform
newEventTime, err = dateparse.ParseStrict(timeStr)
}
if err != nil {
return sourcetransformer.NewMessage(payload, et), err
return sourcetransformer.NewMessage(payload, et).WithKeys(keys), err
} else {
return sourcetransformer.NewMessage(payload, newEventTime), nil
return sourcetransformer.NewMessage(payload, newEventTime).WithKeys(keys), nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/stretchr/testify/assert"
)

var _keys = []string{"test-key"}

type testDatum struct {
value []byte
eventTime time.Time
Expand Down Expand Up @@ -74,7 +76,7 @@ func TestEventTimeExtractor(t *testing.T) {
assert.NoError(t, err)

testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: time.Time{},
watermark: time.Time{},
Expand All @@ -86,6 +88,8 @@ func TestEventTimeExtractor(t *testing.T) {
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("Json expression valid, assign a new event time to the message - format specified", func(t *testing.T) {
Expand All @@ -94,7 +98,7 @@ func TestEventTimeExtractor(t *testing.T) {
assert.NoError(t, err)

testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: time.Time{},
watermark: time.Time{},
Expand All @@ -106,6 +110,8 @@ func TestEventTimeExtractor(t *testing.T) {
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("Time string not matching user-provided format, pass on the message without assigning new event time", func(t *testing.T) {
Expand All @@ -114,9 +120,9 @@ func TestEventTimeExtractor(t *testing.T) {
assert.NoError(t, err)

testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
// Handler receives format as time.ANSIC but in the message, we use time.RFC3339. Format is not matched.
// Handler receives a format as time.ANSIC but in the message, we use time.RFC3339. Format is not matched.
testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
Expand All @@ -126,6 +132,8 @@ func TestEventTimeExtractor(t *testing.T) {
assert.Equal(t, testInputEventTime, result.Items()[0].EventTime())
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("Cannot compile json expression, pass on the message without assigning new event time", func(t *testing.T) {
Expand All @@ -135,7 +143,7 @@ func TestEventTimeExtractor(t *testing.T) {

testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
Expand All @@ -146,6 +154,8 @@ func TestEventTimeExtractor(t *testing.T) {
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("The time string is in epoch format with a granularity of seconds, assign a new event time to the message", func(t *testing.T) {
Expand All @@ -154,9 +164,9 @@ func TestEventTimeExtractor(t *testing.T) {
assert.NoError(t, err)

testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
// Handler receives format as time.ANSIC but in the message, we use time.RFC3339. Format is not matched.
// Handler receives a format as time.ANSIC but in the message, we use time.RFC3339. Format is not matched.
testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "1673239888"},{"id": 2, "name": "numa", "time": "1673239888"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
Expand All @@ -168,6 +178,8 @@ func TestEventTimeExtractor(t *testing.T) {
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("The time string is in epoch format with a granularity of milliseconds, assign a new event time to the message", func(t *testing.T) {
Expand All @@ -177,7 +189,7 @@ func TestEventTimeExtractor(t *testing.T) {

testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "1673239888123"},{"id": 2, "name": "numa", "time": "1673239888123"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
Expand All @@ -189,6 +201,8 @@ func TestEventTimeExtractor(t *testing.T) {
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("The time string is ambiguous, pass on the message without assigning new event time", func(t *testing.T) {
Expand All @@ -199,7 +213,7 @@ func TestEventTimeExtractor(t *testing.T) {
testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
// 04/08/2014 is ambiguous because it could be mm/dd/yyyy or dd/mm/yyyy.
testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "04/08/2014 22:05"},{"id": 2, "name": "numa", "time": "04/08/2014 22:05"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
Expand All @@ -211,5 +225,7 @@ func TestEventTimeExtractor(t *testing.T) {
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})
}
10 changes: 5 additions & 5 deletions pkg/sources/transformer/builtin/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,31 @@ type filter struct {
}

func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error) {
expr, existing := args["expression"]
exp, existing := args["expression"]
if !existing {
return nil, fmt.Errorf(`missing "expression"`)
}
f := filter{
expression: expr,
expression: exp,
}

return func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages {
log := logging.FromContext(ctx)
resultMsg, err := f.apply(datum.EventTime(), datum.Value())
resultMsg, err := f.apply(datum.EventTime(), datum.Value(), keys)
if err != nil {
log.Errorf("Filter map function apply got an error: %v", err)
}
return sourcetransformer.MessagesBuilder().Append(resultMsg)
}, nil
}

func (f filter) apply(et time.Time, msg []byte) (sourcetransformer.Message, error) {
func (f filter) apply(et time.Time, msg []byte, keys []string) (sourcetransformer.Message, error) {
result, err := expr.EvalBool(f.expression, msg)
if err != nil {
return sourcetransformer.MessageToDrop(et), err
}
if result {
return sourcetransformer.NewMessage(msg, et), nil
return sourcetransformer.NewMessage(msg, et).WithKeys(keys), nil
}
return sourcetransformer.MessageToDrop(et), nil
}
10 changes: 7 additions & 3 deletions pkg/sources/transformer/builtin/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/numaproj/numaflow-go/pkg/sourcetransformer"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -86,6 +87,7 @@ func TestExpression(t *testing.T) {
watermark: time.Time{},
})
assert.Equal(t, jsonMsg, string(result.Items()[0].Value()))
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("invalid expression", func(t *testing.T) {
Expand All @@ -99,7 +101,7 @@ func TestExpression(t *testing.T) {
eventTime: time.Time{},
watermark: time.Time{},
})
assert.Equal(t, "", string(result.Items()[0].Value()))
assert.Equal(t, sourcetransformer.MessageToDrop(time.Time{}), result.Items()[0])
})

t.Run("Json expression invalid", func(t *testing.T) {
Expand All @@ -113,7 +115,7 @@ func TestExpression(t *testing.T) {
eventTime: time.Time{},
watermark: time.Time{},
})
assert.Equal(t, "", string(result.Items()[0].Value()))
assert.Equal(t, sourcetransformer.MessageToDrop(time.Time{}), result.Items()[0])
})

t.Run("String expression invalid", func(t *testing.T) {
Expand All @@ -127,7 +129,7 @@ func TestExpression(t *testing.T) {
eventTime: time.Time{},
watermark: time.Time{},
})
assert.Equal(t, "", string(result.Items()[0].Value()))
assert.Equal(t, sourcetransformer.MessageToDrop(time.Time{}), result.Items()[0])
})

t.Run("base64 expression valid", func(t *testing.T) {
Expand All @@ -142,6 +144,7 @@ func TestExpression(t *testing.T) {
watermark: time.Time{},
})
assert.Equal(t, base64Msg, string(result.Items()[0].Value()))
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("event time unchanged", func(t *testing.T) {
Expand All @@ -157,5 +160,6 @@ func TestExpression(t *testing.T) {
watermark: time.Time{},
})
assert.Equal(t, testEventTime, result.Items()[0].EventTime())
assert.Equal(t, _keys, result.Items()[0].Keys())
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error)

return func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages {
log := logging.FromContext(ctx)
resultMsg, err := e.apply(datum.EventTime(), datum.Value())
resultMsg, err := e.apply(datum.EventTime(), datum.Value(), keys)
if err != nil {
log.Errorf("Filter or event time extractor got an error: %v", err)
}
Expand All @@ -68,15 +68,15 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error)

}

func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Message, error) {
func (e expressions) apply(et time.Time, payload []byte, keys []string) (sourcetransformer.Message, error) {
result, err := expr.EvalBool(e.filterExpr, payload)
if err != nil {
return sourcetransformer.MessageToDrop(et), err
}
if result {
timeStr, err := expr.EvalStr(e.eventTimeExpr, payload)
if err != nil {
return sourcetransformer.NewMessage(payload, et), err
return sourcetransformer.NewMessage(payload, et).WithKeys(keys), err
}
var newEventTime time.Time
time.Local, _ = time.LoadLocation("UTC")
Expand All @@ -86,9 +86,9 @@ func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Mess
newEventTime, err = dateparse.ParseStrict(timeStr)
}
if err != nil {
return sourcetransformer.NewMessage(payload, et), err
return sourcetransformer.NewMessage(payload, et).WithKeys(keys), err
} else {
return sourcetransformer.NewMessage(payload, newEventTime), nil
return sourcetransformer.NewMessage(payload, newEventTime).WithKeys(keys), nil
}
}
return sourcetransformer.MessageToDrop(et), nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
"testing"
"time"

"github.com/numaproj/numaflow-go/pkg/sourcetransformer"
"github.com/stretchr/testify/assert"
)

var _keys = []string{"test-key"}

type testDatum struct {
value []byte
eventTime time.Time
Expand Down Expand Up @@ -66,7 +69,6 @@ var (
)

func TestFilterEventTime(t *testing.T) {

t.Run("Missing both expressions, return error", func(t *testing.T) {
_, err := New(map[string]string{})
assert.Error(t, err)
Expand All @@ -89,14 +91,16 @@ func TestFilterEventTime(t *testing.T) {
handle, err := New(map[string]string{"filterExpr": "int(json(payload).item[1].id) == 2", "eventTimeExpr": "json(payload).item[1].time", "eventTimeFormat": time.RFC3339})
assert.NoError(t, err)

result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: time.Time{},
watermark: time.Time{},
})

// check that messsage has not changed
// check that message has not changed
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// check that keys have not changed
assert.Equal(t, _keys, result.Items()[0].Keys())

// check that event time has changed
time.Local, _ = time.LoadLocation("UTC")
Expand All @@ -108,28 +112,31 @@ func TestFilterEventTime(t *testing.T) {
handle, err := New(map[string]string{"filterExpr": "int(json(payload).item[1].id) == 3", "eventTimeExpr": "json(payload).item[1].time", "eventTimeFormat": time.RFC3339})
assert.NoError(t, err)

result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: time.Time{},
watermark: time.Time{},
})

assert.Equal(t, "", string(result.Items()[0].Value()))
assert.Equal(t, sourcetransformer.MessageToDrop(time.Time{}), result.Items()[0])
})

t.Run("Valid JSON expression for filter, incorrect format to eventTime", func(t *testing.T) {
handle, err := New(map[string]string{"filterExpr": "int(json(payload).item[1].id) == 2", "eventTimeExpr": "json(payload).item[1].time", "eventTimeFormat": time.ANSIC})
assert.NoError(t, err)

testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
})

// check that message event time has not changed
assert.Equal(t, testInputEventTime, result.Items()[0].EventTime())
// check that message has not changed
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// check that keys have not been added
assert.Equal(t, _keys, result.Items()[0].Keys())
})

}
Loading