Skip to content

Commit

Permalink
feat: implement map batch (#1778)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Jul 11, 2024
1 parent 48cad9a commit 00619b6
Show file tree
Hide file tree
Showing 27 changed files with 1,485 additions and 116 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.17
github.com/nats-io/nats.go v1.36.0
github.com/numaproj/numaflow-go v0.7.0-rc2
github.com/numaproj/numaflow-go v0.7.1-0.20240711051731-15e45210b784
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,8 @@ github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDm
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.7.0-rc2 h1:oLP0uM9ToFEtiSyzX46cFOlr7ctgcH02cY/XFA1qF8E=
github.com/numaproj/numaflow-go v0.7.0-rc2/go.mod h1:WoMt31+h3up202zTRI8c/qe42B8UbvwLe2mJH0MAlhI=
github.com/numaproj/numaflow-go v0.7.1-0.20240711051731-15e45210b784 h1:JnpaG557KqDrzIi1c5YeffeLXGmLd8F2lzQEBC+wFWQ=
github.com/numaproj/numaflow-go v0.7.1-0.20240711051731-15e45210b784/go.mod h1:WoMt31+h3up202zTRI8c/qe42B8UbvwLe2mJH0MAlhI=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ const (
EnvPipelineName = "NUMAFLOW_PIPELINE_NAME"
EnvVertexName = "NUMAFLOW_VERTEX_NAME"
EnvMapStreaming = "NUMAFLOW_MAP_STREAMING"
EnvBatchMap = "NUMAFLOW_BATCH_MAP"
EnvCallbackEnabled = "NUMAFLOW_CALLBACK_ENABLED"
EnvCallbackURL = "NUMAFLOW_CALLBACK_URL"
EnvPod = "NUMAFLOW_POD"
Expand Down Expand Up @@ -205,6 +206,9 @@ const (
// UDF map streaming
MapUdfStreamKey = "numaflow.numaproj.io/map-stream"

// BatchMapUdfStreamKey is the annotation for enabling UDF Batch Map
BatchMapUdfStreamKey = "numaflow.numaproj.io/batch-map"

// Pipeline health status
PipelineStatusHealthy = "healthy"
PipelineStatusUnknown = "unknown"
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (v Vertex) commonEnvs() []corev1.EnvVar {
{Name: EnvMapStreaming, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + MapUdfStreamKey + "']"}}},
{Name: EnvCallbackEnabled, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + CallbackEnabledKey + "']"}}},
{Name: EnvCallbackURL, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + CallbackURLKey + "']"}}},
{Name: EnvBatchMap, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + BatchMapUdfStreamKey + "']"}}},
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/isb/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Header struct {
Keys []string
// Headers is the headers of the message which can be used to store and propagate source headers like kafka headers,
// http headers and Numaflow platform headers like tracing headers etc.
// TODO: consider renaming this field; having a Headers field inside the Header struct is confusing.
Headers map[string]string
}

Expand Down
11 changes: 10 additions & 1 deletion pkg/isb/stores/jetstream/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (f myForwardJetStreamTest) ApplyMapStream(ctx context.Context, message *isb
return testutils.CopyUDFTestApplyStream(ctx, "", writeMessageCh, message)
}

// ApplyBatchMap hands the whole batch to the shared copy-UDF test helper,
// using "test-vertex" as the vertex name passed to that helper.
func (f myForwardJetStreamTest) ApplyBatchMap(ctx context.Context, messages []*isb.ReadMessage) ([]isb.ReadWriteMessagePair, error) {
	const vertexName = "test-vertex"
	return testutils.CopyUDFTestApplyBatchMap(ctx, vertexName, messages)
}

