Fix distributed cache to read requested schema (#2235)
* broadcast AllConfs instead of SQLConf

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* Fixed scala unit tests

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* extracted method

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* addressed review comments

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* Fixed a bug where the selected attributes were in a different order than the stored cache

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* added comment for missing columns

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* addressed review comments

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* fixed scala test

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* Apply suggestions from code review

Co-authored-by: Jason Lowe <jlowe@nvidia.com>

Co-authored-by: Raza Jafri <rjafri@nvidia.com>
Co-authored-by: Jason Lowe <jlowe@nvidia.com>
3 people authored Apr 26, 2021
1 parent 7264ce7 commit 4f2d6e8
Showing 3 changed files with 62 additions and 32 deletions.
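In short: the serializer previously built a Hadoop Configuration on the driver for one fixed schema and broadcast it wrapped in a SerializableConfiguration. With this change only the SQL conf entries (conf.getAllConfs, a plain Map[String, String]) are broadcast, and each executor derives its own Hadoop Configuration for the schema it is actually asked to read; the read path also re-orders the Parquet read schema to match the requested attribute order. A condensed sketch of the new driver-side pattern, assembled from the hunks below (enclosing method omitted):

  val broadcastedConf = SparkSession.active.sparkContext.broadcast(conf.getAllConfs)
  input.mapPartitions { cbIter =>
    // the iterator implementation now receives only the conf map and builds its
    // own Hadoop conf on the executor for whatever schema it is asked to read
    new CachedBatchIteratorConsumer(cbIter, cacheAttributes, selectedAttributes, broadcastedConf)
      .getColumnBatchIterator
  }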
21 changes: 21 additions & 0 deletions integration_tests/src/main/python/cache_test.py
@@ -163,6 +163,27 @@ def test_cache_partial_load(data_gen, enableVectorizedConf):
.limit(50).select(f.col("b")), enableVectorizedConf
)

@allow_non_gpu('CollectLimitExec')
def test_cache_diff_req_order(spark_tmp_path):
    def n_fold(spark):
        data_path_cpu = spark_tmp_path + '/PARQUET_DATA/{}/{}'
        data = spark.range(100).selectExpr(
            "cast(id as double) as col0",
            "cast(id - 100 as double) as col1",
            "cast(id * 2 as double) as col2",
            "rand(100) as col3",
            "rand(200) as col4")

        num_buckets = 10
        with_random = data.selectExpr("*", "cast(rand(0) * {} as int) as BUCKET".format(num_buckets)).cache()
        for test_bucket in range(0, num_buckets):
            with_random.filter(with_random.BUCKET == test_bucket).drop("BUCKET") \
                .write.parquet(data_path_cpu.format("test_data", test_bucket))
            with_random.filter(with_random.BUCKET != test_bucket).drop("BUCKET") \
                .write.parquet(data_path_cpu.format("train_data", test_bucket))

    with_cpu_session(n_fold)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('ts_write', ['TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS'])
@pytest.mark.parametrize('enableVectorized', ['true', 'false'], ids=idfn)
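The new test exercises exactly the situation this commit fixes: a cached DataFrame is later read back through filters and projections whose requested schema differs from the schema that was cached. A minimal Scala illustration of the same scenario (illustrative only; spark is assumed to be a session running with this serializer enabled, and the column names are made up):

  val df = spark.range(100).selectExpr(
    "cast(id as double) as col0",
    "cast(id - 100 as double) as col1",
    "rand(0) as col2").cache()
  // The cache stores (col0, col1, col2); this projection requests a subset in a
  // different order (col2, col0), which the deserializer must now honor.
  df.select("col2", "col0").collect()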
@@ -39,6 +39,7 @@ import org.apache.parquet.hadoop.ParquetFileWriter.Mode
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.{DelegatingPositionOutputStream, DelegatingSeekableInputStream, InputFile, OutputFile, PositionOutputStream, SeekableInputStream}
import org.apache.parquet.schema.Type

import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
@@ -58,7 +59,6 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.SerializableConfiguration

/**
* copied from Spark org.apache.spark.util.ByteBufferInputStream
@@ -326,12 +326,10 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
})
} else {
val cachedSchema = getCatalystSchema(schema, schema)
val broadcastedHadoopConf = getBroadcastedHadoopConf(conf, cachedSchema)
val broadcastedConf = SparkSession.active.sparkContext.broadcast(conf.getAllConfs)
input.mapPartitions {
cbIter =>
new CachedBatchIteratorProducer[ColumnarBatch](cbIter, cachedSchema,
broadcastedHadoopConf, broadcastedConf)
new CachedBatchIteratorProducer[ColumnarBatch](cbIter, cachedSchema, broadcastedConf)
.getColumnarBatchToCachedBatchIterator
}
}
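On the executor side the broadcast map is turned back into a SQLConf and then into a Hadoop Configuration by the getConfFromMap and getHadoopConf helpers referenced in the CachedBatchIteratorConsumer and CachedBatchIteratorProducer hunks further down; their bodies are outside this diff. A plausible sketch of the first helper, assuming it simply replays the broadcast entries into a fresh SQLConf (using the Broadcast and SQLConf imports already present in the file):

  private def getConfFromMap(sharedConf: Broadcast[Map[String, String]]): SQLConf = {
    val conf = new SQLConf()
    sharedConf.value.foreach { case (k, v) => conf.setConfString(k, v) }
    conf
  }

getHadoopConf(requestedSchema.toStructType, conf) then builds the Parquet read configuration from that SQLConf for whichever schema the consumer was handed, which is what lets a cached batch be read back with a requested schema rather than the one fixed when the cache was written.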
@@ -485,12 +483,11 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
})
cbRdd.mapPartitions(iter => CloseableColumnBatchIterator(iter))
} else {
val broadcastedHadoopConf = getBroadcastedHadoopConf(conf, selectedAttributes)
val broadcastedConf = SparkSession.active.sparkContext.broadcast(conf.getAllConfs)
input.mapPartitions {
cbIter => {
new CachedBatchIteratorConsumer(cbIter, cacheAttributes, selectedAttributes,
broadcastedHadoopConf, broadcastedConf).getColumnBatchIterator
broadcastedConf).getColumnBatchIterator
}
}
}
@@ -517,12 +514,11 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
InternalRow.empty
}
}
val broadcastedHadoopConf = getBroadcastedHadoopConf(conf, selectedAttributes)
val broadcastedConf = SparkSession.active.sparkContext.broadcast(conf.getAllConfs)
input.mapPartitions {
cbIter => {
new CachedBatchIteratorConsumer(cbIter, cacheAttributes, selectedAttributes,
broadcastedHadoopConf, broadcastedConf).getInternalRowIterator
broadcastedConf).getInternalRowIterator
}
}
}
@@ -606,12 +602,11 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
cbIter: Iterator[CachedBatch],
cacheAttributes: Seq[Attribute],
selectedAttributes: Seq[Attribute],
sharedHadoopConf: Broadcast[SerializableConfiguration],
sharedConf: Broadcast[Map[String, String]]) {

val conf = getConfFromMap(sharedConf)
val hadoopConf = sharedHadoopConf.value.value
val origRequestedSchema: Seq[Attribute] = getCatalystSchema(selectedAttributes, cacheAttributes)
val hadoopConf = getHadoopConf(origRequestedSchema.toStructType, conf)
val origCacheSchema: Seq[Attribute] = getCatalystSchema(cacheAttributes, cacheAttributes)
val options: ParquetReadOptions = HadoopReadOptions.builder(hadoopConf).build()
/**
@@ -804,9 +799,11 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
} else {
(origCacheSchema, origRequestedSchema)
}
val reqSparkSchema =
StructType(sparkSchema.filter(field =>
requestedSchema.exists(a => a.name.equals(field.name))))

val reqSparkSchema = StructType(requestedSchema.toStructType.map { field =>
sparkSchema.fields(sparkSchema.fieldIndex(field.name))
})

val reqParquetSchema = sparkToParquetSchemaConverter.convert(reqSparkSchema)

// reset spark schema calculated from parquet schema
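// Illustration (not part of the commit): suppose the cache stores
//   sparkSchema     = StructType(col0, col1, col2)
// and the query requests
//   requestedSchema = Seq(col2, col0)
// The old expression, StructType(sparkSchema.filter(f => requestedSchema.exists(_.name == f.name))),
// keeps the cache order and yields StructType(col0, col2), while the new mapping
// over requestedSchema yields StructType(col2, col0), i.e. the order the caller
// asked for and the order the downstream column readers must produce.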
@@ -834,6 +831,34 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
this.totalRowCount += block.getRowCount
}

// Initialize missingColumns to cover the case where a requested column isn't present in the
// cache. This should never happen, but handle it just in case it does.
val columns = reqParquetSchema.getColumns
val paths = reqParquetSchema.getPaths
val fileSchema = parquetFileReader.getFooter.getFileMetaData.getSchema
val types = reqParquetSchema.asGroupType.getFields
for (i <- 0 until reqParquetSchema.getFieldCount) {
val t = reqParquetSchema.getFields.get(i)
if (!t.isPrimitive || t.isRepetition(Type.Repetition.REPEATED)) {
throw new UnsupportedOperationException("Complex types not supported.")
}
val colPath = paths.get(i)
if (fileSchema.containsPath(colPath)) {
val fd = fileSchema.getColumnDescription(colPath)
if (!(fd == columns.get(i))) {
throw new UnsupportedOperationException("Schema evolution not supported.")
}
missingColumns(i) = false
} else {
if (columns.get(i).getMaxDefinitionLevel == 0) {
// Column is missing in data but the required data is non-nullable.
// This file is invalid.
throw new IOException(s"Required column is missing in data file: ${colPath.toList}")
}
missingColumns(i) = true
}
}

for (i <- missingColumns.indices) {
if (missingColumns(i)) {
columnVectors(i).putNulls(0, capacity)
@@ -849,8 +874,6 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
throw new IOException("expecting more rows but reached last" +
" block. Read " + rowsReturned + " out of " + totalRowCount)
}
val columns = reqParquetSchema.getColumns
val types = reqParquetSchema.asGroupType.getFields
columnReaders = new Array[VectorizedColumnReader](columns.size)
for (i <- 0 until columns.size) {
if (!missingColumns(i)) {
Expand Down Expand Up @@ -929,18 +952,17 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
*
* @param iter - an iterator over InternalRow or ColumnarBatch
* @param cachedAttributes - Schema of the cached batch
* @param sharedHadoopConf - Hadoop conf
* @param sharedConf - SQL conf
* @tparam T - Strictly either InternalRow or ColumnarBatch
*/
private[rapids] class CachedBatchIteratorProducer[T](
iter: Iterator[T],
cachedAttributes: Seq[Attribute],
sharedHadoopConf: Broadcast[SerializableConfiguration],
sharedConf: Broadcast[Map[String, String]]) {

val conf = getConfFromMap(sharedConf)
val hadoopConf = sharedHadoopConf.value.value
val hadoopConf =
getHadoopConf(getCatalystSchema(cachedAttributes, cachedAttributes).toStructType, conf)

def getInternalRowToCachedBatchIterator: Iterator[CachedBatch] = {
new InternalRowToCachedBatchIterator
@@ -1259,8 +1281,6 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
storageLevel: StorageLevel,
conf: SQLConf): RDD[CachedBatch] = {

val parquetSchema = getCatalystSchema(schema, schema)

val rapidsConf = new RapidsConf(conf)

if (rapidsConf.isSqlEnabled && isSupportedByCudf(schema)) {
@@ -1273,24 +1293,15 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
withResource(cb)(cb => compressColumnarBatchWithParquet(cb, structSchema))
})
} else {
val broadcastedHadoopConf = getBroadcastedHadoopConf(conf, parquetSchema)
val broadcastedConf = SparkSession.active.sparkContext.broadcast(conf.getAllConfs)
input.mapPartitions {
cbIter =>
new CachedBatchIteratorProducer[InternalRow](cbIter, schema,
broadcastedHadoopConf, broadcastedConf)
new CachedBatchIteratorProducer[InternalRow](cbIter, schema, broadcastedConf)
.getInternalRowToCachedBatchIterator
}
}
}

private def getBroadcastedHadoopConf(
conf: SQLConf,
requestedSchema: Seq[Attribute]): Broadcast[SerializableConfiguration] = {
val hadoopConf = getHadoopConf(requestedSchema.toStructType, conf)
SparkSession.active.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
}

override def buildFilter(
predicates: Seq[Expression],
cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch] = {
@@ -171,8 +171,6 @@ class Spark310ParquetWriterSuite extends SparkQueryCompareTestSuite {
val ser = new ParquetCachedBatchSerializer

val producer = new ser.CachedBatchIteratorProducer[ColumnarBatch](cbIter, schema,
withCpuSparkSession(spark =>
spark.sparkContext.broadcast(new SerializableConfiguration(new Configuration(true)))),
withCpuSparkSession(spark => spark.sparkContext.broadcast(new SQLConf().getAllConfs)))
val mockParquetOutputFileFormat = mock(classOf[ParquetOutputFileFormat])
var totalSize = 0L
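With the Hadoop Configuration broadcast removed from the constructor, the test only needs to broadcast the SQL conf map. Reassembled from the hunk above, the producer is now built roughly as:

  val producer = new ser.CachedBatchIteratorProducer[ColumnarBatch](cbIter, schema,
    withCpuSparkSession(spark => spark.sparkContext.broadcast(new SQLConf().getAllConfs)))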
