Get started on mmap [skip ci]
coastalwhite committed Jul 15, 2024
1 parent d4eb865 commit 082de50
Showing 5 changed files with 97 additions and 44 deletions.
38 changes: 19 additions & 19 deletions crates/polars-parquet/src/parquet/page/mod.rs
@@ -9,17 +9,17 @@ pub use crate::parquet::thrift_format::{
     DataPageHeader as DataPageHeaderV1, DataPageHeaderV2, PageHeader as ParquetPageHeader,
 };
 
-pub enum PageResult {
+pub enum PageResult<'a> {
     Single(Page),
-    Two { dict: DictPage, data: DataPage },
+    Two { dict: DictPage<'a>, data: DataPage<'a> },
 }
 
 /// A [`CompressedDataPage`] is a compressed, encoded representation of a Parquet data page.
 /// It holds actual data and thus cloning it is expensive.
 #[derive(Debug)]
-pub struct CompressedDataPage {
+pub struct CompressedDataPage<'a> {
     pub(crate) header: DataPageHeader,
-    pub(crate) buffer: Vec<u8>,
+    pub(crate) buffer: Cow<'a, [u8]>,
     pub(crate) compression: Compression,
     uncompressed_page_size: usize,
     pub(crate) descriptor: Descriptor,
@@ -28,11 +28,11 @@ pub struct CompressedDataPage {
     pub(crate) selected_rows: Option<Vec<Interval>>,
 }
 
-impl CompressedDataPage {
+impl<'a> CompressedDataPage<'a> {
     /// Returns a new [`CompressedDataPage`].
     pub fn new(
         header: DataPageHeader,
-        buffer: Vec<u8>,
+        buffer: Cow<'a, [u8]>,
         compression: Compression,
         uncompressed_page_size: usize,
         descriptor: Descriptor,
@@ -134,17 +134,17 @@ impl DataPageHeader {
 /// A [`DataPage`] is an uncompressed, encoded representation of a Parquet data page. It holds actual data
 /// and thus cloning it is expensive.
 #[derive(Debug, Clone)]
-pub struct DataPage {
+pub struct DataPage<'a> {
     pub(super) header: DataPageHeader,
-    pub(super) buffer: Vec<u8>,
+    pub(super) buffer: &'a [u8],
     pub descriptor: Descriptor,
     pub selected_rows: Option<Vec<Interval>>,
 }
 
-impl DataPage {
+impl<'a> DataPage<'a> {
     pub fn new(
         header: DataPageHeader,
-        buffer: Vec<u8>,
+        buffer: &'a [u8],
         descriptor: Descriptor,
         rows: Option<usize>,
     ) -> Self {
@@ -260,9 +260,9 @@ impl Page {
 /// and thus cloning it is expensive.
 #[derive(Debug)]
 #[allow(clippy::large_enum_variant)]
-pub enum CompressedPage {
-    Data(CompressedDataPage),
-    Dict(CompressedDictPage),
+pub enum CompressedPage<'a> {
+    Data(CompressedDataPage<'a>),
+    Dict(CompressedDictPage<'a>),
 }
 
 impl CompressedPage {
@@ -304,14 +304,14 @@ impl CompressedPage {
 
 /// An uncompressed, encoded dictionary page.
 #[derive(Debug)]
-pub struct DictPage {
-    pub buffer: Vec<u8>,
+pub struct DictPage<'a> {
+    pub buffer: &'a [u8],
     pub num_values: usize,
     pub is_sorted: bool,
 }
 
-impl DictPage {
-    pub fn new(buffer: Vec<u8>, num_values: usize, is_sorted: bool) -> Self {
+impl<'a> DictPage<'a> {
+    pub fn new(buffer: &'a [u8], num_values: usize, is_sorted: bool) -> Self {
         Self {
             buffer,
             num_values,
@@ -322,8 +322,8 @@ impl DictPage {
 
 /// A compressed, encoded dictionary page.
 #[derive(Debug)]
-pub struct CompressedDictPage {
-    pub(crate) buffer: Vec<u8>,
+pub struct CompressedDictPage<'a> {
+    pub(crate) buffer: Cow<'a, [u8]>,
     compression: Compression,
     pub(crate) num_values: usize,
     pub(crate) uncompressed_page_size: usize,
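The pattern throughout this file: page buffers change from owned `Vec<u8>` to borrowed `&'a [u8]` or copy-on-write `Cow<'a, [u8]>`, so a page read from a memory-mapped file can point straight into the mapping instead of being copied onto the heap. A minimal sketch of that distinction (illustrative only; `page_bytes` is not part of this commit):

    use std::borrow::Cow;

    /// Borrow when the bytes are already resident (e.g. an mmap'd file);
    /// own them only when they had to be materialized (e.g. after decompression).
    fn page_bytes<'a>(mapped: Option<&'a [u8]>, decompressed: Vec<u8>) -> Cow<'a, [u8]> {
        match mapped {
            Some(slice) => Cow::Borrowed(slice),
            None => Cow::Owned(decompressed),
        }
    }

A `Cow::Borrowed` page clones only a fat pointer, so the "cloning is expensive" caveat in the doc comments now applies mainly to the `Owned` case.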
9 changes: 5 additions & 4 deletions crates/polars-parquet/src/parquet/read/column/mod.rs
@@ -1,6 +1,7 @@
 use std::io::{Read, Seek};
 use std::vec::IntoIter;
 
+use super::page::ReadWithSlice;
 use super::{get_field_columns, get_page_iterator, PageFilter, PageReader};
 use crate::parquet::error::ParquetError;
 use crate::parquet::metadata::{ColumnChunkMetaData, RowGroupMetaData};
@@ -18,7 +19,7 @@ mod stream;
 /// For primitive fields (e.g. `i64`), [`ColumnIterator`] yields exactly one column.
 /// For complex fields, it yields multiple columns.
 /// `max_page_size` is the maximum number of bytes allowed.
-pub fn get_column_iterator<R: Read + Seek>(
+pub fn get_column_iterator<R: ReadWithSlice + Seek>(
     reader: R,
     row_group: &RowGroupMetaData,
     field_name: &str,
@@ -53,7 +54,7 @@ pub trait MutStreamingIterator: Sized {
 
 /// A [`MutStreamingIterator`] that reads column chunks one by one,
 /// returning a [`PageReader`] per column.
-pub struct ColumnIterator<R: Read + Seek> {
+pub struct ColumnIterator<R: ReadWithSlice + Seek> {
     reader: Option<R>,
     columns: Vec<ColumnChunkMetaData>,
     page_filter: Option<PageFilter>,
@@ -62,7 +63,7 @@ pub struct ColumnIterator<R: Read + Seek> {
     max_page_size: usize,
 }
 
-impl<R: Read + Seek> ColumnIterator<R> {
+impl<R: ReadWithSlice + Seek> ColumnIterator<R> {
     /// Returns a new [`ColumnIterator`]
     /// `max_page_size` is the maximum allowed page size
     pub fn new(
@@ -84,7 +85,7 @@ impl<R: Read + Seek> ColumnIterator<R> {
     }
 }
 
-impl<R: Read + Seek> MutStreamingIterator for ColumnIterator<R> {
+impl<R: ReadWithSlice + Seek> MutStreamingIterator for ColumnIterator<R> {
     type Item = (PageReader<R>, ColumnChunkMetaData);
     type Error = ParquetError;
 
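Only the trait bounds change in this module: each `R: Read + Seek` becomes `R: ReadWithSlice + Seek`, where `ReadWithSlice` is the new trait added in reader.rs below. A sketch of a compile-time check that an in-memory reader still satisfies the tightened bound (assuming the `Cursor<T: AsRef<[u8]>>` impl from this commit; `requires_bound` and `demo` are illustrative names):

    use std::io::{Cursor, Seek};

    fn requires_bound<R: ReadWithSlice + Seek>(_reader: R) {}

    fn demo(mapped_file: &[u8]) {
        // A cursor over an mmap'd (or otherwise in-memory) file is both
        // `Seek` and `ReadWithSlice`, so it can drive `get_column_iterator`.
        requires_bound(Cursor::new(mapped_file));
    }

Note that a bare `&[u8]` implements `ReadWithSlice` but not `Seek`, so whole-file iteration still wants a `Cursor`.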
6 changes: 3 additions & 3 deletions crates/polars-parquet/src/parquet/read/mod.rs
@@ -7,7 +7,7 @@ mod page;
 #[cfg(feature = "async")]
 mod stream;
 
-use std::io::{Read, Seek, SeekFrom};
+use std::io::{Seek, SeekFrom};
 use std::sync::Arc;
 
 pub use column::*;
@@ -16,7 +16,7 @@ pub use indexes::{read_columns_indexes, read_pages_locations};
 pub use metadata::{deserialize_metadata, read_metadata, read_metadata_with_size};
 #[cfg(feature = "async")]
 pub use page::{get_page_stream, get_page_stream_from_column_start};
-pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader};
+pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader, ReadWithSlice};
 #[cfg(feature = "async")]
 pub use stream::read_metadata as read_metadata_async;
 
@@ -41,7 +41,7 @@ pub fn filter_row_groups(
 }
 
 /// Returns a new [`PageReader`] by seeking `reader` to the beginning of `column_chunk`.
-pub fn get_page_iterator<R: Read + Seek>(
+pub fn get_page_iterator<R: ReadWithSlice + Seek>(
     column_chunk: &ColumnChunkMetaData,
     mut reader: R,
     pages_filter: Option<PageFilter>,
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/parquet/read/page/mod.rs
@@ -4,7 +4,7 @@ mod reader;
 mod stream;
 
 pub use indexed_reader::IndexedPageReader;
-pub use reader::{PageFilter, PageMetaData, PageReader};
+pub use reader::{PageFilter, PageMetaData, PageReader, ReadWithSlice};
 
 use crate::parquet::error::ParquetError;
 use crate::parquet::page::CompressedPage;
86 changes: 69 additions & 17 deletions crates/polars-parquet/src/parquet/read/page/reader.rs
@@ -14,6 +14,61 @@ use crate::parquet::page::{
 };
 use crate::parquet::parquet_bridge::Encoding;
 
+pub enum WithSlice<'a> {
+    Slice(&'a [u8]),
+    Buffered(usize),
+}
+
+impl<'a> WithSlice<'a> {
+    pub fn num_bytes_read(&self) -> usize {
+        match self {
+            WithSlice::Slice(s) => s.len(),
+            WithSlice::Buffered(n) => *n,
+        }
+    }
+
+    pub fn as_slice<'b>(&'b self, buffer: &'b [u8]) -> &'b [u8] {
+        match self {
+            WithSlice::Slice(s) => s,
+            WithSlice::Buffered(n) => &buffer[0..*n],
+        }
+    }
+}
+
+pub trait ReadWithSlice: Read {
+    fn read_with_slice<'a>(
+        &'a mut self,
+        buffer: &mut Vec<u8>,
+        read_size: usize,
+    ) -> std::io::Result<WithSlice<'a>>;
+}
+
+impl<'a> ReadWithSlice for &'a [u8] {
+    fn read_with_slice<'b>(
+        &'b mut self,
+        _buffer: &mut Vec<u8>,
+        read_size: usize,
+    ) -> std::io::Result<WithSlice<'b>> {
+        let (read, remaining) = self.split_at(read_size);
+        *self = remaining;
+        Ok(WithSlice::Slice(read))
+    }
+}
+
+impl<T: AsRef<[u8]>> ReadWithSlice for std::io::Cursor<T> {
+    fn read_with_slice<'b>(
+        &'b mut self,
+        _buffer: &mut Vec<u8>,
+        read_size: usize,
+    ) -> std::io::Result<WithSlice<'b>> {
+        use std::io::{Seek, SeekFrom};
+        let offset = self.position() as usize;
+        self.seek(SeekFrom::Start((offset + read_size) as u64))?;
+        let slice = &self.get_ref().as_ref()[offset..offset + read_size];
+        Ok(WithSlice::Slice(slice))
+    }
+}
+
 /// This meta is a small part of [`ColumnChunkMetaData`].
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct PageMetaData {
@@ -63,7 +118,7 @@ pub type PageFilter = Arc<dyn Fn(&Descriptor, &DataPageHeader) -> bool + Send +
 /// The pages from this iterator always have [`None`] [`crate::parquet::page::CompressedDataPage::selected_rows()`] since
 /// filter pushdown is not supported without a
 /// pre-computed [page index](https://github.com/apache/parquet-format/blob/master/PageIndex.md).
-pub struct PageReader<R: Read> {
+pub struct PageReader<R: ReadWithSlice> {
     // The source
     reader: R,
 
@@ -86,7 +141,7 @@ pub struct PageReader<R: Read> {
     max_page_size: usize,
 }
 
-impl<R: Read> PageReader<R> {
+impl<R: ReadWithSlice> PageReader<R> {
     /// Returns a new [`PageReader`].
     ///
     /// It assumes that the reader has been `sought` (`seek`) to the beginning of `column`.
@@ -129,13 +184,13 @@ impl<R: Read> PageReader<R> {
     }
 }
 
-impl<R: Read> PageIterator for PageReader<R> {
+impl<R: ReadWithSlice> PageIterator for PageReader<R> {
     fn swap_buffer(&mut self, scratch: &mut Vec<u8>) {
         std::mem::swap(&mut self.scratch, scratch)
     }
 }
 
-impl<R: Read> Iterator for PageReader<R> {
+impl<R: ReadWithSlice> Iterator for PageReader<R> {
     type Item = ParquetResult<CompressedPage>;
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -170,7 +225,7 @@ pub(super) fn read_page_header<R: Read>(
 
 /// This function is lightweight and executes a minimal amount of work so that it is IO bounded.
 // Any un-necessary CPU-intensive tasks SHOULD be executed on individual pages.
-fn next_page<R: Read>(
+fn next_page<R: ReadWithSlice>(
     reader: &mut PageReader<R>,
     buffer: &mut Vec<u8>,
 ) -> ParquetResult<Option<CompressedPage>> {
@@ -180,7 +235,7 @@ fn next_page<R: Read>(
     build_page(reader, buffer)
 }
 
-pub(super) fn build_page<R: Read>(
+pub(super) fn build_page<R: ReadWithSlice>(
     reader: &mut PageReader<R>,
     buffer: &mut Vec<u8>,
 ) -> ParquetResult<Option<CompressedPage>> {
@@ -197,19 +252,16 @@ pub(super) fn build_page<R: Read>(
     }
 
     buffer.clear();
-    buffer.try_reserve(read_size)?;
-    let bytes_read = reader
-        .reader
-        .by_ref()
-        .take(read_size as u64)
-        .read_to_end(buffer)?;
-
-    if bytes_read != read_size {
+    let with_slice = reader.reader.read_with_slice(buffer, read_size)?;
+
+    if with_slice.num_bytes_read() != read_size {
         return Err(ParquetError::oos(
             "The page header reported the wrong page size",
         ));
     }
 
+    let buffer = with_slice.as_slice(buffer);
+
     finish_page(
         page_header,
         buffer,
@@ -220,13 +272,13 @@ pub(super) fn build_page<R: Read>(
     .map(Some)
 }
 
-pub(super) fn finish_page(
+pub(super) fn finish_page<'a>(
     page_header: ParquetPageHeader,
-    data: &mut Vec<u8>,
+    data: &'a [u8],
     compression: Compression,
     descriptor: &Descriptor,
     selected_rows: Option<Vec<Interval>>,
-) -> ParquetResult<CompressedPage> {
+) -> ParquetResult<CompressedPage<'a>> {
     let type_ = page_header.type_.try_into()?;
     let uncompressed_page_size = page_header.uncompressed_page_size.try_into()?;
     match type_ {
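The two impls above cover sources whose bytes are already in memory; both return `WithSlice::Slice` and never touch the scratch buffer. The `Buffered` variant exists for the opposite case. A sketch of how a plain `Read` source (not provided by this commit) could satisfy the trait by falling back to copying, assuming the `ReadWithSlice` trait and `WithSlice` enum from reader.rs above:

    use std::fs::File;
    use std::io::Read;

    // Hypothetical fallback impl: a file handle cannot hand out a stable slice,
    // so it copies into the caller's scratch buffer and reports the byte count.
    // `build_page` clears `buffer` before calling `read_with_slice`.
    impl ReadWithSlice for File {
        fn read_with_slice<'a>(
            &'a mut self,
            buffer: &mut Vec<u8>,
            read_size: usize,
        ) -> std::io::Result<WithSlice<'a>> {
            let n = self.by_ref().take(read_size as u64).read_to_end(buffer)?;
            Ok(WithSlice::Buffered(n))
        }
    }

Either way, `build_page` ends up with one uniform `&[u8]`: `WithSlice::as_slice` returns the borrowed slice directly for `Slice`, and `&buffer[0..n]` for `Buffered`.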
