From b9aadf1ce02f04730f6a31ce599678591c670962 Mon Sep 17 00:00:00 2001 From: Haoyang Li Date: Mon, 9 Oct 2023 18:51:46 +0800 Subject: [PATCH 1/7] add negative scale config before calling lit Signed-off-by: Haoyang Li --- integration_tests/src/main/python/data_gen.py | 9 ++++++--- integration_tests/src/main/python/spark_session.py | 4 ++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 2a9d7e5e6f0..9b5a4b1a7ca 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -21,7 +21,7 @@ from pyspark.sql.types import * import pyspark.sql.functions as f import random -from spark_session import is_tz_utc, is_before_spark_340 +from spark_session import is_tz_utc, is_before_spark_340, set_single_conf import sre_yield import struct from conftest import skip_unless_precommit_tests @@ -596,11 +596,12 @@ def __init__(self, start=None, end=None, nullable=True, tzinfo=timezone.utc): self._epoch = datetime(1970, 1, 1, tzinfo=tzinfo) self._start_time = self._to_us_since_epoch(start) self._end_time = self._to_us_since_epoch(end) + self._tzinfo = tzinfo if (self._epoch >= start and self._epoch <= end): self.with_special_case(self._epoch) def _cache_repr(self): - return super()._cache_repr() + '(' + str(self._start_time) + ',' + str(self._end_time) + ')' + return super()._cache_repr() + '(' + str(self._start_time) + ',' + str(self._end_time) + ',' + str(self._tzinfo) + ')' _us = timedelta(microseconds=1) @@ -811,6 +812,9 @@ def _mark_as_lit(data, data_type): col_array.append(_mark_as_lit(k, data_type.keyType)) col_array.append(_mark_as_lit(data[k], data_type.valueType)) return f.create_map(*col_array) + elif isinstance(data_type, DecimalType): + set_single_conf("spark.sql.legacy.allowNegativeScaleOfDecimal", "true") + return f.lit(data).cast(data_type) else: # lit does not take a data type so we might have to cast it return f.lit(data).cast(data_type) @@ -1172,4 +1176,3 @@ def get_25_partitions_df(spark): StructField("c3", IntegerType())]) data = [[i, j, k] for i in range(0, 5) for j in range(0, 5) for k in range(0, 100)] return spark.createDataFrame(data, schema) - diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py index bbc8c2cc414..bf197891ded 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -76,6 +76,10 @@ def _set_all_confs(conf): if _spark.conf.get(key, None) != value: _spark.conf.set(key, value) +def set_single_conf(key, value): + """Set a single config for a given spark session.""" + _spark.conf.set(key, value) + def reset_spark_session_conf(): """Reset all of the configs for a given spark session.""" _set_all_confs(_orig_conf) From c48f8f2c7c1e9af08913a6911fb57b19df50b8b1 Mon Sep 17 00:00:00 2001 From: Haoyang Li Date: Mon, 9 Oct 2023 23:47:19 +0800 Subject: [PATCH 2/7] warp gen scalars with a spark session Signed-off-by: Haoyang Li --- integration_tests/src/main/python/data_gen.py | 19 ++++++++++--------- .../src/main/python/spark_session.py | 4 ---- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 9b5a4b1a7ca..e25d565fa0a 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -21,7 +21,7 @@ from pyspark.sql.types import * 
import pyspark.sql.functions as f import random -from spark_session import is_tz_utc, is_before_spark_340, set_single_conf +from spark_session import is_tz_utc, is_before_spark_340, with_cpu_session import sre_yield import struct from conftest import skip_unless_precommit_tests @@ -812,9 +812,6 @@ def _mark_as_lit(data, data_type): col_array.append(_mark_as_lit(k, data_type.keyType)) col_array.append(_mark_as_lit(data[k], data_type.valueType)) return f.create_map(*col_array) - elif isinstance(data_type, DecimalType): - set_single_conf("spark.sql.legacy.allowNegativeScaleOfDecimal", "true") - return f.lit(data).cast(data_type) else: # lit does not take a data type so we might have to cast it return f.lit(data).cast(data_type) @@ -835,11 +832,15 @@ def _gen_scalars_common(data_gen, count, seed=0): def gen_scalars(data_gen, count, seed=0, force_no_nulls=False): """Generate scalar values.""" - if force_no_nulls: - assert(not isinstance(data_gen, NullGen)) - src = _gen_scalars_common(data_gen, count, seed=seed) - data_type = src.data_type - return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls), data_type) for i in range(0, count)) + def gen_scalars_help(data_gen, count, seed, force_no_nulls): + if force_no_nulls: + assert(not isinstance(data_gen, NullGen)) + src = _gen_scalars_common(data_gen, count, seed=seed) + data_type = src.data_type + return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls), data_type) for i in range(0, count)) + return with_cpu_session(lambda spark: gen_scalars_help(data_gen=data_gen, + count=count, seed=seed, + force_no_nulls=force_no_nulls)) def gen_scalar(data_gen, seed=0, force_no_nulls=False): """Generate a single scalar value.""" diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py index bf197891ded..bbc8c2cc414 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -76,10 +76,6 @@ def _set_all_confs(conf): if _spark.conf.get(key, None) != value: _spark.conf.set(key, value) -def set_single_conf(key, value): - """Set a single config for a given spark session.""" - _spark.conf.set(key, value) - def reset_spark_session_conf(): """Reset all of the configs for a given spark session.""" _set_all_confs(_orig_conf) From ecdb8514e2cdef3925f4d042517b4a39f248d808 Mon Sep 17 00:00:00 2001 From: Haoyang Li Date: Tue, 10 Oct 2023 11:35:47 +0800 Subject: [PATCH 3/7] change every use of scalar gen to wrap them into a spark session Signed-off-by: Haoyang Li --- integration_tests/README.md | 4 ++ .../src/main/python/arithmetic_ops_test.py | 8 ++-- .../src/main/python/array_test.py | 6 ++- integration_tests/src/main/python/ast_test.py | 21 ++++++---- integration_tests/src/main/python/cmp_test.py | 42 ++++++++++++------- .../src/main/python/collection_ops_test.py | 21 ++++++---- .../src/main/python/conditionals_test.py | 24 +++++++---- integration_tests/src/main/python/data_gen.py | 14 +++---- .../src/main/python/generate_expr_test.py | 6 ++- integration_tests/src/main/python/map_test.py | 3 +- integration_tests/src/main/python/orc_test.py | 3 +- .../src/main/python/parquet_test.py | 3 +- .../src/main/python/string_test.py | 9 ++-- .../src/main/python/window_function_test.py | 3 +- 14 files changed, 107 insertions(+), 60 deletions(-) diff --git a/integration_tests/README.md b/integration_tests/README.md index 321bfa68c45..f64115bd448 100644 --- a/integration_tests/README.md +++ b/integration_tests/README.md @@ -454,6 +454,10 @@ The marks you 
care about are all in marks.py For the most part you can ignore this file. It provides the underlying Spark session to operations that need it, but most tests should interact with it through `asserts.py`. +All data generation should be putted inside a spark session context of some kind. +Typically this is done by passing a lambda to functions in `asserts.py` such as `assert_gpu_and_cpu_are_equal_collect`. +However, for scalar generation like `gen_scalars`, you may need to put it in a `with_cpu_session`. It is because negative scale decimals can have problems if called from outside of `with_spark_session`. + ## Guidelines for Testing When support for a new operator is added to the Rapids Accelerator for Spark, or when an existing operator is extended diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py index 9749208d57e..a3ef0840dbd 100644 --- a/integration_tests/src/main/python/arithmetic_ops_test.py +++ b/integration_tests/src/main/python/arithmetic_ops_test.py @@ -938,10 +938,11 @@ def test_columnar_pow(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).selectExpr('pow(a, b)')) -@pytest.mark.parametrize('data_gen', all_basic_gens + _arith_decimal_gens, ids=idfn) +@pytest.mark.parametrize('data_gen', [DecimalGen(25,-3)], ids=idfn) def test_least(data_gen): num_cols = 20 - s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) + s1 = with_cpu_session( + lambda spark: gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen))) # we want lots of nulls gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=100.0)) for x in range(0, num_cols)], nullable=False) @@ -956,7 +957,8 @@ def test_least(data_gen): @pytest.mark.parametrize('data_gen', all_basic_gens + _arith_decimal_gens, ids=idfn) def test_greatest(data_gen): num_cols = 20 - s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) + s1 = with_cpu_session( + lambda spark: gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen))) # we want lots of nulls gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=100.0)) for x in range(0, num_cols)], nullable=False) diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py index 99b68ccfba1..f26c78ef9ee 100644 --- a/integration_tests/src/main/python/array_test.py +++ b/integration_tests/src/main/python/array_test.py @@ -172,7 +172,8 @@ def test_array_item_ansi_not_fail_all_null_data(): StructGen([['child0', StructGen([['child01', IntegerGen()]])], ['child1', string_gen], ['child2', float_gen]], nullable=False), StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]], nullable=False)], ids=idfn) def test_make_array(data_gen): - (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).selectExpr( 'array(null)', @@ -213,7 +214,8 @@ def test_orderby_array_of_structs(data_gen): string_gen, boolean_gen, date_gen, timestamp_gen], ids=idfn) def test_array_contains(data_gen): arr_gen = ArrayGen(data_gen) - literal = gen_scalar(data_gen, force_no_nulls=True) + literal = with_cpu_session( + lambda spark: gen_scalar(data_gen, force_no_nulls=True)) def 
get_input(spark): return two_col_df(spark, arr_gen, data_gen) diff --git a/integration_tests/src/main/python/ast_test.py b/integration_tests/src/main/python/ast_test.py index 6205a8c15f8..1fa535c7075 100644 --- a/integration_tests/src/main/python/ast_test.py +++ b/integration_tests/src/main/python/ast_test.py @@ -75,7 +75,8 @@ def test_literal(spark_tmp_path, data_gen): # Write data to Parquet so Spark generates a plan using just the count of the data. data_path = spark_tmp_path + '/AST_TEST_DATA' with_cpu_session(lambda spark: gen_df(spark, [("a", IntegerGen())]).write.parquet(data_path)) - scalar = gen_scalar(data_gen, force_no_nulls=True) + scalar = with_cpu_session( + lambda spark: gen_scalar(data_gen, force_no_nulls=True)) assert_gpu_ast(is_supported=True, func=lambda spark: spark.read.parquet(data_path).select(scalar)) @@ -234,7 +235,8 @@ def test_expm1(data_descr): @pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn) def test_eq(data_descr): - (s1, s2) = gen_scalars(data_descr[0], 2) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_descr[0], 2)) assert_binary_ast(data_descr, lambda df: df.select( f.col('a') == s1, @@ -243,7 +245,8 @@ def test_eq(data_descr): @pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn) def test_ne(data_descr): - (s1, s2) = gen_scalars(data_descr[0], 2) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_descr[0], 2)) assert_binary_ast(data_descr, lambda df: df.select( f.col('a') != s1, @@ -252,7 +255,8 @@ def test_ne(data_descr): @pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn) def test_lt(data_descr): - (s1, s2) = gen_scalars(data_descr[0], 2) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_descr[0], 2)) assert_binary_ast(data_descr, lambda df: df.select( f.col('a') < s1, @@ -261,7 +265,8 @@ def test_lt(data_descr): @pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn) def test_lte(data_descr): - (s1, s2) = gen_scalars(data_descr[0], 2) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_descr[0], 2)) assert_binary_ast(data_descr, lambda df: df.select( f.col('a') <= s1, @@ -270,7 +275,8 @@ def test_lte(data_descr): @pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn) def test_gt(data_descr): - (s1, s2) = gen_scalars(data_descr[0], 2) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_descr[0], 2)) assert_binary_ast(data_descr, lambda df: df.select( f.col('a') > s1, @@ -279,7 +285,8 @@ def test_gt(data_descr): @pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn) def test_gte(data_descr): - (s1, s2) = gen_scalars(data_descr[0], 2) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_descr[0], 2)) assert_binary_ast(data_descr, lambda df: df.select( f.col('a') >= s1, diff --git a/integration_tests/src/main/python/cmp_test.py b/integration_tests/src/main/python/cmp_test.py index eabe02fcffd..91fde8afeea 100644 --- a/integration_tests/src/main/python/cmp_test.py +++ b/integration_tests/src/main/python/cmp_test.py @@ -22,7 +22,8 @@ @pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + struct_gens_sample_with_decimal128_no_list, ids=idfn) def test_eq(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : 
binary_op_df(spark, data_gen).select( @@ -35,7 +36,8 @@ def test_eq(data_gen): @pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0') def test_eq_for_interval(): def test_func(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -53,7 +55,8 @@ def test_func(data_gen): @pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + struct_gens_sample_with_decimal128_no_list, ids=idfn) def test_eq_ns(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -66,7 +69,8 @@ def test_eq_ns(data_gen): @pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0') def test_eq_ns_for_interval(): data_gen = DayTimeIntervalGen() - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -78,7 +82,8 @@ def test_eq_ns_for_interval(): @pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + struct_gens_sample_with_decimal128_no_list, ids=idfn) def test_ne(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -91,7 +96,8 @@ def test_ne(data_gen): @pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0') def test_ne_for_interval(): def test_func(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -109,7 +115,8 @@ def test_func(data_gen): @pytest.mark.parametrize('data_gen', orderable_gens + struct_gens_sample_with_decimal128_no_list, ids=idfn) def test_lt(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -122,7 +129,8 @@ def test_lt(data_gen): @pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0') def test_lt_for_interval(): def test_func(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: 
gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -140,7 +148,8 @@ def test_func(data_gen): @pytest.mark.parametrize('data_gen', orderable_gens + struct_gens_sample_with_decimal128_no_list, ids=idfn) def test_lte(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -153,7 +162,8 @@ def test_lte(data_gen): @pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0') def test_lte_for_interval(): def test_func(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -171,7 +181,8 @@ def test_func(data_gen): @pytest.mark.parametrize('data_gen', orderable_gens, ids=idfn) def test_gt(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -184,7 +195,8 @@ def test_gt(data_gen): @pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0') def test_gt_interval(): def test_func(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -202,7 +214,8 @@ def test_func(data_gen): @pytest.mark.parametrize('data_gen', orderable_gens + struct_gens_sample_with_decimal128_no_list, ids=idfn) def test_gte(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -215,7 +228,8 @@ def test_gte(data_gen): @pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0') def test_gte_for_interval(): def test_func(data_gen): - (s1, s2) = gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( diff --git a/integration_tests/src/main/python/collection_ops_test.py b/integration_tests/src/main/python/collection_ops_test.py index 8734d5d77b5..8c44229a2a1 100644 --- a/integration_tests/src/main/python/collection_ops_test.py +++ 
b/integration_tests/src/main/python/collection_ops_test.py @@ -45,8 +45,8 @@ def test_concat_list(data_gen): @pytest.mark.parametrize('dg', non_nested_array_gens, ids=idfn) def test_concat_double_list_with_lit(dg): data_gen = ArrayGen(dg, max_length=2) - array_lit = gen_scalar(data_gen) - array_lit2 = gen_scalar(data_gen) + array_lit = with_cpu_session(lambda spark: gen_scalar(data_gen)) + array_lit2 = with_cpu_session(lambda spark: gen_scalar(data_gen)) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, data_gen).select( f.concat(f.col('a'), @@ -67,8 +67,10 @@ def test_concat_double_list_with_lit(dg): @pytest.mark.parametrize('data_gen', non_nested_array_gens, ids=idfn) def test_concat_list_with_lit(data_gen): - lit_col1 = f.lit(gen_scalar(data_gen)).cast(data_gen.data_type) - lit_col2 = f.lit(gen_scalar(data_gen)).cast(data_gen.data_type) + lit_col1 = f.lit(with_cpu_session( + lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) + lit_col2 = f.lit(with_cpu_session( + lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, data_gen).select( @@ -78,7 +80,8 @@ def test_concat_list_with_lit(data_gen): def test_concat_string(): gen = mk_str_gen('.{0,5}') - (s1, s2) = gen_scalars(gen, 2, force_no_nulls=True) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(gen, 2, force_no_nulls=True)) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, gen).select( f.concat(), @@ -106,8 +109,10 @@ def test_map_concat(data_gen): @pytest.mark.parametrize('data_gen', map_gens_sample + decimal_64_map_gens + decimal_128_map_gens, ids=idfn) def test_map_concat_with_lit(data_gen): - lit_col1 = f.lit(gen_scalar(data_gen)).cast(data_gen.data_type) - lit_col2 = f.lit(gen_scalar(data_gen)).cast(data_gen.data_type) + lit_col1 = f.lit(with_cpu_session( + lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) + lit_col2 = f.lit(with_cpu_session( + lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, data_gen).select( f.map_concat(f.col('a'), f.col('b'), lit_col1), @@ -150,7 +155,7 @@ def test_sort_array(data_gen): @pytest.mark.parametrize('data_gen', _sort_array_gens, ids=idfn) def test_sort_array_lit(data_gen): - array_lit = gen_scalar(data_gen) + array_lit = with_cpu_session(lambda spark: gen_scalar(data_gen)) assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, data_gen, length=10).select( f.sort_array(f.lit(array_lit), True), diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index 6c7ce0ec67c..4a66919f72a 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -44,7 +44,8 @@ def mk_str_gen(pattern): @pytest.mark.parametrize('data_gen', all_gens + if_nested_gens, ids=idfn) def test_if_else(data_gen): - (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) null_lit = get_null_lit_string(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark : three_col_df(spark, boolean_gen, data_gen, data_gen).selectExpr( @@ -71,7 +72,8 @@ def test_if_else_map(data_gen): @pytest.mark.parametrize('data_gen', all_gens + all_nested_gens, ids=idfn) def 
test_case_when(data_gen): num_cmps = 20 - s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) + s1 = with_cpu_session( + lambda spark: gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen))) # we want lots of false bool_gen = BooleanGen().with_special_case(False, weight=1000.0) gen_cols = [('_b' + str(x), bool_gen) for x in range(0, num_cmps)] @@ -100,7 +102,8 @@ def test_case_when(data_gen): @pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn) def test_nanvl(data_gen): - s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) + s1 = with_cpu_session( + lambda spark: gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen))) data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).select( @@ -111,7 +114,8 @@ def test_nanvl(data_gen): @pytest.mark.parametrize('data_gen', all_basic_gens + decimal_gens, ids=idfn) def test_nvl(data_gen): - (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) null_lit = get_null_lit_string(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).selectExpr( @@ -129,7 +133,8 @@ def test_nvl(data_gen): @pytest.mark.parametrize('data_gen', all_gens + all_nested_gens_nonempty_struct + map_gens_sample, ids=idfn) def test_coalesce(data_gen): num_cols = 20 - s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) + s1 = with_cpu_session( + lambda spark: gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen))) # we want lots of nulls gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=1000.0)) for x in range(0, num_cols)], nullable=False) @@ -149,7 +154,8 @@ def test_coalesce_constant_output(): @pytest.mark.parametrize('data_gen', all_basic_gens + decimal_gens, ids=idfn) def test_nvl2(data_gen): - (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) null_lit = get_null_lit_string(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark : three_col_df(spark, data_gen, data_gen, data_gen).selectExpr( @@ -161,7 +167,8 @@ def test_nvl2(data_gen): @pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn) def test_nullif(data_gen): - (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) null_lit = get_null_lit_string(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).selectExpr( @@ -173,7 +180,8 @@ def test_nullif(data_gen): @pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn) def test_ifnull(data_gen): - (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen))) null_lit = get_null_lit_string(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).selectExpr( diff --git 
a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index e25d565fa0a..fdfc595bf4e 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -832,15 +832,11 @@ def _gen_scalars_common(data_gen, count, seed=0): def gen_scalars(data_gen, count, seed=0, force_no_nulls=False): """Generate scalar values.""" - def gen_scalars_help(data_gen, count, seed, force_no_nulls): - if force_no_nulls: - assert(not isinstance(data_gen, NullGen)) - src = _gen_scalars_common(data_gen, count, seed=seed) - data_type = src.data_type - return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls), data_type) for i in range(0, count)) - return with_cpu_session(lambda spark: gen_scalars_help(data_gen=data_gen, - count=count, seed=seed, - force_no_nulls=force_no_nulls)) + if force_no_nulls: + assert(not isinstance(data_gen, NullGen)) + src = _gen_scalars_common(data_gen, count, seed=seed) + data_type = src.data_type + return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls), data_type) for i in range(0, count)) def gen_scalar(data_gen, seed=0, force_no_nulls=False): """Generate a single scalar value.""" diff --git a/integration_tests/src/main/python/generate_expr_test.py b/integration_tests/src/main/python/generate_expr_test.py index 5d7779ea35e..fc156d37493 100644 --- a/integration_tests/src/main/python/generate_expr_test.py +++ b/integration_tests/src/main/python/generate_expr_test.py @@ -46,7 +46,8 @@ def test_explode_makearray(data_gen): @ignore_order(local=True) @pytest.mark.parametrize('data_gen', explode_gens, ids=idfn) def test_explode_litarray(data_gen): - array_lit = gen_scalar(ArrayGen(data_gen, min_length=3, max_length=3, nullable=False)) + array_lit = with_cpu_session( + lambda spark: gen_scalar(ArrayGen(data_gen, min_length=3, max_length=3, nullable=False))) assert_gpu_and_cpu_are_equal_collect( lambda spark : four_op_df(spark, data_gen).select(f.col('a'), f.col('b'), f.col('c'), f.explode(array_lit))) @@ -133,7 +134,8 @@ def test_posexplode_makearray(data_gen): @ignore_order(local=True) @pytest.mark.parametrize('data_gen', explode_gens, ids=idfn) def test_posexplode_litarray(data_gen): - array_lit = gen_scalar(ArrayGen(data_gen, min_length=3, max_length=3, nullable=False)) + array_lit = with_cpu_session( + lambda spark: gen_scalar(ArrayGen(data_gen, min_length=3, max_length=3, nullable=False))) assert_gpu_and_cpu_are_equal_collect( lambda spark : four_op_df(spark, data_gen).select(f.col('a'), f.col('b'), f.col('c'), f.posexplode(array_lit))) diff --git a/integration_tests/src/main/python/map_test.py b/integration_tests/src/main/python/map_test.py index 90dc14f4afd..d8405b2b4fa 100644 --- a/integration_tests/src/main/python/map_test.py +++ b/integration_tests/src/main/python/map_test.py @@ -363,7 +363,8 @@ def test_str_to_map_expr_random_delimiters(): delim_gen = StringGen(pattern='[0-9a-z :,]', nullable=False) (pair_delim, keyval_delim) = ('', '') while pair_delim == keyval_delim: - (pair_delim, keyval_delim) = gen_scalars_for_sql(delim_gen, 2, force_no_nulls=True) + (pair_delim, keyval_delim) = with_cpu_session( + lambda spark: gen_scalars_for_sql(delim_gen, 2, force_no_nulls=True)) assert_gpu_and_cpu_are_equal_collect( lambda spark: gen_df(spark, data_gen).selectExpr( 'str_to_map(a) as m1', diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py index 7b879d01238..025593b3e94 100644 --- a/integration_tests/src/main/python/orc_test.py +++ 
b/integration_tests/src/main/python/orc_test.py @@ -189,7 +189,8 @@ def test_pred_push_round_trip(spark_tmp_path, orc_gen, read_func, v1_enabled_lis gen_list = [('a', RepeatSeqGen(orc_gen, 100)), ('b', orc_gen), ('s1', StructGen([['sa', orc_gen]])), ('s2', StructGen([['sa', StructGen([['ssa', orc_gen]])]]))] - s0 = gen_scalar(orc_gen, force_no_nulls=True) + s0 = with_cpu_session( + lambda spark: gen_scalar(orc_gen, force_no_nulls=True)) with_cpu_session( lambda spark : gen_df(spark, gen_list).orderBy('a').write.orc(data_path)) all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index b6261b2295c..9c02d266d2f 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -300,7 +300,8 @@ def test_parquet_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_l def test_parquet_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/PARQUET_DATA' gen_list = [('a', RepeatSeqGen(parquet_gen, 100)), ('b', parquet_gen)] - s0 = gen_scalar(parquet_gen, force_no_nulls=True) + s0 = with_cpu_session( + lambda spark: gen_scalar(parquet_gen, force_no_nulls=True)) with_cpu_session( lambda spark : gen_df(spark, gen_list).orderBy('a').write.parquet(data_path), conf=rebase_write_corrected_conf) diff --git a/integration_tests/src/main/python/string_test.py b/integration_tests/src/main/python/string_test.py index 23800edfbca..0f8ff185fc4 100644 --- a/integration_tests/src/main/python/string_test.py +++ b/integration_tests/src/main/python/string_test.py @@ -346,7 +346,8 @@ def assert_gpu_did_fallback(op): def test_concat_ws_basic(): gen = StringGen(nullable=True) - (s1, s2) = gen_scalars(gen, 2, force_no_nulls=True) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(gen, 2, force_no_nulls=True)) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, gen).select( f.concat_ws("-"), @@ -365,7 +366,8 @@ def test_concat_ws_basic(): def test_concat_ws_arrays(): gen = ArrayGen(StringGen(nullable=True), nullable=True) - (s1, s2) = gen_scalars(gen, 2, force_no_nulls=True) + (s1, s2) = with_cpu_session( + lambda spark: gen_scalars(gen, 2, force_no_nulls=True)) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, gen).select( f.concat_ws("*", f.array(f.lit('2'), f.lit(''), f.lit('3'), f.lit('Z'))), @@ -536,7 +538,8 @@ def test_ephemeral_substring(): def test_repeat_scalar_and_column(): gen_s = StringGen(nullable=False) gen_r = IntegerGen(min_val=-100, max_val=100, special_cases=[0], nullable=True) - (s,) = gen_scalars_for_sql(gen_s, 1) + (s,) = with_cpu_session( + lambda spark: gen_scalars_for_sql(gen_s, 1)) assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, gen_r).selectExpr( 'repeat({}, a)'.format(s), diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index b4708b89668..54df60d3cbc 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -689,7 +689,8 @@ def do_it(spark): return df.withColumn('lead_def_c', f.lead('c', 2, None).over(base_window_spec)) \ .withColumn('lag_def_c', f.lag('c', 4, None).over(base_window_spec)) else: - default_val = gen_scalar_value(c_gen, force_no_nulls=False) + default_val = 
with_cpu_session( + lambda spark: gen_scalar_value(c_gen, force_no_nulls=False)) return df.withColumn('inc_max_c', f.max('c').over(inclusive_window_spec)) \ .withColumn('inc_min_c', f.min('c').over(inclusive_window_spec)) \ .withColumn('lead_def_c', f.lead('c', 2, default_val).over(base_window_spec)) \ From 84712e88dbc215d11f9a7cbb05b68ec76a087320 Mon Sep 17 00:00:00 2001 From: Haoyang Li Date: Tue, 10 Oct 2023 14:23:50 +0800 Subject: [PATCH 4/7] clean up Signed-off-by: Haoyang Li --- .../src/main/python/arithmetic_ops_test.py | 2 +- .../src/main/python/array_test.py | 3 +-- integration_tests/src/main/python/ast_test.py | 21 +++++++------------ .../src/main/python/collection_ops_test.py | 15 +++++-------- integration_tests/src/main/python/orc_test.py | 3 +-- .../src/main/python/parquet_test.py | 3 +-- .../src/main/python/string_test.py | 9 +++----- .../src/main/python/window_function_test.py | 3 +-- 8 files changed, 20 insertions(+), 39 deletions(-) diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py index a3ef0840dbd..e01693b3894 100644 --- a/integration_tests/src/main/python/arithmetic_ops_test.py +++ b/integration_tests/src/main/python/arithmetic_ops_test.py @@ -938,7 +938,7 @@ def test_columnar_pow(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).selectExpr('pow(a, b)')) -@pytest.mark.parametrize('data_gen', [DecimalGen(25,-3)], ids=idfn) +@pytest.mark.parametrize('data_gen', all_basic_gens + _arith_decimal_gens, ids=idfn) def test_least(data_gen): num_cols = 20 s1 = with_cpu_session( diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py index f26c78ef9ee..ec29dce70d1 100644 --- a/integration_tests/src/main/python/array_test.py +++ b/integration_tests/src/main/python/array_test.py @@ -214,8 +214,7 @@ def test_orderby_array_of_structs(data_gen): string_gen, boolean_gen, date_gen, timestamp_gen], ids=idfn) def test_array_contains(data_gen): arr_gen = ArrayGen(data_gen) - literal = with_cpu_session( - lambda spark: gen_scalar(data_gen, force_no_nulls=True)) + literal = with_cpu_session(lambda spark: gen_scalar(data_gen, force_no_nulls=True)) def get_input(spark): return two_col_df(spark, arr_gen, data_gen) diff --git a/integration_tests/src/main/python/ast_test.py b/integration_tests/src/main/python/ast_test.py index 1fa535c7075..c658ab9f165 100644 --- a/integration_tests/src/main/python/ast_test.py +++ b/integration_tests/src/main/python/ast_test.py @@ -75,8 +75,7 @@ def test_literal(spark_tmp_path, data_gen): # Write data to Parquet so Spark generates a plan using just the count of the data. 
data_path = spark_tmp_path + '/AST_TEST_DATA' with_cpu_session(lambda spark: gen_df(spark, [("a", IntegerGen())]).write.parquet(data_path)) - scalar = with_cpu_session( - lambda spark: gen_scalar(data_gen, force_no_nulls=True)) + scalar = with_cpu_session(lambda spark: gen_scalar(data_gen, force_no_nulls=True)) assert_gpu_ast(is_supported=True, func=lambda spark: spark.read.parquet(data_path).select(scalar)) @@ -235,8 +234,7 @@ def test_expm1(data_descr): @pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn) def test_eq(data_descr): - (s1, s2) = with_cpu_session( - lambda spark: gen_scalars(data_descr[0], 2)) + (s1, s2) = with_cpu_session(lambda spark: gen_scalars(data_descr[0], 2)) assert_binary_ast(data_descr, lambda df: df.select( f.col('a') == s1, @@ -245,8 +243,7 @@ def test_eq(data_descr): @pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn) def test_ne(data_descr): - (s1, s2) = with_cpu_session( - lambda spark: gen_scalars(data_descr[0], 2)) + (s1, s2) = with_cpu_session(lambda spark: gen_scalars(data_descr[0], 2)) assert_binary_ast(data_descr, lambda df: df.select( f.col('a') != s1, @@ -255,8 +252,7 @@ def test_ne(data_descr): @pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn) def test_lt(data_descr): - (s1, s2) = with_cpu_session( - lambda spark: gen_scalars(data_descr[0], 2)) + (s1, s2) = with_cpu_session(lambda spark: gen_scalars(data_descr[0], 2)) assert_binary_ast(data_descr, lambda df: df.select( f.col('a') < s1, @@ -265,8 +261,7 @@ def test_lt(data_descr): @pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn) def test_lte(data_descr): - (s1, s2) = with_cpu_session( - lambda spark: gen_scalars(data_descr[0], 2)) + (s1, s2) = with_cpu_session(lambda spark: gen_scalars(data_descr[0], 2)) assert_binary_ast(data_descr, lambda df: df.select( f.col('a') <= s1, @@ -275,8 +270,7 @@ def test_lte(data_descr): @pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn) def test_gt(data_descr): - (s1, s2) = with_cpu_session( - lambda spark: gen_scalars(data_descr[0], 2)) + (s1, s2) = with_cpu_session(lambda spark: gen_scalars(data_descr[0], 2)) assert_binary_ast(data_descr, lambda df: df.select( f.col('a') > s1, @@ -285,8 +279,7 @@ def test_gt(data_descr): @pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn) def test_gte(data_descr): - (s1, s2) = with_cpu_session( - lambda spark: gen_scalars(data_descr[0], 2)) + (s1, s2) = with_cpu_session(lambda spark: gen_scalars(data_descr[0], 2)) assert_binary_ast(data_descr, lambda df: df.select( f.col('a') >= s1, diff --git a/integration_tests/src/main/python/collection_ops_test.py b/integration_tests/src/main/python/collection_ops_test.py index 8c44229a2a1..61470b2e179 100644 --- a/integration_tests/src/main/python/collection_ops_test.py +++ b/integration_tests/src/main/python/collection_ops_test.py @@ -67,10 +67,8 @@ def test_concat_double_list_with_lit(dg): @pytest.mark.parametrize('data_gen', non_nested_array_gens, ids=idfn) def test_concat_list_with_lit(data_gen): - lit_col1 = f.lit(with_cpu_session( - lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) - lit_col2 = f.lit(with_cpu_session( - lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) + lit_col1 = f.lit(with_cpu_session(lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) + lit_col2 = f.lit(with_cpu_session(lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, 
data_gen).select( @@ -80,8 +78,7 @@ def test_concat_list_with_lit(data_gen): def test_concat_string(): gen = mk_str_gen('.{0,5}') - (s1, s2) = with_cpu_session( - lambda spark: gen_scalars(gen, 2, force_no_nulls=True)) + (s1, s2) = with_cpu_session(lambda spark: gen_scalars(gen, 2, force_no_nulls=True)) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, gen).select( f.concat(), @@ -109,10 +106,8 @@ def test_map_concat(data_gen): @pytest.mark.parametrize('data_gen', map_gens_sample + decimal_64_map_gens + decimal_128_map_gens, ids=idfn) def test_map_concat_with_lit(data_gen): - lit_col1 = f.lit(with_cpu_session( - lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) - lit_col2 = f.lit(with_cpu_session( - lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) + lit_col1 = f.lit(with_cpu_session(lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) + lit_col2 = f.lit(with_cpu_session(lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, data_gen).select( f.map_concat(f.col('a'), f.col('b'), lit_col1), diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py index 025593b3e94..b66903955bd 100644 --- a/integration_tests/src/main/python/orc_test.py +++ b/integration_tests/src/main/python/orc_test.py @@ -189,8 +189,7 @@ def test_pred_push_round_trip(spark_tmp_path, orc_gen, read_func, v1_enabled_lis gen_list = [('a', RepeatSeqGen(orc_gen, 100)), ('b', orc_gen), ('s1', StructGen([['sa', orc_gen]])), ('s2', StructGen([['sa', StructGen([['ssa', orc_gen]])]]))] - s0 = with_cpu_session( - lambda spark: gen_scalar(orc_gen, force_no_nulls=True)) + s0 = with_cpu_session(lambda spark: gen_scalar(orc_gen, force_no_nulls=True)) with_cpu_session( lambda spark : gen_df(spark, gen_list).orderBy('a').write.orc(data_path)) all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 9c02d266d2f..b3e04b91d93 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -300,8 +300,7 @@ def test_parquet_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_l def test_parquet_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/PARQUET_DATA' gen_list = [('a', RepeatSeqGen(parquet_gen, 100)), ('b', parquet_gen)] - s0 = with_cpu_session( - lambda spark: gen_scalar(parquet_gen, force_no_nulls=True)) + s0 = with_cpu_session(lambda spark: gen_scalar(parquet_gen, force_no_nulls=True)) with_cpu_session( lambda spark : gen_df(spark, gen_list).orderBy('a').write.parquet(data_path), conf=rebase_write_corrected_conf) diff --git a/integration_tests/src/main/python/string_test.py b/integration_tests/src/main/python/string_test.py index 0f8ff185fc4..0832621cdf4 100644 --- a/integration_tests/src/main/python/string_test.py +++ b/integration_tests/src/main/python/string_test.py @@ -346,8 +346,7 @@ def assert_gpu_did_fallback(op): def test_concat_ws_basic(): gen = StringGen(nullable=True) - (s1, s2) = with_cpu_session( - lambda spark: gen_scalars(gen, 2, force_no_nulls=True)) + (s1, s2) = with_cpu_session(lambda spark: gen_scalars(gen, 2, force_no_nulls=True)) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, gen).select( f.concat_ws("-"), @@ 
-366,8 +365,7 @@ def test_concat_ws_basic(): def test_concat_ws_arrays(): gen = ArrayGen(StringGen(nullable=True), nullable=True) - (s1, s2) = with_cpu_session( - lambda spark: gen_scalars(gen, 2, force_no_nulls=True)) + (s1, s2) = with_cpu_session(lambda spark: gen_scalars(gen, 2, force_no_nulls=True)) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, gen).select( f.concat_ws("*", f.array(f.lit('2'), f.lit(''), f.lit('3'), f.lit('Z'))), @@ -538,8 +536,7 @@ def test_ephemeral_substring(): def test_repeat_scalar_and_column(): gen_s = StringGen(nullable=False) gen_r = IntegerGen(min_val=-100, max_val=100, special_cases=[0], nullable=True) - (s,) = with_cpu_session( - lambda spark: gen_scalars_for_sql(gen_s, 1)) + (s,) = with_cpu_session(lambda spark: gen_scalars_for_sql(gen_s, 1)) assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, gen_r).selectExpr( 'repeat({}, a)'.format(s), diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 54df60d3cbc..7b39594894a 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -689,8 +689,7 @@ def do_it(spark): return df.withColumn('lead_def_c', f.lead('c', 2, None).over(base_window_spec)) \ .withColumn('lag_def_c', f.lag('c', 4, None).over(base_window_spec)) else: - default_val = with_cpu_session( - lambda spark: gen_scalar_value(c_gen, force_no_nulls=False)) + default_val = with_cpu_session(lambda spark: gen_scalar_value(c_gen, force_no_nulls=False)) return df.withColumn('inc_max_c', f.max('c').over(inclusive_window_spec)) \ .withColumn('inc_min_c', f.min('c').over(inclusive_window_spec)) \ .withColumn('lead_def_c', f.lead('c', 2, default_val).over(base_window_spec)) \ From 808bc25e3a8519754b81cd865543752640e97599 Mon Sep 17 00:00:00 2001 From: Haoyang Li Date: Wed, 11 Oct 2023 09:19:27 +0800 Subject: [PATCH 5/7] Update integration_tests/README.md Co-authored-by: Jason Lowe --- integration_tests/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/README.md b/integration_tests/README.md index f64115bd448..e04c07741d5 100644 --- a/integration_tests/README.md +++ b/integration_tests/README.md @@ -454,7 +454,7 @@ The marks you care about are all in marks.py For the most part you can ignore this file. It provides the underlying Spark session to operations that need it, but most tests should interact with it through `asserts.py`. -All data generation should be putted inside a spark session context of some kind. +All data generation should occur within a Spark session. Typically this is done by passing a lambda to functions in `asserts.py` such as `assert_gpu_and_cpu_are_equal_collect`. However, for scalar generation like `gen_scalars`, you may need to put it in a `with_cpu_session`. It is because negative scale decimals can have problems if called from outside of `with_spark_session`. 
From 3c433431ad4ce05e97c40470a71f0925e5ebda7b Mon Sep 17 00:00:00 2001 From: Haoyang Li Date: Wed, 11 Oct 2023 09:26:58 +0800 Subject: [PATCH 6/7] address comments Signed-off-by: Haoyang Li --- integration_tests/README.md | 7 ++++--- integration_tests/src/main/python/ast_test.py | 2 +- integration_tests/src/main/python/conditionals_test.py | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/integration_tests/README.md b/integration_tests/README.md index e04c07741d5..fbf24f3d0de 100644 --- a/integration_tests/README.md +++ b/integration_tests/README.md @@ -454,9 +454,10 @@ The marks you care about are all in marks.py For the most part you can ignore this file. It provides the underlying Spark session to operations that need it, but most tests should interact with it through `asserts.py`. -All data generation should occur within a Spark session. -Typically this is done by passing a lambda to functions in `asserts.py` such as `assert_gpu_and_cpu_are_equal_collect`. -However, for scalar generation like `gen_scalars`, you may need to put it in a `with_cpu_session`. It is because negative scale decimals can have problems if called from outside of `with_spark_session`. +All data generation should occur within a Spark session. Typically this is done by passing a +lambda to functions in `asserts.py` such as `assert_gpu_and_cpu_are_equal_collect`. However, +for scalar generation like `gen_scalars`, you may need to put it in a `with_cpu_session`. It is +because negative scale decimals can have problems if called from outside of `with_spark_session`. ## Guidelines for Testing diff --git a/integration_tests/src/main/python/ast_test.py b/integration_tests/src/main/python/ast_test.py index c658ab9f165..dc2a1f6c93f 100644 --- a/integration_tests/src/main/python/ast_test.py +++ b/integration_tests/src/main/python/ast_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2022, NVIDIA CORPORATION. +# Copyright (c) 2021-2023, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index 4a66919f72a..006c500c5b6 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# Copyright (c) 2020-2023, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From 815f85c719769de8c57ebba54236ebdd690c18b9 Mon Sep 17 00:00:00 2001 From: Haoyang Li Date: Wed, 11 Oct 2023 23:54:39 +0800 Subject: [PATCH 7/7] move f.lit into spark session Signed-off-by: Haoyang Li --- integration_tests/README.md | 9 +++++---- integration_tests/src/main/python/collection_ops_test.py | 8 ++++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/integration_tests/README.md b/integration_tests/README.md index fbf24f3d0de..d1a47f83aaa 100644 --- a/integration_tests/README.md +++ b/integration_tests/README.md @@ -454,10 +454,11 @@ The marks you care about are all in marks.py For the most part you can ignore this file. It provides the underlying Spark session to operations that need it, but most tests should interact with it through `asserts.py`. -All data generation should occur within a Spark session. 
Typically this is done by passing a -lambda to functions in `asserts.py` such as `assert_gpu_and_cpu_are_equal_collect`. However, -for scalar generation like `gen_scalars`, you may need to put it in a `with_cpu_session`. It is -because negative scale decimals can have problems if called from outside of `with_spark_session`. +All data generation and Spark function calls should occur within a Spark session. Typically +this is done by passing a lambda to functions in `asserts.py` such as +`assert_gpu_and_cpu_are_equal_collect`. However, for scalar generation like `gen_scalars`, you +may need to put it in a `with_cpu_session`. It is because negative scale decimals can have +problems when calling `f.lit` from outside of `with_spark_session`. ## Guidelines for Testing diff --git a/integration_tests/src/main/python/collection_ops_test.py b/integration_tests/src/main/python/collection_ops_test.py index 61470b2e179..5751323ecee 100644 --- a/integration_tests/src/main/python/collection_ops_test.py +++ b/integration_tests/src/main/python/collection_ops_test.py @@ -67,8 +67,8 @@ def test_concat_double_list_with_lit(dg): @pytest.mark.parametrize('data_gen', non_nested_array_gens, ids=idfn) def test_concat_list_with_lit(data_gen): - lit_col1 = f.lit(with_cpu_session(lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) - lit_col2 = f.lit(with_cpu_session(lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) + lit_col1 = with_cpu_session(lambda spark: f.lit(gen_scalar(data_gen))).cast(data_gen.data_type) + lit_col2 = with_cpu_session(lambda spark: f.lit(gen_scalar(data_gen))).cast(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, data_gen).select( @@ -106,8 +106,8 @@ def test_map_concat(data_gen): @pytest.mark.parametrize('data_gen', map_gens_sample + decimal_64_map_gens + decimal_128_map_gens, ids=idfn) def test_map_concat_with_lit(data_gen): - lit_col1 = f.lit(with_cpu_session(lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) - lit_col2 = f.lit(with_cpu_session(lambda spark: gen_scalar(data_gen))).cast(data_gen.data_type) + lit_col1 = with_cpu_session(lambda spark: f.lit(gen_scalar(data_gen))).cast(data_gen.data_type) + lit_col2 = with_cpu_session(lambda spark: f.lit(gen_scalar(data_gen))).cast(data_gen.data_type) assert_gpu_and_cpu_are_equal_collect( lambda spark: binary_op_df(spark, data_gen).select( f.map_concat(f.col('a'), f.col('b'), lit_col1),
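
For reference, below is a minimal sketch of the end state this series converges on (the PATCH 7/7 form), pieced together from the diffs above. It only reuses names that appear in the patches — `with_cpu_session` from `spark_session.py`, `gen_scalar`, `DecimalGen` and `unary_op_df` from `data_gen.py`, and `assert_gpu_and_cpu_are_equal_collect` from `asserts.py` — but the test itself is illustrative and not part of the series; the `DecimalGen(25, -3)` generator simply mirrors the negative-scale case exercised in PATCH 3/7.

# Illustrative sketch only: generate a negative-scale decimal literal inside a CPU
# Spark session, following the lit_col pattern from collection_ops_test.py in PATCH 7/7.
import pyspark.sql.functions as f
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import DecimalGen, gen_scalar, unary_op_df
from spark_session import with_cpu_session

# Negative scale; per the README change above, f.lit on such values can have
# problems when called outside of a Spark session.
data_gen = DecimalGen(25, -3)

# Both the scalar generation and the f.lit call run inside with_cpu_session.
lit_col = with_cpu_session(lambda spark: f.lit(gen_scalar(data_gen))).cast(data_gen.data_type)

assert_gpu_and_cpu_are_equal_collect(
    lambda spark: unary_op_df(spark, data_gen).select(f.col('a'), lit_col))

Moving the `f.lit` call itself inside the session (rather than only `gen_scalar`, as in PATCH 3/7) is the refinement PATCH 7/7 adds.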