From c7a3aae777a5712eb30f36cccb873f40a5dd4da0 Mon Sep 17 00:00:00 2001 From: Karthik Subbarao Date: Wed, 2 Oct 2024 00:15:00 +0000 Subject: [PATCH] Add Integration Testing for correctness of scaling and non scaling filters, and maxmemory, memory usage, type, encoding, etc --- src/bloom/utils.rs | 67 +++---- tests/test_basic.py | 34 ++++ tests/test_correctness.py | 253 ++++++++++++++++++++++++++ tests/test_replication.py | 20 ++ tests/valkeytests/valkey_test_case.py | 2 +- 5 files changed, 334 insertions(+), 42 deletions(-) create mode 100644 tests/test_correctness.py diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 4e35bdc..dd43576 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -254,14 +254,17 @@ mod tests { ) -> (i64, i64) { let mut new_item_idx = starting_item_idx; let mut fp_count = 0; - while bf.cardinality() < capacity_needed { + let mut cardinality = bf.cardinality(); + while cardinality < capacity_needed { let item = format!("{}{}", rand_prefix, new_item_idx); let result = bf.add_item(item.as_bytes()); match result { Ok(0) => { fp_count += 1; } - Ok(1) => {} + Ok(1) => { + cardinality += 1; + } Ok(i64::MIN..=-1_i64) | Ok(2_i64..=i64::MAX) => { panic!("We do not expect add_item to return any Integer other than 0 or 1.") } @@ -271,22 +274,21 @@ mod tests { }; new_item_idx += 1; } - (fp_count, new_item_idx) + (fp_count, new_item_idx - 1) } /// Loops from the start index till the end index and uses the exists operation on the provided bloom filter. /// The item name used in exists operations is rand_prefix + the index (based on the iteration). /// The results are matched against the `expected_result` and an error_count tracks the wrong results. /// Asserts that the error_count is within the expected false positive (+ margin) rate. - fn item_exists_test( + /// Returns the error count and number of operations performed. + fn check_items_exist( bf: &BloomFilterType, start_idx: i64, end_idx: i64, - expected_fp_rate: f32, - fp_margin: f32, expected_result: bool, rand_prefix: &String, - ) { + ) -> (i64, i64) { let mut error_count = 0; for i in start_idx..=end_idx { let item = format!("{}{}", rand_prefix, i); @@ -296,8 +298,7 @@ mod tests { } } let num_operations = (end_idx - start_idx) + 1; - // Validate that the real fp_rate is not much more than the configured fp_rate. - fp_assert(error_count, num_operations, expected_fp_rate, fp_margin); + (error_count, num_operations) } fn fp_assert(error_count: i64, num_operations: i64, expected_fp_rate: f32, fp_margin: f32) { @@ -364,24 +365,22 @@ mod tests { .filters .iter() .any(|filter| filter.bloom.bitmap() == restore_filter.bloom.bitmap()))); - item_exists_test( + let (error_count, _) = check_items_exist( restored_bloom_filter_type, 1, add_operation_idx, - expected_fp_rate, - fp_margin, true, rand_prefix, ); - item_exists_test( + assert!(error_count == 0); + let (error_count, num_operations) = check_items_exist( restored_bloom_filter_type, add_operation_idx + 1, add_operation_idx * 2, - expected_fp_rate, - fp_margin, false, rand_prefix, ); + fp_assert(error_count, num_operations, expected_fp_rate, fp_margin); } #[test] @@ -410,25 +409,18 @@ mod tests { fp_assert(error_count, add_operation_idx, expected_fp_rate, fp_margin); // Validate item "exists" operations on bloom filters are ensuring correctness. // This tests for items already added to the filter and expects them to exist. 
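+        // Bloom filters cannot produce false negatives, so the error count from these checks must be 0.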
- item_exists_test( - &bf, - 1, - add_operation_idx, - expected_fp_rate, - fp_margin, - true, - &rand_prefix, - ); + let (error_count, _) = check_items_exist(&bf, 1, add_operation_idx, true, &rand_prefix); + assert!(error_count == 0); // This tests for items which are not added to the filter and expects them to not exist. - item_exists_test( + let (error_count, num_operations) = check_items_exist( &bf, add_operation_idx + 1, add_operation_idx * 2, - expected_fp_rate, - fp_margin, false, &rand_prefix, ); + // Validate that the real fp_rate is not much more than the configured fp_rate. + fp_assert(error_count, num_operations, expected_fp_rate, fp_margin); // Verify restore let mut restore_bf = BloomFilterType::create_copy_from(&bf); @@ -458,14 +450,14 @@ mod tests { assert_eq!(bf.capacity(), initial_capacity as i64); assert_eq!(bf.cardinality(), 0); let mut total_error_count = 0; - let mut add_operation_idx = 1; + let mut add_operation_idx = 0; // Validate the scaling behavior of the bloom filter. for filter_idx in 1..=num_filters_to_scale { let expected_total_capacity = initial_capacity * (expansion.pow(filter_idx) - 1); let (error_count, new_add_operation_idx) = add_items_till_capacity( &mut bf, expected_total_capacity as i64, - add_operation_idx, + add_operation_idx + 1, &rand_prefix, ); add_operation_idx = new_add_operation_idx; @@ -486,25 +478,18 @@ mod tests { ); // Validate item "exists" operations on bloom filters are ensuring correctness. // This tests for items already added to the filter and expects them to exist. - item_exists_test( - &bf, - 1, - add_operation_idx, - expected_fp_rate, - fp_margin, - true, - &rand_prefix, - ); + let (error_count, _) = check_items_exist(&bf, 1, add_operation_idx, true, &rand_prefix); + assert!(error_count == 0); // This tests for items which are not added to the filter and expects them to not exist. - item_exists_test( + let (error_count, num_operations) = check_items_exist( &bf, add_operation_idx + 1, add_operation_idx * 2, - expected_fp_rate, - fp_margin, false, &rand_prefix, ); + // Validate that the real fp_rate is not much more than the configured fp_rate. + fp_assert(error_count, num_operations, expected_fp_rate, fp_margin); // Verify restore let restore_bloom_filter_type = BloomFilterType::create_copy_from(&bf); diff --git a/tests/test_basic.py b/tests/test_basic.py index 882ef90..e0b4608 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -16,6 +16,7 @@ def get_custom_args(self): def test_basic(self): client = self.server.get_new_client() + # Validate that the valkey-bloom module is loaded. module_list_data = client.execute_command('MODULE LIST') module_list_count = len(module_list_data) assert module_list_count == 1 @@ -25,9 +26,42 @@ def test_basic(self): module_loaded = True break assert(module_loaded) + # Validate that all the BF.* commands are supported on the server. + command_cmd_result = client.execute_command('COMMAND') + bf_cmds = ["BF.ADD", "BF.EXISTS", "BF.MADD", "BF.MEXISTS", "BF.INFO", "BF.CARD", "BF.RESERVE", "BF.INSERT"] + assert all(item in command_cmd_result for item in bf_cmds) + # Basic bloom filter create, item add and item exists validation. 
         bf_add_result = client.execute_command('BF.ADD filter1 item1')
         assert bf_add_result == 1
         bf_exists_result = client.execute_command('BF.EXISTS filter1 item1')
         assert bf_exists_result == 1
         bf_exists_result = client.execute_command('BF.EXISTS filter1 item2')
         assert bf_exists_result == 0
+
+    def test_copy_and_exists_cmd(self):
+        client = self.server.get_new_client()
+        madd_result = client.execute_command('BF.MADD filter item1 item2 item3 item4')
+        assert client.execute_command('EXISTS filter') == 1
+        mexists_result = client.execute_command('BF.MEXISTS filter item1 item2 item3 item4')
+        assert len(madd_result) == 4 and len(mexists_result) == 4
+        assert client.execute_command('COPY filter new_filter') == 1
+        assert client.execute_command('EXISTS new_filter') == 1
+        copy_mexists_result = client.execute_command('BF.MEXISTS new_filter item1 item2 item3 item4')
+        assert mexists_result == copy_mexists_result
+
+    def test_memory_usage_cmd(self):
+        client = self.server.get_new_client()
+        assert client.execute_command('BF.ADD filter item1') == 1
+        memory_usage = client.execute_command('MEMORY USAGE filter')
+        info_size = client.execute_command('BF.INFO filter SIZE')
+        assert memory_usage > info_size and info_size > 0 # MEMORY USAGE exceeds BF.INFO SIZE by ~32 bytes of key overhead.
+
+    def test_module_data_type(self):
+        # Validate the name of the Module data type.
+        client = self.server.get_new_client()
+        assert client.execute_command('BF.ADD filter item1') == 1
+        type_result = client.execute_command('TYPE filter')
+        assert type_result == b"bloomfltr"
+        # Validate the encoding of the Module data type.
+        encoding_result = client.execute_command('OBJECT ENCODING filter')
+        assert encoding_result == b"raw"
diff --git a/tests/test_correctness.py b/tests/test_correctness.py
new file mode 100644
index 0000000..29b749a
--- /dev/null
+++ b/tests/test_correctness.py
@@ -0,0 +1,253 @@
+import time
+import pytest
+from valkey import ResponseError
+from valkeytests.valkey_test_case import ValkeyTestCase
+from valkeytests.conftest import resource_port_tracker
+import logging
+import os
+import random
+import string
+
+def generate_random_string(length=7):
+    characters = string.ascii_letters + string.digits
+    random_string = ''.join(random.choice(characters) for _ in range(length))
+    return random_string
+
+def add_items_till_capacity(client, filter_name, capacity_needed, starting_item_idx, rand_prefix, batch_size=1000):
+    new_item_idx = starting_item_idx
+    fp_count = 0
+    cardinality = client.execute_command(f'BF.CARD {filter_name}')
+    while cardinality < capacity_needed:
+        # Calculate how many more items we need to add.
+        remaining_capacity = capacity_needed - cardinality
+        batch_to_add = min(batch_size, remaining_capacity)
+        # Prepare a batch of items.
+        items = [f"{rand_prefix}{new_item_idx + i}" for i in range(batch_to_add)]
+        new_item_idx += batch_to_add
+        result = client.execute_command(f'BF.MADD {filter_name} ' + ' '.join(items))
+        # Process the results of the batched add.
+        for res in result:
+            if res == 0:
+                fp_count += 1
+            elif res == 1:
+                cardinality += 1
+            else:
+                raise RuntimeError(f"Unexpected return value from BF.MADD: {res}")
+    return fp_count, new_item_idx - 1
+
+def check_items_exist(client, filter_name, start_idx, end_idx, expected_result, rand_prefix, batch_size=1000):
+    error_count = 0
+    num_operations = (end_idx - start_idx) + 1
+    # Check that items exist in batches.
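+    # BF.MEXISTS returns one integer (1 or 0) per item provided, so checking items in batches keeps the number of round trips low.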
+    for batch_start in range(start_idx, end_idx + 1, batch_size):
+        batch_end = min(batch_start + batch_size - 1, end_idx)
+        # Execute BF.MEXISTS with the batch of items.
+        items = [f"{rand_prefix}{i}" for i in range(batch_start, batch_end + 1)]
+        result = client.execute_command(f'BF.MEXISTS {filter_name} ' + ' '.join(items))
+        # Count results that do not match the expected presence.
+        for item_result in result:
+            if item_result != expected_result:
+                error_count += 1
+    return error_count, num_operations
+
+def fp_assert(error_count, num_operations, expected_fp_rate, fp_margin):
+    real_fp_rate = error_count / num_operations
+    fp_rate_with_margin = expected_fp_rate + fp_margin
+
+    assert real_fp_rate < fp_rate_with_margin, f"The actual fp_rate, {real_fp_rate}, is greater than the configured fp_rate with margin, {fp_rate_with_margin}."
+
+class TestBloomCorrectness(ValkeyTestCase):
+
+    def get_custom_args(self):
+        self.set_server_version(os.environ['SERVER_VERSION'])
+        return {
+            'loadmodule': os.getenv('MODULE_PATH'),
+        }
+
+    def test_non_scaling_filter(self):
+        client = self.server.get_new_client()
+        item_prefix = generate_random_string()
+        # 1 in every 1000 operations is expected to be a false positive.
+        expected_fp_rate = 0.001
+        capacity = 10000
+        # Create a non-scaling bloom filter and validate its behavior.
+        filter_name = "filter1"
+        assert client.execute_command(f'BF.RESERVE {filter_name} {expected_fp_rate} {capacity} NONSCALING') == b"OK"
+        # Add items and fill the filter to capacity.
+        error_count, add_operation_idx = add_items_till_capacity(client, filter_name, capacity, 1, item_prefix)
+        with pytest.raises(Exception, match="non scaling filter is full"):
+            client.execute_command(f'BF.ADD {filter_name} new_item')
+        # Validate that it is filled to capacity.
+        info = client.execute_command(f'BF.INFO {filter_name}')
+        it = iter(info)
+        info_dict = dict(zip(it, it))
+        assert info_dict[b'Capacity'] == capacity
+        assert info_dict[b'Number of items inserted'] == capacity
+        assert info_dict[b'Number of filters'] == 1
+        assert info_dict[b'Size'] > 0
+        assert info_dict[b'Expansion rate'] == -1
+        # Use a margin on the expected_fp_rate when asserting for correctness.
+        fp_margin = 0.002
+        # Validate that item "add" operations on bloom filters are ensuring correctness.
+        # False positives should be close to the configured fp_rate.
+        fp_assert(error_count, add_operation_idx, expected_fp_rate, fp_margin)
+        # Validate item "exists" operations on bloom filters are ensuring correctness.
+        # This tests for items already added to the filter and expects them to exist.
+        # False negatives should not be possible.
+        error_count, num_operations = check_items_exist(
+            client,
+            filter_name,
+            1,
+            add_operation_idx,
+            True,
+            item_prefix,
+        )
+        assert error_count == 0
+        # This tests for items which are not added to the filter and expects them to not exist.
+        # False positives should be close to the configured fp_rate.
+        error_count, num_operations = check_items_exist(
+            client,
+            filter_name,
+            add_operation_idx + 1,
+            add_operation_idx * 2,
+            False,
+            item_prefix,
+        )
+        fp_assert(error_count, num_operations, expected_fp_rate, fp_margin)
+        # Create a copy of the bloom filter.
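+        # COPY duplicates the bloom object under the new key, so every BF.INFO field on the copy is expected to match the original.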
+        copy_filter_name = "filter_copy"
+        assert client.execute_command(f'COPY {filter_name} {copy_filter_name}') == 1
+        assert client.execute_command('DBSIZE') == 2
+        copy_info = client.execute_command(f'BF.INFO {copy_filter_name}')
+        copy_it = iter(copy_info)
+        copy_info_dict = dict(zip(copy_it, copy_it))
+        assert copy_info_dict[b'Capacity'] == info_dict[b'Capacity']
+        assert copy_info_dict[b'Number of items inserted'] == info_dict[b'Number of items inserted']
+        assert copy_info_dict[b'Number of filters'] == info_dict[b'Number of filters']
+        assert copy_info_dict[b'Size'] == info_dict[b'Size']
+        assert copy_info_dict[b'Expansion rate'] == info_dict[b'Expansion rate']
+        # Items added to the original filter should still exist on the copy. False Negatives are not possible.
+        error_count, num_operations = check_items_exist(
+            client,
+            copy_filter_name,
+            1,
+            add_operation_idx,
+            True,
+            item_prefix,
+        )
+        assert error_count == 0
+        # Items not added to the original filter should not exist on the copy. False Positives should be close to configured fp_rate.
+        error_count, num_operations = check_items_exist(
+            client,
+            copy_filter_name,
+            add_operation_idx + 1,
+            add_operation_idx * 2,
+            False,
+            item_prefix,
+        )
+        fp_assert(error_count, num_operations, expected_fp_rate, fp_margin)
+
+    def test_scaling_filter(self):
+        client = self.server.get_new_client()
+        item_prefix = generate_random_string()
+        expected_fp_rate = 0.001
+        initial_capacity = 10000
+        expansion = 2
+        num_filters_to_scale = 5
+        filter_name = "filter1"
+        # Create a scaling bloom filter and validate its behavior.
+        assert client.execute_command(f'BF.RESERVE {filter_name} {expected_fp_rate} {initial_capacity} EXPANSION {expansion}') == b"OK"
+
+        info = client.execute_command(f'BF.INFO {filter_name}')
+        it = iter(info)
+        info_dict = dict(zip(it, it))
+        assert info_dict[b'Capacity'] == initial_capacity
+        assert info_dict[b'Number of items inserted'] == 0
+        assert info_dict[b'Number of filters'] == 1
+        assert info_dict[b'Size'] > 0
+        assert info_dict[b'Expansion rate'] == expansion
+
+        # Scale out by adding items.
+        total_error_count = 0
+        add_operation_idx = 0
+        for filter_idx in range(1, num_filters_to_scale + 1):
+            expected_total_capacity = initial_capacity * ((expansion ** filter_idx) - 1)
+            error_count, new_add_operation_idx = add_items_till_capacity(client, filter_name, expected_total_capacity, add_operation_idx + 1, item_prefix)
+            add_operation_idx = new_add_operation_idx
+            total_error_count += error_count
+            # Validate from BF.INFO that the filter is scaling correctly.
+            info = client.execute_command(f'BF.INFO {filter_name}')
+            it = iter(info)
+            info_dict = dict(zip(it, it))
+            assert info_dict[b'Capacity'] == expected_total_capacity
+            assert info_dict[b'Number of items inserted'] == expected_total_capacity
+            assert info_dict[b'Number of filters'] == filter_idx
+            assert info_dict[b'Size'] > 0
+            assert info_dict[b'Expansion rate'] == expansion
+
+        # Use a margin on the expected_fp_rate when asserting for correctness.
+        fp_margin = 0.002
+        # Validate that item "add" operations on bloom filters are ensuring correctness.
+        # False positives should be close to the configured fp_rate.
+        fp_assert(total_error_count, add_operation_idx, expected_fp_rate, fp_margin)
+        # Validate item "exists" operations on bloom filters are ensuring correctness.
+        # This tests for items already added to the filter and expects them to exist.
+        # False negatives should not be possible.
+ error_count, num_operations = check_items_exist( + client, + filter_name, + 1, + add_operation_idx, + True, + item_prefix, + ) + assert error_count == 0 + # This tests for items which are not added to the filter and expects them to not exist. + # False positives should be close to the configured fp_rate. + error_count, num_operations = check_items_exist( + client, + filter_name, + add_operation_idx + 1, + add_operation_idx * 2, + False, + item_prefix, + ) + fp_assert(error_count, num_operations, expected_fp_rate, fp_margin) + + # Track INFO on the scaled out bloom filter. + info = client.execute_command(f'BF.INFO {filter_name}') + it = iter(info) + info_dict = dict(zip(it, it)) + + # Create a copy of the scaled out bloom filter. + copy_filter_name = "filter_copy" + assert client.execute_command(f'COPY {filter_name} {copy_filter_name}') == 1 + assert client.execute_command('DBSIZE') == 2 + copy_info = client.execute_command(f'BF.INFO {copy_filter_name}') + copy_it = iter(copy_info) + copy_info_dict = dict(zip(copy_it, copy_it)) + assert copy_info_dict[b'Capacity'] == info_dict[b'Capacity'] + assert copy_info_dict[b'Number of items inserted'] == info_dict[b'Number of items inserted'] + assert copy_info_dict[b'Number of filters'] == info_dict[b'Number of filters'] + assert copy_info_dict[b'Size'] == info_dict[b'Size'] + assert copy_info_dict[b'Expansion rate'] == info_dict[b'Expansion rate'] + # Items added to the original filter should still exist on the copy. False Negatives are not possible. + error_count, num_operations = check_items_exist( + client, + copy_filter_name, + 1, + add_operation_idx, + True, + item_prefix, + ) + assert error_count == 0 + # Items not added to the original filter should not exist on the copy. False Positives should be close to configured fp_rate. 
+ error_count, num_operations = check_items_exist( + client, + copy_filter_name, + add_operation_idx + 1, + add_operation_idx * 2, + False, + item_prefix, + ) + fp_assert(error_count, num_operations, expected_fp_rate, fp_margin) diff --git a/tests/test_replication.py b/tests/test_replication.py index 5f11a5c..b91d6df 100644 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -12,6 +12,7 @@ def get_custom_args(self): } def test_replication_success(self): + self.setup_replication(num_replicas=1) bf_add_result = self.client.execute_command('BF.ADD key item1') assert bf_add_result == 1 bf_exists_result = self.client.execute_command('BF.EXISTS key item1') @@ -25,3 +26,22 @@ def test_replication_success(self): assert bf_non_added_exists_result == bf_replica_non_added_exists_result bf_replica_info_result = self.replicas[0].client.execute_command('BF.INFO key') assert bf_info_result == bf_replica_info_result + + def test_replication_behavior(self): + self.setup_replication(num_replicas=1) + bf_add_result = self.client.execute_command('BF.ADD key item1') + bf_exists_result = self.client.execute_command('BF.EXISTS key item1') + assert bf_add_result == 1 + self.waitForReplicaToSyncUp(self.replicas[0]) + bf_replica_exists_result = self.replicas[0].client.execute_command('BF.EXISTS key item1') + assert bf_exists_result == bf_replica_exists_result + + add_cmd_stats = self.client.info("Commandstats")['cmdstat_BF.ADD'] + replica_add_cmd_stats = self.replicas[0].client.info("Commandstats")['cmdstat_BF.ADD'] + assert add_cmd_stats["calls"] == 1 and add_cmd_stats["calls"] == replica_add_cmd_stats["calls"] + + assert self.client.execute_command('BF.ADD key item1') == 0 + add_cmd_stats = self.client.info("Commandstats")['cmdstat_BF.ADD'] + replica_add_cmd_stats = self.replicas[0].client.info("Commandstats")['cmdstat_BF.ADD'] + assert add_cmd_stats["calls"] == 2 and replica_add_cmd_stats["calls"] == 1 + diff --git a/tests/valkeytests/valkey_test_case.py b/tests/valkeytests/valkey_test_case.py index 883106c..65126c8 100644 --- a/tests/valkeytests/valkey_test_case.py +++ b/tests/valkeytests/valkey_test_case.py @@ -591,7 +591,7 @@ def create_client_for_dbs(self, num_dbs): class ReplicationTestCase(ValkeyTestCase): num_replicas = 1 - def setup(self, num_replicas = num_replicas): + def setup_replication(self, num_replicas = num_replicas): super(ReplicationTestCase, self).setup() self.create_replicas(num_replicas) self.start_replicas()
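
A note on the scaled-capacity arithmetic asserted by both the Rust unit test and test_scaling_filter: with an expansion factor of 2, the expected total capacity after k sub-filters is initial_capacity * (2^k - 1), which is the geometric sum of per-sub-filter capacities c, 2c, 4c, and so on. A minimal standalone sketch of that identity (plain Python, no server required; the helper name is illustrative and not part of the test suite):

    def total_capacity(initial_capacity, expansion, num_filters):
        # Sum of the per-sub-filter capacities: c, c*r, c*r^2, ...
        return sum(initial_capacity * expansion**i for i in range(num_filters))

    # With expansion=2 the geometric sum collapses to initial_capacity * (2**k - 1),
    # the formula used by the tests above.
    for k in range(1, 6):
        assert total_capacity(10000, 2, k) == 10000 * (2**k - 1)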
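
A note on the BF.INFO parsing idiom used throughout test_correctness.py: as the tests rely on, BF.INFO replies with a flat array of alternating field names and values, and dict(zip(it, it)) pairs them up because both arguments to zip() pull from the same iterator. A small illustration with a hard-coded reply (the values are made up for the example):

    reply = [b'Capacity', 10000, b'Number of items inserted', 0, b'Number of filters', 1]
    it = iter(reply)
    info_dict = dict(zip(it, it))
    assert info_dict[b'Capacity'] == 10000 and info_dict[b'Number of filters'] == 1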