Skip to content

Commit

Permalink
Merge pull request #3 from SigNoz/docker-setup
Browse files Browse the repository at this point in the history
some more fixes
  • Loading branch information
shivanshuraj1333 authored Oct 1, 2024
2 parents 68d3f1e + f09a75a commit 0e61b59
Show file tree
Hide file tree
Showing 13 changed files with 20 additions and 34 deletions.
2 changes: 1 addition & 1 deletion docker/consumer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Use an OpenJDK base image
FROM openjdk:22-slim-bullseye
FROM openjdk:22-jdk-slim

# Create a directory for the application
RUN mkdir -p /opt
Expand Down
Binary file modified docker/consumer/kafka-consumer.jar
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class BaseConsumer {
Expand All @@ -31,19 +30,21 @@ public class BaseConsumer {

protected AtomicBoolean running = new AtomicBoolean(true);

public void run(CountDownLatch latch) {
public void run() {
log.info("Subscribing to topic [{}]", this.topic);
this.consumer.subscribe(List.of(this.topic));

try {
log.info("Polling for records...");
while (this.running.get()) {
try {
// Poll for records with a timeout duration
ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(10000));

for (ConsumerRecord<String, String> record : records) {
log.info("Received message key = [{}], value = [{}], offset = [{}]", record.key(), record.value(), record.offset());

// Commit offsets after processing each record
try {
this.consumer.commitSync();
log.info("Successfully committed offset for record key = [{}]", record.key());
Expand Down Expand Up @@ -71,14 +72,10 @@ public void run(CountDownLatch latch) {
log.info("Kafka consumer closed.");
} catch (Exception e) {
log.error("Error occurred while closing Kafka consumer.", e);
} finally {
latch.countDown();
log.info("CountDownLatch decremented, consumer run method exiting.");
}
}
}


public void loadConfiguration(Map<String, String> map) {
this.bootstrapServers = map.getOrDefault(BOOTSTRAP_SERVERS_ENV_VAR, DEFAULT_BOOTSTRAP_SERVERS);
this.consumerGroup = map.getOrDefault(CONSUMER_GROUP_ENV_VAR, DEFAULT_CONSUMER_GROUP);
Expand Down
Binary file not shown.
Binary file not shown.
Binary file modified kafka-app-otel/common/target/common-1.0-SNAPSHOT.jar
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,61 +1,50 @@
package io.shivanshuraj1333.kafka.otel;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class Consumer extends BaseConsumer {

public static void main(String[] args) {
Consumer consumer = new Consumer();
CountDownLatch latch = new CountDownLatch(1);

try {
// Load configuration from environment variables
consumer.loadConfiguration(System.getenv());

// Load Kafka consumer properties
Properties props = consumer.loadKafkaConsumerProperties();
consumer.createKafkaConsumer(props);

// Start the consumer thread
Thread consumerThread = new Thread(() -> {
try {
consumer.run(latch);
log.info("Starting Kafka consumer thread...");
consumer.run(); // No latch needed, directly run the consumer
} catch (Exception e) {
log.error("Error occurred while running Kafka consumer: ", e);
} finally {
log.info("Kafka consumer thread has exited.");
}
});

consumerThread.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("Shutdown hook triggered, stopping the consumer...");
consumer.running.set(false);
// Keep the main thread alive indefinitely to prevent the application from exiting
log.info("Application is running. Press Ctrl+C to exit.");

// Use an infinite loop to keep the application alive
while (true) {
try {
if (!latch.await(10000, TimeUnit.MILLISECONDS)) {
log.warn("Consumer did not shut down gracefully within the timeout.");
} else {
log.info("Consumer shut down gracefully.");
}
Thread.sleep(1000); // Sleep to reduce CPU usage
} catch (InterruptedException e) {
log.error("Interrupted while waiting for consumer to shut down.", e);
log.error("Main thread interrupted. Exiting application.", e);
Thread.currentThread().interrupt();
break;
}
}));
}

log.info("Application is running. Press Ctrl+C to exit.");
latch.await();
} catch (Exception e) {
log.error("Unexpected error in main method: ", e);
} finally {
try {
latch.countDown();
} catch (Exception e) {
log.error("Error during final latch countdown: ", e);
}
log.info("Application has exited.");
}
}
}



Loading

0 comments on commit 0e61b59

Please sign in to comment.