diff --git a/pkg/sources/transformer/grpc_transformer_test.go b/pkg/sources/transformer/grpc_transformer_test.go index 86d25c147c..600369b255 100644 --- a/pkg/sources/transformer/grpc_transformer_test.go +++ b/pkg/sources/transformer/grpc_transformer_test.go @@ -20,7 +20,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "github.com/numaproj/numaflow-go/pkg/sourcetransformer" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" @@ -28,96 +27,49 @@ import ( "testing" "time" - "github.com/golang/mock/gomock" transformpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1" - transformermock "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1/transformmock" - "github.com/stretchr/testify/assert" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/timestamppb" - "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/testutils" sourcetransformerSdk "github.com/numaproj/numaflow/pkg/sdkclient/sourcetransformer" "github.com/numaproj/numaflow/pkg/udf/rpc" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" ) -func NewMockGRPCBasedTransformer(mockClient *transformermock.MockSourceTransformClient) *GRPCBasedTransformer { - c, _ := sourcetransformerSdk.NewFromClient(mockClient) - return &GRPCBasedTransformer{"test-vertex", c} -} - -func TestGRPCBasedTransformer_WaitUntilReadyWithMockClient(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockClient := transformermock.NewMockSourceTransformClient(ctrl) - mockClient.EXPECT().IsReady(gomock.Any(), gomock.Any()).Return(&transformpb.ReadyResponse{Ready: true}, nil) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - go func() { - <-ctx.Done() - if ctx.Err() == context.DeadlineExceeded { - t.Log(t.Name(), "test timeout") - } - }() - - u := NewMockGRPCBasedTransformer(mockClient) - err := u.WaitUntilReady(ctx) - assert.NoError(t, err) -} - -type rpcMsg struct { - msg proto.Message -} - -func (r *rpcMsg) Matches(msg interface{}) bool { - m, ok := msg.(proto.Message) - if !ok { - return false +func TestGRPCBasedTransformer_WaitUntilReadyWithServer(t *testing.T) { + svc := &sourcetransformer.Service{ + Transformer: sourcetransformer.SourceTransformFunc(func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages { + return sourcetransformer.Messages{} + }), } - return proto.Equal(m, r.msg) -} -func (r *rpcMsg) String() string { - return fmt.Sprintf("is %s", r.msg) + conn := newServer(t, func(server *grpc.Server) { + transformpb.RegisterSourceTransformServer(server, svc) + }) + transformClient := transformpb.NewSourceTransformClient(conn) + client, _ := sourcetransformerSdk.NewFromClient(transformClient) + u := NewGRPCBasedTransformer("testVertex", client) + err := u.WaitUntilReady(context.Background()) + assert.NoError(t, err) } -func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) { +func TestGRPCBasedTransformer_BasicApplyWithServer(t *testing.T) { t.Run("test success", func(t *testing.T) { + svc := &sourcetransformer.Service{ + Transformer: sourcetransformer.SourceTransformFunc(func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages { + return sourcetransformer.MessagesBuilder().Append(sourcetransformer.NewMessage(datum.Value(), datum.EventTime()).WithKeys(keys)) + }), + } - ctrl := gomock.NewController(t) - defer ctrl.Finish() + conn := newServer(t, func(server *grpc.Server) { + transformpb.RegisterSourceTransformServer(server, svc) + }) + transformClient := transformpb.NewSourceTransformClient(conn) + client, _ := sourcetransformerSdk.NewFromClient(transformClient) + u := NewGRPCBasedTransformer("testVertex", client) - mockClient := transformermock.NewMockSourceTransformClient(ctrl) - req := &transformpb.SourceTransformRequest{ - Keys: []string{"test_success_key"}, - Value: []byte(`forward_message`), - EventTime: timestamppb.New(time.Unix(1661169600, 0)), - Watermark: timestamppb.New(time.Time{}), - } - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(&transformpb.SourceTransformResponse{ - Results: []*transformpb.SourceTransformResponse_Result{ - { - Keys: []string{"test_success_key"}, - Value: []byte(`forward_message`), - }, - }, - }, nil) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - go func() { - <-ctx.Done() - if ctx.Err() == context.DeadlineExceeded { - t.Log(t.Name(), "test timeout") - } - }() + ctx := context.Background() - u := NewMockGRPCBasedTransformer(mockClient) got, err := u.ApplyTransform(ctx, []*isb.ReadMessage{{ Message: isb.Message{ Header: isb.Header{ @@ -138,90 +90,29 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) { }}, ) assert.NoError(t, err) - assert.Equal(t, req.Keys, got[0].WriteMessages[0].Keys) - assert.Equal(t, req.Value, got[0].WriteMessages[0].Payload) + assert.Equal(t, []string{"test_success_key"}, got[0].WriteMessages[0].Keys) + assert.Equal(t, []byte(`forward_message`), got[0].WriteMessages[0].Payload) }) t.Run("test error", func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockClient := transformermock.NewMockSourceTransformClient(ctrl) - req := &transformpb.SourceTransformRequest{ - Keys: []string{"test_error_key"}, - Value: []byte(`forward_message`), - EventTime: timestamppb.New(time.Unix(1661169660, 0)), - Watermark: timestamppb.New(time.Time{}), + svc := &sourcetransformer.Service{ + Transformer: sourcetransformer.SourceTransformFunc(func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages { + return sourcetransformer.Messages{} + }), } - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, fmt.Errorf("mock error")) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - go func() { - <-ctx.Done() - if ctx.Err() == context.DeadlineExceeded { - t.Log(t.Name(), "test timeout") - } - }() - u := NewMockGRPCBasedTransformer(mockClient) - _, err := u.ApplyTransform(ctx, []*isb.ReadMessage{{ - Message: isb.Message{ - Header: isb.Header{ - MessageInfo: isb.MessageInfo{ - EventTime: time.Unix(1661169660, 0), - }, - ID: isb.MessageID{ - VertexName: "test-vertex", - Offset: "0-0", - }, - Keys: []string{"test_error_key"}, - }, - Body: isb.Body{ - Payload: []byte(`forward_message`), - }, - }, - ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), - }}, - ) - assert.ErrorIs(t, err, &rpc.ApplyUDFErr{ - UserUDFErr: false, - Message: fmt.Sprintf("%s", err), - InternalErr: rpc.InternalErr{ - Flag: true, - MainCarDown: false, - }, + conn := newServer(t, func(server *grpc.Server) { + transformpb.RegisterSourceTransformServer(server, svc) }) - }) - - t.Run("test error retryable: failed after 5 retries", func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() + transformClient := transformpb.NewSourceTransformClient(conn) + client, _ := sourcetransformerSdk.NewFromClient(transformClient) + u := NewGRPCBasedTransformer("testVertex", client) - mockClient := transformermock.NewMockSourceTransformClient(ctrl) - req := &transformpb.SourceTransformRequest{ - Keys: []string{"test_error_key"}, - Value: []byte(`forward_message`), - EventTime: timestamppb.New(time.Unix(1661169660, 0)), - Watermark: timestamppb.New(time.Time{}), - } - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err()) - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err()) - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err()) - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err()) - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err()) - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err()) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - go func() { - <-ctx.Done() - if ctx.Err() == context.DeadlineExceeded { - t.Log(t.Name(), "test timeout") - } - }() + // Create a context and cancel it immediately. + // This cancelled context is passed to the ApplyTransform function to simulate failure + ctx, cancel := context.WithCancel(context.Background()) + cancel() - u := NewMockGRPCBasedTransformer(mockClient) _, err := u.ApplyTransform(ctx, []*isb.ReadMessage{{ Message: isb.Message{ Header: isb.Header{ @@ -241,173 +132,18 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) { ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), }}, ) - assert.ErrorIs(t, err, &rpc.ApplyUDFErr{ - UserUDFErr: false, - Message: fmt.Sprintf("%s", err), - InternalErr: rpc.InternalErr{ - Flag: true, - MainCarDown: false, - }, - }) - }) - t.Run("test error retryable: failed after 1 retry", func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockClient := transformermock.NewMockSourceTransformClient(ctrl) - req := &transformpb.SourceTransformRequest{ - Keys: []string{"test_error_key"}, - Value: []byte(`forward_message`), - EventTime: timestamppb.New(time.Unix(1661169660, 0)), - Watermark: timestamppb.New(time.Time{}), - } - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err()) - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err()) - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.InvalidArgument, "mock test err: non retryable").Err()) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - go func() { - <-ctx.Done() - if ctx.Err() == context.DeadlineExceeded { - t.Log(t.Name(), "test timeout") - } - }() - - u := NewMockGRPCBasedTransformer(mockClient) - _, err := u.ApplyTransform(ctx, []*isb.ReadMessage{{ - Message: isb.Message{ - Header: isb.Header{ - MessageInfo: isb.MessageInfo{ - EventTime: time.Unix(1661169660, 0), - }, - ID: isb.MessageID{ - VertexName: "test-vertex", - Offset: "0-0", - }, - Keys: []string{"test_error_key"}, - }, - Body: isb.Body{ - Payload: []byte(`forward_message`), - }, - }, - ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), - }}, - ) - assert.ErrorIs(t, err, &rpc.ApplyUDFErr{ + expectedUDFErr := &rpc.ApplyUDFErr{ UserUDFErr: false, - Message: fmt.Sprintf("%s", err), + Message: "gRPC client.SourceTransformFn failed, Canceled: context canceled", InternalErr: rpc.InternalErr{ Flag: true, MainCarDown: false, }, - }) - }) - - t.Run("test error retryable: success after 1 retry", func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockClient := transformermock.NewMockSourceTransformClient(ctrl) - req := &transformpb.SourceTransformRequest{ - Keys: []string{"test_success_key"}, - Value: []byte(`forward_message`), - EventTime: timestamppb.New(time.Unix(1661169720, 0)), - Watermark: timestamppb.New(time.Time{}), - } - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err()) - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(&transformpb.SourceTransformResponse{ - Results: []*transformpb.SourceTransformResponse_Result{ - { - Keys: []string{"test_success_key"}, - Value: []byte(`forward_message`), - }, - }, - }, nil) - - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - go func() { - <-ctx.Done() - if ctx.Err() == context.DeadlineExceeded { - t.Log(t.Name(), "test timeout") - } - }() - - u := NewMockGRPCBasedTransformer(mockClient) - got, err := u.ApplyTransform(ctx, []*isb.ReadMessage{{ - Message: isb.Message{ - Header: isb.Header{ - MessageInfo: isb.MessageInfo{ - EventTime: time.Unix(1661169720, 0), - }, - ID: isb.MessageID{ - VertexName: "test-vertex", - Offset: "0-0", - }, - Keys: []string{"test_success_key"}, - }, - Body: isb.Body{ - Payload: []byte(`forward_message`), - }, - }, - ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), - }}, - ) - assert.NoError(t, err) - assert.Equal(t, req.Keys, got[0].WriteMessages[0].Keys) - assert.Equal(t, req.Value, got[0].WriteMessages[0].Payload) - }) - - t.Run("test error non retryable", func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockClient := transformermock.NewMockSourceTransformClient(ctrl) - req := &transformpb.SourceTransformRequest{ - Keys: []string{"test_error_key"}, - Value: []byte(`forward_message`), - EventTime: timestamppb.New(time.Unix(1661169660, 0)), - Watermark: timestamppb.New(time.Time{}), } - mockClient.EXPECT().SourceTransformFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.InvalidArgument, "mock test err: non retryable").Err()) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - go func() { - <-ctx.Done() - if ctx.Err() == context.DeadlineExceeded { - t.Log(t.Name(), "test timeout") - } - }() - - u := NewMockGRPCBasedTransformer(mockClient) - _, err := u.ApplyTransform(ctx, []*isb.ReadMessage{{ - Message: isb.Message{ - Header: isb.Header{ - MessageInfo: isb.MessageInfo{ - EventTime: time.Unix(1661169660, 0), - }, - ID: isb.MessageID{ - VertexName: "test-vertex", - Offset: "0-0", - }, - Keys: []string{"test_error_key"}, - }, - Body: isb.Body{ - Payload: []byte(`forward_message`), - }, - }, - ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), - }}, - ) - assert.ErrorIs(t, err, &rpc.ApplyUDFErr{ - UserUDFErr: false, - Message: fmt.Sprintf("%s", err), - InternalErr: rpc.InternalErr{ - Flag: true, - MainCarDown: false, - }, - }) + var receivedErr *rpc.ApplyUDFErr + assert.ErrorAs(t, err, &receivedErr) + assert.Equal(t, expectedUDFErr, receivedErr) }) } @@ -448,114 +184,26 @@ func TestGRPCBasedTransformer_ApplyWithServer_ChangePayload(t *testing.T) { apply, err := u.ApplyTransform(context.TODO(), messages) assert.NoError(t, err) - var results = make([][]byte, len(readMessages)) - var resultKeys = make([][]string, len(readMessages)) - for idx, pair := range apply { - results[idx] = pair.WriteMessages[0].Payload - resultKeys[idx] = pair.WriteMessages[0].Header.Keys - } - - var expectedResults = make([][]byte, count) - var expectedKeys = make([][]string, count) - for idx, readMessage := range readMessages { + for _, pair := range apply { + resultPayload := pair.WriteMessages[0].Payload + resultKeys := pair.WriteMessages[0].Header.Keys var readMessagePayload testutils.PayloadForTest - _ = json.Unmarshal(readMessage.Payload, &readMessagePayload) + _ = json.Unmarshal(pair.ReadMessage.Payload, &readMessagePayload) + var expectedKeys []string if readMessagePayload.Value%2 == 0 { - expectedKeys[idx] = []string{"even"} + expectedKeys = []string{"even"} } else { - expectedKeys[idx] = []string{"odd"} + expectedKeys = []string{"odd"} } + assert.Equal(t, expectedKeys, resultKeys) + doubledValue := testutils.PayloadForTest{ Key: readMessagePayload.Key, Value: readMessagePayload.Value * 2, } marshal, _ := json.Marshal(doubledValue) - expectedResults[idx] = marshal - } - - assert.ElementsMatch(t, expectedResults, results) - assert.Equal(t, expectedKeys, resultKeys) -} - -func TestGRPCBasedTransformer_ApplyWithMockClient_ChangePayload(t *testing.T) { - multiplyBy2 := func(body []byte) interface{} { - var result testutils.PayloadForTest - _ = json.Unmarshal(body, &result) - result.Value = result.Value * 2 - return result - } - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockClient := transformermock.NewMockSourceTransformClient(ctrl) - mockClient.EXPECT().SourceTransformFn(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, datum *transformpb.SourceTransformRequest, opts ...grpc.CallOption) (*transformpb.SourceTransformResponse, error) { - var originalValue testutils.PayloadForTest - _ = json.Unmarshal(datum.GetValue(), &originalValue) - doubledValue, _ := json.Marshal(multiplyBy2(datum.GetValue()).(testutils.PayloadForTest)) - var Results []*transformpb.SourceTransformResponse_Result - if originalValue.Value%2 == 0 { - Results = append(Results, &transformpb.SourceTransformResponse_Result{ - Keys: []string{"even"}, - Value: doubledValue, - }) - } else { - Results = append(Results, &transformpb.SourceTransformResponse_Result{ - Keys: []string{"odd"}, - Value: doubledValue, - }) - } - datumList := &transformpb.SourceTransformResponse{ - Results: Results, - } - return datumList, nil - }, - ).AnyTimes() - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - go func() { - <-ctx.Done() - if ctx.Err() == context.DeadlineExceeded { - t.Log(t.Name(), "test timeout") - } - }() - - u := NewMockGRPCBasedTransformer(mockClient) - - var count = int64(10) - readMessages := testutils.BuildTestReadMessages(count, time.Unix(1661169600, 0), nil) - - var results = make([][]byte, len(readMessages)) - var resultKeys = make([][]string, len(readMessages)) - messages := make([]*isb.ReadMessage, len(readMessages)) - for idx, readMessage := range readMessages { - messages[idx] = &readMessage - } - apply, err := u.ApplyTransform(ctx, messages) - assert.NoError(t, err) - for idx, pair := range apply { - results[idx] = pair.WriteMessages[0].Payload - resultKeys[idx] = pair.WriteMessages[0].Header.Keys - } - - var expectedResults = make([][]byte, count) - var expectedKeys = make([][]string, count) - for idx, readMessage := range readMessages { - var readMessagePayload testutils.PayloadForTest - _ = json.Unmarshal(readMessage.Payload, &readMessagePayload) - if readMessagePayload.Value%2 == 0 { - expectedKeys[idx] = []string{"even"} - } else { - expectedKeys[idx] = []string{"odd"} - } - marshal, _ := json.Marshal(multiplyBy2(readMessage.Payload)) - expectedResults[idx] = marshal + assert.Equal(t, marshal, resultPayload) } - - assert.Equal(t, expectedResults, results) - assert.Equal(t, expectedKeys, resultKeys) } func newServer(t *testing.T, register func(server *grpc.Server)) *grpc.ClientConn { @@ -609,7 +257,7 @@ func newServer(t *testing.T, register func(server *grpc.Server)) *grpc.ClientCon return conn } -func TestGRPCBasedTransformer_ApplyWithMockClient_ChangeEventTime(t *testing.T) { +func TestGRPCBasedTransformer_Apply_ChangeEventTime(t *testing.T) { testEventTime := time.Date(1992, 2, 8, 0, 0, 0, 100, time.UTC) svc := &sourcetransformer.Service{ Transformer: sourcetransformer.SourceTransformFunc(func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages {