Skip to content

Commit

Permalink
fix: Kafka source reads duplicated messages (#1438)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
  • Loading branch information
jy4096 authored Jan 5, 2024
1 parent d0df8d6 commit 795bef6
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pkg/sources/kafka/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,14 @@ func (r *kafkaSource) Ack(_ context.Context, offsets []isb.Offset) []error {
for _, offset := range offsets {
topic := offset.(*kafkaOffset).Topic()

// we need to mark the offset of the next message to read
pOffset, err := offset.Sequence()
if err != nil {
kafkaSourceOffsetAckErrors.With(map[string]string{metrics.LabelVertex: r.vertexName, metrics.LabelPipeline: r.pipelineName}).Inc()
r.logger.Errorw("Unable to extract partition offset of type int64 from the supplied offset. skipping and continuing", zap.String("supplied-offset", offset.String()), zap.Error(err))
continue
}
r.handler.sess.MarkOffset(topic, offset.PartitionIdx(), pOffset, "")
// we need to mark the offset of the next message to read
r.handler.sess.MarkOffset(topic, offset.PartitionIdx(), pOffset+1, "")
kafkaSourceAckCount.With(map[string]string{metrics.LabelVertex: r.vertexName, metrics.LabelPipeline: r.pipelineName}).Inc()

}
Expand Down

0 comments on commit 795bef6

Please sign in to comment.