From 4019778590ddf8c0c18f644e5ec04d59c73c770d Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 13 Nov 2020 08:54:42 -0600 Subject: [PATCH] Support lists to/from the GPU (#1104) Signed-off-by: Robert (Bobby) Evans --- integration_tests/run_pyspark_from_build.sh | 11 +- .../src/main/python/array_test.py | 47 +++ integration_tests/src/main/python/data_gen.py | 21 ++ .../src/main/python/row_conversion_test.py | 38 +++ .../src/main/python/string_test.py | 13 +- .../nvidia/spark/rapids/GpuColumnVector.java | 30 +- .../spark/rapids/RapidsHostColumnVector.java | 120 +------- .../rapids/RapidsHostColumnVectorCore.java | 108 +++++-- .../nvidia/spark/rapids/GpuOverrides.scala | 15 +- .../spark/rapids/GpuRowToColumnarExec.scala | 279 +++++++++++------- .../com/nvidia/spark/rapids/RapidsMeta.scala | 4 +- .../sql/rapids/complexTypeExtractors.scala | 22 +- 12 files changed, 417 insertions(+), 291 deletions(-) create mode 100644 integration_tests/src/main/python/array_test.py create mode 100644 integration_tests/src/main/python/row_conversion_test.py diff --git a/integration_tests/run_pyspark_from_build.sh b/integration_tests/run_pyspark_from_build.sh index 84b818dcf5cb..1095b4df4baf 100755 --- a/integration_tests/run_pyspark_from_build.sh +++ b/integration_tests/run_pyspark_from_build.sh @@ -53,10 +53,8 @@ else if python -c 'import findspark'; then echo "FOUND findspark" - FIND_SPARK=1 else TEST_PARALLEL=0 - FIND_SPARK=0 echo "findspark not installed cannot run tests in parallel" fi if python -c 'import xdist.plugin'; @@ -71,16 +69,16 @@ else then # With xdist 0 and 1 are the same parallelsm but # 0 is more effecient - TEST_PARALLEL="" + TEST_PARALLEL_OPTS="" MEMORY_FRACTION='1' else MEMORY_FRACTION=`python -c "print(1/($TEST_PARALLEL + 1))"` - TEST_PARALLEL="-n $TEST_PARALLEL" + TEST_PARALLEL_OPTS="-n $TEST_PARALLEL" fi RUN_DIR="$SCRIPTPATH"/target/run_dir mkdir -p "$RUN_DIR" cd "$RUN_DIR" - if [[ "${FIND_SPARK}" == "1" ]]; + if [[ "${TEST_PARALLEL_OPTS}" != "" ]]; then export PYSP_TEST_spark_driver_extraClassPath="${ALL_JARS// /:}" export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=GMT $COVERAGE_SUBMIT_FLAGS" @@ -93,7 +91,7 @@ else python \ "$SCRIPTPATH"/runtests.py --rootdir "$SCRIPTPATH" "$SCRIPTPATH"/src/main/python \ - $TEST_PARALLEL \ + $TEST_PARALLEL_OPTS \ -v -rfExXs "$TEST_TAGS" \ --std_input_path="$SCRIPTPATH"/src/test/resources/ \ "$TEST_ARGS" \ @@ -112,6 +110,5 @@ else "$TEST_ARGS" \ $RUN_TEST_PARAMS \ "$@" - fi fi diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py new file mode 100644 index 000000000000..0603e867f741 --- /dev/null +++ b/integration_tests/src/main/python/array_test.py @@ -0,0 +1,47 @@ +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
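[Editor's note] The new array_test.py below exercises element extraction (a[index]) on generated array columns, comparing GPU results against the CPU. The comparison is done by assert_gpu_and_cpu_are_equal_collect from asserts.py; the sketch below is only a rough illustration of what that helper does (run the same query with the RAPIDS plugin disabled and then enabled, and compare the collected rows). The real helper also handles session setup, sorting, and float tolerances, and the name run_on_cpu_and_gpu here is made up for illustration.

    from pyspark.sql import SparkSession

    def run_on_cpu_and_gpu(spark: SparkSession, df_fun):
        # Plain Spark run with the RAPIDS plugin disabled: everything stays on the CPU.
        spark.conf.set('spark.rapids.sql.enabled', 'false')
        cpu_rows = df_fun(spark).collect()
        # Same query with the plugin enabled: supported operators run on the GPU.
        spark.conf.set('spark.rapids.sql.enabled', 'true')
        gpu_rows = df_fun(spark).collect()
        assert cpu_rows == gpu_rows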
+ +import pytest + +from asserts import assert_gpu_and_cpu_are_equal_collect +from data_gen import * +from marks import incompat +from pyspark.sql.types import * +import pyspark.sql.functions as f + +# Once we support arrays as literals then we can support a[null] and +# negative indexes for all array gens. When that happens +# test_nested_array_index should go away and this should test with +# array_gens_sample instead +@pytest.mark.parametrize('data_gen', single_level_array_gens, ids=idfn) +def test_array_index(data_gen): + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, data_gen).selectExpr( + 'a[0]', + 'a[1]', + 'a[null]', + 'a[3]', + 'a[50]', + 'a[-1]')) + +# Once we support arrays as literals then we can support a[null] for +# all array gens. See test_array_index for more info +@pytest.mark.parametrize('data_gen', nested_array_gens_sample, ids=idfn) +def test_nested_array_index(data_gen): + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, data_gen).selectExpr( + 'a[0]', + 'a[1]', + 'a[3]', + 'a[50]')) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 0f9768945508..f37a889bbc7a 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -231,6 +231,9 @@ def __init__(self, child, length): self._length = length self._index = 0 + def __repr__(self): + return super().__repr__() + '(' + str(self._child) + ')' + def _loop_values(self): ret = self._vals[self._index] self._index = (self._index + 1) % self._length @@ -362,6 +365,9 @@ def __init__(self, children, nullable=True, special_cases=[]): super().__init__(StructType(tmp), nullable=nullable, special_cases=special_cases) self.children = children + def __repr__(self): + return super().__repr__() + '(' + ','.join([str(i) for i in self.children]) + ')' + def start(self, rand): for name, child in self.children: child.start(rand) @@ -482,6 +488,9 @@ def __init__(self, child_gen, min_length=0, max_length=100, nullable=True): self._max_length = max_length self._child_gen = child_gen + def __repr__(self): + return super().__repr__() + '(' + str(self._child_gen) + ')' + def start(self, rand): self._child_gen.start(rand) def gen_array(): @@ -643,10 +652,12 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): all_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, date_gen, timestamp_gen] +# TODO add in some array generators to this once that is supported for sorting # a selection of generators that should be orderable (sortable and compareable) orderable_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, date_gen, timestamp_gen] +# TODO add in some array generators to this once that is supported for these operations # a selection of generators that can be compared for equality eq_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, date_gen, timestamp_gen] @@ -655,3 +666,13 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): date_n_time_gens = [date_gen, timestamp_gen] boolean_gens = [boolean_gen] + +single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens] + +# Be careful to not make these too large of data generation takes for ever +# This is only a few nested array gens, because nesting can be very deep +nested_array_gens_sample = [ArrayGen(ArrayGen(short_gen, max_length=10), 
max_length=10), + ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10)] + +# Some array gens, but not all because of nesting +array_gens_sample = single_level_array_gens + nested_array_gens_sample diff --git a/integration_tests/src/main/python/row_conversion_test.py b/integration_tests/src/main/python/row_conversion_test.py new file mode 100644 index 000000000000..fffd47340c06 --- /dev/null +++ b/integration_tests/src/main/python/row_conversion_test.py @@ -0,0 +1,38 @@ +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from asserts import assert_gpu_and_cpu_are_equal_collect +from data_gen import * +from marks import incompat, approximate_float +from pyspark.sql.types import * +import pyspark.sql.functions as f + + +# This is one of the most basic tests where we verify that we can +# move data onto and off of the GPU without messing up. All data +# that comes from data_gen is row formatted, with how pyspark +# currently works and when we do a collect all of that data has +# to be brought back to the CPU (rows) to be returned. +# So we just need a very simple operation in the middle that +# can be done on the GPU. +def test_row_conversions(): + gens = [["a", byte_gen], ["b", short_gen], ["c", int_gen], ["d", long_gen], + ["e", float_gen], ["f", double_gen], ["g", string_gen], ["h", boolean_gen], + ["i", timestamp_gen], ["j", date_gen], ["k", ArrayGen(byte_gen)], + ["l", ArrayGen(string_gen)], ["m", ArrayGen(float_gen)], + ["n", ArrayGen(boolean_gen)], ["o", ArrayGen(ArrayGen(short_gen))]] + assert_gpu_and_cpu_are_equal_collect( + lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again")) diff --git a/integration_tests/src/main/python/string_test.py b/integration_tests/src/main/python/string_test.py index 372f42723503..067cafc8356e 100644 --- a/integration_tests/src/main/python/string_test.py +++ b/integration_tests/src/main/python/string_test.py @@ -23,19 +23,14 @@ def mk_str_gen(pattern): return StringGen(pattern).with_special_case('').with_special_pattern('.{0,10}') -# Because of limitations in array support we need to combine these two together to make -# this work. This should be split up into separate tests once support is better. 
-def test_split_with_array_index(): +def test_split(): data_gen = mk_str_gen('([ABC]{0,3}_?){0,7}') delim = '_' assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).selectExpr( - 'split(a, "AB")[0]', - 'split(a, "_")[1]', - 'split(a, "_")[null]', - 'split(a, "_")[3]', - 'split(a, "_")[0]', - 'split(a, "_")[-1]')) + 'split(a, "AB")', + 'split(a, "C")', + 'split(a, "_")')) @pytest.mark.parametrize('data_gen,delim', [(mk_str_gen('([ABC]{0,3}_?){0,7}'), '_'), (mk_str_gen('([MNP_]{0,3}\\.?){0,5}'), '.'), diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index 7a63c64ebfa2..834fd0aa9055 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -39,6 +39,24 @@ */ public class GpuColumnVector extends GpuColumnVectorBase { + private static HostColumnVector.DataType convertFrom(DataType spark, boolean nullable) { + if (spark instanceof ArrayType) { + ArrayType arrayType = (ArrayType) spark; + return new HostColumnVector.ListType(nullable, + convertFrom(arrayType.elementType(), arrayType.containsNull())); + } else if (spark instanceof MapType) { + MapType mapType = (MapType) spark; + return new HostColumnVector.ListType(nullable, + new HostColumnVector.StructType(false, Arrays.asList( + convertFrom(mapType.keyType(), false), + convertFrom(mapType.valueType(), mapType.valueContainsNull()) + ))); + } else { + // Only works for basic types + return new HostColumnVector.BasicType(nullable, getRapidsType(spark)); + } + } + public static final class GpuColumnarBatchBuilder implements AutoCloseable { private final ai.rapids.cudf.HostColumnVector.ColumnBuilder[] builders; private final StructField[] fields; @@ -60,17 +78,9 @@ public GpuColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch) try { for (int i = 0; i < len; i++) { StructField field = fields[i]; - if (field.dataType() instanceof MapType) { - builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.ListType(true, - new HostColumnVector.StructType(true, Arrays.asList( - new HostColumnVector.BasicType(true, DType.STRING), - new HostColumnVector.BasicType(true, DType.STRING)))), rows); - } else { - DType type = getRapidsType(field); - builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.BasicType(true, type), rows); - } - success = true; + builders[i] = new HostColumnVector.ColumnBuilder(convertFrom(field.dataType(), field.nullable()), rows); } + success = true; } finally { if (!success) { for (ai.rapids.cudf.HostColumnVector.ColumnBuilder b: builders) { diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVector.java index 4f20c9a9ca1f..22f890ab9d08 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVector.java @@ -17,18 +17,8 @@ package com.nvidia.spark.rapids; -import ai.rapids.cudf.DType; -import ai.rapids.cudf.HostColumnVectorCore; -import ai.rapids.cudf.HostMemoryBuffer; - import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.types.MapType; -import org.apache.spark.sql.vectorized.ColumnVector; -import org.apache.spark.sql.vectorized.ColumnarArray; import 
org.apache.spark.sql.vectorized.ColumnarBatch; -import org.apache.spark.sql.vectorized.ColumnarMap; -import org.apache.spark.unsafe.types.UTF8String; /** * A GPU accelerated version of the Spark ColumnVector. @@ -36,7 +26,7 @@ * is on the host, and we want to keep as much of the data on the device as possible. * We also provide GPU accelerated versions of the transitions to and from rows. */ -public final class RapidsHostColumnVector extends ColumnVector { +public final class RapidsHostColumnVector extends RapidsHostColumnVectorCore { /** * Get the underlying host cudf columns from the batch. This does not increment any @@ -74,123 +64,21 @@ public static RapidsHostColumnVector[] extractColumns(ColumnarBatch batch) { * Sets up the data type of this column vector. */ RapidsHostColumnVector(DataType type, ai.rapids.cudf.HostColumnVector cudfCv) { - super(type); + super(type, cudfCv); // TODO need some checks to be sure everything matches this.cudfCv = cudfCv; } - public RapidsHostColumnVector incRefCount() { + public final RapidsHostColumnVector incRefCount() { // Just pass through the reference counting cudfCv.incRefCount(); return this; } - @Override - public void close() { - // Just pass through - cudfCv.close(); - } - - @Override - public boolean hasNull() { - return cudfCv.hasNulls(); - } - - @Override - public int numNulls() { - return (int) cudfCv.getNullCount(); - } - - @Override - public boolean isNullAt(int rowId) { - return cudfCv.isNull(rowId); - } - - @Override - public boolean getBoolean(int rowId) { - return cudfCv.getBoolean(rowId); - } - - @Override - public byte getByte(int rowId) { - return cudfCv.getByte(rowId); - } - - @Override - public short getShort(int rowId) { - return cudfCv.getShort(rowId); - } - - @Override - public int getInt(int rowId) { - return cudfCv.getInt(rowId); - } - - @Override - public long getLong(int rowId) { - return cudfCv.getLong(rowId); - } - - @Override - public float getFloat(int rowId) { - return cudfCv.getFloat(rowId); - } - - @Override - public double getDouble(int rowId) { - return cudfCv.getDouble(rowId); - } - - @Override - public ColumnarArray getArray(int rowId) { - throw new IllegalStateException("Arrays are currently not supported by rapids cudf"); - } - - @Override - public ColumnarMap getMap(int ordinal) { - MapType mt = (MapType) dataType(); - ai.rapids.cudf.ColumnViewAccess structHcv = cudfCv.getChildColumnViewAccess(0); - // keys - ai.rapids.cudf.ColumnViewAccess firstHcv = structHcv.getChildColumnViewAccess(0); - HostColumnVectorCore firstHcvCore = (HostColumnVectorCore) firstHcv; - // values - ai.rapids.cudf.ColumnViewAccess secondHcv = structHcv.getChildColumnViewAccess(1); - HostColumnVectorCore secondHcvCore = (HostColumnVectorCore) secondHcv; - - RapidsHostColumnVectorCore firstChild = new RapidsHostColumnVectorCore(mt.keyType(), firstHcvCore); - RapidsHostColumnVectorCore secondChild = new RapidsHostColumnVectorCore(mt.valueType(), secondHcvCore); - int startOffset = cudfCv.getOffsetBuffer().getInt(ordinal * DType.INT32.getSizeInBytes()); - return new ColumnarMap(firstChild, secondChild, startOffset, - cudfCv.getOffsetBuffer().getInt((ordinal + 1) * DType.INT32.getSizeInBytes()) - startOffset); - } - - @Override - public Decimal getDecimal(int rowId, int precision, int scale) { - throw new IllegalStateException("The decimal type is currently not supported by rapids cudf"); - } - - @Override - public UTF8String getUTF8String(int rowId) { - // TODO need a cheaper way to go directly to the String - return 
UTF8String.fromString(cudfCv.getJavaString(rowId)); - } - - @Override - public byte[] getBinary(int rowId) { - throw new IllegalStateException("Binary data access is currently not supported by rapids cudf"); - } - - @Override - public ColumnVector getChild(int ordinal) { - throw new IllegalStateException("Struct and struct like types are currently not supported by rapids cudf"); - } - - public ai.rapids.cudf.HostColumnVector getBase() { + public final ai.rapids.cudf.HostColumnVector getBase() { return cudfCv; } - public long getRowCount() { return cudfCv.getRowCount(); } - public GpuColumnVector copyToDevice() { return new GpuColumnVector(type, cudfCv.copyToDevice()); } diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java index 001d856903ab..dc47f3693a29 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java @@ -17,8 +17,13 @@ package com.nvidia.spark.rapids; +import ai.rapids.cudf.ColumnViewAccess; +import ai.rapids.cudf.HostColumnVectorCore; +import ai.rapids.cudf.HostMemoryBuffer; +import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.MapType; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.sql.vectorized.ColumnarMap; @@ -31,109 +36,150 @@ * is on the host, and we want to keep as much of the data on the device as possible. * We also provide GPU accelerated versions of the transitions to and from rows. */ -public final class RapidsHostColumnVectorCore extends ColumnVector { +public class RapidsHostColumnVectorCore extends ColumnVector { - private final ai.rapids.cudf.HostColumnVectorCore cudfCv; + private final HostColumnVectorCore cudfCv; + private final RapidsHostColumnVectorCore[] cachedChildren; /** * Sets up the data type of this column vector. 
*/ - RapidsHostColumnVectorCore(DataType type, ai.rapids.cudf.HostColumnVectorCore cudfCv) { + RapidsHostColumnVectorCore(DataType type, HostColumnVectorCore cudfCv) { super(type); - // TODO need some checks to be sure everything matches this.cudfCv = cudfCv; - } - - @Override - public void close() { - // Just pass through + if (type instanceof MapType) { + // Map is a special case where we cache 2 children because it really ends up being + // a list of structs in CUDF so the list only has one child, not the key/value of + // stored in the struct + cachedChildren = new RapidsHostColumnVectorCore[2]; + } else { + cachedChildren = new RapidsHostColumnVectorCore[cudfCv.getNumChildren()]; + } + } + + @Override + public final void close() { + for (int i = 0; i < cachedChildren.length; i++) { + RapidsHostColumnVectorCore cv = cachedChildren[i]; + if (cv != null) { + cv.close(); + // avoid double closing this + cachedChildren[i] = null; + } + } cudfCv.close(); } @Override - public boolean hasNull() { + public final boolean hasNull() { return cudfCv.hasNulls(); } @Override - public int numNulls() { + public final int numNulls() { return (int) cudfCv.getNullCount(); } @Override - public boolean isNullAt(int rowId) { + public final boolean isNullAt(int rowId) { return cudfCv.isNull(rowId); } @Override - public boolean getBoolean(int rowId) { + public final boolean getBoolean(int rowId) { return cudfCv.getBoolean(rowId); } @Override - public byte getByte(int rowId) { + public final byte getByte(int rowId) { return cudfCv.getByte(rowId); } @Override - public short getShort(int rowId) { + public final short getShort(int rowId) { return cudfCv.getShort(rowId); } @Override - public int getInt(int rowId) { + public final int getInt(int rowId) { return cudfCv.getInt(rowId); } @Override - public long getLong(int rowId) { + public final long getLong(int rowId) { return cudfCv.getLong(rowId); } @Override - public float getFloat(int rowId) { + public final float getFloat(int rowId) { return cudfCv.getFloat(rowId); } @Override - public double getDouble(int rowId) { + public final double getDouble(int rowId) { return cudfCv.getDouble(rowId); } @Override - public ColumnarArray getArray(int rowId) { - throw new IllegalStateException("Arrays are currently not supported by rapids cudf"); + public final ColumnarArray getArray(int rowId) { + if (cachedChildren[0] == null) { + // cache the child data + ArrayType at = (ArrayType) dataType(); + HostColumnVectorCore data = (HostColumnVectorCore) cudfCv.getChildColumnViewAccess(0); + cachedChildren[0] = new RapidsHostColumnVectorCore(at.elementType(), data); + } + RapidsHostColumnVectorCore data = cachedChildren[0]; + int startOffset = (int) cudfCv.getStartListOffset(rowId); + int endOffset = (int) cudfCv.getEndListOffset(rowId); + return new ColumnarArray(data, startOffset, endOffset - startOffset); } @Override - public ColumnarMap getMap(int ordinal) { - throw new IllegalStateException("Maps are currently not supported by rapids cudf"); + public final ColumnarMap getMap(int ordinal) { + if (cachedChildren[0] == null) { + // Cache the key/value + MapType mt = (MapType) dataType(); + ColumnViewAccess structHcv = cudfCv.getChildColumnViewAccess(0); + // keys + HostColumnVectorCore firstHcvCore = (HostColumnVectorCore) structHcv.getChildColumnViewAccess(0); + + // values + HostColumnVectorCore secondHcvCore = (HostColumnVectorCore) structHcv.getChildColumnViewAccess(1); + + cachedChildren[0] = new RapidsHostColumnVectorCore(mt.keyType(), firstHcvCore); + cachedChildren[1] = 
new RapidsHostColumnVectorCore(mt.valueType(), secondHcvCore); + } + RapidsHostColumnVectorCore keys = cachedChildren[0]; + RapidsHostColumnVectorCore values = cachedChildren[1]; + + int startOffset = (int) cudfCv.getStartListOffset(ordinal); + int endOffset = (int) cudfCv.getEndListOffset(ordinal); + return new ColumnarMap(keys, values, startOffset,endOffset - startOffset); } @Override - public Decimal getDecimal(int rowId, int precision, int scale) { + public final Decimal getDecimal(int rowId, int precision, int scale) { throw new IllegalStateException("The decimal type is currently not supported by rapids cudf"); } @Override - public UTF8String getUTF8String(int rowId) { - // TODO need a cheaper way to go directly to the String - return UTF8String.fromString(cudfCv.getJavaString(rowId)); + public final UTF8String getUTF8String(int rowId) { + return UTF8String.fromBytes(cudfCv.getUTF8(rowId)); } @Override - public byte[] getBinary(int rowId) { + public final byte[] getBinary(int rowId) { throw new IllegalStateException("Binary data access is currently not supported by rapids cudf"); } @Override - public ColumnVector getChild(int ordinal) { + public final ColumnVector getChild(int ordinal) { throw new IllegalStateException("Struct and struct like types are currently not supported by rapids cudf"); } - public ai.rapids.cudf.HostColumnVectorCore getBase() { + public HostColumnVectorCore getBase() { return cudfCv; } - public long getRowCount() { return cudfCv.getRowCount(); } + public final long getRowCount() { return cudfCv.getRowCount(); } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 75c29e98176d..c721ec4fd8a1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -609,7 +609,10 @@ object GpuOverrides { "Gives a column a name", (a, conf, p, r) => new UnaryExprMeta[Alias](a, conf, p, r) { override def isSupportedType(t: DataType): Boolean = - GpuOverrides.isSupportedType(t, allowStringMaps = true, allowBinary = true) + GpuOverrides.isSupportedType(t, + allowStringMaps = true, + allowArray = true, + allowNesting = true) override def convertToGpu(child: Expression): GpuExpression = GpuAlias(child, a.name)(a.exprId, a.qualifier, a.explicitMetadata) @@ -618,7 +621,10 @@ object GpuOverrides { "References an input column", (att, conf, p, r) => new BaseExprMeta[AttributeReference](att, conf, p, r) { override def isSupportedType(t: DataType): Boolean = - GpuOverrides.isSupportedType(t, allowStringMaps = true) + GpuOverrides.isSupportedType(t, + allowStringMaps = true, + allowArray = true, + allowNesting = true) // This is the only NOOP operator. 
It goes away when things are bound override def convertToGpu(): Expression = att @@ -1799,7 +1805,10 @@ object GpuOverrides { (proj, conf, p, r) => { new SparkPlanMeta[ProjectExec](proj, conf, p, r) { override def isSupportedType(t: DataType): Boolean = - GpuOverrides.isSupportedType(t, allowStringMaps = true) + GpuOverrides.isSupportedType(t, + allowStringMaps = true, + allowArray = true, + allowNesting = true) override def convertToGpu(): GpuExec = GpuProjectExec(childExprs.map(_.convertToGpu()), childPlans(0).convertIfNeeded()) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index ae461405e591..e28ff30253dc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -16,13 +16,9 @@ package com.nvidia.spark.rapids -import scala.collection.mutable.ArrayBuffer - import ai.rapids.cudf.{NvtxColor, NvtxRange} -import ai.rapids.cudf.HostColumnVector.StructData import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder import com.nvidia.spark.rapids.GpuMetricNames._ -import com.nvidia.spark.rapids.GpuRowToColumnConverter.{FixedWidthTypeConverter, VariableWidthTypeConverter} import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast @@ -44,39 +40,33 @@ private class GpuRowToColumnConverter(schema: StructType) extends Serializable { } /** - * Append row values to the column builders and return the number of variable-width data bytes - * written + * Append row values to the column builders and return an approximation of the data bytes + * written. It is a Double because of validity can be less than a single byte. */ - final def convert(row: InternalRow, builders: GpuColumnarBatchBuilder): Long = { - var bytes: Long = 0 + final def convert(row: InternalRow, builders: GpuColumnarBatchBuilder): Double = { + var bytes: Double = 0 for (idx <- 0 until row.numFields) { - converters(idx) match { - case tc: FixedWidthTypeConverter => - tc.append(row, idx, builders.builder(idx)) - case tc: VariableWidthTypeConverter => - bytes += tc.append(row, idx, builders.builder(idx)) - } + bytes += converters(idx).append(row, idx, builders.builder(idx)) } bytes } } private object GpuRowToColumnConverter { + // Sizes estimates for different things + /* + * size of an offset entry. In general we have 1 more offset entry than rows, so + * we might be off by one entry per column. 
+ */ + private[this] val OFFSET = Integer.BYTES + private[this] val VALIDITY = 0.125 // 1/8th of a byte (1 bit) + private[this] val VALIDITY_N_OFFSET = OFFSET + VALIDITY - private trait TypeConverter extends Serializable - - private abstract class FixedWidthTypeConverter extends TypeConverter { - /** Append row value to the column builder */ - def append(row: SpecializedGetters, - column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit - } - - private abstract class VariableWidthTypeConverter extends TypeConverter { + private abstract class TypeConverter extends Serializable { /** Append row value to the column builder and return the number of data bytes written */ def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Long + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double } private def getConverterForType(dataType: DataType, nullable: Boolean): TypeConverter = { @@ -103,188 +93,247 @@ private object GpuRowToColumnConverter { case (StringType, false) => NotNullStringConverter // NOT SUPPORTED YET // case CalendarIntervalType => CalendarConverter - // NOT SUPPORTED YET - // case at: ArrayType => new ArrayConverter(getConverterForType(at.elementType)) + case (at: ArrayType, true) => + ArrayConverter(getConverterForType(at.elementType, at.containsNull)) + case (at: ArrayType, false) => + NotNullArrayConverter(getConverterForType(at.elementType, at.containsNull)) // NOT SUPPORTED YET // case st: StructType => new StructConverter(st.fields.map( // (f) => getConverterForType(f.dataType))) // NOT SUPPORTED YET // case dt: DecimalType => new DecimalConverter(dt) // NOT SUPPORTED YET - case (MapType(StringType, StringType, _), _) => MapConverter + case (MapType(k, v, vcn), true) => + MapConverter(getConverterForType(k, nullable = false), + getConverterForType(v, vcn)) + case (MapType(k, v, vcn), false) => + NotNullMapConverter(getConverterForType(k, nullable = false), + getConverterForType(v, vcn)) case (unknown, _) => throw new UnsupportedOperationException( s"Type $unknown not supported") } } - private object BooleanConverter extends FixedWidthTypeConverter { + private object BooleanConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { if (row.isNullAt(column)) { builder.appendNull() } else { NotNullBooleanConverter.append(row, column, builder) } + 1 + VALIDITY + } } - private object NotNullBooleanConverter extends FixedWidthTypeConverter { + private object NotNullBooleanConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { builder.append(if (row.getBoolean(column)) 1.toByte else 0.toByte) + 1 + } } - private object ByteConverter extends FixedWidthTypeConverter { + private object ByteConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { if (row.isNullAt(column)) { builder.appendNull() } else { NotNullByteConverter.append(row, column, builder) } + 1 + VALIDITY + } } - private object NotNullByteConverter extends FixedWidthTypeConverter { + private object NotNullByteConverter extends TypeConverter { 
override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { builder.append(row.getByte(column)) + 1 + } } - private object ShortConverter extends FixedWidthTypeConverter { + private object ShortConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { if (row.isNullAt(column)) { builder.appendNull() } else { NotNullShortConverter.append(row, column, builder) } + 2 + VALIDITY + } } - private object NotNullShortConverter extends FixedWidthTypeConverter { + private object NotNullShortConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { builder.append(row.getShort(column)) + 2 + } } - private object IntConverter extends FixedWidthTypeConverter { + private object IntConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { if (row.isNullAt(column)) { builder.appendNull() } else { NotNullIntConverter.append(row, column, builder) } + 4 + VALIDITY + } } - private object NotNullIntConverter extends FixedWidthTypeConverter { + private object NotNullIntConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { builder.append(row.getInt(column)) + 4 + } } - private object FloatConverter extends FixedWidthTypeConverter { + private object FloatConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { if (row.isNullAt(column)) { builder.appendNull() } else { NotNullFloatConverter.append(row, column, builder) } + 4 + VALIDITY + } } - private object NotNullFloatConverter extends FixedWidthTypeConverter { + private object NotNullFloatConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { builder.append(row.getFloat(column)) + 4 + } } - private object LongConverter extends FixedWidthTypeConverter { + private object LongConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { if (row.isNullAt(column)) { builder.appendNull() } else { NotNullLongConverter.append(row, column, builder) } + 8 + VALIDITY + } } - private object NotNullLongConverter extends FixedWidthTypeConverter { + private object NotNullLongConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { builder.append(row.getLong(column)) + 8 + } } - private object DoubleConverter 
extends FixedWidthTypeConverter { + private object DoubleConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { if (row.isNullAt(column)) { builder.appendNull() } else { NotNullDoubleConverter.append(row, column, builder) } + 8 + VALIDITY + } } - private object NotNullDoubleConverter extends FixedWidthTypeConverter { + private object NotNullDoubleConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Unit = + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { builder.append(row.getDouble(column)) + 8 + } } - private object StringConverter extends VariableWidthTypeConverter { + private object StringConverter extends TypeConverter { override def append(row: SpecializedGetters, - column: Int, builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Long = + column: Int, builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = if (row.isNullAt(column)) { builder.appendNull() - 0 + VALIDITY_N_OFFSET } else { - NotNullStringConverter.append(row, column, builder) + NotNullStringConverter.append(row, column, builder) + VALIDITY } } - private object NotNullStringConverter extends VariableWidthTypeConverter { + private object NotNullStringConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, - builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Long = { + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { val bytes = row.getUTF8String(column).getBytes builder.appendUTF8String(bytes) - bytes.length + bytes.length + OFFSET } } - // ONLY supports Map(String, String) - private case object MapConverter - extends VariableWidthTypeConverter { + private[this] def mapConvert( + keyConverter: TypeConverter, + valueConverter: TypeConverter, + row: SpecializedGetters, + column: Int, + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder) : Double = { + var ret = 0.0 + val m = row.getMap(column) + val numElements = m.numElements() + val srcKeys = m.keyArray() + val srcValues = m.valueArray() + val structBuilder = builder.getChild(0) + val keyBuilder = structBuilder.getChild(0) + val valueBuilder = structBuilder.getChild(1) + for (i <- 0 until numElements) { + ret += keyConverter.append(srcKeys, i, keyBuilder) + ret += valueConverter.append(srcValues, i, valueBuilder) + structBuilder.endStruct() + } + builder.endList() + ret + OFFSET + } + + private case class MapConverter( + keyConverter: TypeConverter, + valueConverter: TypeConverter) extends TypeConverter { override def append(row: SpecializedGetters, - column: Int, builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Long = { - import scala.collection.JavaConverters._ - val m = row.getMap(column) - val numElements = m.numElements() - val srcKeys = m.keyArray() - val srcValues = m.valueArray() - val listOfData = new ArrayBuffer[StructData](numElements) - (0 until srcKeys.numElements()).foreach { i => - val values = new ArrayBuffer[AnyRef]() - values += new String(srcKeys.getUTF8String(i).getBytes) - values += new String(srcValues.getUTF8String(i).getBytes) - listOfData += new StructData(values.asJava) + column: Int, builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { + if (row.isNullAt(column)) { + builder.appendNull() + VALIDITY_N_OFFSET + } else { + mapConvert(keyConverter, 
valueConverter, row, column, builder) + VALIDITY } - builder.appendLists(listOfData.asJava) - numElements } } + + private case class NotNullMapConverter( + keyConverter: TypeConverter, + valueConverter: TypeConverter) extends TypeConverter { + override def append(row: SpecializedGetters, + column: Int, builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = + mapConvert(keyConverter, valueConverter, row, column, builder) + } + // private object CalendarConverter extends FixedWidthTypeConverter { // override def append(row: SpecializedGetters, // column: Int, @@ -300,23 +349,43 @@ private object GpuRowToColumnConverter { // } // } // - // private case class ArrayConverter(childConverter: TypeConverter) extends TypeConverter { - // override def append(row: SpecializedGetters, - // column: Int, - // builder: ai.rapids.cudf.HostColumnVector.Builder): Unit = { - // if (row.isNullAt(column)) { - // builder.appendNull() - // } else { - // val values = row.getArray(column) - // val numElements = values.numElements() - // cv.appendArray(numElements) - // val arrData = cv.arrayData() - // for (i <- 0 until numElements) { - // childConverter.append(values, i, arrData) - // } - // } - // } - // } + + private[this] def arrayConvert( + childConverter: TypeConverter, + row: SpecializedGetters, + column: Int, + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder) : Double = { + var ret = 0.0 + val values = row.getArray(column) + val numElements = values.numElements() + val child = builder.getChild(0) + for (i <- 0 until numElements) { + ret += childConverter.append(values, i, child) + } + builder.endList() + ret + OFFSET + } + + private case class ArrayConverter(childConverter: TypeConverter) + extends TypeConverter { + override def append(row: SpecializedGetters, + column: Int, builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { + if (row.isNullAt(column)) { + builder.appendNull() + VALIDITY_N_OFFSET + } else { + arrayConvert(childConverter, row, column, builder) + VALIDITY + } + } + } + + private case class NotNullArrayConverter(childConverter: TypeConverter) + extends TypeConverter { + override def append(row: SpecializedGetters, + column: Int, builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { + arrayConvert(childConverter, row, column, builder) + } + } // // private case class StructConverter( // childConverters: Array[TypeConverter]) extends TypeConverter { @@ -368,11 +437,6 @@ class RowToColumnarIterator( numOutputRows: SQLMetric = null, numOutputBatches: SQLMetric = null) extends Iterator[ColumnarBatch] { - private val dataTypes: Array[DataType] = localSchema.fields.map(_.dataType) - private val variableWidthColumnCount = dataTypes.count(dt => !GpuBatchUtils.isFixedWidth(dt)) - private val fixedWidthDataSizePerRow = - dataTypes.filter(GpuBatchUtils.isFixedWidth).map(_.defaultSize).sum - private val nullableColumns = localSchema.fields.count(_.nullable) private val targetSizeBytes = localGoal.targetSizeBytes private var targetRows = 0 private var totalOutputBytes: Long = 0 @@ -405,18 +469,13 @@ class RowToColumnarIterator( val builders = new GpuColumnarBatchBuilder(localSchema, targetRows, null) try { var rowCount = 0 - var byteCount: Long = variableWidthColumnCount * 4 // offset bytes + // Double because validity can be < 1 byte, and this is just an estimate anyways + var byteCount: Double = 0 // read at least one row while (rowIter.hasNext && (rowCount == 0 || rowCount < targetRows && byteCount < targetSizeBytes)) { val row = rowIter.next() - val 
variableWidthDataBytes = converters.convert(row, builders) - byteCount += fixedWidthDataSizePerRow // fixed-width data bytes - byteCount += variableWidthDataBytes // variable-width data bytes - byteCount += variableWidthColumnCount * GpuBatchUtils.OFFSET_BYTES // offset bytes - if (nullableColumns > 0 && rowCount % GpuBatchUtils.VALIDITY_BUFFER_BOUNDARY_ROWS == 0) { - byteCount += GpuBatchUtils.VALIDITY_BUFFER_BOUNDARY_BYTES * nullableColumns - } + byteCount += converters.convert(row, builders) rowCount += 1 } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index 68e324242dec..cbb41dc9c05f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -682,14 +682,14 @@ abstract class BaseExprMeta[INPUT <: Expression]( final override def tagSelfForGpu(): Unit = { try { if (!areAllSupportedTypes(expr.dataType)) { - willNotWorkOnGpu(s"expression ${expr.getClass.getSimpleName} ${expr} " + + willNotWorkOnGpu(s"expression ${expr.getClass.getSimpleName} $expr " + s"produces an unsupported type ${expr.dataType}") } } catch { case _ : java.lang.UnsupportedOperationException => if (!ignoreUnsetDataTypes) { - willNotWorkOnGpu(s"expression ${expr.getClass.getSimpleName} ${expr} " + + willNotWorkOnGpu(s"expression ${expr.getClass.getSimpleName} $expr " + s" does not have a corresponding dataType.") } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala index 7f9aaf1db734..2ca0d8902509 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala @@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.{BinaryExprMeta, ConfKeysAndIncompat, GpuBinaryEx import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExtractValue, GetArrayItem, GetMapValue, ImplicitCastInputTypes, NullIntolerant} import org.apache.spark.sql.catalyst.util.TypeUtils -import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, DataType, IntegralType, MapType, StringType} +import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, DataType, IntegralType, MapType, StringType, StructType} class GpuGetArrayItemMeta( expr: GetArrayItem, @@ -33,8 +33,19 @@ class GpuGetArrayItemMeta( import GpuOverrides._ override def tagExprForGpu(): Unit = { - if (!isLit(expr.ordinal)) { + val litOrd = extractLit(expr.ordinal) + if (litOrd.isEmpty) { willNotWorkOnGpu("only literal ordinals are supported") + } else { + // Once literal array/struct types are supported this can go away + val ord = litOrd.get.value + if (ord == null || ord.asInstanceOf[Int] < 0) { + expr.dataType match { + case ArrayType(_, _) | MapType(_, _, _) | StructType(_) => + willNotWorkOnGpu("negative and null indexes are not supported for nested types") + case _ => + } + } } } override def convertToGpu( @@ -43,7 +54,8 @@ class GpuGetArrayItemMeta( GpuGetArrayItem(arr, ordinal) override def isSupportedType(t: DataType): Boolean = - GpuOverrides.isSupportedType(t, allowArray = true, allowNesting = false) + // For expressions it is only the output that is checked, not the input types. 
+ GpuOverrides.isSupportedType(t, allowArray = true) } /** @@ -103,6 +115,10 @@ class GpuGetMapValueMeta( if (!isStringLit(expr.key)) { willNotWorkOnGpu("Only String literal keys are supported") } + expr.child.dataType match { + case MapType(StringType, StringType, _) => // works + case _ => willNotWorkOnGpu("Only a Map[String, String] is supported") + } } override def convertToGpu(
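[Editor's note] The final hunks tighten the metadata checks for the extractors: GpuGetArrayItemMeta now requires a literal ordinal and, when the result element type is nested, rejects null and negative indexes, while GpuGetMapValueMeta only accepts a literal string key on a Map[String, String] child. A rough pyspark illustration of what those checks mean for offloading follows; the column names and data are made up, and whether a given expression actually runs on the GPU still depends on the plugin being installed and enabled.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [({'k': 'v'}, ['a', 'b', 'c'])],
        'm map<string,string>, arr array<string>')
    # Eligible for the GPU under these checks: a literal string key on a
    # string-to-string map, and a literal non-negative array index.
    df.selectExpr("m['k']", "arr[1]").show()
    # Still falls back to the CPU: non-literal keys, maps with non-string
    # key/value types, and null or negative indexes on nested element types.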