Skip to content

Commit

Permalink
Merge pull request #87 from timiblossom/replication_fix3
Browse files Browse the repository at this point in the history
Add metrics for latencies
  • Loading branch information
timiblossom committed Feb 9, 2015
2 parents 34c1cbf + f14af6e commit 0a2cd60
Show file tree
Hide file tree
Showing 11 changed files with 421 additions and 24 deletions.
2 changes: 1 addition & 1 deletion conf/redis_single.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ dyn_o_mite:
dyn_listen: 127.0.0.1:8101
redis: true
listen: 127.0.0.1:8102
dyn_seed_provider: florida_provider
dyn_seed_provider: simple_provider
servers:
- 127.0.0.1:22122:1
tokens: 437425602
1 change: 1 addition & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dynomite_SOURCES = \
dyn_dnode_request.c \
dyn_dnode_response.c \
dyn_dnode_server.c dyn_dnode_server.h \
dyn_histogram.c dyn_histogram.h \
dyn_server.c dyn_server.h \
dyn_proxy.c dyn_proxy.h \
dyn_message.c dyn_message.h \
Expand Down
27 changes: 22 additions & 5 deletions src/dyn_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,23 @@ core_ctx_create(struct instance *nci)
/* parse and create configuration */
ctx->cf = conf_create(nci->conf_filename);
if (ctx->cf == NULL) {
loga("Failed to create context!!!");
dn_free(ctx);
return NULL;
}

status = histo_init();
if (status != DN_OK) {
loga("Failed to initialize server pool!!!");
conf_destroy(ctx->cf);
dn_free(ctx);
return NULL;
}

