Skip to content

Commit

Permalink
bulker: batch_consumer: do intermediate flushes while producing retry…
Browse files Browse the repository at this point in the history
… messages to avoid overfilling queue.buffering.max.messages
  • Loading branch information
absorbb committed Feb 6, 2025
1 parent 3b6b506 commit 2860ccf
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
9 changes: 5 additions & 4 deletions bulkerapp/app/abstract_batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ func NewAbstractBatchConsumer(repository *Repository, destinationId string, batc
return nil, abstract.NewError("Error creating consumer: %v", err)
}
producerConfig := kafka.ConfigMap(utils.MapPutAll(kafka.ConfigMap{
"transactional.id": fmt.Sprintf("%s_failed_%s", topicId, config.InstanceId),
"batch.size": config.ProducerBatchSize,
"linger.ms": config.ProducerLingerMs,
"compression.type": config.KafkaTopicCompression,
"transactional.id": fmt.Sprintf("%s_failed_%s", topicId, config.InstanceId),
"queue.buffering.max.messages": config.ProducerQueueSize,
"batch.size": config.ProducerBatchSize,
"linger.ms": config.ProducerLingerMs,
"compression.type": config.KafkaTopicCompression,
}, *kafkaConfig))

bc := &AbstractBatchConsumer{
Expand Down
5 changes: 5 additions & 0 deletions bulkerapp/app/batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ func (bc *BatchConsumerImpl) processFailed(firstPosition *kafka.TopicPartition,
Headers: headers,
Value: message.Value,
}, nil)

if counters.consumed%bc.config.ProducerQueueSize == 0 {
producer.Flush(bc.config.ProducerLingerMs)
}

if err != nil {
return counters, fmt.Errorf("failed to put message to producer: %v", err)
}
Expand Down

0 comments on commit 2860ccf

Please sign in to comment.