Support for DEBUG DIGEST module data type callback #21
Conversation
Please check the DCO guide and signoff your PR: https://github.com/valkey-io/valkey-bloom/pull/21/checks?check_run_id=33062120817
src/digest.rs
Outdated
/// `Digest` is a high-level rust interface to the Valkey module C API
/// abstracting away the raw C ffi calls.
pub struct Digest {
I guess this is a solution until the DEBUG wrapper functionality is added to the valkeymodule-rs SDK. We can remove it once that functionality is released in a new version.
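For reference, a minimal sketch of what such a wrapper can look like until then (assuming the raw bindings expose RedisModule_DigestAddLongLong and RedisModule_DigestAddStringBuffer as in the upstream module API; the crate path and exact binding signatures here are assumptions, not the PR's final code):

use std::os::raw::c_char;
use valkey_module::raw;

/// Thin wrapper over the raw digest handle handed to the module's
/// DEBUG DIGEST callback.
pub struct Digest {
    pub dig: *mut raw::RedisModuleDigest,
}

impl Digest {
    pub const fn new(dig: *mut raw::RedisModuleDigest) -> Self {
        Self { dig }
    }

    /// Fold a signed integer into the digest.
    pub fn add_long_long(&mut self, ll: i64) {
        unsafe { raw::RedisModule_DigestAddLongLong.unwrap()(self.dig, ll) };
    }

    /// Fold an arbitrary byte buffer into the digest.
    pub fn add_string_buffer(&mut self, buf: &[u8]) {
        unsafe {
            raw::RedisModule_DigestAddStringBuffer.unwrap()(
                self.dig,
                buf.as_ptr().cast::<c_char>(),
                buf.len(),
            )
        };
    }
}

Call sites then only pass safe i64 and &[u8] arguments, as in the callback snippets in this review.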
src/wrapper/bloom_callback.rs
Outdated
let mut dig = Digest::new(md);
let val = &*(value.cast::<BloomFilterType>());
dig.add_long_long(val.expansion.into());
dig.add_long_long(val.capacity());
`capacity()` returns a per-`BloomFilterType` result which is summed across all filters in the object. We will need to add data which is specific to the overall `BloomFilterType` (including every sub filter). This means we need to add the struct member data from the top-level `BloomFilterType` structure, and then the struct member values from the inner `BloomFilter` structures in the vector. This is needed for data correctness.
For example: if we just add capacity, we can have a bloom object with an overall capacity of 100 from one single filter, but we can also have another object where this is split across 5 filters adding up to a total of 100. These objects are not the same, so updating the debug logic as described above will handle this.
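A sketch of the callback with the per-filter data folded in, along the lines described above (the crate-internal paths and the per-filter member names used here, e.g. capacity and num_items on each sub-filter, are illustrative assumptions):

// Crate-internal paths are illustrative.
use crate::bloom::utils::BloomFilterType;
use crate::wrapper::digest::Digest;
use std::os::raw::c_void;
use valkey_module::raw;

/// DEBUG DIGEST callback body: add the top-level BloomFilterType members
/// first, then the members of every inner BloomFilter, so that two objects
/// with the same total capacity but a different filter layout produce
/// different digests.
pub unsafe extern "C" fn bloom_callback_debug_digest(
    md: *mut raw::RedisModuleDigest,
    value: *mut c_void,
) {
    let mut dig = Digest::new(md);
    let val = &*(value.cast::<BloomFilterType>());
    // Top-level members.
    dig.add_long_long(val.expansion.into());
    dig.add_string_buffer(&val.fp_rate.to_le_bytes());
    // Per-filter members of every BloomFilter in the vector.
    for filter in &val.filters {
        dig.add_string_buffer(&filter.bloom.bitmap());
        dig.add_long_long(filter.capacity as i64);
        dig.add_long_long(filter.num_items as i64);
    }
}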
Force-pushed from 188835b to a33e0e3
Signed-off-by: Nihal Mehta <[email protected]>
Signed-off-by: Nihal Mehta <[email protected]>
src/digest.rs
Outdated
pub dig: *mut raw::RedisModuleDigest,
}

impl Digest {
Let's move this file into the wrapper directory
Signed-off-by: Nihal Mehta <[email protected]>
tests/test_aofrewrite.py
Outdated
@@ -20,7 +20,10 @@ def test_basic_aofrewrite_and_restore(self):
bf_info_result_1 = client.execute_command('BF.INFO testSave')
assert(len(bf_info_result_1)) != 0
curr_item_count_1 = client.info_obj().num_keys()

# cmd debug digest
client.debug_digest()
Can we check that this is not `None`? Also, when we restart the server later on, can we compare and check that they are the same?
tests/test_aofrewrite.py
Outdated
assert bf_info_result_2 == bf_info_result_1
assert debug_restore == debug_original
client.execute_command('DEL testSave')

def test_aofrewrite_bloomfilter_metrics(self):
Let's also add a debug digest test here
tests/test_basic.py
Outdated
@@ -39,10 +39,15 @@ def test_copy_and_exists_cmd(self):
assert client.execute_command('EXISTS filter') == 1
mexists_result = client.execute_command('BF.MEXISTS filter item1 item2 item3 item4')
assert len(madd_result) == 4 and len(mexists_result) == 4
# cmd debug digest
client.debug_digest()
Can we check that this is not `None`? Also, when we restart the server later on, can we compare and check that they are the same?
tests/test_save_and_restore.py
Outdated
@@ -14,7 +14,9 @@ def test_basic_save_and_restore(self):
bf_info_result_1 = client.execute_command('BF.INFO testSave')
assert(len(bf_info_result_1)) != 0
curr_item_count_1 = client.info_obj().num_keys()

client.debug_digest()
Can we check that this is not `None`? Also, when we restart the server later on, can we compare and check that they are the same?
And not 0000000000000000000000000000000000000000
@@ -474,7 +474,7 @@ def port_tracker_fixture(self, resource_port_tracker):
self.port_tracker = resource_port_tracker

def _get_valkey_args(self):
self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": ""})
self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": "", "enable-debug-command":"yes"})
Let us also add testing in two other places:
- test_correctness.py - both scaling and non-scaling filters should be validated for digest correctness
- test_replication.py - replicated commands should produce the same digest
src/wrapper/bloom_callback.rs
Outdated
dig.add_long_long(val.expansion.into());
dig.add_string_buffer(&val.fp_rate.to_le_bytes());
for filter in &val.filters {
dig.add_string_buffer(&filter.bloom.bitmap());
Let's also add the sip_keys of every filter into the digest. When we compare two bloom objects, the sip keys of the hash functions of the bloom filters should be compared as well; if they are different, they are not the same object.
Step 1 - Implement `sip_keys` on the `BloomFilter` struct:
/// Return the keys used by the sip hasher of the raw bloom.
pub fn sip_keys(&self) -> [(u64, u64); 2] {
    self.bloom.sip_keys()
}
Step 2 - Here, from the callback, write the 4 numbers (which are u64) into the digest using add_long_long().
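A sketch of that Step 2, as the per-filter loop of the callback would look with the sip keys added (the u64-to-i64 cast for add_long_long is an assumption):

for filter in &val.filters {
    dig.add_string_buffer(&filter.bloom.bitmap());
    // Fold in this filter's sip hasher keys: two (u64, u64) pairs,
    // written as four integers.
    for (k0, k1) in filter.sip_keys() {
        dig.add_long_long(k0 as i64);
        dig.add_long_long(k1 as i64);
    }
}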
Signed-off-by: Nihal Mehta <[email protected]>
tests/test_aofrewrite.py
Outdated
@@ -20,14 +20,23 @@ def test_basic_aofrewrite_and_restore(self):
bf_info_result_1 = client.execute_command('BF.INFO testSave')
assert(len(bf_info_result_1)) != 0
curr_item_count_1 = client.info_obj().num_keys()

# cmd debug digest
cmd_debug = client.debug_digest()
server_digest
tests/test_aofrewrite.py
Outdated
# cmd debug digest
cmd_debug = client.debug_digest()
assert cmd_debug != None or 0000000000000000000000000000000000000000
debug_save = client.execute_command('DEBUG DIGEST-VALUE testSave')
object_digest
tests/test_aofrewrite.py
Outdated
# save aof, restart sever
client.bgrewriteaof()
self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
# Keep the server running for 1 second more to have a larger uptime.
time.sleep(1)
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
assert self.server.is_alive()
debug_restart = client.debug_digest()
restored_server_digest
tests/test_aofrewrite.py
Outdated
# save aof, restart sever
client.bgrewriteaof()
self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
# Keep the server running for 1 second more to have a larger uptime.
time.sleep(1)
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
assert self.server.is_alive()
debug_restart = client.debug_digest()
assert debug_restart != None or 0000000000000000000000000000000000000000
debug_restore = client.execute_command('DEBUG DIGEST-VALUE testSave')
restored_object_digest
tests/test_aofrewrite.py
Outdated
@@ -49,12 +58,23 @@ def test_aofrewrite_bloomfilter_metrics(self):
for var in variables:
self.client.execute_command(f'BF.ADD key1 {var}')

# cmd debug digest
cmd_debug = self.client.debug_digest()
server_digest
tests/test_aofrewrite.py
Outdated
assert self.server.is_alive()
debug_restart = self.client.debug_digest()
assert debug_restart != None or 0000000000000000000000000000000000000000
debug_restore = self.client.execute_command('DEBUG DIGEST-VALUE key1')
restored_object_digest
tests/test_aofrewrite.py
Outdated
# save aof, restart sever
self.client.bgrewriteaof()
self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
# restart server
time.sleep(1)
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
assert self.server.is_alive()
debug_restart = self.client.debug_digest()
restored_server_digest
tests/test_basic.py
Outdated
# cmd debug digest
cmd_debug = client.debug_digest()
assert cmd_debug != None or 0000000000000000000000000000000000000000
debug_filter = client.execute_command('DEBUG DIGEST-VALUE filter')
assert client.execute_command('COPY filter new_filter') == 1
debug_copy = client.debug_digest()
assert debug_copy != None or 0000000000000000000000000000000000000000
debug_new_filter = client.execute_command('DEBUG DIGEST-VALUE filter')
assert client.execute_command('EXISTS new_filter') == 1
copy_mexists_result = client.execute_command('BF.MEXISTS new_filter item1 item2 item3 item4')
assert mexists_result == copy_mexists_result
assert cmd_debug != debug_copy
assert debug_new_filter == debug_filter
1. Can we move this into a new test called `test_debug_cmd`?
2. Let's create a different object (with different parameters) and check that the digest comparison fails: (a) new object + different items added, (b) new object + different fp rate, (c) new object + different expansion, (d) new object + no items.
3. Also, let's use the variable naming suggested in the earlier test comments.
tests/test_correctness.py
Outdated
@@ -54,6 +57,8 @@ def test_non_scaling_filter(self):
item_prefix,
)
self.fp_assert(error_count, num_operations, expected_fp_rate, fp_margin)
debug_copy = client.debug_digest()
assert debug_copy != None or 0000000000000000000000000000000000000000 or cmd_debug
Shouldn't the copied object's digest match?
tests/test_correctness.py
Outdated
@@ -127,5 +134,7 @@ def test_scaling_filter(self):
info = client.execute_command(f'BF.INFO {filter_name}')
it = iter(info)
info_dict = dict(zip(it, it))
debug_copy = client.debug_digest()
assert debug_copy != None or 0000000000000000000000000000000000000000 or cmd_debug
Shouldn't the copied object's digest match?
src/bloom/data_type.rs
Outdated
@@ -126,6 +128,22 @@ impl ValkeyDataType for BloomFilterType {
};
Some(item)
}

/// A callback function that is used for DEBUG DIGEST.
This is not the callback.
You can add a comment like this: "Function that is used to generate a digest on the Bloom Object."
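In other words, the extern "C" callback in bloom_callback.rs stays a thin shim and the digest generation lives on the Bloom object itself, roughly like this (shown as an inherent method for brevity; the method name and signature are assumptions):

impl BloomFilterType {
    /// Function that is used to generate a digest on the Bloom Object.
    pub fn debug_digest(&self, mut dig: Digest) {
        // Add the top-level members and the per-filter members (including
        // the sip keys), as in the earlier sketches.
        dig.add_long_long(self.expansion.into());
    }
}

/// The module data type callback itself just casts the raw value and delegates.
pub unsafe extern "C" fn bloom_callback_debug_digest(
    md: *mut raw::RedisModuleDigest,
    value: *mut c_void,
) {
    let dig = Digest::new(md);
    let val = &*(value.cast::<BloomFilterType>());
    val.debug_digest(dig);
}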
tests/test_correctness.py
Outdated
debug_copy = client.debug_digest()
assert debug_copy != None or 0000000000000000000000000000000000000000 or cmd_debug
We can remove this
tests/test_correctness.py
Outdated
# cmd debug digest
cmd_debug = client.debug_digest()
assert cmd_debug != None or 0000000000000000000000000000000000000000
We can remove this
tests/test_correctness.py
Outdated
# cmd debug digest
cmd_debug = client.debug_digest()
assert cmd_debug != None or 0000000000000000000000000000000000000000
We can remove this
tests/test_correctness.py
Outdated
debug_copy = client.debug_digest()
assert debug_copy != None or 0000000000000000000000000000000000000000 or cmd_debug
We can remove this
Signed-off-by: Nihal Mehta <[email protected]>
tests/test_replication.py
Outdated
object_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key')
server_digest_replica = self.client.debug_digest()
assert server_digest_primary == server_digest_replica
assert server_digest_replica != None or 0000000000000000000000000000000000000000
Nit: This check is not needed since we already validated it on the server_digest_primary
tests/test_aofrewrite.py
Outdated
# save aof, restart sever
client.bgrewriteaof()
self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
# Keep the server running for 1 second more to have a larger uptime.
time.sleep(1)
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
assert self.server.is_alive()
restored_server_digest = client.debug_digest()
assert restored_server_digest != None or 0000000000000000000000000000000000000000
Nit: This check is not needed since we already validated it on the server_digest
tests/test_aofrewrite.py
Outdated
# save aof, restart sever
self.client.bgrewriteaof()
self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
# restart server
time.sleep(1)
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
assert self.server.is_alive()
restored_server_digest = self.client.debug_digest()
assert restored_server_digest != None or 0000000000000000000000000000000000000000
Nit: This check is not needed since we already validated it on the server_digest
default_obj = client.execute_command('BF.RESERVE default_obj 0.001 1000')
default_object_digest = client.execute_command('DEBUG DIGEST-VALUE default_obj')

scenario1_obj = client.execute_command('BF.INSERT scenario1 error 0.001 capacity 1000 items 1')
- Let's add a Scenario 4 to validate with different capacity, so the current Scenario 4 becomes Scenario 5 as explained below.
- Nit: Let's add one-line comment descriptions to explain each scenario. Example:
# scenario1 validates that digest differs on bloom objects (with same properties) when different items are added.
# scenario2 validates that digest differs on bloom objects with different false positive rate.
# scenario3 validates that digest differs on bloom objects with different expansion.
# scenario4 validates that digest differs on bloom objects with different capacity.
# scenario5 validates that digest is equal on bloom objects with same properties and same items.
Sure, will add these scenarios
tests/test_save_and_restore.py
Outdated
@@ -26,6 +30,11 @@ def test_basic_save_and_restore(self):
assert self.server.is_alive()
assert uptime_in_sec_1 > uptime_in_sec_2
assert self.server.is_rdb_done_loading()
restored_server_digest = client.debug_digest()
assert restored_server_digest != None or 0000000000000000000000000000000000000000
Nit: This check is not needed since we already validated it on the server_digest
Signed-off-by: Nihal Mehta <[email protected]>
* Support for DEBUG DIGEST module data type callback
  Signed-off-by: Nihal Mehta <[email protected]>
* Update test cases
  Signed-off-by: Nihal Mehta <[email protected]>
* Move digest to wrapper
  Signed-off-by: Nihal Mehta <[email protected]>
* Update tests
  Signed-off-by: Nihal Mehta <[email protected]>
* Add more scenarios for debug test
  Signed-off-by: Nihal Mehta <[email protected]>
* Clean code and add scenario for debug test
  Signed-off-by: Nihal Mehta <[email protected]>
---------
Signed-off-by: Nihal Mehta <[email protected]>
Modules can support the DEBUG command on module data type objects by implementing a module data type callback. valkey-bloom implements a DIGEST module data type callback, and during a DEBUG operation it creates and returns a digest.
In the callback, we add the data of the top-level BloomFilterType object, followed by the member data and the sip keys of every inner BloomFilter structure, so that comparing digests also validates that the sip keys of the bloom filters' hash functions are the same.
We have added testing for debug digest across different scenarios: AOF rewrite, RDB load, save and restore, COPY, replication, and scaling/non-scaling filters.
Closes #9