Skip to content

Commit

Permalink
Merge from main
Browse files Browse the repository at this point in the history
  • Loading branch information
akoshelev committed Jan 8, 2025
2 parents a8b344b + c45ef9b commit 8b953ab
Show file tree
Hide file tree
Showing 56 changed files with 1,466 additions and 254 deletions.
24 changes: 12 additions & 12 deletions ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ default = [
"tracing/max_level_trace",
"tracing/release_max_level_info",
"stall-detection",
"ipa-prf",
"descriptive-gate",
]
cli = ["comfy-table", "clap", "num_cpus"]
Expand All @@ -42,6 +41,7 @@ web-app = [
"rustls",
"rustls-pemfile",
"time",
"tiny_http",
"tokio-rustls",
"toml",
"tower",
Expand All @@ -63,7 +63,9 @@ enable-benches = ["cli", "in-memory-infra", "test-fixture", "criterion", "iai"]
# of unit tests use it. Real world infra uses HTTP implementation and is suitable for integration/e2e tests
in-memory-infra = []
real-world-infra = []
dhat-heap = ["cli", "test-fixture"]
# Force use of jemalloc on non-Linux platforms. jemalloc is used by default on Linux.
jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]
dhat-heap = ["cli", "dhat", "test-fixture"]
# Enable this feature to enable our colossally weak Fp31.
weak-field = []
# Enable using more than one thread for protocol execution. Most of the parallelism occurs at parallel/seq_join operations
Expand All @@ -72,14 +74,6 @@ multi-threading = ["async-scoped"]
# RUSTFLAGS="--cfg tokio_unstable" cargo run ... --features="tokio-console ...".
# Note that if there are other flags enabled on your platform in .cargo/config.toml, you need to include them as well.
tokio-console = ["console-subscriber", "tokio/tracing"]

# If this flag is used, then the new breakdown reveal based aggregation is used
reveal-aggregation = []
# Standalone aggregation protocol. We use IPA infra for communication
# but it has nothing to do with IPA.
aggregate-circuit = []
# IPA protocol based on OPRF
ipa-prf = []
# relaxed DP, off by default
relaxed-dp = []

Expand All @@ -88,6 +82,7 @@ ipa-metrics = { path = "../ipa-metrics" }
ipa-metrics-tracing = { optional = true, path = "../ipa-metrics-tracing" }
ipa-step = { version = "*", path = "../ipa-step" }
ipa-step-derive = { version = "*", path = "../ipa-step-derive" }
ipa-metrics-prometheus = { path = "../ipa-metrics-prometheus" }

aes = "0.8.3"
async-trait = "0.1.79"
Expand All @@ -111,7 +106,7 @@ criterion = { version = "0.5.1", optional = true, default-features = false, feat
curve25519-dalek = "4.1.1"
dashmap = "5.4"
delegate = "0.10.0"
dhat = "0.3.2"
dhat = { version = "0.3.2", optional = true }
embed-doc-image = "0.1.4"
futures = "0.3.28"
futures-util = "0.3.28"
Expand Down Expand Up @@ -144,7 +139,10 @@ sha2 = "0.10"
shuttle-crate = { package = "shuttle", version = "0.6.1", optional = true }
subtle = "2.6"
thiserror = "1.0"
tikv-jemallocator = { version = "0.6", optional = true, features = ["profiling"] }
tikv-jemalloc-ctl = { version = "0.6", optional = true, features = ["stats"] }
time = { version = "0.3", optional = true }
tiny_http = { version = "0.12", optional = true }
tokio = { version = "1.42", features = ["fs", "rt", "rt-multi-thread", "macros"] }
tokio-rustls = { version = "0.26", optional = true }
tokio-stream = "0.1.14"
Expand All @@ -158,7 +156,8 @@ typenum = { version = "1.17", features = ["i128"] }
x25519-dalek = "2.0.0-rc.3"

[target.'cfg(all(not(target_env = "msvc"), not(target_os = "macos")))'.dependencies]
tikv-jemallocator = "0.5.0"
tikv-jemallocator = { version = "0.6", features = ["profiling"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }

[build-dependencies]
cfg_aliases = "0.1.1"
Expand All @@ -176,6 +175,7 @@ rustls = { version = "0.23" }
tempfile = "3"
ipa-metrics-tracing = { path = "../ipa-metrics-tracing" }
ipa-metrics = { path = "../ipa-metrics", features = ["partitions"] }
ipa-metrics-prometheus = { path = "../ipa-metrics-prometheus" }

[lib]
path = "src/lib.rs"
Expand Down
19 changes: 7 additions & 12 deletions ipa-core/benches/oneshot/ipa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,6 @@ use ipa_step::StepNarrow;
use rand::{random, rngs::StdRng, SeedableRng};
use tokio::runtime::Builder;

#[cfg(all(
not(target_env = "msvc"),
not(feature = "dhat-heap"),
not(target_os = "macos")
))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

/// A benchmark for the full IPA protocol.
#[derive(Parser)]
#[command(about, long_about = None)]
Expand Down Expand Up @@ -169,6 +157,13 @@ async fn run(args: Args) -> Result<(), Error> {
}

