diff --git a/Cargo.toml b/Cargo.toml index 1182c3da92bf..819561603f0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,7 +129,6 @@ hashbrown = { version = "0.14.5", features = ["raw"] } indexmap = "2.0.0" itertools = "0.13" log = "^0.4" -num_cpus = "1.13.0" object_store = { version = "0.11.0", default-features = false } parking_lot = "0.12" parquet = { version = "53.3.0", default-features = false, features = [ diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 7f29f7471b6f..ad8debaf2fa3 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -42,7 +42,6 @@ env_logger = { workspace = true } futures = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", optional = true, default-features = false } -num_cpus = { workspace = true } parquet = { workspace = true, default-features = true } serde = { version = "1.0.136", features = ["derive"] } serde_json = { workspace = true } diff --git a/benchmarks/src/bin/external_aggr.rs b/benchmarks/src/bin/external_aggr.rs index 6438593a20a0..f88847ba8a0a 100644 --- a/benchmarks/src/bin/external_aggr.rs +++ b/benchmarks/src/bin/external_aggr.rs @@ -21,6 +21,8 @@ use std::collections::HashMap; +use std::num::NonZero; use std::path::PathBuf; use std::sync::Arc; use std::sync::OnceLock; +use std::thread::available_parallelism; use structopt::StructOpt; use arrow::record_batch::RecordBatch; @@ -325,7 +327,11 @@ impl ExternalAggrConfig { } fn partitions(&self) -> usize { - self.common.partitions.unwrap_or(num_cpus::get()) + self.common.partitions.unwrap_or( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ) } /// Parse memory limit from string to number of bytes diff --git a/benchmarks/src/bin/h2o.rs b/benchmarks/src/bin/h2o.rs index 1ddeb786a591..d12592d3d7c7 100644 --- a/benchmarks/src/bin/h2o.rs +++ b/benchmarks/src/bin/h2o.rs @@ -29,6 +29,7 @@ use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext}; use datafusion_benchmarks::util::BenchmarkRun; use 
std::path::PathBuf; use std::sync::Arc; +use std::thread::{self, available_parallelism}; use structopt::StructOpt; use tokio::time::Instant; @@ -91,7 +92,9 @@ async fn group_by(opt: &GroupBy) -> Result<()> { .with_listing_options(ListingOptions::new(Arc::new(CsvFormat::default()))) .with_schema(Arc::new(schema)); let csv = ListingTable::try_new(listing_config)?; - let partition_size = num_cpus::get(); + let partition_size = available_parallelism() + .unwrap_or(std::num::NonZero::new(1).unwrap()) + .get(); let memtable = MemTable::load(Arc::new(csv), Some(partition_size), &ctx.state()).await?; ctx.register_table("x", Arc::new(memtable))?; diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index 47c356990881..3d85658bb6ff 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::num::NonZero; use std::path::PathBuf; use std::sync::Arc; +use std::thread::available_parallelism; use super::{get_imdb_table_schema, get_query_sql, IMDB_TABLES}; use crate::util::{BenchmarkRun, CommonOpt}; @@ -468,7 +470,11 @@ impl RunOpt { } fn partitions(&self) -> usize { - self.common.partitions.unwrap_or(num_cpus::get()) + self.common.partitions.unwrap_or( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ) } } diff --git a/benchmarks/src/sort.rs b/benchmarks/src/sort.rs index f4b707611cfb..9bc3d82d34af 100644 --- a/benchmarks/src/sort.rs +++ b/benchmarks/src/sort.rs @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. 
+use std::num::NonZero; use std::path::PathBuf; use std::sync::Arc; +use std::thread::available_parallelism; use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt}; @@ -147,7 +149,11 @@ impl RunOpt { rundata.start_new_case(title); for i in 0..self.common.iterations { let config = SessionConfig::new().with_target_partitions( - self.common.partitions.unwrap_or(num_cpus::get()), + self.common.partitions.unwrap_or( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ), ); let ctx = SessionContext::new_with_config(config); let (rows, elapsed) = diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs index 4b83b3b8889a..341474a39e11 100644 --- a/benchmarks/src/sort_tpch.rs +++ b/benchmarks/src/sort_tpch.rs @@ -22,8 +22,10 @@ //! runs end-to-end sort queries and test the performance on multiple CPU cores. use futures::StreamExt; +use std::num::NonZero; use std::path::PathBuf; use std::sync::Arc; +use std::thread::available_parallelism; use structopt::StructOpt; use datafusion::datasource::file_format::parquet::ParquetFormat; @@ -315,6 +317,10 @@ impl RunOpt { } fn partitions(&self) -> usize { - self.common.partitions.unwrap_or(num_cpus::get()) + self.common.partitions.unwrap_or( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ) } } diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 9ff1f72d8606..f6f754caa14a 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. 
+use std::num::NonZero; use std::path::PathBuf; use std::sync::Arc; +use std::thread::available_parallelism; use super::{ get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES, @@ -296,7 +298,11 @@ impl RunOpt { } fn partitions(&self) -> usize { - self.common.partitions.unwrap_or(num_cpus::get()) + self.common.partitions.unwrap_or( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ) } } diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index b9398e5b522f..50b8ac42fa83 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::{num::NonZero, thread::available_parallelism}; + use datafusion::prelude::SessionConfig; use structopt::StructOpt; @@ -48,7 +50,13 @@ impl CommonOpt { /// Modify the existing config appropriately pub fn update_config(&self, config: SessionConfig) -> SessionConfig { config - .with_target_partitions(self.partitions.unwrap_or(num_cpus::get())) + .with_target_partitions( + self.partitions.unwrap_or( + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), + ), + ) .with_batch_size(self.batch_size) } } diff --git a/benchmarks/src/util/run.rs b/benchmarks/src/util/run.rs index 5ee6691576b4..fd081826f385 100644 --- a/benchmarks/src/util/run.rs +++ b/benchmarks/src/util/run.rs @@ -20,7 +20,9 @@ use serde::{Serialize, Serializer}; use serde_json::Value; use std::{ collections::HashMap, + num::NonZero, path::Path, + thread::available_parallelism, time::{Duration, SystemTime}, }; @@ -68,7 +70,9 @@ impl RunContext { Self { benchmark_version: env!("CARGO_PKG_VERSION").to_owned(), datafusion_version: DATAFUSION_VERSION.to_owned(), - num_cpus: num_cpus::get(), + num_cpus: available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get(), start_time: SystemTime::now(), arguments: std::env::args().skip(1).collect::<Vec<String>>(), } diff --git 
a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 9f2db95721f5..d76848dfe95e 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -58,7 +58,6 @@ half = { workspace = true } hashbrown = { workspace = true } indexmap = { workspace = true } libc = "0.2.140" -num_cpus = { workspace = true } object_store = { workspace = true, optional = true } parquet = { workspace = true, optional = true, default-features = true } paste = "1.0.15" diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1ad10d164868..e91568075f43 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -20,7 +20,9 @@ use std::any::Any; use std::collections::{BTreeMap, HashMap}; use std::fmt::{self, Display}; +use std::num::NonZero; use std::str::FromStr; +use std::thread::available_parallelism; use crate::error::_config_err; use crate::parsers::CompressionTypeVariant; @@ -250,7 +252,7 @@ config_namespace! { /// concurrency. /// /// Defaults to the number of CPU cores on the system - pub target_partitions: usize, default = num_cpus::get() + pub target_partitions: usize, default = available_parallelism().unwrap_or(NonZero::new(1).unwrap()).get() /// The default time zone /// @@ -266,7 +268,7 @@ config_namespace! { /// This is mostly use to plan `UNION` children in parallel. 
/// /// Defaults to the number of CPU cores on the system - pub planning_concurrency: usize, default = num_cpus::get() + pub planning_concurrency: usize, default = available_parallelism().unwrap_or(NonZero::new(1).unwrap()).get() /// When set to true, skips verifying that the schema produced by /// planning the input of `LogicalPlan::Aggregate` exactly matches the diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index fc7b96cf9e13..268e0fb17f7b 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -116,7 +116,6 @@ glob = "0.3.0" itertools = { workspace = true } log = { workspace = true } num-traits = { version = "0.2", optional = true } -num_cpus = { workspace = true } object_store = { workspace = true } parking_lot = { workspace = true } parquet = { workspace = true, optional = true, default-features = true } diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs index af454bee7ce8..0ca97c43b101 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{cmp, sync::Arc}; +use std::{cmp, num::NonZero, sync::Arc, thread::available_parallelism}; use datafusion::{ datasource::MemTable, @@ -73,7 +73,7 @@ impl SessionContextGenerator { ]; let max_batch_size = cmp::max(1, dataset_ref.total_rows_num); - let max_target_partitions = num_cpus::get(); + let max_target_partitions = available_parallelism().unwrap_or(NonZero::new(1).unwrap()).get(); Self { dataset: dataset_ref, diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 177427b47d21..c6902a11d8db 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. 
+use std::num::NonZero; use std::sync::Arc; +use std::thread::available_parallelism; use arrow::{ array::*, datatypes::*, record_batch::RecordBatch, @@ -259,7 +261,12 @@ impl ExplainNormalizer { // convert things like partitioning=RoundRobinBatch(16) // to partitioning=RoundRobinBatch(NUM_CORES) - let needle = format!("RoundRobinBatch({})", num_cpus::get()); + let needle = format!( + "RoundRobinBatch({})", + available_parallelism() + .unwrap_or(NonZero::new(1).unwrap()) + .get() + ); replacements.push((needle, "RoundRobinBatch(NUM_CORES)".to_string())); Self { replacements } diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index ed2b9c49715e..849003f8eeac 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -70,7 +70,6 @@ postgres = [ [dev-dependencies] env_logger = { workspace = true } -num_cpus = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } [[test]]