From d043f63e0843eccf086497aee3628b117739ae5d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 3 Dec 2024 15:20:18 -0500 Subject: [PATCH 1/2] Deprecate `RuntimeConfig`, update code to use new builder style --- benchmarks/src/bin/external_aggr.rs | 12 +++- benchmarks/src/sort_tpch.rs | 10 +-- datafusion-cli/src/main.rs | 37 ++++------ .../core/src/datasource/file_format/csv.rs | 3 - datafusion/core/src/execution/context/mod.rs | 3 - datafusion/execution/src/disk_manager.rs | 3 +- datafusion/execution/src/memory_pool/pool.rs | 2 +- datafusion/execution/src/runtime_env.rs | 72 +++++++++++-------- datafusion/execution/src/task.rs | 10 +-- .../physical-plan/src/aggregates/mod.rs | 4 +- datafusion/sql/src/planner.rs | 2 +- 11 files changed, 81 insertions(+), 77 deletions(-) diff --git a/benchmarks/src/bin/external_aggr.rs b/benchmarks/src/bin/external_aggr.rs index 950c3048c1cc..f1d83375850d 100644 --- a/benchmarks/src/bin/external_aggr.rs +++ b/benchmarks/src/bin/external_aggr.rs @@ -33,7 +33,8 @@ use datafusion::datasource::{MemTable, TableProvider}; use datafusion::error::Result; use datafusion::execution::memory_pool::FairSpillPool; use datafusion::execution::memory_pool::{human_readable_size, units}; -use datafusion::execution::runtime_env::RuntimeConfig; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use datafusion::execution::SessionStateBuilder; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::{collect, displayable}; use datafusion::prelude::*; @@ -195,10 +196,15 @@ impl ExternalAggrConfig { let query_name = format!("Q{query_id}({})", human_readable_size(mem_limit as usize)); let config = self.common.config(); - let runtime_config = RuntimeConfig::new() + let runtime_env = RuntimeEnvBuilder::new() .with_memory_pool(Arc::new(FairSpillPool::new(mem_limit as usize))) .build_arc()?; - let ctx = SessionContext::new_with_config_rt(config, runtime_config); + let state = SessionStateBuilder::new() + .with_config(config) + .with_runtime_env(runtime_env) + .with_default_features() + .build(); + let ctx = SessionContext::from(state); // register tables self.register_tables(&ctx).await?; diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs index 137f4e20b787..566a5ea62c2d 100644 --- a/benchmarks/src/sort_tpch.rs +++ b/benchmarks/src/sort_tpch.rs @@ -32,7 +32,7 @@ use datafusion::datasource::listing::{ }; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::error::Result; -use datafusion::execution::runtime_env::RuntimeConfig; +use datafusion::execution::SessionStateBuilder; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::{displayable, execute_stream}; use datafusion::prelude::*; @@ -188,9 +188,11 @@ impl RunOpt { /// Benchmark query `query_id` in `SORT_QUERIES` async fn benchmark_query(&self, query_id: usize) -> Result> { let config = self.common.config(); - - let runtime_config = RuntimeConfig::new().build_arc()?; - let ctx = SessionContext::new_with_config_rt(config, runtime_config); + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + let ctx = SessionContext::from(state); // register tables self.register_tables(&ctx).await?; diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 4c6c352ff339..d5dd2c4b33db 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -23,8 +23,8 @@ use std::sync::{Arc, OnceLock}; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionConfig; -use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool}; -use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool}; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::prelude::SessionContext; use datafusion_cli::catalog::DynamicObjectStoreCatalog; use datafusion_cli::functions::ParquetMetadataFunc; @@ -156,27 +156,22 @@ async fn main_inner() -> Result<()> { session_config = session_config.with_batch_size(batch_size); }; - let rt_config = RuntimeConfig::new(); - let rt_config = - // set memory pool size - if let Some(memory_limit) = args.memory_limit { - // set memory pool type - match args.mem_pool_type { - PoolType::Fair => rt_config - .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))), - PoolType::Greedy => rt_config - .with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit))) - } - } else { - rt_config + let mut rt_builder = RuntimeEnvBuilder::new(); + // set memory pool size + if let Some(memory_limit) = args.memory_limit { + // set memory pool type + let pool: Arc = match args.mem_pool_type { + PoolType::Fair => Arc::new(FairSpillPool::new(memory_limit)), + PoolType::Greedy => Arc::new(GreedyMemoryPool::new(memory_limit)), }; + rt_builder = rt_builder.with_memory_pool(pool) + } - let runtime_env = create_runtime_env(rt_config.clone())?; + let runtime_env = rt_builder.build_arc()?; // enable dynamic file query - let ctx = - SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env)) - .enable_url_table(); + let ctx = SessionContext::new_with_config_rt(session_config, runtime_env) + .enable_url_table(); ctx.refresh_catalogs().await?; // install dynamic catalog provider that can register required object stores ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new( @@ -231,10 +226,6 @@ async fn main_inner() -> Result<()> { Ok(()) } -fn create_runtime_env(rn_config: RuntimeConfig) -> Result { - RuntimeEnv::try_new(rn_config) -} - fn parse_valid_file(dir: &str) -> Result { if Path::new(dir).is_file() { Ok(dir.to_string()) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 9f979ddf01e7..9c96c682865f 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -753,7 +753,6 @@ mod tests { use datafusion_common::cast::as_string_array; use datafusion_common::internal_err; use datafusion_common::stats::Precision; - use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_expr::{col, lit}; use chrono::DateTime; @@ -984,12 +983,10 @@ mod tests { async fn query_compress_data( file_compression_type: FileCompressionType, ) -> Result<()> { - let runtime = Arc::new(RuntimeEnvBuilder::new().build()?); let mut cfg = SessionConfig::new(); cfg.options_mut().catalog.has_header = true; let session_state = SessionStateBuilder::new() .with_config(cfg) - .with_runtime_env(runtime) .with_default_features() .build(); let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap(); diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 4cc3200df17d..67236c9a6bd2 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1792,7 +1792,6 @@ mod tests { use super::{super::options::CsvReadOptions, *}; use crate::assert_batches_eq; use crate::execution::memory_pool::MemoryConsumer; - use crate::execution::runtime_env::RuntimeEnvBuilder; use crate::test; use crate::test_util::{plan_and_collect, populate_csv_partitions}; use arrow_schema::{DataType, TimeUnit}; @@ -1932,14 +1931,12 @@ mod tests { let path = path.join("tests/tpch-csv"); let url = format!("file://{}", path.display()); - let runtime = RuntimeEnvBuilder::new().build_arc()?; let cfg = SessionConfig::new() .set_str("datafusion.catalog.location", url.as_str()) .set_str("datafusion.catalog.format", "CSV") .set_str("datafusion.catalog.has_header", "true"); let session_state = SessionStateBuilder::new() .with_config(cfg) - .with_runtime_env(runtime) .with_default_features() .build(); let ctx = SessionContext::new_with_state(session_state); diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index c71071b8093c..756da7ed5b46 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Manages files generated during query execution, files are -//! hashed among the directories listed in RuntimeConfig::local_dirs. +//! [`DiskManager`]: Manages files generated during query execution use datafusion_common::{resources_datafusion_err, DataFusionError, Result}; use log::debug; diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index c2ec42d0df1e..261332180e57 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -62,7 +62,7 @@ pub struct GreedyMemoryPool { } impl GreedyMemoryPool { - /// Allocate up to `limit` bytes + /// Create a new pool that can allocate up to `pool_size` bytes pub fn new(pool_size: usize) -> Self { debug!("Created new GreedyMemoryPool(pool_size={pool_size})"); Self { diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index d302452f7573..bada37c79497 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -41,13 +41,32 @@ use url::Url; /// Execution runtime environment that manages system resources such /// as memory, disk, cache and storage. /// -/// A [`RuntimeEnv`] is created from a [`RuntimeEnvBuilder`] and has the +/// A [`RuntimeEnv`] can be created using [`RuntimeEnvBuilder`] and has the /// following resource management functionality: /// /// * [`MemoryPool`]: Manage memory /// * [`DiskManager`]: Manage temporary files on local disk /// * [`CacheManager`]: Manage temporary cache data during the session lifetime /// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances +/// +/// # Example: Create default `RuntimeEnv` +/// ``` +/// # use datafusion_execution::runtime_env::RuntimeEnv; +/// let runtime_env = RuntimeEnv::default(); +/// ``` +/// +/// # Example: Create a `RuntimeEnv` from [`RuntimeEnvBuilder`] with a new memory pool +/// ``` +/// # use std::sync::Arc; +/// # use datafusion_execution::memory_pool::GreedyMemoryPool; +/// # use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder}; +/// // restrict to using at most 100MB of memory +/// let pool_size = 100 * 1024 * 1024; +/// let runtime_env = RuntimeEnvBuilder::new() +/// .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))) +/// .build() +/// .unwrap(); +/// ``` pub struct RuntimeEnv { /// Runtime memory management pub memory_pool: Arc, @@ -66,28 +85,16 @@ impl Debug for RuntimeEnv { } impl RuntimeEnv { - #[deprecated(since = "43.0.0", note = "please use `try_new` instead")] + #[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")] + #[allow(deprecated)] pub fn new(config: RuntimeConfig) -> Result { Self::try_new(config) } /// Create env based on configuration + #[deprecated(since = "44.0.0", note = "please use `RuntimeEnvBuilder` instead")] + #[allow(deprecated)] pub fn try_new(config: RuntimeConfig) -> Result { - let RuntimeConfig { - memory_pool, - disk_manager, - cache_manager, - object_store_registry, - } = config; - - let memory_pool = - memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default())); - - Ok(Self { - memory_pool, - disk_manager: DiskManager::try_new(disk_manager)?, - cache_manager: CacheManager::try_new(&cache_manager)?, - object_store_registry, - }) + config.build() } /// Registers a custom `ObjectStore` to be used with a specific url. @@ -104,7 +111,7 @@ impl RuntimeEnv { /// # use std::sync::Arc; /// # use url::Url; /// # use datafusion_execution::runtime_env::RuntimeEnv; - /// # let runtime_env = RuntimeEnv::try_new(Default::default()).unwrap(); + /// # let runtime_env = RuntimeEnv::default(); /// let url = Url::try_from("file://").unwrap(); /// let object_store = object_store::local::LocalFileSystem::new(); /// // register the object store with the runtime environment @@ -119,11 +126,12 @@ impl RuntimeEnv { /// # use std::sync::Arc; /// # use url::Url; /// # use datafusion_execution::runtime_env::RuntimeEnv; - /// # let runtime_env = RuntimeEnv::try_new(Default::default()).unwrap(); + /// # let runtime_env = RuntimeEnv::default(); /// # // use local store for example as http feature is not enabled /// # let http_store = object_store::local::LocalFileSystem::new(); /// // create a new object store via object_store::http::HttpBuilder; /// let base_url = Url::parse("https://github.com").unwrap(); + /// // (note this example can't depend on the http feature) /// // let http_store = HttpBuilder::new() /// // .with_url(base_url.clone()) /// // .build() @@ -155,12 +163,15 @@ impl Default for RuntimeEnv { } } -/// Please see: +/// Please see: /// This a type alias for backwards compatibility. +#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")] pub type RuntimeConfig = RuntimeEnvBuilder; #[derive(Clone)] -/// Execution runtime configuration +/// Execution runtime configuration builder. +/// +/// See example on [`RuntimeEnv`] pub struct RuntimeEnvBuilder { /// DiskManager to manage temporary disk file usage pub disk_manager: DiskManagerConfig, @@ -239,15 +250,20 @@ impl RuntimeEnvBuilder { /// Build a RuntimeEnv pub fn build(self) -> Result { - let memory_pool = self - .memory_pool - .unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default())); + let Self { + disk_manager, + memory_pool, + cache_manager, + object_store_registry, + } = self; + let memory_pool = + memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default())); Ok(RuntimeEnv { memory_pool, - disk_manager: DiskManager::try_new(self.disk_manager)?, - cache_manager: CacheManager::try_new(&self.cache_manager)?, - object_store_registry: self.object_store_registry, + disk_manager: DiskManager::try_new(disk_manager)?, + cache_manager: CacheManager::try_new(&cache_manager)?, + object_store_registry, }) } diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index 35494443b476..7cdb53c90d0e 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -16,10 +16,8 @@ // under the License. use crate::{ - config::SessionConfig, - memory_pool::MemoryPool, - registry::FunctionRegistry, - runtime_env::{RuntimeEnv, RuntimeEnvBuilder}, + config::SessionConfig, memory_pool::MemoryPool, registry::FunctionRegistry, + runtime_env::RuntimeEnv, }; use datafusion_common::{plan_datafusion_err, DataFusionError, Result}; use datafusion_expr::planner::ExprPlanner; @@ -54,9 +52,7 @@ pub struct TaskContext { impl Default for TaskContext { fn default() -> Self { - let runtime = RuntimeEnvBuilder::new() - .build_arc() - .expect("default runtime created successfully"); + let runtime = Arc::new(RuntimeEnv::default()); // Create a default task context, mostly useful for testing Self { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 260c3a1c48de..feca5eb1db38 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1429,7 +1429,7 @@ mod tests { fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc { let session_config = SessionConfig::new().with_batch_size(batch_size); - let runtime = RuntimeEnvBuilder::default() + let runtime = RuntimeEnvBuilder::new() .with_memory_pool(Arc::new(FairSpillPool::new(max_memory))) .build_arc() .unwrap(); @@ -1914,7 +1914,7 @@ mod tests { let input: Arc = Arc::new(TestYieldingExec::new(true)); let input_schema = input.schema(); - let runtime = RuntimeEnvBuilder::default() + let runtime = RuntimeEnvBuilder::new() .with_memory_limit(1, 1.0) .build_arc()?; let task_ctx = TaskContext::default().with_runtime(runtime); diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 496088b1e171..59fa4ca5f1f6 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -468,7 +468,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { || matches!(tz_info, TimezoneInfo::WithTimeZone) { // Timestamp With Time Zone - // INPUT : [SQLDataType] TimestampTz + [RuntimeConfig] Time Zone + // INPUT : [SQLDataType] TimestampTz + [Config] Time Zone // OUTPUT: [ArrowDataType] Timestamp self.context_provider.options().execution.time_zone.clone() } else { From b1edd22023a32c3cc27e47bc511712ebaa214795 Mon Sep 17 00:00:00 2001 From: Oleks V Date: Wed, 4 Dec 2024 10:44:35 -0800 Subject: [PATCH 2/2] Update datafusion/execution/src/runtime_env.rs --- datafusion/execution/src/runtime_env.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index bada37c79497..5420080efd3e 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -163,7 +163,7 @@ impl Default for RuntimeEnv { } } -/// Please see: +/// Please see: /// This a type alias for backwards compatibility. #[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")] pub type RuntimeConfig = RuntimeEnvBuilder;