diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py index c47583aa831..f5a834febb9 100644 --- a/integration_tests/src/main/python/arithmetic_ops_test.py +++ b/integration_tests/src/main/python/arithmetic_ops_test.py @@ -431,7 +431,7 @@ def test_columnar_pow(data_gen): @pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn) def test_least(data_gen): num_cols = 20 - s1 = gen_scalar(data_gen, force_no_nulls=True) + s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) # we want lots of nulls gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=100.0)) for x in range(0, num_cols)], nullable=False) @@ -446,7 +446,7 @@ def test_least(data_gen): @pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn) def test_greatest(data_gen): num_cols = 20 - s1 = gen_scalar(data_gen, force_no_nulls=True) + s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) # we want lots of nulls gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=100.0)) for x in range(0, num_cols)], nullable=False) diff --git a/integration_tests/src/main/python/cmp_test.py b/integration_tests/src/main/python/cmp_test.py index ee93dc08d32..833cdaac470 100644 --- a/integration_tests/src/main/python/cmp_test.py +++ b/integration_tests/src/main/python/cmp_test.py @@ -23,7 +23,7 @@ @pytest.mark.parametrize('data_gen', eq_gens, ids=idfn) def test_eq(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True) + (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -35,7 +35,7 @@ def test_eq(data_gen): @pytest.mark.parametrize('data_gen', eq_gens, ids=idfn) def test_eq_ns(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True) + (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -47,7 +47,7 @@ def test_eq_ns(data_gen): @pytest.mark.parametrize('data_gen', eq_gens, ids=idfn) def test_ne(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True) + (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -59,7 +59,7 @@ def test_ne(data_gen): @pytest.mark.parametrize('data_gen', orderable_gens, ids=idfn) def test_lt(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True) + (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -71,7 +71,7 @@ def test_lt(data_gen): @pytest.mark.parametrize('data_gen', orderable_gens, ids=idfn) def test_lte(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True) + (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -83,7 +83,7 @@ def test_lte(data_gen): @pytest.mark.parametrize('data_gen', orderable_gens, ids=idfn) def test_gt(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True) + (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -95,7 +95,7 @@ def test_gt(data_gen): @pytest.mark.parametrize('data_gen', orderable_gens, ids=idfn) def test_gte(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True) + (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -105,7 +105,7 @@ def test_gte(data_gen): f.col('b') >= f.lit(None).cast(data_type), f.col('a') >= f.col('b'))) -@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample, ids=idfn) +@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn) def test_isnull(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select( @@ -150,7 +150,7 @@ def test_filter_with_lit(expr): def test_in(data_gen): # nulls are not supported for in on the GPU yet num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) - 1 - scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=True)) + scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen))) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars))) @@ -160,7 +160,7 @@ def test_in(data_gen): def test_in_set(data_gen): # nulls are not supported for in on the GPU yet num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) + 1 - scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=True)) + scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen))) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars))) diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index e6cf44d5790..66126c6279f 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -22,8 +22,8 @@ @pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn) def test_if_else(data_gen): - (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=True) - string_type = to_cast_string(data_gen.data_type) + (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + null_lit = get_null_lit_string(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark : three_col_df(spark, boolean_gen, data_gen, data_gen).selectExpr( # A literal predicate is not supported yet @@ -31,13 +31,13 @@ def test_if_else(data_gen): 'IF(a, {}, c)'.format(s1), 'IF(a, b, {})'.format(s2), 'IF(a, {}, {})'.format(s1, s2), - 'IF(a, b, CAST(null as {}))'.format(string_type), - 'IF(a, CAST(null as {}), c)'.format(string_type))) + 'IF(a, b, {})'.format(null_lit), + 'IF(a, {}, c)'.format(null_lit))) @pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn) def test_case_when(data_gen): num_cmps = 20 - s1 = gen_scalar(data_gen, force_no_nulls=True) + s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) # we want lots of false bool_gen = BooleanGen().with_special_case(False, weight=1000.0) gen_cols = [('_b' + str(x), bool_gen) for x in range(0, num_cmps)] @@ -57,7 +57,7 @@ def test_case_when(data_gen): @pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn) def test_nanvl(data_gen): - s1 = gen_scalar(data_gen, force_no_nulls=True) + s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -68,21 +68,21 @@ def test_nanvl(data_gen): @pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn) def test_nvl(data_gen): - (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=True) - string_type = to_cast_string(data_gen.data_type) + (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + null_lit = get_null_lit_string(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).selectExpr( 'nvl(a, b)', 'nvl(a, {})'.format(s2), 'nvl({}, b)'.format(s1), - 'nvl(CAST(null as {}), b)'.format(string_type), - 'nvl(a, CAST(null as {}))'.format(string_type))) + 'nvl({}, b)'.format(null_lit), + 'nvl(a, {})'.format(null_lit))) #nvl is translated into a 2 param version of coalesce @pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn) def test_coalesce(data_gen): num_cols = 20 - s1 = gen_scalar(data_gen, force_no_nulls=True) + s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) # we want lots of nulls gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=1000.0)) for x in range(0, num_cols)], nullable=False) @@ -102,38 +102,36 @@ def test_coalesce_constant_output(): @pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn) def test_nvl2(data_gen): - (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=True) - string_type = to_cast_string(data_gen.data_type) + (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + null_lit = get_null_lit_string(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark : three_col_df(spark, data_gen, data_gen, data_gen).selectExpr( 'nvl2(a, b, c)', 'nvl2(a, b, {})'.format(s2), 'nvl2({}, b, c)'.format(s1), - 'nvl2(CAST(null as {}), b, c)'.format(string_type), - 'nvl2(a, CAST(null as {}), c)'.format(string_type))) + 'nvl2({}, b, c)'.format(null_lit), + 'nvl2(a, {}, c)'.format(null_lit))) @pytest.mark.parametrize('data_gen', eq_gens, ids=idfn) def test_nullif(data_gen): - (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=True) - string_type = to_cast_string(data_gen.data_type) + (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + null_lit = get_null_lit_string(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).selectExpr( 'nullif(a, b)', 'nullif(a, {})'.format(s2), 'nullif({}, b)'.format(s1), - 'nullif(CAST(null as {}), b)'.format(string_type), - 'nullif(a, CAST(null as {}))'.format(string_type))) + 'nullif({}, b)'.format(null_lit), + 'nullif(a, {})'.format(null_lit))) @pytest.mark.parametrize('data_gen', eq_gens, ids=idfn) def test_ifnull(data_gen): - (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=True) - string_type = to_cast_string(data_gen.data_type) + (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + null_lit = get_null_lit_string(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).selectExpr( 'ifnull(a, b)', 'ifnull(a, {})'.format(s2), 'ifnull({}, b)'.format(s1), - 'ifnull(CAST(null as {}), b)'.format(string_type), - 'ifnull(a, CAST(null as {}))'.format(string_type))) - - + 'ifnull({}, b)'.format(null_lit), + 'ifnull(a, {})'.format(null_lit))) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 41d36501696..46a3cf79fe4 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -527,6 +527,16 @@ def contains_ts(self): return self._key_gen.contains_ts() or self._value_gen.contains_ts() +class NullGen(DataGen): + """Generate NullType values""" + def __init__(self): + super().__init__(NullType(), nullable=True) + + def start(self, rand): + def make_null(): + return None + self._start(rand, make_null) + def skip_if_not_utc(): if (not is_tz_utc()): pytest.skip('The java system time zone is not set to UTC') @@ -571,6 +581,8 @@ def _gen_scalars_common(data_gen, count, seed=0): def gen_scalars(data_gen, count, seed=0, force_no_nulls=False): """Generate scalar values.""" + if force_no_nulls: + assert(not isinstance(data_gen, NullGen)) src = _gen_scalars_common(data_gen, count, seed=seed) return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls)) for i in range(0, count)) @@ -640,6 +652,13 @@ def to_cast_string(spark_type): else: raise RuntimeError('CAST TO TYPE {} NOT SUPPORTED YET'.format(spark_type)) +def get_null_lit_string(spark_type): + if isinstance(spark_type, NullType): + return 'null' + else: + string_type = to_cast_string(spark_type) + return 'CAST(null as {})'.format(string_type) + def _convert_to_sql(t, data): if isinstance(data, str): d = "'" + data.replace("'", "\\'") + "'" @@ -655,6 +674,9 @@ def _convert_to_sql(t, data): def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): """Generate scalar values, but strings that can be used in selectExpr or SQL""" src = _gen_scalars_common(data_gen, count, seed=seed) + if isinstance(data_gen, NullGen): + assert not force_no_nulls + return ('null' for i in range(0, count)) string_type = to_cast_string(data_gen.data_type) return (_convert_to_sql(string_type, src.gen(force_no_nulls=force_no_nulls)) for i in range(0, count)) @@ -669,6 +691,8 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): date_gen = DateGen() timestamp_gen = TimestampGen() +null_gen = NullGen() + numeric_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen] integral_gens = [byte_gen, short_gen, int_gen, long_gen] # A lot of mathematical expressions only support a double as input @@ -679,17 +703,17 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): # all of the basic gens all_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, - string_gen, boolean_gen, date_gen, timestamp_gen] + string_gen, boolean_gen, date_gen, timestamp_gen, null_gen] # TODO add in some array generators to this once that is supported for sorting # a selection of generators that should be orderable (sortable and compareable) orderable_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, - string_gen, boolean_gen, date_gen, timestamp_gen] + string_gen, boolean_gen, date_gen, timestamp_gen, null_gen] # TODO add in some array generators to this once that is supported for these operations # a selection of generators that can be compared for equality eq_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, - string_gen, boolean_gen, date_gen, timestamp_gen] + string_gen, boolean_gen, date_gen, timestamp_gen, null_gen] date_gens = [date_gen] date_n_time_gens = [date_gen, timestamp_gen] diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 1c7c6df14b2..5d96b785062 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -63,6 +63,16 @@ ('a', RepeatSeqGen(StringGen(pattern='[0-9]{0,30}'), length= 20)), ('b', IntegerGen()), ('c', LongGen())] +# grouping strings with nulls present, and null value +_grpkey_strings_with_extra_nulls = [ + ('a', RepeatSeqGen(StringGen(pattern='[0-9]{0,30}'), length= 20)), + ('b', IntegerGen()), + ('c', NullGen())] +# grouping NullType +_grpkey_nulls = [ + ('a', NullGen()), + ('b', IntegerGen()), + ('c', LongGen())] # grouping floats with other columns containing nans and nulls _grpkey_floats_with_nulls_and_nans = [ @@ -112,7 +122,9 @@ _grpkey_longs_with_nulls, _grpkey_dbls_with_nulls, _grpkey_floats_with_nulls, - _grpkey_strings_with_nulls] + _grpkey_strings_with_nulls, + _grpkey_nulls, + _grpkey_strings_with_extra_nulls] # List of schemas with NaNs included _init_list_with_nans_and_no_nans = [ diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index a418b4434de..3e6bbbc1505 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -21,7 +21,7 @@ from spark_session import with_spark_session, is_before_spark_310 all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(), - BooleanGen(), DateGen(), TimestampGen(), + BooleanGen(), DateGen(), TimestampGen(), null_gen, pytest.param(FloatGen(), marks=[incompat]), pytest.param(DoubleGen(), marks=[incompat])] diff --git a/integration_tests/src/main/python/row_conversion_test.py b/integration_tests/src/main/python/row_conversion_test.py index 834fad44ba0..12a0f346b9c 100644 --- a/integration_tests/src/main/python/row_conversion_test.py +++ b/integration_tests/src/main/python/row_conversion_test.py @@ -36,6 +36,14 @@ def test_row_conversions(): ["n", ArrayGen(boolean_gen)], ["o", ArrayGen(ArrayGen(short_gen))], ["p", StructGen([["c0", byte_gen], ["c1", ArrayGen(byte_gen)]])], ["q", simple_string_to_string_map_gen], - ["r", MapGen(BooleanGen(nullable=False), ArrayGen(boolean_gen), max_length=2)]] + ["r", MapGen(BooleanGen(nullable=False), ArrayGen(boolean_gen), max_length=2)], + ["s", null_gen]] + assert_gpu_and_cpu_are_equal_collect( + lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again")) + +def test_row_conversions_fixed_width(): + gens = [["a", byte_gen], ["b", short_gen], ["c", int_gen], ["d", long_gen], + ["e", float_gen], ["f", double_gen], ["g", string_gen], ["h", boolean_gen], + ["i", timestamp_gen], ["j", date_gen]] assert_gpu_and_cpu_are_equal_collect( lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again")) diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py index d1a28461108..a90d61acb42 100644 --- a/integration_tests/src/main/python/sort_test.py +++ b/integration_tests/src/main/python/sort_test.py @@ -21,13 +21,16 @@ import pyspark.sql.functions as f orderable_gen_classes = [ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, - BooleanGen, TimestampGen, DateGen, StringGen] + BooleanGen, TimestampGen, DateGen, StringGen, NullGen] @pytest.mark.parametrize('data_gen_class', orderable_gen_classes, ids=idfn) @pytest.mark.parametrize('nullable', [True, False], ids=idfn) @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn) def test_single_orderby(data_gen_class, nullable, order): - data_gen = data_gen_class(nullable=nullable) + if (data_gen_class == NullGen): + data_gen = data_gen_class() + else: + data_gen = data_gen_class(nullable=nullable) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).orderBy(order)) @@ -35,14 +38,17 @@ def test_single_orderby(data_gen_class, nullable, order): @pytest.mark.parametrize('nullable', [True, False], ids=idfn) @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn) def test_single_sort_in_part(data_gen_class, nullable, order): - data_gen = data_gen_class(nullable=nullable) + if (data_gen_class == NullGen): + data_gen = data_gen_class() + else: + data_gen = data_gen_class(nullable=nullable) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).sortWithinPartitions(order)) orderable_gens_sort = [byte_gen, short_gen, int_gen, long_gen, pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/84')), pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/84')), - boolean_gen, timestamp_gen, date_gen, string_gen] + boolean_gen, timestamp_gen, date_gen, string_gen, null_gen] @pytest.mark.parametrize('data_gen', orderable_gens_sort, ids=idfn) def test_multi_orderby(data_gen): assert_gpu_and_cpu_are_equal_collect( diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala index 4a302bfb478..ee10a5cd27e 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, SerializeConcatHostBuffersDeserializeBatch} +import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch /** @@ -51,6 +52,10 @@ class GpuBroadcastHashJoinMeta( override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala index c12c04da706..c4948ec6935 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, ShuffledHashJoinExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.rapids.execution.GpuShuffledHashJoinBase +import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch object GpuJoinUtils { @@ -59,6 +60,10 @@ class GpuShuffledHashJoinMeta( override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) } diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuSortMergeJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuSortMergeJoinExec.scala index 9f33d53a12a..43bc1aef4d3 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuSortMergeJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuSortMergeJoinExec.scala @@ -21,6 +21,7 @@ import com.nvidia.spark.rapids._ import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.SortExec import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, SortMergeJoinExec} +import org.apache.spark.sql.types.DataType /** * HashJoin changed in Spark 3.1 requiring Shim @@ -41,6 +42,10 @@ class GpuSortMergeJoinMeta( override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { // Use conditions from Hash Join GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index cfe47874fb9..3a3bcf6811f 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala @@ -233,6 +233,10 @@ class Spark300Shims extends SparkShims { GpuOverrides.wrapExpr(a.ignoreNullsExpr, conf, Some(this)) override val childExprs: Seq[BaseExprMeta[_]] = Seq(child, ignoreNulls) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(): GpuExpression = GpuFirst(child.convertToGpu(), ignoreNulls.convertToGpu()) }), @@ -244,6 +248,10 @@ class Spark300Shims extends SparkShims { GpuOverrides.wrapExpr(a.ignoreNullsExpr, conf, Some(this)) override val childExprs: Seq[BaseExprMeta[_]] = Seq(child, ignoreNulls) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(): GpuExpression = GpuLast(child.convertToGpu(), ignoreNulls.convertToGpu()) }), diff --git a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala index d0f7b9934c6..363876848b7 100644 --- a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala +++ b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, BuildSide, HashedRelationBroadcastMode} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.rapids.execution.SerializeConcatHostBuffersDeserializeBatch +import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch class GpuBroadcastHashJoinMeta( @@ -48,6 +49,10 @@ class GpuBroadcastHashJoinMeta( override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala index 8af8617e45f..02d78fb4e15 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.rapids.execution.SerializeConcatHostBuffersDeserializeBatch +import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch class GpuBroadcastHashJoinMeta( @@ -48,6 +49,10 @@ class GpuBroadcastHashJoinMeta( override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala index 4800ee75abe..54cae386fc9 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClustered import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch object GpuJoinUtils { @@ -59,6 +60,10 @@ class GpuShuffledHashJoinMeta( override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) } diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala index a5f063b7a72..20b47c7ad48 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.SortExec import org.apache.spark.sql.execution.joins.SortMergeJoinExec +import org.apache.spark.sql.types.DataType /** * HashJoin changed in Spark 3.1 requiring Shim @@ -42,6 +43,10 @@ class GpuSortMergeJoinMeta( override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { // Use conditions from Hash Join GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) @@ -51,7 +56,7 @@ class GpuSortMergeJoinMeta( s"see ${RapidsConf.ENABLE_REPLACE_SORTMERGEJOIN.key}") } - // make sure this is last check - if this is SortMergeJoin, the children can be Sorts and we + // make sure this is the last check - if this is SortMergeJoin, the children can be Sorts and we // want to validate they can run on GPU and remove them before replacing this with a // ShuffleHashJoin if (canThisBeReplaced) { diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuBroadcastHashJoinExec.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuBroadcastHashJoinExec.scala index 31cd7a7de81..a8a18b5235e 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuBroadcastHashJoinExec.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuBroadcastHashJoinExec.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, SerializeConcatHostBuffersDeserializeBatch} +import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch /** @@ -53,6 +54,10 @@ class GpuBroadcastHashJoinMeta( override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala index 03583b36839..0783a7403e5 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.rapids.execution.GpuShuffledHashJoinBase +import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch object GpuJoinUtils { @@ -60,6 +61,10 @@ class GpuShuffledHashJoinMeta( override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) } diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuSortMergeJoinExec.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuSortMergeJoinExec.scala index 10759a2fae5..9ebf5fc4447 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuSortMergeJoinExec.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuSortMergeJoinExec.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.SortExec import org.apache.spark.sql.execution.joins.SortMergeJoinExec +import org.apache.spark.sql.types.DataType /** * HashJoin changed in Spark 3.1 requiring Shim @@ -42,6 +43,10 @@ class GpuSortMergeJoinMeta( override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { // Use conditions from Hash Join GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) @@ -51,7 +56,7 @@ class GpuSortMergeJoinMeta( s"see ${RapidsConf.ENABLE_REPLACE_SORTMERGEJOIN.key}") } - // make sure this is last check - if this is SortMergeJoin, the children can be Sorts and we + // make sure this is the last check - if this is SortMergeJoin, the children can be Sorts and we // want to validate they can run on GPU and remove them before replacing this with a // ShuffleHashJoin if (canThisBeReplaced) { diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala index 986eaec376e..0f648147acb 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala @@ -270,7 +270,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { } def isSupportedByCudf(schema: Seq[Attribute]): Boolean = { - schema.forall(a => GpuColumnVector.isSupportedType(a.dataType)) + schema.forall(a => GpuColumnVector.isNonNestedSupportedType(a.dataType)) } def isTypeSupportedByParquet(dataType: DataType): Boolean = { diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index 891df7f5946..4dc57138f67 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -61,7 +61,7 @@ private static HostColumnVector.DataType convertFrom(DataType spark, boolean nul return new HostColumnVector.StructType(nullable, children); } else { // Only works for basic types - return new HostColumnVector.BasicType(nullable, getRapidsType(spark)); + return new HostColumnVector.BasicType(nullable, getNonNestedRapidsType(spark)); } } @@ -179,6 +179,9 @@ private static DType toRapidsOrNull(DataType type) { return DType.TIMESTAMP_MICROSECONDS; } else if (type instanceof StringType) { return DType.STRING; + } else if (type instanceof NullType) { + // INT8 is used for both in this case + return DType.INT8; } else if (type instanceof DecimalType) { // Decimal supportable check has been conducted in the GPU plan overriding stage. // So, we don't have to handle decimal-supportable problem at here. @@ -193,11 +196,11 @@ private static DType toRapidsOrNull(DataType type) { return null; } - public static boolean isSupportedType(DataType type) { + public static boolean isNonNestedSupportedType(DataType type) { return toRapidsOrNull(type) != null; } - public static DType getRapidsType(DataType type) { + public static DType getNonNestedRapidsType(DataType type) { DType result = toRapidsOrNull(type); if (result == null) { throw new IllegalArgumentException(type + " is not supported for GPU processing yet."); @@ -262,7 +265,7 @@ private static StructType structFromAttributes(List format) { */ public static Schema from(StructType input) { Schema.Builder builder = Schema.builder(); - input.foreach(f -> builder.column(GpuColumnVector.getRapidsType(f.dataType()), f.name())); + input.foreach(f -> builder.column(GpuColumnVector.getNonNestedRapidsType(f.dataType()), f.name())); return builder.build(); } @@ -321,7 +324,7 @@ private static boolean typeConversionAllowed(ColumnView cv, DataType colType) { return ((DecimalType) colType).precision() <= DType.DECIMAL64_MAX_PRECISION; } if (!dt.isNestedType()) { - return getRapidsType(colType).equals(dt); + return getNonNestedRapidsType(colType).equals(dt); } if (colType instanceof MapType) { MapType mType = (MapType) colType; diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java index f13e05fb765..3fd46523fde 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java @@ -87,7 +87,7 @@ public static long[] getUncompressedColumnSizes(ColumnarBatch batch) { private static boolean typeConversionAllowed(ColumnMeta columnMeta, DataType colType) { DType dt = DType.fromNative(columnMeta.dtypeId(), columnMeta.dtypeScale()); if (!dt.isNestedType()) { - return GpuColumnVector.getRapidsType(colType).equals(dt); + return GpuColumnVector.getNonNestedRapidsType(colType).equals(dt); } if (colType instanceof MapType) { MapType mType = (MapType) colType; diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java index 627d6a264c7..a887a904150 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java @@ -72,7 +72,7 @@ protected UnsafeRowToColumnarBatchIterator( outputTypes = new DataType[schema.length]; for (int i = 0; i < schema.length; i++) { - rapidsTypes[i] = GpuColumnVector.getRapidsType(schema[i].dataType()); + rapidsTypes[i] = GpuColumnVector.getNonNestedRapidsType(schema[i].dataType()); outputTypes[i] = schema[i].dataType(); } this.totalTime = totalTime; diff --git a/sql-plugin/src/main/java/org/apache/spark/sql/catalyst/CudfUnsafeRow.java b/sql-plugin/src/main/java/org/apache/spark/sql/catalyst/CudfUnsafeRow.java index 28686854e9f..f6b97b44424 100644 --- a/sql-plugin/src/main/java/org/apache/spark/sql/catalyst/CudfUnsafeRow.java +++ b/sql-plugin/src/main/java/org/apache/spark/sql/catalyst/CudfUnsafeRow.java @@ -56,7 +56,7 @@ public static int getRowSizeEstimate(Attribute[] attributes) { // This needs to match what is in cudf and what is in the constructor. int offset = 0; for (Attribute attr : attributes) { - int length = GpuColumnVector.getRapidsType(attr.dataType()).getSizeInBytes(); + int length = GpuColumnVector.getNonNestedRapidsType(attr.dataType()).getSizeInBytes(); offset = alignOffset(offset, length); offset += length; } @@ -135,7 +135,7 @@ public CudfUnsafeRow(Attribute[] attributes, int[] remapping) { startOffsets = new int[attributes.length]; for (int i = 0; i < attributes.length; i++) { Attribute attr = attributes[i]; - int length = GpuColumnVector.getRapidsType(attr.dataType()).getSizeInBytes(); + int length = GpuColumnVector.getNonNestedRapidsType(attr.dataType()).getSizeInBytes(); assert length > 0 : "Only fixed width types are currently supported."; offset = alignOffset(offset, length); startOffsets[i] = offset; diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index d4cdf7e5bba..e1b69d509b8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -76,6 +76,11 @@ class CastExprMeta[INPUT <: CastBase]( } } + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true, + allowBinary = true) + override def convertToGpu(child: Expression): GpuExpression = GpuCast(child, toType, ansiEnabled, cast.timeZoneId) } @@ -126,6 +131,12 @@ object GpuCast { return true } from match { + case NullType => to match { + // The only thing we really need is that we can use a null scalar to create a vector + case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | + DoubleType | TimestampType | DateType | StringType => true + case _ => false + } case BooleanType => to match { case ByteType | ShortType | IntegerType | LongType => true case FloatType | DoubleType => true @@ -244,6 +255,10 @@ case class GpuCast( override def doColumnar(input: GpuColumnVector): ColumnVector = { (input.dataType(), dataType) match { + case (NullType, to) => + withResource(GpuScalar.from(null, to)) { scalar => + ColumnVector.fromScalar(scalar, input.getRowCount.toInt) + } case (DateType, BooleanType | _: NumericType) => // casts from date type to numerics are always null withResource(GpuScalar.from(null, dataType)) { scalar => @@ -255,7 +270,7 @@ case class GpuCast( withResource(input.getBase.castTo(DType.INT64)) { asLongs => withResource(Scalar.fromDouble(1000000)) { microsPerSec => // Use trueDiv to ensure cast to double before division for full precision - asLongs.trueDiv(microsPerSec, GpuColumnVector.getRapidsType(dataType)) + asLongs.trueDiv(microsPerSec, GpuColumnVector.getNonNestedRapidsType(dataType)) } } case (TimestampType, ByteType | ShortType | IntegerType) => @@ -277,14 +292,14 @@ case class GpuCast( Scalar.fromByte(Byte.MaxValue)) } } - cv.castTo(GpuColumnVector.getRapidsType(dataType)) + cv.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) } } } case (TimestampType, _: LongType) => withResource(input.getBase.castTo(DType.INT64)) { asLongs => withResource(Scalar.fromInt(1000000)) { microsPerSec => - asLongs.floorDiv(microsPerSec, GpuColumnVector.getRapidsType(dataType)) + asLongs.floorDiv(microsPerSec, GpuColumnVector.getNonNestedRapidsType(dataType)) } } case (TimestampType, StringType) => @@ -294,43 +309,43 @@ case class GpuCast( case (LongType, IntegerType) if ansiMode => assertValuesInRange(input.getBase, Scalar.fromInt(Int.MinValue), Scalar.fromInt(Int.MaxValue)) - input.getBase.castTo(GpuColumnVector.getRapidsType(dataType)) + input.getBase.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) // ansi cast from larger-than-short integral types, to short case (LongType|IntegerType, ShortType) if ansiMode => assertValuesInRange(input.getBase, Scalar.fromShort(Short.MinValue), Scalar.fromShort(Short.MaxValue)) - input.getBase.castTo(GpuColumnVector.getRapidsType(dataType)) + input.getBase.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) // ansi cast from larger-than-byte integral types, to byte case (LongType|IntegerType|ShortType, ByteType) if ansiMode => assertValuesInRange(input.getBase, Scalar.fromByte(Byte.MinValue), Scalar.fromByte(Byte.MaxValue)) - input.getBase.castTo(GpuColumnVector.getRapidsType(dataType)) + input.getBase.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) // ansi cast from floating-point types, to byte case (FloatType|DoubleType, ByteType) if ansiMode => assertValuesInRange(input.getBase, Scalar.fromByte(Byte.MinValue), Scalar.fromByte(Byte.MaxValue)) - input.getBase.castTo(GpuColumnVector.getRapidsType(dataType)) + input.getBase.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) // ansi cast from floating-point types, to short case (FloatType|DoubleType, ShortType) if ansiMode => assertValuesInRange(input.getBase, Scalar.fromShort(Short.MinValue), Scalar.fromShort(Short.MaxValue)) - input.getBase.castTo(GpuColumnVector.getRapidsType(dataType)) + input.getBase.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) // ansi cast from floating-point types, to integer case (FloatType|DoubleType, IntegerType) if ansiMode => assertValuesInRange(input.getBase, Scalar.fromInt(Int.MinValue), Scalar.fromInt(Int.MaxValue)) - input.getBase.castTo(GpuColumnVector.getRapidsType(dataType)) + input.getBase.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) // ansi cast from floating-point types, to long case (FloatType|DoubleType, LongType) if ansiMode => assertValuesInRange(input.getBase, Scalar.fromLong(Long.MinValue), Scalar.fromLong(Long.MaxValue)) - input.getBase.castTo(GpuColumnVector.getRapidsType(dataType)) + input.getBase.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) case (FloatType | DoubleType, TimestampType) => // Spark casting to timestamp from double assumes value is in microseconds @@ -348,27 +363,27 @@ case class GpuCast( case (BooleanType, TimestampType) => // cudf requires casting to a long first. withResource(input.getBase.castTo(DType.INT64)) { longs => - longs.castTo(GpuColumnVector.getRapidsType(dataType)) + longs.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) } case (BooleanType | ByteType | ShortType | IntegerType, TimestampType) => // cudf requires casting to a long first withResource(input.getBase.castTo(DType.INT64)) { longs => withResource(longs.castTo(DType.TIMESTAMP_SECONDS)) { timestampSecs => - timestampSecs.castTo(GpuColumnVector.getRapidsType(dataType)) + timestampSecs.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) } } case (_: NumericType, TimestampType) => // Spark casting to timestamp assumes value is in seconds, but timestamps // are tracked in microseconds. withResource(input.getBase.castTo(DType.TIMESTAMP_SECONDS)) { timestampSecs => - timestampSecs.castTo(GpuColumnVector.getRapidsType(dataType)) + timestampSecs.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) } case (FloatType, LongType) | (DoubleType, IntegerType | LongType) => // Float.NaN => Int is casted to a zero but float.NaN => Long returns a small negative // number Double.NaN => Int | Long, returns a small negative number so Nans have to be // converted to zero first withResource(FloatUtils.nanToZero(input.getBase)) { inputWithNansToZero => - inputWithNansToZero.castTo(GpuColumnVector.getRapidsType(dataType)) + inputWithNansToZero.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) } case (FloatType|DoubleType, StringType) => castFloatingTypeToString(input) @@ -383,7 +398,8 @@ case class GpuCast( case TimestampType => castStringToTimestamp(trimmed) case FloatType | DoubleType => - castStringToFloats(trimmed, ansiMode, GpuColumnVector.getRapidsType(dataType)) + castStringToFloats(trimmed, ansiMode, + GpuColumnVector.getNonNestedRapidsType(dataType)) case ByteType | ShortType | IntegerType | LongType => // filter out values that are not valid longs or nulls val regex = if (ansiMode) { @@ -407,7 +423,7 @@ case class GpuCast( // for that type. Note that the scalar values here are named parameters so are not // created until they are needed withResource(longStrings) { longStrings => - GpuColumnVector.getRapidsType(dataType) match { + GpuColumnVector.getNonNestedRapidsType(dataType) match { case DType.INT8 => castStringToIntegralType(longStrings, DType.INT8, Scalar.fromInt(Byte.MinValue), Scalar.fromInt(Byte.MaxValue)) @@ -430,7 +446,7 @@ case class GpuCast( input.getBase.asByteList(true) case _ => - input.getBase.castTo(GpuColumnVector.getRapidsType(dataType)) + input.getBase.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala index ee89744fb4c..9c54169e9fd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpandExec.scala @@ -46,6 +46,10 @@ class GpuExpandExecMeta( override val childExprs: Seq[BaseExprMeta[_]] = gpuProjections.flatten ++ outputAttributes + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + /** * Convert what this wraps to a GPU enabled version. */ @@ -147,7 +151,7 @@ class GpuExpandIterator( * a boolean indicating whether an existing vector was re-used. */ def getOrCreateNullCV(dataType: DataType): (GpuColumnVector, Boolean) = { - val rapidsType = GpuColumnVector.getRapidsType(dataType) + val rapidsType = GpuColumnVector.getNonNestedRapidsType(dataType) nullCVs.get(dataType) match { case Some(cv) => (cv.incRefCount(), true) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index 570ea260537..a8c1a10c02e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -113,7 +113,7 @@ object GpuOrcScanBase { meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet") } schema.foreach { field => - if (!GpuColumnVector.isSupportedType(field.dataType)) { + if (!GpuColumnVector.isNonNestedSupportedType(field.dataType)) { meta.willNotWorkOnGpu(s"GpuOrcScan does not support fields of type ${field.dataType}") } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 5b6753c173b..dc8ac476153 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -648,7 +648,10 @@ object GpuOverrides { } override def isSupportedType(t: DataType): Boolean = - GpuOverrides.isSupportedType(t, allowCalendarInterval = true, allowDecimal = true) + GpuOverrides.isSupportedType(t, + allowNull = true, + allowDecimal = true, + allowCalendarInterval = true) }), expr[Signum]( "Returns -1.0, 0.0 or 1.0 as expr is negative, 0 or positive", @@ -660,6 +663,7 @@ object GpuOverrides { (a, conf, p, r) => new UnaryExprMeta[Alias](a, conf, p, r) { override def isSupportedType(t: DataType): Boolean = GpuOverrides.isSupportedType(t, + allowNull = true, allowMaps = true, allowArray = true, allowStruct = true, @@ -674,6 +678,7 @@ object GpuOverrides { (att, conf, p, r) => new BaseExprMeta[AttributeReference](att, conf, p, r) { override def isSupportedType(t: DataType): Boolean = GpuOverrides.isSupportedType(t, + allowNull = true, allowMaps = true, allowArray = true, allowStruct = true, @@ -694,10 +699,7 @@ object GpuOverrides { expr[Cast]( "Convert a column of one type of data into another type", (cast, conf, p, r) => new CastExprMeta[Cast](cast, SparkSession.active.sessionState.conf - .ansiEnabled, conf, p, r) { - override def isSupportedType(t: DataType): Boolean = - GpuOverrides.isSupportedType(t, allowBinary = true) - }), + .ansiEnabled, conf, p, r)), expr[AnsiCast]( "Convert a column of one type of data into another type", (cast, conf, p, r) => new CastExprMeta[AnsiCast](cast, true, conf, p, r)), @@ -872,6 +874,7 @@ object GpuOverrides { (a, conf, p, r) => new UnaryExprMeta[IsNull](a, conf, p, r) { override def isSupportedType(t: DataType): Boolean = GpuOverrides.isSupportedType(t, + allowNull = true, allowMaps = true, allowArray = true, allowStruct = true, @@ -885,6 +888,7 @@ object GpuOverrides { (a, conf, p, r) => new UnaryExprMeta[IsNotNull](a, conf, p, r) { override def isSupportedType(t: DataType): Boolean = GpuOverrides.isSupportedType(t, + allowNull = true, allowMaps = true, allowArray = true, allowStruct = true, @@ -918,6 +922,7 @@ object GpuOverrides { override def isSupportedType(t: DataType): Boolean = GpuOverrides.isSupportedType(t, + allowNull = true, allowMaps = true, allowArray = true, allowStruct = true, @@ -990,18 +995,30 @@ object GpuOverrides { expr[Coalesce] ( "Returns the first non-null argument if exists. Otherwise, null", (a, conf, p, r) => new ExprMeta[Coalesce](a, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(): GpuExpression = GpuCoalesce(childExprs.map(_.convertToGpu())) } ), expr[Least] ( "Returns the least value of all parameters, skipping null values", (a, conf, p, r) => new ExprMeta[Least](a, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(): GpuExpression = GpuLeast(childExprs.map(_.convertToGpu())) } ), expr[Greatest] ( "Returns the greatest value of all parameters, skipping null values", (a, conf, p, r) => new ExprMeta[Greatest](a, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(): GpuExpression = GpuGreatest(childExprs.map(_.convertToGpu())) } ), @@ -1266,30 +1283,50 @@ object GpuOverrides { expr[EqualNullSafe]( "Check if the values are equal including nulls <=>", (a, conf, p, r) => new BinaryExprMeta[EqualNullSafe](a, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuEqualNullSafe(lhs, rhs) }), expr[EqualTo]( "Check if the values are equal", (a, conf, p, r) => new BinaryExprMeta[EqualTo](a, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuEqualTo(lhs, rhs) }), expr[GreaterThan]( "> operator", (a, conf, p, r) => new BinaryExprMeta[GreaterThan](a, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuGreaterThan(lhs, rhs) }), expr[GreaterThanOrEqual]( ">= operator", (a, conf, p, r) => new BinaryExprMeta[GreaterThanOrEqual](a, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuGreaterThanOrEqual(lhs, rhs) }), expr[In]( "IN operator", (in, conf, p, r) => new ExprMeta[In](in, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagExprForGpu(): Unit = { val unaliased = in.list.map(extractLit) if (!unaliased.forall(_.isDefined)) { @@ -1309,6 +1346,10 @@ object GpuOverrides { expr[InSet]( "INSET operator", (in, conf, p, r) => new ExprMeta[InSet](in, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagExprForGpu(): Unit = { if (in.hset.contains(null)) { willNotWorkOnGpu("nulls are not supported") @@ -1325,18 +1366,30 @@ object GpuOverrides { expr[LessThan]( "< operator", (a, conf, p, r) => new BinaryExprMeta[LessThan](a, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuLessThan(lhs, rhs) }), expr[LessThanOrEqual]( "<= operator", (a, conf, p, r) => new BinaryExprMeta[LessThanOrEqual](a, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuLessThanOrEqual(lhs, rhs) }), expr[CaseWhen]( "CASE WHEN expression", (a, conf, p, r) => new ExprMeta[CaseWhen](a, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagExprForGpu(): Unit = { val anyLit = a.branches.exists { case (predicate, _) => isLit(predicate) } if (anyLit) { @@ -1359,6 +1412,10 @@ object GpuOverrides { expr[If]( "IF expression", (a, conf, p, r) => new ExprMeta[If](a, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagExprForGpu(): Unit = { if (isLit(a.predicate)) { willNotWorkOnGpu(s"literal predicate ${a.predicate} is not supported") @@ -1405,10 +1462,13 @@ object GpuOverrides { } else { childrenExprMeta } - override def convertToGpu(): GpuExpression = { - // handle the case AggregateExpression has the resultIds parameter where its - // Seq[ExprIds] instead of single ExprId. - val resultId = try { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(): GpuExpression = { + // handle the case AggregateExpression has the resultIds parameter where its + // Seq[ExprIds] instead of single ExprId. + val resultId = try { val resultMethod = a.getClass.getMethod("resultId") resultMethod.invoke(a).asInstanceOf[ExprId] } catch { @@ -1418,7 +1478,7 @@ object GpuOverrides { } GpuAggregateExpression(childExprs(0).convertToGpu().asInstanceOf[GpuAggregateFunction], a.mode, a.isDistinct, filter.map(_.convertToGpu()), resultId) - } + } }), expr[SortOrder]( "Sort order", @@ -1426,6 +1486,9 @@ object GpuOverrides { // One of the few expressions that are not replaced with a GPU version override def convertToGpu(): Expression = a.withNewChildren(childExprs.map(_.convertToGpu())) + + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowNull = true) }), expr[Count]( "Count aggregate operator", @@ -1436,6 +1499,10 @@ object GpuOverrides { } } + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(): GpuExpression = GpuCount(childExprs.map(_.convertToGpu())) }), expr[Max]( @@ -1449,6 +1516,11 @@ object GpuOverrides { s" ${RapidsConf.HAS_NANS} to false.") } } + + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(child: Expression): GpuExpression = GpuMax(child) }), expr[Min]( @@ -1462,6 +1534,11 @@ object GpuOverrides { s" ${RapidsConf.HAS_NANS} to false.") } } + + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(child: Expression): GpuExpression = GpuMin(child) }), expr[Sum]( @@ -1818,6 +1895,9 @@ object GpuOverrides { override val childExprs: Seq[BaseExprMeta[_]] = hp.expressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowNull = true) + override def convertToGpu(): GpuPartitioning = GpuHashPartitioning(childExprs.map(_.convertToGpu()), hp.numPartitions) }), @@ -1826,6 +1906,10 @@ object GpuOverrides { (rp, conf, p, r) => new PartMeta[RangePartitioning](rp, conf, p, r) { override val childExprs: Seq[BaseExprMeta[_]] = rp.ordering.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowNull = true) + override def convertToGpu(): GpuPartitioning = { if (rp.numPartitions > 1) { val gpuOrdering = childExprs.map(_.convertToGpu()).asInstanceOf[Seq[SortOrder]] @@ -1845,6 +1929,9 @@ object GpuOverrides { part[RoundRobinPartitioning]( "Round robin partitioning", (rrp, conf, p, r) => new PartMeta[RoundRobinPartitioning](rrp, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowNull = true) + override def convertToGpu(): GpuPartitioning = { GpuRoundRobinPartitioning(rrp.numPartitions) } @@ -1853,6 +1940,10 @@ object GpuOverrides { "Single partitioning", (sp, conf, p, r) => new PartMeta[SinglePartition.type](sp, conf, p, r) { override val childExprs: Seq[ExprMeta[_]] = Seq.empty[ExprMeta[_]] + + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowNull = true) + override def convertToGpu(): GpuPartitioning = { GpuSinglePartitioning(childExprs.map(_.convertToGpu())) } @@ -1895,6 +1986,7 @@ object GpuOverrides { new SparkPlanMeta[ProjectExec](proj, conf, p, r) { override def isSupportedType(t: DataType): Boolean = GpuOverrides.isSupportedType(t, + allowNull = true, allowMaps = true, allowArray = true, allowStruct = true, @@ -1932,6 +2024,10 @@ object GpuOverrides { exec[CoalesceExec]( "The backend for the dataframe coalesce method", (coalesce, conf, parent, r) => new SparkPlanMeta[CoalesceExec](coalesce, conf, parent, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(): GpuExec = GpuCoalesceExec(coalesce.numPartitions, childPlans.head.convertIfNeeded()) }), @@ -1992,6 +2088,7 @@ object GpuOverrides { (filter, conf, p, r) => new SparkPlanMeta[FilterExec](filter, conf, p, r) { override def isSupportedType(t: DataType): Boolean = GpuOverrides.isSupportedType(t, + allowNull = true, allowMaps = true, allowArray = true, allowStruct = true, @@ -2006,6 +2103,10 @@ object GpuOverrides { exec[UnionExec]( "The backend for the union operator", (union, conf, p, r) => new SparkPlanMeta[UnionExec](union, conf, p, r) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(): GpuExec = GpuUnionExec(childPlans.map(_.convertIfNeeded())) }), @@ -2024,6 +2125,10 @@ object GpuOverrides { override val childExprs: Seq[BaseExprMeta[_]] = condition.toSeq + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(): GpuExec = GpuCartesianProductExec( childPlans.head.convertIfNeeded(), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index 71e6b723385..5d90c7aa8b4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -121,11 +121,24 @@ private object GpuRowToColumnConverter { case (MapType(k, v, vcn), false) => NotNullMapConverter(getConverterForType(k, nullable = false), getConverterForType(v, vcn)) + case (NullType, true) => + NullConverter case (unknown, _) => throw new UnsupportedOperationException( s"Type $unknown not supported") } } + private object NullConverter extends TypeConverter { + override def append(row: SpecializedGetters, + column: Int, + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { + builder.appendNull() + 1 + VALIDITY + } + + override def getNullSize: Double = 1 + VALIDITY + } + private object BooleanConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala index c2ed0b319f4..2af10fab32a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala @@ -41,6 +41,10 @@ class GpuSortMeta( sort.global, childPlans(0).convertIfNeeded()) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { if (GpuOverrides.isAnyStringLit(sort.sortOrder)) { willNotWorkOnGpu("string literal values are not supported in a sort") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 2b6564902f9..ee31a5ea801 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -240,7 +240,7 @@ case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow } } } - val expectedType = GpuColumnVector.getRapidsType(windowFunc.dataType) + val expectedType = GpuColumnVector.getNonNestedRapidsType(windowFunc.dataType) if (expectedType != aggColumn.getType) { withResource(aggColumn) { aggColumn => GpuColumnVector.from(aggColumn.castTo(expectedType), windowFunc.dataType) @@ -271,7 +271,7 @@ case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow } } } - val expectedType = GpuColumnVector.getRapidsType(windowFunc.dataType) + val expectedType = GpuColumnVector.getNonNestedRapidsType(windowFunc.dataType) if (expectedType != aggColumn.getType) { withResource(aggColumn) { aggColumn => GpuColumnVector.from(aggColumn.castTo(expectedType), windowFunc.dataType) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala index 9cb2064da26..00ef2e64b71 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala @@ -115,6 +115,10 @@ object HostColumnarToGpu { for (i <- 0 until rows) { b.appendUTF8String(cv.getUTF8String(i).getBytes) } + case (NullType, true) => + for (_ <- 0 until rows) { + b.appendNull() + } case (dt: DecimalType, nullable) => // Because DECIMAL64 is the only supported decimal DType, we can // append unscaledLongValue instead of BigDecimal itself to speedup this conversion. @@ -131,8 +135,8 @@ object HostColumnarToGpu { b.append(cv.getDecimal(i, DType.DECIMAL64_MAX_PRECISION, dt.scale).toUnscaledLong) } } - case (t, n) => - throw new UnsupportedOperationException(s"Converting to GPU for ${t} is not currently " + + case (t, _) => + throw new UnsupportedOperationException(s"Converting to GPU for $t is not currently " + s"supported") } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index ac3d5cb3ae9..4f2404adbf7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -99,7 +99,9 @@ class GpuHashAggregateMeta( resultExpressions override def isSupportedType(t: DataType): Boolean = - GpuOverrides.isSupportedType(t, allowStringMaps = true) + GpuOverrides.isSupportedType(t, + allowNull = true, + allowStringMaps = true) override def tagPlanForGpu(): Unit = { if (agg.resultExpressions.isEmpty) { @@ -266,6 +268,10 @@ class GpuSortAggregateMeta( } } + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(): GpuExec = { // we simply convert to a HashAggregateExec and let GpuOverrides take care of inserting a // GpuSortExec if one is needed @@ -609,7 +615,7 @@ case class GpuHashAggregateExec( childCv } else { withResource(childCv) { childCv => - val rapidsType = GpuColumnVector.getRapidsType(ref.dataType) + val rapidsType = GpuColumnVector.getNonNestedRapidsType(ref.dataType) GpuColumnVector.from(childCv.getBase.castTo(rapidsType), ref.dataType) } } @@ -853,7 +859,7 @@ case class GpuHashAggregateExec( val resCols = new ArrayBuffer[ColumnVector](result.getNumberOfColumns) for (i <- 0 until result.getNumberOfColumns) { - val rapidsType = GpuColumnVector.getRapidsType(dataTypes(i)) + val rapidsType = GpuColumnVector.getNonNestedRapidsType(dataTypes(i)) // cast will be cheap if type matches, only does refCount++ in that case closeOnExcept(result.getColumn(i).castTo(rapidsType)) { castedCol => resCols += GpuColumnVector.from(castedCol, dataTypes(i)) @@ -881,7 +887,7 @@ case class GpuHashAggregateExec( agg.mergeReductionAggregate } withResource(aggFn(toAggregateCvs(agg.getOrdinal(agg.ref)).getBase)) { res => - val rapidsType = GpuColumnVector.getRapidsType(agg.dataType) + val rapidsType = GpuColumnVector.getNonNestedRapidsType(agg.dataType) withResource(cudf.ColumnVector.fromScalar(res, 1)) { cv => cvs += GpuColumnVector.from(cv.castTo(rapidsType), agg.dataType) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala index 6f457dca9b3..670c70a31f8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala @@ -99,7 +99,7 @@ object GpuScalar { } def from(v: Any, t: DataType): Scalar = v match { - case _ if v == null => Scalar.fromNull(GpuColumnVector.getRapidsType(t)) + case _ if v == null => Scalar.fromNull(GpuColumnVector.getNonNestedRapidsType(t)) case _ if t.isInstanceOf[DecimalType] => var bigDec = v match { case vv: Decimal => vv.toBigDecimal.bigDecimal diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala index 61f363be819..4c953061e06 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala @@ -236,7 +236,7 @@ trait GpuGreatestLeastBase extends ComplexTypeMergingExpression with GpuExpressi private[this] def isFp = dataType == FloatType || dataType == DoubleType // TODO need a better way to do this for nested types - protected lazy val dtype: DType = GpuColumnVector.getRapidsType(dataType) + protected lazy val dtype: DType = GpuColumnVector.getNonNestedRapidsType(dataType) override def checkInputDataTypes(): TypeCheckResult = { if (children.length <= 1) { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala index e8236d27eff..62e2594fca2 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala @@ -94,6 +94,7 @@ class GpuGetArrayItemMeta( override def isSupportedType(t: DataType): Boolean = GpuOverrides.isSupportedType(t, + allowNull = true, allowArray = true, allowStruct = true, allowNesting = true) @@ -131,7 +132,8 @@ case class GpuGetArrayItem(child: Expression, ordinal: Expression) if (ordinal.isValid && ordinal.getInt >= 0) { lhs.getBase.extractListElement(ordinal.getInt) } else { - withResource(Scalar.fromNull(GpuColumnVector.getRapidsType(dataType))) { nullScalar => + withResource(Scalar.fromNull( + GpuColumnVector.getNonNestedRapidsType(dataType))) { nullScalar => ColumnVector.fromScalar(nullScalar, lhs.getRowCount.toInt) } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala index ea6b071dcf8..a066adf1d10 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala @@ -206,6 +206,10 @@ class GpuBroadcastMeta( rule: ConfKeysAndIncompat) extends SparkPlanMeta[BroadcastExchangeExec](exchange, conf, parent, rule) { + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { if (!TrampolineUtil.isSupportedRelation(exchange.mode)) { willNotWorkOnGpu( diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala index c3e266ee40c..d955ed32de2 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala @@ -48,6 +48,10 @@ class GpuBroadcastNestedLoopJoinMeta( override val childExprs: Seq[BaseExprMeta[_]] = condition.toSeq + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def tagPlanForGpu(): Unit = { join.joinType match { case Inner => diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExec.scala index f2e1f4af20b..641cd8aec37 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExec.scala @@ -57,6 +57,10 @@ class GpuShuffleMeta( wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu)) } + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, + allowNull = true) + override def convertToGpu(): GpuExec = ShimLoader.getSparkShims.getGpuShuffleExchangeExec( childParts(0).convertToGpu(), diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala index 8a2c3772ee2..3e2d7134ba8 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala @@ -41,7 +41,8 @@ class CastOpSuite extends GpuExpressionTestSuite { DataTypes.FloatType, DataTypes.DoubleType, DataTypes.DateType, DataTypes.TimestampType, - DataTypes.StringType + DataTypes.StringType, + DataTypes.NullType ) /** Produces a matrix of all possible casts. */ @@ -98,7 +99,7 @@ class CastOpSuite extends GpuExpressionTestSuite { } catch { case e: Exception => - fail(s"Cast from $from to $to failed; ansi=$ansiEnabled", e) + fail(s"Cast from $from to $to failed; ansi=$ansiEnabled $e", e) } } } else if (!shouldSkip) { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala b/tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala index 96fb0141524..659234bd86b 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala @@ -183,6 +183,7 @@ object FuzzerUtils { case DataTypes.StringType => r.nextString() case DataTypes.TimestampType => r.nextTimestamp() case DataTypes.DateType => r.nextDate() + case DataTypes.NullType => null case _ => throw new IllegalStateException( s"fuzzer does not support data type ${field.dataType}") } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuUnitTests.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuUnitTests.scala index d77b1129ef9..b0e8d06e2a0 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuUnitTests.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuUnitTests.scala @@ -43,7 +43,7 @@ class GpuUnitTests extends SparkQueryCompareTestSuite { val cv = v.asInstanceOf[ColumnVector] // close the vector that was passed in and return a new vector withResource(cv) { cv => - GpuColumnVector.from(cv.castTo(GpuColumnVector.getRapidsType(to)), to) + GpuColumnVector.from(cv.castTo(GpuColumnVector.getNonNestedRapidsType(to)), to) } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala b/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala index 4d8a54058c6..9010cb601a1 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala @@ -71,7 +71,7 @@ class DecimalUnitTest extends GpuUnitTests { test("test decimal as column vector") { val dt32 = DecimalType(DType.DECIMAL64_MAX_PRECISION, 5) val dt64 = DecimalType(DType.DECIMAL64_MAX_PRECISION, 9) - val cudfCV = ColumnVector.decimalFromDoubles(GpuColumnVector.getRapidsType(dt32), + val cudfCV = ColumnVector.decimalFromDoubles(GpuColumnVector.getNonNestedRapidsType(dt32), RoundingMode.UNNECESSARY, dec32Data.map(_.toDouble): _*) withResource(GpuColumnVector.from(cudfCV, dt32)) { cv: GpuColumnVector => assertResult(dec32Data.length)(cv.getRowCount) @@ -193,7 +193,8 @@ class DecimalUnitTest extends GpuUnitTests { withResource( GpuColumnVector.from(ColumnVector.fromDecimals(dec64Data.map(_.toJavaBigDecimal): _*), DecimalType(DType.DECIMAL64_MAX_PRECISION, 9))) { cv => - val dt = new HostColumnVector.BasicType(false, GpuColumnVector.getRapidsType(cv.dataType())) + val dt = new HostColumnVector.BasicType(false, + GpuColumnVector.getNonNestedRapidsType(cv.dataType())) val builder = new HostColumnVector.ColumnBuilder(dt, cv.getRowCount) withResource(cv.copyToHost()) { hostCV => HostColumnarToGpu.columnarCopy(hostCV, builder, false, cv.getRowCount.toInt) @@ -214,7 +215,8 @@ class DecimalUnitTest extends GpuUnitTests { withResource( GpuColumnVector.from(ColumnVector.fromDecimals(dec64WithNull: _*), DecimalType(DType.DECIMAL64_MAX_PRECISION, 9))) { cv => - val dt = new HostColumnVector.BasicType(true, GpuColumnVector.getRapidsType(cv.dataType())) + val dt = new HostColumnVector.BasicType(true, + GpuColumnVector.getNonNestedRapidsType(cv.dataType())) val builder = new HostColumnVector.ColumnBuilder(dt, cv.getRowCount) withResource(cv.copyToHost()) { hostCV => HostColumnarToGpu.columnarCopy(hostCV, builder, true, cv.getRowCount.toInt)