Skip to content

Commit

Permalink
out_kafka: Add dynamic/static headers support
Browse files Browse the repository at this point in the history
  • Loading branch information
DIFRIN committed Mar 15, 2024
1 parent da8aea7 commit 627834f
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 8 deletions.
122 changes: 114 additions & 8 deletions plugins/out_kafka/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "kafka_config.h"
#include "kafka_topic.h"


void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
void *opaque)
{
Expand Down Expand Up @@ -73,6 +74,8 @@ static int cb_kafka_init(struct flb_output_instance *ins,
{
struct flb_out_kafka *ctx;

flb_plg_info(ins, "Starting kafka output init");

/* Configuration */
ctx = flb_out_kafka_create(ins, config);
if (!ctx) {
Expand All @@ -85,6 +88,26 @@ static int cb_kafka_init(struct flb_output_instance *ins,
return 0;
}

int flb_msgpack_get_map_value(struct flb_out_kafka *ctx, msgpack_object *map, const char *key, msgpack_object **val)
{
if (map->type != MSGPACK_OBJECT_MAP) {
flb_error("[flb_msgpack_get_map_value] Map expected");
return -1;
}

size_t i;
for (i = 0; i < map->via.map.size; ++i) {
if (map->via.map.ptr[i].key.type == MSGPACK_OBJECT_STR &&
strncmp(map->via.map.ptr[i].key.via.str.ptr, key, map->via.map.ptr[i].key.via.str.size) == 0) {
*val = &map->via.map.ptr[i].val;
flb_debug("key matches a field in the message");
return 0;
}
}

return -1; // Key not found
}

int produce_message(struct flb_time *tm, msgpack_object *map,
struct flb_out_kafka *ctx, struct flb_config *config)
{
Expand All @@ -106,6 +129,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
msgpack_object key;
msgpack_object val;
flb_sds_t s;
rd_kafka_headers_t *kafka_headers = NULL;

#ifdef FLB_HAVE_AVRO_ENCODER
// used to flag when a buffer needs to be freed for avro
Expand Down Expand Up @@ -155,6 +179,70 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
msgpack_pack_str(&mp_pck, ctx->timestamp_key_len);
msgpack_pack_str_body(&mp_pck,
ctx->timestamp_key, ctx->timestamp_key_len);

/* Check if headers are provided in the configuration */
if (ctx->headers) {

flb_debug("setting message headers");
/* Setting headers list size */
int size_headers = 0;
struct mk_list *tmp;
struct mk_list *head2;
struct flb_config_map_val *mv;
struct flb_slist_entry *hkey = NULL;
struct flb_slist_entry *hval = NULL;

/* Calculate the number of headers */
mk_list_foreach_safe(head2, tmp, ctx->headers) {
size_headers++;
}

/* Create Kafka headers object */
kafka_headers = rd_kafka_headers_new(size_headers);

/* Add headers from configuration */
flb_config_map_foreach(head2, mv, ctx->headers) {
hkey = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
hval = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);

flb_debug("found header %s with value %s", hkey->str, hval->str);

/* Extract the message field value */
char *field_name = NULL;
size_t field_len = flb_sds_len(hval->str);
field_name = flb_malloc(field_len); // Allocate memory for field name
if (!field_name) {
flb_errno();
return -1;
}

memcpy(field_name, hval->str, field_len); // Copy field name
/* Check if the header value is a message field */
if (field_name[0] == '<' ) {
flb_debug("header %s is part of the msg, field name : %s", hkey->str, hval->str);
msgpack_object *field_value = NULL;
if (flb_msgpack_get_map_value(ctx, map, field_name + 1, &field_value) == 0 &&
field_value->type == MSGPACK_OBJECT_STR) {
rd_kafka_header_add(kafka_headers, hkey->str, flb_sds_len(hkey->str),
field_value->via.str.ptr, field_value->via.str.size);
}
else {
flb_warn("Field '%s' not found or not a string value", field_name);
}

flb_free(field_name); // Free allocated memory
}
else {
/* Static header value */
rd_kafka_header_add(kafka_headers, hkey->str, flb_sds_len(hkey->str),
hval->str, flb_sds_len(hval->str));
}
}
}
else {
flb_debug("no header set");
}

switch (ctx->timestamp_format) {
case FLB_JSON_DATE_DOUBLE:
msgpack_pack_double(&mp_pck, flb_time_to_double(tm));
Expand Down Expand Up @@ -221,7 +309,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
if (ctx->dynamic_topic) {
/* Only if default topic is set and this topicname is not set for this message */
if (strncmp(topic->name, flb_kafka_topic_default(ctx)->name, val.via.str.size) == 0 &&
(strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) {
(strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) {
if (memchr(val.via.str.ptr, ',', val.via.str.size)) {
/* Don't allow commas in kafkatopic name */
flb_warn("',' not allowed in dynamic_kafka topic names");
Expand Down Expand Up @@ -392,12 +480,22 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
return FLB_RETRY;
}

ret = rd_kafka_produce(topic->tp,
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
out_buf, out_size,
message_key, message_key_len,
ctx);
rd_kafka_resp_err_t err = rd_kafka_producev(ctx->kafka.rk,
RD_KAFKA_V_TOPIC(rd_kafka_topic_name(topic->tp)),
RD_KAFKA_V_HEADERS(kafka_headers),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(out_buf, out_size),
RD_KAFKA_V_KEY(message_key, message_key_len),
RD_KAFKA_V_END);




if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
flb_plg_info(ctx->ins, "Sending message completed");
ret = 0;
}


if (ret == -1) {
flb_error(
Expand Down Expand Up @@ -455,7 +553,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
AVRO_FREE(avro_fast_buffer, out_buf)
}
#endif

msgpack_sbuffer_destroy(&mp_sbuf);
return FLB_OK;
}
Expand Down Expand Up @@ -622,6 +720,12 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"Set the kafka topics, delimited by commas."
},
{
FLB_CONFIG_MAP_SLIST_1, "header", NULL,
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_kafka, headers),
"Add a kafka message header key/value pair. Multiple headers can be set"
},

{
FLB_CONFIG_MAP_STR, "brokers", (char *)NULL,
0, FLB_FALSE, 0,
Expand All @@ -647,6 +751,8 @@ static struct flb_config_map config_map[] = {
{0}
};



struct flb_output_plugin out_kafka_plugin = {
.name = "kafka",
.description = "Kafka",
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_kafka/kafka_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ struct flb_out_kafka {
/* Head of defined topics by configuration */
struct mk_list topics;

/* Headers map defined by configuration*/
struct mk_list *headers;

/*
* Blocked Status: since rdkafka have it own buffering queue, there is a
* chance that the queue becomes full, when that happens our default
Expand Down

0 comments on commit 627834f

Please sign in to comment.