Skip to content

Commit

Permalink
fix: Properly write nested NullArray in Parquet (#17807)
Browse files Browse the repository at this point in the history
Co-authored-by: ritchie <ritchie46@gmail.com>
  • Loading branch information
coastalwhite and ritchie46 authored Jul 24, 2024
1 parent 60d3050 commit 0c2cfae
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ chrono = { workspace = true }
chrono-tz = { workspace = true, optional = true }
dyn-clone = { version = "1" }
either = { workspace = true }
foreign_vec = { version = "0.1" }
hashbrown = { workspace = true }
num-traits = { workspace = true }
parking_lot = { workspace = true }
polars-error = { workspace = true }
polars-utils = { workspace = true }
serde = { workspace = true, optional = true }
Expand Down
21 changes: 18 additions & 3 deletions crates/polars-arrow/src/array/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ use crate::ffi;
#[derive(Clone)]
pub struct NullArray {
    /// Logical datatype; its physical type must be Null (checked in `try_new`).
    data_type: ArrowDataType,

    /// Validity mask. This is always all-zeroes.
    validity: Bitmap,

    /// Number of (null) elements in this array.
    length: usize,
}

Expand All @@ -25,7 +29,13 @@ impl NullArray {
polars_bail!(ComputeError: "NullArray can only be initialized with a DataType whose physical type is Null");
}

Ok(Self { data_type, length })
let validity = Bitmap::new_zeroed(length);

Ok(Self {
data_type,
validity,
length,
})
}

