Support lists to/from the GPU (NVIDIA#1104)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored and sperlingxx committed Nov 20, 2020
1 parent 93116c0 commit 4019778
Showing 12 changed files with 417 additions and 291 deletions.
integration_tests/run_pyspark_from_build.sh (4 additions, 7 deletions)
@@ -53,10 +53,8 @@ else
     if python -c 'import findspark';
     then
         echo "FOUND findspark"
-        FIND_SPARK=1
     else
         TEST_PARALLEL=0
-        FIND_SPARK=0
        echo "findspark not installed, cannot run tests in parallel"
     fi
     if python -c 'import xdist.plugin';
@@ -71,16 +69,16 @@ else
     then
         # With xdist 0 and 1 are the same parallelism but
         # 0 is more efficient
-        TEST_PARALLEL=""
+        TEST_PARALLEL_OPTS=""
         MEMORY_FRACTION='1'
     else
         MEMORY_FRACTION=`python -c "print(1/($TEST_PARALLEL + 1))"`
-        TEST_PARALLEL="-n $TEST_PARALLEL"
+        TEST_PARALLEL_OPTS="-n $TEST_PARALLEL"
     fi
     RUN_DIR="$SCRIPTPATH"/target/run_dir
     mkdir -p "$RUN_DIR"
     cd "$RUN_DIR"
-    if [[ "${FIND_SPARK}" == "1" ]];
+    if [[ "${TEST_PARALLEL_OPTS}" != "" ]];
     then
         export PYSP_TEST_spark_driver_extraClassPath="${ALL_JARS// /:}"
         export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=GMT $COVERAGE_SUBMIT_FLAGS"
@@ -93,7 +91,7 @@ else
     python \
       "$SCRIPTPATH"/runtests.py --rootdir "$SCRIPTPATH" "$SCRIPTPATH"/src/main/python \
-      $TEST_PARALLEL \
+      $TEST_PARALLEL_OPTS \
      -v -rfExXs "$TEST_TAGS" \
      --std_input_path="$SCRIPTPATH"/src/test/resources/ \
      "$TEST_ARGS" \
@@ -112,6 +110,5 @@ else
       "$TEST_ARGS" \
       $RUN_TEST_PARAMS \
       "$@"
-
     fi
 fi
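
The MEMORY_FRACTION computed above is simple arithmetic but easy to misread: the script splits memory evenly across the xdist workers while reserving one extra share, presumably as headroom. A quick illustration in plain Python (not part of the commit; worker counts are just examples):

    # 1 / (TEST_PARALLEL + 1), as in the backtick expression above
    for test_parallel in (1, 2, 4):
        print(test_parallel, 1 / (test_parallel + 1))
    # 1 0.5
    # 2 0.3333333333333333
    # 4 0.2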
integration_tests/src/main/python/array_test.py (new file, 47 additions)
@@ -0,0 +1,47 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from marks import incompat
from pyspark.sql.types import *
import pyspark.sql.functions as f

# Once we support arrays as literals, we can support a[null] and
# negative indexes for all array gens. When that happens,
# test_nested_array_index should go away and this should test with
# array_gens_sample instead.
@pytest.mark.parametrize('data_gen', single_level_array_gens, ids=idfn)
def test_array_index(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, data_gen).selectExpr(
            'a[0]',
            'a[1]',
            'a[null]',
            'a[3]',
            'a[50]',
            'a[-1]'))

# Once we support arrays as literals, we can support a[null] for
# all array gens. See test_array_index for more info.
@pytest.mark.parametrize('data_gen', nested_array_gens_sample, ids=idfn)
def test_nested_array_index(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, data_gen).selectExpr(
            'a[0]',
            'a[1]',
            'a[3]',
            'a[50]'))
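
For context on what these expressions evaluate to: in Spark's default non-ANSI mode, an out-of-range, negative, or null array index returns NULL instead of raising an error, which is what makes a straight CPU/GPU comparison possible. A minimal standalone sketch (assumes a local SparkSession; not part of this commit):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([([10, 20, 30],)], "a: array<int>")
    # a[0] returns 10; a[50], a[-1], and a[null] all come back as NULL
    df.selectExpr("a[0]", "a[50]", "a[-1]", "a[null]").show()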
integration_tests/src/main/python/data_gen.py (21 additions, 0 deletions)
@@ -231,6 +231,9 @@ def __init__(self, child, length):
         self._length = length
         self._index = 0
 
+    def __repr__(self):
+        return super().__repr__() + '(' + str(self._child) + ')'
+
     def _loop_values(self):
         ret = self._vals[self._index]
         self._index = (self._index + 1) % self._length
@@ -362,6 +365,9 @@ def __init__(self, children, nullable=True, special_cases=[]):
         super().__init__(StructType(tmp), nullable=nullable, special_cases=special_cases)
         self.children = children
 
+    def __repr__(self):
+        return super().__repr__() + '(' + ','.join([str(i) for i in self.children]) + ')'
+
     def start(self, rand):
         for name, child in self.children:
             child.start(rand)
@@ -482,6 +488,9 @@ def __init__(self, child_gen, min_length=0, max_length=100, nullable=True):
         self._max_length = max_length
         self._child_gen = child_gen
 
+    def __repr__(self):
+        return super().__repr__() + '(' + str(self._child_gen) + ')'
+
     def start(self, rand):
         self._child_gen.start(rand)
         def gen_array():
@@ -643,10 +652,12 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
 all_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
         string_gen, boolean_gen, date_gen, timestamp_gen]
 
+# TODO add in some array generators to this once that is supported for sorting
 # a selection of generators that should be orderable (sortable and comparable)
 orderable_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
         string_gen, boolean_gen, date_gen, timestamp_gen]
 
+# TODO add in some array generators to this once that is supported for these operations
 # a selection of generators that can be compared for equality
 eq_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
         string_gen, boolean_gen, date_gen, timestamp_gen]
@@ -655,3 +666,13 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
 date_n_time_gens = [date_gen, timestamp_gen]
 
 boolean_gens = [boolean_gen]
+
+single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens]
+
+# Be careful to not make these too large, or data generation takes forever
+# This is only a few nested array gens, because nesting can get very deep
+nested_array_gens_sample = [ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
+        ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10)]
+
+# Some array gens, but not all, because of nesting
+array_gens_sample = single_level_array_gens + nested_array_gens_sample
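
The size warning above is worth quantifying: element counts multiply with nesting depth, so two levels of ArrayGen at the default max_length of 100 could produce up to 100 * 100 leaf values in a single row. A back-of-the-envelope check in plain Python (not part of the commit):

    default_max_length = 100  # ArrayGen's default, per its signature above
    capped_max_length = 10    # the cap used in nested_array_gens_sample
    print(default_max_length ** 2)  # up to 10000 leaf elements per row
    print(capped_max_length ** 2)   # at most 100 with the cap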
integration_tests/src/main/python/row_conversion_test.py (new file, 38 additions)
@@ -0,0 +1,38 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from marks import incompat, approximate_float
from pyspark.sql.types import *
import pyspark.sql.functions as f


# This is one of the most basic tests, where we verify that we can
# move data onto and off of the GPU without messing it up. All data
# that comes from data_gen is row formatted and, with how pyspark
# currently works, when we do a collect all of that data has
# to be brought back to the CPU (as rows) to be returned.
# So we just need a very simple operation in the middle that
# can be done on the GPU.
def test_row_conversions():
    gens = [["a", byte_gen], ["b", short_gen], ["c", int_gen], ["d", long_gen],
            ["e", float_gen], ["f", double_gen], ["g", string_gen], ["h", boolean_gen],
            ["i", timestamp_gen], ["j", date_gen], ["k", ArrayGen(byte_gen)],
            ["l", ArrayGen(string_gen)], ["m", ArrayGen(float_gen)],
            ["n", ArrayGen(boolean_gen)], ["o", ArrayGen(ArrayGen(short_gen))]]
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again"))
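
One way to see the round trip the comment describes is to inspect the query plan with the plugin enabled. The exec names below are a hedged sketch of what spark-rapids typically reports, not part of the test itself:

    # assumes a SparkSession with the rapids plugin enabled and data_gen imported
    df = gen_df(spark, [["a", int_gen]]).selectExpr("*", "a as a_again")
    df.explain()
    # expect something like GpuRowToColumnar feeding GpuProject, with
    # GpuColumnarToRow bringing the rows back for collect()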
integration_tests/src/main/python/string_test.py (4 additions, 9 deletions)
@@ -23,19 +23,14 @@
 def mk_str_gen(pattern):
     return StringGen(pattern).with_special_case('').with_special_pattern('.{0,10}')
 
-# Because of limitations in array support we need to combine these two together to make
-# this work. This should be split up into separate tests once support is better.
-def test_split_with_array_index():
+def test_split():
     data_gen = mk_str_gen('([ABC]{0,3}_?){0,7}')
     delim = '_'
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).selectExpr(
-            'split(a, "AB")[0]',
-            'split(a, "_")[1]',
-            'split(a, "_")[null]',
-            'split(a, "_")[3]',
-            'split(a, "_")[0]',
-            'split(a, "_")[-1]'))
+            'split(a, "AB")',
+            'split(a, "C")',
+            'split(a, "_")'))
 
 @pytest.mark.parametrize('data_gen,delim', [(mk_str_gen('([ABC]{0,3}_?){0,7}'), '_'),
         (mk_str_gen('([MNP_]{0,3}\\.?){0,5}'), '.'),
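
The rewritten test returns whole arrays from split instead of indexing into them, which exercises exactly what this commit adds: list columns moving back from the GPU. For reference, split's semantics in plain PySpark (a sketch, no plugin required):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([("A_B_C",)], "a: string")
    print(df.selectExpr('split(a, "_")').first()[0])  # ['A', 'B', 'C']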
GpuColumnVector.java
@@ -39,6 +39,24 @@
  */
 public class GpuColumnVector extends GpuColumnVectorBase {
 
+  private static HostColumnVector.DataType convertFrom(DataType spark, boolean nullable) {
+    if (spark instanceof ArrayType) {
+      ArrayType arrayType = (ArrayType) spark;
+      return new HostColumnVector.ListType(nullable,
+          convertFrom(arrayType.elementType(), arrayType.containsNull()));
+    } else if (spark instanceof MapType) {
+      MapType mapType = (MapType) spark;
+      return new HostColumnVector.ListType(nullable,
+          new HostColumnVector.StructType(false, Arrays.asList(
+              convertFrom(mapType.keyType(), false),
+              convertFrom(mapType.valueType(), mapType.valueContainsNull())
+          )));
+    } else {
+      // Only works for basic types
+      return new HostColumnVector.BasicType(nullable, getRapidsType(spark));
+    }
+  }
+
   public static final class GpuColumnarBatchBuilder implements AutoCloseable {
     private final ai.rapids.cudf.HostColumnVector.ColumnBuilder[] builders;
     private final StructField[] fields;
@@ -60,17 +78,9 @@ public GpuColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch)
       try {
         for (int i = 0; i < len; i++) {
           StructField field = fields[i];
-          if (field.dataType() instanceof MapType) {
-            builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.ListType(true,
-                new HostColumnVector.StructType(true, Arrays.asList(
-                    new HostColumnVector.BasicType(true, DType.STRING),
-                    new HostColumnVector.BasicType(true, DType.STRING)))), rows);
-          } else {
-            DType type = getRapidsType(field);
-            builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.BasicType(true, type), rows);
-          }
-          success = true;
+          builders[i] = new HostColumnVector.ColumnBuilder(convertFrom(field.dataType(), field.nullable()), rows);
         }
+        success = true;
       } finally {
         if (!success) {
           for (ai.rapids.cudf.HostColumnVector.ColumnBuilder b: builders) {
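
To make the recursion concrete: cudf has no native map type, so convertFrom encodes MAP<K, V> as LIST<STRUCT<K, V>> with a non-nullable struct and non-nullable keys, and recurses normally through array element types. A small Python sketch (illustrative only, using a tuple-encoded stand-in for Spark's DataType tree) that prints the resulting cudf-side type:

    def cudf_type(spark_type, nullable):
        mark = '' if nullable else ' NOT NULL'
        kind = spark_type[0]
        if kind == 'array':
            _, elem, contains_null = spark_type
            return 'LIST<%s>%s' % (cudf_type(elem, contains_null), mark)
        if kind == 'map':
            _, key, value, value_contains_null = spark_type
            struct = 'STRUCT<%s, %s> NOT NULL' % (
                cudf_type(key, False), cudf_type(value, value_contains_null))
            return 'LIST<%s>%s' % (struct, mark)
        return kind.upper() + mark

    # MAP<STRING, ARRAY<INT>> with nullable values and elements:
    print(cudf_type(('map', ('string',), ('array', ('int',), True), True), True))
    # LIST<STRUCT<STRING NOT NULL, LIST<INT>> NOT NULL>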
