Skip to content

Commit

Permalink
Merge branch 'unstable' into unstable
Browse files Browse the repository at this point in the history
Signed-off-by: KarthikSubbarao <[email protected]>
  • Loading branch information
KarthikSubbarao authored Oct 12, 2024
2 parents c7a3aae + fde4d37 commit b9cd93f
Show file tree
Hide file tree
Showing 10 changed files with 519 additions and 79 deletions.
27 changes: 20 additions & 7 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let mut nocreate = false;
while idx < argc {
match input_args[idx].to_string_lossy().to_uppercase().as_str() {
"ERROR" if idx < (argc - 1) => {
"ERROR" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
fp_rate = match input_args[idx].to_string_lossy().parse::<f32>() {
Ok(num) if num > BLOOM_FP_RATE_MIN && num < BLOOM_FP_RATE_MAX => num,
Expand All @@ -300,7 +303,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"CAPACITY" if idx < (argc - 1) => {
"CAPACITY" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
capacity = match input_args[idx].to_string_lossy().parse::<u32>() {
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => num,
Expand All @@ -318,7 +324,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
"NONSCALING" => {
expansion = 0;
}
"EXPANSION" if idx < (argc - 1) => {
"EXPANSION" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
expansion = match input_args[idx].to_string_lossy().parse::<u32>() {
Ok(num) if (BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&num) => num,
Expand All @@ -327,16 +336,20 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"ITEMS" if idx < (argc - 1) => {
"ITEMS" => {
idx += 1;
break;
}
_ => {
return Err(ValkeyError::WrongArity);
return Err(ValkeyError::Str(utils::UNKNOWN_ARGUMENT));
}
}
idx += 1;
}
if idx == argc {
// No ITEMS argument from the insert command
return Err(ValkeyError::WrongArity);
}
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Expand Down Expand Up @@ -399,7 +412,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
"ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())),
"EXPANSION" => {
if val.expansion == 0 {
return Ok(ValkeyValue::Integer(-1));
return Ok(ValkeyValue::Null);
}
Ok(ValkeyValue::Integer(val.expansion as i64))
}
Expand All @@ -419,7 +432,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
ValkeyValue::SimpleStringStatic("Expansion rate"),
];
if val.expansion == 0 {
result.push(ValkeyValue::Integer(-1));
result.push(ValkeyValue::Null);
} else {
result.push(ValkeyValue::Integer(val.expansion as i64));
}
Expand Down
107 changes: 55 additions & 52 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,67 +46,70 @@ pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
},
);

/// Callback to load and parse RDB data of a bloom item and create it.
pub fn bloom_rdb_load_data_object(
rdb: *mut raw::RedisModuleIO,
encver: i32,
) -> Option<BloomFilterType> {
if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
return None;
}
let Ok(num_filters) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(expansion) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(fp_rate) = raw::load_float(rdb) else {
return None;
};
let mut filters = Vec::new();
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
pub trait ValkeyDataType {
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType>;
}

impl ValkeyDataType for BloomFilterType {
/// Callback to load and parse RDB data of a bloom item and create it.
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType> {
let mut filters = Vec::new();
if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
return None;
};
let Ok(number_of_bits) = raw::load_unsigned(rdb) else {
}
let Ok(num_filters) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(number_of_hash_functions) = raw::load_unsigned(rdb) else {
let Ok(expansion) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(capacity) = raw::load_unsigned(rdb) else {
let Ok(fp_rate) = raw::load_float(rdb) else {
return None;
};
// Only load num_items when it's the last filter
let num_items = if i == num_filters - 1 {
match raw::load_unsigned(rdb) {
Ok(num_items) => num_items,
Err(_) => return None,
}
} else {
capacity
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
};
let Ok(number_of_bits) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(number_of_hash_functions) = raw::load_unsigned(rdb) else {
return None;
};
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};
// Only load num_items when it's the last filter
let num_items = if i == num_filters - 1 {
match raw::load_unsigned(rdb) {
Ok(num_items) => num_items,
Err(_) => return None,
}
} else {
capacity
};
let sip_keys = [
(FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B),
(FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B),
];
let filter = BloomFilter::from_existing(
bitmap.as_ref(),
number_of_bits,
number_of_hash_functions as u32,
sip_keys,
num_items as u32,
capacity as u32,
);
filters.push(filter);
}
let item = BloomFilterType {
expansion: expansion as u32,
fp_rate,
filters,
};
let sip_keys = [
(FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B),
(FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B),
];
let filter = BloomFilter::from_existing(
bitmap.as_ref(),
number_of_bits,
number_of_hash_functions as u32,
sip_keys,
num_items as u32,
capacity as u32,
);
filters.push(filter);
Some(item)
}
let item = BloomFilterType {
expansion: expansion as u32,
fp_rate,
filters,
};
Some(item)
}

/// Load the auxiliary data outside of the regular keyspace from the RDB file
Expand Down
3 changes: 2 additions & 1 deletion src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ pub const ERROR: &str = "ERROR";
pub const NON_SCALING_FILTER_FULL: &str = "ERR non scaling filter is full";
pub const NOT_FOUND: &str = "ERR not found";
pub const ITEM_EXISTS: &str = "ERR item exists";
pub const INVALID_INFO_VALUE: &str = "ERR Invalid information value";
pub const INVALID_INFO_VALUE: &str = "ERR invalid information value";
pub const BAD_EXPANSION: &str = "ERR bad expansion";
pub const BAD_CAPACITY: &str = "ERR bad capacity";
pub const BAD_ERROR_RATE: &str = "ERR bad error rate";
pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)";
pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)";
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR max number of scaling filters reached";
pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received";

#[derive(Debug, PartialEq)]
pub enum BloomError {
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ valkey_module! {
],
configurations: [
i64: [
["bloom_max_item_size", &*configs::BLOOM_CAPACITY, configs::BLOOM_CAPACITY_DEFAULT, configs::BLOOM_CAPACITY_MIN as i64, configs::BLOOM_CAPACITY_MAX as i64, ConfigurationFlags::DEFAULT, None],
["bloom_expansion_rate", &*configs::BLOOM_EXPANSION, configs::BLOOM_EXPANSION_DEFAULT, configs::BLOOM_EXPANSION_MIN as i64, configs::BLOOM_EXPANSION_MAX as i64, ConfigurationFlags::DEFAULT, None],
["bloom-max-item-size", &*configs::BLOOM_CAPACITY, configs::BLOOM_CAPACITY_DEFAULT, configs::BLOOM_CAPACITY_MIN as i64, configs::BLOOM_CAPACITY_MAX as i64, ConfigurationFlags::DEFAULT, None],
["bloom-expansion-rate", &*configs::BLOOM_EXPANSION, configs::BLOOM_EXPANSION_DEFAULT, configs::BLOOM_EXPANSION_MIN as i64, configs::BLOOM_EXPANSION_MAX as i64, ConfigurationFlags::DEFAULT, None],
],
string: [
],
Expand Down
3 changes: 2 additions & 1 deletion src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::bloom;
use crate::bloom::data_type::ValkeyDataType;
use crate::bloom::utils::BloomFilterType;
use std::os::raw::{c_char, c_int, c_void};
use std::ptr::null_mut;
Expand Down Expand Up @@ -39,7 +40,7 @@ pub unsafe extern "C" fn bloom_rdb_load(
rdb: *mut raw::RedisModuleIO,
encver: c_int,
) -> *mut c_void {
if let Some(item) = bloom::data_type::bloom_rdb_load_data_object(rdb, encver) {
if let Some(item) = <BloomFilterType as ValkeyDataType>::load_from_rdb(rdb, encver) {
let bb = Box::new(item);
Box::into_raw(bb).cast::<libc::c_void>()
} else {
Expand Down
106 changes: 98 additions & 8 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
import time
import pytest
from util.waiters import *
from valkey import ResponseError
from valkeytests.valkey_test_case import ValkeyTestCase
from valkey_bloom_test_case import ValkeyBloomTestCaseBase
from valkeytests.conftest import resource_port_tracker
import logging
import os

class TestBloomBasic(ValkeyTestCase):

def get_custom_args(self):
self.set_server_version(os.environ['SERVER_VERSION'])
return {
'loadmodule': os.getenv('MODULE_PATH'),
}
class TestBloomBasic(ValkeyBloomTestCaseBase):

def test_basic(self):
client = self.server.get_new_client()
Expand Down Expand Up @@ -65,3 +60,98 @@ def test_module_data_type(self):
# Validate the name of the Module data type.
encoding_result = client.execute_command('OBJECT ENCODING filter')
assert encoding_result == b"raw"

def test_bloom_modification(self):
client = self.server.get_new_client()
# check bloom filter with basic valkey command
# cmd touch
assert client.execute_command('BF.ADD key1 val1') == 1
assert client.execute_command('BF.ADD key2 val2') == 1
assert client.execute_command('TOUCH key1 key2') == 2
assert client.execute_command('TOUCH key3') == 0
self.verify_key_number(client, 2)

def test_bloom_transaction(self):
client = self.server.get_new_client()
# cmd multi, exec
assert client.execute_command('MULTI') == b'OK'
assert client.execute_command('BF.ADD M1 V1') == b'QUEUED'
assert client.execute_command('BF.ADD M2 V2') == b'QUEUED'
assert client.execute_command('BF.EXISTS M1 V1') == b'QUEUED'
assert client.execute_command('DEL M1') == b'QUEUED'
assert client.execute_command('BF.EXISTS M1 V1') == b'QUEUED'
assert client.execute_command('EXEC') == [1, 1, 1, 1, 0]
self.verify_bloom_filter_existence(client, 'M2', 'V2')
self.verify_bloom_filter_existence(client, 'M1', 'V1', should_exist=False)
self.verify_key_number(client, 1)

def test_bloom_lua(self):
client = self.server.get_new_client()
# lua
load_filter = """
redis.call('BF.ADD', 'LUA1', 'ITEM1');
redis.call('BF.ADD', 'LUA2', 'ITEM2');
redis.call('BF.MADD', 'LUA2', 'ITEM3', 'ITEM4', 'ITEM5');
"""
client.eval(load_filter, 0)
assert client.execute_command('BF.MEXISTS LUA2 ITEM1 ITEM3 ITEM4') == [0, 1, 1]
self.verify_key_number(client, 2)

def test_bloom_deletes(self):
client = self.server.get_new_client()
# delete
assert client.execute_command('BF.ADD filter1 item1') == 1
self.verify_bloom_filter_existence(client, 'filter1', 'item1')
self.verify_key_number(client, 1)
assert client.execute_command('DEL filter1') == 1
self.verify_bloom_filter_existence(client, 'filter1', 'item1', should_exist=False)
self.verify_key_number(client, 0)

# flush
self.insert_bloom_filter(client, number_of_bf=10)
self.verify_key_number(client, 10)
assert client.execute_command('FLUSHALL')
self.verify_key_number(client, 0)

# unlink
assert client.execute_command('BF.ADD A ITEMA') == 1
assert client.execute_command('BF.ADD B ITEMB') == 1
self.verify_bloom_filter_existence(client, 'A', 'ITEMA')
self.verify_bloom_filter_existence(client, 'B', 'ITEMB')
self.verify_bloom_filter_existence(client, 'C', 'ITEMC', should_exist=False)
self.verify_key_number(client, 2)
assert client.execute_command('UNLINK A B C') == 2
assert client.execute_command('BF.MEXISTS A ITEMA ITEMB') == [0, 0]
self.verify_bloom_filter_existence(client, 'A', 'ITEMA', should_exist=False)
self.verify_bloom_filter_existence(client, 'B', 'ITEMB', should_exist=False)
self.verify_key_number(client, 0)

def test_bloom_expiration(self):
client = self.server.get_new_client()
# expiration
# cmd object idletime
self.verify_key_number(client, 0)
assert client.execute_command('BF.ADD TEST_IDLE val3') == 1
self.verify_bloom_filter_existence(client, 'TEST_IDLE', 'val3')
self.verify_key_number(client, 1)
time.sleep(1)
assert client.execute_command('OBJECT IDLETIME test_idle') == None
assert client.execute_command('OBJECT IDLETIME TEST_IDLE') > 0
# cmd ttl, expireat
assert client.execute_command('BF.ADD TEST_EXP ITEM') == 1
assert client.execute_command('TTL TEST_EXP') == -1
self.verify_bloom_filter_existence(client, 'TEST_EXP', 'ITEM')
self.verify_key_number(client, 2)
curr_time = int(time.time())
assert client.execute_command(f'EXPIREAT TEST_EXP {curr_time + 5}') == 1
wait_for_equal(lambda: client.execute_command('BF.EXISTS TEST_EXP ITEM'), 0)
self.verify_key_number(client, 1)
# cmd persist
assert client.execute_command('BF.ADD TEST_PERSIST ITEM') == 1
assert client.execute_command('TTL TEST_PERSIST') == -1
self.verify_bloom_filter_existence(client, 'TEST_PERSIST', 'ITEM')
self.verify_key_number(client, 2)
assert client.execute_command(f'EXPIREAT TEST_PERSIST {curr_time + 100000}') == 1
assert client.execute_command('TTL TEST_PERSIST') > 0
assert client.execute_command('PERSIST TEST_PERSIST') == 1
assert client.execute_command('TTL TEST_PERSIST') == -1
Loading

0 comments on commit b9cd93f

Please sign in to comment.