Skip to content

Commit

Permalink
Remove async from per-user inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
akoshelev committed Dec 7, 2023
1 parent 5e7e859 commit c145189
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{num::NonZeroU32, ops::Not, pin::pin};
use std::{num::NonZeroU32, ops::Not};

use futures::{stream::iter as stream_iter, TryStreamExt};
use futures_util::{
Expand Down Expand Up @@ -450,7 +450,7 @@ where
collected.sort_by(|a, b| std::cmp::Ord::cmp(&b.len(), &a.len()));

// Convert to a stream of async futures that represent the result of executing the per-user circuit
let stream_of_per_user_circuits = pin!(stream_iter(collected).then(|rows_for_user| {
let stream_of_per_user_circuits = collected.into_iter().map(|rows_for_user| {
let num_user_rows = rows_for_user.len();
let contexts = ctx_for_row_number[..num_user_rows - 1].to_owned();
let record_ids = record_id_for_row_depth[..num_user_rows].to_owned();
Expand All @@ -465,10 +465,10 @@ where
rows_for_user,
attribution_window_seconds,
)
}));
});

// Execute all of the async futures (sequentially), and flatten the result
let flattenned_stream = stream_of_per_user_circuits
let flattenned_stream = seq_join(binary_m_ctx.active_work(), stream_iter(stream_of_per_user_circuits))
.flat_map(|x| stream_iter(x.unwrap()));

// modulus convert breakdown keys and trigger values
Expand Down

0 comments on commit c145189

Please sign in to comment.