spillable cache for GpuCartesianRDD #1784

Closed · wants to merge 39 commits
Changes shown from 2 commits

Commits (39):
86fc104
spillable cache for GpuCartesianRDD
sperlingxx Feb 22, 2021
07b2d15
lazy cache
sperlingxx Feb 23, 2021
72b2e12
Update cudf dependency to 0.18 (#1828)
NvTimLiu Mar 1, 2021
653c33a
Merge branch 'branch-0.4' into fix-merge
jlowe Mar 1, 2021
bb03535
Update mortgage tests to support reading multiple dataset formats (#1…
NvTimLiu Mar 1, 2021
17657fe
Remove benchmarks (#1826)
jlowe Mar 1, 2021
c40ec37
Merge branch 'branch-0.4' into fix-merge
jlowe Mar 1, 2021
50fd165
Merge pull request #1835 from jlowe/fix-merge
jlowe Mar 1, 2021
6483543
Spark 3.0.2 shim no longer a snapshot shim (#1831)
jlowe Mar 1, 2021
c52e9a5
Merge pull request #1837 from NVIDIA/branch-0.4
nvauto Mar 1, 2021
7e210c2
Make databricks build.sh more convenient for dev (#1838)
tgravescs Mar 1, 2021
51049a6
Add a shim provider for Spark 3.2.0 development branch (#1704)
gerashegalov Mar 2, 2021
28b00a7
Cleanup unused Jenkins files and scripts (#1829)
NvTimLiu Mar 2, 2021
e614ef4
Spark 3.1.1 shim no longer a snapshot shim (#1832)
jlowe Mar 2, 2021
fc9cecf
Update to note support for 3.0.2 (#1842)
sameerz Mar 2, 2021
85bfacb
Fix fails on the mortgage ETL test (#1845)
NvTimLiu Mar 2, 2021
63a2e3d
Have most of range partitioning run on the GPU (#1796)
revans2 Mar 2, 2021
c776be9
Merge branch 'branch-0.4' into fix-merge
jlowe Mar 2, 2021
e06c226
Fix NullPointerException on null partition insert (#1744)
gerashegalov Mar 2, 2021
5b93033
Merge branch 'branch-0.4' into fix-merge
jlowe Mar 2, 2021
923fa4e
Merge pull request #1848 from jlowe/fix-merge
jlowe Mar 2, 2021
dea867a
Update changelog for 0.4 (#1849)
sameerz Mar 2, 2021
95c3e75
Merge pull request #1850 from NVIDIA/branch-0.4
nvauto Mar 2, 2021
40c0eda
Refactor join code to reduce duplicated code (#1839)
jlowe Mar 2, 2021
19d1f05
Add shim for Spark 3.0.3 (#1834)
jlowe Mar 3, 2021
32213fa
Cost-based optimizer (#1616)
andygrove Mar 3, 2021
24ab0ae
Fix Part Suite Tests (#1852)
revans2 Mar 3, 2021
eab507e
Add shim for Spark 3.1.2 (#1836)
jlowe Mar 3, 2021
ad0b6d9
fix shuffle manager doc on ucx library path (#1858)
rongou Mar 4, 2021
dc2847c
Disable coalesce batch spilling to avoid cudf contiguous_split bug (#…
jlowe Mar 4, 2021
3c243c7
Merge branch 'branch-0.4' into fix-merge
jlowe Mar 4, 2021
2439b4b
Fix tests for Spark 3.2.0 shim (#1869)
revans2 Mar 4, 2021
6e57e27
Add in support for DateAddInterval (#1841)
nartal1 Mar 5, 2021
60fb754
Merge pull request #1875 from jlowe/fix-merge
pxLi Mar 5, 2021
1a32484
spillable cache for GpuCartesianRDD
sperlingxx Feb 22, 2021
b908c73
lazy cache
sperlingxx Feb 23, 2021
4718d00
adapt new interface of SpillableColumnarBatch
sperlingxx Mar 5, 2021
6fd391b
fix merge conflicts
sperlingxx Mar 5, 2021
8a46305
fix merge conflicts
sperlingxx Mar 5, 2021
@@ -18,8 +18,10 @@ package org.apache.spark.sql.rapids

 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

+import scala.collection.mutable
+
 import ai.rapids.cudf.{JCudfSerialization, NvtxColor, NvtxRange, Table}
-import com.nvidia.spark.rapids.{Arm, GpuBindReferences, GpuBuildLeft, GpuColumnVector, GpuExec, GpuExpression, GpuMetric, GpuSemaphore, MetricsLevel}
+import com.nvidia.spark.rapids.{Arm, GpuBindReferences, GpuBuildLeft, GpuColumnVector, GpuExec, GpuExpression, GpuMetric, GpuSemaphore, MetricsLevel, SpillableColumnarBatch, SpillPriorities}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._

 import org.apache.spark.{Dependency, NarrowDependency, Partition, SparkContext, TaskContext}
@@ -141,15 +143,32 @@ class GpuCartesianRDD(
   override def compute(split: Partition, context: TaskContext):
       Iterator[ColumnarBatch] = {
     val currSplit = split.asInstanceOf[GpuCartesianPartition]
-    rdd1.iterator(currSplit.s1, context).flatMap { lhs =>
+
+    // create a buffer to cache stream-side data in a spillable manner
+    val spillBatchBuffer = mutable.ArrayBuffer[SpillableColumnarBatch]()
+
+    rdd1.iterator(currSplit.s1, context).zipWithIndex.flatMap { case (lhs, index) =>
       val table = withResource(lhs) { lhs =>
Review comment (Collaborator):
If we are doing this right, we need to make lhs spillable too, because we are going to do the join and return multiple values while holding on to it. This means we will likely have to modify BroadcastNestedLoopJoinExecBase as well to be able to deal with this. We might need to make it a functor or something like that, so we can keep the old behavior for broadcast nested loop join until we can update the broadcast tables to also be spillable, which is coming.
         GpuColumnVector.from(lhs.getBatch)
       }
-      // Ideally instead of looping through and recomputing rdd2 for
-      // each batch in rdd1 we would instead cache rdd2 in a way that
-      // it could spill to disk so we can avoid re-computation
+
+      val streamIterator = if (index == 0) {
+        // lazily compute and cache stream-side data
+        rdd2.iterator(currSplit.s2, context).map { serializableBatch =>
+          closeOnExcept(spillBatchBuffer) { buffer =>
+            val batch = SpillableColumnarBatch(
+              serializableBatch.getBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
Review comment (Member, jlowe):
This is now missing the spillable callback argument that was added in #1719, which should be used to tie any spilling to the spill metrics added to this exec node.

Reply (Collaborator, author):
Hi @jlowe, I created another PR, #1878, for this improvement, because the current one is based on branch-0.4 (not branch-0.5).

Reply (Member, jlowe):
In the future a new PR isn't necessary when rebasing. You just need to retarget the PR base branch, as was already done for this PR, and then merge in the new base branch.

+            buffer += batch
+            batch.getColumnarBatch()
+          }
+        }
+      } else {
+        // fetch stream-side data directly if it is already cached
+        spillBatchBuffer.toIterator.map(_.getColumnarBatch())
+      }
+
       val ret = GpuBroadcastNestedLoopJoinExecBase.innerLikeJoin(
-        rdd2.iterator(currSplit.s2, context).map(i => i.getBatch),
+        streamIterator,
         table,
         GpuBuildLeft,
         boundCondition,
@@ -161,7 +180,10 @@ class GpuCartesianRDD(
         filterTime,
         totalTime)

-      CompletionIterator[ColumnarBatch, Iterator[ColumnarBatch]](ret, table.close())
+      CompletionIterator[ColumnarBatch, Iterator[ColumnarBatch]](ret, {
+        table.close()
+        spillBatchBuffer.safeClose()
+      })
     }
   }
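The core idea of the diff above — compute the stream side once on the first pass over the build side, cache it, and replay the cache on every later pass — can be illustrated without any Spark or RAPIDS dependencies. The following is a minimal, hypothetical sketch of that pattern (names like `LazyCacheSketch`, `cartesian`, and `mkRhs` are invented for illustration; the real PR additionally makes the cached batches spillable via `SpillableColumnarBatch`):

```scala
import scala.collection.mutable

object LazyCacheSketch {
  // Cartesian product where the stream side is produced by `mkRhs` exactly
  // once: the first pass over `lhs` materializes it into `cache`, and all
  // subsequent passes replay the cache instead of recomputing.
  def cartesian[A, B](lhs: Iterator[A], mkRhs: () => Iterator[B]): Iterator[(A, B)] = {
    val cache = mutable.ArrayBuffer[B]()
    lhs.zipWithIndex.flatMap { case (a, index) =>
      val rhs: Iterator[B] =
        if (index == 0) mkRhs().map { b => cache += b; b } // compute and cache
        else cache.iterator                                // replay cached data
      rhs.map(b => (a, b))
    }
  }

  def main(args: Array[String]): Unit = {
    var computations = 0
    def expensiveRhs(): Iterator[String] = { computations += 1; Iterator("x", "y") }
    val out = cartesian(Iterator(1, 2, 3), () => expensiveRhs()).toList
    println(out)
    println(s"rhs computed $computations time(s)")
  }
}
```

This relies on `flatMap` fully draining each inner iterator before advancing `lhs`, so the cache is complete by the time `index == 1` is reached — the same property the PR's `zipWithIndex`-based branch depends on.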