Perform explicit UnsafeRow projection in ColumnarToRow transition #5274

Merged (6 commits) on Apr 25, 2022
7 changes: 7 additions & 0 deletions integration_tests/README.md
@@ -387,6 +387,13 @@ The tests can be enabled by appending the option `--fuzz_test` to the command.

To reproduce an error appearing in the fuzz tests, you also need to add the flag `--debug_tmp_path` to save the test data.

### Enabling Apache Iceberg tests

Some tests require Apache Iceberg to be configured in the Spark environment and cannot run
properly without it. Because Iceberg is assumed to be absent, these tests are disabled by default.
If Spark has been configured to support Iceberg, these tests can be enabled by adding the
`--iceberg` option to the command.
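For example, `./run_pyspark_from_build.sh -m iceberg --iceberg` runs only the Iceberg-marked tests, assuming the Iceberg Spark runtime and catalog configuration are already set for the Spark session; this mirrors the invocation used by the Jenkins scripts in this change.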

## Writing tests

There are a number of libraries provided to help someone write new tests.
3 changes: 3 additions & 0 deletions integration_tests/conftest.py
@@ -42,3 +42,6 @@ def pytest_addoption(parser):
parser.addoption(
"--fuzz_test", action='store_true', default=False, help="if true enable fuzz tests"
)
parser.addoption(
"--iceberg", action="store_true", default=False, help="if true enable Iceberg tests"
)
1 change: 1 addition & 0 deletions integration_tests/pytest.ini
@@ -29,5 +29,6 @@ markers =
nightly_gpu_mem_consuming_case: case in nightly_resource_consuming_test that consume much more GPU memory than normal cases
nightly_host_mem_consuming_case: case in nightly_resource_consuming_test that consume much more host memory than normal cases
fuzz_test: Mark fuzz tests
iceberg: Mark a test that requires Iceberg to be configured, skipped if tests are not configured for Iceberg
filterwarnings =
ignore:.*pytest.mark.order.*:_pytest.warning_types.PytestUnknownMarkWarning
4 changes: 4 additions & 0 deletions integration_tests/src/main/python/conftest.py
@@ -180,6 +180,10 @@ def pytest_runtest_setup(item):
else:
_limit = -1

if item.get_closest_marker('iceberg'):
if not item.config.getoption('iceberg'):
pytest.skip('Iceberg tests not configured to run')

def pytest_configure(config):
global _runtime_env
_runtime_env = config.getoption('runtime_env')
28 changes: 28 additions & 0 deletions integration_tests/src/main/python/iceberg_test.py
@@ -0,0 +1,28 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from asserts import assert_gpu_and_cpu_are_equal_collect
from marks import allow_non_gpu, iceberg
from spark_session import with_cpu_session

@allow_non_gpu('BatchScanExec')
@iceberg
def test_iceberg_fallback_not_unsafe_row(spark_tmp_table_factory):
table = spark_tmp_table_factory.get()
def setup_iceberg_table(spark):
spark.sql("CREATE TABLE {} (id BIGINT, data STRING) USING ICEBERG".format(table))
spark.sql("INSERT INTO {} VALUES (1, 'a'), (2, 'b'), (3, 'c')".format(table))
with_cpu_session(setup_iceberg_table)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.sql("SELECT COUNT(DISTINCT id) from {}".format(table)))
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/marks.py
@@ -26,4 +26,5 @@
shuffle_test = pytest.mark.shuffle_test
nightly_gpu_mem_consuming_case = pytest.mark.nightly_gpu_mem_consuming_case
nightly_host_mem_consuming_case = pytest.mark.nightly_host_mem_consuming_case
fuzz_test = pytest.mark.fuzz_test
fuzz_test = pytest.mark.fuzz_test
iceberg = pytest.mark.iceberg
22 changes: 21 additions & 1 deletion jenkins/databricks/test.sh
@@ -1,6 +1,6 @@
#!/bin/bash
#
# Copyright (c) 2020-2021, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2020-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -76,6 +76,14 @@ TEST_MODE=${TEST_MODE:-'IT_ONLY'}
TEST_TYPE="nightly"
PCBS_CONF="com.nvidia.spark.ParquetCachedBatchSerializer"

ICEBERG_VERSION=0.13.1
ICEBERG_SPARK_VER=$(echo $BASE_SPARK_VER | cut -d. -f1,2)
ICEBERG_CONFS="--packages org.apache.iceberg:iceberg-spark-runtime-${ICEBERG_SPARK_VER}_2.12:${ICEBERG_VERSION} \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hadoop \
--conf spark.sql.catalog.spark_catalog.warehouse=/tmp/spark-warehouse-$$"
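# Note: $$ expands to the PID of this script, so each run gets its own Hadoop-catalog warehouse dir.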

# Enable event log for qualification & profiling tools testing
export PYSP_TEST_spark_eventLog_enabled=true
mkdir -p /tmp/spark-events
@@ -100,6 +108,12 @@ if [ -d "$LOCAL_JAR_PATH" ]; then
LOCAL_JAR_PATH=$LOCAL_JAR_PATH SPARK_SUBMIT_FLAGS="$SPARK_CONF $CUDF_UDF_TEST_ARGS" TEST_PARALLEL=1 \
bash $LOCAL_JAR_PATH/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" -m "cudf_udf" --cudf_udf --test_type=$TEST_TYPE
fi

if [[ "$TEST_MODE" == "ALL" || "$TEST_MODE" == "ICEBERG_ONLY" ]]; then
## Run Iceberg tests
LOCAL_JAR_PATH=$LOCAL_JAR_PATH SPARK_SUBMIT_FLAGS="$SPARK_CONF $ICEBERG_CONFS" TEST_PARALLEL=1 \
bash $LOCAL_JAR_PATH/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" -m iceberg --iceberg --test_type=$TEST_TYPE
fi
else
if [[ $TEST_MODE == "ALL" || $TEST_MODE == "IT_ONLY" ]]; then
## Run tests with jars building from the spark-rapids source code
@@ -118,4 +132,10 @@ else
SPARK_SUBMIT_FLAGS="$SPARK_CONF $CUDF_UDF_TEST_ARGS" TEST_PARALLEL=1 \
bash /home/ubuntu/spark-rapids/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" -m "cudf_udf" --cudf_udf --test_type=$TEST_TYPE
fi

if [[ "$TEST_MODE" == "ALL" || "$TEST_MODE" == "ICEBERG_ONLY" ]]; then
## Run Iceberg tests
SPARK_SUBMIT_FLAGS="$SPARK_CONF $ICEBERG_CONFS" TEST_PARALLEL=1 \
bash /home/ubuntu/spark-rapids/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" -m iceberg --iceberg --test_type=$TEST_TYPE
fi
fi
28 changes: 28 additions & 0 deletions jenkins/spark-tests.sh
@@ -159,6 +159,25 @@ export SCRIPT_PATH="$(pwd -P)"
export TARGET_DIR="$SCRIPT_PATH/target"
mkdir -p $TARGET_DIR

run_iceberg_tests() {
ICEBERG_VERSION="0.13.1"
# get the major/minor version of Spark
ICEBERG_SPARK_VER=$(echo $SPARK_VER | cut -d. -f1,2)

# Iceberg does not support Spark 3.3+ yet
if [[ "$ICEBERG_SPARK_VER" < "3.3" ]]; then
SPARK_SUBMIT_FLAGS="$BASE_SPARK_SUBMIT_ARGS $SEQ_CONF \
--packages org.apache.iceberg:iceberg-spark-runtime-${ICEBERG_SPARK_VER}_2.12:${ICEBERG_VERSION} \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hadoop \
--conf spark.sql.catalog.spark_catalog.warehouse=/tmp/spark-warehouse-$$" \
./run_pyspark_from_build.sh -m iceberg --iceberg
else
echo "Skipping Iceberg tests. Iceberg does not support Spark $ICEBERG_SPARK_VER"
fi
}

run_test_not_parallel() {
local TEST=${1//\.py/}
local LOG_FILE
@@ -179,6 +198,10 @@ run_test_not_parallel() {
./run_pyspark_from_build.sh -k cache_test
;;

iceberg)
run_iceberg_tests
;;

*)
echo -e "\n\n>>>>> $TEST...\n"
LOG_FILE="$TARGET_DIR/$TEST.log"
@@ -276,6 +299,11 @@ if [[ "$TEST_MODE" == "ALL" || "$TEST_MODE" == "CUDF_UDF_ONLY" ]]; then
run_test_not_parallel cudf_udf_test
fi

# Iceberg tests
if [[ "$TEST_MODE" == "ALL" || "$TEST_MODE" == "ICEBERG_ONLY" ]]; then
run_test_not_parallel iceberg
fi

popd
stop-slave.sh
stop-master.sh
@@ -25,6 +25,7 @@
import ai.rapids.cudf.NvtxRange;
import ai.rapids.cudf.Table;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.DataType;
@@ -38,12 +39,14 @@
import java.util.Optional;

/**
* This class converts UnsafeRow instances to ColumnarBatches on the GPU through the magic of
* This class converts InternalRow instances to ColumnarBatches on the GPU through the magic of
* code generation. This just provides most of the framework; a concrete implementation will
* be generated based on the schema.
* Each InternalRow instance is first converted to an UnsafeRow (cheaply, if it is already an
* UnsafeRow), and the UnsafeRow data is then collected into a ColumnarBatch.
*/
public abstract class UnsafeRowToColumnarBatchIterator implements Iterator<ColumnarBatch> {
protected final Iterator<UnsafeRow> input;
public abstract class InternalRowToColumnarBatchIterator implements Iterator<ColumnarBatch> {
protected final Iterator<InternalRow> input;
protected UnsafeRow pending = null;
protected final int numRowsEstimate;
protected final long dataLength;
@@ -56,8 +59,8 @@ public abstract class UnsafeRowToColumnarBatchIterator implements Iterator<Colum
protected final GpuMetric numOutputRows;
protected final GpuMetric numOutputBatches;

protected UnsafeRowToColumnarBatchIterator(
Iterator<UnsafeRow> input,
protected InternalRowToColumnarBatchIterator(
Iterator<InternalRow> input,
Attribute[] schema,
CoalesceSizeGoal goal,
GpuMetric semaphoreWaitTime,
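To make the conversion described in the class comment above concrete, here is a minimal, self-contained Scala sketch (not part of this change; it uses only standard Spark Catalyst APIs) of turning a generic InternalRow into an UnsafeRow while passing through rows that already are UnsafeRow. The generated iterator in this PR performs the equivalent conversion with code produced by GenerateUnsafeProjection rather than an interpreted projection:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object ProjectionSketch {
  // Pass through rows that are already UnsafeRow; otherwise project into UnsafeRow format.
  def toUnsafe(row: InternalRow, proj: UnsafeProjection): UnsafeRow = row match {
    case u: UnsafeRow => u
    case other => proj(other)
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("data", StringType, nullable = true)))
    val proj = UnsafeProjection.create(schema)
    // A non-UnsafeRow input, the kind a CPU data source can hand to the row-to-columnar transition.
    val generic: InternalRow = new GenericInternalRow(Array[Any](1L, UTF8String.fromString("a")))
    val unsafe = toUnsafe(generic, proj)
    println(s"${unsafe.getLong(0)} ${unsafe.getUTF8String(1)}") // prints: 1 a
  }
}
```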
@@ -25,8 +25,8 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder, SpecializedGetters, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodegenContext, CodeGenerator}
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, NamedExpression, SortOrder, SpecializedGetters, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodegenContext, CodeGenerator, GenerateUnsafeProjection}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.TrampolineUtil
@@ -679,27 +679,40 @@ class RowToColumnarIterator(
}
}

object GeneratedUnsafeRowToCudfRowIterator extends Logging {
def apply(input: Iterator[UnsafeRow],
object GeneratedInternalRowToCudfRowIterator extends Logging {
def apply(input: Iterator[InternalRow],
schema: Array[Attribute],
goal: CoalesceSizeGoal,
semaphoreWaitTime: GpuMetric,
streamTime: GpuMetric,
opTime: GpuMetric,
numInputRows: GpuMetric,
numOutputRows: GpuMetric,
numOutputBatches: GpuMetric): UnsafeRowToColumnarBatchIterator = {
numOutputBatches: GpuMetric): InternalRowToColumnarBatchIterator = {
val ctx = new CodegenContext

ctx.addReferenceObj("iter", input, classOf[Iterator[UnsafeRow]].getName)
ctx.addReferenceObj("schema", schema, classOf[Array[Attribute]].getName)
ctx.addReferenceObj("goal", goal, classOf[CoalesceSizeGoal].getName)
ctx.addReferenceObj("semaphoreWaitTime", semaphoreWaitTime, classOf[GpuMetric].getName)
ctx.addReferenceObj("streamTime", streamTime, classOf[GpuMetric].getName)
ctx.addReferenceObj("opTime", opTime, classOf[GpuMetric].getName)
ctx.addReferenceObj("numInputRows", numInputRows, classOf[GpuMetric].getName)
ctx.addReferenceObj("numOutputRows", numOutputRows, classOf[GpuMetric].getName)
ctx.addReferenceObj("numOutputBatches", numOutputBatches, classOf[GpuMetric].getName)
// setup code generation context to use our custom row variable
val internalRow = ctx.freshName("internalRow")
ctx.currentVars = null
ctx.INPUT_ROW = internalRow

val generateUnsafeProj = GenerateUnsafeProjection.createCode(ctx,
schema.zipWithIndex.map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) }
)
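// generateUnsafeProj.code is the generated Java snippet that populates an UnsafeRow from the
// $internalRow variable, and generateUnsafeProj.value names the resulting UnsafeRow; both are
// spliced into the iterator template below.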

val iterRef = ctx.addReferenceObj("iter", input, classOf[Iterator[UnsafeRow]].getName)
val schemaRef = ctx.addReferenceObj("schema", schema,
classOf[Array[Attribute]].getCanonicalName)
val goalRef = ctx.addReferenceObj("goal", goal, classOf[CoalesceSizeGoal].getName)
val semaphoreWaitTimeRef = ctx.addReferenceObj("semaphoreWaitTime", semaphoreWaitTime,
classOf[GpuMetric].getName)
val streamTimeRef = ctx.addReferenceObj("streamTime", streamTime, classOf[GpuMetric].getName)
val opTimeRef = ctx.addReferenceObj("opTime", opTime, classOf[GpuMetric].getName)
val numInputRowsRef = ctx.addReferenceObj("numInputRows", numInputRows,
classOf[GpuMetric].getName)
val numOutputRowsRef = ctx.addReferenceObj("numOutputRows", numOutputRows,
classOf[GpuMetric].getName)
val numOutputBatchesRef = ctx.addReferenceObj("numOutputBatches", numOutputBatches,
classOf[GpuMetric].getName)

val rowBaseObj = ctx.freshName("rowBaseObj")
val rowBaseOffset = ctx.freshName("rowBaseOffset")
@@ -753,23 +766,26 @@ val codeBody =
val codeBody =
s"""
|public java.lang.Object generate(Object[] references) {
| return new SpecificUnsafeRowToColumnarBatchIterator(references);
| return new SpecificInternalRowToColumnarBatchIterator(references);
|}
|
|final class SpecificUnsafeRowToColumnarBatchIterator extends ${classOf[UnsafeRowToColumnarBatchIterator].getName} {
|final class SpecificInternalRowToColumnarBatchIterator extends ${classOf[InternalRowToColumnarBatchIterator].getName} {
| private final org.apache.spark.sql.catalyst.expressions.UnsafeProjection unsafeProj;
|
| ${ctx.declareMutableStates()}
|
| public SpecificUnsafeRowToColumnarBatchIterator(Object[] references) {
| super((scala.collection.Iterator<UnsafeRow>)references[0],
| (org.apache.spark.sql.catalyst.expressions.Attribute[])references[1],
| (com.nvidia.spark.rapids.CoalesceSizeGoal)references[2],
| (com.nvidia.spark.rapids.GpuMetric)references[3],
| (com.nvidia.spark.rapids.GpuMetric)references[4],
| (com.nvidia.spark.rapids.GpuMetric)references[5],
| (com.nvidia.spark.rapids.GpuMetric)references[6],
| (com.nvidia.spark.rapids.GpuMetric)references[7],
| (com.nvidia.spark.rapids.GpuMetric)references[8]);
| public SpecificInternalRowToColumnarBatchIterator(Object[] references) {
| super(
| $iterRef,
| $schemaRef,
| $goalRef,
| $semaphoreWaitTimeRef,
| $streamTimeRef,
| $opTimeRef,
| $numInputRowsRef,
| $numOutputRowsRef,
| $numOutputBatchesRef);
|
| ${ctx.initMutableStates()}
| }
|
@@ -793,7 +809,13 @@
| row = pending;
| pending = null;
| } else {
| row = (UnsafeRow)input.next();
| InternalRow $internalRow = (InternalRow) input.next();
| if ($internalRow instanceof UnsafeRow) {
| row = (UnsafeRow) $internalRow;
| } else {
| ${generateUnsafeProj.code}
| row = ${generateUnsafeProj.value};
| }
| }
| int numBytesUsedByRow = copyInto(row, dataBaseAddress + dataOffset, endDataAddress);
| offsetsBuffer.setInt(offsetIndex, dataOffset);
@@ -834,7 +856,7 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
logDebug(s"code for ${schema.mkString(",")}:\n${CodeFormatter.format(code)}")

val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(ctx.references.toArray).asInstanceOf[UnsafeRowToColumnarBatchIterator]
clazz.generate(ctx.references.toArray).asInstanceOf[InternalRowToColumnarBatchIterator]
}
}

@@ -905,9 +927,8 @@ case class GpuRowToColumnarExec(child: SparkPlan,
if ((1 until 100000000).contains(output.length) &&
CudfRowTransitions.areAllSupported(output)) {
val localOutput = output
rowBased.mapPartitions(rowIter => GeneratedUnsafeRowToCudfRowIterator(
rowIter.asInstanceOf[Iterator[UnsafeRow]],
localOutput.toArray, localGoal, semaphoreWaitTime, streamTime, opTime,
rowBased.mapPartitions(rowIter => GeneratedInternalRowToCudfRowIterator(
rowIter, localOutput.toArray, localGoal, semaphoreWaitTime, streamTime, opTime,
numInputRows, numOutputRows, numOutputBatches))
} else {
val converters = new GpuRowToColumnConverter(localSchema)