Skip to content

Commit

Permalink
fix: initialize inflightAcks channel to not nil channel (#1548)
Browse files Browse the repository at this point in the history
Signed-off-by: Antonino Fugazzotto <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
2 people authored and whynowy committed Mar 15, 2024
1 parent ae1ddaa commit e53f858
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
17 changes: 14 additions & 3 deletions pkg/sources/kafka/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,17 @@ type consumerHandler struct {

// new handler initializes the channel for passing messages
func newConsumerHandler(readChanSize int) *consumerHandler {
// Initializing the inflightAcks channel to closed channel instead of nil will ensure that
// the Cleanup func below will not hang on the inflight acks to be completed in the case
// the Ack func was not called due to no messages being consumed.
var inflightAcks = make(chan bool)
close(inflightAcks)

return &consumerHandler{
ready: make(chan bool),
messages: make(chan *sarama.ConsumerMessage, readChanSize),
logger: logging.NewLogger(),
inflightAcks: inflightAcks,
ready: make(chan bool),
messages: make(chan *sarama.ConsumerMessage, readChanSize),
logger: logging.NewLogger(),
}
}

Expand All @@ -50,14 +57,17 @@ func (consumer *consumerHandler) Setup(sess sarama.ConsumerGroupSession) error {
consumer.readyCloser.Do(func() {
close(consumer.ready)
})
consumer.logger.Info("Kafka Consumer Setup complete")
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *consumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
consumer.logger.Info("Kafka Consumer Starting Cleanup routine, waiting for in-flight-acks to complete")
// wait for inflight acks to be completed.
<-consumer.inflightAcks
sess.Commit()
consumer.logger.Info("Kafka Consumer Cleanup complete")
return nil
}

Expand All @@ -66,6 +76,7 @@ func (consumer *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSessio
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
for {
consumer.logger.Info("Kafka Consumer about to claim Messages from the Kafka broker")
select {
case msg, ok := <-claim.Messages():
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/kafka/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var kafkaSourceReadCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "kafka_source",
Name: "read_total",
Help: "Total number of messages Read",
}, []string{metrics.LabelVertex, metrics.LabelPipeline})
}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})

// kafkaSourceAckCount is used to indicate the number of messages Acknowledged
var kafkaSourceAckCount = promauto.NewCounterVec(prometheus.CounterOpts{
Expand Down

0 comments on commit e53f858

Please sign in to comment.