Enable sort for single-level nesting struct columns on GPU #1883

Merged · 35 commits · Mar 26, 2021
cd3c436
Enable sort of structs
gerashegalov Mar 5, 2021
00ba3ff
struct gens for test_single_orderby
gerashegalov Mar 5, 2021
1463028
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 5, 2021
586f28f
revert unneeded data_gen change
gerashegalov Mar 5, 2021
18d53a4
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 10, 2021
c6407d0
interactive tests pass
gerashegalov Mar 11, 2021
88c8a9a
add integration tests
gerashegalov Mar 11, 2021
ddd60e4
revert GpuTimeMath refactor
gerashegalov Mar 11, 2021
ce7568c
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 11, 2021
ad934b2
add tests for limit
gerashegalov Mar 12, 2021
5910d1a
Consolidate RAT settings in parent pom
gerashegalov Mar 12, 2021
c538bc2
Merge branch 'ratPluginSettingsInParent' into issue-1605-struct-sort
gerashegalov Mar 12, 2021
d7ab1ea
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 12, 2021
33762de
Enable saving data for debugging
gerashegalov Mar 12, 2021
efbfa3f
Save schema for debugging
gerashegalov Mar 12, 2021
21a05c7
wip
gerashegalov Mar 13, 2021
1dd4b23
cleanup
gerashegalov Mar 13, 2021
b6f44da
comment
gerashegalov Mar 13, 2021
1f4514d
uncomment test params
gerashegalov Mar 15, 2021
327e100
Bobby's review:
gerashegalov Mar 16, 2021
205b7c3
Bobby's review #2
gerashegalov Mar 16, 2021
9f01f85
proper handling of UDT nested
gerashegalov Mar 16, 2021
ff313ac
doc update
gerashegalov Mar 16, 2021
3391834
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 17, 2021
db64198
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 20, 2021
d0ba59b
workaround upper/lower_bound unsupported
gerashegalov Mar 23, 2021
46269e7
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 23, 2021
7b57913
Restrict struct-sort to a single partition
gerashegalov Mar 23, 2021
196ebc9
improve error messages
gerashegalov Mar 24, 2021
4b1f742
correct CPU constraints
gerashegalov Mar 25, 2021
7e8c9f8
test fix
gerashegalov Mar 25, 2021
9ee262b
check for nesting in RP
gerashegalov Mar 25, 2021
e5f9996
wip
gerashegalov Mar 26, 2021
7d0bdcc
Merge remote-tracking branch 'origin/branch-0.5' into issue-1605-stru…
gerashegalov Mar 26, 2021
5910092
reworked sortexec type list
gerashegalov Mar 26, 2021
1 change: 1 addition & 0 deletions dist/pom.xml
@@ -186,6 +186,7 @@
<excludes>
<exclude>dependency-reduced-pom.xml</exclude>
<exclude>*pom.xml.asc</exclude>
<exclude>.*</exclude>
</excludes>
</configuration>
</plugin>
Expand Down
6 changes: 3 additions & 3 deletions docs/supported_ops.md
@@ -379,7 +379,7 @@ Accelerator supports are described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -12237,7 +12237,7 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -12258,7 +12258,7 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
6 changes: 3 additions & 3 deletions integration_tests/src/main/python/data_gen.py
@@ -284,7 +284,7 @@ def start(self, rand):
POS_FLOAT_NAN_MAX_VALUE = struct.unpack('f', struct.pack('I', 0x7fffffff))[0]
class FloatGen(DataGen):
"""Generate floats, which some built in corner cases."""
def __init__(self, nullable=True,
def __init__(self, nullable=True,
no_nans=False, special_cases=None):
self._no_nans = no_nans
if special_cases is None:
@@ -319,7 +319,7 @@ def gen_float():
POS_DOUBLE_NAN_MAX_VALUE = struct.unpack('d', struct.pack('L', 0x7fffffffffffffff))[0]
class DoubleGen(DataGen):
"""Generate doubles, which some built in corner cases."""
def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False,
def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False,
nullable=True, special_cases = None):
self._min_exp = min_exp
self._max_exp = max_exp
@@ -432,7 +432,7 @@ def __init__(self, start=None, end=None, nullable=True):

self._start_day = self._to_days_since_epoch(start)
self._end_day = self._to_days_since_epoch(end)

self.with_special_case(start)
self.with_special_case(end)

24 changes: 24 additions & 0 deletions integration_tests/src/main/python/sort_test.py
@@ -34,6 +34,30 @@ def test_single_orderby(data_gen, order):
lambda spark : unary_op_df(spark, data_gen).orderBy(order),
conf = allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('data_gen', [
pytest.param(all_basic_struct_gen),
pytest.param(StructGen([['child0', all_basic_struct_gen]]),
marks=pytest.mark.xfail(reason='second-level structs are not supported')),
pytest.param(ArrayGen(string_gen),
marks=pytest.mark.xfail(reason="arrays are not supported")),
pytest.param(MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen),
marks=pytest.mark.xfail(reason="maps are not supported")),
], ids=idfn)
@pytest.mark.parametrize('order', [
pytest.param(f.col('a').asc()),
pytest.param(f.col('a').asc_nulls_first()),
pytest.param(f.col('a').asc_nulls_last(),
marks=pytest.mark.xfail(reason='opposite null order not supported')),
pytest.param(f.col('a').desc()),
pytest.param(f.col('a').desc_nulls_first(),
marks=pytest.mark.xfail(reason='opposite null order not supported')),
pytest.param(f.col('a').desc_nulls_last()),
], ids=idfn)
def test_single_nested_order_orderby(data_gen, order):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).orderBy(order),
conf = allow_negative_scale_of_decimal_conf)

# SPARK CPU itself has issue with negative scale for take ordered and project
orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)]
@pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn)
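The new `test_single_nested_order_orderby` above compares GPU and CPU results for sorting a single struct column, and only for the null orderings that match each direction's default (`asc`/`asc_nulls_first` and `desc`/`desc_nulls_last`). As a rough mental model — an illustrative sketch, not code from this PR — ordering rows by a single-level struct behaves like lexicographic tuple comparison, with null rows grouped according to the null ordering:

```python
# Hypothetical sketch: single-level struct ordering modeled as tuple
# comparison. Rows are either None (a null struct) or a tuple of
# non-null primitive fields.

def sort_struct_rows(rows, ascending=True, nulls_first=True):
    """Sort rows the way Spark orders a single-level struct column:
    null rows grouped per the null ordering, non-null rows compared
    field by field (lexicographically)."""
    null_rows = [r for r in rows if r is None]
    data_rows = sorted((r for r in rows if r is not None),
                       reverse=not ascending)
    return null_rows + data_rows if nulls_first else data_rows + null_rows

rows = [(2, "b"), None, (1, "z"), (1, "a")]
print(sort_struct_rows(rows))                                   # asc, nulls first
print(sort_struct_rows(rows, ascending=False, nulls_first=False))  # desc, nulls last
```

The parameters that are not marked `xfail` correspond to `nulls_first=True` for ascending and `nulls_first=False` for descending, which are Spark's default null orderings; the opposite combinations are the ones the plugin rejects.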
2 changes: 2 additions & 0 deletions pom.xml
@@ -573,6 +573,8 @@
<exclude>.git/**</exclude>
<exclude>.pytest_cache/**</exclude>
<exclude>.github/pull_request_template.md</exclude>
<exclude>.vscode/**</exclude>
<exclude>.metals/**</exclude>
<exclude>**/*.md</exclude>
<exclude>**/*.iml</exclude>
<exclude>NOTICE-binary</exclude>
@@ -423,6 +423,16 @@ object GpuOverrides {
"\\S", "\\v", "\\V", "\\w", "\\w", "\\p", "$", "\\b", "\\B", "\\A", "\\G", "\\Z", "\\z", "\\R",
"?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">")

private[this] val sortOrderTypeSigs = (
TypeSig.commonCudfTypes +
TypeSig.NULL +
TypeSig.DECIMAL +
TypeSig.STRUCT.nested(
TypeSig.commonCudfTypes +
TypeSig.NULL +
TypeSig.DECIMAL
))

// this listener mechanism is global and is intended for use by unit tests only
private val listeners: ListBuffer[GpuOverridesListener] = new ListBuffer[GpuOverridesListener]()

@@ -1803,16 +1813,24 @@ object GpuOverrides {
expr[SortOrder](
"Sort order",
ExprChecks.projectOnly(
TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
sortOrderTypeSigs,
TypeSig.orderable,
Seq(ParamCheck(
"input",
TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
sortOrderTypeSigs,
TypeSig.orderable))),
(a, conf, p, r) => new BaseExprMeta[SortOrder](a, conf, p, r) {
(sortOrder, conf, p, r) => new BaseExprMeta[SortOrder](sortOrder, conf, p, r) {
override def tagExprForGpu(): Unit = {
val directionDefaultNullOrdering = sortOrder.direction.defaultNullOrdering
if (sortOrder.nullOrdering != directionDefaultNullOrdering) {
willNotWorkOnGpu(s"Only default null ordering $directionDefaultNullOrdering " +
s"supported. Found: ${sortOrder.nullOrdering}")
}
}

// One of the few expressions that are not replaced with a GPU version
override def convertToGpu(): Expression =
a.withNewChildren(childExprs.map(_.convertToGpu()))
sortOrder.withNewChildren(childExprs.map(_.convertToGpu()))
}),
expr[Count](
"Count aggregate operator",
@@ -2557,7 +2575,7 @@ object GpuOverrides {
}),
exec[TakeOrderedAndProjectExec](
"Take the first limit elements as defined by the sortOrder, and do projection if needed.",
ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL, TypeSig.all),
ExecChecks(sortOrderTypeSigs, TypeSig.all),
(takeExec, conf, p, r) =>
new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) {
val sortOrder: Seq[BaseExprMeta[SortOrder]] =
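The `sortOrderTypeSigs` signature introduced above is what makes this "single-level" nesting: the common cuDF primitives plus `NULL` and `DECIMAL` are allowed at the top level, and a `STRUCT` is allowed only when nested with that same primitive set — so a struct of structs is rejected. A hypothetical sketch of that rule (illustrative only; the real check is the `TypeSig` machinery in Scala):

```python
# Hypothetical model of sortOrderTypeSigs: a dtype is either a
# primitive name or a ("struct", [child dtypes]) pair. Structs may
# contain only primitives -- no second-level nesting.

PRIMITIVES = {"boolean", "byte", "short", "int", "long", "float",
              "double", "string", "date", "timestamp", "null", "decimal"}

def is_sortable_on_gpu(dtype):
    if isinstance(dtype, str):
        return dtype in PRIMITIVES
    kind, children = dtype
    if kind != "struct":
        return False  # arrays, maps, UDTs are not supported for sort
    # every child must itself be a supported primitive
    return all(isinstance(c, str) and c in PRIMITIVES for c in children)

print(is_sortable_on_gpu(("struct", ["int", "string"])))      # True
print(is_sortable_on_gpu(("struct", [("struct", ["int"])])))  # False
```

This mirrors the integration tests: `all_basic_struct_gen` passes, while the second-level struct, array, and map generators are marked `xfail`.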
27 changes: 16 additions & 11 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
@@ -542,18 +542,23 @@ class ExecChecks private(
override def tag(meta: RapidsMeta[_, _, _]): Unit = {
val plan = meta.wrapped.asInstanceOf[SparkPlan]
val allowDecimal = meta.conf.decimalTypeEnabled
if (!check.areAllSupportedByPlugin(plan.output.map(_.dataType), allowDecimal)) {
val unsupported = plan.output.map(_.dataType)
.filter(!check.isSupportedByPlugin(_, allowDecimal))
.toSet
meta.willNotWorkOnGpu(s"unsupported data types in output: ${unsupported.mkString(", ")}")

val unsupportedOutputTypes = plan.output
.filterNot(attr => check.isSupportedByPlugin(attr.dataType, allowDecimal))
.toSet

if (unsupportedOutputTypes.nonEmpty) {
meta.willNotWorkOnGpu("unsupported data types in output: " +
unsupportedOutputTypes.mkString(", "))
}
if (!check.areAllSupportedByPlugin(
plan.children.flatMap(_.output.map(_.dataType)),
allowDecimal)) {
val unsupported = plan.children.flatMap(_.output.map(_.dataType))
.filter(!check.isSupportedByPlugin(_, allowDecimal)).toSet
meta.willNotWorkOnGpu(s"unsupported data types in input: ${unsupported.mkString(", ")}")

val unsupportedInputTypes = plan.children.flatMap { childPlan =>
childPlan.output.filterNot(attr => check.isSupportedByPlugin(attr.dataType, allowDecimal))
}.toSet

if (unsupportedInputTypes.nonEmpty) {
meta.willNotWorkOnGpu("unsupported data types in input: " +
unsupportedInputTypes.mkString(", "))
}
}

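The `ExecChecks.tag` rework above replaces the boolean "are all supported?" pre-check plus a second filtering pass with a single collection of the unsupported types, reporting only when the set is nonempty — the same answer with one traversal per side and no redundant work. A rough sketch of the reshaped control flow (hypothetical names, not the plugin's API):

```python
# Hypothetical sketch of the refactored tag() flow: collect unsupported
# output and input types once, and report each nonempty set.

def tag_unsupported(output_types, input_types, is_supported, report):
    unsupported_output = {t for t in output_types if not is_supported(t)}
    if unsupported_output:
        report("unsupported data types in output: "
               + ", ".join(sorted(unsupported_output)))
    unsupported_input = {t for t in input_types if not is_supported(t)}
    if unsupported_input:
        report("unsupported data types in input: "
               + ", ".join(sorted(unsupported_input)))

messages = []
tag_unsupported(["int", "map"], ["int"], lambda t: t != "map", messages.append)
print(messages)  # ['unsupported data types in output: map']
```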
@@ -135,6 +135,8 @@ abstract class GpuTimeMath(

override lazy val resolved: Boolean = childrenResolved && checkInputDataTypes().isSuccess

protected val microSecondsInOneDay: Long = TimeUnit.DAYS.toMicros(1)

override def columnarEval(batch: ColumnarBatch): Any = {
var lhs: Any = null
var rhs: Any = null
@@ -147,7 +149,7 @@ abstract class GpuTimeMath(
if (intvl.months != 0) {
throw new UnsupportedOperationException("Months aren't supported at the moment")
}
val usToSub = intvl.days.toLong * 24 * 60 * 60 * 1000 * 1000 + intvl.microseconds
val usToSub = intvl.days * microSecondsInOneDay + intvl.microseconds
if (usToSub != 0) {
withResource(Scalar.fromLong(usToSub)) { us_s =>
withResource(l.getBase.logicalCastTo(DType.INT64)) { us =>
@@ -230,7 +232,6 @@ case class GpuDateAddInterval(start: Expression,
if (intvl.months != 0) {
throw new UnsupportedOperationException("Months aren't supported at the moment")
}
val microSecondsInOneDay = TimeUnit.DAYS.toMicros(1)
val microSecToDays = if (intvl.microseconds < 0) {
// This is to calculate when subtraction is performed. Need to take into account the
// interval( which are less than days). Convert it into days which needs to be
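The `GpuTimeMath` change above hoists the inline `24 * 60 * 60 * 1000 * 1000` product into a shared `microSecondsInOneDay` constant computed via `TimeUnit.DAYS.toMicros(1)`. One detail worth noting: microseconds per day is 86,400,000,000, which does not fit in a 32-bit integer, so the constant must be a `Long` and multiplying it by `intvl.days` promotes the product to `Long` as well. A small illustrative sketch of the conversion (Python stand-in, not the plugin code):

```python
# Sketch of the interval-to-microseconds conversion. The constant
# corresponds to TimeUnit.DAYS.toMicros(1) in the Scala code; it
# exceeds the 32-bit int range, hence the Long in Scala.

MICROS_PER_DAY = 24 * 60 * 60 * 1000 * 1000  # 86_400_000_000

def interval_to_micros(months, days, microseconds):
    """Convert a calendar interval to microseconds; month arithmetic
    is rejected, matching the UnsupportedOperationException above."""
    if months != 0:
        raise ValueError("Months aren't supported at the moment")
    return days * MICROS_PER_DAY + microseconds

print(MICROS_PER_DAY)                 # 86400000000
print(interval_to_micros(0, 2, 500))  # 172800000500
```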