// TestForwarderJetStreamBuffer is a test that is used to test forwarder with jetstream buffer
func TestForwarderJetStreamBuffer(t *testing.T) {
tests := []struct {
Expand Down Expand Up @@ -148,7 +152,12 @@ func TestForwarderJetStreamBuffer(t *testing.T) {
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
idleManager, err := wmb.NewIdleManager(1, len(toSteps))
assert.NoError(t, err)
f, err := forward.NewInterStepDataForward(vertexInstance, fromStep, toSteps, myForwardJetStreamTest{}, myForwardJetStreamTest{}, myForwardJetStreamTest{}, fetchWatermark, publishWatermark, idleManager, forward.WithReadBatchSize(tt.batchSize), forward.WithUDFStreaming(tt.streamEnabled))
appliers := forward.MapAppliers{
MapUDF: myForwardJetStreamTest{},
MapStreamUDF: myForwardJetStreamTest{},
BatchMapUDF: myForwardJetStreamTest{},
}
f, err := forward.NewInterStepDataForward(vertexInstance, fromStep, toSteps, myForwardJetStreamTest{}, appliers, fetchWatermark, publishWatermark, idleManager, forward.WithReadBatchSize(tt.batchSize), forward.WithUDFStreaming(tt.streamEnabled))
assert.NoError(t, err)

stopped := f.Start()
Expand Down
15 changes: 12 additions & 3 deletions pkg/isb/stores/redis/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ var (
// Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}
Addrs: []string{":6379"},
}
readTestAppliers = forward.MapAppliers{
MapUDF: forwardReadWritePerformance{},
MapStreamUDF: forwardReadWritePerformance{},
BatchMapUDF: forwardReadWritePerformance{},
}

testStartTime = time.Unix(1636470000, 0).UTC()
)
Expand Down Expand Up @@ -140,7 +145,7 @@ func TestRedisCheckBacklog(t *testing.T) {
}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := forward.NewInterStepDataForward(vertexInstance, rqr, toSteps, forwardReadWritePerformance{}, forwardReadWritePerformance{}, forwardReadWritePerformance{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager(), forward.WithReadBatchSize(10))
f, err := forward.NewInterStepDataForward(vertexInstance, rqr, toSteps, forwardReadWritePerformance{}, readTestAppliers, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager(), forward.WithReadBatchSize(10))

stopped := f.Start()
// validate the length of the toStep stream.
Expand Down Expand Up @@ -315,6 +320,10 @@ func (f forwardReadWritePerformance) ApplyMapStream(ctx context.Context, message
return testutils.CopyUDFTestApplyStream(ctx, "testVertex", writeMessageCh, message)
}

// ApplyBatchMap hands the whole batch to the shared copy-UDF test helper,
// using "test-vertex" as the vertex name passed to that helper.
func (f forwardReadWritePerformance) ApplyBatchMap(ctx context.Context, messages []*isb.ReadMessage) ([]isb.ReadWriteMessagePair, error) {
	const vertexName = "test-vertex"
	return testutils.CopyUDFTestApplyBatchMap(ctx, vertexName, messages)
}

func (suite *ReadWritePerformance) SetupSuite() {
client := redisclient.NewRedisClient(redisOptions)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
Expand Down Expand Up @@ -344,7 +353,7 @@ func (suite *ReadWritePerformance) SetupSuite() {
}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
isdf, _ := forward.NewInterStepDataForward(vertexInstance, rqr, toSteps, forwardReadWritePerformance{}, forwardReadWritePerformance{}, forwardReadWritePerformance{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager())
isdf, _ := forward.NewInterStepDataForward(vertexInstance, rqr, toSteps, forwardReadWritePerformance{}, readTestAppliers, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager())

suite.ctx = ctx
suite.rclient = client
Expand Down Expand Up @@ -438,7 +447,7 @@ func (suite *ReadWritePerformance) TestReadWriteLatencyPipelining() {
}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
suite.isdf, _ = forward.NewInterStepDataForward(vertexInstance, suite.rqr, toSteps, forwardReadWritePerformance{}, forwardReadWritePerformance{}, forwardReadWritePerformance{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager())
suite.isdf, _ = forward.NewInterStepDataForward(vertexInstance, suite.rqr, toSteps, forwardReadWritePerformance{}, readTestAppliers, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager())

suite.False(suite.rqw.IsFull())
var writeMessages = make([]isb.Message, 0, suite.count)
Expand Down
16 changes: 14 additions & 2 deletions pkg/isb/stores/redis/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ import (
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

// appliers plugs myForwardRedisTest in as the map, map-stream and batch-map
// UDF implementation for the forwarders constructed in this test file.
var appliers = forward.MapAppliers{
	MapUDF:       myForwardRedisTest{},
	MapStreamUDF: myForwardRedisTest{},
	BatchMapUDF:  myForwardRedisTest{},
}

func TestRedisQWrite_Write(t *testing.T) {
client := redisclient.NewRedisClient(redisOptions)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
Expand Down Expand Up @@ -359,6 +367,10 @@ func (f myForwardRedisTest) ApplyMapStream(ctx context.Context, message *isb.Rea
return testutils.CopyUDFTestApplyStream(ctx, "", writeMessageCh, message)
}

// ApplyBatchMap hands the whole batch to the shared copy-UDF test helper,
// passing an empty vertex name to that helper.
func (f myForwardRedisTest) ApplyBatchMap(ctx context.Context, messages []*isb.ReadMessage) ([]isb.ReadWriteMessagePair, error) {
	const vertexName = ""
	return testutils.CopyUDFTestApplyBatchMap(ctx, vertexName, messages)
}

// TestNewInterStepDataForwardRedis is used to read data from one step to another using redis as the Inter-Step Buffer
// For the purposes of testing we need to write some data to the from step
func TestNewInterStepDataForwardRedis(t *testing.T) {
Expand Down Expand Up @@ -410,7 +422,7 @@ func TestNewInterStepDataForwardRedis(t *testing.T) {
}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := forward.NewInterStepDataForward(vertexInstance, fromStep, toSteps, myForwardRedisTest{}, myForwardRedisTest{}, myForwardRedisTest{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager())
f, err := forward.NewInterStepDataForward(vertexInstance, fromStep, toSteps, myForwardRedisTest{}, appliers, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager())
assert.NoError(t, err)
assert.False(t, to1.IsFull())

Expand Down Expand Up @@ -459,7 +471,7 @@ func TestReadTimeout(t *testing.T) {
}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := forward.NewInterStepDataForward(vertexInstance, fromStep, toSteps, myForwardRedisTest{}, myForwardRedisTest{}, myForwardRedisTest{}, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager())
f, err := forward.NewInterStepDataForward(vertexInstance, fromStep, toSteps, myForwardRedisTest{}, appliers, fetchWatermark, publishWatermark, wmb.NewNoOpIdleManager())
assert.NoError(t, err)
stopped := f.Start()
// Call stop to end the test as we have a blocking read. The forwarder is up and running with no messages written
Expand Down
36 changes: 36 additions & 0 deletions pkg/isb/testutils/udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,39 @@ func CopyUDFTestApplyStream(ctx context.Context, vertexName string, writeMessage
writeMessageCh <- isb.WriteMessage{Message: writeMessage}
return nil
}

// CopyUDFTestApplyBatchMap is a test stand-in for a batch-map UDF. For every
// read message it produces exactly one write message that carries the same
// payload, keys, headers and message info, and pairs it with its source
// message. The write message ID is built from vertexName, the read offset's
// string form and index 0. The returned slice has one entry per input
// message, in input order; the error is always nil.
//
// Note that the payload slice is reused as-is rather than duplicated.
func CopyUDFTestApplyBatchMap(ctx context.Context, vertexName string, readMessages []*isb.ReadMessage) ([]isb.ReadWriteMessagePair, error) {
	pairs := make([]isb.ReadWriteMessagePair, 0, len(readMessages))

	for _, msg := range readMessages {
		// Build the single output message for this input in one literal.
		out := &isb.WriteMessage{
			Message: isb.Message{
				Header: isb.Header{
					MessageInfo: msg.MessageInfo,
					ID: isb.MessageID{
						VertexName: vertexName,
						Offset:     msg.ReadOffset.String(),
						Index:      0,
					},
					Keys:    msg.Keys,
					Headers: msg.Headers,
				},
				Body: isb.Body{
					Payload: msg.Body.Payload,
				},
			},
			Tags: nil,
		}

		pairs = append(pairs, isb.ReadWriteMessagePair{
			ReadMessage:   msg,
			WriteMessages: []*isb.WriteMessage{out},
		})
	}

	return pairs, nil
}
181 changes: 181 additions & 0 deletions pkg/sdkclient/batchmapper/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package batchmapper

import (
"context"
"errors"
"io"

"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"
"github.com/numaproj/numaflow-go/pkg/info"

"github.com/numaproj/numaflow/pkg/sdkclient"
sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error"
grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc"
)

// client contains the grpc connection and the grpc client.
type client struct {
	// conn is the underlying gRPC connection; it is set by New and closed by CloseConn.
	// It is left nil when the client is built through NewFromClient.
	conn *grpc.ClientConn
	// grpcClt is the generated BatchMap gRPC client used for the IsReady and BatchMapFn calls.
	grpcClt batchmappb.BatchMapClient
}

// New builds a batch-map client connected to the UDF server.
//
// It starts from the default options for sdkclient.BatchMapAddr, applies each
// of inputOptions in order, and then connects to the configured socket
// address with the configured max message size. Any connection error is
// returned unchanged together with a nil Client.
func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
	opts := sdkclient.DefaultOptions(sdkclient.BatchMapAddr)
	for _, apply := range inputOptions {
		apply(opts)
	}

	// Establish the gRPC connection to the server.
	conn, err := grpcutil.ConnectToServer(opts.UdsSockAddr(), serverInfo, opts.MaxMessageSize())
	if err != nil {
		return nil, err
	}

	return &client{
		conn:    conn,
		grpcClt: batchmappb.NewBatchMapClient(conn),
	}, nil
}

// NewFromClient wraps an already constructed BatchMapClient in a Client.
// No gRPC connection is created or stored; only the supplied client is used.
// The returned error is always nil.
func NewFromClient(c batchmappb.BatchMapClient) (Client, error) {
	wrapped := new(client)
	wrapped.grpcClt = c
	return wrapped, nil
}

// CloseConn closes the grpc client connection.
//
// A client created through NewFromClient has no connection of its own; in
// that case there is nothing to close and nil is returned instead of
// dereferencing a nil connection.
func (c *client) CloseConn(ctx context.Context) error {
	if c.conn == nil {
		return nil
	}
	return c.conn.Close()
}

// IsReady calls the server's IsReady RPC and reports the readiness flag from
// its response. If the RPC fails, it returns false together with that error.
func (c *client) IsReady(ctx context.Context, in *emptypb.Empty) (bool, error) {
	readiness, err := c.grpcClt.IsReady(ctx, in)
	if err == nil {
		return readiness.GetReady(), nil
	}
	return false, err
}

// BatchMapFn is the handler for the gRPC client (Numa container).
// It takes in a stream of input requests, sends them to the gRPC server (UDF) and then streams the
// responses received back on a channel asynchronously.
// We spawn 2 goroutines here, one for sending the requests over the stream
// and the other one for receiving the responses.
//
// Returned channels:
//   - the response channel delivers every response received from the UDF and is closed once the
//     receiving goroutine stops (on EOF or after reporting an error). It is nil when the stream
//     could not be opened, so that only the error channel can be selected.
//   - the error channel is unbuffered and is never closed; it carries errors from opening the
//     stream, sending, closing the send side, or receiving.
//
// NOTE(review): because the error channel is unbuffered, a goroutine reporting an error stays
// blocked until the caller reads it; callers should keep draining the error channel.
func (c *client) BatchMapFn(ctx context.Context, inputCh <-chan *batchmappb.BatchMapRequest) (<-chan *batchmappb.BatchMapResponse, <-chan error) {
	// errCh is used to track and propagate any errors that might occur during the rpc lifecycle, these could include
	// errors in sending, UDF errors etc.
	// These are propagated to the applier for further handling.
	errCh := make(chan error)

	// response channel for streaming back the results received from the gRPC server
	responseCh := make(chan *batchmappb.BatchMapResponse)

	// BatchMapFn is a bidirectional streaming RPC.
	// We get a stream object over which we can send the requests and
	// receive the responses asynchronously.
	// TODO(map-batch): this creates a new gRPC stream for every batch,
	// it might be useful to see the performance difference between this approach
	// and a long-running RPC
	stream, err := c.grpcClt.BatchMapFn(ctx)
	if err != nil {
		go func() {
			errCh <- sdkerr.ToUDFErr("c.grpcClt.BatchMapFn", err)
		}()
		// passing a nil channel for responseCh to ensure that it is never selected as this is an error scenario
		// and we should be reading on the error channel only.
		return nil, errCh
	}

	// read the responses from the server stream and send them to responseCh,
	// any error is sent to errCh
	go func() {
		// close this channel to indicate that no more elements are left to receive from grpc.
		// We defer on the whole goroutine as even during an error scenario we
		// want to close the channel and stop forwarding any more responses from the UDF
		// as we would be replaying the current ones.
		defer close(responseCh)

		for {
			resp, recvErr := stream.Recv()
			// EOF indicates that no more responses are left to process on the stream from the UDF;
			// return without reporting an error. The shared errCh variable is deliberately NOT
			// reassigned here: the caller already holds the channel value, and writing the
			// captured variable would race with the sending goroutine that reads it.
			if errors.Is(recvErr, io.EOF) {
				return
			}
			// If this is some other error, propagate it to the error channel. The deferred close
			// of the response channel signals that no more messages will be delivered.
			if errSDK := sdkerr.ToUDFErr("c.grpcClt.BatchMapFn", recvErr); errSDK != nil {
				errCh <- errSDK
				return
			}
			// send the response upstream
			responseCh <- resp
		}
	}()

	// Read the input requests and send them individually on the bi-di stream for processing;
	// in case there is an error in sending, send it to the error channel for handling
	go func() {
		for {
			select {
			case <-ctx.Done():
				// If the context is done we do not want to send further on the stream,
				// the Recv should get an error from the server as the stream uses the same ctx
				return
			case inputMsg, ok := <-inputCh:
				if !ok {
					// No more messages are left to read on the input channel.
					// CloseSend closes the send direction of the stream. This indicates to the
					// UDF that we have sent all requests from the client, and it can safely
					// stop listening on the stream
					if closeErr := stream.CloseSend(); closeErr != nil && !errors.Is(closeErr, io.EOF) {
						errCh <- sdkerr.ToUDFErr("c.grpcClt.BatchMapFn stream.CloseSend()", closeErr)
					}
					// exit this routine
					return
				}
				// Use a goroutine-local error instead of the enclosing function's err variable.
				if sendErr := stream.Send(inputMsg); sendErr != nil {
					errCh <- sdkerr.ToUDFErr("c.grpcClt.BatchMapFn", sendErr)
					// On an error we would be stopping any further processing and go for a replay
					// so return directly
					return
				}
			}
		}
	}()

	return responseCh, errCh
}
Loading

0 comments on commit 00619b6

Please sign in to comment.