/// Returns a new [`NullArray`].
Expand Down Expand Up @@ -66,8 +76,9 @@ impl NullArray {
///
/// # Safety
/// The caller must ensure that `offset + length < self.len()`.
pub unsafe fn slice_unchecked(&mut self, _offset: usize, length: usize) {
pub unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) {
self.length = length;
self.validity.slice_unchecked(offset, length);
}

#[inline]
Expand All @@ -80,7 +91,7 @@ impl Array for NullArray {
impl_common_array!();

fn validity(&self) -> Option<&Bitmap> {
None
Some(&self.validity)
}

fn with_validity(&self, _: Option<Bitmap>) -> Box<dyn Array> {
Expand Down Expand Up @@ -179,13 +190,17 @@ impl Splitable for NullArray {
}

unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) {
let (lhs, rhs) = self.validity.split_at(offset);

(
Self {
data_type: self.data_type.clone(),
validity: lhs,
length: offset,
},
Self {
data_type: self.data_type.clone(),
validity: rhs,
length: self.len() - offset,
},
)
Expand Down
56 changes: 54 additions & 2 deletions crates/polars-arrow/src/bitmap/immutable.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::ops::Deref;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use either::Either;
use parking_lot::RwLockUpgradableReadGuard;
use polars_error::{polars_bail, PolarsResult};

use super::utils::{count_zeros, fmt, get_bit, get_bit_unchecked, BitChunk, BitChunks, BitmapIter};
Expand Down Expand Up @@ -399,7 +400,58 @@ impl Bitmap {
/// Initializes a new [`Bitmap`] filled with unset values.
#[inline]
pub fn new_zeroed(length: usize) -> Self {
    // There are quite some situations where we just want a zeroed-out Bitmap.
    // Since that would constantly need to reallocate, we keep a global static
    // holding the largest zero allocation so far; every call then just takes an
    // `Arc::clone` of (a prefix of) that buffer, growing it only when needed.
    static GLOBAL_ZERO_BYTES: OnceLock<parking_lot::RwLock<Arc<Bytes<u8>>>> = OnceLock::new();

    let rwlock_zero_bytes = GLOBAL_ZERO_BYTES.get_or_init(|| {
        let byte_length = length.div_ceil(8).next_power_of_two();
        parking_lot::RwLock::new(Arc::new(Bytes::from(vec![0; byte_length])))
    });

    // Every bit is zero, so the unset-bit count is exactly `length`.
    let unset_bit_count_cache = AtomicU64::new(length as u64);

    // Fast path: the shared buffer is already large enough; hand out a cheap clone.
    let zero_bytes = rwlock_zero_bytes.upgradable_read();
    if zero_bytes.len() * 8 >= length {
        let bytes = zero_bytes.clone();
        return Bitmap {
            bytes,
            offset: 0,
            length,
            unset_bit_count_cache,
        };
    }

    let mut zero_bytes = RwLockUpgradableReadGuard::upgrade(zero_bytes);

    // Race condition:
    // By the time we got here, another guard could have been upgraded, and the
    // buffer could have been expanded already. So we want to check again whether
    // we cannot just take that buffer.
    if zero_bytes.len() * 8 >= length {
        let bytes = zero_bytes.clone();
        return Bitmap {
            bytes,
            offset: 0,
            length,
            unset_bit_count_cache,
        };
    }

    // Grow exponentially (next power of two) so that we are not constantly
    // allocating new buffers.
    let byte_length = length.div_ceil(8).next_power_of_two();

    let bytes = Arc::new(Bytes::from(vec![0; byte_length]));
    *zero_bytes = bytes.clone();

    Bitmap {
        bytes,
        offset: 0,
        length,
        unset_bit_count_cache,
    }
}

/// Initializes a new [`Bitmap`] filled with the given value.
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) enum BytesAllocator {
#[allow(dead_code)]
Arrow(arrow_buffer::Buffer),
}
pub(crate) type BytesInner<T> = foreign_vec::ForeignVec<BytesAllocator, T>;
pub(crate) type BytesInner<T> = polars_utils::foreign_vec::ForeignVec<BytesAllocator, T>;

/// Bytes representation.
#[repr(transparent)]
Expand Down
5 changes: 2 additions & 3 deletions crates/polars-parquet/src/arrow/write/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ use crate::parquet::page::DataPage;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::types::NativeType;

pub fn array_to_page<'a, T, R>(
array: &'a PrimitiveArray<T>,
pub fn array_to_page<T, R>(
array: &PrimitiveArray<T>,
options: WriteOptions,
type_: PrimitiveType,
nested: &[Nested],
) -> PolarsResult<DataPage>
where
PrimitiveArray<T>: polars_compute::min_max::MinMaxKernel<Scalar<'a> = T>,
T: ArrowNativeType,
R: NativeType,
T: num_traits::AsPrimitive<R>,
Expand Down
100 changes: 100 additions & 0 deletions crates/polars-utils/src/foreign_vec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/// This is pulled out of https://github.com/DataEngineeringLabs/foreign_vec
use std::mem::ManuallyDrop;
use std::ops::DerefMut;
use std::vec::Vec;

/// Mode of deallocating memory regions
enum Allocation<D> {
    /// Native allocation: the region is owned by the internal `Vec` and freed when it drops.
    Native,
    /// A foreign allocator `D` that owns the region and deallocates it on drop.
    Foreign(D),
}

/// A continuous memory region that may be allocated externally.
///
/// In the most common case, this is created from [`Vec`].
/// However, this region may also be allocated by a foreign allocator `D`
/// and behave as `&[T]`.
pub struct ForeignVec<D, T> {
    /// An implementation using an `enum` of a `Vec` or a foreign pointer is not used
    /// because `deref` is at least 50% more expensive than the deref of a `Vec`.
    data: ManuallyDrop<Vec<T>>,
    /// How the region was allocated; this determines what happens on drop.
    allocation: Allocation<D>,
}

impl<D, T> ForeignVec<D, T> {
    /// Takes ownership of an allocated memory region.
    ///
    /// # Panics
    /// This function panics if and only if `ptr` is null.
    ///
    /// # Safety
    /// This function is safe if and only if `ptr` is valid for `length` elements of `T`
    /// for as long as `owner` is alive.
    ///
    /// # Implementation
    /// This function leaks if and only if `owner` does not deallocate
    /// the region `[ptr, ptr + length)` when dropped.
    #[inline]
    pub unsafe fn from_foreign(ptr: *const T, length: usize, owner: D) -> Self {
        assert!(!ptr.is_null());
        // This line is technically outside the assumptions of `Vec::from_raw_parts`, since
        // `ptr` was not allocated by `Vec`. However, one of the invariants of this struct
        // is that we do never expose this region as a `Vec`; we only use `Vec` on it to provide
        // immutable access to the region (via `Vec::deref` to `&[T]`).
        let data = Vec::from_raw_parts(ptr as *mut T, length, length);
        let data = ManuallyDrop::new(data);

        Self {
            data,
            allocation: Allocation::Foreign(owner),
        }
    }

    /// Returns a `Some` mutable reference of [`Vec<T>`] iff this was initialized
    /// from a [`Vec<T>`] and `None` otherwise.
    pub fn get_vec(&mut self) -> Option<&mut Vec<T>> {
        match &self.allocation {
            // Foreign memory must never be exposed mutably as a `Vec`.
            Allocation::Foreign(_) => None,
            Allocation::Native => Some(self.data.deref_mut()),
        }
    }
}

impl<D, T> Drop for ForeignVec<D, T> {
    #[inline]
    fn drop(&mut self) {
        match self.allocation {
            Allocation::Foreign(_) => {
                // The foreign owner is dropped via its own `Drop`; we must NOT
                // drop the `Vec`, since its pointer was not allocated by `Vec`.
            },
            Allocation::Native => {
                // Swap in an empty `Vec` and drop the original one, deallocating
                // the natively-owned region.
                let data = core::mem::take(&mut self.data);
                let _ = ManuallyDrop::into_inner(data);
            },
        }
    }
}

impl<D, T> core::ops::Deref for ForeignVec<D, T> {
    type Target = [T];

    /// Exposes the memory region as an immutable slice.
    #[inline]
    fn deref(&self) -> &[T] {
        self.data.as_slice()
    }
}

impl<D, T: core::fmt::Debug> core::fmt::Debug for ForeignVec<D, T> {
    /// Formats the region exactly like the underlying slice would.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let slice: &[T] = self;
        core::fmt::Debug::fmt(slice, f)
    }
}

impl<D, T> From<Vec<T>> for ForeignVec<D, T> {
    /// Wraps a natively-allocated `Vec` without copying its contents.
    #[inline]
    fn from(data: Vec<T>) -> Self {
        let data = ManuallyDrop::new(data);
        Self {
            data,
            allocation: Allocation::Native,
        }
    }
}
1 change: 1 addition & 0 deletions crates/polars-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod contention_pool;
pub mod cpuid;
mod error;
pub mod floor_divmod;
pub mod foreign_vec;
pub mod functions;
pub mod hashing;
pub mod idx_vec;
Expand Down
8 changes: 8 additions & 0 deletions py-polars/tests/unit/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,14 @@ def test_read_byte_stream_split_arrays(
assert_frame_equal(read, df)


@pytest.mark.write_disk()
def test_parquet_nested_null_array_17795(tmp_path: Path) -> None:
    """Round-trip a struct with an all-null field through Parquet (issue #17795)."""
    filename = tmp_path / "nested_null.parquet"

    df = pl.DataFrame([{"struct": {"field": None}}])
    df.write_parquet(filename)
    # Reading back with pyarrow verifies the nested NullArray was written properly.
    pq.read_table(filename)


@pytest.mark.write_disk()
def test_parquet_record_batches_pyarrow_fixed_size_list_16614(tmp_path: Path) -> None:
filename = tmp_path / "a.parquet"
Expand Down

0 comments on commit 0c2cfae

Please sign in to comment.