Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use gRPC bidirectional streaming for source transformer #2071

Merged
merged 11 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ manifests: crds
kubectl kustomize config/extensions/webhook > config/validating-webhook-install.yaml

$(GOPATH)/bin/golangci-lint:
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b `go env GOPATH`/bin v1.54.1
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b `go env GOPATH`/bin v1.61.0

.PHONY: lint
lint: $(GOPATH)/bin/golangci-lint
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.20
github.com/nats-io/nats.go v1.37.0
github.com/numaproj/numaflow-go v0.8.2-0.20240923064822-e16694a878d0
github.com/numaproj/numaflow-go v0.8.2-0.20241001031210-60188185d9c0
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
Expand All @@ -55,7 +55,7 @@ require (
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117
google.golang.org/grpc v1.66.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0
google.golang.org/protobuf v1.34.2
k8s.io/api v0.29.2
k8s.io/apimachinery v0.29.2
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,8 @@ github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDm
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.8.2-0.20240923064822-e16694a878d0 h1:qPqZfJdPdsz4qymyzMSNICQe/xBnx9P/G3hRbC1DR7k=
github.com/numaproj/numaflow-go v0.8.2-0.20240923064822-e16694a878d0/go.mod h1:g4JZOyUPhjfhv+kR0sX5d8taw/dasgKPXLvQBi39mJ4=
github.com/numaproj/numaflow-go v0.8.2-0.20241001031210-60188185d9c0 h1:MN4Q36mPrXqPrv2dNoK3gyV7c1CGwUF3wNJxTZSw1lk=
github.com/numaproj/numaflow-go v0.8.2-0.20241001031210-60188185d9c0/go.mod h1:FaCMeV0V9SiLcVf2fwT+GeTJHNaK2gdQsTAIqQ4x7oc=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down Expand Up @@ -1049,8 +1049,8 @@ google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0 h1:9SxA29VM43MF5Z9dQu694wmY5t8E/Gxr7s+RSxiIDmc=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0/go.mod h1:yZOK5zhQMiALmuweVdIVoQPa6eIJyXn2B9g5dJDhqX4=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
7 changes: 5 additions & 2 deletions hack/generate-proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ install-protobuf() {
ARCH=$(uname_arch)

echo "OS: $OS ARCH: $ARCH"
if [[ "$ARCH" = "amd64" ]]; then
ARCH="x86_64"
elif [[ "$ARCH" = "arm64" ]]; then
ARCH="aarch_64"
fi
BINARY_URL=$PB_REL/download/v${PROTOBUF_VERSION}/protoc-${PROTOBUF_VERSION}-${OS}-${ARCH}.zip
if [[ "$OS" = "darwin" ]]; then
BINARY_URL=$PB_REL/download/v${PROTOBUF_VERSION}/protoc-${PROTOBUF_VERSION}-osx-universal_binary.zip
elif [[ "$OS" = "linux" ]]; then
BINARY_URL=$PB_REL/download/v${PROTOBUF_VERSION}/protoc-${PROTOBUF_VERSION}-linux-x86_64.zip
fi
echo "Downloading $BINARY_URL"

Expand Down
25 changes: 17 additions & 8 deletions pkg/apis/proto/daemon/daemon_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions pkg/apis/proto/mvtxdaemon/mvtxdaemon_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 24 additions & 6 deletions pkg/apis/proto/sourcetransform/v1/sourcetransform.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,35 @@ service SourceTransform {
// SourceTransformFn applies a function to each request element.
// In addition to map function, SourceTransformFn also supports assigning a new event time to response.
// SourceTransformFn can be used only at source vertex by source data transformer.
rpc SourceTransformFn(SourceTransformRequest) returns (SourceTransformResponse);
rpc SourceTransformFn(stream SourceTransformRequest) returns (stream SourceTransformResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/*
* Handshake message between client and server to indicate the start of transmission.
*/
message Handshake {
// Required field indicating the start of transmission.
bool sot = 1;
}

/**
* SourceTransformerRequest represents a request element.
*/
message SourceTransformRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
message Request {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
// This ID is used to uniquely identify a transform request
string id = 6;
}
Request request = 1;
optional Handshake handshake = 2;
}

/**
Expand All @@ -56,6 +70,10 @@ message SourceTransformResponse {
repeated string tags = 4;
}
repeated Result results = 1;
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
// Handshake message between client and server to indicate the start of transmission.
optional Handshake handshake = 3;
}

/**
Expand Down
56 changes: 56 additions & 0 deletions pkg/isb/tracker/message_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package tracker

import (
"sync"

"github.com/numaproj/numaflow/pkg/isb"
)

// MessageTracker is used to store a key value pair for string and *ReadMessage
// as it can be accessed by concurrent goroutines, we keep all operations
// under a mutex
type MessageTracker struct {
lock sync.RWMutex
m map[string]*isb.ReadMessage
}

// NewMessageTracker initializes a new instance of a Tracker
func NewMessageTracker(messages []*isb.ReadMessage) *MessageTracker {
m := make(map[string]*isb.ReadMessage, len(messages))
for _, msg := range messages {
id := msg.ReadOffset.String()
m[id] = msg
}
return &MessageTracker{
m: m,
lock: sync.RWMutex{},
}
}

// Remove will remove the entry for a given id and return the stored value corresponding to this id.
// A `nil` return value indicates that the id doesn't exist in the tracker.
func (t *MessageTracker) Remove(id string) *isb.ReadMessage {
t.lock.Lock()
defer t.lock.Unlock()
item, ok := t.m[id]
if !ok {
return nil
}
delete(t.m, id)
return item
}

// IsEmpty is a helper function which checks if the Tracker map is empty
// return true if empty
func (t *MessageTracker) IsEmpty() bool {
t.lock.RLock()
defer t.lock.RUnlock()
return len(t.m) == 0
}

// Len returns the number of messages currently stored in the tracker
func (t *MessageTracker) Len() int {
t.lock.RLock()
defer t.lock.RUnlock()
return len(t.m)
}
Original file line number Diff line number Diff line change
@@ -1,37 +1,39 @@
package rpc
package tracker

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/testutils"
)

func TestTracker_AddRequest(t *testing.T) {
tr := NewTracker()
readMessages := testutils.BuildTestReadMessages(3, time.Unix(1661169600, 0), nil)
for _, msg := range readMessages {
tr.addRequest(&msg)
messages := make([]*isb.ReadMessage, len(readMessages))
for i, msg := range readMessages {
messages[i] = &msg
}
tr := NewMessageTracker(messages)
id := readMessages[0].ReadOffset.String()
m, ok := tr.getRequest(id)
assert.True(t, ok)
m := tr.Remove(id)
assert.NotNil(t, m)
assert.Equal(t, readMessages[0], *m)
}

func TestTracker_RemoveRequest(t *testing.T) {
tr := NewTracker()
readMessages := testutils.BuildTestReadMessages(3, time.Unix(1661169600, 0), nil)
for _, msg := range readMessages {
tr.addRequest(&msg)
messages := make([]*isb.ReadMessage, len(readMessages))
for i, msg := range readMessages {
messages[i] = &msg
}
tr := NewMessageTracker(messages)
id := readMessages[0].ReadOffset.String()
m, ok := tr.getRequest(id)
assert.True(t, ok)
m := tr.Remove(id)
assert.NotNil(t, m)
assert.Equal(t, readMessages[0], *m)
tr.removeRequest(id)
_, ok = tr.getRequest(id)
assert.False(t, ok)
m = tr.Remove(id)
assert.Nil(t, m)
}
2 changes: 0 additions & 2 deletions pkg/sdkclient/grpc/grpc_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package grpc

import (
"fmt"
"log"
"strconv"

"google.golang.org/grpc"
Expand Down Expand Up @@ -56,7 +55,6 @@ func ConnectToServer(udsSockAddr string, serverInfo *serverinfo.ServerInfo, maxM
)
} else {
sockAddr = getUdsSockAddr(udsSockAddr)
log.Println("UDS Client:", sockAddr)

conn, err = grpc.NewClient(sockAddr, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMessageSize), grpc.MaxCallSendMsgSize(maxMessageSize)))
Expand Down
Loading
Loading