Skip to content

Commit

Permalink
Merge pull request #597 from smukil/smembers_final
Browse files Browse the repository at this point in the history
Consistency for SMEMBERS when using DC_SAFE_QUORUM
  • Loading branch information
smukil authored Aug 29, 2018
2 parents 4b9e839 + 68aad15 commit bdb4574
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 17 deletions.
65 changes: 48 additions & 17 deletions src/dyn_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
static rstatus_t msg_quorum_rsp_handler(struct msg *req, struct msg *rsp);
static msg_response_handler_t msg_get_rsp_handler(struct msg *req);

static rstatus_t rewrite_query_if_necessary(struct msg** req, struct context* ctx);
static rstatus_t fragment_query_if_necessary(struct msg* req, struct conn* conn,
struct msg_tqh* frag_msgq);

static void
client_ref(struct conn *conn, void *owner)
Expand Down Expand Up @@ -928,6 +931,37 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *req)
}
}

/*
* Rewrites a query if necessary.
*
* If a rewrite occured, it will replace '*req' with the new 'msg' that contains the new query
* and free up the original msg.
*
*/
rstatus_t rewrite_query_if_necessary(struct msg** req, struct context* ctx) {
bool did_rewrite = false;
struct msg* new_req = NULL;
rstatus_t ret_status = g_rewrite_query(*req, ctx, &did_rewrite, &new_req);
THROW_STATUS(ret_status);

if (did_rewrite) {
// If we successfully did a rewrite, we need to recycle the memory used by the original
// request and point it to the 'new_req'.
msg_put(*req);
*req = new_req;
}
return DN_OK;
}

/*
* Fragments a query if applicable.
* 'frag_msgq' will be non-empty if the query is fragmented.
*/
rstatus_t fragment_query_if_necessary(struct msg* req, struct conn* conn, struct msg_tqh* frag_msgq) {
struct server_pool *pool = conn->owner;
struct rack *rack = server_get_rack_by_dc_rack(pool, &pool->rack, &pool->dc);
return g_fragment(req, pool, rack, frag_msgq);
}

