Enable sort for single-level nesting struct columns on GPU #1883

Merged
merged 35 commits into branch-0.5 from issue-1605-struct-sort
Mar 26, 2021
Changes from 31 commits
Commits
35 commits
cd3c436
Enable sort of structs
gerashegalov Mar 5, 2021
00ba3ff
struct gens for test_single_orderby
gerashegalov Mar 5, 2021
1463028
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 5, 2021
586f28f
revert unneeded data_gen change
gerashegalov Mar 5, 2021
18d53a4
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 10, 2021
c6407d0
interactive tests pass
gerashegalov Mar 11, 2021
88c8a9a
add integration tests
gerashegalov Mar 11, 2021
ddd60e4
revert GpuTimeMath refactor
gerashegalov Mar 11, 2021
ce7568c
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 11, 2021
ad934b2
add tests for limit
gerashegalov Mar 12, 2021
5910d1a
Consolidate RAT settings in parent pom
gerashegalov Mar 12, 2021
c538bc2
Merge branch 'ratPluginSettingsInParent' into issue-1605-struct-sort
gerashegalov Mar 12, 2021
d7ab1ea
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 12, 2021
33762de
Enable saving data for debugging
gerashegalov Mar 12, 2021
efbfa3f
Save schema for debugging
gerashegalov Mar 12, 2021
21a05c7
wip
gerashegalov Mar 13, 2021
1dd4b23
cleanup
gerashegalov Mar 13, 2021
b6f44da
comment
gerashegalov Mar 13, 2021
1f4514d
uncomment test params
gerashegalov Mar 15, 2021
327e100
Bobby's review:
gerashegalov Mar 16, 2021
205b7c3
Bobby's review #2
gerashegalov Mar 16, 2021
9f01f85
proper handling of UDT nested
gerashegalov Mar 16, 2021
ff313ac
doc update
gerashegalov Mar 16, 2021
3391834
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 17, 2021
db64198
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 20, 2021
d0ba59b
workaround upper/lower_bound unsupported
gerashegalov Mar 23, 2021
46269e7
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 23, 2021
7b57913
Restrict struct-sort to a single partition
gerashegalov Mar 23, 2021
196ebc9
improve error messages
gerashegalov Mar 24, 2021
4b1f742
correct CPU constraints
gerashegalov Mar 25, 2021
7e8c9f8
test fix
gerashegalov Mar 25, 2021
9ee262b
check for nesting in RP
gerashegalov Mar 25, 2021
e5f9996
wip
gerashegalov Mar 26, 2021
7d0bdcc
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 26, 2021
5910092
reworked sortexec type list
gerashegalov Mar 26, 2021
14 changes: 7 additions & 7 deletions docs/supported_ops.md
@@ -144,12 +144,12 @@ Accelerator supports are described below.
<td>S*</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -354,9 +354,9 @@ Accelerator supports are described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -379,7 +379,7 @@ Accelerator supports are described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -12421,7 +12421,7 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -12442,7 +12442,7 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
30 changes: 24 additions & 6 deletions integration_tests/src/main/python/data_gen.py
@@ -299,7 +299,7 @@ def start(self, rand):
POS_FLOAT_NAN_MAX_VALUE = struct.unpack('f', struct.pack('I', 0x7fffffff))[0]
class FloatGen(DataGen):
"""Generate floats, which some built in corner cases."""
def __init__(self, nullable=True,
def __init__(self, nullable=True,
no_nans=False, special_cases=None):
self._no_nans = no_nans
if special_cases is None:
@@ -334,7 +334,7 @@ def gen_float():
POS_DOUBLE_NAN_MAX_VALUE = struct.unpack('d', struct.pack('L', 0x7fffffffffffffff))[0]
class DoubleGen(DataGen):
"""Generate doubles, which some built in corner cases."""
def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False,
def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False,
nullable=True, special_cases = None):
self._min_exp = min_exp
self._max_exp = max_exp
@@ -447,7 +447,7 @@ def __init__(self, start=None, end=None, nullable=True):

self._start_day = self._to_days_since_epoch(start)
self._end_day = self._to_days_since_epoch(end)

self.with_special_case(start)
self.with_special_case(end)

@@ -652,9 +652,27 @@ def gen_scalar_value(data_gen, seed=0, force_no_nulls=False):
v = list(gen_scalar_values(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls))
return v[0]

def debug_df(df):
"""print out the contents of a dataframe for debugging."""
print('COLLECTED\n{}'.format(df.collect()))
def debug_df(df, path = None, file_format = 'json', num_parts = 1):
"""Print out or save the contents and the schema of a dataframe for debugging."""

if path is not None:
# Save the dataframe and its schema
# The schema can be re-created by using DataType.fromJson and used
# for loading the dataframe
file_name = f"{path}.{file_format}"
schema_file_name = f"{path}.schema.json"

df.coalesce(num_parts).write.format(file_format).save(file_name)
print(f"SAVED df output for debugging at {file_name}")

schema_json = df.schema.json()
schema_file = open(schema_file_name , 'w')
schema_file.write(schema_json)
schema_file.close()
print(f"SAVED df schema for debugging along in the output dir")
else:
print('COLLECTED\n{}'.format(df.collect()))

df.explain()
df.printSchema()
return df
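As the comment in debug_df notes, the saved schema JSON can be fed back through StructType.fromJson to reload the data later. A minimal reload sketch (not part of the PR; the helper name and path are illustrative, and it assumes an active SparkSession):

import json
from pyspark.sql.types import StructType

def reload_debug_df(spark, path, file_format='json'):
    """Reload a dataframe previously saved with debug_df(df, path=...)."""
    # debug_df writes the data to <path>.<format> and the schema to <path>.schema.json
    with open(f"{path}.schema.json") as schema_file:
        schema = StructType.fromJson(json.load(schema_file))
    return spark.read.format(file_format).schema(schema).load(f"{path}.{file_format}")

# Hypothetical usage: df = reload_debug_df(spark, "/tmp/debug/struct_sort_df")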
66 changes: 66 additions & 0 deletions integration_tests/src/main/python/sort_test.py
@@ -34,6 +34,46 @@ def test_single_orderby(data_gen, order):
lambda spark : unary_op_df(spark, data_gen).orderBy(order),
conf = allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('shuffle_parts', [
pytest.param(1),
pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
])
@pytest.mark.parametrize('stable_sort', [
pytest.param(True),
pytest.param(False, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
])
@pytest.mark.parametrize('data_gen', [
pytest.param(all_basic_struct_gen),
pytest.param(StructGen([['child0', all_basic_struct_gen]]),
marks=pytest.mark.xfail(reason='second-level structs are not supported')),
pytest.param(ArrayGen(string_gen),
marks=pytest.mark.xfail(reason="arrays are not supported")),
pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
marks=pytest.mark.xfail(reason="maps are not supported")),
], ids=idfn)
@pytest.mark.parametrize('order', [
pytest.param(f.col('a').asc()),
pytest.param(f.col('a').asc_nulls_first()),
pytest.param(f.col('a').asc_nulls_last(),
marks=pytest.mark.xfail(reason='opposite null order not supported')),
pytest.param(f.col('a').desc()),
pytest.param(f.col('a').desc_nulls_first(),
marks=pytest.mark.xfail(reason='opposite null order not supported')),
pytest.param(f.col('a').desc_nulls_last()),
], ids=idfn)
def test_single_nested_orderby_plain(data_gen, order, shuffle_parts, stable_sort):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).orderBy(order),
# TODO: remove the range-partitioning restrictions once nested-type range partitioning is implemented
conf = {
**allow_negative_scale_of_decimal_conf,
**{
'spark.sql.shuffle.partitions': shuffle_parts,
'spark.rapids.sql.stableSort.enabled': stable_sort,
'spark.rapids.allowCpuRangePartitioning': False
}
})

# Spark CPU itself has an issue with negative scale for take ordered and project
orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)]
@pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn)
@@ -42,6 +42,32 @@ def test_single_orderby_with_limit(data_gen, order):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))

@pytest.mark.parametrize('data_gen', [
pytest.param(all_basic_struct_gen),
pytest.param(StructGen([['child0', all_basic_struct_gen]]),
marks=pytest.mark.xfail(reason='second-level structs are not supported')),
pytest.param(ArrayGen(string_gen),
marks=pytest.mark.xfail(reason="arrays are not supported")),
pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
marks=pytest.mark.xfail(reason="maps are not supported")),
], ids=idfn)
@pytest.mark.parametrize('order', [
pytest.param(f.col('a').asc()),
pytest.param(f.col('a').asc_nulls_first()),
pytest.param(f.col('a').asc_nulls_last(),
marks=pytest.mark.xfail(reason='opposite null order not supported')),
pytest.param(f.col('a').desc()),
pytest.param(f.col('a').desc_nulls_first(),
marks=pytest.mark.xfail(reason='opposite null order not supported')),
pytest.param(f.col('a').desc_nulls_last()),
], ids=idfn)
def test_single_nested_orderby_with_limit(data_gen, order):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100),
conf = {
'spark.rapids.allowCpuRangePartitioning': False
})

@pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
@pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
def test_single_sort_in_part(data_gen, order):
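Taken together, the new tests pin down the conditions under which a sort on a single-level struct column stays on the GPU: a single shuffle partition, the stable out-of-core sort enabled, and only the default null ordering per direction. A hedged end-user sketch of that setup (session options and column names are illustrative; it assumes the RAPIDS Accelerator jars are on the classpath):

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = (SparkSession.builder
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         .config("spark.rapids.sql.enabled", "true")
         # constraints exercised by test_single_nested_orderby_plain:
         .config("spark.sql.shuffle.partitions", "1")            # multi-partition struct sort is not supported yet (issue #1607)
         .config("spark.rapids.sql.stableSort.enabled", "true")  # nested-type sort requires the stable sort path
         .getOrCreate())

df = spark.range(100).selectExpr("named_struct('x', id % 3, 'y', id) AS a")
df.orderBy(f.col("a").asc()).show()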
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -423,6 +423,21 @@ object GpuOverrides {
"\\S", "\\v", "\\V", "\\w", "\\w", "\\p", "$", "\\b", "\\B", "\\A", "\\G", "\\Z", "\\z", "\\R",
"?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">")

private[this] val pluginSupportedOrderableSig = (
TypeSig.commonCudfTypes +
TypeSig.NULL +
TypeSig.DECIMAL +
TypeSig.STRUCT.nested(
TypeSig.commonCudfTypes +
TypeSig.NULL +
TypeSig.DECIMAL
))

private[this] def isNestedType(dataType: DataType) = dataType match {
case ArrayType(_, _) | MapType(_, _, _) | StructType(_) => true
case _ => false
}

// this listener mechanism is global and is intended for use by unit tests only
private val listeners: ListBuffer[GpuOverridesListener] = new ListBuffer[GpuOverridesListener]()

@@ -1803,16 +1818,28 @@ object GpuOverrides {
expr[SortOrder](
"Sort order",
ExprChecks.projectOnly(
TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
pluginSupportedOrderableSig,
TypeSig.orderable,
Seq(ParamCheck(
"input",
TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
pluginSupportedOrderableSig,
TypeSig.orderable))),
(a, conf, p, r) => new BaseExprMeta[SortOrder](a, conf, p, r) {
(sortOrder, conf, p, r) => new BaseExprMeta[SortOrder](sortOrder, conf, p, r) {
override def tagExprForGpu(): Unit = {
if (isNestedType(sortOrder.dataType)) {
val nullOrdering = sortOrder.nullOrdering
val directionDefaultNullOrdering = sortOrder.direction.defaultNullOrdering
val direction = sortOrder.direction.sql
if (nullOrdering != directionDefaultNullOrdering) {
willNotWorkOnGpu(s"only default null ordering $directionDefaultNullOrdering " +
s"for direction $direction is supported for nested types; actual: ${nullOrdering}")
}
}
}

// One of the few expressions that are not replaced with a GPU version
override def convertToGpu(): Expression =
a.withNewChildren(childExprs.map(_.convertToGpu()))
sortOrder.withNewChildren(childExprs.map(_.convertToGpu()))
}),
expr[Count](
"Count aggregate operator",
@@ -2491,6 +2518,14 @@ object GpuOverrides {
override val childExprs: Seq[BaseExprMeta[_]] =
rp.ordering.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

override def tagPartForGpu() {
val numPartitions = rp.numPartitions
if (numPartitions > 1) {
willNotWorkOnGpu("only single partition sort is supported, " +
s"actual partitions: $numPartitions")
}
}

override def convertToGpu(): GpuPartitioning = {
if (rp.numPartitions > 1) {
val gpuOrdering = childExprs.map(_.convertToGpu()).asInstanceOf[Seq[SortOrder]]
@@ -2604,7 +2639,7 @@ object GpuOverrides {
}),
exec[TakeOrderedAndProjectExec](
"Take the first limit elements as defined by the sortOrder, and do projection if needed.",
ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL, TypeSig.all),
ExecChecks(pluginSupportedOrderableSig, TypeSig.all),
(takeExec, conf, p, r) =>
new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) {
val sortOrder: Seq[BaseExprMeta[SortOrder]] =
@@ -2668,7 +2703,7 @@ object GpuOverrides {
}),
exec[CollectLimitExec](
"Reduce to single partition and apply limit",
ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL, TypeSig.all),
ExecChecks(pluginSupportedOrderableSig, TypeSig.all),
(collectLimitExec, conf, p, r) => new GpuCollectLimitMeta(collectLimitExec, conf, p, r))
.disabledByDefault("Collect Limit replacement can be slower on the GPU, if huge number " +
"of rows in a batch it could help by limiting the number of rows transferred from " +
@@ -2737,9 +2772,14 @@ object GpuOverrides {
"The backend for the sort operator",
// The SortOrder TypeSig will govern what types can actually be used as sorting key data type.
// The types below are allowed as inputs and outputs.
ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
TypeSig.STRUCT).nested(), TypeSig.all),
(sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r)),
ExecChecks(pluginSupportedOrderableSig, TypeSig.all),
(sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r) {
override def tagPlanForGpu() {
if (!conf.stableSort && sort.sortOrder.exists(so => isNestedType(so.dataType))) {
willNotWorkOnGpu(s"it's disabled unless ${RapidsConf.STABLE_SORT.key} is true")
}
}
}),
exec[ExpandExec](
"The backend for the expand operator",
ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
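The tagExprForGpu check added to SortOrder above only accepts the default null ordering for each direction when the sort key is a nested type, which matches the xfail markers in sort_test.py. A hedged PySpark illustration (assumes a RAPIDS-enabled session as in the earlier sketch):

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.range(10).selectExpr("named_struct('x', id % 3, 'y', id) AS a")

# Kept on the GPU: the default null ordering for each direction.
df.orderBy(f.col("a").asc())               # equivalent to asc_nulls_first()
df.orderBy(f.col("a").desc())              # equivalent to desc_nulls_last()

# Tagged willNotWorkOnGpu (non-default null ordering) and left on the CPU:
df.orderBy(f.col("a").asc_nulls_last())
df.orderBy(f.col("a").desc_nulls_first())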
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
@@ -16,7 +16,9 @@

package com.nvidia.spark.rapids

import org.apache.spark.RangePartitioner
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, CustomShuffleReaderExec, QueryStageExec, ShuffleQueryStageExec}
@@ -406,8 +408,11 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
case _: GpuColumnarToRowExecParent => () // Ignored
case _: ExecutedCommandExec => () // Ignored
case _: RDDScanExec => () // Ignored
case _: ShuffleExchangeExec => () // Ignored for now, we don't force it to the GPU if
// children are not on the gpu
case shuffleExchange: ShuffleExchangeExec if conf.cpuRangePartitioningPermitted
|| !shuffleExchange.outputPartitioning.isInstanceOf[RangePartitioning] => {
// Ignored for now, we don't force it to the GPU if
// children are not on the gpu
}
case other =>
if (!plan.supportsColumnar &&
!conf.testingAllowedNonGpu.contains(getBaseNameFromClass(other.getClass.toString))) {
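The change above stops ignoring a CPU ShuffleExchangeExec when it performs range partitioning and the internal flag disallows it; combined with the RangePartitioning check in GpuOverrides, this is how a silent CPU fallback is surfaced in tests. A hedged sketch of the plan shape that triggers it (assumes a RAPIDS-enabled session; the exact explain output will differ):

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 200)

df = spark.range(1000).selectExpr("named_struct('x', id % 10, 'y', id) AS a")
# A global orderBy over many partitions requires a range-partitioning exchange before
# the sort, roughly:
#   Sort [a ASC NULLS FIRST], true
#   +- Exchange rangepartitioning(a ASC NULLS FIRST, 200)
# Range partitioning of struct keys is not supported on the GPU yet (issue #1607), so
# the RangePartitioning meta tags it willNotWorkOnGpu. With the internal setting
# spark.rapids.allowCpuRangePartitioning=false, the transition overrides flag that CPU
# exchange instead of silently accepting the fallback.
df.orderBy(f.col("a").asc()).explain()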
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -949,6 +949,12 @@ object RapidsConf {
.booleanConf
.createWithDefault(true)

val CPU_RANGE_PARTITIONING_ALLOWED = conf("spark.rapids.allowCpuRangePartitioning")
.doc("Option to control enforcement of range partitioning on GPU.")
.internal()
.booleanConf
.createWithDefault(true)

private def printSectionHeader(category: String): Unit =
println(s"\n### $category")

@@ -1273,6 +1279,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val getAlluxioPathsToReplace: Option[Seq[String]] = get(ALLUXIO_PATHS_REPLACE)

lazy val cpuRangePartitioningPermitted = get(CPU_RANGE_PARTITIONING_ALLOWED)

def isOperatorEnabled(key: String, incompat: Boolean, isDisabledByDefault: Boolean): Boolean = {
val default = !(isDisabledByDefault || incompat) || (incompat && isIncompatEnabled)
conf.get(key).map(toBoolean(_, key)).getOrElse(default)
40 changes: 29 additions & 11 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
@@ -170,6 +170,19 @@ final class TypeSig private(
new TypeSig(it, nt, lt, nts)
}

/**
* Remove a type signature. The reverse of +
* @param other what to remove
* @return the new signature
*/
def - (other: TypeSig): TypeSig = {
val it = initialTypes -- other.initialTypes
val nt = nestedTypes -- other.nestedTypes
val lt = litOnlyTypes -- other.litOnlyTypes
val nts = notes -- other.notes.keySet
new TypeSig(it, nt, lt, nts)
}

/**
* Add nested types to this type signature. Note that these do not stack so if nesting has
* nested types too they are ignored.
@@ -542,18 +555,23 @@ class ExecChecks private(
override def tag(meta: RapidsMeta[_, _, _]): Unit = {
val plan = meta.wrapped.asInstanceOf[SparkPlan]
val allowDecimal = meta.conf.decimalTypeEnabled
if (!check.areAllSupportedByPlugin(plan.output.map(_.dataType), allowDecimal)) {
val unsupported = plan.output.map(_.dataType)
.filter(!check.isSupportedByPlugin(_, allowDecimal))
.toSet
meta.willNotWorkOnGpu(s"unsupported data types in output: ${unsupported.mkString(", ")}")

val unsupportedOutputTypes = plan.output
.filterNot(attr => check.isSupportedByPlugin(attr.dataType, allowDecimal))
.toSet

if (unsupportedOutputTypes.nonEmpty) {
meta.willNotWorkOnGpu("unsupported data types in output: " +
unsupportedOutputTypes.mkString(", "))
}
if (!check.areAllSupportedByPlugin(
plan.children.flatMap(_.output.map(_.dataType)),
allowDecimal)) {
val unsupported = plan.children.flatMap(_.output.map(_.dataType))
.filter(!check.isSupportedByPlugin(_, allowDecimal)).toSet
meta.willNotWorkOnGpu(s"unsupported data types in input: ${unsupported.mkString(", ")}")

val unsupportedInputTypes = plan.children.flatMap { childPlan =>
childPlan.output.filterNot(attr => check.isSupportedByPlugin(attr.dataType, allowDecimal))
}.toSet

if (unsupportedInputTypes.nonEmpty) {
meta.willNotWorkOnGpu("unsupported data types in input: " +
unsupportedInputTypes.mkString(", "))
}
}
