Support for DEBUG DIGEST module data type callback #21

Merged · merged 6 commits · Nov 30, 2024

Changes from 4 commits
20 changes: 19 additions & 1 deletion src/bloom/data_type.rs
@@ -6,6 +6,7 @@ use crate::configs::{
use crate::metrics::BLOOM_NUM_OBJECTS;
use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES;
use crate::wrapper::bloom_callback;
use crate::wrapper::digest::Digest;
use crate::MODULE_NAME;
use std::mem;
use std::os::raw::c_int;
@@ -26,10 +27,10 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
rdb_load: Some(bloom_callback::bloom_rdb_load),
rdb_save: Some(bloom_callback::bloom_rdb_save),
aof_rewrite: Some(bloom_callback::bloom_aof_rewrite),
+ digest: Some(bloom_callback::bloom_digest),

mem_usage: Some(bloom_callback::bloom_mem_usage),
- // TODO
- digest: None,
free: Some(bloom_callback::bloom_free),

aux_load: Some(bloom_callback::bloom_aux_load),
@@ -54,6 +55,7 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(

pub trait ValkeyDataType {
    fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType>;
    fn debug_digest(&self, dig: Digest);
}

impl ValkeyDataType for BloomFilterType {
@@ -126,6 +128,22 @@ impl ValkeyDataType for BloomFilterType {
};
Some(item)
}

    /// A callback function that is used for DEBUG DIGEST.
Review comment (Member): This is not the callback. You can add a comment like this: "Function that is used to generate a digest on the Bloom Object."

    fn debug_digest(&self, mut dig: Digest) {
        dig.add_long_long(self.expansion.into());
        dig.add_string_buffer(&self.fp_rate.to_le_bytes());
        for filter in &self.filters {
            dig.add_string_buffer(&filter.bloom.bitmap());
            for &(key1, key2) in &filter.sip_keys() {
                dig.add_long_long(key1 as i64);
                dig.add_long_long(key2 as i64);
            }
            dig.add_long_long(filter.num_items.into());
            dig.add_long_long(filter.capacity.into());
        }
        dig.end_sequence();
    }
}

/// Load the auxiliary data outside of the regular keyspace from the RDB file
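The debug_digest method above folds the object-level fields (expansion, fp_rate) and then, per filter, the bitmap, sip keys, item count, and capacity into the digest, closing the sequence with end_sequence(). A minimal sketch of what this enables at the command level, modeled on the COPY test below (client API and key names are illustrative):

```python
# A copied Bloom object shares bitmap, sip keys, and counters with the
# original, so DEBUG DIGEST-VALUE must match for the two keys.
client.execute_command('BF.ADD filter item1')
original_digest = client.execute_command('DEBUG DIGEST-VALUE filter')
assert client.execute_command('COPY filter filter_copy') == 1
copy_digest = client.execute_command('DEBUG DIGEST-VALUE filter_copy')
assert copy_digest == original_digest
```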
4 changes: 4 additions & 0 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
@@ -411,6 +411,10 @@ impl BloomFilter {
self.bloom.set(item)
}

    pub fn sip_keys(&self) -> [(u64, u64); 2] {
        self.bloom.sip_keys()
    }

/// Create a new BloomFilter from an existing BloomFilter object (COPY command).
pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter {
BloomFilter::from_existing(
9 changes: 9 additions & 0 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ use crate::bloom;
use crate::bloom::data_type::ValkeyDataType;
use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::wrapper::digest::Digest;
use std::ffi::CString;
use std::os::raw::{c_char, c_int, c_void};
use std::ptr::null_mut;
@@ -118,6 +119,14 @@ pub unsafe extern "C" fn bloom_copy(
Box::into_raw(bb).cast::<libc::c_void>()
}

/// # Safety
/// Raw handler for the Bloom digest callback.
pub unsafe extern "C" fn bloom_digest(md: *mut raw::RedisModuleDigest, value: *mut c_void) {
    let dig = Digest::new(md);
    let val = &*(value.cast::<BloomFilterType>());
    val.debug_digest(dig);
}

/// # Safety
/// Raw handler for the Bloom object's free_effort callback.
pub unsafe extern "C" fn bloom_free_effort(
80 changes: 80 additions & 0 deletions src/wrapper/digest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::os::raw::c_char;
use valkey_module::raw;
use valkey_module::ValkeyString;

/// `Digest` is a high-level Rust interface to the Valkey module C API,
/// abstracting away the raw C FFI calls.
pub struct Digest {
    pub dig: *mut raw::RedisModuleDigest,
}

impl Digest {
    pub const fn new(dig: *mut raw::RedisModuleDigest) -> Self {
        Self { dig }
    }

    /// Returns the key name of this [`Digest`].
    ///
    /// # Panics
    ///
    /// Will panic if `RedisModule_GetKeyNameFromDigest` is missing in redismodule.h.
    pub fn get_key_name(&self) -> ValkeyString {
        ValkeyString::from_redis_module_string(std::ptr::null_mut(), unsafe {
            raw::RedisModule_GetKeyNameFromDigest
                .expect("RedisModule_GetKeyNameFromDigest is not available.")(self.dig)
            .cast_mut()
        })
    }

    /// Returns the database ID of this [`Digest`].
    ///
    /// # Panics
    ///
    /// Will panic if `RedisModule_GetDbIdFromDigest` is missing in redismodule.h.
    pub fn get_db_id(&self) -> i32 {
        unsafe {
            raw::RedisModule_GetDbIdFromDigest
                .expect("RedisModule_GetDbIdFromDigest is not available.")(self.dig)
        }
    }

    /// Adds a new element to this [`Digest`].
    ///
    /// # Panics
    ///
    /// Will panic if `RedisModule_DigestAddStringBuffer` is missing in redismodule.h.
    pub fn add_string_buffer(&mut self, ele: &[u8]) {
        unsafe {
            raw::RedisModule_DigestAddStringBuffer
                .expect("RedisModule_DigestAddStringBuffer is not available.")(
                self.dig,
                ele.as_ptr().cast::<c_char>(),
                ele.len(),
            )
        }
    }

    /// Similar to [`Digest::add_string_buffer`], but takes an [`i64`].
    ///
    /// # Panics
    ///
    /// Will panic if `RedisModule_DigestAddLongLong` is missing in redismodule.h.
    pub fn add_long_long(&mut self, ll: i64) {
        unsafe {
            raw::RedisModule_DigestAddLongLong
                .expect("RedisModule_DigestAddLongLong is not available.")(self.dig, ll)
        }
    }

    /// Ends the current sequence in this [`Digest`].
    ///
    /// # Panics
    ///
    /// Will panic if `RedisModule_DigestEndSequence` is missing in redismodule.h.
    pub fn end_sequence(&mut self) {
        unsafe {
            raw::RedisModule_DigestEndSequence
                .expect("RedisModule_DigestEndSequence is not available.")(self.dig)
        }
    }
}
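The tests below exercise this wrapper through two DEBUG forms: the server-wide digest and the per-key digest. A short sketch of the distinction, assuming the test framework's client helper seen in the tests (debug_digest()):

```python
# Server-wide digest: covers every key in every database; an all-zero
# digest indicates an empty dataset.
server_digest = client.debug_digest()

# Per-key digest: computed for one key's value via the module's digest
# callback (the bloom_digest handler registered above).
object_digest = client.execute_command('DEBUG DIGEST-VALUE somekey')
```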
1 change: 1 addition & 0 deletions src/wrapper/mod.rs
@@ -1 +1,2 @@
pub mod bloom_callback;
pub mod digest;
22 changes: 21 additions & 1 deletion tests/test_aofrewrite.py
@@ -20,14 +20,23 @@ def test_basic_aofrewrite_and_restore(self):
bf_info_result_1 = client.execute_command('BF.INFO testSave')
assert(len(bf_info_result_1)) != 0
curr_item_count_1 = client.info_obj().num_keys()

# capture digests before the rewrite (an all-zero digest means an empty dataset)
cmd_debug = client.debug_digest()
Review comment (Member): server_digest (suggested variable name)
assert cmd_debug not in (None, '0' * 40, b'0' * 40)
debug_save = client.execute_command('DEBUG DIGEST-VALUE testSave')
Review comment (Member): object_digest (suggested variable name)

# save aof, restart server
client.bgrewriteaof()
self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
# Keep the server running for 1 second more to have a larger uptime.
time.sleep(1)
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
assert self.server.is_alive()
debug_restart = client.debug_digest()
Review comment (Member): restored_server_digest (suggested variable name)

assert debug_restart not in (None, '0' * 40, b'0' * 40)
debug_restore = client.execute_command('DEBUG DIGEST-VALUE testSave')
Review comment (Member): restored_object_digest (suggested variable name)

assert debug_restart == cmd_debug
assert debug_restore == debug_save

# verify restore results
curr_item_count_2 = client.info_obj().num_keys()
@@ -49,12 +58,23 @@ def test_aofrewrite_bloomfilter_metrics(self):
for var in variables:
self.client.execute_command(f'BF.ADD key1 {var}')

# capture digests before the rewrite (an all-zero digest means an empty dataset)
cmd_debug = self.client.debug_digest()
Review comment (Member): server_digest (suggested variable name)

assert cmd_debug not in (None, '0' * 40, b'0' * 40)
debug_save = self.client.execute_command('DEBUG DIGEST-VALUE key1')
Review comment (Member): object_digest (suggested variable name)

# save aof, restart server
self.client.bgrewriteaof()
self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
# restart server
time.sleep(1)
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
assert self.server.is_alive()
debug_restart = self.client.debug_digest()
Review comment (Member): restored_server_digest (suggested variable name)

assert debug_restart not in (None, '0' * 40, b'0' * 40)
debug_restore = self.client.execute_command('DEBUG DIGEST-VALUE key1')
Review comment (Member): restored_object_digest (suggested variable name)

assert debug_restart == cmd_debug
assert debug_restore == debug_save

# Check info for scaled bloomfilter matches metrics data for bloomfilter
new_info_obj = self.client.execute_command(f'BF.INFO key1')
9 changes: 9 additions & 0 deletions tests/test_basic.py
@@ -39,10 +39,19 @@ def test_copy_and_exists_cmd(self):
assert client.execute_command('EXISTS filter') == 1
mexists_result = client.execute_command('BF.MEXISTS filter item1 item2 item3 item4')
assert len(madd_result) == 4 and len(mexists_result) == 4
# capture digests before the COPY (an all-zero digest means an empty dataset)
cmd_debug = client.debug_digest()
assert cmd_debug not in (None, '0' * 40, b'0' * 40)
debug_filter = client.execute_command('DEBUG DIGEST-VALUE filter')
assert client.execute_command('COPY filter new_filter') == 1
debug_copy = client.debug_digest()
assert debug_copy not in (None, '0' * 40, b'0' * 40)
debug_new_filter = client.execute_command('DEBUG DIGEST-VALUE filter')
assert client.execute_command('EXISTS new_filter') == 1
copy_mexists_result = client.execute_command('BF.MEXISTS new_filter item1 item2 item3 item4')
assert mexists_result == copy_mexists_result
assert cmd_debug != debug_copy
assert debug_new_filter == debug_filter
Review comment (Member):
1. Can we move this into a new test called test_debug_cmd?
2. Let's create a different object (with different parameters; we can check that the digest comparison fails): (a) new object + add different items, (b) new object + different fp rate, (c) new object + different expansion, (d) new object + no items.
3. Also, let's use the variable naming suggested in the earlier test comments.
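A possible shape for the suggested test_debug_cmd, covering the four cases; commands mirror the existing tests, while parameters and key names are illustrative:

```python
def test_debug_cmd(self):
    client = self.server.get_new_client()
    client.execute_command('BF.ADD base item1')
    base_digest = client.execute_command('DEBUG DIGEST-VALUE base')

    # (a) same defaults, different items
    client.execute_command('BF.ADD diff_items item2')
    assert client.execute_command('DEBUG DIGEST-VALUE diff_items') != base_digest

    # (b) different fp rate
    client.execute_command('BF.RESERVE diff_fp 0.001 100')
    client.execute_command('BF.ADD diff_fp item1')
    assert client.execute_command('DEBUG DIGEST-VALUE diff_fp') != base_digest

    # (c) different expansion
    client.execute_command('BF.RESERVE diff_exp 0.01 100 EXPANSION 4')
    client.execute_command('BF.ADD diff_exp item1')
    assert client.execute_command('DEBUG DIGEST-VALUE diff_exp') != base_digest

    # (d) no items
    client.execute_command('BF.RESERVE empty 0.01 100')
    assert client.execute_command('DEBUG DIGEST-VALUE empty') != base_digest
```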


def test_memory_usage_cmd(self):
client = self.server.get_new_client()
11 changes: 10 additions & 1 deletion tests/test_correctness.py
@@ -17,6 +17,9 @@ def test_non_scaling_filter(self):
error_count, add_operation_idx = self.add_items_till_capacity(client, filter_name, capacity, 1, item_prefix)
with pytest.raises(Exception, match="non scaling filter is full"):
client.execute_command(f'BF.ADD {filter_name} new_item')
# the server-wide digest should be valid (an all-zero digest means an empty dataset)
cmd_debug = client.debug_digest()
assert cmd_debug not in (None, '0' * 40, b'0' * 40)
Review comment (Member): We can remove this.

# Validate that it is filled.
info = client.execute_command(f'BF.INFO {filter_name}')
it = iter(info)
@@ -54,6 +57,8 @@ def test_non_scaling_filter(self):
item_prefix,
)
self.fp_assert(error_count, num_operations, expected_fp_rate, fp_margin)
debug_copy = client.debug_digest()
assert debug_copy not in (None, '0' * 40, b'0' * 40)
Review comment (Member): Shouldn't the copied object's digest match?
Review comment (Member): We can remove this.

# Validate correctness on a copy of a non scaling bloom filter.
self.validate_copied_bloom_correctness(client, filter_name, item_prefix, add_operation_idx, expected_fp_rate, fp_margin, info_dict)

@@ -67,7 +72,9 @@ def test_scaling_filter(self):
filter_name = "filter1"
# Create a scaling bloom filter and validate its behavior.
assert client.execute_command(f'BF.RESERVE {filter_name} {expected_fp_rate} {initial_capacity} EXPANSION {expansion}') == b"OK"

# the server-wide digest should be valid (an all-zero digest means an empty dataset)
cmd_debug = client.debug_digest()
assert cmd_debug not in (None, '0' * 40, b'0' * 40)
Review comment (Member): We can remove this.

info = client.execute_command(f'BF.INFO {filter_name}')
it = iter(info)
info_dict = dict(zip(it, it))
@@ -127,5 +134,7 @@ def test_scaling_filter(self):
info = client.execute_command(f'BF.INFO {filter_name}')
it = iter(info)
info_dict = dict(zip(it, it))
debug_copy = client.debug_digest()
assert debug_copy not in (None, '0' * 40, b'0' * 40)
Review comment (Member): Shouldn't the copied object's digest match?
Review comment (Member): We can remove this.

# Validate correctness on a copy of a scaling bloom filter.
self.validate_copied_bloom_correctness(client, filter_name, item_prefix, add_operation_idx, expected_fp_rate, fp_margin, info_dict)
11 changes: 11 additions & 0 deletions tests/test_replication.py
@@ -68,6 +68,17 @@ def test_replication_behavior(self):
assert primary_cmd_stats['cmdstat_BF.ADD']["calls"] == 2 and replica_cmd_stats['cmdstat_BF.ADD']["calls"] == 1
else:
assert primary_cmd_stats['cmdstat_' + prefix]["calls"] == (expected_calls + 1) and replica_cmd_stats['cmdstat_' + prefix]["calls"] == expected_calls

# primary and replica should report identical digests
debug_primary = self.client.debug_digest()
assert debug_primary not in (None, '0' * 40, b'0' * 40)
debug_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key')
# take the digest from the replica's client, not the primary's
debug_replica = self.replicas[0].client.debug_digest()
assert debug_replica not in (None, '0' * 40, b'0' * 40)
assert debug_primary == debug_replica
debug_digest_replica = self.replicas[0].client.execute_command('DEBUG DIGEST-VALUE key')
assert debug_digest_primary == debug_digest_replica

self.client.execute_command('FLUSHALL')
self.waitForReplicaToSyncUp(self.replicas[0])
self.client.execute_command('CONFIG RESETSTAT')
Expand Down
11 changes: 10 additions & 1 deletion tests/test_save_and_restore.py
@@ -14,7 +14,11 @@ def test_basic_save_and_restore(self):
bf_info_result_1 = client.execute_command('BF.INFO testSave')
assert(len(bf_info_result_1)) != 0
curr_item_count_1 = client.info_obj().num_keys()

# capture digests before the RDB save (an all-zero digest means an empty dataset)
cmd_debug = client.debug_digest()
assert cmd_debug not in (None, '0' * 40, b'0' * 40)
debug_save = client.execute_command('DEBUG DIGEST-VALUE testSave')

# save rdb, restart server
client.bgsave()
self.server.wait_for_save_done()
@@ -26,6 +30,11 @@
assert self.server.is_alive()
assert uptime_in_sec_1 > uptime_in_sec_2
assert self.server.is_rdb_done_loading()
debug_restart = client.debug_digest()
assert debug_restart not in (None, '0' * 40, b'0' * 40)
debug_restore = client.execute_command('DEBUG DIGEST-VALUE testSave')
assert debug_restart == cmd_debug
assert debug_restore == debug_save

# verify restore results
curr_item_count_2 = client.info_obj().num_keys()
3 changes: 3 additions & 0 deletions tests/valkey_bloom_test_case.py
@@ -110,6 +110,9 @@ def validate_copied_bloom_correctness(self, client, original_filter_name, item_p
"""
copy_filter_name = "filter_copy"
assert client.execute_command(f'COPY {original_filter_name} {copy_filter_name}') == 1
debug_filter = client.execute_command(f'DEBUG DIGEST-VALUE {original_filter_name}')
debug_copy = client.execute_command(f'DEBUG DIGEST-VALUE {copy_filter_name}')
assert debug_copy == debug_filter
assert client.execute_command('DBSIZE') == 2
copy_info = client.execute_command(f'BF.INFO {copy_filter_name}')
copy_it = iter(copy_info)
2 changes: 1 addition & 1 deletion tests/valkeytests/valkey_test_case.py
@@ -474,7 +474,7 @@ def port_tracker_fixture(self, resource_port_tracker):
self.port_tracker = resource_port_tracker

def _get_valkey_args(self):
- self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": ""})
+ self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": "", "enable-debug-command":"yes"})
Review comment (Member): Let us also add testing in two other places:
1. test_correctness.py - both scaling and non-scaling filters should be checked for digest correctness
2. test_replication.py - replicated commands should have the same digest

self.args.update(self.get_custom_args())
return self.args
