Merge pull request #9727 from NVIDIA/branch-23.12
[auto-merge] branch-23.12 to branch-24.02 [skip ci] [bot]
nvauto authored Nov 15, 2023
2 parents f092553 + e4fdd84 commit aeb70db
Showing 17 changed files with 83 additions and 29 deletions.
4 changes: 4 additions & 0 deletions integration_tests/run_pyspark_from_build.sh
@@ -297,6 +297,10 @@ EOF
fi
fi

# Set a seed to be used in the tests, for datagen
export SPARK_RAPIDS_TEST_DATAGEN_SEED=${SPARK_RAPIDS_TEST_DATAGEN_SEED:-`date +%s`}
echo "SPARK_RAPIDS_TEST_DATAGEN_SEED used: $SPARK_RAPIDS_TEST_DATAGEN_SEED"

# Set a seed to be used to pick random tests to inject with OOM
export SPARK_RAPIDS_TEST_INJECT_OOM_SEED=${SPARK_RAPIDS_TEST_INJECT_OOM_SEED:-`date +%s`}
echo "SPARK_RAPIDS_TEST_INJECT_OOM_SEED used: $SPARK_RAPIDS_TEST_INJECT_OOM_SEED"
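
Taken together with the conftest.py change further down, the seed precedence works out as follows. This is an illustrative restatement in Python, not code from the commit:

    import os

    def effective_datagen_seed(env=os.environ):
        # The shell wrapper above exports a time-based seed when none is provided,
        # so this branch is normally taken for runs launched via run_pyspark_from_build.sh.
        if 'SPARK_RAPIDS_TEST_DATAGEN_SEED' in env:
            return int(env['SPARK_RAPIDS_TEST_DATAGEN_SEED'])
        # Invoking pytest directly, without the wrapper, falls back to conftest.py's default of 0.
        return 0

A per-test @datagen_overrides marker (introduced below) can still replace whichever value this resolves to.
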
4 changes: 3 additions & 1 deletion integration_tests/src/main/python/arithmetic_ops_test.py
@@ -17,7 +17,7 @@

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_collect, assert_gpu_and_cpu_are_equal_sql
from data_gen import *
from marks import ignore_order, incompat, approximate_float, allow_non_gpu
from marks import ignore_order, incompat, approximate_float, allow_non_gpu, datagen_overrides
from pyspark.sql.types import *
from pyspark.sql.types import IntegralType
from spark_session import *
@@ -585,6 +585,7 @@ def test_floor(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr('floor(a)'))

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9722')
@pytest.mark.skipif(is_before_spark_330(), reason='scale parameter in Floor function is not supported before Spark 3.3.0')
@pytest.mark.parametrize('data_gen', double_n_long_gens + _arith_decimal_gens_no_neg_scale, ids=idfn)
def test_floor_scale_zero(data_gen):
@@ -677,6 +678,7 @@ def test_shift_right_unsigned(data_gen):

@incompat
@approximate_float
@datagen_overrides(seed=0, reason="https://github.com/NVIDIA/spark-rapids/issues/9350")
@pytest.mark.parametrize('data_gen', _arith_data_gens_for_round, ids=idfn)
def test_decimal_bround(data_gen):
assert_gpu_and_cpu_are_equal_collect(
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/ast_test.py
@@ -16,7 +16,7 @@

from asserts import assert_cpu_and_gpu_are_equal_collect_with_capture
from data_gen import *
from marks import approximate_float
from marks import approximate_float, datagen_overrides
from spark_session import with_cpu_session, is_before_spark_330
import pyspark.sql.functions as f

@@ -259,6 +259,7 @@ def test_lt(data_descr):
s2 < f.col('b'),
f.col('a') < f.col('b')))

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9711')
@pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn)
def test_lte(data_descr):
(s1, s2) = with_cpu_session(lambda spark: gen_scalars(data_descr[0], 2))
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/cache_test.py
@@ -92,10 +92,10 @@ def do_join(spark):
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@ignore_order
def test_cache_expand_exec(data_gen, enable_vectorized_conf):
def op_df(spark, length=2048, seed=0):
def op_df(spark, length=2048):
cached = gen_df(spark, StructGen([
('a', data_gen),
('b', IntegerGen())], nullable=False), length=length, seed=seed).cache()
('b', IntegerGen())], nullable=False), length=length).cache()
cached.count() # populate the cache
return cached.rollup(f.col("a"), f.col("b")).agg(f.col("b"))

3 changes: 2 additions & 1 deletion integration_tests/src/main/python/cast_test.py
@@ -18,7 +18,7 @@
from data_gen import *
from spark_session import is_before_spark_320, is_before_spark_330, is_spark_340_or_later, \
is_databricks113_or_later
from marks import allow_non_gpu, approximate_float
from marks import allow_non_gpu, approximate_float, datagen_overrides
from pyspark.sql.types import *
from spark_init_internal import spark_version
from datetime import date, datetime
@@ -146,6 +146,7 @@ def test_cast_string_date_non_ansi():
lambda spark: spark.createDataFrame(data_rows, "a string").select(f.col('a').cast(DateType())),
conf={'spark.rapids.sql.hasExtendedYearValues': 'false'})

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9708')
@pytest.mark.parametrize('data_gen', [StringGen('[0-9]{1,4}-[0-9]{1,2}-[0-9]{1,2}'),
StringGen('[0-9]{1,4}-[0-3][0-9]-[0-5][0-9][ |T][0-3][0-9]:[0-6][0-9]:[0-6][0-9]'),
StringGen('[0-9]{1,4}-[0-3][0-9]-[0-5][0-9][ |T][0-3][0-9]:[0-6][0-9]:[0-6][0-9].[0-9]{0,6}Z?')],
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/cmp_test.py
@@ -18,6 +18,7 @@
from data_gen import *
from spark_session import with_cpu_session, is_before_spark_330
from pyspark.sql.types import *
from marks import datagen_overrides
import pyspark.sql.functions as f

@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + struct_gens_sample_with_decimal128_no_list, ids=idfn)
@@ -329,6 +330,7 @@ def test_in(data_gen):

# Spark supports two different versions of 'IN', and it depends on the spark.sql.optimizer.inSetConversionThreshold conf
# This is to test entries over that value.
@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9687')
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn)
def test_in_set(data_gen):
# nulls are not supported for in on the GPU yet
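
The comment above the new test refers to Spark's second IN implementation (InSet), which takes over once the literal list is longer than spark.sql.optimizer.inSetConversionThreshold. A hedged sketch of how a test can force that path; the test body here is illustrative, not the one in this file:

    from asserts import assert_gpu_and_cpu_are_equal_collect
    from data_gen import unary_op_df, int_gen

    def test_in_set_sketch():
        # Lowering the threshold below the number of literals forces the InSet path.
        assert_gpu_and_cpu_are_equal_collect(
            lambda spark: unary_op_df(spark, int_gen).selectExpr('a in (0, 1, 2, 3)'),
            conf={'spark.sql.optimizer.inSetConversionThreshold': '2'})
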
6 changes: 5 additions & 1 deletion integration_tests/src/main/python/collection_ops_test.py
@@ -21,6 +21,7 @@
import pyspark.sql.functions as f
import pyspark.sql.utils
from spark_session import with_cpu_session, with_gpu_session
from conftest import get_datagen_seed

nested_gens = [ArrayGen(LongGen()), ArrayGen(decimal_gen_128bit),
StructGen([("a", LongGen()), ("b", decimal_gen_128bit)]),
@@ -258,8 +259,11 @@ def test_sequence_without_step(start_gen, stop_gen):

@pytest.mark.parametrize('start_gen,stop_gen,step_gen', sequence_normal_integral_gens, ids=idfn)
def test_sequence_with_step(start_gen, stop_gen, step_gen):
# Get the datagen seed we use for all datagens, since we need to call start
# on step_gen
data_gen_seed = get_datagen_seed()
# Get a step scalar from the 'step_gen' which follows the rules.
step_gen.start(random.Random(0))
step_gen.start(random.Random(data_gen_seed))
step_lit = step_gen.gen()
assert_gpu_and_cpu_are_equal_collect(
lambda spark: three_col_df(spark, start_gen, stop_gen, step_gen).selectExpr(
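
The same pattern applies to any test that starts a generator by hand instead of going through gen_df: seed it from the shared seed so that a seed pinned with @datagen_overrides also controls the hand-built values. A minimal sketch, with an illustrative helper name:

    import random
    from conftest import get_datagen_seed

    def gen_literal_from(data_gen):
        # Use the session-wide (or marker-overridden) seed rather than a hard-coded 0.
        data_gen.start(random.Random(get_datagen_seed()))
        return data_gen.gen()
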
4 changes: 3 additions & 1 deletion integration_tests/src/main/python/conditionals_test.py
@@ -18,6 +18,7 @@
from data_gen import *
from spark_session import is_before_spark_320, is_jvm_charset_utf8
from pyspark.sql.types import *
from marks import datagen_overrides
import pyspark.sql.functions as f

def mk_str_gen(pattern):
@@ -68,6 +69,7 @@ def test_if_else_map(data_gen):
'IF(TRUE, b, c)',
'IF(a, b, c)'))

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9685')
@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.parametrize('data_gen', all_gens + all_nested_gens, ids=idfn)
def test_case_when(data_gen):
@@ -130,6 +132,7 @@ def test_nvl(data_gen):
# in both cpu and gpu runs.
# E: java.lang.AssertionError: assertion failed: each serializer expression should contain\
# at least one `BoundReference`
@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9684')
@pytest.mark.parametrize('data_gen', all_gens + all_nested_gens_nonempty_struct + map_gens_sample, ids=idfn)
def test_coalesce(data_gen):
num_cols = 20
@@ -140,7 +143,6 @@ def test_coalesce(data_gen):
for x in range(0, num_cols)], nullable=False)
command_args = [f.col('_c' + str(x)) for x in range(0, num_cols)]
command_args.append(s1)
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, gen).select(
f.coalesce(*command_args)))
25 changes: 24 additions & 1 deletion integration_tests/src/main/python/conftest.py
@@ -113,9 +113,17 @@ def is_parquet_testing_tests_forced():
_inject_oom = None

def should_inject_oom():
global _inject_oom
return _inject_oom != None

# For datagen: we expect a seed to be provided by the environment, or default to 0.
# Note that tests can override their seed when calling into datagen by setting seed= in their tests.
_test_datagen_random_seed = int(os.getenv("SPARK_RAPIDS_TEST_DATAGEN_SEED", 0))
print(f"Starting with datagen test seed: {_test_datagen_random_seed}. "
"Set env variable SPARK_RAPIDS_TEST_DATAGEN_SEED to override.")

def get_datagen_seed():
return _test_datagen_random_seed

def get_limit():
return _limit

@@ -133,7 +141,12 @@ def pytest_runtest_setup(item):
global _sort_on_spark
global _sort_locally
global _inject_oom
global _test_datagen_random_seed
_inject_oom = item.get_closest_marker('inject_oom')
datagen_overrides = item.get_closest_marker('datagen_overrides')
if datagen_overrides:
_test_datagen_random_seed = datagen_overrides.kwargs.get('seed', _test_datagen_random_seed)

order = item.get_closest_marker('ignore_order')
if order:
if order.kwargs.get('local', False):
@@ -260,6 +273,16 @@ def pytest_collection_modifyitems(config, items):
# decide if OOMs should be injected, and when
injection_mode = config.getoption('test_oom_injection_mode').lower()
inject_choice = False
datagen_overrides = item.get_closest_marker('datagen_overrides')
if datagen_overrides:
test_datagen_random_seed_choice = datagen_overrides.kwargs.get('seed', _test_datagen_random_seed)
if test_datagen_random_seed_choice != _test_datagen_random_seed:
extras.append('DATAGEN_SEED_OVERRIDE=%s' % str(test_datagen_random_seed_choice))
else:
extras.append('DATAGEN_SEED=%s' % str(test_datagen_random_seed_choice))
else:
extras.append('DATAGEN_SEED=%s' % str(_test_datagen_random_seed))

if injection_mode == 'random':
inject_choice = r.randrange(0, 2) == 1
elif injection_mode == 'always':
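
Putting the conftest.py pieces together: every datagen call in a run now shares one seed, an individual test can pin its own with the new marker, and the collection hook records the effective seed next to each test. A minimal sketch of the two modes; the test names and reason string are illustrative, not from this commit:

    from asserts import assert_gpu_and_cpu_are_equal_collect
    from data_gen import unary_op_df, long_gen
    from marks import datagen_overrides

    # Uses whatever SPARK_RAPIDS_TEST_DATAGEN_SEED resolves to; the hook above
    # records it as DATAGEN_SEED=<seed>.
    def test_follows_session_seed():
        assert_gpu_and_cpu_are_equal_collect(
            lambda spark: unary_op_df(spark, long_gen).selectExpr('a + 1'))

    # Pins seed 0 until the referenced issue is fixed; recorded as
    # DATAGEN_SEED_OVERRIDE=0 whenever 0 differs from the session seed.
    @datagen_overrides(seed=0, reason='link to the tracking issue')
    def test_pins_its_seed():
        assert_gpu_and_cpu_are_equal_collect(
            lambda spark: unary_op_df(spark, long_gen).selectExpr('a - 1'))
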
1 change: 1 addition & 0 deletions integration_tests/src/main/python/csv_test.py
@@ -402,6 +402,7 @@ def test_read_valid_and_invalid_dates(std_input_path, filename, v1_enabled_list,
"'T'HH:mm[:ss]",
"'T'HH:mm"]

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9701')
@pytest.mark.parametrize('ts_part', csv_supported_ts_parts)
@pytest.mark.parametrize('date_format', csv_supported_date_formats)
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
42 changes: 26 additions & 16 deletions integration_tests/src/main/python/data_gen.py
@@ -24,7 +24,7 @@
from spark_session import is_tz_utc, is_before_spark_340, with_cpu_session
import sre_yield
import struct
from conftest import skip_unless_precommit_tests
from conftest import skip_unless_precommit_tests,get_datagen_seed
import time
import os
from functools import lru_cache
@@ -756,14 +756,19 @@ def skip_if_not_utc():
# Note: Current (2023/06/06) maximum IT data size is 7282688 bytes, so an LRU cache with maxsize 128
# will lead to 7282688 * 128 = 932 MB of additional memory usage in the worst case, which is acceptable.
@lru_cache(maxsize=128, typed=True)
def gen_df_help(data_gen, length, seed):
rand = random.Random(seed)
def gen_df_help(data_gen, length, seed_value):
rand = random.Random(seed_value)
data_gen.start(rand)
data = [data_gen.gen() for index in range(0, length)]
return data

def gen_df(spark, data_gen, length=2048, seed=0, num_slices=None):
def gen_df(spark, data_gen, length=2048, seed=None, num_slices=None):
"""Generate a spark dataframe from the given data generators."""
if seed is None:
seed_value = get_datagen_seed()
else:
seed_value = seed

if isinstance(data_gen, list):
src = StructGen(data_gen, nullable=False)
else:
@@ -775,7 +780,7 @@ def gen_df(spark, data_gen, length=2048, seed=0, num_slices=None):
if src.contains_ts():
skip_if_not_utc()

data = gen_df_help(src, length, seed)
data = gen_df_help(src, length, seed_value)

# We use `numSlices` to create an RDD with the specific number of partitions,
# which is then turned into a dataframe. If not specified, it is `None` (default spark value)
@@ -816,39 +821,44 @@ def _mark_as_lit(data, data_type):
# lit does not take a data type so we might have to cast it
return f.lit(data).cast(data_type)

def _gen_scalars_common(data_gen, count, seed=0):
def _gen_scalars_common(data_gen, count, seed=None):
if isinstance(data_gen, list):
src = StructGen(data_gen, nullable=False)
else:
src = data_gen

if seed is None:
seed_value = get_datagen_seed()
else:
seed_value = seed

# Before we get too far we need to verify that we can run with timestamps
if src.contains_ts():
skip_if_not_utc()

rand = random.Random(seed)
rand = random.Random(seed_value)
src.start(rand)
return src

def gen_scalars(data_gen, count, seed=0, force_no_nulls=False):
def gen_scalars(data_gen, count, seed=None, force_no_nulls=False):
"""Generate scalar values."""
if force_no_nulls:
assert(not isinstance(data_gen, NullGen))
src = _gen_scalars_common(data_gen, count, seed=seed)
data_type = src.data_type
return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls), data_type) for i in range(0, count))

def gen_scalar(data_gen, seed=0, force_no_nulls=False):
def gen_scalar(data_gen, seed=None, force_no_nulls=False):
"""Generate a single scalar value."""
v = list(gen_scalars(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls))
return v[0]

def gen_scalar_values(data_gen, count, seed=0, force_no_nulls=False):
def gen_scalar_values(data_gen, count, seed=None, force_no_nulls=False):
"""Generate scalar values."""
src = _gen_scalars_common(data_gen, count, seed=seed)
return (src.gen(force_no_nulls=force_no_nulls) for i in range(0, count))

def gen_scalar_value(data_gen, seed=0, force_no_nulls=False):
def gen_scalar_value(data_gen, seed=None, force_no_nulls=False):
"""Generate a single scalar value."""
v = list(gen_scalar_values(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls))
return v[0]
@@ -890,18 +900,18 @@ def tmp(something):
return meta + idfn(something)
return tmp

def three_col_df(spark, a_gen, b_gen, c_gen, length=2048, seed=0, num_slices=None):
def three_col_df(spark, a_gen, b_gen, c_gen, length=2048, seed=None, num_slices=None):
gen = StructGen([('a', a_gen),('b', b_gen),('c', c_gen)], nullable=False)
return gen_df(spark, gen, length=length, seed=seed, num_slices=num_slices)

def two_col_df(spark, a_gen, b_gen, length=2048, seed=0, num_slices=None):
def two_col_df(spark, a_gen, b_gen, length=2048, seed=None, num_slices=None):
gen = StructGen([('a', a_gen),('b', b_gen)], nullable=False)
return gen_df(spark, gen, length=length, seed=seed, num_slices=num_slices)

def binary_op_df(spark, gen, length=2048, seed=0, num_slices=None):
def binary_op_df(spark, gen, length=2048, seed=None, num_slices=None):
return two_col_df(spark, gen, gen, length=length, seed=seed, num_slices=num_slices)

def unary_op_df(spark, gen, length=2048, seed=0, num_slices=None):
def unary_op_df(spark, gen, length=2048, seed=None, num_slices=None):
return gen_df(spark, StructGen([('a', gen)], nullable=False),
length=length, seed=seed, num_slices=num_slices)

@@ -974,7 +984,7 @@ def _convert_to_sql(spark_type, data):
else:
return 'CAST({} as {})'.format(d, to_cast_string(spark_type))

def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
def gen_scalars_for_sql(data_gen, count, seed=None, force_no_nulls=False):
"""Generate scalar values, but strings that can be used in selectExpr or SQL"""
src = _gen_scalars_common(data_gen, count, seed=seed)
if isinstance(data_gen, NullGen):
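
All of the helpers above now share the same rule, restated here as a sketch rather than new repository code: an explicit seed argument always wins, and seed=None means "use the session-wide seed from conftest":

    from conftest import get_datagen_seed

    def resolve_seed(seed=None):
        # None means "use the shared, overridable seed"; an explicit value is kept as-is.
        return get_datagen_seed() if seed is None else seed

    # unary_op_df(spark, long_gen)           -> data generated with resolve_seed(None)
    # unary_op_df(spark, long_gen, seed=42)  -> still pinned to 42, regardless of the environment
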
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/expand_exec_test.py
@@ -23,9 +23,9 @@
# see https://issues.apache.org/jira/browse/SPARK-40089.
@ignore_order(local=True)
def test_expand_exec(data_gen):
def op_df(spark, length=2048, seed=0):
def op_df(spark, length=2048):
return gen_df(spark, StructGen([
('a', data_gen),
('b', IntegerGen())], nullable=False), length=length, seed=seed).rollup(f.col("a"), f.col("b")).agg(f.col("b"))
('b', IntegerGen())], nullable=False), length=length).rollup(f.col("a"), f.col("b")).agg(f.col("b"))

assert_gpu_and_cpu_are_equal_collect(op_df)
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/generate_expr_test.py
@@ -26,12 +26,12 @@
arrays_with_binary = [ArrayGen(BinaryGen(max_length=5))]
maps_with_binary = [MapGen(IntegerGen(nullable=False), BinaryGen(max_length=5))]

def four_op_df(spark, gen, length=2048, seed=0):
def four_op_df(spark, gen, length=2048):
return gen_df(spark, StructGen([
('a', gen),
('b', gen),
('c', gen),
('d', gen)], nullable=False), length=length, seed=seed)
('d', gen)], nullable=False), length=length)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/map_test.py
@@ -18,7 +18,7 @@
assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture
from data_gen import *
from conftest import is_databricks_runtime
from marks import allow_non_gpu, ignore_order
from marks import allow_non_gpu, ignore_order, datagen_overrides
from spark_session import is_before_spark_330, is_databricks104_or_later, is_databricks113_or_later, is_spark_33X, is_spark_340_or_later
from pyspark.sql.functions import create_map, col, lit, row_number
from pyspark.sql.types import *
@@ -186,6 +186,7 @@ def query_map_scalar(spark):


@allow_non_gpu('WindowLocalExec')
@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9683')
@pytest.mark.parametrize('data_gen', supported_key_map_gens, ids=idfn)
def test_map_scalars_supported_key_types(data_gen):
key_gen = data_gen._key_gen
1 change: 1 addition & 0 deletions integration_tests/src/main/python/marks.py
@@ -32,3 +32,4 @@
delta_lake = pytest.mark.delta_lake
large_data_test = pytest.mark.large_data_test
pyarrow_test = pytest.mark.pyarrow_test
datagen_overrides = pytest.mark.datagen_overrides
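
pytest warns about (and with strict markers rejects) unregistered marks, so the new marker presumably needs to be declared as well. A hypothetical registration sketch, since the commit does not show where the project declares its markers:

    def pytest_configure(config):
        # Purely illustrative: the project may already declare markers in pytest.ini
        # or elsewhere, and the description text here is made up.
        config.addinivalue_line(
            'markers',
            'datagen_overrides(seed, reason): force a specific datagen seed for a test')
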
1 change: 1 addition & 0 deletions integration_tests/src/main/python/parquet_write_test.py
@@ -457,6 +457,7 @@ def generate_map_with_empty_validity(spark, path):
lambda spark, path: spark.read.parquet(path),
data_path)

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9701')
@pytest.mark.parametrize('data_gen', parquet_nested_datetime_gen, ids=idfn)
@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase_write', ['CORRECTED', 'LEGACY'])
1 change: 1 addition & 0 deletions integration_tests/src/main/python/window_function_test.py
@@ -355,6 +355,7 @@ def test_window_aggs_for_range_numeric_date(data_gen, batch_size):
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@datagen_overrides(seed=0, reason="https://github.com/NVIDIA/spark-rapids/issues/9682")
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_no_nulls,
_grpkey_longs_with_nulls,