void
req_recv_done(struct context *ctx, struct conn *conn,
Expand All @@ -953,16 +987,11 @@ req_recv_done(struct context *ctx, struct conn *conn,
struct msg_tqh frag_msgq;
TAILQ_INIT(&frag_msgq);

struct server_pool *pool = conn->owner;
struct rack *rack = server_get_rack_by_dc_rack(pool, &pool->rack, &pool->dc);
rstatus_t status = g_fragment(req, pool, rack, &frag_msgq);
if (status != DN_OK) {
if (req->expect_datastore_reply) {
conn_enqueue_outq(ctx, conn, req);
}
req_forward_error(ctx, conn, req, DN_OK, status); //TODO: CHeck error code
return;
}
rstatus_t status = rewrite_query_if_necessary(&req, ctx);
if (status != DN_OK) goto error;

status = fragment_query_if_necessary(req, conn, &frag_msgq);
if (status != DN_OK) goto error;

/* if no fragment happened */
if (TAILQ_EMPTY(&frag_msgq)) {
Expand All @@ -971,13 +1000,7 @@ req_recv_done(struct context *ctx, struct conn *conn,
}

status = req_make_reply(ctx, conn, req);
if (status != DN_OK) {
if (req->expect_datastore_reply) {
conn_enqueue_outq(ctx, conn, req);
}
req_forward_error(ctx, conn, req, DN_OK, status);
return;
}
if (status != DN_OK) goto error;

struct msg *sub_msg, *tmsg;
for (sub_msg = TAILQ_FIRST(&frag_msgq); sub_msg != NULL; sub_msg = tmsg) {
Expand All @@ -989,8 +1012,16 @@ req_recv_done(struct context *ctx, struct conn *conn,
}
ASSERT(TAILQ_EMPTY(&frag_msgq));
return;

error:
if (req->expect_datastore_reply) {
conn_enqueue_outq(ctx, conn, req);
}
req_forward_error(ctx, conn, req, DN_OK, status); //TODO: CHeck error code
return;
}


static msg_response_handler_t
msg_get_rsp_handler(struct msg *req)
{
Expand Down
26 changes: 26 additions & 0 deletions src/dyn_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func_msg_fragment_t g_fragment; /* message post-coalesce */
func_msg_verify_t g_verify_request; /* message post-coalesce */
func_is_multikey_request g_is_multikey_request;
func_reconcile_responses g_reconcile_responses;
func_msg_rewrite_t g_rewrite_query; /* rewrite query in a msg if necessary */

#define DEFINE_ACTION(_name) string(#_name),
static struct string msg_type_strings[] = {
Expand Down Expand Up @@ -197,6 +198,7 @@ set_datastore_ops(void)
g_verify_request = redis_verify_request;
g_is_multikey_request = redis_is_multikey_request;
g_reconcile_responses = redis_reconcile_responses;
g_rewrite_query = redis_rewrite_query;
break;
case DATA_MEMCACHE:
g_pre_coalesce = memcache_pre_coalesce;
Expand All @@ -205,6 +207,7 @@ set_datastore_ops(void)
g_verify_request = memcache_verify_request;
g_is_multikey_request = memcache_is_multikey_request;
g_reconcile_responses = memcache_reconcile_responses;
g_rewrite_query = memcache_rewrite_query;
break;
default:
return;
Expand Down Expand Up @@ -753,6 +756,29 @@ msg_get_tagged_key(struct msg *req, uint32_t key_index, uint32_t *keylen)
return msg_get_key(req, key_index, keylen, true);
}

/*
* Returns the 'idx' key in 'msg'.
*
* Transfers ownership of returned buffer to the caller, so the caller must
* take the responsibility of freeing it.
*
* Returns NULL if key does not exist or if we're unable to allocate memory.
*/
uint8_t* msg_get_full_key_copy(struct msg* msg, int idx, uint32_t *keylen) {
// Get a pointer to the required key in 'msg'.
uint8_t* key_ptr = msg_get_full_key(msg, idx, keylen);

// Allocate a new buffer for the key.
uint8_t* copied_key = dn_alloc((size_t) (*keylen + 1));
if (copied_key == NULL) return NULL;

// Copy contents of the key from 'msg' to our new buffer.
dn_memcpy(copied_key, key_ptr, *keylen);
copied_key[*keylen] = '\0';

return copied_key;
}

uint32_t
msg_payload_crc32(struct msg *rsp)
{
Expand Down
4 changes: 4 additions & 0 deletions src/dyn_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ typedef rstatus_t (*msg_response_handler_t)(struct msg *req, struct msg *rsp);
typedef bool (*func_msg_failure_t)(struct msg *r);
typedef bool (*func_is_multikey_request)(struct msg *r);
typedef struct msg *(*func_reconcile_responses)(struct response_mgr *rspmgr);
typedef rstatus_t (*func_msg_rewrite_t)(struct msg *orig_msg, struct context* ctx,
bool* did_rewrite, struct msg** new_msg_ptr);

extern func_msg_coalesce_t g_pre_coalesce; /* message pre-coalesce */
extern func_msg_coalesce_t g_post_coalesce; /* message post-coalesce */
extern func_msg_fragment_t g_fragment; /* message fragment */
extern func_msg_verify_t g_verify_request; /* message verify */
extern func_is_multikey_request g_is_multikey_request;
extern func_reconcile_responses g_reconcile_responses;
extern func_msg_rewrite_t g_rewrite_query; /* rewrite query in a msg if necessary */

void set_datastore_ops(void);

Expand Down Expand Up @@ -484,6 +487,7 @@ rstatus_t msg_prepend_format(struct msg *msg, const char *fmt, ...);

uint8_t *msg_get_tagged_key(struct msg *req, uint32_t key_index, uint32_t *keylen);
uint8_t *msg_get_full_key(struct msg *req, uint32_t key_index, uint32_t *keylen);
uint8_t* msg_get_full_key_copy(struct msg* msg, int idx, uint32_t *keylen);

struct msg *req_get(struct conn *conn);
void req_put(struct msg *msg);
Expand Down
9 changes: 9 additions & 0 deletions src/proto/dyn_memcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -1623,3 +1623,12 @@ memcache_reconcile_responses(struct response_mgr *rspmgr)
return rsp;
}
}

/*
* Placeholder function for memcache query rewrites.
* No rewrites implemented toady.
*/
rstatus_t memcache_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite,
struct msg** new_msg_ptr) {
return DN_OK;
}
4 changes: 4 additions & 0 deletions src/proto/dyn_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ rstatus_t memcache_fragment(struct msg *r, struct server_pool *pool, struct rack
struct msg_tqh *frag_msgq);
rstatus_t memcache_verify_request(struct msg *r, struct server_pool *pool,
struct rack *rack);
rstatus_t memcache_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite,
struct msg** new_msg_ptr);

void redis_parse_req(struct msg *r, const struct string *hash_tag);
void redis_parse_rsp(struct msg *r, const struct string *UNUSED);
Expand All @@ -50,5 +52,7 @@ rstatus_t redis_fragment(struct msg *r, struct server_pool *pool, struct rack *r
struct msg_tqh *frag_msgq);
rstatus_t redis_verify_request(struct msg *r, struct server_pool *pool,
struct rack *rack);
rstatus_t redis_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite,
struct msg** new_msg_ptr);

