From 66e1fb772707b19aea2c7d67774d091fc9848a13 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Fri, 19 Jul 2024 16:16:03 +0200 Subject: [PATCH] perf: Batch parquet integer decoding --- .../arrow/read/deserialize/primitive/basic.rs | 26 ++++++------ .../read/deserialize/primitive/integer.rs | 41 +++++++++++-------- 2 files changed, 39 insertions(+), 28 deletions(-) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs index d0ceef2812eb..f5ed5d492e20 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs @@ -99,19 +99,19 @@ where } } -struct BatchDecoder<'a, 'b, P, T, D> +pub(crate) struct PlainDecoderFnCollector<'a, 'b, P, T, D> where T: NativeType, P: ParquetNativeType, D: DecoderFunction, { - chunks: &'b mut ArrayChunks<'a, P>, - decoder: D, - _pd: std::marker::PhantomData, + pub(crate) chunks: &'b mut ArrayChunks<'a, P>, + pub(crate) decoder: D, + pub(crate) _pd: std::marker::PhantomData, } impl<'a, 'b, P, T, D: DecoderFunction> BatchableCollector<(), Vec> - for BatchDecoder<'a, 'b, P, T, D> + for PlainDecoderFnCollector<'a, 'b, P, T, D> where T: NativeType, P: ParquetNativeType, @@ -214,13 +214,15 @@ where match (self, page_validity) { (Self::Unit(page), None) => { - values.extend( - page.map(|v| decoder.decoder.decode(P::from_le_bytes(*v))) - .take(additional), - ); + PlainDecoderFnCollector { + chunks: page, + decoder: decoder.decoder, + _pd: std::marker::PhantomData, + } + .push_n(values, additional)?; }, (Self::Unit(page), Some(page_validity)) => { - let batched = BatchDecoder { + let collector = PlainDecoderFnCollector { chunks: page, decoder: decoder.decoder, _pd: std::marker::PhantomData, @@ -231,8 +233,8 @@ where page_validity, Some(additional), values, - batched, - )? + collector, + )?; }, (Self::Dictionary(page), None) => { let translator = DictionaryTranslator(page.dict); diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs index 95abd6005d1c..4baa0d36be0b 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs @@ -9,7 +9,9 @@ use polars_error::PolarsResult; use super::super::utils::MaybeNext; use super::super::{utils, PagesIter}; -use super::basic::{finish, DecoderFunction, PrimitiveDecoder, ValuesDictionary}; +use super::basic::{ + finish, DecoderFunction, PlainDecoderFnCollector, PrimitiveDecoder, ValuesDictionary, +}; use crate::parquet::encoding::hybrid_rle::DictionaryTranslator; use crate::parquet::encoding::{byte_stream_split, delta_bitpacked, Encoding}; use crate::parquet::error::ParquetResult; @@ -17,7 +19,7 @@ use crate::parquet::page::{split_buffer, DataPage, DictPage}; use crate::parquet::types::{decode, NativeType as ParquetNativeType}; use crate::read::deserialize::utils::array_chunks::ArrayChunks; use crate::read::deserialize::utils::filter::Filter; -use crate::read::deserialize::utils::{PageValidity, TranslatedHybridRle}; +use crate::read::deserialize::utils::{BatchableCollector, PageValidity, TranslatedHybridRle}; #[allow(clippy::large_enum_variant)] #[derive(Debug)] @@ -100,21 +102,28 @@ where ) -> ParquetResult<()> { let (values, validity) = decoded; match (self, page_validity) { - (Self::Unit(page), Some(page_validity)) => utils::extend_from_decoder( - validity, - page_validity, - Some(additional), - values, - &mut page - .by_ref() - .map(|v| decoder.0.decoder.decode(P::from_le_bytes(*v))), - )?, + (Self::Unit(page), Some(page_validity)) => { + let collector = PlainDecoderFnCollector { + chunks: page, + decoder: decoder.0.decoder, + _pd: Default::default(), + }; + + utils::extend_from_decoder( + validity, + page_validity, + Some(additional), + values, + collector, + )?; + }, (Self::Unit(page), None) => { - values.extend( - page.by_ref() - .map(|v| decoder.0.decoder.decode(P::from_le_bytes(*v))) - .take(additional), - ); + PlainDecoderFnCollector { + chunks: page, + decoder: decoder.0.decoder, + _pd: Default::default(), + } + .push_n(values, additional)?; }, (Self::Dictionary(page), Some(page_validity)) => { let translator = DictionaryTranslator(page.dict);