diff --git a/src/dyn_client.c b/src/dyn_client.c index eb6d88486..dda48dc0c 100644 --- a/src/dyn_client.c +++ b/src/dyn_client.c @@ -51,6 +51,9 @@ static rstatus_t msg_quorum_rsp_handler(struct msg *req, struct msg *rsp); static msg_response_handler_t msg_get_rsp_handler(struct msg *req); +static rstatus_t rewrite_query_if_necessary(struct msg** req, struct context* ctx); +static rstatus_t fragment_query_if_necessary(struct msg* req, struct conn* conn, + struct msg_tqh* frag_msgq); static void client_ref(struct conn *conn, void *owner) @@ -928,6 +931,37 @@ req_forward(struct context *ctx, struct conn *c_conn, struct msg *req) } } +/* + * Rewrites a query if necessary. + * + * If a rewrite occured, it will replace '*req' with the new 'msg' that contains the new query + * and free up the original msg. + * + */ +rstatus_t rewrite_query_if_necessary(struct msg** req, struct context* ctx) { + bool did_rewrite = false; + struct msg* new_req = NULL; + rstatus_t ret_status = g_rewrite_query(*req, ctx, &did_rewrite, &new_req); + THROW_STATUS(ret_status); + + if (did_rewrite) { + // If we successfully did a rewrite, we need to recycle the memory used by the original + // request and point it to the 'new_req'. + msg_put(*req); + *req = new_req; + } + return DN_OK; +} + +/* + * Fragments a query if applicable. + * 'frag_msgq' will be non-empty if the query is fragmented. + */ +rstatus_t fragment_query_if_necessary(struct msg* req, struct conn* conn, struct msg_tqh* frag_msgq) { + struct server_pool *pool = conn->owner; + struct rack *rack = server_get_rack_by_dc_rack(pool, &pool->rack, &pool->dc); + return g_fragment(req, pool, rack, frag_msgq); +} void req_recv_done(struct context *ctx, struct conn *conn, @@ -953,16 +987,11 @@ req_recv_done(struct context *ctx, struct conn *conn, struct msg_tqh frag_msgq; TAILQ_INIT(&frag_msgq); - struct server_pool *pool = conn->owner; - struct rack *rack = server_get_rack_by_dc_rack(pool, &pool->rack, &pool->dc); - rstatus_t status = g_fragment(req, pool, rack, &frag_msgq); - if (status != DN_OK) { - if (req->expect_datastore_reply) { - conn_enqueue_outq(ctx, conn, req); - } - req_forward_error(ctx, conn, req, DN_OK, status); //TODO: CHeck error code - return; - } + rstatus_t status = rewrite_query_if_necessary(&req, ctx); + if (status != DN_OK) goto error; + + status = fragment_query_if_necessary(req, conn, &frag_msgq); + if (status != DN_OK) goto error; /* if no fragment happened */ if (TAILQ_EMPTY(&frag_msgq)) { @@ -971,13 +1000,7 @@ req_recv_done(struct context *ctx, struct conn *conn, } status = req_make_reply(ctx, conn, req); - if (status != DN_OK) { - if (req->expect_datastore_reply) { - conn_enqueue_outq(ctx, conn, req); - } - req_forward_error(ctx, conn, req, DN_OK, status); - return; - } + if (status != DN_OK) goto error; struct msg *sub_msg, *tmsg; for (sub_msg = TAILQ_FIRST(&frag_msgq); sub_msg != NULL; sub_msg = tmsg) { @@ -989,8 +1012,16 @@ req_recv_done(struct context *ctx, struct conn *conn, } ASSERT(TAILQ_EMPTY(&frag_msgq)); return; + +error: + if (req->expect_datastore_reply) { + conn_enqueue_outq(ctx, conn, req); + } + req_forward_error(ctx, conn, req, DN_OK, status); //TODO: CHeck error code + return; } + static msg_response_handler_t msg_get_rsp_handler(struct msg *req) { diff --git a/src/dyn_message.c b/src/dyn_message.c index 8d596c353..1c247f7f4 100644 --- a/src/dyn_message.c +++ b/src/dyn_message.c @@ -156,6 +156,7 @@ func_msg_fragment_t g_fragment; /* message post-coalesce */ func_msg_verify_t g_verify_request; /* message post-coalesce */ func_is_multikey_request g_is_multikey_request; func_reconcile_responses g_reconcile_responses; +func_msg_rewrite_t g_rewrite_query; /* rewrite query in a msg if necessary */ #define DEFINE_ACTION(_name) string(#_name), static struct string msg_type_strings[] = { @@ -197,6 +198,7 @@ set_datastore_ops(void) g_verify_request = redis_verify_request; g_is_multikey_request = redis_is_multikey_request; g_reconcile_responses = redis_reconcile_responses; + g_rewrite_query = redis_rewrite_query; break; case DATA_MEMCACHE: g_pre_coalesce = memcache_pre_coalesce; @@ -205,6 +207,7 @@ set_datastore_ops(void) g_verify_request = memcache_verify_request; g_is_multikey_request = memcache_is_multikey_request; g_reconcile_responses = memcache_reconcile_responses; + g_rewrite_query = memcache_rewrite_query; break; default: return; @@ -753,6 +756,29 @@ msg_get_tagged_key(struct msg *req, uint32_t key_index, uint32_t *keylen) return msg_get_key(req, key_index, keylen, true); } +/* + * Returns the 'idx' key in 'msg'. + * + * Transfers ownership of returned buffer to the caller, so the caller must + * take the responsibility of freeing it. + * + * Returns NULL if key does not exist or if we're unable to allocate memory. + */ +uint8_t* msg_get_full_key_copy(struct msg* msg, int idx, uint32_t *keylen) { + // Get a pointer to the required key in 'msg'. + uint8_t* key_ptr = msg_get_full_key(msg, idx, keylen); + + // Allocate a new buffer for the key. + uint8_t* copied_key = dn_alloc((size_t) (*keylen + 1)); + if (copied_key == NULL) return NULL; + + // Copy contents of the key from 'msg' to our new buffer. + dn_memcpy(copied_key, key_ptr, *keylen); + copied_key[*keylen] = '\0'; + + return copied_key; +} + uint32_t msg_payload_crc32(struct msg *rsp) { diff --git a/src/dyn_message.h b/src/dyn_message.h index f32e53835..a7e75f688 100644 --- a/src/dyn_message.h +++ b/src/dyn_message.h @@ -44,6 +44,8 @@ typedef rstatus_t (*msg_response_handler_t)(struct msg *req, struct msg *rsp); typedef bool (*func_msg_failure_t)(struct msg *r); typedef bool (*func_is_multikey_request)(struct msg *r); typedef struct msg *(*func_reconcile_responses)(struct response_mgr *rspmgr); +typedef rstatus_t (*func_msg_rewrite_t)(struct msg *orig_msg, struct context* ctx, + bool* did_rewrite, struct msg** new_msg_ptr); extern func_msg_coalesce_t g_pre_coalesce; /* message pre-coalesce */ extern func_msg_coalesce_t g_post_coalesce; /* message post-coalesce */ @@ -51,6 +53,7 @@ extern func_msg_fragment_t g_fragment; /* message fragment */ extern func_msg_verify_t g_verify_request; /* message verify */ extern func_is_multikey_request g_is_multikey_request; extern func_reconcile_responses g_reconcile_responses; +extern func_msg_rewrite_t g_rewrite_query; /* rewrite query in a msg if necessary */ void set_datastore_ops(void); @@ -484,6 +487,7 @@ rstatus_t msg_prepend_format(struct msg *msg, const char *fmt, ...); uint8_t *msg_get_tagged_key(struct msg *req, uint32_t key_index, uint32_t *keylen); uint8_t *msg_get_full_key(struct msg *req, uint32_t key_index, uint32_t *keylen); +uint8_t* msg_get_full_key_copy(struct msg* msg, int idx, uint32_t *keylen); struct msg *req_get(struct conn *conn); void req_put(struct msg *msg); diff --git a/src/proto/dyn_memcache.c b/src/proto/dyn_memcache.c index 46516dee7..3518f2d7b 100644 --- a/src/proto/dyn_memcache.c +++ b/src/proto/dyn_memcache.c @@ -1623,3 +1623,12 @@ memcache_reconcile_responses(struct response_mgr *rspmgr) return rsp; } } + +/* + * Placeholder function for memcache query rewrites. + * No rewrites implemented toady. + */ +rstatus_t memcache_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite, + struct msg** new_msg_ptr) { + return DN_OK; +} diff --git a/src/proto/dyn_proto.h b/src/proto/dyn_proto.h index 74fda96db..7e983b662 100644 --- a/src/proto/dyn_proto.h +++ b/src/proto/dyn_proto.h @@ -39,6 +39,8 @@ rstatus_t memcache_fragment(struct msg *r, struct server_pool *pool, struct rack struct msg_tqh *frag_msgq); rstatus_t memcache_verify_request(struct msg *r, struct server_pool *pool, struct rack *rack); +rstatus_t memcache_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite, + struct msg** new_msg_ptr); void redis_parse_req(struct msg *r, const struct string *hash_tag); void redis_parse_rsp(struct msg *r, const struct string *UNUSED); @@ -50,5 +52,7 @@ rstatus_t redis_fragment(struct msg *r, struct server_pool *pool, struct rack *r struct msg_tqh *frag_msgq); rstatus_t redis_verify_request(struct msg *r, struct server_pool *pool, struct rack *rack); +rstatus_t redis_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite, + struct msg** new_msg_ptr); #endif diff --git a/src/proto/dyn_redis.c b/src/proto/dyn_redis.c index 0bed78654..c8d9f3b14 100644 --- a/src/proto/dyn_redis.c +++ b/src/proto/dyn_redis.c @@ -368,6 +368,99 @@ redis_error(struct msg *r) return false; } +/* + * Detects the query and does a rewrite if applicable. + * + * Currently the following queries are rewritten: + * 1) SMEMBERS -> SORT ALPHA (only when DC_SAFE_QUORUM=true) + * We rewrite this query this way since when DC_SAFE_QUORUM is enabled, + * we run this query on multiple nodes and take checksums and compare that + * they're the same. Since SMEMBERS offers no ordering guarantee, even though + * the elements are the same, the order of elements in the set might be + * different causing the checksums to be different and hence causing the + * query to fail. Rewriting it to a SORT query ensures ordering and thus + * ensures that the checksum comparison succeeds. + * + * * Sets *did_rewrite='true' if a rewrite occured and 'false' if not. + * * Does not modify 'orig_msg' and sets 'new_msg_ptr' to point to the new 'msg' struct with the + * rewritten query if 'did_rewrite' is true. + * * Caller must take ownership of the newly allocated msg '*new_msg_ptr'. + */ +rstatus_t redis_rewrite_query(struct msg* orig_msg, struct context* ctx, bool* did_rewrite, + struct msg** new_msg_ptr) { + const char* SMEMBERS_REWRITE_FMT_STRING = "*3\r\n$4\r\nsort\r\n$%d\r\n%s\r\n$5\r\nalpha\r\n"; + + ASSERT(orig_msg != NULL); + ASSERT(orig_msg->is_request); + ASSERT(did_rewrite != NULL); + + *did_rewrite = false; + + struct msg* new_msg = NULL; + uint8_t* key = NULL; + rstatus_t ret_status = DN_OK; + switch (orig_msg->type) { + case MSG_REQ_REDIS_SMEMBERS: + + if (orig_msg->owner->read_consistency == DC_SAFE_QUORUM) { + // SMEMBERS should have only one key. + ASSERT(orig_msg->nkeys == 1); + + // Get a new 'msg' structure. + new_msg = msg_get(orig_msg->owner, true, __FUNCTION__); + if (new_msg == NULL) { + ret_status = DN_ENOMEM; + goto error; + } + + uint32_t keylen; + // Get a copy of the key from 'orig_msg'. + key = msg_get_full_key_copy(orig_msg, 0, &keylen); + if (key == NULL) { + ret_status = DN_ENOMEM; + goto error; + } + + // Write the new command into 'new_msg' + rstatus_t prepend_status = msg_prepend_format(new_msg, SMEMBERS_REWRITE_FMT_STRING, keylen, key); + if (prepend_status != DN_OK) { + ret_status = prepend_status; + goto error; + } + + { + // Point the 'pos' pointer in 'new_msg' to the mbuf we've added. + struct mbuf* new_mbuf = STAILQ_LAST(&new_msg->mhdr, mbuf, next); + new_msg->pos = new_mbuf->pos; + } + // Parse the message 'new_msg' to populate all of its appropriate fields. + new_msg->parser(new_msg, &ctx->pool.hash_tag); + // Check if 'new_msg' was parsed successfully. + if (new_msg->result != MSG_PARSE_OK) { + ret_status = DN_ERROR; + goto error; + } + + *new_msg_ptr = new_msg; + *did_rewrite = true; + goto done; + } + break; + default: + return DN_OK; + } + +error: + if (key != NULL) dn_free(key); + // Return the newly allocated message back to the free message queue. + if (new_msg != NULL) msg_put(new_msg); + return ret_status; + +done: + if (key != NULL) dn_free(key); + return DN_OK; +} + /* * Reference: http://redis.io/topics/protocol *