spillable cache for GpuCartesianRDD #1784

Closed
sperlingxx wants to merge 39 commits into branch-0.5 from spill_cart_rdd

Conversation

sperlingxx (Collaborator):

Use SpillableColumnarBatch to cache stream-side data, so it does not need to be re-computed by the nested loop.

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
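
For illustration, a minimal sketch of the idea (not this PR's actual diff): every stream-side batch is wrapped in a SpillableColumnarBatch so the device memory it holds can be spilled between passes of the nested loop, and it is re-materialized on demand for each build-side batch. The names cartesianWithSpillableCache, buildIter, streamIter, and joinOne are hypothetical stand-ins; the real change also handles closing on failure via the plugin's Arm helpers (closeOnExcept/withResource).

```scala
import scala.collection.mutable

import com.nvidia.spark.rapids.{SpillPriorities, SpillableColumnarBatch}
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch: cache the stream side in spillable form, then replay it once per
// build-side batch instead of recomputing it for every pass.
def cartesianWithSpillableCache(
    buildIter: Iterator[ColumnarBatch],
    streamIter: Iterator[ColumnarBatch],
    joinOne: (ColumnarBatch, ColumnarBatch) => Unit): Unit = {
  val spillBatchBuffer = mutable.ArrayBuffer[SpillableColumnarBatch]()
  try {
    // Wrap each stream-side batch so its device memory can be spilled later.
    streamIter.foreach { cb =>
      spillBatchBuffer +=
        SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
    }
    buildIter.foreach { buildBatch =>
      spillBatchBuffer.foreach { spillable =>
        // Re-materialize the cached batch on demand for this pass of the loop.
        val streamBatch = spillable.getColumnarBatch()
        try {
          joinOne(buildBatch, streamBatch)
        } finally {
          streamBatch.close()
        }
      }
    }
  } finally {
    spillBatchBuffer.foreach(_.close())
  }
}
```
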
sperlingxx (Collaborator, Author):

build

// create a buffer to cache stream-side data in a spillable manner
val spillBatchBuffer = mutable.ArrayBuffer[SpillableColumnarBatch]()
closeOnExcept(spillBatchBuffer) { buffer =>
  rdd2.iterator(currSplit.s2, context).foreach { cb =>
Collaborator:

@jlowe what do you think about the following proposal?

It would be nice if we could combine this loop with the one below. My thinking is that the stream side is probably large enough that not all of it can fit in memory. That means we will likely spill as we populate spillBatchBuffer, and then spill again each time through the loop as we do the join. If we can combine the two loops so spillBatchBuffer is lazily populated the first time through the inner loop, then hopefully we will spill less because we touch the data one less time.
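
(For illustration, a rough sketch of what that lazy population might look like, assuming it sits where streamIter, spillBatchBuffer, and the cartesian loop of GpuCartesianRDD are in scope; those names are stand-ins, and the real change also needs to handle closing on failure and partially consumed iterators.)

```scala
// Sketch: the first pass over the build side pulls stream-side batches from
// the upstream iterator and caches them as it goes; later passes replay the
// spillable cache, so the stream-side data is read from upstream only once.
var streamSideCached = false

def streamBatches(): Iterator[ColumnarBatch] = {
  if (!streamSideCached) {
    streamSideCached = true
    streamIter.map { cb =>
      val spillable =
        SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
      spillBatchBuffer += spillable
      // Hand a live batch to this pass while keeping the spillable copy cached.
      spillable.getColumnarBatch()
    }
  } else {
    spillBatchBuffer.iterator.map(_.getColumnarBatch())
  }
}
```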

jlowe (Member):

Yes, agreed. I'd rather not add yet another loop through the data.

sperlingxx (Collaborator, Author):

I've replaced this with an implementation that populates the buffer lazily.

val spillBatchBuffer = mutable.ArrayBuffer[SpillableColumnarBatch]()
closeOnExcept(spillBatchBuffer) { buffer =>
  rdd2.iterator(currSplit.s2, context).foreach { cb =>
    // TODO: is it necessary to create a specific spill priority for spillBatchBuffer?
Collaborator:

The priority you set is fine. The issue is with avoiding spilling, and we may want to actually move to more dynamic spill priorities at some point.

      cb.getBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
  }
}

rdd1.iterator(currSplit.s1, context).flatMap { lhs =>
  val table = withResource(lhs) { lhs =>
Collaborator:

If we are doing this right, we need to make lhs spillable too, because we are going to do the join and return multiple values while holding on to it. This means we will likely have to modify BroadcastNestedLoopJoinExecBase as well to deal with this. We might need to make it a functor or something like that, so we can keep the old behavior for broadcast nested loop join until we can update the broadcast tables to also be spillable, which is coming.
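
(For illustration, a sketch of what making lhs spillable could look like, assuming lhs is a ColumnarBatch and spillBatchBuffer already holds the cached stream side; the names and the foreach-based structure are stand-ins for the actual flatMap in GpuCartesianRDD.)

```scala
// Sketch: wrap the build-side batch too, so it can spill while joined output
// batches that depend on it are still being produced.
val spillableLhs =
  SpillableColumnarBatch(lhs, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
try {
  spillBatchBuffer.foreach { spillableRhs =>
    // Re-materialize both sides only for as long as this join step needs them.
    val lhsBatch = spillableLhs.getColumnarBatch()
    try {
      val rhsBatch = spillableRhs.getColumnarBatch()
      try {
        // join lhsBatch with rhsBatch and emit one output batch here
      } finally {
        rhsBatch.close()
      }
    } finally {
      lhsBatch.close()
    }
  }
} finally {
  spillableLhs.close()
}
```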

revans2 (Collaborator) commented Feb 22, 2021:

Also I am a little nervous about putting something like this into 0.4. It is for operators that are off by default, but I would feel better if we could retarget this for 0.5 so we have more time to test it.

jlowe (Member) left a comment:

> I would feel better if we could retarget this for 0.5 so we have more time to test it.

+1. This should target the 0.5 release.

sperlingxx (Collaborator, Author):

build

jlowe (Member) commented Feb 23, 2021:

This is still targeting branch-0.4. It needs to be retargeted to branch-0.5.

sperlingxx changed the base branch from branch-0.4 to branch-0.5 on March 1, 2021 at 10:30
NvTimLiu and others added 19 commits March 2, 2021 00:20
* Depend on the cuDF v0.18

Change rapids branch-0.4 to depend on cuDF v0.18 release jars

Prepare for the rapids v0.4.0 release

Signed-off-by: Tim Liu <timl@nvidia.com>

* cudf 0.17-SNAPSHOT to 0.17
… (NVIDIA#1808)

* mortgage support multiple dataset formats

change mortgage sample class to support dataset formats csv/orc/parquet

Signed-off-by: Tim Liu <timl@nvidia.com>

* Update

1, copyright 2021
2, throw an error if there are more than 5 arguments
3, match-case optimize

Signed-off-by: Tim Liu <timl@nvidia.com>

* Update

1, print some helpful info for the input arguments
2, exit instead of throwing an exception when arguments are wrongly set

* fix typo

* Fix Nothing value in 'case _ =>'

* update
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
Fix merge conflict with branch-0.4
* Spark 3.0.2 shim no longer a snapshot shim

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Remove 3.0.2-SNAPSHOT support
[auto-merge] branch-0.4 to branch-0.5 [skip ci] [bot]
Signed-off-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Gera Shegalov <gera@apache.org>

Add a shim provider for Spark 3.2.0 development branch. Closes NVIDIA#1490
- fix overflows in aggregate buffer for GpuSum by wiring the explicit output column type
- unit tests for the new shim
- consolidate version profiles in the parent pom
* Cleanup unused Jenkins files and scripts

NVIDIA#1568

Move Databricks scripts to GitLab so we can use the common scripts for the nightly build job and integration tests job

Remove unused Dockerfiles

Signed-off-by: Tim Liu <timl@nvidia.com>

* rm Dockerfile.integration.ubuntu16

* Restore Databricks nightly scripts

Signed-off-by: Tim Liu <timl@nvidia.com>
* Spark 3.1.1 shim no longer a snapshot shim

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Remove 3.1.0, 3.1.0-SNAPSHOT, and 3.1.1-SNAPSHOT support

* Remove obsolete comment
* Update to note support for 3.0.2

Signed-off-by: Sameer Raheja <sraheja@nvidia.com>

* Update FAQ to reflect 3.0.2 and 3.1.1 support

Signed-off-by: Sameer Raheja <sraheja@nvidia.com>
In the dataset-format 'Map', the 'Run.csv()/Run.orc()/Run.parquet' functions were all executed while the map was being built, which caused a dataset-format error because the dataset format in the current test is 'parquet'.

Change 'Run.csv()/Run.orc()/Run.parquet' into lambda expressions, to avoid running the 'Run.xxx()' functions while building the dataFrameFormatMap (a sketch of the idea follows this group of commits).

Signed-off-by: Tim Liu <timl@nvidia.com>
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
Port apache/spark#31320 to close NVIDIA#1735

Signed-off-by: Gera Shegalov <gera@apache.org>
Fix merge conflict with branch-0.4
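
(Regarding the dataset-format commit a few entries above: the fix is the usual eager-versus-lazy map-values issue. A minimal sketch of the idea follows; the Run object, its reader signatures, and readByFormat are hypothetical stand-ins, not the mortgage sample's actual code.)

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical reader object standing in for the mortgage sample's Run class.
object Run {
  def csv(spark: SparkSession, path: String): DataFrame = spark.read.csv(path)
  def orc(spark: SparkSession, path: String): DataFrame = spark.read.orc(path)
  def parquet(spark: SparkSession, path: String): DataFrame = spark.read.parquet(path)
}

def readByFormat(spark: SparkSession, path: String, format: String): DataFrame = {
  // Storing thunks instead of results: nothing is read until the chosen format
  // is looked up, so a parquet test never tries to read the data as csv or orc.
  val dataFrameFormatMap: Map[String, () => DataFrame] = Map(
    "csv"     -> (() => Run.csv(spark, path)),
    "orc"     -> (() => Run.orc(spark, path)),
    "parquet" -> (() => Run.parquet(spark, path)))
  dataFrameFormatMap(format)()
}
```
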
sameerz and others added 6 commits March 2, 2021 13:56
* Update changelog for 0.4

Signed-off-by: Sameer Raheja <sraheja@nvidia.com>

* Update generate-changelog script

Signed-off-by: Sameer Raheja <sraheja@nvidia.com>
[auto-merge] branch-0.4 to branch-0.5 [skip ci] [bot]
* Refactor join code to reduce duplicated code

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Move nodeName override to base class
* Add shim for Spark 3.0.3

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Add premerge testing for Spark 3.0.2 and Spark 3.0.3
* Fix Part Suite Tests

Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>

* Addressed review comments
rdd2.iterator(currSplit.s2, context).map { serializableBatch =>
  closeOnExcept(spillBatchBuffer) { buffer =>
    val batch = SpillableColumnarBatch(
      serializableBatch.getBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
jlowe (Member):

This is now missing the spillable callback argument that was added in #1719 which should be used to tie any spilling to spill metrics added to this exec node.
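
(For context, the fix would be roughly to thread that callback through when wrapping the batch. A hedged sketch follows; the spillCallback parameter name is an assumption based on this comment, not the verified spark-rapids signature.)

```scala
// Sketch only: `spillCallback` stands in for the callback added in #1719 that
// ties any spill of this cached batch to the exec node's spill metrics; the
// exact parameter name and type may differ in the actual API.
val batch = SpillableColumnarBatch(
  serializableBatch.getBatch,
  SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
  spillCallback)
```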

sperlingxx (Collaborator, Author):

Hi @jlowe, I created another PR, #1878, for this improvement, because the current one is based on branch-0.4 (not branch-0.5).

jlowe (Member):

In the future a new PR isn't necessary when rebasing. You just need to retarget the PR base branch, as was already done for this PR, and then merge in the new base branch.

* Add shim for Spark 3.1.2

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Add Spark 3.1.2 to premerge testing
sameerz added the 'task' label (Work required that improves the product but is not user facing) on Mar 4, 2021
rongou and others added 10 commits March 4, 2021 08:30
* fix shuffle manager doc on ucx library path

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* remove ld library path line

Signed-off-by: Rong Ou <rong.ou@gmail.com>
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
Signed-off-by: Niranjan Artal <nartal@nvidia.com>
Fix merge conflict with branch-0.4
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
jlowe (Member) commented Mar 5, 2021:

Superseded by #1878.

jlowe closed this on Mar 5, 2021.
sperlingxx deleted the spill_cart_rdd branch on April 8, 2021.