Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Query input from URL on report collector side + integration test #1510

Merged
merged 5 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 120 additions & 33 deletions ipa-core/src/bin/report_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
fmt::Debug,
fs::{File, OpenOptions},
io,
io::{stdout, BufReader, Write},
io::{stdout, BufRead, BufReader, Write},
iter::zip,
ops::Deref,
path::{Path, PathBuf},
};

use clap::{Parser, Subcommand};
use hyper::http::uri::Scheme;
use hyper::{http::uri::Scheme, Uri};
use ipa_core::{
cli::{
playbook::{
Expand All @@ -24,11 +25,13 @@
ff::{boolean_array::BA32, FieldType},
helpers::{
query::{
DpMechanism, HybridQueryParams, IpaQueryConfig, QueryConfig, QuerySize, QueryType,
DpMechanism, HybridQueryParams, IpaQueryConfig, QueryConfig, QueryInput, QuerySize,
QueryType,
},
BodyStream,
},
net::{Helper, IpaHttpClient},
protocol::QueryId,
report::{EncryptedOprfReportStreams, DEFAULT_KEY_ID},
test_fixture::{
ipa::{ipa_in_the_clear, CappingOrder, IpaSecurityModel, TestRawDataRecord},
Expand Down Expand Up @@ -143,7 +146,14 @@
},
MaliciousHybrid {
#[clap(flatten)]
encrypted_inputs: EncryptedInputs,
encrypted_inputs: Option<EncryptedInputs>,

#[arg(
long,
help = "Read the list of URLs that contain the input from the provided file",
conflicts_with_all = ["enc_input_file1", "enc_input_file2", "enc_input_file3"]
)]
url_file_list: Option<PathBuf>,

#[clap(flatten)]
hybrid_query_config: HybridQueryParams,
Expand Down Expand Up @@ -267,6 +277,7 @@
}
ReportCollectorCommand::MaliciousHybrid {
ref encrypted_inputs,
ref url_file_list,

Check warning on line 280 in ipa-core/src/bin/report_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/report_collector.rs#L280

Added line #L280 was not covered by tests
hybrid_query_config,
count,
set_fixed_polling_ms,
Expand All @@ -275,7 +286,19 @@
&args,
hybrid_query_config,
clients,
encrypted_inputs,
|query_id| {

Check warning on line 289 in ipa-core/src/bin/report_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/report_collector.rs#L289

Added line #L289 was not covered by tests
if let Some(ref url_file_list) = url_file_list {
inputs_from_url_file(url_file_list, query_id, args.shard_count)
} else if let Some(ref encrypted_inputs) = encrypted_inputs {
Ok(inputs_from_encrypted_inputs(
encrypted_inputs,
query_id,
args.shard_count,
))

Check warning on line 297 in ipa-core/src/bin/report_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/report_collector.rs#L294-L297

Added lines #L294 - L297 were not covered by tests
} else {
panic!("Either --url-file-list or --enc-input-file1, --enc-input-file2, and --enc-input-file3 must be provided");
}
},
count.try_into().expect("u32 should fit into usize"),
set_fixed_polling_ms,
)
Expand All @@ -286,6 +309,95 @@
Ok(())
}

