-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support for DEBUG DIGEST module data type callback #21
Changes from 5 commits
4edc19c
743516d
988f056
cba8f17
0232f2d
dc303c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
use std::os::raw::c_char; | ||
use valkey_module::raw; | ||
use valkey_module::ValkeyString; | ||
|
||
/// `Digest` is a high-level rust interface to the Valkey module C API | ||
/// abstracting away the raw C ffi calls. | ||
pub struct Digest { | ||
pub dig: *mut raw::RedisModuleDigest, | ||
} | ||
|
||
impl Digest { | ||
pub const fn new(dig: *mut raw::RedisModuleDigest) -> Self { | ||
Self { dig } | ||
} | ||
|
||
/// Returns the key name of this [`Digest`]. | ||
/// | ||
/// # Panics | ||
/// | ||
/// Will panic if `RedisModule_GetKeyNameFromDigest` is missing in redismodule.h | ||
pub fn get_key_name(&self) -> ValkeyString { | ||
ValkeyString::from_redis_module_string(std::ptr::null_mut(), unsafe { | ||
raw::RedisModule_GetKeyNameFromDigest | ||
.expect("RedisModule_GetKeyNameFromDigest is not available.")(self.dig) | ||
.cast_mut() | ||
}) | ||
} | ||
|
||
/// Returns the database ID of this [`Digest`]. | ||
/// | ||
/// # Panics | ||
/// | ||
/// Will panic if `RedisModule_GetDbIdFromDigest` is missing in redismodule.h | ||
pub fn get_db_id(&self) -> i32 { | ||
unsafe { | ||
raw::RedisModule_GetDbIdFromDigest | ||
.expect("RedisModule_GetDbIdFromDigest is not available.")(self.dig) | ||
} | ||
} | ||
|
||
/// Adds a new element to this [`Digest`]. | ||
/// | ||
/// # Panics | ||
/// | ||
/// Will panic if `RedisModule_DigestAddStringBuffer` is missing in redismodule.h | ||
pub fn add_string_buffer(&mut self, ele: &[u8]) { | ||
unsafe { | ||
raw::RedisModule_DigestAddStringBuffer | ||
.expect("RedisModule_DigestAddStringBuffer is not available.")( | ||
self.dig, | ||
ele.as_ptr().cast::<c_char>(), | ||
ele.len(), | ||
) | ||
} | ||
} | ||
|
||
/// Similar to [`Digest::add_string_buffer`], but takes [`i64`]. | ||
/// | ||
/// # Panics | ||
/// | ||
/// Will panic if `RedisModule_DigestAddLongLong` is missing in redismodule.h | ||
pub fn add_long_long(&mut self, ll: i64) { | ||
unsafe { | ||
raw::RedisModule_DigestAddLongLong | ||
.expect("RedisModule_DigestAddLongLong is not available.")(self.dig, ll) | ||
} | ||
} | ||
|
||
/// Ends the current sequence in this [`Digest`]. | ||
/// | ||
/// # Panics | ||
/// | ||
/// Will panic if `RedisModule_DigestEndSequence` is missing in redismodule.h | ||
pub fn end_sequence(&mut self) { | ||
unsafe { | ||
raw::RedisModule_DigestEndSequence | ||
.expect("RedisModule_DigestEndSequence is not available.")(self.dig) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,2 @@ | ||
pub mod bloom_callback; | ||
pub mod digest; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,14 +20,23 @@ def test_basic_aofrewrite_and_restore(self): | |
bf_info_result_1 = client.execute_command('BF.INFO testSave') | ||
assert(len(bf_info_result_1)) != 0 | ||
curr_item_count_1 = client.info_obj().num_keys() | ||
|
||
# cmd debug digest | ||
server_digest = client.debug_digest() | ||
assert server_digest != None or 0000000000000000000000000000000000000000 | ||
object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave') | ||
|
||
# save aof, restart sever | ||
client.bgrewriteaof() | ||
self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE) | ||
# Keep the server running for 1 second more to have a larger uptime. | ||
time.sleep(1) | ||
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True) | ||
assert self.server.is_alive() | ||
restored_server_digest = client.debug_digest() | ||
assert restored_server_digest != None or 0000000000000000000000000000000000000000 | ||
restored_object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave') | ||
assert restored_server_digest == server_digest | ||
assert restored_object_digest == object_digest | ||
|
||
# verify restore results | ||
curr_item_count_2 = client.info_obj().num_keys() | ||
|
@@ -49,12 +58,23 @@ def test_aofrewrite_bloomfilter_metrics(self): | |
for var in variables: | ||
self.client.execute_command(f'BF.ADD key1 {var}') | ||
|
||
# cmd debug digest | ||
server_digest = self.client.debug_digest() | ||
assert server_digest != None or 0000000000000000000000000000000000000000 | ||
object_digest = self.client.execute_command('DEBUG DIGEST-VALUE key1') | ||
|
||
# save aof, restart sever | ||
self.client.bgrewriteaof() | ||
self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE) | ||
# restart server | ||
time.sleep(1) | ||
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True) | ||
assert self.server.is_alive() | ||
restored_server_digest = self.client.debug_digest() | ||
assert restored_server_digest != None or 0000000000000000000000000000000000000000 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: This check is not needed since we already validated it on the server_digest |
||
restored_object_digest = self.client.execute_command('DEBUG DIGEST-VALUE key1') | ||
assert restored_server_digest == server_digest | ||
assert restored_object_digest == object_digest | ||
|
||
# Check info for scaled bloomfilter matches metrics data for bloomfilter | ||
new_info_obj = self.client.execute_command(f'BF.INFO key1') | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,10 +39,19 @@ def test_copy_and_exists_cmd(self): | |
assert client.execute_command('EXISTS filter') == 1 | ||
mexists_result = client.execute_command('BF.MEXISTS filter item1 item2 item3 item4') | ||
assert len(madd_result) == 4 and len(mexists_result) == 4 | ||
# cmd debug digest | ||
server_digest = client.debug_digest() | ||
assert server_digest != None or 0000000000000000000000000000000000000000 | ||
object_digest = client.execute_command('DEBUG DIGEST-VALUE filter') | ||
assert client.execute_command('COPY filter new_filter') == 1 | ||
copied_server_digest = client.debug_digest() | ||
assert copied_server_digest != None or 0000000000000000000000000000000000000000 | ||
copied_object_digest = client.execute_command('DEBUG DIGEST-VALUE filter') | ||
assert client.execute_command('EXISTS new_filter') == 1 | ||
copy_mexists_result = client.execute_command('BF.MEXISTS new_filter item1 item2 item3 item4') | ||
assert mexists_result == copy_mexists_result | ||
assert server_digest != copied_server_digest | ||
assert copied_object_digest == object_digest | ||
|
||
def test_memory_usage_cmd(self): | ||
client = self.server.get_new_client() | ||
|
@@ -240,3 +249,34 @@ def test_bloom_expiration(self): | |
assert client.execute_command('TTL TEST_PERSIST') > 0 | ||
assert client.execute_command('PERSIST TEST_PERSIST') == 1 | ||
assert client.execute_command('TTL TEST_PERSIST') == -1 | ||
|
||
def test_debug_cmd(self): | ||
client = self.server.get_new_client() | ||
default_obj = client.execute_command('BF.RESERVE default_obj 0.001 1000') | ||
default_object_digest = client.execute_command('DEBUG DIGEST-VALUE default_obj') | ||
|
||
scenario1_obj = client.execute_command('BF.INSERT scenario1 error 0.001 capacity 1000 items 1') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, will add these scenarios |
||
scenario1_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario1') | ||
assert scenario1_obj != default_obj | ||
assert scenario1_object_digest != default_object_digest | ||
|
||
scenario2_obj = client.execute_command('BF.INSERT scenario2 error 0.002 capacity 1000 items 1') | ||
scenario2_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario2') | ||
assert scenario2_obj != default_obj | ||
assert scenario2_object_digest != default_object_digest | ||
|
||
scenario3_obj = client.execute_command('BF.INSERT scenario3 error 0.002 capacity 1000 expansion 3 items 1') | ||
scenario3_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario3') | ||
assert scenario3_obj != default_obj | ||
assert scenario3_object_digest != default_object_digest | ||
|
||
scenario4_obj = client.execute_command('BF.INSERT scenario4 error 0.001 capacity 1000 items 1') | ||
scenario4_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario4') | ||
assert scenario4_obj != default_obj | ||
assert scenario4_object_digest != default_object_digest | ||
|
||
client.execute_command('BF.MADD default_obj 1 2 3') | ||
client.execute_command('BF.MADD scenario4 2 3') | ||
madd_default_object_digest = client.execute_command('DEBUG DIGEST-VALUE default_obj') | ||
madd_scenario_object_digest = client.execute_command('DEBUG DIGEST-VALUE scenario4') | ||
assert madd_scenario_object_digest == madd_default_object_digest |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,6 +68,17 @@ def test_replication_behavior(self): | |
assert primary_cmd_stats['cmdstat_BF.ADD']["calls"] == 2 and replica_cmd_stats['cmdstat_BF.ADD']["calls"] == 1 | ||
else: | ||
assert primary_cmd_stats['cmdstat_' + prefix]["calls"] == (expected_calls + 1) and replica_cmd_stats['cmdstat_' + prefix]["calls"] == expected_calls | ||
|
||
# cmd debug digest | ||
server_digest_primary = self.client.debug_digest() | ||
assert server_digest_primary != None or 0000000000000000000000000000000000000000 | ||
object_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key') | ||
server_digest_replica = self.client.debug_digest() | ||
assert server_digest_primary == server_digest_replica | ||
assert server_digest_replica != None or 0000000000000000000000000000000000000000 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: This check is not needed since we already validated it on the |
||
debug_digest_replica = self.replicas[0].client.execute_command('DEBUG DIGEST-VALUE key') | ||
assert object_digest_primary == debug_digest_replica | ||
|
||
self.client.execute_command('FLUSHALL') | ||
self.waitForReplicaToSyncUp(self.replicas[0]) | ||
self.client.execute_command('CONFIG RESETSTAT') | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,11 @@ def test_basic_save_and_restore(self): | |
bf_info_result_1 = client.execute_command('BF.INFO testSave') | ||
assert(len(bf_info_result_1)) != 0 | ||
curr_item_count_1 = client.info_obj().num_keys() | ||
|
||
# cmd debug digest | ||
server_digest = client.debug_digest() | ||
assert server_digest != None or 0000000000000000000000000000000000000000 | ||
object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave') | ||
|
||
# save rdb, restart sever | ||
client.bgsave() | ||
self.server.wait_for_save_done() | ||
|
@@ -26,6 +30,11 @@ def test_basic_save_and_restore(self): | |
assert self.server.is_alive() | ||
assert uptime_in_sec_1 > uptime_in_sec_2 | ||
assert self.server.is_rdb_done_loading() | ||
restored_server_digest = client.debug_digest() | ||
assert restored_server_digest != None or 0000000000000000000000000000000000000000 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: This check is not needed since we already validated it on the |
||
restored_object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave') | ||
assert restored_server_digest == server_digest | ||
assert restored_object_digest == object_digest | ||
|
||
# verify restore results | ||
curr_item_count_2 = client.info_obj().num_keys() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -474,7 +474,7 @@ def port_tracker_fixture(self, resource_port_tracker): | |
self.port_tracker = resource_port_tracker | ||
|
||
def _get_valkey_args(self): | ||
self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": ""}) | ||
self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": "", "enable-debug-command":"yes"}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let us also add testing in two other places:
|
||
self.args.update(self.get_custom_args()) | ||
return self.args | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: This check is not needed since we already validated it on the
server_digest