
Commit

Updating how we create BloomFilter from rdb loads. The BloomFilter vec now has the capacity of the filter we are loading from.

Signed-off-by: zackcam <[email protected]>
zackcam committed Nov 22, 2024
1 parent 9d61cad commit cbbefa2
Showing 6 changed files with 78 additions and 33 deletions.
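The core of the change: the filters vector is now created with Vec::with_capacity, using the filter count that was already read from the RDB payload, instead of Vec::new, so the load path does not reallocate as each filter is pushed. A minimal standalone sketch of the difference (illustrative only, not the module code):

```rust
// Illustrative only; not the module code. Shows why the load path now uses
// Vec::with_capacity: the element count is known before the filters are read,
// so the vector can be sized once instead of growing as items are pushed.
fn main() {
    let num_filters: u64 = 5; // in the module this count is read from the RDB payload

    // Before: starts empty and may reallocate several times while pushing.
    let mut grown: Vec<u64> = Vec::new();

    // After: one allocation sized for everything that will be pushed.
    let mut preallocated: Vec<u64> = Vec::with_capacity(num_filters as usize);

    for i in 0..num_filters {
        grown.push(i);
        preallocated.push(i);
    }

    assert_eq!(grown, preallocated);
    assert!(preallocated.capacity() >= num_filters as usize);
}
```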
2 changes: 1 addition & 1 deletion src/bloom/data_type.rs
@@ -59,7 +59,6 @@ pub trait ValkeyDataType {
impl ValkeyDataType for BloomFilterType {
/// Callback to load and parse RDB data of a bloom item and create it.
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType> {
let mut filters = Vec::new();
if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
return None;
@@ -73,6 +72,7 @@ impl ValkeyDataType for BloomFilterType {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
23 changes: 22 additions & 1 deletion src/bloom/utils.rs
@@ -104,7 +104,7 @@ impl BloomFilterType {

/// Create a new BloomFilterType object from an existing one.
pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType {
let mut filters = Vec::new();
let mut filters: Vec<BloomFilter> = Vec::with_capacity(from_bf.filters.len());
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
@@ -175,6 +175,8 @@ impl BloomFilterType {
// Add item.
filter.set(item);
filter.num_items += 1;
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Ok(1);
}
// Non Scaling Filters that are filled to capacity cannot handle more inserts.
@@ -208,6 +210,9 @@ impl BloomFilterType {
new_filter.set(item);
new_filter.num_items += 1;
self.filters.push(new_filter);

metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Ok(1);
}
Ok(0)
@@ -289,6 +294,12 @@ impl BloomFilterType {
filter.number_of_bytes(),
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS.fetch_add(
filter.num_items.into(),
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_add(filter.capacity.into(), std::sync::atomic::Ordering::Relaxed);
}

Ok(item)
@@ -329,6 +340,8 @@ impl BloomFilter {
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES
.fetch_add(fltr.number_of_bytes(), std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_add(capacity.into(), std::sync::atomic::Ordering::Relaxed);
fltr
}

@@ -356,6 +369,10 @@ impl BloomFilter {
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES
.fetch_add(fltr.number_of_bytes(), std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_add(num_items.into(), std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_add(capacity.into(), std::sync::atomic::Ordering::Relaxed);
fltr
}

@@ -422,6 +439,10 @@ impl Drop for BloomFilter {
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES
.fetch_sub(self.number_of_bytes(), std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_sub(self.num_items.into(), std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_sub(self.capacity.into(), std::sync::atomic::Ordering::Relaxed);
}
}
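
These hunks pair every fetch_add done at creation and insert time with a matching fetch_sub in Drop, so the aggregate counters reported under INFO stay consistent as keys are created, copied, and deleted. A minimal standalone sketch of that accounting pattern, with hypothetical names (not the module's actual types):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical names; the module's real counters live in src/metrics.rs.
static ITEMS_ACROSS_OBJECTS: AtomicU64 = AtomicU64::new(0);
static CAPACITY_ACROSS_OBJECTS: AtomicU64 = AtomicU64::new(0);

struct Filter {
    num_items: u64,
    capacity: u64,
}

impl Filter {
    fn new(capacity: u64) -> Self {
        // Creation accounts for the object's capacity up front.
        CAPACITY_ACROSS_OBJECTS.fetch_add(capacity, Ordering::Relaxed);
        Filter { num_items: 0, capacity }
    }

    fn add_item(&mut self) {
        // Each successful insert bumps the per-object and aggregate item counts.
        self.num_items += 1;
        ITEMS_ACROSS_OBJECTS.fetch_add(1, Ordering::Relaxed);
    }
}

impl Drop for Filter {
    fn drop(&mut self) {
        // Drop subtracts exactly what creation and inserts added,
        // so the aggregates return to their prior values when a key is deleted.
        ITEMS_ACROSS_OBJECTS.fetch_sub(self.num_items, Ordering::Relaxed);
        CAPACITY_ACROSS_OBJECTS.fetch_sub(self.capacity, Ordering::Relaxed);
    }
}
```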

14 changes: 14 additions & 0 deletions src/metrics.rs
@@ -6,6 +6,8 @@ lazy_static! {
pub static ref BLOOM_NUM_OBJECTS: AtomicU64 = AtomicU64::new(0);
pub static ref BLOOM_OBJECT_TOTAL_MEMORY_BYTES: AtomicUsize = AtomicUsize::new(0);
pub static ref BLOOM_NUM_FILTERS_ACROSS_OBJECTS: AtomicU64 = AtomicU64::new(0);
pub static ref BLOOM_NUM_ITEMS_ACROSS_OBJECTS: AtomicU64 = AtomicU64::new(0);
pub static ref BLOOM_CAPACITY_ACROSS_OBJECTS: AtomicU64 = AtomicU64::new(0);
}

pub fn bloom_info_handler(ctx: &InfoContext) -> ValkeyResult<()> {
@@ -27,6 +29,18 @@ pub fn bloom_info_handler(ctx: &InfoContext) -> ValkeyResult<()> {
.load(Ordering::Relaxed)
.to_string(),
)?
.field(
"bloom_num_items_across_objects",
BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.load(Ordering::Relaxed)
.to_string(),
)?
.field(
"bloom_capacity_across_objects",
BLOOM_CAPACITY_ACROSS_OBJECTS
.load(Ordering::Relaxed)
.to_string(),
)?
.build_section()?
.build_info()?;

5 changes: 3 additions & 2 deletions tests/test_aofrewrite.py
@@ -58,11 +58,12 @@ def test_aofrewrite_bloomfilter_metrics(self):

# Check info for scaled bloomfilter matches metrics data for bloomfilter
new_info_obj = self.client.execute_command(f'BF.INFO key1')
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), new_info_obj[3], 1, 2)

self.verify_bloom_metrics(self.client.execute_command("INFO bf"), new_info_obj[3], 1, 2, 7500, 21000)

# Check bloomfilter size has increased
assert new_info_obj[3] > info_obj[3]

# Delete the scaled bloomfilter to check both filters are deleted and metrics stats are set accordingly
self.client.execute_command('DEL key1')
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)
55 changes: 28 additions & 27 deletions tests/test_bloom_metrics.py
@@ -5,17 +5,18 @@
from util.waiters import *

DEFAULT_BLOOM_FILTER_SIZE = 179960
DEFAULT_BLOOM_FILTER_CAPACITY = 100000
class TestBloomMetrics(ValkeyBloomTestCaseBase):

def test_basic_command_metrics(self):
# Check that bloom metrics stats start at 0
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0, 0, 0)

# Create a default bloom filter and check its metrics values are correct
assert(self.client.execute_command('BF.ADD key item') == 1)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 1, DEFAULT_BLOOM_FILTER_CAPACITY)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 1, DEFAULT_BLOOM_FILTER_CAPACITY)

# Check that other commands don't influence metrics
assert(self.client.execute_command('BF.EXISTS key item') == 1)
@@ -26,55 +27,55 @@ def test_basic_command_metrics(self):
self.client.execute_command("BF.INFO key")
assert(self.client.execute_command('BF.INSERT key ITEMS item5 item6')== [1, 1])

self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 6, DEFAULT_BLOOM_FILTER_CAPACITY)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE, 1, 1, 6, DEFAULT_BLOOM_FILTER_CAPACITY)

# Create a new default bloom filter and check metrics again
assert(self.client.execute_command('BF.ADD key2 item') == 1)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE*2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE*2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE*2, 2, 2, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE*2, 2, 2, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2)

# Create a non default filter with BF.RESERVE and check its metrics are correct
assert(self.client.execute_command('BF.RESERVE key3 0.001 2917251') == b'OK')
info_obj = self.client.execute_command('BF.INFO key3')

# We want to check the size of the newly created bloom filter but metrics contains the size of all bloomfilters so we must minus the
# two default bloomfilters we already created
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE * 2, 3, 3)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE * 2, 3, 3)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE * 2, 3, 3, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2 + 2917251)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE * 2, 3, 3, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2 + 2917251)

# Delete a non default key and make sure the metrics stats are still correct
self.client.execute_command('DEL key3')
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 7, DEFAULT_BLOOM_FILTER_CAPACITY * 2)

# Create a default filter with BF.INSERT and check its metrics are correct
assert(self.client.execute_command('BF.INSERT key4 ITEMS item1 item2') == [1, 1])
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3, 9, DEFAULT_BLOOM_FILTER_CAPACITY * 3)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3, 9, DEFAULT_BLOOM_FILTER_CAPACITY * 3)

# Delete a default key and make sure the metrics are still correct
self.client.execute_command('UNLINK key')
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 3, DEFAULT_BLOOM_FILTER_CAPACITY * 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 3, DEFAULT_BLOOM_FILTER_CAPACITY * 2)

# Create a key then cause it to expire and check if metrics are updated correctly
assert self.client.execute_command('BF.ADD TEST_EXP ITEM') == 1
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3, 4, DEFAULT_BLOOM_FILTER_CAPACITY * 3)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 3, 3, 3, 4, DEFAULT_BLOOM_FILTER_CAPACITY * 3)
assert self.client.execute_command('TTL TEST_EXP') == -1
self.verify_bloom_filter_item_existence(self.client, 'TEST_EXP', 'ITEM')
curr_time = int(time.time())
assert self.client.execute_command(f'EXPIREAT TEST_EXP {curr_time + 5}') == 1
wait_for_equal(lambda: self.client.execute_command('BF.EXISTS TEST_EXP ITEM'), 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 3, DEFAULT_BLOOM_FILTER_CAPACITY * 2)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 3, DEFAULT_BLOOM_FILTER_CAPACITY * 2)

# Flush database so all keys should now be gone and metrics should all be at 0
self.client.execute_command('FLUSHDB')
wait_for_equal(lambda: self.client.execute_command('BF.EXISTS key2 item'), 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO Modules"), 0, 0, 0, 0, 0)

def test_scaled_bloomfilter_metrics(self):
self.client.execute_command('BF.RESERVE key1 0.001 7000')
@@ -89,14 +90,14 @@ def test_scaled_bloomfilter_metrics(self):

# Check info for scaled bloomfilter matches metrics data for bloomfilter
new_info_obj = self.client.execute_command(f'BF.INFO key1')
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), new_info_obj[3], 1, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), new_info_obj[3], 1, 2, 7500, 21000)

# Check bloomfilter size has increased
assert new_info_obj[3] > info_obj[3]

# Delete the scaled bloomfilter to check both filters are deleted and metrics stats are set accordingly
self.client.execute_command('DEL key1')
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)


def test_copy_metrics(self):
@@ -105,12 +106,12 @@ def test_copy_metrics(self):
assert(self.client.execute_command('COPY key{123} copiedkey{123}') == 1)

# Verify that the metrics were updated correctly after copying
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), DEFAULT_BLOOM_FILTER_SIZE * 2, 2, 2, 2, DEFAULT_BLOOM_FILTER_CAPACITY * 2)

# Perform a FLUSHALL which should set all metrics data to 0
self.client.execute_command('FLUSHALL')
wait_for_equal(lambda: self.client.execute_command('BF.EXISTS key{123} item'), 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0)
self.verify_bloom_metrics(self.client.execute_command("INFO bf"), 0, 0, 0, 0, 0)


def test_save_and_restore_metrics(self):
@@ -138,4 +139,4 @@ def test_save_and_restore_metrics(self):
for i in range(1, len(original_info_obj), 2):
assert original_info_obj[i] == restored_info_obj[i]

self.verify_bloom_metrics(self.client.execute_command("INFO bf"), original_info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE, 2, 3)
self.verify_bloom_metrics(new_client.execute_command("INFO bf"), original_info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE, 2, 3, 7501, 21000 + DEFAULT_BLOOM_FILTER_CAPACITY)
12 changes: 10 additions & 2 deletions tests/valkey_bloom_test_case.py
@@ -140,7 +140,7 @@ def validate_copied_bloom_correctness(self, client, original_filter_name, item_p
)
self.fp_assert(error_count, num_operations, expected_fp_rate, fp_margin)

def verify_bloom_metrics(self, info_response, expected_memory, expected_num_objects, expected_num_filters):
def verify_bloom_metrics(self, info_response, expected_memory, expected_num_objects, expected_num_filters, expected_num_items, expected_sum_capacity):
"""
Verify the metric values are recorded properly, the expected values are as below
expected_memory: the size of the memory used by the objects
@@ -152,14 +152,22 @@ def verify_bloom_metrics(self, info_response, expected_memory, expected_num_obje
total_memory_bites = -1
num_objects = -1
num_filters = -1
num_items = -1
sum_capacity = -1
for line in lines:
if line.startswith('bf_bloom_total_memory_bytes:'):
total_memory_bites = int(line.split(':')[1])
elif line.startswith('bf_bloom_num_objects:'):
num_objects = int(line.split(':')[1])
elif line.startswith('bf_bloom_num_filters_across_objects'):
num_filters = int(line.split(':')[1])
elif line.startswith('bf_bloom_num_items_across_objects'):
num_items = int(line.split(':')[1])
elif line.startswith('bf_bloom_capacity_across_objects'):
sum_capacity = int(line.split(':')[1])

assert total_memory_bites == expected_memory
assert num_objects == expected_num_objects
assert num_filters == expected_num_filters
assert num_filters == expected_num_filters
assert num_items == expected_num_items
assert sum_capacity == expected_sum_capacity
