Skip to content

Commit

Permalink
make ack bidirectional, separate source reader and acker
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Sep 18, 2024
1 parent d0e31fa commit fdad5f4
Show file tree
Hide file tree
Showing 15 changed files with 220 additions and 143 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.17
github.com/nats-io/nats.go v1.36.0
github.com/numaproj/numaflow-go v0.8.2-0.20240917052911-ee2f3086d64e
github.com/numaproj/numaflow-go v0.8.2-0.20240918054944-0fd13d430793
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,8 @@ github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDm
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.8.2-0.20240917052911-ee2f3086d64e h1:F3iujbel8y5X20bVMY0Am6XDyL5eDOC/6kxyI8uxfpg=
github.com/numaproj/numaflow-go v0.8.2-0.20240917052911-ee2f3086d64e/go.mod h1:g4JZOyUPhjfhv+kR0sX5d8taw/dasgKPXLvQBi39mJ4=
github.com/numaproj/numaflow-go v0.8.2-0.20240918054944-0fd13d430793 h1:kUQw1LsUvmTjqFfcia6DZOxy8qCQwvdY0TpOnR8w3Xg=
github.com/numaproj/numaflow-go v0.8.2-0.20240918054944-0fd13d430793/go.mod h1:g4JZOyUPhjfhv+kR0sX5d8taw/dasgKPXLvQBi39mJ4=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
49 changes: 38 additions & 11 deletions pkg/sdkclient/source/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,42 @@ waitUntilReady:
return nil, fmt.Errorf("failed to create ack stream: %v", err)
}

