Set OrcConf.INCLUDE_COLUMNS for ORC reading #4933

Merged
@@ -803,6 +803,12 @@ private case class GpuOrcFileFilterHandler(

```scala
    val requestedMapping = if (canPruneCols) {
      None
    } else {
      // Following SPARK-35783, set requested columns as OrcConf. This setting may not make
      // any difference. Just in case it might be important for the ORC methods called by us,
      // either today or in the future.
      val includeColumns = requestedColIds.filter(_ != -1).sorted.mkString(",")
      conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, includeColumns)
```
jlowe (Member):

Curious why this is conditional on !canPruneCols? The equivalent Spark code setting this is not similarly conditional. We're calling buildOrcReader in either case which will examine the INCLUDE_COLUMNS setting, so it seems prudent to set it whether or not we can prune, minimally to help keep this code in sync with the Spark version.
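
A minimal sketch of that suggestion, reusing the names from the diff above (requestedColIds, conf, canPruneCols); this only illustrates hoisting the setting out of the branch and is not the final patch:

```scala
// Sketch only: set INCLUDE_COLUMNS in either case, leaving the
// requestedMapping branch purely about who performs the pruning.
val includeColumns = requestedColIds.filter(_ != -1).sorted.mkString(",")
conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, includeColumns)
val requestedMapping = if (canPruneCols) None else Some(requestedColIds)
```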

Collaborator (Author):

Hi @jlowe, following your question, I found that we should add this conf on the opposite conditional branch, where canPruneCols is TRUE, because canPruneCols represents whether we trust ORC to prune the columns or prune them ourselves.

@wbo4958 please correct me if I am wrong. Thank you!
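
A sketch of the placement proposed here, for illustration only (the thread below arrives at a different resolution):

```scala
val requestedMapping = if (canPruneCols) {
  // ORC itself is trusted to prune, so tell it which columns to include.
  val includeColumns = requestedColIds.filter(_ != -1).sorted.mkString(",")
  conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, includeColumns)
  None
} else {
  // Otherwise we prune the columns ourselves via the mapping.
  Some(requestedColIds)
}
```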

Collaborator (Author):

I also simplified the requestedColumnIds method.

jlowe (Member):

> I also simplified the requestedColumnIds method.

Please revert this change. The code was intentionally trying to be similar to the Apache Spark version. The same is true for the code calling this function.

> I found that we should add this conf on the opposite conditional branch, where canPruneCols is TRUE, because canPruneCols represents whether we trust ORC to prune the columns or prune them ourselves.

IMO we need to do what Spark is doing with respect to when and how it sets up INCLUDE_COLUMNS. Looking at the Spark code, it is always setting INCLUDE_COLUMNS as long as requestedColPruneInfo is non-empty. We should do the same.
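
For reference, a paraphrased sketch of the Spark-side behavior described above (modeled on OrcFileFormat after SPARK-35783; requestedColPruneInfo is assumed to be an Option[(Array[Int], Boolean)], and this is not a verbatim excerpt):

```scala
// Spark sets INCLUDE_COLUMNS whenever column-prune info is available,
// regardless of whether ORC itself will do the pruning.
requestedColPruneInfo.foreach { case (requestedColIds, canPruneCols) =>
  val includeColumns = requestedColIds.filter(_ != -1).sorted.mkString(",")
  conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, includeColumns)
}
```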

Collaborator (Author):

Hi @jlowe, I reverted it. But I found a bigger problem: the setting of OrcConf.INCLUDE_COLUMNS is NOT compatible with the schema pruning applied in checkSchemaCompatibility, which leads to a failure when constructing SchemaEvolution: https://github.com/NVIDIA/spark-rapids/blob/branch-22.04/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScanBase.scala#L951.
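
To make the reported mismatch concrete, here is a hypothetical illustration (the schemas and values are invented; the assumed mechanism is that the include flags are numbered against the full file schema, while checkSchemaCompatibility hands SchemaEvolution an already pruned read schema):

```scala
import org.apache.orc.TypeDescription

// File schema column IDs: 0 = root, 1 = a, 2 = b, 3 = c.
val fileSchema = TypeDescription.fromString("struct<a:int,b:string,c:double>")

// checkSchemaCompatibility prunes the read schema, so column "b" is gone:
val prunedReadSchema = TypeDescription.fromString("struct<a:int,c:double>")

// ...but INCLUDE_COLUMNS ("1,3") expands to flags indexed by the
// file-schema IDs: root, a, b, c -- four entries, not three.
val include = Array(true, true, false, true)

// Constructing the evolution with a pruned reader schema plus an
// include vector sized for the unpruned schema is the failure mode
// the comment above points at, e.g.:
//   new SchemaEvolution(fileSchema, prunedReadSchema,
//     new Reader.Options().include(include))
```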

jlowe (Member):

checkSchemaCompatibility is also derived from Apache Spark, so does its version have the same issue with INCLUDE_COLUMNS when pruning is applied?

Collaborator (Author):

@jlowe I failed to find checkSchemaCompatibility or anything similar in Apache Spark. Instead, I made a change to checkSchemaCompatibility: it now prunes the include-status array by the field IDs of the fields pruned from the read schema, which happens at the same time as the pruning of the read schema itself.
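
A minimal sketch of that idea, assuming the include status is a boolean array indexed by column/field ID (the helper name and signature are illustrative, not the actual patch):

```scala
// Hypothetical helper: when fields are dropped from the read schema,
// clear their entries in the include-status array so the two prunings
// stay in lockstep.
def pruneInclude(include: Array[Boolean], prunedFieldIds: Seq[Int]): Array[Boolean] = {
  val result = include.clone()
  prunedFieldIds.foreach(id => result(id) = false)
  result
}
```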


```scala
      Some(requestedColIds)
    }
    val fullSchema = StructType(dataSchema ++ partitionSchema)
```