feat: add concurrency configuration (prefix-dev#2569)
### Why
Pixi can use a large amount of memory during the solve and can issue many
network requests while fetching repodata. While we search for a
better/automated solution, we want to let the user work around the issue by
capping the number of concurrent jobs.
The related issue is: prefix-dev#2458

### What this PR adds
As a user you can now set the maximum number of concurrent solves and
concurrent network requests in two ways:
**CLI**
```
pixi install --concurrent-solves 3
pixi install --concurrent-downloads 12
```
**configuration**
```
pixi config set concurrency.solves 1
pixi config set concurrency.downloads 12
```
`config.toml`
```toml
[concurrency]
solves = 2
downloads = 12
```
### TODO
After initial approval of the design I'll add the following:
- [x] Add documentation
- [x] Add basic CLI and configuration tests to the integration tests
ruben-arts authored and jjjermiah committed Nov 30, 2024
1 parent da969b9 commit db67027
Showing 37 changed files with 529 additions and 175 deletions.
183 changes: 172 additions & 11 deletions crates/pixi_config/src/lib.rs
@@ -1,12 +1,3 @@
use std::{
cmp::PartialEq,
collections::{BTreeSet as Set, HashMap},
fs,
path::{Path, PathBuf},
process::{Command, Stdio},
str::FromStr,
};

use clap::{ArgAction, Parser};
use itertools::Itertools;
use miette::{miette, Context, IntoDiagnostic};
@@ -18,6 +9,13 @@ use rattler_conda_types::{
use rattler_repodata_gateway::{Gateway, SourceConfig};
use reqwest_middleware::ClientWithMiddleware;
use serde::{de::IntoDeserializer, Deserialize, Serialize};
use std::{
collections::{BTreeSet as Set, HashMap},
fs,
path::{Path, PathBuf},
process::{Command, Stdio},
str::FromStr,
};
use url::Url;

const EXPERIMENTAL: &str = "experimental";
@@ -116,6 +114,14 @@ pub struct ConfigCli {
/// Specifies if we want to use uv keyring provider
#[arg(long)]
pypi_keyring_provider: Option<KeyringProvider>,

/// Max concurrent solves, default is the number of CPUs
#[arg(long)]
pub concurrent_solves: Option<usize>,

/// Max concurrent network requests, default is 50
#[arg(long)]
pub concurrent_downloads: Option<usize>,
}

#[derive(Parser, Debug, Clone, Default)]
@@ -333,6 +339,66 @@ impl ExperimentalConfig {
}
}

// Making the default values part of pixi_config to allow for printing the default settings in the future.
/// The default maximum number of concurrent solves that can be run at once.
/// Defaulting to the number of CPUs available.
fn default_max_concurrent_solves() -> usize {
std::thread::available_parallelism().map_or(1, |n| n.get())
}

/// The default maximum number of concurrent downloads that can be run at once.
/// 50 is a reasonable default for the number of concurrent downloads.
/// More verification is needed to determine the optimal number.
fn default_max_concurrent_downloads() -> usize {
50
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
pub struct ConcurrencyConfig {
/// The maximum number of concurrent solves that can be run at once.
// This default must also be set here, next to the struct-level default, so that serde does not fall back to 0 when the field is omitted in a partial struct.
#[serde(default = "default_max_concurrent_solves")]
pub solves: usize,

/// The maximum number of concurrent HTTP requests to make.
// This default must also be set here, next to the struct-level default, so that serde does not fall back to 0 when the field is omitted in a partial struct.
#[serde(default = "default_max_concurrent_downloads")]
pub downloads: usize,
}

impl Default for ConcurrencyConfig {
fn default() -> Self {
Self {
solves: default_max_concurrent_solves(),
downloads: default_max_concurrent_downloads(),
}
}
}

impl ConcurrencyConfig {
/// Merge the given ConcurrencyConfig into the current one.
pub fn merge(self, other: Self) -> Self {
// Merging means using the other value if it is not the default.
Self {
solves: if other.solves != ConcurrencyConfig::default().solves {
other.solves
} else {
self.solves
},
downloads: if other.downloads != ConcurrencyConfig::default().downloads {
other.downloads
} else {
self.downloads
},
}
}

pub fn is_default(&self) -> bool {
ConcurrencyConfig::default() == *self
}
}
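
To make the merge semantics above concrete, here is a minimal usage sketch. It is not part of the diff; it assumes `ConcurrencyConfig` is accessible from the `pixi_config` crate root as defined in this file, and the values are made up:

```rust
use pixi_config::ConcurrencyConfig;

fn main() {
    // Values read from a config file: only `solves` was customized.
    let from_file = ConcurrencyConfig {
        solves: 4,
        ..ConcurrencyConfig::default()
    };
    // Values coming from the CLI: only `downloads` was customized.
    let from_cli = ConcurrencyConfig {
        downloads: 8,
        ..ConcurrencyConfig::default()
    };

    // `other` (here `from_cli`) only wins for fields that differ from the default.
    let merged = from_file.merge(from_cli);
    assert_eq!(merged.solves, 4); // kept: the CLI left `solves` at its default
    assert_eq!(merged.downloads, 8); // taken: the CLI set a non-default value
}
```

A consequence of this design is that a value explicitly set to the default is indistinguishable from one that was not set at all.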

impl PyPIConfig {
/// Merge the given PyPIConfig into the current one.
pub fn merge(self, other: Self) -> Self {
@@ -549,6 +615,11 @@ pub struct Config {
#[serde(default)]
#[serde(skip_serializing_if = "ExperimentalConfig::is_default")]
pub experimental: ExperimentalConfig,

/// Concurrency configuration for pixi
#[serde(default)]
#[serde(skip_serializing_if = "ConcurrencyConfig::is_default")]
pub concurrency: ConcurrencyConfig,
}

impl Default for Config {
@@ -564,9 +635,10 @@ impl Default for Config {
repodata_config: RepodataConfig::default(),
pypi_config: PyPIConfig::default(),
detached_environments: Some(DetachedEnvironments::default()),
pinning_strategy: Default::default(),
pinning_strategy: None,
force_activate: None,
experimental: Default::default(),
experimental: ExperimentalConfig::default(),
concurrency: ConcurrencyConfig::default(),
}
}
}
Expand All @@ -581,6 +653,14 @@ impl From<ConfigCli> for Config {
.map(|val| PyPIConfig::default().with_keyring(val))
.unwrap_or_default(),
detached_environments: None,
concurrency: ConcurrencyConfig {
solves: cli
.concurrent_solves
.unwrap_or(ConcurrencyConfig::default().solves),
downloads: cli
.concurrent_downloads
.unwrap_or(ConcurrencyConfig::default().downloads),
},
..Default::default()
}
}
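
For illustration, a hypothetical end-to-end sketch of this conversion. It assumes `ConfigCli` derives clap's `Parser` (as its `#[arg]` attributes suggest) and that its remaining flags are all optional; the flag values are made up:

```rust
use clap::Parser;
use pixi_config::{Config, ConfigCli};

fn main() {
    // Parse only the new flag; every other option keeps its default.
    let cli = ConfigCli::parse_from(["pixi", "--concurrent-solves", "3"]);
    let config = Config::from(cli);

    // The CLI value is used where it was given...
    assert_eq!(config.max_concurrent_solves(), 3);
    // ...and the built-in default (50) is used where it was not.
    assert_eq!(config.max_concurrent_downloads(), 50);
}
```
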
@@ -798,6 +878,7 @@ impl Config {
"mirrors",
"detached-environments",
"pinning-strategy",
"max-concurrent-solves",
"repodata-config",
"repodata-config.disable-jlap",
"repodata-config.disable-bzip2",
@@ -838,6 +919,8 @@ impl Config {
pinning_strategy: other.pinning_strategy.or(self.pinning_strategy),
force_activate: other.force_activate,
experimental: other.experimental.merge(self.experimental),
// Make other take precedence over self to allow for setting the value through the CLI
concurrency: self.concurrency.merge(other.concurrency),
}
}

@@ -903,6 +986,16 @@ impl Config {
self.experimental.use_environment_activation_cache()
}

/// Retrieve the maximum number of concurrent solves.
pub fn max_concurrent_solves(&self) -> usize {
self.concurrency.solves
}

/// Retrieve the maximum number of concurrent downloads (network requests).
pub fn max_concurrent_downloads(&self) -> usize {
self.concurrency.downloads
}

/// Modify this config with the given key and value
///
/// # Note
@@ -1049,6 +1142,36 @@ impl Config {
_ => return Err(err),
}
}
key if key.starts_with("concurrency") => {
if key == "concurrency" {
if let Some(value) = value {
self.concurrency = serde_json::de::from_str(&value).into_diagnostic()?;
} else {
self.concurrency = ConcurrencyConfig::default();
}
return Ok(());
} else if !key.starts_with("concurrency.") {
return Err(err);
}
let subkey = key.strip_prefix("concurrency.").unwrap();
match subkey {
"solves" => {
if let Some(value) = value {
self.concurrency.solves = value.parse().into_diagnostic()?;
} else {
return Err(miette!("'solves' requires a number value"));
}
}
"downloads" => {
if let Some(value) = value {
self.concurrency.downloads = value.parse().into_diagnostic()?;
} else {
return Err(miette!("'downloads' requires a number value"));
}
}
_ => return Err(err),
}
}
_ => return Err(err),
}

@@ -1085,6 +1208,7 @@ impl Config {
.with_client(client)
.with_cache_dir(cache_dir.join(consts::CONDA_REPODATA_CACHE_DIR))
.with_channel_config(self.into())
.with_max_concurrent_requests(self.max_concurrent_downloads())
.finish()
}
}
@@ -1125,6 +1249,7 @@ mod tests {
tls-no-verify = true
detached-environments = "{}"
pinning-strategy = "no-pin"
concurrency.solves = 5
UNUSED = "unused"
"#,
env!("CARGO_MANIFEST_DIR").replace('\\', "\\\\").as_str()
@@ -1139,6 +1264,7 @@ UNUSED = "unused"
config.detached_environments().path().unwrap(),
Some(PathBuf::from(env!("CARGO_MANIFEST_DIR")))
);
assert_eq!(config.max_concurrent_solves(), 5);
assert!(unused.contains("UNUSED"));

let toml = r"detached-environments = true";
@@ -1171,6 +1297,8 @@ UNUSED = "unused"
tls_no_verify: true,
auth_file: None,
pypi_keyring_provider: Some(KeyringProvider::Subprocess),
concurrent_solves: None,
concurrent_downloads: None,
};
let config = Config::from(cli);
assert_eq!(config.tls_no_verify, Some(true));
@@ -1183,6 +1311,8 @@ UNUSED = "unused"
tls_no_verify: false,
auth_file: Some(PathBuf::from("path.json")),
pypi_keyring_provider: None,
concurrent_solves: None,
concurrent_downloads: None,
};

let config = Config::from(cli);
@@ -1214,6 +1344,14 @@ UNUSED = "unused"
);
}

#[test]
fn test_default_config() {
let config = Config::default();
// This depends on the system so it's hard to test.
assert!(config.concurrency.solves > 0);
assert_eq!(config.concurrency.downloads, 50);
}

#[test]
fn test_config_merge() {
let mut config = Config::default();
@@ -1222,6 +1360,10 @@ UNUSED = "unused"
channel_config: ChannelConfig::default_with_root_dir(PathBuf::from("/root/dir")),
tls_no_verify: Some(true),
detached_environments: Some(DetachedEnvironments::Path(PathBuf::from("/path/to/envs"))),
concurrency: ConcurrencyConfig {
solves: 5,
..ConcurrencyConfig::default()
},
..Default::default()
};
config = config.merge_config(other);
@@ -1255,6 +1397,7 @@ UNUSED = "unused"
config.detached_environments().path().unwrap(),
Some(PathBuf::from("/path/to/envs2"))
);
assert_eq!(config.max_concurrent_solves(), 5);

let d = Path::new(&env!("CARGO_MANIFEST_DIR"))
.join("tests")
@@ -1430,6 +1573,24 @@ UNUSED = "unused"
config.set("change-ps1", None).unwrap();
assert_eq!(config.change_ps1, None);

config
.set("concurrency.solves", Some("10".to_string()))
.unwrap();
assert_eq!(config.max_concurrent_solves(), 10);
config
.set("concurrency.solves", Some("1".to_string()))
.unwrap();

config
.set("concurrency.downloads", Some("10".to_string()))
.unwrap();
assert_eq!(config.max_concurrent_downloads(), 10);
config
.set("concurrency.downloads", Some("1".to_string()))
.unwrap();

assert_eq!(config.max_concurrent_downloads(), 1);

config.set("unknown-key", None).unwrap_err();
}

@@ -1,6 +1,7 @@
---
source: crates/pixi_config/src/lib.rs
expression: debug
snapshot_kind: text
---
Config {
default_channels: [
@@ -72,4 +73,8 @@ Config {
experimental: ExperimentalConfig {
use_environment_activation_cache: None,
},
concurrency: ConcurrencyConfig {
solves: 1,
downloads: 50,
},
}
3 changes: 3 additions & 0 deletions crates/pixi_config/tests/config/config_1.toml
@@ -1,2 +1,5 @@
default_channels = ["conda-forge", "bioconda", "defaults"]
tls_no_verify = true

# Hardcoded so the tests don't depend on the system's CPU count
concurrency.solves = 1