spillable cache for GpuCartesianRDD #1784
The change caches the stream side (`rdd2`) of the cartesian join in a buffer of `SpillableColumnarBatch`es, so every build-side batch after the first reuses the cached data instead of recomputing `rdd2`:

```diff
@@ -18,8 +18,10 @@ package org.apache.spark.sql.rapids
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
+import scala.collection.mutable
+
 import ai.rapids.cudf.{JCudfSerialization, NvtxColor, NvtxRange, Table}
-import com.nvidia.spark.rapids.{Arm, GpuBindReferences, GpuBuildLeft, GpuColumnVector, GpuExec, GpuExpression, GpuMetric, GpuSemaphore, MetricsLevel}
+import com.nvidia.spark.rapids.{Arm, GpuBindReferences, GpuBuildLeft, GpuColumnVector, GpuExec, GpuExpression, GpuMetric, GpuSemaphore, MetricsLevel, SpillableColumnarBatch, SpillPriorities}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 
 import org.apache.spark.{Dependency, NarrowDependency, Partition, SparkContext, TaskContext}
@@ -141,15 +143,32 @@ class GpuCartesianRDD(
   override def compute(split: Partition, context: TaskContext):
   Iterator[ColumnarBatch] = {
     val currSplit = split.asInstanceOf[GpuCartesianPartition]
-    rdd1.iterator(currSplit.s1, context).flatMap { lhs =>
+
+    // create a buffer to cache stream-side data in a spillable manner
+    val spillBatchBuffer = mutable.ArrayBuffer[SpillableColumnarBatch]()
+
+    rdd1.iterator(currSplit.s1, context).zipWithIndex.flatMap { case (lhs, index) =>
       val table = withResource(lhs) { lhs =>
         GpuColumnVector.from(lhs.getBatch)
       }
-      // Ideally instead of looping through and recomputing rdd2 for
-      // each batch in rdd1 we would instead cache rdd2 in a way that
-      // it could spill to disk so we can avoid re-computation
+
+      val streamIterator = if (index == 0) {
+        // lazily compute and cache stream-side data
+        rdd2.iterator(currSplit.s2, context).map { serializableBatch =>
+          closeOnExcept(spillBatchBuffer) { buffer =>
+            val batch = SpillableColumnarBatch(
+              serializableBatch.getBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
+            buffer += batch
+            batch.getColumnarBatch()
+          }
+        }
+      } else {
+        // fetch stream-side data directly if they are cached
+        spillBatchBuffer.toIterator.map(_.getColumnarBatch())
+      }
+
       val ret = GpuBroadcastNestedLoopJoinExecBase.innerLikeJoin(
-        rdd2.iterator(currSplit.s2, context).map(i => i.getBatch),
+        streamIterator,
         table,
         GpuBuildLeft,
         boundCondition,
@@ -161,7 +180,10 @@ class GpuCartesianRDD(
         filterTime,
         totalTime)
 
-      CompletionIterator[ColumnarBatch, Iterator[ColumnarBatch]](ret, table.close())
+      CompletionIterator[ColumnarBatch, Iterator[ColumnarBatch]](ret, {
+        table.close()
+        spillBatchBuffer.safeClose()
+      })
     }
   }
```

Review comments on the `SpillableColumnarBatch(...)` call:

- This is now missing the spillable callback argument that was added in #1719, which should be used to tie any spilling to the spill metrics added to this exec node.
- In the future a new PR isn't necessary when rebasing. You just need to retarget the PR base branch, as was already done for this PR, and then merge in the new base branch.
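The first comment concerns wiring spill events into the exec node's metrics. Below is a minimal, self-contained sketch of that pattern, assuming a callback-at-construction design; `SpillCallback`, `SpillableBatch`, and `spillBytesMetric` are illustrative stand-ins for this sketch, not the actual spark-rapids API added in #1719.

```scala
import java.util.concurrent.atomic.AtomicLong

// Illustrative stand-in for a spill callback: invoked whenever a cached
// batch is spilled, with the number of bytes that moved.
trait SpillCallback {
  def apply(bytesSpilled: Long): Unit
}

// Illustrative stand-in for a spillable wrapper that reports its spills.
final class SpillableBatch(sizeInBytes: Long, onSpill: SpillCallback) {
  // Real code would also move the underlying buffer out of GPU memory here.
  def spill(): Unit = onSpill(sizeInBytes)
}

object SpillMetricsExample extends App {
  // Stand-in for the exec node's "spill bytes" metric.
  val spillBytesMetric = new AtomicLong(0)
  val callback = new SpillCallback {
    def apply(bytesSpilled: Long): Unit = spillBytesMetric.addAndGet(bytesSpilled)
  }

  // Passing the callback at construction time is what ties any later spill
  // of this batch back to the exec node's metrics.
  val batch = new SpillableBatch(sizeInBytes = 1024, onSpill = callback)
  batch.spill()
  println(s"spilled bytes recorded: ${spillBytesMetric.get}") // prints 1024
}
```

The design point is that the wrapper, not the caller, decides when a spill happens, so the only way for the exec node to account for spilled bytes is to hand the wrapper a callback when the batch is made spillable.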
If we are doing this right we need to make
lhs
spillable too. Because we are going to do the join, and return multiple values while holding on to it. This means we will likely have to modify BroadcastNestedLoopJoinExecBase as well to be able to deal with this. We might need to make it a functor or something like that so we can keep the old behavior for broadcast nested loop join until we can update the broadcast tables to also be spillable, which is coming.