
mergeSort late batch materialization and free already merged batches eagerly #6931

Merged

Conversation

@abellina (Collaborator) commented Oct 27, 2022:

Signed-off-by: Alessandro Bellina abellina@nvidia.com

Contributes to #6758

This PR changes the mergeSort function to mergeSortAndClose so that the input batches to the merge are closed eagerly. This helps alleviate memory pressure.

In order to quantify the reduced memory pressure, I ran NDSv2 at 3TB in our performance cluster, configured with a max memory pool of 10GB (1/4th the usual) and with a single task allowed on the GPU (again, 1/4th the usual). Running in this mode I found that query67 and query53 were the only queries that spilled. I then isolated the memory usage of the spilling stage and found that the sort was the main culprit, specifically our call to cudf::merge. This cuDF function requires its inputs to be kept alive, and in the worst case it can incur 3x the input's memory. We were calling cudf::merge with several tables at a time (I saw up to 8 tables at once). If instead we call cudf::merge to merge pairs of tables from the plugin, we can eagerly close the inputs.

The above has the added benefit that we only need to materialize two input batches from the spill store on the GPU at a time (see the sketch below).

Overall Query67 went from ~70GiB spilled from GPU memory to ~35GiB in the 1/4th memory setup.
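
In pseudocode the change looks roughly like the following. This is a minimal sketch rather than the actual plugin code; `mergeTwoSortedBatches` is a hypothetical stand-in for the call into cudf::merge on two sorted inputs.

```scala
// Minimal sketch of pairwise merging with eager close (illustrative only).
// SpillableColumnarBatch is the plugin type; mergeTwoSortedBatches is a
// hypothetical helper wrapping cudf::merge for exactly two sorted inputs.
def mergeSortAndClose(
    batches: Seq[SpillableColumnarBatch]): SpillableColumnarBatch = {
  require(batches.nonEmpty, "expected at least one batch")
  batches.reduce { (left, right) =>
    try {
      // only these two inputs need to be materialized on the GPU at once
      mergeTwoSortedBatches(left, right)
    } finally {
      // free already-merged inputs immediately instead of keeping every
      // input alive for one big N-way cudf::merge
      left.close()
      right.close()
    }
  }
}
```

With an N-way cudf::merge all N inputs (plus the worst-case ~3x overhead) stay alive for the whole call; with the pairwise reduction each input is closed as soon as its pair has been merged.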

@abellina (Collaborator Author): build

@abellina (Collaborator Author): build

@abellina (Collaborator Author): build

@abellina (Collaborator Author) commented:

Fixed one leak above. I am seeing a number of other leaks in the tests that don't seem to be related to my change, some involving HostMemoryBuffer. I'll take a look at that soon, but it will likely be a different PR.

@abellina (Collaborator Author): build

jlowe previously approved these changes Oct 28, 2022
@sameerz added the reliability label (Features to improve reliability or bugs that severely impact the reliability of the plugin) on Oct 28, 2022
@abellina (Collaborator Author): build

jlowe previously approved these changes Oct 28, 2022
```
@@ -15,6 +15,8 @@
 */
package com.nvidia.spark.rapids

import java.util
```
Collaborator commented:

I would remove this import. When reading the Scala code below, it is more useful to see explicitly that the method operates on the Java API.

Comment on lines 74 to 75:

```scala
(r: util.AbstractCollection[T])
(block: util.AbstractCollection[T] => V): V = {
```

Collaborator suggested change:

```scala
(r: java.util.AbstractCollection[T])
(block: java.util.AbstractCollection[T] => V): V = {
```

Comment on lines 155 to 156:

```scala
(r: util.AbstractCollection[T])
(block: util.AbstractCollection[T] => V): V = {
```

Collaborator suggested change:

```scala
(r: java.util.AbstractCollection[T])
(block: java.util.AbstractCollection[T] => V): V = {
```

```
@@ -421,29 +422,18 @@ case class GpuOutOfCoreSortIterator(
   while (!pending.isEmpty && sortedSize < targetSize) {
     // Keep going until we have enough data to return
     var bytesLeftToFetch = targetSize
-    val mergedBatch = withResource(ArrayBuffer[SpillableColumnarBatch]()) { pendingSort =>
+    val pendingSort = new util.ArrayDeque[SpillableColumnarBatch]()
```
@abellina (Collaborator Author) replied:
I believe we should stay with the java one. ArrayStack was removed in scala 2.13.x, so I'd rather not introduce code that we'll need to remove if we start to build for scala 2.13.

@gerashegalov (Collaborator) commented Nov 1, 2022:

Not a blocker but we won't have to change it for 2.13. Scalac can cross-compile it to 2.13 without a code change at the expense of a deprecation warning.

```
warning: value ArrayStack in package mutable is deprecated (since 2.13.0): Use Stack instead of ArrayStack; it now uses an array-based implementation
```
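
For readers following along, a minimal standalone comparison of the two containers discussed here (illustrative, not plugin code): both provide the LIFO push/pop that pendingSort needs.

```scala
import java.util.ArrayDeque
import scala.collection.mutable.ArrayStack

val deque = new ArrayDeque[String]()
deque.push("a"); deque.push("b")
assert(deque.pop() == "b") // java.util.ArrayDeque: LIFO via Deque push/pop

val stack = new ArrayStack[String]()
stack.push("a"); stack.push("b")
assert(stack.pop() == "b") // mutable.ArrayStack: same LIFO semantics, but
                           // deprecated (not removed) since Scala 2.13
```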

@abellina (Collaborator Author): build

@abellina (Collaborator Author) commented Nov 1, 2022:

@gerashegalov mind taking another look please?

gerashegalov previously approved these changes Nov 1, 2022

@gerashegalov (Collaborator) left a comment:

LGTM


@abellina (Collaborator Author) commented Nov 2, 2022:

@gerashegalov moved to ArrayStack with 6c143e9

@abellina (Collaborator Author): build

@abellina (Collaborator Author): build

gerashegalov previously approved these changes Nov 2, 2022
```scala
// In the current version of cudf merge does not work for lists and maps.
// This should be fixed by https://github.com/rapidsai/cudf/issues/8050
// Nested types in sort key columns is not supported either.
if (hasNestedInKeyColumns || hasUnsupportedNestedInRideColumns) {
```
Collaborator commented:

This corner case is common enough that I think we should optimize this a bit. A full sort is a lot more expensive than a merge sort, and now we are doing N-1 full sorts whereas before we were only doing 1 full sort. Could we move this up so the code is more like:

```scala
if (spillableBatches.size == 1) {
} else if (hasNestedInKeyColumns || hasUnsupportedNestedInRideColumns) {
  // Unspill all of the input batches
  // concat the input batches
  // close the input batches
  // sort the concated batch
  // close the concated batch
} else {
  ...
}
```
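
For illustration, the suggested structure fleshed out as a hypothetical sketch; `unspill`, `concatOnGpu`, `fullSortBatch`, and `mergePairwiseAndClose` are stand-ins for the plugin's real helpers, not its actual API.

```scala
import scala.collection.mutable

def sortSpillableBatches(
    spillableBatches: mutable.ArrayStack[SpillableColumnarBatch]): SpillableColumnarBatch = {
  if (spillableBatches.size == 1) {
    spillableBatches.pop() // single batch: nothing to merge or sort
  } else if (hasNestedInKeyColumns || hasUnsupportedNestedInRideColumns) {
    // cudf::merge cannot handle these nested types, so do exactly one full
    // sort: unspill all inputs, concatenate them, close the inputs, sort the
    // concatenated batch once, then close the concatenated batch.
    val inputs = spillableBatches.toSeq.map(unspill) // assume unspill takes ownership
    spillableBatches.clear()
    val concated =
      try concatOnGpu(inputs)
      finally inputs.foreach(_.close())
    try fullSortBatch(concated)
    finally concated.close()
  } else {
    mergePairwiseAndClose(spillableBatches) // the eager pairwise merge path
  }
}
```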

@abellina (Collaborator Author) replied:
@revans2 that should be fixed now

@abellina (Collaborator Author): build

@abellina merged commit 6cbbe3f into NVIDIA:branch-22.12 on Nov 2, 2022
@abellina deleted the oom/reduce_memory_usage_in_merge_sort branch on November 2, 2022 21:18
firestarman added a commit that referenced this pull request Aug 31, 2023
…#9102)

This PR adds retry support for more operations in GpuOutOfCoreSortIterator, including computing the split offset and bringing the data back to the GPU to remove the projected columns.

Besides, to keep closing the input batches eagerly in the mergeSortAndClose function (introduced by #6931), instead of retrying the whole mergeSortAndClose call we retry the operations inside it: bringing the data back to the GPU, concatenating tables, sorting the concatenated table, and merging the input tables.

It also covers a small follow-up change in GpuColumnToRowExec for PR #9088.
---------

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
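
A loose sketch of that retry-inside pattern, under stated assumptions: `withRetry` and `doGpuMerge` are illustrative stand-ins for the plugin's retry helper and merge call, not its exact API.

```scala
// Retry each GPU step in isolation so an OOM re-executes only that step,
// while inputs can still be closed eagerly between steps.
def mergePairWithRetry(
    left: SpillableColumnarBatch,
    right: SpillableColumnarBatch): SpillableColumnarBatch = {
  val merged = withRetry {
    // bring both sides back onto the GPU and merge; on OOM the retry
    // framework can spill other state and run just this block again
    doGpuMerge(left, right)
  }
  // this pair has been merged, so its inputs can be freed right away
  left.close()
  right.close()
  merged
}
```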