Skip to content

Commit

Permalink
Merge branch 'NVIDIA:branch-23.12' into branch-23.12
Browse files Browse the repository at this point in the history
  • Loading branch information
sameerz authored Nov 1, 2023
2 parents fd596a4 + 56d1be1 commit 92d59d4
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 32 deletions.
1 change: 1 addition & 0 deletions docs/additional-functionality/advanced_configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.SparkPartitionID"></a>spark.rapids.sql.expression.SparkPartitionID|`spark_partition_id`|Returns the current partition id|true|None|
<a name="sql.expression.SpecifiedWindowFrame"></a>spark.rapids.sql.expression.SpecifiedWindowFrame| |Specification of the width of the group (or "frame") of input rows around which a window function is evaluated|true|None|
<a name="sql.expression.Sqrt"></a>spark.rapids.sql.expression.Sqrt|`sqrt`|Square root|true|None|
<a name="sql.expression.Stack"></a>spark.rapids.sql.expression.Stack|`stack`|Separates expr1, ..., exprk into n rows.|true|None|
<a name="sql.expression.StartsWith"></a>spark.rapids.sql.expression.StartsWith| |Starts with|true|None|
<a name="sql.expression.StringInstr"></a>spark.rapids.sql.expression.StringInstr|`instr`|Instr string operator|true|None|
<a name="sql.expression.StringLPad"></a>spark.rapids.sql.expression.StringLPad|`lpad`|Pad a string on the left|true|None|
Expand Down
118 changes: 93 additions & 25 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -13351,22 +13351,22 @@ are limited.
<td> </td>
</tr>
<tr>
<td rowSpan="3">StartsWith</td>
<td rowSpan="3"> </td>
<td rowSpan="3">Starts with</td>
<td rowSpan="3">Stack</td>
<td rowSpan="3">`stack`</td>
<td rowSpan="3">Separates expr1, ..., exprk into n rows.</td>
<td rowSpan="3">None</td>
<td rowSpan="3">project</td>
<td>src</td>
<td>n</td>
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>Literal value only</em></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
Expand All @@ -13377,29 +13377,28 @@ are limited.
<td> </td>
</tr>
<tr>
<td>search</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>Literal value only</em></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>expr</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>result</td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
Expand All @@ -13414,6 +13413,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td> </td>
<td> </td>
<td> </td>
Expand Down Expand Up @@ -13445,6 +13445,74 @@ are limited.
<th>UDT</th>
</tr>
<tr>
<td rowSpan="3">StartsWith</td>
<td rowSpan="3"> </td>
<td rowSpan="3">Starts with</td>
<td rowSpan="3">None</td>
<td rowSpan="3">project</td>
<td>src</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td>search</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>Literal value only</em></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td>result</td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="3">StringInstr</td>
<td rowSpan="3">`instr`</td>
<td rowSpan="3">Instr string operator</td>
Expand Down
33 changes: 33 additions & 0 deletions integration_tests/src/main/python/generate_expr_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,36 @@ def test_generate_outer_fallback():
lambda spark: spark.sql("SELECT array(struct(1, 'a'), struct(2, 'b')) as x")\
.repartition(1).selectExpr("inline_outer(x)"),
"GenerateExec")

# The GPU stack is not guaranteed to produce rows in the same order as Spark does on the CPU,
# so the two results are compared without regard to row order.
@ignore_order(local=True)
def test_stack():
    """Check that stack() over a range of longs gives the same rows on GPU and CPU."""
    def stacked_range(spark):
        # 6 data expressions spread over 3 rows -> 2 generated columns per row
        ids = spark.range(100)
        return ids.selectExpr('*', 'stack(3, id, 2L, 3L, 4L, 5L, 6L)')

    assert_gpu_and_cpu_are_equal_collect(stacked_range)

