Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add concurreny configuration #2569

Merged
183 changes: 172 additions & 11 deletions crates/pixi_config/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,3 @@
use std::{
cmp::PartialEq,
collections::{BTreeSet as Set, HashMap},
fs,
path::{Path, PathBuf},
process::{Command, Stdio},
str::FromStr,
};

use clap::{ArgAction, Parser};
use itertools::Itertools;
use miette::{miette, Context, IntoDiagnostic};
Expand All @@ -18,6 +9,13 @@ use rattler_conda_types::{
use rattler_repodata_gateway::{Gateway, SourceConfig};
use reqwest_middleware::ClientWithMiddleware;
use serde::{de::IntoDeserializer, Deserialize, Serialize};
use std::{
collections::{BTreeSet as Set, HashMap},
fs,
path::{Path, PathBuf},
process::{Command, Stdio},
str::FromStr,
};
use url::Url;

const EXPERIMENTAL: &str = "experimental";
Expand Down Expand Up @@ -116,6 +114,14 @@ pub struct ConfigCli {
/// Specifies if we want to use uv keyring provider
#[arg(long)]
pypi_keyring_provider: Option<KeyringProvider>,

/// Max concurrent solves, default is the number of CPUs
#[arg(long)]
pub concurrent_solves: Option<usize>,

/// Max concurrent network requests, default is 50
#[arg(long)]
pub concurrent_downloads: Option<usize>,
}

#[derive(Parser, Debug, Clone, Default)]
Expand Down Expand Up @@ -333,6 +339,66 @@ impl ExperimentalConfig {
}
}

// Making the default values part of pixi_config to allow for printing the default settings in the future.
/// The default maximum number of concurrent solves that can be run at once.
/// Defaulting to the number of CPUs available.
fn default_max_concurrent_solves() -> usize {
std::thread::available_parallelism().map_or(1, |n| n.get())
}

/// The default maximum number of concurrent downloads that can be run at once.
/// 50 is a reasonable default for the number of concurrent downloads.
/// More verification is needed to determine the optimal number.
fn default_max_concurrent_downloads() -> usize {
50
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
pub struct ConcurrencyConfig {
/// The maximum number of concurrent solves that can be run at once.
// Needing to set this default next to the default of the full struct to avoid serde defaulting to 0 of partial struct was omitted.
#[serde(default = "default_max_concurrent_solves")]
pub solves: usize,

/// The maximum number of concurrent HTTP requests to make.
// Needing to set this default next to the default of the full struct to avoid serde defaulting to 0 of partial struct was omitted.
#[serde(default = "default_max_concurrent_downloads")]
pub downloads: usize,
}

impl Default for ConcurrencyConfig {
fn default() -> Self {
Self {
solves: default_max_concurrent_solves(),
downloads: default_max_concurrent_downloads(),
}
}
}

impl ConcurrencyConfig {
/// Merge the given ConcurrencyConfig into the current one.
pub fn merge(self, other: Self) -> Self {
// Merging means using the other value if they are none default.
Self {
solves: if other.solves != ConcurrencyConfig::default().solves {
other.solves
} else {
self.solves
},
downloads: if other.downloads != ConcurrencyConfig::default().downloads {
other.downloads
} else {
self.downloads
},
}
}

pub fn is_default(&self) -> bool {
ConcurrencyConfig::default() == *self
}
}

impl PyPIConfig {
/// Merge the given PyPIConfig into the current one.
pub fn merge(self, other: Self) -> Self {
Expand Down Expand Up @@ -549,6 +615,11 @@ pub struct Config {
#[serde(default)]
#[serde(skip_serializing_if = "ExperimentalConfig::is_default")]
pub experimental: ExperimentalConfig,

/// Concurrency configuration for pixi
#[serde(default)]
#[serde(skip_serializing_if = "ConcurrencyConfig::is_default")]
pub concurrency: ConcurrencyConfig,
}

impl Default for Config {
Expand All @@ -564,9 +635,10 @@ impl Default for Config {
repodata_config: RepodataConfig::default(),
pypi_config: PyPIConfig::default(),
detached_environments: Some(DetachedEnvironments::default()),
pinning_strategy: Default::default(),
pinning_strategy: None,
force_activate: None,
experimental: Default::default(),
experimental: ExperimentalConfig::default(),
concurrency: ConcurrencyConfig::default(),
}
}
}
Expand All @@ -581,6 +653,14 @@ impl From<ConfigCli> for Config {
.map(|val| PyPIConfig::default().with_keyring(val))
.unwrap_or_default(),
detached_environments: None,
concurrency: ConcurrencyConfig {
solves: cli
.concurrent_solves
.unwrap_or(ConcurrencyConfig::default().solves),
downloads: cli
.concurrent_downloads
.unwrap_or(ConcurrencyConfig::default().downloads),
},
..Default::default()
}
}
Expand Down Expand Up @@ -798,6 +878,7 @@ impl Config {
"mirrors",
"detached-environments",
"pinning-strategy",
"max-concurrent-solves",
"repodata-config",
"repodata-config.disable-jlap",
"repodata-config.disable-bzip2",
Expand Down Expand Up @@ -838,6 +919,8 @@ impl Config {
pinning_strategy: other.pinning_strategy.or(self.pinning_strategy),
force_activate: other.force_activate,
experimental: other.experimental.merge(self.experimental),
// Make other take precedence over self to allow for setting the value through the CLI
concurrency: self.concurrency.merge(other.concurrency),
}
}

Expand Down Expand Up @@ -903,6 +986,16 @@ impl Config {
self.experimental.use_environment_activation_cache()
}

/// Retrieve the value for the max_concurrent_solves field.
pub fn max_concurrent_solves(&self) -> usize {
self.concurrency.solves
}

/// Retrieve the value for the network_requests field.
pub fn max_concurrent_downloads(&self) -> usize {
self.concurrency.downloads
}

/// Modify this config with the given key and value
///
/// # Note
Expand Down Expand Up @@ -1049,6 +1142,36 @@ impl Config {
_ => return Err(err),
}
}
key if key.starts_with("concurrency") => {
if key == "concurrency" {
if let Some(value) = value {
self.pypi_config = serde_json::de::from_str(&value).into_diagnostic()?;
} else {
self.pypi_config = PyPIConfig::default();
}
return Ok(());
} else if !key.starts_with("concurrency.") {
return Err(err);
}
let subkey = key.strip_prefix("concurrency.").unwrap();
match subkey {
"solves" => {
if let Some(value) = value {
self.concurrency.solves = value.parse().into_diagnostic()?;
} else {
return Err(miette!("'solves' requires a number value"));
}
}
"downloads" => {
if let Some(value) = value {
self.concurrency.downloads = value.parse().into_diagnostic()?;
} else {
return Err(miette!("'downloads' requires a number value"));
}
}
_ => return Err(err),
}
}
_ => return Err(err),
}

