Support TakeOrderedAndProject #1579

Merged · 3 commits · Jan 27, 2021

31 changes: 31 additions & 0 deletions docs/FAQ.md
@@ -17,6 +17,37 @@ with `collect`, `show` or `write` a new `DataFrame` is constructed causing Spark
query. This is why `spark.rapids.sql.enabled` is still respected when running, even if explain shows
stale results.

### Why does the plan for the GPU query look different from the CPU query?

Typically there is a one-to-one mapping between the stages in a CPU plan and the stages in the
corresponding GPU plan, but there are a few places where this is not the case.

* `WholeStageCodeGen` - The GPU plan typically does not do code generation, and does not support
generating code for an entire stage in the plan. Code generation reduces the cost of processing
data one row at a time. The GPU plan processes the data in a columnar format, so the cost
of processing a batch is amortized over the entire batch of data and code generation is not
needed.

* `ColumnarToRow` and `RowToColumnar` transitions - CPU Spark plans typically process data in a
row-based format. The main exception to this is reading some kinds of columnar data,
like Parquet. Transitioning between the CPU and the GPU also requires transitioning between row
and columnar formatted data.

* `GpuCoalesceBatches` and `GpuShuffleCoalesce` - Processing data on the GPU scales
sublinearly, meaning that doubling the amount of data often takes less than twice the time. Because
of this we want to process larger batches of data when possible. These operators try to combine
smaller batches of data into fewer, larger batches so they can be processed more efficiently.

* `SortMergeJoin` - The RAPIDS Accelerator does not support sort merge joins yet. For now, it
translates sort merge joins into shuffled hash joins. Because of this, sorts may sometimes be
removed, or additional sorts added, to meet the ordering requirements of the query.

* `TakeOrderedAndProject` - The `TakeOrderedAndProject` operator takes the top N entries in
each task, shuffles the results to a single executor, and then takes the top N results from that.
The GPU plan often has more metrics than the CPU version does, and when we tried to combine all of
these operations into a single stage the metrics were confusing. Instead, we split the single
stage into multiple smaller parts so the metrics are clearer; see the sketch after this list.
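
As a concrete illustration of the last point, the sketch below compares the two plans for a
simple ordered-limit query. It is a minimal sketch assuming a `spark-shell` session with the
RAPIDS Accelerator on the classpath and a `SparkSession` named `spark`; the exact operator names
in the output depend on the Spark and plugin versions, so treat the shapes described in the
comments as illustrative rather than authoritative.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Build the query fresh each time so explain() reflects the current config.
def topTen(): DataFrame =
  spark.range(0, 1000000L)
    .selectExpr("id", "id % 100 AS key")
    .orderBy(col("key").desc)
    .limit(10)

// CPU plan: Spark collapses orderBy + limit into a single TakeOrderedAndProject node.
spark.conf.set("spark.rapids.sql.enabled", "false")
topTen().explain()

// GPU plan: the same query is expected to show GpuTopN twice, around a
// single-partition shuffle, instead of one combined operator.
spark.conf.set("spark.rapids.sql.enabled", "true")
topTen().explain()
```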

### What versions of Apache Spark does the RAPIDS Accelerator for Apache Spark support?

The RAPIDS Accelerator for Apache Spark requires version 3.0.0 or 3.0.1 of Apache Spark. Because the
1 change: 1 addition & 0 deletions docs/configs.md
@@ -271,6 +271,7 @@ Name | Description | Default Value | Notes
<a name="sql.exec.ProjectExec"></a>spark.rapids.sql.exec.ProjectExec|The backend for most select, withColumn and dropColumn statements|true|None|
<a name="sql.exec.RangeExec"></a>spark.rapids.sql.exec.RangeExec|The backend for range operator|true|None|
<a name="sql.exec.SortExec"></a>spark.rapids.sql.exec.SortExec|The backend for the sort operator|true|None|
<a name="sql.exec.TakeOrderedAndProjectExec"></a>spark.rapids.sql.exec.TakeOrderedAndProjectExec|Take the first limit elements as defined by the sortOrder, and do projection if needed.|true|None|
<a name="sql.exec.UnionExec"></a>spark.rapids.sql.exec.UnionExec|The backend for the union operator|true|None|
<a name="sql.exec.CustomShuffleReaderExec"></a>spark.rapids.sql.exec.CustomShuffleReaderExec|A wrapper of shuffle query stage|true|None|
<a name="sql.exec.HashAggregateExec"></a>spark.rapids.sql.exec.HashAggregateExec|The backend for hash based aggregations|true|None|
31 changes: 27 additions & 4 deletions docs/supported_ops.md
@@ -258,8 +258,8 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -281,8 +281,8 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -360,6 +360,29 @@ Accelerator supports are described below.
<td><b>NS</b></td>
</tr>
<tr>
<td>TakeOrderedAndProjectExec</td>
<td>Take the first limit elements as defined by the sortOrder, and do projection if needed.</td>
<td>None</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>UnionExec</td>
<td>The backend for the union operator</td>
<td>None</td>
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/conftest.py
@@ -371,7 +371,7 @@ def setup(self, spark):
}
if not self.tpcds_format in formats:
raise RuntimeError("{} is not a supported tpcds input type".format(self.tpcds_format))
formats.get(self.tpcds_format)(jvm_session, self.tpcds_path, True)
formats.get(self.tpcds_format)(jvm_session, self.tpcds_path, True, True)

def do_test_query(self, query):
spark = get_spark_i_know_what_i_am_doing()
8 changes: 4 additions & 4 deletions integration_tests/src/main/python/data_gen.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -221,13 +221,13 @@ def __init__(self, precision=None, scale=None, nullable=True, special_cases=[]):
DECIMAL_MIN = Decimal('-' + ('9' * precision) + 'e' + str(-scale))
DECIMAL_MAX = Decimal(('9'* precision) + 'e' + str(-scale))
super().__init__(DecimalType(precision, scale), nullable=nullable, special_cases=special_cases)
self._scale = scale
self._precision = precision
self.scale = scale
self.precision = precision
pattern = "[0-9]{1,"+ str(precision) + "}e" + str(-scale)
self.base_strs = sre_yield.AllStrings(pattern, flags=0, charset=sre_yield.CHARSET, max_count=_MAX_CHOICES)

def __repr__(self):
return super().__repr__() + '(' + str(self._precision) + ',' + str(self._scale) + ')'
return super().__repr__() + '(' + str(self.precision) + ',' + str(self.scale) + ')'

def start(self, rand):
strs = self.base_strs
17 changes: 16 additions & 1 deletion integration_tests/src/main/python/sort_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -34,6 +34,14 @@ def test_single_orderby(data_gen, order):
lambda spark : unary_op_df(spark, data_gen).orderBy(order),
conf = allow_negative_scale_of_decimal_conf)

# Spark on the CPU has an issue with negative decimal scale for take ordered and project
orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen) if not (isinstance(n, DecimalGen) and n.scale < 0)]
@pytest.mark.parametrize('data_gen', orderable_without_neg_decimal, ids=idfn)
@pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
def test_single_orderby_with_limit(data_gen, order):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).orderBy(order).limit(100))

@pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
@pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
def test_single_sort_in_part(data_gen, order):
@@ -52,3 +60,10 @@ def test_multi_orderby(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).orderBy(f.col('a'), f.col('b').desc()),
conf = allow_negative_scale_of_decimal_conf)

# Spark on the CPU has an issue with negative decimal scale for take ordered and project
orderable_gens_sort_without_neg_decimal = [n for n in orderable_gens_sort if not (isinstance(n, DecimalGen) and n.scale < 0)]
@pytest.mark.parametrize('data_gen', orderable_gens_sort_without_neg_decimal, ids=idfn)
def test_multi_orderby_with_limit(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).orderBy(f.col('a'), f.col('b').desc()).limit(100))
@@ -2355,13 +2355,38 @@ object GpuOverrides {
GpuDataWritingCommandExec(childDataWriteCmds.head.convertToGpu(),
childPlans.head.convertIfNeeded())
}),
exec[TakeOrderedAndProjectExec](
"Take the first limit elements as defined by the sortOrder, and do projection if needed.",
ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL, TypeSig.all),
(takeExec, conf, p, r) =>
new SparkPlanMeta[TakeOrderedAndProjectExec](takeExec, conf, p, r) {
val sortOrder: Seq[BaseExprMeta[SortOrder]] =
takeExec.sortOrder.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val projectList: Seq[BaseExprMeta[NamedExpression]] =
takeExec.projectList.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
override val childExprs: Seq[BaseExprMeta[_]] = sortOrder ++ projectList

override def convertToGpu(): GpuExec = {
// To avoid metrics confusion we split a single stage up into multiple parts
val so = sortOrder.map(_.convertToGpu().asInstanceOf[SortOrder])
GpuTopN(takeExec.limit,
so,
projectList.map(_.convertToGpu().asInstanceOf[NamedExpression]),
ShimLoader.getSparkShims.getGpuShuffleExchangeExec(GpuSinglePartitioning(Seq.empty),
GpuTopN(takeExec.limit,
so,
takeExec.child.output,
childPlans.head.convertIfNeeded())))
}
}),
exec[LocalLimitExec](
"Per-partition limiting of results",
ExecChecks(TypeSig.commonCudfTypes, TypeSig.all),
ExecChecks(TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL,
TypeSig.all),
(localLimitExec, conf, p, r) =>
new SparkPlanMeta[LocalLimitExec](localLimitExec, conf, p, r) {
override def convertToGpu(): GpuExec =
GpuLocalLimitExec(localLimitExec.limit, childPlans(0).convertIfNeeded())
GpuLocalLimitExec(localLimitExec.limit, childPlans.head.convertIfNeeded())
}),
exec[ArrowEvalPythonExec](
"The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the" +
@@ -2388,11 +2413,11 @@
}),
exec[GlobalLimitExec](
"Limiting of results across partitions",
ExecChecks(TypeSig.commonCudfTypes, TypeSig.all),
ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(globalLimitExec, conf, p, r) =>
new SparkPlanMeta[GlobalLimitExec](globalLimitExec, conf, p, r) {
override def convertToGpu(): GpuExec =
GpuGlobalLimitExec(globalLimitExec.limit, childPlans(0).convertIfNeeded())
GpuGlobalLimitExec(globalLimitExec.limit, childPlans.head.convertIfNeeded())
}),
exec[CollectLimitExec](
"Reduce to single partition and apply limit",
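To make the two-phase structure of the converted plan above easier to follow, here is a minimal,
self-contained Scala sketch of the same idea using plain collections. The `TwoPhaseTopN` object
and its sample `partitions` data are illustrative stand-ins, not plugin code: the inner `GpuTopN`
corresponds to the per-task phase that bounds how much data each task has to shuffle, and the
outer `GpuTopN` over a single partition produces the final ordered limit.

```scala
// Two-phase top-N: each "task" keeps only its local top N rows, the survivors are
// gathered to one place, and a final top-N over that much smaller set yields the
// global result.
object TwoPhaseTopN {
  def topN[T](n: Int, data: Seq[T])(implicit ord: Ordering[T]): Seq[T] =
    data.sorted(ord).take(n)

  def main(args: Array[String]): Unit = {
    val limit = 3
    // Pretend each inner Seq is the data held by one Spark task.
    val partitions = Seq(Seq(9, 1, 7, 4), Seq(8, 2, 6), Seq(5, 3, 10))
    // Phase 1: per-task top N (maps to the inner GpuTopN before the shuffle).
    val localTops = partitions.map(p => topN(limit, p))
    // Phase 2: a single "executor" takes the top N of the gathered candidates
    // (maps to the outer GpuTopN after the single-partition shuffle).
    val globalTop = topN(limit, localTops.flatten)
    println(globalTop) // List(1, 2, 3)
  }
}
```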
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,6 +44,8 @@ trait SpillableColumnarBatch extends AutoCloseable {
* with decompressing the data if necessary.
*/
def getColumnarBatch(): ColumnarBatch

def sizeInBytes: Long
}

/**
Expand All @@ -57,6 +59,7 @@ class JustRowsColumnarBatch(numRows: Int) extends SpillableColumnarBatch {
override def getColumnarBatch(): ColumnarBatch =
new ColumnarBatch(Array.empty, numRows)
override def close(): Unit = () // NOOP nothing to close
override val sizeInBytes: Long = 0L
}

/**
Expand All @@ -82,6 +85,11 @@ class SpillableColumnarBatchImpl (id: TempSpillBufferId,
*/
def spillId: TempSpillBufferId = id

override lazy val sizeInBytes: Long =
withResource(RapidsBufferCatalog.acquireBuffer(id)) { buff =>
buff.size
}

/**
* Set a new spill priority.
*/
Expand Down
Loading