spillable cache for GpuCartesianRDD #1784
Conversation
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
build
// create a buffer to cache stream-side data in a spillable manner
val spillBatchBuffer = mutable.ArrayBuffer[SpillableColumnarBatch]()
closeOnExcept(spillBatchBuffer) { buffer =>
  rdd2.iterator(currSplit.s2, context).foreach { cb =>
@jlowe what do you think about the following proposal?
It would be nice if we could combine this loop with the one below. My thinking is that the stream side is probably large enough that not all of it can fit in memory. That means we will likely spill as we populate `spillBatchBuffer`, and then spill again each time through the loop as we do the join. If we can combine the two loops so `spillBatchBuffer` is lazily populated the first time through the inner loop, then hopefully we will spill less because we touch the data one less time.
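The lazy-population idea above can be sketched in isolation. This is a hypothetical illustration, not the spark-rapids implementation: `SpillableBatch` and `LazyStreamCache` are stand-in names, and the real code wraps `ColumnarBatch`es rather than `Int`s. The cache is filled while the first consumer walks the stream, so the data is only touched once; later passes replay the cache.

```scala
object LazySpillSketch {
  // Toy stand-in for SpillableColumnarBatch: just wraps an Int payload.
  final case class SpillableBatch(value: Int)

  class LazyStreamCache(stream: Iterator[Int]) {
    private val cache = scala.collection.mutable.ArrayBuffer[SpillableBatch]()
    private var fullyCached = false

    // Returns the stream batches, caching them on the first pass and
    // replaying the cache on every later pass. Note: the cache is only
    // marked complete once the first pass is fully consumed, because the
    // `++` argument is evaluated lazily when the left iterator is exhausted.
    def batches: Iterator[SpillableBatch] =
      if (fullyCached) {
        cache.iterator
      } else {
        stream.map { v =>
          val b = SpillableBatch(v)
          cache += b // populate while the first consumer walks the stream
          b
        } ++ {
          fullyCached = true
          Iterator.empty
        }
      }
  }
}
```

With this shape the outer (build-side) loop simply asks for `batches` on every pass; only the first pass pays the cost of materializing the stream side.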
Yes, agreed. I'd rather not add yet another loop through the data.
I've replaced this with a lazily-populating implementation.
val spillBatchBuffer = mutable.ArrayBuffer[SpillableColumnarBatch]()
closeOnExcept(spillBatchBuffer) { buffer =>
  rdd2.iterator(currSplit.s2, context).foreach { cb =>
    // TODO: is it necessary to create specific spill priorities for spillBatchBuffer?
The priority you set is fine. The issue is with avoiding spilling, and we may want to actually move to more dynamic spill priorities at some point.
      cb.getBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
  }
}

rdd1.iterator(currSplit.s1, context).flatMap { lhs =>
  val table = withResource(lhs) { lhs =>
If we are doing this right we need to make `lhs` spillable too, because we are going to do the join and return multiple values while holding on to it. This means we will likely have to modify BroadcastNestedLoopJoinExecBase as well to be able to deal with this. We might need to make it a functor or something like that so we can keep the old behavior for broadcast nested loop join until we can update the broadcast tables to also be spillable, which is coming.

Also, I am a little nervous about putting something like this into 0.4. It is for operators that are off by default, but I would feel better if we could retarget this for 0.5 so we have more time to test it.
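The "functor" suggestion can be pictured as parameterizing the join loop over how the stream side is materialized, so the broadcast path keeps its current non-spillable batches while the cartesian path plugs in a spillable cache. This is a hypothetical sketch with toy types (`Int` instead of columnar batches); the names `StreamMaterializer` and `nestedLoopJoin` are illustrative, not the actual API.

```scala
object JoinStrategySketch {
  // The pluggable piece: how to obtain (and replay) the cached stream side.
  type StreamMaterializer = () => Iterator[Int]

  // The shared join loop only depends on the materializer, not on how the
  // stream side is stored (spillable or plain in-memory).
  def nestedLoopJoin(build: Seq[Int], stream: StreamMaterializer): Seq[(Int, Int)] =
    build.flatMap(b => stream().map(s => (b, s)))

  // Existing behavior: plain in-memory batches, replayed from a Seq.
  val inMemory: Seq[Int] => StreamMaterializer =
    data => () => data.iterator
}
```

A spillable implementation would supply a different `StreamMaterializer` that unspills batches on demand, leaving the join loop untouched.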
> I would feel better if we could retarget this for 0.5 so we have more time to test it.
+1. This should target the 0.5 release.
build

This is still targeting branch-0.4. It needs to be retargeted to branch-0.5.
* Change rapids branch-0.4 to depend on cuDF v0.18 release jars and prepare for the rapids v0.4.0 release; cuDF 0.17-SNAPSHOT to 0.17 (Tim Liu)
* Mortgage sample: support csv/orc/parquet dataset formats, with argument validation, helpful argument errors, and match-case cleanup (…IDIA#1808, Tim Liu)
* Signed-off-by: Jason Lowe <jlowe@nvidia.com>
* Fix merge conflict with branch-0.4
* Spark 3.0.2 shim no longer a snapshot shim; remove 3.0.2-SNAPSHOT support (Jason Lowe)
* [auto-merge] branch-0.4 to branch-0.5 [skip ci] [bot]
* Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* Add a shim provider for the Spark 3.2.0 development branch; fix overflows in the aggregate buffer for GpuSum by wiring the explicit output column type; unit tests for the new shim; consolidate version profiles in the parent pom. Closes NVIDIA#1490 (Gera Shegalov)
* Clean up unused Jenkins files and scripts (NVIDIA#1568); move Databricks scripts to GitLab for the nightly build and integration test jobs; remove unused Dockerfiles; restore Databricks nightly scripts (Tim Liu)
* Spark 3.1.1 shim no longer a snapshot shim; remove 3.1.0, 3.1.0-SNAPSHOT, and 3.1.1-SNAPSHOT support (Jason Lowe)
* Update docs and FAQ to reflect Spark 3.0.2 and 3.1.1 support (Sameer Raheja)
* Mortgage tests: change Run.csv()/Run.orc()/Run.parquet into lambda expressions so they are not eagerly executed in dataFrameFormatMap, which caused a dataset format error (Tim Liu)
* Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
* Port apache/spark#31320 to close NVIDIA#1735 (Gera Shegalov)
* Fix merge conflict with branch-0.4
* Update changelog for 0.4 and the generate-changelog script (Sameer Raheja)
* [auto-merge] branch-0.4 to branch-0.5 [skip ci] [bot]
* Refactor join code to reduce duplicated code; move nodeName override to base class (Jason Lowe)
* Add shim for Spark 3.0.3; add premerge testing for Spark 3.0.2 and 3.0.3 (Jason Lowe)
* Fix Part Suite tests (Robert (Bobby) Evans)
rdd2.iterator(currSplit.s2, context).map { serializableBatch =>
  closeOnExcept(spillBatchBuffer) { buffer =>
    val batch = SpillableColumnarBatch(
      serializableBatch.getBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
This is now missing the spillable callback argument that was added in #1719, which should be used to tie any spilling to the spill metrics added to this exec node.
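The role of that callback can be sketched with toy types. This is a hypothetical illustration of the pattern only: in the real code the callback is passed when constructing a `SpillableColumnarBatch` and is invoked by the spill framework, whereas here a stand-in buffer invokes it directly with the number of bytes spilled so the exec node's metric is updated.

```scala
object SpillMetricSketch {
  // Stand-in for an exec node's spill metric.
  final class SpillMetric { var bytesSpilled: Long = 0L }

  // Stand-in for a spillable buffer that reports spills through a callback.
  final class ToySpillableBuffer(sizeBytes: Long, onSpill: Long => Unit) {
    def spill(): Unit = onSpill(sizeBytes)
  }

  def demo(): Long = {
    val metric = new SpillMetric
    // Tie spilling to the metric by wiring the callback at construction time.
    val buf = new ToySpillableBuffer(1024L, bytes => metric.bytesSpilled += bytes)
    buf.spill()
    metric.bytesSpilled
  }
}
```

Wiring the callback at construction time means every later spill, wherever it is triggered, is attributed to this exec node's metrics without the spill site knowing about them.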
In the future a new PR isn't necessary when rebasing. You just need to retarget the PR base branch, as was already done for this PR, and then merge in the new base branch.
* Add shim for Spark 3.1.2; add Spark 3.1.2 to premerge testing (Jason Lowe)
* Fix shuffle manager doc on UCX library path; remove ld library path line (Rong Ou)
* …VIDIA#1871 (Jason Lowe)
* Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
* Signed-off-by: Niranjan Artal <nartal@nvidia.com>
* Fix merge conflict with branch-0.4
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
Superseded by #1878.
Use SpillableColumnarBatch to cache stream-side data in case of re-computation caused by the nested loop.
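The `closeOnExcept` guard that wraps the buffer in the snippets above can be sketched minimally. This assumes its usual close-on-failure semantics (as in spark-rapids' Arm helpers): if the body throws, everything buffered so far is closed before the exception propagates, so cached batches cannot leak; on success the buffer is returned open for later use.

```scala
object CloseOnExceptSketch {
  import scala.collection.mutable.ArrayBuffer

  // If body throws, close everything buffered so far, then rethrow.
  // Close failures are suppressed so the original exception wins.
  def closeOnExcept[T <: AutoCloseable, R](
      buf: ArrayBuffer[T])(body: ArrayBuffer[T] => R): R =
    try {
      body(buf)
    } catch {
      case e: Throwable =>
        buf.foreach { b =>
          try b.close() catch { case _: Throwable => () }
        }
        throw e
    }
}
```

This is why the loops that populate `spillBatchBuffer` are wrapped in `closeOnExcept`: a failure mid-population must not strand already-cached spillable batches.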