Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for DEBUG DIGEST module data type callback #21

Merged
merged 6 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::configs::{
use crate::metrics::BLOOM_NUM_OBJECTS;
use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES;
use crate::wrapper::bloom_callback;
use crate::wrapper::digest;
use crate::MODULE_NAME;
use std::mem;
use std::os::raw::c_int;
Expand All @@ -26,10 +27,10 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
rdb_load: Some(bloom_callback::bloom_rdb_load),
rdb_save: Some(bloom_callback::bloom_rdb_save),
aof_rewrite: Some(bloom_callback::bloom_aof_rewrite),
digest: Some(bloom_callback::bloom_digest),

mem_usage: Some(bloom_callback::bloom_mem_usage),
// TODO
digest: None,
free: Some(bloom_callback::bloom_free),

aux_load: Some(bloom_callback::bloom_aux_load),
Expand Down
16 changes: 16 additions & 0 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::bloom;
use crate::bloom::data_type::ValkeyDataType;
use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::wrapper::digest::Digest;
use std::ffi::CString;
use std::os::raw::{c_char, c_int, c_void};
use std::ptr::null_mut;
Expand Down Expand Up @@ -118,6 +119,21 @@ pub unsafe extern "C" fn bloom_copy(
Box::into_raw(bb).cast::<libc::c_void>()
}

/// # Safety
/// Raw handler for the Bloom digest callback.
pub unsafe extern "C" fn bloom_digest(md: *mut raw::RedisModuleDigest, value: *mut c_void) {
let mut dig = Digest::new(md);
let val = &*(value.cast::<BloomFilterType>());
dig.add_long_long(val.expansion.into());
dig.add_string_buffer(&val.fp_rate.to_le_bytes());
for filter in &val.filters {
dig.add_string_buffer(&filter.bloom.bitmap());
Copy link
Member

@KarthikSubbarao KarthikSubbarao Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also add the sip_keys of every filter into the digest. When we compare two bloom objects, the sip keys of hash functions of the bloom filters should be compared as well.

If they are different, they are not the same object

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Step1 - Implement sip_keys on the BloomFilter struct

    /// Return the keys used by the sip hasher of the raw bloom.
    pub fn sip_keys(&self) -> [(u64, u64); 2] {
        self.bloom.sip_keys()
    }

Step 2 - Here, from the callback, write the 4 numbers (which are u64) into the digest using add_long_long()

dig.add_long_long(filter.num_items.into());
dig.add_long_long(filter.capacity.into());
}
dig.end_sequence();
}

/// # Safety
/// Raw handler for the Bloom object's free_effort callback.
pub unsafe extern "C" fn bloom_free_effort(
Expand Down
80 changes: 80 additions & 0 deletions src/wrapper/digest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::os::raw::c_char;
use valkey_module::raw;
use valkey_module::ValkeyString;

/// `Digest` is a high-level rust interface to the Valkey module C API
/// abstracting away the raw C ffi calls.
pub struct Digest {
pub dig: *mut raw::RedisModuleDigest,
}

impl Digest {
pub const fn new(dig: *mut raw::RedisModuleDigest) -> Self {
Self { dig }
}

/// Returns the key name of this [`Digest`].
///
/// # Panics
///
/// Will panic if `RedisModule_GetKeyNameFromDigest` is missing in redismodule.h
pub fn get_key_name(&self) -> ValkeyString {
ValkeyString::from_redis_module_string(std::ptr::null_mut(), unsafe {
raw::RedisModule_GetKeyNameFromDigest
.expect("RedisModule_GetKeyNameFromDigest is not available.")(self.dig)
.cast_mut()
})
}

/// Returns the database ID of this [`Digest`].
///
/// # Panics
///
/// Will panic if `RedisModule_GetDbIdFromDigest` is missing in redismodule.h
pub fn get_db_id(&self) -> i32 {
unsafe {
raw::RedisModule_GetDbIdFromDigest
.expect("RedisModule_GetDbIdFromDigest is not available.")(self.dig)
}
}

/// Adds a new element to this [`Digest`].
///
/// # Panics
///
/// Will panic if `RedisModule_DigestAddStringBuffer` is missing in redismodule.h
pub fn add_string_buffer(&mut self, ele: &[u8]) {
unsafe {
raw::RedisModule_DigestAddStringBuffer
.expect("RedisModule_DigestAddStringBuffer is not available.")(
self.dig,
ele.as_ptr().cast::<c_char>(),
ele.len(),
)
}
}

/// Similar to [`Digest::add_string_buffer`], but takes [`i64`].
///
/// # Panics
///
/// Will panic if `RedisModule_DigestAddLongLong` is missing in redismodule.h
pub fn add_long_long(&mut self, ll: i64) {
unsafe {
raw::RedisModule_DigestAddLongLong
.expect("RedisModule_DigestAddLongLong is not available.")(self.dig, ll)
}
}

/// Ends the current sequence in this [`Digest`].
///
/// # Panics
///
/// Will panic if `RedisModule_DigestEndSequence` is missing in redismodule.h
pub fn end_sequence(&mut self) {
unsafe {
raw::RedisModule_DigestEndSequence
.expect("RedisModule_DigestEndSequence is not available.")(self.dig)
}
}
}
1 change: 1 addition & 0 deletions src/wrapper/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod bloom_callback;
pub mod digest;
7 changes: 6 additions & 1 deletion tests/test_aofrewrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ def test_basic_aofrewrite_and_restore(self):
bf_info_result_1 = client.execute_command('BF.INFO testSave')
assert(len(bf_info_result_1)) != 0
curr_item_count_1 = client.info_obj().num_keys()

# cmd debug digest
client.debug_digest()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check that this is not None? Also, when we restart the server later on, can we compare and check that they are the same?

debug_original = client.execute_command('DEBUG DIGEST-VALUE testSave')

# save aof, restart sever
client.bgrewriteaof()
self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
Expand All @@ -35,7 +38,9 @@ def test_basic_aofrewrite_and_restore(self):
bf_exists_result_2 = client.execute_command('BF.EXISTS testSave item')
assert bf_exists_result_2 == 1
bf_info_result_2 = client.execute_command('BF.INFO testSave')
debug_restore = client.execute_command('DEBUG DIGEST-VALUE testSave')
assert bf_info_result_2 == bf_info_result_1
assert debug_restore == debug_original
client.execute_command('DEL testSave')

def test_aofrewrite_bloomfilter_metrics(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also add a debug digest test here

Expand Down
5 changes: 5 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@ def test_copy_and_exists_cmd(self):
assert client.execute_command('EXISTS filter') == 1
mexists_result = client.execute_command('BF.MEXISTS filter item1 item2 item3 item4')
assert len(madd_result) == 4 and len(mexists_result) == 4
# cmd debug digest
client.debug_digest()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check that this is not None? Also, when we restart the server later on, can we compare and check that they are the same?

debug_filter = client.execute_command('DEBUG DIGEST-VALUE filter')
assert client.execute_command('COPY filter new_filter') == 1
debug_new_filter = client.execute_command('DEBUG DIGEST-VALUE filter')
assert client.execute_command('EXISTS new_filter') == 1
copy_mexists_result = client.execute_command('BF.MEXISTS new_filter item1 item2 item3 item4')
assert mexists_result == copy_mexists_result
assert debug_new_filter == debug_filter

def test_memory_usage_cmd(self):
client = self.server.get_new_client()
Expand Down
6 changes: 5 additions & 1 deletion tests/test_save_and_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ def test_basic_save_and_restore(self):
bf_info_result_1 = client.execute_command('BF.INFO testSave')
assert(len(bf_info_result_1)) != 0
curr_item_count_1 = client.info_obj().num_keys()

client.debug_digest()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check that this is not None? Also, when we restart the server later on, can we compare and check that they are the same?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And not 0000000000000000000000000000000000000000

debug_save_1 = client.execute_command('DEBUG DIGEST-VALUE testSave')

# save rdb, restart sever
client.bgsave()
self.server.wait_for_save_done()
Expand All @@ -33,7 +35,9 @@ def test_basic_save_and_restore(self):
bf_exists_result_2 = client.execute_command('BF.EXISTS testSave item')
assert bf_exists_result_2 == 1
bf_info_result_2 = client.execute_command('BF.INFO testSave')
debug_save_2 = client.execute_command('DEBUG DIGEST-VALUE testSave')
assert bf_info_result_2 == bf_info_result_1
assert debug_save_2 == debug_save_1

def test_restore_failed_large_bloom_filter(self):
client = self.server.get_new_client()
Expand Down
2 changes: 1 addition & 1 deletion tests/valkeytests/valkey_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ def port_tracker_fixture(self, resource_port_tracker):
self.port_tracker = resource_port_tracker

def _get_valkey_args(self):
self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": ""})
self.args.update({"maxmemory":self.maxmemory, "maxmemory-policy":"allkeys-random", "activerehashing":"yes", "databases": self.num_dbs, "repl-diskless-sync": "yes", "save": "", "enable-debug-command":"yes"})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us also add testing in two other places:

  1. test_correctness.py - both scaling and non scaling filters should have ensured of correctness
  2. test_replication.py - replicated commands should have the same digest

self.args.update(self.get_custom_args())
return self.args

Expand Down