Extract parquet statistics to its own module, add tests
alamb committed Nov 21, 2023
1 parent 58483fb commit 62f91b6
Showing 4 changed files with 832 additions and 142 deletions.
23 changes: 1 addition & 22 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -66,6 +66,7 @@ mod metrics;
pub mod page_filter;
mod row_filter;
mod row_groups;
+mod statistics;

pub use metrics::ParquetFileMetrics;

@@ -718,28 +719,6 @@ pub async fn plan_to_parquet(
    Ok(())
}

-// Copied from arrow-rs
-// https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55
-// Convert a byte slice to a fixed-length 16-byte array, sign-extending it
-fn sign_extend_be(b: &[u8]) -> [u8; 16] {
-    assert!(b.len() <= 16, "Array too large, expected less than 16");
-    let is_negative = (b[0] & 128u8) == 128u8;
-    let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] };
-    for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) {
-        *d = *s;
-    }
-    result
-}
-
-// Convert a big-endian byte array to i128.
-pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
-    // The bytes come from a parquet file and are big-endian, as defined by
-    // the parquet format:
-    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
-    i128::from_be_bytes(sign_extend_be(b))
-}

// Convert a parquet column schema to an arrow data type, considering only
// the decimal data type.
pub(crate) fn parquet_to_arrow_decimal_type(
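To make the relocated helpers concrete, here is a minimal sketch (not part of the commit) of what they compute, assuming sign_extend_be and from_bytes_to_i128 are in scope as defined above:

// Demonstrates the big-endian sign extension performed by the helpers above.
fn main() {
    // [0x01, 0x02] is 0x0102 read big-endian, i.e. 258.
    assert_eq!(from_bytes_to_i128(&[0x01, 0x02]), 258);
    // A single 0xFF byte has its sign bit set and extends to -1.
    assert_eq!(from_bytes_to_i128(&[0xff]), -1);
    // [0xff, 0x7f] is 0xFF7F as a signed 16-bit value, i.e. -129.
    assert_eq!(from_bytes_to_i128(&[0xff, 0x7f]), -129);
}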
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -39,9 +39,8 @@ use parquet::{
};
use std::sync::Arc;

-use crate::datasource::physical_plan::parquet::{
-    from_bytes_to_i128, parquet_to_arrow_decimal_type,
-};
+use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
+use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use super::metrics::ParquetFileMetrics;
141 changes: 24 additions & 117 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -15,31 +15,26 @@
// specific language governing permissions and limitations
// under the License.

-use arrow::{
-    array::ArrayRef,
-    datatypes::{DataType, Schema},
-};
+use arrow::{array::ArrayRef, datatypes::Schema};
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
use parquet::{
    arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
    bloom_filter::Sbbf,
-    file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
+    file::metadata::RowGroupMetaData,
};
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
};

-use crate::datasource::{
-    listing::FileRange,
-    physical_plan::parquet::{from_bytes_to_i128, parquet_to_arrow_decimal_type},
-};
+use crate::datasource::listing::FileRange;
use crate::logical_expr::Operator;
use crate::physical_expr::expressions as phys_expr;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use crate::physical_plan::PhysicalExpr;

+use super::statistics::RowGoupStatisticsConverter;
use super::ParquetFileMetrics;

/// Prune row groups based on statistics
@@ -303,112 +298,6 @@ struct RowGroupPruningStatistics<'a> {
    parquet_schema: &'a Schema,
}

-/// Extract the min/max statistics from a `ParquetStatistics` object
-macro_rules! get_statistic {
-    ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
-        if !$column_statistics.has_min_max_set() {
-            return None;
-        }
-        match $column_statistics {
-            ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))),
-            ParquetStatistics::Int32(s) => {
-                match $target_arrow_type {
-                    // int32 to decimal with the precision and scale
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(*s.$func() as i128),
-                            precision,
-                            scale,
-                        ))
-                    }
-                    _ => Some(ScalarValue::Int32(Some(*s.$func()))),
-                }
-            }
-            ParquetStatistics::Int64(s) => {
-                match $target_arrow_type {
-                    // int64 to decimal with the precision and scale
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(*s.$func() as i128),
-                            precision,
-                            scale,
-                        ))
-                    }
-                    _ => Some(ScalarValue::Int64(Some(*s.$func()))),
-                }
-            }
-            // 96 bit ints not supported
-            ParquetStatistics::Int96(_) => None,
-            ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))),
-            ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))),
-            ParquetStatistics::ByteArray(s) => {
-                match $target_arrow_type {
-                    // decimal data type
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(from_bytes_to_i128(s.$bytes_func())),
-                            precision,
-                            scale,
-                        ))
-                    }
-                    _ => {
-                        let s = std::str::from_utf8(s.$bytes_func())
-                            .map(|s| s.to_string())
-                            .ok();
-                        Some(ScalarValue::Utf8(s))
-                    }
-                }
-            }
-            // FixedLenByteArray is only supported for the decimal data type
-            ParquetStatistics::FixedLenByteArray(s) => {
-                match $target_arrow_type {
-                    Some(DataType::Decimal128(precision, scale)) => {
-                        Some(ScalarValue::Decimal128(
-                            Some(from_bytes_to_i128(s.$bytes_func())),
-                            precision,
-                            scale,
-                        ))
-                    }
-                    _ => None,
-                }
-            }
-        }
-    }};
-}

-// Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate
-macro_rules! get_min_max_values {
-    ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
-        let (_column_index, field) =
-            if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
-                (v, f)
-            } else {
-                // Named column was not present
-                return None;
-            };
-
-        let data_type = field.data_type();
-        // The result may be None, because DataFusion doesn't have support for ScalarValues of the column type
-        let null_scalar: ScalarValue = data_type.try_into().ok()?;
-
-        $self.row_group_metadata
-            .columns()
-            .iter()
-            .find(|c| c.column_descr().name() == &$column.name)
-            .and_then(|c| if c.statistics().is_some() {Some((c.statistics().unwrap(), c.column_descr()))} else {None})
-            .map(|(stats, column_descr)| {
-                let target_data_type = parquet_to_arrow_decimal_type(column_descr);
-                get_statistic!(stats, $func, $bytes_func, target_data_type)
-            })
-            .flatten()
-            // column either didn't have statistics at all or didn't have min/max values
-            .or_else(|| Some(null_scalar.clone()))
-            .and_then(|s| s.to_array().ok())
-    }}
-}
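The decimal arms of these macros are the subtle part. A hedged sketch (not from the commit) of what the Int32 arm of get_statistic! produces when the target arrow type is Decimal128, using the datafusion_common::ScalarValue API:

use datafusion_common::ScalarValue;

fn main() {
    // A parquet Int32 min statistic of 12345 on a Decimal128(9, 2) column is
    // widened to i128 and tagged with the column's precision and scale, so it
    // denotes the decimal value 123.45.
    let min_stat: i32 = 12345;
    let scalar = ScalarValue::Decimal128(Some(min_stat as i128), 9, 2);
    assert_eq!(scalar, ScalarValue::Decimal128(Some(12345), 9, 2));
}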

// Extract the null count value on the ParquetStatistics
macro_rules! get_null_count_values {
    ($self:expr, $column:expr) => {{
@@ -431,11 +320,29 @@

impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
-        get_min_max_values!(self, column, min, min_bytes)
+        let field = self
+            .parquet_schema
+            .fields()
+            .find(&column.name)
+            .map(|(_idx, field)| field)?;
+
+        RowGoupStatisticsConverter::new(&field)
+            .min([self.row_group_metadata])
+            // ignore errors during conversion, and just use no statistics
+            .ok()
    }

    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
-        get_min_max_values!(self, column, max, max_bytes)
+        let field = self
+            .parquet_schema
+            .fields()
+            .find(&column.name)
+            .map(|(_idx, field)| field)?;
+
+        RowGoupStatisticsConverter::new(&field)
+            .max([self.row_group_metadata])
+            // ignore errors during conversion, and just use no statistics
+            .ok()
    }

    fn num_containers(&self) -> usize {
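Replacing the macros with a converter type centralizes the parquet-to-arrow statistics conversion in one module and, per the commit title, makes it unit-testable. A hedged sketch of the calling pattern, based only on the API visible in this diff; the min_values_for helper is hypothetical, and RowGoupStatisticsConverter::min is assumed to accept any iterator of row-group metadata, generalizing the single-element array used above:

use arrow::array::ArrayRef;
use arrow::datatypes::Field;
use parquet::file::metadata::RowGroupMetaData;

// Hypothetical helper (not in the commit) showing the converter pattern;
// assumes RowGoupStatisticsConverter is in scope from the new statistics module.
fn min_values_for(field: &Field, row_groups: &[RowGroupMetaData]) -> Option<ArrayRef> {
    RowGoupStatisticsConverter::new(field)
        .min(row_groups.iter())
        // As in min_values above, a conversion error degrades to None,
        // which the pruning code treats as "no statistics available".
        .ok()
}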
805 changes: 805 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Large diffs are not rendered by default.
