diff --git a/docs/configs.md b/docs/configs.md
index 003532d9112..56e6d5411eb 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -221,6 +221,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
spark.rapids.sql.expression.Signum|`sign`, `signum`|Returns -1.0, 0.0 or 1.0 as expr is negative, 0 or positive|true|None|
spark.rapids.sql.expression.Sin|`sin`|Sine|true|None|
spark.rapids.sql.expression.Sinh|`sinh`|Hyperbolic sine|true|None|
+spark.rapids.sql.expression.Size|`size`, `cardinality`|The size of an array or a map|true|None|
spark.rapids.sql.expression.SortOrder| |Sort order|true|None|
spark.rapids.sql.expression.SparkPartitionID|`spark_partition_id`|Returns the current partition id|true|None|
spark.rapids.sql.expression.SpecifiedWindowFrame| |Specification of the width of the group (or "frame") of input rows around which a window function is evaluated|true|None|
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 841fe7dd616..9a70c83f8ac 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -12309,6 +12309,96 @@ Accelerator support is described below.
|
+Size | `size`, `cardinality` | The size of an array or a map | None |
+project | input | | | | | | | | | | | | | | | S* | S* | | |
+result | | | | S | | | | | | | | | | | | | | |
+lambda | input | | | | | | | | | | | | | | | NS | NS | | |
+result | | | | NS | | | | | | | | | | | | | | |
+
SortOrder |
|
Sort order |
diff --git a/integration_tests/src/main/python/collection_ops_test.py b/integration_tests/src/main/python/collection_ops_test.py
new file mode 100644
index 00000000000..367a297ed3f
--- /dev/null
+++ b/integration_tests/src/main/python/collection_ops_test.py
@@ -0,0 +1,34 @@
+# Copyright (c) 2021, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from asserts import assert_gpu_and_cpu_are_equal_collect
+from data_gen import *
+from pyspark.sql.types import *
+
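+# When spark.sql.legacy.sizeOfNull is true, size(null) returns -1; when it is false,
+# size(null) returns null. Both settings are exercised by the tests below.
+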
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+@pytest.mark.parametrize('size_of_null', ['true', 'false'], ids=idfn)
+def test_size_of_array(data_gen, size_of_null):
+ gen = ArrayGen(data_gen)
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark: unary_op_df(spark, gen).selectExpr('size(a)'),
+ conf={'spark.sql.legacy.sizeOfNull': size_of_null})
+
+@pytest.mark.parametrize('data_gen', map_gens_sample, ids=idfn)
+@pytest.mark.parametrize('size_of_null', ['true', 'false'], ids=idfn)
+def test_size_of_map(data_gen, size_of_null):
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark: unary_op_df(spark, data_gen).selectExpr('size(a)'),
+ conf={'spark.sql.legacy.sizeOfNull': size_of_null})
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 180dab5f67a..e4818c31605 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -2336,6 +2336,15 @@ object GpuOverrides {
(a, conf, p, r) => new UnaryExprMeta[Length](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression = GpuLength(child)
}),
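+  // The type checks declare an INT result and ARRAY/MAP inputs of any nested type,
+  // and disallow use inside lambda expressions (the NS entries in supported_ops.md).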
+ expr[Size](
+ "The size of an array or a map",
+ ExprChecks.unaryProjectNotLambda(TypeSig.INT, TypeSig.INT,
+ (TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all),
+ (TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)),
+ (a, conf, p, r) => new UnaryExprMeta[Size](a, conf, p, r) {
+ override def convertToGpu(child: Expression): GpuExpression =
+ GpuSize(child, a.legacySizeOfNull)
+ }),
expr[UnscaledValue](
"Convert a Decimal to an unscaled long value for some aggregation optimizations",
ExprChecks.unaryProject(TypeSig.LONG, TypeSig.LONG,
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/collectionOperations.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/collectionOperations.scala
new file mode 100644
index 00000000000..9129dc9a32d
--- /dev/null
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/collectionOperations.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+import ai.rapids.cudf.ColumnVector
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.types._
+
+case class GpuSize(child: Expression, legacySizeOfNull: Boolean)
+ extends GpuUnaryExpression {
+
+ require(child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType],
+ s"The size function doesn't support the operand type ${child.dataType}")
+
+ override def dataType: DataType = IntegerType
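+  // In legacy mode size(null) returns -1 instead of null, so the result is never null.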
+ override def nullable: Boolean = if (legacySizeOfNull) false else super.nullable
+
+ override protected def doColumnar(input: GpuColumnVector): ColumnVector = {
+ val inputBase = input.getBase
+    if (inputBase.getRowCount == 0) {
+      // Close the temporary scalar after building the empty result to avoid leaking it.
+      return withResource(GpuScalar.from(0)) { zero =>
+        ColumnVector.fromScalar(zero, 0)
+      }
+    }
+
+    // Compute the sizes of the underlying cuDF list column to get the size of each
+    // ArrayData or MapData, since cuDF represents MapData as a list of structs.
+    // The size of each list is the offset of the next element (row) minus the current offset.
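+    // For example, offsets [0, 2, 2, 5] yield sizes [2-0, 2-2, 5-2] = [2, 0, 3].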
+ val collectionSize = {
+      // Here is a hack: use index -1 to fetch the offsets column of the list.
+      // In native cuDF the offsets buffer is the first (index 0) child of a list_column_view.
+      // The JNI layer adds 1 to the child index when fetching a child column of a ListType
+      // column to keep the alignment, so in the JVM layer we have to use index -1 to fetch
+      // the real first child of the list column.
+ withResource(inputBase.getChildColumnView(-1)) { offset =>
+ withResource(offset.subVector(1)) { upBound =>
+ withResource(offset.subVector(0, offset.getRowCount.toInt - 1)) { lowBound =>
+ upBound.sub(lowBound)
+ }
+ }
+ }
+ }
+
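+    // The replacement value for null input rows: -1 in legacy mode, null otherwise.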
+ val nullScalar = if (legacySizeOfNull) {
+ GpuScalar.from(-1)
+ } else {
+ GpuScalar.from(null, IntegerType)
+ }
+
+ withResource(collectionSize) { collectionSize =>
+ withResource(nullScalar) { nullScalar =>
+ withResource(inputBase.isNull) { inputIsNull =>
+ inputIsNull.ifElse(nullScalar, collectionSize)
+ }
+ }
+ }
+ }
+}