Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Kafka Buffer defaults for fetch.max.wait.ms, fetch.min.bytes, partition.assignment.strategy, close consumer on shutdown #5373

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig
private static final Long DEFAULT_RETENTION_PERIOD = 604800000L;
static final boolean DEFAULT_AUTO_COMMIT = false;
static final ByteCount DEFAULT_FETCH_MAX_BYTES = ByteCount.parse("50mb");
static final Duration DEFAULT_FETCH_MAX_WAIT = Duration.ofMillis(500);
static final ByteCount DEFAULT_FETCH_MIN_BYTES = ByteCount.parse("1b");
static final Duration DEFAULT_FETCH_MAX_WAIT = Duration.ofMillis(1000);
static final ByteCount DEFAULT_FETCH_MIN_BYTES = ByteCount.parse("2kb");
static final ByteCount DEFAULT_MAX_PARTITION_FETCH_BYTES = ByteCount.parse("1mb");
static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45);
static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class KafkaBuffer extends AbstractBuffer<Record<Event>> {
private final AbstractBuffer<Record<Event>> innerBuffer;
private final ExecutorService executorService;
private final Duration drainTimeout;

private final List<KafkaCustomConsumer> consumers;
private AtomicBoolean shutdownInProgress;
private ByteDecoder byteDecoder;

Expand All @@ -83,7 +85,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName());
this.shutdownInProgress = new AtomicBoolean(false);
final PluginMetrics consumerMetrics = PluginMetrics.fromNames(metricPrefixName + READ, pluginSetting.getPipelineName());
final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
this.consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker);
this.kafkaAdminAccessor = new KafkaAdminAccessor(kafkaBufferConfig, List.of(kafkaBufferConfig.getTopic().getGroupId()));
this.executorService = Executors.newFixedThreadPool(consumers.size(), KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE));
Expand Down Expand Up @@ -233,6 +235,9 @@ public void shutdown() {
executorService.shutdownNow();
}

LOG.info("Closing {} consumers", consumers.size());
consumers.forEach(KafkaCustomConsumer::closeConsumer);

innerBuffer.shutdown();
} finally {
resetMdc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.confluent.kafka.serializers.KafkaJsonDeserializer;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.serialization.Deserializer;
Expand Down Expand Up @@ -167,6 +168,7 @@ public static void setConsumerTopicProperties(final Properties properties, final
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int)topicConfig.getFetchMaxBytes());
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int)topicConfig.getFetchMinBytes());
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
}

private void setSchemaRegistryProperties(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties, final TopicConfig topicConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Objects;
import java.util.Map;
import java.util.HashMap;

public class KafkaTopicConsumerMetrics {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConsumerMetrics.class);
static final String NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS = "numberOfPositiveAcknowledgements";
static final String NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS = "numberOfNegativeAcknowledgements";
static final String NUMBER_OF_RECORDS_FAILED_TO_PARSE = "numberOfRecordsFailedToParse";
Expand Down Expand Up @@ -82,7 +85,10 @@ private void initializeMetricNamesMap(final boolean topicNameInMetrics) {
double max = 0.0;
for (Map.Entry<KafkaConsumer, Map<String, Double>> entry : metricValues.entrySet()) {
Map<String, Double> consumerMetrics = entry.getValue();
synchronized(consumerMetrics) {
synchronized (consumerMetrics) {
if (consumerMetrics.get(metricName) == null) {
LOG.debug("No consumer metric for recordsLagMax found");
}
max = Math.max(max, consumerMetrics.get(metricName));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ public void testShutdown_Successful() throws InterruptedException {
kafkaBuffer.shutdown();
verify(executorService).shutdown();
verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS));
verify(consumer).closeConsumer();
}

@Test
Expand Down
Loading