Skip to content

Commit

Permalink
support GpuSize (NVIDIA#1972)
Browse files Browse the repository at this point in the history
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
  • Loading branch information
sperlingxx authored Mar 19, 2021
1 parent be6abd3 commit 3b7b86e
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Signum"></a>spark.rapids.sql.expression.Signum|`sign`, `signum`|Returns -1.0, 0.0 or 1.0 as expr is negative, 0 or positive|true|None|
<a name="sql.expression.Sin"></a>spark.rapids.sql.expression.Sin|`sin`|Sine|true|None|
<a name="sql.expression.Sinh"></a>spark.rapids.sql.expression.Sinh|`sinh`|Hyperbolic sine|true|None|
<a name="sql.expression.Size"></a>spark.rapids.sql.expression.Size|`size`, `cardinality`|The size of an array or a map|true|None|
<a name="sql.expression.SortOrder"></a>spark.rapids.sql.expression.SortOrder| |Sort order|true|None|
<a name="sql.expression.SparkPartitionID"></a>spark.rapids.sql.expression.SparkPartitionID|`spark_partition_id`|Returns the current partition id|true|None|
<a name="sql.expression.SpecifiedWindowFrame"></a>spark.rapids.sql.expression.SpecifiedWindowFrame| |Specification of the width of the group (or "frame") of input rows around which a window function is evaluated|true|None|
Expand Down
90 changes: 90 additions & 0 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -12309,6 +12309,96 @@ Accelerator support is described below.
<td> </td>
</tr>
<tr>
<td rowSpan="4">Size</td>
<td rowSpan="4">`size`, `cardinality`</td>
<td rowSpan="4">The size of an array or a map</td>
<td rowSpan="4">None</td>
<td rowSpan="2">project</td>
<td>input</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S*</td>
<td>S*</td>
<td> </td>
<td> </td>
</tr>
<tr>
<td>result</td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="2">lambda</td>
<td>input</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
</tr>
<tr>
<td>result</td>
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="2">SortOrder</td>
<td rowSpan="2"> </td>
<td rowSpan="2">Sort order</td>
Expand Down
34 changes: 34 additions & 0 deletions integration_tests/src/main/python/collection_ops_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from pyspark.sql.types import *

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('size_of_null', ['true', 'false'], ids=idfn)
def test_size_of_array(data_gen, size_of_null):
    # Verify that size() over arrays of every supported element type matches
    # between CPU and GPU, under both legacy and non-legacy null-size settings.
    array_gen = ArrayGen(data_gen)
    size_conf = {'spark.sql.legacy.sizeOfNull': size_of_null}
    def build_query(spark):
        return unary_op_df(spark, array_gen).selectExpr('size(a)')
    assert_gpu_and_cpu_are_equal_collect(build_query, conf=size_conf)

@pytest.mark.parametrize('data_gen', map_gens_sample, ids=idfn)
@pytest.mark.parametrize('size_of_null', ['true', 'false'], ids=idfn)
def test_size_of_map(data_gen, size_of_null):
    # Verify that size() over sampled map types matches between CPU and GPU,
    # under both legacy and non-legacy null-size settings.
    size_conf = {'spark.sql.legacy.sizeOfNull': size_of_null}
    def build_query(spark):
        return unary_op_df(spark, data_gen).selectExpr('size(a)')
    assert_gpu_and_cpu_are_equal_collect(build_query, conf=size_conf)
Original file line number Diff line number Diff line change
Expand Up @@ -2336,6 +2336,15 @@ object GpuOverrides {
(a, conf, p, r) => new UnaryExprMeta[Length](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression = GpuLength(child)
}),
    // Registers the GPU override rule for Spark's Size expression
    // (SQL functions `size` and `cardinality`).
    expr[Size](
      "The size of an array or a map",
      // Result is always INT; input may be an ARRAY or MAP with any nested
      // element types. `unaryProjectNotLambda` means this is supported in
      // projections but not inside lambda (higher-order) functions.
      ExprChecks.unaryProjectNotLambda(TypeSig.INT, TypeSig.INT,
        (TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all),
        (TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)),
      (a, conf, p, r) => new UnaryExprMeta[Size](a, conf, p, r) {
        // Propagate the CPU expression's legacySizeOfNull flag
        // (spark.sql.legacy.sizeOfNull) so null-handling semantics match.
        override def convertToGpu(child: Expression): GpuExpression =
          GpuSize(child, a.legacySizeOfNull)
      }),
expr[UnscaledValue](
"Convert a Decimal to an unscaled long value for some aggregation optimizations",
ExprChecks.unaryProject(TypeSig.LONG, TypeSig.LONG,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import ai.rapids.cudf.ColumnVector

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types._

/**
 * GPU implementation of Spark's `Size` expression (SQL `size` / `cardinality`):
 * returns the number of elements of an array or the number of entries of a map.
 *
 * @param child the input column expression; must be of ArrayType or MapType
 *              (enforced by the `require` below)
 * @param legacySizeOfNull when true (spark.sql.legacy.sizeOfNull), a null
 *                         input row produces -1 instead of null, so the
 *                         result column is never nullable
 */
case class GpuSize(child: Expression, legacySizeOfNull: Boolean)
  extends GpuUnaryExpression {

  require(child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType],
    s"The size function doesn't support the operand type ${child.dataType}")

  override def dataType: DataType = IntegerType

  // In legacy mode nulls map to -1, so the output can never contain nulls.
  override def nullable: Boolean = if (legacySizeOfNull) false else super.nullable

  override protected def doColumnar(input: GpuColumnVector): ColumnVector = {
    val inputBase = input.getBase
    if (inputBase.getRowCount == 0) {
      // Fast path: with zero rows there are no offsets to diff, so build an
      // empty INT32 column directly.
      // NOTE(review): the GpuScalar created by GpuScalar.from(0) does not
      // appear to be closed here — confirm whether GpuColumnVector.from takes
      // ownership of the scalar, otherwise this leaks a device scalar.
      return GpuColumnVector.from(GpuScalar.from(0), 0, IntegerType).getBase
    }

    // Compute sizes of cuDF.ListType to get sizes of each ArrayData or MapData,
    // considering MapData is represented as List of Struct in terms of cuDF.
    // The size of row i is offsets[i + 1] - offsets[i], computed below by
    // subtracting a shifted copy of the offsets column from itself.
    val collectionSize = {
      // Here is a hack: using index -1 to fetch the offset column of list.
      // In terms of cuDF native, the offset is the first (index 0) child of
      // list_column_view. In the JNI layer, 1 is added to the child index when
      // fetching a child column of ListType to keep alignment.
      // So, in the JVM layer, we have to use -1 as the index to fetch the real
      // first child of list_column.
      withResource(inputBase.getChildColumnView(-1)) { offset =>
        // upBound = offsets[1..n], lowBound = offsets[0..n-1]; elementwise
        // subtraction yields the per-row element counts.
        withResource(offset.subVector(1)) { upBound =>
          withResource(offset.subVector(0, offset.getRowCount.toInt - 1)) { lowBound =>
            upBound.sub(lowBound)
          }
        }
      }
    }

    // Replacement value for null input rows: -1 in legacy mode, null otherwise.
    val nullScalar = if (legacySizeOfNull) {
      GpuScalar.from(-1)
    } else {
      GpuScalar.from(null, IntegerType)
    }

    // Null input rows would otherwise get a garbage size from the offset diff
    // above, so overwrite them with the null replacement value.
    withResource(collectionSize) { collectionSize =>
      withResource(nullScalar) { nullScalar =>
        withResource(inputBase.isNull) { inputIsNull =>
          inputIsNull.ifElse(nullScalar, collectionSize)
        }
      }
    }
  }
}

0 comments on commit 3b7b86e

Please sign in to comment.