Skip to content

Commit

Permalink
Merge branch 'dev' into v0.6
Browse files Browse the repository at this point in the history
* dev:
  Add DYNOMITE_DNS_NAME and DYNOMITE_DNS_TYPE env vars for dns_provider
  Support list of keys for exists command
  Support for info command with args
  Fix crash caused by parsing unexpected Florida responses
  Fix crash caused by parsing 'empty' Florida responses
  Fix crash caused by parsing 'bad' Florida responses
  • Loading branch information
shailesh33 committed Mar 1, 2018
2 parents 4afe908 + 0a0427e commit 92afb5a
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 32 deletions.
53 changes: 43 additions & 10 deletions src/dyn_gossip.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,25 +401,41 @@ parse_seeds(struct string *seeds, struct string *dc_name, struct string *rack_na
for (k = 0; k < sizeof(delim)-1; k++) {
q = dn_strrchr(p, start, delim[k]);

if (q == NULL) {
break;
}

switch (k) {
case 0:
token = q + 1;
tokenlen = (uint32_t)(p - token + 1);
if (tokenlen == 0) {
return GOS_ERROR;
}
break;
case 1:
dc = q + 1;
dclen = (uint32_t)(p - dc + 1);
if (dclen == 0) {
return GOS_ERROR;
}
string_copy(dc_name, dc, dclen);
break;
case 2:
rack = q + 1;
racklen = (uint32_t)(p - rack + 1);
if (racklen == 0) {
return GOS_ERROR;
}
string_copy(rack_name, rack, racklen);
break;

case 3:
port = q + 1;
portlen = (uint32_t)(p - port + 1);
if (portlen == 0) {
return GOS_ERROR;
}
string_copy(port_str, port, portlen);
break;

Expand All @@ -438,13 +454,19 @@ parse_seeds(struct string *seeds, struct string *dc_name, struct string *rack_na
pname = seeds->data;
log_debug(LOG_VERB, "pname %s", pname);
pnamelen = seeds->len - (tokenlen + racklen + dclen + 3);
if (pnamelen == 0) {
return GOS_ERROR;
}
// address = hostname:port
status = string_copy(address, pname, pnamelen);


//addr = hostname or ip only
addr = start;
addrlen = (uint32_t)(p - start + 1);
if (addrlen == 0) {
return GOS_ERROR;
}
//if it is a dns name, convert to IP or otherwise keep that IP
if (!isdigit( (char) addr[0])) {
addr[addrlen] = '\0';
Expand Down Expand Up @@ -683,6 +705,8 @@ gossip_update_seeds(struct server_pool *sp, struct mbuf *seeds)

struct string temp;

rstatus_t parse_status;

string_init(&rack_name);
string_init(&dc_name);
string_init(&port_str);
Expand All @@ -698,21 +722,23 @@ gossip_update_seeds(struct server_pool *sp, struct mbuf *seeds)
uint8_t *seed_node;
uint32_t seed_node_len;

while (q > start) {
while (q != NULL && q > start) {
seed_node = q + 1;
seed_node_len = (uint32_t)(p - seed_node + 1);
string_copy(&temp, seed_node, seed_node_len);
//array_init(&tokens, 1, sizeof(struct dyn_token));
init_dyn_token(&token);
parse_seeds(&temp, &dc_name, &rack_name, &port_str, &address, &ip, &token);
log_debug(LOG_VERB, "address : '%.*s'", address.len, address.data);
log_debug(LOG_VERB, "rack_name : '%.*s'", rack_name.len, rack_name.data);
log_debug(LOG_VERB, "dc_name : '%.*s'", dc_name.len, dc_name.data);
log_debug(LOG_VERB, "ip : '%.*s'", ip.len, ip.data);
log_debug(LOG_VERB, "port : '%.*s'", port_str.len, port_str.data);
parse_status = parse_seeds(&temp, &dc_name, &rack_name, &port_str, &address, &ip, &token);
log_debug(LOG_VERB, "address : '%.*s'", address.len, address.data);
log_debug(LOG_VERB, "rack_name : '%.*s'", rack_name.len, rack_name.data);
log_debug(LOG_VERB, "dc_name : '%.*s'", dc_name.len, dc_name.data);
log_debug(LOG_VERB, "ip : '%.*s'", ip.len, ip.data);
log_debug(LOG_VERB, "port : '%.*s'", port_str.len, port_str.data);

//struct dyn_token *token = array_get(&tokens, 0);
gossip_add_node_if_absent(sp, &dc_name, &rack_name, &address, &ip, &port_str, &token, NORMAL, (uint64_t) time(NULL));
if (parse_status == GOS_OK) {
gossip_add_node_if_absent(sp, &dc_name, &rack_name, &address, &ip, &port_str, &token, NORMAL, (uint64_t) time(NULL));
}

p = q - 1;
q = dn_strrchr(p, start, '|');
Expand All @@ -733,10 +759,17 @@ gossip_update_seeds(struct server_pool *sp, struct mbuf *seeds)
string_copy(&temp, seed_node, seed_node_len);
//array_init(&tokens, 1, sizeof(struct dyn_token));
init_dyn_token(&token);
parse_seeds(&temp, &dc_name, &rack_name, &port_str, &address, &ip, &token);
parse_status = parse_seeds(&temp, &dc_name, &rack_name, &port_str, &address, &ip, &token);
log_debug(LOG_VERB, "address : '%.*s'", address.len, address.data);
log_debug(LOG_VERB, "rack_name : '%.*s'", rack_name.len, rack_name.data);
log_debug(LOG_VERB, "dc_name : '%.*s'", dc_name.len, dc_name.data);
log_debug(LOG_VERB, "ip : '%.*s'", ip.len, ip.data);
log_debug(LOG_VERB, "port : '%.*s'", port_str.len, port_str.data);

//struct dyn_token *token = array_get(&tokens, 0);
gossip_add_node_if_absent(sp, &dc_name, &rack_name, &address, &ip, &port_str, &token, NORMAL, (uint64_t) time(NULL));
if (parse_status == GOS_OK) {
gossip_add_node_if_absent(sp, &dc_name, &rack_name, &address, &ip, &port_str, &token, NORMAL, (uint64_t) time(NULL));
}
}

string_deinit(&temp);
Expand Down
39 changes: 29 additions & 10 deletions src/proto/dyn_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ static bool
redis_arg0(struct msg *r)
{
switch (r->type) {
case MSG_REQ_REDIS_EXISTS:
case MSG_REQ_REDIS_PERSIST:
case MSG_REQ_REDIS_PTTL:
case MSG_REQ_REDIS_TTL:
Expand Down Expand Up @@ -148,6 +147,17 @@ redis_arg1(struct msg *r)
return false;
}

static bool
redis_arg_upto1(struct msg *r)
{
switch (r->type) {
case MSG_REQ_REDIS_INFO:
return true;
default:
break;
}
return false;
}
/*
* Return true, if the redis command accepts exactly 2 arguments, otherwise
* return false
Expand Down Expand Up @@ -273,6 +283,7 @@ redis_argx(struct msg *r)
switch (r->type) {
case MSG_REQ_REDIS_MGET:
case MSG_REQ_REDIS_DEL:
case MSG_REQ_REDIS_EXISTS:
return true;

default:
Expand Down Expand Up @@ -638,9 +649,8 @@ redis_parse_req(struct msg *r, const struct string *hash_tag)
if (str4icmp(m, 'i', 'n', 'f', 'o')) {
r->type = MSG_REQ_REDIS_INFO;
r->msg_routing = ROUTING_LOCAL_NODE_ONLY;
p = p + 1;
r->is_read = 1;
goto done;
break;
}

if (str4icmp(m, 'l', 'l', 'e', 'n')) {
Expand Down Expand Up @@ -1273,10 +1283,12 @@ redis_parse_req(struct msg *r, const struct string *hash_tag)
case SW_REQ_TYPE_LF:
switch (ch) {
case LF:
if (redis_argz(r)) {
if (redis_argz(r) && (r->rnarg == 0)) {
goto done;
} else if (r->narg == 1) {
goto error;
} else if (redis_arg_upto1(r) && r->rnarg == 0) {
goto done;
} else if (redis_arg_upto1(r) && r->rnarg == 1) {
state = SW_ARG1_LEN;
} else if (redis_argeval(r)) {
state = SW_ARG1_LEN;
} else {
Expand Down Expand Up @@ -1508,7 +1520,7 @@ redis_parse_req(struct msg *r, const struct string *hash_tag)
case SW_ARG1_LF:
switch (ch) {
case LF:
if (redis_arg1(r)) {
if (redis_arg_upto1(r) || redis_arg1(r)) {
if (r->rnarg != 0) {
goto error;
}
Expand Down Expand Up @@ -2564,7 +2576,8 @@ redis_pre_coalesce(struct msg *rsp)
switch (rsp->type) {
case MSG_RSP_REDIS_INTEGER:
/* only redis 'del' fragmented request sends back integer reply */
ASSERT(req->type == MSG_REQ_REDIS_DEL);
ASSERT((req->type == MSG_REQ_REDIS_DEL) ||
(req->type == MSG_REQ_REDIS_EXISTS));

mbuf = STAILQ_FIRST(&rsp->mhdr);
/*
Expand Down Expand Up @@ -2652,7 +2665,7 @@ redis_post_coalesce_mset(struct msg *request)
}

void
redis_post_coalesce_del(struct msg *request)
redis_post_coalesce_num(struct msg *request)
{
struct msg *response = request->selected_rsp;
rstatus_t status;
Expand Down Expand Up @@ -2726,7 +2739,8 @@ redis_post_coalesce(struct msg *req)
return redis_post_coalesce_mget(req);

case MSG_REQ_REDIS_DEL:
return redis_post_coalesce_del(req);
case MSG_REQ_REDIS_EXISTS:
return redis_post_coalesce_num(req);

case MSG_REQ_REDIS_MSET:
return redis_post_coalesce_mset(req);
Expand Down Expand Up @@ -2938,6 +2952,9 @@ redis_fragment_argx(struct msg *r, struct server_pool *pool, struct rack *rack,
} else if (r->type == MSG_REQ_REDIS_DEL) {
status = msg_prepend_format(sub_msg, "*%d\r\n$3\r\ndel\r\n",
sub_msg->narg + 1);
} if (r->type == MSG_REQ_REDIS_EXISTS) {
status = msg_prepend_format(sub_msg, "*%d\r\n$6\r\nexists\r\n",
sub_msg->narg + 1);
} else if (r->type == MSG_REQ_REDIS_MSET) {
status = msg_prepend_format(sub_msg, "*%d\r\n$4\r\nmset\r\n",
sub_msg->narg + 1);
Expand Down Expand Up @@ -2972,6 +2989,7 @@ redis_fragment(struct msg *r, struct server_pool *pool, struct rack *rack, struc
switch (r->type) {
case MSG_REQ_REDIS_MGET:
case MSG_REQ_REDIS_DEL:
case MSG_REQ_REDIS_EXISTS:
return redis_fragment_argx(r, pool, rack, frag_msgq, 1);

case MSG_REQ_REDIS_MSET:
Expand Down Expand Up @@ -3014,6 +3032,7 @@ redis_is_multikey_request(struct msg *req)
switch (req->type) {
case MSG_REQ_REDIS_MGET:
case MSG_REQ_REDIS_DEL:
case MSG_REQ_REDIS_EXISTS:
case MSG_REQ_REDIS_MSET:
return true;
default:
Expand Down
29 changes: 17 additions & 12 deletions src/seedsprovider/dyn_dns.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
#define DNS_TXT_NAME "_dynomite.ec2-internal"
#endif

static char * txtName = NULL;
static char * dnsName = NULL;
static char * dnsType = NULL;
static int queryType = T_TXT;
static int64_t last = 0; //storing last time for seeds check
static uint32_t last_seeds_hash = 0;

Expand Down Expand Up @@ -74,52 +76,55 @@ dns_get_seeds(struct context * ctx, struct mbuf *seeds_buf)

if (!_env_checked) {
_env_checked = 1;
txtName = getenv("DYNOMITE_DNS_TXT_NAME");
if (txtName == NULL) txtName = DNS_TXT_NAME;
dnsName = getenv("DYNOMITE_DNS_NAME");
if (dnsName == NULL) dnsName = DNS_TXT_NAME;
dnsType = getenv("DYNOMITE_DNS_TYPE");
if (dnsType != NULL) { if (strcmp(dnsType, "A") == 0) queryType = T_A; }
}

log_debug(LOG_VVERB, "checking for %s", txtName);
log_debug(LOG_VVERB, "checking for %s", dnsName);

if (!seeds_check()) {
return DN_NOOPS;
}

unsigned char buf[BUFSIZ];
int r = res_query(txtName, C_IN, T_TXT, buf, sizeof(buf));

int r = res_query(dnsName, C_IN, queryType, buf, sizeof(buf));
if (r == -1) {
log_debug(LOG_DEBUG, "DNS response for %s: %s", txtName, hstrerror(h_errno));
log_debug(LOG_DEBUG, "DNS response for %s: %s", dnsName, hstrerror(h_errno));
return DN_NOOPS;
}
if (r >= sizeof(buf)) {
log_debug(LOG_DEBUG, "DNS reply is too large for %s: %d, bufsize: %d", txtName, r, sizeof(buf));
log_debug(LOG_DEBUG, "DNS reply is too large for %s: %d, bufsize: %d", dnsName, r, sizeof(buf));
return DN_NOOPS;
}
HEADER *hdr = (HEADER*)buf;
if (hdr->rcode != NOERROR) {
log_debug(LOG_DEBUG, "DNS reply code for %s: %d", txtName, hdr->rcode);
log_debug(LOG_DEBUG, "DNS reply code for %s: %d", dnsName, hdr->rcode);
return DN_NOOPS;
}
int na = ntohs(hdr->ancount);

ns_msg m;
if (ns_initparse(buf, r, &m) == -1) {
log_debug(LOG_DEBUG, "ns_initparse error for %s: %s", txtName, strerror(errno));
log_debug(LOG_DEBUG, "ns_initparse error for %s: %s", dnsName, strerror(errno));
return DN_NOOPS;
}
int i;
ns_rr rr;
for (i = 0; i < na; ++i) {
if (ns_parserr(&m, ns_s_an, i, &rr) == -1) {
log_debug(LOG_DEBUG, "ns_parserr for %s: %s", txtName, strerror (errno));
log_debug(LOG_DEBUG, "ns_parserr for %s: %s", dnsName, strerror (errno));
return DN_NOOPS;
}
mbuf_rewind(seeds_buf);
unsigned char *s = ns_rr_rdata(rr);
if (s[0] >= ns_rr_rdlen(rr)) {
log_debug(LOG_DEBUG, "invalid TXT length for %s: %d < %d", txtName, s[0], ns_rr_rdlen(rr));
log_debug(LOG_DEBUG, "invalid length for %s: %d < %d", dnsName, s[0], ns_rr_rdlen(rr));
return DN_NOOPS;
}
log_debug(LOG_VERB, "seeds for %s: %.*s", txtName, s[0], s +1);
log_debug(LOG_VERB, "seeds for %s: %.*s", dnsName, s[0], s +1);
mbuf_copy(seeds_buf, s + 1, s[0]);
}

Expand Down
5 changes: 5 additions & 0 deletions src/seedsprovider/dyn_florida.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ florida_get_seeds(struct context * ctx, struct mbuf *seeds_buf) {
close(sock);
dn_free(remote);

if (mbuf_length(seeds_buf) == 0) {
log_error("No seeds were found in Florida response (htmlstart %u)", htmlstart);
return DN_ERROR;
}

uint32_t seeds_hash = hash_seeds(seeds_buf->pos, mbuf_length(seeds_buf));

if (last_seeds_hash != seeds_hash) {
Expand Down

0 comments on commit 92afb5a

Please sign in to comment.