From ac74cd3163e43563807a8c6e8e72bb058cb6f459 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 2 Sep 2024 09:02:42 -0400 Subject: [PATCH] Minor: Add `RuntimeEnvBuilder::build_arc()` (#12213) --- datafusion/core/src/execution/context/mod.rs | 6 +-- datafusion/core/tests/fuzz_cases/sort_fuzz.rs | 10 ++--- datafusion/core/tests/memory_limit/mod.rs | 11 +++-- .../core/tests/parquet/file_statistics.rs | 11 +++-- datafusion/execution/src/runtime_env.rs | 5 +++ datafusion/execution/src/task.rs | 4 +- .../physical-plan/src/aggregates/mod.rs | 18 ++++----- .../physical-plan/src/joins/cross_join.rs | 8 ++-- .../physical-plan/src/joins/hash_join.rs | 16 +++----- .../src/joins/nested_loop_join.rs | 8 ++-- .../src/joins/sort_merge_join.rs | 40 ++++++++----------- .../physical-plan/src/repartition/mod.rs | 8 ++-- datafusion/physical-plan/src/sorts/sort.rs | 22 +++++----- datafusion/wasmtest/src/lib.rs | 10 ++--- 14 files changed, 75 insertions(+), 102 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index c67424c0fa53..06dc797ae27a 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -219,13 +219,13 @@ where /// // configure a memory limit of 1GB with 20% slop /// let runtime_env = RuntimeEnvBuilder::new() /// .with_memory_limit(1024 * 1024 * 1024, 0.80) -/// .build() +/// .build_arc() /// .unwrap(); /// /// // Create a SessionState using the config and runtime_env /// let state = SessionStateBuilder::new() /// .with_config(config) -/// .with_runtime_env(Arc::new(runtime_env)) +/// .with_runtime_env(runtime_env) /// // include support for built in functions and configurations /// .with_default_features() /// .build(); @@ -1758,7 +1758,7 @@ mod tests { let path = path.join("tests/tpch-csv"); let url = format!("file://{}", path.display()); - let runtime = Arc::new(RuntimeEnvBuilder::new().build()?); + let runtime =
RuntimeEnvBuilder::new().build_arc()?; let cfg = SessionConfig::new() .set_str("datafusion.catalog.location", url.as_str()) .set_str("datafusion.catalog.format", "CSV") diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs index 1980589491a5..fae4731569b6 100644 --- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs @@ -136,12 +136,10 @@ impl SortTest { .sort_spill_reservation_bytes, ); - let runtime = Arc::new( - RuntimeEnvBuilder::new() - .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))) - .build() - .unwrap(), - ); + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))) + .build_arc() + .unwrap(); SessionContext::new_with_config_rt(session_config, runtime) } else { SessionContext::new_with_config(session_config) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 592c25dedc50..69ef6058a2f6 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -509,21 +509,20 @@ impl TestCase { let table = scenario.table(); - let rt_config = RuntimeEnvBuilder::new() + let mut builder = RuntimeEnvBuilder::new() // disk manager setting controls the spilling .with_disk_manager(disk_manager_config) .with_memory_limit(memory_limit, MEMORY_FRACTION); - let runtime = if let Some(pool) = memory_pool { - rt_config.with_memory_pool(pool).build().unwrap() - } else { - rt_config.build().unwrap() + if let Some(pool) = memory_pool { + builder = builder.with_memory_pool(pool); }; + let runtime = builder.build_arc().unwrap(); // Configure execution let builder = SessionStateBuilder::new() .with_config(config) - .with_runtime_env(Arc::new(runtime)) + .with_runtime_env(runtime) .with_default_features(); let builder = match scenario.rules() { Some(rules) => builder.with_physical_optimizer_rules(rules), diff --git 
a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index bd251f1a6669..cd62c3bf426f 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -197,12 +197,11 @@ fn get_cache_runtime_state() -> ( .with_files_statistics_cache(Some(file_static_cache.clone())) .with_list_files_cache(Some(list_file_cache.clone())); - let rt = Arc::new( - RuntimeEnvBuilder::new() - .with_cache_manager(cache_config) - .build() - .expect("could not build runtime environment"), - ); + let rt = RuntimeEnvBuilder::new() + .with_cache_manager(cache_config) + .build_arc() + .expect("could not build runtime environment"); + let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state(); (file_static_cache, list_file_cache, state) diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index e7b48be95cff..574d387ae697 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -246,4 +246,9 @@ impl RuntimeEnvBuilder { object_store_registry: self.object_store_registry, }) } + + /// Convenience method to create a new `Arc<RuntimeEnv>` + pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> { + self.build().map(Arc::new) + } } diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index 35689b8e08df..57fcac0ee5ab 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -58,7 +58,7 @@ pub struct TaskContext { impl Default for TaskContext { fn default() -> Self { let runtime = RuntimeEnvBuilder::new() - .build() + .build_arc() .expect("default runtime created successfully"); // Create a default task context, mostly useful for testing @@ -69,7 +69,7 @@ impl Default for TaskContext { scalar_functions: HashMap::new(), aggregate_functions: HashMap::new(), window_functions: HashMap::new(), - runtime: Arc::new(runtime), + runtime, } } } diff --git
a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index e66a57fd2ee5..764227e5e717 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1331,12 +1331,10 @@ mod tests { fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> { let session_config = SessionConfig::new().with_batch_size(batch_size); - let runtime = Arc::new( - RuntimeEnvBuilder::default() - .with_memory_pool(Arc::new(FairSpillPool::new(max_memory))) - .build() - .unwrap(), - ); + let runtime = RuntimeEnvBuilder::default() + .with_memory_pool(Arc::new(FairSpillPool::new(max_memory))) + .build_arc() + .unwrap(); let task_ctx = TaskContext::default() .with_session_config(session_config) .with_runtime(runtime); @@ -1815,11 +1813,9 @@ mod tests { let input: Arc<dyn ExecutionPlan> = Arc::new(TestYieldingExec::new(true)); let input_schema = input.schema(); - let runtime = Arc::new( - RuntimeEnvBuilder::default() - .with_memory_limit(1, 1.0) - .build()?, - ); + let runtime = RuntimeEnvBuilder::default() + .with_memory_limit(1, 1.0) + .build_arc()?; let task_ctx = TaskContext::default().with_runtime(runtime); let task_ctx = Arc::new(task_ctx); diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index b99d4f17c42a..11153556f253 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -673,11 +673,9 @@ mod tests { #[tokio::test] async fn test_overallocation() -> Result<()> { - let runtime = Arc::new( - RuntimeEnvBuilder::new() - .with_memory_limit(100, 1.0) - .build()?, - ); + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .build_arc()?; let task_ctx = TaskContext::default().with_runtime(runtime); let task_ctx = Arc::new(task_ctx); diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index
f20d00e1a298..38827108e815 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -3798,11 +3798,9 @@ mod tests { ]; for join_type in join_types { - let runtime = Arc::new( - RuntimeEnvBuilder::new() - .with_memory_limit(100, 1.0) - .build()?, - ); + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .build_arc()?; let task_ctx = TaskContext::default().with_runtime(runtime); let task_ctx = Arc::new(task_ctx); @@ -3874,11 +3872,9 @@ mod tests { ]; for join_type in join_types { - let runtime = Arc::new( - RuntimeEnvBuilder::new() - .with_memory_limit(100, 1.0) - .build()?, - ); + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .build_arc()?; let session_config = SessionConfig::default().with_batch_size(50); let task_ctx = TaskContext::default() .with_session_config(session_config) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 3cd373544157..dadd20714ead 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1019,11 +1019,9 @@ mod tests { ]; for join_type in join_types { - let runtime = Arc::new( - RuntimeEnvBuilder::new() - .with_memory_limit(100, 1.0) - .build()?, - ); + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .build_arc()?; let task_ctx = TaskContext::default().with_runtime(runtime); let task_ctx = Arc::new(task_ctx); diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 09fe5d9ebc54..2118c1a5266f 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -2900,12 +2900,10 @@ mod tests { ]; // Disable DiskManager to prevent spilling - let runtime = Arc::new( - RuntimeEnvBuilder::new() - .with_memory_limit(100, 1.0) - 
.with_disk_manager(DiskManagerConfig::Disabled) - .build()?, - ); + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .with_disk_manager(DiskManagerConfig::Disabled) + .build_arc()?; let session_config = SessionConfig::default().with_batch_size(50); for join_type in join_types { @@ -2987,12 +2985,10 @@ mod tests { ]; // Disable DiskManager to prevent spilling - let runtime = Arc::new( - RuntimeEnvBuilder::new() - .with_memory_limit(100, 1.0) - .with_disk_manager(DiskManagerConfig::Disabled) - .build()?, - ); + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .with_disk_manager(DiskManagerConfig::Disabled) + .build_arc()?; let session_config = SessionConfig::default().with_batch_size(50); for join_type in join_types { @@ -3052,12 +3048,10 @@ mod tests { ]; // Enable DiskManager to allow spilling - let runtime = Arc::new( - RuntimeEnvBuilder::new() - .with_memory_limit(100, 1.0) - .with_disk_manager(DiskManagerConfig::NewOs) - .build()?, - ); + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .with_disk_manager(DiskManagerConfig::NewOs) + .build_arc()?; for batch_size in [1, 50] { let session_config = SessionConfig::default().with_batch_size(batch_size); @@ -3162,12 +3156,10 @@ mod tests { ]; // Enable DiskManager to allow spilling - let runtime = Arc::new( - RuntimeEnvBuilder::new() - .with_memory_limit(500, 1.0) - .with_disk_manager(DiskManagerConfig::NewOs) - .build()?, - ); + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(500, 1.0) + .with_disk_manager(DiskManagerConfig::NewOs) + .build_arc()?; for batch_size in [1, 50] { let session_config = SessionConfig::default().with_batch_size(batch_size); diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 650006a9d02d..47e5192c237e 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1506,11 +1506,9 @@ mod tests { 
let partitioning = Partitioning::RoundRobinBatch(4); // setup up context - let runtime = Arc::new( - RuntimeEnvBuilder::default() - .with_memory_limit(1, 1.0) - .build()?, - ); + let runtime = RuntimeEnvBuilder::default() + .with_memory_limit(1, 1.0) + .build_arc()?; let task_ctx = TaskContext::default().with_runtime(runtime); let task_ctx = Arc::new(task_ctx); diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index e0041194016c..fa9628abdfbb 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1148,11 +1148,9 @@ mod tests { .options() .execution .sort_spill_reservation_bytes; - let runtime = Arc::new( - RuntimeEnvBuilder::new() - .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0) - .build()?, - ); + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0) + .build_arc()?; let task_ctx = Arc::new( TaskContext::default() .with_session_config(session_config) @@ -1226,14 +1224,12 @@ mod tests { .execution .sort_spill_reservation_bytes; - let runtime = Arc::new( - RuntimeEnvBuilder::new() - .with_memory_limit( - sort_spill_reservation_bytes + avg_batch_size * (partitions - 1), - 1.0, - ) - .build()?, - ); + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit( + sort_spill_reservation_bytes + avg_batch_size * (partitions - 1), + 1.0, + ) + .build_arc()?; let task_ctx = Arc::new( TaskContext::default() .with_runtime(runtime) diff --git a/datafusion/wasmtest/src/lib.rs b/datafusion/wasmtest/src/lib.rs index 50325d262d1d..0f24449cbed3 100644 --- a/datafusion/wasmtest/src/lib.rs +++ b/datafusion/wasmtest/src/lib.rs @@ -98,12 +98,10 @@ mod test { let sql = "SELECT 2 + 2;"; // Execute SQL (using datafusion) - let rt = Arc::new( - RuntimeEnvBuilder::new() - .with_disk_manager(DiskManagerConfig::Disabled) - .build() - .unwrap(), - ); + let rt = RuntimeEnvBuilder::new() + 
.with_disk_manager(DiskManagerConfig::Disabled) + .build_arc() + .unwrap(); let session_config = SessionConfig::new().with_target_partitions(1); let session_context = Arc::new(SessionContext::new_with_config_rt(session_config, rt));