This section gives an overview on how to use the CafDI library!
In a Maven managed project simply the following to your pom.xml
:
...
<dependency>
<groupId>net.wessendorf.kafka</groupId>
<artifactId>kafka-cdi-extension</artifactId>
<version>0.0.9</version>
</dependency>
...
The @Producer
annotation is used to configure and inject an instance of the SimpleKafkaProducer
class, which is a simple extension of the original KafkaProducer
class:
...
public class MyPublisherService {
private Logger logger = LoggerFactory.getLogger(MyPublisherService.class);
@Producer
SimpleKafkaProducer<Integer, String> producer;
/**
* A simple service method, that sends payload over the wire
*/
public void hello() {
producer.send("myTopic", "My Message");
}
}
The @Consumer
annotation is used to configure and declare an annotated method as a callback for the internal DelegationKafkaConsumer
, which internally uses the vanilla KafkaConsumer
:
public class MyListenerService {
private Logger logger = LoggerFactory.getLogger(MyListenerService.class);
/**
* Simple listener that receives messages from the Kafka broker
*/
@Consumer(topic = "myTopic", groupId = "myGroupID", keyType = Integer.class)
public void receiver(String message) {
logger.info("That's what I got: " + message);
}
]
Receiving the key and the value is also possible:
public class MyListenerService {
private Logger logger = LoggerFactory.getLogger(MyListenerService.class);
/**
* Simple listener that receives messages from the Kafka broker
*/
@Consumer(topic = "myTopic", groupId = "myGroupID")
public void receiver(final String key, final String value) {
logger.info("That's what I got: (key: " + key + " , value:" + value + ")");
}
]
A minimal of configuration is currently needed. For that there is a @KafkaConfig
annotation. The first occurrence is used:
@KafkaConfig(bootstrapServers = "#{SOME_HOST}:#{SOME_PORT}")
public class MyService {
...
}
Apache Kafka uses a binary message format, and comes with a handful of handy Serializers and Deserializers, available through the Serdes
class. The CafDI extension adds a Serde for the JsonObject
:
To send serialize a JsonObject, simply specify the type, like:
...
@Producer
SimpleKafkaProducer<Integer, JsonObject> producer;
...
producer.send("myTopic", myJsonObj);
For deserialization the argument on the annotation @Consumer
method is used to setup the actual Deserializer
@Consumer(topic = "myTopic", groupId = "myGroupID", keyType = Integer.class)
public void receiveJsonObject(JsonObject message) {
logger.info("That's what I got: " + message);
}
To setup Apache Kafka there are different ways to get started. This section quickly discusses pure Docker and Openshift.
Starting a Zookeeper cluster:
docker run -d --name zookeeper jplock/zookeeper:3.4.6
Next, we need to start Kafka and link the Zookeeper Linux container to it:
docker run -d --name kafka --link zookeeper:zookeeper ches/kafka
Now, that the broker is running, we need to figure out the IP address of it:
docker inspect --format '{{ .NetworkSettings.IPAddress }}' kafka
We use this IP address when inside our @KafkaConfig
annotation that our Producers and Consumers can speak to Apache Kafka.
For Apache Kafka on Openshift please check this repository