diff --git a/shims/spark300/src/main/scala/org/apache/spark/sql/rapids/shims/spark300/GpuWindowInPandasExec.scala b/shims/spark300/src/main/scala/org/apache/spark/sql/rapids/shims/spark300/GpuWindowInPandasExec.scala
index db4245418dc..238c227a217 100644
--- a/shims/spark300/src/main/scala/org/apache/spark/sql/rapids/shims/spark300/GpuWindowInPandasExec.scala
+++ b/shims/spark300/src/main/scala/org/apache/spark/sql/rapids/shims/spark300/GpuWindowInPandasExec.scala
@@ -264,6 +264,9 @@ case class GpuWindowInPandasExec(
     val allInputs = windowBoundsInput ++ dataInputs
     val allInputTypes = allInputs.map(_.dataType)
 
+    // cache in a local variable to avoid serializing the full child plan
+    val childOutput = child.output
+
     // Start processing.
     child.execute().mapPartitions { iter =>
       val context = TaskContext.get()
@@ -273,19 +276,19 @@ case class GpuWindowInPandasExec(
       val pythonInputProj = UnsafeProjection.create(
         allInputs,
         windowBoundsInput.map(ref =>
-          AttributeReference(s"i_${ref.ordinal}", ref.dataType)()) ++ child.output
+          AttributeReference(s"i_${ref.ordinal}", ref.dataType)()) ++ childOutput
       )
       val pythonInputSchema = StructType(
         allInputTypes.zipWithIndex.map { case (dt, i) =>
           StructField(s"_$i", dt)
         }
       )
 
-      val grouping = UnsafeProjection.create(partitionSpec, child.output)
+      val grouping = UnsafeProjection.create(partitionSpec, childOutput)
 
       // The queue used to buffer input rows so we can drain it to
       // combine input with output from Python.
       val queue = HybridRowQueue(context.taskMemoryManager(),
-        new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
+        new File(Utils.getLocalDir(SparkEnv.get.conf)), childOutput.length)
       context.addTaskCompletionListener[Unit] { _ =>
         queue.close()
       }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
index e230eb348d6..2fbff7bfd4a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
@@ -681,15 +681,19 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
     val totalTime = longMetric(TOTAL_TIME)
     val peakDevMemory = longMetric("peakDevMemory")
 
+    // cache in local vars to avoid serializing the plan
+    val outputSchema = schema
+    val decompressMemoryTarget = maxDecompressBatchMemory
+    val hasMaps = child.schema.fields.exists(_.dataType.isInstanceOf[MapType])
+
     val batches = child.executeColumnar()
     batches.mapPartitions { iter =>
-      val hasMaps = child.schema.fields.exists(field => field.dataType.isInstanceOf[MapType])
-      if (child.schema.nonEmpty && !hasMaps) {
-        new GpuCoalesceIterator(iter, schema, goal, maxDecompressBatchMemory,
+      if (outputSchema.nonEmpty && !hasMaps) {
+        new GpuCoalesceIterator(iter, outputSchema, goal, decompressMemoryTarget,
           numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime,
           concatTime, totalTime, peakDevMemory, "GpuCoalesceBatches")
       } else if (hasMaps) {
-        new GpuCoalesceIteratorForMaps(iter, schema, goal, maxDecompressBatchMemory,
+        new GpuCoalesceIteratorForMaps(iter, outputSchema, goal, decompressMemoryTarget,
           numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime,
           concatTime, totalTime, peakDevMemory, "GpuCoalesceBatches")
       } else {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala
index 71004f75535..ee89744fb4c 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala
@@ -89,8 +89,12 @@ case class GpuExpandExec(
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val boundProjections: Seq[Seq[GpuExpression]] =
       projections.map(GpuBindReferences.bindGpuReferences(_, child.output))
+
+    // cache in a local to avoid serializing the plan
+    val metricsMap = metrics
+
     child.executeColumnar().mapPartitions { it =>
-      new GpuExpandIterator(boundProjections, metrics, it)
+      new GpuExpandIterator(boundProjections, metricsMap, it)
     }
   }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala
index 5ac6e0a9fe0..ae461405e59 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala
@@ -653,6 +653,10 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceGoal)
     val totalTime = longMetric(TOTAL_TIME)
     val localGoal = goal
     val rowBased = child.execute()
+
+    // cache in a local to avoid serializing the plan
+    val localSchema = schema
+
     // The cudf kernel only supports up to 1.5 KB per row which means at most 184 double/long
     // values. Spark by default limits codegen to 100 fields "spark.sql.codegen.maxFields".
     // So, we are going to be cautious and start with that until we have tested it more.
@@ -664,7 +668,6 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceGoal)
           localOutput.toArray, localGoal, totalTime, numInputRows, numOutputRows,
           numOutputBatches))
     } else {
-      val localSchema = schema
       val converters = new GpuRowToColumnConverter(localSchema)
       rowBased.mapPartitions(rowIter => new RowToColumnarIterator(rowIter,
         localSchema, localGoal, converters,
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala
index 9f1a8ce9c9b..c2ed0b319f4 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala
@@ -85,13 +85,14 @@ case class GpuSortExec(
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val sortTime = longMetric("sortTime")
+    val peakDevMemory = longMetric("peakDevMemory")
     val crdd = child.executeColumnar()
     crdd.mapPartitions { cbIter =>
       val sorter = createBatchGpuSorter()
       val sortedIterator = sorter.sort(cbIter)
       sortTime += sorter.getSortTimeNanos
-      metrics("peakDevMemory") += sorter.getPeakMemoryUsage
+      peakDevMemory += sorter.getPeakMemoryUsage
       sortedIterator
     }
   }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
index 201c7b262e2..3517bb4c72a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
@@ -278,9 +278,12 @@ case class HostColumnarToGpu(child: SparkPlan, goal: CoalesceGoal)
     val totalTime = longMetric(TOTAL_TIME)
     val peakDevMemory = longMetric("peakDevMemory")
 
+    // cache in a local to avoid serializing the plan
+    val outputSchema = schema
+
     val batches = child.executeColumnar()
     batches.mapPartitions { iter =>
-      new HostToGpuCoalesceIterator(iter, goal, schema,
+      new HostToGpuCoalesceIterator(iter, goal, outputSchema,
         numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime,
         concatTime, totalTime, peakDevMemory, "HostColumnarToGpu")
     }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
index 0b73a5ef1d0..487f5b0e759 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
@@ -372,6 +372,9 @@ case class GpuHashAggregateExec(
     //
     val rdd = child.executeColumnar()
 
+    // cache in a local variable to avoid serializing the full child plan
+    val childOutput = child.output
+
     rdd.mapPartitions { cbIter => {
       var batch: ColumnarBatch = null // incoming batch
       //
@@ -422,7 +425,7 @@ case class GpuHashAggregateExec(
       // 3. boundFinalProjections: on merged batches, finalize aggregates
       //     (GpuAverage => CudfSum/CudfCount)
       // 4. boundResultReferences: project the result expressions Spark expects in the output.
-      val boundExpression = setupReferences(child.output, groupingExpressions, aggregateExpressions)
+      val boundExpression = setupReferences(childOutput, groupingExpressions, aggregateExpressions)
       try {
         while (cbIter.hasNext) {
           // 1) Consume the raw incoming batch, evaluating nested expressions
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala
index c287fd88198..aa23f8d64b4 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala
@@ -546,6 +546,13 @@ case class GpuArrowEvalPythonExec(
     lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)
     val inputRDD = child.executeColumnar()
     val pythonOutputSchema = resultAttrs.map(_.dataType).toArray
+
+    // cache in a local to avoid serializing the plan
+    val childOutput = child.output
+    val targetBatchSize = batchSize
+    val runnerConf = pythonRunnerConf
+    val timeZone = sessionLocalTimeZone
+
     inputRDD.mapPartitions { iter =>
       val queue: BatchQueue = new BatchQueue()
       val context = TaskContext.get()
@@ -579,8 +586,8 @@ case class GpuArrowEvalPythonExec(
         StructField(s"_$i", dt)
       })
 
-      val boundReferences = GpuBindReferences.bindReferences(allInputs, child.output)
-      val batchedIterator = new RebatchingRoundoffIterator(iter, schema, batchSize,
+      val boundReferences = GpuBindReferences.bindReferences(allInputs, childOutput)
+      val batchedIterator = new RebatchingRoundoffIterator(iter, schema, targetBatchSize,
         numInputRows, numInputBatches)
       val projectedIterator = batchedIterator.map { batch =>
         // We have to do the project before we add the batch because the batch might be closed
@@ -603,9 +610,9 @@ case class GpuArrowEvalPythonExec(
         argOffsets,
         schema,
         resultAttrs.map(_.dataType).toArray,
-        sessionLocalTimeZone,
-        pythonRunnerConf,
-        batchSize,
+        timeZone,
+        runnerConf,
+        targetBatchSize,
         () => queue.finish()){
         override def minReadTargetBatchSize: Int = targetReadBatchSize
       }.compute(projectedIterator,
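
Note: every hunk above applies the same fix. A field of the SparkPlan (child.output, schema, metrics, batchSize, and so on) is copied into a local val before the mapPartitions closure, so the closure captures only that value instead of `this` and, through it, the entire plan tree. A minimal sketch of the pattern follows; the ExampleOp class and its `names` field are hypothetical stand-ins for illustration, not code from this patch.

import org.apache.spark.rdd.RDD

// Hypothetical operator-like class (not from this patch) used only to show the
// closure-capture problem the diffs above address.
class ExampleOp(val names: Seq[String]) extends Serializable {

  // Referencing the field directly inside the closure compiles to `this.names`,
  // so the whole ExampleOp instance is serialized and shipped with every task.
  def tagCapturingThis(rdd: RDD[Int]): RDD[String] =
    rdd.mapPartitions(iter => iter.map(i => s"${names.head}-$i"))

  // Copying the field into a local val first means the closure captures only
  // that value, mirroring the childOutput / outputSchema locals added above.
  def tagCapturingLocal(rdd: RDD[Int]): RDD[String] = {
    val localNames = names
    rdd.mapPartitions(iter => iter.map(i => s"${localNames.head}-$i"))
  }
}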