Enable sort for single-level nesting struct columns on GPU #1883

Merged: 35 commits, merged on Mar 26, 2021.

Changes shown are from 29 of the 35 commits.

Commits (all by gerashegalov):
cd3c436  Enable sort of structs (Mar 5, 2021)
00ba3ff  struct gens for test_single_orderby (Mar 5, 2021)
1463028  Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru… (Mar 5, 2021)
586f28f  revert unneeded data_gen change (Mar 5, 2021)
18d53a4  Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru… (Mar 10, 2021)
c6407d0  interactive tests pass (Mar 11, 2021)
88c8a9a  add integration tests (Mar 11, 2021)
ddd60e4  revert GpuTimeMath refactor (Mar 11, 2021)
ce7568c  Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru… (Mar 11, 2021)
ad934b2  add tests for limit (Mar 12, 2021)
5910d1a  Consolidate RAT settings in parent pom (Mar 12, 2021)
c538bc2  Merge branch 'ratPluginSettingsInParent' into issue-1605-struct-sort (Mar 12, 2021)
d7ab1ea  Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru… (Mar 12, 2021)
33762de  Enable saving data for debugging (Mar 12, 2021)
efbfa3f  Save schema for debugging (Mar 12, 2021)
21a05c7  wip (Mar 13, 2021)
1dd4b23  cleanup (Mar 13, 2021)
b6f44da  comment (Mar 13, 2021)
1f4514d  uncomment test params (Mar 15, 2021)
327e100  Bobby's review (Mar 16, 2021)
205b7c3  Bobby's review #2 (Mar 16, 2021)
9f01f85  proper handling of UDT nested (Mar 16, 2021)
ff313ac  doc update (Mar 16, 2021)
3391834  Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru… (Mar 17, 2021)
db64198  Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru… (Mar 20, 2021)
d0ba59b  workaround upper/lower_bound unsupported (Mar 23, 2021)
46269e7  Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru… (Mar 23, 2021)
7b57913  Restrict struct-sort to a single partition (Mar 23, 2021)
196ebc9  improve error messages (Mar 24, 2021)
4b1f742  correct CPU constraints (Mar 25, 2021)
7e8c9f8  test fix (Mar 25, 2021)
9ee262b  check for nesting in RP (Mar 25, 2021)
e5f9996  wip (Mar 26, 2021)
7d0bdcc  Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru… (Mar 26, 2021)
5910092  reworked sortexec type list (Mar 26, 2021)
36 changes: 18 additions & 18 deletions docs/supported_ops.md
@@ -144,12 +144,12 @@ Accelerator supports are described below.
 <td>S*</td>
 <td>S</td>
 <td>S*</td>
-<td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td>S</td>
+<td>S</td>
+<td>S</td>
+<td><em>PS* (missing nested UDT)</em></td>
+<td><em>PS* (missing nested UDT)</em></td>
+<td><em>PS* (missing nested UDT)</em></td>
 <td><b>NS</b></td>
 </tr>
 <tr>
@@ -352,11 +352,11 @@ Accelerator supports are described below.
 <td>S</td>
 <td>S*</td>
 <td>S</td>
-<td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
-<td><b>NS</b></td>
-<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
+<td>S</td>
+<td>S</td>
+<td><em>PS* (missing nested UDT)</em></td>
+<td><em>PS* (missing nested UDT)</em></td>
+<td><em>PS* (missing nested UDT)</em></td>
 <td><b>NS</b></td>
 </tr>
 <tr>
@@ -375,11 +375,11 @@ Accelerator supports are described below.
 <td>S</td>
 <td>S*</td>
 <td>S</td>
-<td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td>S</td>
+<td>S</td>
+<td><em>PS* (missing nested UDT)</em></td>
+<td><em>PS* (missing nested UDT)</em></td>
+<td><em>PS* (missing nested UDT)</em></td>
 <td><b>NS</b></td>
 </tr>
 <tr>
@@ -12421,7 +12421,7 @@ Accelerator support is described below.
 <td><b>NS</b></td>
 <td><b>NS</b></td>
 <td> </td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)</em></td>
 <td><b>NS</b></td>
 </tr>
 <tr>
@@ -12442,7 +12442,7 @@ Accelerator support is described below.
 <td><b>NS</b></td>
 <td><b>NS</b></td>
 <td> </td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)</em></td>
 <td><b>NS</b></td>
 </tr>
 <tr>
30 changes: 24 additions & 6 deletions integration_tests/src/main/python/data_gen.py
@@ -299,7 +299,7 @@ def start(self, rand):
 POS_FLOAT_NAN_MAX_VALUE = struct.unpack('f', struct.pack('I', 0x7fffffff))[0]
 class FloatGen(DataGen):
     """Generate floats, with some built-in corner cases."""
-    def __init__(self, nullable=True,
+    def __init__(self, nullable=True,
             no_nans=False, special_cases=None):
         self._no_nans = no_nans
         if special_cases is None:
@@ -334,7 +334,7 @@ def gen_float():
 POS_DOUBLE_NAN_MAX_VALUE = struct.unpack('d', struct.pack('L', 0x7fffffffffffffff))[0]
 class DoubleGen(DataGen):
     """Generate doubles, with some built-in corner cases."""
-    def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False,
+    def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False,
                  nullable=True, special_cases = None):
         self._min_exp = min_exp
         self._max_exp = max_exp
@@ -447,7 +447,7 @@ def __init__(self, start=None, end=None, nullable=True):
 
         self._start_day = self._to_days_since_epoch(start)
         self._end_day = self._to_days_since_epoch(end)
-
+
         self.with_special_case(start)
         self.with_special_case(end)
 
@@ -652,9 +652,27 @@ def gen_scalar_value(data_gen, seed=0, force_no_nulls=False):
     v = list(gen_scalar_values(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls))
     return v[0]
 
-def debug_df(df):
-    """print out the contents of a dataframe for debugging."""
-    print('COLLECTED\n{}'.format(df.collect()))
+def debug_df(df, path = None, file_format = 'json', num_parts = 1):
+    """Print out or save the contents and the schema of a dataframe for debugging."""
+
+    if path is not None:
+        # Save the dataframe and its schema
+        # The schema can be re-created by using DataType.fromJson and used
+        # for loading the dataframe
+        file_name = f"{path}.{file_format}"
+        schema_file_name = f"{path}.schema.json"
+
+        df.coalesce(num_parts).write.format(file_format).save(file_name)
+        print(f"SAVED df output for debugging at {file_name}")
+
+        schema_json = df.schema.json()
+        schema_file = open(schema_file_name, 'w')
+        schema_file.write(schema_json)
+        schema_file.close()
+        print(f"SAVED df schema for debugging along with the output")
+    else:
+        print('COLLECTED\n{}'.format(df.collect()))
 
     df.explain()
     df.printSchema()
     return df
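For reference, a dump written by the new debug_df can be loaded back with its saved schema. This is a minimal sketch, not part of the PR: it assumes a prior call like debug_df(df, path='/tmp/debug_a') and an active SparkSession named spark; the path is hypothetical.

import json
from pyspark.sql.types import StructType

# Rebuild the schema that debug_df wrote next to the data
# (path + '.schema.json' per the convention above; path is hypothetical).
with open('/tmp/debug_a.schema.json') as schema_file:
    schema = StructType.fromJson(json.load(schema_file))

# Reload the saved JSON output with the original dataframe's exact schema.
df = spark.read.schema(schema).json('/tmp/debug_a.json')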
60 changes: 60 additions & 0 deletions integration_tests/src/main/python/sort_test.py
@@ -34,6 +34,38 @@ def test_single_orderby(data_gen, order):
         lambda spark : unary_op_df(spark, data_gen).orderBy(order),
         conf = allow_negative_scale_of_decimal_conf)
 
+@pytest.mark.parametrize('shuffle_parts', [
+    pytest.param(1),
+    pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
+])
+@pytest.mark.parametrize('data_gen', [
+    pytest.param(all_basic_struct_gen),
+    pytest.param(StructGen([['child0', all_basic_struct_gen]]),
+        marks=pytest.mark.xfail(reason='second-level structs are not supported')),
+    pytest.param(ArrayGen(string_gen),
+        marks=pytest.mark.xfail(reason="arrays are not supported")),
+    pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
+        marks=pytest.mark.xfail(reason="maps are not supported")),
+], ids=idfn)
+@pytest.mark.parametrize('order', [
+    pytest.param(f.col('a').asc()),
+    pytest.param(f.col('a').asc_nulls_first()),
+    pytest.param(f.col('a').asc_nulls_last(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc()),
+    pytest.param(f.col('a').desc_nulls_first(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc_nulls_last()),
+], ids=idfn)
+def test_single_nested_orderby_plain(data_gen, order, shuffle_parts):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, data_gen).orderBy(order),
+        # TODO no interference with range partition once implemented
+        conf = {
+            **allow_negative_scale_of_decimal_conf,
+            **{'spark.sql.shuffle.partitions': shuffle_parts}
+        })
+
 # SPARK CPU itself has issue with negative scale for take ordered and project
 orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)]
 @pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn)
@@ -42,6 +74,34 @@ def test_single_orderby_with_limit(data_gen, order):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))
 
+@pytest.mark.parametrize('shuffle_parts', [
+    pytest.param(1),
+    pytest.param(200, marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1607"))
+])
+@pytest.mark.parametrize('data_gen', [
+    pytest.param(all_basic_struct_gen),
+    pytest.param(StructGen([['child0', all_basic_struct_gen]]),
+        marks=pytest.mark.xfail(reason='second-level structs are not supported')),
+    pytest.param(ArrayGen(string_gen),
+        marks=pytest.mark.xfail(reason="arrays are not supported")),
+    pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
+        marks=pytest.mark.xfail(reason="maps are not supported")),
+], ids=idfn)
+@pytest.mark.parametrize('order', [
+    pytest.param(f.col('a').asc()),
+    pytest.param(f.col('a').asc_nulls_first()),
+    pytest.param(f.col('a').asc_nulls_last(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc()),
+    pytest.param(f.col('a').desc_nulls_first(),
+        marks=pytest.mark.xfail(reason='opposite null order not supported')),
+    pytest.param(f.col('a').desc_nulls_last()),
+], ids=idfn)
+def test_single_nested_orderby_with_limit(data_gen, order, shuffle_parts):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100),
+        conf = {'spark.sql.shuffle.partitions': shuffle_parts})
+
 @pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
 @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
 def test_single_sort_in_part(data_gen, order):
Spark300Shims.scala

@@ -52,6 +52,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExc
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.python.WindowInPandasExec
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.{COALESCE_PARTITIONS_INITIAL_PARTITION_NUM, SHUFFLE_PARTITIONS}
 import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub, ShuffleManagerShimBase}
 import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase}
 import org.apache.spark.sql.rapids.execution.python.GpuWindowInPandasExecMetaBase
@@ -625,4 +626,17 @@ class Spark300Shims extends SparkShims {
   override def hasAliasQuoteFix: Boolean = false
 
   override def hasCastFloatTimestampUpcast: Boolean = false
+
+  // https://github.com/apache/spark/blob/39542bb81f8570219770bb6533c077f44f6cbd2a/
+  // sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L3329
+  override def numShufflePartitions: Int = {
+    val sqlConf = SQLConf.get
+    val defaultPartitions = sqlConf.getConf(SHUFFLE_PARTITIONS)
+    if (sqlConf.adaptiveExecutionEnabled && sqlConf.coalesceShufflePartitionsEnabled) {
+      sqlConf.getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM)
+        .getOrElse(defaultPartitions)
+    } else {
+      defaultPartitions
+    }
+  }
 }
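All of the settings read by numShufflePartitions are stock Spark confs, so the precedence can be exercised from PySpark. A minimal sketch, assuming an active SparkSession named spark:

# With AQE and partition coalescing both enabled, the shim reads
# initialPartitionNum and only falls back to shuffle.partitions if unset.
spark.conf.set('spark.sql.adaptive.enabled', 'true')
spark.conf.set('spark.sql.adaptive.coalescePartitions.enabled', 'true')
spark.conf.set('spark.sql.adaptive.coalescePartitions.initialPartitionNum', '1')

# With AQE disabled, the plain shuffle partition count is used instead.
spark.conf.set('spark.sql.adaptive.enabled', 'false')
spark.conf.set('spark.sql.shuffle.partitions', '1')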
GpuOverrides.scala

@@ -423,6 +423,21 @@ object GpuOverrides {
     "\\S", "\\v", "\\V", "\\w", "\\w", "\\p", "$", "\\b", "\\B", "\\A", "\\G", "\\Z", "\\z", "\\R",
     "?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">")
 
+  private[this] val pluginSupportedOrderableSig = (
+    TypeSig.commonCudfTypes +
+      TypeSig.NULL +
+      TypeSig.DECIMAL +
+      TypeSig.STRUCT.nested(
+        TypeSig.commonCudfTypes +
+          TypeSig.NULL +
+          TypeSig.DECIMAL
+      ))
+
+  private[this] def isNestedType(dataType: DataType) = dataType match {
+    case ArrayType(_, _) | MapType(_, _, _) | StructType(_) => true
+    case _ => false
+  }
+
   // this listener mechanism is global and is intended for use by unit tests only
   private val listeners: ListBuffer[GpuOverridesListener] = new ListBuffer[GpuOverridesListener]()
 
@@ -1803,16 +1818,34 @@ object GpuOverrides {
     expr[SortOrder](
       "Sort order",
       ExprChecks.projectOnly(
-        TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
+        pluginSupportedOrderableSig,
         TypeSig.orderable,
         Seq(ParamCheck(
           "input",
-          TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
+          pluginSupportedOrderableSig,
           TypeSig.orderable))),
-      (a, conf, p, r) => new BaseExprMeta[SortOrder](a, conf, p, r) {
+      (sortOrder, conf, p, r) => new BaseExprMeta[SortOrder](sortOrder, conf, p, r) {
+        override def tagExprForGpu(): Unit = {
+          if (isNestedType(sortOrder.dataType)) {
+            val nullOrdering = sortOrder.nullOrdering
+            val directionDefaultNullOrdering = sortOrder.direction.defaultNullOrdering
+            val direction = sortOrder.direction.sql
+            if (nullOrdering != directionDefaultNullOrdering) {
+              willNotWorkOnGpu(s"only default null ordering $directionDefaultNullOrdering " +
+                s"for direction $direction is supported for nested types; actual: ${nullOrdering}")
+            }
+
+            val numShufflePartitions = ShimLoader.getSparkShims.numShufflePartitions
+            if (numShufflePartitions > 1) {
+              willNotWorkOnGpu("only single partition sort is enabled until Range Partitioning " +
+                s"for structs is implemented, actual partitions: $numShufflePartitions")
+            }
+          }
+        }
+
         // One of the few expressions that are not replaced with a GPU version
         override def convertToGpu(): Expression =
-          a.withNewChildren(childExprs.map(_.convertToGpu()))
+          sortOrder.withNewChildren(childExprs.map(_.convertToGpu()))
       }),
     expr[Count](
       "Count aggregate operator",
@@ -2604,7 +2637,7 @@ object GpuOverrides {
       }),
     exec[TakeOrderedAndProjectExec](
       "Take the first limit elements as defined by the sortOrder, and do projection if needed.",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.all - TypeSig.UDT.nested(), TypeSig.all),
       (takeExec, conf, p, r) =>
         new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) {
           val sortOrder: Seq[BaseExprMeta[SortOrder]] =
}),
exec[CollectLimitExec](
"Reduce to single partition and apply limit",
ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL, TypeSig.all),
ExecChecks(TypeSig.all - TypeSig.UDT.nested(), TypeSig.all),
gerashegalov marked this conversation as resolved.
Show resolved Hide resolved
(collectLimitExec, conf, p, r) => new GpuCollectLimitMeta(collectLimitExec, conf, p, r))
.disabledByDefault("Collect Limit replacement can be slower on the GPU, if huge number " +
"of rows in a batch it could help by limiting the number of rows transferred from " +
@@ -2737,8 +2770,7 @@ object GpuOverrides {
       "The backend for the sort operator",
       // The SortOrder TypeSig will govern what types can actually be used as sorting key data type.
       // The types below are allowed as inputs and outputs.
-      ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
-        TypeSig.STRUCT).nested(), TypeSig.all),
+      ExecChecks(TypeSig.all - TypeSig.UDT.nested(), TypeSig.all),
       (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r)),
     exec[ExpandExec](
       "The backend for the expand operator",
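Taken together, the SortOrder checks earlier in this file's diff mean a sort keyed on a struct column stays on the GPU only under the direction's default null ordering and a single shuffle partition. A minimal PySpark sketch of the user-visible behavior, assuming a plugin-enabled SparkSession named spark:

# Struct sort is restricted to a single partition until range
# partitioning for structs is implemented.
spark.conf.set('spark.sql.shuffle.partitions', '1')

df = spark.createDataFrame([((1, 'a'),), ((2, 'b'),), (None,)],
                           'a struct<x:int,y:string>')

# asc() keeps the default null ordering (nulls first), so this sort
# is eligible for the GPU.
df.orderBy(df.a.asc()).show()

# asc_nulls_last() flips the null ordering for a nested key; SortOrder
# is tagged willNotWorkOnGpu and the sort falls back to the CPU.
df.orderBy(df.a.asc_nulls_last()).show()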
SparkShims.scala

@@ -217,4 +217,6 @@ trait SparkShims {
   def hasAliasQuoteFix: Boolean
 
   def hasCastFloatTimestampUpcast: Boolean
+
+  def numShufflePartitions: Int
 }
40 changes: 29 additions & 11 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
@@ -170,6 +170,19 @@ final class TypeSig private(
     new TypeSig(it, nt, lt, nts)
   }
 
+  /**
+   * Remove a type signature. The reverse of +
+   * @param other what to remove
+   * @return the new signature
+   */
+  def - (other: TypeSig): TypeSig = {
+    val it = initialTypes -- other.initialTypes
+    val nt = nestedTypes -- other.nestedTypes
+    val lt = litOnlyTypes -- other.litOnlyTypes
+    val nts = notes -- other.notes.keySet
+    new TypeSig(it, nt, lt, nts)
+  }
+
   /**
    * Add nested types to this type signature. Note that these do not stack so if nesting has
    * nested types too they are ignored.
@@ -542,18 +555,23 @@ class ExecChecks private(
   override def tag(meta: RapidsMeta[_, _, _]): Unit = {
     val plan = meta.wrapped.asInstanceOf[SparkPlan]
     val allowDecimal = meta.conf.decimalTypeEnabled
-    if (!check.areAllSupportedByPlugin(plan.output.map(_.dataType), allowDecimal)) {
-      val unsupported = plan.output.map(_.dataType)
-        .filter(!check.isSupportedByPlugin(_, allowDecimal))
-        .toSet
-      meta.willNotWorkOnGpu(s"unsupported data types in output: ${unsupported.mkString(", ")}")
+
+    val unsupportedOutputTypes = plan.output
+      .filterNot(attr => check.isSupportedByPlugin(attr.dataType, allowDecimal))
+      .toSet
+
+    if (unsupportedOutputTypes.nonEmpty) {
+      meta.willNotWorkOnGpu("unsupported data types in output: " +
+        unsupportedOutputTypes.mkString(", "))
     }
-    if (!check.areAllSupportedByPlugin(
-      plan.children.flatMap(_.output.map(_.dataType)),
-      allowDecimal)) {
-      val unsupported = plan.children.flatMap(_.output.map(_.dataType))
-        .filter(!check.isSupportedByPlugin(_, allowDecimal)).toSet
-      meta.willNotWorkOnGpu(s"unsupported data types in input: ${unsupported.mkString(", ")}")
+
+    val unsupportedInputTypes = plan.children.flatMap { childPlan =>
+      childPlan.output.filterNot(attr => check.isSupportedByPlugin(attr.dataType, allowDecimal))
+    }.toSet
+
+    if (unsupportedInputTypes.nonEmpty) {
+      meta.willNotWorkOnGpu("unsupported data types in input: " +
+        unsupportedInputTypes.mkString(", "))
     }
 