diff --git a/examples/6-reduce-fixed-window-with-pvc.yaml b/examples/6-reduce-fixed-window-with-pvc.yaml
index 0acc2a40c4..03fed6215b 100644
--- a/examples/6-reduce-fixed-window-with-pvc.yaml
+++ b/examples/6-reduce-fixed-window-with-pvc.yaml
@@ -3,6 +3,8 @@ kind: Pipeline
 metadata:
   name: even-odd-sum
 spec:
+  watermark:
+    maxDelay: 30s
   vertices:
     - name: in
       source:
diff --git a/pkg/reduce/data_forward.go b/pkg/reduce/data_forward.go
index 50164d7771..153d095f2f 100644
--- a/pkg/reduce/data_forward.go
+++ b/pkg/reduce/data_forward.go
@@ -73,6 +73,7 @@ type DataForward struct {
 	storeManager     wal.Manager
 	of               *pnf.Manager
 	opts             *Options
+	currentWatermark time.Time // if watermark is -1, then make sure event-time is < watermark
 	log              *zap.SugaredLogger
 }
 
@@ -116,6 +117,7 @@ func NewDataForward(ctx context.Context,
 		whereToDecider:      whereToDecider,
 		of:                  of,
 		wmbChecker:          wmb.NewWMBChecker(2), // TODO: make configurable
+		currentWatermark:    time.UnixMilli(-1),
 		log:                 logging.FromContext(ctx),
 		opts:                options}
 
@@ -170,7 +172,10 @@ func (df *DataForward) replayForUnalignedWindows(ctx context.Context, discovered
 	for _, s := range discoveredWALs {
 		p := s.PartitionID()
 		// associate the PBQ and PnF
-		df.associatePBQAndPnF(ctx, p)
+		_, err := df.associatePBQAndPnF(ctx, p)
+		if err != nil {
+			return err
+		}
 	}
 	eg := errgroup.Group{}
 	df.log.Info("Number of partitions to replay: ", len(discoveredWALs))
@@ -225,7 +230,10 @@ func (df *DataForward) replayForAlignedWindows(ctx context.Context, discoveredWA
 		df.windower.InsertWindow(window.NewAlignedTimedWindow(p.Start, p.End, p.Slot))
 
 		// associate the PBQ and PnF
-		df.associatePBQAndPnF(ctx, p)
+		_, err := df.associatePBQAndPnF(ctx, p)
+		if err != nil {
+			return err
+		}
 	}
 	eg := errgroup.Group{}
 	df.log.Infow("Number of partitions to replay: ", zap.Int("count", len(discoveredWALs)))
@@ -321,7 +329,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
 		if watermark := time.UnixMilli(processorWMB.Watermark).Add(-1 * time.Millisecond); oldestWindowEndTime.Before(watermark) {
 			windowOperations := df.windower.CloseWindows(watermark)
 			for _, op := range windowOperations {
-				err = df.writeToPBQ(ctx, op, true)
+				// we do not have to persist close operations
+				err = df.writeToPBQ(ctx, op, false)
 				if err != nil {
 					df.log.Errorw("Failed to write to PBQ", zap.Error(err))
 				}
@@ -347,6 +356,10 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
 	// get the watermark for the partition from which we read the messages
 	processorWM := df.wmFetcher.ComputeWatermark(readMessages[0].ReadOffset, df.fromBufferPartition.GetPartitionIdx())
 
+	if processorWM.After(df.currentWatermark) {
+		df.currentWatermark = time.Time(processorWM)
+	}
+
 	for _, m := range readMessages {
 		if !df.keyed {
 			m.Keys = []string{dfv1.DefaultKeyForNonKeyedData}
@@ -369,7 +382,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
 
 // associatePBQAndPnF associates a PBQ with the partition if a PBQ exists, else creates a new one and then associates
 // it to the partition.
-func (df *DataForward) associatePBQAndPnF(ctx context.Context, partitionID *partition.ID) pbq.ReadWriteCloser {
+func (df *DataForward) associatePBQAndPnF(ctx context.Context, partitionID *partition.ID) (pbq.ReadWriteCloser, error) {
 	// look for existing pbq
 	q := df.pbqManager.GetPBQ(*partitionID)
 
@@ -382,16 +395,30 @@ func (df *DataForward) associatePBQAndPnF(ctx context.Context, partitionID *part
 			Factor:   1.5,
 			Jitter:   0.1,
 		}
-		pbqErr = wait.ExponentialBackoffWithContext(ctx, infiniteBackoff, func(_ context.Context) (done bool, err error) {
+		// we manage ctx ourselves
+		pbqErr = wait.ExponentialBackoff(infiniteBackoff, func() (done bool, err error) {
			var attempt int
			q, pbqErr = df.pbqManager.CreateNewPBQ(ctx, *partitionID)
			if pbqErr != nil {
				attempt += 1
-				df.log.Warnw("Failed to create pbq during startup, retrying", zap.Any("attempt", attempt), zap.String("partitionID", partitionID.String()), zap.Error(pbqErr))
-				return false, nil
+				df.log.Warnw("Failed to create pbq, retrying", zap.Any("attempt", attempt), zap.String("partitionID", partitionID.String()), zap.Error(pbqErr))
+				// no point retrying if ctx.Done has been invoked
+				select {
+				case <-ctx.Done():
+					// no point in retrying after we have been asked to stop.
+					return false, ctx.Err()
+				default:
+					// keep retrying
+					return false, nil
+				}
			}
			return true, nil
		})
+
+		if pbqErr != nil {
+			df.log.Errorw("Failed to create pbq context cancelled", zap.String("partitionID", partitionID.String()), zap.Error(pbqErr))
+			return nil, pbqErr
+		}
 		// since we created a brand new PBQ it means there is no PnF listening on this PBQ.
 		// we should create and attach the read side of the loop (PnF) to the partition and then
 		// start process-and-forward (pnf) loop
@@ -399,7 +426,7 @@ func (df *DataForward) associatePBQAndPnF(ctx context.Context, partitionID *part
 		df.log.Infow("Successfully Created/Found pbq and started PnF", zap.String("partitionID", partitionID.String()))
 	}
 
-	return q
+	return q, nil
 }
 
 // process is one iteration of the read loop which create/merges/closes windows and writes window requests to the PBQs followed by acking the messages, and
@@ -441,9 +468,6 @@ func (df *DataForward) process(ctx context.Context, messages []*isb.ReadMessage)
 		df.ackMessages(ctx, ctrlMessages)
 	}
 
-	// ack successful messages
-	df.ackMessages(ctx, successfullyWrittenMessages)
-
 	// no-ack the failed messages, so that they will be retried in the next iteration
 	// if we don't do this, the failed messages will be retried after the ackWait time
 	// which will cause correctness issues. We want these messages to be immediately retried.
@@ -456,6 +480,9 @@ func (df *DataForward) process(ctx context.Context, messages []*isb.ReadMessage)
 		return
 	}
 
+	// ack successful messages
+	df.ackMessages(ctx, successfullyWrittenMessages)
+
 	// close any windows that need to be closed.
 	// since the watermark will be same for all the messages in the batch
 	// we can invoke remove windows only once per batch
@@ -467,7 +494,7 @@ func (df *DataForward) process(ctx context.Context, messages []*isb.ReadMessage)
 
 	// write the close window operations to PBQ
 	for _, winOp := range closedWindowOps {
-		err = df.writeToPBQ(ctx, winOp, true)
+		err = df.writeToPBQ(ctx, winOp, false)
 		if err != nil {
 			df.log.Errorw("Failed to write close signal to PBQ", zap.Error(err))
 		}
@@ -499,13 +526,13 @@ func (df *DataForward) writeMessagesToWindows(ctx context.Context, messages []*i
 	var writtenMessages = make([]*isb.ReadMessage, 0, len(messages))
 	var failedMessages = make([]*isb.ReadMessage, 0)
 
-messagesLoop:
-	for i, message := range messages {
+	for _, message := range messages {
 		if df.shouldDropMessage(message) {
 			writtenMessages = append(writtenMessages, message)
 			continue
 		}
 
+		var failed bool
 		// identify and add window for the message
 		windowOperations := df.windower.AssignWindows(message)
 
@@ -513,16 +540,18 @@ messagesLoop:
 		// We need to write the messages to these PBQs
 		for _, winOp := range windowOperations {
-			err := df.writeToPBQ(ctx, winOp, true)
+			err = df.writeToPBQ(ctx, winOp, true)
 			// there is no point continuing because we are seeing an error.
 			// this error will ONLY BE set if we are in an error loop and ctx.Done() has been invoked.
 			if err != nil {
 				df.log.Errorw("Failed to write message, asked to stop trying", zap.Any("msgOffSet", message.ReadOffset.String()), zap.String("partitionID", winOp.ID.String()), zap.Error(err))
-				failedMessages = append(failedMessages, messages[i:]...)
-				break messagesLoop
+				failed = true
 			}
 		}
-
+		if failed {
+			failedMessages = append(failedMessages, message)
+			continue
+		}
 		writtenMessages = append(writtenMessages, message)
 	}
 
 	return writtenMessages, failedMessages, err
@@ -558,7 +587,8 @@ func (df *DataForward) shouldDropMessage(message *isb.ReadMessage) bool {
 	// This could be due to a couple of problem, eg. ack was not registered, etc.
 	// Please do not confuse this with late data! This is a platform related problem causing the watermark inequality
 	// to be violated.
-	if !message.IsLate && message.EventTime.Before(message.Watermark.Add(-1*df.opts.allowedLateness)) {
+	// df.currentWatermark is -1 only until the first valid watermark (wm > -1) has been observed
+	if !message.IsLate && message.EventTime.Before(df.currentWatermark.Add(-1*df.opts.allowedLateness)) {
 		// TODO: track as a counter metric
 		df.log.Errorw("An old message just popped up", zap.Any("msgOffSet", message.ReadOffset.String()), zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Any("message", message.Message))
 		// mark it as a successfully written message as the message will be acked to avoid subsequent retries
@@ -593,9 +623,12 @@ func (df *DataForward) writeToPBQ(ctx context.Context, winOp *window.TimedWindow
 		Jitter:   0.1,
 	}
 
-	q := df.associatePBQAndPnF(ctx, winOp.ID)
+	q, err := df.associatePBQAndPnF(ctx, winOp.ID)
+	if err != nil {
+		return err
+	}
 
-	err := wait.ExponentialBackoff(pbqWriteBackoff, func() (done bool, err error) {
+	err = wait.ExponentialBackoff(pbqWriteBackoff, func() (done bool, err error) {
 		rErr := q.Write(ctx, winOp, persist)
 		if rErr != nil {
 			df.log.Errorw("Failed to write message", zap.String("msgOffSet", winOp.ReadMessage.ReadOffset.String()), zap.String("partitionID", winOp.ID.String()), zap.Error(rErr))
@@ -692,7 +725,7 @@ func (df *DataForward) ackMessages(ctx context.Context, messages []*isb.ReadMess
 func (df *DataForward) noAckMessages(ctx context.Context, failedMessages []*isb.ReadMessage) {
 	var readOffsets []isb.Offset
 	for _, m := range failedMessages {
-		df.log.Debugw("No-ack message", zap.String("msgOffSet", m.ReadOffset.String()))
+		df.log.Infow("No-ack message", zap.String("msgOffSet", m.ReadOffset.String()))
 		readOffsets = append(readOffsets, m.ReadOffset)
 	}
 	df.fromBufferPartition.NoAck(ctx, readOffsets)
diff --git a/pkg/reduce/pbq/pbq.go b/pkg/reduce/pbq/pbq.go
index 5dd124a206..f11964ad9f 100644
--- a/pkg/reduce/pbq/pbq.go
+++ b/pkg/reduce/pbq/pbq.go
@@ -52,11 +52,13 @@ var _ ReadWriteCloser = (*PBQ)(nil)
 
 // Write accepts a window request and writes it to the PBQ, only the isb message is written to the store.
 // The other metadata like operation etc are recomputed from WAL.
 // request can never be nil.
-func (p *PBQ) Write(ctx context.Context, request *window.TimedWindowRequest, persist bool) error {
+func (p *PBQ) Write(_ context.Context, request *window.TimedWindowRequest, persist bool) error {
+	var writeErr error
+	// if cob we should return
 	if p.cob {
 		p.log.Errorw("Failed to write request to pbq, pbq is closed", zap.Any("ID", p.PartitionID), zap.Any("request", request))
-		return nil
+		return fmt.Errorf("pbq is closed")
 	}
 
 	// if the window operation is delete, we should close the output channel and return
@@ -65,32 +67,28 @@ func (p *PBQ) Write(ctx context.Context, request *window.TimedWindowRequest, per
 		return nil
 	}
 
-	var writeErr error
-	// we need context to get out of blocking write
-	select {
-	case p.output <- request:
-		switch request.Operation {
-		case window.Open, window.Append, window.Expand:
-			// during replay we do not have to persist
-			if persist {
-				writeErr = p.store.Write(request.ReadMessage)
-			}
-		case window.Close, window.Merge:
-			// these do not have request.ReadMessage, only metadata fields are used
-		default:
-			return fmt.Errorf("unknown request.Operation, %v", request.Operation)
+	// write the request to the output channel
+	// NOTE: this is a blocking call! it should only block if the UDF is blocking.
+	p.output <- request
+
+	switch request.Operation {
+	case window.Open, window.Append, window.Expand:
+		// during replay we do not have to persist
+		if persist {
+			writeErr = p.store.Write(request.ReadMessage)
 		}
-	case <-ctx.Done():
-		// closing the output channel will not cause panic, since its inside select case
-		// ctx.Done implicitly means write hasn't succeeded.
-		close(p.output)
-		writeErr = ctx.Err()
+	case window.Close, window.Merge:
+		// these do not have request.ReadMessage, only metadata fields are used
+	default:
+		return fmt.Errorf("unknown request.Operation, %v", request.Operation)
 	}
+
 	pbqChannelSize.With(map[string]string{
 		metrics.LabelVertex:             p.vertexName,
 		metrics.LabelPipeline:           p.pipelineName,
 		metrics.LabelVertexReplicaIndex: strconv.Itoa(int(p.vertexReplica)),
 	}).Set(float64(len(p.output)))
+
 	return writeErr
 }
diff --git a/pkg/reduce/pbq/pbqmanager.go b/pkg/reduce/pbq/pbqmanager.go
index 4efe6cfa1e..f97424e441 100644
--- a/pkg/reduce/pbq/pbqmanager.go
+++ b/pkg/reduce/pbq/pbqmanager.go
@@ -159,6 +159,7 @@ func (m *Manager) ShutDown(ctx context.Context) {
 				// exponential backoff will return if err is not nil
 				return false, nil
 			}
+			m.log.Infow("Successfully closed pbq", zap.String("ID", q.PartitionID.String()))
 			return true, nil
 		})
 		if ctxClosedErr != nil {
diff --git a/pkg/reduce/pbq/wal/aligned/fs/wal_segment.go b/pkg/reduce/pbq/wal/aligned/fs/wal_segment.go
index 12d9d92a96..0618485408 100644
--- a/pkg/reduce/pbq/wal/aligned/fs/wal_segment.go
+++ b/pkg/reduce/pbq/wal/aligned/fs/wal_segment.go
@@ -431,10 +431,7 @@ func (w *alignedWAL) Close() (err error) {
 		return err
 	}
 
-	err = w.fp.Close()
-	if err != nil {
-		return err
-	}
+	_ = w.fp.Close()
 
 	return nil
 }
diff --git a/pkg/reduce/pnf/processandforward.go b/pkg/reduce/pnf/processandforward.go
index df7aa765a8..051c07cfc6 100644
--- a/pkg/reduce/pnf/processandforward.go
+++ b/pkg/reduce/pnf/processandforward.go
@@ -187,7 +187,7 @@ outerLoop:
 	for {
 		select {
 		case err := <-errCh:
-			if errors.Is(err, context.Canceled) {
+			if errors.Is(err, context.Canceled) || ctx.Err() != nil {
 				return
 			}
 			if err != nil {
diff --git a/pkg/sdkclient/reducer/client.go b/pkg/sdkclient/reducer/client.go
index 92af33ea2c..da728ea5fb 100644
--- a/pkg/sdkclient/reducer/client.go
+++ b/pkg/sdkclient/reducer/client.go
@@ -112,9 +112,14 @@ func (c *client) ReduceFn(ctx context.Context, datumStreamCh <-chan *reducepb.Re
 			}
 		}
 
-		// close the stream after sending all the messages
-		if sendErr = stream.CloseSend(); sendErr != nil && !errors.Is(sendErr, io.EOF) {
-			errCh <- util.ToUDFErr("ReduceFn stream.Send()", sendErr)
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			sendErr = stream.CloseSend()
+			if sendErr != nil && !errors.Is(sendErr, io.EOF) {
+				errCh <- util.ToUDFErr("ReduceFn stream.CloseSend()", sendErr)
+			}
 		}
 	}()
diff --git a/pkg/sdkclient/sessionreducer/client.go b/pkg/sdkclient/sessionreducer/client.go
index 0d59bda8be..bd96bd60c6 100644
--- a/pkg/sdkclient/sessionreducer/client.go
+++ b/pkg/sdkclient/sessionreducer/client.go
@@ -110,9 +110,16 @@ func (c *client) SessionReduceFn(ctx context.Context, datumStreamCh <-chan *sess
 				}
 			}
 		}
+
 		// close the stream after sending all the messages
-		if sendErr = stream.CloseSend(); sendErr != nil && !errors.Is(sendErr, io.EOF) {
-			errCh <- util.ToUDFErr("SessionReduceFn stream.Send()", sendErr)
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			sendErr = stream.CloseSend()
+			if sendErr != nil && !errors.Is(sendErr, io.EOF) {
+				errCh <- util.ToUDFErr("SessionReduceFn stream.CloseSend()", sendErr)
+			}
 		}
 	}()
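For context, the data_forward.go and sdkclient changes above share one pattern: retry (or close the stream) under wait.ExponentialBackoff and check ctx.Done() explicitly, instead of relying on wait.ExponentialBackoffWithContext or an unguarded CloseSend. Below is a minimal, hypothetical sketch of that pattern; the package name, createWithRetry, and create are illustrative stand-ins (not from the repository), only the wait.Backoff/ExponentialBackoff usage mirrors the hunks above.

// Sketch only: illustrative names, assuming Go 1.17+ for math.MaxInt.
package retrysketch

import (
	"context"
	"math"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// createWithRetry keeps calling create until it succeeds or ctx is cancelled.
// create stands in for a call like pbqManager.CreateNewPBQ.
func createWithRetry(ctx context.Context, create func(context.Context) error) error {
	infiniteBackoff := wait.Backoff{
		Steps:    math.MaxInt, // effectively retry forever
		Duration: 1 * time.Second,
		Factor:   1.5,
		Jitter:   0.1,
	}
	return wait.ExponentialBackoff(infiniteBackoff, func() (bool, error) {
		if err := create(ctx); err != nil {
			select {
			case <-ctx.Done():
				// stop retrying once we have been asked to shut down
				return false, ctx.Err()
			default:
				// transient failure, keep retrying
				return false, nil
			}
		}
		return true, nil
	})
}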
diff --git a/test/reduce-one-e2e/testdata/complex-sliding-window-pipeline.yaml b/test/reduce-one-e2e/testdata/complex-sliding-window-pipeline.yaml
index 160f157b1b..ddd7a9ed19 100644
--- a/test/reduce-one-e2e/testdata/complex-sliding-window-pipeline.yaml
+++ b/test/reduce-one-e2e/testdata/complex-sliding-window-pipeline.yaml
@@ -4,7 +4,7 @@ metadata:
   name: complex-sliding-sum
 spec:
   watermark:
-    maxDelay: "20s"
+    maxDelay: 30s
   limits:
     readBatchSize: 50
   vertices:
@@ -36,7 +36,9 @@ spec:
             length: 5s
           keyed: true
           storage:
-            emptyDir: {}
+            persistentVolumeClaim:
+              volumeSize: 1Gi
+              accessMode: ReadWriteOnce
     - name: non-keyed-fixed-sum
       containerTemplate:
         env:
@@ -52,7 +54,9 @@ spec:
             length: 10s
           keyed: false
           storage:
-            emptyDir: {}
+            persistentVolumeClaim:
+              volumeSize: 1Gi
+              accessMode: ReadWriteOnce
    - name: non-keyed-sliding-sum
      containerTemplate:
        env:
@@ -69,7 +73,9 @@ spec:
             slide: 10s
           keyed: false
           storage:
-            emptyDir: {}
+            persistentVolumeClaim:
+              volumeSize: 1Gi
+              accessMode: ReadWriteOnce
     - name: sink
       scale:
         min: 1
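For context, a minimal sketch of the watermark bookkeeping the data_forward.go hunks introduce: track the largest watermark seen so far and treat non-late messages whose event time falls more than allowedLateness behind it as droppable. The forwarder and readMessage types below are hypothetical stand-ins for DataForward and isb.ReadMessage; they are not the repository's types.

// Sketch only: illustrative types mirroring the currentWatermark / shouldDropMessage logic above.
package watermarksketch

import "time"

type readMessage struct {
	EventTime time.Time
	IsLate    bool
}

type forwarder struct {
	// currentWatermark starts at time.UnixMilli(-1) and only moves forward.
	currentWatermark time.Time
	allowedLateness  time.Duration
}

// observeWatermark mirrors the update done after ComputeWatermark: only a
// newer watermark replaces the tracked one.
func (f *forwarder) observeWatermark(wm time.Time) {
	if wm.After(f.currentWatermark) {
		f.currentWatermark = wm
	}
}

// shouldDrop mirrors the shouldDropMessage check: a message that is not marked
// late, yet whose event time is older than currentWatermark - allowedLateness,
// indicates a platform-side problem and is dropped (and acked) rather than retried.
func (f *forwarder) shouldDrop(m readMessage) bool {
	return !m.IsLate && m.EventTime.Before(f.currentWatermark.Add(-1*f.allowedLateness))
}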