Support for logging MDC in the Kafka buffer #4131

Merged
merged 2 commits into from
Feb 15, 2024

dlvenable
Member

Description

This adds logging MDC to the Kafka buffer to distinguish its log output from that of the Kafka sink. Entry points into the Buffer interface set the MDC value. The threads that the Kafka buffer creates directly also set the MDC value and have a descriptive thread name.

This MDC is now available for both Data Prepper loggers and Apache Kafka loggers.

One limitation is that the MDC is not available for the KafkaProducer. The internal Apache Kafka libraries create that thread, and there appears to be no way to influence it.
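
For readers unfamiliar with the pattern, the thread-level behavior described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from this PR: `SketchMdc` is a hypothetical stand-in for `org.slf4j.MDC` so that the example runs without a logging dependency, `MdcNamingThreadFactory` is a hypothetical class name, and the `kafka-<pluginType>-<n>` naming scheme is only inferred from the sample logs in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for org.slf4j.MDC: a per-thread key/value map.
final class SketchMdc {
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) { CONTEXT.get().put(key, value); }
    static String get(String key) { return CONTEXT.get().get(key); }
    static void clear() { CONTEXT.get().clear(); }
}

// Hypothetical factory: names each new thread and sets the MDC value
// on that thread before the wrapped task starts running.
final class MdcNamingThreadFactory implements ThreadFactory {
    // Key name taken from the Log4j pattern shown in this PR.
    static final String MDC_KEY = "kafkaPluginType";

    private final ThreadFactory delegate = Executors.defaultThreadFactory();
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String pluginType;

    MdcNamingThreadFactory(String pluginType) {
        this.pluginType = pluginType;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = delegate.newThread(() -> {
            // Runs on the new thread, so the value lands in that thread's context.
            SketchMdc.put(MDC_KEY, pluginType);
            runnable.run();
        });
        // Assumed naming scheme, chosen to resemble "kafka-buffer-1" in the logs.
        thread.setName("kafka-" + pluginType + "-" + threadNumber.getAndIncrement());
        return thread;
    }
}
```

The MDC value has to be set inside the wrapped task rather than in `newThread` itself, because `newThread` executes on the caller's thread and the context is per-thread. That is also why a thread created inside a third-party library, such as the KafkaProducer thread mentioned above, cannot be reached this way.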

Issues Resolved

First part of #4126

I plan to follow up with more changes for the kafka sink in a subsequent PR.

Results

I changed the Log4j configuration to include the kafkaPluginType MDC value.

appender.console.layout.pattern = %d{ISO8601} [%t] [%X{kafkaPluginType}] %-5p %40C - %m%n

Here is a sample of the logs produced:

2024-02-15T09:38:29,884 [kafka-buffer-1] [buffer] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-data-prepper-1, groupId=data-prepper] Notifying assignor about the new Assignment(partitions=[test-encrypted-0])
2024-02-15T09:38:29,884 [kafka-buffer-2] [buffer] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-data-prepper-2, groupId=data-prepper] Adding newly assigned partitions:
2024-02-15T09:38:29,886 [kafka-buffer-1] [buffer] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-data-prepper-1, groupId=data-prepper] Adding newly assigned partitions: test-encrypted-0
2024-02-15T09:38:29,886 [kafka-buffer-1] [buffer] INFO  org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer - Assigned partition test-encrypted-0
2024-02-15T09:38:29,901 [kafka-buffer-1] [buffer] INFO  org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer - Partition test-encrypted-0 offsets: beginningOffset: 0, endOffset: 15, committedOffset: 15
2024-02-15T09:38:29,903 [kafka-buffer-1] [buffer] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-data-prepper-1, groupId=data-prepper] Setting offset for partition test-encrypted-0 to the committed offset FetchPosition{offset=15, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}

And here are some sample logs from outside of Kafka:

2024-02-15T09:38:26,926 [main] [] INFO  org.opensearch.dataprepper.pipeline.Pipeline - Pipeline [kafka-buffer-pipeline] - Initiating pipeline execution
2024-02-15T09:38:26,927 [kafka-buffer-pipeline-sink-worker-2-thread-1] [] INFO  org.opensearch.dataprepper.pipeline.Pipeline - Pipeline [kafka-buffer-pipeline] Sink is ready, starting source...

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…a Buffer consumer threads with MDC. Name the consumer threads to help when tracking down thread dumps. First part of opensearch-project#4126

Signed-off-by: David Venable <[email protected]>
kkondaka
kkondaka previously approved these changes Feb 15, 2024
final Thread thread = delegateThreadFactory.newThread(() -> {
    MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, kafkaPluginType);
    runnable.run();
});
Collaborator

Shall we invoke MDC.clear() after the run?

Member Author

I don't think it really matters, because the thread will end and its ThreadLocalContext is lost with it. But I could do it.
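
As a rough illustration of the cleanup discussed in this thread, the sketch below wraps a task so that the context is cleared in a `finally` block. It is not the code that was merged: `StandInMdc` is a hypothetical stand-in for `org.slf4j.MDC`, and `MdcScopedRunnable` is a hypothetical name. For a thread that ends when the task returns, clearing makes no observable difference, as noted above; it would matter mainly if the same thread were reused for later tasks, as in a thread pool.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.slf4j.MDC: a per-thread key/value map.
final class StandInMdc {
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) { CONTEXT.get().put(key, value); }
    static String get(String key) { return CONTEXT.get().get(key); }
    static void clear() { CONTEXT.get().clear(); }
}

// Hypothetical wrapper: sets one MDC value for the duration of the task
// and clears the context afterwards, even if the task throws.
final class MdcScopedRunnable implements Runnable {
    private final String key;
    private final String value;
    private final Runnable delegate;

    MdcScopedRunnable(String key, String value, Runnable delegate) {
        this.key = key;
        this.value = value;
        this.delegate = delegate;
    }

    @Override
    public void run() {
        StandInMdc.put(key, value);
        try {
            delegate.run();
        } finally {
            // Prevents the value from leaking into a later task on a reused thread.
            StandInMdc.clear();
        }
    }
}
```

Note that `clear()` removes every entry in the thread's context, not only the one set here. Code that shares the context with other keys might prefer to remove just its own key.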

@dlvenable dlvenable merged commit faddf01 into opensearch-project:main Feb 15, 2024
47 of 50 checks passed
@dlvenable dlvenable deleted the 4126-kafka-mdc branch February 16, 2024 21:03
3 participants