Skip to content

Commit

Permalink
Consistency for SMEMBERS when using DC_SAFE_QUORUM
Browse files Browse the repository at this point in the history
Although the data written to the same set on different replicas will be
the same, the order in which the members are returned by each replica might
be different. This causes the checksums of the replicas' responses to be
different, causing SMEMBERS to fail under DC_SAFE_QUORUM.

This patch addresses this by rewriting all SMEMBERS calls to the
following (only when DC_SAFE_QUORUM is set):
 * SMEMBERS <set> -> SORT <set> ALPHA

This ensures that the returned set is in the same order across replicas
and hence ensures that their checksums match.

A mini-framework for rewriting queries is introduced to make future
query rewrites easier. Note that this is different from query
fragmenting which already exists today for queries like MGET.
  • Loading branch information
smukil committed Aug 28, 2018
1 parent 4b9e839 commit 68aad15
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 17 deletions.
65 changes: 48 additions & 17 deletions src/dyn_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
static rstatus_t msg_quorum_rsp_handler(struct msg *req, struct msg *rsp);
static msg_response_handler_t msg_get_rsp_handler(struct msg *req);

static rstatus_t rewrite_query_if_necessary(struct msg** req, struct context* ctx);
static rstatus_t fragment_query_if_necessary(struct msg* req, struct conn* conn,
struct msg_tqh* frag_msgq);

static void
client_ref(struct conn *conn, void *owner)
Expand Down Expand Up @@ -928,6 +931,37 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *req)
}
}

/*
 * Gives the datastore-specific rewrite hook (g_rewrite_query) a chance to
 * replace the query carried by '*req'.
 *
 * If the hook reports that a rewrite occurred, the original msg is handed back
 * via msg_put() and '*req' is updated to point at the msg holding the new
 * query. If no rewrite occurred, '*req' is left untouched.
 *
 * The hook's status is passed through THROW_STATUS before the rewrite flag is
 * inspected; DN_OK is returned otherwise.
 */
rstatus_t rewrite_query_if_necessary(struct msg** req, struct context* ctx) {
    struct msg* rewritten_req = NULL;
    bool rewrite_happened = false;

    rstatus_t ret_status = g_rewrite_query(*req, ctx, &rewrite_happened, &rewritten_req);
    THROW_STATUS(ret_status);

    if (!rewrite_happened) {
        // Nothing to swap; the caller keeps using the original request.
        return DN_OK;
    }

    // Recycle the original request and hand the rewritten one to the caller.
    msg_put(*req);
    *req = rewritten_req;
    return DN_OK;
}

/*
 * Runs the datastore-specific fragmenting hook (g_fragment) on 'req'.
 *
 * The server pool is taken from 'conn->owner', and the rack passed to the hook
 * is looked up from the pool's own 'rack' and 'dc' fields.
 *
 * 'frag_msgq' will be non-empty if the query is fragmented.
 * Returns whatever status g_fragment returns.
 */
rstatus_t fragment_query_if_necessary(struct msg* req, struct conn* conn, struct msg_tqh* frag_msgq) {
    struct server_pool *owner_pool = conn->owner;
    struct rack *pool_rack =
        server_get_rack_by_dc_rack(owner_pool, &owner_pool->rack, &owner_pool->dc);

    rstatus_t frag_status = g_fragment(req, owner_pool, pool_rack, frag_msgq);
    return frag_status;
}

