Skip to content

Commit

Permalink
Minor: Add `RuntimeEnvBuilder::build_arc() (apache#12213)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Sep 2, 2024
1 parent 77e0e3b commit ac74cd3
Show file tree
Hide file tree
Showing 14 changed files with 75 additions and 102 deletions.
6 changes: 3 additions & 3 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,13 @@ where
/// // configure a memory limit of 1GB with 20% slop
/// let runtime_env = RuntimeEnvBuilder::new()
/// .with_memory_limit(1024 * 1024 * 1024, 0.80)
/// .build()
/// .build_arc()
/// .unwrap();
///
/// // Create a SessionState using the config and runtime_env
/// let state = SessionStateBuilder::new()
/// .with_config(config)
/// .with_runtime_env(Arc::new(runtime_env))
/// .with_runtime_env(runtime_env)
/// // include support for built in functions and configurations
/// .with_default_features()
/// .build();
Expand Down Expand Up @@ -1758,7 +1758,7 @@ mod tests {
let path = path.join("tests/tpch-csv");
let url = format!("file://{}", path.display());

let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
let runtime = RuntimeEnvBuilder::new().build_arc()?;
let cfg = SessionConfig::new()
.set_str("datafusion.catalog.location", url.as_str())
.set_str("datafusion.catalog.format", "CSV")
Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,10 @@ impl SortTest {
.sort_spill_reservation_bytes,
);

let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
.build()
.unwrap(),
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
.build_arc()
.unwrap();
SessionContext::new_with_config_rt(session_config, runtime)
} else {
SessionContext::new_with_config(session_config)
Expand Down
11 changes: 5 additions & 6 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,21 +509,20 @@ impl TestCase {

let table = scenario.table();

let rt_config = RuntimeEnvBuilder::new()
let mut builder = RuntimeEnvBuilder::new()
// disk manager setting controls the spilling
.with_disk_manager(disk_manager_config)
.with_memory_limit(memory_limit, MEMORY_FRACTION);

let runtime = if let Some(pool) = memory_pool {
rt_config.with_memory_pool(pool).build().unwrap()
} else {
rt_config.build().unwrap()
if let Some(pool) = memory_pool {
builder = builder.with_memory_pool(pool);
};
let runtime = builder.build_arc().unwrap();

// Configure execution
let builder = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(Arc::new(runtime))
.with_runtime_env(runtime)
.with_default_features();
let builder = match scenario.rules() {
Some(rules) => builder.with_physical_optimizer_rules(rules),
Expand Down
11 changes: 5 additions & 6 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,11 @@ fn get_cache_runtime_state() -> (
.with_files_statistics_cache(Some(file_static_cache.clone()))
.with_list_files_cache(Some(list_file_cache.clone()));

let rt = Arc::new(
RuntimeEnvBuilder::new()
.with_cache_manager(cache_config)
.build()
.expect("could not build runtime environment"),
);
let rt = RuntimeEnvBuilder::new()
.with_cache_manager(cache_config)
.build_arc()
.expect("could not build runtime environment");

let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state();

(file_static_cache, list_file_cache, state)
Expand Down
5 changes: 5 additions & 0 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,9 @@ impl RuntimeEnvBuilder {
object_store_registry: self.object_store_registry,
})
}

/// Convenience method to create a new `Arc<RuntimeEnv>`
pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
self.build().map(Arc::new)
}
}
4 changes: 2 additions & 2 deletions datafusion/execution/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct TaskContext {
impl Default for TaskContext {
fn default() -> Self {
let runtime = RuntimeEnvBuilder::new()
.build()
.build_arc()
.expect("default runtime created successfully");

// Create a default task context, mostly useful for testing
Expand All @@ -69,7 +69,7 @@ impl Default for TaskContext {
scalar_functions: HashMap::new(),
aggregate_functions: HashMap::new(),
window_functions: HashMap::new(),
runtime: Arc::new(runtime),
runtime,
}
}
}
Expand Down
18 changes: 7 additions & 11 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1331,12 +1331,10 @@ mod tests {

fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
let session_config = SessionConfig::new().with_batch_size(batch_size);
let runtime = Arc::new(
RuntimeEnvBuilder::default()
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
.build()
.unwrap(),
);
let runtime = RuntimeEnvBuilder::default()
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
.build_arc()
.unwrap();
let task_ctx = TaskContext::default()
.with_session_config(session_config)
.with_runtime(runtime);
Expand Down Expand Up @@ -1815,11 +1813,9 @@ mod tests {
let input: Arc<dyn ExecutionPlan> = Arc::new(TestYieldingExec::new(true));
let input_schema = input.schema();

let runtime = Arc::new(
RuntimeEnvBuilder::default()
.with_memory_limit(1, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::default()
.with_memory_limit(1, 1.0)
.build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down
8 changes: 3 additions & 5 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,11 +673,9 @@ mod tests {

#[tokio::test]
async fn test_overallocation() -> Result<()> {
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down
16 changes: 6 additions & 10 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3798,11 +3798,9 @@ mod tests {
];

for join_type in join_types {
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down Expand Up @@ -3874,11 +3872,9 @@ mod tests {
];

for join_type in join_types {
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build_arc()?;
let session_config = SessionConfig::default().with_batch_size(50);
let task_ctx = TaskContext::default()
.with_session_config(session_config)
Expand Down
8 changes: 3 additions & 5 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,11 +1019,9 @@ mod tests {
];

for join_type in join_types {
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down
40 changes: 16 additions & 24 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2900,12 +2900,10 @@ mod tests {
];

// Disable DiskManager to prevent spilling
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.with_disk_manager(DiskManagerConfig::Disabled)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.with_disk_manager(DiskManagerConfig::Disabled)
.build_arc()?;
let session_config = SessionConfig::default().with_batch_size(50);

for join_type in join_types {
Expand Down Expand Up @@ -2987,12 +2985,10 @@ mod tests {
];

// Disable DiskManager to prevent spilling
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.with_disk_manager(DiskManagerConfig::Disabled)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.with_disk_manager(DiskManagerConfig::Disabled)
.build_arc()?;
let session_config = SessionConfig::default().with_batch_size(50);

for join_type in join_types {
Expand Down Expand Up @@ -3052,12 +3048,10 @@ mod tests {
];

// Enable DiskManager to allow spilling
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.with_disk_manager(DiskManagerConfig::NewOs)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.with_disk_manager(DiskManagerConfig::NewOs)
.build_arc()?;

for batch_size in [1, 50] {
let session_config = SessionConfig::default().with_batch_size(batch_size);
Expand Down Expand Up @@ -3162,12 +3156,10 @@ mod tests {
];

// Enable DiskManager to allow spilling
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(500, 1.0)
.with_disk_manager(DiskManagerConfig::NewOs)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(500, 1.0)
.with_disk_manager(DiskManagerConfig::NewOs)
.build_arc()?;

for batch_size in [1, 50] {
let session_config = SessionConfig::default().with_batch_size(batch_size);
Expand Down
8 changes: 3 additions & 5 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1506,11 +1506,9 @@ mod tests {
let partitioning = Partitioning::RoundRobinBatch(4);

// setup up context
let runtime = Arc::new(
RuntimeEnvBuilder::default()
.with_memory_limit(1, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::default()
.with_memory_limit(1, 1.0)
.build_arc()?;

let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);
Expand Down
22 changes: 9 additions & 13 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1148,11 +1148,9 @@ mod tests {
.options()
.execution
.sort_spill_reservation_bytes;
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
.build_arc()?;
let task_ctx = Arc::new(
TaskContext::default()
.with_session_config(session_config)
Expand Down Expand Up @@ -1226,14 +1224,12 @@ mod tests {
.execution
.sort_spill_reservation_bytes;

let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(
sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1.0,
)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(
sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1.0,
)
.build_arc()?;
let task_ctx = Arc::new(
TaskContext::default()
.with_runtime(runtime)
Expand Down
10 changes: 4 additions & 6 deletions datafusion/wasmtest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,10 @@ mod test {
let sql = "SELECT 2 + 2;";

// Execute SQL (using datafusion)
let rt = Arc::new(
RuntimeEnvBuilder::new()
.with_disk_manager(DiskManagerConfig::Disabled)
.build()
.unwrap(),
);
let rt = RuntimeEnvBuilder::new()
.with_disk_manager(DiskManagerConfig::Disabled)
.build_arc()
.unwrap();
let session_config = SessionConfig::new().with_target_partitions(1);
let session_context =
Arc::new(SessionContext::new_with_config_rt(session_config, rt));
Expand Down

0 comments on commit ac74cd3

Please sign in to comment.