Skip to content

Commit

Permalink
perf: collect Parquet dictionary binary as view
Browse files Browse the repository at this point in the history
This optimizes how Parquet dictionary over binary is collected. Now, instead of
pushing the items one at the time into a buffer. The dictionary is used as a
buffer and views are made into that buffer. This should not only speed up the
Parquet decoder, but should also reduce memory consumption and speed up
subsequent operations.

I did a small benchmark, but this does not really mean much.

```
Benchmark 1: After Optimization
  Time (mean ± σ):      2.007 s ±  0.005 s    [User: 1.712 s, System: 0.523 s]
  Range (min … max):    2.000 s …  2.013 s    10 runs

Benchmark 2: Before Optimization
  Time (mean ± σ):      2.285 s ±  0.009 s    [User: 1.956 s, System: 0.595 s]
  Range (min … max):    2.274 s …  2.306 s    10 runs

Summary
  After Optimization ran
    1.14 ± 0.01 times faster than Before Optimization
```
  • Loading branch information
coastalwhite committed Jul 7, 2024
1 parent 200c6a4 commit d1afbc1
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 65 deletions.
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::array::iterator::NonNullValuesIter;
use crate::bitmap::utils::{BitmapIter, ZipValidity};
pub type BinaryViewArray = BinaryViewArrayGeneric<[u8]>;
pub type Utf8ViewArray = BinaryViewArrayGeneric<str>;
pub use view::{View, INLINE_VIEW_SIZE};
pub use view::View;

use super::Splitable;

Expand Down
12 changes: 12 additions & 0 deletions crates/polars-arrow/src/array/binview/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,18 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
self.views.push(value);
}

#[inline]
pub fn push_buffer(&mut self, buffer: Buffer<u8>) -> u32 {
if !self.in_progress_buffer.is_empty() {
self.completed_buffers
.push(Buffer::from(std::mem::take(&mut self.in_progress_buffer)));
}

let buffer_idx = self.completed_buffers.len();
self.completed_buffers.push(buffer);
buffer_idx as u32
}

