Skip to content

Commit

Permalink
perf: Use mmap-ed memory if possible in Parquet reader (pola-rs#17725)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jul 19, 2024
1 parent 04714b9 commit b331538
Show file tree
Hide file tree
Showing 32 changed files with 421 additions and 243 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ indexmap = { version = "2", features = ["std"] }
itoa = "1.0.6"
itoap = { version = "1", features = ["simd"] }
memchr = "2.6"
memmap = { package = "memmap2", version = "0.7" }
multiversion = "0.7"
ndarray = { version = "0.15", default-features = false }
num-traits = "0.2"
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ polars-error = { workspace = true }
polars-json = { workspace = true, optional = true }
polars-parquet = { workspace = true, optional = true }
polars-time = { workspace = true, features = [], optional = true }
polars-utils = { workspace = true }
polars-utils = { workspace = true, features = ['mmap'] }

ahash = { workspace = true }
arrow = { workspace = true }
Expand All @@ -30,7 +30,7 @@ futures = { workspace = true, optional = true }
glob = { version = "0.3" }
itoa = { workspace = true, optional = true }
memchr = { workspace = true }
memmap = { package = "memmap2", version = "0.7" }
memmap = { workspace = true }
num-traits = { workspace = true }
object_store = { workspace = true, optional = true }
once_cell = { workspace = true }
Expand Down
11 changes: 11 additions & 0 deletions crates/polars-io/src/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use memmap::Mmap;
use once_cell::sync::Lazy;
use polars_core::config::verbose;
use polars_error::{polars_bail, PolarsResult};
use polars_utils::mmap::{MemSlice, MmapSlice};

// Keep track of memory mapped files so we don't write to them while reading
// Use a btree as it uses less memory than a hashmap and this thing never shrinks.
Expand Down Expand Up @@ -143,6 +144,16 @@ impl std::ops::Deref for ReaderBytes<'_> {
}
}

impl<'a> ReaderBytes<'a> {
    /// Consume this `ReaderBytes` and convert it into a `MemSlice`.
    ///
    /// NOTE(review): `MemSlice::from_slice` presumably copies the borrowed
    /// bytes into owned/shared storage, while the `Owned` and `Mapped`
    /// variants hand their buffers over without copying — confirm against
    /// `polars_utils::mmap`.
    pub fn into_mem_slice(self) -> MemSlice {
        match self {
            ReaderBytes::Borrowed(v) => MemSlice::from_slice(v),
            ReaderBytes::Owned(v) => MemSlice::from_vec(v),
            // The second field (presumably the underlying reader/file handle
            // keeping the mapping alive) is dropped here — TODO confirm the
            // mmap remains valid through `MmapSlice`'s own ownership.
            ReaderBytes::Mapped(v, _) => MemSlice::from_mmap(MmapSlice::new(v)),
        }
    }
}

