diff --git a/docker/consumer/Dockerfile b/docker/consumer/Dockerfile index 2a70885..633bfde 100644 --- a/docker/consumer/Dockerfile +++ b/docker/consumer/Dockerfile @@ -1,5 +1,5 @@ # Use an OpenJDK base image -FROM openjdk:22-slim-bullseye +FROM openjdk:22-jdk-slim # Create a directory for the application RUN mkdir -p /opt diff --git a/docker/consumer/kafka-consumer.jar b/docker/consumer/kafka-consumer.jar index 9054d08..7f96ecc 100644 Binary files a/docker/consumer/kafka-consumer.jar and b/docker/consumer/kafka-consumer.jar differ diff --git a/kafka-app-otel/common/src/main/java/io/shivanshuraj1333/kafka/otel/BaseConsumer.java b/kafka-app-otel/common/src/main/java/io/shivanshuraj1333/kafka/otel/BaseConsumer.java index b2c629d..64b087e 100644 --- a/kafka-app-otel/common/src/main/java/io/shivanshuraj1333/kafka/otel/BaseConsumer.java +++ b/kafka-app-otel/common/src/main/java/io/shivanshuraj1333/kafka/otel/BaseConsumer.java @@ -11,7 +11,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; public class BaseConsumer { @@ -31,7 +30,7 @@ public class BaseConsumer { protected AtomicBoolean running = new AtomicBoolean(true); - public void run(CountDownLatch latch) { + public void run() { log.info("Subscribing to topic [{}]", this.topic); this.consumer.subscribe(List.of(this.topic)); @@ -39,11 +38,13 @@ public void run(CountDownLatch latch) { log.info("Polling for records..."); while (this.running.get()) { try { + // Poll for records with a timeout duration ConsumerRecords records = this.consumer.poll(Duration.ofMillis(10000)); for (ConsumerRecord record : records) { log.info("Received message key = [{}], value = [{}], offset = [{}]", record.key(), record.value(), record.offset()); + // Commit offsets after processing each record try { this.consumer.commitSync(); log.info("Successfully committed offset for record key = [{}]", record.key()); @@ -71,14 +72,10 @@ public void run(CountDownLatch latch) { log.info("Kafka consumer closed."); } catch (Exception e) { log.error("Error occurred while closing Kafka consumer.", e); - } finally { - latch.countDown(); - log.info("CountDownLatch decremented, consumer run method exiting."); } } } - public void loadConfiguration(Map map) { this.bootstrapServers = map.getOrDefault(BOOTSTRAP_SERVERS_ENV_VAR, DEFAULT_BOOTSTRAP_SERVERS); this.consumerGroup = map.getOrDefault(CONSUMER_GROUP_ENV_VAR, DEFAULT_CONSUMER_GROUP); diff --git a/kafka-app-otel/common/target/classes/io/shivanshuraj1333/kafka/otel/BaseConsumer.class b/kafka-app-otel/common/target/classes/io/shivanshuraj1333/kafka/otel/BaseConsumer.class index e60a399..123669a 100644 Binary files a/kafka-app-otel/common/target/classes/io/shivanshuraj1333/kafka/otel/BaseConsumer.class and b/kafka-app-otel/common/target/classes/io/shivanshuraj1333/kafka/otel/BaseConsumer.class differ diff --git a/kafka-app-otel/common/target/classes/io/shivanshuraj1333/kafka/otel/BaseProducer.class b/kafka-app-otel/common/target/classes/io/shivanshuraj1333/kafka/otel/BaseProducer.class index f6c673e..b30bba8 100644 Binary files a/kafka-app-otel/common/target/classes/io/shivanshuraj1333/kafka/otel/BaseProducer.class and b/kafka-app-otel/common/target/classes/io/shivanshuraj1333/kafka/otel/BaseProducer.class differ diff --git a/kafka-app-otel/common/target/common-1.0-SNAPSHOT.jar b/kafka-app-otel/common/target/common-1.0-SNAPSHOT.jar index 8150d5b..c45ab03 100644 Binary files a/kafka-app-otel/common/target/common-1.0-SNAPSHOT.jar and b/kafka-app-otel/common/target/common-1.0-SNAPSHOT.jar differ diff --git a/kafka-app-otel/kafka-consumer/src/main/java/io/shivanshuraj1333/kafka/otel/Consumer.java b/kafka-app-otel/kafka-consumer/src/main/java/io/shivanshuraj1333/kafka/otel/Consumer.java index b366c6b..24b1d37 100644 --- a/kafka-app-otel/kafka-consumer/src/main/java/io/shivanshuraj1333/kafka/otel/Consumer.java +++ b/kafka-app-otel/kafka-consumer/src/main/java/io/shivanshuraj1333/kafka/otel/Consumer.java @@ -1,61 +1,50 @@ package io.shivanshuraj1333.kafka.otel; -import java.io.IOException; import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; public class Consumer extends BaseConsumer { public static void main(String[] args) { Consumer consumer = new Consumer(); - CountDownLatch latch = new CountDownLatch(1); try { + // Load configuration from environment variables consumer.loadConfiguration(System.getenv()); + // Load Kafka consumer properties Properties props = consumer.loadKafkaConsumerProperties(); consumer.createKafkaConsumer(props); + // Start the consumer thread Thread consumerThread = new Thread(() -> { try { - consumer.run(latch); + log.info("Starting Kafka consumer thread..."); + consumer.run(); // No latch needed, directly run the consumer } catch (Exception e) { log.error("Error occurred while running Kafka consumer: ", e); + } finally { + log.info("Kafka consumer thread has exited."); } }); + consumerThread.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - log.info("Shutdown hook triggered, stopping the consumer..."); - consumer.running.set(false); + // Keep the main thread alive indefinitely to prevent the application from exiting + log.info("Application is running. Press Ctrl+C to exit."); + // Use an infinite loop to keep the application alive + while (true) { try { - if (!latch.await(10000, TimeUnit.MILLISECONDS)) { - log.warn("Consumer did not shut down gracefully within the timeout."); - } else { - log.info("Consumer shut down gracefully."); - } + Thread.sleep(1000); // Sleep to reduce CPU usage } catch (InterruptedException e) { - log.error("Interrupted while waiting for consumer to shut down.", e); + log.error("Main thread interrupted. Exiting application.", e); Thread.currentThread().interrupt(); + break; } - })); + } - log.info("Application is running. Press Ctrl+C to exit."); - latch.await(); } catch (Exception e) { log.error("Unexpected error in main method: ", e); - } finally { - try { - latch.countDown(); - } catch (Exception e) { - log.error("Error during final latch countdown: ", e); - } - log.info("Application has exited."); } } } - - - diff --git a/kafka-app-otel/kafka-consumer/target/classes/io/shivanshuraj1333/kafka/otel/Consumer.class b/kafka-app-otel/kafka-consumer/target/classes/io/shivanshuraj1333/kafka/otel/Consumer.class index 6b570cd..680d942 100644 Binary files a/kafka-app-otel/kafka-consumer/target/classes/io/shivanshuraj1333/kafka/otel/Consumer.class and b/kafka-app-otel/kafka-consumer/target/classes/io/shivanshuraj1333/kafka/otel/Consumer.class differ diff --git a/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar b/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar index 9054d08..7f96ecc 100644 Binary files a/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar and b/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar differ diff --git a/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT.jar b/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT.jar index 0c5199c..1c9afd4 100644 Binary files a/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT.jar and b/kafka-app-otel/kafka-consumer/target/kafka-consumer-1.0-SNAPSHOT.jar differ diff --git a/kafka-app-otel/kafka-producer/target/classes/io/shivanshuraj1333/kafka/otel/Producer.class b/kafka-app-otel/kafka-producer/target/classes/io/shivanshuraj1333/kafka/otel/Producer.class index 7b8ee43..210f6b4 100644 Binary files a/kafka-app-otel/kafka-producer/target/classes/io/shivanshuraj1333/kafka/otel/Producer.class and b/kafka-app-otel/kafka-producer/target/classes/io/shivanshuraj1333/kafka/otel/Producer.class differ diff --git a/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT-jar-with-dependencies.jar b/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT-jar-with-dependencies.jar index e2ddbe0..0b55126 100644 Binary files a/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT-jar-with-dependencies.jar and b/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT-jar-with-dependencies.jar differ diff --git a/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT.jar b/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT.jar index f7c2dda..85588d7 100644 Binary files a/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT.jar and b/kafka-app-otel/kafka-producer/target/kafka-producer-1.0-SNAPSHOT.jar differ