Skip to content

Commit

Permalink
Increase parallelism using channels
Browse files Browse the repository at this point in the history
  • Loading branch information
akoshelev committed Dec 14, 2024
1 parent c3a74d5 commit a4af360
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions ipa-core/src/query/runner/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use std::{
};

use futures::{stream::iter, StreamExt, TryStreamExt};
use futures_util::TryFutureExt;
use generic_array::ArrayLength;
use tokio_stream::wrappers::ReceiverStream;

use super::QueryResult;
use crate::{
Expand Down Expand Up @@ -47,7 +49,7 @@ use crate::{
replicated::semi_honest::AdditiveShare as Replicated, BitDecomposed, TransposeFrom,
Vectorizable,
},
seq_join::seq_join,
seq_join::{assert_send, seq_join},
sharding::{ShardConfiguration, Sharded},
};

Expand Down Expand Up @@ -111,7 +113,7 @@ where
phantom_data: _,
} = self;

tracing::info!("New hybrid query: {config:?} with parallel decryption");
tracing::info!("New hybrid query: {config:?} with parallel decryption using channels");
let ctx = ctx.narrow(&Hybrid);
let sz = usize::from(query_size);

Expand Down Expand Up @@ -149,11 +151,16 @@ where
})
.take(sz);

let (decrypted_reports, resharded_tags) = reshard_aad(
ctx.narrow(&HybridStep::ReshardByTag),
seq_join(ctx.active_work(), stream),
|ctx, _, tag| tag.shard_picker(ctx.shard_count()),
)
let (tx, rx) = tokio::sync::mpsc::channel(ctx.active_work().get());
let (_, (decrypted_reports, resharded_tags)) = assert_send(futures::future::try_join(
seq_join(ctx.active_work(), stream)
.try_for_each(|(report, tag)| tx.send((report, tag)).map_err(|_| Error::Internal)),
reshard_aad(
ctx.narrow(&HybridStep::ReshardByTag),
ReceiverStream::new(rx).map(Ok),
|ctx, _, tag| tag.shard_picker(ctx.shard_count()),
),
))
.await?;

let mut unique_encrypted_hybrid_reports = UniqueTagValidator::new(resharded_tags.len());
Expand Down

0 comments on commit a4af360

Please sign in to comment.