Improve columnarCopy for HostColumnarToGpu (#4770)
Signed-off-by: sperlingxx <lovedreamf@gmail.com>

sperlingxx authored Feb 16, 2022
1 parent 342cbca commit 73ad70d
Showing 7 changed files with 300 additions and 190 deletions.
57 changes: 29 additions & 28 deletions docs/tuning-guide.md
@@ -306,34 +306,35 @@ performance.

Custom Spark SQL Metrics are available which can help identify performance bottlenecks in a query.

| Key | Name | Description |
|-------------------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| bufferTime | buffer time | Time spent buffering input from file data sources. This buffering time happens on the CPU, typically with no GPU semaphore held. |
| readFsTime        | time to read fs data         | Time spent actually reading the data and writing it to on-heap memory. This is a part of `bufferTime`. |
| writeBufferTime   | time to write data to buffer | Time spent moving the on-heap buffered data read from the file system to off-heap memory so the GPU can access it. This is a part of `bufferTime`. |
| buildDataSize | build side size | Size in bytes of the build-side of a join. |
| buildTime | build time | Time to load the build-side of a join. |
| collectTime | collect time | For a broadcast the amount of time it took to collect the broadcast data back to the driver before broadcasting it back out. |
| computeAggTime | aggregation time | Time computing an aggregation. |
| concatTime | concat batch time | Time to concatenate batches. |
| copyBufferTime    | copy buffer time             | Time spent copying upstream host columnar data into RAPIDS buffers. |
| filterTime | filter time | Time spent applying filters within other operators, such as joins. |
| gpuDecodeTime | GPU decode time | Time spent on GPU decoding encrypted or compressed data. |
| joinOutputRows | join output rows | The number of rows produced by a join before any filter expression is applied. |
| joinTime | join time | Time doing a join operation. |
| numInputBatches | input columnar batches | Number of columnar batches that the operator received from its child operator(s). |
| numInputRows | input rows | Number of rows that the operator received from its child operator(s). |
| numOutputBatches | output columnar batches | Number of columnar batches that the operator outputs. |
| numOutputRows | output rows | Number of rows that the operator outputs. |
| numPartitions | partitions | Number of output partitions from a file scan or shuffle exchange. |
| opTime | op time | Time that an operator takes, exclusive of the time for executing or fetching results from child operators, and typically outside of the time it takes to acquire the GPU semaphore. |
| partitionSize | partition data size | Total size in bytes of output partitions. |
| peakDevMemory | peak device memory | Peak GPU memory used during execution of an operator. |
| semaphoreWaitTime | GPU semaphore wait time | Time spent waiting for the GPU semaphore. |
| sortTime | sort time | Time spent in sort operations in GpuSortExec and GpuTopN. |
| spillData | bytes spilled from GPU | Total bytes spilled from GPU. |
| spillDisk | bytes spilled to disk | Total bytes spilled from GPU to disk. |
| spillHost | bytes spilled to host | Total bytes spilled from GPU to host memory. |
| streamTime | stream time | Time spent reading data from a child. This generally happens for the stream side of a hash join or for columnar to row and row to columnar operations. |

Not all metrics are enabled by default. The configuration setting `spark.rapids.sql.metrics.level` can be set
to `DEBUG`, `MODERATE`, or `ESSENTIAL`, with `MODERATE` being the default value. More information about this
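For reference, the level can be raised for a tuning session through any standard Spark configuration mechanism; a sketch assuming `spark-submit` (the trailing arguments are placeholders for the rest of the command):

```shell
spark-submit \
  --conf spark.rapids.sql.metrics.level=DEBUG \
  ...
```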
@@ -0,0 +1,216 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import ai.rapids.cudf.HostColumnVector.ColumnBuilder;

import org.apache.spark.sql.vectorized.ColumnVector;

/**
 * A helper class which efficiently transfers different types of host columnar data into cuDF.
 * It is written in Java for two reasons:
 * 1. The Scala for-loop is slower (a Scala while-loop compiles to the same bytecode as a Java loop)
 * 2. Both ColumnBuilder and ColumnVector are Java classes
 */
public class ColumnarCopyHelper {

  public static void nullCopy(ColumnBuilder b, int rows) {
    for (int i = 0; i < rows; i++) {
      b.appendNull();
    }
  }

  public static void booleanCopy(ColumnVector cv, ColumnBuilder b, int rows) {
    if (!cv.hasNull()) {
      for (int i = 0; i < rows; i++) {
        b.append(cv.getBoolean(i));
      }
      return;
    }
    for (int i = 0; i < rows; i++) {
      if (cv.isNullAt(i)) {
        b.appendNull();
      } else {
        b.append(cv.getBoolean(i));
      }
    }
  }

  public static void byteCopy(ColumnVector cv, ColumnBuilder b, int rows) {
    if (!cv.hasNull()) {
      for (int i = 0; i < rows; i++) {
        b.append(cv.getByte(i));
      }
      return;
    }
    for (int i = 0; i < rows; i++) {
      if (cv.isNullAt(i)) {
        b.appendNull();
      } else {
        b.append(cv.getByte(i));
      }
    }
  }

  public static void shortCopy(ColumnVector cv, ColumnBuilder b, int rows) {
    if (!cv.hasNull()) {
      for (int i = 0; i < rows; i++) {
        b.append(cv.getShort(i));
      }
      return;
    }
    for (int i = 0; i < rows; i++) {
      if (cv.isNullAt(i)) {
        b.appendNull();
      } else {
        b.append(cv.getShort(i));
      }
    }
  }

  public static void intCopy(ColumnVector cv, ColumnBuilder b, int rows) {
    if (!cv.hasNull()) {
      for (int i = 0; i < rows; i++) {
        b.append(cv.getInt(i));
      }
      return;
    }
    for (int i = 0; i < rows; i++) {
      if (cv.isNullAt(i)) {
        b.appendNull();
      } else {
        b.append(cv.getInt(i));
      }
    }
  }

  public static void longCopy(ColumnVector cv, ColumnBuilder b, int rows) {
    if (!cv.hasNull()) {
      for (int i = 0; i < rows; i++) {
        b.append(cv.getLong(i));
      }
      return;
    }
    for (int i = 0; i < rows; i++) {
      if (cv.isNullAt(i)) {
        b.appendNull();
      } else {
        b.append(cv.getLong(i));
      }
    }
  }

  public static void floatCopy(ColumnVector cv, ColumnBuilder b, int rows) {
    if (!cv.hasNull()) {
      for (int i = 0; i < rows; i++) {
        b.append(cv.getFloat(i));
      }
      return;
    }
    for (int i = 0; i < rows; i++) {
      if (cv.isNullAt(i)) {
        b.appendNull();
      } else {
        b.append(cv.getFloat(i));
      }
    }
  }

  public static void doubleCopy(ColumnVector cv, ColumnBuilder b, int rows) {
    if (!cv.hasNull()) {
      for (int i = 0; i < rows; i++) {
        b.append(cv.getDouble(i));
      }
      return;
    }
    for (int i = 0; i < rows; i++) {
      if (cv.isNullAt(i)) {
        b.appendNull();
      } else {
        b.append(cv.getDouble(i));
      }
    }
  }

  public static void stringCopy(ColumnVector cv, ColumnBuilder b, int rows) {
    if (!cv.hasNull()) {
      for (int i = 0; i < rows; i++) {
        b.appendUTF8String(cv.getUTF8String(i).getBytes());
      }
      return;
    }
    for (int i = 0; i < rows; i++) {
      if (cv.isNullAt(i)) {
        b.appendNull();
      } else {
        b.appendUTF8String(cv.getUTF8String(i).getBytes());
      }
    }
  }

  // TODO: https://github.com/NVIDIA/spark-rapids/issues/4784
  public static void decimal32Copy(ColumnVector cv, ColumnBuilder b, int rows,
      int precision, int scale) {
    if (!cv.hasNull()) {
      for (int i = 0; i < rows; i++) {
        b.append((int) cv.getDecimal(i, precision, scale).toUnscaledLong());
      }
      return;
    }
    for (int i = 0; i < rows; i++) {
      if (cv.isNullAt(i)) {
        b.appendNull();
      } else {
        b.append((int) cv.getDecimal(i, precision, scale).toUnscaledLong());
      }
    }
  }

  public static void decimal64Copy(ColumnVector cv, ColumnBuilder b, int rows,
      int precision, int scale) {
    if (!cv.hasNull()) {
      for (int i = 0; i < rows; i++) {
        b.append(cv.getDecimal(i, precision, scale).toUnscaledLong());
      }
      return;
    }
    for (int i = 0; i < rows; i++) {
      if (cv.isNullAt(i)) {
        b.appendNull();
      } else {
        b.append(cv.getDecimal(i, precision, scale).toUnscaledLong());
      }
    }
  }

  public static void decimal128Copy(ColumnVector cv, ColumnBuilder b, int rows,
      int precision, int scale) {
    if (!cv.hasNull()) {
      for (int i = 0; i < rows; i++) {
        b.append(cv.getDecimal(i, precision, scale).toJavaBigDecimal());
      }
      return;
    }
    for (int i = 0; i < rows; i++) {
      if (cv.isNullAt(i)) {
        b.appendNull();
      } else {
        b.append(cv.getDecimal(i, precision, scale).toJavaBigDecimal());
      }
    }
  }
}
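The helper above relies on one recurring pattern: check `hasNull()` once up front, and when the column is dense take a tight branch-free loop instead of testing nullability on every row. A minimal standalone sketch of that two-loop pattern (plain Java arrays; `IntColumn` is a hypothetical stand-in for a nullable host column, no cuDF or Spark required):

```java
import java.util.ArrayList;
import java.util.List;

public class CopyPatternSketch {
  // Hypothetical stand-in for a nullable host column vector.
  static final class IntColumn {
    final int[] values;
    final boolean[] isNull; // null when the column contains no nulls
    IntColumn(int[] values, boolean[] isNull) {
      this.values = values;
      this.isNull = isNull;
    }
    boolean hasNull() { return isNull != null; }
  }

  static List<Integer> copy(IntColumn cv, int rows) {
    List<Integer> out = new ArrayList<>(rows);
    if (!cv.hasNull()) {
      // Fast path: no per-row null branch.
      for (int i = 0; i < rows; i++) {
        out.add(cv.values[i]);
      }
      return out;
    }
    // Slow path: per-row null check.
    for (int i = 0; i < rows; i++) {
      out.add(cv.isNull[i] ? null : cv.values[i]);
    }
    return out;
  }

  public static void main(String[] args) {
    IntColumn dense = new IntColumn(new int[]{1, 2, 3}, null);
    IntColumn sparse = new IntColumn(new int[]{1, 2, 3}, new boolean[]{false, true, false});
    System.out.println(copy(dense, 3));  // [1, 2, 3]
    System.out.println(copy(sparse, 3)); // [1, null, 3]
  }
}
```

Hoisting the null check out of the loop keeps the common no-null case free of a data-dependent branch, which is also why each copy method in `ColumnarCopyHelper` is written as two separate loops rather than one.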
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -387,7 +387,7 @@ public GpuColumnarBatchBuilder(StructType schema, int rows) {
  }

  public void copyColumnar(ColumnVector cv, int colNum, boolean nullable, int rows) {
-    HostColumnarToGpu.columnarCopy(cv, builder(colNum), nullable, rows);
+    HostColumnarToGpu.columnarCopy(cv, builder(colNum), rows);
  }

  public ai.rapids.cudf.HostColumnVector.ColumnBuilder builder(int i) {
Expand Down
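The `decimal32Copy`/`decimal64Copy` helpers in `ColumnarCopyHelper` store each decimal as its unscaled integer value, with precision and scale tracked separately. A minimal standalone sketch of that representation (`java.math` only; `toUnscaledLong` here is a hypothetical stand-in for Spark's `Decimal#toUnscaledLong`, not the real API):

```java
import java.math.BigDecimal;

public class UnscaledDecimalSketch {
  // 12.34 at scale 2 is stored as the unscaled long 1234.
  static long toUnscaledLong(BigDecimal d, int scale) {
    // Rescale first so the unscaled value matches the target scale exactly.
    return d.setScale(scale).unscaledValue().longValueExact();
  }

  public static void main(String[] args) {
    System.out.println(toUnscaledLong(new BigDecimal("12.34"), 2)); // 1234
    System.out.println(toUnscaledLong(new BigDecimal("-0.5"), 2));  // -50
  }
}
```

Fitting the unscaled value into an `int` (decimal32) or `long` (decimal64) is what lets the builder append fixed-width values instead of full `BigDecimal` objects; only `decimal128Copy` falls back to `toJavaBigDecimal()`.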