Skip to content

Commit

Permalink
Refactoring, fix syntax errors
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Sep 18, 2024
1 parent 910ff9b commit c625326
Show file tree
Hide file tree
Showing 14 changed files with 536 additions and 340 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.17
github.com/nats-io/nats.go v1.36.0
github.com/numaproj/numaflow-go v0.8.0
github.com/numaproj/numaflow-go v0.8.2-0.20240916154529-f205113bba8e
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
Expand All @@ -51,7 +51,7 @@ require (
golang.org/x/crypto v0.24.0
golang.org/x/net v0.25.0
golang.org/x/oauth2 v0.20.0
golang.org/x/sync v0.7.0
golang.org/x/sync v0.8.0
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17
google.golang.org/grpc v1.59.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,8 @@ github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDm
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.8.0 h1:1Pp0AMLXkmUPlvFjKeY3a9X+OLU8oN1OQWxD9jLg8Uo=
github.com/numaproj/numaflow-go v0.8.0/go.mod h1:WoMt31+h3up202zTRI8c/qe42B8UbvwLe2mJH0MAlhI=
github.com/numaproj/numaflow-go v0.8.2-0.20240916154529-f205113bba8e h1:ApgoGlSvmSo/+/P9XF/XeY7afWE0jSxAFgkeFaM6IW4=
github.com/numaproj/numaflow-go v0.8.2-0.20240916154529-f205113bba8e/go.mod h1:s4F+Xh4BKjSEdL+49sxKgJq8MQO41aVjN4Try4gsKwg=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down Expand Up @@ -802,8 +802,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
57 changes: 51 additions & 6 deletions pkg/sdkclient/sourcetransformer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package sourcetransformer

import (
"context"
"errors"
"io"

"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
Expand Down Expand Up @@ -64,7 +66,7 @@ func NewFromClient(c transformpb.SourceTransformClient) (Client, error) {
}

// CloseConn closes the grpc client connection.
func (c *client) CloseConn(ctx context.Context) error {
func (c *client) CloseConn(_ context.Context) error {
if c.conn == nil {
return nil
}
Expand All @@ -81,11 +83,54 @@ func (c *client) IsReady(ctx context.Context, in *emptypb.Empty) (bool, error) {
}

// SourceTransformFn SourceTransformerFn applies a function to each request element.
func (c *client) SourceTransformFn(ctx context.Context, request *transformpb.SourceTransformRequest) (*transformpb.SourceTransformResponse, error) {
transformResponse, err := c.grpcClt.SourceTransformFn(ctx, request)
err = sdkerr.ToUDFErr("c.grpcClt.SourceTransformFn", err)
func (c *client) SourceTransformFn(ctx context.Context, request <-chan *transformpb.SourceTransformRequest) (<-chan *transformpb.SourceTransformResponse, <-chan error) {
errCh := make(chan error)
responseCh := make(chan *transformpb.SourceTransformResponse)
stream, err := c.grpcClt.SourceTransformFn(ctx)
if err != nil {
return nil, err
go func() {
errCh <- sdkerr.ToUDFErr("c.grpcClt.SourceTransformFn", err)
}()
return nil, errCh
}
return transformResponse, nil

go func() {
defer close(responseCh)
for {
resp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
errCh = nil
return
}
errCh <- sdkerr.ToUDFErr("c.grpcClt.SourceTransformFn", err)
return
}
responseCh <- resp
}
}()

go func() {
for {
select {
case <-ctx.Done():
return
case msg, ok := <-request:
if !ok {
err := stream.CloseSend()
if err != nil && errors.Is(err, io.EOF) {
errCh <- sdkerr.ToUDFErr("c.grpcClt.SourceTransformFn stream.CloseSend", err)
}
return
}
err := stream.Send(msg)
if err != nil {
errCh <- sdkerr.ToUDFErr("c.grpcClt.SourceTransformFn stream.Send", err)
return
}
}
}
}()

return responseCh, errCh
}
137 changes: 100 additions & 37 deletions pkg/sdkclient/sourcetransformer/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@ package sourcetransformer

import (
"context"
"errors"
"fmt"
"reflect"
"testing"

"github.com/golang/mock/gomock"
transformpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1"
transformermock "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1/transformmock"
"github.com/numaproj/numaflow-go/pkg/sourcetransformer"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"net"
"reflect"
"testing"
"time"
)

func TestClient_IsReady(t *testing.T) {
Expand Down Expand Up @@ -54,44 +61,100 @@ func TestClient_IsReady(t *testing.T) {
assert.EqualError(t, err, "mock connection refused")
}

func TestClient_SourceTransformFn(t *testing.T) {
var ctx = context.Background()
func newServer(t *testing.T, register func(server *grpc.Server)) *grpc.ClientConn {
lis := bufconn.Listen(100)
t.Cleanup(func() {
_ = lis.Close()
})

ctrl := gomock.NewController(t)
defer ctrl.Finish()
server := grpc.NewServer()
t.Cleanup(func() {
server.Stop()
})

mockClient := transformermock.NewMockSourceTransformClient(ctrl)
mockClient.EXPECT().SourceTransformFn(gomock.Any(), gomock.Any()).Return(&transformpb.SourceTransformResponse{Results: []*transformpb.SourceTransformResponse_Result{
{
Keys: []string{"temp-key"},
Value: []byte("mock result"),
Tags: nil,
},
}}, nil)
mockClient.EXPECT().SourceTransformFn(gomock.Any(), gomock.Any()).Return(&transformpb.SourceTransformResponse{Results: []*transformpb.SourceTransformResponse_Result{
{
Keys: []string{"temp-key"},
Value: []byte("mock result"),
Tags: nil,
},
}}, fmt.Errorf("mock connection refused"))
register(server)

testClient, err := NewFromClient(mockClient)
assert.NoError(t, err)
reflect.DeepEqual(testClient, &client{
grpcClt: mockClient,
errChan := make(chan error, 1)
go func() {
// t.Fatal should only be called from the goroutine running the test
if err := server.Serve(lis); err != nil {
errChan <- err
}
}()

dialer := func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
t.Cleanup(func() {
cancel()
})

result, err := testClient.SourceTransformFn(ctx, &transformpb.SourceTransformRequest{})
assert.Equal(t, &transformpb.SourceTransformResponse{Results: []*transformpb.SourceTransformResponse_Result{
{
Keys: []string{"temp-key"},
Value: []byte("mock result"),
Tags: nil,
},
}}, result)
assert.NoError(t, err)
conn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials()))
t.Cleanup(func() {
_ = conn.Close()
})
if err != nil {
t.Fatalf("Creating new gRPC client connection: %v", err)
}

var grpcServerErr error
select {
case grpcServerErr = <-errChan:
case <-time.After(500 * time.Millisecond):
grpcServerErr = errors.New("gRPC server didn't start in 500ms")
}
if err != nil {
t.Fatalf("Failed to start gRPC server: %v", grpcServerErr)
}

return conn
}

_, err = testClient.SourceTransformFn(ctx, &transformpb.SourceTransformRequest{})
assert.EqualError(t, err, "NonRetryable: mock connection refused")
func TestClient_SourceTransformFn(t *testing.T) {
var testTime = time.Date(2021, 8, 15, 14, 30, 45, 100, time.Local)
svc := &sourcetransformer.Service{
Transformer: sourcetransformer.SourceTransformFunc(func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages {
msg := datum.Value()
return sourcetransformer.MessagesBuilder().Append(sourcetransformer.NewMessage(msg, testTime).WithKeys([]string{keys[0] + "_test"}))
}),
}
conn := newServer(t, func(server *grpc.Server) {
transformpb.RegisterSourceTransformServer(server, svc)
})
transformClient := transformpb.NewSourceTransformClient(conn)
var ctx = context.Background()
client, _ := NewFromClient(transformClient)

reqChan := make(chan *transformpb.SourceTransformRequest, 1)
go func() {
for i := 0; i < 5; i++ {
reqChan <- &transformpb.SourceTransformRequest{
Keys: []string{fmt.Sprintf("client_key_%d", i)},
Value: []byte("test"),
}
}
}()

respChan, errChan := client.SourceTransformFn(ctx, reqChan)
var results [][]*transformpb.SourceTransformResponse_Result
for i := 0; i < 5; i++ {
var resp *transformpb.SourceTransformResponse
var err error
select {
case resp = <-respChan:
case err = <-errChan:
}
assert.NoError(t, err)
results = append(results, resp.GetResults())
}
expected := [][]*transformpb.SourceTransformResponse_Result{
{{Keys: []string{"client_key_0_test"}, Value: []byte("test"), EventTime: timestamppb.New(testTime)}},
{{Keys: []string{"client_key_1_test"}, Value: []byte("test"), EventTime: timestamppb.New(testTime)}},
{{Keys: []string{"client_key_2_test"}, Value: []byte("test"), EventTime: timestamppb.New(testTime)}},
{{Keys: []string{"client_key_3_test"}, Value: []byte("test"), EventTime: timestamppb.New(testTime)}},
{{Keys: []string{"client_key_4_test"}, Value: []byte("test"), EventTime: timestamppb.New(testTime)}},
}
assert.ElementsMatch(t, expected, results)
}
2 changes: 1 addition & 1 deletion pkg/sdkclient/sourcetransformer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ import (
type Client interface {
CloseConn(ctx context.Context) error
IsReady(ctx context.Context, in *emptypb.Empty) (bool, error)
SourceTransformFn(ctx context.Context, request *transformpb.SourceTransformRequest) (*transformpb.SourceTransformResponse, error)
SourceTransformFn(ctx context.Context, request <-chan *transformpb.SourceTransformRequest) (<-chan *transformpb.SourceTransformResponse, <-chan error)
}
8 changes: 4 additions & 4 deletions pkg/sources/forward/applier/sourcetransformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
// SourceTransformApplier applies the source transform on the read message and gives back a new message. Any UserError will be retried here, while
// InternalErr can be returned and could be retried by the callee.
type SourceTransformApplier interface {
ApplyTransform(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error)
ApplyTransform(ctx context.Context, messages []*isb.ReadMessage) ([]isb.ReadWriteMessagePair, error)
}

// ApplySourceTransformFunc is a function type that implements SourceTransformApplier interface.
type ApplySourceTransformFunc func(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error)
type ApplySourceTransformFunc func(ctx context.Context, messages []*isb.ReadMessage) ([]isb.ReadWriteMessagePair, error)

// ApplyTransform implements SourceTransformApplier interface.
func (f ApplySourceTransformFunc) ApplyTransform(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return f(ctx, message)
func (f ApplySourceTransformFunc) ApplyTransform(ctx context.Context, messages []*isb.ReadMessage) ([]isb.ReadWriteMessagePair, error) {
return f(ctx, messages)
}
Loading

0 comments on commit c625326

Please sign in to comment.