diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index df1605af31c..395217cc378 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -182,8 +182,9 @@ static int in_kafka_collect(struct flb_input_instance *ins, if(!ctx->enable_auto_commit) { - /* TO-DO: commit the record based on `ret` */ - rd_kafka_commit(ctx->kafka.rk, NULL, 0); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + rd_kafka_commit(ctx->kafka.rk, NULL, 0); + } } /* Break from the loop when reaching the limit of polling if available */ @@ -225,6 +226,7 @@ static int in_kafka_init(struct flb_input_instance *ins, char errstr[512]; (void) data; char conf_val[16]; + size_t dsize; /* Allocate space for the configuration context */ ctx = flb_malloc(sizeof(struct flb_in_kafka_config)); @@ -252,13 +254,20 @@ static int in_kafka_init(struct flb_input_instance *ins, * -> minimize the delay we might create * b) run in our own thread: * -> optimize for throuput and relay on 'fetch.wait.max.ms' - * which is set to 500 by default default. lets set it to - * twice that so that increasing fetch.wait.max.ms still - * has an effect. + * which is set to 500 by default default. wa algin our + * timeout with what is set for 'fetch.wait.max.ms' */ ctx->poll_timeount_ms = 1; - if(ins->is_threaded) { - ctx->poll_timeount_ms = 1000; + if (ins->is_threaded) { + ctx->poll_timeount_ms = 550; // ensure kafa triggers timeout + + // align our timeout with what was configured for fetch.wait.max.ms + dsize = sizeof(conf_val); + res = rd_kafka_conf_get(kafka_conf, "fetch.wait.max.ms", conf_val, &dsize); + if (res == RD_KAFKA_CONF_OK && dsize <= sizeof(conf_val)) { + // add 50ms so kafa triggers timout + ctx->poll_timeount_ms = atoi(conf_val) + 50; + } } if (ctx->buffer_max_size > 0) { @@ -451,7 +460,6 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit), "Rely on kafka auto-commit and commit messages in batches" }, - /* EOF */ {0} }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index 7eca5f5341f..7a57e0ca68b 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -33,6 +33,7 @@ #define FLB_IN_KAFKA_UNLIMITED (size_t)-1 #define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M" #define FLB_IN_KAFKA_ENABLE_AUTO_COMMIT "false" +#define FLB_IN_KAFKA_POLL_TIMEOUT_MS "550" // same as kafka fetch.wait.max.ms + 10% enum { FLB_IN_KAFKA_FORMAT_NONE,