void
req_recv_done(struct context *ctx, struct conn *conn,
Expand All @@ -953,16 +987,11 @@ req_recv_done(struct context *ctx, struct conn *conn,
struct msg_tqh frag_msgq;
TAILQ_INIT(&frag_msgq);

struct server_pool *pool = conn->owner;
struct rack *rack = server_get_rack_by_dc_rack(pool, &pool->rack, &pool->dc);
rstatus_t status = g_fragment(req, pool, rack, &frag_msgq);
if (status != DN_OK) {
if (req->expect_datastore_reply) {
conn_enqueue_outq(ctx, conn, req);
}
req_forward_error(ctx, conn, req, DN_OK, status); //TODO: CHeck error code
return;
}
rstatus_t status = rewrite_query_if_necessary(&req, ctx);
if (status != DN_OK) goto error;

status = fragment_query_if_necessary(req, conn, &frag_msgq);
if (status != DN_OK) goto error;

/* if no fragment happened */
if (TAILQ_EMPTY(&frag_msgq)) {
Expand All @@ -971,13 +1000,7 @@ req_recv_done(struct context *ctx, struct conn *conn,
}

status = req_make_reply(ctx, conn, req);
if (status != DN_OK) {
if (req->expect_datastore_reply) {
conn_enqueue_outq(ctx, conn, req);
}
req_forward_error(ctx, conn, req, DN_OK, status);
return;
}
if (status != DN_OK) goto error;

struct msg *sub_msg, *tmsg;
for (sub_msg = TAILQ_FIRST(&frag_msgq); sub_msg != NULL; sub_msg = tmsg) {
Expand All @@ -989,8 +1012,16 @@ req_recv_done(struct context *ctx, struct conn *conn,
}
ASSERT(TAILQ_EMPTY(&frag_msgq));
return;

error:
if (req->expect_datastore_reply) {
conn_enqueue_outq(ctx, conn, req);
}
req_forward_error(ctx, conn, req, DN_OK, status); //TODO: CHeck error code
return;
}


static msg_response_handler_t
msg_get_rsp_handler(struct msg *req)
{
Expand Down
26 changes: 26 additions & 0 deletions src/dyn_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func_msg_fragment_t g_fragment; /* message post-coalesce */
func_msg_verify_t g_verify_request; /* message post-coalesce */
func_is_multikey_request g_is_multikey_request;
func_reconcile_responses g_reconcile_responses;
func_msg_rewrite_t g_rewrite_query; /* rewrite query in a msg if necessary */

#define DEFINE_ACTION(_name) string(#_name),
static struct string msg_type_strings[] = {
Expand Down Expand Up @@ -197,6 +198,7 @@ set_datastore_ops(void)
g_verify_request = redis_verify_request;
g_is_multikey_request = redis_is_multikey_request;
g_reconcile_responses = redis_reconcile_responses;
g_rewrite_query = redis_rewrite_query;
break;
case DATA_MEMCACHE:
g_pre_coalesce = memcache_pre_coalesce;
Expand All @@ -205,6 +207,7 @@ set_datastore_ops(void)
g_verify_request = memcache_verify_request;
g_is_multikey_request = memcache_is_multikey_request;
g_reconcile_responses = memcache_reconcile_responses;
g_rewrite_query = memcache_rewrite_query;
break;
default:
return;
Expand Down Expand Up @@ -753,6 +756,29 @@ msg_get_tagged_key(struct msg *req, uint32_t key_index, uint32_t *keylen)
return msg_get_key(req, key_index, keylen, true);
}

/*
 * Returns a NUL-terminated heap copy of the 'idx' key in 'msg'.
 *
 * On success '*keylen' holds the key length (excluding the added terminator).
 *
 * Transfers ownership of the returned buffer to the caller, so the caller must
 * take the responsibility of freeing it.
 *
 * Returns NULL if the key does not exist (msg_get_full_key() returned NULL) or
 * if we're unable to allocate memory.
 */
uint8_t* msg_get_full_key_copy(struct msg* msg, int idx, uint32_t *keylen) {
    // Get a pointer to the required key in 'msg'.
    uint8_t* key_ptr = msg_get_full_key(msg, idx, keylen);
    // Bail out before touching the pointer: copying from NULL is undefined
    // behavior, and the contract above promises NULL for a missing key.
    if (key_ptr == NULL) return NULL;

    // Allocate a new buffer for the key plus a terminating NUL. The addition is
    // done in size_t so that it cannot wrap around in 32 bits.
    uint8_t* copied_key = dn_alloc((size_t) *keylen + 1);
    if (copied_key == NULL) return NULL;

    // Copy contents of the key from 'msg' to our new buffer.
    dn_memcpy(copied_key, key_ptr, *keylen);
    copied_key[*keylen] = '\0';

    return copied_key;
}

uint32_t
msg_payload_crc32(struct msg *rsp)
{
Expand Down
4 changes: 4 additions & 0 deletions src/dyn_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ typedef rstatus_t (*msg_response_handler_t)(struct msg *req, struct msg *rsp);
typedef bool (*func_msg_failure_t)(struct msg *r);
typedef bool (*func_is_multikey_request)(struct msg *r);
typedef struct msg *(*func_reconcile_responses)(struct response_mgr *rspmgr);
typedef rstatus_t (*func_msg_rewrite_t)(struct msg *orig_msg, struct context* ctx,
bool* did_rewrite, struct msg** new_msg_ptr);

extern func_msg_coalesce_t g_pre_coalesce; /* message pre-coalesce */
extern func_msg_coalesce_t g_post_coalesce; /* message post-coalesce */
extern func_msg_fragment_t g_fragment; /* message fragment */
extern func_msg_verify_t g_verify_request; /* message verify */
extern func_is_multikey_request g_is_multikey_request;
extern func_reconcile_responses g_reconcile_responses;
extern func_msg_rewrite_t g_rewrite_query; /* rewrite query in a msg if necessary */

void set_datastore_ops(void);

Expand Down Expand Up @@ -484,6 +487,7 @@ rstatus_t msg_prepend_format(struct msg *msg, const char *fmt, ...);

uint8_t *msg_get_tagged_key(struct msg *req, uint32_t key_index, uint32_t *keylen);
uint8_t *msg_get_full_key(struct msg *req, uint32_t key_index, uint32_t *keylen);
uint8_t* msg_get_full_key_copy(struct msg* msg, int idx, uint32_t *keylen);

struct msg *req_get(struct conn *conn);
void req_put(struct msg *msg);
Expand Down
9 changes: 9 additions & 0 deletions src/proto/dyn_memcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -1623,3 +1623,12 @@ memcache_reconcile_responses(struct response_mgr *rspmgr)
return rsp;
}
}

/*
 * Placeholder function for memcache query rewrites.
 * No rewrites are implemented today.
 *
 * 'orig_msg', 'ctx' and 'new_msg_ptr' are not used. '*did_rewrite' is set to
 * 'false' (when the pointer is non-NULL) so that callers get a defined value
 * even if they did not pre-initialize the flag, matching the behavior of the
 * redis implementation of this hook.
 *
 * Always returns DN_OK.
 */
rstatus_t memcache_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite,
                                 struct msg** new_msg_ptr) {
    if (did_rewrite != NULL) {
        *did_rewrite = false;
    }
    return DN_OK;
}
4 changes: 4 additions & 0 deletions src/proto/dyn_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ rstatus_t memcache_fragment(struct msg *r, struct server_pool *pool, struct rack
struct msg_tqh *frag_msgq);
rstatus_t memcache_verify_request(struct msg *r, struct server_pool *pool,
struct rack *rack);
rstatus_t memcache_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite,
struct msg** new_msg_ptr);

void redis_parse_req(struct msg *r, const struct string *hash_tag);
void redis_parse_rsp(struct msg *r, const struct string *UNUSED);
Expand All @@ -50,5 +52,7 @@ rstatus_t redis_fragment(struct msg *r, struct server_pool *pool, struct rack *r
struct msg_tqh *frag_msgq);
rstatus_t redis_verify_request(struct msg *r, struct server_pool *pool,
struct rack *rack);
rstatus_t redis_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite,
struct msg** new_msg_ptr);

