From 84b47eda452fdcadc64a7c1cbbfb21d0a86473bb Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Wed, 10 Apr 2024 23:42:12 +0530 Subject: [PATCH] chore: info level logging for dropped messages (#1656) Signed-off-by: Yashash H L --- pkg/reduce/data_forward.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/reduce/data_forward.go b/pkg/reduce/data_forward.go index 099d86ce58..38b3e0a80a 100644 --- a/pkg/reduce/data_forward.go +++ b/pkg/reduce/data_forward.go @@ -572,12 +572,12 @@ func (df *DataForward) handleLateMessage(message *isb.ReadMessage) []*window.Tim nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed() // if there is no window open, drop the message. if nextWinAsSeenByWriter == nil { - df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark)) + df.log.Infow("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark)) return lateMessageWindowRequests } // if the message doesn't fall in the next window that is about to be closed drop it. if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) { - df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime())) + df.log.Infow("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime())) return lateMessageWindowRequests } @@ -643,7 +643,7 @@ func (df *DataForward) writeToPBQ(ctx context.Context, winOp *window.TimedWindow err = wait.ExponentialBackoff(pbqWriteBackoff, func() (done bool, err error) { rErr := q.Write(ctx, winOp, persist) if rErr != nil { - df.log.Errorw("Failed to write message", zap.String("msgOffSet", winOp.ReadMessage.ReadOffset.String()), zap.String("partitionID", winOp.ID.String()), zap.Error(rErr)) + df.log.Errorw("Failed to write message to pbq", zap.String("partitionID", winOp.ID.String()), zap.Error(rErr)) metrics.PBQWriteErrorCount.With(map[string]string{ metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName,