Commit

Merge branch 'branch-22.08' into jtb-zstd-integ

jbrennan333 committed Jul 14, 2022
2 parents 1abcbe6 + 4a313e7 commit 560d643

Showing 10 changed files with 596 additions and 598 deletions.
114 changes: 74 additions & 40 deletions docs/FAQ.md
@@ -23,9 +23,9 @@ The RAPIDS Accelerator for Apache Spark officially supports:
- [Databricks Runtime 9.1, 10.4](get-started/getting-started-databricks.md)
- [Google Cloud Dataproc 2.0](get-started/getting-started-gcp.md)
- [Azure Synapse](get-started/getting-started-azure-synapse-analytics.md)
- Cloudera provides the plugin packaged through
[CDS 3.2](https://docs.cloudera.com/cdp-private-cloud-base/7.1.7/cds-3/topics/spark-spark-3-overview.html)
which is supported on the following
[CDP Private Cloud Base releases](https://docs.cloudera.com/cdp-private-cloud-base/7.1.7/cds-3/topics/spark-3-requirements.html).

Most distributions based on a supported Apache Spark version should work, but because the plugin
@@ -38,7 +38,7 @@ to set up testing and validation on their distributions.
CUDA 11.x is currently supported. Please look [here](download.md) for download links for the latest
release.

### What hardware is supported?

The plugin is tested and supported on V100, T4, A2, A10, A30 and A100 datacenter GPUs. It is possible
to run the plugin on GeForce desktop hardware with Volta or better architectures. GeForce hardware
@@ -73,7 +73,7 @@ Spark driver and executor logs with messages that are similar to the following:
### What is the right hardware setup to run GPU accelerated Spark?

GPU accelerated Spark can run on any NVIDIA Pascal or better GPU architecture, including Volta,
Turing or Ampere.

### What parts of Apache Spark are accelerated?

@@ -213,30 +213,30 @@ logging configuration.
Typically, there is a one-to-one mapping between CPU stages in a plan and GPU stages. There are a
few places where this is not the case.

* `WholeStageCodeGen` - The GPU plan typically does not do code generation, and does not support
generating code for an entire stage in the plan. Code generation reduces the cost of processing
data one row at a time. The GPU plan processes the data in a columnar format, so the cost
of processing a batch is amortized over the entire batch of data and code generation is not
needed.

* `ColumnarToRow` and `RowToColumnar` transitions - The CPU version of Spark plans typically
processes data in a row-based format. The main exception to this is reading some kinds of
columnar data, like Parquet. Transitioning between the CPU and the GPU also requires
transitioning between row and columnar formatted data.

* `GpuCoalesceBatches` and `GpuShuffleCoalesce` - Processing data on the GPU scales
sublinearly. That means doubling the data in a batch often takes less than twice the time to
process. Because of this we want to process larger batches of data when possible. These operators
will try to combine smaller batches of data into fewer, larger batches to process more efficiently.

* `SortMergeJoin` - The RAPIDS Accelerator does not support sort merge joins yet. For now, we
translate sort merge joins into shuffled hash joins. Because of this there are times when sorts
may be removed or other sorts added to meet the ordering requirements of the query.

* `TakeOrderedAndProject` - The `TakeOrderedAndProject` operator will take the top N entries in
each task, shuffle the results to a single executor and then take the top N results from that.
The GPU plan often has more metrics than the CPU version does, and when we tried to combine all
of these operations into a single stage the metrics were confusing to understand. Instead, we
split the single stage up into multiple smaller parts, so the metrics are clearer.

### Why does `explain()` show that the GPU will be used even after setting `spark.rapids.sql.enabled` to `false`?
@@ -271,9 +271,9 @@ Queries on Databricks will not fail but they cannot benefit from DPP.

### Is Adaptive Query Execution (AQE) Supported?

Any operation that is supported on GPU will stay on the GPU when AQE is enabled.

AQE is not supported on Databricks with the plugin.
If AQE is enabled on Databricks, queries may fail with a `StackOverflowError`.
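
As a minimal sketch, AQE can be turned off on Databricks with the stock Spark flag (this is the
standard `spark.sql.adaptive.enabled` configuration, not a plugin-specific one):

```Python
# Disable AQE when running the plugin on Databricks to avoid the
# StackOverflowError failure mode described above.
spark.conf.set("spark.sql.adaptive.enabled", "false")
```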

#### Why does my query show as not on the GPU when Adaptive Query Execution is enabled?
@@ -282,24 +282,58 @@ When running an `explain()` on a query where AQE is on, it is possible that AQE has not finalized
the plan. In this case a message stating `AdaptiveSparkPlan isFinalPlan=false` will be printed at
the top of the physical plan, and the explain output will show the query plan with CPU operators.
As the query runs, the plan on the UI will update and show operations running on the GPU. This can
happen for any AdaptiveSparkPlan where `isFinalPlan=false`.

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- ...
```

Once the query has been executed you can access the finalized plan on the WebUI and in the user code
running on the Driver, e.g. in a REPL or notebook, to confirm that the query has executed on the GPU:

```Python
>>> df=spark.range(0,100).selectExpr("sum(*) as sum")
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[sum(id#0L)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#11]
+- HashAggregate(keys=[], functions=[partial_sum(id#0L)])
+- Range (0, 100, step=1, splits=16)


>>> df.collect()
[Row(sum=4950)]
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
GpuColumnarToRow false
+- GpuHashAggregate(keys=[], functions=[gpubasicsum(id#0L, LongType, false)]), filters=ArrayBuffer(None))
+- GpuShuffleCoalesce 2147483647
+- ShuffleQueryStage 0
+- GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [id=#64]
+- GpuHashAggregate(keys=[], functions=[partial_gpubasicsum(id#0L, LongType, false)]), filters=ArrayBuffer(None))
+- GpuRange (0, 100, step=1, splits=16)
+- == Initial Plan ==
HashAggregate(keys=[], functions=[sum(id#0L)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#11]
+- HashAggregate(keys=[], functions=[partial_sum(id#0L)])
+- Range (0, 100, step=1, splits=16)
```

### Are cache and persist supported?

Yes, cache and persist are supported. The cache is GPU accelerated
but still stored in host memory.
Please refer to [RAPIDS Cache Serializer](./additional-functionality/cache-serializer.md)
for more details.
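
As a minimal sketch, the GPU-accelerated cache is enabled by configuring a cache serializer at
session startup. The serializer class name below is an assumption based on the linked page;
check the cache-serializer documentation for the exact name in your release:

```Python
from pyspark.sql import SparkSession

# Assumed class name; verify against the RAPIDS cache-serializer docs.
spark = (SparkSession.builder
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.sql.cache.serializer",
            "com.nvidia.spark.ParquetCachedBatchSerializer")
    .getOrCreate())

df = spark.range(0, 1000).selectExpr("id", "id * 2 AS doubled")
df.cache()   # cached batches are produced on the GPU but held in host memory
df.count()   # materializes the cache
```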

### Can I cache data into GPU memory?

No, that is not currently supported.
It would require much larger changes to Apache Spark to be able to support this.

### Is PySpark supported?
@@ -308,8 +342,8 @@ Yes

### Are the R APIs for Spark supported?

Yes, but we don't actively test them, because the RAPIDS Accelerator hooks into Spark not at
the various language APIs but at the Catalyst level after all the various APIs have converged into
the DataFrame API.

### Are the Java APIs for Spark supported?
@@ -380,8 +414,8 @@ There are multiple reasons why this is a problematic configuration:
### Is [Multi-Instance GPU (MIG)](https://docs.nvidia.com/cuda/mig/index.html) supported?

Yes, but it requires support from the underlying cluster manager to isolate the MIG GPU instance
for each executor (e.g.: by setting `CUDA_VISIBLE_DEVICES`,
[YARN with docker isolation](https://github.com/NVIDIA/spark-rapids-examples/tree/branch-22.08/examples/MIG-Support)
or other means).
or other means).

Note that MIG is not recommended for use with the RAPIDS Accelerator since it significantly
@@ -391,7 +425,7 @@ without MIG. Also note that the UCX-based shuffle plugin will not work as well in this
configuration because
[MIG does not support direct GPU to GPU transfers](https://docs.nvidia.com/datacenter/tesla/mig-user-guide/index.html#app-considerations).

However, MIG can be advantageous if the cluster is intended to be shared amongst other processes
(like ML / DL jobs).

### How can I run custom expressions/UDFs on the GPU?
@@ -419,7 +453,7 @@ setting.

If the UDF cannot be implemented by RAPIDS Accelerated UDFs or be automatically translated to
Apache Spark operations, the RAPIDS Accelerator has an experimental feature to transfer only the
data it needs between GPU and CPU inside a query operation, instead of falling this operation back
to the CPU. This feature can be enabled by setting `spark.rapids.sql.rowBasedUDF.enabled` to true.
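
As an illustrative sketch (only the configuration key comes from this FAQ; the UDF itself is a
made-up example):

```Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Enable the experimental row-based UDF path so only the UDF's input and
# output rows move between GPU and CPU, instead of the whole operation
# falling back to the CPU.
spark = (SparkSession.builder
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.rapids.sql.rowBasedUDF.enabled", "true")
    .getOrCreate())

@udf(returnType=IntegerType())
def clamp_to_100(x):
    # Hypothetical UDF with no GPU translation.
    return min(x, 100)

spark.range(0, 1000).select(clamp_to_100("id").alias("clamped")).show(5)
```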


@@ -500,26 +534,26 @@ Below are some troubleshooting tips on GPU query performance issues:
the query performance. For example, if I/O is the bottleneck, we suggest optimizing the backend
storage I/O performance, because the most suitable queries for GPU acceleration are computation
bound rather than I/O or network bound.

* Make sure at least the most time consuming part of the query is on the GPU. Please refer to
[Getting Started on Spark workload qualification](./get-started/getting-started-workload-qualification.md)
for more details. Ideally we hope the whole query is fully on the GPU, but if some minor part of
the query, e.g. a small JDBC table scan, cannot run on the GPU, it won't cause much performance
overhead. If there are some CPU fallbacks, check whether they are known features that can be
enabled by turning on some RAPIDS Accelerator parameters. If the features needed do not exist in
the most recent release of the RAPIDS Accelerator, please file a
[feature request](https://github.com/NVIDIA/spark-rapids/issues) with a minimum reproducing example.

* Tune the Spark and RAPIDS Accelerator parameters such as `spark.sql.shuffle.partitions`,
`spark.sql.files.maxPartitionBytes` and `spark.rapids.sql.concurrentGpuTasks`, as these
configurations can affect query performance significantly; a starting-point sketch is shown below.
Please refer to [Tuning Guide](./tuning-guide.md) for more details.
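
As a sketch only (the values are illustrative assumptions, not recommendations; the right
settings depend on the cluster and the data):

```Python
# Starting-point knobs from the tip above; tune per the Tuning Guide.
spark.conf.set("spark.sql.shuffle.partitions", "200")        # shuffle parallelism
spark.conf.set("spark.sql.files.maxPartitionBytes", "512m")  # input split size
spark.conf.set("spark.rapids.sql.concurrentGpuTasks", "2")   # tasks sharing each GPU
```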

### Why is Avro library not found by RAPIDS?

If you are getting a warning `Avro library not found by the RAPIDS plugin.` or if you are getting the
`java.lang.NoClassDefFoundError: org/apache/spark/sql/v2/avro/AvroScan` error, make sure you ran the
Spark job with the `--jars` or `--packages` option followed by the file path or Maven coordinates of
the RAPIDS jar, since that is the preferred way to run the RAPIDS Accelerator.

Note, you can add locally installed jars for external packages such as Avro Data Sources and the RAPIDS Accelerator jars via `spark.driver.extraClassPath` (--driver-class-path in the client mode) on the driver side, and `spark.executor.extraClassPath` on the executor side. However, you should not mix the deploy methods for either of the external modules. Either deploy both Spark Avro and RAPIDS Accelerator jars as local jars via `extraClassPath` settings or use the `--jars` or `--packages` options.
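
As a minimal sketch of the packages-style deployment (the Maven coordinates and versions below
are illustrative assumptions; match them to your Spark and plugin release):

```Python
from pyspark.sql import SparkSession

# Deploy Spark Avro and the RAPIDS Accelerator the same way: both as Maven
# packages here. Do not mix this with extraClassPath-based deployment.
spark = (SparkSession.builder
    .config("spark.jars.packages",
            "org.apache.spark:spark-avro_2.12:3.3.0,"
            "com.nvidia:rapids-4-spark_2.12:22.08.0")
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .getOrCreate())

spark.read.format("avro").load("/path/to/data.avro").show()
```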

@@ -530,7 +564,7 @@ As a consequence, per Issue #5796, if you also use the RAPIDS Shuffle Manager,
Starting from 22.06, the default value for `spark.rapids.memory.gpu.pool` is changed to `ASYNC` from
`ARENA` for CUDA 11.5+. For CUDA 11.4 and older, it will fall back to `ARENA`.
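
If the new default is not desired, the allocator can be pinned explicitly at application launch
(a sketch; both configuration values come from the note above):

```Python
# Select the ARENA pool instead of the ASYNC default on CUDA 11.5+.
# Must be set when the session is created, not at runtime.
spark = (SparkSession.builder
    .config("spark.rapids.memory.gpu.pool", "ARENA")
    .getOrCreate())
```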

### I have more questions, where do I go?
We use GitHub to track bugs and feature requests, and to answer questions. File an
[issue](https://github.com/NVIDIA/spark-rapids/issues/new/choose) for a bug or feature request. Ask
or answer a question on the [discussion board](https://github.com/NVIDIA/spark-rapids/discussions).
21 changes: 21 additions & 0 deletions integration_tests/src/main/python/orc_test.py
@@ -642,3 +642,24 @@ def test_orc_scan_with_aggregate_no_pushdown_on_col_partition(spark_tmp_path, aggregate):
```Python
    assert_gpu_and_cpu_are_equal_collect(
            lambda spark: _do_orc_scan_with_agg_on_partitioned_column(spark, data_path, aggregate),
            conf=_orc_aggregate_pushdown_enabled_conf)


@pytest.mark.parametrize('offset', [1,2,3,4], ids=idfn)
@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ["", "orc"])
def test_read_type_casting_integral(spark_tmp_path, offset, reader_confs, v1_enabled_list):
    int_gens = [boolean_gen] + integral_gens
    gen_list = [('c' + str(i), gen) for i, gen in enumerate(int_gens)]
    data_path = spark_tmp_path + '/ORC_DATA'
    with_cpu_session(
            lambda spark: gen_df(spark, gen_list).write.orc(data_path))

    # build the read schema by a left shift of int_gens
    shifted_int_gens = int_gens[offset:] + int_gens[:offset]
    rs_gen_list = [('c' + str(i), gen) for i, gen in enumerate(shifted_int_gens)]
    rs = StructGen(rs_gen_list, nullable=False).data_type
    all_confs = copy_and_update(reader_confs,
            {'spark.sql.sources.useV1SourceList': v1_enabled_list})
    assert_gpu_and_cpu_are_equal_collect(
            lambda spark: spark.read.schema(rs).orc(data_path),
            conf=all_confs)
```
@@ -55,15 +55,14 @@ trait OrcShims311until320Base {
```diff
       conf: Configuration,
       orcReader: Reader,
       dataReader: DataReader,
-      gen: (StripeInformation, OrcProto.StripeFooter, Array[Int], Array[Int]) => OrcOutputStripe,
+      gen: (StripeInformation, OrcProto.StripeFooter, Array[Int]) => OrcOutputStripe,
       evolution: SchemaEvolution,
       sargApp: SargApplier,
       sargColumns: Array[Boolean],
       ignoreNonUtf8BloomFilter: Boolean,
       writerVersion: OrcFile.WriterVersion,
       fileIncluded: Array[Boolean],
-      columnMapping: Array[Int],
-      idMapping: Array[Int]): ArrayBuffer[OrcOutputStripe] = {
+      columnMapping: Array[Int]): ArrayBuffer[OrcOutputStripe] = {
     val result = new ArrayBuffer[OrcOutputStripe](stripes.length)
     stripes.foreach { stripe =>
       val stripeFooter = dataReader.readStripeFooter(stripe)
```

@@ -82,7 +81,7 @@ }

```diff
       }
 
       if (needStripe) {
-        result.append(gen(stripe, stripeFooter, columnMapping, idMapping))
+        result.append(gen(stripe, stripeFooter, columnMapping))
       }
     }
     result
```
@@ -0,0 +1,40 @@

```scala
/*
 * Copyright (c) 2022, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids.shims

import org.apache.hadoop.conf.Configuration
import org.apache.orc.Reader

import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.types.StructType

object OrcReadingShims {

  /**
   * @return Returns the combination of requested column ids from the given ORC file and
   *         boolean flag to find if the pruneCols is allowed or not. Requested Column id can be
   *         -1, which means the requested column doesn't exist in the ORC file. Returns None
   *         if the given ORC file is empty.
   */
  def requestedColumnIds(
      isCaseSensitive: Boolean,
      dataSchema: StructType,
      requiredSchema: StructType,
      reader: Reader,
      conf: Configuration): Option[(Array[Int], Boolean)] =
    OrcUtils.requestedColumnIds(isCaseSensitive, dataSchema, requiredSchema, reader, conf)
}
```
@@ -85,15 +85,14 @@ trait OrcShims320untilAllBase {
```diff
       conf: Configuration,
       orcReader: Reader,
       dataReader: DataReader,
-      gen: (StripeInformation, OrcProto.StripeFooter, Array[Int], Array[Int]) => OrcOutputStripe,
+      gen: (StripeInformation, OrcProto.StripeFooter, Array[Int]) => OrcOutputStripe,
       evolution: SchemaEvolution,
       sargApp: SargApplier,
       sargColumns: Array[Boolean],
       ignoreNonUtf8BloomFilter: Boolean,
       writerVersion: OrcFile.WriterVersion,
       fileIncluded: Array[Boolean],
-      columnMapping: Array[Int],
-      idMapping: Array[Int]): ArrayBuffer[OrcOutputStripe] = {
+      columnMapping: Array[Int]): ArrayBuffer[OrcOutputStripe] = {
 
     val orcReaderImpl = orcReader.asInstanceOf[ReaderImpl]
     val maxDiskRangeChunkLimit = OrcConf.ORC_MAX_DISK_RANGE_CHUNK_LIMIT.getInt(conf)
```

@@ -114,7 +113,7 @@ }

```diff
       }
 
       if (needStripe) {
-        result.append(gen(stripe, stripeFooter, columnMapping, idMapping))
+        result.append(gen(stripe, stripeFooter, columnMapping))
       }
     }
     result
```