#[inline]
pub fn push_value<V: AsRef<T>>(&mut self, value: V) {
if let Some(validity) = &mut self.validity {
Expand Down
86 changes: 68 additions & 18 deletions crates/polars-arrow/src/array/binview/view.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::fmt::{self, Display, Formatter};
use std::ops::Add;

use bytemuck::{Pod, Zeroable};
Expand All @@ -13,10 +13,12 @@ use crate::buffer::Buffer;
use crate::datatypes::PrimitiveType;
use crate::types::NativeType;

pub const INLINE_VIEW_SIZE: u32 = 12;

// We use this instead of u128 because we want alignment of <= 8 bytes.
#[derive(Debug, Copy, Clone, Default)]
/// A reference to a set of bytes.
///
/// If `length <= 12`, these bytes are inlined over the `prefix`, `buffer_idx` and `offset` fields.
/// If `length > 12`, these fields specify a slice of a buffer.
#[derive(Copy, Clone, Default)]
#[repr(C)]
pub struct View {
/// The length of the string/bytes.
Expand All @@ -29,29 +31,77 @@ pub struct View {
pub offset: u32,
}

impl fmt::Debug for View {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.length <= Self::MAX_INLINE_SIZE {
fmt.debug_struct("View")
.field("length", &self.length)
.field("content", &unsafe {
std::slice::from_raw_parts(
(self as *const _ as *const u8).add(4),
self.length as usize,
)
})
.finish()
} else {
fmt.debug_struct("View")
.field("length", &self.length)
.field("prefix", &self.prefix.to_be_bytes())
.field("buffer_idx", &self.buffer_idx)
.field("offset", &self.offset)
.finish()
}
}
}

impl View {
pub const MAX_INLINE_SIZE: u32 = 12;

#[inline(always)]
pub fn as_u128(self) -> u128 {
unsafe { std::mem::transmute(self) }
}

/// Create a new inline view
///
/// # Panics
///
/// Panics if the `bytes.len() > View::MAX_INLINE_SIZE`.
#[inline]
pub fn new_inline(bytes: &[u8]) -> Self {
debug_assert!(bytes.len() <= u32::MAX as usize);
assert!(bytes.len() as u32 <= Self::MAX_INLINE_SIZE);

let mut view = Self {
length: bytes.len() as u32,
..Default::default()
};

let view_ptr = &mut view as *mut _ as *mut u8;

// SAFETY:
// - bytes length <= 12,
// - size_of::<View> == 16
// - View is laid out as [length, prefix, buffer_idx, offset] (using repr(C))
// - By grabbing the view_ptr and adding 4, we have provenance over prefix, buffer_idx and
// offset. (i.e. the same could not be achieved with &mut self.prefix as *mut _ as *mut u8)
unsafe {
let inline_data_ptr = view_ptr.add(4);
core::ptr::copy_nonoverlapping(bytes.as_ptr(), inline_data_ptr, bytes.len());
}
view
}

#[inline]
pub fn new_from_bytes(bytes: &[u8], buffer_idx: u32, offset: u32) -> Self {
if bytes.len() <= 12 {
let mut ret = Self {
length: bytes.len() as u32,
..Default::default()
};
let ret_ptr = &mut ret as *mut _ as *mut u8;
unsafe {
core::ptr::copy_nonoverlapping(bytes.as_ptr(), ret_ptr.add(4), bytes.len());
}
ret
debug_assert!(bytes.len() <= u32::MAX as usize);

if bytes.len() as u32 <= Self::MAX_INLINE_SIZE {
Self::new_inline(bytes)
} else {
let prefix_buf: [u8; 4] = std::array::from_fn(|i| *bytes.get(i).unwrap_or(&0));
Self {
length: bytes.len() as u32,
prefix: u32::from_le_bytes(prefix_buf),
prefix: u32::from_le_bytes(bytes[0..4].try_into().unwrap()),
buffer_idx,
offset,
}
Expand Down Expand Up @@ -190,8 +240,8 @@ where
{
for view in views {
let len = view.length;
if len <= INLINE_VIEW_SIZE {
if len < INLINE_VIEW_SIZE && view.as_u128() >> (32 + len * 8) != 0 {
if len <= View::MAX_INLINE_SIZE {
if len < View::MAX_INLINE_SIZE && view.as_u128() >> (32 + len * 8) != 0 {
polars_bail!(ComputeError: "view contained non-zero padding in prefix");
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ mod values;
pub use binary::{BinaryArray, BinaryValueIter, MutableBinaryArray, MutableBinaryValuesArray};
pub use binview::{
BinaryViewArray, BinaryViewArrayGeneric, MutableBinaryViewArray, MutablePlBinary,
MutablePlString, Utf8ViewArray, View, ViewType, INLINE_VIEW_SIZE,
MutablePlString, Utf8ViewArray, View, ViewType,
};
pub use boolean::{BooleanArray, MutableBooleanArray};
pub use dictionary::{DictionaryArray, DictionaryKey, MutableDictionaryArray};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ pub(crate) fn deserialize_plain(values: &[u8], num_values: usize) -> BinaryDict
for v in all {
dict_values.push(v)
}

dict_values.into()
}

Expand Down
90 changes: 57 additions & 33 deletions crates/polars-parquet/src/arrow/read/deserialize/binview/basic.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use std::cell::Cell;
use std::collections::VecDeque;

use arrow::array::{Array, ArrayRef, BinaryViewArray, MutableBinaryViewArray, Utf8ViewArray};
use arrow::array::{Array, ArrayRef, BinaryViewArray, MutableBinaryViewArray, Utf8ViewArray, View};
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::{ArrowDataType, PhysicalType};
use polars_error::PolarsResult;
use polars_utils::iter::FallibleIterator;

use super::super::binary::decoders::*;
use crate::parquet::encoding::hybrid_rle::BinaryDictionaryTranslator;
use crate::parquet::page::{DataPage, DictPage};
use crate::read::deserialize::utils;
use crate::read::deserialize::utils::{extend_from_decoder, next, DecodedState, MaybeNext};
use crate::read::deserialize::utils::{
self, extend_from_decoder, next, DecodedState, MaybeNext, TranslatedHybridRle,
};
use crate::read::{PagesIter, PrimitiveLogicalType};

type DecodedStateTuple = (MutableBinaryViewArray<[u8]>, MutableBitmap);
Expand Down Expand Up @@ -102,33 +104,65 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder {
BinaryState::OptionalDictionary(page_validity, page_values) => {
// Already done on the dict.
validate_utf8 = false;

let page_dict = &page_values.dict;
utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
&mut page_values
.values
.by_ref()
.map(|index| page_dict.value(index as usize)),
)?;
page_values.values.get_result()?;
let offsets = page_dict.offsets();

if let Some(max_length) = offsets.lengths().max() {
// We do not have to push the buffer if all elements fit as inline views.
let buffer_idx = if max_length <= View::MAX_INLINE_SIZE as usize {
0
} else {
values.push_buffer(page_dict.values().clone())
};

// @Note: we could potentially use the View::new_inline function here, but that
// would require two collectors & two translators. So I don't think it is worth
// it.
let translator = BinaryDictionaryTranslator {
dictionary: page_dict,
buffer_idx,
};
let collector = TranslatedHybridRle::new(&mut page_values.values, &translator);

utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values.views_mut(),
collector,
)?;
}
},
BinaryState::RequiredDictionary(page) => {
// Already done on the dict.
validate_utf8 = false;

let page_dict = &page.dict;
let offsets = page_dict.offsets();

for x in page
.values
.by_ref()
.map(|index| page_dict.value(index as usize))
.take(additional)
{
values.push_value_ignore_validity(x)
if let Some(max_length) = offsets.lengths().max() {
// We do not have to push the buffer if all elements fit as inline views.
let buffer_idx = if max_length <= View::MAX_INLINE_SIZE as usize {
0
} else {
values.push_buffer(page_dict.values().clone())
};

// @Note: we could potentially use the View::new_inline function here, but that
// would require two collectors & two translators. So I don't think it is worth
// it.
let translator = BinaryDictionaryTranslator {
dictionary: page_dict,
buffer_idx,
};

page.values.translate_and_collect_n_into(
values.views_mut(),
additional,
&translator,
)?;
}
page.values.get_result()?;
},
BinaryState::FilteredOptional(page_validity, page_values) => {
extend_from_decoder(
Expand Down Expand Up @@ -273,17 +307,7 @@ pub(super) fn finish(
}

match data_type.to_physical_type() {
PhysicalType::BinaryView => unsafe {
Ok(BinaryViewArray::new_unchecked(
data_type.clone(),
array.views().clone(),
array.data_buffers().clone(),
array.validity().cloned(),
array.total_bytes_len(),
array.total_buffer_len(),
)
.boxed())
},
PhysicalType::BinaryView => Ok(array.boxed()),
PhysicalType::Utf8View => {
// SAFETY: we already checked utf8
unsafe {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ fn reserve_pushable_and_validity<'a, I, T, C: BatchableCollector<I, T>>(
}

/// Extends a [`Pushable`] from an iterator of non-null values and an hybrid-rle decoder
pub(super) fn extend_from_decoder<I, T: std::fmt::Debug, C: BatchableCollector<I, T>>(
pub(super) fn extend_from_decoder<I, T, C: BatchableCollector<I, T>>(
validity: &mut MutableBitmap,
page_validity: &mut dyn PageValidity,
limit: Option<usize>,
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ pub use decoder::Decoder;
pub use encoder::encode;
use polars_utils::iter::FallibleIterator;
use polars_utils::slice::GetSaferUnchecked;
pub use translator::{DictionaryTranslator, Translator, UnitTranslator};
pub use translator::{
BinaryDictionaryTranslator, DictionaryTranslator, FnTranslator, Translator, UnitTranslator,
};

use self::buffered::HybridRleBuffered;
use super::{bitpacked, ceil8, uleb128};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use arrow::array::{BinaryArray, View};

use crate::parquet::encoding::bitpacked::{Decoder, Unpackable, Unpacked};
use crate::parquet::encoding::hybrid_rle::{BufferedBitpacked, HybridRleBuffered};
use crate::parquet::error::{ParquetError, ParquetResult};
Expand Down Expand Up @@ -239,6 +241,52 @@ impl<'a, T: Copy> Translator<T> for DictionaryTranslator<'a, T> {
}
}

/// This is a binary dictionary translation variant of [`Translator`].
///
/// All the [`HybridRleDecoder`] values are regarded as a offset into a binary array regarded as a
/// dictionary.
///
/// [`HybridRleDecoder`]: super::HybridRleDecoder
pub struct BinaryDictionaryTranslator<'a> {
pub dictionary: &'a BinaryArray<i64>,
pub buffer_idx: u32,
}

impl<'a> Translator<View> for BinaryDictionaryTranslator<'a> {
fn translate(&self, index: u32) -> ParquetResult<View> {
if index as usize >= self.dictionary.len() {
return Err(ParquetError::oos("Dictionary index is out of range"));
}

let value = self.dictionary.value(index as usize);
let (start, _) = self.dictionary.offsets().start_end(index as usize);
Ok(View::new_from_bytes(value, self.buffer_idx, start as u32))
}

fn translate_slice(&self, target: &mut Vec<View>, source: &[u32]) -> ParquetResult<()> {
let Some(source_max) = source.iter().copied().max() else {
return Ok(());
};

if source_max as usize >= self.dictionary.len() {
return Err(ParquetError::oos("Dictionary index is out of range"));
}

let offsets = self.dictionary.offsets();

target.extend(source.iter().map(|&src_idx| {
// Safety: We have checked before that source only has indexes that are smaller than
// the dictionary length.
let value = unsafe { self.dictionary.value_unchecked(src_idx as usize) };
debug_assert!((src_idx as usize) < offsets.len_proxy());
let (start, _) = unsafe { offsets.start_end_unchecked(src_idx as usize) };
View::new_from_bytes(value, self.buffer_idx, start as u32)
}));

Ok(())
}
}

/// A closure-based translator
pub struct FnTranslator<O, F: Fn(u32) -> ParquetResult<O>>(pub F);

Expand Down
10 changes: 6 additions & 4 deletions crates/polars/tests/it/io/parquet/read/binary.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use polars_parquet::parquet::encoding::hybrid_rle::FnTranslator;
use polars_parquet::parquet::error::ParquetResult;
use polars_parquet::parquet::page::DataPage;

Expand All @@ -22,10 +23,11 @@ pub fn page_to_vec(
.map(Some)
.map(|x| x.transpose())
.collect(),
FixedLenBinaryPageState::RequiredDictionary(dict) => dict
.indexes
.map(|x| dict.dict.value(x as usize).map(|x| x.to_vec()).map(Some))
.collect(),
FixedLenBinaryPageState::RequiredDictionary(dict) => {
let dictionary =
FnTranslator(|v| dict.dict.value(v as usize).map(|v| Some(v.to_vec())));
dict.indexes.translate_and_collect(&dictionary)
},
FixedLenBinaryPageState::OptionalDictionary(validity, dict) => {
let values = dict
.indexes
Expand Down
Loading

0 comments on commit d1afbc1

Please sign in to comment.