From c3a74d5f8783ceb7f201a855bd9fcdd2ed14852c Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Sat, 14 Dec 2024 00:16:43 -0800 Subject: [PATCH] Use actual runtime instead of a single thread --- ipa-core/src/query/executor.rs | 36 ++++++++++++++++------------- ipa-core/src/query/runner/hybrid.rs | 35 ++++++++++++++++------------ 2 files changed, 40 insertions(+), 31 deletions(-) diff --git a/ipa-core/src/query/executor.rs b/ipa-core/src/query/executor.rs index d626e12bf..3e20bb37e 100644 --- a/ipa-core/src/query/executor.rs +++ b/ipa-core/src/query/executor.rs @@ -165,22 +165,26 @@ pub fn execute( ) }, ), - (QueryType::MaliciousHybrid(ipa_config), _) => do_query( - runtime, - config, - gateway, - input, - move |prss, gateway, config, input| { - Box::pin(execute_hybrid_protocol( - prss, - gateway, - input, - ipa_config, - config, - key_registry, - )) - }, - ), + (QueryType::MaliciousHybrid(ipa_config), _) => { + let protocol_runtime = runtime.clone(); + do_query( + runtime, + config, + gateway, + input, + move |prss, gateway, config, input| { + Box::pin(execute_hybrid_protocol( + prss, + gateway, + input, + ipa_config, + config, + key_registry, + protocol_runtime, + )) + }, + ) + } } } diff --git a/ipa-core/src/query/runner/hybrid.rs b/ipa-core/src/query/runner/hybrid.rs index ef17a9638..b3d88e2ac 100644 --- a/ipa-core/src/query/runner/hybrid.rs +++ b/ipa-core/src/query/runner/hybrid.rs @@ -11,6 +11,7 @@ use generic_array::ArrayLength; use super::QueryResult; use crate::{ error::{Error, LengthError}, + executor::IpaRuntime, ff::{ boolean::Boolean, boolean_array::{BooleanArray, BA3, BA32, BA8}, @@ -54,15 +55,17 @@ use crate::{ pub struct Query { config: HybridQueryParams, key_registry: Arc, + ipa_runtime: IpaRuntime, phantom_data: PhantomData<(C, HV)>, } #[allow(dead_code)] impl Query { - pub fn new(query_params: HybridQueryParams, key_registry: Arc) -> Self { + pub fn new(query_params: HybridQueryParams, key_registry: Arc, runtime: IpaRuntime) -> Self { Self { config: query_params, key_registry, + ipa_runtime: runtime, phantom_data: PhantomData, } } @@ -104,10 +107,11 @@ where let Self { config, key_registry, + ipa_runtime, phantom_data: _, } = self; - tracing::info!("New hybrid query: {config:?}"); + tracing::info!("New hybrid query: {config:?} with parallel decryption"); let ctx = ctx.narrow(&Hybrid); let sz = usize::from(query_size); @@ -123,21 +127,21 @@ where .try_flatten() .map(|enc_report| { let key_registry = Arc::clone(&key_registry); + let ipa_runtime = ipa_runtime.clone(); async move { match enc_report { Ok(enc_report) => { - let r = tokio::spawn({ - async move { - let dec_report = enc_report - .decrypt(key_registry.as_ref()) - .map_err(Into::::into); - let unique_tag = UniqueTag::from_unique_bytes(&enc_report); - dec_report.map(|dec_report1| (dec_report1, unique_tag)) - } - }) - .await - .map_err(Into::::into)?; - r + ipa_runtime + .spawn({ + async move { + let dec_report = enc_report + .decrypt(key_registry.as_ref()) + .map_err(Into::::into); + let unique_tag = UniqueTag::from_unique_bytes(&enc_report); + dec_report.map(|dec_report1| (dec_report1, unique_tag)) + } + }) + .await } Err(e) => Err(e), } @@ -187,6 +191,7 @@ pub async fn execute_hybrid_protocol<'a, R: PrivateKeyRegistry>( ipa_config: HybridQueryParams, config: &QueryConfig, key_registry: Arc, + ipa_runtime: IpaRuntime, ) -> QueryResult { let gate = Gate::default(); let cross_shard_prss = @@ -200,7 +205,7 @@ pub async fn execute_hybrid_protocol<'a, R: PrivateKeyRegistry>( let ctx = ShardedMaliciousContext::new_with_gate(prss, gateway, gate, sharded); Ok(Box::new( - Query::<_, BA32, R>::new(ipa_config, key_registry) + Query::<_, BA32, R>::new(ipa_config, key_registry, ipa_runtime) .execute(ctx, config.size, input) .await?, ))