Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rust,python): ensure projections containing only hive columns are projected #11803

Merged
merged 9 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -499,15 +499,13 @@ where
D: NestedDecoder<'a>,
{
// front[a1, a2, a3, ...]back
if items.len() > 1 {
return MaybeNext::Some(Ok(items.pop_front().unwrap()));
if *remaining == 0 && items.is_empty() {
return MaybeNext::None;
}
if *remaining == 0 {
return match items.pop_front() {
Some(decoded) => MaybeNext::Some(Ok(decoded)),
None => MaybeNext::None,
};
if !items.is_empty() && items.front().unwrap().0.len() > chunk_size.unwrap_or(usize::MAX) {
return MaybeNext::Some(Ok(items.pop_front().unwrap()));
}

match iter.next() {
Err(e) => MaybeNext::Some(Err(e.into())),
Ok(None) => {
Expand Down Expand Up @@ -541,7 +539,8 @@ where
Err(e) => return MaybeNext::Some(Err(e)),
};

if (items.len() == 1)
// if possible, return the value immediately.
if !items.is_empty()
&& items.front().unwrap().0.len() > chunk_size.unwrap_or(usize::MAX)
{
MaybeNext::Some(Ok(items.pop_front().unwrap()))
Expand Down
49 changes: 34 additions & 15 deletions crates/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,15 @@ pub(super) fn array_iter_to_series(
}

/// Materializes hive partitions.
fn materialize_hive_partitions(df: &mut DataFrame, hive_partition_columns: Option<&[Series]>) {
/// `num_rows` is passed explicitly because `df` can be empty when a projection contains
/// only hive partition columns, so the row count cannot always be taken from `df.height()`.
/// Safety: `num_rows` must equal the height of `df` whenever that height is non-zero.
fn materialize_hive_partitions(
df: &mut DataFrame,
hive_partition_columns: Option<&[Series]>,
num_rows: usize,
) {
if let Some(hive_columns) = hive_partition_columns {
let num_rows = df.height();

for s in hive_columns {
unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) };
}
Expand Down Expand Up @@ -191,6 +196,7 @@ fn rg_to_dfs_optionally_par_over_columns(
assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
}

let projection_height = (*remaining_rows).min(md.num_rows());
let chunk_size = md.num_rows();
let columns = if let ParallelStrategy::Columns = parallel {
POOL.install(|| {
Expand All @@ -200,7 +206,7 @@ fn rg_to_dfs_optionally_par_over_columns(
column_idx_to_series(
*column_i,
md,
*remaining_rows,
projection_height,
schema,
store,
chunk_size,
Expand All @@ -212,20 +218,26 @@ fn rg_to_dfs_optionally_par_over_columns(
projection
.iter()
.map(|column_i| {
column_idx_to_series(*column_i, md, *remaining_rows, schema, store, chunk_size)
column_idx_to_series(
*column_i,
md,
projection_height,
schema,
store,
chunk_size,
)
})
.collect::<PolarsResult<Vec<_>>>()?
};

*remaining_rows =
remaining_rows.saturating_sub(file_metadata.row_groups[rg_idx].num_rows());
*remaining_rows -= projection_height;

let mut df = DataFrame::new_no_checks(columns);
if let Some(rc) = &row_count {
df.with_row_count_mut(&rc.name, Some(*previous_row_count + rc.offset));
}
materialize_hive_partitions(&mut df, hive_partition_columns);

materialize_hive_partitions(&mut df, hive_partition_columns, projection_height);
apply_predicate(&mut df, predicate, true)?;

*previous_row_count += current_row_count;
Expand Down Expand Up @@ -265,17 +277,17 @@ fn rg_to_dfs_par_over_rg(
let row_count_start = *previous_row_count;
let num_rows = rg_md.num_rows();
*previous_row_count += num_rows as IdxSize;
let local_limit = *remaining_rows;
*remaining_rows = remaining_rows.saturating_sub(num_rows);
let projection_height = (*remaining_rows).min(num_rows);
*remaining_rows -= projection_height;

(rg_idx, rg_md, local_limit, row_count_start)
(rg_idx, rg_md, projection_height, row_count_start)
})
.collect::<Vec<_>>();

let dfs = row_groups
.into_par_iter()
.map(|(rg_idx, md, local_limit, row_count_start)| {
if local_limit == 0
.map(|(rg_idx, md, projection_height, row_count_start)| {
if projection_height == 0
|| use_statistics
&& !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
{
Expand All @@ -291,7 +303,14 @@ fn rg_to_dfs_par_over_rg(
let columns = projection
.iter()
.map(|column_i| {
column_idx_to_series(*column_i, md, local_limit, schema, store, chunk_size)
column_idx_to_series(
*column_i,
md,
projection_height,
schema,
store,
chunk_size,
)
})
.collect::<PolarsResult<Vec<_>>>()?;

Expand All @@ -300,8 +319,8 @@ fn rg_to_dfs_par_over_rg(
if let Some(rc) = &row_count {
df.with_row_count_mut(&rc.name, Some(row_count_start as IdxSize + rc.offset));
}
materialize_hive_partitions(&mut df, hive_partition_columns);

materialize_hive_partitions(&mut df, hive_partition_columns, projection_height);
apply_predicate(&mut df, predicate, false)?;

Ok(Some(df))
Expand Down
12 changes: 12 additions & 0 deletions py-polars/tests/unit/io/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,15 @@ def test_hive_partitioned_projection_pushdown(
columns = ["sugars_g", "category"]
for streaming in [True, False]:
assert q.select(columns).collect(streaming=streaming).columns == columns

# test that hive partition columns are projected with the correct height when
# the projection contains only hive partition columns (11796)
for parallel in ("row_groups", "columns"):
q = pl.scan_parquet(
root / "**/*.parquet", hive_partitioning=True, parallel=parallel # type: ignore[arg-type]
)

expect = q.collect().select("category")
actual = q.select("category").collect()

assert expect.frame_equal(actual)