From 97e5babbc18f103835dcb02783ec7268621976ea Mon Sep 17 00:00:00 2001 From: zackcam Date: Wed, 12 Feb 2025 20:38:13 +0000 Subject: [PATCH] Adding usage of must_obey_client in wrapper to improve performace Signed-off-by: zackcam --- .github/workflows/ci.yml | 14 ++++++++++++-- Cargo.toml | 3 ++- README.md | 2 +- build.sh | 10 ++++++++-- src/bloom/command_handler.rs | 11 +++++------ src/wrapper/mod.rs | 33 +++++++++++++++++++++++++++++++++ tests/test_bloom_correctness.py | 6 +++--- 7 files changed, 64 insertions(+), 15 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bb5cea8..a5e1c53 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,7 +24,12 @@ jobs: cargo fmt --check cargo clippy --profile release --all-targets -- -D clippy::all - name: Release Build - run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release + run: | + if [ "${{ matrix.server_version }}" = "8.0.0" ]; then + RUSTFLAGS="-D warnings" cargo build --all --all-targets --release --features valkey_8_0 + else + RUSTFLAGS="-D warnings" cargo build --all --all-targets --release + fi - name: Run unit tests run: cargo test --features enable-system-alloc - name: Make valkey-server binary @@ -77,7 +82,12 @@ jobs: cargo fmt --check cargo clippy --profile release --all-targets -- -D clippy::all - name: Release Build - run: RUSTFLAGS="-D warnings" cargo build --all --all-targets --release + run: | + if [ "${{ matrix.server_version }}" = "8.0.0" ]; then + RUSTFLAGS="-D warnings" cargo build --all --all-targets --release --features valkey_8_0 + else + RUSTFLAGS="-D warnings" cargo build --all --all-targets --release + fi - name: Run unit tests run: cargo test --features enable-system-alloc - name: Make Valkey-server binary with asan diff --git a/Cargo.toml b/Cargo.toml index 1133455..e4ed86c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ homepage = "https://github.com/valkey-io/valkey-bloom" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -valkey-module = { version = "0.1.3", features = ["min-valkey-compatibility-version-8-0", "min-redis-compatibility-version-7-2"]} +valkey-module = { version = "0.1.4", features = ["min-valkey-compatibility-version-8-0", "min-redis-compatibility-version-7-2"]} valkey-module-macros = "0" linkme = "0" bloomfilter = { version = "3.0.1", features = ["serde"] } @@ -38,3 +38,4 @@ debug-assertions = true default = ["min-valkey-compatibility-version-8-0"] enable-system-alloc = ["valkey-module/enable-system-alloc"] min-valkey-compatibility-version-8-0 = [] +valkey_8_0 = [] # Empty feature flag for Valkey 8.0 diff --git a/README.md b/README.md index e1c73c7..3c130f2 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Valkey-Bloom (BSD-3-Clause) is a Rust Valkey-Module which brings a native and space efficient probabilistic Module data type to Valkey. With this, users can create filters (space-efficient probabilistic Module data type) to add elements, perform “check” operation to test whether an element exists, auto scale their filters, perform RDB Save and load operations, etc. -Valkey-Bloom is built using bloomfilter::Bloom (https://crates.io/crates/bloomfilter which has a BSD-2-Clause license). +Valkey-Bloom is built using `bloomfilter::Bloom` (https://crates.io/crates/bloomfilter which has a BSD-2-Clause license). It is compatible with the BloomFilter (BF.*) command APIs in Redis offerings. diff --git a/build.sh b/build.sh index f250c9d..4c0a53f 100755 --- a/build.sh +++ b/build.sh @@ -12,8 +12,6 @@ echo "Running cargo and clippy format checks..." cargo fmt --check cargo clippy --profile release --all-targets -- -D clippy::all -echo "Running cargo build release..." -RUSTFLAGS="-D warnings" cargo build --all --all-targets --release echo "Running unit tests..." cargo test --features enable-system-alloc @@ -29,6 +27,14 @@ if [ "$SERVER_VERSION" != "unstable" ] && [ "$SERVER_VERSION" != "8.0.0" ] ; the exit 1 fi +echo "Running cargo build release..." +if [ "$SERVER_VERSION" == "8.0.0" ] ; then + RUSTFLAGS="-D warnings" cargo build --all --all-targets --release --features valkey_8_0 +else + RUSTFLAGS="-D warnings" cargo build --all --all-targets --release +fi + + REPO_URL="https://github.com/valkey-io/valkey.git" BINARY_PATH="tests/.build/binaries/$SERVER_VERSION/valkey-server" diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index 05a693d..a4a6e49 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -6,11 +6,10 @@ use crate::configs::{ BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, BLOOM_TIGHTENING_RATIO_MAX, BLOOM_TIGHTENING_RATIO_MIN, }; +use crate::wrapper::must_obey_client; use std::sync::atomic::Ordering; -use valkey_module::ContextFlags; use valkey_module::NotifyEvent; use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK}; - /// Helper function used to add items to a bloom object. It handles both multi item and single item add operations. /// It is used by any command that allows adding of items: BF.ADD, BF.MADD, and BF.INSERT. /// Returns the result of the item add operation on success as a ValkeyValue and a ValkeyError on failure. @@ -177,7 +176,7 @@ pub fn bloom_filter_add_value( } }; // Skip bloom filter size validation on replicated cmds. - let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); + let validate_size_limit = !must_obey_client(ctx); let mut add_succeeded = false; match value { Some(bloom) => { @@ -404,7 +403,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke false => (Some(configs::FIXED_SEED), false), }; // Skip bloom filter size validation on replicated cmds. - let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); + let validate_size_limit = !must_obey_client(ctx); let tightening_ratio = *configs::BLOOM_TIGHTENING_F64 .lock() .expect("Unable to get a lock on tightening ratio static"); @@ -615,7 +614,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } }; // Skip bloom filter size validation on replicated cmds. - let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); + let validate_size_limit = !must_obey_client(ctx); let mut add_succeeded = false; match value { Some(bloom) => { @@ -811,7 +810,7 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe None => { // if filter not exists, create it. let hex = value.to_vec(); - let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); + let validate_size_limit = !must_obey_client(ctx); let bloom = match BloomObject::decode_object(&hex, validate_size_limit) { Ok(v) => v, Err(err) => { diff --git a/src/wrapper/mod.rs b/src/wrapper/mod.rs index d198382..5028318 100644 --- a/src/wrapper/mod.rs +++ b/src/wrapper/mod.rs @@ -1,2 +1,35 @@ +use valkey_module::{Context, ContextFlags}; + pub mod bloom_callback; pub mod defrag; + +/// Wrapper for the ValkeyModule_MustObeyClient function. +/// Takes in an Context and returns true if the if commands are arriving +/// from the primary client or AOF client and should never be rejected. +/// False otherwise. +pub fn must_obey_client(ctx: &Context) -> bool { + // If we are using valkey 8.0 then we cannot use ValkeyModule_MustObeyClient so must go back to the default + // of checking for the replicated flag + #[cfg(not(feature = "valkey_8_0"))] + { + let ctx_raw = ctx.get_raw() as *mut valkey_module::ValkeyModuleCtx; + + match unsafe { valkey_module::raw::ValkeyModule_MustObeyClient } { + Some(func) => { + let status = unsafe { func(ctx_raw) as isize }; + match status { + 1 => true, + 0 => false, + _ => panic!("We do not expect ValkeyModule_MustObeyClient to return anything other than 1 or 0."), + } + } + // Fallback to checking for replicated flag in the GetContextFlags API as a best effort. + None => ctx.get_flags().contains(ContextFlags::REPLICATED), + } + } + + #[cfg(feature = "valkey_8_0")] + { + ctx.get_flags().contains(ContextFlags::REPLICATED) + } +} diff --git a/tests/test_bloom_correctness.py b/tests/test_bloom_correctness.py index 798c43a..f858ef5 100644 --- a/tests/test_bloom_correctness.py +++ b/tests/test_bloom_correctness.py @@ -135,7 +135,7 @@ def test_scaling_filter(self): def test_max_and_validate_scale_to_correctness(self): validate_scale_to_commands = [ - ('BF.INSERT key ERROR 0.00000001 VALIDATESCALETO 13107101', "provided VALIDATESCALETO causes bloom object to exceed memory limit" ), + ('BF.INSERT MemLimitKey EXPANSION 25 ERROR 0.00000000000000001 VALIDATESCALETO 1627601', "provided VALIDATESCALETO causes bloom object to exceed memory limit" ), ('BF.INSERT key EXPANSION 1 VALIDATESCALETO 101601', "provided VALIDATESCALETO causes false positive to degrade to 0" ) ] for cmd in validate_scale_to_commands: @@ -144,12 +144,12 @@ def test_max_and_validate_scale_to_correctness(self): assert False, "Expect BF.INSERT to fail if the wanted capacity would cause an error" except Exception as e: assert cmd[1] == str(e), f"Unexpected error message: {e}" - self.client.execute_command('BF.INSERT MemLimitKey ERROR 0.00000001 VALIDATESCALETO 13107100') + self.client.execute_command('BF.INSERT MemLimitKey EXPANSION 25 ERROR 0.00000000000000001 VALIDATESCALETO 1627600') self.client.execute_command('BF.INSERT FPKey VALIDATESCALETO 101600 EXPANSION 1') FPKey_max_capacity = self.client.execute_command(f'BF.INFO FPKey MAXSCALEDCAPACITY') MemLimitKeyMaxCapacity = self.client.execute_command(f'BF.INFO MemLimitKey MAXSCALEDCAPACITY') self.add_items_till_capacity(self.client, "FPKey", 101600, 1, "item") - self.add_items_till_capacity(self.client, "MemLimitKey", 13107100, 1, "item") + self.add_items_till_capacity(self.client, "MemLimitKey", 1627600, 1, "item") key_names = [("MemLimitKey", MemLimitKeyMaxCapacity, "operation exceeds bloom object memory limit"), ("FPKey", FPKey_max_capacity, "false positive degrades to 0 on scale out")] for key in key_names: try: