Skip to content

Commit

Permalink
support GpuScalarSubquery (NVIDIA#1639)
Browse files Browse the repository at this point in the history
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
  • Loading branch information
sperlingxx authored Feb 4, 2021
1 parent bcdab58 commit 609696e
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 3 deletions.
3 changes: 2 additions & 1 deletion docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Log2"></a>spark.rapids.sql.expression.Log2|`log2`|Log base 2|true|None|
<a name="sql.expression.Logarithm"></a>spark.rapids.sql.expression.Logarithm|`log`|Log variable base|true|None|
<a name="sql.expression.Lower"></a>spark.rapids.sql.expression.Lower|`lower`, `lcase`|String lowercase operator|false|This is not 100% compatible with the Spark version because in some cases unicode characters change byte width when changing the case. The GPU string conversion does not support these characters. For a full list of unsupported characters see https://github.com/rapidsai/cudf/issues/3132|
<a name="sql.expression.MakeDecimal"></a>spark.rapids.sql.expression.MakeDecimal| |Create a Decimal from an unscaled long value form some aggregation optimizations|true|None|
<a name="sql.expression.MakeDecimal"></a>spark.rapids.sql.expression.MakeDecimal| |Create a Decimal from an unscaled long value for some aggregation optimizations|true|None|
<a name="sql.expression.Md5"></a>spark.rapids.sql.expression.Md5|`md5`|MD5 hash operator|true|None|
<a name="sql.expression.Minute"></a>spark.rapids.sql.expression.Minute|`minute`|Returns the minute component of the string/timestamp|true|None|
<a name="sql.expression.MonotonicallyIncreasingID"></a>spark.rapids.sql.expression.MonotonicallyIncreasingID|`monotonically_increasing_id`|Returns monotonically increasing 64-bit integers|true|None|
Expand Down Expand Up @@ -256,6 +256,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Min"></a>spark.rapids.sql.expression.Min|`min`|Min aggregate operator|true|None|
<a name="sql.expression.Sum"></a>spark.rapids.sql.expression.Sum|`sum`|Sum aggregate operator|true|None|
<a name="sql.expression.NormalizeNaNAndZero"></a>spark.rapids.sql.expression.NormalizeNaNAndZero| |Normalize NaN and zero|true|None|
<a name="sql.expression.ScalarSubquery"></a>spark.rapids.sql.expression.ScalarSubquery| |Subquery that will return only one row and one column|true|None|
<a name="sql.expression.HiveGenericUDF"></a>spark.rapids.sql.expression.HiveGenericUDF| |Hive Generic UDF, support requires the UDF to implement a RAPIDS accelerated interface|true|None|
<a name="sql.expression.HiveSimpleUDF"></a>spark.rapids.sql.expression.HiveSimpleUDF| |Hive UDF, support requires the UDF to implement a RAPIDS accelerated interface|true|None|

Expand Down
28 changes: 27 additions & 1 deletion docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -8902,7 +8902,7 @@ Accelerator support is described below.
<tr>
<td rowSpan="2">MakeDecimal</td>
<td rowSpan="2"> </td>
<td rowSpan="2">Create a Decimal from an unscaled long value form some aggregation optimizations</td>
<td rowSpan="2">Create a Decimal from an unscaled long value for some aggregation optimizations</td>
<td rowSpan="2">None</td>
<td rowSpan="2">project</td>
<td>input</td>
Expand Down Expand Up @@ -16998,6 +16998,32 @@ Accelerator support is described below.
<td> </td>
</tr>
<tr>
<td rowSpan="1">ScalarSubquery</td>
<td rowSpan="1"> </td>
<td rowSpan="1">Subquery that will return only one row and one column</td>
<td rowSpan="1">None</td>
<td rowSpan="1">project</td>
<td>result</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="4">HiveGenericUDF</td>
<td rowSpan="4"> </td>
<td rowSpan="4">Hive Generic UDF, support requires the UDF to implement a RAPIDS accelerated interface</td>
Expand Down
34 changes: 34 additions & 0 deletions integration_tests/src/main/python/subsuqery_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from asserts import assert_gpu_and_cpu_are_equal_sql
from data_gen import *
from marks import *

gens = [('l', LongGen()), ('i', IntegerGen()), ('f', FloatGen()), (
's', StringGen())]


@ignore_order
@pytest.mark.parametrize('data_gen', [gens], ids=idfn)
def test_scalar_subquery(data_gen):
assert_gpu_and_cpu_are_equal_sql(
lambda spark: gen_df(spark, data_gen, length=2048),
'table',
'''
select l, i, f, (select count(s) from table) as c
from table
where l > (select max(i) from table) or f < (select min(i) from table)
''')
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.ScalarSubquery
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, CustomShuffleReaderExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, DataWritingCommandExec, ExecutedCommandExec}
Expand Down Expand Up @@ -2256,13 +2257,23 @@ object GpuOverrides {
override def convertToGpu(child: Expression): GpuExpression = GpuUnscaledValue(child)
}),
expr[MakeDecimal](
"Create a Decimal from an unscaled long value form some aggregation optimizations",
"Create a Decimal from an unscaled long value for some aggregation optimizations",
ExprChecks.unaryProject(TypeSig.DECIMAL, TypeSig.DECIMAL,
TypeSig.LONG, TypeSig.LONG),
(a, conf, p, r) => new UnaryExprMeta[MakeDecimal](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression =
GpuMakeDecimal(child, a.precision, a.scale, a.nullOnOverflow)
}),
expr[ScalarSubquery](
"Subquery that will return only one row and one column",
ExprChecks.projectOnly(
TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
Nil, None),
(a, conf, p, r) => new ExprMeta[ScalarSubquery](a, conf, p, r) {
override def convertToGpu(): GpuExpression = GpuScalarSubquery(a.plan, a.exprId)
}
),
GpuScalaUDF.exprMeta
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids

