diff --git a/ipa-core/src/helpers/transport/query/mod.rs b/ipa-core/src/helpers/transport/query/mod.rs index 87942c1d9..c224b6900 100644 --- a/ipa-core/src/helpers/transport/query/mod.rs +++ b/ipa-core/src/helpers/transport/query/mod.rs @@ -1,6 +1,7 @@ use std::{ + cmp::{max, min}, fmt::{Debug, Display, Formatter}, - num::NonZeroU32, + num::{NonZeroU32, NonZeroUsize}, }; use serde::{Deserialize, Deserializer, Serialize}; @@ -140,9 +141,25 @@ impl RouteParams for &QueryConfig { } impl From<&QueryConfig> for GatewayConfig { - fn from(_value: &QueryConfig) -> Self { - // TODO: pick the correct value for active and test it - Self::default() + fn from(value: &QueryConfig) -> Self { + let mut config = Self::default(); + // Minimum size for active work is 2 because: + // * `UnorderedReceiver` wants capacity to be greater than 1 + // * 1 is better represented by not using seq_join and/or indeterminate total records + let active = max( + 2, + min( + config.active.get(), + // It makes sense to start with active work set to input size, but some protocols + // may want to change that, if their fanout factor per input row is greater than 1. + // we don't have capabilities (see #ipa/1171) to allow that currently. + usize::try_from(value.size.0).expect("u32 fits into usize"), + ), + ); + // we set active to be at least 2, so unwrap is fine. + config.active = NonZeroUsize::new(active).unwrap(); + + config } }