// Send handshake request
handshakeRequest := &sourcepb.ReadRequest{
// Send handshake request for read stream
readHandshakeRequest := &sourcepb.ReadRequest{
Handshake: &sourcepb.Handshake{
Sot: true,
},
}
if err := c.readStream.Send(handshakeRequest); err != nil {
return nil, fmt.Errorf("failed to send handshake request: %v", err)
if err := c.readStream.Send(readHandshakeRequest); err != nil {
return nil, fmt.Errorf("failed to send read handshake request: %v", err)
}

// Wait for handshake response
handshakeResponse, err := c.readStream.Recv()
// Wait for handshake response for read stream
readHandshakeResponse, err := c.readStream.Recv()
if err != nil {
return nil, fmt.Errorf("failed to receive handshake response: %v", err)
return nil, fmt.Errorf("failed to receive read handshake response: %v", err)
}
if handshakeResponse.GetHandshake() == nil || !handshakeResponse.GetHandshake().GetSot() {
return nil, fmt.Errorf("invalid handshake response")
if readHandshakeResponse.GetHandshake() == nil || !readHandshakeResponse.GetHandshake().GetSot() {
return nil, fmt.Errorf("invalid read handshake response")
}

// Send handshake request for ack stream
ackHandshakeRequest := &sourcepb.AckRequest{
Handshake: &sourcepb.Handshake{
Sot: true,
},
}
if err := c.ackStream.Send(ackHandshakeRequest); err != nil {
return nil, fmt.Errorf("failed to send ack handshake request: %v", err)
}

// Wait for handshake response for ack stream
ackHandshakeResponse, err := c.ackStream.Recv()
if err != nil {
return nil, fmt.Errorf("failed to receive ack handshake response: %v", err)
}
if ackHandshakeResponse.GetHandshake() == nil || !ackHandshakeResponse.GetHandshake().GetSot() {
return nil, fmt.Errorf("invalid ack handshake response")
}

return c, nil
Expand Down Expand Up @@ -172,11 +191,19 @@ func (c *client) ReadFn(_ context.Context, req *sourcepb.ReadRequest, datumCh ch

// AckFn acknowledges the data from the source.
func (c *client) AckFn(_ context.Context, req *sourcepb.AckRequest) (*sourcepb.AckResponse, error) {
// Send the ack request
err := c.ackStream.Send(req)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to send ack request: %v", err)
}
return &sourcepb.AckResponse{}, nil

// Wait for the ack response
resp, err := c.ackStream.Recv()
if err != nil {
return nil, fmt.Errorf("failed to receive ack response: %v", err)
}

return resp, nil
}

// PendingFn returns the number of pending data from the source.
Expand Down
35 changes: 30 additions & 5 deletions pkg/sdkclient/source/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,47 @@ func TestAckFn(t *testing.T) {
defer ctrl.Finish()

mockClient := sourcemock.NewMockSourceClient(ctrl)

mockStream := sourcemock.NewMockSource_AckFnClient(ctrl)

// Handshake request and response
mockStream.EXPECT().Send(&sourcepb.AckRequest{
Handshake: &sourcepb.Handshake{
Sot: true,
},
}).Return(nil)
mockStream.EXPECT().Recv().Return(&sourcepb.AckResponse{
Handshake: &sourcepb.Handshake{
Sot: true,
},
}, nil)

// Ack request and response
mockStream.EXPECT().Send(gomock.Any()).Return(nil)
mockStream.EXPECT().Send(gomock.Any()).Return(fmt.Errorf("mock connection refused"))
mockStream.EXPECT().Recv().Return(&sourcepb.AckResponse{}, nil)

testClient := client{
grpcClt: mockClient,
ackStream: mockStream,
}

// Perform handshake
ackHandshakeRequest := &sourcepb.AckRequest{
Handshake: &sourcepb.Handshake{
Sot: true,
},
}
err := testClient.ackStream.Send(ackHandshakeRequest)
assert.NoError(t, err)

ackHandshakeResponse, err := testClient.ackStream.Recv()
assert.NoError(t, err)
assert.NotNil(t, ackHandshakeResponse.GetHandshake())
assert.True(t, ackHandshakeResponse.GetHandshake().GetSot())

// Test AckFn
ack, err := testClient.AckFn(ctx, &sourcepb.AckRequest{})
assert.NoError(t, err)
assert.Equal(t, &sourcepb.AckResponse{}, ack)

_, err = testClient.AckFn(ctx, &sourcepb.AckRequest{})
assert.EqualError(t, err, "mock connection refused")
}

func TestPendingFn(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/sources/udsource/grpc_udsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) {

mockAckClient.EXPECT().Send(req1).Return(nil).Times(1)
mockAckClient.EXPECT().Send(req2).Return(nil).Times(1)
mockAckClient.EXPECT().Recv().Return(&sourcepb.AckResponse{}, nil).Times(2)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down Expand Up @@ -340,6 +341,6 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) {
NewUserDefinedSourceOffset(offset1),
NewUserDefinedSourceOffset(offset2),
})
assert.ErrorIs(t, err, status.New(codes.DeadlineExceeded, "mock test err").Err())
assert.Equal(t, err.Error(), fmt.Sprintf("failed to send ack request: %s", status.New(codes.DeadlineExceeded, "mock test err").Err()))
})
}
66 changes: 22 additions & 44 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion rust/monovertex/proto/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ service Source {
// The caller (numa) expects the AckFn to be successful, and it does not expect any errors.
// If there are some irrecoverable errors when the callee (UDSource) is processing the AckFn request,
// then it is best to crash because there are no other retry mechanisms possible.
rpc AckFn(stream AckRequest) returns (AckResponse);
rpc AckFn(stream AckRequest) returns (stream AckResponse);

// PendingFn returns the number of pending records at the user defined source.
rpc PendingFn(google.protobuf.Empty) returns (PendingResponse);
Expand Down Expand Up @@ -112,6 +112,7 @@ message AckRequest {
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
optional Handshake handshake = 2;
}

/*
Expand All @@ -131,6 +132,7 @@ message AckResponse {
}
// Required field holding the result.
Result result = 1;
optional Handshake handshake = 2;
}

/*
Expand Down
Loading

0 comments on commit fdad5f4

Please sign in to comment.