perf: Introduce MemReader to file buffer in Parquet reader #17712

Merged · 1 commit · Jul 19, 2024
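This PR replaces the `std::io::Cursor` / generic `R: Read + Seek` readers in the Parquet page path with an in-memory `MemReader`, and page buffers move to a clone-on-write `CowBuffer` so dictionary pages can be borrowed instead of cloned. Neither type's definition appears in this diff, so the sketch below is only a plausible shape for such a reader, with assumed fields and constructors, to make the call-site changes easier to follow.

```rust
use std::sync::Arc;

// Hypothetical zero-copy slice: a shared buffer plus a sub-range.
// (Assumed shape; the real types live inside polars-parquet.)
#[derive(Clone)]
pub struct MemSliceSketch {
    data: Arc<[u8]>,
    start: usize,
    end: usize,
}

impl MemSliceSketch {
    pub fn as_bytes(&self) -> &[u8] {
        &self.data[self.start..self.end]
    }
}

// Hypothetical in-memory reader: a cursor over a shared buffer, so "reading"
// a page is bumping an offset and handing out a sub-range, with no generic
// `R: Read + Seek` left to thread through the page iterator.
pub struct MemReaderSketch {
    data: Arc<[u8]>,
    position: usize,
}

impl MemReaderSketch {
    // Mirrors the constructors used at the call sites in this diff.
    pub fn from_vec(v: Vec<u8>) -> Self {
        Self { data: v.into(), position: 0 }
    }

    // Whether the real `from_slice` copies or aliases the source bytes is not
    // visible in this diff; this sketch copies.
    pub fn from_slice(s: &[u8]) -> Self {
        Self { data: s.into(), position: 0 }
    }

    // Hand out the next `n` bytes as a shared sub-slice, without copying them.
    pub fn read_slice(&mut self, n: usize) -> MemSliceSketch {
        let start = self.position;
        let end = (start + n).min(self.data.len());
        self.position = end;
        MemSliceSketch { data: self.data.clone(), start, end }
    }
}
```

The visible consequence in the hunks below: `PageReader::new(std::io::Cursor::new(chunk), ...)` becomes `PageReader::new(MemReader::from_slice(chunk), ...)` (or `from_vec`), and `PageReader` / `get_page_iterator` lose their reader type parameter.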
3 changes: 2 additions & 1 deletion crates/polars-io/src/parquet/read/mmap.rs
@@ -4,6 +4,7 @@ use bytes::Bytes;
#[cfg(feature = "async")]
use polars_core::datatypes::PlHashMap;
use polars_error::PolarsResult;
+ use polars_parquet::parquet::read::MemReader;
use polars_parquet::read::{
column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
PageReader,
@@ -73,7 +74,7 @@ pub(super) fn to_deserializer<'a>(
.into_iter()
.map(|(column_meta, chunk)| {
let pages = PageReader::new(
- std::io::Cursor::new(chunk),
+ MemReader::from_slice(chunk),
column_meta,
std::sync::Arc::new(|_, _| true),
vec![],
(another changed file; path not shown)
@@ -146,7 +146,7 @@ impl DecodedState for (FixedSizeBinary, MutableBitmap) {

impl<'a> Decoder<'a> for BinaryDecoder {
type Translation = StateTranslation<'a>;
- type Dict = Vec<u8>;
+ type Dict = &'a [u8];
type DecodedState = (FixedSizeBinary, MutableBitmap);

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
@@ -156,8 +156,8 @@ impl<'a> Decoder<'a> for BinaryDecoder {
)
}

- fn deserialize_dict(&self, page: &DictPage) -> Self::Dict {
- page.buffer.clone()
+ fn deserialize_dict(&self, page: &'a DictPage) -> Self::Dict {
+ page.buffer.as_ref()
}
}

@@ -207,7 +207,7 @@ impl<I: PagesIter> Iterator for Iter<I> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
- &mut self.dict,
+ &mut self.dict.as_deref(),
&mut self.remaining,
self.chunk_size,
&BinaryDecoder { size: self.size },
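The decoder's dictionary type changes from an owned `Vec<u8>`, which `deserialize_dict` had to clone out of every `DictPage`, to a borrowed `&'a [u8]`, so `page.buffer.as_ref()` is just a pointer copy. At the call site, `self.dict.as_deref()` bridges owned storage and the borrowed view: `Option::as_deref` turns an `Option` of anything that derefs to `[u8]` into an `Option<&[u8]>`. A small, self-contained illustration of that mechanic (the names are invented):

```rust
// The decoder side only needs to look at the dictionary bytes, not own them.
fn dict_len(dict: &mut Option<&[u8]>) -> usize {
    dict.map(|d| d.len()).unwrap_or(0)
}

fn main() {
    // The iterator can keep an owned dictionary around...
    let owned: Option<Vec<u8>> = Some(vec![10, 20, 30, 40]);

    // ...and hand a borrowed view downstream via `as_deref`, where the old
    // `type Dict = Vec<u8>` signature forced a clone of the buffer instead.
    let n = dict_len(&mut owned.as_deref());
    assert_eq!(n, 4);
}
```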
(another changed file; path not shown)
@@ -56,7 +56,7 @@ fn read_dict(data_type: ArrowDataType, dict: &DictPage) -> Box<dyn Array> {

let values = dict.buffer.clone();

- FixedSizeBinaryArray::try_new(data_type, values.into(), None)
+ FixedSizeBinaryArray::try_new(data_type, values.to_vec().into(), None)
.unwrap()
.boxed()
}
(another changed file; path not shown)
@@ -52,7 +52,7 @@ struct BinaryDecoder {

impl<'a> NestedDecoder<'a> for BinaryDecoder {
type State = State<'a>;
- type Dictionary = Vec<u8>;
+ type Dictionary = &'a [u8];
type DecodedState = (FixedSizeBinary, MutableBitmap);

fn build_state(
@@ -71,7 +71,7 @@ impl<'a> NestedDecoder<'a> for BinaryDecoder {
let values = values.chunks_exact(self.size);
StateTranslation::Unit(values)
},
- (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
+ (Encoding::PlainDictionary | Encoding::RleDictionary, Some(&dict), false) => {
let values = dict_indices_decoder(page)?;
StateTranslation::Dictionary { values, dict }
},
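With `type Dictionary = &'a [u8]`, the dictionary presumably reaches `build_state` behind one more reference (e.g. as an `Option<&&[u8]>`), which is why the pattern above gains an `&`: `Some(&dict)` matches through the outer reference so `dict` binds as a plain `&[u8]` again. The same destructuring in isolation, with made-up values:

```rust
fn main() {
    let bytes: &[u8] = &[1, 2, 3];
    // What the decoder now receives: an optional reference to its borrowed dict.
    let dict_param: Option<&&[u8]> = Some(&bytes);

    // `Some(&dict)` peels the outer `&`, binding `dict: &[u8]`.
    if let Some(&dict) = dict_param {
        assert_eq!(dict.len(), 3);
    }
}
```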
@@ -133,8 +133,8 @@ impl<'a> NestedDecoder<'a> for BinaryDecoder {
validity.extend_constant(n, false);
}

- fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary {
- page.buffer.clone()
+ fn deserialize_dict(&self, page: &'a DictPage) -> Self::Dictionary {
+ page.buffer.as_ref()
}
}

@@ -179,7 +179,7 @@ impl<I: PagesIter> Iterator for NestedIter<I> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
- &mut self.dict,
+ &mut self.dict.as_deref(),
&mut self.remaining,
&self.init,
self.chunk_size,
8 changes: 4 additions & 4 deletions crates/polars-parquet/src/arrow/read/deserialize/mod.rs
@@ -36,17 +36,17 @@ use simple::page_iter_to_arrays;
pub use self::nested_utils::{init_nested, InitNested, NestedArrayIter, NestedState};
pub use self::struct_::StructIterator;
use super::*;
- use crate::parquet::read::get_page_iterator as _get_page_iterator;
+ use crate::parquet::read::{get_page_iterator as _get_page_iterator, MemReader};
use crate::parquet::schema::types::PrimitiveType;

/// Creates a new iterator of compressed pages.
- pub fn get_page_iterator<R: Read + Seek>(
+ pub fn get_page_iterator(
column_metadata: &ColumnChunkMetaData,
- reader: R,
+ reader: MemReader,
pages_filter: Option<PageFilter>,
buffer: Vec<u8>,
max_header_size: usize,
- ) -> PolarsResult<PageReader<R>> {
+ ) -> PolarsResult<PageReader> {
Ok(_get_page_iterator(
column_metadata,
reader,
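With the reader fixed to `MemReader`, `get_page_iterator` and `PageReader` stop being generic: there is exactly one reader type in the page-decoding path, so downstream code no longer has to carry (or box away) an `R: Read + Seek` parameter. A toy before/after of that API move, using invented stand-in types:

```rust
use std::io::{Cursor, Read, Seek};

// Invented stand-ins; the real PageReader/MemReader live in polars-parquet.
struct PageReaderOld<R: Read + Seek> {
    reader: R,
}
struct MemReaderToy {
    data: Vec<u8>,
    position: usize,
}
struct PageReaderNew {
    reader: MemReaderToy,
}

// Before: generic, so every struct that stores the page iterator is generic
// too, and each reader type monomorphizes its own copy of the decode path.
fn get_pages_old<R: Read + Seek>(reader: R) -> PageReaderOld<R> {
    PageReaderOld { reader }
}

// After: one concrete in-memory reader, one concrete page iterator.
fn get_pages_new(reader: MemReaderToy) -> PageReaderNew {
    PageReaderNew { reader }
}

fn main() {
    let chunk = vec![0u8; 16];
    let _old = get_pages_old(Cursor::new(chunk.clone()));
    let _new = get_pages_new(MemReaderToy { data: chunk, position: 0 });
}
```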
(another changed file; path not shown)
@@ -229,7 +229,7 @@ pub(super) trait NestedDecoder<'a> {
) -> ParquetResult<()>;
fn push_n_nulls(&self, decoded: &mut Self::DecodedState, n: usize);

- fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary;
+ fn deserialize_dict(&self, page: &'a DictPage) -> Self::Dictionary;
}

/// The initial info of nested data types.
(another changed file; path not shown)
@@ -65,6 +65,7 @@ mod tests {
use crate::parquet::fallible_streaming_iterator;
use crate::parquet::metadata::Descriptor;
use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, Page};
+ use crate::parquet::read::CowBuffer;
use crate::parquet::schema::types::{PhysicalType, PrimitiveType};

#[test]
@@ -78,7 +79,7 @@
repetition_level_encoding: Encoding::Plain.into(),
statistics: None,
}),
- vec![],
+ CowBuffer::Owned(vec![]),
Descriptor {
primitive_type: PrimitiveType::from_physical(
"a".to_string(),
(another changed file; path not shown)
@@ -215,8 +215,7 @@ where
match (self, page_validity) {
(Self::Unit(page), None) => {
values.extend(
- page.by_ref()
- .map(|v| decoder.decoder.decode(P::from_le_bytes(*v)))
+ page.map(|v| decoder.decoder.decode(P::from_le_bytes(*v)))
.take(additional),
);
},
(another changed file; path not shown)
@@ -484,7 +484,7 @@ pub(super) trait Decoder<'a>: Sized {
fn with_capacity(&self, capacity: usize) -> Self::DecodedState;

/// Deserializes a [`DictPage`] into [`Self::Dict`].
- fn deserialize_dict(&self, page: &DictPage) -> Self::Dict;
+ fn deserialize_dict(&self, page: &'a DictPage) -> Self::Dict;
}

pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>(
8 changes: 5 additions & 3 deletions crates/polars-parquet/src/arrow/read/row_group.rs
@@ -9,7 +9,9 @@ use super::{ArrayIter, RowGroupMetaData};
use crate::arrow::read::column_iter_to_arrays;
use crate::parquet::indexes::FilteredPage;
use crate::parquet::metadata::ColumnChunkMetaData;
- use crate::parquet::read::{BasicDecompressor, IndexedPageReader, PageMetaData, PageReader};
+ use crate::parquet::read::{
+ BasicDecompressor, IndexedPageReader, MemReader, PageMetaData, PageReader,
+ };

/// An [`Iterator`] of [`RecordBatchT`] that (dynamically) adapts a vector of iterators of [`Array`] into
/// an iterator of [`RecordBatchT`].
@@ -164,7 +166,7 @@ pub fn to_deserializer<'a>(
.for_each(|page| page.start -= meta.column_start);
meta.column_start = 0;
let pages = IndexedPageReader::new_with_page_meta(
- std::io::Cursor::new(chunk),
+ MemReader::from_vec(chunk),
meta,
pages,
vec![],
@@ -185,7 +187,7 @@
.map(|(column_meta, chunk)| {
let len = chunk.len();
let pages = PageReader::new(
- std::io::Cursor::new(chunk),
+ MemReader::from_vec(chunk),
column_meta,
std::sync::Arc::new(|_, _| true),
vec![],
31 changes: 25 additions & 6 deletions crates/polars-parquet/src/arrow/write/dictionary.rs
@@ -19,6 +19,7 @@ use crate::arrow::write::{slice_nested_leaf, utils};
use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{DictPage, Page};
use crate::parquet::read::CowBuffer;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::ParquetStatistics;
use crate::write::DynIter;
@@ -202,7 +203,7 @@ macro_rules! dyn_prim {
} else {
None
};
- (DictPage::new(buffer, values.len(), false), stats)
+ (
+ DictPage::new(CowBuffer::Owned(buffer), values.len(), false),
+ stats,
+ )
}};
}

@@ -254,7 +258,7 @@ pub fn array_to_pages<K: DictionaryKey>(
} else {
None
};
- (DictPage::new(buffer, array.len(), false), stats)
+ (
+ DictPage::new(CowBuffer::Owned(buffer), array.len(), false),
+ stats,
+ )
},
ArrowDataType::BinaryView => {
let array = array
@@ -274,7 +281,7 @@
} else {
None
};
- (DictPage::new(buffer, array.len(), false), stats)
+ (
+ DictPage::new(CowBuffer::Owned(buffer), array.len(), false),
+ stats,
+ )
},
ArrowDataType::Utf8View => {
let array = array
@@ -295,7 +305,7 @@
} else {
None
};
- (DictPage::new(buffer, array.len(), false), stats)
+ (
+ DictPage::new(CowBuffer::Owned(buffer), array.len(), false),
+ stats,
+ )
},
ArrowDataType::LargeBinary => {
let values = array.values().as_any().downcast_ref().unwrap();
@@ -311,7 +324,7 @@
} else {
None
};
- (DictPage::new(buffer, values.len(), false), stats)
+ (
+ DictPage::new(CowBuffer::Owned(buffer), values.len(), false),
+ stats,
+ )
},
ArrowDataType::FixedSizeBinary(_) => {
let mut buffer = vec![];
@@ -327,7 +343,7 @@
} else {
None
};
- (DictPage::new(buffer, array.len(), false), stats)
+ (
+ DictPage::new(CowBuffer::Owned(buffer), array.len(), false),
+ stats,
+ )
},
other => {
polars_bail!(nyi =
3 changes: 2 additions & 1 deletion crates/polars-parquet/src/arrow/write/utils.rs
@@ -8,6 +8,7 @@ use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::encoding::Encoding;
use crate::parquet::metadata::Descriptor;
use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2};
use crate::parquet::read::CowBuffer;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::ParquetStatistics;

@@ -89,7 +90,7 @@ pub fn build_plain_page(
};
Ok(DataPage::new(
header,
- buffer,
+ CowBuffer::Owned(buffer),
Descriptor {
primitive_type: type_,
max_def_level: 0,
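On the write path nothing changes semantically: freshly encoded pages still own their bytes, they are just wrapped in `CowBuffer::Owned(...)` so `DataPage` / `DictPage` can carry either owned or borrowed bytes behind a single type. Only the `Owned` constructor is visible in this diff; the sketch below assumes a second, shared variant and a `Deref` to `[u8]`, which would also explain the read-side hunks above, where `page.buffer.as_ref()` is a cheap borrow and `values.to_vec()` in `read_dict` is an explicit copy at the one place that still needs owned bytes.

```rust
use std::sync::Arc;

// Hypothetical clone-on-write page buffer. Only `Owned` appears in the PR;
// the `Shared` variant is an assumption about what the read path produces.
pub enum CowBufferSketch {
    Owned(Vec<u8>),
    Shared(Arc<[u8]>),
}

impl std::ops::Deref for CowBufferSketch {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        match self {
            CowBufferSketch::Owned(v) => v,
            CowBufferSketch::Shared(s) => s,
        }
    }
}

fn main() {
    // Write path: the encoder produced an owned Vec<u8>, same as before.
    let page_buffer = CowBufferSketch::Owned(vec![1, 2, 3]);

    // Read path: decoders only ever look at `&[u8]`, so they are agnostic to
    // whether the page owns its bytes or shares them with the source buffer.
    let view: &[u8] = &page_buffer;
    assert_eq!(view.len(), 3);

    // Where genuinely owned bytes are required (e.g. building an Arrow
    // buffer), the copy is explicit via `to_vec()`.
    let owned_again: Vec<u8> = page_buffer.to_vec();
    assert_eq!(owned_again, vec![1, 2, 3]);
}
```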