spillable cache for GpuCartesianRDD #1784
Changes from 1 commit
```diff
@@ -18,8 +18,10 @@ package org.apache.spark.sql.rapids
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
+import scala.collection.mutable
+
 import ai.rapids.cudf.{JCudfSerialization, NvtxColor, NvtxRange, Table}
-import com.nvidia.spark.rapids.{Arm, GpuBindReferences, GpuBuildLeft, GpuColumnVector, GpuExec, GpuExpression, GpuMetric, GpuSemaphore, MetricsLevel}
+import com.nvidia.spark.rapids.{Arm, GpuBindReferences, GpuBuildLeft, GpuColumnVector, GpuExec, GpuExpression, GpuMetric, GpuSemaphore, MetricsLevel, SpillableColumnarBatch, SpillPriorities}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 
 import org.apache.spark.{Dependency, NarrowDependency, Partition, SparkContext, TaskContext}

@@ -141,27 +143,41 @@ class GpuCartesianRDD(
   override def compute(split: Partition, context: TaskContext):
       Iterator[ColumnarBatch] = {
     val currSplit = split.asInstanceOf[GpuCartesianPartition]
 
+    // create a buffer to cache stream-side data in a spillable manner
+    val spillBatchBuffer = mutable.ArrayBuffer[SpillableColumnarBatch]()
+    closeOnExcept(spillBatchBuffer) { buffer =>
+      rdd2.iterator(currSplit.s2, context).foreach { cb =>
+        // TODO: is it necessary to create a specific spill priorities for spillBatchBuffer?
```
Review comment: The priority you set is fine. The issue is with avoiding spilling, and we may want to actually move to more dynamic spill priorities at some point.
```diff
+        buffer += SpillableColumnarBatch(
+          cb.getBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
+      }
+    }
+
     rdd1.iterator(currSplit.s1, context).flatMap { lhs =>
       val table = withResource(lhs) { lhs =>
```
Review comment: If we are doing this right we need to make …
```diff
         GpuColumnVector.from(lhs.getBatch)
       }
-      // Ideally instead of looping through and recomputing rdd2 for
-      // each batch in rdd1 we would instead cache rdd2 in a way that
-      // it could spill to disk so we can avoid re-computation
-      val ret = GpuBroadcastNestedLoopJoinExecBase.innerLikeJoin(
-        rdd2.iterator(currSplit.s2, context).map(i => i.getBatch),
-        table,
-        GpuBuildLeft,
-        boundCondition,
-        outputSchema,
-        joinTime,
-        joinOutputRows,
-        numOutputRows,
-        numOutputBatches,
-        filterTime,
-        totalTime)
+      val ret = closeOnExcept(spillBatchBuffer) { buffer =>
+        GpuBroadcastNestedLoopJoinExecBase.innerLikeJoin(
+          // fetch stream-side data from buffer in case of re-computation
+          buffer.toIterator.map(spill => spill.getColumnarBatch()),
+          table,
+          GpuBuildLeft,
+          boundCondition,
+          outputSchema,
+          joinTime,
+          joinOutputRows,
+          numOutputRows,
+          numOutputBatches,
+          filterTime,
+          totalTime)
+      }
 
-      CompletionIterator[ColumnarBatch, Iterator[ColumnarBatch]](ret, table.close())
+      CompletionIterator[ColumnarBatch, Iterator[ColumnarBatch]](ret, {
+        table.close()
+        spillBatchBuffer.safeClose()
+      })
     }
   }
```
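The change above follows a cache-and-replay pattern: the stream side is materialized once into spillable batches, every outer batch replays the cache instead of recomputing rdd2, and the cache is released when the result iterator completes. The standalone sketch below is not code from this PR; it only restates that pattern using the helpers that appear in the diff (SpillableColumnarBatch, SpillPriorities, closeOnExcept, safeClose), and it deliberately glosses over ownership of the batches returned by getColumnarBatch().

```scala
import scala.collection.mutable

import com.nvidia.spark.rapids.{Arm, SpillableColumnarBatch, SpillPriorities}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical helper (not part of this PR) illustrating the cache-and-replay
// pattern used in GpuCartesianRDD.compute above.
object SpillableCacheSketch extends Arm {

  // Drain the stream side once, wrapping each batch so the plugin's spill
  // framework may move it off the GPU under memory pressure.
  def cacheAll(
      batches: Iterator[ColumnarBatch]): mutable.ArrayBuffer[SpillableColumnarBatch] =
    closeOnExcept(mutable.ArrayBuffer[SpillableColumnarBatch]()) { buffer =>
      batches.foreach { cb =>
        buffer += SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
      }
      buffer
    }

  // Each pass re-materializes the cached batches on the GPU as needed,
  // instead of recomputing the upstream RDD.
  def replay(
      buffer: mutable.ArrayBuffer[SpillableColumnarBatch]): Iterator[ColumnarBatch] =
    buffer.iterator.map(_.getColumnarBatch())

  // Called once no more passes are needed, e.g. from a CompletionIterator.
  def release(buffer: mutable.ArrayBuffer[SpillableColumnarBatch]): Unit =
    buffer.safeClose()
}
```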
Review comment: @jlowe, what do you think about the following proposal? It would be nice if we could combine this loop with the one below. My thinking is that the stream side is probably large enough that not all of it can fit in memory. That means we will likely spill as we populate spillBatchBuffer, and then spill again each time through the loop as we do the join. If we can combine the two loops so spillBatchBuffer is lazily populated the first time through the inner loop, then hopefully we will spill less, because we touch the data one less time.
Reply: Yes, agreed. I'd rather not add yet another loop through the data.
Reply: I've replaced this with a lazy populating implementation (a sketch of that approach follows below).
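A minimal sketch of how such a lazily populated cache could look, assuming the stream side hands us plain ColumnarBatch values and the first pass is always fully consumed before the next one starts. This is illustrative only and is not necessarily the exact code that replaced the loop above.

```scala
import scala.collection.mutable

import com.nvidia.spark.rapids.{SpillableColumnarBatch, SpillPriorities}

import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical helper (not from this PR): the first pass feeds the join
// directly from the stream side while building the spillable cache, so the
// data is touched one less time; later passes replay the cache.
class LazySpillableCacheSketch(stream: () => Iterator[ColumnarBatch]) {
  private val spillBatchBuffer = mutable.ArrayBuffer[SpillableColumnarBatch]()
  private var populated = false

  def nextPass(): Iterator[ColumnarBatch] = {
    if (!populated) {
      populated = true
      // First pass: wrap each incoming batch so it becomes spillable, keep it
      // for later passes, and hand a usable batch to the join right away.
      // Ownership of the batches returned by getColumnarBatch() is glossed
      // over here; real code must follow the plugin's resource rules.
      stream().map { cb =>
        val spillable =
          SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
        spillBatchBuffer += spillable
        spillable.getColumnarBatch()
      }
    } else {
      // Later passes replay the cached (possibly spilled) batches.
      spillBatchBuffer.iterator.map(_.getColumnarBatch())
    }
  }

  // Called once all passes are done, e.g. from the CompletionIterator cleanup.
  def close(): Unit = {
    spillBatchBuffer.foreach(_.close())
    spillBatchBuffer.clear()
  }
}
```

With something like this, the eager population loop at the top of compute() would go away: each outer batch from rdd1 would obtain the stream-side iterator from nextPass(), and close() would be added to the CompletionIterator cleanup alongside table.close().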