import com.nvidia.spark.rapids.GpuExpression

import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId}
import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* GPU placeholder of ScalarSubquery, which returns the scalar result with columnarEval method.
* This placeholder is to make ScalarSubquery working as a GPUExpression to cooperate
* other GPU overrides.
*/
case class GpuScalarSubquery(
plan: BaseSubqueryExec,
exprId: ExprId)
extends ExecSubqueryExpression with GpuExpression {

override def dataType: DataType = plan.schema.fields.head.dataType
override def children: Seq[Expression] = Seq.empty
override def nullable: Boolean = true
override def toString: String = plan.simpleString(SQLConf.get.maxToStringFields)
override def withNewPlan(query: BaseSubqueryExec): GpuScalarSubquery = copy(plan = query)

override def semanticEquals(other: Expression): Boolean = other match {
case s: GpuScalarSubquery => plan.sameResult(s.plan)
case _ => false
}

// the first column in first row from `query`.
@volatile private var result: Any = _
@volatile private var updated: Boolean = false

override def updateResult(): Unit = {
val rows = plan.executeCollect()
if (rows.length > 1) {
sys.error(s"more than one row returned by a subquery used as an expression:\n$plan")
} else if (rows.length == 1) {
assert(rows.head.numFields == 1,
s"Expects 1 field, but got ${rows.head.numFields}; something went wrong in analysis")
result = rows.head.get(0, dataType)
} else {
// If there is no rows returned, the result should be null.
result = null
}
updated = true
}

override def columnarEval(batch: ColumnarBatch): Any = {
require(updated, s"$this has not finished")
result
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import org.apache.spark.sql.execution.{ScalarSubquery, SparkPlan}

class ScalarSubquerySuite extends SparkQueryCompareTestSuite {

private def checkExecPlan(plan: SparkPlan): Unit = {
if (!plan.conf.getAllConfs(RapidsConf.SQL_ENABLED.key).toBoolean) return
plan.find(_.expressions.exists(e => e.find(_.isInstanceOf[ScalarSubquery]).nonEmpty)) match {
case Some(plan) =>
throw new AssertionError(s"Assume no (cpu)ScalarSubquery, but found in $plan")
case None =>
}
}

testSparkResultsAreEqual("Uncorrelated Scalar Subquery", longsFromCSVDf,
repart = 0) {
frame => {
frame.createOrReplaceTempView("table")
val ret = frame.sparkSession.sql(
"SELECT longs, (SELECT max(more_longs) FROM table) FROM table")
checkExecPlan(ret.queryExecution.executedPlan)
ret
}
}
}

0 comments on commit 609696e

Please sign in to comment.