Expand Down Expand Up @@ -1085,6 +1208,7 @@ impl Config {
.with_client(client)
.with_cache_dir(cache_dir.join(consts::CONDA_REPODATA_CACHE_DIR))
.with_channel_config(self.into())
.with_max_concurrent_requests(self.max_concurrent_downloads())
.finish()
}
}
Expand Down Expand Up @@ -1125,6 +1249,7 @@ mod tests {
tls-no-verify = true
detached-environments = "{}"
pinning-strategy = "no-pin"
concurrency.solves = 5
UNUSED = "unused"
"#,
env!("CARGO_MANIFEST_DIR").replace('\\', "\\\\").as_str()
Expand All @@ -1139,6 +1264,7 @@ UNUSED = "unused"
config.detached_environments().path().unwrap(),
Some(PathBuf::from(env!("CARGO_MANIFEST_DIR")))
);
assert_eq!(config.max_concurrent_solves(), 5);
assert!(unused.contains("UNUSED"));

let toml = r"detached-environments = true";
Expand Down Expand Up @@ -1171,6 +1297,8 @@ UNUSED = "unused"
tls_no_verify: true,
auth_file: None,
pypi_keyring_provider: Some(KeyringProvider::Subprocess),
concurrent_solves: None,
concurrent_downloads: None,
};
let config = Config::from(cli);
assert_eq!(config.tls_no_verify, Some(true));
Expand All @@ -1183,6 +1311,8 @@ UNUSED = "unused"
tls_no_verify: false,
auth_file: Some(PathBuf::from("path.json")),
pypi_keyring_provider: None,
concurrent_solves: None,
concurrent_downloads: None,
};

let config = Config::from(cli);
Expand Down Expand Up @@ -1214,6 +1344,14 @@ UNUSED = "unused"
);
}

#[test]
fn test_default_config() {
let config = Config::default();
// This depends on the system so it's hard to test.
assert!(config.concurrency.solves > 0);
assert_eq!(config.concurrency.downloads, 50);
}

#[test]
fn test_config_merge() {
let mut config = Config::default();
Expand All @@ -1222,6 +1360,10 @@ UNUSED = "unused"
channel_config: ChannelConfig::default_with_root_dir(PathBuf::from("/root/dir")),
tls_no_verify: Some(true),
detached_environments: Some(DetachedEnvironments::Path(PathBuf::from("/path/to/envs"))),
concurrency: ConcurrencyConfig {
solves: 5,
..ConcurrencyConfig::default()
},
..Default::default()
};
config = config.merge_config(other);
Expand Down Expand Up @@ -1255,6 +1397,7 @@ UNUSED = "unused"
config.detached_environments().path().unwrap(),
Some(PathBuf::from("/path/to/envs2"))
);
assert_eq!(config.max_concurrent_solves(), 5);

let d = Path::new(&env!("CARGO_MANIFEST_DIR"))
.join("tests")
Expand Down Expand Up @@ -1430,6 +1573,24 @@ UNUSED = "unused"
config.set("change-ps1", None).unwrap();
assert_eq!(config.change_ps1, None);

config
.set("concurrency.solves", Some("10".to_string()))
.unwrap();
assert_eq!(config.max_concurrent_solves(), 10);
config
.set("concurrency.solves", Some("1".to_string()))
.unwrap();

config
.set("concurrency.downloads", Some("10".to_string()))
.unwrap();
assert_eq!(config.max_concurrent_downloads(), 10);
config
.set("concurrency.downloads", Some("1".to_string()))
.unwrap();

assert_eq!(config.max_concurrent_downloads(), 1);

config.set("unknown-key", None).unwrap_err();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
source: crates/pixi_config/src/lib.rs
expression: debug
snapshot_kind: text
---
Config {
default_channels: [
Expand Down Expand Up @@ -72,4 +73,8 @@ Config {
experimental: ExperimentalConfig {
use_environment_activation_cache: None,
},
concurrency: ConcurrencyConfig {
solves: 1,
downloads: 50,
},
}
3 changes: 3 additions & 0 deletions crates/pixi_config/tests/config/config_1.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
default_channels = ["conda-forge", "bioconda", "defaults"]
tls_no_verify = true

# Hardcode as we don't want this to depend on the system in the tests
concurrency.solves = 1
Loading
Loading