From b3315382a49fffde53b38c2dc747e8b9b5a3fb1d Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Fri, 19 Jul 2024 16:39:54 +0200 Subject: [PATCH] perf: Use mmap-ed memory if possible in Parquet reader (#17725) --- Cargo.lock | 1 + Cargo.toml | 1 + crates/polars-io/Cargo.toml | 4 +- crates/polars-io/src/mmap.rs | 11 + crates/polars-io/src/parquet/read/mmap.rs | 21 +- .../polars-io/src/parquet/read/read_impl.rs | 10 +- crates/polars-parquet/Cargo.toml | 2 +- .../src/arrow/read/deserialize/mod.rs | 3 +- .../src/arrow/read/deserialize/null/mod.rs | 5 +- .../src/arrow/read/row_group.rs | 5 +- .../src/arrow/write/dictionary.rs | 2 +- .../polars-parquet/src/arrow/write/utils.rs | 2 +- crates/polars-parquet/src/parquet/mod.rs | 34 ++ crates/polars-parquet/src/parquet/page/mod.rs | 2 +- .../src/parquet/read/compression.rs | 3 +- crates/polars-parquet/src/parquet/read/mod.rs | 6 +- .../src/parquet/read/page/indexed_reader.rs | 6 +- .../src/parquet/read/page/memreader.rs | 191 ----------- .../src/parquet/read/page/mod.rs | 2 - .../src/parquet/read/page/reader.rs | 8 +- .../src/parquet/read/page/stream.rs | 4 +- .../src/parquet/write/compression.rs | 3 +- .../polars-parquet/src/parquet/write/page.rs | 2 +- crates/polars-utils/Cargo.toml | 2 + crates/polars-utils/src/lib.rs | 2 + crates/polars-utils/src/mem.rs | 17 + crates/polars-utils/src/mmap.rs | 300 ++++++++++++++++++ crates/polars/tests/it/io/parquet/read/mod.rs | 5 +- .../tests/it/io/parquet/write/binary.rs | 2 +- .../tests/it/io/parquet/write/indexes.rs | 4 +- .../polars/tests/it/io/parquet/write/mod.rs | 2 +- .../tests/it/io/parquet/write/primitive.rs | 2 +- 32 files changed, 421 insertions(+), 243 deletions(-) delete mode 100644 crates/polars-parquet/src/parquet/read/page/memreader.rs create mode 100644 crates/polars-utils/src/mmap.rs diff --git a/Cargo.lock b/Cargo.lock index e344e7f0f078..de08d6cffe5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3417,6 +3417,7 @@ dependencies = [ "bytemuck", "hashbrown", "indexmap", + "memmap2", "num-traits", "once_cell", "polars-error", diff --git a/Cargo.toml b/Cargo.toml index b996a7877bfe..2efb3e722d49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ indexmap = { version = "2", features = ["std"] } itoa = "1.0.6" itoap = { version = "1", features = ["simd"] } memchr = "2.6" +memmap = { package = "memmap2", version = "0.7" } multiversion = "0.7" ndarray = { version = "0.15", default-features = false } num-traits = "0.2" diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml index c99cc6e407f9..aa2dc674f7a9 100644 --- a/crates/polars-io/Cargo.toml +++ b/crates/polars-io/Cargo.toml @@ -14,7 +14,7 @@ polars-error = { workspace = true } polars-json = { workspace = true, optional = true } polars-parquet = { workspace = true, optional = true } polars-time = { workspace = true, features = [], optional = true } -polars-utils = { workspace = true } +polars-utils = { workspace = true, features = ['mmap'] } ahash = { workspace = true } arrow = { workspace = true } @@ -30,7 +30,7 @@ futures = { workspace = true, optional = true } glob = { version = "0.3" } itoa = { workspace = true, optional = true } memchr = { workspace = true } -memmap = { package = "memmap2", version = "0.7" } +memmap = { workspace = true } num-traits = { workspace = true } object_store = { workspace = true, optional = true } once_cell = { workspace = true } diff --git a/crates/polars-io/src/mmap.rs b/crates/polars-io/src/mmap.rs index 5f454ba081c6..07fe8c326d9b 100644 --- a/crates/polars-io/src/mmap.rs +++ b/crates/polars-io/src/mmap.rs @@ -8,6 +8,7 @@ use memmap::Mmap; use once_cell::sync::Lazy; use polars_core::config::verbose; use polars_error::{polars_bail, PolarsResult}; +use polars_utils::mmap::{MemSlice, MmapSlice}; // Keep track of memory mapped files so we don't write to them while reading // Use a btree as it uses less memory than a hashmap and this thing never shrinks. @@ -143,6 +144,16 @@ impl std::ops::Deref for ReaderBytes<'_> { } } +impl<'a> ReaderBytes<'a> { + pub fn into_mem_slice(self) -> MemSlice { + match self { + ReaderBytes::Borrowed(v) => MemSlice::from_slice(v), + ReaderBytes::Owned(v) => MemSlice::from_vec(v), + ReaderBytes::Mapped(v, _) => MemSlice::from_mmap(MmapSlice::new(v)), + } + } +} + impl<'a, T: 'a + MmapBytesReader> From<&'a mut T> for ReaderBytes<'a> { fn from(m: &'a mut T) -> Self { match m.to_bytes() { diff --git a/crates/polars-io/src/parquet/read/mmap.rs b/crates/polars-io/src/parquet/read/mmap.rs index 8ac959069f76..cbd656a3f138 100644 --- a/crates/polars-io/src/parquet/read/mmap.rs +++ b/crates/polars-io/src/parquet/read/mmap.rs @@ -4,11 +4,11 @@ use bytes::Bytes; #[cfg(feature = "async")] use polars_core::datatypes::PlHashMap; use polars_error::PolarsResult; -use polars_parquet::parquet::read::MemReader; use polars_parquet::read::{ column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData, PageReader, }; +use polars_utils::mmap::{MemReader, MemSlice}; /// Store columns data in two scenarios: /// 1. a local memory mapped file @@ -21,8 +21,8 @@ use polars_parquet::read::{ /// b. asynchronously fetch them in parallel, for example using object_store /// c. store the data in this data structure /// d. when all the data is available deserialize on multiple threads, for example using rayon -pub enum ColumnStore<'a> { - Local(&'a [u8]), +pub enum ColumnStore { + Local(MemSlice), #[cfg(feature = "async")] Fetched(PlHashMap), } @@ -33,7 +33,7 @@ pub(super) fn mmap_columns<'a>( store: &'a ColumnStore, columns: &'a [ColumnChunkMetaData], field_name: &str, -) -> Vec<(&'a ColumnChunkMetaData, &'a [u8])> { +) -> Vec<(&'a ColumnChunkMetaData, MemSlice)> { get_field_columns(columns, field_name) .into_iter() .map(|meta| _mmap_single_column(store, meta)) @@ -43,10 +43,10 @@ pub(super) fn mmap_columns<'a>( fn _mmap_single_column<'a>( store: &'a ColumnStore, meta: &'a ColumnChunkMetaData, -) -> (&'a ColumnChunkMetaData, &'a [u8]) { +) -> (&'a ColumnChunkMetaData, MemSlice) { let (start, len) = meta.byte_range(); let chunk = match store { - ColumnStore::Local(file) => &file[start as usize..(start + len) as usize], + ColumnStore::Local(mem_slice) => mem_slice.slice(start as usize, (start + len) as usize), #[cfg(all(feature = "async", feature = "parquet"))] ColumnStore::Fetched(fetched) => { let entry = fetched.get(&start).unwrap_or_else(|| { @@ -54,7 +54,7 @@ fn _mmap_single_column<'a>( "mmap_columns: column with start {start} must be prefetched in ColumnStore.\n" ) }); - entry.as_ref() + MemSlice::from_slice(entry.as_ref()) }, }; (meta, chunk) @@ -63,7 +63,7 @@ fn _mmap_single_column<'a>( // similar to arrow2 serializer, except this accepts a slice instead of a vec. // this allows us to memory map pub(super) fn to_deserializer<'a>( - columns: Vec<(&ColumnChunkMetaData, &'a [u8])>, + columns: Vec<(&ColumnChunkMetaData, MemSlice)>, field: Field, num_rows: usize, chunk_size: Option, @@ -73,8 +73,11 @@ pub(super) fn to_deserializer<'a>( let (columns, types): (Vec<_>, Vec<_>) = columns .into_iter() .map(|(column_meta, chunk)| { + // Advise fetching the data for the column chunk + chunk.prefetch(); + let pages = PageReader::new( - MemReader::from_slice(chunk), + MemReader::new(chunk), column_meta, std::sync::Arc::new(|_, _| true), vec![], diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 97d2a93a9cec..9841450305cf 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -8,6 +8,7 @@ use polars_core::prelude::*; use polars_core::utils::{accumulate_dataframes_vertical, split_df}; use polars_core::POOL; use polars_parquet::read::{self, ArrayIter, FileMetaData, PhysicalType, RowGroupMetaData}; +use polars_utils::mmap::MemSlice; use rayon::prelude::*; #[cfg(feature = "cloud")] @@ -446,8 +447,7 @@ pub fn read_parquet( } let reader = ReaderBytes::from(&mut reader); - let bytes = reader.deref(); - let store = mmap::ColumnStore::Local(bytes); + let store = mmap::ColumnStore::Local(reader.into_mem_slice()); let dfs = rg_to_dfs( &store, @@ -492,8 +492,12 @@ impl FetchRowGroupsFromMmapReader { let reader_bytes = get_reader_bytes(reader_ptr)?; Ok(FetchRowGroupsFromMmapReader(reader_bytes)) } + fn fetch_row_groups(&mut self, _row_groups: Range) -> PolarsResult { - Ok(mmap::ColumnStore::Local(self.0.deref())) + // @TODO: we can something smarter here with mmap + Ok(mmap::ColumnStore::Local(MemSlice::from_slice( + self.0.deref(), + ))) } } diff --git a/crates/polars-parquet/Cargo.toml b/crates/polars-parquet/Cargo.toml index 2e8418c5e03d..5c62479ccaa3 100644 --- a/crates/polars-parquet/Cargo.toml +++ b/crates/polars-parquet/Cargo.toml @@ -23,7 +23,7 @@ futures = { workspace = true, optional = true } num-traits = { workspace = true } polars-compute = { workspace = true } polars-error = { workspace = true } -polars-utils = { workspace = true } +polars-utils = { workspace = true, features = ["mmap"] } simdutf8 = { workspace = true } parquet-format-safe = "0.2" diff --git a/crates/polars-parquet/src/arrow/read/deserialize/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/mod.rs index 007f7cfea8ac..99589b2da41c 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/mod.rs @@ -31,12 +31,13 @@ mod utils; use arrow::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, MapArray}; use arrow::datatypes::{ArrowDataType, Field, IntervalUnit}; use arrow::offset::Offsets; +use polars_utils::mmap::MemReader; use simple::page_iter_to_arrays; pub use self::nested_utils::{init_nested, InitNested, NestedArrayIter, NestedState}; pub use self::struct_::StructIterator; use super::*; -use crate::parquet::read::{get_page_iterator as _get_page_iterator, MemReader}; +use crate::parquet::read::get_page_iterator as _get_page_iterator; use crate::parquet::schema::types::PrimitiveType; /// Creates a new iterator of compressed pages. diff --git a/crates/polars-parquet/src/arrow/read/deserialize/null/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/null/mod.rs index 5d0adfe69bc1..c1a7c7eb5037 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/null/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/null/mod.rs @@ -61,12 +61,11 @@ mod tests { use super::iter_to_arrays; use crate::parquet::encoding::Encoding; use crate::parquet::error::ParquetError; - #[allow(unused_imports)] - use crate::parquet::fallible_streaming_iterator; use crate::parquet::metadata::Descriptor; use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, Page}; - use crate::parquet::read::CowBuffer; use crate::parquet::schema::types::{PhysicalType, PrimitiveType}; + #[allow(unused_imports)] + use crate::parquet::{fallible_streaming_iterator, CowBuffer}; #[test] fn limit() { diff --git a/crates/polars-parquet/src/arrow/read/row_group.rs b/crates/polars-parquet/src/arrow/read/row_group.rs index 2ff368d77c99..022b7350b0bc 100644 --- a/crates/polars-parquet/src/arrow/read/row_group.rs +++ b/crates/polars-parquet/src/arrow/read/row_group.rs @@ -4,14 +4,13 @@ use arrow::array::Array; use arrow::datatypes::Field; use arrow::record_batch::RecordBatchT; use polars_error::PolarsResult; +use polars_utils::mmap::MemReader; use super::{ArrayIter, RowGroupMetaData}; use crate::arrow::read::column_iter_to_arrays; use crate::parquet::indexes::FilteredPage; use crate::parquet::metadata::ColumnChunkMetaData; -use crate::parquet::read::{ - BasicDecompressor, IndexedPageReader, MemReader, PageMetaData, PageReader, -}; +use crate::parquet::read::{BasicDecompressor, IndexedPageReader, PageMetaData, PageReader}; /// An [`Iterator`] of [`RecordBatchT`] that (dynamically) adapts a vector of iterators of [`Array`] into /// an iterator of [`RecordBatchT`]. diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs index 316d734b63e0..d8a1866e6bc0 100644 --- a/crates/polars-parquet/src/arrow/write/dictionary.rs +++ b/crates/polars-parquet/src/arrow/write/dictionary.rs @@ -19,9 +19,9 @@ use crate::arrow::write::{slice_nested_leaf, utils}; use crate::parquet::encoding::hybrid_rle::encode; use crate::parquet::encoding::Encoding; use crate::parquet::page::{DictPage, Page}; -use crate::parquet::read::CowBuffer; use crate::parquet::schema::types::PrimitiveType; use crate::parquet::statistics::ParquetStatistics; +use crate::parquet::CowBuffer; use crate::write::DynIter; pub(crate) fn encode_as_dictionary_optional( diff --git a/crates/polars-parquet/src/arrow/write/utils.rs b/crates/polars-parquet/src/arrow/write/utils.rs index 0c9f8bec8cdb..bc979eecad21 100644 --- a/crates/polars-parquet/src/arrow/write/utils.rs +++ b/crates/polars-parquet/src/arrow/write/utils.rs @@ -8,9 +8,9 @@ use crate::parquet::encoding::hybrid_rle::encode; use crate::parquet::encoding::Encoding; use crate::parquet::metadata::Descriptor; use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2}; -use crate::parquet::read::CowBuffer; use crate::parquet::schema::types::PrimitiveType; use crate::parquet::statistics::ParquetStatistics; +use crate::parquet::CowBuffer; fn encode_iter_v1>(buffer: &mut Vec, iter: I) -> PolarsResult<()> { buffer.extend_from_slice(&[0; 4]); diff --git a/crates/polars-parquet/src/parquet/mod.rs b/crates/polars-parquet/src/parquet/mod.rs index e54600fb4af5..c6c227c47f6e 100644 --- a/crates/polars-parquet/src/parquet/mod.rs +++ b/crates/polars-parquet/src/parquet/mod.rs @@ -15,7 +15,10 @@ pub mod statistics; pub mod types; pub mod write; +use std::ops::Deref; + use parquet_format_safe as thrift_format; +use polars_utils::mmap::MemSlice; pub use streaming_decompression::{fallible_streaming_iterator, FallibleStreamingIterator}; pub const HEADER_SIZE: u64 = PARQUET_MAGIC.len() as u64; @@ -24,3 +27,34 @@ pub const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1']; /// The number of bytes read at the end of the parquet file on first read const DEFAULT_FOOTER_READ_SIZE: u64 = 64 * 1024; + +/// A copy-on-write buffer over bytes +#[derive(Debug, Clone)] +pub enum CowBuffer { + Borrowed(MemSlice), + Owned(Vec), +} + +impl Deref for CowBuffer { + type Target = [u8]; + + #[inline(always)] + fn deref(&self) -> &Self::Target { + match self { + CowBuffer::Borrowed(v) => v.deref(), + CowBuffer::Owned(v) => v.deref(), + } + } +} + +impl CowBuffer { + pub fn to_mut(&mut self) -> &mut Vec { + match self { + CowBuffer::Borrowed(v) => { + *self = Self::Owned(v.clone().to_vec()); + self.to_mut() + }, + CowBuffer::Owned(v) => v, + } + } +} diff --git a/crates/polars-parquet/src/parquet/page/mod.rs b/crates/polars-parquet/src/parquet/page/mod.rs index 77b5da526fd8..5e2f1ab7d52a 100644 --- a/crates/polars-parquet/src/parquet/page/mod.rs +++ b/crates/polars-parquet/src/parquet/page/mod.rs @@ -1,4 +1,4 @@ -use super::read::CowBuffer; +use super::CowBuffer; use crate::parquet::compression::Compression; use crate::parquet::encoding::{get_length, Encoding}; use crate::parquet::error::{ParquetError, ParquetResult}; diff --git a/crates/polars-parquet/src/parquet/read/compression.rs b/crates/polars-parquet/src/parquet/read/compression.rs index 700db3b28604..63e1934efda5 100644 --- a/crates/polars-parquet/src/parquet/read/compression.rs +++ b/crates/polars-parquet/src/parquet/read/compression.rs @@ -1,11 +1,10 @@ use parquet_format_safe::DataPageHeaderV2; use super::page::PageIterator; -use super::CowBuffer; use crate::parquet::compression::{self, Compression}; use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::page::{CompressedPage, DataPage, DataPageHeader, DictPage, Page}; -use crate::parquet::FallibleStreamingIterator; +use crate::parquet::{CowBuffer, FallibleStreamingIterator}; fn decompress_v1( compressed: &[u8], diff --git a/crates/polars-parquet/src/parquet/read/mod.rs b/crates/polars-parquet/src/parquet/read/mod.rs index 680ed676194b..23300a248149 100644 --- a/crates/polars-parquet/src/parquet/read/mod.rs +++ b/crates/polars-parquet/src/parquet/read/mod.rs @@ -16,10 +16,8 @@ pub use indexes::{read_columns_indexes, read_pages_locations}; pub use metadata::{deserialize_metadata, read_metadata, read_metadata_with_size}; #[cfg(feature = "async")] pub use page::{get_page_stream, get_page_stream_from_column_start}; -pub use page::{ - CowBuffer, IndexedPageReader, MemReader, MemReaderSlice, PageFilter, PageIterator, - PageMetaData, PageReader, -}; +pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader}; +use polars_utils::mmap::MemReader; #[cfg(feature = "async")] pub use stream::read_metadata as read_metadata_async; diff --git a/crates/polars-parquet/src/parquet/read/page/indexed_reader.rs b/crates/polars-parquet/src/parquet/read/page/indexed_reader.rs index 7f8bc34764c4..90788d0a7320 100644 --- a/crates/polars-parquet/src/parquet/read/page/indexed_reader.rs +++ b/crates/polars-parquet/src/parquet/read/page/indexed_reader.rs @@ -1,9 +1,9 @@ use std::collections::VecDeque; use std::io::{Seek, SeekFrom}; -use super::memreader::MemReader; +use polars_utils::mmap::{MemReader, MemSlice}; + use super::reader::{finish_page, read_page_header, PageMetaData}; -use super::MemReaderSlice; use crate::parquet::error::ParquetError; use crate::parquet::indexes::{FilteredPage, Interval}; use crate::parquet::metadata::{ColumnChunkMetaData, Descriptor}; @@ -44,7 +44,7 @@ fn read_page( reader: &mut MemReader, start: u64, length: usize, -) -> Result<(ParquetPageHeader, MemReaderSlice), ParquetError> { +) -> Result<(ParquetPageHeader, MemSlice), ParquetError> { // seek to the page reader.seek(SeekFrom::Start(start))?; diff --git a/crates/polars-parquet/src/parquet/read/page/memreader.rs b/crates/polars-parquet/src/parquet/read/page/memreader.rs deleted file mode 100644 index 4c200e506474..000000000000 --- a/crates/polars-parquet/src/parquet/read/page/memreader.rs +++ /dev/null @@ -1,191 +0,0 @@ -use std::io; -use std::ops::Deref; -use std::sync::Arc; - -/// A cursor over a segment of heap allocated memory. This is used for the Parquet reader to avoid -/// sequential allocations. -#[derive(Debug, Clone)] -pub struct MemReader { - data: Arc<[u8]>, - position: usize, -} - -/// A reference to a slice of a memory reader. -/// -/// This should not outlast the original the original [`MemReader`] because it still owns all the -/// memory. -#[derive(Debug, Clone)] -pub struct MemReaderSlice { - data: Arc<[u8]>, - start: usize, - end: usize, -} - -impl Default for MemReaderSlice { - fn default() -> Self { - let slice: &[u8] = &[]; - Self { - data: Arc::from(slice), - start: 0, - end: 0, - } - } -} - -impl Deref for MemReaderSlice { - type Target = [u8]; - - #[inline(always)] - fn deref(&self) -> &Self::Target { - &self.data[self.start..self.end] - } -} - -#[derive(Debug, Clone)] -pub enum CowBuffer { - Borrowed(MemReaderSlice), - Owned(Vec), -} - -impl Deref for CowBuffer { - type Target = [u8]; - - #[inline(always)] - fn deref(&self) -> &Self::Target { - match self { - CowBuffer::Borrowed(v) => v.deref(), - CowBuffer::Owned(v) => v.deref(), - } - } -} - -impl MemReader { - #[inline(always)] - pub fn new(data: Arc<[u8]>) -> Self { - Self { data, position: 0 } - } - - #[inline(always)] - pub fn len(&self) -> usize { - self.data.len() - } - - #[inline(always)] - pub fn remaining_len(&self) -> usize { - self.data.len() - self.position - } - - #[inline(always)] - pub fn position(&self) -> usize { - self.position - } - - #[inline(always)] - pub fn from_slice(data: &[u8]) -> Self { - let data = data.into(); - Self { data, position: 0 } - } - - #[inline(always)] - pub fn from_vec(data: Vec) -> Self { - let data = data.into_boxed_slice().into(); - Self { data, position: 0 } - } - - #[inline(always)] - pub fn from_reader(mut reader: R) -> io::Result { - let mut vec = Vec::new(); - reader.read_to_end(&mut vec)?; - Ok(Self::from_vec(vec)) - } - - #[inline(always)] - pub fn read_slice(&mut self, n: usize) -> MemReaderSlice { - let start = self.position; - let end = usize::min(self.position + n, self.data.len()); - - self.position = end; - - MemReaderSlice { - data: self.data.clone(), - start, - end, - } - } -} - -impl io::Read for MemReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let n = usize::min(buf.len(), self.remaining_len()); - buf[..n].copy_from_slice(&self.data[self.position..self.position + n]); - self.position += n; - Ok(n) - } -} - -impl io::Seek for MemReader { - fn seek(&mut self, pos: io::SeekFrom) -> io::Result { - let position = match pos { - io::SeekFrom::Start(position) => usize::min(position as usize, self.len()), - io::SeekFrom::End(offset) => { - let Some(position) = self.len().checked_add_signed(offset as isize) else { - return Err(io::Error::new( - io::ErrorKind::Other, - "Seek before to before buffer", - )); - }; - - position - }, - io::SeekFrom::Current(offset) => { - let Some(position) = self.len().checked_add_signed(offset as isize) else { - return Err(io::Error::new( - io::ErrorKind::Other, - "Seek before to before buffer", - )); - }; - - position - }, - }; - - eprintln!( - "pos = {}, new_pos = {}, seek = {:?}", - self.position, position, pos - ); - - self.position = position; - - Ok(position as u64) - } -} - -impl MemReaderSlice { - #[inline(always)] - pub fn to_vec(self) -> Vec { - <[u8]>::to_vec(&self) - } - - #[inline] - pub fn from_vec(v: Vec) -> Self { - let end = v.len(); - - Self { - data: v.into(), - start: 0, - end, - } - } -} - -impl CowBuffer { - pub fn to_mut(&mut self) -> &mut Vec { - match self { - CowBuffer::Borrowed(v) => { - *self = Self::Owned(v.clone().to_vec()); - self.to_mut() - }, - CowBuffer::Owned(v) => v, - } - } -} diff --git a/crates/polars-parquet/src/parquet/read/page/mod.rs b/crates/polars-parquet/src/parquet/read/page/mod.rs index 350e49257e79..a8f1396d37d6 100644 --- a/crates/polars-parquet/src/parquet/read/page/mod.rs +++ b/crates/polars-parquet/src/parquet/read/page/mod.rs @@ -1,11 +1,9 @@ mod indexed_reader; -mod memreader; mod reader; #[cfg(feature = "async")] mod stream; pub use indexed_reader::IndexedPageReader; -pub use memreader::{CowBuffer, MemReader, MemReaderSlice}; pub use reader::{PageFilter, PageMetaData, PageReader}; use crate::parquet::error::ParquetError; diff --git a/crates/polars-parquet/src/parquet/read/page/reader.rs b/crates/polars-parquet/src/parquet/read/page/reader.rs index 82b542ee1eb1..7c8e32f40faf 100644 --- a/crates/polars-parquet/src/parquet/read/page/reader.rs +++ b/crates/polars-parquet/src/parquet/read/page/reader.rs @@ -1,9 +1,9 @@ use std::sync::{Arc, OnceLock}; use parquet_format_safe::thrift::protocol::TCompactInputProtocol; +use polars_utils::mmap::{MemReader, MemSlice}; -use super::memreader::MemReader; -use super::{MemReaderSlice, PageIterator}; +use super::PageIterator; use crate::parquet::compression::Compression; use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::indexes::Interval; @@ -13,7 +13,7 @@ use crate::parquet::page::{ ParquetPageHeader, }; use crate::parquet::parquet_bridge::Encoding; -use crate::parquet::read::CowBuffer; +use crate::parquet::CowBuffer; /// This meta is a small part of [`ColumnChunkMetaData`]. #[derive(Debug, Clone, PartialEq, Eq)] @@ -211,7 +211,7 @@ pub(super) fn build_page(reader: &mut PageReader) -> ParquetResult>, diff --git a/crates/polars-parquet/src/parquet/read/page/stream.rs b/crates/polars-parquet/src/parquet/read/page/stream.rs index ee4cc4f4cd2b..86939e20be07 100644 --- a/crates/polars-parquet/src/parquet/read/page/stream.rs +++ b/crates/polars-parquet/src/parquet/read/page/stream.rs @@ -4,6 +4,7 @@ use async_stream::try_stream; use futures::io::{copy, sink}; use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream}; use parquet_format_safe::thrift::protocol::TCompactInputStreamProtocol; +use polars_utils::mmap::MemSlice; use super::reader::{finish_page, get_page_header, PageMetaData}; use super::PageFilter; @@ -11,7 +12,6 @@ use crate::parquet::compression::Compression; use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::metadata::{ColumnChunkMetaData, Descriptor}; use crate::parquet::page::{CompressedPage, ParquetPageHeader}; -use crate::parquet::read::MemReaderSlice; /// Returns a stream of compressed data pages pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>( @@ -119,7 +119,7 @@ fn _get_page_stream( yield finish_page( page_header, - MemReaderSlice::from_vec(std::mem::take(&mut scratch)), + MemSlice::from_vec(std::mem::take(&mut scratch)), compression, &descriptor, None, diff --git a/crates/polars-parquet/src/parquet/write/compression.rs b/crates/polars-parquet/src/parquet/write/compression.rs index aa48d08c7b26..1c7d4d36a901 100644 --- a/crates/polars-parquet/src/parquet/write/compression.rs +++ b/crates/polars-parquet/src/parquet/write/compression.rs @@ -4,8 +4,7 @@ use crate::parquet::page::{ CompressedDataPage, CompressedDictPage, CompressedPage, DataPage, DataPageHeader, DictPage, Page, }; -use crate::parquet::read::CowBuffer; -use crate::parquet::{compression, FallibleStreamingIterator}; +use crate::parquet::{compression, CowBuffer, FallibleStreamingIterator}; /// Compresses a [`DataPage`] into a [`CompressedDataPage`]. fn compress_data( diff --git a/crates/polars-parquet/src/parquet/write/page.rs b/crates/polars-parquet/src/parquet/write/page.rs index 1f02b5ba570b..22216b36025a 100644 --- a/crates/polars-parquet/src/parquet/write/page.rs +++ b/crates/polars-parquet/src/parquet/write/page.rs @@ -218,7 +218,7 @@ async fn write_page_header_async( #[cfg(test)] mod tests { use super::*; - use crate::parquet::read::CowBuffer; + use crate::parquet::CowBuffer; #[test] fn dict_too_large() { diff --git a/crates/polars-utils/Cargo.toml b/crates/polars-utils/Cargo.toml index c237d8d56a29..6eec2021a15f 100644 --- a/crates/polars-utils/Cargo.toml +++ b/crates/polars-utils/Cargo.toml @@ -15,6 +15,7 @@ ahash = { workspace = true } bytemuck = { workspace = true } hashbrown = { workspace = true } indexmap = { workspace = true } +memmap = { workspace = true, optional = true } num-traits = { workspace = true } once_cell = { workspace = true } raw-cpuid = { workspace = true } @@ -30,5 +31,6 @@ rand = { workspace = true } version_check = { workspace = true } [features] +mmap = ["memmap"] bigidx = [] nightly = [] diff --git a/crates/polars-utils/src/lib.rs b/crates/polars-utils/src/lib.rs index 9ae809f77458..d01c87c77e28 100644 --- a/crates/polars-utils/src/lib.rs +++ b/crates/polars-utils/src/lib.rs @@ -38,6 +38,8 @@ pub mod wasm; pub mod float; pub mod index; pub mod io; +#[cfg(feature = "mmap")] +pub mod mmap; pub mod nulls; pub mod ord; pub mod partitioned; diff --git a/crates/polars-utils/src/mem.rs b/crates/polars-utils/src/mem.rs index 1ff0bd11f6ff..0e2deaae58b6 100644 --- a/crates/polars-utils/src/mem.rs +++ b/crates/polars-utils/src/mem.rs @@ -6,3 +6,20 @@ pub unsafe fn to_mutable_slice(s: &[T]) -> &mut [T] { let len = s.len(); std::slice::from_raw_parts_mut(ptr, len) } + +/// # Safety +/// +/// This should only be called with pointers to valid memory. +pub unsafe fn prefetch_l2(ptr: *const u8) { + #[cfg(target_arch = "x86_64")] + { + use std::arch::x86_64::*; + unsafe { _mm_prefetch(ptr as *const _, _MM_HINT_T1) }; + } + + #[cfg(target_arch = "aarch64")] + { + use std::arch::aarch64::*; + unsafe { _prefetch(ptr as *const _, _PREFETCH_READ, _PREFETCH_LOCALITY2) }; + } +} diff --git a/crates/polars-utils/src/mmap.rs b/crates/polars-utils/src/mmap.rs new file mode 100644 index 000000000000..f005713cb7e0 --- /dev/null +++ b/crates/polars-utils/src/mmap.rs @@ -0,0 +1,300 @@ +use std::ops::Deref; +use std::sync::Arc; +use std::{fmt, io}; + +pub use memmap::Mmap; + +use crate::mem::prefetch_l2; + +/// A read-only slice over an [`Mmap`] +pub struct MmapSlice { + // We keep the Mmap around to ensure it is still valid. + mmap: Arc, + ptr: *const u8, + len: usize, +} + +/// A cursor over a [`MemSlice`]. +#[derive(Debug, Clone)] +pub struct MemReader { + data: MemSlice, + position: usize, +} + +/// A read-only reference to a slice of memory. +/// +/// This memory can either be heap-allocated or be mmap-ed into memory. +/// +/// This still owns the all the original memory and therefore should probably not be a long-lasting +/// structure. +#[derive(Clone)] +pub struct MemSlice(MemSliceInner); + +#[derive(Clone)] +enum MemSliceInner { + Mmap(MmapSlice), + Allocated(AllocatedSlice), +} + +#[derive(Clone)] +struct AllocatedSlice { + data: Arc<[u8]>, + start: usize, + end: usize, +} + +impl Deref for MmapSlice { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + // SAFETY: Invariant of MmapSlice + unsafe { std::slice::from_raw_parts(self.ptr, self.len) } + } +} + +impl Deref for MemSlice { + type Target = [u8]; + + #[inline(always)] + fn deref(&self) -> &Self::Target { + match &self.0 { + MemSliceInner::Mmap(v) => v.deref(), + MemSliceInner::Allocated(v) => &v.data[v.start..v.end], + } + } +} + +impl Default for AllocatedSlice { + fn default() -> Self { + let slice: &[u8] = &[]; + Self { + data: Arc::from(slice), + start: 0, + end: 0, + } + } +} + +impl fmt::Debug for MemSlice { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_tuple("MemSlice").field(&self.deref()).finish() + } +} + +impl Default for MemSlice { + fn default() -> Self { + Self(MemSliceInner::Allocated(AllocatedSlice::default())) + } +} + +impl MemReader { + pub fn new(data: MemSlice) -> Self { + Self { data, position: 0 } + } + + #[inline(always)] + pub fn remaining_len(&self) -> usize { + self.data.len() - self.position + } + + #[inline(always)] + pub fn total_len(&self) -> usize { + self.data.len() + } + + #[inline(always)] + pub fn position(&self) -> usize { + self.position + } + + #[inline(always)] + pub fn from_slice(data: &[u8]) -> Self { + Self::new(MemSlice::from_slice(data)) + } + + #[inline(always)] + pub fn from_vec(data: Vec) -> Self { + Self::new(MemSlice::from_vec(data)) + } + + #[inline(always)] + pub fn from_reader(mut reader: R) -> io::Result { + let mut vec = Vec::new(); + reader.read_to_end(&mut vec)?; + Ok(Self::from_vec(vec)) + } + + #[inline(always)] + pub fn read_slice(&mut self, n: usize) -> MemSlice { + let start = self.position; + let end = usize::min(self.position + n, self.data.len()); + self.position = end; + self.data.slice(start, end) + } +} + +impl io::Read for MemReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let n = usize::min(buf.len(), self.remaining_len()); + buf[..n].copy_from_slice(&self.data[self.position..self.position + n]); + self.position += n; + Ok(n) + } +} + +impl io::Seek for MemReader { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + let position = match pos { + io::SeekFrom::Start(position) => usize::min(position as usize, self.total_len()), + io::SeekFrom::End(offset) => { + let Some(position) = self.total_len().checked_add_signed(offset as isize) else { + return Err(io::Error::new( + io::ErrorKind::Other, + "Seek before to before buffer", + )); + }; + + position + }, + io::SeekFrom::Current(offset) => { + let Some(position) = self.position.checked_add_signed(offset as isize) else { + return Err(io::Error::new( + io::ErrorKind::Other, + "Seek before to before buffer", + )); + }; + + position + }, + }; + + self.position = position; + + Ok(position as u64) + } +} + +impl MemSlice { + #[inline(always)] + pub fn to_vec(self) -> Vec { + <[u8]>::to_vec(self.deref()) + } + + #[inline] + pub fn from_vec(v: Vec) -> Self { + let end = v.len(); + + Self(MemSliceInner::Allocated(AllocatedSlice { + data: v.into_boxed_slice().into(), + start: 0, + end, + })) + } + + #[inline] + pub fn from_slice(slice: &[u8]) -> Self { + let end = slice.len(); + Self(MemSliceInner::Allocated(AllocatedSlice { + data: slice.into(), + start: 0, + end, + })) + } + + /// Attempt to prefetch the memory belonging to to this [`MemSlice`] + #[inline] + pub fn prefetch(&self) { + if self.len() == 0 { + return; + } + + // @TODO: We can play a bit more with this prefetching. Maybe introduce a maximum number of + // prefetches as to not overwhelm the processor. The linear prefetcher should pick it up + // at a certain point. + + const PAGE_SIZE: usize = 4096; + for i in 0..self.len() / PAGE_SIZE { + unsafe { prefetch_l2(self[i * PAGE_SIZE..].as_ptr()) }; + } + unsafe { prefetch_l2(self[self.len() - 1..].as_ptr()) } + } + + #[inline] + pub fn from_mmap(mmap: MmapSlice) -> Self { + Self(MemSliceInner::Mmap(mmap)) + } + + #[inline] + #[track_caller] + pub fn slice(&self, start: usize, end: usize) -> Self { + Self(match &self.0 { + MemSliceInner::Mmap(v) => MemSliceInner::Mmap(v.slice(start, end)), + MemSliceInner::Allocated(v) => MemSliceInner::Allocated({ + let len = v.end - v.start; + + assert!(start <= end); + assert!(start <= len); + assert!(end <= len); + + AllocatedSlice { + data: v.data.clone(), + start: v.start + start, + end: v.start + end, + } + }), + }) + } +} + +// SAFETY: This structure is read-only and does not contain any non-sync or non-send data. +unsafe impl Sync for MmapSlice {} +unsafe impl Send for MmapSlice {} + +impl fmt::Debug for MmapSlice { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_tuple("MmapSlice").field(&self.deref()).finish() + } +} + +impl Clone for MmapSlice { + fn clone(&self) -> Self { + Self { + mmap: self.mmap.clone(), + ptr: self.ptr, + len: self.len, + } + } +} + +impl MmapSlice { + #[inline] + pub fn new(mmap: Mmap) -> Self { + let slice: &[u8] = &mmap; + + let ptr = slice as *const [u8] as *const u8; + let len = slice.len(); + + let mmap = Arc::new(mmap); + + Self { mmap, ptr, len } + } + + /// Take a slice of the current [`MmapSlice`] + #[inline] + #[track_caller] + pub fn slice(&self, start: usize, end: usize) -> Self { + assert!(start <= end); + assert!(start <= self.len()); + assert!(end <= self.len()); + + // SAFETY: Start and end are within the slice + let ptr = unsafe { self.ptr.add(start) }; + let len = end - start; + + Self { + mmap: self.mmap.clone(), + ptr, + len, + } + } +} diff --git a/crates/polars/tests/it/io/parquet/read/mod.rs b/crates/polars/tests/it/io/parquet/read/mod.rs index 3923ca8cf537..175ed27c723b 100644 --- a/crates/polars/tests/it/io/parquet/read/mod.rs +++ b/crates/polars/tests/it/io/parquet/read/mod.rs @@ -25,13 +25,14 @@ use polars_parquet::parquet::read::get_page_stream; #[cfg(feature = "async")] use polars_parquet::parquet::read::read_metadata_async; use polars_parquet::parquet::read::{ - get_column_iterator, get_field_columns, read_metadata, BasicDecompressor, MemReader, - MutStreamingIterator, State, + get_column_iterator, get_field_columns, read_metadata, BasicDecompressor, MutStreamingIterator, + State, }; use polars_parquet::parquet::schema::types::{GroupConvertedType, ParquetType}; use polars_parquet::parquet::schema::Repetition; use polars_parquet::parquet::types::int96_to_i64_ns; use polars_parquet::parquet::FallibleStreamingIterator; +use polars_utils::mmap::MemReader; use super::*; diff --git a/crates/polars/tests/it/io/parquet/write/binary.rs b/crates/polars/tests/it/io/parquet/write/binary.rs index c53afd0bbdbd..bb9abc62c258 100644 --- a/crates/polars/tests/it/io/parquet/write/binary.rs +++ b/crates/polars/tests/it/io/parquet/write/binary.rs @@ -3,10 +3,10 @@ use polars_parquet::parquet::encoding::Encoding; use polars_parquet::parquet::error::ParquetResult; use polars_parquet::parquet::metadata::Descriptor; use polars_parquet::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, Page}; -use polars_parquet::parquet::read::CowBuffer; use polars_parquet::parquet::statistics::BinaryStatistics; use polars_parquet::parquet::types::ord_binary; use polars_parquet::parquet::write::WriteOptions; +use polars_parquet::parquet::CowBuffer; fn unzip_option(array: &[Option>]) -> ParquetResult<(Vec, Vec)> { // leave the first 4 bytes announcing the length of the def level diff --git a/crates/polars/tests/it/io/parquet/write/indexes.rs b/crates/polars/tests/it/io/parquet/write/indexes.rs index 83e13dc05bc7..4ddff73b5fb1 100644 --- a/crates/polars/tests/it/io/parquet/write/indexes.rs +++ b/crates/polars/tests/it/io/parquet/write/indexes.rs @@ -7,13 +7,13 @@ use polars_parquet::parquet::indexes::{ }; use polars_parquet::parquet::metadata::SchemaDescriptor; use polars_parquet::parquet::read::{ - read_columns_indexes, read_metadata, read_pages_locations, BasicDecompressor, - IndexedPageReader, MemReader, + read_columns_indexes, read_metadata, read_pages_locations, BasicDecompressor, IndexedPageReader, }; use polars_parquet::parquet::schema::types::{ParquetType, PhysicalType, PrimitiveType}; use polars_parquet::parquet::write::{ Compressor, DynIter, DynStreamingIterator, FileWriter, Version, WriteOptions, }; +use polars_utils::mmap::MemReader; use super::super::read::collect; use super::primitive::array_to_page_v1; diff --git a/crates/polars/tests/it/io/parquet/write/mod.rs b/crates/polars/tests/it/io/parquet/write/mod.rs index 3fe24a4801a0..52c534d370b9 100644 --- a/crates/polars/tests/it/io/parquet/write/mod.rs +++ b/crates/polars/tests/it/io/parquet/write/mod.rs @@ -14,7 +14,6 @@ use polars_parquet::parquet::compression::{BrotliLevel, CompressionOptions}; use polars_parquet::parquet::error::ParquetResult; use polars_parquet::parquet::metadata::{Descriptor, SchemaDescriptor}; use polars_parquet::parquet::page::Page; -use polars_parquet::parquet::read::MemReader; use polars_parquet::parquet::schema::types::{ParquetType, PhysicalType}; use polars_parquet::parquet::statistics::Statistics; #[cfg(feature = "async")] @@ -23,6 +22,7 @@ use polars_parquet::parquet::write::{ Compressor, DynIter, DynStreamingIterator, FileWriter, Version, WriteOptions, }; use polars_parquet::read::read_metadata; +use polars_utils::mmap::MemReader; use primitive::array_to_page_v1; use super::{alltypes_plain, alltypes_statistics, Array}; diff --git a/crates/polars/tests/it/io/parquet/write/primitive.rs b/crates/polars/tests/it/io/parquet/write/primitive.rs index 305cced89d8b..044925c5bb11 100644 --- a/crates/polars/tests/it/io/parquet/write/primitive.rs +++ b/crates/polars/tests/it/io/parquet/write/primitive.rs @@ -3,10 +3,10 @@ use polars_parquet::parquet::encoding::Encoding; use polars_parquet::parquet::error::ParquetResult; use polars_parquet::parquet::metadata::Descriptor; use polars_parquet::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, Page}; -use polars_parquet::parquet::read::CowBuffer; use polars_parquet::parquet::statistics::PrimitiveStatistics; use polars_parquet::parquet::types::NativeType; use polars_parquet::parquet::write::WriteOptions; +use polars_parquet::parquet::CowBuffer; fn unzip_option(array: &[Option]) -> ParquetResult<(Vec, Vec)> { // leave the first 4 bytes announcing the length of the def level