Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set OrcConf.INCLUDE_COLUMNS for ORC reading #4933

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,13 @@ private case class GpuOrcFileFilterHandler(
orcResultSchemaString(canPruneCols, dataSchema, readDataSchema, partitionSchema, conf)
assert(requestedColIds.length == readDataSchema.length,
"[BUG] requested column IDs do not match required schema")

// Following SPARK-35783, set the requested columns in OrcConf. This setting may not make
// any difference today, but it could matter to the ORC methods we call, either now or in
// the future.
val includeColumns = requestedColIds.filter(_ != -1).sorted.mkString(",")
conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, includeColumns)

// Only need to filter ORC's schema evolution if it cannot prune directly
val requestedMapping = if (canPruneCols) {
None
Expand Down Expand Up @@ -946,8 +953,18 @@ private case class GpuOrcFileFilterHandler(

def getOrcPartitionReaderContext: OrcPartitionReaderContext = {
val isCaseSensitive = readerOpts.getIsSchemaEvolutionCaseAware
val updatedReadSchema = checkSchemaCompatibility(orcReader.getSchema, readerOpts.getSchema,
isCaseSensitive)

// align include status with the read schema during the potential field prune
val readerOptInclude = readerOpts.getInclude match {
case null => Array.fill(readerOpts.getSchema.getMaximumId + 1)(true)
case a => a
}
val (updatedReadSchema, updatedInclude) = checkSchemaCompatibility(
orcReader.getSchema, readerOpts.getSchema, isCaseSensitive, readerOptInclude)
if (readerOpts.getInclude != null) {
readerOpts.include(updatedInclude)
}

val evolution = new SchemaEvolution(orcReader.getSchema, updatedReadSchema, readerOpts)
val (sargApp, sargColumns) = getSearchApplier(evolution,
orcFileReaderOpts.getUseUTCTimestamp,
Expand Down Expand Up @@ -1163,23 +1180,27 @@ private case class GpuOrcFileFilterHandler(
}

/**
* Check if the read schema is compatible with the file schema.
* Check if the read schema is compatible with the file schema. Additionally, recursively
* prune all incompatible required fields from both the ORC schema and the include status.
*
* Only do the check for columns that can be found in the file schema, and
* the missing ones are ignored, since a null column will be added in the final
* output for each of them.
*
* It takes care of both the top and nested columns.
*
* @param fileSchema input file's ORC schema
* @param readSchema ORC schema for what will be read
* @param fileSchema input file's ORC schema
* @param readSchema ORC schema for what will be read
* @param isCaseAware true if field names are case-sensitive
* @return the pruned read schema, containing only columns also exist in the file schema.
* @param include an Array[Boolean] indicating, for each field ID, whether that field is included
* @return a tuple containing the pruned read schema and the pruned include status. Both
*         items carry only columns that also exist in the file schema.
*/
private def checkSchemaCompatibility(
fileSchema: TypeDescription,
readSchema: TypeDescription,
isCaseAware: Boolean): TypeDescription = {
isCaseAware: Boolean,
include: Array[Boolean]): (TypeDescription, Array[Boolean]) = {
assert(fileSchema.getCategory == readSchema.getCategory)
readSchema.getCategory match {
case TypeDescription.Category.STRUCT =>
Expand All @@ -1196,34 +1217,45 @@ private case class GpuOrcFileFilterHandler(
val readerChildren = readSchema.getChildren.asScala

val prunedReadSchema = TypeDescription.createStruct()
val prunedInclude = mutable.ArrayBuffer(include(readSchema.getId))
readerFieldNames.zip(readerChildren).foreach { case (readField, readType) =>
// Skip check for the missing names because a column with nulls will be added
// for each of them.
if (fileTypesMap.contains(readField)) {
prunedReadSchema.addField(readField,
checkSchemaCompatibility(fileTypesMap(readField), readType, isCaseAware))
val (newChild, childInclude) = checkSchemaCompatibility(
fileTypesMap(readField), readType, isCaseAware, include)
prunedReadSchema.addField(readField, newChild)
prunedInclude ++= childInclude
}
}
prunedReadSchema
prunedReadSchema -> prunedInclude.toArray
// Go into children for LIST, MAP, UNION to filter out the missing names
// for struct children.
case TypeDescription.Category.LIST =>
val newChild = checkSchemaCompatibility(fileSchema.getChildren.get(0),
readSchema.getChildren.get(0), isCaseAware)
TypeDescription.createList(newChild)
val prunedInclude = mutable.ArrayBuffer(include(readSchema.getId))
val (newChild, childInclude) = checkSchemaCompatibility(fileSchema.getChildren.get(0),
readSchema.getChildren.get(0), isCaseAware, include)
prunedInclude ++= childInclude
TypeDescription.createList(newChild) -> prunedInclude.toArray
case TypeDescription.Category.MAP =>
val newKey = checkSchemaCompatibility(fileSchema.getChildren.get(0),
readSchema.getChildren.get(0), isCaseAware)
val newValue = checkSchemaCompatibility(fileSchema.getChildren.get(1),
readSchema.getChildren.get(1), isCaseAware)
TypeDescription.createMap(newKey, newValue)
val prunedInclude = mutable.ArrayBuffer(include(readSchema.getId))
val (newKey, keyInclude) = checkSchemaCompatibility(fileSchema.getChildren.get(0),
readSchema.getChildren.get(0), isCaseAware, include)
val (newValue, valueInclude) = checkSchemaCompatibility(fileSchema.getChildren.get(1),
readSchema.getChildren.get(1), isCaseAware, include)
prunedInclude ++= keyInclude
prunedInclude ++= valueInclude
TypeDescription.createMap(newKey, newValue) -> prunedInclude.toArray
case TypeDescription.Category.UNION =>
val newUnion = TypeDescription.createUnion()
val prunedInclude = mutable.ArrayBuffer(include(readSchema.getId))
readSchema.getChildren.asScala.zip(fileSchema.getChildren.asScala)
.foreach { case(r, f) =>
newUnion.addUnionChild(checkSchemaCompatibility(f, r, isCaseAware))
.foreach { case (r, f) =>
val (newChild, childInclude) = checkSchemaCompatibility(f, r, isCaseAware, include)
newUnion.addUnionChild(newChild)
prunedInclude ++= childInclude
}
newUnion
newUnion -> prunedInclude.toArray
// Primitive types should be equal to each other.
case _ =>
if (!OrcShims.typeDescriptionEqual(fileSchema, readSchema)) {
Expand All @@ -1232,7 +1264,7 @@ private case class GpuOrcFileFilterHandler(
s" file schema: $fileSchema\n" +
s" read schema: $readSchema")
}
readSchema.clone()
readSchema.clone() -> Array(include(readSchema.getId))
}
}

Expand Down