diff --git a/crates/polars-arrow/src/array/binview/mod.rs b/crates/polars-arrow/src/array/binview/mod.rs index d3fcc3c263d3..2e0eee85d2ef 100644 --- a/crates/polars-arrow/src/array/binview/mod.rs +++ b/crates/polars-arrow/src/array/binview/mod.rs @@ -403,10 +403,10 @@ impl BinaryViewArrayGeneric { self.buffers .iter() .map(|buf| { - if buf.shared_count_strong() == 1 { - buf.len() - } else { + if buf.storage_refcount() > 1 { 0 + } else { + buf.len() } }) .sum() diff --git a/crates/polars-arrow/src/array/static_array_collect.rs b/crates/polars-arrow/src/array/static_array_collect.rs index 0b30ee25b365..296d93502abe 100644 --- a/crates/polars-arrow/src/array/static_array_collect.rs +++ b/crates/polars-arrow/src/array/static_array_collect.rs @@ -1,5 +1,4 @@ use std::borrow::Cow; -use std::sync::Arc; use polars_utils::no_call_const; @@ -15,6 +14,7 @@ use crate::datatypes::ArrowDataType; use crate::legacy::prelude::fixed_size_list::AnonymousBuilder as AnonymousFixedSizeListArrayBuilder; use crate::legacy::prelude::list::AnonymousBuilder as AnonymousListArrayBuilder; use crate::legacy::trusted_len::TrustedLenPush; +use crate::storage::SharedStorage; use crate::trusted_len::TrustedLen; use crate::types::NativeType; @@ -256,7 +256,7 @@ macro_rules! impl_collect_vec_validity { unsafe { // SAFETY: we made sure the null_count is correct. Some(Bitmap::from_inner_unchecked( - Arc::new(bitmap.into()), + SharedStorage::from_vec(bitmap), 0, buf.len(), Some(null_count), @@ -317,7 +317,7 @@ macro_rules! impl_trusted_collect_vec_validity { unsafe { // SAFETY: we made sure the null_count is correct. Some(Bitmap::from_inner_unchecked( - Arc::new(bitmap.into()), + SharedStorage::from_vec(bitmap), 0, buf.len(), Some(null_count), @@ -766,7 +766,7 @@ macro_rules! impl_collect_bool_validity { let false_count = len - true_count; let values = unsafe { - Bitmap::from_inner_unchecked(Arc::new(buf.into()), 0, len, Some(false_count)) + Bitmap::from_inner_unchecked(SharedStorage::from_vec(buf), 0, len, Some(false_count)) }; let null_count = len - nonnull_count; @@ -774,7 +774,7 @@ macro_rules! impl_collect_bool_validity { unsafe { // SAFETY: we made sure the null_count is correct. Some(Bitmap::from_inner_unchecked( - Arc::new(validity.into()), + SharedStorage::from_vec(validity), 0, len, Some(null_count), diff --git a/crates/polars-arrow/src/bitmap/immutable.rs b/crates/polars-arrow/src/bitmap/immutable.rs index 6ad76a07b639..c9aa0b681b4a 100644 --- a/crates/polars-arrow/src/bitmap/immutable.rs +++ b/crates/polars-arrow/src/bitmap/immutable.rs @@ -1,9 +1,8 @@ use std::ops::Deref; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, OnceLock}; +use std::sync::LazyLock; use either::Either; -use parking_lot::RwLockUpgradableReadGuard; use polars_error::{polars_bail, PolarsResult}; use super::utils::{count_zeros, fmt, get_bit, get_bit_unchecked, BitChunk, BitChunks, BitmapIter}; @@ -13,8 +12,8 @@ use crate::bitmap::aligned::AlignedBitmapSlice; use crate::bitmap::iterator::{ FastU32BitmapIter, FastU56BitmapIter, FastU64BitmapIter, TrueIdxIter, }; -use crate::buffer::Bytes; use crate::legacy::utils::FromTrustedLenIterator; +use crate::storage::SharedStorage; use crate::trusted_len::TrustedLen; const UNKNOWN_BIT_COUNT: u64 = u64::MAX; @@ -52,7 +51,7 @@ const UNKNOWN_BIT_COUNT: u64 = u64::MAX; /// let same: Bitmap = sliced.into_mut().left().unwrap(); /// ``` pub struct Bitmap { - bytes: Arc>, + storage: SharedStorage, // Both offset and length are measured in bits. 
They are used to bound the // bitmap to a region of Bytes. offset: usize, @@ -73,7 +72,7 @@ fn has_cached_unset_bit_count(ubcc: u64) -> bool { impl Clone for Bitmap { fn clone(&self) -> Self { Self { - bytes: Arc::clone(&self.bytes), + storage: self.storage.clone(), offset: self.offset, length: self.length, unset_bit_count_cache: AtomicU64::new( @@ -121,9 +120,9 @@ impl Bitmap { pub fn try_new(bytes: Vec, length: usize) -> PolarsResult { check(&bytes, 0, length)?; Ok(Self { + storage: SharedStorage::from_vec(bytes), length, offset: 0, - bytes: Arc::new(bytes.into()), unset_bit_count_cache: AtomicU64::new(if length == 0 { 0 } else { UNKNOWN_BIT_COUNT }), }) } @@ -142,32 +141,32 @@ impl Bitmap { /// Returns a new iterator of `bool` over this bitmap pub fn iter(&self) -> BitmapIter { - BitmapIter::new(&self.bytes, self.offset, self.length) + BitmapIter::new(&self.storage, self.offset, self.length) } /// Returns an iterator over bits in bit chunks [`BitChunk`]. /// /// This iterator is useful to operate over multiple bits via e.g. bitwise. pub fn chunks(&self) -> BitChunks { - BitChunks::new(&self.bytes, self.offset, self.length) + BitChunks::new(&self.storage, self.offset, self.length) } /// Returns a fast iterator that gives 32 bits at a time. /// Has a remainder that must be handled separately. pub fn fast_iter_u32(&self) -> FastU32BitmapIter<'_> { - FastU32BitmapIter::new(&self.bytes, self.offset, self.length) + FastU32BitmapIter::new(&self.storage, self.offset, self.length) } /// Returns a fast iterator that gives 56 bits at a time. /// Has a remainder that must be handled separately. pub fn fast_iter_u56(&self) -> FastU56BitmapIter<'_> { - FastU56BitmapIter::new(&self.bytes, self.offset, self.length) + FastU56BitmapIter::new(&self.storage, self.offset, self.length) } /// Returns a fast iterator that gives 64 bits at a time. /// Has a remainder that must be handled separately. pub fn fast_iter_u64(&self) -> FastU64BitmapIter<'_> { - FastU64BitmapIter::new(&self.bytes, self.offset, self.length) + FastU64BitmapIter::new(&self.storage, self.offset, self.length) } /// Returns an iterator that only iterates over the set bits. @@ -177,7 +176,7 @@ impl Bitmap { /// Returns the bits of this [`Bitmap`] as a [`AlignedBitmapSlice`]. pub fn aligned(&self) -> AlignedBitmapSlice<'_, T> { - AlignedBitmapSlice::new(&self.bytes, self.offset, self.length) + AlignedBitmapSlice::new(&self.storage, self.offset, self.length) } /// Returns the byte slice of this [`Bitmap`]. @@ -192,7 +191,7 @@ impl Bitmap { let start = self.offset / 8; let len = (self.offset % 8 + self.length).saturating_add(7) / 8; ( - &self.bytes[start..start + len], + &self.storage[start..start + len], self.offset % 8, self.length, ) @@ -224,7 +223,7 @@ impl Bitmap { /// computed. Repeated calls use the cached bitcount. pub fn unset_bits(&self) -> usize { self.lazy_unset_bits().unwrap_or_else(|| { - let zeros = count_zeros(&self.bytes, self.offset, self.length); + let zeros = count_zeros(&self.storage, self.offset, self.length); self.unset_bit_count_cache .store(zeros as u64, Ordering::Relaxed); zeros @@ -294,8 +293,9 @@ impl Bitmap { if length + small_portion >= self.length { // Subtract the null count of the chunks we slice off. 
let slice_end = self.offset + offset + length; - let head_count = count_zeros(&self.bytes, self.offset, offset); - let tail_count = count_zeros(&self.bytes, slice_end, self.length - length - offset); + let head_count = count_zeros(&self.storage, self.offset, offset); + let tail_count = + count_zeros(&self.storage, slice_end, self.length - length - offset); let new_count = *unset_bit_count_cache - head_count as u64 - tail_count as u64; *unset_bit_count_cache = new_count; } else { @@ -334,7 +334,7 @@ impl Bitmap { /// Panics iff `i >= self.len()`. #[inline] pub fn get_bit(&self, i: usize) -> bool { - get_bit(&self.bytes, self.offset + i) + get_bit(&self.storage, self.offset + i) } /// Unsafely returns whether the bit at position `i` is set. @@ -343,13 +343,13 @@ impl Bitmap { /// Unsound iff `i >= self.len()`. #[inline] pub unsafe fn get_bit_unchecked(&self, i: usize) -> bool { - get_bit_unchecked(&self.bytes, self.offset + i) + get_bit_unchecked(&self.storage, self.offset + i) } /// Returns a pointer to the start of this [`Bitmap`] (ignores `offsets`) /// This pointer is allocated iff `self.len() > 0`. pub(crate) fn as_ptr(&self) -> *const u8 { - self.bytes.deref().as_ptr() + self.storage.deref().as_ptr() } /// Returns a pointer to the start of this [`Bitmap`] (ignores `offsets`) @@ -366,15 +366,12 @@ impl Bitmap { /// * this [`Bitmap`] has not been cloned (i.e. [`Arc`]`::get_mut` yields [`Some`]) /// * this [`Bitmap`] was not imported from the c data interface (FFI) pub fn into_mut(mut self) -> Either { - match ( - self.offset, - Arc::get_mut(&mut self.bytes).and_then(|b| b.get_vec()), - ) { - (0, Some(v)) => { - let data = std::mem::take(v); - Either::Right(MutableBitmap::from_vec(data, self.length)) + match self.storage.try_into_vec() { + Ok(v) => Either::Right(MutableBitmap::from_vec(v, self.length)), + Err(storage) => { + self.storage = storage; + Either::Left(self) }, - _ => Either::Left(self), } } @@ -390,7 +387,7 @@ impl Bitmap { let vec = chunk_iter_to_vec(chunks.chain(std::iter::once(remainder))); MutableBitmap::from_vec(vec, data.length) } else { - MutableBitmap::from_vec(data.bytes.as_ref().to_vec(), data.length) + MutableBitmap::from_vec(data.storage.as_ref().to_vec(), data.length) } }, Either::Right(data) => data, @@ -400,57 +397,23 @@ impl Bitmap { /// Initializes an new [`Bitmap`] filled with unset values. #[inline] pub fn new_zeroed(length: usize) -> Self { - // There are quite some situations where we just want a zeroed out Bitmap, since that would - // constantly need to reallocate we make a static that contains the largest allocation. - // Then, we can just take an Arc::clone of that slice everytime or grow it if needed. - static GLOBAL_ZERO_BYTES: OnceLock>>> = OnceLock::new(); - - let rwlock_zero_bytes = GLOBAL_ZERO_BYTES.get_or_init(|| { - let byte_length = length.div_ceil(8).next_power_of_two(); - parking_lot::RwLock::new(Arc::new(Bytes::from(vec![0; byte_length]))) - }); - - let unset_bit_count_cache = AtomicU64::new(length as u64); - - let zero_bytes = rwlock_zero_bytes.upgradable_read(); - if zero_bytes.len() * 8 >= length { - let bytes = zero_bytes.clone(); - return Bitmap { - bytes, - offset: 0, - length, - unset_bit_count_cache, - }; - } - - let mut zero_bytes = RwLockUpgradableReadGuard::upgrade(zero_bytes); - - // Race Condition: - // By the time we got here, another Guard could have been upgraded, and the buffer - // could have been expanded already. So we want to check again whether we cannot just take - // that buffer. 
- if zero_bytes.len() * 8 >= length { - let bytes = zero_bytes.clone(); - return Bitmap { - bytes, - offset: 0, - length, - unset_bit_count_cache, - }; - } - - // Let do exponential increases so that we are not constantly allocating new - // buffers. - let byte_length = length.div_ceil(8).next_power_of_two(); - - let bytes = Arc::new(Bytes::from(vec![0; byte_length])); - *zero_bytes = bytes.clone(); - - Bitmap { - bytes, + // We intentionally leak 1MiB of zeroed memory once so we don't have to + // refcount it. + const GLOBAL_ZERO_SIZE: usize = 1024 * 1024; + static GLOBAL_ZEROES: LazyLock> = + LazyLock::new(|| SharedStorage::from_static(vec![0; GLOBAL_ZERO_SIZE].leak())); + + let bytes_needed = length.div_ceil(8); + let storage = if bytes_needed <= GLOBAL_ZERO_SIZE { + GLOBAL_ZEROES.clone() + } else { + SharedStorage::from_vec(vec![0; bytes_needed]) + }; + Self { + storage, offset: 0, length, - unset_bit_count_cache, + unset_bit_count_cache: AtomicU64::new(length as u64), } } @@ -464,13 +427,20 @@ impl Bitmap { vec![0; length.saturating_add(7) / 8] }; let unset_bits = if value { 0 } else { length }; - unsafe { Bitmap::from_inner_unchecked(Arc::new(bytes.into()), 0, length, Some(unset_bits)) } + unsafe { + Bitmap::from_inner_unchecked( + SharedStorage::from_vec(bytes), + 0, + length, + Some(unset_bits), + ) + } } /// Counts the nulls (unset bits) starting from `offset` bits and for `length` bits. #[inline] pub fn null_count_range(&self, offset: usize, length: usize) -> usize { - count_zeros(&self.bytes, self.offset + offset, length) + count_zeros(&self.storage, self.offset + offset, length) } /// Creates a new [`Bitmap`] from a slice and length. @@ -506,12 +476,12 @@ impl Bitmap { /// # Safety /// Callers must ensure all invariants of this struct are upheld. 
pub unsafe fn from_inner_unchecked( - bytes: Arc>, + storage: SharedStorage, offset: usize, length: usize, unset_bits: Option, ) -> Self { - debug_assert!(check(&bytes[..], offset, length).is_ok()); + debug_assert!(check(&storage[..], offset, length).is_ok()); let unset_bit_count_cache = if let Some(n) = unset_bits { AtomicU64::new(n as u64) @@ -519,7 +489,7 @@ impl Bitmap { AtomicU64::new(UNKNOWN_BIT_COUNT) }; Self { - bytes, + storage, offset, length, unset_bit_count_cache, @@ -633,10 +603,10 @@ impl Bitmap { let length = value.len(); let unset_bits = value.null_count(); Self { + storage: SharedStorage::from_arrow_buffer(value.buffer().clone()), offset, length, unset_bit_count_cache: AtomicU64::new(unset_bits as u64), - bytes: Arc::new(crate::buffer::to_bytes(value.buffer().clone())), } } } @@ -646,7 +616,7 @@ impl<'a> IntoIterator for &'a Bitmap { type IntoIter = BitmapIter<'a>; fn into_iter(self) -> Self::IntoIter { - BitmapIter::<'a>::new(&self.bytes, self.offset, self.length) + BitmapIter::<'a>::new(&self.storage, self.offset, self.length) } } @@ -663,7 +633,7 @@ impl IntoIterator for Bitmap { impl From for arrow_buffer::buffer::NullBuffer { fn from(value: Bitmap) -> Self { let null_count = value.unset_bits(); - let buffer = crate::buffer::to_buffer(value.bytes); + let buffer = value.storage.into_arrow_buffer(); let buffer = arrow_buffer::buffer::BooleanBuffer::new(buffer, value.offset, value.length); // SAFETY: null count is accurate unsafe { arrow_buffer::buffer::NullBuffer::new_unchecked(buffer, null_count) } @@ -677,8 +647,6 @@ impl Splitable for Bitmap { } unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) { - let bytes = &self.bytes; - if offset == 0 { return (Self::new(), self.clone()); } @@ -709,12 +677,12 @@ impl Splitable for Bitmap { if lhs_length <= rhs_length { if rhs_length + small_portion >= self.length { - let count = count_zeros(&self.bytes, self.offset, lhs_length) as u64; + let count = count_zeros(&self.storage, self.offset, lhs_length) as u64; lhs_ubcc = count; rhs_ubcc = ubcc - count; } } else if lhs_length + small_portion >= self.length { - let count = count_zeros(&self.bytes, self.offset + offset, rhs_length) as u64; + let count = count_zeros(&self.storage, self.offset + offset, rhs_length) as u64; lhs_ubcc = ubcc - count; rhs_ubcc = count; } @@ -726,13 +694,13 @@ impl Splitable for Bitmap { ( Self { - bytes: bytes.clone(), + storage: self.storage.clone(), offset: self.offset, length: lhs_length, unset_bit_count_cache: AtomicU64::new(lhs_ubcc), }, Self { - bytes: bytes.clone(), + storage: self.storage.clone(), offset: self.offset + offset, length: rhs_length, unset_bit_count_cache: AtomicU64::new(rhs_ubcc), diff --git a/crates/polars-arrow/src/bitmap/mutable.rs b/crates/polars-arrow/src/bitmap/mutable.rs index c81df93f1cdc..d030682a63a7 100644 --- a/crates/polars-arrow/src/bitmap/mutable.rs +++ b/crates/polars-arrow/src/bitmap/mutable.rs @@ -1,5 +1,4 @@ use std::hint::unreachable_unchecked; -use std::sync::Arc; use polars_error::{polars_bail, PolarsResult}; @@ -8,6 +7,7 @@ use super::utils::{ }; use super::{intersects_with_mut, Bitmap}; use crate::bitmap::utils::{get_bit_unchecked, merge_reversed, set_bit_unchecked}; +use crate::storage::SharedStorage; use crate::trusted_len::TrustedLen; /// A container of booleans. [`MutableBitmap`] is semantically equivalent @@ -374,7 +374,7 @@ impl From for Option { // SAFETY: invariants of the `MutableBitmap` equal that of `Bitmap`. 
let bitmap = unsafe { Bitmap::from_inner_unchecked( - Arc::new(buffer.buffer.into()), + SharedStorage::from_vec(buffer.buffer), 0, buffer.length, Some(unset_bits), diff --git a/crates/polars-arrow/src/buffer/immutable.rs b/crates/polars-arrow/src/buffer/immutable.rs index 21f765d46408..1dfe805ffc57 100644 --- a/crates/polars-arrow/src/buffer/immutable.rs +++ b/crates/polars-arrow/src/buffer/immutable.rs @@ -1,11 +1,11 @@ use std::ops::Deref; -use std::sync::Arc; use either::Either; use num_traits::Zero; -use super::{Bytes, IntoIter}; +use super::IntoIter; use crate::array::{ArrayAccessor, Splitable}; +use crate::storage::SharedStorage; /// [`Buffer`] is a contiguous memory region that can be shared across /// thread boundaries. @@ -39,7 +39,7 @@ use crate::array::{ArrayAccessor, Splitable}; #[derive(Clone)] pub struct Buffer { /// The internal byte buffer. - storage: Arc>, + storage: SharedStorage, /// A pointer into the buffer where our data starts. ptr: *const T, @@ -48,8 +48,8 @@ pub struct Buffer { length: usize, } -unsafe impl Sync for Buffer {} -unsafe impl Send for Buffer {} +unsafe impl Sync for Buffer {} +unsafe impl Send for Buffer {} impl PartialEq for Buffer { #[inline] @@ -79,11 +79,11 @@ impl Buffer { } /// Auxiliary method to create a new Buffer - pub(crate) fn from_bytes(bytes: Bytes) -> Self { - let ptr = bytes.as_ptr(); - let length = bytes.len(); + pub(crate) fn from_storage(storage: SharedStorage) -> Self { + let ptr = storage.as_ptr(); + let length = storage.len(); Buffer { - storage: Arc::new(bytes), + storage, ptr, length, } @@ -204,7 +204,7 @@ impl Buffer { /// Returns a mutable reference to its underlying [`Vec`], if possible. /// /// This operation returns [`Either::Right`] iff this [`Buffer`]: - /// * has not been cloned (i.e. [`Arc`]`::get_mut` yields [`Some`]) + /// * has no alive clones /// * has not been imported from the C data interface (FFI) #[inline] pub fn into_mut(mut self) -> Either> { @@ -212,36 +212,31 @@ impl Buffer { if self.is_sliced() { return Either::Left(self); } - match Arc::get_mut(&mut self.storage) - .and_then(|b| b.get_vec()) - .map(std::mem::take) - { - Some(inner) => Either::Right(inner), - None => Either::Left(self), + match self.storage.try_into_vec() { + Ok(v) => Either::Right(v), + Err(slf) => { + self.storage = slf; + Either::Left(self) + }, } } /// Returns a mutable reference to its slice, if possible. /// /// This operation returns [`Some`] iff this [`Buffer`]: - /// * has not been cloned (i.e. [`Arc`]`::get_mut` yields [`Some`]) + /// * has no alive clones /// * has not been imported from the C data interface (FFI) #[inline] pub fn get_mut_slice(&mut self) -> Option<&mut [T]> { let offset = self.offset(); - let unique = Arc::get_mut(&mut self.storage)?; - let vec = unique.get_vec()?; - Some(unsafe { vec.get_unchecked_mut(offset..offset + self.length) }) - } - - /// Get the strong count of underlying `Arc` data buffer. - pub fn shared_count_strong(&self) -> usize { - Arc::strong_count(&self.storage) + let slice = self.storage.try_as_mut_slice()?; + Some(unsafe { slice.get_unchecked_mut(offset..offset + self.length) }) } - /// Get the weak count of underlying `Arc` data buffer. - pub fn shared_count_weak(&self) -> usize { - Arc::weak_count(&self.storage) + /// Since this takes a shared reference to self, beware that others might + /// increment this after you've checked it's equal to 1. 
+ pub fn storage_refcount(&self) -> u64 { + self.storage.refcount() } } @@ -262,15 +257,8 @@ impl Buffer { impl From> for Buffer { #[inline] - fn from(p: Vec) -> Self { - let bytes: Bytes = p.into(); - let ptr = bytes.as_ptr(); - let length = bytes.len(); - Self { - storage: Arc::new(bytes), - ptr, - length, - } + fn from(v: Vec) -> Self { + Self::from_storage(SharedStorage::from_vec(v)) } } @@ -303,7 +291,7 @@ impl IntoIterator for Buffer { #[cfg(feature = "arrow_rs")] impl From for Buffer { fn from(value: arrow_buffer::Buffer) -> Self { - Self::from_bytes(crate::buffer::to_bytes(value)) + Self::from_storage(SharedStorage::from_arrow_buffer(value)) } } @@ -311,7 +299,7 @@ impl From for Buffer { impl From> for arrow_buffer::Buffer { fn from(value: Buffer) -> Self { let offset = value.offset(); - crate::buffer::to_buffer(value.storage).slice_with_length( + value.storage.into_arrow_buffer().slice_with_length( offset * std::mem::size_of::(), value.length * std::mem::size_of::(), ) diff --git a/crates/polars-arrow/src/buffer/mod.rs b/crates/polars-arrow/src/buffer/mod.rs index a5c3aaf90763..386545482d09 100644 --- a/crates/polars-arrow/src/buffer/mod.rs +++ b/crates/polars-arrow/src/buffer/mod.rs @@ -3,101 +3,5 @@ mod immutable; mod iterator; -use std::ops::Deref; - -use crate::ffi::InternalArrowArray; - -pub(crate) enum BytesAllocator { - // Dead code lint is a false positive. - // remove once fixed in rustc - #[allow(dead_code)] - InternalArrowArray(InternalArrowArray), - - #[cfg(feature = "arrow_rs")] - // Dead code lint is a false positive. - // remove once fixed in rustc - #[allow(dead_code)] - Arrow(arrow_buffer::Buffer), -} -pub(crate) type BytesInner = polars_utils::foreign_vec::ForeignVec; - -/// Bytes representation. -#[repr(transparent)] -pub struct Bytes(BytesInner); - -impl Bytes { - /// Takes ownership of an allocated memory region. - /// # Panics - /// This function panics if and only if pointer is not null - /// - /// # Safety - /// This function is safe if and only if `ptr` is valid for `length` - /// # Implementation - /// This function leaks if and only if `owner` does not deallocate - /// the region `[ptr, ptr+length[` when dropped. - #[inline] - pub(crate) unsafe fn from_foreign(ptr: *const T, length: usize, owner: BytesAllocator) -> Self { - Self(BytesInner::from_foreign(ptr, length, owner)) - } - - /// Returns a `Some` mutable reference of [`Vec`] iff this was initialized - /// from a [`Vec`] and `None` otherwise. 
- #[inline] - pub(crate) fn get_vec(&mut self) -> Option<&mut Vec> { - self.0.get_vec() - } -} - -impl Deref for Bytes { - type Target = [T]; - - #[inline] - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl From> for Bytes { - #[inline] - fn from(data: Vec) -> Self { - let inner: BytesInner = data.into(); - Bytes(inner) - } -} - -impl From> for Bytes { - #[inline] - fn from(value: BytesInner) -> Self { - Self(value) - } -} - -#[cfg(feature = "arrow_rs")] -pub(crate) fn to_buffer( - value: std::sync::Arc>, -) -> arrow_buffer::Buffer { - // This should never panic as ForeignVec pointer must be non-null - let ptr = std::ptr::NonNull::new(value.as_ptr() as _).unwrap(); - let len = value.len() * std::mem::size_of::(); - // SAFETY: allocation is guaranteed to be valid for `len` bytes - unsafe { arrow_buffer::Buffer::from_custom_allocation(ptr, len, value) } -} - -#[cfg(feature = "arrow_rs")] -pub(crate) fn to_bytes(value: arrow_buffer::Buffer) -> Bytes { - let ptr = value.as_ptr(); - let align = ptr.align_offset(std::mem::align_of::()); - assert_eq!(align, 0, "not aligned"); - let len = value.len() / std::mem::size_of::(); - - // Valid as `NativeType: Pod` and checked alignment above - let ptr = value.as_ptr() as *const T; - - let owner = crate::buffer::BytesAllocator::Arrow(value); - - // SAFETY: slice is valid for len elements of T - unsafe { Bytes::from_foreign(ptr, len, owner) } -} - pub use immutable::Buffer; pub(super) use iterator::IntoIter; diff --git a/crates/polars-arrow/src/ffi/array.rs b/crates/polars-arrow/src/ffi/array.rs index 55090f1c760a..60a102f56e94 100644 --- a/crates/polars-arrow/src/ffi/array.rs +++ b/crates/polars-arrow/src/ffi/array.rs @@ -7,9 +7,10 @@ use super::ArrowArray; use crate::array::*; use crate::bitmap::utils::bytes_for; use crate::bitmap::Bitmap; -use crate::buffer::{Buffer, Bytes, BytesAllocator}; +use crate::buffer::Buffer; use crate::datatypes::{ArrowDataType, PhysicalType}; use crate::ffi::schema::get_child; +use crate::storage::SharedStorage; use crate::types::NativeType; use crate::{match_integer_type, with_match_primitive_type_full}; @@ -259,8 +260,8 @@ unsafe fn create_buffer_known_len( return Ok(Buffer::new()); } let ptr: *mut T = get_buffer_ptr(array, dtype, index)?; - let bytes = Bytes::from_foreign(ptr, len, BytesAllocator::InternalArrowArray(owner)); - Ok(Buffer::from_bytes(bytes)) + let storage = SharedStorage::from_internal_arrow_array(ptr, len, owner); + Ok(Buffer::from_storage(storage)) } /// returns the buffer `i` of `array` interpreted as a [`Buffer`]. @@ -286,8 +287,8 @@ unsafe fn create_buffer( // We have to check alignment. // This is the zero-copy path. if ptr.align_offset(std::mem::align_of::()) == 0 { - let bytes = Bytes::from_foreign(ptr, len, BytesAllocator::InternalArrowArray(owner)); - Ok(Buffer::from_bytes(bytes).sliced(offset, len - offset)) + let storage = SharedStorage::from_internal_arrow_array(ptr, len, owner); + Ok(Buffer::from_storage(storage).sliced(offset, len - offset)) } // This is the path where alignment isn't correct. 
// We copy the data to a new vec @@ -321,7 +322,7 @@ unsafe fn create_bitmap( let offset: usize = array.offset.try_into().expect("offset to fit in `usize`"); let bytes_len = bytes_for(offset + len); - let bytes = Bytes::from_foreign(ptr, bytes_len, BytesAllocator::InternalArrowArray(owner)); + let storage = SharedStorage::from_internal_arrow_array(ptr, bytes_len, owner); let null_count = if is_validity { Some(array.null_count()) @@ -329,10 +330,7 @@ unsafe fn create_bitmap( None }; Ok(Bitmap::from_inner_unchecked( - Arc::new(bytes), - offset, - len, - null_count, + storage, offset, len, null_count, )) } diff --git a/crates/polars-arrow/src/lib.rs b/crates/polars-arrow/src/lib.rs index 15af97483a41..8c9b5c0d1af5 100644 --- a/crates/polars-arrow/src/lib.rs +++ b/crates/polars-arrow/src/lib.rs @@ -26,6 +26,7 @@ pub mod record_batch; pub mod offset; pub mod scalar; +pub mod storage; pub mod trusted_len; pub mod types; diff --git a/crates/polars-arrow/src/storage.rs b/crates/polars-arrow/src/storage.rs new file mode 100644 index 000000000000..864e4dc29d38 --- /dev/null +++ b/crates/polars-arrow/src/storage.rs @@ -0,0 +1,215 @@ +use std::mem::ManuallyDrop; +use std::ops::Deref; +use std::ptr::NonNull; +use std::sync::atomic::{AtomicU64, Ordering}; + +use crate::ffi::InternalArrowArray; + +enum BackingStorage { + Vec { + capacity: usize, + }, + InternalArrowArray(InternalArrowArray), + #[cfg(feature = "arrow_rs")] + ArrowBuffer(arrow_buffer::Buffer), +} + +struct SharedStorageInner { + ref_count: AtomicU64, + ptr: *mut T, + length: usize, + backing: Option, +} + +impl Drop for SharedStorageInner { + fn drop(&mut self) { + match self.backing.take() { + Some(BackingStorage::InternalArrowArray(a)) => drop(a), + #[cfg(feature = "arrow_rs")] + Some(BackingStorage::ArrowBuffer(b)) => drop(b), + Some(BackingStorage::Vec { capacity }) => unsafe { + drop(Vec::from_raw_parts(self.ptr, self.length, capacity)) + }, + None => {}, + } + } +} + +pub struct SharedStorage { + inner: NonNull>, +} + +unsafe impl Send for SharedStorage {} +unsafe impl Sync for SharedStorage {} + +impl SharedStorage { + pub fn from_static(slice: &'static [T]) -> Self { + let length = slice.len(); + let ptr = slice.as_ptr().cast_mut(); + let inner = SharedStorageInner { + ref_count: AtomicU64::new(2), // Never used, but 2 so it won't pass exclusivity tests. 
+ ptr, + length, + backing: None, + }; + Self { + inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(), + } + } + + pub fn from_vec(mut v: Vec) -> Self { + let length = v.len(); + let capacity = v.capacity(); + let ptr = v.as_mut_ptr(); + core::mem::forget(v); + let inner = SharedStorageInner { + ref_count: AtomicU64::new(1), + ptr, + length, + backing: Some(BackingStorage::Vec { capacity }), + }; + Self { + inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(), + } + } + + pub fn from_internal_arrow_array(ptr: *const T, len: usize, arr: InternalArrowArray) -> Self { + let inner = SharedStorageInner { + ref_count: AtomicU64::new(1), + ptr: ptr.cast_mut(), + length: len, + backing: Some(BackingStorage::InternalArrowArray(arr)), + }; + Self { + inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(), + } + } +} + +#[cfg(feature = "arrow_rs")] +impl SharedStorage { + pub fn from_arrow_buffer(buffer: arrow_buffer::Buffer) -> Self { + let ptr = buffer.as_ptr(); + let align_offset = ptr.align_offset(std::mem::align_of::()); + assert_eq!(align_offset, 0, "arrow_buffer::Buffer misaligned"); + let length = buffer.len() / std::mem::size_of::(); + + let inner = SharedStorageInner { + ref_count: AtomicU64::new(1), + ptr: ptr as *mut T, + length, + backing: Some(BackingStorage::ArrowBuffer(buffer)), + }; + Self { + inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(), + } + } + + pub fn into_arrow_buffer(self) -> arrow_buffer::Buffer { + let ptr = NonNull::new(self.as_ptr() as *mut u8).unwrap(); + let len = self.len() * std::mem::size_of::(); + let arc = std::sync::Arc::new(self); + unsafe { arrow_buffer::Buffer::from_custom_allocation(ptr, len, arc) } + } +} + +impl SharedStorage { + #[inline(always)] + pub fn len(&self) -> usize { + self.inner().length + } + + #[inline(always)] + pub fn as_ptr(&self) -> *const T { + self.inner().ptr + } + + #[inline(always)] + pub fn is_exclusive(&mut self) -> bool { + // Ordering semantics copied from Arc. + self.inner().ref_count.load(Ordering::Acquire) == 1 + } + + /// Gets the reference count of this storage. + /// + /// Because this function takes a shared reference this should not be used + /// in cases where we are checking if the refcount is one for safety, + /// someone else could increment it in the meantime. + #[inline(always)] + pub fn refcount(&self) -> u64 { + // Ordering semantics copied from Arc. + self.inner().ref_count.load(Ordering::Acquire) + } + + pub fn try_as_mut_slice(&mut self) -> Option<&mut [T]> { + self.is_exclusive().then(|| { + let inner = self.inner(); + unsafe { core::slice::from_raw_parts_mut(inner.ptr, inner.length) } + }) + } + + pub fn try_into_vec(mut self) -> Result, Self> { + let Some(BackingStorage::Vec { capacity }) = self.inner().backing else { + return Err(self); + }; + if self.is_exclusive() { + let slf = ManuallyDrop::new(self); + let inner = slf.inner(); + Ok(unsafe { Vec::from_raw_parts(inner.ptr, inner.length, capacity) }) + } else { + Err(self) + } + } + + #[inline(always)] + fn inner(&self) -> &SharedStorageInner { + unsafe { &*self.inner.as_ptr() } + } + + /// # Safety + /// May only be called once. 
+ #[cold] + unsafe fn drop_slow(&mut self) { + unsafe { drop(Box::from_raw(self.inner.as_ptr())) } + } +} + +impl Deref for SharedStorage { + type Target = [T]; + + #[inline] + fn deref(&self) -> &Self::Target { + unsafe { + let inner = self.inner(); + core::slice::from_raw_parts(inner.ptr, inner.length) + } + } +} + +impl Clone for SharedStorage { + fn clone(&self) -> Self { + let inner = self.inner(); + if inner.backing.is_some() { + // Ordering semantics copied from Arc. + inner.ref_count.fetch_add(1, Ordering::Relaxed); + } + Self { inner: self.inner } + } +} + +impl Drop for SharedStorage { + fn drop(&mut self) { + let inner = self.inner(); + if inner.backing.is_none() { + return; + } + + // Ordering semantics copied from Arc. + if inner.ref_count.fetch_sub(1, Ordering::Release) == 1 { + std::sync::atomic::fence(Ordering::Acquire); + unsafe { + self.drop_slow(); + } + } + } +} diff --git a/crates/polars-core/src/chunked_array/object/extension/drop.rs b/crates/polars-core/src/chunked_array/object/extension/drop.rs index 3b3e16deff2e..075e9e99dc61 100644 --- a/crates/polars-core/src/chunked_array/object/extension/drop.rs +++ b/crates/polars-core/src/chunked_array/object/extension/drop.rs @@ -41,7 +41,7 @@ pub(crate) unsafe fn drop_object_array(values: &dyn Array) { // If the buf is not shared with anyone but us we can deallocate. let buf = arr.values(); - if buf.shared_count_strong() == 1 && !buf.is_empty() { + if buf.storage_refcount() == 1 && !buf.is_empty() { PolarsExtension::new(arr.clone()); }; } diff --git a/crates/polars-utils/src/foreign_vec.rs b/crates/polars-utils/src/foreign_vec.rs deleted file mode 100644 index f763246ec029..000000000000 --- a/crates/polars-utils/src/foreign_vec.rs +++ /dev/null @@ -1,100 +0,0 @@ -/// This is pulled out of https://github.com/DataEngineeringLabs/foreign_vec -use std::mem::ManuallyDrop; -use std::ops::DerefMut; -use std::vec::Vec; - -/// Mode of deallocating memory regions -enum Allocation { - /// Native allocation - Native, - // A foreign allocator and its ref count - Foreign(D), -} - -/// A continuous memory region that may be allocated externally. -/// -/// In the most common case, this is created from [`Vec`]. -/// However, this region may also be allocated by a foreign allocator `D` -/// and behave as `&[T]`. -pub struct ForeignVec { - /// An implementation using an `enum` of a `Vec` or a foreign pointer is not used - /// because `deref` is at least 50% more expensive than the deref of a `Vec`. - data: ManuallyDrop>, - /// the region was allocated - allocation: Allocation, -} - -impl ForeignVec { - /// Takes ownership of an allocated memory region. - /// # Panics - /// This function panics if and only if pointer is not null - /// # Safety - /// This function is safe if and only if `ptr` is valid for `length` - /// # Implementation - /// This function leaks if and only if `owner` does not deallocate - /// the region `[ptr, ptr+length[` when dropped. - #[inline] - pub unsafe fn from_foreign(ptr: *const T, length: usize, owner: D) -> Self { - assert!(!ptr.is_null()); - // This line is technically outside the assumptions of `Vec::from_raw_parts`, since - // `ptr` was not allocated by `Vec`. However, one of the invariants of this struct - // is that we do never expose this region as a `Vec`; we only use `Vec` on it to provide - // immutable access to the region (via `Vec::deref` to `&[T]`). 
-        let data = Vec::from_raw_parts(ptr as *mut T, length, length);
-        let data = ManuallyDrop::new(data);
-
-        Self {
-            data,
-            allocation: Allocation::Foreign(owner),
-        }
-    }
-
-    /// Returns a `Some` mutable reference of [`Vec<T>`] iff this was initialized
-    /// from a [`Vec<T>`] and `None` otherwise.
-    pub fn get_vec(&mut self) -> Option<&mut Vec<T>> {
-        match &self.allocation {
-            Allocation::Foreign(_) => None,
-            Allocation::Native => Some(self.data.deref_mut()),
-        }
-    }
-}
-
-impl<T, D> Drop for ForeignVec<T, D> {
-    #[inline]
-    fn drop(&mut self) {
-        match self.allocation {
-            Allocation::Foreign(_) => {
-                // the foreign is dropped via its `Drop`
-            },
-            Allocation::Native => {
-                let data = core::mem::take(&mut self.data);
-                let _ = ManuallyDrop::into_inner(data);
-            },
-        }
-    }
-}
-
-impl<T, D> core::ops::Deref for ForeignVec<T, D> {
-    type Target = [T];
-
-    #[inline]
-    fn deref(&self) -> &[T] {
-        &self.data
-    }
-}
-
-impl<T: core::fmt::Debug, D> core::fmt::Debug for ForeignVec<T, D> {
-    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
-        core::fmt::Debug::fmt(&**self, f)
-    }
-}
-
-impl<T, D> From<Vec<T>> for ForeignVec<T, D> {
-    #[inline]
-    fn from(data: Vec<T>) -> Self {
-        Self {
-            data: ManuallyDrop::new(data),
-            allocation: Allocation::Native,
-        }
-    }
-}
diff --git a/crates/polars-utils/src/lib.rs b/crates/polars-utils/src/lib.rs
index 68e331973800..eacd517d1254 100644
--- a/crates/polars-utils/src/lib.rs
+++ b/crates/polars-utils/src/lib.rs
@@ -14,7 +14,6 @@ pub mod contention_pool;
 pub mod cpuid;
 mod error;
 pub mod floor_divmod;
-pub mod foreign_vec;
 pub mod functions;
 pub mod hashing;
 pub mod idx_vec;
diff --git a/crates/polars/tests/it/arrow/buffer/immutable.rs b/crates/polars/tests/it/arrow/buffer/immutable.rs
index a4835422c56e..9065b52fba35 100644
--- a/crates/polars/tests/it/arrow/buffer/immutable.rs
+++ b/crates/polars/tests/it/arrow/buffer/immutable.rs
@@ -99,7 +99,7 @@ fn from_arrow_vec() {
 }
 
 #[test]
-#[should_panic(expected = "not aligned")]
+#[should_panic(expected = "arrow_buffer::Buffer misaligned")]
 fn from_arrow_misaligned() {
     let buffer = arrow_buffer::Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]).slice(1);
     let _ = Buffer::<i32>::from(buffer);
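
For readers of this patch, here is a minimal usage sketch of the new `SharedStorage` type added in `crates/polars-arrow/src/storage.rs`. It assumes the module is reachable as `polars_arrow::storage` (as the `pub mod storage;` addition to `lib.rs` suggests); the concrete values are illustrative only.

use polars_arrow::storage::SharedStorage;

fn main() {
    // Vec-backed storage starts with a refcount of 1.
    let s = SharedStorage::from_vec(vec![1u8, 2, 3]);
    assert_eq!(s.len(), 3);
    assert_eq!(s.refcount(), 1);

    // Cloning bumps the atomic refcount; the bytes themselves are not copied.
    let s2 = s.clone();
    assert_eq!(s.refcount(), 2);

    // While a clone is alive the Vec cannot be reclaimed; `try_into_vec`
    // hands the storage back through `Err`.
    let s = s.try_into_vec().unwrap_err();
    drop(s2);

    // Once this is the only handle again, the original Vec comes back
    // without copying.
    let Ok(v) = s.try_into_vec() else { unreachable!() };
    assert_eq!(v, vec![1u8, 2, 3]);
}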
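
The `shared_count_strong`/`shared_count_weak` pair on `Buffer` is replaced by a single `storage_refcount`, and its doc comment spells out the caveat: the count is read through a shared reference, so it may already be stale when you act on it. A sketch of the difference between the advisory count and the `&mut self` accessors, using only APIs from this diff (values illustrative):

use polars_arrow::buffer::Buffer;

fn main() {
    let buf = Buffer::<u8>::from(vec![1, 2, 3]);
    assert_eq!(buf.storage_refcount(), 1);

    let alias = buf.clone();
    assert_eq!(buf.storage_refcount(), 2);

    // `total_buffer_len` in BinaryViewArrayGeneric uses the count as a
    // heuristic: refcount > 1 means the buffer is also referenced elsewhere,
    // so it contributes 0 bytes to the total.
    // When exclusivity actually matters, go through the `&mut self` methods,
    // which re-check it under a mutable borrow:
    let mut buf = buf;
    assert!(buf.get_mut_slice().is_none()); // still shared with `alias`
    drop(alias);
    assert!(buf.get_mut_slice().is_some()); // exclusive again
}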
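
`SharedStorage::from_static` wraps a `'static` slice and starts the refcount at 2, so static storage can never appear exclusive; dropping it is also a no-op, since the small header is intentionally leaked, just like the 1 MiB zero block used by `new_zeroed`. A short sketch of the observable behaviour; the lookup table is made up:

use polars_arrow::storage::SharedStorage;

static LUT: [u8; 4] = [0, 1, 1, 2];

fn main() {
    let mut s = SharedStorage::from_static(&LUT);
    assert_eq!(s.len(), 4);

    // Mutable access is never granted for static storage...
    assert!(s.try_as_mut_slice().is_none());
    // ...and there is no Vec backing to reclaim either.
    assert!(s.try_into_vec().is_err());
}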
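
`Bitmap::new_zeroed` no longer grows a lock-guarded shared buffer; it clones one leaked 1 MiB zero block and only allocates when more bytes are needed. A standalone sketch of the same pattern using just the standard library (the constant and helper names are mine, not from the PR):

use std::borrow::Cow;
use std::sync::LazyLock;

const ZERO_BLOCK_SIZE: usize = 1024 * 1024;

// Allocated and zeroed exactly once, then leaked so it needs no refcounting.
static ZEROES: LazyLock<&'static [u8]> =
    LazyLock::new(|| vec![0u8; ZERO_BLOCK_SIZE].leak());

fn zeroed_bytes(len: usize) -> Cow<'static, [u8]> {
    if len <= ZERO_BLOCK_SIZE {
        // Common case: every caller shares the same leaked block.
        let block: &'static [u8] = *ZEROES;
        Cow::Borrowed(&block[..len])
    } else {
        // Oversized requests fall back to a fresh allocation, mirroring the
        // `SharedStorage::from_vec(vec![0; bytes_needed])` branch in the patch.
        Cow::Owned(vec![0u8; len])
    }
}

fn main() {
    assert!(zeroed_bytes(64).iter().all(|&b| b == 0));
}

The caller-visible behaviour is unchanged: `Bitmap::new_zeroed(1000).unset_bits()` still reports 1000; only the allocation strategy behind it differs.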
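
The `arrow_rs` interop now routes through `SharedStorage::from_arrow_buffer` / `into_arrow_buffer`, and the alignment assert message changed, which is why the `from_arrow_misaligned` expectation is updated. A sketch assuming the `arrow_rs` feature is enabled and `arrow_buffer` is available as a dependency:

use polars_arrow::buffer::Buffer;

fn main() {
    // Zero-copy in both directions for properly aligned data.
    let arrow = arrow_buffer::Buffer::from_vec(vec![1_i32, 2, 3]);
    let buf = Buffer::<i32>::from(arrow);
    let roundtrip: arrow_buffer::Buffer = buf.into();
    assert_eq!(roundtrip.len(), 3 * std::mem::size_of::<i32>());

    // Slicing an i32 buffer by one byte misaligns it; converting it then
    // panics with "arrow_buffer::Buffer misaligned", matching the updated
    // `should_panic` expectation.
    // let misaligned = arrow_buffer::Buffer::from_vec(vec![1_i32, 2, 3]).slice(1);
    // let _ = Buffer::<i32>::from(misaligned);
}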