# The GPU stack is not guaranteed to produce rows in the same order as Spark does on the CPU,
# so the two results are compared without regard to row order.
@ignore_order(local=True)
def test_stack_mixed_types():
    """Check stack() with generated columns of many basic/decimal types mixed with literals."""
    # One non-nullable struct field per generator, named child0, child1, ...
    named_gens = [['child' + str(ind), sub_gen]
                  for ind, sub_gen in enumerate(all_basic_gens + decimal_gens)]
    data_gen = StructGen(named_gens, nullable=False)

    # Adjacent string literals are joined by the parser; the SQL text is unchanged.
    stack_sql = (
        'stack(2, child1, child2, child3, child4, child5, child6, '
        'child7, child8, child9, child10, child11, child12, child13, '
        '1Y, 2S, 3, 4L, 5.0f, 6.0d, "7", false, to_date("2009-01-01"), '
        'to_timestamp("2010-01-01 00:00:00"), null, 1234.567, '
        '1234567890.12, 123456789012345678.90)'
    )

    def stacked_df(spark):
        source_df = gen_df(spark, data_gen, length=100)
        return source_df.selectExpr('*', stack_sql)

    assert_gpu_and_cpu_are_equal_collect(stacked_df)

# The GPU stack is not guaranteed to produce rows in the same order as Spark does on the CPU,
# so the two results are compared without regard to row order.
@ignore_order(local=True)
def test_stack_nested_types():
    """Check stack() over array, map and struct columns combined with nested literals."""
    # Generators are created in the same order as the fields they populate.
    array_gen = ArrayGen(IntegerGen(nullable=False))
    map_gen = MapGen(IntegerGen(nullable=False), StringGen(nullable=False))
    struct_gen = StructGen([['col1', IntegerGen(nullable=False)],
                            ['col2', StringGen(nullable=False)]])
    data_gen = StructGen(
        [['array', array_gen], ['map', map_gen], ['struct', struct_gen]],
        nullable=False)

    # Adjacent string literals are joined by the parser; the SQL text is unchanged.
    stack_sql = ('stack(2, map, array, struct, '
                 'map(1, "a", 2, "b"), array(1, 2, 3), struct(1, "a"))')

    def stacked_df(spark):
        source_df = gen_df(spark, data_gen, length=100)
        return source_df.selectExpr('*', stack_sql)

    assert_gpu_and_cpu_are_equal_collect(stacked_df)

Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{ColumnVector, DType, NvtxColor, NvtxRange, OrderByArg, Scalar, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuOverrides.extractLit
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry}
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.shims.{ShimExpression, ShimUnaryExecNode}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Generator, ReplicateRows}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Generator, ReplicateRows, Stack}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{GenerateExec, SparkPlan}
import org.apache.spark.sql.rapids.GpuCreateArray
Expand All @@ -52,13 +53,64 @@ class GpuGenerateExecSparkPlanMeta(
}
}

/**
 * Builds the GPU plan for a `stack(n, expr1, ..., exprk)` generator as a [[GpuExpandExec]].
 *
 * One projection is produced per output row. Each projection is the GPU form of
 * `childExprs.tail` (presumably the pass-through `requiredChildOutput` columns - confirm
 * against the enclosing meta) followed by that row's slice of the stack data expressions.
 * Slots past the last data expression are filled with null literals typed like the
 * expression at the same column position of the first row.
 *
 * @param stackMeta meta wrapping the Stack expression; its first child must be a literal Int
 * @return a GpuExpandExec over the converted child plan
 * @throws IllegalStateException if the first child of stack is not a literal
 */
private def getExpandExecForStack(stackMeta: GpuStackMeta): GpuExec = {
  // First child of stack is the row count `n`; the remaining children are the data.
  val rowCount = extractLit(stackMeta.childExprs.head.convertToCpu())
    .map(_.value.asInstanceOf[Int])
    .getOrElse(
      throw new IllegalStateException("First parameter of stack should be a literal Int"))
  val stackArgs = stackMeta.childExprs.tail
  // Columns per row: ceil(k / n), computed in floating point to round up.
  val numFields = Math.ceil((stackMeta.childExprs.length - 1.0) / rowCount).toInt

  val projections: Seq[Seq[Expression]] = (0 until rowCount).map { row =>
    // Converted once per row, so every projection gets its own expression instances.
    val passThrough: Seq[Expression] = childExprs.tail.map(_.convertToGpu())
    val stacked: Seq[Expression] = (0 until numFields).map { col =>
      val index = row * numFields + col
      if (index < stackArgs.length) {
        stackArgs(index).convertToGpu()
      } else {
        // Ran out of data expressions: pad with a null of this column's type.
        GpuLiteral(null, stackArgs(col).dataType)
      }
    }
    passThrough ++ stacked
  }

  val output: Seq[Attribute] = gen.requiredChildOutput ++ gen.generatorOutput.take(numFields)
  GpuExpandExec(projections, output, childPlans.head.convertIfNeeded())(
    useTieredProject = conf.isTieredProjectEnabled)
}

/**
 * Converts this generate node to its GPU plan.
 *
 * A Stack generator is planned as a GpuExpandExec via getExpandExecForStack; any other
 * generator is converted to a GpuGenerator and wrapped in a GpuGenerateExec.
 *
 * @return the GPU exec replacing this node
 */
override def convertToGpu(): GpuExec = {
  // Dispatch on the generator meta BEFORE converting it: GpuStackMeta.convertToGpu() throws
  // UnsupportedOperationException, so an unconditional GpuGenerateExec(...) built from
  // childExprs.head ahead of this match would fail for stack (and would also convert the
  // child plan a second time for every other generator).
  childExprs.head match {
    case stackMeta: GpuStackMeta =>
      // if child expression contains Stack, use GpuExpandExec instead
      getExpandExecForStack(stackMeta)
    case genMeta =>
      GpuGenerateExec(
        genMeta.convertToGpu().asInstanceOf[GpuGenerator],
        gen.requiredChildOutput,
        gen.outer,
        gen.generatorOutput,
        childPlans.head.convertIfNeeded())
  }
}
}

/**
 * Meta for Spark's `Stack` generator expression.
 *
 * It only wraps the children of the stack expression; it never produces a GPU expression
 * itself, and `convertToGpu()` always throws.
 */
class GpuStackMeta(
    stack: Stack,
    conf: RapidsConf,
    parent: Option[RapidsMeta[_, _, _]],
    rule: DataFromReplacementRule)
  extends BaseExprMeta[Stack](stack, conf, parent, rule) {

  // Wrap every child of the stack expression, with this meta as the parent.
  override val childExprs: Seq[BaseExprMeta[_]] =
    stack.children.map(child => GpuOverrides.wrapExpr(child, conf, Some(this)))

  /**
   * Not supported. The stack logic is expected to be handled by the generate exec in
   * terms of GpuExpandExec, so this method should never be called.
   */
  override def convertToGpu(): GpuExpression =
    throw new UnsupportedOperationException(s"Should not be here: $this")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3300,6 +3300,19 @@ object GpuOverrides extends Logging {
override val supportOuter: Boolean = true
override def convertToGpu(): GpuExpression = GpuPosExplode(childExprs.head.convertToGpu())
}),
// Replacement rule for Spark's Stack generator (`stack(n, expr1, ..., exprk)`).
expr[Stack](
"Separates expr1, ..., exprk into n rows.",
ExprChecks.projectOnly(
// Result type signatures: an ARRAY whose nested types are listed below.
// NOTE(review): presumably the first signature is what the GPU supports and the second is
// what Spark allows - confirm against ExprChecks.projectOnly.
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 +
TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP),
TypeSig.ARRAY.nested(TypeSig.all),
// `n` (the number of rows) is checked as a literal INT.
Seq(ParamCheck("n", TypeSig.lit(TypeEnum.INT), TypeSig.INT)),
// The remaining, repeating `expr` parameters are the values to distribute over the rows.
Some(RepeatingParamCheck("expr",
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 +
TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP).nested(),
TypeSig.all))),
// The rule wraps the expression in a GpuStackMeta.
(a, conf, p, r) => new GpuStackMeta(a, conf, p, r)
),
expr[ReplicateRows](
"Given an input row replicates the row N times",
ExprChecks.projectOnly(
Expand Down
1 change: 1 addition & 0 deletions tools/generated_files/operatorsScore.csv
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ SortOrder,4
SparkPartitionID,4
SpecifiedWindowFrame,4
Sqrt,4
Stack,4
StartsWith,4
StddevPop,4
StddevSamp,4
Expand Down
3 changes: 3 additions & 0 deletions tools/generated_files/supportedExprs.csv
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,9 @@ Sqrt,S,`sqrt`,None,project,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,
Sqrt,S,`sqrt`,None,project,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Sqrt,S,`sqrt`,None,AST,input,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Sqrt,S,`sqrt`,None,AST,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Stack,S,`stack`,None,project,n,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Stack,S,`stack`,None,project,expr,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,PS,PS,NS
Stack,S,`stack`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA
StartsWith,S, ,None,project,src,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
StartsWith,S, ,None,project,search,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
StartsWith,S, ,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Expand Down

0 comments on commit 92d59d4

Please sign in to comment.