#endif
93 changes: 93 additions & 0 deletions src/proto/dyn_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,99 @@ redis_error(struct msg *r)
return false;
}

/*
 * Detects the query and does a rewrite if applicable.
 *
 * Currently the following queries are rewritten:
 * 1) SMEMBERS <set> -> SORT <set> ALPHA (only when the owning connection's
 *    read consistency is DC_SAFE_QUORUM)
 *    We rewrite this query this way since when DC_SAFE_QUORUM is enabled,
 *    we run this query on multiple nodes and take checksums and compare that
 *    they're the same. Since SMEMBERS offers no ordering guarantee, even though
 *    the elements are the same, the order of elements in the set might be
 *    different causing the checksums to be different and hence causing the
 *    query to fail. Rewriting it to a SORT query ensures ordering and thus
 *    ensures that the checksum comparison succeeds.
 *
 * * Sets *did_rewrite='true' if a rewrite occurred and 'false' if not.
 * * Does not modify 'orig_msg' and sets '*new_msg_ptr' to point to the new 'msg'
 *   struct with the rewritten query if '*did_rewrite' is true. '*new_msg_ptr'
 *   is not written otherwise.
 * * Caller must take ownership of the newly allocated msg '*new_msg_ptr'.
 *
 * Returns DN_OK when no rewrite was needed or the rewrite succeeded,
 * DN_ENOMEM if the new msg or the key copy could not be obtained, the status
 * of msg_prepend_format() if writing the command failed, and DN_ERROR if the
 * rewritten command did not parse. On any failure the partially built msg is
 * returned to the free queue with msg_put().
 */
