Skip to content

Commit

Permalink
Add in support for null type (NVIDIA#1176)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 authored Nov 20, 2020
1 parent de29761 commit d2f5be4
Show file tree
Hide file tree
Showing 43 changed files with 384 additions and 105 deletions.
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/arithmetic_ops_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ def test_columnar_pow(data_gen):
@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
def test_least(data_gen):
num_cols = 20
s1 = gen_scalar(data_gen, force_no_nulls=True)
s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen))
# we want lots of nulls
gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=100.0))
for x in range(0, num_cols)], nullable=False)
Expand All @@ -446,7 +446,7 @@ def test_least(data_gen):
@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
def test_greatest(data_gen):
num_cols = 20
s1 = gen_scalar(data_gen, force_no_nulls=True)
s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen))
# we want lots of nulls
gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=100.0))
for x in range(0, num_cols)], nullable=False)
Expand Down
20 changes: 10 additions & 10 deletions integration_tests/src/main/python/cmp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

@pytest.mark.parametrize('data_gen', eq_gens, ids=idfn)
def test_eq(data_gen):
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True)
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).select(
Expand All @@ -35,7 +35,7 @@ def test_eq(data_gen):

@pytest.mark.parametrize('data_gen', eq_gens, ids=idfn)
def test_eq_ns(data_gen):
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True)
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).select(
Expand All @@ -47,7 +47,7 @@ def test_eq_ns(data_gen):

@pytest.mark.parametrize('data_gen', eq_gens, ids=idfn)
def test_ne(data_gen):
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True)
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).select(
Expand All @@ -59,7 +59,7 @@ def test_ne(data_gen):

@pytest.mark.parametrize('data_gen', orderable_gens, ids=idfn)
def test_lt(data_gen):
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True)
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).select(
Expand All @@ -71,7 +71,7 @@ def test_lt(data_gen):

@pytest.mark.parametrize('data_gen', orderable_gens, ids=idfn)
def test_lte(data_gen):
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True)
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).select(
Expand All @@ -83,7 +83,7 @@ def test_lte(data_gen):

@pytest.mark.parametrize('data_gen', orderable_gens, ids=idfn)
def test_gt(data_gen):
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True)
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).select(
Expand All @@ -95,7 +95,7 @@ def test_gt(data_gen):

@pytest.mark.parametrize('data_gen', orderable_gens, ids=idfn)
def test_gte(data_gen):
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=True)
(s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).select(
Expand All @@ -105,7 +105,7 @@ def test_gte(data_gen):
f.col('b') >= f.lit(None).cast(data_type),
f.col('a') >= f.col('b')))

@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
def test_isnull(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(
Expand Down Expand Up @@ -150,7 +150,7 @@ def test_filter_with_lit(expr):
def test_in(data_gen):
# nulls are not supported for in on the GPU yet
num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) - 1
scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=True))
scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen)))
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)))

Expand All @@ -160,7 +160,7 @@ def test_in(data_gen):
def test_in_set(data_gen):
# nulls are not supported for in on the GPU yet
num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) + 1
scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=True))
scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen)))
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)))

48 changes: 23 additions & 25 deletions integration_tests/src/main/python/conditionals_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@

@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
def test_if_else(data_gen):
    """Check IF(cond, a, b) on GPU vs CPU for scalar, column and null-literal operands."""
    # NullGen can only ever produce None, so non-null scalars cannot be forced for it.
    (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
    null_lit = get_null_lit_string(data_gen.data_type)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : three_col_df(spark, boolean_gen, data_gen, data_gen).selectExpr(
            # A literal predicate is not supported yet
            'IF(a, b, c)',
            'IF(a, {}, c)'.format(s1),
            'IF(a, b, {})'.format(s2),
            'IF(a, {}, {})'.format(s1, s2),
            'IF(a, b, {})'.format(null_lit),
            'IF(a, {}, c)'.format(null_lit)))

