fix: Parquet several smaller issues #18325

Merged · 9 commits · Aug 23, 2024
8 changes: 8 additions & 0 deletions crates/polars-arrow/src/offset.rs
@@ -518,6 +518,14 @@ impl<O: Offset> OffsetsBuffer<O> {
pub fn into_inner(self) -> Buffer<O> {
self.0
}

/// Returns the offset difference between `start` and `end`, i.e. the summed length of the elements in `start..=end`.
#[inline]
pub fn delta(&self, start: usize, end: usize) -> usize {
assert!(start <= end);

(self.0[end + 1] - self.0[start]).to_usize()
}
}

impl From<&OffsetsBuffer<i32>> for OffsetsBuffer<i64> {
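For intuition, here is a minimal standalone sketch of what the new `delta` helper computes. The plain offsets slice and the concrete `i64` offset type are hypothetical stand-ins for the wrapped `Buffer<O>`:

```rust
/// Hypothetical mirror of `OffsetsBuffer::delta` over a plain offsets slice:
/// element `i` spans `offsets[i]..offsets[i + 1]`, so elements `start..=end`
/// cover `offsets[end + 1] - offsets[start]` underlying values.
fn delta(offsets: &[i64], start: usize, end: usize) -> usize {
    assert!(start <= end);
    (offsets[end + 1] - offsets[start]) as usize
}

fn main() {
    // Three variable-length elements with lengths 2, 3 and 4.
    let offsets = [0i64, 2, 5, 9];
    assert_eq!(delta(&offsets, 0, 0), 2); // first element only
    assert_eq!(delta(&offsets, 0, 1), 5); // first two elements
    assert_eq!(delta(&offsets, 1, 2), 7); // last two elements
}
```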
26 changes: 7 additions & 19 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -266,8 +266,8 @@ fn rg_to_dfs_prefiltered(
// column indexes of the schema.
let mut live_idx_to_col_idx = Vec::with_capacity(num_live_columns);
let mut dead_idx_to_col_idx = Vec::with_capacity(num_dead_columns);
- for (i, col) in file_metadata.schema().columns().iter().enumerate() {
- if live_variables.contains(col.path_in_schema[0].deref()) {
+ for (i, field) in schema.fields.iter().enumerate() {
+ if live_variables.contains(&field.name[..]) {
live_idx_to_col_idx.push(i);
} else {
dead_idx_to_col_idx.push(i);
@@ -406,22 +406,10 @@
})
.collect::<PolarsResult<Vec<_>>>()?;

- let mut rearranged_schema: Schema = Schema::new();
- if let Some(rc) = &row_index {
- rearranged_schema.insert_at_index(
- 0,
- SmartString::from(rc.name.deref()),
- IdxType::get_dtype(),
- )?;
- }
- for i in live_idx_to_col_idx.iter().copied() {
- rearranged_schema.insert_at_index(
- rearranged_schema.len(),
- schema.fields[i].name.clone().into(),
- schema.fields[i].data_type().into(),
- )?;
- }
- rearranged_schema.merge(Schema::from(schema.as_ref()));
+ let Some(df) = dfs.first().map(|(_, df)| df) else {
+ return Ok(Vec::new());
+ };
+ let rearranged_schema = df.schema();

rg_columns
.par_chunks_exact_mut(num_dead_columns)
@@ -520,7 +508,7 @@ fn rg_to_dfs_optionally_par_over_columns(
materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns, rg_slice.1);
apply_predicate(&mut df, predicate, true)?;

- *previous_row_count = previous_row_count.checked_add(current_row_count).ok_or(
+ *previous_row_count = previous_row_count.checked_add(current_row_count).ok_or_else(||
polars_err!(
ComputeError: "Parquet file produces more than pow(2, 32) rows; \
consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
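The `ok_or` → `ok_or_else` change above is the usual laziness fix: `ok_or` evaluates its error argument even when the `Option` is `Some`, while `ok_or_else` defers construction of the (here, formatted) error to a closure that only runs on the `None` path. A small illustration, with a hypothetical `expensive_error` standing in for the `polars_err!` expansion:

```rust
fn expensive_error() -> String {
    // Stand-in for allocating/formatting work like polars_err!.
    "Parquet file produces too many rows".to_string()
}

fn main() {
    let x: Option<u32> = Some(1);
    // Eager: expensive_error() runs even though x is Some.
    let _ = x.ok_or(expensive_error());
    // Lazy: the closure only runs if x is None.
    let _ = x.ok_or_else(expensive_error);
}
```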
@@ -144,6 +144,10 @@ impl<'a, 'b, O: Offset> BatchableCollector<(), Binary<O>> for DeltaCollector<'a,
target.extend_constant(n);
Ok(())
}

fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
self.decoder.skip_in_place(n)
}
}

impl<'a, 'b, O: Offset> BatchableCollector<(), Binary<O>> for DeltaBytesCollector<'a, 'b, O> {
@@ -159,6 +163,10 @@
target.extend_constant(n);
Ok(())
}

fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
self.decoder.skip_in_place(n)
}
}

impl<'a, O: Offset> StateTranslation<'a, BinaryDecoder<O>> for BinaryStateTranslation<'a> {
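The `skip_in_place` implementations added throughout this PR all follow one shape: the collector forwards the skip to its underlying decoder, so rows dropped by a filter are consumed from the source without ever being materialized into the target. A simplified sketch of the pattern with toy types (not the actual polars-parquet trait or error type):

```rust
type ParquetResult<T> = Result<T, String>; // stand-in for the real error type

/// Toy analogue of BatchableCollector: either push `n` source values into
/// a target, or consume `n` source values without producing output.
trait Collector {
    fn push_n(&mut self, target: &mut Vec<u8>, n: usize) -> ParquetResult<()>;
    fn skip_in_place(&mut self, n: usize) -> ParquetResult<()>;
}

/// A "decoder" over a byte slice; skipping just advances the slice.
struct SliceDecoder<'a> {
    bytes: &'a [u8],
}

impl<'a> SliceDecoder<'a> {
    fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
        let n = usize::min(n, self.bytes.len());
        self.bytes = &self.bytes[n..];
        Ok(())
    }
}

struct SliceCollector<'a> {
    decoder: SliceDecoder<'a>,
}

impl<'a> Collector for SliceCollector<'a> {
    fn push_n(&mut self, target: &mut Vec<u8>, n: usize) -> ParquetResult<()> {
        let n = usize::min(n, self.decoder.bytes.len());
        target.extend_from_slice(&self.decoder.bytes[..n]);
        self.decoder.skip_in_place(n)
    }

    // Forward the skip to the decoder, exactly as the impls above do.
    fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
        self.decoder.skip_in_place(n)
    }
}

fn main() {
    let data = [1u8, 2, 3, 4, 5];
    let mut c = SliceCollector { decoder: SliceDecoder { bytes: &data } };
    let mut out = Vec::new();
    c.skip_in_place(2).unwrap(); // drop 1, 2 without materializing them
    c.push_n(&mut out, 2).unwrap(); // collect 3, 4
    assert_eq!(out, vec![3, 4]);
}
```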
8 changes: 8 additions & 0 deletions crates/polars-parquet/src/arrow/read/deserialize/binview.rs
@@ -319,6 +319,10 @@ impl<'a, 'b> BatchableCollector<(), MutableBinaryViewArray<[u8]>> for &mut Delta
target.extend_constant(n, <Option<&[u8]>>::None);
Ok(())
}

fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
self.decoder.skip_in_place(n)
}
}

impl<'a, 'b> DeltaCollector<'a, 'b> {
@@ -426,6 +430,10 @@ impl<'a, 'b> BatchableCollector<(), MutableBinaryViewArray<[u8]>> for DeltaBytes
target.extend_constant(n, <Option<&[u8]>>::None);
Ok(())
}

fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
self.decoder.skip_in_place(n)
}
}

impl utils::Decoder for BinViewDecoder {
4 changes: 4 additions & 0 deletions crates/polars-parquet/src/arrow/read/deserialize/boolean.rs
@@ -165,6 +165,10 @@ impl<'a, 'b> BatchableCollector<u32, MutableBitmap> for BitmapCollector<'a, 'b>
target.extend_constant(n, false);
Ok(())
}

fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
self.0.skip_in_place(n)
}
}

impl ExactSize for (MutableBitmap, MutableBitmap) {
@@ -191,6 +191,10 @@ impl<'a, 'b, K: DictionaryKey> BatchableCollector<(), Vec<K>> for DictArrayColle
target.resize(target.len() + n, K::default());
Ok(())
}

fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
self.values.skip_in_place(n)
}
}

impl<K: DictionaryKey> Translator<K> for DictArrayTranslator {
@@ -16,6 +16,7 @@ pub(crate) enum StateTranslation<'a> {
Dictionary(hybrid_rle::HybridRleDecoder<'a>, &'a Vec<u8>),
}

#[derive(Debug)]
pub struct FixedSizeBinary {
pub values: Vec<u8>,
pub size: usize,
@@ -164,6 +165,12 @@ impl Decoder for BinaryDecoder {
target.resize(target.len() + n * self.size, 0);
Ok(())
}

fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
let n = usize::min(n, self.slice.len() / self.size);
*self.slice = &self.slice[n * self.size..];
Ok(())
}
}

let mut collector = FixedSizeBinaryCollector {
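Note the clamp in the fixed-size skip above: a row count is converted to a byte count, and `n` is capped at the number of whole rows remaining so the slice index cannot go out of bounds. The same arithmetic as a standalone sketch (a hypothetical free function, not the polars API):

```rust
/// `slice` holds rows of `size` bytes each; skipping `n` rows drops
/// `n * size` bytes, clamped to the rows actually remaining.
fn skip_rows(slice: &mut &[u8], size: usize, n: usize) {
    let n = usize::min(n, slice.len() / size);
    *slice = &slice[n * size..];
}

fn main() {
    let mut data: &[u8] = &[0u8; 12]; // three rows of four bytes
    skip_rows(&mut data, 4, 2);
    assert_eq!(data.len(), 4); // one row left
    skip_rows(&mut data, 4, 5); // clamped: no panic past the end
    assert!(data.is_empty());
}
```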
11 changes: 11 additions & 0 deletions crates/polars-parquet/src/arrow/read/deserialize/nested.rs
@@ -202,6 +202,17 @@ pub fn columns_to_iter_recursive(
)?
.collect_n(filter)?
},
Binary | Utf8 => {
Member: We don't support those types. I think we can directly use BinviewDecoder here.

Member: We will leave this for this PR. But we should be able to remove the code that generates (Large)Utf8 and (Large)Binary in favor of BinaryView.

init.push(InitNested::Primitive(field.is_nullable));
types.pop();
PageNestedDecoder::new(
columns.pop().unwrap(),
field.data_type().clone(),
binary::BinaryDecoder::<i32>::default(),
init,
)?
.collect_n(filter)?
},
_ => match field.data_type().to_logical_type() {
ArrowDataType::Dictionary(key_type, _, _) => {
init.push(InitNested::Primitive(field.is_nullable));
89 changes: 77 additions & 12 deletions crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs
@@ -140,7 +140,7 @@ impl Nested {

fn invalid_num_values(&self) -> usize {
match &self.content {
- NestedContent::Primitive => 0,
+ NestedContent::Primitive => 1,
NestedContent::List { .. } => 0,
NestedContent::FixedSizeList { width } => *width,
NestedContent::Struct => 1,
@@ -204,6 +204,10 @@ impl<'a, 'b, 'c, D: utils::NestedDecoder> BatchableCollector<(), D::DecodedState
self.decoder.push_n_nulls(self.state, target, n);
Ok(())
}

fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
self.state.skip_in_place(n)
}
}

/// The initial info of nested data types.
@@ -290,6 +294,67 @@ impl NestedState {
}
}

/// Calculate the number of leaf values that are covered by the first `limit` definition level
/// values.
fn limit_to_num_values(
def_iter: &HybridRleDecoder<'_>,
def_levels: &[u16],
limit: usize,
) -> ParquetResult<usize> {
struct NumValuesGatherer {
leaf_def_level: u16,
}
struct NumValuesState {
num_values: usize,
length: usize,
}

impl HybridRleGatherer<u32> for NumValuesGatherer {
type Target = NumValuesState;

fn target_reserve(&self, _target: &mut Self::Target, _n: usize) {}

fn target_num_elements(&self, target: &Self::Target) -> usize {
target.length
}

fn hybridrle_to_target(&self, value: u32) -> ParquetResult<u32> {
Ok(value)
}

fn gather_one(&self, target: &mut Self::Target, value: u32) -> ParquetResult<()> {
target.num_values += usize::from(value == self.leaf_def_level as u32);
target.length += 1;
Ok(())
}

fn gather_repeated(
&self,
target: &mut Self::Target,
value: u32,
n: usize,
) -> ParquetResult<()> {
target.num_values += n * usize::from(value == self.leaf_def_level as u32);
target.length += n;
Ok(())
}
}

let mut state = NumValuesState {
num_values: 0,
length: 0,
};
def_iter.clone().gather_n_into(
&mut state,
limit,
&NumValuesGatherer {
leaf_def_level: *def_levels.last().unwrap(),
},
)?;

Ok(state.num_values)
}
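For intuition, here is a self-contained mirror of what `limit_to_num_values` counts, assuming the definition levels are already decoded into a plain slice (the real code streams them out of a `HybridRleDecoder` through the gatherer above):

```rust
/// A definition level contributes a physical leaf value only when it
/// equals the leaf (maximum) definition level.
fn num_leaf_values(def_levels: &[u16], leaf_def_level: u16, limit: usize) -> usize {
    def_levels
        .iter()
        .take(limit)
        .filter(|&&d| d == leaf_def_level)
        .count()
}

fn main() {
    // With leaf definition level 2, only entries at level 2 correspond
    // to values physically stored in the leaf column.
    let def_levels = [2u16, 1, 2, 2, 0];
    assert_eq!(num_leaf_values(&def_levels, 2, 4), 3); // first 4 levels cover 3 values
}
```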

fn idx_to_limit(rep_iter: &HybridRleDecoder<'_>, idx: usize) -> ParquetResult<usize> {
struct RowIdxOffsetGatherer;
struct RowIdxOffsetState {
@@ -384,7 +449,7 @@ fn extend_offsets2<'a, D: utils::NestedDecoder>(
>,
nested: &mut [Nested],
filter: Option<Filter>,
- // Amortized allocations

def_levels: &[u16],
rep_levels: &[u16],
) -> PolarsResult<()> {
@@ -416,6 +481,9 @@
if start > 0 {
let start_cell = idx_to_limit(&rep_iter, start)?;

let num_skipped_values = limit_to_num_values(&def_iter, def_levels, start_cell)?;
batched_collector.skip_in_place(num_skipped_values)?;

rep_iter.skip_in_place(start_cell)?;
def_iter.skip_in_place(start_cell)?;
}
@@ -436,6 +504,8 @@

// @NOTE: This is kind of unused
let last_skip = def_iter.len();
let num_skipped_values = limit_to_num_values(&def_iter, def_levels, last_skip)?;
batched_collector.skip_in_place(num_skipped_values)?;
rep_iter.skip_in_place(last_skip)?;
def_iter.skip_in_place(last_skip)?;

@@ -447,6 +517,8 @@
let num_zeros = iter.take_leading_zeros();
if num_zeros > 0 {
let offset = idx_to_limit(&rep_iter, num_zeros)?;
let num_skipped_values = limit_to_num_values(&def_iter, def_levels, offset)?;
batched_collector.skip_in_place(num_skipped_values)?;
rep_iter.skip_in_place(offset)?;
def_iter.skip_in_place(offset)?;
}
@@ -601,23 +673,16 @@ fn extend_offsets_limited<'a, D: utils::NestedDecoder>(
}
}

- if embed_depth == max_depth - 1 {
- for _ in 0..num_elements {
- batched_collector.push_invalid();
- }
-
- break;
- }

let embed_num_values = embed_nest.invalid_num_values();
+ num_elements *= embed_num_values;

if embed_num_values == 0 {
break;
}

- num_elements *= embed_num_values;
}

+ batched_collector.push_n_invalids(num_elements);

+ break;
}

1 change: 1 addition & 0 deletions crates/polars-parquet/src/arrow/read/deserialize/null.rs
@@ -12,6 +12,7 @@ use crate::parquet::error::ParquetResult;
use crate::parquet::page::{DataPage, DictPage};

pub(crate) struct NullDecoder;
#[derive(Debug)]
pub(crate) struct NullArrayLength {
length: usize,
}
@@ -147,6 +147,11 @@ where
target.resize(target.len() + n, T::default());
Ok(())
}

fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
self.chunks.skip_in_place(n);
Ok(())
}
}

#[allow(clippy::large_enum_variant)]
@@ -206,7 +211,7 @@
}

match self {
- Self::Plain(t) => _ = t.nth(n - 1),
+ Self::Plain(t) => t.skip_in_place(n),
Self::Dictionary(t) => t.values.skip_in_place(n)?,
Self::ByteStreamSplit(t) => _ = t.iter_converted(|_| ()).nth(n - 1),
}
@@ -84,7 +84,7 @@ where
}

match self {
- Self::Plain(v) => _ = v.nth(n - 1),
+ Self::Plain(v) => v.skip_in_place(n),
Self::Dictionary(v) => v.values.skip_in_place(n)?,
Self::ByteStreamSplit(v) => _ = v.iter_converted(|_| ()).nth(n - 1),
Self::DeltaBinaryPacked(v) => v.skip_in_place(n)?,
@@ -109,4 +109,8 @@ where
target.resize(target.len() + n, T::default());
Ok(())
}

fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
self.decoder.skip_in_place(n)
}
}
@@ -24,6 +24,11 @@ impl<'a, P: ParquetNativeType> ArrayChunks<'a, P> {

Some(Self { bytes })
}

pub(crate) fn skip_in_place(&mut self, n: usize) {
let n = usize::min(self.bytes.len(), n);
self.bytes = &self.bytes[n..];
}
}

impl<'a, P: ParquetNativeType> Iterator for ArrayChunks<'a, P> {
@@ -36,13 +41,6 @@
Some(item)
}

- #[inline(always)]
- fn nth(&mut self, n: usize) -> Option<Self::Item> {
- let item = self.bytes.get(n)?;
- self.bytes = &self.bytes[n + 1..];
- Some(item)
- }

#[inline(always)]
fn size_hint(&self) -> (usize, Option<usize>) {
(self.bytes.len(), Some(self.bytes.len()))
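With the call sites above switched from `_ = t.nth(n - 1)` to `t.skip_in_place(n)`, the hand-rolled `nth` override, which existed to make bulk skipping cheap, has no remaining users: plain forward iteration goes through `next`, and `skip_in_place` clamps to the remaining length instead of indexing past the end.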