From 86286482be89ef6d4746a4fe1c11992bf7affdd1 Mon Sep 17 00:00:00 2001 From: zackcam Date: Fri, 22 Nov 2024 06:13:02 +0000 Subject: [PATCH 1/6] Updating how we create BloomFilter from rdb loads. BloomFilter vec now has capacity of filter we are loading from Signed-off-by: zackcam --- src/bloom/data_type.rs | 3 ++- src/bloom/utils.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 215f182..3d6dd85 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -59,7 +59,6 @@ pub trait ValkeyDataType { impl ValkeyDataType for BloomFilterType { /// Callback to load and parse RDB data of a bloom item and create it. fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option { - let mut filters = Vec::new(); if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION { logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str()); return None; @@ -73,6 +72,8 @@ impl ValkeyDataType for BloomFilterType { let Ok(fp_rate) = raw::load_double(rdb) else { return None; }; + let mut filters: Vec = Vec::with_capacity(num_filters as usize); + for i in 0..num_filters { let Ok(bitmap) = raw::load_string_buffer(rdb) else { return None; diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 55e327f..e9d36c0 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -104,7 +104,7 @@ impl BloomFilterType { /// Create a new BloomFilterType object from an existing one. pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType { - let mut filters = Vec::new(); + let mut filters: Vec = Vec::with_capacity(from_bf.filters.len()); metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed); metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( mem::size_of::(), From 8cbec6da1b46002327bbee86616be96e010432cf Mon Sep 17 00:00:00 2001 From: zackcam Date: Mon, 2 Dec 2024 22:32:07 +0000 Subject: [PATCH 2/6] Updating bloomfilter dependency to version 3, fixing breaking changes as well Signed-off-by: zackcam --- Cargo.toml | 2 +- src/bloom/data_type.rs | 10 ++---- src/bloom/utils.rs | 57 ++++++++++------------------------- src/wrapper/bloom_callback.rs | 4 +-- tests/test_bloom_metrics.py | 2 +- 5 files changed, 22 insertions(+), 53 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 555174a..f29cf11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ homepage = "https://github.com/valkey-io/valkey-bloom" valkey-module = "0.1.2" valkey-module-macros = "0" linkme = "0" -bloomfilter = { version = "1.0.13", features = ["serde"] } +bloomfilter = { version = "3", features = ["serde"] } lazy_static = "1.4.0" libc = "0.2" serde = { version = "1.0", features = ["derive"] } diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 3d6dd85..921fcb7 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -105,14 +105,8 @@ impl ValkeyDataType for BloomFilterType { (FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B), (FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B), ]; - let filter = BloomFilter::from_existing( - bitmap.as_ref(), - number_of_bits, - number_of_hash_functions as u32, - sip_keys, - num_items as u32, - capacity as u32, - ); + let filter = + BloomFilter::from_existing(bitmap.as_ref(), num_items as u32, capacity as u32); filters.push(filter); } BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index e9d36c0..3e3a00a 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -5,10 +5,11 @@ use crate::{ metrics, }; use bloomfilter; +use bloomfilter::{deserialize, serialize}; use serde::{Deserialize, Serialize}; -use std::{mem, sync::atomic::Ordering}; use super::data_type::BLOOM_TYPE_VERSION; +use std::{mem, sync::atomic::Ordering}; /// KeySpace Notification Events pub const ADD_EVENT: &str = "bloom.add"; @@ -318,6 +319,7 @@ impl BloomFilterType { /// well within the u32::MAX limit. #[derive(Serialize, Deserialize)] pub struct BloomFilter { + #[serde(serialize_with = "serialize", deserialize_with = "deserialize")] pub bloom: bloomfilter::Bloom<[u8]>, pub num_items: u32, pub capacity: u32, @@ -330,7 +332,8 @@ impl BloomFilter { capacity as usize, fp_rate, &configs::FIXED_SEED, - ); + ) + .unwrap(); let fltr = BloomFilter { bloom, num_items: 0, @@ -346,20 +349,8 @@ impl BloomFilter { } /// Create a new BloomFilter from dumped information (RDB load). - pub fn from_existing( - bitmap: &[u8], - number_of_bits: u64, - number_of_hash_functions: u32, - sip_keys: [(u64, u64); 2], - num_items: u32, - capacity: u32, - ) -> BloomFilter { - let bloom = bloomfilter::Bloom::from_existing( - bitmap, - number_of_bits, - number_of_hash_functions, - sip_keys, - ); + pub fn from_existing(bitmap: &[u8], num_items: u32, capacity: u32) -> BloomFilter { + let bloom = bloomfilter::Bloom::from_slice(bitmap).unwrap(); let fltr = BloomFilter { bloom, num_items, @@ -377,7 +368,7 @@ impl BloomFilter { } pub fn number_of_bytes(&self) -> usize { - std::mem::size_of::() + (self.bloom.number_of_bits() / 8) as usize + std::mem::size_of::() + (self.bloom.len() / 8) as usize } /// Caculates the number of bytes that the bloom filter will require to be allocated. @@ -413,14 +404,7 @@ impl BloomFilter { /// Create a new BloomFilter from an existing BloomFilter object (COPY command). pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter { - BloomFilter::from_existing( - &bf.bloom.bitmap(), - bf.bloom.number_of_bits(), - bf.bloom.number_of_hash_functions(), - bf.bloom.sip_keys(), - bf.num_items, - bf.capacity, - ) + BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity) } } @@ -449,9 +433,7 @@ impl Drop for BloomFilter { #[cfg(test)] mod tests { use super::*; - use crate::configs::{ - FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B, - }; + use configs::FIXED_SEED; use rand::{distributions::Alphanumeric, Rng}; /// Returns random string with specified number of characters. @@ -541,10 +523,6 @@ mod tests { fp_margin: f64, rand_prefix: &String, ) { - let expected_sip_keys = [ - (FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B), - (FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B), - ]; assert_eq!( restored_bloom_filter_type.capacity(), original_bloom_filter_type.capacity() @@ -568,8 +546,8 @@ mod tests { .filters .iter() .any( - |filter| (filter.bloom.sip_keys() == restore_filter.bloom.sip_keys()) - && (restore_filter.bloom.sip_keys() == expected_sip_keys) + |filter| (filter.bloom.seed() == restore_filter.bloom.seed()) + && (restore_filter.bloom.seed() == FIXED_SEED) ))); assert!(restored_bloom_filter_type .filters @@ -585,7 +563,7 @@ mod tests { .all(|restore_filter| original_bloom_filter_type .filters .iter() - .any(|filter| filter.bloom.bitmap() == restore_filter.bloom.bitmap()))); + .any(|filter| filter.bloom.as_slice() == restore_filter.bloom.as_slice()))); let (error_count, _) = check_items_exist( restored_bloom_filter_type, 1, @@ -731,14 +709,11 @@ mod tests { } #[test] - fn test_sip_keys() { + fn test_seed() { // The value of sip keys generated by the sip_keys with fixed seed should be equal to the constant in configs.rs let test_bloom_filter = BloomFilter::new(0.5_f64, 1000_u32); - let test_sip_keys = test_bloom_filter.bloom.sip_keys(); - assert_eq!(test_sip_keys[0].0, FIXED_SIP_KEY_ONE_A); - assert_eq!(test_sip_keys[0].1, FIXED_SIP_KEY_ONE_B); - assert_eq!(test_sip_keys[1].0, FIXED_SIP_KEY_TWO_A); - assert_eq!(test_sip_keys[1].1, FIXED_SIP_KEY_TWO_B); + let seed = test_bloom_filter.bloom.seed(); + assert_eq!(seed, FIXED_SEED); } #[test] diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index caa882b..8b6e9d3 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -24,13 +24,13 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu let mut filter_list_iter = filter_list.iter().peekable(); while let Some(filter) = filter_list_iter.next() { let bloom = &filter.bloom; - let bitmap = bloom.bitmap(); + let bitmap = bloom.to_bytes(); raw::RedisModule_SaveStringBuffer.unwrap()( rdb, bitmap.as_ptr().cast::(), bitmap.len(), ); - raw::save_unsigned(rdb, bloom.number_of_bits()); + raw::save_unsigned(rdb, bloom.len()); raw::save_unsigned(rdb, bloom.number_of_hash_functions() as u64); raw::save_unsigned(rdb, filter.capacity as u64); if filter_list_iter.peek().is_none() { diff --git a/tests/test_bloom_metrics.py b/tests/test_bloom_metrics.py index 29eb4e6..3df634c 100644 --- a/tests/test_bloom_metrics.py +++ b/tests/test_bloom_metrics.py @@ -4,7 +4,7 @@ from valkeytests.conftest import resource_port_tracker from util.waiters import * -DEFAULT_BLOOM_FILTER_SIZE = 179960 +DEFAULT_BLOOM_FILTER_SIZE = 179952 DEFAULT_BLOOM_FILTER_CAPACITY = 100000 class TestBloomMetrics(ValkeyBloomTestCaseBase): From 7d09c3d3eca7f4678612d2b8ee4bb0bbe7e3a58a Mon Sep 17 00:00:00 2001 From: zackcam Date: Wed, 4 Dec 2024 00:02:57 +0000 Subject: [PATCH 3/6] Updating the digest changes to follow updated version of bloom. As well as removing unnecesary fields saved in rdb Signed-off-by: zackcam --- Cargo.toml | 2 +- src/bloom/data_type.rs | 31 ++++++++++-------------------- src/bloom/utils.rs | 36 ++++++++++++++--------------------- src/wrapper/bloom_callback.rs | 2 -- 4 files changed, 25 insertions(+), 46 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f29cf11..d205dbc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ homepage = "https://github.com/valkey-io/valkey-bloom" valkey-module = "0.1.2" valkey-module-macros = "0" linkme = "0" -bloomfilter = { version = "3", features = ["serde"] } +bloomfilter = { version = "3.0.1", features = ["serde"] } lazy_static = "1.4.0" libc = "0.2" serde = { version = "1.0", features = ["derive"] } diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 9fa86cd..cffc999 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -1,8 +1,5 @@ use crate::bloom::utils::BloomFilter; use crate::bloom::utils::BloomFilterType; -use crate::configs::{ - FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B, -}; use crate::metrics::BLOOM_NUM_OBJECTS; use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES; use crate::wrapper::bloom_callback; @@ -80,20 +77,20 @@ impl ValkeyDataType for BloomFilterType { let Ok(bitmap) = raw::load_string_buffer(rdb) else { return None; }; - let Ok(number_of_bits) = raw::load_unsigned(rdb) else { + let Ok(capacity) = raw::load_unsigned(rdb) else { return None; }; - // Reject RDB Load if any bloom filter within a bloom object of a size greater than what is allowed. - if !BloomFilter::validate_size_with_bits(number_of_bits) { + let new_fp_rate = match Self::calculate_fp_rate(fp_rate, num_filters as i32) { + Ok(rate) => rate, + Err(_) => { + logging::log_warning("ERR bloom object reached max number of filters"); + return None; + } + }; + if !BloomFilter::validate_size(capacity as u32, new_fp_rate) { logging::log_warning("Failed to restore bloom object because it contains a filter larger than the max allowed size limit."); return None; } - let Ok(number_of_hash_functions) = raw::load_unsigned(rdb) else { - return None; - }; - let Ok(capacity) = raw::load_unsigned(rdb) else { - return None; - }; // Only load num_items when it's the last filter let num_items = if i == num_filters - 1 { match raw::load_unsigned(rdb) { @@ -103,10 +100,6 @@ impl ValkeyDataType for BloomFilterType { } else { capacity }; - let sip_keys = [ - (FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B), - (FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B), - ]; let filter = BloomFilter::from_existing(bitmap.as_ref(), num_items as u32, capacity as u32); filters.push(filter); @@ -129,11 +122,7 @@ impl ValkeyDataType for BloomFilterType { dig.add_long_long(self.expansion.into()); dig.add_string_buffer(&self.fp_rate.to_le_bytes()); for filter in &self.filters { - dig.add_string_buffer(&filter.bloom.bitmap()); - for &(key1, key2) in &filter.sip_keys() { - dig.add_long_long(key1 as i64); - dig.add_long_long(key2 as i64); - } + dig.add_string_buffer(filter.bloom.as_slice()); dig.add_long_long(filter.num_items.into()); dig.add_long_long(filter.capacity.into()); } diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index a8a4401..f0e0bd6 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -189,11 +189,9 @@ impl BloomFilterType { } // Scale out by adding a new filter with capacity bounded within the u32 range. false positive rate is also // bound within the range f64::MIN_POSITIVE <= x < 1.0. - let new_fp_rate = match self.fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) { - x if x > f64::MIN_POSITIVE => x, - _ => { - return Err(BloomError::MaxNumScalingFilters); - } + let new_fp_rate = match Self::calculate_fp_rate(self.fp_rate, num_filters) { + Ok(rate) => rate, + Err(e) => return Err(e), }; let new_capacity = match filter.capacity.checked_mul(self.expansion) { Some(new_capacity) => new_capacity, @@ -232,6 +230,13 @@ impl BloomFilterType { } } + pub fn calculate_fp_rate(fp_rate: f64, num_filters: i32) -> Result { + match fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) { + x if x > f64::MIN_POSITIVE => Ok(x), + _ => Err(BloomError::MaxNumScalingFilters), + } + } + /// Deserialize a byte array to bloom filter. /// We will need to handle any current or previous version and deserializing the bytes into a bloom object of the running Module's current version `BLOOM_TYPE_VERSION`. pub fn decode_bloom_filter( @@ -333,7 +338,7 @@ impl BloomFilter { fp_rate, &configs::FIXED_SEED, ) - .unwrap(); + .expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"); let fltr = BloomFilter { bloom, num_items: 0, @@ -350,7 +355,9 @@ impl BloomFilter { /// Create a new BloomFilter from dumped information (RDB load). pub fn from_existing(bitmap: &[u8], num_items: u32, capacity: u32) -> BloomFilter { - let bloom = bloomfilter::Bloom::from_slice(bitmap).unwrap(); + let bloom = bloomfilter::Bloom::from_slice(bitmap) + .expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"); + let fltr = BloomFilter { bloom, num_items, @@ -383,17 +390,6 @@ impl BloomFilter { true } - /// Caculates the number of bytes that the bloom filter will require to be allocated using provided `number_of_bits`. - /// This is used before actually creating the bloom filter when checking if the filter is within the allowed size. - /// Returns whether the bloom filter is of a valid size or not. - pub fn validate_size_with_bits(number_of_bits: u64) -> bool { - let bytes = std::mem::size_of::() as u64 + number_of_bits; - if bytes > configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) as u64 { - return false; - } - true - } - pub fn check(&self, item: &[u8]) -> bool { self.bloom.check(item) } @@ -402,10 +398,6 @@ impl BloomFilter { self.bloom.set(item) } - pub fn sip_keys(&self) -> [(u64, u64); 2] { - self.bloom.sip_keys() - } - /// Create a new BloomFilter from an existing BloomFilter object (COPY command). pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter { BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity) diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index 73164e7..da9e21d 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -31,8 +31,6 @@ pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mu bitmap.as_ptr().cast::(), bitmap.len(), ); - raw::save_unsigned(rdb, bloom.len()); - raw::save_unsigned(rdb, bloom.number_of_hash_functions() as u64); raw::save_unsigned(rdb, filter.capacity as u64); if filter_list_iter.peek().is_none() { raw::save_unsigned(rdb, filter.num_items as u64); From 5f364cffe9889a470e32b405b84cd71ea9c8f257 Mon Sep 17 00:00:00 2001 From: KarthikSubbarao Date: Tue, 3 Dec 2024 21:22:37 -0800 Subject: [PATCH 4/6] Update log in src/bloom/data_type.rs Signed-off-by: KarthikSubbarao --- src/bloom/data_type.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index cffc999..1fef679 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -83,7 +83,7 @@ impl ValkeyDataType for BloomFilterType { let new_fp_rate = match Self::calculate_fp_rate(fp_rate, num_filters as i32) { Ok(rate) => rate, Err(_) => { - logging::log_warning("ERR bloom object reached max number of filters"); + logging::log_warning("Failed to restore bloom object: Reached max number of filters"); return None; } }; From 7cb709856c8b66744bac6faf5a7ea1ca85b4fc26 Mon Sep 17 00:00:00 2001 From: KarthikSubbarao Date: Tue, 3 Dec 2024 21:25:07 -0800 Subject: [PATCH 5/6] Update comment in src/bloom/utils.rs Signed-off-by: KarthikSubbarao --- src/bloom/utils.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index f0e0bd6..f066d05 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -230,6 +230,7 @@ impl BloomFilterType { } } + /// Calculate the false positive rate for the Nth filter using tightening ratio. pub fn calculate_fp_rate(fp_rate: f64, num_filters: i32) -> Result { match fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) { x if x > f64::MIN_POSITIVE => Ok(x), From 976b0823c3bdcc3061385f9635f7bf0ac9fe84c3 Mon Sep 17 00:00:00 2001 From: KarthikSubbarao Date: Tue, 3 Dec 2024 21:28:41 -0800 Subject: [PATCH 6/6] Clippy error in src/bloom/data_type.rs Signed-off-by: KarthikSubbarao --- src/bloom/data_type.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 1fef679..c8af83a 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -83,7 +83,9 @@ impl ValkeyDataType for BloomFilterType { let new_fp_rate = match Self::calculate_fp_rate(fp_rate, num_filters as i32) { Ok(rate) => rate, Err(_) => { - logging::log_warning("Failed to restore bloom object: Reached max number of filters"); + logging::log_warning( + "Failed to restore bloom object: Reached max number of filters", + ); return None; } };