@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
def test_case_when(data_gen):
num_cmps = 20
s1 = gen_scalar(data_gen, force_no_nulls=True)
s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen))
# we want lots of false
bool_gen = BooleanGen().with_special_case(False, weight=1000.0)
gen_cols = [('_b' + str(x), bool_gen) for x in range(0, num_cmps)]
Expand All @@ -57,7 +57,7 @@ def test_case_when(data_gen):

@pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn)
def test_nanvl(data_gen):
s1 = gen_scalar(data_gen, force_no_nulls=True)
s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen))
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).select(
Expand All @@ -68,21 +68,21 @@ def test_nanvl(data_gen):

@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
def test_nvl(data_gen):
    """Check nvl(a, b) on GPU vs CPU with column, scalar and null-literal arguments."""
    # NullGen can only ever produce None, so non-null scalars cannot be forced for it.
    (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
    null_lit = get_null_lit_string(data_gen.data_type)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : binary_op_df(spark, data_gen).selectExpr(
            'nvl(a, b)',
            'nvl(a, {})'.format(s2),
            'nvl({}, b)'.format(s1),
            'nvl({}, b)'.format(null_lit),
            'nvl(a, {})'.format(null_lit)))

#nvl is translated into a 2 param version of coalesce
@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
def test_coalesce(data_gen):
num_cols = 20
s1 = gen_scalar(data_gen, force_no_nulls=True)
s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen))
# we want lots of nulls
gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=1000.0))
for x in range(0, num_cols)], nullable=False)
Expand All @@ -102,38 +102,36 @@ def test_coalesce_constant_output():

@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
def test_nvl2(data_gen):
    """Check nvl2(a, b, c) on GPU vs CPU with column, scalar and null-literal arguments."""
    # NullGen can only ever produce None, so non-null scalars cannot be forced for it.
    (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
    null_lit = get_null_lit_string(data_gen.data_type)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : three_col_df(spark, data_gen, data_gen, data_gen).selectExpr(
            'nvl2(a, b, c)',
            'nvl2(a, b, {})'.format(s2),
            'nvl2({}, b, c)'.format(s1),
            'nvl2({}, b, c)'.format(null_lit),
            'nvl2(a, {}, c)'.format(null_lit)))

@pytest.mark.parametrize('data_gen', eq_gens, ids=idfn)
def test_nullif(data_gen):
    """Check nullif(a, b) on GPU vs CPU with column, scalar and null-literal arguments."""
    # NullGen can only ever produce None, so non-null scalars cannot be forced for it.
    (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
    null_lit = get_null_lit_string(data_gen.data_type)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : binary_op_df(spark, data_gen).selectExpr(
            'nullif(a, b)',
            'nullif(a, {})'.format(s2),
            'nullif({}, b)'.format(s1),
            'nullif({}, b)'.format(null_lit),
            'nullif(a, {})'.format(null_lit)))

@pytest.mark.parametrize('data_gen', eq_gens, ids=idfn)
def test_ifnull(data_gen):
    """Check ifnull(a, b) on GPU vs CPU with column, scalar and null-literal arguments."""
    # NullGen can only ever produce None, so non-null scalars cannot be forced for it.
    (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))
    null_lit = get_null_lit_string(data_gen.data_type)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : binary_op_df(spark, data_gen).selectExpr(
            'ifnull(a, b)',
            'ifnull(a, {})'.format(s2),
            'ifnull({}, b)'.format(s1),
            'ifnull({}, b)'.format(null_lit),
            'ifnull(a, {})'.format(null_lit)))
30 changes: 27 additions & 3 deletions integration_tests/src/main/python/data_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,16 @@ def contains_ts(self):
return self._key_gen.contains_ts() or self._value_gen.contains_ts()


class NullGen(DataGen):
    """Data generator for Spark's NullType: every generated value is None."""

    def __init__(self):
        # A NullType column can hold nothing but nulls, so nullable is fixed to True.
        super().__init__(NullType(), nullable=True)

    def start(self, rand):
        # The only possible value is None, so the RNG is never consulted.
        self._start(rand, lambda: None)

def skip_if_not_utc():
    """Skip the current pytest test when the JVM/system time zone is not UTC."""
    if not is_tz_utc():
        pytest.skip('The java system time zone is not set to UTC')