#endif
93 changes: 93 additions & 0 deletions src/proto/dyn_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,99 @@ redis_error(struct msg *r)
return false;
}

/*
* Detects the query and does a rewrite if applicable.
*
* Currently the following queries are rewritten:
* 1) SMEMBERS <set> -> SORT <myset> ALPHA (only when DC_SAFE_QUORUM=true)
* We rewrite this query this way since when DC_SAFE_QUORUM is enabled,
* we run this query on multiple nodes and take checksums and compare that
* they're the same. Since SMEMBERS offers no ordering guarantee, even though
* the elements are the same, the order of elements in the set might be
* different causing the checksums to be different and hence causing the
* query to fail. Rewriting it to a SORT query ensures ordering and thus
* ensures that the checksum comparison succeeds.
*
* * Sets *did_rewrite='true' if a rewrite occured and 'false' if not.
* * Does not modify 'orig_msg' and sets 'new_msg_ptr' to point to the new 'msg' struct with the
* rewritten query if 'did_rewrite' is true.
* * Caller must take ownership of the newly allocated msg '*new_msg_ptr'.
*/
rstatus_t redis_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite,
struct msg** new_msg_ptr) {
const char* SMEMBERS_REWRITE_FMT_STRING = "*3\r\n$4\r\nsort\r\n$%d\r\n%s\r\n$5\r\nalpha\r\n";

ASSERT(orig_msg != NULL);
ASSERT(orig_msg->is_request);
ASSERT(did_rewrite != NULL);

*did_rewrite = false;

struct msg* new_msg = NULL;
uint8_t* key = NULL;
rstatus_t ret_status = DN_OK;
switch (orig_msg->type) {
case MSG_REQ_REDIS_SMEMBERS:

if (orig_msg->owner->read_consistency == DC_SAFE_QUORUM) {
// SMEMBERS should have only one key.
ASSERT(orig_msg->nkeys == 1);

// Get a new 'msg' structure.
new_msg = msg_get(orig_msg->owner, true, __FUNCTION__);
if (new_msg == NULL) {
ret_status = DN_ENOMEM;
goto error;
}

uint32_t keylen;
// Get a copy of the key from 'orig_msg'.
key = msg_get_full_key_copy(orig_msg, 0, &keylen);
if (key == NULL) {
ret_status = DN_ENOMEM;
goto error;
}

// Write the new command into 'new_msg'
rstatus_t prepend_status = msg_prepend_format(new_msg, SMEMBERS_REWRITE_FMT_STRING, keylen, key);
if (prepend_status != DN_OK) {
ret_status = prepend_status;
goto error;
}

{
// Point the 'pos' pointer in 'new_msg' to the mbuf we've added.
struct mbuf* new_mbuf = STAILQ_LAST(&new_msg->mhdr, mbuf, next);
new_msg->pos = new_mbuf->pos;
}
// Parse the message 'new_msg' to populate all of its appropriate fields.
new_msg->parser(new_msg, &ctx->pool.hash_tag);
// Check if 'new_msg' was parsed successfully.
if (new_msg->result != MSG_PARSE_OK) {
ret_status = DN_ERROR;
goto error;
}

*new_msg_ptr = new_msg;
*did_rewrite = true;
goto done;
}
break;
default:
return DN_OK;
}

error:
if (key != NULL) dn_free(key);
// Return the newly allocated message back to the free message queue.
if (new_msg != NULL) msg_put(new_msg);
return ret_status;

done:
if (key != NULL) dn_free(key);
return DN_OK;
}

/*
* Reference: http://redis.io/topics/protocol
*
Expand Down

0 comments on commit bdb4574

Please sign in to comment.