Set OrcConf.INCLUDE_COLUMNS for ORC reading (#4933)
Closes #3026

Following SPARK-35783, set OrcConf.INCLUDE_COLUMNS.

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
sperlingxx authored Mar 19, 2022
1 parent e4c34c5 commit ae8f21b
Showing 1 changed file with 54 additions and 22 deletions.
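Editor's note: the heart of the change is a single Hadoop configuration write, shown in the first hunk below. As a minimal standalone sketch of the same pattern (the column IDs here are invented for illustration; in the real code they come from Spark's ORC column-ID resolution):

    import org.apache.hadoop.conf.Configuration
    import org.apache.orc.OrcConf

    // Hypothetical requested column IDs; -1 marks a requested column that is
    // missing from the file and will be filled with nulls rather than read.
    val requestedColIds = Array(0, -1, 3, 1)
    val conf = new Configuration()
    val includeColumns = requestedColIds.filter(_ != -1).sorted.mkString(",")
    conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, includeColumns) // "0,1,3"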
@@ -799,6 +799,13 @@ private case class GpuOrcFileFilterHandler(
         orcResultSchemaString(canPruneCols, dataSchema, readDataSchema, partitionSchema, conf)
       assert(requestedColIds.length == readDataSchema.length,
         "[BUG] requested column IDs do not match required schema")
+
+      // Following SPARK-35783, set the requested columns in OrcConf. This setting may
+      // not make any difference today, but it could matter to the ORC methods we call,
+      // now or in the future.
+      val includeColumns = requestedColIds.filter(_ != -1).sorted.mkString(",")
+      conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, includeColumns)
+
       // Only need to filter ORC's schema evolution if it cannot prune directly
       val requestedMapping = if (canPruneCols) {
         None
@@ -946,8 +953,18 @@ private case class GpuOrcFileFilterHandler(
 
     def getOrcPartitionReaderContext: OrcPartitionReaderContext = {
       val isCaseSensitive = readerOpts.getIsSchemaEvolutionCaseAware
-      val updatedReadSchema = checkSchemaCompatibility(orcReader.getSchema, readerOpts.getSchema,
-        isCaseSensitive)
+
+      // Align the include status with the read schema during the potential field prune.
+      val readerOptInclude = readerOpts.getInclude match {
+        case null => Array.fill(readerOpts.getSchema.getMaximumId + 1)(true)
+        case a => a
+      }
+      val (updatedReadSchema, updatedInclude) = checkSchemaCompatibility(
+        orcReader.getSchema, readerOpts.getSchema, isCaseSensitive, readerOptInclude)
+      if (readerOpts.getInclude != null) {
+        readerOpts.include(updatedInclude)
+      }
+
       val evolution = new SchemaEvolution(orcReader.getSchema, updatedReadSchema, readerOpts)
       val (sargApp, sargColumns) = getSearchApplier(evolution,
         orcFileReaderOpts.getUseUTCTimestamp,
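Editor's note: ORC assigns every column, top-level or nested, an integer ID in pre-order over the schema tree, and the include array returned by readerOpts.getInclude is indexed by those IDs. That is why the fallback above sizes the array as getMaximumId + 1. A small sketch of the numbering (the schema string is invented):

    import org.apache.orc.TypeDescription

    val schema = TypeDescription.fromString("struct<a:int,b:struct<c:string,d:double>>")
    // Pre-order IDs: root struct = 0, a = 1, b = 2, c = 3, d = 4
    assert(schema.getMaximumId == 4)
    val include = Array.fill(schema.getMaximumId + 1)(true) // include everything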
@@ -1163,23 +1180,27 @@ private case class GpuOrcFileFilterHandler(
     }
 
     /**
-     * Check if the read schema is compatible with the file schema.
+     * Check if the read schema is compatible with the file schema, and recursively prune
+     * read fields missing from the file schema, in both the ORC schema and include status.
      *
      * Only do the check for columns that can be found in the file schema, and
     * the missing ones are ignored, since a null column will be added in the final
      * output for each of them.
      *
      * It takes care of both the top and nested columns.
      *
-     * @param fileSchema input file's ORC schema
-     * @param readSchema ORC schema for what will be read
+     * @param fileSchema  input file's ORC schema
+     * @param readSchema  ORC schema for what will be read
      * @param isCaseAware true if field names are case-sensitive
-     * @return the pruned read schema, containing only columns also exist in the file schema.
+     * @param include     flags, indexed by field ID, marking whether each field is included
+     * @return a tuple of the pruned read schema and the pruned include status; both carry
+     *         only the columns that also exist in the file schema.
      */
     private def checkSchemaCompatibility(
         fileSchema: TypeDescription,
         readSchema: TypeDescription,
-        isCaseAware: Boolean): TypeDescription = {
+        isCaseAware: Boolean,
+        include: Array[Boolean]): (TypeDescription, Array[Boolean]) = {
       assert(fileSchema.getCategory == readSchema.getCategory)
       readSchema.getCategory match {
         case TypeDescription.Category.STRUCT =>
@@ -1196,34 +1217,45 @@ private case class GpuOrcFileFilterHandler(
           val readerChildren = readSchema.getChildren.asScala
 
           val prunedReadSchema = TypeDescription.createStruct()
+          val prunedInclude = mutable.ArrayBuffer(include(readSchema.getId))
           readerFieldNames.zip(readerChildren).foreach { case (readField, readType) =>
             // Skip check for the missing names because a column with nulls will be added
             // for each of them.
             if (fileTypesMap.contains(readField)) {
-              prunedReadSchema.addField(readField,
-                checkSchemaCompatibility(fileTypesMap(readField), readType, isCaseAware))
+              val (newChild, childInclude) = checkSchemaCompatibility(
+                fileTypesMap(readField), readType, isCaseAware, include)
+              prunedReadSchema.addField(readField, newChild)
+              prunedInclude ++= childInclude
             }
           }
-          prunedReadSchema
+          prunedReadSchema -> prunedInclude.toArray
         // Go into children for LIST, MAP, UNION to filter out the missing names
         // for struct children.
         case TypeDescription.Category.LIST =>
-          val newChild = checkSchemaCompatibility(fileSchema.getChildren.get(0),
-            readSchema.getChildren.get(0), isCaseAware)
-          TypeDescription.createList(newChild)
+          val prunedInclude = mutable.ArrayBuffer(include(readSchema.getId))
+          val (newChild, childInclude) = checkSchemaCompatibility(fileSchema.getChildren.get(0),
+            readSchema.getChildren.get(0), isCaseAware, include)
+          prunedInclude ++= childInclude
+          TypeDescription.createList(newChild) -> prunedInclude.toArray
         case TypeDescription.Category.MAP =>
-          val newKey = checkSchemaCompatibility(fileSchema.getChildren.get(0),
-            readSchema.getChildren.get(0), isCaseAware)
-          val newValue = checkSchemaCompatibility(fileSchema.getChildren.get(1),
-            readSchema.getChildren.get(1), isCaseAware)
-          TypeDescription.createMap(newKey, newValue)
+          val prunedInclude = mutable.ArrayBuffer(include(readSchema.getId))
+          val (newKey, keyInclude) = checkSchemaCompatibility(fileSchema.getChildren.get(0),
+            readSchema.getChildren.get(0), isCaseAware, include)
+          val (newValue, valueInclude) = checkSchemaCompatibility(fileSchema.getChildren.get(1),
+            readSchema.getChildren.get(1), isCaseAware, include)
+          prunedInclude ++= keyInclude
+          prunedInclude ++= valueInclude
+          TypeDescription.createMap(newKey, newValue) -> prunedInclude.toArray
         case TypeDescription.Category.UNION =>
           val newUnion = TypeDescription.createUnion()
+          val prunedInclude = mutable.ArrayBuffer(include(readSchema.getId))
           readSchema.getChildren.asScala.zip(fileSchema.getChildren.asScala)
-            .foreach { case(r, f) =>
-              newUnion.addUnionChild(checkSchemaCompatibility(f, r, isCaseAware))
+            .foreach { case (r, f) =>
+              val (newChild, childInclude) = checkSchemaCompatibility(f, r, isCaseAware, include)
+              newUnion.addUnionChild(newChild)
+              prunedInclude ++= childInclude
             }
-          newUnion
+          newUnion -> prunedInclude.toArray
         // Primitive types should be equal to each other.
         case _ =>
           if (!OrcShims.typeDescriptionEqual(fileSchema, readSchema)) {
@@ -1232,7 +1264,7 @@ private case class GpuOrcFileFilterHandler(
               s" file schema: $fileSchema\n" +
               s" read schema: $readSchema")
           }
-          readSchema.clone()
+          readSchema.clone() -> Array(include(readSchema.getId))
       }
     }
 
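Editor's note: to make the new tuple contract concrete, here is a hedged illustration with invented schemas (checkSchemaCompatibility itself is private to the handler, so this only traces what the logic above computes): given a read schema requesting a field the file does not have, the pruned schema drops that field and the include flags are realigned to the surviving columns.

    import org.apache.orc.TypeDescription

    val fileSchema = TypeDescription.fromString("struct<a:int,b:string>")
    val readSchema = TypeDescription.fromString("struct<a:int,b:string,c:double>")
    // include is indexed by the read schema's pre-order IDs: root = 0, a = 1, b = 2, c = 3
    val include = Array(true, true, false, true)

    // Expected outcome per the logic above:
    //   pruned schema:  struct<a:int,b:string>   ("c" is absent from the file; nulls are added later)
    //   pruned include: Array(true, true, false) (root, a, b; the flag for "c" is dropped)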