Add round trip tests for reading/writing parquet metadata

alamb committed Sep 26, 2024
1 parent a2818bf commit 51516af
Showing 1 changed file with 143 additions and 0 deletions.

parquet/src/arrow/mod.rs
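The tests added below exercise a write → read round trip through ParquetMetaDataReader and ParquetMetaDataWriter. For orientation, here is a minimal sketch of that flow, distilled from the tests themselves (error handling elided; `parquet_bytes` is assumed to be any in-memory buffer holding a complete parquet file):

    use bytes::Bytes;
    use parquet::file::metadata::{ParquetMetaDataReader, ParquetMetaDataWriter};

    fn sketch(parquet_bytes: Bytes) {
        // parse the footer of an existing file into a ParquetMetaData
        let metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // serialize just the metadata (no data pages) into a fresh buffer ...
        let mut buf = vec![];
        ParquetMetaDataWriter::new(&mut buf, &metadata)
            .finish()
            .unwrap();

        // ... and parse it back; the two copies should compare equal
        let roundtrip = ParquetMetaDataReader::new()
            .parse_and_finish(&Bytes::from(buf))
            .unwrap();
        assert_eq!(metadata, roundtrip);
    }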
@@ -236,3 +236,146 @@ pub fn parquet_column<'a>(
        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
    Some((parquet_idx, field))
}

#[cfg(test)]
mod test {
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;
    #[test]
    fn test_metadata_read_write_roundtrip() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // serialize the metadata, then read it back from the bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        assert_ne!(
            metadata_bytes.len(),
            parquet_bytes.len(),
            "the serialized metadata should be a subset of the full parquet file"
        );

        let roundtrip_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        assert_eq!(original_metadata, roundtrip_metadata);
    }

    #[test]
    fn test_metadata_read_write_roundtrip_page_index() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file, including the page index structures
        // (which are stored elsewhere in the footer)
        let original_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // serialize the metadata, then read it back and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let roundtrip_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        // Need to normalize the metadata first to remove offsets into the data
        let original_metadata = normalize_locations(original_metadata);
        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
        assert_eq!(
            format!("{original_metadata:#?}"),
            format!("{roundtrip_metadata:#?}")
        );
        assert_eq!(original_metadata, roundtrip_metadata);
    }

    #[test]
    // Reproducer for https://github.com/apache/arrow-rs/issues/6464 (this should eventually pass)
    #[should_panic(expected = "missing required field ColumnIndex.null_pages")]
    fn test_metadata_read_write_partial_offset() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file WITHOUT the page index structures
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes, requesting to read the offsets
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let roundtrip_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true) // there are no page indexes in the metadata
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        // Need to normalize the metadata first to remove offsets into the data
        let original_metadata = normalize_locations(original_metadata);
        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
        assert_eq!(
            format!("{original_metadata:#?}"),
            format!("{roundtrip_metadata:#?}")
        );
        assert_eq!(original_metadata, roundtrip_metadata);
    }

    // TODO: test reading parquet bytes from serialized metadata

    /// Writes a parquet file into an in-memory buffer
    fn create_parquet_file() -> Bytes {
        let mut buf = vec![];
        let data = vec![100, 200, 201, 300, 102, 33];
        let array: ArrayRef = Arc::new(Int32Array::from(data));
        let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build();

        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer); // release the borrow of `buf` before converting it to Bytes

        Bytes::from(buf)
    }

    /// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetaDataWriter`
    fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
        let mut buf = vec![];
        ParquetMetaDataWriter::new(&mut buf, metadata)
            .finish()
            .unwrap();
        Bytes::from(buf)
    }

    /// Sets the page index offset locations in the metadata to `None`
    ///
    /// This is necessary because the offsets are used to find the relative
    /// location of the index structures, and thus differ depending on how
    /// the structures are stored.
    fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
        let mut metadata_builder = metadata.into_builder();
        for rg in metadata_builder.take_row_groups() {
            let mut rg_builder = rg.into_builder();
            for col in rg_builder.take_columns() {
                rg_builder = rg_builder.add_column_metadata(
                    col.into_builder()
                        .set_offset_index_offset(None)
                        .set_index_page_offset(None)
                        .set_column_index_offset(None)
                        .build()
                        .unwrap(),
                );
            }
            let rg = rg_builder.build().unwrap();
            metadata_builder = metadata_builder.add_row_group(rg);
        }
        metadata_builder.build()
    }
}
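Assuming a checkout of arrow-rs containing this commit, the new tests can be run with a filter along these lines (the exact filter string is illustrative; any substring of the test names works with `cargo test`):

    cargo test -p parquet --lib arrow::test::test_metadata_read_write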
