From 3b7b86eeec1b24b08874aa1382d1bc3b8b970e55 Mon Sep 17 00:00:00 2001 From: Alfred Xu Date: Fri, 19 Mar 2021 21:46:41 +0800 Subject: [PATCH] support GpuSize (#1972) Signed-off-by: sperlingxx --- docs/configs.md | 1 + docs/supported_ops.md | 90 +++++++++++++++++++ .../src/main/python/collection_ops_test.py | 34 +++++++ .../nvidia/spark/rapids/GpuOverrides.scala | 9 ++ .../spark/rapids/collectionOperations.scala | 71 +++++++++++++++ 5 files changed, 205 insertions(+) create mode 100644 integration_tests/src/main/python/collection_ops_test.py create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/collectionOperations.scala diff --git a/docs/configs.md b/docs/configs.md index 003532d9112..56e6d5411eb 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -221,6 +221,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Signum|`sign`, `signum`|Returns -1.0, 0.0 or 1.0 as expr is negative, 0 or positive|true|None| spark.rapids.sql.expression.Sin|`sin`|Sine|true|None| spark.rapids.sql.expression.Sinh|`sinh`|Hyperbolic sine|true|None| +spark.rapids.sql.expression.Size|`size`, `cardinality`|The size of an array or a map|true|None| spark.rapids.sql.expression.SortOrder| |Sort order|true|None| spark.rapids.sql.expression.SparkPartitionID|`spark_partition_id`|Returns the current partition id|true|None| spark.rapids.sql.expression.SpecifiedWindowFrame| |Specification of the width of the group (or "frame") of input rows around which a window function is evaluated|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 841fe7dd616..9a70c83f8ac 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -12309,6 +12309,96 @@ Accelerator support is described below. 
+Size +`size`, `cardinality` +The size of an array or a map +None +project +input + + + + + + + + + + + + + + +S* +S* + + + + +result + + + +S + + + + + + + + + + + + + + + + +lambda +input + + + + + + + + + + + + + + +NS +NS + + + + +result + + + +NS + + + + + + + + + + + + + + + + SortOrder Sort order diff --git a/integration_tests/src/main/python/collection_ops_test.py b/integration_tests/src/main/python/collection_ops_test.py new file mode 100644 index 00000000000..367a297ed3f --- /dev/null +++ b/integration_tests/src/main/python/collection_ops_test.py @@ -0,0 +1,34 @@ +# Copyright (c) 2021, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest + +from asserts import assert_gpu_and_cpu_are_equal_collect +from data_gen import * +from pyspark.sql.types import * + +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +@pytest.mark.parametrize('size_of_null', ['true', 'false'], ids=idfn) +def test_size_of_array(data_gen, size_of_null): + gen = ArrayGen(data_gen) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: unary_op_df(spark, gen).selectExpr('size(a)'), + conf={'spark.sql.legacy.sizeOfNull': size_of_null}) + +@pytest.mark.parametrize('data_gen', map_gens_sample, ids=idfn) +@pytest.mark.parametrize('size_of_null', ['true', 'false'], ids=idfn) +def test_size_of_map(data_gen, size_of_null): + assert_gpu_and_cpu_are_equal_collect( + lambda spark: unary_op_df(spark, data_gen).selectExpr('size(a)'), + conf={'spark.sql.legacy.sizeOfNull': size_of_null}) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 180dab5f67a..e4818c31605 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2336,6 +2336,15 @@ object GpuOverrides { (a, conf, p, r) => new UnaryExprMeta[Length](a, conf, p, r) { override def convertToGpu(child: Expression): GpuExpression = GpuLength(child) }), + expr[Size]( + "The size of an array or a map", + ExprChecks.unaryProjectNotLambda(TypeSig.INT, TypeSig.INT, + (TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all), + (TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)), + (a, conf, p, r) => new UnaryExprMeta[Size](a, conf, p, r) { + override def convertToGpu(child: Expression): GpuExpression = + GpuSize(child, a.legacySizeOfNull) + }), expr[UnscaledValue]( "Convert a Decimal to an unscaled long value for some aggregation optimizations", ExprChecks.unaryProject(TypeSig.LONG, TypeSig.LONG, diff --git 
a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/collectionOperations.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/collectionOperations.scala
new file mode 100644
index 00000000000..9129dc9a32d
--- /dev/null
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/collectionOperations.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+import ai.rapids.cudf.ColumnVector
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.types._
+
+/**
+ * GPU implementation of `Size`: the number of elements in each array or map row of `child`.
+ *
+ * @param child expression whose data type must be ArrayType or MapType
+ * @param legacySizeOfNull when true a null row yields -1, otherwise it yields null
+ */
+case class GpuSize(child: Expression, legacySizeOfNull: Boolean)
+    extends GpuUnaryExpression {
+
+  require(child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType],
+    s"The size function doesn't support the operand type ${child.dataType}")
+
+  override def dataType: DataType = IntegerType
+  // With the legacy flag a null input is reported as -1, so the result is never null.
+  override def nullable: Boolean = if (legacySizeOfNull) false else super.nullable
+
+  override protected def doColumnar(input: GpuColumnVector): ColumnVector = {
+    val inputBase = input.getBase
+    if (inputBase.getRowCount == 0) {
+      // Empty batch: build an empty INT column and close the template scalar right away.
+      withResource(GpuScalar.from(0)) { zero =>
+        GpuColumnVector.from(zero, 0, IntegerType).getBase
+      }
+    } else {
+      // Arrays and maps (lists of structs) are both cuDF lists, so the size of each row is
+      // the offset of the next row minus the offset of the current row.
+      // Hack: the JNI layer adds 1 to the child index of a list column, so index -1 is
+      // needed to reach the native offsets column (child 0 of list_column_view).
+      val sizes = withResource(inputBase.getChildColumnView(-1)) { offsets =>
+        withResource(offsets.subVector(1)) { upper =>
+          withResource(offsets.subVector(0, offsets.getRowCount.toInt - 1)) { lower =>
+            upper.sub(lower)
+          }
+        }
+      }
+      withResource(sizes) { _ =>
+        // Allocate the scalar only once `sizes` is guarded, so a failure cannot leak it.
+        val ifNull =
+          if (legacySizeOfNull) GpuScalar.from(-1) else GpuScalar.from(null, IntegerType)
+        withResource(ifNull) { _ =>
+          withResource(inputBase.isNull) { inputIsNull =>
+            inputIsNull.ifElse(ifNull, sizes)
+          }
+        }
+      }
+    }
+  }
+}