From cd3c436dafaa081d93705d26a0d31ed1c3ffb5b4 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Fri, 5 Mar 2021 01:34:47 -0800
Subject: [PATCH 01/26] Enable sort of structs

Signed-off-by: Gera Shegalov
---
 integration_tests/src/main/python/data_gen.py | 16 ++++++------
 .../src/main/python/sort_test.py              | 14 +++++------
 .../nvidia/spark/rapids/GpuOverrides.scala    | 25 ++++++++++++++++---
 .../com/nvidia/spark/rapids/GpuSortExec.scala | 11 ++++----
 4 files changed, 42 insertions(+), 24 deletions(-)

diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index ac1bc35d033..66e1d43ab00 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -752,6 +752,14 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
 all_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
         string_gen, boolean_gen, date_gen, timestamp_gen, null_gen]
 
+# all of the basic types in a single struct
+all_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(all_basic_gens)])
+
+# Some struct gens, but not all because of nesting
+struct_gens_sample = [all_basic_struct_gen,
+        StructGen([['child0', byte_gen]]),
+        StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]
+
 # TODO add in some array generators to this once that is supported for sorting
 # a selection of generators that should be orderable (sortable and compareable)
 orderable_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
@@ -784,14 +792,6 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
 # Some array gens, but not all because of nesting
 array_gens_sample = single_level_array_gens + nested_array_gens_sample
 
-# all of the basic types in a single struct
-all_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(all_basic_gens)])
-
-# Some struct gens, but not all because of nesting
-struct_gens_sample = [all_basic_struct_gen,
-        StructGen([['child0', byte_gen]]),
-        StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]
-
 simple_string_to_string_map_gen = MapGen(StringGen(pattern='key_[0-9]', nullable=False),
         StringGen(), max_length=10)
 
diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py
index 40874f10c2e..8c718fb1de2 100644
--- a/integration_tests/src/main/python/sort_test.py
+++ b/integration_tests/src/main/python/sort_test.py
@@ -27,7 +27,7 @@
                   DecimalGen(precision=7, scale=-3, nullable=False), DecimalGen(precision=7, scale=3, nullable=False),
                   DecimalGen(precision=7, scale=7, nullable=False), DecimalGen(precision=12, scale=2, nullable=False)]
 
-@pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
+@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
 @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
 def test_single_orderby(data_gen, order):
     assert_gpu_and_cpu_are_equal_collect(
@@ -35,12 +35,12 @@ def test_single_orderby(data_gen, order):
         conf = allow_negative_scale_of_decimal_conf)
 
 # SPARK CPU itself has issue with negative scale for take ordered and project
-orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)]
-@pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn)
-@pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
-def test_single_orderby_with_limit(data_gen, order):
-    assert_gpu_and_cpu_are_equal_collect(
-            lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))
+# orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)]
+# @pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn)
+# @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
+# def test_single_orderby_with_limit(data_gen, order):
+#     assert_gpu_and_cpu_are_equal_collect(
+#             lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))
 
 @pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
 @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 2e3501462e9..1686a76757f 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -412,6 +412,16 @@ object GpuOverrides {
     "\\S", "\\v", "\\V", "\\w", "\\w", "\\p", "$", "\\b", "\\B", "\\A", "\\G", "\\Z", "\\z",
     "\\R", "?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">")
 
+  private[this] val sortOrderTypeSigs = (
+    TypeSig.commonCudfTypes +
+    TypeSig.NULL +
+    TypeSig.DECIMAL +
+    TypeSig.STRUCT.nested(
+      TypeSig.commonCudfTypes +
+      TypeSig.NULL +
+      TypeSig.DECIMAL
+    ))
+
   def canRegexpBeTreatedLikeARegularString(strLit: UTF8String): Boolean = {
     val s = strLit.toString
     !regexList.exists(pattern => s.contains(pattern))
@@ -1752,16 +1762,23 @@ object GpuOverrides {
     expr[SortOrder](
       "Sort order",
       ExprChecks.projectOnly(
-        TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
+        sortOrderTypeSigs,
         TypeSig.orderable,
         Seq(ParamCheck(
           "input",
-          TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
+          sortOrderTypeSigs,
          TypeSig.orderable))),
-      (a, conf, p, r) => new BaseExprMeta[SortOrder](a, conf, p, r) {
+      (sortOrder, conf, p, r) => new BaseExprMeta[SortOrder](sortOrder, conf, p, r) {
+        override def tagExprForGpu(): Unit = {
+          val directionDefaultNullOrdering = sortOrder.direction.defaultNullOrdering
+          if (sortOrder.nullOrdering != directionDefaultNullOrdering) {
+            willNotWorkOnGpu(s"Only default null ordering $directionDefaultNullOrdering " +
+              s"supported. Found: ${sortOrder.nullOrdering}")
Found: ${sortOrder.nullOrdering}") + } + } // One of the few expressions that are not replaced with a GPU version override def convertToGpu(): Expression = - a.withNewChildren(childExprs.map(_.convertToGpu())) + sortOrder.withNewChildren(childExprs.map(_.convertToGpu())) }), expr[Count]( "Count aggregate operator", diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala index 4cffa3c3210..4dc236f0497 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala @@ -45,11 +45,12 @@ class GpuSortMeta( willNotWorkOnGpu("string literal values are not supported in a sort") } val sortOrderDataTypes = sort.sortOrder.map(_.dataType) - if (sortOrderDataTypes.exists(dtype => - dtype.isInstanceOf[ArrayType] || dtype.isInstanceOf[StructType] - || dtype.isInstanceOf[MapType])) { - willNotWorkOnGpu("Nested types in Sort Order are not supported") - } + sortOrderDataTypes.collect { + case at: ArrayType => at + case mt: MapType => mt + }.foreach(mapOrArrayDT => willNotWorkOnGpu( + s"Nested type $mapOrArrayDT in Sort Order is not supported") + ) } } From 00ba3ff175dd812c518e1e391cdaad29664da9db Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Fri, 5 Mar 2021 09:34:15 -0800 Subject: [PATCH 02/26] struct gens for test_single_orderby Signed-off-by: Gera Shegalov --- integration_tests/src/main/python/sort_test.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py index 8c718fb1de2..1a1d637a0d5 100644 --- a/integration_tests/src/main/python/sort_test.py +++ b/integration_tests/src/main/python/sort_test.py @@ -27,7 +27,7 @@ DecimalGen(precision=7, scale=-3, nullable=False), DecimalGen(precision=7, scale=3, nullable=False), DecimalGen(precision=7, scale=7, nullable=False), DecimalGen(precision=12, scale=2, nullable=False)] -@pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn) +@pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen + [all_basic_struct_gen], ids=idfn) @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn) def test_single_orderby(data_gen, order): assert_gpu_and_cpu_are_equal_collect( @@ -35,12 +35,12 @@ def test_single_orderby(data_gen, order): conf = allow_negative_scale_of_decimal_conf) # SPARK CPU itself has issue with negative scale for take ordered and project -# orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)] -# @pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn) -# @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn) -# def test_single_orderby_with_limit(data_gen, order): -# assert_gpu_and_cpu_are_equal_collect( -# lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100)) +orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)] +@pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn) +@pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn) +def 
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))
 
 @pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
 @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)

From 586f28f20dbe1621c743722627766e4439cf0014 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Fri, 5 Mar 2021 10:43:03 -0800
Subject: [PATCH 03/26] revert unneeded data_gen change

---
 integration_tests/src/main/python/data_gen.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index 66e1d43ab00..ac1bc35d033 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -752,14 +752,6 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
 all_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
         string_gen, boolean_gen, date_gen, timestamp_gen, null_gen]
 
-# all of the basic types in a single struct
-all_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(all_basic_gens)])
-
-# Some struct gens, but not all because of nesting
-struct_gens_sample = [all_basic_struct_gen,
-        StructGen([['child0', byte_gen]]),
-        StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]
-
 # TODO add in some array generators to this once that is supported for sorting
 # a selection of generators that should be orderable (sortable and compareable)
 orderable_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
@@ -784,6 +784,14 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
 # Some array gens, but not all because of nesting
 array_gens_sample = single_level_array_gens + nested_array_gens_sample
 
+# all of the basic types in a single struct
+all_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(all_basic_gens)])
+
+# Some struct gens, but not all because of nesting
+struct_gens_sample = [all_basic_struct_gen,
+        StructGen([['child0', byte_gen]]),
+        StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]
+
 simple_string_to_string_map_gen = MapGen(StringGen(pattern='key_[0-9]', nullable=False),
         StringGen(), max_length=10)

From c6407d09ff0c9012cae2194301d4f9911fd5a283 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Wed, 10 Mar 2021 22:23:32 -0800
Subject: [PATCH 04/26] interactive tests pass

Signed-off-by: Gera Shegalov
---
 dist/pom.xml                                  |  1 +
 docs/supported_ops.md                         |  6 ++---
 pom.xml                                       |  2 ++
 .../nvidia/spark/rapids/GpuOverrides.scala    |  3 ++-
 .../com/nvidia/spark/rapids/TypeChecks.scala  | 27 +++++++++++--------
 .../sql/rapids/datetimeExpressions.scala      |  5 ++--
 6 files changed, 27 insertions(+), 17 deletions(-)

diff --git a/dist/pom.xml b/dist/pom.xml
index 1c7b0505917..f0fad18fbe8 100644
--- a/dist/pom.xml
+++ b/dist/pom.xml
@@ -186,6 +186,7 @@
             dependency-reduced-pom.xml
             *pom.xml.asc
+            .*
 
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index ecd8573bdf5..cffda9e60d0 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -379,7 +379,7 @@ Accelerator supports are described below.
 NS
 NS
 NS
-NS
+PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)
 NS
@@ -12237,7 +12237,7 @@ Accelerator support is described below.
 NS
 NS
-NS
+PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)
 NS
@@ -12258,7 +12258,7 @@ Accelerator support is described below.
 NS
 NS
-NS
+PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)
 NS
diff --git a/pom.xml b/pom.xml
index 9c3c67c865d..bf35c3af2ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -573,6 +573,8 @@
             .git/**
             .pytest_cache/**
             .github/pull_request_template.md
+            .vscode/**
+            .metals/**
             **/*.md
             **/*.iml
             NOTICE-binary
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 3960c6b5e3c..79cd4aad5cf 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -1827,6 +1827,7 @@ object GpuOverrides {
             s"supported. Found: ${sortOrder.nullOrdering}")
           }
         }
+
         // One of the few expressions that are not replaced with a GPU version
         override def convertToGpu(): Expression =
           sortOrder.withNewChildren(childExprs.map(_.convertToGpu()))
@@ -2574,7 +2575,7 @@ object GpuOverrides {
       }),
     exec[TakeOrderedAndProjectExec](
       "Take the first limit elements as defined by the sortOrder, and do projection if needed.",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL, TypeSig.all),
+      ExecChecks(sortOrderTypeSigs, TypeSig.all),
      (takeExec, conf, p, r) =>
        new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) {
          val sortOrder: Seq[BaseExprMeta[SortOrder]] =
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
index 4a98c089e33..a0c4d200d97 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
@@ -542,18 +542,23 @@ class ExecChecks private(
   override def tag(meta: RapidsMeta[_, _, _]): Unit = {
     val plan = meta.wrapped.asInstanceOf[SparkPlan]
     val allowDecimal = meta.conf.decimalTypeEnabled
-    if (!check.areAllSupportedByPlugin(plan.output.map(_.dataType), allowDecimal)) {
-      val unsupported = plan.output.map(_.dataType)
-        .filter(!check.isSupportedByPlugin(_, allowDecimal))
-        .toSet
-      meta.willNotWorkOnGpu(s"unsupported data types in output: ${unsupported.mkString(", ")}")
+
+    val unsupportedOutputTypes = plan.output
+      .filterNot(attr => check.isSupportedByPlugin(attr.dataType, allowDecimal))
+      .toSet
+
+    if (unsupportedOutputTypes.nonEmpty) {
+      meta.willNotWorkOnGpu("unsupported data types in output: " +
+        unsupportedOutputTypes.mkString(", "))
     }
-    if (!check.areAllSupportedByPlugin(
-      plan.children.flatMap(_.output.map(_.dataType)),
-      allowDecimal)) {
-      val unsupported = plan.children.flatMap(_.output.map(_.dataType))
-        .filter(!check.isSupportedByPlugin(_, allowDecimal)).toSet
-      meta.willNotWorkOnGpu(s"unsupported data types in input: ${unsupported.mkString(", ")}")
+
+    val unsupportedInputTypes = plan.children.flatMap { childPlan =>
+      childPlan.output.filterNot(attr => check.isSupportedByPlugin(attr.dataType, allowDecimal))
+    }.toSet
+
+    if (unsupportedInputTypes.nonEmpty) {
+      meta.willNotWorkOnGpu("unsupported data types in input: " +
+        unsupportedInputTypes.mkString(", "))
     }
   }
 
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala
index fbeeb4f7ac8..14db49372c7 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala
@@ -135,6 +135,8 @@ abstract class GpuTimeMath(
   override lazy val resolved: Boolean = childrenResolved && checkInputDataTypes().isSuccess
 
+  protected val microSecondsInOneDay: Long = TimeUnit.DAYS.toMicros(1)
+
   override def columnarEval(batch: ColumnarBatch): Any = {
     var lhs: Any = null
     var rhs: Any = null
@@ -147,7 +149,7 @@ abstract class GpuTimeMath(
           if (intvl.months != 0) {
             throw new UnsupportedOperationException("Months aren't supported at the moment")
           }
-          val usToSub = intvl.days.toLong * 24 * 60 * 60 * 1000 * 1000 + intvl.microseconds
+          val usToSub = intvl.days * microSecondsInOneDay + intvl.microseconds
           if (usToSub != 0) {
             withResource(Scalar.fromLong(usToSub)) { us_s =>
               withResource(l.getBase.logicalCastTo(DType.INT64)) { us =>
@@ -230,7 +232,6 @@ case class GpuDateAddInterval(start: Expression,
           if (intvl.months != 0) {
             throw new UnsupportedOperationException("Months aren't supported at the moment")
           }
-          val microSecondsInOneDay = TimeUnit.DAYS.toMicros(1)
           val microSecToDays = if (intvl.microseconds < 0) {
             // This is to calculate when subtraction is performed. Need to take into account the
             // interval( which are less than days). Convert it into days which needs to be

From 88c8a9ae8a928545461a40b71c89f3de8944751e Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Thu, 11 Mar 2021 01:11:36 -0800
Subject: [PATCH 05/26] add integration tests

Signed-off-by: Gera Shegalov
---
 integration_tests/src/main/python/data_gen.py |  6 ++---
 .../src/main/python/sort_test.py              | 26 ++++++++++++++++++-
 2 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index ac1bc35d033..355aafb1df9 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -284,7 +284,7 @@ def start(self, rand):
 POS_FLOAT_NAN_MAX_VALUE = struct.unpack('f', struct.pack('I', 0x7fffffff))[0]
 class FloatGen(DataGen):
     """Generate floats, which some built in corner cases."""
-    def __init__(self, nullable=True, 
+    def __init__(self, nullable=True,
             no_nans=False, special_cases=None):
         self._no_nans = no_nans
         if special_cases is None:
@@ -319,7 +319,7 @@ def gen_float():
 POS_DOUBLE_NAN_MAX_VALUE = struct.unpack('d', struct.pack('L', 0x7fffffffffffffff))[0]
 class DoubleGen(DataGen):
     """Generate doubles, which some built in corner cases."""
-    def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False, 
+    def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False,
             nullable=True, special_cases = None):
         self._min_exp = min_exp
         self._max_exp = max_exp
@@ -432,7 +432,7 @@ def __init__(self, start=None, end=None, nullable=True):
 
         self._start_day = self._to_days_since_epoch(start)
         self._end_day = self._to_days_since_epoch(end)
-        
+
         self.with_special_case(start)
         self.with_special_case(end)
 
diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py
index 360ee3015c9..634e206b847 100644
--- a/integration_tests/src/main/python/sort_test.py
+++ b/integration_tests/src/main/python/sort_test.py
@@ -27,13 +27,37 @@
                   DecimalGen(precision=7, scale=-3, nullable=False), DecimalGen(precision=7, scale=3, nullable=False),
                   DecimalGen(precision=7, scale=7, nullable=False), DecimalGen(precision=12, scale=2, nullable=False)]
 
-@pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen + [all_basic_struct_gen], ids=idfn)
+@pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
 @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
 def test_single_orderby(data_gen, order):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).orderBy(order),
         conf = allow_negative_scale_of_decimal_conf)
 
+@pytest.mark.parametrize('data_gen', [
+    pytest.param(all_basic_struct_gen),
+    pytest.param(StructGen([['child0', all_basic_struct_gen]]),
+        marks=pytest.mark.xfail(reason='second-level structs are not supported')),
+    pytest.param(ArrayGen(string_gen),
+        marks=pytest.mark.xfail(reason="arrays are not supported")),
+    pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
+        marks=pytest.mark.xfail(reason="maps are not supported")),
+], ids=idfn)
+@pytest.mark.parametrize('order', [
+    pytest.param(f.col('a').asc()),
+    pytest.param(f.col('a').asc_nulls_first()),
+    pytest.param(f.col('a').asc_nulls_last(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc()),
+    pytest.param(f.col('a').desc_nulls_first(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc_nulls_last()),
+], ids=idfn)
+def test_single_nested_order_orderby(data_gen, order):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, data_gen).orderBy(order),
+        conf = allow_negative_scale_of_decimal_conf)
+
 # SPARK CPU itself has issue with negative scale for take ordered and project
 orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)]
 @pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn)

From ddd60e48d9ea063e800c44b6937c5ab56f4cf1d2 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Thu, 11 Mar 2021 11:20:04 -0800
Subject: [PATCH 06/26] revert GpuTimeMath refactor

Signed-off-by: Gera Shegalov
---
 .../org/apache/spark/sql/rapids/datetimeExpressions.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala
index 14db49372c7..fbeeb4f7ac8 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala
@@ -135,8 +135,6 @@ abstract class GpuTimeMath(
   override lazy val resolved: Boolean = childrenResolved && checkInputDataTypes().isSuccess
 
-  protected val microSecondsInOneDay: Long = TimeUnit.DAYS.toMicros(1)
-
   override def columnarEval(batch: ColumnarBatch): Any = {
     var lhs: Any = null
     var rhs: Any = null
@@ -149,7 +147,7 @@ abstract class GpuTimeMath(
           if (intvl.months != 0) {
             throw new UnsupportedOperationException("Months aren't supported at the moment")
           }
-          val usToSub = intvl.days * microSecondsInOneDay + intvl.microseconds
+          val usToSub = intvl.days.toLong * 24 * 60 * 60 * 1000 * 1000 + intvl.microseconds
          if (usToSub != 0) {
            withResource(Scalar.fromLong(usToSub)) { us_s =>
              withResource(l.getBase.logicalCastTo(DType.INT64)) { us =>
@@ -232,6 +230,7 @@ case class GpuDateAddInterval(start: Expression,
           if (intvl.months != 0) {
             throw new UnsupportedOperationException("Months aren't supported at the moment")
           }
+          val microSecondsInOneDay = TimeUnit.DAYS.toMicros(1)
           val microSecToDays = if (intvl.microseconds < 0) {
             // This is to calculate when subtraction is performed. Need to take into account the
             // interval( which are less than days). Convert it into days which needs to be

From ad934b2aa86bed028d20e54215d19a79705b7de0 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Thu, 11 Mar 2021 16:29:38 -0800
Subject: [PATCH 07/26] add tests for limit

---
 .../src/main/python/sort_test.py | 25 ++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py
index 634e206b847..10edc497f34 100644
--- a/integration_tests/src/main/python/sort_test.py
+++ b/integration_tests/src/main/python/sort_test.py
@@ -53,7 +53,7 @@ def test_single_orderby(data_gen, order):
         marks=pytest.mark.xfail(reason='opposite null order not supported')),
     pytest.param(f.col('a').desc_nulls_last()),
 ], ids=idfn)
-def test_single_nested_order_orderby(data_gen, order):
+def test_single_nested_orderby(data_gen, order):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).orderBy(order),
         conf = allow_negative_scale_of_decimal_conf)
@@ -66,6 +66,29 @@ def test_single_orderby_with_limit(data_gen, order):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))
 
+@pytest.mark.parametrize('data_gen', [
+    pytest.param(all_basic_struct_gen),
+    pytest.param(StructGen([['child0', all_basic_struct_gen]]),
+        marks=pytest.mark.xfail(reason='second-level structs are not supported')),
+    pytest.param(ArrayGen(string_gen),
+        marks=pytest.mark.xfail(reason="arrays are not supported")),
+    pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
+        marks=pytest.mark.xfail(reason="maps are not supported")),
+], ids=idfn)
+@pytest.mark.parametrize('order', [
+    pytest.param(f.col('a').asc()),
+    pytest.param(f.col('a').asc_nulls_first()),
+    pytest.param(f.col('a').asc_nulls_last(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc()),
+    pytest.param(f.col('a').desc_nulls_first(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc_nulls_last()),
+], ids=idfn)
+def test_single_nested_orderby_with_limit(data_gen, order):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))
+
 @pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
 @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
 def test_single_sort_in_part(data_gen, order):

From 5910d1ae7b8904a50ddb44234c55fb72ab2901a3 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Thu, 11 Mar 2021 16:49:56 -0800
Subject: [PATCH 08/26] Consolidate RAT settings in parent pom

- Create a single config for RAT in parent.
- Update RAT exclusion list to filter out hidden directories

Signed-off-by: Gera Shegalov
---
 dist/pom.xml              |  6 ------
 integration_tests/pom.xml |  8 --------
 pom.xml                   | 14 +++++---------
 sql-plugin/pom.xml        |  8 --------
 tests-spark310+/pom.xml   |  6 ------
 tests/pom.xml             |  7 -------
 6 files changed, 5 insertions(+), 44 deletions(-)

diff --git a/dist/pom.xml b/dist/pom.xml
index 1c7b0505917..6c23d2278db 100644
--- a/dist/pom.xml
+++ b/dist/pom.xml
@@ -182,12 +182,6 @@
         org.apache.rat
         apache-rat-plugin
-
-
-          dependency-reduced-pom.xml
-          *pom.xml.asc
-
-
 
diff --git a/integration_tests/pom.xml b/integration_tests/pom.xml
index b7925a7b582..cf95ae0b56d 100644
--- a/integration_tests/pom.xml
+++ b/integration_tests/pom.xml
@@ -153,14 +153,6 @@
         org.apache.rat
         apache-rat-plugin
-
-
-          src/test/resources/**
-          **/*.md
-          .pytest_cache/**
-          rmm_log.txt
-
-
         org.codehaus.mojo
diff --git a/pom.xml b/pom.xml
index 9c3c67c865d..9f4ecd40209 100644
--- a/pom.xml
+++ b/pom.xml
@@ -566,25 +566,21 @@
         apache-rat-plugin
-            .docker/**
-            .download/**
-            .gitattributes
-            .gitignore
-            .git/**
-            .pytest_cache/**
-            .github/pull_request_template.md
             **/*.md
             **/*.iml
             NOTICE-binary
             docs/dev/idea-code-style-settings.xml
-            **/.m2/**
-            .gnupg/**
             pom.xml.asc
             jenkins/databricks/*.patch
             *.jar
             docs/demo/**/*.ipynb
             docs/demo/**/*.zpln
             **/src/main/resources/META-INF/services/*
+            **/src/test/resources/**
+            rmm_log.txt
+            dependency-reduced-pom.xml
+            **/.*/**
+            src/main/java/com/nvidia/spark/rapids/format/*
             **/target/**/*
diff --git a/sql-plugin/pom.xml b/sql-plugin/pom.xml
index bf293e2800a..61496834992 100644
--- a/sql-plugin/pom.xml
+++ b/sql-plugin/pom.xml
@@ -148,14 +148,6 @@
         org.apache.rat
         apache-rat-plugin
-
-
-          dependency-reduced-pom.xml
-          src/main/format/README.md
-          src/main/java/com/nvidia/spark/rapids/format/*
-          src/test/resources/**
-
-
 
diff --git a/tests-spark310+/pom.xml b/tests-spark310+/pom.xml
index e05a532377f..567a705cc39 100644
--- a/tests-spark310+/pom.xml
+++ b/tests-spark310+/pom.xml
@@ -86,12 +86,6 @@
         org.apache.rat
         apache-rat-plugin
-
-
-          src/test/resources/**
-          **/*.md
-
-
 
diff --git a/tests/pom.xml b/tests/pom.xml
index 0bd1229e09a..826efacb78a 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -110,13 +110,6 @@
         org.apache.rat
         apache-rat-plugin
-
-
-          src/test/resources/**
-          **/*.md
-          rmm_log.txt
-
-
 

From 33762de38a7f699b985d534d138da2f22c268d3d Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Fri, 12 Mar 2021 12:23:28 -0800
Subject: [PATCH 09/26] Enable saving data for debugging

Signed-off-by: Gera Shegalov
---
 integration_tests/src/main/python/data_gen.py | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index 355aafb1df9..49a9d3019c8 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -637,9 +637,16 @@ def gen_scalar_value(data_gen, seed=0, force_no_nulls=False):
     v = list(gen_scalar_values(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls))
     return v[0]
 
-def debug_df(df):
-    """print out the contents of a dataframe for debugging."""
-    print('COLLECTED\n{}'.format(df.collect()))
+def debug_df(df, path = None, file_format = 'json', num_parts = 1):
+    if path is not None:
+        file_name = f"{path}.{file_format}"
+        df.coalesce(1).write.json(file_name)
+        print(f"SAVED df output for debugging at {file_name}")
+
+    if path is None:
+        """print out the contents of a dataframe for debugging."""
+        print('COLLECTED\n{}'.format(df.collect()))
+
     df.explain()
     df.printSchema()
     return df

From efbfa3f8f9696b740f2eca567961b71d33b2d39d Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Fri, 12 Mar 2021 13:34:08 -0800
Subject: [PATCH 10/26] Save schema for debugging

Signed-off-by: Gera Shegalov
---
 integration_tests/src/main/python/data_gen.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index 49a9d3019c8..47e80fab785 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -640,9 +640,18 @@ def gen_scalar_value(data_gen, seed=0, force_no_nulls=False):
 def debug_df(df, path = None, file_format = 'json', num_parts = 1):
     if path is not None:
         file_name = f"{path}.{file_format}"
+        schema_file_name = f"{path}.schema.json"
+
         df.coalesce(1).write.json(file_name)
         print(f"SAVED df output for debugging at {file_name}")
 
+        schema_json = df.schema.json()
+        schema_file = open(schema_file_name , 'w')
+        schema_file.write(schema_json)
+        schema_file.close()
+        print(f"SAVED df schema for debugging along in the output dir")
+
+
     if path is None:
         """print out the contents of a dataframe for debugging."""
         print('COLLECTED\n{}'.format(df.collect()))

From 21a05c7e4a0f8cde36296eea421a9a982711baf2 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Fri, 12 Mar 2021 17:57:35 -0800
Subject: [PATCH 11/26] wip

---
 .../src/main/python/sort_test.py | 28 +++++++++----------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py
index 10edc497f34..f07d91f78b1 100644
--- a/integration_tests/src/main/python/sort_test.py
+++ b/integration_tests/src/main/python/sort_test.py
@@ -68,26 +68,26 @@ def test_single_orderby_with_limit(data_gen, order):
 
 @pytest.mark.parametrize('data_gen', [
     pytest.param(all_basic_struct_gen),
-    pytest.param(StructGen([['child0', all_basic_struct_gen]]),
-        marks=pytest.mark.xfail(reason='second-level structs are not supported')),
-    pytest.param(ArrayGen(string_gen),
-        marks=pytest.mark.xfail(reason="arrays are not supported")),
-    pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
-        marks=pytest.mark.xfail(reason="maps are not supported")),
+    # pytest.param(StructGen([['child0', all_basic_struct_gen]]),
+    #     marks=pytest.mark.xfail(reason='second-level structs are not supported')),
+    # pytest.param(ArrayGen(string_gen),
+    #     marks=pytest.mark.xfail(reason="arrays are not supported")),
+    # pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
+    #     marks=pytest.mark.xfail(reason="maps are not supported")),
 ], ids=idfn)
 @pytest.mark.parametrize('order', [
-    pytest.param(f.col('a').asc()),
-    pytest.param(f.col('a').asc_nulls_first()),
-    pytest.param(f.col('a').asc_nulls_last(),
-        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    # pytest.param(f.col('a').asc()),
+    # pytest.param(f.col('a').asc_nulls_first()),
+    # pytest.param(f.col('a').asc_nulls_last(),
+    #     marks=pytest.mark.xfail(reason='opposite null order not supported')),
     pytest.param(f.col('a').desc()),
-    pytest.param(f.col('a').desc_nulls_first(),
-        marks=pytest.mark.xfail(reason='opposite null order not supported')),
-    pytest.param(f.col('a').desc_nulls_last()),
+    # pytest.param(f.col('a').desc_nulls_first(),
+    #     marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    # pytest.param(f.col('a').desc_nulls_last()),
 ], ids=idfn)
 def test_single_nested_orderby_with_limit(data_gen, order):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))
+        lambda spark : debug_df(unary_op_df(spark, data_gen), 'trouble-input').orderBy(order).limit(100))
 
 @pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
 @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)

From 1dd4b235ffb39760e3ffdf902ab39815f8a55343 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Sat, 13 Mar 2021 13:15:33 -0800
Subject: [PATCH 12/26] cleanup

---
 docs/supported_ops.md                                         | 4 ++--
 integration_tests/src/main/python/sort_test.py                | 2 +-
 .../src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala | 3 ++-
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index cffda9e60d0..3058c720828 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -144,12 +144,12 @@ Accelerator supports are described below.
 S*
 S
 S*
+S
 NS
 NS
 NS
 NS
-NS
-NS
+PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)
 NS
diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py
index f07d91f78b1..475c6c0a03b 100644
--- a/integration_tests/src/main/python/sort_test.py
+++ b/integration_tests/src/main/python/sort_test.py
@@ -87,7 +87,7 @@ def test_single_orderby_with_limit(data_gen, order):
 ], ids=idfn)
 def test_single_nested_orderby_with_limit(data_gen, order):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : debug_df(unary_op_df(spark, data_gen), 'trouble-input').orderBy(order).limit(100))
+        lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))
 
 @pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
 @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 8221807933f..2c15888d947 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -423,6 +423,7 @@ object GpuOverrides {
     "\\S", "\\v", "\\V", "\\w", "\\w", "\\p", "$", "\\b", "\\B", "\\A", "\\G", "\\Z", "\\z",
     "\\R", "?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">")
 
+  // TODO need a better name since used in multiple contexts
   private[this] val sortOrderTypeSigs = (
     TypeSig.commonCudfTypes +
     TypeSig.NULL +
@@ -2628,7 +2629,7 @@ object GpuOverrides {
       }),
     exec[CollectLimitExec](
       "Reduce to single partition and apply limit",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL, TypeSig.all),
+      ExecChecks(sortOrderTypeSigs, TypeSig.all),
      (collectLimitExec, conf, p, r) => new GpuCollectLimitMeta(collectLimitExec, conf, p, r))
      .disabledByDefault("Collect Limit replacement can be slower on the GPU, if huge number " +
        "of rows in a batch it could help by limiting the number of rows transferred from " +

From b6f44da639d319e0e2c15a0511da44c029caac62 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Sat, 13 Mar 2021 13:25:56 -0800
Subject: [PATCH 13/26] comment

---
 integration_tests/src/main/python/data_gen.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index 47e80fab785..220e25faee8 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -639,6 +639,9 @@ def gen_scalar_value(data_gen, seed=0, force_no_nulls=False):
 
 def debug_df(df, path = None, file_format = 'json', num_parts = 1):
     if path is not None:
+        # Save the dataframe and its schema
+        # The schema can be re-created by using DataType.fromJson and used
+        # for loading the dataframe
         file_name = f"{path}.{file_format}"
         schema_file_name = f"{path}.schema.json"

From 1f4514dc4c250c6849af170fc29dbba354082264 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Sun, 14 Mar 2021 22:23:47 -0700
Subject: [PATCH 14/26] uncomment test params

---
 .../src/main/python/sort_test.py | 26 +++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py
index 475c6c0a03b..10edc497f34 100644
--- a/integration_tests/src/main/python/sort_test.py
+++ b/integration_tests/src/main/python/sort_test.py
@@ -68,22 +68,22 @@ def test_single_orderby_with_limit(data_gen, order):
 
 @pytest.mark.parametrize('data_gen', [
     pytest.param(all_basic_struct_gen),
-    # pytest.param(StructGen([['child0', all_basic_struct_gen]]),
-    #     marks=pytest.mark.xfail(reason='second-level structs are not supported')),
-    # pytest.param(ArrayGen(string_gen),
-    #     marks=pytest.mark.xfail(reason="arrays are not supported")),
-    # pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
-    #     marks=pytest.mark.xfail(reason="maps are not supported")),
+    pytest.param(StructGen([['child0', all_basic_struct_gen]]),
+        marks=pytest.mark.xfail(reason='second-level structs are not supported')),
+    pytest.param(ArrayGen(string_gen),
+        marks=pytest.mark.xfail(reason="arrays are not supported")),
+    pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
+        marks=pytest.mark.xfail(reason="maps are not supported")),
 ], ids=idfn)
 @pytest.mark.parametrize('order', [
-    # pytest.param(f.col('a').asc()),
-    # pytest.param(f.col('a').asc_nulls_first()),
-    # pytest.param(f.col('a').asc_nulls_last(),
-    #     marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').asc()),
+    pytest.param(f.col('a').asc_nulls_first()),
+    pytest.param(f.col('a').asc_nulls_last(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
     pytest.param(f.col('a').desc()),
-    # pytest.param(f.col('a').desc_nulls_first(),
-    #     marks=pytest.mark.xfail(reason='opposite null order not supported')),
-    # pytest.param(f.col('a').desc_nulls_last()),
+    pytest.param(f.col('a').desc_nulls_first(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc_nulls_last()),
 ], ids=idfn)
 def test_single_nested_orderby_with_limit(data_gen, order):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))

From 327e1005caad4f39db19b21a6cd845ae6d95f416 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Tue, 16 Mar 2021 07:27:47 -0700
Subject: [PATCH 15/26] Bobby's review:

- fixed typesigs
- use forgotten test params
---
 docs/supported_ops.md                         | 20 +++++++++----------
 integration_tests/src/main/python/data_gen.py |  9 ++++-----
 .../nvidia/spark/rapids/GpuOverrides.scala    | 18 +++++++++++------
 .../com/nvidia/spark/rapids/TypeChecks.scala  | 13 ++++++++++++
 4 files changed, 39 insertions(+), 21 deletions(-)
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 3058c720828..f51c3491c12 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -145,11 +145,11 @@ Accelerator supports are described below.
 S
 S*
 S
-NS
-NS
-NS
-NS
-PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)
+S
+S
+S*
+S*
+S*
 NS
@@ -375,11 +375,11 @@ Accelerator supports are described below.
 S
 S*
 S
-NS
-NS
-NS
-NS
-PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)
+S
+S
+S*
+S*
+S*
 NS
diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index 220e25faee8..e299dbce8b2 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -638,6 +638,8 @@ def gen_scalar_value(data_gen, seed=0, force_no_nulls=False):
     return v[0]
 
 def debug_df(df, path = None, file_format = 'json', num_parts = 1):
+    """Print out or save the contents and the schema of a dataframe for debugging."""
+
     if path is not None:
         # Save the dataframe and its schema
         # The schema can be re-created by using DataType.fromJson and used
         # for loading the dataframe
         file_name = f"{path}.{file_format}"
         schema_file_name = f"{path}.schema.json"
 
-        df.coalesce(1).write.json(file_name)
+        df.coalesce(num_parts).write.format(file_format).save(file_name)
         print(f"SAVED df output for debugging at {file_name}")
 
         schema_json = df.schema.json()
@@ -653,10 +655,7 @@ def debug_df(df, path = None, file_format = 'json', num_parts = 1):
         schema_file.write(schema_json)
         schema_file.close()
         print(f"SAVED df schema for debugging along in the output dir")
-
-
-    if path is None:
-        """print out the contents of a dataframe for debugging."""
+    else:
         print('COLLECTED\n{}'.format(df.collect()))
 
     df.explain()
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 2c15888d947..1cc0d9f1797 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -424,7 +424,7 @@ object GpuOverrides {
     "?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">")
 
   // TODO need a better name since used in multiple contexts
-  private[this] val sortOrderTypeSigs = (
+  private[this] val pluginSupportedOrderableSig = (
     TypeSig.commonCudfTypes +
     TypeSig.NULL +
     TypeSig.DECIMAL +
@@ -434,6 +434,11 @@ object GpuOverrides {
       TypeSig.DECIMAL
     ))
 
+  private[this] def isNestedType(dataType: DataType) = dataType match {
+    case ArrayType(_, _) | MapType(_, _, _) | StructType(_) => true
+    case _ => false
+  }
+
   // this listener mechanism is global and is intended for use by unit tests only
   private val listeners: ListBuffer[GpuOverridesListener] =
     new ListBuffer[GpuOverridesListener]()
@@ -1803,16 +1808,17 @@ object GpuOverrides {
     expr[SortOrder](
       "Sort order",
       ExprChecks.projectOnly(
-        sortOrderTypeSigs,
+        pluginSupportedOrderableSig,
         TypeSig.orderable,
         Seq(ParamCheck(
           "input",
-          sortOrderTypeSigs,
+          pluginSupportedOrderableSig,
          TypeSig.orderable))),
      (sortOrder, conf, p, r) => new BaseExprMeta[SortOrder](sortOrder, conf, p, r) {
        override def tagExprForGpu(): Unit = {
          val directionDefaultNullOrdering = sortOrder.direction.defaultNullOrdering
-          if (sortOrder.nullOrdering != directionDefaultNullOrdering) {
+          if (isNestedType(sortOrder.dataType) &&
+            sortOrder.nullOrdering != directionDefaultNullOrdering) {
willNotWorkOnGpu(s"Only default null ordering $directionDefaultNullOrdering " + s"supported. Found: ${sortOrder.nullOrdering}") } @@ -2565,7 +2571,7 @@ object GpuOverrides { }), exec[TakeOrderedAndProjectExec]( "Take the first limit elements as defined by the sortOrder, and do projection if needed.", - ExecChecks(sortOrderTypeSigs, TypeSig.all), + ExecChecks(TypeSig.all - TypeSig.UDT, TypeSig.all), (takeExec, conf, p, r) => new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) { val sortOrder: Seq[BaseExprMeta[SortOrder]] = @@ -2629,7 +2635,7 @@ object GpuOverrides { }), exec[CollectLimitExec]( "Reduce to single partition and apply limit", - ExecChecks(sortOrderTypeSigs, TypeSig.all), + ExecChecks(TypeSig.all - TypeSig.UDT, TypeSig.all), (collectLimitExec, conf, p, r) => new GpuCollectLimitMeta(collectLimitExec, conf, p, r)) .disabledByDefault("Collect Limit replacement can be slower on the GPU, if huge number " + "of rows in a batch it could help by limiting the number of rows transferred from " + diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index a0c4d200d97..1f76280dabf 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -170,6 +170,19 @@ final class TypeSig private( new TypeSig(it, nt, lt, nts) } + /** + * Remove a type signature. The reverse of + + * @param other what to remove + * @return the new signature + */ + def - (other: TypeSig): TypeSig = { + val it = initialTypes -- other.initialTypes + val nt = nestedTypes -- other.nestedTypes + val lt = litOnlyTypes -- other.litOnlyTypes + val nts = notes -- other.notes.keySet + new TypeSig(it, nt, lt, nts) + } + /** * Add nested types to this type signature. Note that these do not stack so if nesting has * nested types too they are ignored. From 205b7c3e3cd4097425f246fa4f4833523997b6a6 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Tue, 16 Mar 2021 11:52:53 -0700 Subject: [PATCH 16/26] Bobby's review #2 --- .../main/scala/com/nvidia/spark/rapids/GpuOverrides.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 1cc0d9f1797..9df03dfecea 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -423,7 +423,6 @@ object GpuOverrides { "\\S", "\\v", "\\V", "\\w", "\\w", "\\p", "$", "\\b", "\\B", "\\A", "\\G", "\\Z", "\\z", "\\R", "?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">") - // TODO need a better name since used in multiple contexts private[this] val pluginSupportedOrderableSig = ( TypeSig.commonCudfTypes + TypeSig.NULL + @@ -1820,7 +1819,7 @@ object GpuOverrides { if (isNestedType(sortOrder.dataType) && sortOrder.nullOrdering != directionDefaultNullOrdering) { willNotWorkOnGpu(s"Only default null ordering $directionDefaultNullOrdering " + - s"supported. Found: ${sortOrder.nullOrdering}") + s"supported for nested types. Found: ${sortOrder.nullOrdering}") } } @@ -2704,8 +2703,7 @@ object GpuOverrides { "The backend for the sort operator", // The SortOrder TypeSig will govern what types can actually be used as sorting key data type. // The types below are allowed as inputs and outputs. 
-      ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
-        TypeSig.STRUCT).nested(), TypeSig.all),
+      ExecChecks(TypeSig.all - TypeSig.UDT, TypeSig.all),
      (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r)),
    exec[ExpandExec](
      "The backend for the expand operator",

From 9f01f85bc38a963e3cf19bd227d71ab86b99e33a Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Tue, 16 Mar 2021 12:27:53 -0700
Subject: [PATCH 17/26] proper handling of UDT nested

---
 .../main/scala/com/nvidia/spark/rapids/GpuOverrides.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 9df03dfecea..776622b779f 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -2570,7 +2570,7 @@ object GpuOverrides {
       }),
     exec[TakeOrderedAndProjectExec](
       "Take the first limit elements as defined by the sortOrder, and do projection if needed.",
-      ExecChecks(TypeSig.all - TypeSig.UDT, TypeSig.all),
+      ExecChecks(TypeSig.all - TypeSig.UDT.nested(), TypeSig.all),
      (takeExec, conf, p, r) =>
        new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) {
          val sortOrder: Seq[BaseExprMeta[SortOrder]] =
@@ -2634,7 +2634,7 @@ object GpuOverrides {
       }),
     exec[CollectLimitExec](
       "Reduce to single partition and apply limit",
-      ExecChecks(TypeSig.all - TypeSig.UDT, TypeSig.all),
+      ExecChecks(TypeSig.all - TypeSig.UDT.nested(), TypeSig.all),
      (collectLimitExec, conf, p, r) => new GpuCollectLimitMeta(collectLimitExec, conf, p, r))
      .disabledByDefault("Collect Limit replacement can be slower on the GPU, if huge number " +
        "of rows in a batch it could help by limiting the number of rows transferred from " +
@@ -2703,7 +2703,7 @@ object GpuOverrides {
       "The backend for the sort operator",
      // The SortOrder TypeSig will govern what types can actually be used as sorting key data type.
      // The types below are allowed as inputs and outputs.
-      ExecChecks(TypeSig.all - TypeSig.UDT, TypeSig.all),
+      ExecChecks(TypeSig.all - TypeSig.UDT.nested(), TypeSig.all),
      (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r)),
    exec[ExpandExec](
      "The backend for the expand operator",

From ff313ac78ea1ac7ec4235e32919ad5bcf1ea9374 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Tue, 16 Mar 2021 13:28:19 -0700
Subject: [PATCH 18/26] doc update

---
 docs/supported_ops.md | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index f51c3491c12..85c236d9ad1 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -147,9 +147,9 @@ Accelerator supports are described below.
 S
 S
 S
-S*
-S*
-S*
+PS* (missing nested UDT)
+PS* (missing nested UDT)
+PS* (missing nested UDT)
 NS
@@ -352,11 +352,11 @@ Accelerator supports are described below.
 S
 S*
 S
-NS
-NS
-PS* (missing nested BINARY, CALENDAR, MAP, UDT)
-NS
-PS* (missing nested BINARY, CALENDAR, MAP, UDT)
+S
+S
+PS* (missing nested UDT)
+PS* (missing nested UDT)
+PS* (missing nested UDT)
 NS
@@ -377,9 +377,9 @@ Accelerator supports are described below.
 S
 S
 S
-S*
-S*
-S*
+PS* (missing nested UDT)
+PS* (missing nested UDT)
+PS* (missing nested UDT)
 NS

From d0ba59bb0dc1f571b8ae07f448fe3a7c57034485 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Mon, 22 Mar 2021 19:12:43 -0700
Subject: [PATCH 19/26] workaround upper/lower_bound unsupported

---
 .../src/main/python/sort_test.py | 25 ++++++++++++++++---
 1 file changed, 21 insertions(+), 4 deletions(-)

diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py
index 10edc497f34..b1dee9d67cd 100644
--- a/integration_tests/src/main/python/sort_test.py
+++ b/integration_tests/src/main/python/sort_test.py
@@ -34,6 +34,12 @@ def test_single_orderby(data_gen, order):
         lambda spark : unary_op_df(spark, data_gen).orderBy(order),
         conf = allow_negative_scale_of_decimal_conf)
 
+@pytest.mark.parametrize('shuffle_parts', [
+    pytest.param(1),
+    # This xfail would normally work but device-side asserts currently lead to memory
+    # leaks, and they cause persistent bad state and good tests start failing
+    # pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
+])
 @pytest.mark.parametrize('data_gen', [
     pytest.param(all_basic_struct_gen),
     pytest.param(StructGen([['child0', all_basic_struct_gen]]),
@@ -53,10 +59,14 @@ def test_single_orderby(data_gen, order):
         marks=pytest.mark.xfail(reason='opposite null order not supported')),
     pytest.param(f.col('a').desc_nulls_last()),
 ], ids=idfn)
-def test_single_nested_orderby(data_gen, order):
+def test_single_nested_orderby_plain(data_gen, order, shuffle_parts):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).orderBy(order),
-        conf = allow_negative_scale_of_decimal_conf)
+        # TODO no interference with range partition once implemented
+        conf = {
+            **allow_negative_scale_of_decimal_conf,
+            **{'spark.sql.shuffle.partitions': shuffle_parts}
+        })
 
 # SPARK CPU itself has issue with negative scale for take ordered and project
 orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)]
@@ -66,6 +76,12 @@ def test_single_orderby_with_limit(data_gen, order):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))
 
+@pytest.mark.parametrize('shuffle_parts', [
+    pytest.param(1),
+    # This xfail would normally work but device-side asserts currently lead to memory
+    # leaks, and they cause persistent bad state and good tests start failing
+    # pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
+])
 @pytest.mark.parametrize('data_gen', [
     pytest.param(all_basic_struct_gen),
     pytest.param(StructGen([['child0', all_basic_struct_gen]]),
@@ -85,9 +101,10 @@ def test_single_orderby_with_limit(data_gen, order):
         marks=pytest.mark.xfail(reason='opposite null order not supported')),
     pytest.param(f.col('a').desc_nulls_last()),
 ], ids=idfn)
-def test_single_nested_orderby_with_limit(data_gen, order):
+def test_single_nested_orderby_with_limit(data_gen, order, shuffle_parts):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))
+        lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100),
+        conf = {'spark.sql.shuffle.partitions': shuffle_parts})
 
 @pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
 @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)

From 7b57913f6fa4056bdd76faed7bb901465f8ac63a Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Tue, 23 Mar 2021 13:07:44 -0700
Subject: [PATCH 20/26] Restrict struct-sort to a single partition

- prevents device asserts for cudf upper/lower_bound
---
 integration_tests/src/main/python/sort_test.py |  8 ++------
 .../rapids/shims/spark300/Spark300Shims.scala  | 14 ++++++++++++++
 .../com/nvidia/spark/rapids/GpuOverrides.scala | 16 +++++++++++-----
 .../com/nvidia/spark/rapids/SparkShims.scala   |  2 ++
 4 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py
index b1dee9d67cd..e574457a000 100644
--- a/integration_tests/src/main/python/sort_test.py
+++ b/integration_tests/src/main/python/sort_test.py
@@ -36,9 +36,7 @@ def test_single_orderby(data_gen, order):
 
 @pytest.mark.parametrize('shuffle_parts', [
     pytest.param(1),
-    # This xfail would normally work but device-side asserts currently lead to memory
-    # leaks, and they cause persistent bad state and good tests start failing
-    # pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
+    pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
 ])
 @pytest.mark.parametrize('data_gen', [
     pytest.param(all_basic_struct_gen),
@@ -78,9 +76,7 @@ def test_single_orderby_with_limit(data_gen, order):
 
 @pytest.mark.parametrize('shuffle_parts', [
     pytest.param(1),
-    # This xfail would normally work but device-side asserts currently lead to memory
-    # leaks, and they cause persistent bad state and good tests start failing
-    # pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
+    pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
 ])
 @pytest.mark.parametrize('data_gen', [
     pytest.param(all_basic_struct_gen),
diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
index 39df892a1f2..6896d863438 100644
--- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
+++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
@@ -52,6 +52,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExc
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.python.WindowInPandasExec
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.{COALESCE_PARTITIONS_INITIAL_PARTITION_NUM, SHUFFLE_PARTITIONS}
 import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub, ShuffleManagerShimBase}
 import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase}
 import org.apache.spark.sql.rapids.execution.python.GpuWindowInPandasExecMetaBase
@@ -625,4 +626,17 @@ class Spark300Shims extends SparkShims {
   override def hasAliasQuoteFix: Boolean = false
 
   override def hasCastFloatTimestampUpcast: Boolean = false
+
+  // https://github.com/apache/spark/blob/39542bb81f8570219770bb6533c077f44f6cbd2a/
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L3329 + override def numShufflePartitions: Int = { + val sqlConf = SQLConf.get + val defaultPartitions = sqlConf.getConf(SHUFFLE_PARTITIONS) + if (sqlConf.adaptiveExecutionEnabled && sqlConf.coalesceShufflePartitionsEnabled) { + sqlConf.getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM) + .getOrElse(defaultPartitions) + } else { + defaultPartitions + } + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 0032e4e1d1b..37333d946f9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -1826,11 +1826,17 @@ object GpuOverrides { TypeSig.orderable))), (sortOrder, conf, p, r) => new BaseExprMeta[SortOrder](sortOrder, conf, p, r) { override def tagExprForGpu(): Unit = { - val directionDefaultNullOrdering = sortOrder.direction.defaultNullOrdering - if (isNestedType(sortOrder.dataType) && - sortOrder.nullOrdering != directionDefaultNullOrdering) { - willNotWorkOnGpu(s"Only default null ordering $directionDefaultNullOrdering " + - s"supported for nested types. Found: ${sortOrder.nullOrdering}") + if (isNestedType(sortOrder.dataType)) { + val directionDefaultNullOrdering = sortOrder.direction.defaultNullOrdering + if (sortOrder.nullOrdering != directionDefaultNullOrdering) { + willNotWorkOnGpu(s"Only default null ordering $directionDefaultNullOrdering " + + s"supported for nested types. Found: ${sortOrder.nullOrdering}") + } + + if (ShimLoader.getSparkShims.numShufflePartitions > 1) { + willNotWorkOnGpu(s"Only single partition sort is enabled until " + + s"Range Partitioning for structs is implemented") + } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index 0da3ec1cdab..cea81dd75f6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -217,4 +217,6 @@ trait SparkShims { def hasAliasQuoteFix: Boolean def hasCastFloatTimestampUpcast: Boolean + + def numShufflePartitions: Int } From 196ebc9091bc32b2e2292d09f92ee029b01c93ea Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Tue, 23 Mar 2021 18:58:00 -0700 Subject: [PATCH 21/26] improve error messages --- .../com/nvidia/spark/rapids/GpuOverrides.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 37333d946f9..b9c2f8f7135 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -1827,15 +1827,18 @@ object GpuOverrides { (sortOrder, conf, p, r) => new BaseExprMeta[SortOrder](sortOrder, conf, p, r) { override def tagExprForGpu(): Unit = { if (isNestedType(sortOrder.dataType)) { + val nullOrdering = sortOrder.nullOrdering val directionDefaultNullOrdering = sortOrder.direction.defaultNullOrdering - if (sortOrder.nullOrdering != directionDefaultNullOrdering) { - willNotWorkOnGpu(s"Only default null ordering $directionDefaultNullOrdering " + - s"supported for nested types. 
Found: ${sortOrder.nullOrdering}")
+          val direction = sortOrder.direction.sql
+          if (nullOrdering != directionDefaultNullOrdering) {
+            willNotWorkOnGpu(s"only default null ordering $directionDefaultNullOrdering " +
+              s"for direction $direction is supported for nested types; actual: ${nullOrdering}")
           }
 
-          if (ShimLoader.getSparkShims.numShufflePartitions > 1) {
-            willNotWorkOnGpu(s"Only single partition sort is enabled until " +
-              s"Range Partitioning for structs is implemented")
+          val numShufflePartitions = ShimLoader.getSparkShims.numShufflePartitions
+          if (numShufflePartitions > 1) {
+            willNotWorkOnGpu("only single partition sort is enabled until Range Partitioning " +
+              s"for structs is implemented, actual partitions: $numShufflePartitions")
           }
         }
       }


From 4b1f74244f71e38c69b90a44443b53df3082f8fd Mon Sep 17 00:00:00 2001
From: Gera Shegalov 
Date: Thu, 25 Mar 2021 01:14:12 -0700
Subject: [PATCH 22/26] Correct CPU constraints

---
 .../src/main/python/sort_test.py              | 24 +++++++++++++++----
 .../rapids/shims/spark300/Spark300Shims.scala | 14 ----------
 .../nvidia/spark/rapids/GpuOverrides.scala    | 22 +++++++++++------
 .../spark/rapids/GpuTransitionOverrides.scala |  9 +++++--
 .../com/nvidia/spark/rapids/RapidsConf.scala  |  8 +++++++
 .../com/nvidia/spark/rapids/SparkShims.scala  |  2 --
 6 files changed, 50 insertions(+), 29 deletions(-)

diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py
index e574457a000..8ca37dadebb 100644
--- a/integration_tests/src/main/python/sort_test.py
+++ b/integration_tests/src/main/python/sort_test.py
@@ -38,6 +38,10 @@ def test_single_orderby(data_gen, order):
     pytest.param(1),
     pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
 ])
+@pytest.mark.parametrize('stable_sort', [
+    pytest.param(True),
+    pytest.param(False, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
+])
 @pytest.mark.parametrize('data_gen', [
     pytest.param(all_basic_struct_gen),
     pytest.param(StructGen([['child0', all_basic_struct_gen]]),
@@ -53,13 +57,17 @@ def test_single_orderby(data_gen, order):
                 marks=pytest.mark.xfail(reason='opposite null order not supported')),
     pytest.param(f.col('a').desc_nulls_last()),
 ], ids=idfn)
-def test_single_nested_orderby_plain(data_gen, order, shuffle_parts):
+def test_single_nested_orderby_plain(data_gen, order, shuffle_parts, stable_sort):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).orderBy(order),
         # TODO no interference with range partition once implemented
         conf = {
             **allow_negative_scale_of_decimal_conf,
-            **{'spark.sql.shuffle.partitions': shuffle_parts}
+            **{
+                'spark.sql.shuffle.partitions': shuffle_parts,
+                'spark.rapids.sql.stableSort.enabled': stable_sort,
+                'spark.rapids.allowCpuRangePartitioning': False
+            }
         })
 
 # SPARK CPU itself has issue with negative scale for take ordered and project
 orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)]
@@ -78,6 +86,10 @@ def test_single_orderby_with_limit(data_gen, order):
     pytest.param(1),
     pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
 ])
+@pytest.mark.parametrize('stable_sort', [
+    pytest.param(True),
+    pytest.param(False, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
+])
 @pytest.mark.parametrize('data_gen', [
     pytest.param(all_basic_struct_gen),
     pytest.param(StructGen([['child0', all_basic_struct_gen]]),
@@ -97,10 +109,14 @@ def test_single_orderby_with_limit(data_gen, order):
marks=pytest.mark.xfail(reason='opposite null order not supported')), pytest.param(f.col('a').desc_nulls_last()), ], ids=idfn) -def test_single_nested_orderby_with_limit(data_gen, order, shuffle_parts): +def test_single_nested_orderby_with_limit(data_gen, order, shuffle_parts, stable_sort): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100), - conf = {'spark.sql.shuffle.partitions': shuffle_parts}) + conf = { + 'spark.sql.shuffle.partitions': shuffle_parts, + 'spark.rapids.sql.stableSort.enabled': stable_sort, + 'spark.rapids.allowCpuRangePartitioning': False + }) @pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn) @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index 6896d863438..39df892a1f2 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala @@ -52,7 +52,6 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExc import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.python.WindowInPandasExec import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.{COALESCE_PARTITIONS_INITIAL_PARTITION_NUM, SHUFFLE_PARTITIONS} import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub, ShuffleManagerShimBase} import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase} import org.apache.spark.sql.rapids.execution.python.GpuWindowInPandasExecMetaBase @@ -626,17 +625,4 @@ class Spark300Shims extends SparkShims { override def hasAliasQuoteFix: Boolean = false override def hasCastFloatTimestampUpcast: Boolean = false - - // https://github.com/apache/spark/blob/39542bb81f8570219770bb6533c077f44f6cbd2a/ - // sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L3329 - override def numShufflePartitions: Int = { - val sqlConf = SQLConf.get - val defaultPartitions = sqlConf.getConf(SHUFFLE_PARTITIONS) - if (sqlConf.adaptiveExecutionEnabled && sqlConf.coalesceShufflePartitionsEnabled) { - sqlConf.getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM) - .getOrElse(defaultPartitions) - } else { - defaultPartitions - } - } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index b9c2f8f7135..78d71e075e4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -1834,12 +1834,6 @@ object GpuOverrides { willNotWorkOnGpu(s"only default null ordering $directionDefaultNullOrdering " + s"for direction $direction is supported for nested types; actual: ${nullOrdering}") } - - val numShufflePartitions = ShimLoader.getSparkShims.numShufflePartitions - if (numShufflePartitions > 1) { - willNotWorkOnGpu("only single partition sort is enabled until Range Partitioning " + - s"for structs is implemented, 
actual partitions: $numShufflePartitions")
-          }
         }
       }
 
@@ -2524,6 +2518,14 @@ object GpuOverrides {
         override val childExprs: Seq[BaseExprMeta[_]] =
           rp.ordering.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
 
+        override def tagPartForGpu() {
+          val numPartitions = rp.numPartitions
+          if (numPartitions > 1) {
+            willNotWorkOnGpu("only single partition sort is supported, " +
+              s"actual partitions: $numPartitions")
+          }
+        }
+
         override def convertToGpu(): GpuPartitioning = {
           if (rp.numPartitions > 1) {
             val gpuOrdering = childExprs.map(_.convertToGpu()).asInstanceOf[Seq[SortOrder]]
@@ -2771,7 +2773,13 @@ object GpuOverrides {
       "The backend for the sort operator",
       // The SortOrder TypeSig will govern what types can actually be used as sorting key data type.
      // The types below are allowed as inputs and outputs.
      ExecChecks(TypeSig.all - TypeSig.UDT.nested(), TypeSig.all),
-      (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r)),
+      (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r) {
+        override def tagPlanForGpu() {
+          if (!conf.stableSort && sort.sortOrder.exists(so => isNestedType(so.dataType))) {
+            willNotWorkOnGpu(s"it's disabled unless ${RapidsConf.STABLE_SORT.key} is true")
+          }
+        }
+      }),
     exec[ExpandExec](
       "The backend for the expand operator",
       ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
index d00ddb23e87..c0960c70f61 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
@@ -16,7 +16,9 @@
 package com.nvidia.spark.rapids
 
+import org.apache.spark.RangePartitioner
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, CustomShuffleReaderExec, QueryStageExec, ShuffleQueryStageExec}
@@ -406,8 +408,11 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
       case _: GpuColumnarToRowExecParent => () // Ignored
       case _: ExecutedCommandExec => () // Ignored
       case _: RDDScanExec => () // Ignored
-      case _: ShuffleExchangeExec => () // Ignored for now, we don't force it to the GPU if
-        // children are not on the gpu
+      case shuffleExchange: ShuffleExchangeExec if conf.cpuRangePartitioningPermitted
+          || !shuffleExchange.outputPartitioning.isInstanceOf[RangePartitioning] => {
+        // Ignored for now, we don't force it to the GPU if
+        // children are not on the gpu
+      }
       case other =>
         if (!plan.supportsColumnar &&
           !conf.testingAllowedNonGpu.contains(getBaseNameFromClass(other.getClass.toString))) {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 4d91f46aece..d2b46ecc154 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -949,6 +949,12 @@ object RapidsConf {
     .booleanConf
     .createWithDefault(true)
 
+  val CPU_RANGE_PARTITIONING_ALLOWED = conf("spark.rapids.allowCpuRangePartitioning")
+    .doc("Option to control whether range partitioning is allowed to fall back to the CPU.")
+
.internal() + .booleanConf + .createWithDefault(true) + private def printSectionHeader(category: String): Unit = println(s"\n### $category") @@ -1273,6 +1279,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val getAlluxioPathsToReplace: Option[Seq[String]] = get(ALLUXIO_PATHS_REPLACE) + lazy val cpuRangePartitioningPermitted = get(CPU_RANGE_PARTITIONING_ALLOWED) + def isOperatorEnabled(key: String, incompat: Boolean, isDisabledByDefault: Boolean): Boolean = { val default = !(isDisabledByDefault || incompat) || (incompat && isIncompatEnabled) conf.get(key).map(toBoolean(_, key)).getOrElse(default) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index cea81dd75f6..0da3ec1cdab 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -217,6 +217,4 @@ trait SparkShims { def hasAliasQuoteFix: Boolean def hasCastFloatTimestampUpcast: Boolean - - def numShufflePartitions: Int } From 7e8c9f88c324223de616dd9eb4d92f9cb0cb788e Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Thu, 25 Mar 2021 02:03:55 -0700 Subject: [PATCH 23/26] test fix --- docs/supported_ops.md | 30 +++++++++---------- .../src/main/python/sort_test.py | 12 +------- .../nvidia/spark/rapids/GpuOverrides.scala | 6 ++-- 3 files changed, 19 insertions(+), 29 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index f47c4a3d519..9aec6529863 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -145,11 +145,11 @@ Accelerator supports are described below. S S* S -S -S -PS* (missing nested UDT) -PS* (missing nested UDT) -PS* (missing nested UDT) +NS +NS +NS +NS +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS @@ -352,11 +352,11 @@ Accelerator supports are described below. S S* S -S -S -PS* (missing nested UDT) -PS* (missing nested UDT) -PS* (missing nested UDT) +NS +NS +NS +NS +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS @@ -375,11 +375,11 @@ Accelerator supports are described below. 
S S* S -S -S -PS* (missing nested UDT) -PS* (missing nested UDT) -PS* (missing nested UDT) +NS +NS +NS +NS +PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py index 8ca37dadebb..9f398f0c286 100644 --- a/integration_tests/src/main/python/sort_test.py +++ b/integration_tests/src/main/python/sort_test.py @@ -82,14 +82,6 @@ def test_single_orderby_with_limit(data_gen, order): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100)) -@pytest.mark.parametrize('shuffle_parts', [ - pytest.param(1), - pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607")) -]) -@pytest.mark.parametrize('stable_sort', [ - pytest.param(True), - pytest.param(False, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607")) -]) @pytest.mark.parametrize('data_gen', [ pytest.param(all_basic_struct_gen), pytest.param(StructGen([['child0', all_basic_struct_gen]]), @@ -109,12 +101,10 @@ def test_single_orderby_with_limit(data_gen, order): marks=pytest.mark.xfail(reason='opposite null order not supported')), pytest.param(f.col('a').desc_nulls_last()), ], ids=idfn) -def test_single_nested_orderby_with_limit(data_gen, order, shuffle_parts, stable_sort): +def test_single_nested_orderby_with_limit(data_gen, order): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100), conf = { - 'spark.sql.shuffle.partitions': shuffle_parts, - 'spark.rapids.sql.stableSort.enabled': stable_sort, 'spark.rapids.allowCpuRangePartitioning': False }) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 78d71e075e4..aaf2a912c24 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2639,7 +2639,7 @@ object GpuOverrides { }), exec[TakeOrderedAndProjectExec]( "Take the first limit elements as defined by the sortOrder, and do projection if needed.", - ExecChecks(TypeSig.all - TypeSig.UDT.nested(), TypeSig.all), + ExecChecks(pluginSupportedOrderableSig, TypeSig.all), (takeExec, conf, p, r) => new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) { val sortOrder: Seq[BaseExprMeta[SortOrder]] = @@ -2703,7 +2703,7 @@ object GpuOverrides { }), exec[CollectLimitExec]( "Reduce to single partition and apply limit", - ExecChecks(TypeSig.all - TypeSig.UDT.nested(), TypeSig.all), + ExecChecks(pluginSupportedOrderableSig, TypeSig.all), (collectLimitExec, conf, p, r) => new GpuCollectLimitMeta(collectLimitExec, conf, p, r)) .disabledByDefault("Collect Limit replacement can be slower on the GPU, if huge number " + "of rows in a batch it could help by limiting the number of rows transferred from " + @@ -2772,7 +2772,7 @@ object GpuOverrides { "The backend for the sort operator", // The SortOrder TypeSig will govern what types can actually be used as sorting key data type. // The types below are allowed as inputs and outputs. 
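// Note on the TypeSig change below: at this point in the series
// pluginSupportedOrderableSig is
//   TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL +
//     TypeSig.STRUCT.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL)
// so swapping it in for "TypeSig.all - TypeSig.UDT.nested()" narrows SortExec,
// TakeOrderedAndProjectExec, and CollectLimitExec from "anything but nested UDT"
// down to keys cudf can actually order: the common scalar types plus structs of them.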
- ExecChecks(TypeSig.all - TypeSig.UDT.nested(), TypeSig.all), + ExecChecks(pluginSupportedOrderableSig, TypeSig.all), (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r) { override def tagPlanForGpu() { if (!conf.stableSort && sort.sortOrder.exists(so => isNestedType(so.dataType))) { From 9ee262bc206a0ca33efccc536122dfbefd988ed4 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Thu, 25 Mar 2021 11:21:15 -0700 Subject: [PATCH 24/26] check for nesting in RP --- .../main/scala/com/nvidia/spark/rapids/GpuOverrides.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index aaf2a912c24..652f5b10435 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2520,8 +2520,8 @@ object GpuOverrides { override def tagPartForGpu() { val numPartitions = rp.numPartitions - if (numPartitions > 1) { - willNotWorkOnGpu("only single partition sort is supported, " + + if (numPartitions > 1 && rp.ordering.exists(so => isNestedType(so.dataType))) { + willNotWorkOnGpu("only single partition sort is supported for nested types, " + s"actual partitions: $numPartitions") } } @@ -2776,7 +2776,8 @@ object GpuOverrides { (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r) { override def tagPlanForGpu() { if (!conf.stableSort && sort.sortOrder.exists(so => isNestedType(so.dataType))) { - willNotWorkOnGpu(s"it's disabled unless ${RapidsConf.STABLE_SORT.key} is true") + willNotWorkOnGpu("it's disabled for nested types " + + s"unless ${RapidsConf.STABLE_SORT.key} is true") } } }), From e5f99968569b62ecf1605b7f05da1332c90bee2c Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Thu, 25 Mar 2021 17:52:17 -0700 Subject: [PATCH 25/26] wip --- docs/supported_ops.md | 4 +-- .../nvidia/spark/rapids/GpuOverrides.scala | 27 ++++++++----------- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 9aec6529863..b115249e8f6 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -354,9 +354,9 @@ Accelerator supports are described below. 
S NS NS +PS* (missing nested BINARY, CALENDAR, MAP, STRUCT, UDT) NS -NS -PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) +PS* (missing nested BINARY, CALENDAR, MAP, STRUCT, UDT) NS diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 652f5b10435..88c4c8164a5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -423,18 +423,13 @@ object GpuOverrides { "\\S", "\\v", "\\V", "\\w", "\\w", "\\p", "$", "\\b", "\\B", "\\A", "\\G", "\\Z", "\\z", "\\R", "?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">") - private[this] val pluginSupportedOrderableSig = ( - TypeSig.commonCudfTypes + - TypeSig.NULL + - TypeSig.DECIMAL + - TypeSig.STRUCT.nested( - TypeSig.commonCudfTypes + - TypeSig.NULL + - TypeSig.DECIMAL - )) - - private[this] def isNestedType(dataType: DataType) = dataType match { - case ArrayType(_, _) | MapType(_, _, _) | StructType(_) => true + private[this] val _commonTypes = TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + + private[this] val pluginSupportedOrderableSig = _commonTypes + + TypeSig.STRUCT.nested(_commonTypes) + + private[this] def isStructType(dataType: DataType) = dataType match { + case StructType(_) => true case _ => false } @@ -1826,7 +1821,7 @@ object GpuOverrides { TypeSig.orderable))), (sortOrder, conf, p, r) => new BaseExprMeta[SortOrder](sortOrder, conf, p, r) { override def tagExprForGpu(): Unit = { - if (isNestedType(sortOrder.dataType)) { + if (isStructType(sortOrder.dataType)) { val nullOrdering = sortOrder.nullOrdering val directionDefaultNullOrdering = sortOrder.direction.defaultNullOrdering val direction = sortOrder.direction.sql @@ -2520,7 +2515,7 @@ object GpuOverrides { override def tagPartForGpu() { val numPartitions = rp.numPartitions - if (numPartitions > 1 && rp.ordering.exists(so => isNestedType(so.dataType))) { + if (numPartitions > 1 && rp.ordering.exists(so => isStructType(so.dataType))) { willNotWorkOnGpu("only single partition sort is supported for nested types, " + s"actual partitions: $numPartitions") } @@ -2772,10 +2767,10 @@ object GpuOverrides { "The backend for the sort operator", // The SortOrder TypeSig will govern what types can actually be used as sorting key data type. // The types below are allowed as inputs and outputs. - ExecChecks(pluginSupportedOrderableSig, TypeSig.all), + ExecChecks(pluginSupportedOrderableSig + TypeSig.ARRAY.nested(), TypeSig.all), (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r) { override def tagPlanForGpu() { - if (!conf.stableSort && sort.sortOrder.exists(so => isNestedType(so.dataType))) { + if (!conf.stableSort && sort.sortOrder.exists(so => isStructType(so.dataType))) { willNotWorkOnGpu("it's disabled for nested types " + s"unless ${RapidsConf.STABLE_SORT.key} is true") } From 59100920fe12e63d009fdaf9a0d09fc568019874 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Thu, 25 Mar 2021 19:50:12 -0700 Subject: [PATCH 26/26] reworked sortexec type list --- docs/supported_ops.md | 4 ++-- .../src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 1ab7049890e..fe803eb5d21 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -354,9 +354,9 @@ Accelerator supports are described below. 
S NS NS -PS* (missing nested BINARY, CALENDAR, MAP, STRUCT, UDT) +PS* (missing nested BINARY, CALENDAR, MAP, UDT) NS -PS* (missing nested BINARY, CALENDAR, MAP, STRUCT, UDT) +PS* (missing nested BINARY, CALENDAR, MAP, UDT) NS diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index e9c1f42607a..874b4df020a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2781,7 +2781,8 @@ object GpuOverrides { "The backend for the sort operator", // The SortOrder TypeSig will govern what types can actually be used as sorting key data type. // The types below are allowed as inputs and outputs. - ExecChecks(pluginSupportedOrderableSig + TypeSig.ARRAY.nested(), TypeSig.all), + ExecChecks(pluginSupportedOrderableSig + (TypeSig.ARRAY + TypeSig.STRUCT).nested(), + TypeSig.all), (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r) { override def tagPlanForGpu() { if (!conf.stableSort && sort.sortOrder.exists(so => isStructType(so.dataType))) {
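
A minimal end-to-end sketch of the behavior the series leaves in place. Nothing below comes from the patches themselves: the object name, session setup, and sample data are hypothetical, while the plugin class and configuration keys are the ones the patches reference. With struct keys, SortExec is only replaced on the GPU when stable sort is enabled, and RangePartitioning of structs is rejected for more than one partition, so the session pins the shuffle to a single partition.

// Hypothetical smoke test for GPU sort of a struct column; assumes the
// RAPIDS Accelerator jar is on the classpath of a local Spark 3.x install.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

object StructSortSmoke {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("struct-sort-smoke")
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      // GpuSortMeta rejects struct sort keys unless the sort is stable
      .config("spark.rapids.sql.stableSort.enabled", "true")
      // RangePartitioning of struct keys is limited to a single partition
      .config("spark.sql.shuffle.partitions", "1")
      .getOrCreate()
    import spark.implicits._

    // sort a struct<n:int, s:string> column ascending with the direction's
    // default null ordering, the only ordering the SortOrder override accepts
    // for nested types
    val df = Seq((3, "c"), (1, "a"), (2, "b")).toDF("n", "s")
      .select(struct(col("n"), col("s")).as("a"))
    df.orderBy(col("a").asc).show(truncate = false)

    spark.stop()
  }
}

Left at its default of true, spark.rapids.allowCpuRangePartitioning lets a multi-partition plan fall back to a CPU range partition; the integration tests above set it to false so that such a fallback is flagged instead of silently ignored.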