diff --git a/pkg/sources/kafka/handler.go b/pkg/sources/kafka/handler.go index 4d65f7c3d5..3055d91208 100644 --- a/pkg/sources/kafka/handler.go +++ b/pkg/sources/kafka/handler.go @@ -37,10 +37,17 @@ type consumerHandler struct { // new handler initializes the channel for passing messages func newConsumerHandler(readChanSize int) *consumerHandler { + // Initializing the inflightAcks channel to closed channel instead of nil will ensure that + // the Cleanup func below will not hang on the inflight acks to be completed in the case + // the Ack func was not called due to no messages being consumed. + var inflightAcks = make(chan bool) + close(inflightAcks) + return &consumerHandler{ - ready: make(chan bool), - messages: make(chan *sarama.ConsumerMessage, readChanSize), - logger: logging.NewLogger(), + inflightAcks: inflightAcks, + ready: make(chan bool), + messages: make(chan *sarama.ConsumerMessage, readChanSize), + logger: logging.NewLogger(), } } @@ -50,14 +57,17 @@ func (consumer *consumerHandler) Setup(sess sarama.ConsumerGroupSession) error { consumer.readyCloser.Do(func() { close(consumer.ready) }) + consumer.logger.Info("Kafka Consumer Setup complete") return nil } // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited func (consumer *consumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error { + consumer.logger.Info("Kafka Consumer Starting Cleanup routine, waiting for in-flight-acks to complete") // wait for inflight acks to be completed. <-consumer.inflightAcks sess.Commit() + consumer.logger.Info("Kafka Consumer Cleanup complete") return nil } @@ -66,6 +76,7 @@ func (consumer *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSessio // The `ConsumeClaim` itself is called within a goroutine, see: // https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29 for { + consumer.logger.Info("Kafka Consumer about to claim Messages from the Kafka broker") select { case msg, ok := <-claim.Messages(): if !ok { diff --git a/pkg/sources/kafka/metrics.go b/pkg/sources/kafka/metrics.go index eea79d7f5a..6c41468ba2 100644 --- a/pkg/sources/kafka/metrics.go +++ b/pkg/sources/kafka/metrics.go @@ -28,7 +28,7 @@ var kafkaSourceReadCount = promauto.NewCounterVec(prometheus.CounterOpts{ Subsystem: "kafka_source", Name: "read_total", Help: "Total number of messages Read", -}, []string{metrics.LabelVertex, metrics.LabelPipeline}) +}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName}) // kafkaSourceAckCount is used to indicate the number of messages Acknowledged var kafkaSourceAckCount = promauto.NewCounterVec(prometheus.CounterOpts{