/* initialize server pool from configuration */
status = server_pool_init(&ctx->pool, &ctx->cf->pool, ctx);
if (status != DN_OK) {
loga("Failed to initialize server pool!!!");
conf_destroy(ctx->cf);
dn_free(ctx);
return NULL;
Expand All @@ -75,6 +84,7 @@ core_ctx_create(struct instance *nci)
/* crypto init */
status = crypto_init(ctx);
if (status != DN_OK) {
loga("Failed to initialize crypto!!!");
dn_free(ctx);
return NULL;
}
Expand All @@ -84,6 +94,7 @@ core_ctx_create(struct instance *nci)
ctx->stats = stats_create(nci->stats_port, nci->stats_addr, nci->stats_interval,
nci->hostname, &ctx->pool, ctx);
if (ctx->stats == NULL) {
loga("Failed to create stats!!!");
crypto_deinit();
server_pool_deinit(&ctx->pool);
conf_destroy(ctx->cf);
Expand All @@ -94,6 +105,7 @@ core_ctx_create(struct instance *nci)
/* initialize event handling for client, proxy and server */
ctx->evb = event_base_create(EVENT_SIZE, &core_core);
if (ctx->evb == NULL) {
loga("Failed to create socket event handling!!!");
crypto_deinit();
stats_destroy(ctx->stats);
server_pool_deinit(&ctx->pool);
Expand All @@ -105,6 +117,7 @@ core_ctx_create(struct instance *nci)
/* preconnect? servers in server pool */
status = server_pool_preconnect(ctx);
if (status != DN_OK) {
loga("Failed to preconnect for server pool!!!");
crypto_deinit();
server_pool_disconnect(ctx);
event_base_destroy(ctx->evb);
Expand All @@ -118,6 +131,7 @@ core_ctx_create(struct instance *nci)
/* initialize proxy per server pool */
status = proxy_init(ctx);
if (status != DN_OK) {
loga("Failed to initialize proxy!!!");
crypto_deinit();
server_pool_disconnect(ctx);
event_base_destroy(ctx->evb);
Expand All @@ -131,6 +145,7 @@ core_ctx_create(struct instance *nci)
/* initialize dnode listener per server pool */
status = dnode_init(ctx);
if (status != DN_OK) {
loga("Failed to initialize dnode!!!");
crypto_deinit();
server_pool_disconnect(ctx);
event_base_destroy(ctx->evb);
Expand All @@ -146,6 +161,7 @@ core_ctx_create(struct instance *nci)
/* initialize peers */
status = dnode_peer_init(&ctx->pool, ctx);
if (status != DN_OK) {
loga("Failed to initialize dnode peers!!!");
crypto_deinit();
dnode_deinit(ctx);
server_pool_disconnect(ctx);
Expand All @@ -162,6 +178,7 @@ core_ctx_create(struct instance *nci)
/* preconntect peers - probably start gossip here */
status = dnode_peer_pool_preconnect(ctx);
if (status != DN_OK) {
loga("Failed to preconnect dnode peers!!!");
crypto_deinit();
dnode_peer_deinit(&ctx->pool);
dnode_deinit(ctx);
Expand Down Expand Up @@ -216,9 +233,9 @@ core_start(struct instance *nci)
if (ctx != NULL) {
nci->ctx = ctx;

if (get_tracking_level() >= LOG_VVERB) {
crypto_check();
}
if (get_tracking_level() >= LOG_VVERB) {
crypto_check();
}
return ctx;
}

Expand Down Expand Up @@ -555,9 +572,9 @@ core_loop(struct context *ctx)
{
int nsd;

//log_debug(LOG_VERB, "timeout = %d", ctx->timeout);
log_debug(LOG_VERB, "timeout = %d", ctx->timeout);

core_process_messages();
core_process_messages();
nsd = event_wait(ctx->evb, ctx->timeout);
if (nsd < 0) {
return nsd;
Expand Down
1 change: 1 addition & 0 deletions src/dyn_gossip.c
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ gossip_update_seeds(struct server_pool *sp, struct mbuf *seeds)
static void *
gossip_loop(void *arg)
{

struct server_pool *sp = arg;
uint64_t gossip_interval = gn_pool.g_interval * 1000;

Expand Down
259 changes: 259 additions & 0 deletions src/dyn_histogram.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
* dyn_histogram.c
*
* Created on: Feb 6, 2015
* Author: mdo
*/

#include <math.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

#include "dyn_core.h"
#include "dyn_conf.h"
#include "dyn_histogram.h"



/* a port from this java code:
* https://github.com/apache/cassandra/blob/cassandra-1.2/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
*
* Will try to use https://github.com/HdrHistogram/HdrHistogram_c later
*/



static uint64_t bucket_offsets[BUCKET_SIZE];
static uint64_t buckets[BUCKET_SIZE];
static uint64_t max;


rstatus_t histo_init()
{
uint64_t last = 1;
bucket_offsets[0] = last;
int i;
for(i = 1; i<BUCKET_SIZE; i++)
{
uint64_t next = floor(last * 1.2);
if (next == last)
next++;

bucket_offsets[i] = next;
last = next;
}

for(i = 0; i<BUCKET_SIZE; i++) {
buckets[i] = 0;
}

max = 0;

return DN_OK;
}


static uint64_t count()
{
uint64_t sum = 0L;
int i;
for (i = 0; i < BUCKET_SIZE; i++)
sum += buckets[i];
return sum;
}


int histo_bucket_size()
{
return BUCKET_SIZE;
}


uint64_t* histo_bucket_offsets()
{
return bucket_offsets;
}


void histo_add(uint64_t val)
{
int begin_index, end_index, left_index, right_index, middle_index, index;

begin_index = left_index = 0;
end_index = right_index = BUCKET_SIZE-1;


while (left_index < right_index ) {
middle_index = left_index + (right_index - left_index) / 2;

if (val == bucket_offsets[middle_index]) {
index = middle_index;
break;
} else if (val < bucket_offsets[middle_index]) {
right_index = middle_index;
} else {
left_index = middle_index;
}

if (left_index == right_index - 1) {
index = left_index;
break;
}
}

if (left_index == right_index - 1)
index = left_index;

buckets[index]++;

//store max value
max = (max > val)? max : val;
}


uint64_t histo_get_bucket(int bucket)
{
if (bucket < BUCKET_SIZE)
return buckets[bucket];

return 0;
}


void histo_get_buckets(uint64_t* my_buckets)
{
int i;
for(i=0; i<BUCKET_SIZE; i++) {
my_buckets[i] = buckets[i];
}

}


uint64_t histo_percentile(double percentile)
{
if (percentile < 0 && percentile > 1.0) {
return 0;
}

int last_bucket = BUCKET_SIZE - 1;
if (buckets[last_bucket] > 0) {
log_error("histogram overflowed!");
return -1;
}

uint64_t pcount = floor(count() * percentile);
if (pcount == 0)
return 0;

uint64_t elements = 0;
int i;
for (i = 0; i < last_bucket; i++)
{
elements += buckets[i];
if (elements >= pcount)
return bucket_offsets[i];
}

return 0;
}


uint64_t histo_mean()
{
int last_bucket = BUCKET_SIZE - 1;
if (buckets[last_bucket] > 0) {
log_error("histogram overflowed!");
return -1;
}

uint64_t elements = 0;
uint64_t sum = 0;
int i;
for (i = 0; i < last_bucket; i++)
{
elements += buckets[i];
sum += buckets[i] * bucket_offsets[i];
}

return ceil((double) sum / elements);
}


uint64_t histo_max()
{
return max;
}


void histo_compute_latencies(uint64_t* mean, uint64_t* latency_95th,
uint64_t* latency_99th, uint64_t* latency_999th, uint64_t* latency_max)
{
if (mean == NULL || latency_95th == NULL || latency_99th == NULL || latency_999th == NULL) {
return;
}


int last_bucket = BUCKET_SIZE - 1;
if (buckets[last_bucket] > 0) {
log_error("histogram overflowed!");
return;
}

uint64_t p95_count = floor(count() * 0.95);
uint64_t p99_count = floor(count() * 0.99);
uint64_t p999_count = floor(count() * 0.999);

uint64_t val_95th = 0;
uint64_t val_99th = 0;
uint64_t val_999th = 0;


uint64_t elements = 0;
uint64_t sum = 0;
int i;
for (i = 0; i < last_bucket; i++)
{
elements += buckets[i];
if (elements >= p95_count && val_95th == 0)
val_95th = bucket_offsets[i];

if (elements >= p99_count && val_99th == 0)
val_99th = bucket_offsets[i];

if (elements >= p999_count && val_999th == 0)
val_999th = bucket_offsets[i];

sum += buckets[i] * bucket_offsets[i];

}

if (elements != 0)
*mean = ceil((double) sum / elements);

*latency_95th = val_95th;
*latency_99th = val_99th;
*latency_999th = val_999th;
*latency_max = max;
}


void print_buckets() {
int i;
for(i = 0; i<BUCKET_SIZE; i++) {
loga(" val: %lu - offset: %lu\n", buckets[i], bucket_offsets[i]);
}

printf("\n");
}

void print_bucketoffsets() {
int i;
for(i = 0; i<BUCKET_SIZE; i++) {
loga(" val %lu\n", bucket_offsets[i]);
}

loga("\n");

}
Loading

0 comments on commit 0a2cd60

Please sign in to comment.