Warning: This user-defined Kafka source is currently NOT production ready. It is pending a fix tracked in #3. Please consider using the built-in Kafka source instead. If you have a particular use case, please open an issue to let us know.
Kafka Source is a user-defined source for Numaflow that reads messages from Kafka.
To integrate the Kafka source into your own Numaflow pipeline, follow these steps:
Deploy a Kafka server to your cluster, and create the brokers, topic, consumer group, and any other Kafka resources you need.
Define the Kafka source configuration in a ConfigMap and mount it to the Kafka source pod as a volume. The following is an example of a Kafka source configuration:
apiVersion: v1
data:
  # kafka-config.yaml is the config file that the Kafka source uses to connect to Kafka.
  kafka-config.yaml: |
    brokers:
      - 'keran-kafka-0.keran-kafka-headless.default.svc.cluster.local:9092'
      - 'keran-kafka-1.keran-kafka-headless.default.svc.cluster.local:9092'
    topic: input-topic-partition-debug
    consumergroupname: test-consumer-group-1
    tls: null
    config: ""
    sasl: null
kind: ConfigMap
metadata:
  name: kafka-config-map
The configuration contains the following fields:
brokers: The list of Kafka brokers to connect to.
topic: The Kafka topic to read messages from.
consumergroupname: The Kafka consumer group name.
Note that the fields above are not an exhaustive list of the fields that can be specified in the Kafka source configuration. For more information, please refer to the Kafka Source Configuration Struct.
Name the configuration file inside the ConfigMap (its data key) kafka-config.yaml, and mount the ConfigMap to the Kafka source pod as a volume under the path /etc/config.
Create all the secrets that are referenced in the Kafka source configuration, and mount them to the Kafka source pod as volumes under the path /etc/secrets/{secret-name}.
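As a minimal sketch of the secret-mounting step, assuming a Secret named kafka-sasl-secret (a hypothetical name) already exists in the pipeline's namespace, the in vertex of the Pipeline template in the next step could declare it as a volume and mount it under /etc/secrets/kafka-sasl-secret. Which keys the Kafka source actually reads from that directory is defined by the Kafka Source Configuration Struct and is not shown here.

```yaml
# Fragment of the "in" vertex only; merge it into the Pipeline template.
- name: in
  volumes:
    - name: my-config-mount
      configMap:
        name: kafka-config-map
    # Hypothetical Secret holding Kafka credentials; replace with your own.
    - name: kafka-sasl-secret-mount
      secret:
        secretName: kafka-sasl-secret
  source:
    udsource:
      container:
        image: quay.io/numaio/numaflow-source/kafka-source-go:v0.1.8
        volumeMounts:
          - name: my-config-mount
            mountPath: /etc/config
          # Follows the /etc/secrets/{secret-name} convention described above.
          - name: kafka-sasl-secret-mount
            mountPath: /etc/secrets/kafka-sasl-secret
```

Each referenced secret gets its own volume and mount, so adding a second secret means repeating the pair with a different secretName and mountPath.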
Include the Kafka Source in your pipeline using the template below:
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: kafka-source-e2e
spec:
  vertices:
    - name: in
      scale:
        min: 2
      volumes:
        - name: my-config-mount
          configMap:
            name: kafka-config-map
      source:
        udsource:
          container:
            image: quay.io/numaio/numaflow-source/kafka-source-go:v0.1.8
            volumeMounts:
              - name: my-config-mount
                # /etc/config is the path where the Kafka source looks for the config file.
                mountPath: /etc/config
    - name: out
      scale:
        min: 1
      sink:
        log: {}
  edges:
    - from: in
      to: out
Now apply the pipeline to your cluster to start reading messages from the Kafka server. You should see the messages printed in the logs of the pod for the out (log sink) vertex.
TODO - Add example to cover the following configurations:
tls: null
config: ""
sasl: null