Skip to content

Commit

Permalink
Unified db rehash method for both standalone and cluster (redis#12848)
Browse files Browse the repository at this point in the history
After redis#11695, we added two functions `rehashingStarted` and
`rehashingCompleted` to the dict structure. We also registered two
handlers for the main database's dict and expire structures. This allows
the main database to record the dict in `rehashing` list when rehashing
starts. Later, in `serverCron`, the `incrementallyRehash` function is
continuously called to perform the rehashing operation. However,
currently, when rehashing is completed, `rehashingCompleted` does not
remove the dict from the `rehashing` list. This results in the
`rehashing` list containing many stale dicts. Although the subsequent cron
job checks for and removes dicts that no longer require rehashing, this is
still inefficient.

This PR implements the functionality to remove the dict from the
`rehashing` list in `rehashingCompleted`. This is achieved by adding
`metadata` to the dict structure, which keeps track of its position in
the `rehashing` list, allowing for quick removal. This approach avoids
storing duplicate dicts in the `rehashing` list.

Additionally, there are other modifications:

1. Whether in standalone or cluster mode, the dicts in the database are
inserted into the rehashing linked list when rehashing starts. This
eliminates the need to distinguish between standalone and cluster mode
in `incrementallyRehash`. The function only needs to focus on the dicts
in the `rehashing` list that require rehashing.
2. The `rehashing` list is moved from the per-database level to the Redis server level.
This decouples `incrementallyRehash` from the database ID, and in
standalone mode, there is no need to iterate over all databases,
avoiding unnecessary access to databases that do not require rehashing.
In the future, even if unsharded-cluster mode supports multiple
databases, there will be no risk involved.
3. The insertion and removal operations of dict structures in the
`rehashing` list are decoupled from `activerehashing` config.
`activerehashing` only controls whether `incrementallyRehash` is
executed in serverCron. There is no need for additional steps when
modifying the `activerehashing` switch, as in redis#12705.
  • Loading branch information
soloestoy authored Dec 15, 2023
1 parent 967fb3c commit d8a21c5
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 104 deletions.
22 changes: 13 additions & 9 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,21 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
if (async) {
emptyDbAsync(&dbarray[j]);
} else {
dbDictMetadata *metadata;
for (int k = 0; k < dbarray[j].dict_count; k++) {
dictEmpty(dbarray[j].dict[k],callback);
metadata = (dbDictMetadata *)dictMetadata(dbarray[j].dict[k]);
if (metadata->rehashing_node) {
listDelNode(server.rehashing, metadata->rehashing_node);
metadata->rehashing_node = NULL;
}

dictEmpty(dbarray[j].expires[k],callback);
metadata = (dbDictMetadata *)dictMetadata(dbarray[j].expires[k]);
if (metadata->rehashing_node) {
listDelNode(server.rehashing, metadata->rehashing_node);
metadata->rehashing_node = NULL;
}
}
}
/* Because all keys of database are removed, reset average ttl. */
Expand All @@ -682,8 +694,6 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
dbarray[j].sub_dict[subdict].key_count = 0;
dbarray[j].sub_dict[subdict].resize_cursor = -1;
if (server.cluster_enabled) {
if (dbarray[j].sub_dict[subdict].rehashing)
listEmpty(dbarray[j].sub_dict[subdict].rehashing);
dbarray[j].sub_dict[subdict].bucket_count = 0;
unsigned long long *slot_size_index = dbarray[j].sub_dict[subdict].slot_size_index;
memset(slot_size_index, 0, sizeof(unsigned long long) * (CLUSTER_SLOTS + 1));
Expand Down Expand Up @@ -757,7 +767,6 @@ redisDb *initTempDb(void) {
tempDb[i].dict = dictCreateMultiple(&dbDictType, tempDb[i].dict_count);
tempDb[i].expires = dictCreateMultiple(&dbExpiresDictType, tempDb[i].dict_count);
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
tempDb[i].sub_dict[subdict].rehashing = listCreate();
tempDb[i].sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL;
}
}
Expand All @@ -779,7 +788,6 @@ void discardTempDb(redisDb *tempDb, void(callback)(dict*)) {
zfree(tempDb[i].dict);
zfree(tempDb[i].expires);
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
listRelease(tempDb[i].sub_dict[subdict].rehashing);
if (server.cluster_enabled) {
zfree(tempDb[i].sub_dict[subdict].slot_size_index);
}
Expand Down Expand Up @@ -1445,7 +1453,7 @@ size_t dbMemUsage(redisDb *db, dbKeyType keyType) {
unsigned long long keys_count = dbSize(db, keyType);
mem += keys_count * dictEntryMemUsage() +
dbBuckets(db, keyType) * sizeof(dictEntry*) +
db->dict_count * sizeof(dict);
db->dict_count * (sizeof(dict) + dictMetadataSize(db->dict[0]));
if (keyType == DB_MAIN) {
mem+=keys_count * sizeof(robj);
}
Expand Down Expand Up @@ -1890,7 +1898,6 @@ int dbSwapDatabases(int id1, int id2) {
db1->expires_cursor = db2->expires_cursor;
db1->dict_count = db2->dict_count;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
db1->sub_dict[subdict].rehashing = db2->sub_dict[subdict].rehashing;
db1->sub_dict[subdict].key_count = db2->sub_dict[subdict].key_count;
db1->sub_dict[subdict].bucket_count = db2->sub_dict[subdict].bucket_count;
db1->sub_dict[subdict].non_empty_slots = db2->sub_dict[subdict].non_empty_slots;
Expand All @@ -1904,7 +1911,6 @@ int dbSwapDatabases(int id1, int id2) {
db2->expires_cursor = aux.expires_cursor;
db2->dict_count = aux.dict_count;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
db2->sub_dict[subdict].rehashing = aux.sub_dict[subdict].rehashing;
db2->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count;
db2->sub_dict[subdict].bucket_count = aux.sub_dict[subdict].bucket_count;
db2->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots;
Expand Down Expand Up @@ -1950,7 +1956,6 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
activedb->expires_cursor = newdb->expires_cursor;
activedb->dict_count = newdb->dict_count;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
activedb->sub_dict[subdict].rehashing = newdb->sub_dict[subdict].rehashing;
activedb->sub_dict[subdict].key_count = newdb->sub_dict[subdict].key_count;
activedb->sub_dict[subdict].bucket_count = newdb->sub_dict[subdict].bucket_count;
activedb->sub_dict[subdict].non_empty_slots = newdb->sub_dict[subdict].non_empty_slots;
Expand All @@ -1964,7 +1969,6 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
newdb->expires_cursor = aux.expires_cursor;
newdb->dict_count = aux.dict_count;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
newdb->sub_dict[subdict].rehashing = aux.sub_dict[subdict].rehashing;
newdb->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count;
newdb->sub_dict[subdict].bucket_count = aux.sub_dict[subdict].bucket_count;
newdb->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots;
Expand Down
16 changes: 10 additions & 6 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,11 @@ static void _dictReset(dict *d, int htidx)
/* Create a new hash table.
 *
 * If the dictType declares a dictMetadataBytes callback, the extra
 * caller-defined metadata area is allocated inline, immediately after the
 * dict struct (flexible-array style), and zero-initialized; otherwise only
 * the dict itself is allocated.
 *
 * NOTE(review): the rendered diff kept both the old allocation line
 * `dict *d = zmalloc(sizeof(*d));` and its replacement, which would be a
 * duplicate definition of `d`; only the post-change allocation belongs here. */
dict *dictCreate(dictType *type)
{
    size_t metasize = type->dictMetadataBytes ? type->dictMetadataBytes(NULL) : 0;
    dict *d = zmalloc(sizeof(*d)+metasize);
    if (metasize > 0) {
        memset(dictMetadata(d), 0, metasize);
    }
    _dictInit(d,type);
    return d;
}
Expand Down Expand Up @@ -399,10 +403,10 @@ long long timeInMilliseconds(void) {
return (((long long)tv.tv_sec)*1000)+(tv.tv_usec/1000);
}

/* Rehash in ms+"delta" milliseconds. The value of "delta" is larger
* than 0, and is smaller than 1 in most cases. The exact upper bound
/* Rehash in us+"delta" microseconds. The value of "delta" is larger
* than 0, and is smaller than 1000 in most cases. The exact upper bound
* depends on the running time of dictRehash(d,100).*/
int dictRehashMilliseconds(dict *d, unsigned int ms) {
int dictRehashMicroseconds(dict *d, uint64_t us) {
if (d->pauserehash > 0) return 0;

monotime timer;
Expand All @@ -411,7 +415,7 @@ int dictRehashMilliseconds(dict *d, unsigned int ms) {

while(dictRehash(d,100)) {
rehashes += 100;
if (elapsedMs(timer) >= ms) break;
if (elapsedUs(timer) >= us) break;
}
return rehashes;
}
Expand Down Expand Up @@ -1714,7 +1718,7 @@ int dictTest(int argc, char **argv, int flags) {

/* Wait for rehashing. */
while (dictIsRehashing(dict)) {
dictRehashMilliseconds(dict,100);
dictRehashMicroseconds(dict,100*1000);
}

start_benchmark();
Expand Down
11 changes: 9 additions & 2 deletions src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ typedef struct dictType {
/* Invoked at the end of dict initialization/rehashing of all the entries from old to new ht. Both ht still exists
* and are cleaned up after this callback. */
void (*rehashingCompleted)(dict *d);
/* Allow a dict to carry extra caller-defined metadata. The
* extra memory is initialized to 0 when a dict is allocated. */
size_t (*dictMetadataBytes)(dict *d);
/* Flags */
/* The 'no_value' flag, if set, indicates that values are not used, i.e. the
* dict is a set. When this flag is set, it's not possible to access the
Expand Down Expand Up @@ -88,6 +91,7 @@ struct dict {
/* Keep small vars at end for optimal (minimal) struct padding */
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
void *metadata[];
};

/* If safe is set to 1 this is a safe iterator, that means, you can call
Expand Down Expand Up @@ -140,6 +144,10 @@ typedef struct {
(d)->type->keyCompare((d), key1, key2) : \
(key1) == (key2))

/* Caller-defined metadata carried inline by the dict: dictMetadata() yields a
 * pointer to the trailing metadata area; dictMetadataSize() reports its size
 * via the type's dictMetadataBytes callback, or 0 when no callback is set. */
#define dictMetadata(d) (&(d)->metadata)
#define dictMetadataSize(d) ((d)->type->dictMetadataBytes \
? (d)->type->dictMetadataBytes(d) : 0)

#define dictHashKey(d, key) ((d)->type->hashFunction(key))
#define dictBuckets(d) (DICTHT_SIZE((d)->ht_size_exp[0])+DICTHT_SIZE((d)->ht_size_exp[1]))
#define dictSize(d) ((d)->ht_used[0]+(d)->ht_used[1])
Expand All @@ -166,7 +174,6 @@ dict *dictCreate(dictType *type);
dict **dictCreateMultiple(dictType *type, int count);
int dictExpand(dict *d, unsigned long size);
int dictTryExpand(dict *d, unsigned long size);
void *dictMetadata(dict *d);
int dictAdd(dict *d, void *key, void *val);
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing);
void *dictFindPositionForInsert(dict *d, const void *key, dictEntry **existing);
Expand Down Expand Up @@ -215,7 +222,7 @@ uint64_t dictGenCaseHashFunction(const unsigned char *buf, size_t len);
void dictEmpty(dict *d, void(callback)(dict*));
void dictSetResizeEnabled(dictResizeEnable enable);
int dictRehash(dict *d, int n);
int dictRehashMilliseconds(dict *d, unsigned int ms);
int dictRehashMicroseconds(dict *d, uint64_t us);
void dictSetHashFunctionSeed(uint8_t *seed);
uint8_t *dictGetHashFunctionSeed(void);
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
Expand Down
14 changes: 14 additions & 0 deletions src/lazyfree.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,20 @@ void freeObjAsync(robj *key, robj *obj, int dbid) {
* create a new empty set of hash tables and scheduling the old ones for
* lazy freeing. */
void emptyDbAsync(redisDb *db) {
dbDictMetadata *metadata;
for (int i = 0; i < db->dict_count; i++) {
metadata = (dbDictMetadata *)dictMetadata(db->dict[i]);
if (metadata->rehashing_node) {
listDelNode(server.rehashing, metadata->rehashing_node);
metadata->rehashing_node = NULL;
}

metadata = (dbDictMetadata *)dictMetadata(db->expires[i]);
if (metadata->rehashing_node) {
listDelNode(server.rehashing, metadata->rehashing_node);
metadata->rehashing_node = NULL;
}
}
dict **oldDict = db->dict;
dict **oldExpires = db->expires;
atomicIncr(lazyfree_objects,dbSize(db, DB_MAIN));
Expand Down
Loading

0 comments on commit d8a21c5

Please sign in to comment.