Skip to content

Commit

Permalink
Implement concurrency.
Browse files Browse the repository at this point in the history
  • Loading branch information
octo committed Jul 11, 2013
1 parent 51aaa74 commit a6edbeb
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 14 deletions.
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
bin_PROGRAMS = statsd-tg

statsd_tg_SOURCES = statsd-tg.c
statsd_tg_CFLAGS = $(AM_CFLAGS) -pthread
statsd_tg_LDADD = -lrt
107 changes: 93 additions & 14 deletions src/statsd-tg.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include <signal.h>
#include <errno.h>
#include <assert.h>
#include <time.h>
#include <pthread.h>

#include <sys/types.h>
#include <sys/socket.h>
Expand All @@ -57,11 +59,13 @@ static int conf_set_size = DEF_SET_SIZE;
static const char *conf_node = DEF_NODE;
static const char *conf_service = DEF_SERVICE;

static int sock = -1;
static int conf_threads_num = 1;

static struct sigaction sigint_action;
static struct sigaction sigterm_action;

static unsigned long long events_sent = 0;
pthread_mutex_t events_sent_lock = PTHREAD_MUTEX_INITIALIZER;
static _Bool loop = 1;

__attribute__((noreturn))
Expand Down Expand Up @@ -101,6 +105,7 @@ static int sock_open (void) /* {{{ */
struct addrinfo ai_hints;
struct addrinfo *ai_list = NULL;
struct addrinfo *ai_ptr;
int sock;

int status;

Expand Down Expand Up @@ -147,17 +152,17 @@ static int sock_open (void) /* {{{ */
exit (EXIT_FAILURE);
}

return (0);
return (sock);
} /* }}} int sock_open */

static int send_random_event (void) /* {{{ */
static int send_random_event (int sock, unsigned short seed[static 3]) /* {{{ */
{
long conf_num_total = conf_num_counters + conf_num_timers
+ conf_num_gauges + conf_num_sets;
/* Not completely fair, but good enough for our use-case. */
long rnd = lrand48 () % conf_num_total;
long rnd = nrand48 (seed) % conf_num_total;

long value = lrand48 ();
long value = nrand48 (seed);
char *type;

char buffer[1024];
Expand Down Expand Up @@ -199,7 +204,7 @@ static int send_random_event (void) /* {{{ */
status = send (sock, buffer, (size_t) buffer_size, /* flags = */ 0);
if (status < 0)
{
fprintf (stderr, "send failed: %s", strerror (errno));
fprintf (stderr, "send failed: %s\n", strerror (errno));
return (-1);
}

Expand Down Expand Up @@ -239,6 +244,10 @@ static int read_options (int argc, char **argv) /* {{{ */
{
int opt;

#ifdef _SC_NPROCESSORS_ONLN
conf_threads_num = (int) sysconf (_SC_NPROCESSORS_ONLN);
#endif

while ((opt = getopt (argc, argv, "c:t:g:s:S:d:D:h")) != -1)
{
switch (opt)
Expand Down Expand Up @@ -271,6 +280,10 @@ static int read_options (int argc, char **argv) /* {{{ */
conf_service = optarg;
break;

case 'T':
get_integer_opt (optarg, &conf_threads_num);
break;

case 'h':
exit_usage (EXIT_SUCCESS);

Expand All @@ -282,8 +295,76 @@ static int read_options (int argc, char **argv) /* {{{ */
return (0);
} /* }}} int read_options */

static void *send_thread (void *args __attribute__((unused))) /* {{{ */
{
int sock;
unsigned short seed[3];
struct timespec ts;

unsigned long long local_events_sent = 0;

clock_gettime (CLOCK_REALTIME, &ts);
seed[2] = (unsigned short) (ts.tv_nsec);
seed[1] = (unsigned short) (ts.tv_nsec >> 16);
seed[0] = (unsigned short) (ts.tv_sec);

sock = sock_open ();

while (loop)
{
send_random_event (sock, seed);
local_events_sent++;
}

close (sock);

pthread_mutex_lock (&events_sent_lock);
events_sent += local_events_sent;
pthread_mutex_unlock (&events_sent_lock);

return (NULL);
} /* }}} void *send_thread */

static void run_threads (void) /* {{{ */
{
pthread_t threads[conf_threads_num];
int i;

for (i = 0; i < conf_threads_num; i++)
{
int status;

status = pthread_create (&threads[i], /* attr = */ NULL,
send_thread, /* args = */ NULL);
if (status != 0)
{
fprintf (stderr, "pthread_create failed.");
abort ();
}
}

for (i = 0; i < conf_threads_num; i++)
pthread_join (threads[i], /* retval = */ NULL);
} /* }}} void run_threads */

static double timespec_diff (struct timespec const *ts0, /* {{{ */
struct timespec const *ts1)
{
time_t diff_sec;
long diff_nsec;

diff_sec = ts1->tv_sec - ts0->tv_sec;
diff_nsec += ts1->tv_nsec - ts0->tv_nsec;

return ((double) diff_sec) + (((double) diff_nsec) / 1.0e9);
} /* }}} double timespec_diff */

int main (int argc, char **argv) /* {{{ */
{
struct timespec ts_begin;
struct timespec ts_end;
double runtime;

read_options (argc, argv);

sigint_action.sa_handler = signal_handler;
Expand All @@ -292,15 +373,13 @@ int main (int argc, char **argv) /* {{{ */
sigterm_action.sa_handler = signal_handler;
sigaction (SIGTERM, &sigterm_action, /* old = */ NULL);

sock_open ();
clock_gettime (CLOCK_MONOTONIC, &ts_begin);
run_threads ();
clock_gettime (CLOCK_MONOTONIC, &ts_end);

while (loop)
{
send_random_event ();
}

close (sock);
sock = -1;
runtime = timespec_diff (&ts_begin, &ts_end);
printf ("Sent %llu events in %.0fs (%.0f events/s).\n",
events_sent, runtime, ((double) events_sent) / runtime);

exit (EXIT_SUCCESS);
return (0);
Expand Down

0 comments on commit a6edbeb

Please sign in to comment.