From c8e184a057670873928202c07a62cb58dcdf8f55 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 26 Oct 2023 08:00:48 -0400 Subject: [PATCH 1/6] Move source repartitioning into ExecutionPlan::repartition --- .../core/src/datasource/physical_plan/csv.rs | 58 ++++++++++--------- .../src/datasource/physical_plan/parquet.rs | 41 ++++++------- .../enforce_distribution.rs | 22 ++----- datafusion/physical-plan/src/lib.rs | 24 +++++++- 4 files changed, 78 insertions(+), 67 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 8117e101ea99..82163da64af8 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -46,6 +46,7 @@ use datafusion_physical_expr::{ }; use bytes::{Buf, Bytes}; +use datafusion_common::config::ConfigOptions; use futures::{ready, StreamExt, TryStreamExt}; use object_store::{GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; @@ -117,34 +118,6 @@ impl CsvExec { pub fn escape(&self) -> Option { self.escape } - - /// Redistribute files across partitions according to their size - /// See comments on `repartition_file_groups()` for more detail. - /// - /// Return `None` if can't get repartitioned(empty/compressed file). - pub fn get_repartitioned( - &self, - target_partitions: usize, - repartition_file_min_size: usize, - ) -> Option { - // Parallel execution on compressed CSV file is not supported yet. 
- if self.file_compression_type.is_compressed() { - return None; - } - - let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups( - self.base_config.file_groups.clone(), - target_partitions, - repartition_file_min_size, - ); - - if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { - let mut new_plan = self.clone(); - new_plan.base_config.file_groups = repartitioned_file_groups; - return Some(new_plan); - } - None - } } impl DisplayAs for CsvExec { @@ -205,6 +178,35 @@ impl ExecutionPlan for CsvExec { Ok(self) } + /// Redistribute files across partitions according to their size + /// See comments on `repartition_file_groups()` for more detail. + /// + /// Return `None` if can't get repartitioned(empty/compressed file). + fn repartitioned( + &self, + target_partitions: usize, + config: &ConfigOptions, + ) -> Result>> { + let repartition_file_min_size = config.optimizer.repartition_file_min_size; + // Parallel execution on compressed CSV file is not supported yet. 
+ if self.file_compression_type.is_compressed() { + return Ok(None); + } + + let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups( + self.base_config.file_groups.clone(), + target_partitions, + repartition_file_min_size, + ); + + if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { + let mut new_plan = self.clone(); + new_plan.base_config.file_groups = repartitioned_file_groups; + return Ok(Some(Arc::new(new_plan))); + } + Ok(None) + } + fn execute( &self, partition: usize, diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 3a2459bec817..f6e999f60249 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -259,26 +259,6 @@ impl ParquetExec { self.enable_bloom_filter .unwrap_or(config_options.execution.parquet.bloom_filter_enabled) } - - /// Redistribute files across partitions according to their size - /// See comments on `get_file_groups_repartitioned()` for more detail. - pub fn get_repartitioned( - &self, - target_partitions: usize, - repartition_file_min_size: usize, - ) -> Self { - let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups( - self.base_config.file_groups.clone(), - target_partitions, - repartition_file_min_size, - ); - - let mut new_plan = self.clone(); - if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { - new_plan.base_config.file_groups = repartitioned_file_groups; - } - new_plan - } } impl DisplayAs for ParquetExec { @@ -349,6 +329,27 @@ impl ExecutionPlan for ParquetExec { Ok(self) } + /// Redistribute files across partitions according to their size + /// See comments on `get_file_groups_repartitioned()` for more detail. 
+ fn repartitioned( + &self, + target_partitions: usize, + config: &ConfigOptions, + ) -> Result>> { + let repartition_file_min_size = config.optimizer.repartition_file_min_size; + let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups( + self.base_config.file_groups.clone(), + target_partitions, + repartition_file_min_size, + ); + + let mut new_plan = self.clone(); + if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { + new_plan.base_config.file_groups = repartitioned_file_groups; + } + Ok(Some(Arc::new(new_plan))) + } + fn execute( &self, partition_index: usize, diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 072c3cb6d7a6..c84237ee97b1 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1265,25 +1265,13 @@ fn ensure_distribution( // Unless partitioning doesn't increase the partition count, it is not beneficial: && child.output_partitioning().partition_count() < target_partitions { - // When `repartition_file_scans` is set, leverage source operators - // (`ParquetExec`, `CsvExec` etc.) to increase parallelism at the source. + // When `repartition_file_scans` is set, attempt to increase + // parallelism at the source. if repartition_file_scans { - #[cfg(feature = "parquet")] - if let Some(parquet_exec) = - child.as_any().downcast_ref::() + if let Some(new_child) = + child.repartitioned(target_partitions, config)? 
{ - child = Arc::new(parquet_exec.get_repartitioned( - target_partitions, - repartition_file_min_size, - )); - } - if let Some(csv_exec) = child.as_any().downcast_ref::() { - if let Some(csv_exec) = csv_exec.get_repartitioned( - target_partitions, - repartition_file_min_size, - ) { - child = Arc::new(csv_exec); - } + child = new_child; } } // Increase parallelism by adding round-robin repartitioning diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index b2f81579f8e8..c115a5026a0c 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -76,6 +76,7 @@ pub use crate::metrics::Metric; pub use crate::topk::TopK; pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; +use datafusion_common::config::ConfigOptions; pub use datafusion_common::hash_utils; pub use datafusion_common::utils::project_schema; pub use datafusion_common::{internal_err, ColumnStatistics, Statistics}; @@ -209,7 +210,26 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { children: Vec>, ) -> Result>; - /// creates an iterator + /// If supported, changes the partitioning of this `ExecutionPlan` to + /// produce `target_partitions` partitions. + /// + /// If the `ExecutionPlan` does not support changing its partitioning, + /// returns `Ok(None)` (the default). + /// + /// The DataFusion optimizer attempts to use as many threads as possible by + /// repartitioning its inputs to match the target number of threads + /// available (`target_partitions`). Some data sources, such as the built in + /// CSV and Parquet readers, are able to read from their input files in + /// parallel, regardless of how the source data is split amongst files. + fn repartitioned( + &self, + _target_partitions: usize, + _config: &ConfigOptions, + ) -> Result>> { + Ok(None) + } + + /// Begin execution of `partition`, returning a stream of [`RecordBatch`]es. 
fn execute( &self, partition: usize, @@ -217,7 +237,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Result; /// Return a snapshot of the set of [`Metric`]s for this - /// [`ExecutionPlan`]. + /// [`ExecutionPlan`]. If no `Metric`s are available, return None. /// /// While the values of the metrics in the returned /// [`MetricsSet`]s may change as execution progresses, the From 86f3015f93dd0844457cc699bae2af1a82bf4f47 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 26 Oct 2023 08:08:13 -0400 Subject: [PATCH 2/6] cleanup --- .../core/src/datasource/physical_plan/mod.rs | 24 ++++++++++++------- .../enforce_distribution.rs | 6 +---- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 3f84f87eb5d5..4bbe476c1086 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -529,6 +529,7 @@ mod tests { }; use arrow_schema::Field; use chrono::Utc; + use datafusion_common::config::ConfigOptions; use crate::physical_plan::{DefaultDisplay, VerboseDisplay}; @@ -831,7 +832,7 @@ mod tests { ); let partitioned_file = parquet_exec - .get_repartitioned(4, 0) + .repartitioned(4, &config_with_size(0)) .base_config() .file_groups .clone(); @@ -897,7 +898,7 @@ mod tests { let actual = file_groups_to_vec( parquet_exec - .get_repartitioned(n_partition, 10) + .repartitioned(n_partition, &config_with_size(10)) .base_config() .file_groups .clone(), @@ -931,7 +932,7 @@ mod tests { let actual = file_groups_to_vec( parquet_exec - .get_repartitioned(4, 10) + .repartitioned(4, &config_with_size(10)) .base_config() .file_groups .clone(), @@ -968,7 +969,7 @@ mod tests { let actual = file_groups_to_vec( parquet_exec - .get_repartitioned(96, 5) + .repartitioned(96, &config_with_size(5)) .base_config() .file_groups .clone(), @@ -1011,7 +1012,7 @@ mod tests { let actual = 
file_groups_to_vec( parquet_exec - .get_repartitioned(3, 10) + .repartitioned(3, &config_with_size(10)) .base_config() .file_groups .clone(), @@ -1050,7 +1051,7 @@ mod tests { let actual = file_groups_to_vec( parquet_exec - .get_repartitioned(2, 10) + .repartitioned(2, &config_with_size(10)) .base_config() .file_groups .clone(), @@ -1089,7 +1090,7 @@ mod tests { ); let actual = parquet_exec - .get_repartitioned(65, 10) + .repartitioned(65, &config_with_size(10)) .base_config() .file_groups .clone(); @@ -1118,7 +1119,7 @@ mod tests { ); let actual = parquet_exec - .get_repartitioned(65, 500) + .repartitioned(65, &config_with_size(500)) .base_config() .file_groups .clone(); @@ -1147,4 +1148,11 @@ mod tests { .collect_vec() } } + + /// Returns a new ConfigOptions with the specified `repartition_file_min_size` + fn config_with_size(repartition_file_min_size: usize) -> ConfigOptions { + let mut config = ConfigOptions::new(); + config.optimizer.repartition_file_min_size = repartition_file_min_size; + config + } } diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index c84237ee97b1..ffd2180729b8 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -26,9 +26,6 @@ use std::fmt::Formatter; use std::sync::Arc; use crate::config::ConfigOptions; -use crate::datasource::physical_plan::CsvExec; -#[cfg(feature = "parquet")] -use crate::datasource::physical_plan::ParquetExec; use crate::error::Result; use crate::physical_optimizer::utils::{ add_sort_above, get_children_exectrees, get_plan_string, is_coalesce_partitions, @@ -1188,7 +1185,6 @@ fn ensure_distribution( // When `false`, round robin repartition will not be added to increase parallelism let enable_round_robin = config.optimizer.enable_round_robin_repartition; let repartition_file_scans = config.optimizer.repartition_file_scans; - let 
repartition_file_min_size = config.optimizer.repartition_file_min_size; let batch_size = config.execution.batch_size; let is_unbounded = unbounded_output(&dist_context.plan); // Use order preserving variants either of the conditions true @@ -1630,9 +1626,9 @@ mod tests { use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; - use crate::datasource::physical_plan::FileScanConfig; #[cfg(feature = "parquet")] use crate::datasource::physical_plan::ParquetExec; + use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; use crate::physical_optimizer::enforce_sorting::EnforceSorting; use crate::physical_optimizer::output_requirements::OutputRequirements; use crate::physical_plan::aggregates::{ From bd9d2616c63dc2d3d27ba38be52df20aaeb1407c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 26 Oct 2023 09:13:36 -0400 Subject: [PATCH 3/6] update test --- .../core/src/datasource/physical_plan/mod.rs | 86 +++++++------------ 1 file changed, 33 insertions(+), 53 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 4bbe476c1086..6bf9570b45e5 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -831,11 +831,7 @@ mod tests { None, ); - let partitioned_file = parquet_exec - .repartitioned(4, &config_with_size(0)) - .base_config() - .file_groups - .clone(); + let partitioned_file = repartition_with_size(&parquet_exec, 4, 0); assert!(partitioned_file[0][0].range.is_none()); } @@ -896,13 +892,11 @@ mod tests { None, ); - let actual = file_groups_to_vec( - parquet_exec - .repartitioned(n_partition, &config_with_size(10)) - .base_config() - .file_groups - .clone(), - ); + let actual = file_groups_to_vec(repartition_with_size( + &parquet_exec, + n_partition, + 10, + )); assert_eq!(expected, 
&actual); } @@ -930,13 +924,7 @@ mod tests { None, ); - let actual = file_groups_to_vec( - parquet_exec - .repartitioned(4, &config_with_size(10)) - .base_config() - .file_groups - .clone(), - ); + let actual = file_groups_to_vec(repartition_with_size(&parquet_exec, 4, 10)); let expected = vec![ (0, "a".to_string(), 0, 31), (1, "a".to_string(), 31, 62), @@ -967,13 +955,7 @@ mod tests { None, ); - let actual = file_groups_to_vec( - parquet_exec - .repartitioned(96, &config_with_size(5)) - .base_config() - .file_groups - .clone(), - ); + let actual = file_groups_to_vec(repartition_with_size(&parquet_exec, 96, 5)); let expected = vec![ (0, "a".to_string(), 0, 1), (1, "a".to_string(), 1, 2), @@ -1010,13 +992,7 @@ mod tests { None, ); - let actual = file_groups_to_vec( - parquet_exec - .repartitioned(3, &config_with_size(10)) - .base_config() - .file_groups - .clone(), - ); + let actual = file_groups_to_vec(repartition_with_size(&parquet_exec, 3, 10)); let expected = vec![ (0, "a".to_string(), 0, 34), (1, "a".to_string(), 34, 40), @@ -1049,13 +1025,7 @@ mod tests { None, ); - let actual = file_groups_to_vec( - parquet_exec - .repartitioned(2, &config_with_size(10)) - .base_config() - .file_groups - .clone(), - ); + let actual = file_groups_to_vec(repartition_with_size(&parquet_exec, 2, 10)); let expected = vec![ (0, "a".to_string(), 0, 40), (0, "b".to_string(), 0, 10), @@ -1089,11 +1059,7 @@ mod tests { None, ); - let actual = parquet_exec - .repartitioned(65, &config_with_size(10)) - .base_config() - .file_groups - .clone(); + let actual = repartition_with_size(&parquet_exec, 65, 10); assert_eq!(2, actual.len()); } @@ -1118,14 +1084,12 @@ mod tests { None, ); - let actual = parquet_exec - .repartitioned(65, &config_with_size(500)) - .base_config() - .file_groups - .clone(); + let actual = repartition_with_size(&parquet_exec, 65, 500); assert_eq!(1, actual.len()); } + /// Convert a set of file groups into a vector of tuples: + /// `(partition index, file path, start, 
end)` fn file_groups_to_vec( file_groups: Vec>, ) -> Vec<(usize, String, i64, i64)> { @@ -1149,10 +1113,26 @@ mod tests { } } - /// Returns a new ConfigOptions with the specified `repartition_file_min_size` - fn config_with_size(repartition_file_min_size: usize) -> ConfigOptions { + /// Calls `ParquetExec.repartitioned` with the specified + /// `target_partitions` and `repartition_file_min_size`, returning the + /// resulting `PartitionedFile`s + fn repartition_with_size( + parquet_exec: &ParquetExec, + target_partitions: usize, + repartition_file_min_size: usize, + ) -> Vec> { let mut config = ConfigOptions::new(); config.optimizer.repartition_file_min_size = repartition_file_min_size; - config + + parquet_exec + .repartitioned(target_partitions, &config) + .unwrap() // unwrap Result + .unwrap() // unwrap Option + .as_any() + .downcast_ref::() + .unwrap() + .base_config() + .file_groups + .clone() } } From bdc883460ed5df90da11008c8c05a2a13879c159 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 26 Oct 2023 09:18:11 -0400 Subject: [PATCH 4/6] update test --- .../core/src/datasource/physical_plan/mod.rs | 76 ++++++++++--------- 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 6bf9570b45e5..342b72acd071 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -892,11 +892,8 @@ mod tests { None, ); - let actual = file_groups_to_vec(repartition_with_size( - &parquet_exec, - n_partition, - 10, - )); + let actual = + repartition_with_size_to_vec(&parquet_exec, n_partition, 10); assert_eq!(expected, &actual); } @@ -924,7 +921,7 @@ mod tests { None, ); - let actual = file_groups_to_vec(repartition_with_size(&parquet_exec, 4, 10)); + let actual = repartition_with_size_to_vec(&parquet_exec, 4, 10); let expected = vec![ (0, "a".to_string(), 0, 31), (1, "a".to_string(), 31, 62), 
@@ -955,7 +952,7 @@ mod tests { None, ); - let actual = file_groups_to_vec(repartition_with_size(&parquet_exec, 96, 5)); + let actual = repartition_with_size_to_vec(&parquet_exec, 96, 5); let expected = vec![ (0, "a".to_string(), 0, 1), (1, "a".to_string(), 1, 2), @@ -992,7 +989,7 @@ mod tests { None, ); - let actual = file_groups_to_vec(repartition_with_size(&parquet_exec, 3, 10)); + let actual = repartition_with_size_to_vec(&parquet_exec, 3, 10); let expected = vec![ (0, "a".to_string(), 0, 34), (1, "a".to_string(), 34, 40), @@ -1025,7 +1022,7 @@ mod tests { None, ); - let actual = file_groups_to_vec(repartition_with_size(&parquet_exec, 2, 10)); + let actual = repartition_with_size_to_vec(&parquet_exec, 2, 10); let expected = vec![ (0, "a".to_string(), 0, 40), (0, "b".to_string(), 0, 10), @@ -1088,11 +1085,43 @@ mod tests { assert_eq!(1, actual.len()); } - /// Convert a set of file groups into a vector of tuples: + /// Calls `ParquetExec.repartitioned` with the specified + /// `target_partitions` and `repartition_file_min_size`, returning the + /// resulting `PartitionedFile`s + fn repartition_with_size( + parquet_exec: &ParquetExec, + target_partitions: usize, + repartition_file_min_size: usize, + ) -> Vec> { + let mut config = ConfigOptions::new(); + config.optimizer.repartition_file_min_size = repartition_file_min_size; + + parquet_exec + .repartitioned(target_partitions, &config) + .unwrap() // unwrap Result + .unwrap() // unwrap Option + .as_any() + .downcast_ref::() + .unwrap() + .base_config() + .file_groups + .clone() + } + + /// Calls `repartition_with_size` and returns a tuple for each output `PartitionedFile`: + /// /// `(partition index, file path, start, end)` - fn file_groups_to_vec( - file_groups: Vec>, + fn repartition_with_size_to_vec( + parquet_exec: &ParquetExec, + target_partitions: usize, + repartition_file_min_size: usize, ) -> Vec<(usize, String, i64, i64)> { + let file_groups = repartition_with_size( + parquet_exec, + target_partitions, + 
repartition_file_min_size, + ); + + file_groups .iter() .enumerate() .flat_map(|(partition_idx, files)| { files .iter() .map(move |f| { ( partition_idx, f.object_meta.location.to_string(), f.range.as_ref().unwrap().start, f.range.as_ref().unwrap().end, ) }) }) .collect_vec() } } - - /// Calls `ParquetExec.repartitioned` with the specified - /// `target_partitions` and `repartition_file_min_size`, returning the - /// resulting `PartitionedFile`s - fn repartition_with_size( - parquet_exec: &ParquetExec, - target_partitions: usize, - repartition_file_min_size: usize, - ) -> Vec> { - let mut config = ConfigOptions::new(); - config.optimizer.repartition_file_min_size = repartition_file_min_size; - - parquet_exec - .repartitioned(target_partitions, &config) - .unwrap() // unwrap Result - .unwrap() // unwrap Option - .as_any() - .downcast_ref::() - .unwrap() - .base_config() - .file_groups - .clone() - } } From bee0ba52f8f541d040cfb5b8f361d513f54fe1cb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 26 Oct 2023 09:35:59 -0400 Subject: [PATCH 5/6] refine docs --- datafusion/physical-plan/src/lib.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index c115a5026a0c..3ada2fa163fd 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -210,17 +210,23 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { children: Vec>, ) -> Result>; - /// If supported, changes the partitioning of this `ExecutionPlan` to + /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to + /// produce `target_partitions` partitions. + /// + /// If the `ExecutionPlan` does not support changing its partitioning, + /// returns `Ok(None)` (the default). + /// + /// If the `ExecutionPlan` can increase its partitioning, but not to the + /// `target_partitions`, it may return an ExecutionPlan with fewer + /// partitions. This might happen, for example, if each new partition would + /// be too small to be efficiently processed individually. 
+ /// /// The DataFusion optimizer attempts to use as many threads as possible by /// repartitioning its inputs to match the target number of threads /// available (`target_partitions`). Some data sources, such as the built in - /// CSV and Parquet readers, are able to read from their input files in - /// parallel, regardless of how the source data is split amongst files. + /// CSV and Parquet readers, implement this method as they are able to read + /// from their input files in parallel, regardless of how the source data is + /// split amongst files. fn repartitioned( &self, _target_partitions: usize, From 9dab5966fce4b0d264240e0d0d9e89415d41e686 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 26 Oct 2023 12:23:47 -0400 Subject: [PATCH 6/6] fix merge --- datafusion/core/src/physical_optimizer/enforce_distribution.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 01f35dcd1c7a..2889c268bafa 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1628,7 +1628,6 @@ mod tests { use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; - use crate::datasource::physical_plan::FileScanConfig; use crate::datasource::physical_plan::ParquetExec; use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; use crate::physical_optimizer::enforce_sorting::EnforceSorting;