impl<'a, T: 'a + MmapBytesReader> From<&'a mut T> for ReaderBytes<'a> {
fn from(m: &'a mut T) -> Self {
match m.to_bytes() {
Expand Down
21 changes: 12 additions & 9 deletions crates/polars-io/src/parquet/read/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use bytes::Bytes;
#[cfg(feature = "async")]
use polars_core::datatypes::PlHashMap;
use polars_error::PolarsResult;
use polars_parquet::parquet::read::MemReader;
use polars_parquet::read::{
column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
PageReader,
};
use polars_utils::mmap::{MemReader, MemSlice};

/// Store columns data in two scenarios:
/// 1. a local memory mapped file
Expand All @@ -21,8 +21,8 @@ use polars_parquet::read::{
/// b. asynchronously fetch them in parallel, for example using object_store
/// c. store the data in this data structure
/// d. when all the data is available deserialize on multiple threads, for example using rayon
pub enum ColumnStore<'a> {
Local(&'a [u8]),
pub enum ColumnStore {
Local(MemSlice),
#[cfg(feature = "async")]
Fetched(PlHashMap<u64, Bytes>),
}
Expand All @@ -33,7 +33,7 @@ pub(super) fn mmap_columns<'a>(
store: &'a ColumnStore,
columns: &'a [ColumnChunkMetaData],
field_name: &str,
) -> Vec<(&'a ColumnChunkMetaData, &'a [u8])> {
) -> Vec<(&'a ColumnChunkMetaData, MemSlice)> {
get_field_columns(columns, field_name)
.into_iter()
.map(|meta| _mmap_single_column(store, meta))
Expand All @@ -43,18 +43,18 @@ pub(super) fn mmap_columns<'a>(
fn _mmap_single_column<'a>(
store: &'a ColumnStore,
meta: &'a ColumnChunkMetaData,
) -> (&'a ColumnChunkMetaData, &'a [u8]) {
) -> (&'a ColumnChunkMetaData, MemSlice) {
let (start, len) = meta.byte_range();
let chunk = match store {
ColumnStore::Local(file) => &file[start as usize..(start + len) as usize],
ColumnStore::Local(mem_slice) => mem_slice.slice(start as usize, (start + len) as usize),
#[cfg(all(feature = "async", feature = "parquet"))]
ColumnStore::Fetched(fetched) => {
let entry = fetched.get(&start).unwrap_or_else(|| {
panic!(
"mmap_columns: column with start {start} must be prefetched in ColumnStore.\n"
)
});
entry.as_ref()
MemSlice::from_slice(entry.as_ref())
},
};
(meta, chunk)
Expand All @@ -63,7 +63,7 @@ fn _mmap_single_column<'a>(
// similar to arrow2 serializer, except this accepts a slice instead of a vec.
// this allows us to memory map
pub(super) fn to_deserializer<'a>(
columns: Vec<(&ColumnChunkMetaData, &'a [u8])>,
columns: Vec<(&ColumnChunkMetaData, MemSlice)>,
field: Field,
num_rows: usize,
chunk_size: Option<usize>,
Expand All @@ -73,8 +73,11 @@ pub(super) fn to_deserializer<'a>(
let (columns, types): (Vec<_>, Vec<_>) = columns
.into_iter()
.map(|(column_meta, chunk)| {
// Advise fetching the data for the column chunk
chunk.prefetch();

let pages = PageReader::new(
MemReader::from_slice(chunk),
MemReader::new(chunk),
column_meta,
std::sync::Arc::new(|_, _| true),
vec![],
Expand Down
10 changes: 7 additions & 3 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use polars_parquet::read::{self, ArrayIter, FileMetaData, PhysicalType, RowGroupMetaData};
use polars_utils::mmap::MemSlice;
use rayon::prelude::*;

#[cfg(feature = "cloud")]
Expand Down Expand Up @@ -446,8 +447,7 @@ pub fn read_parquet<R: MmapBytesReader>(
}

let reader = ReaderBytes::from(&mut reader);
let bytes = reader.deref();
let store = mmap::ColumnStore::Local(bytes);
let store = mmap::ColumnStore::Local(reader.into_mem_slice());

let dfs = rg_to_dfs(
&store,
Expand Down Expand Up @@ -492,8 +492,12 @@ impl FetchRowGroupsFromMmapReader {
let reader_bytes = get_reader_bytes(reader_ptr)?;
Ok(FetchRowGroupsFromMmapReader(reader_bytes))
}

fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
Ok(mmap::ColumnStore::Local(self.0.deref()))
// @TODO: we can do something smarter here with mmap
Ok(mmap::ColumnStore::Local(MemSlice::from_slice(
self.0.deref(),
)))
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ futures = { workspace = true, optional = true }
num-traits = { workspace = true }
polars-compute = { workspace = true }
polars-error = { workspace = true }
polars-utils = { workspace = true }
polars-utils = { workspace = true, features = ["mmap"] }
simdutf8 = { workspace = true }

parquet-format-safe = "0.2"
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-parquet/src/arrow/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ mod utils;
use arrow::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, MapArray};
use arrow::datatypes::{ArrowDataType, Field, IntervalUnit};
use arrow::offset::Offsets;
use polars_utils::mmap::MemReader;
use simple::page_iter_to_arrays;

pub use self::nested_utils::{init_nested, InitNested, NestedArrayIter, NestedState};
pub use self::struct_::StructIterator;
use super::*;
use crate::parquet::read::{get_page_iterator as _get_page_iterator, MemReader};
use crate::parquet::read::get_page_iterator as _get_page_iterator;
use crate::parquet::schema::types::PrimitiveType;

/// Creates a new iterator of compressed pages.
Expand Down
5 changes: 2 additions & 3 deletions crates/polars-parquet/src/arrow/read/deserialize/null/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,11 @@ mod tests {
use super::iter_to_arrays;
use crate::parquet::encoding::Encoding;
use crate::parquet::error::ParquetError;
#[allow(unused_imports)]
use crate::parquet::fallible_streaming_iterator;
use crate::parquet::metadata::Descriptor;
use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, Page};
use crate::parquet::read::CowBuffer;
use crate::parquet::schema::types::{PhysicalType, PrimitiveType};
#[allow(unused_imports)]
use crate::parquet::{fallible_streaming_iterator, CowBuffer};

#[test]
fn limit() {
Expand Down
5 changes: 2 additions & 3 deletions crates/polars-parquet/src/arrow/read/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ use arrow::array::Array;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatchT;
use polars_error::PolarsResult;
use polars_utils::mmap::MemReader;

use super::{ArrayIter, RowGroupMetaData};
use crate::arrow::read::column_iter_to_arrays;
use crate::parquet::indexes::FilteredPage;
use crate::parquet::metadata::ColumnChunkMetaData;
use crate::parquet::read::{
BasicDecompressor, IndexedPageReader, MemReader, PageMetaData, PageReader,
};
use crate::parquet::read::{BasicDecompressor, IndexedPageReader, PageMetaData, PageReader};

/// An [`Iterator`] of [`RecordBatchT`] that (dynamically) adapts a vector of iterators of [`Array`] into
/// an iterator of [`RecordBatchT`].
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use crate::arrow::write::{slice_nested_leaf, utils};
use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{DictPage, Page};
use crate::parquet::read::CowBuffer;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::ParquetStatistics;
use crate::parquet::CowBuffer;
use crate::write::DynIter;

pub(crate) fn encode_as_dictionary_optional(
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/write/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::encoding::Encoding;
use crate::parquet::metadata::Descriptor;
use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2};
use crate::parquet::read::CowBuffer;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::ParquetStatistics;
use crate::parquet::CowBuffer;

fn encode_iter_v1<I: Iterator<Item = bool>>(buffer: &mut Vec<u8>, iter: I) -> PolarsResult<()> {
buffer.extend_from_slice(&[0; 4]);
Expand Down
34 changes: 34 additions & 0 deletions crates/polars-parquet/src/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ pub mod statistics;
pub mod types;
pub mod write;

use std::ops::Deref;

use parquet_format_safe as thrift_format;
use polars_utils::mmap::MemSlice;
pub use streaming_decompression::{fallible_streaming_iterator, FallibleStreamingIterator};

pub const HEADER_SIZE: u64 = PARQUET_MAGIC.len() as u64;
Expand All @@ -24,3 +27,34 @@ pub const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];

/// The number of bytes read at the end of the parquet file on first read
const DEFAULT_FOOTER_READ_SIZE: u64 = 64 * 1024;

/// A copy-on-write buffer over bytes.
///
/// Starts out as a cheap-to-clone `Borrowed` view and is promoted to an
/// `Owned` `Vec<u8>` only when mutable access is requested.
#[derive(Debug, Clone)]
pub enum CowBuffer {
    // A shared view of the bytes (backed by `MemSlice`, which may itself
    // reference a memory map — see `polars_utils::mmap`).
    Borrowed(MemSlice),
    // An owned, independently mutable byte buffer.
    Owned(Vec<u8>),
}

impl Deref for CowBuffer {
    type Target = [u8];

    /// Provide a read-only byte view regardless of which variant holds the data.
    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        match self {
            CowBuffer::Borrowed(v) => v.deref(),
            CowBuffer::Owned(v) => v.deref(),
        }
    }
}

impl CowBuffer {
pub fn to_mut(&mut self) -> &mut Vec<u8> {
match self {
CowBuffer::Borrowed(v) => {
*self = Self::Owned(v.clone().to_vec());
self.to_mut()
},
CowBuffer::Owned(v) => v,
}
}
}
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/parquet/page/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::read::CowBuffer;
use super::CowBuffer;
use crate::parquet::compression::Compression;
use crate::parquet::encoding::{get_length, Encoding};
use crate::parquet::error::{ParquetError, ParquetResult};
Expand Down
3 changes: 1 addition & 2 deletions crates/polars-parquet/src/parquet/read/compression.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use parquet_format_safe::DataPageHeaderV2;

use super::page::PageIterator;
use super::CowBuffer;
use crate::parquet::compression::{self, Compression};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{CompressedPage, DataPage, DataPageHeader, DictPage, Page};
use crate::parquet::FallibleStreamingIterator;
use crate::parquet::{CowBuffer, FallibleStreamingIterator};

fn decompress_v1(
compressed: &[u8],
Expand Down
6 changes: 2 additions & 4 deletions crates/polars-parquet/src/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ pub use indexes::{read_columns_indexes, read_pages_locations};
pub use metadata::{deserialize_metadata, read_metadata, read_metadata_with_size};
#[cfg(feature = "async")]
pub use page::{get_page_stream, get_page_stream_from_column_start};
pub use page::{
CowBuffer, IndexedPageReader, MemReader, MemReaderSlice, PageFilter, PageIterator,
PageMetaData, PageReader,
};
pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader};
use polars_utils::mmap::MemReader;
#[cfg(feature = "async")]
pub use stream::read_metadata as read_metadata_async;

Expand Down
6 changes: 3 additions & 3 deletions crates/polars-parquet/src/parquet/read/page/indexed_reader.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::collections::VecDeque;
use std::io::{Seek, SeekFrom};

use super::memreader::MemReader;
use polars_utils::mmap::{MemReader, MemSlice};

use super::reader::{finish_page, read_page_header, PageMetaData};
use super::MemReaderSlice;
use crate::parquet::error::ParquetError;
use crate::parquet::indexes::{FilteredPage, Interval};
use crate::parquet::metadata::{ColumnChunkMetaData, Descriptor};
Expand Down Expand Up @@ -44,7 +44,7 @@ fn read_page(
reader: &mut MemReader,
start: u64,
length: usize,
) -> Result<(ParquetPageHeader, MemReaderSlice), ParquetError> {
) -> Result<(ParquetPageHeader, MemSlice), ParquetError> {
// seek to the page
reader.seek(SeekFrom::Start(start))?;

Expand Down
Loading

0 comments on commit b331538

Please sign in to comment.