Example of reading and writing parquet metadata outside the file
alamb committed Oct 7, 2024
1 parent c039762 commit a112ae4
Showing 3 changed files with 292 additions and 2 deletions.
6 changes: 6 additions & 0 deletions parquet/Cargo.toml
@@ -121,6 +121,12 @@ sysinfo = ["dep:sysinfo"]
# Verify 32-bit CRC checksum when decoding parquet pages
crc = ["dep:crc32fast"]


[[example]]
name = "external_metadata"
required-features = ["arrow", "async"]
path = "./examples/external_metadata.rs"

[[example]]
name = "read_parquet"
required-features = ["arrow"]
241 changes: 241 additions & 0 deletions parquet/examples/external_metadata.rs
@@ -0,0 +1,241 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_cast::pretty::pretty_format_batches;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;

/// This example demonstrates advanced usage of the Parquet metadata APIs.
///
/// For example, you can copy the metadata for parquet files stored on remote
/// object storage (e.g. S3) to a local file or an in-memory cache, use a query
/// engine like DataFusion to analyze the metadata to determine which file to
/// read, and then read any matching files with a single object store request.
///
/// # Use case
///
/// 1. Read Parquet metadata from a Parquet file stored on a remote source
///
/// 2. Store that metadata somewhere "locally" that is faster to access than
///    the rest of the Parquet data (e.g. a cache).
///
/// 3. Use the metadata to determine if the file should be read, and if so, read
///    the remote Parquet file, without re-reading / reparsing the metadata footer.
///
/// # Example Overview
/// 1. Reads the metadata of a Parquet file
///
/// 2. Removes some column statistics from the metadata (to make it smaller)
///
/// 3. Stores the metadata in a separate file
///
/// 4. Reads the metadata from the separate file and uses that to read the
///    Parquet file
#[tokio::main(flavor = "current_thread")]
async fn main() -> parquet::errors::Result<()> {
    let tempdir = TempDir::new().unwrap();
    let parquet_path = create_parquet_file(&tempdir);
    let metadata_path = tempdir.path().join("thrift_metadata.dat");

    // In this example, we use a tokio file to mimic an async remote data source
    let mut remote_parquet_file = tokio::fs::File::open(&parquet_path).await?;

    let metadata = get_metadata_from_remote_parquet_file(&mut remote_parquet_file).await;
    println!(
        "Read metadata from 'remote' Parquet file into memory: {} bytes",
        metadata.memory_size()
    );

    // now slim down the metadata and write it to a "local" file
    let metadata = prepare_metadata(metadata);
    write_metadata_to_local_file(metadata, &metadata_path);

    // now read the metadata from the local file and use it to read the "remote" Parquet file
    let metadata = read_metadata_from_local_file(&metadata_path);
    println!("Read metadata from file");

    let batches = read_remote_parquet_file_with_metadata(remote_parquet_file, metadata).await;

    // display the results
    let batches_string = pretty_format_batches(&batches).unwrap().to_string();
    let batches_lines: Vec<_> = batches_string.split('\n').collect();

    assert_eq!(
        batches_lines,
        [
            "+-----+-------------+",
            "| id  | description |",
            "+-----+-------------+",
            "| 100 | oranges     |",
            "| 200 | apples      |",
            "| 201 | grapefruit  |",
            "| 300 | bananas     |",
            "| 102 | grapes      |",
            "| 33  | pears       |",
            "+-----+-------------+",
        ],
        "actual output:\n\n{batches_lines:#?}"
    );

    Ok(())
}

/// Reads the metadata from a "remote" parquet file
///
/// Note that this function models reading from a remote file source using a
/// tokio file. In a real application, you would implement [`MetadataFetch`] for
/// your own remote source.
///
/// [`MetadataFetch`]: parquet::arrow::async_reader::MetadataFetch
async fn get_metadata_from_remote_parquet_file(
    remote_file: &mut tokio::fs::File,
) -> ParquetMetaData {
    // the remote source must know the total file size (e.g. from an object store LIST operation)
    let file_size = remote_file.metadata().await.unwrap().len();

    // tell the reader to read the page index
    ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(remote_file, file_size as usize)
        .await
        .unwrap()
}
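
// A minimal sketch of what implementing `MetadataFetch` for a custom remote
// source could look like, assuming the trait's single `fetch` method returns
// the requested byte range. `MyRemoteSource` and `get_range` are hypothetical
// stand-ins for your own remote client.
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use parquet::arrow::async_reader::MetadataFetch;
use std::ops::Range;

struct MyRemoteSource {
    // e.g. an object store client and the path of the remote file
}

impl MyRemoteSource {
    async fn get_range(&self, _range: Range<usize>) -> parquet::errors::Result<Bytes> {
        unimplemented!("issue a range request against the remote store")
    }
}

impl MetadataFetch for MyRemoteSource {
    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        async move { self.get_range(range).await }.boxed()
    }
}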

/// Modifies the metadata to reduce its size
fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData {
    let orig_size = metadata.memory_size();

    let mut builder = metadata.into_builder();

    // remove column statistics to reduce the size of the metadata by converting
    // the various structures into their respective builders and modifying them
    // as needed.
    for row_group in builder.take_row_groups() {
        let mut row_group_builder = row_group.into_builder();
        for column in row_group_builder.take_columns() {
            let column = column.into_builder().clear_statistics().build().unwrap();
            row_group_builder = row_group_builder.add_column_metadata(column);
        }
        let row_group = row_group_builder.build().unwrap();
        builder = builder.add_row_group(row_group);
    }
    let metadata = builder.build();

    // verify that the size has indeed been reduced
    let new_size = metadata.memory_size();
    assert!(new_size < orig_size, "metadata size did not decrease");
    println!("Reduced metadata size from {} to {}", orig_size, new_size);
    metadata
}
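
// If more trimming is desired, the same builder round trip can also drop the
// page encoding stats using `clear_page_encoding_stats` (added alongside
// `clear_statistics` in this change). A sketch; the function name is
// illustrative, not part of the example above.
fn strip_column_details(metadata: ParquetMetaData) -> ParquetMetaData {
    let mut builder = metadata.into_builder();
    for row_group in builder.take_row_groups() {
        let mut row_group_builder = row_group.into_builder();
        for column in row_group_builder.take_columns() {
            // drop both the statistics and the page encoding stats for each column chunk
            let column = column
                .into_builder()
                .clear_statistics()
                .clear_page_encoding_stats()
                .build()
                .unwrap();
            row_group_builder = row_group_builder.add_column_metadata(column);
        }
        builder = builder.add_row_group(row_group_builder.build().unwrap());
    }
    builder.build()
}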

/// Writes the metadata to a file
///
/// The data is stored using the same thrift format as the Parquet file metadata
fn write_metadata_to_local_file(metadata: ParquetMetaData, file: impl AsRef<Path>) {
    let file = File::create(file).unwrap();
    ParquetMetaDataWriter::new(file, &metadata)
        .finish()
        .unwrap()
}
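
// Assuming `ParquetMetaDataWriter` accepts any `std::io::Write` sink (a `File`
// is used above), the slimmed metadata could just as well be serialized into an
// in-memory buffer, e.g. to populate a cache, rather than a local file. A
// sketch; the function name is illustrative.
fn write_metadata_to_buffer(metadata: ParquetMetaData) -> Vec<u8> {
    let mut buffer = Vec::new();
    ParquetMetaDataWriter::new(&mut buffer, &metadata)
        .finish()
        .unwrap();
    buffer
}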

/// Reads the metadata from a file
///
/// This function reads the format written by `write_metadata_to_local_file`
fn read_metadata_from_local_file(file: impl AsRef<Path>) -> ParquetMetaData {
    let file = File::open(file).unwrap();
    ParquetMetaDataReader::new()
        .with_column_indexes(true)
        .with_offset_indexes(true)
        .parse_and_finish(&file)
        .unwrap()
}
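
// The reverse direction also works without a file: `parse_and_finish` accepts
// any `ChunkReader`, and `bytes::Bytes` implements it, so metadata cached in
// memory can be parsed directly. A sketch; the function name is illustrative.
fn read_metadata_from_cached_bytes(data: bytes::Bytes) -> ParquetMetaData {
    ParquetMetaDataReader::new()
        .with_column_indexes(true)
        .with_offset_indexes(true)
        .parse_and_finish(&data)
        .unwrap()
}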

/// Reads the "remote" Parquet file using the metadata
///
/// This shows how to read the Parquet file using previously read metadata
/// instead of the metadata in the Parquet file itself. This avoids an IO /
/// having to fetch and decode the metadata from the Parquet file before
/// beginning to read it.
///
/// Note that this function models reading from a remote file source using a
/// tokio file. In a real application, you would implement [`AsyncFileReader`]
/// for your own remote source.
///
/// In this example, we simply read the results as Arrow record batches
///
/// [`AsyncFileReader`]: parquet::arrow::async_reader::AsyncFileReader
async fn read_remote_parquet_file_with_metadata(
remote_file: tokio::fs::File,
metadata: ParquetMetaData,
) -> Vec<RecordBatch> {
let options = ArrowReaderOptions::new()
// tell the reader to read the page index
.with_page_index(true);
// create a reader with pre-existing metadata
let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
let reader =
ParquetRecordBatchStreamBuilder::new_with_metadata(remote_file, arrow_reader_metadata)
.build()
.unwrap();

reader.try_collect::<Vec<_>>().await.unwrap()
}
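
// Because the builder already holds the metadata, it can also prune the read
// before any data is fetched, e.g. selecting specific row groups or columns.
// A sketch; the function name and the chosen indices are illustrative.
use parquet::arrow::ProjectionMask;

async fn read_first_row_group_id_column(
    remote_file: tokio::fs::File,
    arrow_reader_metadata: ArrowReaderMetadata,
) -> Vec<RecordBatch> {
    let builder =
        ParquetRecordBatchStreamBuilder::new_with_metadata(remote_file, arrow_reader_metadata);
    // project only the first leaf column ("id" in this example's schema)
    let projection = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let reader = builder
        .with_projection(projection)
        .with_row_groups(vec![0]) // read only the first row group
        .build()
        .unwrap();
    reader.try_collect::<Vec<_>>().await.unwrap()
}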

/// Makes a new parquet file in the temporary directory and returns its path
fn create_parquet_file(tmpdir: &TempDir) -> PathBuf {
    let path = tmpdir.path().join("example.parquet");
    let new_file = File::create(&path).unwrap();

    let batch = RecordBatch::try_from_iter(vec![
        (
            "id",
            Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33])) as ArrayRef,
        ),
        (
            "description",
            Arc::new(StringArray::from(vec![
                "oranges",
                "apples",
                "grapefruit",
                "bananas",
                "grapes",
                "pears",
            ])),
        ),
    ])
    .unwrap();

    let props = WriterProperties::builder()
        // ensure we write the page index level statistics
        .set_statistics_enabled(EnabledStatistics::Page)
        .build();

    let mut writer = ArrowWriter::try_new(new_file, batch.schema(), Some(props)).unwrap();

    writer.write(&batch).unwrap();
    writer.finish().unwrap();

    path
}
47 changes: 45 additions & 2 deletions parquet/src/file/metadata/mod.rs
@@ -1209,11 +1209,30 @@ impl ColumnChunkMetaData {
    }
}

-/// Builder for column chunk metadata.
+/// Builder for [`ColumnChunkMetaData`]
///
/// This builder is used to create a new column chunk metadata or modify an
/// existing one.
///
/// # Example
/// ```no_run
/// # use parquet::file::metadata::{ColumnChunkMetaData, ColumnChunkMetaDataBuilder};
/// # fn get_column_chunk_metadata() -> ColumnChunkMetaData { unimplemented!(); }
/// let column_chunk_metadata = get_column_chunk_metadata();
/// // create a new builder from existing column chunk metadata
/// let builder = ColumnChunkMetaDataBuilder::from(column_chunk_metadata);
/// // clear the statistics:
/// let column_chunk_metadata: ColumnChunkMetaData = builder
///     .clear_statistics()
///     .build()
///     .unwrap();
/// ```
pub struct ColumnChunkMetaDataBuilder(ColumnChunkMetaData);

impl ColumnChunkMetaDataBuilder {
    /// Creates new column chunk metadata builder.
    ///
    /// See also [`ColumnChunkMetaData::builder`]
    fn new(column_descr: ColumnDescPtr) -> Self {
        Self(ColumnChunkMetaData {
            column_descr,
@@ -1297,12 +1316,18 @@ impl ColumnChunkMetaDataBuilder {
        self
    }

-    /// Sets optional dictionary page ofset in bytes.
+    /// Sets optional dictionary page offset in bytes.
    pub fn set_dictionary_page_offset(mut self, value: Option<i64>) -> Self {
        self.0.dictionary_page_offset = value;
        self
    }

    /// Clears the dictionary page offset in bytes
    pub fn clear_dictionary_page_offset(mut self) -> Self {
        self.0.dictionary_page_offset = None;
        self
    }

    /// Sets optional index page offset in bytes.
    pub fn set_index_page_offset(mut self, value: Option<i64>) -> Self {
        self.0.index_page_offset = value;
@@ -1315,12 +1340,24 @@ impl ColumnChunkMetaDataBuilder {
        self
    }

    /// Clears the statistics for this column chunk.
    pub fn clear_statistics(mut self) -> Self {
        self.0.statistics = None;
        self
    }

    /// Sets page encoding stats for this column chunk.
    pub fn set_page_encoding_stats(mut self, value: Vec<PageEncodingStats>) -> Self {
        self.0.encoding_stats = Some(value);
        self
    }

    /// Clears the page encoding stats for this column chunk.
    pub fn clear_page_encoding_stats(mut self) -> Self {
        self.0.encoding_stats = None;
        self
    }

    /// Sets optional bloom filter offset in bytes.
    pub fn set_bloom_filter_offset(mut self, value: Option<i64>) -> Self {
        self.0.bloom_filter_offset = value;
@@ -1492,6 +1529,12 @@ impl ColumnIndexBuilder {
    }
}

impl From<ColumnChunkMetaData> for ColumnChunkMetaDataBuilder {
    fn from(value: ColumnChunkMetaData) -> Self {
        ColumnChunkMetaDataBuilder(value)
    }
}

/// Builder for offset index, part of the Parquet [PageIndex].
///
/// [PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