/// Reads query input URLs from the file at `url_file_path` and groups them into
/// `[QueryInput::FromUrl; 3]` triples, with one array slot per helper.
///
/// The file is consumed line by line: the first `shard_count` lines go to the first helper,
/// the next `shard_count` lines to the second, and the following `shard_count` lines to the
/// third. Every line is trimmed and must parse as a [`Uri`]. Lines past the first
/// `3 * shard_count` are never read. Every returned input carries the given `query_id`.
///
/// # Errors
/// Returns an error if the file cannot be opened or read, if a line is not a valid URI, or if
/// fewer than `3 * shard_count` URLs could be read.
fn inputs_from_url_file(
url_file_path: &Path,
query_id: QueryId,
shard_count: usize,
) -> Result<Vec<[QueryInput; 3]>, Box<dyn Error>> {
let mut file = BufReader::new(File::open(url_file_path)?);
// a single line buffer, cleared and reused for every line that is read
let mut buf = String::new();
// one URL list per helper, filled in helper order
let mut inputs = [Vec::new(), Vec::new(), Vec::new()];
for helper_input in inputs.iter_mut() {
for _ in 0..shard_count {
buf.clear();
// `read_line` returns 0 at end of file; a file that is too short is reported by the
// count check after the loops rather than here
if file.read_line(&mut buf)? == 0 {
break;
}
// the trimmed text is what gets parsed, while the error message prints the raw line
helper_input
.push(Uri::try_from(buf.trim()).map_err(|e| format!("Invalid URL {buf:?}: {e}"))?);

Check warning on line 327 in ipa-core/src/bin/report_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/report_collector.rs#L312-L327

Added lines #L312 - L327 were not covered by tests
}
}

// make sure all helpers have the expected number of inputs (one per shard)
let all_rows = inputs.iter().map(|v| v.len()).sum::<usize>();
if all_rows != 3 * shard_count {
return Err(format!(
"The number of URLs in {url_file_path:?} '{all_rows}' is less than 3*{shard_count}."
)
.into());
}

// combine the i-th URL of each helper into one array, preserving helper order
let [h1, h2, h3] = inputs;
Ok(zip(zip(h1, h2), h3)
.map(|((h1, h2), h3)| {
[
QueryInput::FromUrl {
// the validated `Uri` is turned back into a `String` for the query input
url: h1.to_string(),
query_id,
},
QueryInput::FromUrl {
url: h2.to_string(),
query_id,
},
QueryInput::FromUrl {
url: h3.to_string(),
query_id,
},
]
})
.collect())
}

Check warning on line 359 in ipa-core/src/bin/report_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/report_collector.rs#L332-L359

Added lines #L332 - L359 were not covered by tests

fn inputs_from_encrypted_inputs(
encrypted_inputs: &EncryptedInputs,
query_id: QueryId,
shard_count: usize,
) -> Vec<[QueryInput; 3]> {
let [h1_streams, h2_streams, h3_streams] = [
&encrypted_inputs.enc_input_file1,
&encrypted_inputs.enc_input_file2,
&encrypted_inputs.enc_input_file3,
]
.map(|path| {
let file = File::open(path).unwrap_or_else(|e| panic!("unable to open file {path:?}. {e}"));
RoundRobinSubmission::new(BufReader::new(file))
})
.map(|s| s.into_byte_streams(shard_count));

// create byte streams for each shard
h1_streams
.into_iter()
.zip(h2_streams)
.zip(h3_streams)
.map(|((s1, s2), s3)| {
[
QueryInput::Inline {
input_stream: BodyStream::from_bytes_stream(s1),
query_id,
},
QueryInput::Inline {
input_stream: BodyStream::from_bytes_stream(s2),
query_id,
},
QueryInput::Inline {
input_stream: BodyStream::from_bytes_stream(s3),
query_id,
},
]
})
.collect::<Vec<_>>()
}

Check warning on line 399 in ipa-core/src/bin/report_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/report_collector.rs#L361-L399

Added lines #L361 - L399 were not covered by tests

fn gen_hybrid_inputs(
count: u32,
seed: Option<u64>,
Expand Down Expand Up @@ -422,41 +534,16 @@
Ok(())
}

