Skip to content

Commit

Permalink
API to go from ParquetExec to ParquetExecBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Oct 7, 2024
1 parent 577e4bb commit 750a390
Showing 1 changed file with 79 additions and 7 deletions.
86 changes: 79 additions & 7 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,33 @@ pub use writer::plan_to_parquet;
/// [`RowFilter`]: parquet::arrow::arrow_reader::RowFilter
/// [Parquet PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
///
/// # Example: rewriting `ParquetExec`
///
/// You can modify a `ParquetExec` using [`ParquetExecBuilder`], for example
/// to change files or add a predicate.
///
/// ```no_run
/// # use std::sync::Arc;
/// # use arrow::datatypes::Schema;
/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
/// # use datafusion::datasource::listing::PartitionedFile;
/// # fn parquet_exec() -> ParquetExec { unimplemented!() }
/// // Split a single ParquetExec into multiple ParquetExecs, one for each file
/// let exec = parquet_exec();
/// let exiting_file_groups = &exec.base_config().file_groups;
/// let new_execs = exiting_file_groups
/// .iter()
/// .map(|file_group| {
/// // create a new exec by copying the existing exec into a builder
/// let new_exec = exec.clone()
/// .into_builder()
/// .with_file_groups(vec![file_group.clone()])
/// .build();
/// new_exec
/// })
/// .collect::<Vec<_>>();
/// ```
///
/// # Implementing External Indexes
///
/// It is possible to restrict the row groups and selections within those row
Expand Down Expand Up @@ -257,6 +284,12 @@ pub struct ParquetExec {
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl From<ParquetExec> for ParquetExecBuilder {
fn from(exec: ParquetExec) -> Self {
exec.into_builder()
}
}

/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
///
/// See example on [`ParquetExec`].
Expand Down Expand Up @@ -291,6 +324,12 @@ impl ParquetExecBuilder {
}
}

/// Update the list of files groups to read
pub fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
self.file_scan_config.file_groups = file_groups;
self
}

/// Set the filter predicate when reading.
///
/// See the "Predicate Pushdown" section of the [`ParquetExec`] documenation
Expand Down Expand Up @@ -459,6 +498,32 @@ impl ParquetExec {
ParquetExecBuilder::new(file_scan_config)
}

/// Convert this `ParquetExec` into a builder for modification
pub fn into_builder(self) -> ParquetExecBuilder {
// list out fileds so it is clear what is being dropped
let Self {
base_config,
projected_statistics: _,
metrics: _,
predicate,
pruning_predicate: _,
page_pruning_predicate: _,
metadata_size_hint,
parquet_file_reader_factory,
cache: _,
table_parquet_options,
schema_adapter_factory,
} = self;
ParquetExecBuilder {
file_scan_config: base_config,
predicate,
metadata_size_hint,
table_parquet_options,
parquet_file_reader_factory,
schema_adapter_factory,
}
}

/// [`FileScanConfig`] that controls this scan (such as which files to read)
pub fn base_config(&self) -> &FileScanConfig {
&self.base_config
Expand All @@ -479,9 +544,15 @@ impl ParquetExec {
self.pruning_predicate.as_ref()
}

/// return the optional file reader factory
pub fn parquet_file_reader_factory(
&self,
) -> Option<&Arc<dyn ParquetFileReaderFactory>> {
self.parquet_file_reader_factory.as_ref()
}

/// Optional user defined parquet file reader factory.
///
/// See documentation on [`ParquetExecBuilder::with_parquet_file_reader_factory`]
pub fn with_parquet_file_reader_factory(
mut self,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
Expand All @@ -490,6 +561,11 @@ impl ParquetExec {
self
}

/// return the optional schema adapter factory
pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
self.schema_adapter_factory.as_ref()
}

/// Optional schema adapter factory.
///
/// See documentation on [`ParquetExecBuilder::with_schema_adapter_factory`]
Expand Down Expand Up @@ -586,12 +662,8 @@ impl ParquetExec {
)
}

fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
self.base_config.file_groups = file_groups;
// Changing file groups may invalidate output partitioning. Update it also
let output_partitioning = Self::output_partitioning_helper(&self.base_config);
self.cache = self.cache.with_partitioning(output_partitioning);
self
fn with_file_groups(self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
self.into_builder().with_file_groups(file_groups).build()
}
}

Expand Down

0 comments on commit 750a390

Please sign in to comment.