
Parquet: clear metadata and project fields of ParquetRecordBatchStream::schema #5135

Conversation

Jefffrey
Contributor

Which issue does this PR close?

Closes #4023

Rationale for this change

What changes are included in this PR?

Ensure the metadata hashmap of the Schema returned by ParquetRecordBatchStream::schema is empty, so that it exactly matches the schema of the RecordBatches it emits.

Are there any user-facing changes?

@github-actions github-actions bot added the parquet Changes to the parquet crate label Nov 28, 2023
@tustvold
Contributor

Whilst I agree with the spirit of this, stripping this metadata would be a major breaking change that would definitely break IOx and possibly some other workloads. Perhaps we could just document this potential disparity?

@Jefffrey
Contributor Author

> Whilst I agree with the spirit of this, stripping this metadata would be a major breaking change that would definitely break IOx and possibly some other workloads. Perhaps we could just document this potential disparity?

No problem, updated the docstring

Comment on lines 577 to 579
/// Note that unlike its synchronous counterpart [`ParquetRecordBatchReader`], the [`SchemaRef`]
/// returned here will contain the original metadata, whereas [`ParquetRecordBatchReader`]
/// strips this metadata.
Contributor


Are you sure about this? They use the same logic; the difference is that the returned RecordBatches lack the metadata.

Contributor Author


Yes, see how schema is created for ParquetRecordBatchReader here:

schema: Arc::new(Schema::new(levels.fields.clone())),

And here:

let schema = match array_reader.get_data_type() {
ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
_ => unreachable!("Struct array reader's data type is not struct!"),
};

They only account for the fields, ignoring the metadata.
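The consequence can be sketched with a toy model (plain std types standing in for arrow's Schema; `ToySchema` and `from_fields` are illustrative stand-ins, not the real arrow-rs API): rebuilding a schema from its fields alone yields an empty metadata map, so it no longer compares equal to the file schema.

```rust
use std::collections::HashMap;

// Toy stand-in for arrow's Schema, for illustration only.
#[derive(Debug, Clone, PartialEq)]
struct ToySchema {
    fields: Vec<String>,
    metadata: HashMap<String, String>,
}

impl ToySchema {
    // Mirrors the `Schema::new(fields)` pattern quoted above:
    // fields only, metadata defaults to empty.
    fn from_fields(fields: Vec<String>) -> Self {
        ToySchema { fields, metadata: HashMap::new() }
    }
}

fn main() {
    let mut metadata = HashMap::new();
    metadata.insert("key".to_string(), "value".to_string());

    let file_schema = ToySchema {
        fields: vec!["a".to_string(), "b".to_string()],
        metadata,
    };

    // What the reader effectively does: reconstruct from fields alone.
    let reader_schema = ToySchema::from_fields(file_schema.fields.clone());

    assert!(reader_schema.metadata.is_empty());
    assert_ne!(file_schema, reader_schema); // metadata differs
}
```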

@tustvold
Contributor

tustvold commented Nov 28, 2023

Ok, sorry for going back and forth on this; I hadn't quite grasped what the issue here was.

#[tokio::test]
async fn test_projection_schema() {
    let mut metadata = HashMap::with_capacity(1);
    metadata.insert("key".to_string(), "value".to_string());

    let schema = Arc::new(
        Schema::new(Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("c", DataType::UInt64, true),
            Field::new("d", DataType::Float32, true),
        ]))
        .with_metadata(metadata.clone()),
    );

    let mut file = tempfile().unwrap();
    let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
    writer
        .write(&RecordBatch::new_empty(schema.clone()))
        .unwrap();
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
    let sync_file_schema = builder.schema().clone();
    let mask = ProjectionMask::leaves(&builder.parquet_schema(), [1, 2]);
    let reader = builder.with_projection(mask).build().unwrap();

    let sync_reader_schema = reader.schema();

    assert_eq!(sync_file_schema.fields.len(), 3);
    assert_eq!(sync_file_schema.metadata, metadata);
    assert_eq!(sync_reader_schema.fields.len(), 2);
    assert_eq!(sync_reader_schema.metadata, HashMap::default());

    let file = tokio::fs::File::from(file);
    let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    let async_file_schema = builder.schema().clone();
    let mask = ProjectionMask::leaves(&builder.parquet_schema(), [1, 2]);
    let reader = builder.with_projection(mask).build().unwrap();

    let async_reader_schema = reader.schema();

    assert_eq!(async_file_schema.fields.len(), 3);
    assert_eq!(async_file_schema.metadata, metadata);
    assert_eq!(async_reader_schema.fields.len(), 2); // Currently fails
    assert_eq!(async_reader_schema.metadata, HashMap::default()); // Currently fails
}

I think this demonstrates the issue: in particular, the schema returned by ParquetRecordBatchStream is incorrect. It should return the projected schema with the metadata removed.

@Jefffrey
Contributor Author

I'll revert back to the initial change, and also account for projection, which I wasn't aware it didn't handle either.

@tustvold
Contributor

> I'll revert back to the initial change, and also account for projection, which I wasn't aware it didn't handle either.

If you determine the schema from the ParquetField on ReaderFactory (which should be a DataType::Struct), I think you might be able to get both in one.

@Jefffrey
Contributor Author

Re-implemented the metadata stripping for ParquetRecordBatchStream::schema, and also ensured it respects the projected columns.
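The intended behavior can be illustrated with a simplified std-only sketch (`project_fields` and the index-set mask are hypothetical stand-ins; the real arrow-rs ProjectionMask addresses parquet leaf columns, which only coincide with top-level arrow fields for flat schemas): the stream's schema should keep only the masked fields and carry no metadata.

```rust
use std::collections::HashSet;

// Hypothetical helper: keep only the fields selected by the mask.
// Real arrow-rs projection masks operate on parquet leaf columns,
// so this matches actual behavior only for flat (non-nested) schemas.
fn project_fields(fields: &[&str], mask: &HashSet<usize>) -> Vec<String> {
    fields
        .iter()
        .enumerate()
        .filter(|(i, _)| mask.contains(i))
        .map(|(_, f)| f.to_string())
        .collect()
}

fn main() {
    // Same shape as the test above: 3 fields, project columns 1 and 2.
    let fields = ["a", "c", "d"];
    let mask: HashSet<usize> = [1, 2].into_iter().collect();
    let projected = project_fields(&fields, &mask);
    assert_eq!(projected, vec!["c".to_string(), "d".to_string()]);
}
```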

@@ -385,13 +385,28 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
offset: self.offset,
};

// Ensure schema of ParquetRecordBatchStream respects projection, and does
// not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) {
Contributor


This isn't quite correct in the case of a projection of a nested schema; it's possible this isn't actually determined until the ArrayReader is constructed...

Contributor


Although https://docs.rs/parquet/latest/parquet/arrow/fn.parquet_to_arrow_field_levels.html

takes a ProjectionMask and should apply it already?

Contributor Author

@Jefffrey Jefffrey Nov 29, 2023


I was a bit worried about this, as I couldn't find a straightforward way that the schema was constructed from ParquetField + ProjectionMask; it does indeed seem to be done during ArrayReader construction.

Edit: wasn't aware of https://docs.rs/parquet/latest/parquet/arrow/fn.parquet_to_arrow_field_levels.html, will check it out 👍

Contributor


I'll have a quick play, stand by

@Jefffrey
Contributor Author

I've reworked it to make use of the function introduced by #5149

Hopefully I've used it correctly here

@Jefffrey Jefffrey changed the title Parquet: clear metadata of ParquetRecordBatchStream::schema Parquet: clear metadata and project fields of ParquetRecordBatchStream::schema Nov 30, 2023
@tustvold tustvold merged commit a36bf7a into apache:master Dec 5, 2023
17 checks passed
@Jefffrey Jefffrey deleted the clear_parquet_record_batch_stream_schema_metadata branch December 5, 2023 12:33
Successfully merging this pull request may close these issues: ParquetRecordBatchStream Should Return the Projected Schema