Optimizing Expand+Aggregate in SQL queries with many count distinct #48

Merged 1 commit on Jul 18, 2024
File: GpuExpandExec.scala
@@ -20,6 +20,7 @@ import scala.util.Random
 
 import ai.rapids.cudf.NvtxColor
 import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.GpuExpressionsUtils.NullVecCache
 import com.nvidia.spark.rapids.GpuMetric._
 import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
 import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
@@ -54,7 +55,9 @@ class GpuExpandExecMeta(
     val projections = gpuProjections.map(_.map(_.convertToGpu()))
     GpuExpandExec(projections, expand.output, childPlans.head.convertIfNeeded())(
       useTieredProject = conf.isTieredProjectEnabled,
-      preprojectEnabled = conf.isExpandPreprojectEnabled)
+      preprojectEnabled = conf.isExpandPreprojectEnabled,
+      cacheNullMaxCount = conf.expandCachingNullVecMaxCount,
+      coalesceAfter = conf.isCoalesceAfterExpandEnabled)
   }
 }
 
@@ -72,11 +75,17 @@ case class GpuExpandExec(
     output: Seq[Attribute],
     child: SparkPlan)(
     useTieredProject: Boolean = false,
-    preprojectEnabled: Boolean = false) extends ShimUnaryExecNode with GpuExec {
+    preprojectEnabled: Boolean = false,
+    cacheNullMaxCount: Int = 0,
+    override val coalesceAfter: Boolean = true
+) extends ShimUnaryExecNode with GpuExec {
 
   override def otherCopyArgs: Seq[AnyRef] = Seq[AnyRef](
     useTieredProject.asInstanceOf[java.lang.Boolean],
-    preprojectEnabled.asInstanceOf[java.lang.Boolean])
+    preprojectEnabled.asInstanceOf[java.lang.Boolean],
+    cacheNullMaxCount.asInstanceOf[java.lang.Integer],
+    coalesceAfter.asInstanceOf[java.lang.Boolean]
+  )
 
   private val PRE_PROJECT_TIME = "preprojectTime"
   override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
@@ -127,7 +136,7 @@ case class GpuExpandExec(
     }
 
     child.executeColumnar().mapPartitions { it =>
-      new GpuExpandIterator(boundProjections, metricsMap, preprojectIter(it))
+      new GpuExpandIterator(boundProjections, metricsMap, preprojectIter(it), cacheNullMaxCount)
     }
   }
 
@@ -191,7 +200,8 @@
 class GpuExpandIterator(
     boundProjections: Seq[GpuTieredProject],
     metrics: Map[String, GpuMetric],
-    it: Iterator[ColumnarBatch])
+    it: Iterator[ColumnarBatch],
+    cacheNullMaxCount: Int)
   extends Iterator[ColumnarBatch] {
 
   private var sb: Option[SpillableColumnarBatch] = None
@@ -206,9 +216,20 @@
   Option(TaskContext.get()).foreach { tc =>
     onTaskCompletion(tc) {
       sb.foreach(_.close())
+
+      if (cacheNullMaxCount > 0) {
+        import scala.collection.JavaConverters._
+        GpuExpressionsUtils.cachedNullVectors.get().values().asScala.foreach(_.close())
+        GpuExpressionsUtils.cachedNullVectors.get().clear()
+      }
     }
   }
 
+  if (cacheNullMaxCount > 0 && GpuExpressionsUtils.cachedNullVectors.get() == null) {
+    GpuExpressionsUtils.cachedNullVectors.set(new NullVecCache(cacheNullMaxCount))
+  }
+
+
   override def hasNext: Boolean = sb.isDefined || it.hasNext
 
   override def next(): ColumnarBatch = {
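Note on the lifecycle above: the iterator installs a per-thread NullVecCache lazily at construction, and the onTaskCompletion hook closes every cached vector and clears the map, so device memory is reclaimed even when a task fails mid-stream. A stripped-down sketch of that pattern, with FakeVector standing in for GPU column vectors and a callback list standing in for Spark's task-completion listener (all names below are illustrative, not plugin API):

import scala.collection.mutable

// Stand-in for a device-backed vector that must be explicitly released.
final class FakeVector(val rows: Int) extends AutoCloseable {
  override def close(): Unit = println(s"closed null vector of $rows rows")
}

object ThreadLocalCacheLifecycle {
  private val cache = new ThreadLocal[mutable.Map[Int, FakeVector]]()

  // Mirrors GpuExpandIterator: lazily install a per-thread cache, and
  // register a completion hook that closes every cached vector and clears
  // the map so device memory is released even if the task fails midway.
  def runTask(registerCompletionHook: (() => Unit) => Unit): Unit = {
    if (cache.get() == null) {
      cache.set(mutable.Map.empty)
    }
    registerCompletionHook { () =>
      cache.get().values.foreach(_.close())
      cache.get().clear()
    }
    // Task body: repeated lookups for the same row count reuse one vector.
    val first = cache.get().getOrElseUpdate(1024, new FakeVector(1024))
    val second = cache.get().getOrElseUpdate(1024, new FakeVector(1024))
    assert(first eq second)
  }

  def main(args: Array[String]): Unit = {
    var hooks = List.empty[() => Unit]
    runTask(hook => hooks ::= hook)
    hooks.foreach(_.apply()) // simulate task completion: closes cached vectors
  }
}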
File: GpuExpressions.scala
@@ -21,11 +21,12 @@ import com.nvidia.spark.Retryable
 import com.nvidia.spark.rapids.Arm.{withResource, withResourceIfAllowed}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.shims.{ShimBinaryExpression, ShimExpression, ShimTernaryExpression, ShimUnaryExpression}
+import java.util
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.{DataType, StringType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -52,6 +53,40 @@ object GpuExpressionsUtils {
       "implemented and should have been disabled")
   }
 
+  // This is only for ExpandExec, which will generate a lot of null vectors
+  case class NullVecKey(d: DataType, n: Int)
+
+  class NullVecCache(private val maxNulls: Int)
+    extends util.LinkedHashMap[NullVecKey, GpuColumnVector](100, 0.75f, true) {
+    private var totalNulls: Long = 0L
+
+    override def clear(): Unit = {
+      super.clear()
+      totalNulls = 0
+    }
+
+    override def put(key: NullVecKey, v: GpuColumnVector): GpuColumnVector = {
+      if (v.getRowCount > maxNulls) {
+        throw new IllegalStateException(s"spark.rapids.sql.expandCachingNullVec.maxNulls " +
+          s"($maxNulls) is set too small to hold a single vector with ${v.getRowCount} rows.")
+      }
+      val iter = entrySet().iterator()
+      while (iter.hasNext && totalNulls > maxNulls - v.getRowCount) {
+        val entry = iter.next()
+        iter.remove()
+        totalNulls -= entry.getValue.getRowCount
+      }
+
+      val ret = super.put(key, v)
+      totalNulls += v.getRowCount
+      ret
+    }
+
+    override def remove(key: Any): GpuColumnVector = throw new UnsupportedOperationException()
+  }
+
+  val cachedNullVectors = new ThreadLocal[NullVecCache]()
+
   /**
    * Tries to resolve a `GpuColumnVector` from a Scala `Any`.
    *
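NullVecCache gets its LRU behavior from java.util.LinkedHashMap's access-order mode (the `true` in the three-argument constructor): iteration visits entries from least- to most-recently used, so the loop in put evicts stale vectors until the incoming vector's rows fit under the maxNulls budget. A self-contained sketch of the same weight-bounded LRU technique, using strings weighted by length in place of GPU vectors (all names here are illustrative):

import java.util

// Access-ordered LinkedHashMap: iteration runs from least- to most-recently
// used, which is what makes the eviction loop in put() an LRU policy.
class WeightedLruCache[K](maxWeight: Long)
  extends util.LinkedHashMap[K, String](16, 0.75f, true) {
  private var totalWeight = 0L

  override def put(key: K, value: String): String = {
    val w = value.length.toLong
    require(w <= maxWeight, s"entry of weight $w exceeds capacity $maxWeight")
    // Evict from the LRU end until the new entry fits under the budget.
    val iter = entrySet().iterator()
    while (iter.hasNext && totalWeight > maxWeight - w) {
      val entry = iter.next()
      iter.remove()
      totalWeight -= entry.getValue.length.toLong
    }
    // Like NullVecCache, this assumes callers check containsKey first, so a
    // replaced value's weight is never double-counted.
    val ret = super.put(key, value)
    totalWeight += w
    ret
  }
}

object WeightedLruCacheDemo {
  def main(args: Array[String]): Unit = {
    val cache = new WeightedLruCache[String](maxWeight = 10)
    cache.put("a", "12345") // weight 5, total 5
    cache.put("b", "12345") // weight 5, total 10
    cache.get("a")          // touch "a"; "b" is now least recently used
    cache.put("c", "1234")  // over budget: evicts "b", keeps "a"
    println(cache.keySet()) // prints [a, c]
  }
}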
@@ -73,7 +108,18 @@
   def resolveColumnVector(any: Any, numRows: Int): GpuColumnVector = {
     withResourceIfAllowed(any) {
       case c: GpuColumnVector => c.incRefCount()
-      case s: GpuScalar => GpuColumnVector.from(s, numRows, s.dataType)
+      case s: GpuScalar =>
+        if (!s.isValid && cachedNullVectors.get() != null) {
+          if (!cachedNullVectors.get.containsKey(NullVecKey(s.dataType, numRows))) {
+            cachedNullVectors.get.put(NullVecKey(s.dataType, numRows),
+              GpuColumnVector.from(s, numRows, s.dataType))
+          }
+
+          val ret = cachedNullVectors.get().get(NullVecKey(s.dataType, numRows))
+          ret.incRefCount()
+        } else {
+          GpuColumnVector.from(s, numRows, s.dataType)
+        }
       case other =>
         throw new IllegalArgumentException(s"Cannot resolve a ColumnVector from the value:" +
           s" $other. Please convert it to a GpuScalar or a GpuColumnVector before returning.")
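The cache-hit path leans on GpuColumnVector's reference counting: the cache keeps one reference to each null vector, and every resolution hands the caller an extra reference via incRefCount, so callers close their copy as usual while the cached vector survives until the task-completion hook clears the map. A toy sketch of that get-or-create-then-share contract (RefCounted and resolveNull are stand-ins, not plugin API):

import scala.collection.mutable

// Toy resource mimicking GpuColumnVector's incRefCount/close contract.
final class RefCounted(val name: String) {
  private var refs = 1 // creation hands the creator one reference
  def incRefCount(): this.type = { refs += 1; this }
  def close(): Unit = {
    refs -= 1
    if (refs == 0) println(s"$name: underlying memory freed")
  }
}

object NullVecSharing {
  // Keyed like NullVecKey: (data type, row count).
  private val cache = mutable.Map.empty[(String, Int), RefCounted]

  // Get-or-create, then hand the caller its own reference.
  def resolveNull(dataType: String, numRows: Int): RefCounted =
    cache.getOrElseUpdate((dataType, numRows),
      new RefCounted(s"null[$dataType x $numRows]")).incRefCount()

  def main(args: Array[String]): Unit = {
    val a = resolveNull("int", 1000)
    val b = resolveNull("int", 1000) // cache hit: same vector, new reference
    a.close(); b.close()             // callers finish; cache still holds one ref
    cache.values.foreach(_.close())  // task completion: frees the memory
    cache.clear()
  }
}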
File: GpuTransitionOverrides.scala
@@ -376,6 +376,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
     case _: GpuDataSourceScanExec => true
     case _: DataSourceV2ScanExecBase => true
     case _: RDDScanExec => true // just in case an RDD was reading in data
+    case _: ExpandExec => true
     case _ => false
   }
 
File: sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala (18 additions, 0 deletions)
@@ -1226,6 +1226,20 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
     .booleanConf
     .createWithDefault(true)
 
+  val ENABLE_COALESCE_AFTER_EXPAND = conf("spark.rapids.sql.coalesceAfterExpand.enabled")
+    .doc("When set to false, disables the coalesce that runs after GPU Expand.")
+    .internal()
+    .booleanConf
+    .createWithDefault(false)
+
+  val EXPAND_CACHING_NULL_VEC_MAX_NULL_COUNT =
+    conf("spark.rapids.sql.expandCachingNullVec.maxNulls")
+      .doc("Max number of null scalars in the null vectors cached for GPU Expand. " +
+        "If the number of null scalars exceeds this value, the null vectors will not " +
+        "be cached. The value has to be positive for caching to be enabled.")
+      .internal().integerConf
+      .createWithDefault(0)
+
   val ENABLE_ORC_FLOAT_TYPES_TO_STRING =
     conf("spark.rapids.sql.format.orc.floatTypesToString.enable")
       .doc("When reading an ORC file, the source data schemas(schemas of ORC file) may differ " +
@@ -2821,6 +2835,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
 
   lazy val isExpandPreprojectEnabled: Boolean = get(ENABLE_EXPAND_PREPROJECT)
 
+  lazy val isCoalesceAfterExpandEnabled: Boolean = get(ENABLE_COALESCE_AFTER_EXPAND)
+
+  lazy val expandCachingNullVecMaxCount: Int = get(EXPAND_CACHING_NULL_VEC_MAX_NULL_COUNT)
+
   lazy val multiThreadReadNumThreads: Int = {
     // Use the largest value set among all the options.
     val deprecatedConfs = Seq(
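Both settings are internal and conservative by default: the null-vector cache is disabled (max null count 0), and coalescing after Expand defaults to off here, so the optimization has to be opted into per workload. Assuming a live SparkSession named spark, an experiment on a many-count-distinct query might look like the following (the values and table name are illustrative, not recommendations, and internal configs may need to be supplied at startup via --conf instead):

// Illustrative experiment toggles; these are internal RAPIDS configs.
spark.conf.set("spark.rapids.sql.expandCachingNullVec.maxNulls", "100000")
spark.conf.set("spark.rapids.sql.coalesceAfterExpand.enabled", "true")

// The Expand+Aggregate pattern this PR targets: each COUNT(DISTINCT ...)
// adds a projection whose other columns are filled with null literals,
// which is where the cached null vectors pay off.
spark.sql(
  """SELECT col_a,
    |       COUNT(DISTINCT col_b) AS dist_b,
    |       COUNT(DISTINCT col_c) AS dist_c
    |FROM some_table
    |GROUP BY col_a
    |""".stripMargin).show()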