Expand Down Expand Up @@ -571,6 +581,8 @@ def _gen_scalars_common(data_gen, count, seed=0):

def gen_scalars(data_gen, count, seed=0, force_no_nulls=False):
    """Generate `count` scalar (literal) values from `data_gen`.

    :param data_gen: generator producing the underlying values.
    :param count: number of scalars to yield.
    :param seed: RNG seed so runs are reproducible.
    :param force_no_nulls: when True, never yield None. Incompatible with
        NullGen, which can only ever produce None.
    """
    # Asking a NullGen for non-null values is a bug in the caller's test setup.
    assert not force_no_nulls or not isinstance(data_gen, NullGen), \
        'force_no_nulls is impossible for NullGen'
    src = _gen_scalars_common(data_gen, count, seed=seed)
    return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls)) for _ in range(count))

Expand Down Expand Up @@ -640,6 +652,13 @@ def to_cast_string(spark_type):
else:
raise RuntimeError('CAST TO TYPE {} NOT SUPPORTED YET'.format(spark_type))

def get_null_lit_string(spark_type):
    """Return a SQL literal string that evaluates to a null of `spark_type`."""
    if isinstance(spark_type, NullType):
        # A bare 'null' literal already has NullType; no CAST is needed.
        return 'null'
    return 'CAST(null as {})'.format(to_cast_string(spark_type))

def _convert_to_sql(t, data):
if isinstance(data, str):
d = "'" + data.replace("'", "\\'") + "'"
Expand All @@ -655,6 +674,9 @@ def _convert_to_sql(t, data):
def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
    """Generate `count` scalar values rendered as SQL literal strings for selectExpr/SQL.

    :param data_gen: generator producing the underlying values.
    :param count: number of literal strings to yield.
    :param seed: RNG seed so runs are reproducible.
    :param force_no_nulls: when True, never render a null literal; asserted
        against for NullGen, which can only produce nulls.
    """
    src = _gen_scalars_common(data_gen, count, seed=seed)
    if isinstance(data_gen, NullGen):
        # Every value a NullGen yields is None, whose SQL rendering is just 'null'.
        assert not force_no_nulls
        return ('null' for _ in range(count))
    string_type = to_cast_string(data_gen.data_type)
    return (_convert_to_sql(string_type, src.gen(force_no_nulls=force_no_nulls))
            for _ in range(count))

Expand All @@ -669,6 +691,8 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
date_gen = DateGen()
timestamp_gen = TimestampGen()

null_gen = NullGen()

numeric_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen]
integral_gens = [byte_gen, short_gen, int_gen, long_gen]
# A lot of mathematical expressions only support a double as input
Expand All @@ -679,17 +703,17 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):

# all of the basic gens (includes null_gen so NullType is exercised everywhere
# these generators are used)
all_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
        string_gen, boolean_gen, date_gen, timestamp_gen, null_gen]

# TODO add in some array generators to this once that is supported for sorting
# a selection of generators that should be orderable (sortable and compareable)
orderable_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
        string_gen, boolean_gen, date_gen, timestamp_gen, null_gen]

# TODO add in some array generators to this once that is supported for these operations
# a selection of generators that can be compared for equality
eq_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
        string_gen, boolean_gen, date_gen, timestamp_gen, null_gen]

date_gens = [date_gen]
date_n_time_gens = [date_gen, timestamp_gen]
Expand Down
14 changes: 13 additions & 1 deletion integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@
('a', RepeatSeqGen(StringGen(pattern='[0-9]{0,30}'), length= 20)),
('b', IntegerGen()),
('c', LongGen())]
# grouping key: repeated strings (with nulls present); value column 'c' is all nulls
_grpkey_strings_with_extra_nulls = [
    ('a', RepeatSeqGen(StringGen(pattern='[0-9]{0,30}'), length= 20)),
    ('b', IntegerGen()),
    ('c', NullGen())]
# grouping key of NullType (column 'a' is all nulls)
_grpkey_nulls = [
    ('a', NullGen()),
    ('b', IntegerGen()),
    ('c', LongGen())]

