Optimize sample perf (NVIDIA#4159)
Signed-off-by: Chong Gao <res_life@163.com>
Chong Gao committed Dec 6, 2021
1 parent 25cca61 commit 714ecbb
Showing 5 changed files with 178 additions and 65 deletions.
1 change: 1 addition & 0 deletions docs/configs.md
@@ -79,6 +79,7 @@ Name | Description | Default Value
<a name="sql.decimalOverflowGuarantees"></a>spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.fast.sample"></a>spark.rapids.sql.fast.sample|Option to turn on fast sample. If enable it is inconsistent with CPU sample because of GPU sample algorithm is inconsistent with CPU.|false
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
<a name="sql.format.orc.enabled"></a>spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true
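For reference, a minimal sketch of turning the flag on from a Spark session. Only spark.rapids.sql.fast.sample comes from this change; the app name and the other settings are illustrative boilerplate for a RAPIDS-enabled session:

import org.apache.spark.sql.SparkSession

// Illustrative setup; spark.plugins points at the RAPIDS SQL plugin, and
// spark.rapids.sql.fast.sample opts in to the faster, CPU-inconsistent sampler.
val spark = SparkSession.builder()
  .appName("fast-sample-demo") // hypothetical app name
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.fast.sample", "true")
  .getOrCreate()

// sample() can now be planned as GpuFastSampleExec instead of GpuSampleExec.
spark.range(1000000L).sample(withReplacement = false, fraction = 0.1).count()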
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.ColumnVector

import org.apache.spark.sql.vectorized.ColumnarBatch

object GatherUtils extends Arm {
def gather(cb: ColumnarBatch, rows: ArrayBuffer[Int]): ColumnarBatch = {
val colTypes = GpuColumnVector.extractTypes(cb)
if (rows.isEmpty) {
GpuColumnVector.emptyBatchFromTypes(colTypes)
} else if (cb.numCols() == 0) {
// for count agg, the number of columns is 0
val c = GpuColumnVector.emptyBatchFromTypes(colTypes)
c.setNumRows(rows.length)
c
} else {
withResource(ColumnVector.fromInts(rows: _*)) { gatherCv =>
withResource(GpuColumnVector.from(cb)) { table =>
// GPU gather
withResource(table.gather(gatherCv)) { gatheredTable =>
GpuColumnVector.from(gatheredTable, colTypes)
}
}
}
}
}
}
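For intuition, gather keeps exactly the rows named by the index list, in order. This pure-Scala model mirrors what cudf's Table.gather does on the GPU; the helper and sample data are illustrative, not part of the plugin:

// One-column model of gather semantics; cudf applies the same index-based
// selection to every column of the table at once on the GPU.
def gather[T](column: IndexedSeq[T], rowIndexes: Seq[Int]): IndexedSeq[T] =
  rowIndexes.map(column).toIndexedSeq

val col = IndexedSeq("a", "b", "c", "d", "e")
// Repeated indexes are allowed; with-replacement sampling relies on this.
assert(gather(col, Seq(0, 2, 2, 4)) == IndexedSeq("a", "c", "c", "e"))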
@@ -1319,6 +1319,12 @@ object RapidsConf {
.booleanConf
.createWithDefault(value = false)

val ENABLE_FAST_SAMPLE = conf("spark.rapids.sql.fast.sample")
.doc("Option to turn on fast sample. If enable it is inconsistent with CPU sample " +
"because of GPU sample algorithm is inconsistent with CPU.")
.booleanConf
.createWithDefault(value = false)

private def printSectionHeader(category: String): Unit =
println(s"\n### $category")

@@ -1733,6 +1739,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isCpuBasedUDFEnabled: Boolean = get(ENABLE_CPU_BASED_UDF)

lazy val isFastSampleEnabled: Boolean = get(ENABLE_FAST_SAMPLE)
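A minimal sketch of how the new flag is consumed, using the RapidsConf constructor visible in this hunk; the map contents are made up:

// RapidsConf wraps a plain String-to-String map; when the key is absent,
// get(ENABLE_FAST_SAMPLE) falls back to the default of false.
val enabled = new RapidsConf(Map("spark.rapids.sql.fast.sample" -> "true"))
assert(enabled.isFastSampleEnabled)

val byDefault = new RapidsConf(Map.empty[String, String])
assert(!byDefault.isFastSampleEnabled)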

private val optimizerDefaults = Map(
// this is not accurate because CPU projections do have a cost due to appending values
// to each row that is produced, but this needs to be a really small number because
@@ -17,6 +17,7 @@
package com.nvidia.spark.rapids

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf
import ai.rapids.cudf._
@@ -365,19 +366,32 @@ case class GpuFilterExec(
}
}

class GpuSampleExecMeta(sample: SampleExec, conf: RapidsConf, p: Option[RapidsMeta[_, _, _]],
class GpuSampleExecMeta(
sample: SampleExec,
conf: RapidsConf,
p: Option[RapidsMeta[_, _, _]],
r: DataFromReplacementRule) extends SparkPlanMeta[SampleExec](sample, conf, p, r)
with Logging {
with Logging {
override def convertToGpu(): GpuExec = {
val gpuChild = childPlans.head.convertIfNeeded()
GpuSampleExec(sample.lowerBound, sample.upperBound, sample.withReplacement,
sample.seed, gpuChild)
if (conf.isFastSampleEnabled) {
// Use the GPU sample JNI; this is faster, but the output differs from what the CPU produces
GpuFastSampleExec(sample.lowerBound, sample.upperBound, sample.withReplacement,
sample.seed, gpuChild)
} else {
// The output is the same as what the CPU produces:
// first generate row indexes with the CPU sampler, then gather the rows on the GPU
GpuSampleExec(sample.lowerBound, sample.upperBound, sample.withReplacement,
sample.seed, gpuChild)
}
}
}
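The observable contract of the two branches, sketched against the DataFrame API (assumes a GPU-enabled session like the one above; exact row sets depend on the Spark version and seed handling):

// Default: GpuSampleExec drives the same CPU sampler classes as Spark's
// SampleExec, so a fixed seed selects the same rows as a CPU-only run.
spark.conf.set("spark.rapids.sql.fast.sample", "false")
val cpuConsistent = spark.range(100L).sample(false, 0.3, seed = 42L).collect()

// Fast path: GpuFastSampleExec calls cudf's sample instead, so the selected
// rows generally differ from the CPU result for the same seed.
spark.conf.set("spark.rapids.sql.fast.sample", "true")
val fastSampled = spark.range(100L).sample(false, 0.3, seed = 42L).collect()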

case class GpuSampleExec(lowerBound: Double, upperBound: Double, withReplacement: Boolean,
seed: Long, child: SparkPlan)
extends ShimUnaryExecNode with GpuExec {
case class GpuSampleExec(
lowerBound: Double,
upperBound: Double,
withReplacement: Boolean,
seed: Long, child: SparkPlan) extends ShimUnaryExecNode with GpuExec {

override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME))
@@ -404,7 +418,9 @@ case class GpuSampleExec(lowerBound: Double, upperBound: Double, withReplacement
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val opTime = gpuLongMetric(OP_TIME)

val rdd = child.executeColumnar()
// CPU-consistent path: first generate sample row indexes on the CPU, then gather on the GPU
if (withReplacement) {
new GpuPartitionwiseSampledRDD(
rdd,
@@ -415,46 +431,106 @@
} else {
rdd.mapPartitionsWithIndex(
(index, iterator) => {
// use the CPU sampler to generate a filter
// use the CPU sampler to generate row indexes
val sampler = new BernoulliCellSampler(lowerBound, upperBound)
sampler.setSeed(seed + index)
iterator.map[ColumnarBatch] { batch =>
numOutputBatches += 1
withResource(batch) { b => // will generate new columnar column, close this
val numRows = b.numRows()
val filter = withResource(HostColumnVector.builder(DType.BOOL8, numRows)) {
builder =>
(0 until numRows).foreach { _ =>
val n = sampler.sample()
if (n > 0) {
builder.append(1.toByte)
numOutputRows += 1
} else {
builder.append(0.toByte)
}
iterator.map[ColumnarBatch] { columnarBatch =>
// collect sampled row indexes
// sample indexes in the batch one by one, so the result matches CPU execution
withResource(new NvtxWithMetrics("Sample Exec", NvtxColor.YELLOW, opTime)) { _ =>
withResource(columnarBatch) { cb =>
// generate sampled row indexes on the CPU
val sampledRows = new ArrayBuffer[Int]
var rowIndex = 0
while (rowIndex < cb.numRows()) {
if (sampler.sample() > 0) {
sampledRows += rowIndex
}
builder.buildAndPutOnDevice()
rowIndex += 1
}
numOutputBatches += 1
numOutputRows += sampledRows.length
// gather by row indexes
GatherUtils.gather(cb, sampledRows)
}
}
}
}
, preservesPartitioning = true
)
}
}
}

// use the GPU to filter rows
val colTypes = GpuColumnVector.extractTypes(b)
withResource(filter) { filter =>
withResource(GpuColumnVector.from(b)) { tbl =>
withResource(tbl.filter(filter)) { filteredData =>
if (filteredData.getRowCount == 0) {
GpuColumnVector.emptyBatchFromTypes(colTypes)
} else {
GpuColumnVector.from(filteredData, colTypes)
}
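As a standalone model of the CPU-consistent path above (scala.util.Random stands in for BernoulliCellSampler; names are illustrative): one Bernoulli trial per row produces the index list that GatherUtils.gather then applies on the GPU.

import scala.util.Random

// Keep a row when its draw lands in [lowerBound, upperBound), which is the
// BernoulliCellSampler rule; identical seeds give identical index lists.
def sampleIndexes(numRows: Int, lb: Double, ub: Double, seed: Long): Seq[Int] = {
  val rng = new Random(seed)
  (0 until numRows).filter { _ =>
    val x = rng.nextDouble()
    x >= lb && x < ub
  }
}

val rows = Vector.tabulate(10)(i => s"row-$i")
// The map call plays the role of the GPU gather step.
val kept = sampleIndexes(rows.length, 0.0, 0.5, seed = 42L).map(rows)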
case class GpuFastSampleExec(
lowerBound: Double,
upperBound: Double,
withReplacement: Boolean,
seed: Long,
child: SparkPlan) extends ShimUnaryExecNode with GpuExec {

override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME))

override def output: Seq[Attribute] = {
child.output
}

// add a coalesce exec after this node to avoid empty and small batches,
// because sampling shrinks the batch
override val coalesceAfter: Boolean = true

// Note GPU sample does not preserve the ordering
override def outputOrdering: Seq[SortOrder] = Nil

override def outputPartitioning: Partitioning = child.outputPartitioning

override def doExecute(): RDD[InternalRow] =
throw new IllegalStateException(s"Row-based execution should not occur for $this")

override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val opTime = gpuLongMetric(OP_TIME)
val rdd = child.executeColumnar()

// Not CPU-consistent: uses the GPU sample JNI
rdd.mapPartitionsWithIndex(
(index, iterator) => {
iterator.map[ColumnarBatch] { columnarBatch =>
withResource(new NvtxWithMetrics("Fast Sample Exec", NvtxColor.YELLOW, opTime)) { _ =>
withResource(columnarBatch) { cb =>
numOutputBatches += 1
val numSampleRows = (cb.numRows() * (upperBound - lowerBound)).toLong

val colTypes = GpuColumnVector.extractTypes(cb)
if (numSampleRows == 0L) {
GpuColumnVector.emptyBatchFromTypes(colTypes)
} else if (cb.numCols() == 0) {
// for count agg, the number of columns is 0
val c = GpuColumnVector.emptyBatchFromTypes(colTypes)
c.setNumRows(numSampleRows.toInt)
c
} else {
withResource(GpuColumnVector.from(cb)) { table =>
// GPU sample
withResource(table.sample(numSampleRows, withReplacement, seed + index)) {
sampled =>
val cb = GpuColumnVector.from(sampled, colTypes)
numOutputRows += cb.numRows()
cb
}
}
}
}
}
}
, preservesPartitioning = true
)
}
}
, preservesPartitioning = true
)
}
}
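Worked numbers for the fast path's sizing (values hypothetical): unlike the per-row Bernoulli path, it computes one fixed output count per batch and asks cudf's sample for exactly that many rows.

val numRows = 10000    // rows in the incoming batch
val lowerBound = 0.0
val upperBound = 0.5   // i.e. a 0.5 sampling fraction
val numSampleRows = (numRows * (upperBound - lowerBound)).toLong
assert(numSampleRows == 5000L) // a fixed count, not a per-row coin flip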

@@ -17,8 +17,8 @@ package org.apache.spark.sql.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{DeviceMemoryBuffer, DType, GatherMap, HostMemoryBuffer, NvtxColor}
import com.nvidia.spark.rapids.{Arm, GpuColumnVector, GpuMetric, NvtxWithMetrics}
import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.{Arm, GatherUtils, GpuMetric, NvtxWithMetrics}

import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.random.PoissonSampler
@@ -37,52 +37,35 @@ class GpuPoissonSampler(fraction: Double, useGapSamplingIfPossible: Boolean,
} else {
batchIterator.map { columnarBatch =>
withResource(new NvtxWithMetrics("Sample Exec", NvtxColor.YELLOW, opTime)) { _ =>
numOutputBatches += 1
withResource(columnarBatch) { cb =>
// collect sampled row indexes
// sample indexes in the batch one by one, so it is the same as the CPU version
val sampledRows = sample(cb.numRows())

val intBytes = DType.INT32.getSizeInBytes()
val totalBytes = sampledRows.length * intBytes
withResource(HostMemoryBuffer.allocate(totalBytes)) { hostBuffer =>
// copy row idx to host buffer
for (idx <- 0 until sampledRows.length) {
hostBuffer.setInt(idx * intBytes, sampledRows(idx))
}

// generate gather map and send to GPU to gather
withResource(DeviceMemoryBuffer.allocate(totalBytes)) { deviceBuffer =>
deviceBuffer.copyFromHostBuffer(0, hostBuffer, 0, totalBytes)
withResource(new GatherMap(deviceBuffer).toColumnView(0, sampledRows.length)) {
gatherCv =>
val colTypes = GpuColumnVector.extractTypes(cb)
withResource(GpuColumnVector.from(cb)) { table =>
withResource(table.gather(gatherCv)) { gatheredTable =>
GpuColumnVector.from(gatheredTable, colTypes)
}
}
}
}
}
numOutputBatches += 1
numOutputRows += sampledRows.length
GatherUtils.gather(cb, sampledRows)
}
}
}
}
}

// collect the sampled row idx
// collect the sampled row indexes; note that one row can be sampled multiple times
private def sample(numRows: Int): ArrayBuffer[Int] = {
val buf = new ArrayBuffer[Int]
for (rowIdx <- 0 until numRows) {
var rowIdx = 0
while (rowIdx < numRows) {
// invoke PoissonSampler sample
val rowCount = super.sample()
if (rowCount > 0) {
numOutputRows += rowCount
for (_ <- 0 until rowCount) {
var i = 0
while (i < rowCount) {
buf += rowIdx
i = i + 1
}
}
rowIdx += 1
}
buf
}
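A compact model of this with-replacement loop (the count function here is made up; Spark's PoissonSampler supplies the real per-row counts): each row index is repeated as many times as it was drawn, and the resulting list feeds GatherUtils.gather.

// countFor(i) says how many times row i appears in the sample (0, 1, 2, ...).
def poissonIndexes(numRows: Int, countFor: Int => Int): Seq[Int] =
  (0 until numRows).flatMap(i => Seq.fill(countFor(i))(i))

// Counts (0, 2, 1) over a 3-row batch: row 1 is gathered twice, row 2 once.
assert(poissonIndexes(3, Map(0 -> 0, 1 -> 2, 2 -> 1)) == Seq(1, 1, 2))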
