mergeSort late batch materialization and free already merged batches eagerly #6931
Conversation
…eagerly Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala
Fixed one leak above. I am seeing a number of other leaks in the tests that don't seem to be related to my change, some included
```diff
@@ -15,6 +15,8 @@
  */
 package com.nvidia.spark.rapids

+import java.util
```
I would remove this import. When reading the Scala code below, it looks clearer to see explicitly that the method operates on the Java API.
```scala
    (r: util.AbstractCollection[T])
    (block: util.AbstractCollection[T] => V): V = {
```
Suggested change:

```diff
-    (r: util.AbstractCollection[T])
-    (block: util.AbstractCollection[T] => V): V = {
+    (r: java.util.AbstractCollection[T])
+    (block: java.util.AbstractCollection[T] => V): V = {
```
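The suggested signature can be sketched end to end. The following is a minimal, hypothetical `withResource` overload over a `java.util` collection of `AutoCloseable`s (a stand-in, not the plugin's actual Arm helper), showing why the fully qualified `java.util` type reads clearly at the definition site:

```scala
// Hypothetical sketch, not the plugin's real Arm helper: run a block over a
// java.util collection of closeable resources, then close every element.
object ArmSketch {
  def withResource[T <: AutoCloseable, V]
      (r: java.util.AbstractCollection[T])
      (block: java.util.AbstractCollection[T] => V): V = {
    try {
      block(r)
    } finally {
      // Close all elements even if the block throws.
      r.forEach(c => if (c != null) c.close())
    }
  }
}

// Toy resource used only to demonstrate that every element gets closed.
class ToyResource extends AutoCloseable {
  var closed = false
  override def close(): Unit = closed = true
}
```

Usage would look like `ArmSketch.withResource(new java.util.ArrayDeque[ToyResource]())(dq => ...)`, with every element closed once the block returns.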
```diff
@@ -421,29 +422,18 @@ case class GpuOutOfCoreSortIterator(
     while (!pending.isEmpty && sortedSize < targetSize) {
       // Keep going until we have enough data to return
       var bytesLeftToFetch = targetSize
-      val mergedBatch = withResource(ArrayBuffer[SpillableColumnarBatch]()) { pendingSort =>
+      val pendingSort = new util.ArrayDeque[SpillableColumnarBatch]()
```
I believe we should stay with the Java one. ArrayStack was removed in Scala 2.13.x, so I'd rather not introduce code that we'll need to remove if we start to build for Scala 2.13.
Not a blocker, but we won't have to change it for 2.13. Scalac can cross-compile it to 2.13 without a code change, at the expense of a deprecation warning:

```
warning: value ArrayStack in package mutable is deprecated (since 2.13.0): Use Stack instead of ArrayStack; it now uses an array-based implementation
```
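For reference, the `java.util.ArrayDeque` chosen in the diff above works as a LIFO stack from Scala and compiles unchanged on both 2.12 and 2.13, which is the portability point being discussed (the `String` elements here are just a toy stand-in):

```scala
// java.util.ArrayDeque as a stack: push/pop give LIFO order, with no
// dependency on scala.collection.mutable.ArrayStack (deprecated in 2.13).
val pendingSort = new java.util.ArrayDeque[String]()
pendingSort.push("batch-0")
pendingSort.push("batch-1")
val top = pendingSort.pop() // the most recently pushed element comes out first
```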
@gerashegalov mind taking another look please?
LGTM
@gerashegalov moved to
```scala
      // In the current version of cudf merge does not work for lists and maps.
      // This should be fixed by https://github.com/rapidsai/cudf/issues/8050
      // Nested types in sort key columns is not supported either.
      if (hasNestedInKeyColumns || hasUnsupportedNestedInRideColumns) {
```
This corner case is common enough that I think we should optimize it a bit. A full sort is a lot more expensive than a merge sort, and now we are doing N-1 full sorts whereas before we were only doing 1 full sort. Could we move this up so the code is more like:

```scala
if (spillableBatches.size == 1) {
  ...
} else if (hasNestedInKeyColumns || hasUnsupportedNestedInRideColumns) {
  // Unspill all of the input batches
  // concat the input batches
  // close the input batches
  // sort the concated batch
  // close the concated batch
} else {
  ...
}
```
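As a rough illustration of that shape, here is a toy skeleton (the function names and the `() => Seq[Int]` stand-in for a spillable batch are hypothetical, not the plugin's code) showing the single-batch fast path, the concat-plus-one-full-sort fallback for unsupported nested types, and the merge path:

```scala
// Toy skeleton of the suggested branching; () => Seq[Int] stands in for a
// spillable batch that is materialized on demand.
def sortSpilled(spillable: List[() => Seq[Int]],
                unsupportedNested: Boolean): Seq[Int] = {
  if (spillable.size == 1) {
    spillable.head().sorted                        // single batch: one sort, no merge
  } else if (unsupportedNested) {
    val concat = spillable.map(_()).reduce(_ ++ _) // unspill + concat the inputs once
    concat.sorted                                  // a single full sort of the concat
  } else {
    // merge path: sort each batch, then merge pairwise
    spillable.map(_().sorted).reduce((a, b) => (a ++ b).sorted)
  }
}
```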
@revans2 that should be fixed now
…#9102) This PR adds retry support for more operations in GpuOutOfCoreSortIterator, including computing the split offset and bringing the data back to the GPU to remove the projected columns. Additionally, to keep eagerly closing the input batches in the mergeSortAndClose function (introduced by #6931), instead of retrying the call to the whole mergeSortAndClose function, we retry the operations inside it: bringing the data back to the GPU, concatenating tables, sorting the concatenated table, and merging the input tables. It also covers a small follow-up change in GpuColumnToRowExec for PR #9088. --------- Signed-off-by: Firestarman <firestarmanllc@gmail.com>
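The retry placement described above can be sketched as follows; `RetryOOM` and `withRetry` here are hypothetical stand-ins for the plugin's retry machinery, used only to show each small step retrying on its own rather than replaying the whole mergeSortAndClose call:

```scala
// Simulated GPU OOM and a per-step retry loop (both hypothetical stand-ins).
class RetryOOM extends RuntimeException("simulated GPU OOM")

def withRetry[T](maxAttempts: Int)(op: () => T): T = {
  var attempt = 0
  while (true) {
    attempt += 1
    try {
      return op() // each step is retried independently
    } catch {
      // On a retryable OOM we would free memory (e.g. spill) and try again.
      case _: RetryOOM if attempt < maxAttempts => ()
    }
  }
  throw new IllegalStateException("unreachable")
}
```

Because inputs already consumed by earlier steps were closed eagerly, only the failing step's own inputs are replayed on retry.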
Signed-off-by: Alessandro Bellina abellina@nvidia.com

Contributes to #6758

This PR changes the mergeSort function to mergeSortAndClose in order to eagerly close input data to the merge. This helps alleviate memory pressure.

In order to quantify the reduced memory pressure, I ran NDSv2 at 3TB in our performance cluster, configured with a max memory pool of 10GB (1/4th the usual) and with a single task allowed on the GPU (again, 1/4th the usual). Running in this mode I found that query67/query53 were the only queries that spilled. I then isolated the memory usage of the spilling stage and found that the sort was the main culprit, specifically our call to cudf::merge. This function in cuDF requires the input to be kept alive, and it can incur at worst case 3x the input. We were calling cudf::merge with several tables at a time (I saw up to 8 tables at once). If instead we call cudf::merge to merge pairs of tables from the plugin, we can eagerly close the input. This has the added benefit that we only need to materialize on the GPU 2 input batches at a time from the spill store.

Overall Query67 went from ~70GiB spilled from GPU memory to ~35GiB in the 1/4th memory setup.
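A toy model of the pairwise strategy (ToyBatch and the liveness counters are illustrative stand-ins, not plugin code) shows why at most two inputs plus the running result are ever materialized at once:

```scala
// Tracks how many toy batches are alive at any moment.
object Liveness {
  var live = 0
  var maxLive = 0
}

// Stand-in for a SpillableColumnarBatch once materialized on the GPU.
class ToyBatch(val data: Seq[Int]) extends AutoCloseable {
  Liveness.live += 1
  Liveness.maxLive = math.max(Liveness.maxLive, Liveness.live)
  override def close(): Unit = Liveness.live -= 1
}

// Merge a pair (stand-in for cudf::merge, whose inputs must stay alive
// during the call) and close both inputs as soon as the result exists.
def mergePair(a: ToyBatch, b: ToyBatch): ToyBatch = {
  val out = new ToyBatch((a.data ++ b.data).sorted)
  a.close()
  b.close()
  out
}

// Materialize one batch at a time and fold pairwise: no matter how many
// sorted batches are pending, at most 3 are alive (2 inputs + the result).
def mergeAll(pending: Iterator[Seq[Int]]): ToyBatch = {
  var acc = new ToyBatch(pending.next())
  while (pending.hasNext) {
    acc = mergePair(acc, new ToyBatch(pending.next()))
  }
  acc
}
```

Merging everything in one call would instead keep all pending batches alive simultaneously, which is the pressure the pairwise approach avoids.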