-
Notifications
You must be signed in to change notification settings - Fork 232
/
array_test.py
111 lines (94 loc) · 5.16 KB
/
array_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql
from conftest import is_dataproc_runtime
from data_gen import *
from pyspark.sql.types import *
from pyspark.sql.functions import array_contains, col, first, isnan, lit
# Once we support arrays as literals then we can support a[null] and
# negative indexes for all array gens. When that happens
# test_nested_array_index should go away and this should test with
# array_gens_sample instead
@pytest.mark.parametrize('data_gen', single_level_array_gens, ids=idfn)
def test_array_index(data_gen):
    """GPU and CPU must agree on array indexing for single-level arrays,
    covering in-range, out-of-range, negative, and null indexes."""
    index_exprs = [
        'a[0]',
        'a[1]',
        'a[null]',
        'a[3]',
        'a[50]',
        'a[-1]']
    def do_it(spark):
        return unary_op_df(spark, data_gen).selectExpr(*index_exprs)
    assert_gpu_and_cpu_are_equal_collect(
        do_it,
        conf=allow_negative_scale_of_decimal_conf)
# Once we support arrays as literals then we can support a[null] for
# all array gens. See test_array_index for more info
@pytest.mark.parametrize('data_gen', nested_array_gens_sample, ids=idfn)
def test_nested_array_index(data_gen):
    """GPU and CPU must agree on indexing into nested arrays.

    Only non-negative, non-null indexes are used here; see test_array_index
    for why the null/negative cases are limited to single-level arrays.
    """
    index_exprs = ['a[0]', 'a[1]', 'a[3]', 'a[50]']
    def do_it(spark):
        return unary_op_df(spark, data_gen).selectExpr(*index_exprs)
    assert_gpu_and_cpu_are_equal_collect(do_it)
@pytest.mark.parametrize('data_gen', all_basic_gens + [decimal_gen_default, decimal_gen_scale_precision], ids=idfn)
def test_make_array(data_gen):
    """GPU and CPU must agree on the array(...) constructor, mixing column
    references, a null literal, and two generated scalar literals."""
    # NullGen can only produce nulls, so non-null scalars are forced
    # for every other generator.
    force_no_nulls = not isinstance(data_gen, NullGen)
    (scalar_one, scalar_two) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=force_no_nulls)
    def do_it(spark):
        return binary_op_df(spark, data_gen).selectExpr(
            'array(a, b)',
            'array(b, a, null, {}, {})'.format(scalar_one, scalar_two))
    assert_gpu_and_cpu_are_equal_collect(do_it)
@pytest.mark.parametrize('data_gen', single_level_array_gens, ids=idfn)
def test_orderby_array_unique(data_gen):
    """ORDER BY on a unique int column must match between GPU and CPU when a
    single-level array column is carried along in the projection."""
    def build_table(spark):
        base_df = unary_op_df(spark, data_gen)
        return append_unique_int_col_to_df(spark, base_df)
    assert_gpu_and_cpu_are_equal_sql(
        build_table,
        'array_table',
        'select array_table.a, array_table.uniq_int from array_table order by uniq_int',
        conf=allow_negative_scale_of_decimal_conf)
@pytest.mark.parametrize('data_gen', [ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
                                      ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10)], ids=idfn)
def test_orderby_array_of_arrays(data_gen):
    """ORDER BY on a unique int column must match between GPU and CPU when an
    array-of-arrays column is carried along in the projection."""
    def build_table(spark):
        base_df = unary_op_df(spark, data_gen)
        return append_unique_int_col_to_df(spark, base_df)
    assert_gpu_and_cpu_are_equal_sql(
        build_table,
        'array_table',
        'select array_table.a, array_table.uniq_int from array_table order by uniq_int')
@pytest.mark.parametrize('data_gen', [ArrayGen(StructGen([['child0', byte_gen],
                                                          ['child1', string_gen],
                                                          ['child2', float_gen]]))], ids=idfn)
def test_orderby_array_of_structs(data_gen):
    """ORDER BY on a unique int column must match between GPU and CPU when an
    array-of-structs column is carried along in the projection."""
    def build_table(spark):
        base_df = unary_op_df(spark, data_gen)
        return append_unique_int_col_to_df(spark, base_df)
    assert_gpu_and_cpu_are_equal_sql(
        build_table,
        'array_table',
        'select array_table.a, array_table.uniq_int from array_table order by uniq_int')
@pytest.mark.parametrize('data_gen', [byte_gen, short_gen, int_gen, long_gen,
                                      FloatGen(no_nans=True), DoubleGen(no_nans=True),
                                      string_gen, boolean_gen, date_gen, timestamp_gen], ids=idfn)
def test_array_contains(data_gen):
    """GPU and CPU must agree on array_contains() with three kinds of key:
    a scalar literal, a separate column, and an element extracted from the
    array column itself (a[5], which may be out of range / null).
    """
    arr_gen = ArrayGen(data_gen)
    # FIX: this local was named 'lit', shadowing pyspark.sql.functions.lit
    # imported at the top of the file. Renamed to avoid the shadow.
    literal = gen_scalar(data_gen, force_no_nulls=True)
    assert_gpu_and_cpu_are_equal_collect(lambda spark: two_col_df(
        spark, arr_gen, data_gen).select(array_contains(col('a'), literal.cast(data_gen.data_type)),
                                         array_contains(col('a'), col('b')),
                                         array_contains(col('a'), col('a')[5])), no_nans_conf)
# Test array_contains() with a literal key that is extracted from the input array of doubles
# that does contain NaNs. Note that the config is still set to indicate that the input has NaNs
# but we verify that the plan is on the GPU despite that if the value being looked up is not a NaN.
@pytest.mark.parametrize('data_gen', [double_gen], ids=idfn)
def test_array_contains_for_nans(data_gen):
    """array_contains() must stay on the GPU when the looked-up literal is
    not a NaN, even though the input doubles may contain NaNs."""
    arr_gen = ArrayGen(data_gen)
    def main_df(spark):
        df = three_col_df(spark, arr_gen, data_gen, arr_gen)
        # Extract a concrete non-NaN element from the generated data to use
        # as the lookup key.
        non_nan_rows = df.select(col('a')[0].alias('t')).filter(~isnan(col('t')))
        lookup_key = non_nan_rows.collect()[0][0]
        return df.select(array_contains(col('a'), lookup_key))
    assert_gpu_and_cpu_are_equal_collect(main_df)