Parquet: clear metadata and project fields of ParquetRecordBatchStream::schema #5135

4 changes: 4 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -575,6 +575,10 @@ impl Iterator for ParquetRecordBatchReader {
}

impl RecordBatchReader for ParquetRecordBatchReader {
/// Returns the projected [`SchemaRef`] for reading the parquet file.
///
/// Note that the schema metadata will be stripped here. See
/// [`ParquetRecordBatchReaderBuilder::schema`] if the metadata is desired.
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
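
To illustrate the contract, a minimal downstream sketch (the file name `data.parquet` and its embedded key/value metadata are assumed for illustration): the builder's schema keeps the file's metadata, while the reader's schema reflects the projection and drops it.

```rust
use std::fs::File;

use arrow_array::RecordBatchReader; // brings the trait method `schema()` into scope
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input file whose Arrow schema carries key/value metadata.
    let file = File::open("data.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // The builder schema describes the whole file, metadata included.
    println!("builder metadata: {:?}", builder.schema().metadata());

    // Project down to the first leaf column.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let reader = builder.with_projection(mask).build()?;

    // The reader schema reflects the projection and carries no metadata.
    assert_eq!(reader.schema().fields().len(), 1);
    assert!(reader.schema().metadata().is_empty());
    Ok(())
}
```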
112 changes: 107 additions & 5 deletions parquet/src/arrow/async_reader/mod.rs
@@ -90,7 +90,7 @@ use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use arrow_schema::{DataType, Schema, SchemaRef};

use crate::arrow::array_reader::{build_array_reader, RowGroups};
use crate::arrow::arrow_reader::{
@@ -385,13 +385,28 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
offset: self.offset,
};

// Ensure schema of ParquetRecordBatchStream respects projection, and does
// not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
[Review thread on the lines above]

Contributor:
This isn't quite correct in the case of a projection of a nested schema; it's possible this isn't actually determined until the ArrayReader is constructed...

Contributor:
Although https://docs.rs/parquet/latest/parquet/arrow/fn.parquet_to_arrow_field_levels.html takes a ProjectionMask and should apply it already?

Contributor Author (@Jefffrey, Nov 29, 2023):
I was a bit worried about this, as I couldn't find a straightforward way that the schema was constructed from ParquetField + ProjectionMask; it does indeed seem to happen during ArrayReader construction.

Edit: wasn't aware of https://docs.rs/parquet/latest/parquet/arrow/fn.parquet_to_arrow_field_levels.html, will check it out 👍

Contributor:
I'll have a quick play, stand by

Some(DataType::Struct(fields)) => fields
.iter()
.enumerate()
.filter_map(|(idx, field)| {
self.projection.leaf_included(idx).then_some(field.clone())
})
.collect::<Vec<_>>(),
None => vec![],
_ => unreachable!("Must be Struct for root type"),
};
let schema = Arc::new(Schema::new(projected_fields));

Ok(ParquetRecordBatchStream {
metadata: self.metadata,
batch_size,
row_groups,
projection: self.projection,
selection: self.selection,
schema: self.schema,
schema,
reader: Some(reader),
state: StreamState::Init,
})
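
An aside on the review thread above: a standalone sketch, from a downstream crate's perspective rather than the code this PR ships, of deriving the projected, metadata-free schema directly from the parquet schema and mask via `parquet_to_arrow_schema_by_columns`. Because that function applies the mask while walking the full schema tree, it would also cover the nested-projection case the reviewers raise; the helper name here is hypothetical.

```rust
use std::sync::Arc;

use arrow_schema::SchemaRef;
use parquet::arrow::{parquet_to_arrow_schema_by_columns, ProjectionMask};
use parquet::errors::Result;
use parquet::schema::types::SchemaDescriptor;

/// Hypothetical helper: compute the projected Arrow schema straight from the
/// parquet schema and projection mask. Passing `None` for the key/value
/// metadata yields a schema without the embedded file metadata, matching the
/// contract of `ParquetRecordBatchStream::schema`.
fn projected_schema(parquet_schema: &SchemaDescriptor, mask: ProjectionMask) -> Result<SchemaRef> {
    let schema = parquet_to_arrow_schema_by_columns(parquet_schema, mask, None)?;
    Ok(Arc::new(schema))
}
```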
@@ -572,7 +587,10 @@ impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
}

impl<T> ParquetRecordBatchStream<T> {
/// Returns the [`SchemaRef`] for this parquet file
/// Returns the projected [`SchemaRef`] for reading the parquet file.
///
/// Note that the schema metadata will be stripped here. See
/// [`ParquetRecordBatchStreamBuilder::schema`] if the metadata is desired.
pub fn schema(&self) -> &SchemaRef {
&self.schema
}
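
The async counterpart can be exercised the same way; a hedged sketch assuming the same illustrative `data.parquet`:

```rust
use futures::StreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open("data.parquet").await?;
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    let mut stream = builder.build()?;

    // Projected, metadata-free schema, available before any batch is read.
    let schema = stream.schema().clone();
    assert!(schema.metadata().is_empty());

    while let Some(batch) = stream.next().await.transpose()? {
        // Every emitted batch shares exactly this schema.
        assert_eq!(batch.schema(), schema);
    }
    Ok(())
}
```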
@@ -855,10 +873,14 @@ mod tests {
use arrow_array::builder::{ListBuilder, StringBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use arrow_array::{
Array, ArrayRef, Float32Array, Int32Array, Int8Array, RecordBatchReader, Scalar,
StringArray, StructArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Fields, Schema};
use futures::{StreamExt, TryStreamExt};
use rand::{thread_rng, Rng};
use std::collections::HashMap;
use std::sync::Mutex;
use tempfile::tempfile;

@@ -1584,6 +1606,86 @@ mod tests {
test_get_row_group_column_bloom_filter(data, false).await;
}

#[tokio::test]
async fn test_parquet_record_batch_stream_schema() {
let mut metadata = HashMap::with_capacity(1);
metadata.insert("key".to_string(), "value".to_string());

let schema = Arc::new(
Schema::new(Fields::from(vec![
Field::new("a", DataType::Int32, true),
Field::new("c", DataType::UInt64, true),
Field::new("d", DataType::Float32, true),
]))
.with_metadata(metadata.clone()),
);
let struct_array = StructArray::from(vec![
(
Arc::new(Field::new("a", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![-1, 1])) as ArrayRef,
),
(
Arc::new(Field::new("c", DataType::UInt64, true)),
Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef,
),
(
Arc::new(Field::new("d", DataType::Float32, true)),
Arc::new(Float32Array::from(vec![1.0, 2.0])) as ArrayRef,
),
]);
let record_batch = RecordBatch::from(struct_array)
.with_schema(schema.clone())
.unwrap();

// Write parquet with custom metadata in schema
let mut file = tempfile().unwrap();
let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
writer.write(&record_batch).unwrap();
writer.close().unwrap();

// Test projecting for [], [0], [0, 1], [0, 1, 2]
for num_projected in 0..=schema.fields().len() {
let mask_indices = 0..num_projected;

let builder =
ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
let sync_builder_schema = builder.schema().clone();
let mask = ProjectionMask::leaves(builder.parquet_schema(), mask_indices.clone());
let mut reader = builder.with_projection(mask).build().unwrap();
let sync_reader_schema = reader.schema();
let batch = reader.next().unwrap().unwrap();
let sync_batch_schema = batch.schema();

// Builder schema should preserve all fields and metadata
assert_eq!(sync_builder_schema.fields.len(), schema.fields().len());
assert_eq!(sync_builder_schema.metadata, metadata);
// Reader & batch schema should show only projected fields, and no metadata
assert_eq!(sync_reader_schema.fields.len(), num_projected);
assert_eq!(sync_reader_schema.metadata, HashMap::default());
assert_eq!(sync_batch_schema.fields.len(), num_projected);
assert_eq!(sync_batch_schema.metadata, HashMap::default());

// Ensure parity with async implementation
let file = tokio::fs::File::from(file.try_clone().unwrap());
let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
let async_builder_schema = builder.schema().clone();
let mask = ProjectionMask::leaves(builder.parquet_schema(), mask_indices);
let mut reader = builder.with_projection(mask).build().unwrap();
let async_reader_schema = reader.schema().clone();
let batch = reader.next().await.unwrap().unwrap();
let async_batch_schema = batch.schema();

// Builder schema should preserve all fields and metadata
assert_eq!(async_builder_schema.fields.len(), schema.fields().len());
assert_eq!(async_builder_schema.metadata, metadata);
// Reader & batch schema should show only projected fields, and no metadata
assert_eq!(async_reader_schema.fields.len(), num_projected);
assert_eq!(async_reader_schema.metadata, HashMap::default());
assert_eq!(async_batch_schema.fields.len(), num_projected);
assert_eq!(async_batch_schema.metadata, HashMap::default());
}
}

#[tokio::test]
async fn test_get_row_group_column_bloom_filter_with_length() {
// convert to new parquet file with bloom_filter_length