From 68aad159d8042ae2ed4bfcdb395c0e4007064102 Mon Sep 17 00:00:00 2001 From: Sailesh Mukil Date: Tue, 28 Aug 2018 15:19:37 -0700 Subject: [PATCH] Consistency for SMEMBERS when using DC_SAFE_QUORUM Although the data written to the same set in different replics will be the same, the order in which they're returned in each replica might be different. This causes the checksums of each replica set to be different, causing SMEMBERS to fail under DC_SAFE_QUORUM. This patch addresses this by rewriting all SMEMBERS calls to the following (only when DC_SAFE_QUORUM is set): * SMEMBERS -> SORT ALPHA This ensures that the returned set is in the same order across replicas and hence ensures that their checksums match. A mini-framework for rewriting queries is introduced to make future query rewrites easier. Note that this is different from query fragmenting which already exists today for queries like MGET. --- src/dyn_client.c | 65 ++++++++++++++++++++-------- src/dyn_message.c | 26 +++++++++++ src/dyn_message.h | 4 ++ src/proto/dyn_memcache.c | 9 ++++ src/proto/dyn_proto.h | 4 ++ src/proto/dyn_redis.c | 93 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 184 insertions(+), 17 deletions(-) diff --git a/src/dyn_client.c b/src/dyn_client.c index eb6d88486..dda48dc0c 100644 --- a/src/dyn_client.c +++ b/src/dyn_client.c @@ -51,6 +51,9 @@ static rstatus_t msg_quorum_rsp_handler(struct msg *req, struct msg *rsp); static msg_response_handler_t msg_get_rsp_handler(struct msg *req); +static rstatus_t rewrite_query_if_necessary(struct msg** req, struct context* ctx); +static rstatus_t fragment_query_if_necessary(struct msg* req, struct conn* conn, + struct msg_tqh* frag_msgq); static void client_ref(struct conn *conn, void *owner) @@ -928,6 +931,37 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *req) } } +/* + * Rewrites a query if necessary. + * + * If a rewrite occured, it will replace '*req' with the new 'msg' that contains the new query + * and free up the original msg. + * + */ +rstatus_t rewrite_query_if_necessary(struct msg** req, struct context* ctx) { + bool did_rewrite = false; + struct msg* new_req = NULL; + rstatus_t ret_status = g_rewrite_query(*req, ctx, &did_rewrite, &new_req); + THROW_STATUS(ret_status); + + if (did_rewrite) { + // If we successfully did a rewrite, we need to recycle the memory used by the original + // request and point it to the 'new_req'. + msg_put(*req); + *req = new_req; + } + return DN_OK; +} + +/* + * Fragments a query if applicable. + * 'frag_msgq' will be non-empty if the query is fragmented. + */ +rstatus_t fragment_query_if_necessary(struct msg* req, struct conn* conn, struct msg_tqh* frag_msgq) { + struct server_pool *pool = conn->owner; + struct rack *rack = server_get_rack_by_dc_rack(pool, &pool->rack, &pool->dc); + return g_fragment(req, pool, rack, frag_msgq); +} void req_recv_done(struct context *ctx, struct conn *conn, @@ -953,16 +987,11 @@ req_recv_done(struct context *ctx, struct conn *conn, struct msg_tqh frag_msgq; TAILQ_INIT(&frag_msgq); - struct server_pool *pool = conn->owner; - struct rack *rack = server_get_rack_by_dc_rack(pool, &pool->rack, &pool->dc); - rstatus_t status = g_fragment(req, pool, rack, &frag_msgq); - if (status != DN_OK) { - if (req->expect_datastore_reply) { - conn_enqueue_outq(ctx, conn, req); - } - req_forward_error(ctx, conn, req, DN_OK, status); //TODO: CHeck error code - return; - } + rstatus_t status = rewrite_query_if_necessary(&req, ctx); + if (status != DN_OK) goto error; + + status = fragment_query_if_necessary(req, conn, &frag_msgq); + if (status != DN_OK) goto error; /* if no fragment happened */ if (TAILQ_EMPTY(&frag_msgq)) { @@ -971,13 +1000,7 @@ req_recv_done(struct context *ctx, struct conn *conn, } status = req_make_reply(ctx, conn, req); - if (status != DN_OK) { - if (req->expect_datastore_reply) { - conn_enqueue_outq(ctx, conn, req); - } - req_forward_error(ctx, conn, req, DN_OK, status); - return; - } + if (status != DN_OK) goto error; struct msg *sub_msg, *tmsg; for (sub_msg = TAILQ_FIRST(&frag_msgq); sub_msg != NULL; sub_msg = tmsg) { @@ -989,8 +1012,16 @@ req_recv_done(struct context *ctx, struct conn *conn, } ASSERT(TAILQ_EMPTY(&frag_msgq)); return; + +error: + if (req->expect_datastore_reply) { + conn_enqueue_outq(ctx, conn, req); + } + req_forward_error(ctx, conn, req, DN_OK, status); //TODO: CHeck error code + return; } + static msg_response_handler_t msg_get_rsp_handler(struct msg *req) { diff --git a/src/dyn_message.c b/src/dyn_message.c index 8d596c353..1c247f7f4 100644 --- a/src/dyn_message.c +++ b/src/dyn_message.c @@ -156,6 +156,7 @@ func_msg_fragment_t g_fragment; /* message post-coalesce */ func_msg_verify_t g_verify_request; /* message post-coalesce */ func_is_multikey_request g_is_multikey_request; func_reconcile_responses g_reconcile_responses; +func_msg_rewrite_t g_rewrite_query; /* rewrite query in a msg if necessary */ #define DEFINE_ACTION(_name) string(#_name), static struct string msg_type_strings[] = { @@ -197,6 +198,7 @@ set_datastore_ops(void) g_verify_request = redis_verify_request; g_is_multikey_request = redis_is_multikey_request; g_reconcile_responses = redis_reconcile_responses; + g_rewrite_query = redis_rewrite_query; break; case DATA_MEMCACHE: g_pre_coalesce = memcache_pre_coalesce; @@ -205,6 +207,7 @@ set_datastore_ops(void) g_verify_request = memcache_verify_request; g_is_multikey_request = memcache_is_multikey_request; g_reconcile_responses = memcache_reconcile_responses; + g_rewrite_query = memcache_rewrite_query; break; default: return; @@ -753,6 +756,29 @@ msg_get_tagged_key(struct msg *req, uint32_t key_index, uint32_t *keylen) return msg_get_key(req, key_index, keylen, true); } +/* + * Returns the 'idx' key in 'msg'. + * + * Transfers ownership of returned buffer to the caller, so the caller must + * take the responsibility of freeing it. + * + * Returns NULL if key does not exist or if we're unable to allocate memory. + */ +uint8_t* msg_get_full_key_copy(struct msg* msg, int idx, uint32_t *keylen) { + // Get a pointer to the required key in 'msg'. + uint8_t* key_ptr = msg_get_full_key(msg, idx, keylen); + + // Allocate a new buffer for the key. + uint8_t* copied_key = dn_alloc((size_t) (*keylen + 1)); + if (copied_key == NULL) return NULL; + + // Copy contents of the key from 'msg' to our new buffer. + dn_memcpy(copied_key, key_ptr, *keylen); + copied_key[*keylen] = '\0'; + + return copied_key; +} + uint32_t msg_payload_crc32(struct msg *rsp) { diff --git a/src/dyn_message.h b/src/dyn_message.h index f32e53835..a7e75f688 100644 --- a/src/dyn_message.h +++ b/src/dyn_message.h @@ -44,6 +44,8 @@ typedef rstatus_t (*msg_response_handler_t)(struct msg *req, struct msg *rsp); typedef bool (*func_msg_failure_t)(struct msg *r); typedef bool (*func_is_multikey_request)(struct msg *r); typedef struct msg *(*func_reconcile_responses)(struct response_mgr *rspmgr); +typedef rstatus_t (*func_msg_rewrite_t)(struct msg *orig_msg, struct context* ctx, + bool* did_rewrite, struct msg** new_msg_ptr); extern func_msg_coalesce_t g_pre_coalesce; /* message pre-coalesce */ extern func_msg_coalesce_t g_post_coalesce; /* message post-coalesce */ @@ -51,6 +53,7 @@ extern func_msg_fragment_t g_fragment; /* message fragment */ extern func_msg_verify_t g_verify_request; /* message verify */ extern func_is_multikey_request g_is_multikey_request; extern func_reconcile_responses g_reconcile_responses; +extern func_msg_rewrite_t g_rewrite_query; /* rewrite query in a msg if necessary */ void set_datastore_ops(void); @@ -484,6 +487,7 @@ rstatus_t msg_prepend_format(struct msg *msg, const char *fmt, ...); uint8_t *msg_get_tagged_key(struct msg *req, uint32_t key_index, uint32_t *keylen); uint8_t *msg_get_full_key(struct msg *req, uint32_t key_index, uint32_t *keylen); +uint8_t* msg_get_full_key_copy(struct msg* msg, int idx, uint32_t *keylen); struct msg *req_get(struct conn *conn); void req_put(struct msg *msg); diff --git a/src/proto/dyn_memcache.c b/src/proto/dyn_memcache.c index 46516dee7..3518f2d7b 100644 --- a/src/proto/dyn_memcache.c +++ b/src/proto/dyn_memcache.c @@ -1623,3 +1623,12 @@ memcache_reconcile_responses(struct response_mgr *rspmgr) return rsp; } } + +/* + * Placeholder function for memcache query rewrites. + * No rewrites implemented toady. + */ +rstatus_t memcache_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite, + struct msg** new_msg_ptr) { + return DN_OK; +} diff --git a/src/proto/dyn_proto.h b/src/proto/dyn_proto.h index 74fda96db..7e983b662 100644 --- a/src/proto/dyn_proto.h +++ b/src/proto/dyn_proto.h @@ -39,6 +39,8 @@ rstatus_t memcache_fragment(struct msg *r, struct server_pool *pool, struct rack struct msg_tqh *frag_msgq); rstatus_t memcache_verify_request(struct msg *r, struct server_pool *pool, struct rack *rack); +rstatus_t memcache_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite, + struct msg** new_msg_ptr); void redis_parse_req(struct msg *r, const struct string *hash_tag); void redis_parse_rsp(struct msg *r, const struct string *UNUSED); @@ -50,5 +52,7 @@ rstatus_t redis_fragment(struct msg *r, struct server_pool *pool, struct rack *r struct msg_tqh *frag_msgq); rstatus_t redis_verify_request(struct msg *r, struct server_pool *pool, struct rack *rack); +rstatus_t redis_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite, + struct msg** new_msg_ptr); #endif diff --git a/src/proto/dyn_redis.c b/src/proto/dyn_redis.c index 0bed78654..c8d9f3b14 100644 --- a/src/proto/dyn_redis.c +++ b/src/proto/dyn_redis.c @@ -368,6 +368,99 @@ redis_error(struct msg *r) return false; } +/* + * Detects the query and does a rewrite if applicable. + * + * Currently the following queries are rewritten: + * 1) SMEMBERS -> SORT ALPHA (only when DC_SAFE_QUORUM=true) + * We rewrite this query this way since when DC_SAFE_QUORUM is enabled, + * we run this query on multiple nodes and take checksums and compare that + * they're the same. Since SMEMBERS offers no ordering guarantee, even though + * the elements are the same, the order of elements in the set might be + * different causing the checksums to be different and hence causing the + * query to fail. Rewriting it to a SORT query ensures ordering and thus + * ensures that the checksum comparison succeeds. + * + * * Sets *did_rewrite='true' if a rewrite occured and 'false' if not. + * * Does not modify 'orig_msg' and sets 'new_msg_ptr' to point to the new 'msg' struct with the + * rewritten query if 'did_rewrite' is true. + * * Caller must take ownership of the newly allocated msg '*new_msg_ptr'. + */ +rstatus_t redis_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite, + struct msg** new_msg_ptr) { + const char* SMEMBERS_REWRITE_FMT_STRING = "*3\r\n$4\r\nsort\r\n$%d\r\n%s\r\n$5\r\nalpha\r\n"; + + ASSERT(orig_msg != NULL); + ASSERT(orig_msg->is_request); + ASSERT(did_rewrite != NULL); + + *did_rewrite = false; + + struct msg* new_msg = NULL; + uint8_t* key = NULL; + rstatus_t ret_status = DN_OK; + switch (orig_msg->type) { + case MSG_REQ_REDIS_SMEMBERS: + + if (orig_msg->owner->read_consistency == DC_SAFE_QUORUM) { + // SMEMBERS should have only one key. + ASSERT(orig_msg->nkeys == 1); + + // Get a new 'msg' structure. + new_msg = msg_get(orig_msg->owner, true, __FUNCTION__); + if (new_msg == NULL) { + ret_status = DN_ENOMEM; + goto error; + } + + uint32_t keylen; + // Get a copy of the key from 'orig_msg'. + key = msg_get_full_key_copy(orig_msg, 0, &keylen); + if (key == NULL) { + ret_status = DN_ENOMEM; + goto error; + } + + // Write the new command into 'new_msg' + rstatus_t prepend_status = msg_prepend_format(new_msg, SMEMBERS_REWRITE_FMT_STRING, keylen, key); + if (prepend_status != DN_OK) { + ret_status = prepend_status; + goto error; + } + + { + // Point the 'pos' pointer in 'new_msg' to the mbuf we've added. + struct mbuf* new_mbuf = STAILQ_LAST(&new_msg->mhdr, mbuf, next); + new_msg->pos = new_mbuf->pos; + } + // Parse the message 'new_msg' to populate all of its appropriate fields. + new_msg->parser(new_msg, &ctx->pool.hash_tag); + // Check if 'new_msg' was parsed successfully. + if (new_msg->result != MSG_PARSE_OK) { + ret_status = DN_ERROR; + goto error; + } + + *new_msg_ptr = new_msg; + *did_rewrite = true; + goto done; + } + break; + default: + return DN_OK; + } + +error: + if (key != NULL) dn_free(key); + // Return the newly allocated message back to the free message queue. + if (new_msg != NULL) msg_put(new_msg); + return ret_status; + +done: + if (key != NULL) dn_free(key); + return DN_OK; +} + /* * Reference: http://redis.io/topics/protocol *