Skip to content

Commit

Permalink
provided simple example
Browse files Browse the repository at this point in the history
issue #73
  • Loading branch information
rsoika committed Apr 16, 2019
1 parent 7af2f2b commit 72b1603
Show file tree
Hide file tree
Showing 4 changed files with 571 additions and 28 deletions.
51 changes: 49 additions & 2 deletions imixs-adapters-kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,54 @@ See:

# <img src="https://github.com/imixs/imixs-microservice/raw/master/small_h-trans.png">

To test the behavir of the Imxis Kafka Adapter, you can run Imixs-Workflow as a microservice.
To test the behavior of the Imixs Kafka Adapter, you can run [Imixs-Microservices](https://github.com/imixs/imixs-microservice) as a custom image. The project provides a setup to include the Imixs Kafka Adapter and create a custom build.

The project provides a Docker Container to be used to run the service on a Docker host togehter with a kafka node. The docker image is based on the docker image [imixs/wildfly](https://hub.docker.com/r/imixs/wildfly/).
## How to create a custom Docker Image from Imixs-Microservice

To create a custom Docker image of the Imixs-Microservice just check out the project from [Github](https://github.com/imixs/imixs-microservice) and add the Imixs-Adapters-Kafka dependency:

<dependency>
<groupId>org.imixs.workflow</groupId>
<artifactId>imixs-adapters-kafka</artifactId>
<version>${org.imixs.adapters.version}</version>
<scope>compile</scope>
</dependency>
The Imixs-Microservice project already includes this dependency in the pom.xml. You need to uncomment the dependency and then build the new image


$ cd ~/git/imixs-microservice
$ mvn clean install -Pdocker-build

After that you can switch back into the imixs-adapter-kafka project and start the Imixs-Microservice Container with the docker-compose.yml file:

$ cd ~/git/imixs-adapters/imixs-adapters-kafka/
$ docker-compose up

This will start an instance of your newly built Docker image of Imixs-Microservice, including the Kafka Adapter, and also a local Kafka Server.


## Test the Kafka Adapter

First upload the demo model located under /src/model/ticket.bpmn


curl --user admin:adminadmin --request POST -Tsrc/model/ticket.bpmn http://localhost:8080/api/model/bpmn

You can verify the availability of the model under the Web URI:

http://localhost:8080/api/model

Now you can create a process instance which will trigger the Kafka Adapter:


curl --user admin:adminadmin -H "Content-Type: application/json" -H 'Accept: application/json' -d \
'{"item":[ \
{"name":"type","value":{"@type":"xs:string","$":"workitem"}}, \
{"name":"$modelversion","value":{"@type":"xs:string","$":"1.0.1"}}, \
{"name":"$taskid","value":{"@type":"xs:int","$":"1000"}}, \
{"name":"$eventid","value":{"@type":"xs:int","$":"10"}}, \
{"name":"txtname","value":{"@type":"xs:string","$":"test-json"}}\
]}' \
http://localhost:8080/api/workflow/workitem.json
55 changes: 55 additions & 0 deletions imixs-adapters-kafka/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
version: '3.3'

services:

# Imixs Workflow DB
db:
image: postgres:9.6.1
environment:
POSTGRES_PASSWORD: adminadmin
POSTGRES_DB: workflow

# Imixs Microservice Project
app:
image: imixs/imixs-microservice:latest
environment:
WILDFLY_PASS: adminadmin
POSTGRES_USER: "postgres"
POSTGRES_PASSWORD: "adminadmin"
POSTGRES_CONNECTION: "jdbc:postgresql://db/workflow"
KAFKA_BROKERS: "kafka:9092"
KAFKA_CLIENTID: "imixs-workflow1"
ports:
- "8080:8080"

# Imixs Admin Client
imixsadmin:
image: imixs/imixs-admin
ports:
- "8888:8080"


#
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"

kafka:
image: wurstmeister/kafka:latest
ports:
- target: 9094
published: 9094
protocol: tcp
mode: host
environment:
HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
# volumes:
# - /var/run/docker.sock:/var/run/docker.sock


Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,17 @@
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
public class ProducerService implements Serializable {

public static String KAFKA_BROKERS = "localhost:9092";
public static Integer MESSAGE_COUNT = 1000;
public static String CLIENT_ID = "client1";
public static String TOPIC_NAME = "demo";
public static String GROUP_ID_CONFIG = "consumerGroup1";
public static Integer MAX_NO_MESSAGE_FOUND_COUNT = 100;
public static String OFFSET_RESET_LATEST = "latest";
public static String OFFSET_RESET_EARLIER = "earliest";
public static Integer MAX_POLL_RECORDS = 1;
public static final String ENV_KAFKA_BROKERS = "KAFKA_BROKERS";
public static final String ENV_KAFKA_CLIENTID = "KAFKA_CLIENTID";

private static final long serialVersionUID = 1L;
@SuppressWarnings("unused")

private static Logger logger = Logger.getLogger(ProducerService.class.getName());

Producer<Long, String> producer;

/**
* The above snippet creates a Kafka producer with some properties.
* This method creates a Kafka producer with some properties during initialization.
* <p>
* BOOTSTRAP_SERVERS_CONFIG: The Kafka broker's address. If Kafka is running in
* a cluster then you can provide comma (,) separated addresses. For
Expand All @@ -84,19 +77,16 @@ public class ProducerService implements Serializable {
*/
@PostConstruct
void init() {

logger.info("...init KafkaProducer...");
Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getEnv(ENV_KAFKA_BROKERS, "kafka:9092"));
props.put(ProducerConfig.CLIENT_ID_CONFIG, getEnv(ENV_KAFKA_CLIENTID, "Imixs-Workflow-1"));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
// CustomPartitioner.class.getName());

//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
producer = new KafkaProducer<>(props);

}

/**
Expand All @@ -106,29 +96,49 @@ public void onProcess(@Observes ProcessingEvent documentEvent) {

if (ProcessingEvent.AFTER_PROCESS == documentEvent.getEventType()) {

logger.info("...consuming ProcssingEvent... send new kafka event...");

String uid = documentEvent.getDocument().getUniqueID();
ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC_NAME,
"processed workitem: " + uid);
// we use the model version as the topic name

String topic = documentEvent.getDocument().getModelVersion();
String value = documentEvent.getDocument().getWorkflowGroup() + ":" + uid;

ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(topic, value);

try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("Record sent with key " + uid + " to partition " + metadata.partition()

logger.info("...Imixs-Workflow Event sent with key " + uid + " to partition " + metadata.partition()
+ " with offset " + metadata.offset());
}

catch (ExecutionException e) {
System.out.println("Error in sending record");
System.out.println(e);
logger.info("Error in sending record: " + e.getMessage());
}

catch (InterruptedException e) {
System.out.println("Error in sending record");
System.out.println(e);
System.out.println("Error in sending record: " + e.getMessage());
}

}

}

/**
* Returns a environment variable. An environment variable can be provided as a
* System property.
*
* @param env
* - environment variable name
* @param defaultValue
* - optional default value
* @return value
*/
public static String getEnv(String env, String defaultValue) {
String result = System.getenv(env);
if (result == null || result.isEmpty()) {
result = defaultValue;
}
return result;
}
}
Loading

0 comments on commit 72b1603

Please sign in to comment.