# grouping floats with other columns containing nans and nulls
_grpkey_floats_with_nulls_and_nans = [
Expand Down Expand Up @@ -112,7 +122,9 @@
_grpkey_longs_with_nulls,
_grpkey_dbls_with_nulls,
_grpkey_floats_with_nulls,
_grpkey_strings_with_nulls]
_grpkey_strings_with_nulls,
_grpkey_nulls,
_grpkey_strings_with_extra_nulls]

# List of schemas with NaNs included
_init_list_with_nans_and_no_nans = [
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/join_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from spark_session import with_spark_session, is_before_spark_310

# Generators used to drive the join tests; floats/doubles are marked incompat.
all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
           BooleanGen(), DateGen(), TimestampGen(), null_gen,
           pytest.param(FloatGen(), marks=[incompat]),
           pytest.param(DoubleGen(), marks=[incompat])]

Expand Down
10 changes: 9 additions & 1 deletion integration_tests/src/main/python/row_conversion_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ def test_row_conversions():
["n", ArrayGen(boolean_gen)], ["o", ArrayGen(ArrayGen(short_gen))],
["p", StructGen([["c0", byte_gen], ["c1", ArrayGen(byte_gen)]])],
["q", simple_string_to_string_map_gen],
["r", MapGen(BooleanGen(nullable=False), ArrayGen(boolean_gen), max_length=2)]]
["r", MapGen(BooleanGen(nullable=False), ArrayGen(boolean_gen), max_length=2)],
["s", null_gen]]
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again"))

def test_row_conversions_fixed_width():
    # Only scalar column types here (no arrays/structs/maps) — presumably to
    # exercise a fixed-width-optimized row conversion path; confirm in plugin code.
    gens = [["a", byte_gen], ["b", short_gen], ["c", int_gen], ["d", long_gen],
            ["e", float_gen], ["f", double_gen], ["g", string_gen], ["h", boolean_gen],
            ["i", timestamp_gen], ["j", date_gen]]
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again"))
14 changes: 10 additions & 4 deletions integration_tests/src/main/python/sort_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,34 @@
import pyspark.sql.functions as f

orderable_gen_classes = [ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen,
BooleanGen, TimestampGen, DateGen, StringGen]
BooleanGen, TimestampGen, DateGen, StringGen, NullGen]

@pytest.mark.parametrize('data_gen_class', orderable_gen_classes, ids=idfn)
@pytest.mark.parametrize('nullable', [True, False], ids=idfn)
@pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
def test_single_orderby(data_gen_class, nullable, order):
    """Order a single column by each asc/desc null-ordering and compare GPU vs CPU."""
    # NullGen's __init__ takes no nullable argument (its data is always null),
    # so it is constructed bare; every other generator class honors `nullable`.
    if data_gen_class is NullGen:
        data_gen = data_gen_class()
    else:
        data_gen = data_gen_class(nullable=nullable)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, data_gen).orderBy(order))

@pytest.mark.parametrize('data_gen_class', orderable_gen_classes, ids=idfn)
@pytest.mark.parametrize('nullable', [True, False], ids=idfn)
@pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
def test_single_sort_in_part(data_gen_class, nullable, order):
    """Sort within partitions by each asc/desc null-ordering and compare GPU vs CPU."""
    # NullGen's __init__ takes no nullable argument (its data is always null),
    # so it is constructed bare; every other generator class honors `nullable`.
    if data_gen_class is NullGen:
        data_gen = data_gen_class()
    else:
        data_gen = data_gen_class(nullable=nullable)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, data_gen).sortWithinPartitions(order))

# Generators for multi-column sort tests; float/double are expected failures.
orderable_gens_sort = [byte_gen, short_gen, int_gen, long_gen,
        pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/84')),
        pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/84')),
        boolean_gen, timestamp_gen, date_gen, string_gen, null_gen]
@pytest.mark.parametrize('data_gen', orderable_gens_sort, ids=idfn)
def test_multi_orderby(data_gen):
assert_gpu_and_cpu_are_equal_collect(
Expand Down
Loading

0 comments on commit d2f5be4

Please sign in to comment.