fn main() -> Result<(), Error> {
#[cfg(jemalloc)]
ipa_core::use_jemalloc!();

#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::new_heap();

Expand Down
11 changes: 11 additions & 0 deletions ipa-core/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,21 @@ fn main() {
descriptive_gate: { all(not(feature = "compact-gate"), feature = "descriptive-gate") },
unit_test: { all(not(feature = "shuttle"), feature = "in-memory-infra", descriptive_gate) },
web_test: { all(not(feature = "shuttle"), feature = "real-world-infra") },
jemalloc: { all(
not(feature = "dhat-heap"),
any(
feature = "jemalloc",
all(
not(target_env = "msvc"),
not(target_os = "macos")
)
)
) },
}
println!("cargo::rustc-check-cfg=cfg(descriptive_gate)");
println!("cargo::rustc-check-cfg=cfg(compact_gate)");
println!("cargo::rustc-check-cfg=cfg(unit_test)");
println!("cargo::rustc-check-cfg=cfg(web_test)");
println!("cargo::rustc-check-cfg=cfg(jemalloc)");
println!("cargo::rustc-check-cfg=cfg(coverage)");
}
33 changes: 26 additions & 7 deletions ipa-core/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Weak;
use async_trait::async_trait;

use crate::{
cli::LoggingHandle,
executor::IpaRuntime,
helpers::{
query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput},
Expand Down Expand Up @@ -65,6 +66,7 @@ struct Inner {
/// the flamegraph
mpc_transport: MpcTransportImpl,
shard_transport: ShardTransportImpl,
logging_handle: LoggingHandle,
}

impl Setup {
Expand Down Expand Up @@ -96,11 +98,13 @@ impl Setup {
self,
mpc_transport: MpcTransportImpl,
shard_transport: ShardTransportImpl,
logging_handle: LoggingHandle,
) -> HelperApp {
let app = Arc::new(Inner {
query_processor: self.query_processor,
mpc_transport,
shard_transport,
logging_handle,
});
self.mpc_handler
.set_handler(Arc::downgrade(&app) as Weak<dyn RequestHandler<HelperIdentity>>);
Expand Down Expand Up @@ -136,12 +140,24 @@ impl HelperApp {
///
/// ## Errors
/// Propagates errors from the helper.
/// ## Panics
/// If `input` asks to obtain query input from a remote URL.
pub fn execute_query(&self, input: QueryInput) -> Result<(), ApiError> {
let mpc_transport = self.inner.mpc_transport.clone_ref();
let shard_transport = self.inner.shard_transport.clone_ref();
self.inner
.query_processor
.receive_inputs(mpc_transport, shard_transport, input)?;
let QueryInput::Inline {
query_id,
input_stream,
} = input
else {
panic!("this client does not support pulling query input from a URL");
};
self.inner.query_processor.receive_inputs(
mpc_transport,
shard_transport,
query_id,
input_stream,
)?;
Ok(())
}

Expand Down Expand Up @@ -254,10 +270,8 @@ impl RequestHandler<HelperIdentity> for Inner {
HelperResponse::from(qp.receive_inputs(
Transport::clone_ref(&self.mpc_transport),
Transport::clone_ref(&self.shard_transport),
QueryInput {
query_id,
input_stream: data,
},
query_id,
data,
)?)
}
RouteId::QueryStatus => {
Expand All @@ -277,6 +291,11 @@ impl RequestHandler<HelperIdentity> for Inner {
let query_id = ext_query_id(&req)?;
HelperResponse::from(qp.kill(query_id)?)
}
RouteId::Metrics => {
let logging_handler = &self.logging_handle;
let metrics_handle = &logging_handler.metrics_handle;
HelperResponse::from(metrics_handle.scrape_metrics())
}
})
}
}
13 changes: 8 additions & 5 deletions ipa-core/src/bin/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ use ipa_core::{
use tokio::runtime::Runtime;
use tracing::{error, info};

#[cfg(all(not(target_env = "msvc"), not(target_os = "macos")))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[derive(Debug, Parser)]
#[clap(
name = "helper",
Expand Down Expand Up @@ -271,7 +267,7 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B
Some(shard_handler),
);

let _app = setup.connect(transport.clone(), shard_transport.clone());
let _app = setup.connect(transport.clone(), shard_transport.clone(), logging_handle);

let listener = create_listener(args.server_socket_fd)?;
let shard_listener = create_listener(args.shard_server_socket_fd)?;
Expand Down Expand Up @@ -365,6 +361,13 @@ fn new_query_runtime(logging_handle: &LoggingHandle) -> Runtime {
/// runtimes to use in MPC queries and HTTP.
#[tokio::main(flavor = "current_thread")]
pub async fn main() {
#[cfg(jemalloc)]
ipa_core::use_jemalloc!();

#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

let args = Args::parse();
let handle = args.logging.setup_logging();

Expand Down
Loading

0 comments on commit 8b953ab

Please sign in to comment.