diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala
index 1bf0c628505..8c03ba30c4c 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -15,6 +15,9 @@
  */
 package com.nvidia.spark.rapids
 
+import scala.collection.mutable
+import scala.util.Random
+
 import ai.rapids.cudf.NvtxColor
 import com.nvidia.spark.rapids.Arm.withResource
 import com.nvidia.spark.rapids.GpuMetric._
@@ -76,6 +79,7 @@ case class GpuExpandExec(
   override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
   override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
     OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
+    "preprojectTime" -> createNanoTimingMetric(MODERATE_LEVEL, "pre-projection time"),
     NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
     NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES))
 
@@ -88,15 +92,41 @@ case class GpuExpandExec(
     AttributeSet(projections.flatten.flatMap(_.references))
 
   override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
-    val boundProjections = projections.map { pl =>
-      GpuBindReferences.bindGpuReferencesTiered(pl, child.output, useTieredProject)
-    }
-
     // cache in a local to avoid serializing the plan
     val metricsMap = allMetrics
+    // Check useTieredProject first so computing the (lazy) pre-projection
+    // list is skipped entirely when tiered projection is disabled.
+    val doPreproject = useTieredProject && preprojectionList.exists(_.children.nonEmpty)
+    val (boundProjections, preprojectIter) = if (doPreproject) {
+      // Got some complicated expressions and tiered projection is enabled,
+      // so try to do the pre-projection first.
+      val boundPreprojections = GpuBindReferences.bindGpuReferencesTiered(
+        preprojectionList, child.output, useTieredProject)
+      val preprojectIterFunc: Iterator[ColumnarBatch] => Iterator[ColumnarBatch] = iter =>
+        iter.map { cb =>
+          val start = System.nanoTime()
+          val ret = boundPreprojections.projectAndCloseWithRetrySingleBatch(
+            SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+          val timeTaken = System.nanoTime() - start
+          metricsMap("preprojectTime") += timeTaken
+          metricsMap(OP_TIME) += timeTaken
+          ret
+        }
+      val preprojectAttrs = preprojectionList.map(_.toAttribute)
+      val boundLists = updatedProjections.map { pl =>
+        GpuBindReferences.bindGpuReferencesTiered(pl, preprojectAttrs, useTieredProject)
+      }
+      (boundLists, preprojectIterFunc)
+    } else {
+      val boundLists = projections.map { pl =>
+        GpuBindReferences.bindGpuReferencesTiered(pl, child.output, useTieredProject)
+      }
+      (boundLists, identity[Iterator[ColumnarBatch]] _)
+    }
+
     child.executeColumnar().mapPartitions { it =>
-      new GpuExpandIterator(boundProjections, metricsMap, it)
+      new GpuExpandIterator(boundProjections, metricsMap, preprojectIter(it))
     }
   }
 
@@ -104,6 +134,46 @@ case class GpuExpandExec(
     throw new IllegalStateException("ROW BASED PROCESSING IS NOT SUPPORTED")
   }
 
+  /**
+   * Get the expressions that need to be pre-projected, along with the updated
+   * projections for expanding.
+   *
+   * Some rules (e.g. RewriteDistinctAggregates) in Spark will put non-leaf
+   * expressions in Expand projections, so Expand cannot leverage the GPU
+   * tiered projection across the projection lists. This factors out those
+   * expressions for evaluation before expanding, to avoid duplicate
+   * evaluation of semantically equal (sub)expressions.
+   */
+  private[this] lazy val (preprojectionList, updatedProjections) = {
+    // LinkedHashSet gives a deterministic column order for the pre-projection
+    // list (a plain Set would not), while still de-duplicating repeated
+    // references and named expressions across the projection lists.
+    val projectListBuffer = mutable.LinkedHashSet[NamedExpression]()
+    val newProjections = projections.map { proList =>
+      proList.map {
+        case attr: AttributeReference if child.outputSet.contains(attr) =>
+          // A ref to child output, add it to pre-projection for passthrough.
+          projectListBuffer += attr
+          attr
+        case leaf if leaf.children.isEmpty =>
+          // A leaf expression is simple enough, not necessary for pre-projection.
+          // e.g. GpuLiteral, and two internal columns (grouping id and grouping
+          // position) specific to Expand.
+          leaf
+        case notLeafNamed: NamedExpression =>
+          logDebug(s"Got a named non-leaf expression: $notLeafNamed for preprojection")
+          // A named expression, e.g. GpuAlias. Add it for pre-projection and
+          // replace it with its reference in the Expand projection.
+          projectListBuffer += notLeafNamed
+          notLeafNamed.toAttribute
+        case notLeaf =>
+          // Reuse the alias of a semantically equal expression collected
+          // earlier, so the expression is evaluated only once; otherwise wrap
+          // it in a new GpuAlias with a deterministic name.
+          val existing = projectListBuffer.collectFirst {
+            case a: GpuAlias if a.child.semanticEquals(notLeaf) => a
+          }
+          existing.getOrElse {
+            logDebug(s"Got a non-leaf expression: $notLeaf for preprojection")
+            val alias = GpuAlias(notLeaf, s"_preproject_c${projectListBuffer.size}")()
+            projectListBuffer += alias
+            alias
+          }.toAttribute
+      }
+    }
+    (projectListBuffer.toList, newProjections)
+  }
 }
 
 class GpuExpandIterator(