Introduce low shuffle merge. #10786
build |
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadFileFormatWithMetrics.scala
...spark341db/src/main/scala/com/nvidia/spark/rapids/delta/shims/MergeIntoCommandMetaShim.scala
...elta-spark341db/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormat.scala
...lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala
...c/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/FilePartitionShims.scala
Do we need to add tests? Either unit or integration tests?
I have not finished the review yet, but here are some early comments. As @razajafri said, this needs tests, along with benchmarks comparing baseline CPU, GPU without low shuffle merge, and GPU with low shuffle merge in various setups (e.g., many rows updated, very few rows updated).
...lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala
...lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala
...common/src/main/delta-io/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormat.scala
...ake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuRapidsRepartitionByFilePath.scala
...ake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuRapidsRepartitionByFilePath.scala
...ake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuRapidsRepartitionByFilePath.scala
...ake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuRapidsRepartitionByFilePath.scala
...b/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuLowShuffleMergeCommand.scala
...b/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuLowShuffleMergeCommand.scala
build |
b72112f to f7b1ab4
build |
I'll update the PR to address the comments and add some integration tests.
...lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala
...lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala
...lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala
...common/src/main/delta-io/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormat.scala
...ake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuRapidsRepartitionByFilePath.scala
...ake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuRapidsRepartitionByFilePath.scala
...ake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuRapidsRepartitionByFilePath.scala
...ake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuRapidsRepartitionByFilePath.scala
...spark341db/src/main/scala/com/nvidia/spark/rapids/delta/shims/MergeIntoCommandMetaShim.scala
f7b1ab4 to d31d5f0
cc @jlowe @razajafri I've addressed the comments and added integration tests, PTAL.
build |
...lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala
...common/src/main/delta-io/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormat.scala
delta-lake/common/src/main/scala/com/nvidia/spark/rapids/delta/DeltaProviderImplBase.scala
...4x/src/main/scala/org/apache/spark/sql/delta/rapids/delta24x/GpuLowShuffleMergeCommand.scala
...b/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuLowShuffleMergeCommand.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadFileFormatWithMetrics.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
...4x/src/main/scala/org/apache/spark/sql/delta/rapids/delta24x/GpuLowShuffleMergeCommand.scala
...c/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/FilePartitionShims.scala
build |
cc @jlowe I've addressed all comments, PTAL.
build |
Still need performance numbers for various setups and a tracking issue for porting the code to other platforms.
Also note the user documentation for Delta Lake support will need to be updated to describe this new feature after it's merged.
...a-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaUtils.scala
integration_tests/src/main/python/delta_lake_low_shuffle_merge_test.py
cc @jlowe I have fixed all the tests and they should pass now, but some follow-up issues remain to be resolved:
build |
Signed-off-by: liurenjie1024 <liurenjie2008@gmail.com>
Signed-off-by: liurenjie1024 <liurenjie2008@gmail.com>
8151cbc to a94133e
build |
Signed-off-by: liurenjie1024 <liurenjie2008@gmail.com>
build |
Signed-off-by: liurenjie1024 <liurenjie2008@gmail.com>
@@ -206,7 +206,7 @@ ci_scala213() {
 cd .. # Run integration tests in the project root dir to leverage test cases and resource files
 export TEST_TAGS="not premerge_ci_1"
 export TEST_TYPE="pre-commit"
-export TEST_PARALLEL=5
+export TEST_PARALLEL=4
I think I have hit a bug similar to #8652. In my local environment, the tests pass after changing this to 4.
This doesn't seem like a change that should be made as part of Delta Lake low shuffle merge but rather as a separate PR, especially if you can get it to fail without your low shuffle merge changes. cc: @NvTimLiu for visibility.
Actually, I haven't figured out why the failure in #8652 disappeared when I changed it. I think there is a bug in array_test.py that is unrelated to my change, but when I added more integration tests the test order changed, and it just works. We eventually need to fix #8652, but for this PR we should make this change as a workaround.
What I'm saying above is that this change is not really related to this PR. It's a significant change in CI scripts that will affect performance of CI, since we'll run fewer tests in parallel. That's why I think this should be a separate change, not hidden in a large PR as a side-effect.
The problem is that without this change, the integration tests will fail at array_test.py.
build |
.../src/main/databricks/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormatBase.scala
.../src/main/databricks/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormatBase.scala
...common/src/main/delta-io/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormat.scala
...ake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuRapidsRepartitionByFilePath.scala
...e/delta-24x/src/main/scala/com/nvidia/spark/rapids/delta/delta24x/MergeIntoCommandMeta.scala
...common/src/main/delta-io/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormat.scala
...4x/src/main/scala/org/apache/spark/sql/delta/rapids/delta24x/GpuLowShuffleMergeCommand.scala
...4x/src/main/scala/org/apache/spark/sql/delta/rapids/delta24x/GpuLowShuffleMergeCommand.scala
...e/delta-24x/src/main/scala/com/nvidia/spark/rapids/delta/delta24x/MergeIntoCommandMeta.scala
...4x/src/main/scala/org/apache/spark/sql/delta/rapids/delta24x/GpuLowShuffleMergeCommand.scala
build |
...e/delta-24x/src/main/scala/com/nvidia/spark/rapids/delta/delta24x/MergeIntoCommandMeta.scala
...b/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuLowShuffleMergeCommand.scala
...4x/src/main/scala/org/apache/spark/sql/delta/rapids/delta24x/GpuLowShuffleMergeCommand.scala
...4x/src/main/scala/org/apache/spark/sql/delta/rapids/delta24x/GpuLowShuffleMergeCommand.scala
...4x/src/main/scala/org/apache/spark/sql/delta/rapids/delta24x/GpuLowShuffleMergeCommand.scala
build |
...ake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormatUtils.scala
...ake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormatUtils.scala
...ake/common/src/main/scala/com/nvidia/spark/rapids/delta/GpuRapidsRepartitionByFilePath.scala
withResource(partitionIdExpr.columnarEval(firstRow)) { gpuCol =>
  withResource(gpuCol.copyToHost()) { hostCol =>
    val partitionId = hostCol.getInt(0)
This is a bit wasteful, producing and running a full columnar batch for effectively one scalar. Arguably the expression should be done on the CPU, as it would be faster for computing the single hash value on the file path and would not need to run a separate job, manifesting a full columnar batch of redundant file names. Worth tracking in a followup issue, as this needlessly adds to memory pressure on the GPU.
+1, this could be an improvement tracked in a follow-up issue.
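For the follow-up, the CPU-side alternative suggested above could look roughly like the sketch below. It is only an illustration of hashing a single file path on the host instead of evaluating a columnar expression on the GPU. The function name `partition_id_for_path` is hypothetical, and `zlib.crc32` is a stand-in hash chosen for determinism, not the hash the plugin or Spark actually uses.

```python
import zlib


def partition_id_for_path(file_path: str, num_partitions: int) -> int:
    """Map one file path to a partition id entirely on the host.

    Hypothetical helper: CRC32 is a stand-in hash for illustration only,
    not the hash function used by Spark or the RAPIDS plugin.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # crc32 returns an unsigned 32-bit integer, so the modulo is never negative.
    digest = zlib.crc32(file_path.encode("utf-8"))
    return digest % num_partitions
```

Since every row of a batch read from one file shares the same path, a single scalar computed this way would be enough, which is the memory saving the review comment points at.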
build |
build |
Thanks for all the updates, @liurenjie1024! This is getting close. It would be good to file the follow-up issues, ideally pointing to them with TODOs in the code. We also need performance numbers, as mentioned before.
    " WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *"

conf = copy_and_update(delta_merge_enabled_conf,
                       {"spark.rapids.sql.exec.RapidsRepartitionByFilePathExec": "false"})
The test says it's testing when the file scan override fails, but it's not disabling the file scan. Instead it's disabling the custom exec for low shuffle merge which seems very unlikely to happen in practice. Having coverage of that rare occurrence is great, but there's not a test for when the file scan falls back which will be more common. That should be added.
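A conf for the missing file scan fallback case might be built along these lines. This is a sketch under assumptions: `copy_and_update` is re-implemented locally as a stand-in for the integration test helper, the contents of `delta_merge_enabled_conf` are a placeholder, and the key `spark.rapids.sql.exec.FileSourceScanExec` is assumed to follow the same `spark.rapids.sql.exec.<ExecName>` pattern as the key in the snippet above.

```python
def copy_and_update(conf, *more_confs):
    """Local stand-in for the test helper: merge dicts without mutating the inputs."""
    merged = dict(conf)
    for more in more_confs:
        merged.update(more)
    return merged


# Placeholder contents; the real dict is defined in the integration tests.
delta_merge_enabled_conf = {"example.placeholder.key": "true"}

# Assumed key: disable the GPU file scan so the scan falls back to the CPU,
# rather than disabling the custom repartition exec.
file_scan_fallback_conf = copy_and_update(
    delta_merge_enabled_conf,
    {"spark.rapids.sql.exec.FileSourceScanExec": "false"})
```

A test using such a conf would then check that the merge still produces correct results when the scan is not on the GPU.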
Sure, I will run some experiments to measure the performance improvements.
Closed by #10979
Close #10905.
This PR is the first to introduce the low shuffle merge optimization to speed up merge. Currently only Databricks 13.3 is supported; support for more versions will be added once this PR is merged.