spillable cache for GpuCartesianRDD #1784

Closed · wants to merge 39 commits

Commits (39)
86fc104  spillable cache for GpuCartesianRDD (sperlingxx, Feb 22, 2021)
07b2d15  lazy cache (sperlingxx, Feb 23, 2021)
72b2e12  Update cudf dependency to 0.18 (#1828) (NvTimLiu, Mar 1, 2021)
653c33a  Merge branch 'branch-0.4' into fix-merge (jlowe, Mar 1, 2021)
bb03535  Update mortgage tests to support reading multiple dataset formats (#1… (NvTimLiu, Mar 1, 2021)
17657fe  Remove benchmarks (#1826) (jlowe, Mar 1, 2021)
c40ec37  Merge branch 'branch-0.4' into fix-merge (jlowe, Mar 1, 2021)
50fd165  Merge pull request #1835 from jlowe/fix-merge (jlowe, Mar 1, 2021)
6483543  Spark 3.0.2 shim no longer a snapshot shim (#1831) (jlowe, Mar 1, 2021)
c52e9a5  Merge pull request #1837 from NVIDIA/branch-0.4 (nvauto, Mar 1, 2021)
7e210c2  Make databricks build.sh more convenient for dev (#1838) (tgravescs, Mar 1, 2021)
51049a6  Add a shim provider for Spark 3.2.0 development branch (#1704) (gerashegalov, Mar 2, 2021)
28b00a7  Cleanup unused Jenkins files and scripts (#1829) (NvTimLiu, Mar 2, 2021)
e614ef4  Spark 3.1.1 shim no longer a snapshot shim (#1832) (jlowe, Mar 2, 2021)
fc9cecf  Update to note support for 3.0.2 (#1842) (sameerz, Mar 2, 2021)
85bfacb  Fix fails on the mortgage ETL test (#1845) (NvTimLiu, Mar 2, 2021)
63a2e3d  Have most of range partitioning run on the GPU (#1796) (revans2, Mar 2, 2021)
c776be9  Merge branch 'branch-0.4' into fix-merge (jlowe, Mar 2, 2021)
e06c226  Fix NullPointerException on null partition insert (#1744) (gerashegalov, Mar 2, 2021)
5b93033  Merge branch 'branch-0.4' into fix-merge (jlowe, Mar 2, 2021)
923fa4e  Merge pull request #1848 from jlowe/fix-merge (jlowe, Mar 2, 2021)
dea867a  Update changelog for 0.4 (#1849) (sameerz, Mar 2, 2021)
95c3e75  Merge pull request #1850 from NVIDIA/branch-0.4 (nvauto, Mar 2, 2021)
40c0eda  Refactor join code to reduce duplicated code (#1839) (jlowe, Mar 2, 2021)
19d1f05  Add shim for Spark 3.0.3 (#1834) (jlowe, Mar 3, 2021)
32213fa  Cost-based optimizer (#1616) (andygrove, Mar 3, 2021)
24ab0ae  Fix Part Suite Tests (#1852) (revans2, Mar 3, 2021)
eab507e  Add shim for Spark 3.1.2 (#1836) (jlowe, Mar 3, 2021)
ad0b6d9  fix shuffle manager doc on ucx library path (#1858) (rongou, Mar 4, 2021)
dc2847c  Disable coalesce batch spilling to avoid cudf contiguous_split bug (#… (jlowe, Mar 4, 2021)
3c243c7  Merge branch 'branch-0.4' into fix-merge (jlowe, Mar 4, 2021)
2439b4b  Fix tests for Spark 3.2.0 shim (#1869) (revans2, Mar 4, 2021)
6e57e27  Add in support for DateAddInterval (#1841) (nartal1, Mar 5, 2021)
60fb754  Merge pull request #1875 from jlowe/fix-merge (pxLi, Mar 5, 2021)
1a32484  spillable cache for GpuCartesianRDD (sperlingxx, Feb 22, 2021)
b908c73  lazy cache (sperlingxx, Feb 23, 2021)
4718d00  adapt new interface of SpillableColumnarBatch (sperlingxx, Mar 5, 2021)
6fd391b  fix merge conflicts (sperlingxx, Mar 5, 2021)
8a46305  fix merge conflicts (sperlingxx, Mar 5, 2021)
@@ -18,8 +18,10 @@ package org.apache.spark.sql.rapids

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

import scala.collection.mutable

import ai.rapids.cudf.{JCudfSerialization, NvtxColor, NvtxRange, Table}
import com.nvidia.spark.rapids.{Arm, GpuBindReferences, GpuBuildLeft, GpuColumnVector, GpuExec, GpuExpression, GpuMetric, GpuSemaphore, MetricsLevel}
import com.nvidia.spark.rapids.{Arm, GpuBindReferences, GpuBuildLeft, GpuColumnVector, GpuExec, GpuExpression, GpuMetric, GpuSemaphore, MetricsLevel, SpillableColumnarBatch, SpillPriorities}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.{Dependency, NarrowDependency, Partition, SparkContext, TaskContext}
@@ -141,27 +143,41 @@ class GpuCartesianRDD(
  override def compute(split: Partition, context: TaskContext):
      Iterator[ColumnarBatch] = {
    val currSplit = split.asInstanceOf[GpuCartesianPartition]

    // create a buffer to cache stream-side data in a spillable manner
    val spillBatchBuffer = mutable.ArrayBuffer[SpillableColumnarBatch]()
    closeOnExcept(spillBatchBuffer) { buffer =>
      rdd2.iterator(currSplit.s2, context).foreach { cb =>
Collaborator:
@jlowe what do you think about the following proposal?

It would be nice if we could combine this loop with the one below. My thinking is that the stream side is probably large enough that not all of it can fit in memory. That means we will likely spill as we populate spillBatchBuffer, and then spill again each time through the loop as we do the join. If we can combine the two loops so spillBatchBuffer is lazily populated the first time through the inner loop, then hopefully we will spill less because we touch the data one less time.

Member:
Yes, agreed. I'd rather not add yet another loop through the data.

Collaborator (author):
I've replaced this with a lazy populating implementation.
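A minimal sketch of the lazy-population idea, assuming the members visible in this diff (rdd2, currSplit, context, spillBatchBuffer) and the imports already in this file; it is illustrative only, not the code from the follow-up commits:

// Illustrative only: populate the spillable cache during the first pass of
// the join's inner loop, then replay the cached batches on later passes.
var streamSideCached = false

def streamBatches(): Iterator[ColumnarBatch] =
  if (!streamSideCached) {
    // First pass: pull from rdd2, cache each batch in spillable form, and
    // hand the batch straight to the join so the data is only touched once.
    rdd2.iterator(currSplit.s2, context).map { cb =>
      val spillable = SpillableColumnarBatch(
        cb.getBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
      spillBatchBuffer += spillable
      spillable.getColumnarBatch()
    } ++ {
      // Evaluated lazily, once the first pass has been fully consumed.
      streamSideCached = true
      Iterator.empty
    }
  } else {
    // Later passes: re-materialize from the spillable cache instead of
    // recomputing rdd2.
    spillBatchBuffer.toIterator.map(_.getColumnarBatch())
  }

Each pass of the join would then call streamBatches() in place of rdd2.iterator(...), which is presumably the direction of the "lazy cache" commits above.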

        // TODO: is it necessary to create a specific spill priority for spillBatchBuffer?
Collaborator:
The priority you set is fine. The issue is with avoiding spilling, and we may want to actually move to more dynamic spill priorities at some point.
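As a purely hypothetical illustration of the TODO (not something this PR adds), a dedicated priority for this cache could sit alongside the existing constants; the name and offset below are invented, and the plugin's spill-ordering semantics would need to be checked before choosing a real value:

// Hypothetical only: a dedicated priority for the cartesian stream-side
// cache, defined relative to the constant this PR currently reuses.
val CARTESIAN_STREAM_CACHE_PRIORITY: Long =
  SpillPriorities.ACTIVE_ON_DECK_PRIORITY - 100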

        buffer += SpillableColumnarBatch(
          cb.getBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
      }
    }

    rdd1.iterator(currSplit.s1, context).flatMap { lhs =>
      val table = withResource(lhs) { lhs =>
Collaborator:
If we are doing this right we need to make lhs spillable too, because we are going to do the join and return multiple values while holding on to it. This means we will likely have to modify BroadcastNestedLoopJoinExecBase as well to be able to deal with this. We might need to make it a functor or something like that so we can keep the old behavior for broadcast nested loop join until we can update the broadcast tables to also be spillable, which is coming.
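A rough sketch of the reviewer's suggestion, under the assumption that BroadcastNestedLoopJoinExecBase could be changed to accept a build-table functor; that hook and the surrounding wiring are hypothetical, not part of this PR:

rdd1.iterator(currSplit.s1, context).flatMap { lhs =>
  // Hand the build-side batch to a SpillableColumnarBatch so it can be
  // evicted while the join is still emitting output batches.
  val spillableLhs = withResource(lhs) { lhs =>
    SpillableColumnarBatch(lhs.getBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
  }
  // The "functor" idea: each use re-materializes a cudf Table on demand,
  // allowing the batch to spill in between uses.
  val makeBuildTable: () => Table = () =>
    withResource(spillableLhs.getColumnarBatch()) { cb =>
      GpuColumnVector.from(cb)
    }
  // A join adapted to take makeBuildTable would go here; spillableLhs should
  // be closed once the returned iterator is fully consumed.
  Iterator.empty
}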

        GpuColumnVector.from(lhs.getBatch)
      }
      // Ideally instead of looping through and recomputing rdd2 for
      // each batch in rdd1 we would instead cache rdd2 in a way that
      // it could spill to disk so we can avoid re-computation
      val ret = GpuBroadcastNestedLoopJoinExecBase.innerLikeJoin(
        rdd2.iterator(currSplit.s2, context).map(i => i.getBatch),
        table,
        GpuBuildLeft,
        boundCondition,
        outputSchema,
        joinTime,
        joinOutputRows,
        numOutputRows,
        numOutputBatches,
        filterTime,
        totalTime)
      val ret = closeOnExcept(spillBatchBuffer) { buffer =>
        GpuBroadcastNestedLoopJoinExecBase.innerLikeJoin(
          // fetch stream-side data from buffer in case of re-computation
          buffer.toIterator.map(spill => spill.getColumnarBatch()),
          table,
          GpuBuildLeft,
          boundCondition,
          outputSchema,
          joinTime,
          joinOutputRows,
          numOutputRows,
          numOutputBatches,
          filterTime,
          totalTime)
      }

      CompletionIterator[ColumnarBatch, Iterator[ColumnarBatch]](ret, table.close())
      CompletionIterator[ColumnarBatch, Iterator[ColumnarBatch]](ret, {
        table.close()
        spillBatchBuffer.safeClose()
      })
    }
  }
