Commit 829d4b0
Make parquet an option by adding multiple cfg attributes without
significant code changes.

ongchi committed Oct 8, 2023
1 parent 3d1b23a commit 829d4b0
Showing 19 changed files with 322 additions and 141 deletions.
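Every change in this commit is an instance of the same Cargo feature-gating pattern: `parquet` becomes an optional feature, and each Parquet-specific module, import, match arm, and test is wrapped in `#[cfg(feature = "parquet")]` so the crate still compiles with the feature disabled. A minimal, self-contained sketch of the pattern — hypothetical names, not DataFusion's actual items, and it assumes a `parquet` feature declared in Cargo.toml:

```rust
// Whole module only compiled when the feature is on.
#[cfg(feature = "parquet")]
pub mod parquet_support {
    pub fn read(path: &str) -> String {
        format!("reading parquet from {path}")
    }
}

// A single function can be gated the same way.
#[cfg(feature = "parquet")]
pub fn write_parquet(path: &str) -> String {
    format!("writing parquet to {path}")
}

// Code that must build either way can branch on cfg!, which
// evaluates to a compile-time boolean.
pub fn supported_formats() -> Vec<&'static str> {
    let mut formats = vec!["csv", "json", "avro"];
    if cfg!(feature = "parquet") {
        formats.push("parquet");
    }
    formats
}
```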
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -34,7 +34,7 @@ async-trait = "0.1.41"
aws-config = "0.55"
aws-credential-types = "0.55"
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { path = "../datafusion/core", version = "32.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression"] }
+datafusion = { path = "../datafusion/core", version = "32.0.0", features = ["avro", "crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
@@ -35,7 +35,7 @@ path = "src/lib.rs"
[features]
avro = ["apache-avro"]
backtrace = []
default = ["parquet"]
# default = ["parquet"]
pyarrow = ["pyo3", "arrow/pyarrow"]

[dependencies]
8 changes: 4 additions & 4 deletions datafusion/core/Cargo.toml
@@ -39,14 +39,15 @@ avro = ["apache-avro", "num-traits", "datafusion-common/avro"]
backtrace = ["datafusion-common/backtrace"]
compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"]
crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"]
default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression"]
default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression", "parquet"]
encoding_expressions = ["datafusion-physical-expr/encoding_expressions"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []
pyarrow = ["datafusion-common/pyarrow"]
regex_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion-optimizer/regex_expressions"]
simd = ["arrow/simd"]
unicode_expressions = ["datafusion-physical-expr/unicode_expressions", "datafusion-optimizer/unicode_expressions", "datafusion-sql/unicode_expressions"]
parquet = ["datafusion-common/parquet", "dep:parquet"]

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
@@ -60,7 +61,7 @@ bytes = "1.4"
bzip2 = { version = "0.4.3", optional = true }
chrono = { workspace = true }
dashmap = "5.4.0"
-datafusion-common = { path = "../common", version = "32.0.0", features = ["parquet", "object_store"] }
+datafusion-common = { path = "../common", version = "32.0.0", features = ["object_store"], default-features = false }
datafusion-execution = { path = "../execution", version = "32.0.0" }
datafusion-expr = { path = "../expr", version = "32.0.0" }
datafusion-optimizer = { path = "../optimizer", version = "32.0.0", default-features = false }
@@ -79,7 +80,7 @@ num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
object_store = "0.7.0"
parking_lot = "0.12"
-parquet = { workspace = true }
+parquet = { workspace = true, optional = true }
percent-encoding = "2.2.0"
pin-project-lite = "^0.2.7"
rand = "0.8"
@@ -92,7 +93,6 @@ uuid = { version = "1.0", features = ["v4"] }
xz2 = { version = "0.1", optional = true }
zstd = { version = "0.12", optional = true, default-features = false }

-
[dev-dependencies]
async-trait = "0.1.53"
bigdecimal = "0.4.1"
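The new `parquet` feature in `datafusion/core` shows two pieces of Cargo feature syntax worth noting: `datafusion-common/parquet` forwards the feature to a dependency, while `dep:parquet` activates the optional `parquet` crate itself. The `dep:` prefix (stable since Cargo 1.60) also suppresses the implicit `parquet` feature that Cargo would otherwise synthesize for an optional dependency, keeping the crate's public feature set explicit. Note also that `datafusion-common` is now pulled in with `default-features = false`, since that crate's defaults previously included `parquet`.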
6 changes: 6 additions & 0 deletions datafusion/core/src/dataframe.rs
@@ -27,6 +27,7 @@ use arrow::datatypes::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::file_options::json_writer::JsonWriterOptions;
+#[cfg(feature = "parquet")]
use datafusion_common::file_options::parquet_writer::{
default_builder, ParquetWriterOptions,
};
@@ -35,6 +36,7 @@ use datafusion_common::{
DataFusionError, FileType, FileTypeWriterOptions, SchemaError, UnnestOptions,
};
use datafusion_expr::dml::CopyOptions;
+#[cfg(feature = "parquet")]
use parquet::file::properties::WriterProperties;

use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -1054,6 +1056,7 @@ impl DataFrame {
}

/// Write a `DataFrame` to a Parquet file.
+#[cfg(feature = "parquet")]
pub async fn write_parquet(
self,
path: &str,
@@ -1320,7 +1323,9 @@ mod tests {
};
use datafusion_physical_expr::expressions::Column;
use object_store::local::LocalFileSystem;
+#[cfg(feature = "parquet")]
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
+#[cfg(feature = "parquet")]
use parquet::file::reader::FileReader;
use tempfile::TempDir;
use url::Url;
@@ -2368,6 +2373,7 @@ mod tests {
Ok(())
}

+#[cfg(feature = "parquet")]
#[tokio::test]
async fn write_parquet_with_compression() -> Result<()> {
let test_df = test_table().await?;
Expand Down
datafusion/core/src/datasource/file_format/file_compression_type.rs
@@ -237,7 +237,14 @@ impl FileTypeExt for FileType {

match self {
FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
-FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant {
+FileType::AVRO | FileType::ARROW => match c.variant {
+UNCOMPRESSED => Ok(ext),
+_ => Err(DataFusionError::Internal(
+"FileCompressionType can be specified for CSV/JSON FileType.".into(),
+)),
+},
+#[cfg(feature = "parquet")]
+FileType::PARQUET => match c.variant {
UNCOMPRESSED => Ok(ext),
_ => Err(DataFusionError::Internal(
"FileCompressionType can be specified for CSV/JSON FileType.".into(),
@@ -276,10 +283,13 @@ mod tests {
);
}

+let mut ty_ext_tuple = vec![];
+ty_ext_tuple.push((FileType::AVRO, ".avro"));
+#[cfg(feature = "parquet")]
+ty_ext_tuple.push((FileType::PARQUET, ".parquet"));
+
// Cannot specify compression for these file types
-for (file_type, extension) in
-[(FileType::AVRO, ".avro"), (FileType::PARQUET, ".parquet")]
-{
+for (file_type, extension) in ty_ext_tuple {
assert_eq!(
file_type
.get_ext_with_compression(FileCompressionType::UNCOMPRESSED)
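The test rewrite above (building `ty_ext_tuple` imperatively instead of iterating over an array literal) works around a language constraint: `#[cfg]` can be attached to items, statements, and match arms, but not to a single element inside an array expression. A standalone sketch of both techniques used in this file, with illustrative types rather than DataFusion's:

```rust
pub enum FileType {
    Avro,
    #[cfg(feature = "parquet")]
    Parquet,
}

// #[cfg] on a match arm: the arm vanishes when the feature is off,
// mirroring the FileType::PARQUET arm above.
pub fn extension(t: &FileType) -> &'static str {
    match t {
        FileType::Avro => ".avro",
        #[cfg(feature = "parquet")]
        FileType::Parquet => ".parquet",
    }
}

// #[cfg] on a statement: each push can be gated individually, which an
// element of `vec![...]` or `[...]` cannot be.
pub fn known_extensions() -> Vec<&'static str> {
    let mut exts = vec![".avro"];
    #[cfg(feature = "parquet")]
    exts.push(".parquet");
    exts
}
```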
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -27,6 +27,7 @@ pub mod csv;
pub mod file_compression_type;
pub mod json;
pub mod options;
+#[cfg(feature = "parquet")]
pub mod parquet;
pub mod write;

7 changes: 4 additions & 3 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -25,12 +25,12 @@ use datafusion_common::{plan_err, DataFusionError};

use crate::datasource::file_format::arrow::ArrowFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
+#[cfg(feature = "parquet")]
+use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
use crate::datasource::{
-file_format::{
-avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
-},
+file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat},
listing::ListingOptions,
};
use crate::error::Result;
@@ -542,6 +542,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
}
}

+#[cfg(feature = "parquet")]
#[async_trait]
impl ReadOptions<'_> for ParquetReadOptions<'_> {
fn to_listing_options(&self, config: &SessionConfig) -> ListingOptions {
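Note that here the `#[cfg]` guard sits on an entire trait implementation: with the feature off, `ParquetReadOptions` simply never implements `ReadOptions`, and any unguarded caller becomes a compile error rather than a runtime failure. A minimal sketch with invented types (gating both the type and its impl so the snippet is self-contained):

```rust
trait ReadOptions {
    fn format_name(&self) -> &'static str;
}

struct CsvReadOptions;

// Always available, like the CSV implementation above.
impl ReadOptions for CsvReadOptions {
    fn format_name(&self) -> &'static str {
        "csv"
    }
}

// Both the type and its impl are gated; neither exists without the feature.
#[cfg(feature = "parquet")]
struct ParquetReadOptions;

#[cfg(feature = "parquet")]
impl ReadOptions for ParquetReadOptions {
    fn format_name(&self) -> &'static str {
        "parquet"
    }
}
```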
14 changes: 10 additions & 4 deletions datafusion/core/src/datasource/listing/table.rs
@@ -26,13 +26,15 @@ use super::PartitionedFile;
use crate::datasource::file_format::file_compression_type::{
FileCompressionType, FileTypeExt,
};
+#[cfg(feature = "parquet")]
+use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::physical_plan::{
is_plan_streaming, FileScanConfig, FileSinkConfig,
};
use crate::datasource::{
file_format::{
arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
-parquet::ParquetFormat, FileFormat,
+FileFormat,
},
get_statistics_with_limit,
listing::ListingTableUrl,
@@ -147,6 +149,7 @@ impl ListingTableConfig {
FileType::JSON => Arc::new(
JsonFormat::default().with_file_compression_type(file_compression_type),
),
+#[cfg(feature = "parquet")]
FileType::PARQUET => Arc::new(ParquetFormat::default()),
};

@@ -1004,15 +1007,15 @@ mod tests {
use std::fs::File;

use super::*;
+#[cfg(feature = "parquet")]
+use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::{provider_as_source, MemTable};
use crate::execution::options::ArrowReadOptions;
use crate::physical_plan::collect;
use crate::prelude::*;
use crate::{
assert_batches_eq,
-datasource::file_format::{
-avro::AvroFormat, file_compression_type::FileTypeExt, parquet::ParquetFormat,
-},
+datasource::file_format::{avro::AvroFormat, file_compression_type::FileTypeExt},
execution::options::ReadOptions,
logical_expr::{col, lit},
test::{columns, object_store::register_test_store},
@@ -1075,6 +1078,7 @@ mod tests {
Ok(())
}

+#[cfg(feature = "parquet")]
#[tokio::test]
async fn load_table_stats_by_default() -> Result<()> {
let testdata = crate::test_util::parquet_test_data();
@@ -1098,6 +1102,7 @@
Ok(())
}

+#[cfg(feature = "parquet")]
#[tokio::test]
async fn load_table_stats_when_no_stats() -> Result<()> {
let testdata = crate::test_util::parquet_test_data();
@@ -1122,6 +1127,7 @@
Ok(())
}

+#[cfg(feature = "parquet")]
#[tokio::test]
async fn test_try_create_output_ordering() {
let testdata = crate::test_util::parquet_test_data();
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/listing_table_factory.rs
@@ -32,6 +32,7 @@ use crate::datasource::file_format::avro::AvroFormat;
use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::json::JsonFormat;
+#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::file_format::FileFormat;
use crate::datasource::listing::{
@@ -81,6 +82,7 @@ impl TableProviderFactory for ListingTableFactory {
.with_delimiter(cmd.delimiter as u8)
.with_file_compression_type(file_compression_type),
),
+#[cfg(feature = "parquet")]
FileType::PARQUET => Arc::new(ParquetFormat::default()),
FileType::AVRO => Arc::new(AvroFormat),
FileType::JSON => Arc::new(
@@ -159,6 +161,7 @@
Some(mode) => ListingTableInsertMode::from_str(mode.as_str()),
None => match file_type {
FileType::CSV => Ok(ListingTableInsertMode::AppendToFile),
+#[cfg(feature = "parquet")]
FileType::PARQUET => Ok(ListingTableInsertMode::AppendNewFiles),
FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles),
FileType::JSON => Ok(ListingTableInsertMode::AppendToFile),
@@ -199,6 +202,7 @@
json_writer_options.compression = cmd.file_compression_type;
FileTypeWriterOptions::JSON(json_writer_options)
}
+#[cfg(feature = "parquet")]
FileType::PARQUET => file_type_writer_options,
FileType::ARROW => file_type_writer_options,
FileType::AVRO => file_type_writer_options,
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/mod.rs
@@ -43,4 +43,5 @@ pub use self::provider::TableProvider;
pub use self::view::ViewTable;
pub use crate::logical_expr::TableType;
pub use statistics::get_statistics_with_limit;
+#[cfg(feature = "parquet")]
pub(crate) use statistics::{create_max_min_accs, get_col_stats};
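A cfg-gated item needs cfg-gated re-exports as well: the `pub use` above must carry the same attribute as the items it exposes, or the crate fails to compile when the feature is off. A sketch of the shape, with hypothetical module contents:

```rust
mod statistics {
    pub fn get_statistics_with_limit() {}

    // Only defined when the feature is enabled.
    #[cfg(feature = "parquet")]
    pub(crate) fn create_max_min_accs() {}
}

// Unconditional re-export of an unconditional item.
pub use statistics::get_statistics_with_limit;

// The re-export mirrors the guard on the item itself; without the
// attribute this line would reference a nonexistent symbol.
#[cfg(feature = "parquet")]
pub(crate) use statistics::create_max_min_accs;
```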
11 changes: 11 additions & 0 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -22,11 +22,14 @@ mod avro;
mod csv;
mod file_stream;
mod json;
+#[cfg(feature = "parquet")]
pub mod parquet;

pub(crate) use self::csv::plan_to_csv;
pub use self::csv::{CsvConfig, CsvExec, CsvOpener};
+#[cfg(feature = "parquet")]
pub(crate) use self::parquet::plan_to_parquet;
+#[cfg(feature = "parquet")]
pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
use arrow::{
array::new_null_array,
@@ -805,6 +808,7 @@ mod tests {
use super::*;

/// Empty file won't get partitioned
+#[cfg(feature = "parquet")]
#[tokio::test]
async fn repartition_empty_file_only() {
let partitioned_file_empty = PartitionedFile::new("empty".to_string(), 0);
@@ -836,6 +840,7 @@ mod tests {
}

// Repartition when there is an empty file in file groups
+#[cfg(feature = "parquet")]
#[tokio::test]
async fn repartition_empty_files() {
let partitioned_file_a = PartitionedFile::new("a".to_string(), 10);
@@ -902,6 +907,7 @@ mod tests {
}
}

+#[cfg(feature = "parquet")]
#[tokio::test]
async fn repartition_single_file() {
// Single file, single partition into multiple partitions
@@ -939,6 +945,7 @@ mod tests {
assert_eq!(expected, actual);
}

+#[cfg(feature = "parquet")]
#[tokio::test]
async fn repartition_too_much_partitions() {
// Single file, single partition into 96 partitions
@@ -980,6 +987,7 @@ mod tests {
assert_eq!(expected, actual);
}

+#[cfg(feature = "parquet")]
#[tokio::test]
async fn repartition_multiple_partitions() {
// Multiple files in single partition after redistribution
@@ -1019,6 +1027,7 @@ mod tests {
assert_eq!(expected, actual);
}

+#[cfg(feature = "parquet")]
#[tokio::test]
async fn repartition_same_num_partitions() {
// "Rebalance" files across partitions
@@ -1057,6 +1066,7 @@ mod tests {
assert_eq!(expected, actual);
}

+#[cfg(feature = "parquet")]
#[tokio::test]
async fn repartition_no_action_ranges() {
// No action due to Some(range) in second file
@@ -1090,6 +1100,7 @@ mod tests {
assert_eq!(2, actual.len());
}

+#[cfg(feature = "parquet")]
#[tokio::test]
async fn repartition_no_action_min_size() {
// No action due to target_partition_size
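The same guard composes with test attributes: placing `#[cfg(feature = "parquet")]` above `#[tokio::test]` means the test is only compiled, and only listed by the test runner, when the feature is on. A sketch of the shape these gated tests take — the test name and body are hypothetical, and it assumes `tokio` with its `macros` feature as a dev-dependency, as these tests already use:

```rust
#[cfg(test)]
mod tests {
    // Compiled and run only under `cargo test --features parquet`.
    #[cfg(feature = "parquet")]
    #[tokio::test]
    async fn parquet_scan_smoke_test() {
        // Placeholder assertion standing in for real ParquetExec checks.
        assert!(cfg!(feature = "parquet"));
    }
}
```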