Cleanup some instances of excess closure serialization #1097

Merged: 1 commit, Nov 11, 2020
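The pattern fixed throughout this PR: referencing a field or method of a plan node from inside a mapPartitions-style closure captures `this`, so Spark serializes the whole operator, and through its child reference the whole plan subtree, into every task. Copying just the needed value into a local val on the driver means only that value is captured. A minimal standalone sketch of the two shapes (hypothetical MyExec, not code from this repo):

import org.apache.spark.rdd.RDD

case class MyExec(names: Seq[String]) {
  // Bad: `names` here is really `this.names`, so the whole MyExec
  // instance (and everything it references) ships with each task.
  def bad(rdd: RDD[Int]): RDD[Int] =
    rdd.mapPartitions(it => it.map(_ + names.length))

  // Good: copy the value into a local first; only the Seq is captured.
  def good(rdd: RDD[Int]): RDD[Int] = {
    val localNames = names
    rdd.mapPartitions(it => it.map(_ + localNames.length))
  }
}

Spark's ClosureCleaner cannot help in the first shape, because the reference to `this` is genuinely used by the closure body.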
@@ -264,6 +264,9 @@ case class GpuWindowInPandasExec(
     val allInputs = windowBoundsInput ++ dataInputs
     val allInputTypes = allInputs.map(_.dataType)

+    // cache in a local variable to avoid serializing the full child plan
+    val childOutput = child.output
+
     // Start processing.
     child.execute().mapPartitions { iter =>
       val context = TaskContext.get()
@@ -273,19 +276,19 @@
       val pythonInputProj = UnsafeProjection.create(
         allInputs,
         windowBoundsInput.map(ref =>
-          AttributeReference(s"i_${ref.ordinal}", ref.dataType)()) ++ child.output
+          AttributeReference(s"i_${ref.ordinal}", ref.dataType)()) ++ childOutput
       )
       val pythonInputSchema = StructType(
         allInputTypes.zipWithIndex.map { case (dt, i) =>
           StructField(s"_$i", dt)
         }
       )
-      val grouping = UnsafeProjection.create(partitionSpec, child.output)
+      val grouping = UnsafeProjection.create(partitionSpec, childOutput)

       // The queue used to buffer input rows so we can drain it to
       // combine input with output from Python.
       val queue = HybridRowQueue(context.taskMemoryManager(),
-        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
+        new File(Utils.getLocalDir(SparkEnv.get.conf)), childOutput.length)
       context.addTaskCompletionListener[Unit] { _ =>
         queue.close()
       }
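In GpuWindowInPandasExec, child.output was previously evaluated in three places inside mapPartitions, so every task closure carried the operator and its child plan. One way to spot such leaks (a hypothetical diagnostic, not part of this change) is to measure what plain Java serialization produces for a closure before and after hoisting:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Size in bytes of the Java-serialized form of obj; throws
// NotSerializableException if the closure captured something
// that cannot be serialized at all.
def serializedSize(obj: AnyRef): Int = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  try out.writeObject(obj) finally out.close()
  buffer.size()
}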
@@ -681,15 +681,19 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
     val totalTime = longMetric(TOTAL_TIME)
     val peakDevMemory = longMetric("peakDevMemory")

+    // cache in local vars to avoid serializing the plan
+    val outputSchema = schema
+    val decompressMemoryTarget = maxDecompressBatchMemory
+    val hasMaps = child.schema.fields.exists(_.dataType.isInstanceOf[MapType])
+
     val batches = child.executeColumnar()
     batches.mapPartitions { iter =>
-      val hasMaps = child.schema.fields.exists(field => field.dataType.isInstanceOf[MapType])
-      if (child.schema.nonEmpty && !hasMaps) {
-        new GpuCoalesceIterator(iter, schema, goal, maxDecompressBatchMemory,
+      if (outputSchema.nonEmpty && !hasMaps) {
+        new GpuCoalesceIterator(iter, outputSchema, goal, decompressMemoryTarget,
           numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime,
           concatTime, totalTime, peakDevMemory, "GpuCoalesceBatches")
       } else if (hasMaps) {
-        new GpuCoalesceIteratorForMaps(iter, schema, goal, maxDecompressBatchMemory,
+        new GpuCoalesceIteratorForMaps(iter, outputSchema, goal, decompressMemoryTarget,
           numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime,
           concatTime, totalTime, peakDevMemory, "GpuCoalesceBatches")
       } else {
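This hunk does slightly more than shrink the closure: hasMaps used to be recomputed at the start of every partition, so hoisting it also evaluates the schema scan once on the driver instead of once per task.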
@@ -89,8 +89,12 @@ case class GpuExpandExec(
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val boundProjections: Seq[Seq[GpuExpression]] =
       projections.map(GpuBindReferences.bindGpuReferences(_, child.output))
+
+    // cache in a local to avoid serializing the plan
+    val metricsMap = metrics
+
     child.executeColumnar().mapPartitions { it =>
-      new GpuExpandIterator(boundProjections, metrics, it)
+      new GpuExpandIterator(boundProjections, metricsMap, it)
     }
   }

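metrics is a member of the plan node, so handing it to the iterator from inside the closure would again drag the node along; copying the map into a local captures only the map. A simplified sketch of the same fix, with a hypothetical Metric class standing in for Spark's SQLMetric:

import org.apache.spark.rdd.RDD

class Metric extends Serializable { var value = 0L }

// Hypothetical operator whose metrics map lives on the node itself.
class Operator(val metrics: Map[String, Metric]) {
  def run(rdd: RDD[Long]): Unit = {
    // cache in a local to avoid capturing `this`
    val metricsMap = metrics
    rdd.foreachPartition { it =>
      metricsMap("numRows").value += it.size // captures only metricsMap
    }
  }
}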
@@ -653,6 +653,10 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceGoal)
     val totalTime = longMetric(TOTAL_TIME)
     val localGoal = goal
     val rowBased = child.execute()
+
+    // cache in a local to avoid serializing the plan
+    val localSchema = schema
+
     // The cudf kernel only supports up to 1.5 KB per row which means at most 184 double/long
     // values. Spark by default limits codegen to 100 fields "spark.sql.codegen.maxFields".
     // So, we are going to be cautious and start with that until we have tested it more.
@@ -664,7 +668,6 @@
         localOutput.toArray, localGoal, totalTime, numInputRows, numOutputRows,
         numOutputBatches))
     } else {
-      val localSchema = schema
       val converters = new GpuRowToColumnConverter(localSchema)
       rowBased.mapPartitions(rowIter => new RowToColumnarIterator(rowIter,
         localSchema, localGoal, converters,
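In GpuRowToColumnarExec the localSchema cache already existed inside the else branch; the change hoists it above the branching so the plan's schema is read once up front rather than inside the branch.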
@@ -85,13 +85,14 @@ case class GpuSortExec(

   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val sortTime = longMetric("sortTime")
+    val peakDevMemory = longMetric("peakDevMemory")

     val crdd = child.executeColumnar()
     crdd.mapPartitions { cbIter =>
       val sorter = createBatchGpuSorter()
       val sortedIterator = sorter.sort(cbIter)
       sortTime += sorter.getSortTimeNanos
-      metrics("peakDevMemory") += sorter.getPeakMemoryUsage
+      peakDevMemory += sorter.getPeakMemoryUsage
       sortedIterator
     }
   }
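The sort change applies the same idea to a single metric: metrics("peakDevMemory") resolved the map through the node inside the task, while longMetric("peakDevMemory") on the driver yields just the SQLMetric accumulator, which is small and serializable.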
@@ -278,9 +278,12 @@ case class HostColumnarToGpu(child: SparkPlan, goal: CoalesceGoal)
     val totalTime = longMetric(TOTAL_TIME)
     val peakDevMemory = longMetric("peakDevMemory")

+    // cache in a local to avoid serializing the plan
+    val outputSchema = schema
+
     val batches = child.executeColumnar()
     batches.mapPartitions { iter =>
-      new HostToGpuCoalesceIterator(iter, goal, schema,
+      new HostToGpuCoalesceIterator(iter, goal, outputSchema,
         numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime, concatTime,
         totalTime, peakDevMemory, "HostColumnarToGpu")
     }
@@ -372,6 +372,9 @@ case class GpuHashAggregateExec(
     //
     val rdd = child.executeColumnar()

+    // cache in a local variable to avoid serializing the full child plan
+    val childOutput = child.output
+
     rdd.mapPartitions { cbIter => {
       var batch: ColumnarBatch = null // incoming batch
       //
@@ -422,7 +425,7 @@
       // 3. boundFinalProjections: on merged batches, finalize aggregates
       //    (GpuAverage => CudfSum/CudfCount)
       // 4. boundResultReferences: project the result expressions Spark expects in the output.
-      val boundExpression = setupReferences(child.output, groupingExpressions, aggregateExpressions)
+      val boundExpression = setupReferences(childOutput, groupingExpressions, aggregateExpressions)
       try {
         while (cbIter.hasNext) {
           // 1) Consume the raw incoming batch, evaluating nested expressions
@@ -546,6 +546,13 @@ case class GpuArrowEvalPythonExec(
     lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)
     val inputRDD = child.executeColumnar()
     val pythonOutputSchema = resultAttrs.map(_.dataType).toArray
+
+    // cache in a local to avoid serializing the plan
+    val childOutput = child.output
+    val targetBatchSize = batchSize
+    val runnerConf = pythonRunnerConf
+    val timeZone = sessionLocalTimeZone
+
     inputRDD.mapPartitions { iter =>
       val queue: BatchQueue = new BatchQueue()
       val context = TaskContext.get()
@@ -579,8 +586,8 @@
           StructField(s"_$i", dt)
         })

-      val boundReferences = GpuBindReferences.bindReferences(allInputs, child.output)
-      val batchedIterator = new RebatchingRoundoffIterator(iter, schema, batchSize,
+      val boundReferences = GpuBindReferences.bindReferences(allInputs, childOutput)
+      val batchedIterator = new RebatchingRoundoffIterator(iter, schema, targetBatchSize,
         numInputRows, numInputBatches)
       val projectedIterator = batchedIterator.map { batch =>
         // We have to do the project before we add the batch because the batch might be closed
@@ -603,9 +610,9 @@
         argOffsets,
         schema,
         resultAttrs.map(_.dataType).toArray,
-        sessionLocalTimeZone,
-        pythonRunnerConf,
-        batchSize,
+        timeZone,
+        runnerConf,
+        targetBatchSize,
         () => queue.finish()){
         override def minReadTargetBatchSize: Int = targetReadBatchSize
       }.compute(projectedIterator,
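Worth noting: none of these leaks fail loudly. When the captured plan happens to be serializable, the job still runs, just with a bloated payload attached to every task, which is why this kind of excess capture tends to go unnoticed until someone profiles task deserialization time or hits a Task not serializable error.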