rstatus_t redis_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite,
                              struct msg** new_msg_ptr) {
    // The key length is a uint32_t, so it is formatted with %u.
    // NOTE(review): the key is emitted with %s, so a key containing an embedded
    // NUL byte would presumably be cut short relative to the declared length -
    // confirm whether such keys can reach this path.
    const char* SMEMBERS_REWRITE_FMT_STRING = "*3\r\n$4\r\nsort\r\n$%u\r\n%s\r\n$5\r\nalpha\r\n";

    ASSERT(orig_msg != NULL);
    ASSERT(orig_msg->is_request);
    ASSERT(did_rewrite != NULL);

    *did_rewrite = false;

    // Only SMEMBERS under DC_SAFE_QUORUM is rewritten; everything else is
    // passed through untouched.
    if (orig_msg->type != MSG_REQ_REDIS_SMEMBERS ||
        orig_msg->owner->read_consistency != DC_SAFE_QUORUM) {
        return DN_OK;
    }

    ASSERT(new_msg_ptr != NULL);
    // SMEMBERS should have only one key.
    ASSERT(orig_msg->nkeys == 1);

    rstatus_t ret_status = DN_OK;
    uint8_t* key = NULL;
    uint32_t keylen = 0;

    // Get a new 'msg' structure.
    struct msg* new_msg = msg_get(orig_msg->owner, true, __FUNCTION__);
    if (new_msg == NULL) {
        return DN_ENOMEM;
    }

    // Get a copy of the key from 'orig_msg'.
    key = msg_get_full_key_copy(orig_msg, 0, &keylen);
    if (key == NULL) {
        ret_status = DN_ENOMEM;
        goto cleanup;
    }

    // Write the new command into 'new_msg'.
    ret_status = msg_prepend_format(new_msg, SMEMBERS_REWRITE_FMT_STRING, keylen, key);
    if (ret_status != DN_OK) {
        goto cleanup;
    }

    {
        // Point the 'pos' pointer in 'new_msg' to the mbuf we've added.
        struct mbuf* new_mbuf = STAILQ_LAST(&new_msg->mhdr, mbuf, next);
        new_msg->pos = new_mbuf->pos;
    }

    // Parse the message 'new_msg' to populate all of its appropriate fields.
    new_msg->parser(new_msg, &ctx->pool.hash_tag);
    // Check if 'new_msg' was parsed successfully.
    if (new_msg->result != MSG_PARSE_OK) {
        ret_status = DN_ERROR;
        goto cleanup;
    }

    // Success: hand the new msg to the caller and make sure the cleanup below
    // does not release it.
    *new_msg_ptr = new_msg;
    *did_rewrite = true;
    new_msg = NULL;

cleanup:
    // The key copy is only needed while formatting the command.
    if (key != NULL) dn_free(key);
    // On failure, return the newly allocated message back to the free message queue.
    if (new_msg != NULL) msg_put(new_msg);
    return ret_status;
}

/*
* Reference: http://redis.io/topics/protocol
*
Expand Down

0 comments on commit 68aad15

Please sign in to comment.