diff --git a/pkg/sources/kafka/reader.go b/pkg/sources/kafka/reader.go index 2777f89d82..4a4731059d 100644 --- a/pkg/sources/kafka/reader.go +++ b/pkg/sources/kafka/reader.go @@ -168,14 +168,14 @@ func (r *kafkaSource) Ack(_ context.Context, offsets []isb.Offset) []error { for _, offset := range offsets { topic := offset.(*kafkaOffset).Topic() - // we need to mark the offset of the next message to read pOffset, err := offset.Sequence() if err != nil { kafkaSourceOffsetAckErrors.With(map[string]string{metrics.LabelVertex: r.vertexName, metrics.LabelPipeline: r.pipelineName}).Inc() r.logger.Errorw("Unable to extract partition offset of type int64 from the supplied offset. skipping and continuing", zap.String("supplied-offset", offset.String()), zap.Error(err)) continue } - r.handler.sess.MarkOffset(topic, offset.PartitionIdx(), pOffset, "") + // we need to mark the offset of the next message to read + r.handler.sess.MarkOffset(topic, offset.PartitionIdx(), pOffset+1, "") kafkaSourceAckCount.With(map[string]string{metrics.LabelVertex: r.vertexName, metrics.LabelPipeline: r.pipelineName}).Inc() }