Skip to content

Commit

Permalink
chore: info level logging for dropped messages (#1656)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Apr 10, 2024
1 parent e9c3731 commit 84b47ed
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,12 +572,12 @@ func (df *DataForward) handleLateMessage(message *isb.ReadMessage) []*window.Tim
nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed()
// if there is no window open, drop the message.
if nextWinAsSeenByWriter == nil {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark))
df.log.Infow("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark))
return lateMessageWindowRequests
}
// if the message doesn't fall in the next window that is about to be closed drop it.
if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime()))
df.log.Infow("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime()))
return lateMessageWindowRequests
}

Expand Down Expand Up @@ -643,7 +643,7 @@ func (df *DataForward) writeToPBQ(ctx context.Context, winOp *window.TimedWindow
err = wait.ExponentialBackoff(pbqWriteBackoff, func() (done bool, err error) {
rErr := q.Write(ctx, winOp, persist)
if rErr != nil {
df.log.Errorw("Failed to write message", zap.String("msgOffSet", winOp.ReadMessage.ReadOffset.String()), zap.String("partitionID", winOp.ID.String()), zap.Error(rErr))
df.log.Errorw("Failed to write message to pbq", zap.String("partitionID", winOp.ID.String()), zap.Error(rErr))
metrics.PBQWriteErrorCount.With(map[string]string{
metrics.LabelVertex: df.vertexName,
metrics.LabelPipeline: df.pipelineName,
Expand Down

0 comments on commit 84b47ed

Please sign in to comment.