async fn hybrid(
async fn hybrid<F: FnOnce(QueryId) -> Result<Vec<[QueryInput; 3]>, Box<dyn Error>>>(

Check warning on line 537 in ipa-core/src/bin/report_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/report_collector.rs#L537

Added line #L537 was not covered by tests
args: &Args,
hybrid_query_config: HybridQueryParams,
helper_clients: Vec<[IpaHttpClient<Helper>; 3]>,
encrypted_inputs: &EncryptedInputs,
make_inputs_fn: F,

Check warning on line 541 in ipa-core/src/bin/report_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/report_collector.rs#L541

Added line #L541 was not covered by tests
count: usize,
set_fixed_polling_ms: Option<u64>,
) -> Result<(), Box<dyn Error>> {
let query_type = QueryType::MaliciousHybrid(hybrid_query_config);

let [h1_streams, h2_streams, h3_streams] = [
&encrypted_inputs.enc_input_file1,
&encrypted_inputs.enc_input_file2,
&encrypted_inputs.enc_input_file3,
]
.map(|path| {
let file = File::open(path).unwrap_or_else(|e| panic!("unable to open file {path:?}. {e}"));
RoundRobinSubmission::new(BufReader::new(file))
})
.map(|s| s.into_byte_streams(args.shard_count));

// create byte streams for each shard
let submissions = h1_streams
.into_iter()
.zip(h2_streams.into_iter())
.zip(h3_streams.into_iter())
.map(|((s1, s2), s3)| {
[
BodyStream::from_bytes_stream(s1),
BodyStream::from_bytes_stream(s2),
BodyStream::from_bytes_stream(s3),
]
})
.collect::<Vec<_>>();

let query_config = QueryConfig {
size: QuerySize::try_from(count).unwrap(),
field_type: FieldType::Fp32BitPrime,
Expand All @@ -469,6 +556,7 @@
.expect("Unable to create query!");

tracing::info!("Starting query for OPRF");
let submissions = make_inputs_fn(query_id)?;

Check warning on line 559 in ipa-core/src/bin/report_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/report_collector.rs#L559

Added line #L559 was not covered by tests

// the value for histogram values (BA32) must be kept in sync with the server-side
// implementation, otherwise a runtime reconstruct error will be generated.
Expand All @@ -477,7 +565,6 @@
submissions,
count,
helper_clients,
query_id,
hybrid_query_config,
set_fixed_polling_ms,
)
Expand Down
27 changes: 12 additions & 15 deletions ipa-core/src/cli/playbook/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,8 @@

use crate::{
ff::{Serializable, U128Conversions},
helpers::{
query::{HybridQueryParams, QueryInput, QuerySize},
BodyStream,
},
helpers::query::{HybridQueryParams, QueryInput, QuerySize},
net::{Helper, IpaHttpClient},
protocol::QueryId,
query::QueryStatus,
secret_sharing::{replicated::semi_honest::AdditiveShare, SharedValue},
test_fixture::Reconstruct,
Expand All @@ -26,30 +22,31 @@
/// if results are invalid
#[allow(clippy::disallowed_methods)] // allow try_join_all
pub async fn run_hybrid_query_and_validate<HV>(
inputs: Vec<[BodyStream; 3]>,
inputs: Vec<[QueryInput; 3]>,

Check warning on line 25 in ipa-core/src/cli/playbook/hybrid.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/cli/playbook/hybrid.rs#L25

Added line #L25 was not covered by tests
query_size: usize,
clients: Vec<[IpaHttpClient<Helper>; 3]>,
query_id: QueryId,
query_config: HybridQueryParams,
set_fixed_polling_ms: Option<u64>,
) -> HybridQueryResult
where
HV: SharedValue + U128Conversions,
AdditiveShare<HV>: Serializable,
{
let query_id = inputs
.first()
.map(|v| v[0].query_id())
.expect("At least one shard must be used to run a Hybrid query");

Check warning on line 38 in ipa-core/src/cli/playbook/hybrid.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/cli/playbook/hybrid.rs#L35-L38

Added lines #L35 - L38 were not covered by tests
let mpc_time = Instant::now();
assert_eq!(clients.len(), inputs.len());
// submit inputs to each shard
let _ = try_join_all(zip(clients.iter(), inputs.into_iter()).map(
|(shard_clients, shard_inputs)| {
try_join_all(shard_clients.iter().zip(shard_inputs.into_iter()).map(
|(client, input)| {
client.query_input(QueryInput::Inline {
query_id,
input_stream: input,
})
},
))
try_join_all(
shard_clients
.iter()
.zip(shard_inputs.into_iter())
.map(|(client, input)| client.query_input(input)),
)

Check warning on line 49 in ipa-core/src/cli/playbook/hybrid.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/cli/playbook/hybrid.rs#L44-L49

Added lines #L44 - L49 were not covered by tests
},
))
.await
Expand Down
Loading
Loading