Cleanup some instances of excess closure serialization (#1097)
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
jlowe authored Nov 11, 2020
1 parent 63134ff commit 53e7976
Showing 8 changed files with 45 additions and 17 deletions.
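
Every hunk below applies the same fix: a function passed to mapPartitions runs on the executors, so everything it references is serialized into the task closure. When that function reads a member of the plan node, such as child.output, schema, metrics, or a batch-size setting, the access compiles to this.member and the closure captures this, i.e. the whole operator subtree. Copying the needed values into local vals on the driver before building the closure means only those small values travel with each task. The sketch below shows the before/after shape with a hypothetical operator; it is not code from this commit.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
    import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

    // Sketch only: `MyProjectExec` is hypothetical and not part of this commit.
    case class MyProjectExec(child: SparkPlan) extends UnaryExecNode {
      override def output: Seq[Attribute] = child.output

      override protected def doExecute(): RDD[InternalRow] = {
        // Before: referencing `child.output` inside the closure is really
        // `this.child.output`, so every task serializes the whole plan:
        //   child.execute().mapPartitions { iter =>
        //     val proj = UnsafeProjection.create(child.output, child.output)
        //     iter.map(proj)
        //   }

        // After: copy the attribute list into a local val on the driver;
        // the closure now captures only that Seq[Attribute].
        val childOutput = child.output
        child.execute().mapPartitions { iter =>
          val proj = UnsafeProjection.create(childOutput, childOutput)
          iter.map(proj)
        }
      }
    }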
@@ -264,6 +264,9 @@ case class GpuWindowInPandasExec(
     val allInputs = windowBoundsInput ++ dataInputs
     val allInputTypes = allInputs.map(_.dataType)
 
+    // cache in a local variable to avoid serializing the full child plan
+    val childOutput = child.output
+
     // Start processing.
     child.execute().mapPartitions { iter =>
       val context = TaskContext.get()
@@ -273,19 +276,19 @@ case class GpuWindowInPandasExec(
       val pythonInputProj = UnsafeProjection.create(
         allInputs,
         windowBoundsInput.map(ref =>
-          AttributeReference(s"i_${ref.ordinal}", ref.dataType)()) ++ child.output
+          AttributeReference(s"i_${ref.ordinal}", ref.dataType)()) ++ childOutput
       )
       val pythonInputSchema = StructType(
         allInputTypes.zipWithIndex.map { case (dt, i) =>
           StructField(s"_$i", dt)
         }
       )
-      val grouping = UnsafeProjection.create(partitionSpec, child.output)
+      val grouping = UnsafeProjection.create(partitionSpec, childOutput)
 
       // The queue used to buffer input rows so we can drain it to
       // combine input with output from Python.
       val queue = HybridRowQueue(context.taskMemoryManager(),
-        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
+        new File(Utils.getLocalDir(SparkEnv.get.conf)), childOutput.length)
       context.addTaskCompletionListener[Unit] { _ =>
         queue.close()
       }

@@ -681,15 +681,19 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
     val totalTime = longMetric(TOTAL_TIME)
     val peakDevMemory = longMetric("peakDevMemory")
 
+    // cache in local vars to avoid serializing the plan
+    val outputSchema = schema
+    val decompressMemoryTarget = maxDecompressBatchMemory
+    val hasMaps = child.schema.fields.exists(_.dataType.isInstanceOf[MapType])
+
     val batches = child.executeColumnar()
     batches.mapPartitions { iter =>
-      val hasMaps = child.schema.fields.exists(field => field.dataType.isInstanceOf[MapType])
-      if (child.schema.nonEmpty && !hasMaps) {
-        new GpuCoalesceIterator(iter, schema, goal, maxDecompressBatchMemory,
+      if (outputSchema.nonEmpty && !hasMaps) {
+        new GpuCoalesceIterator(iter, outputSchema, goal, decompressMemoryTarget,
           numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime,
           concatTime, totalTime, peakDevMemory, "GpuCoalesceBatches")
       } else if (hasMaps) {
-        new GpuCoalesceIteratorForMaps(iter, schema, goal, maxDecompressBatchMemory,
+        new GpuCoalesceIteratorForMaps(iter, outputSchema, goal, decompressMemoryTarget,
           numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime,
           concatTime, totalTime, peakDevMemory, "GpuCoalesceBatches")
       } else {

@@ -89,8 +89,12 @@ case class GpuExpandExec(
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val boundProjections: Seq[Seq[GpuExpression]] =
       projections.map(GpuBindReferences.bindGpuReferences(_, child.output))
+
+    // cache in a local to avoid serializing the plan
+    val metricsMap = metrics
+
     child.executeColumnar().mapPartitions { it =>
-      new GpuExpandIterator(boundProjections, metrics, it)
+      new GpuExpandIterator(boundProjections, metricsMap, it)
     }
   }
 

@@ -653,6 +653,10 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceGoal)
     val totalTime = longMetric(TOTAL_TIME)
     val localGoal = goal
     val rowBased = child.execute()
+
+    // cache in a local to avoid serializing the plan
+    val localSchema = schema
+
     // The cudf kernel only supports up to 1.5 KB per row which means at most 184 double/long
     // values. Spark by default limits codegen to 100 fields "spark.sql.codegen.maxFields".
     // So, we are going to be cautious and start with that until we have tested it more.
@@ -664,7 +668,6 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceGoal)
         localOutput.toArray, localGoal, totalTime, numInputRows, numOutputRows,
         numOutputBatches))
     } else {
-      val localSchema = schema
       val converters = new GpuRowToColumnConverter(localSchema)
       rowBased.mapPartitions(rowIter => new RowToColumnarIterator(rowIter,
         localSchema, localGoal, converters,

@@ -85,13 +85,14 @@ case class GpuSortExec(
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val sortTime = longMetric("sortTime")
+    val peakDevMemory = longMetric("peakDevMemory")
 
     val crdd = child.executeColumnar()
     crdd.mapPartitions { cbIter =>
       val sorter = createBatchGpuSorter()
       val sortedIterator = sorter.sort(cbIter)
       sortTime += sorter.getSortTimeNanos
-      metrics("peakDevMemory") += sorter.getPeakMemoryUsage
+      peakDevMemory += sorter.getPeakMemoryUsage
       sortedIterator
     }
   }
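
The GpuSortExec hunk above is the metric flavour of the same change: metrics is a Map held by the plan, so indexing it inside mapPartitions captures the plan, while a SQLMetric obtained with longMetric on the driver is a small accumulator that serializes on its own. A minimal sketch of that shape, assuming Spark's SQLMetrics API and a hypothetical operator:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

    // Sketch only: `CountingExec` is hypothetical and not part of this commit.
    case class CountingExec(child: SparkPlan) extends UnaryExecNode {
      override lazy val metrics: Map[String, SQLMetric] = Map(
        "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

      override def output: Seq[Attribute] = child.output

      override protected def doExecute(): RDD[InternalRow] = {
        // Bind the SQLMetric on the driver; the closure then captures only
        // the accumulator, not the plan that owns the metrics map.
        val numOutputRows = longMetric("numOutputRows")
        child.execute().mapPartitions { iter =>
          iter.map { row =>
            numOutputRows += 1
            row
          }
        }
      }
    }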

@@ -278,9 +278,12 @@ case class HostColumnarToGpu(child: SparkPlan, goal: CoalesceGoal)
     val totalTime = longMetric(TOTAL_TIME)
     val peakDevMemory = longMetric("peakDevMemory")
 
+    // cache in a local to avoid serializing the plan
+    val outputSchema = schema
+
     val batches = child.executeColumnar()
     batches.mapPartitions { iter =>
-      new HostToGpuCoalesceIterator(iter, goal, schema,
+      new HostToGpuCoalesceIterator(iter, goal, outputSchema,
         numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime, concatTime,
         totalTime, peakDevMemory, "HostColumnarToGpu")
     }

@@ -372,6 +372,9 @@ case class GpuHashAggregateExec(
     //
     val rdd = child.executeColumnar()
 
+    // cache in a local variable to avoid serializing the full child plan
+    val childOutput = child.output
+
     rdd.mapPartitions { cbIter => {
       var batch: ColumnarBatch = null // incoming batch
       //
@@ -422,7 +425,7 @@
       // 3. boundFinalProjections: on merged batches, finalize aggregates
       //    (GpuAverage => CudfSum/CudfCount)
       // 4. boundResultReferences: project the result expressions Spark expects in the output.
-      val boundExpression = setupReferences(child.output, groupingExpressions, aggregateExpressions)
+      val boundExpression = setupReferences(childOutput, groupingExpressions, aggregateExpressions)
       try {
         while (cbIter.hasNext) {
           // 1) Consume the raw incoming batch, evaluating nested expressions

@@ -546,6 +546,13 @@ case class GpuArrowEvalPythonExec(
     lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)
     val inputRDD = child.executeColumnar()
     val pythonOutputSchema = resultAttrs.map(_.dataType).toArray
+
+    // cache in a local to avoid serializing the plan
+    val childOutput = child.output
+    val targetBatchSize = batchSize
+    val runnerConf = pythonRunnerConf
+    val timeZone = sessionLocalTimeZone
+
     inputRDD.mapPartitions { iter =>
       val queue: BatchQueue = new BatchQueue()
       val context = TaskContext.get()
@@ -579,8 +586,8 @@
         StructField(s"_$i", dt)
       })
 
-      val boundReferences = GpuBindReferences.bindReferences(allInputs, child.output)
-      val batchedIterator = new RebatchingRoundoffIterator(iter, schema, batchSize,
+      val boundReferences = GpuBindReferences.bindReferences(allInputs, childOutput)
+      val batchedIterator = new RebatchingRoundoffIterator(iter, schema, targetBatchSize,
         numInputRows, numInputBatches)
       val projectedIterator = batchedIterator.map { batch =>
         // We have to do the project before we add the batch because the batch might be closed
@@ -603,9 +610,9 @@
         argOffsets,
         schema,
         resultAttrs.map(_.dataType).toArray,
-        sessionLocalTimeZone,
-        pythonRunnerConf,
-        batchSize,
+        timeZone,
+        runnerConf,
+        targetBatchSize,
         () => queue.finish()){
         override def minReadTargetBatchSize: Int = targetReadBatchSize
       }.compute(projectedIterator,
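
For completeness, the capture behaviour is easy to observe outside Spark: Java-serialize a lambda that reads a field of its enclosing object and compare it with one that reads a local copy; the size difference is roughly the size of the enclosing object. Everything below is a hypothetical stand-alone demo (Scala 2.12+, where lambdas are serializable), not part of this commit or of spark-rapids.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Hypothetical demo: why reading a field inside a closure drags the owner along.
    object ClosureSizeDemo {
      // Byte count of plain Java serialization for an object.
      def serializedSize(obj: AnyRef): Int = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(obj)
        out.close()
        bytes.size()
      }

      // Stand-in for a plan node: a large payload plus the small field a task needs.
      case class FakePlan(payload: Array[Byte], names: Seq[String]) {
        // Reads the field inside the lambda: the lambda captures `this`, payload included.
        def capturesPlan: String => Boolean = s => names.contains(s)

        // Copies the field to a local val first: the lambda captures only the Seq.
        def capturesLocal: String => Boolean = {
          val localNames = names
          s => localNames.contains(s)
        }
      }

      def main(args: Array[String]): Unit = {
        val plan = FakePlan(new Array[Byte](1 << 20), Seq("a", "b"))
        println(serializedSize(plan.capturesPlan))  // roughly 1 MB: the payload came along
        println(serializedSize(plan.capturesLocal)) // a few hundred bytes
      }
    }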
