Add round trip tests for reading/writing parquet metadata #6463

Merged
76 changes: 76 additions & 0 deletions parquet/src/arrow/mod.rs
@@ -269,6 +269,82 @@ mod test {
);
}

#[test]
fn test_metadata_read_write_roundtrip() {
let parquet_bytes = create_parquet_file();

// read the metadata from the file
let original_metadata = ParquetMetaDataReader::new()
.parse_and_finish(&parquet_bytes)
.unwrap();

// read metadata back from the serialized bytes and ensure it is the same
let metadata_bytes = metadata_to_bytes(&original_metadata);
assert_ne!(
metadata_bytes.len(),
parquet_bytes.len(),
"metadata is subset of parquet"
);

let roundtrip_metadata = ParquetMetaDataReader::new()
.parse_and_finish(&metadata_bytes)
.unwrap();

assert_eq!(original_metadata, roundtrip_metadata);
}

#[test]
fn test_metadata_read_write_roundtrip_page_index() {
let parquet_bytes = create_parquet_file();

// read the metadata from the file including the page index structures
// (which are stored elsewhere in the footer)
let original_metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.parse_and_finish(&parquet_bytes)
.unwrap();

// read metadata back from the serialized bytes and ensure it is the same
let metadata_bytes = metadata_to_bytes(&original_metadata);
let roundtrip_metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.parse_and_finish(&metadata_bytes)
.unwrap();

// Need to normalize the metadata first to remove offsets in data
let original_metadata = normalize_locations(original_metadata);
let roundtrip_metadata = normalize_locations(roundtrip_metadata);
assert_eq!(
format!("{original_metadata:#?}"),
format!("{roundtrip_metadata:#?}")
);
assert_eq!(original_metadata, roundtrip_metadata);
}

/// Sets the page index offset locations in the metadata to `None`
///
/// This is because the offsets are used to find the relative location of the index
/// structures, and thus differ depending on how the structures are stored.
fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
let mut metadata_builder = metadata.into_builder();
for rg in metadata_builder.take_row_groups() {
let mut rg_builder = rg.into_builder();
for col in rg_builder.take_columns() {
rg_builder = rg_builder.add_column_metadata(
col.into_builder()
.set_offset_index_offset(None)
.set_index_page_offset(None)
.set_column_index_offset(None)
.build()
.unwrap(),
);
}
let rg = rg_builder.build().unwrap();
metadata_builder = metadata_builder.add_row_group(rg);
}
metadata_builder.build()
}

    /// Write a parquet file into an in memory buffer
fn create_parquet_file() -> Bytes {
let mut buf = vec![];
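Note: the `metadata_to_bytes` helper called by the two new tests is defined elsewhere in `parquet/src/arrow/mod.rs` and is not visible in this hunk. A minimal sketch of what such a helper could look like, built on the crate's `ParquetMetaDataWriter` (the name, signature, and body below are assumptions for illustration, not code from this PR):

    use bytes::{BufMut, Bytes, BytesMut};
    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};

    /// Serialize a `ParquetMetaData` footer into an in-memory buffer so that
    /// `ParquetMetaDataReader::parse_and_finish` can read it back.
    fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
        // `BytesMut::new().writer()` adapts the growable buffer to `std::io::Write`
        let mut buf = BytesMut::new().writer();
        // write the Thrift-encoded footer (and any page index structures)
        ParquetMetaDataWriter::new(&mut buf, metadata)
            .finish()
            .unwrap();
        // freeze into immutable `Bytes` for re-parsing
        buf.into_inner().freeze()
    }

The roundtrip tests above then hand these bytes back to `ParquetMetaDataReader::parse_and_finish` to verify that nothing is lost in the write/read cycle.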
295 changes: 0 additions & 295 deletions parquet/src/file/metadata/writer.rs
@@ -378,298 +378,3 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
}
}
}

#[cfg(test)]
Removed per @adriangb's comment: #6463 (comment)

#[cfg(feature = "arrow")]
#[cfg(feature = "async")]
mod tests {
use std::sync::Arc;

use crate::file::metadata::{
ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter,
RowGroupMetaData,
};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::{
arrow::ArrowWriter,
file::{page_index::index::Index, serialized_reader::ReadOptionsBuilder},
};
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType as ArrowDataType, Field, Schema};
use bytes::{BufMut, Bytes, BytesMut};

struct TestMetadata {
#[allow(dead_code)]
file_size: usize,
metadata: ParquetMetaData,
}

fn has_page_index(metadata: &ParquetMetaData) -> bool {
match metadata.column_index() {
Some(column_index) => column_index
.iter()
.any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, Index::NONE))),
None => false,
}
}

#[test]
fn test_roundtrip_parquet_metadata_without_page_index() {
// We currently don't have an ad-hoc ParquetMetadata loader that can load page indexes so
// we at least test round trip without them
let metadata = get_test_metadata(false, false);
assert!(!has_page_index(&metadata.metadata));

let mut buf = BytesMut::new().writer();
{
let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
writer.finish().unwrap();
}

let data = buf.into_inner().freeze();

let decoded_metadata = ParquetMetaDataReader::new()
.parse_and_finish(&data)
.unwrap();
assert!(!has_page_index(&metadata.metadata));

assert_eq!(metadata.metadata, decoded_metadata);
}

fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata {
let mut buf = BytesMut::new().writer();
let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
"a",
ArrowDataType::Int32,
true,
)]));

// build row groups / pages that exercise different combinations of nulls and values
// note that below we set the row group and page sizes to 4 and 2 respectively
// so that these "groupings" make sense
let a: ArrayRef = Arc::new(Int32Array::from(vec![
// a row group that has all values
Some(i32::MIN),
Some(-1),
Some(1),
Some(i32::MAX),
// a row group with a page of all nulls and a page of all values
None,
None,
Some(2),
Some(3),
// a row group that has all null pages
None,
None,
None,
None,
// a row group having 1 page with all values and 1 page with some nulls
Some(4),
Some(5),
None,
Some(6),
// a row group having 1 page with all nulls and 1 page with some nulls
None,
None,
Some(7),
None,
// a row group having all pages with some nulls
None,
Some(8),
Some(9),
None,
]));

let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();

let writer_props_builder = match write_page_index {
true => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Page),
false => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Chunk),
};

// tune the size or pages to the data above
// to make sure we exercise code paths where all items in a page are null, etc.
let writer_props = writer_props_builder
.set_max_row_group_size(4)
.set_data_page_row_count_limit(2)
.set_write_batch_size(2)
.build();

let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

let data = buf.into_inner().freeze();

let reader_opts = match read_page_index {
true => ReadOptionsBuilder::new().with_page_index().build(),
false => ReadOptionsBuilder::new().build(),
};
let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap();
let metadata = reader.metadata().clone();
TestMetadata {
file_size: data.len(),
metadata,
}
}

/// Temporary function so we can test loading metadata with page indexes
/// while we haven't fully figured out how to load it cleanly
async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
use crate::arrow::async_reader::MetadataFetch;
use crate::errors::Result as ParquetResult;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::ops::Range;

/// Adapt a `Bytes` to a `MetadataFetch` implementation.
struct AsyncBytes {
data: Bytes,
}

impl AsyncBytes {
fn new(data: Bytes) -> Self {
Self { data }
}
}

impl MetadataFetch for AsyncBytes {
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
}
}

/// A `MetadataFetch` implementation that reads from a subset of the full data
/// while accepting ranges that address the full data.
struct MaskedBytes {
inner: Box<dyn MetadataFetch + Send>,
inner_range: Range<usize>,
}

impl MaskedBytes {
fn new(inner: Box<dyn MetadataFetch + Send>, inner_range: Range<usize>) -> Self {
Self { inner, inner_range }
}
}

impl MetadataFetch for &mut MaskedBytes {
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
let inner_range = self.inner_range.clone();
println!("inner_range: {:?}", inner_range);
println!("range: {:?}", range);
assert!(inner_range.start <= range.start && inner_range.end >= range.end);
let range =
range.start - self.inner_range.start..range.end - self.inner_range.start;
self.inner.fetch(range)
}
}

let metadata_length = data.len();
let mut reader = MaskedBytes::new(
Box::new(AsyncBytes::new(data)),
file_size - metadata_length..file_size,
);
ParquetMetaDataReader::new()
.with_page_indexes(true)
.load_and_finish(&mut reader, file_size)
.await
.unwrap()
}

fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
assert_eq!(left.column_descr(), right.column_descr());
assert_eq!(left.encodings(), right.encodings());
assert_eq!(left.num_values(), right.num_values());
assert_eq!(left.compressed_size(), right.compressed_size());
assert_eq!(left.data_page_offset(), right.data_page_offset());
assert_eq!(left.statistics(), right.statistics());
assert_eq!(left.offset_index_length(), right.offset_index_length());
assert_eq!(left.column_index_length(), right.column_index_length());
assert_eq!(
left.unencoded_byte_array_data_bytes(),
right.unencoded_byte_array_data_bytes()
);
}

fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) {
assert_eq!(left.num_rows(), right.num_rows());
assert_eq!(left.file_offset(), right.file_offset());
assert_eq!(left.total_byte_size(), right.total_byte_size());
assert_eq!(left.schema_descr(), right.schema_descr());
assert_eq!(left.num_columns(), right.num_columns());
left.columns()
.iter()
.zip(right.columns().iter())
.for_each(|(lc, rc)| {
check_columns_are_equivalent(lc, rc);
});
}

#[tokio::test]
async fn test_encode_parquet_metadata_with_page_index() {
// Create a ParquetMetadata with page index information
let metadata = get_test_metadata(true, true);
assert!(has_page_index(&metadata.metadata));

let mut buf = BytesMut::new().writer();
{
let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
writer.finish().unwrap();
}

let data = buf.into_inner().freeze();

let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;

// Because the page index offsets will differ, compare invariant parts of the metadata
assert_eq!(
metadata.metadata.file_metadata(),
decoded_metadata.file_metadata()
);
assert_eq!(
metadata.metadata.column_index(),
decoded_metadata.column_index()
);
assert_eq!(
metadata.metadata.offset_index(),
decoded_metadata.offset_index()
);
assert_eq!(
metadata.metadata.num_row_groups(),
decoded_metadata.num_row_groups()
);

// check that the mins and maxes are what we expect for each page
// also indirectly checking that the pages were written out as we expected them to be laid out
// (if they're not, or something gets refactored in the future that breaks that assumption,
// this test may have to drop down to a lower level and create metadata directly instead of relying on
// writing an entire file)
let column_indexes = metadata.metadata.column_index().unwrap();
assert_eq!(column_indexes.len(), 6);
// make sure each row group has 2 pages by checking the first column
// page counts for each column for each row group, should all be the same and there should be
// 12 pages in total across 6 row groups / 1 column
let mut page_counts = vec![];
for row_group in column_indexes {
for column in row_group {
match column {
Index::INT32(column_index) => {
page_counts.push(column_index.indexes.len());
}
_ => panic!("unexpected column index type"),
}
}
}
assert_eq!(page_counts, vec![2; 6]);

metadata
.metadata
.row_groups()
.iter()
.zip(decoded_metadata.row_groups().iter())
.for_each(|(left, right)| {
check_row_groups_are_equivalent(left, right);
});
}
}