diff --git a/pkg/watermark/publish/src_publisher.go b/pkg/watermark/publish/src_publisher.go index 7d00c47d7f..62f3e5950e 100644 --- a/pkg/watermark/publish/src_publisher.go +++ b/pkg/watermark/publish/src_publisher.go @@ -105,7 +105,7 @@ func (df *sourcePublish) PublishSourceWatermarks(readMessages []*isb.ReadMessage func (df *sourcePublish) PublishIdleWatermarks(wm time.Time, partitions []int32) { for _, partitionId := range partitions { publisher := df.loadSourceWatermarkPublisher(partitionId) - publisher.PublishIdleWatermark(wmb.Watermark(wm), nil, partitionId) // while publishing idle watermark at source, we don't care about the offset + publisher.PublishIdleWatermark(wmb.Watermark(wm), nil, 0) // while publishing idle watermark at source, we don't care about the offset } }