Optimize sample perf #4159

Merged (5 commits, Dec 3, 2021)

Changes from all commits
1 change: 1 addition & 0 deletions docs/configs.md
@@ -79,6 +79,7 @@ Name | Description | Default Value
<a name="sql.decimalOverflowGuarantees"></a>spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.fast.sample"></a>spark.rapids.sql.fast.sample|Option to turn on fast sample. If enable it is inconsistent with CPU sample because of GPU sample algorithm is inconsistent with CPU.|false
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
<a name="sql.format.orc.enabled"></a>spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true
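As a usage sketch (not part of this PR), the flag can be flipped at runtime from a Spark session, assuming the RAPIDS plugin is on the classpath and `spark.rapids.sql.enabled` is true:

```scala
// Opt in to the faster, non-CPU-consistent GPU sampling path.
spark.conf.set("spark.rapids.sql.fast.sample", "true")

// With the flag on, this sample(...) should plan GpuFastSampleExec
// instead of the CPU-consistent GpuSampleExec.
val sampled = spark.range(1000000L).sample(false, 0.1, 42)
sampled.count()
```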
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.ColumnVector

import org.apache.spark.sql.vectorized.ColumnarBatch

object GatherUtils extends Arm {
  def gather(cb: ColumnarBatch, rows: ArrayBuffer[Int]): ColumnarBatch = {
    val colTypes = GpuColumnVector.extractTypes(cb)
    if (rows.isEmpty) {
      GpuColumnVector.emptyBatchFromTypes(colTypes)
    } else if (cb.numCols() == 0) {
      // for a count aggregate the number of columns is 0
      val c = GpuColumnVector.emptyBatchFromTypes(colTypes)
      c.setNumRows(rows.length)
      c
    } else {
      withResource(ColumnVector.fromInts(rows: _*)) { gatherCv =>
        withResource(GpuColumnVector.from(cb)) { table =>
          // gather the selected rows on the GPU
          withResource(table.gather(gatherCv)) { gatheredTable =>
            GpuColumnVector.from(gatheredTable, colTypes)
          }
        }
      }
    }
  }
}
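As a rough illustration of the contract (not in the PR), a caller builds the row indexes on the CPU and hands them to `gather`. The helper below is hypothetical; the input batch is assumed to be GPU-backed, and the caller remains responsible for closing both the input and the returned batch, since `gather` only closes its own intermediates:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical helper: keep rows 0, 2, 4, ... of a GPU-backed batch.
def keepEvenRows(batch: ColumnarBatch): ColumnarBatch = {
  val rows = new ArrayBuffer[Int]
  var i = 0
  while (i < batch.numRows()) {
    rows += i
    i += 2
  }
  // `batch` and the returned batch stay owned by the caller
  GatherUtils.gather(batch, rows)
}
```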
@@ -1319,6 +1319,12 @@ object RapidsConf {
    .booleanConf
    .createWithDefault(value = false)

  val ENABLE_FAST_SAMPLE = conf("spark.rapids.sql.fast.sample")
    .doc("Option to turn on fast sample. If enabled, the result is inconsistent with " +
      "the CPU sample because the GPU sampling algorithm differs from the CPU one.")
    .booleanConf
    .createWithDefault(value = false)

  private def printSectionHeader(category: String): Unit =
    println(s"\n### $category")

@@ -1733,6 +1739,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

  lazy val isCpuBasedUDFEnabled: Boolean = get(ENABLE_CPU_BASED_UDF)

  lazy val isFastSampleEnabled: Boolean = get(ENABLE_FAST_SAMPLE)

  private val optimizerDefaults = Map(
    // this is not accurate because CPU projections do have a cost due to appending values
    // to each row that is produced, but this needs to be a really small number because
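A quick sketch of how the flag surfaces through `RapidsConf`, using nothing beyond the Map-based constructor and accessor shown above:

```scala
// Build a RapidsConf straight from key/value pairs, as the constructor above allows.
val enabled = new RapidsConf(Map("spark.rapids.sql.fast.sample" -> "true"))
assert(enabled.isFastSampleEnabled)

// With no entry present, createWithDefault(value = false) applies.
val defaults = new RapidsConf(Map.empty[String, String])
assert(!defaults.isFastSampleEnabled)
```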
@@ -17,6 +17,7 @@
package com.nvidia.spark.rapids

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf
import ai.rapids.cudf._
@@ -365,19 +366,32 @@ case class GpuFilterExec(
}
}

class GpuSampleExecMeta(
    sample: SampleExec,
    conf: RapidsConf,
    p: Option[RapidsMeta[_, _, _]],
    r: DataFromReplacementRule) extends SparkPlanMeta[SampleExec](sample, conf, p, r)
  with Logging {
  override def convertToGpu(): GpuExec = {
    val gpuChild = childPlans.head.convertIfNeeded()
    if (conf.isFastSampleEnabled) {
      // Use the GPU sample JNI: faster, but the output differs from what the CPU produces
      GpuFastSampleExec(sample.lowerBound, sample.upperBound, sample.withReplacement,
        sample.seed, gpuChild)
    } else {
      // The output is the same as what the CPU produces:
      // first generate row indexes with the CPU sampler, then gather on the GPU
      GpuSampleExec(sample.lowerBound, sample.upperBound, sample.withReplacement,
        sample.seed, gpuChild)
    }
  }
}

case class GpuSampleExec(
    lowerBound: Double,
    upperBound: Double,
    withReplacement: Boolean,
    seed: Long,
    child: SparkPlan) extends ShimUnaryExecNode with GpuExec {

  override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
    OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME))
@@ -404,7 +418,9 @@ case class GpuSampleExec(lowerBound: Double, upperBound: Double, withReplacement
    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
    val opTime = gpuLongMetric(OP_TIME)

    val rdd = child.executeColumnar()
    // CPU-consistent path: first generate the sampled row indexes on the CPU,
    // then gather those rows on the GPU
    if (withReplacement) {
      new GpuPartitionwiseSampledRDD(
        rdd,
@@ -415,46 +431,106 @@ case class GpuSampleExec(lowerBound: Double, upperBound: Double, withReplacement
    } else {
      rdd.mapPartitionsWithIndex(
        (index, iterator) => {
          // use the CPU sampler to generate the row indexes
          val sampler = new BernoulliCellSampler(lowerBound, upperBound)
          sampler.setSeed(seed + index)
          iterator.map[ColumnarBatch] { columnarBatch =>
            // collect the sampled row indexes; rows are sampled one by one within
            // the batch, so the result is the same as CPU execution
            withResource(new NvtxWithMetrics("Sample Exec", NvtxColor.YELLOW, opTime)) { _ =>
              withResource(columnarBatch) { cb =>
                // generate the sampled row indexes on the CPU
                val sampledRows = new ArrayBuffer[Int]
                var rowIndex = 0
                while (rowIndex < cb.numRows()) {
                  if (sampler.sample() > 0) {
                    sampledRows += rowIndex
                  }
                  rowIndex += 1
                }
                numOutputBatches += 1
                numOutputRows += sampledRows.length
                // gather the sampled rows on the GPU
                GatherUtils.gather(cb, sampledRows)
              }
            }
          }
        },
        preservesPartitioning = true
      )
    }
  }
}
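Why this stays CPU-consistent: `BernoulliCellSampler` makes exactly one RNG draw per row, in row order, so for a given seed the kept indexes match what the CPU plan would keep. A standalone sketch of that property, with illustrative bounds and row count:

```scala
import org.apache.spark.util.random.BernoulliCellSampler

// One draw per row in row order; the kept set depends only on the seed.
val sampler = new BernoulliCellSampler[Any](0.0, 0.3)
sampler.setSeed(42L + 0) // seed + partition index, as in doExecuteColumnar above
val kept = (0 until 10).filter(_ => sampler.sample() > 0)
// `kept` is deterministic for this seed; the exact values depend on the RNG
println(kept)
```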

case class GpuFastSampleExec(
    lowerBound: Double,
    upperBound: Double,
    withReplacement: Boolean,
    seed: Long,
    child: SparkPlan) extends ShimUnaryExecNode with GpuExec {

  override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
    OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME))

  override def output: Seq[Attribute] = {
    child.output
  }

  // add a coalesce exec after this one to avoid empty and small batches,
  // because sampling shrinks the batch
  override val coalesceAfter: Boolean = true

  // Note: GPU sampling does not preserve the input ordering
  override def outputOrdering: Seq[SortOrder] = Nil

  override def outputPartitioning: Partitioning = child.outputPartitioning

  override def doExecute(): RDD[InternalRow] =
    throw new IllegalStateException(s"Row-based execution should not occur for $this")

  override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
  override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL

  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
    val opTime = gpuLongMetric(OP_TIME)
    val rdd = child.executeColumnar()

    // not CPU-consistent: samples directly on the GPU via the cuDF sample JNI
    rdd.mapPartitionsWithIndex(
      (index, iterator) => {
        iterator.map[ColumnarBatch] { columnarBatch =>
          withResource(new NvtxWithMetrics("Fast Sample Exec", NvtxColor.YELLOW, opTime)) { _ =>
            withResource(columnarBatch) { cb =>
              numOutputBatches += 1
              val numSampleRows = (cb.numRows() * (upperBound - lowerBound)).toLong

              val colTypes = GpuColumnVector.extractTypes(cb)
              if (numSampleRows == 0L) {
                GpuColumnVector.emptyBatchFromTypes(colTypes)
              } else if (cb.numCols() == 0) {
                // for a count aggregate the number of columns is 0
                val c = GpuColumnVector.emptyBatchFromTypes(colTypes)
                c.setNumRows(numSampleRows.toInt)
                c
              } else {
                withResource(GpuColumnVector.from(cb)) { table =>
                  // sample on the GPU
                  withResource(table.sample(numSampleRows, withReplacement, seed + index)) {
                    sampled =>
                      val outBatch = GpuColumnVector.from(sampled, colTypes)
                      numOutputRows += outBatch.numRows()
                      outBatch
                  }
                }
              }
            }
          }
        }
      },
      preservesPartitioning = true
    )
  }
}
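A quick check of the target-row-count arithmetic above, with hypothetical numbers: a 100,000-row batch under `sample(0.1)` (lowerBound = 0.0, upperBound = 0.1) asks cuDF for a fixed 10,000-row sample instead of making a per-row draw:

```scala
val numRows = 100000
val (lowerBound, upperBound) = (0.0, 0.1)
// same expression as in doExecuteColumnar above
val numSampleRows = (numRows * (upperBound - lowerBound)).toLong
assert(numSampleRows == 10000L)
```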

@@ -17,8 +17,8 @@ package org.apache.spark.sql.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.{Arm, GatherUtils, GpuMetric, NvtxWithMetrics}

import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.random.PoissonSampler
@@ -37,52 +37,35 @@ class GpuPoissonSampler(fraction: Double, useGapSamplingIfPossible: Boolean,
    } else {
      batchIterator.map { columnarBatch =>
        withResource(new NvtxWithMetrics("Sample Exec", NvtxColor.YELLOW, opTime)) { _ =>
          withResource(columnarBatch) { cb =>
            // collect the sampled row indexes; rows are sampled one by one within
            // the batch, so the result is the same as the CPU version
            val sampledRows = sample(cb.numRows())
            numOutputBatches += 1
            numOutputRows += sampledRows.length
            GatherUtils.gather(cb, sampledRows)
          }
        }
      }
    }
  }

  // collect the sampled row indexes; note that one row can be sampled multiple times
  private def sample(numRows: Int): ArrayBuffer[Int] = {
    val buf = new ArrayBuffer[Int]
    var rowIdx = 0
    while (rowIdx < numRows) {
      // ask the PoissonSampler how many times this row is drawn
      val rowCount = super.sample()
      if (rowCount > 0) {
        // the caller counts output rows, so only expand the indexes here
        var i = 0
        while (i < rowCount) {
          buf += rowIdx
          i = i + 1
        }
      }
      rowIdx += 1
    }
    buf
  }
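To make the with-replacement semantics concrete, here is a standalone sketch (illustrative fraction and row count, not PR code) of the same index-expansion idea driven directly by Spark's `PoissonSampler`:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.util.random.PoissonSampler

val sampler = new PoissonSampler[Any](0.5) // fraction = 0.5
sampler.setSeed(42L)
val rows = new ArrayBuffer[Int]
var rowIdx = 0
while (rowIdx < 5) {
  val count = sampler.sample() // Poisson draw: how many copies of this row to keep
  var i = 0
  while (i < count) {
    rows += rowIdx
    i += 1
  }
  rowIdx += 1
}
// An index can repeat, e.g. ArrayBuffer(0, 2, 2, 4); GatherUtils.gather
// would then duplicate those rows in the output batch.
println(rows)
```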