Skip to content

Commit

Permalink
rdkafka_performance: simulate consumer processing time with -r <rate>
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Jul 15, 2018
1 parent 04d59a2 commit 321bafc
Showing 1 changed file with 30 additions and 13 deletions.
43 changes: 30 additions & 13 deletions examples/rdkafka_performance.c
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,23 @@ static rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
return RD_KAFKA_RESP_ERR_NO_ERROR;
}

/**
* @brief Sleep for \p sleep_us microseconds.
*/
static void do_sleep (int sleep_us) {
if (sleep_us > 100) {
#ifdef _MSC_VER
Sleep(sleep_us / 1000);
#else
usleep(sleep_us);
#endif
} else {
rd_ts_t next = rd_clock() + (rd_ts_t)sleep_us;
while (next > rd_clock())
;
}
}


int main (int argc, char **argv) {
char *brokers = NULL;
Expand Down Expand Up @@ -1383,19 +1400,8 @@ int main (int argc, char **argv) {
cnt.msgs++;
cnt.bytes += msgsize;

if (rate_sleep) {
if (rate_sleep > 100) {
#ifdef _MSC_VER
Sleep(rate_sleep / 1000);
#else
usleep(rate_sleep);
#endif
} else {
rd_ts_t next = rd_clock() + rate_sleep;
while (next > rd_clock())
;
}
}
if (rate_sleep)
do_sleep(rate_sleep);

/* Must poll to handle delivery reports */
rd_kafka_poll(rk, 0);
Expand Down Expand Up @@ -1523,6 +1529,12 @@ int main (int argc, char **argv) {
if (r == -1)
fprintf(stderr, "%% Error: %s\n",
rd_kafka_err2str(rd_kafka_last_error()));
else if (r > 0 && rate_sleep) {
/* Simulate processing time
* if `-r <rate>` was set. */
do_sleep(rate_sleep);
}


print_stats(rk, mode, otype, compression);

Expand Down Expand Up @@ -1604,6 +1616,11 @@ int main (int argc, char **argv) {
if (rkmessage) {
msg_consume(rkmessage, NULL);
rd_kafka_message_destroy(rkmessage);

/* Simulate processing time
* if `-r <rate>` was set. */
if (rate_sleep)
do_sleep(rate_sleep);
}

cnt.t_fetch_latency += rd_clock() - fetch_latency;
Expand Down

0 comments on commit 321bafc

Please sign in to comment.