Skip to content

Commit

Permalink
perf: Batch parquet integer decoding (#17734)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jul 20, 2024
1 parent 7015663 commit 4695c4c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 28 deletions.
26 changes: 14 additions & 12 deletions crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,19 @@ where
}
}

struct BatchDecoder<'a, 'b, P, T, D>
pub(crate) struct PlainDecoderFnCollector<'a, 'b, P, T, D>
where
T: NativeType,
P: ParquetNativeType,
D: DecoderFunction<P, T>,
{
chunks: &'b mut ArrayChunks<'a, P>,
decoder: D,
_pd: std::marker::PhantomData<T>,
pub(crate) chunks: &'b mut ArrayChunks<'a, P>,
pub(crate) decoder: D,
pub(crate) _pd: std::marker::PhantomData<T>,
}

impl<'a, 'b, P, T, D: DecoderFunction<P, T>> BatchableCollector<(), Vec<T>>
for BatchDecoder<'a, 'b, P, T, D>
for PlainDecoderFnCollector<'a, 'b, P, T, D>
where
T: NativeType,
P: ParquetNativeType,
Expand Down Expand Up @@ -214,13 +214,15 @@ where

match (self, page_validity) {
(Self::Unit(page), None) => {
values.extend(
page.map(|v| decoder.decoder.decode(P::from_le_bytes(*v)))
.take(additional),
);
PlainDecoderFnCollector {
chunks: page,
decoder: decoder.decoder,
_pd: std::marker::PhantomData,
}
.push_n(values, additional)?;
},
(Self::Unit(page), Some(page_validity)) => {
let batched = BatchDecoder {
let collector = PlainDecoderFnCollector {
chunks: page,
decoder: decoder.decoder,
_pd: std::marker::PhantomData,
Expand All @@ -231,8 +233,8 @@ where
page_validity,
Some(additional),
values,
batched,
)?
collector,
)?;
},
(Self::Dictionary(page), None) => {
let translator = DictionaryTranslator(page.dict);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ use polars_error::PolarsResult;

use super::super::utils::MaybeNext;
use super::super::{utils, PagesIter};
use super::basic::{finish, DecoderFunction, PrimitiveDecoder, ValuesDictionary};
use super::basic::{
finish, DecoderFunction, PlainDecoderFnCollector, PrimitiveDecoder, ValuesDictionary,
};
use crate::parquet::encoding::hybrid_rle::DictionaryTranslator;
use crate::parquet::encoding::{byte_stream_split, delta_bitpacked, Encoding};
use crate::parquet::error::ParquetResult;
use crate::parquet::page::{split_buffer, DataPage, DictPage};
use crate::parquet::types::{decode, NativeType as ParquetNativeType};
use crate::read::deserialize::utils::array_chunks::ArrayChunks;
use crate::read::deserialize::utils::filter::Filter;
use crate::read::deserialize::utils::{PageValidity, TranslatedHybridRle};
use crate::read::deserialize::utils::{BatchableCollector, PageValidity, TranslatedHybridRle};

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
Expand Down Expand Up @@ -100,21 +102,28 @@ where
) -> ParquetResult<()> {
let (values, validity) = decoded;
match (self, page_validity) {
(Self::Unit(page), Some(page_validity)) => utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
&mut page
.by_ref()
.map(|v| decoder.0.decoder.decode(P::from_le_bytes(*v))),
)?,
(Self::Unit(page), Some(page_validity)) => {
let collector = PlainDecoderFnCollector {
chunks: page,
decoder: decoder.0.decoder,
_pd: Default::default(),
};

utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
collector,
)?;
},
(Self::Unit(page), None) => {
values.extend(
page.by_ref()
.map(|v| decoder.0.decoder.decode(P::from_le_bytes(*v)))
.take(additional),
);
PlainDecoderFnCollector {
chunks: page,
decoder: decoder.0.decoder,
_pd: Default::default(),
}
.push_n(values, additional)?;
},
(Self::Dictionary(page), Some(page_validity)) => {
let translator = DictionaryTranslator(page.dict);
Expand Down

0 comments on commit 4695c4c

Please sign in to comment.