chore: improve ctx handling for reduce (#1601)
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
yhl25 and vigith authored Mar 26, 2024
1 parent aab37c6 commit 464baf7
Showing 9 changed files with 106 additions and 57 deletions.
2 changes: 2 additions & 0 deletions examples/6-reduce-fixed-window-with-pvc.yaml
@@ -3,6 +3,8 @@ kind: Pipeline
metadata:
name: even-odd-sum
spec:
watermark:
maxDelay: 30s
vertices:
- name: in
source:
77 changes: 55 additions & 22 deletions pkg/reduce/data_forward.go
@@ -73,6 +73,7 @@ type DataForward struct {
storeManager wal.Manager
of *pnf.Manager
opts *Options
currentWatermark time.Time // if watermark is -1, then make sure event-time is < watermark
log *zap.SugaredLogger
}

@@ -116,6 +117,7 @@ func NewDataForward(ctx context.Context,
whereToDecider: whereToDecider,
of: of,
wmbChecker: wmb.NewWMBChecker(2), // TODO: make configurable
currentWatermark: time.UnixMilli(-1),
log: logging.FromContext(ctx),
opts: options}

@@ -170,7 +172,10 @@ func (df *DataForward) replayForUnalignedWindows(ctx context.Context, discovered
for _, s := range discoveredWALs {
p := s.PartitionID()
// associate the PBQ and PnF
df.associatePBQAndPnF(ctx, p)
_, err := df.associatePBQAndPnF(ctx, p)
if err != nil {
return err
}
}
eg := errgroup.Group{}
df.log.Info("Number of partitions to replay: ", len(discoveredWALs))
@@ -225,7 +230,10 @@ func (df *DataForward) replayForAlignedWindows(ctx context.Context, discoveredWA
df.windower.InsertWindow(window.NewAlignedTimedWindow(p.Start, p.End, p.Slot))

// associate the PBQ and PnF
df.associatePBQAndPnF(ctx, p)
_, err := df.associatePBQAndPnF(ctx, p)
if err != nil {
return err
}
}
eg := errgroup.Group{}
df.log.Infow("Number of partitions to replay: ", zap.Int("count", len(discoveredWALs)))
@@ -321,7 +329,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
if watermark := time.UnixMilli(processorWMB.Watermark).Add(-1 * time.Millisecond); oldestWindowEndTime.Before(watermark) {
windowOperations := df.windower.CloseWindows(watermark)
for _, op := range windowOperations {
err = df.writeToPBQ(ctx, op, true)
// we do not have to persist close operations
err = df.writeToPBQ(ctx, op, false)
if err != nil {
df.log.Errorw("Failed to write to PBQ", zap.Error(err))
}
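The third argument to writeToPBQ above is the persist flag; close operations carry only window metadata and no payload message, so the commit now passes false for them. A minimal sketch of that distinction, assuming illustrative operation names rather than the real window package types:

```go
package main

import "fmt"

// operation mirrors the window operation kinds seen on the PBQ write path
// (illustrative, not the real window package types).
type operation int

const (
	opOpen operation = iota
	opAppend
	opExpand
	opClose
	opMerge
)

// needsPersist sketches why close/merge requests are now written with
// persist=false: they carry only window metadata, no payload message, so there
// is nothing to store in (or replay from) the WAL for them.
func needsPersist(op operation, replaying bool) bool {
	switch op {
	case opOpen, opAppend, opExpand:
		return !replaying // during replay the message is already in the WAL
	default: // opClose, opMerge
		return false
	}
}

func main() {
	fmt.Println(needsPersist(opAppend, false)) // true
	fmt.Println(needsPersist(opClose, false))  // false
}
```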
@@ -347,6 +356,10 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
// get the watermark for the partition from which we read the messages
processorWM := df.wmFetcher.ComputeWatermark(readMessages[0].ReadOffset, df.fromBufferPartition.GetPartitionIdx())

if processorWM.After(df.currentWatermark) {
df.currentWatermark = time.Time(processorWM)
}

for _, m := range readMessages {
if !df.keyed {
m.Keys = []string{dfv1.DefaultKeyForNonKeyedData}
@@ -369,7 +382,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {

// associatePBQAndPnF associates a PBQ with the partition if a PBQ exists, else creates a new one and then associates
// it to the partition.
func (df *DataForward) associatePBQAndPnF(ctx context.Context, partitionID *partition.ID) pbq.ReadWriteCloser {
func (df *DataForward) associatePBQAndPnF(ctx context.Context, partitionID *partition.ID) (pbq.ReadWriteCloser, error) {
// look for existing pbq
q := df.pbqManager.GetPBQ(*partitionID)

@@ -382,24 +395,38 @@ func (df *DataForward) associatePBQAndPnF(ctx context.Context, partitionID *part
Factor: 1.5,
Jitter: 0.1,
}
pbqErr = wait.ExponentialBackoffWithContext(ctx, infiniteBackoff, func(_ context.Context) (done bool, err error) {
// we manage ctx ourselves
pbqErr = wait.ExponentialBackoff(infiniteBackoff, func() (done bool, err error) {
var attempt int
q, pbqErr = df.pbqManager.CreateNewPBQ(ctx, *partitionID)
if pbqErr != nil {
attempt += 1
df.log.Warnw("Failed to create pbq during startup, retrying", zap.Any("attempt", attempt), zap.String("partitionID", partitionID.String()), zap.Error(pbqErr))
return false, nil
df.log.Warnw("Failed to create pbq, retrying", zap.Any("attempt", attempt), zap.String("partitionID", partitionID.String()), zap.Error(pbqErr))
// no point retrying if ctx.Done has been invoked
select {
case <-ctx.Done():
// no point in retrying after we have been asked to stop.
return false, ctx.Err()
default:
// keep retrying
return false, nil
}
}
return true, nil
})

if pbqErr != nil {
df.log.Errorw("Failed to create pbq context cancelled", zap.String("partitionID", partitionID.String()), zap.Error(pbqErr))
return nil, pbqErr
}
// since we created a brand new PBQ it means there is no PnF listening on this PBQ.
// we should create and attach the read side of the loop (PnF) to the partition and then
// start process-and-forward (pnf) loop
df.of.AsyncSchedulePnF(ctx, partitionID, q)
df.log.Infow("Successfully Created/Found pbq and started PnF", zap.String("partitionID", partitionID.String()))
}

return q
return q, nil
}

// process is one iteration of the read loop which create/merges/closes windows and writes window requests to the PBQs followed by acking the messages, and
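The associatePBQAndPnF change above drops wait.ExponentialBackoffWithContext in favour of a plain backoff whose condition function checks ctx.Done() itself, so cancellation surfaces as an error from the retry loop instead of being swallowed. A stdlib-only sketch of the same retry-until-cancelled pattern, with hypothetical names such as retryWithBackoff and a simulated failing operation:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff retries fn with exponential backoff until it succeeds or
// ctx is cancelled; the ctx check lives inside the loop, mirroring the
// select on ctx.Done() added to associatePBQAndPnF.
func retryWithBackoff(ctx context.Context, initial time.Duration, factor float64, fn func() error) error {
	delay := initial
	for attempt := 1; ; attempt++ {
		err := fn()
		if err == nil {
			return nil
		}
		fmt.Printf("attempt %d failed: %v, retrying in %v\n", attempt, err, delay)
		select {
		case <-ctx.Done():
			// no point in retrying once we have been asked to stop
			return ctx.Err()
		case <-time.After(delay):
			delay = time.Duration(float64(delay) * factor)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	err := retryWithBackoff(ctx, 50*time.Millisecond, 1.5, func() error {
		return errors.New("pbq store not ready") // simulated persistent failure
	})
	fmt.Println("retry ended with:", err) // context.DeadlineExceeded
}
```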
@@ -441,9 +468,6 @@ func (df *DataForward) process(ctx context.Context, messages []*isb.ReadMessage)
df.ackMessages(ctx, ctrlMessages)
}

// ack successful messages
df.ackMessages(ctx, successfullyWrittenMessages)

// no-ack the failed messages, so that they will be retried in the next iteration
// if we don't do this, the failed messages will be retried after the ackWait time
// which will cause correctness issues. We want these messages to be immediately retried.
@@ -456,6 +480,9 @@ func (df *DataForward) process(ctx context.Context, messages []*isb.ReadMessage)
return
}

// ack successful messages
df.ackMessages(ctx, successfullyWrittenMessages)

// close any windows that need to be closed.
// since the watermark will be same for all the messages in the batch
// we can invoke remove windows only once per batch
@@ -467,7 +494,7 @@ func (df *DataForward) process(ctx context.Context, messages []*isb.ReadMessage)

// write the close window operations to PBQ
for _, winOp := range closedWindowOps {
err = df.writeToPBQ(ctx, winOp, true)
err = df.writeToPBQ(ctx, winOp, false)
if err != nil {
df.log.Errorw("Failed to write close signal to PBQ", zap.Error(err))
}
@@ -499,30 +526,32 @@ func (df *DataForward) writeMessagesToWindows(ctx context.Context, messages []*i
var writtenMessages = make([]*isb.ReadMessage, 0, len(messages))
var failedMessages = make([]*isb.ReadMessage, 0)

messagesLoop:
for i, message := range messages {
for _, message := range messages {
if df.shouldDropMessage(message) {
writtenMessages = append(writtenMessages, message)
continue
}

var failed bool
// identify and add window for the message
windowOperations := df.windower.AssignWindows(message)

// for each window we will have a PBQ. A message could belong to multiple windows (e.g., sliding).
// We need to write the messages to these PBQs
for _, winOp := range windowOperations {

err := df.writeToPBQ(ctx, winOp, true)
err = df.writeToPBQ(ctx, winOp, true)
// there is no point continuing because we are seeing an error.
// this error will ONLY BE set if we are in an error loop and ctx.Done() has been invoked.
if err != nil {
df.log.Errorw("Failed to write message, asked to stop trying", zap.Any("msgOffSet", message.ReadOffset.String()), zap.String("partitionID", winOp.ID.String()), zap.Error(err))
failedMessages = append(failedMessages, messages[i:]...)
break messagesLoop
failed = true
}
}

if failed {
failedMessages = append(failedMessages, message)
continue
}
writtenMessages = append(writtenMessages, message)
}
return writtenMessages, failedMessages, err
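writeMessagesToWindows no longer breaks out of the batch (the old `break messagesLoop`) on the first failed PBQ write; each message carries its own failed flag, so only the messages that actually failed are collected for no-ack and immediate retry. A rough sketch of that per-message partitioning, with a hypothetical writeFn standing in for the per-window PBQ write:

```go
package main

import (
	"errors"
	"fmt"
)

// partitionByWrite attempts writeFn for every message and collects failures
// individually instead of abandoning the rest of the batch on the first error.
// writeFn stands in for the per-window PBQ write and is illustrative only.
func partitionByWrite(msgs []string, writeFn func(string) error) (written, failed []string) {
	for _, m := range msgs {
		if err := writeFn(m); err != nil {
			failed = append(failed, m) // these get no-acked and retried right away
			continue
		}
		written = append(written, m) // these get acked
	}
	return written, failed
}

func main() {
	written, failed := partitionByWrite([]string{"a", "b", "c"}, func(m string) error {
		if m == "b" {
			return errors.New("pbq write failed")
		}
		return nil
	})
	fmt.Println(written, failed) // [a c] [b]
}
```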
@@ -558,7 +587,8 @@ func (df *DataForward) shouldDropMessage(message *isb.ReadMessage) bool {
// This could be due to a couple of problems, e.g. ack was not registered, etc.
// Please do not confuse this with late data! This is a platform related problem causing the watermark inequality
// to be violated.
if !message.IsLate && message.EventTime.Before(message.Watermark.Add(-1*df.opts.allowedLateness)) {
// df.currentWatermark cannot be -1 except for the first time till it gets a valid watermark (wm > -1)
if !message.IsLate && message.EventTime.Before(df.currentWatermark.Add(-1*df.opts.allowedLateness)) {
// TODO: track as a counter metric
df.log.Errorw("An old message just popped up", zap.Any("msgOffSet", message.ReadOffset.String()), zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Any("message", message.Message))
// mark it as a successfully written message as the message will be acked to avoid subsequent retries
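shouldDropMessage now compares the event time against df.currentWatermark, the highest watermark the forwarder has observed (initialised to -1 ms), rather than the watermark stamped on the individual message, which can lag behind. A small self-contained sketch of that bookkeeping, assuming an illustrative Message type and allowedLateness value:

```go
package main

import (
	"fmt"
	"time"
)

type Message struct {
	EventTime time.Time
	IsLate    bool
}

type forwarder struct {
	currentWatermark time.Time // starts at -1 ms, i.e. "no watermark seen yet"
	allowedLateness  time.Duration
}

// observe advances currentWatermark monotonically as batches are read,
// mirroring the update added to forwardAChunk.
func (f *forwarder) observe(wm time.Time) {
	if wm.After(f.currentWatermark) {
		f.currentWatermark = wm
	}
}

// shouldDrop mirrors the new check: drop only when the event time is older
// than the highest watermark tracked so far, minus the allowed lateness.
func (f *forwarder) shouldDrop(m Message) bool {
	return !m.IsLate && m.EventTime.Before(f.currentWatermark.Add(-f.allowedLateness))
}

func main() {
	f := &forwarder{currentWatermark: time.UnixMilli(-1), allowedLateness: 2 * time.Second}
	now := time.Now()

	f.observe(now)                                                            // watermark from the current batch
	fmt.Println(f.shouldDrop(Message{EventTime: now}))                        // false: within the watermark
	fmt.Println(f.shouldDrop(Message{EventTime: now.Add(-5 * time.Second)})) // true: too old, platform-level violation
}
```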
@@ -593,9 +623,12 @@ func (df *DataForward) writeToPBQ(ctx context.Context, winOp *window.TimedWindow
Jitter: 0.1,
}

q := df.associatePBQAndPnF(ctx, winOp.ID)
q, err := df.associatePBQAndPnF(ctx, winOp.ID)
if err != nil {
return err
}

err := wait.ExponentialBackoff(pbqWriteBackoff, func() (done bool, err error) {
err = wait.ExponentialBackoff(pbqWriteBackoff, func() (done bool, err error) {
rErr := q.Write(ctx, winOp, persist)
if rErr != nil {
df.log.Errorw("Failed to write message", zap.String("msgOffSet", winOp.ReadMessage.ReadOffset.String()), zap.String("partitionID", winOp.ID.String()), zap.Error(rErr))
@@ -692,7 +725,7 @@ func (df *DataForward) ackMessages(ctx context.Context, messages []*isb.ReadMess
func (df *DataForward) noAckMessages(ctx context.Context, failedMessages []*isb.ReadMessage) {
var readOffsets []isb.Offset
for _, m := range failedMessages {
df.log.Debugw("No-ack message", zap.String("msgOffSet", m.ReadOffset.String()))
df.log.Infow("No-ack message", zap.String("msgOffSet", m.ReadOffset.String()))
readOffsets = append(readOffsets, m.ReadOffset)
}
df.fromBufferPartition.NoAck(ctx, readOffsets)
40 changes: 19 additions & 21 deletions pkg/reduce/pbq/pbq.go
@@ -52,11 +52,13 @@ var _ ReadWriteCloser = (*PBQ)(nil)
// Write accepts a window request and writes it to the PBQ, only the isb message is written to the store.
// The other metadata like operation etc are recomputed from WAL.
// request can never be nil.
func (p *PBQ) Write(ctx context.Context, request *window.TimedWindowRequest, persist bool) error {
func (p *PBQ) Write(_ context.Context, request *window.TimedWindowRequest, persist bool) error {
var writeErr error

// if cob we should return
if p.cob {
p.log.Errorw("Failed to write request to pbq, pbq is closed", zap.Any("ID", p.PartitionID), zap.Any("request", request))
return nil
return fmt.Errorf("pbq is closed")
}

// if the window operation is delete, we should close the output channel and return
@@ -65,32 +67,28 @@ func (p *PBQ) Write(ctx context.Context, request *window.TimedWindowRequest, per
return nil
}

var writeErr error
// we need context to get out of blocking write
select {
case p.output <- request:
switch request.Operation {
case window.Open, window.Append, window.Expand:
// during replay we do not have to persist
if persist {
writeErr = p.store.Write(request.ReadMessage)
}
case window.Close, window.Merge:
// these do not have request.ReadMessage, only metadata fields are used
default:
return fmt.Errorf("unknown request.Operation, %v", request.Operation)
// write the request to the output channel
// NOTE: this is a blocking call! it should only block if UDF is blocking.
p.output <- request

switch request.Operation {
case window.Open, window.Append, window.Expand:
// during replay we do not have to persist
if persist {
writeErr = p.store.Write(request.ReadMessage)
}
case <-ctx.Done():
// closing the output channel will not cause panic, since its inside select case
// ctx.Done implicitly means write hasn't succeeded.
close(p.output)
writeErr = ctx.Err()
case window.Close, window.Merge:
// these do not have request.ReadMessage, only metadata fields are used
default:
return fmt.Errorf("unknown request.Operation, %v", request.Operation)
}

pbqChannelSize.With(map[string]string{
metrics.LabelVertex: p.vertexName,
metrics.LabelPipeline: p.pipelineName,
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(p.vertexReplica)),
}).Set(float64(len(p.output)))

return writeErr
}
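PBQ.Write above becomes a plain blocking send on p.output (it only blocks while the UDF side is slow to drain), drops the select on ctx.Done(), and turns a write-after-close into an error instead of a silent nil. A toy sketch of that shape, with illustrative names (queue, cob) rather than the real PBQ types:

```go
package main

import (
	"errors"
	"fmt"
)

// queue is a toy stand-in for the PBQ: a buffered channel feeding a consumer
// (the reduce UDF) plus a "close of book" flag. Names are illustrative.
type queue struct {
	output chan string
	cob    bool // close of book: no more writes accepted
}

func (q *queue) Write(req string) error {
	if q.cob {
		return errors.New("pbq is closed")
	}
	// Plain blocking send: it only blocks while the consumer is slow, which is
	// the behaviour the commit switches to after dropping the ctx.Done() select.
	q.output <- req
	return nil
}

func main() {
	q := &queue{output: make(chan string, 2)}

	done := make(chan struct{})
	go func() { // consumer, standing in for the process-and-forward loop
		for req := range q.output {
			fmt.Println("processed:", req)
		}
		close(done)
	}()

	_ = q.Write("window-open")
	_ = q.Write("window-append")
	close(q.output) // roughly mirrors the delete operation closing the output channel
	<-done

	q.cob = true
	fmt.Println(q.Write("late-write")) // pbq is closed
}
```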

1 change: 1 addition & 0 deletions pkg/reduce/pbq/pbqmanager.go
@@ -159,6 +159,7 @@ func (m *Manager) ShutDown(ctx context.Context) {
// exponential backoff will return if err is not nil
return false, nil
}
m.log.Infow("Successfully closed pbq", zap.String("ID", q.PartitionID.String()))
return true, nil
})
if ctxClosedErr != nil {
5 changes: 1 addition & 4 deletions pkg/reduce/pbq/wal/aligned/fs/wal_segment.go
@@ -431,10 +431,7 @@ func (w *alignedWAL) Close() (err error) {
return err
}

err = w.fp.Close()
if err != nil {
return err
}
_ = w.fp.Close()

return nil
}
2 changes: 1 addition & 1 deletion pkg/reduce/pnf/processandforward.go
@@ -187,7 +187,7 @@ outerLoop:
for {
select {
case err := <-errCh:
if errors.Is(err, context.Canceled) {
if errors.Is(err, context.Canceled) || ctx.Err() != nil {
return
}
if err != nil {
11 changes: 8 additions & 3 deletions pkg/sdkclient/reducer/client.go
@@ -112,9 +112,14 @@ func (c *client) ReduceFn(ctx context.Context, datumStreamCh <-chan *reducepb.Re
}
}

// close the stream after sending all the messages
if sendErr = stream.CloseSend(); sendErr != nil && !errors.Is(sendErr, io.EOF) {
errCh <- util.ToUDFErr("ReduceFn stream.Send()", sendErr)
select {
case <-ctx.Done():
return
default:
sendErr = stream.CloseSend()
if sendErr != nil && !errors.Is(sendErr, io.EOF) {
errCh <- util.ToUDFErr("ReduceFn stream.CloseSend()", sendErr)
}
}
}()
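Both SDK clients now check the context before calling CloseSend, so a cancelled context makes the sender goroutine return quietly instead of pushing a spurious error onto errCh. A minimal sketch of that guard, with a stand-in sendCloser interface and errCh wiring that are illustrative rather than the real gRPC stream types:

```go
package main

import (
	"context"
	"fmt"
)

// sendCloser is a stand-in for the client side of a gRPC stream.
type sendCloser interface {
	CloseSend() error
}

type fakeStream struct{}

func (fakeStream) CloseSend() error { return nil }

// finishSend closes the send side of the stream only if the context is still
// alive; on cancellation it simply returns, mirroring the guard added to
// ReduceFn and SessionReduceFn above.
func finishSend(ctx context.Context, stream sendCloser, errCh chan<- error) {
	select {
	case <-ctx.Done():
		return // context gone: skip CloseSend, report nothing
	default:
		if err := stream.CloseSend(); err != nil {
			errCh <- err
		}
	}
}

func main() {
	errCh := make(chan error, 1)

	ctx, cancel := context.WithCancel(context.Background())
	finishSend(ctx, fakeStream{}, errCh) // live ctx: CloseSend runs, no error
	cancel()
	finishSend(ctx, fakeStream{}, errCh) // cancelled ctx: returns immediately

	fmt.Println("pending errors:", len(errCh)) // 0
}
```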

11 changes: 9 additions & 2 deletions pkg/sdkclient/sessionreducer/client.go
@@ -110,9 +110,16 @@ func (c *client) SessionReduceFn(ctx context.Context, datumStreamCh <-chan *sess
}
}
}

// close the stream after sending all the messages
if sendErr = stream.CloseSend(); sendErr != nil && !errors.Is(sendErr, io.EOF) {
errCh <- util.ToUDFErr("SessionReduceFn stream.Send()", sendErr)
select {
case <-ctx.Done():
return
default:
sendErr = stream.CloseSend()
if sendErr != nil && !errors.Is(sendErr, io.EOF) {
errCh <- util.ToUDFErr("SessionReduceFn stream.CloseSend()", sendErr)
}
}
}()

14 changes: 10 additions & 4 deletions test/reduce-one-e2e/testdata/complex-sliding-window-pipeline.yaml
@@ -4,7 +4,7 @@ metadata:
name: complex-sliding-sum
spec:
watermark:
maxDelay: "20s"
maxDelay: 30s
limits:
readBatchSize: 50
vertices:
@@ -36,7 +36,9 @@ spec:
length: 5s
keyed: true
storage:
emptyDir: {}
persistentVolumeClaim:
volumeSize: 1Gi
accessMode: ReadWriteOnce
- name: non-keyed-fixed-sum
containerTemplate:
env:
@@ -52,7 +54,9 @@ spec:
length: 10s
keyed: false
storage:
emptyDir: {}
persistentVolumeClaim:
volumeSize: 1Gi
accessMode: ReadWriteOnce
- name: non-keyed-sliding-sum
containerTemplate:
env:
@@ -69,7 +73,9 @@ spec:
slide: 10s
keyed: false
storage:
emptyDir: {}
persistentVolumeClaim:
volumeSize: 1Gi
accessMode: ReadWriteOnce
- name: sink
scale:
min: 1
