Revert Null Join Filter #745

Merged 2 commits on Sep 11, 2020
integration_tests/src/main/python/conftest.py: 2 changes (1 addition & 1 deletion)
@@ -347,7 +347,7 @@ def setup(self, spark):
}
if not self.tpcds_format in formats:
raise RuntimeError("{} is not a supported tpcds input type".format(self.tpcds_format))
-formats.get(self.tpcds_format)(jvm_session,self.tpcds_path)
+formats.get(self.tpcds_format)(jvm_session, self.tpcds_path, True)

def do_test_query(self, query):
spark = get_spark_i_know_what_i_am_doing()
integration_tests/src/main/python/tpcds_test.py: 16 changes (2 additions & 14 deletions)
@@ -23,8 +23,8 @@
'q30', 'q31', 'q32', 'q33', 'q34', 'q35', 'q36', 'q37', 'q38', 'q39a', 'q39b',
'q40', 'q41', 'q42', 'q43', 'q44', 'q45', 'q46', 'q47', 'q48', 'q49',
'q50', 'q51', 'q52', 'q53', 'q54', 'q55', 'q56', 'q57', 'q58', 'q59',
-'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q68', 'q69',
-'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79',
+'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q67', 'q68', 'q69',
+'q70', 'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79',
'q80', 'q81', 'q82', 'q83', 'q84', 'q85', 'q86', 'q87', 'q88', 'q89',
'q90', 'q91', 'q92', 'q93', 'q94', 'q95', 'q96', 'q97', 'q98', 'q99',
'ss_max', 'ss_maxb']
@@ -35,17 +35,5 @@
@allow_non_gpu(any=True)
@pytest.mark.parametrize('query', queries)
def test_tpcds(tpcds, query):
-assert_gpu_and_cpu_are_equal_collect(
-lambda spark : tpcds.do_test_query(query),
-conf={'spark.rapids.sql.variableFloatAgg.enabled': 'true'})
-
-no_var_agg_queries = ['q67', 'q70']
-
-@incompat
-@ignore_order
-@approximate_float
-@allow_non_gpu(any=True)
-@pytest.mark.parametrize('query', no_var_agg_queries)
-def test_tpcds_no_var_agg(tpcds, query):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : tpcds.do_test_query(query))
@@ -35,19 +35,19 @@ case class Table(
basePath + "/" + name + rest
}

-def readCSV(spark: SparkSession, basePath: String): DataFrame =
+def readCSV(spark: SparkSession, basePath: String, appendDat: Boolean = true): DataFrame =
spark.read.option("delimiter", "|")
.schema(schema)
-.csv(path(basePath))
+.csv(path(basePath, appendDat))

-def setupCSV(spark: SparkSession, basePath: String): Unit =
-readCSV(spark, basePath).createOrReplaceTempView(name)
+def setupCSV(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit =
+readCSV(spark, basePath, appendDat).createOrReplaceTempView(name)

def setupParquet(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit =
spark.read.parquet(path(basePath, appendDat)).createOrReplaceTempView(name)

-def setupOrc(spark: SparkSession, basePath: String): Unit =
-spark.read.orc(path(basePath)).createOrReplaceTempView(name)
+def setupOrc(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit =
+spark.read.orc(path(basePath, appendDat)).createOrReplaceTempView(name)

def setup(
spark: SparkSession,
@@ -137,16 +137,16 @@ object TpcdsLikeSpark {
tables.foreach(_.csvToOrc(spark, baseInput, baseOutput, writePartitioning))
}

-def setupAllCSV(spark: SparkSession, basePath: String): Unit = {
-tables.foreach(_.setupCSV(spark, basePath))
+def setupAllCSV(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = {
+tables.foreach(_.setupCSV(spark, basePath, appendDat))
}

def setupAllParquet(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = {
tables.foreach(_.setupParquet(spark, basePath, appendDat))
}

-def setupAllOrc(spark: SparkSession, basePath: String): Unit = {
-tables.foreach(_.setupOrc(spark, basePath))
+def setupAllOrc(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = {
+tables.foreach(_.setupOrc(spark, basePath, appendDat))
}

def setupAll(
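Note: the conftest.py change at the top of this PR passes True through Py4J as the new appendDat argument, and each setup overload above now threads that flag down to the path helper. Only the last line of that helper is visible in this diff, so the following is a hedged reconstruction of how it plausibly consumes the flag; the ".dat" suffix is an assumption based on dsdgen's usual output naming, not something shown in this PR.

    // Hypothetical sketch of Table.path; only the final expression appears in the diff.
    // Assumption: appendDat controls a ".dat" file suffix as produced by dsdgen.
    private def path(basePath: String, appendDat: Boolean = true): String = {
      val rest = if (appendDat) ".dat" else ""
      basePath + "/" + name + rest
    }

Because the parameter defaults to true, existing callers keep their behavior; only callers that pass false (data laid out without the suffix) see a different path.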
@@ -137,15 +137,10 @@ case class GpuBroadcastHashJoinExec(
val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

lazy val builtTable = {
-val ret = withResource(
-GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
-val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
-val filtered = filterBuiltTableIfNeeded(combined)
-withResource(filtered) { filtered =>
-GpuColumnVector.from(filtered)
-}
-}
-
+// TODO clean up intermediate results...
+val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
+val combined = combine(keys, broadcastRelation.value.batch)
+val ret = GpuColumnVector.from(combined)
// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
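Note: the removed block scoped the projected keys and the filtered batch with withResource, while the restored code closes nothing here and instead marks every column with noWarnLeakExpected, since the broadcast batch's lifetime is outside this operator's control. For reference, a minimal sketch of the withResource loan pattern the old code relied on; the plugin defines an equivalent helper, but this exact signature is an assumption.

    // Sketch of a withResource helper over any AutoCloseable resource.
    def withResource[R <: AutoCloseable, T](resource: R)(body: R => T): T = {
      try {
        body(resource)   // run the caller's code against the resource
      } finally {
        resource.close() // always released, even if body throws
      }
    }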
@@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, HashJoin}
import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.rapids.GpuAnd
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object GpuHashJoin {
@@ -40,11 +39,6 @@
}
case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
}

-def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
-GpuColumnVector.extractBases(cb).foreach(_.incRefCount())
-cb
-}
}

trait GpuHashJoin extends GpuExec with HashJoin {
@@ -116,63 +110,6 @@ trait GpuHashJoin extends GpuExec with HashJoin {
output.indices.map (v => v + joinLength)
}

-// Spark adds in rules to filter out nulls for some types of joins, but it does not
-// guarantee 100% that all nulls will be filtered out by the time they get to
-// this point, but because of https://github.com/rapidsai/cudf/issues/6052
-// we need to filter out the nulls ourselves until it is fixed.
-// InnerLike | LeftSemi =>
-// filter left and right keys
-// RightOuter =>
-// filter left keys
-// LeftOuter | LeftAnti =>
-// filter right keys
-
-private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
-val builtAnyNullable = gpuBuildKeys.exists(_.nullable)
-(joinType, buildSide) match {
-case (_: InnerLike | LeftSemi, _) => builtAnyNullable
-case (RightOuter, BuildLeft) => builtAnyNullable
-case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable
-case _ => false
-}
-}
-
-private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
-val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable)
-(joinType, buildSide) match {
-case (_: InnerLike | LeftSemi, _) => streamedAnyNullable
-case (RightOuter, BuildRight) => streamedAnyNullable
-case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable
-case _ => false
-}
-}
-
-private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression =
-exprs.zipWithIndex.map { kv =>
-GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable))
-}.reduce(GpuAnd)
-
-private[this] lazy val builtTableNullFilterExpression: GpuExpression =
-mkNullFilterExpr(gpuBuildKeys)
-
-private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
-mkNullFilterExpr(gpuStreamedKeys)
-
-/**
-* Filter the builtBatch if needed. builtBatch will be closed.
-* @param builtBatch
-* @return
-*/
-def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
-if (shouldFilterBuiltTableForNulls) {
-GpuFilter(builtBatch, builtTableNullFilterExpression)
-} else {
-builtBatch
-}
-
-private[this] def filterStreamedTable(streamedBatch:ColumnarBatch): ColumnarBatch =
-GpuFilter(streamedBatch, streamedTableNullFilterExpression)
-
def doJoin(builtTable: Table,
stream: Iterator[ColumnarBatch],
boundCondition: Option[Expression],
@@ -197,11 +134,7 @@ trait GpuHashJoin extends GpuExec with HashJoin {
override def hasNext: Boolean = {
while (nextCb.isEmpty && (first || stream.hasNext)) {
if (stream.hasNext) {
-val cb = if (shouldFilterStreamTableForNulls) {
-filterStreamedTable(stream.next())
-} else {
-stream.next()
-}
+val cb = stream.next()
val startTime = System.nanoTime()
nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
numOutputBatches, joinTime, filterTime)
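Note: the deleted guard existed because SQL equality never matches NULL keys, while the cudf issue cited in the removed comment could let null build and stream keys pair up on the GPU. A small self-contained illustration of the semantics the filter preserved, written against plain Spark rather than plugin internals:

    import org.apache.spark.sql.SparkSession

    // NULL join keys never compare equal in SQL, so the inner join below yields
    // exactly one row, (1, 1); pre-filtering IS NOT NULL on the join keys is
    // therefore semantics-preserving for inner joins.
    object NullKeyJoinDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._
        val left = Seq(Option(1), None).toDF("k")
        val right = Seq(Option(1), None).toDF("v")
        assert(left.join(right, left("k") === right("v")).count() == 1)
        spark.stop()
      }
    }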
@@ -129,20 +129,20 @@ case class GpuShuffledHashJoinExec(
streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
(streamIter, buildIter) => {
var combinedSize = 0

val startTime = System.nanoTime()
-val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
-buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
-withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
-val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
-val filtered = filterBuiltTableIfNeeded(combined)
-combinedSize =
-GpuColumnVector.extractColumns(filtered)
-.map(_.getBase.getDeviceMemorySize).sum.toInt
-withResource(filtered) { filtered =>
-GpuColumnVector.from(filtered)
-}
-}
+val buildBatch =
+ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput)
+val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys)
+val builtTable = try {
+// Combine does not inc any reference counting
+val combined = combine(keys, buildBatch)
+combinedSize =
+GpuColumnVector.extractColumns(combined)
+.map(_.getBase.getDeviceMemorySize).sum.toInt
+GpuColumnVector.from(combined)
+} finally {
+keys.close()
+buildBatch.close()
+}

val delta = System.nanoTime() - startTime
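Note: in the restored code the projected keys and the concatenated build batch are closed in a finally block; per the inline comment, combine does not bump reference counts, so the caller still owns both inputs after the call. A generic sketch of that close-exactly-once contract; the Buffer type and names here are illustrative stand-ins, not plugin API.

    // Inputs are closed exactly once, even if the combining step throws.
    final class Buffer(val name: String) extends AutoCloseable {
      override def close(): Unit = println("closed " + name)
    }

    // Reads from its inputs but does not take ownership of them.
    def combineBuffers(a: Buffer, b: Buffer): Buffer =
      new Buffer(a.name + "+" + b.name)

    val keys = new Buffer("keys")
    val batch = new Buffer("batch")
    val built =
      try {
        combineBuffers(keys, batch)
      } finally {
        keys.close()
        batch.close()
      }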
@@ -138,15 +138,10 @@ case class GpuBroadcastHashJoinExec(
val boundCondition = condition.map(GpuBindReferences.bindReference(_, output))

lazy val builtTable = {
-val ret = withResource(
-GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
-val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
-val filtered = filterBuiltTableIfNeeded(combined)
-withResource(filtered) { filtered =>
-GpuColumnVector.from(filtered)
-}
-}
-
+// TODO clean up intermediate results...
+val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)
+val combined = combine(keys, broadcastRelation.value.batch)
+val ret = GpuColumnVector.from(combined)
// Don't warn for a leak, because we cannot control when we are done with this
(0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected())
ret
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.joins.HashJoin
import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.rapids.GpuAnd
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object GpuHashJoin {
@@ -41,11 +40,6 @@
}
case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported")
}

-def incRefCount(cb: ColumnarBatch): ColumnarBatch = {
-GpuColumnVector.extractBases(cb).foreach(_.incRefCount())
-cb
-}
}

trait GpuHashJoin extends GpuExec with HashJoin {
@@ -117,58 +111,6 @@ trait GpuHashJoin extends GpuExec with HashJoin {
output.indices.map (v => v + joinLength)
}

-// Spark adds in rules to filter out nulls for some types of joins, but it does not
-// guarantee 100% that all nulls will be filtered out by the time they get to
-// this point, but because of https://github.com/rapidsai/cudf/issues/6052
-// we need to filter out the nulls ourselves until it is fixed.
-// InnerLike | LeftSemi =>
-// filter left and right keys
-// RightOuter =>
-// filter left keys
-// LeftOuter | LeftAnti =>
-// filter right keys
-
-private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
-val builtAnyNullable = gpuBuildKeys.exists(_.nullable)
-(joinType, buildSide) match {
-case (_: InnerLike | LeftSemi, _) => builtAnyNullable
-case (RightOuter, BuildLeft) => builtAnyNullable
-case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable
-case _ => false
-}
-}
-
-private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
-val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable)
-(joinType, buildSide) match {
-case (_: InnerLike | LeftSemi, _) => streamedAnyNullable
-case (RightOuter, BuildRight) => streamedAnyNullable
-case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable
-case _ => false
-}
-}
-
-private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression =
-exprs.zipWithIndex.map { kv =>
-GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable))
-}.reduce(GpuAnd)
-
-private[this] lazy val builtTableNullFilterExpression: GpuExpression =
-mkNullFilterExpr(gpuBuildKeys)
-
-private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
-mkNullFilterExpr(gpuStreamedKeys)
-
-def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
-if (shouldFilterBuiltTableForNulls) {
-GpuFilter(builtBatch, builtTableNullFilterExpression)
-} else {
-builtBatch
-}
-
-private[this] def filterStreamedTable(streamedBatch:ColumnarBatch): ColumnarBatch =
-GpuFilter(streamedBatch, streamedTableNullFilterExpression)
-

def doJoin(builtTable: Table,
stream: Iterator[ColumnarBatch],
boundCondition: Option[Expression],
@@ -193,11 +135,7 @@ trait GpuHashJoin extends GpuExec with HashJoin {
override def hasNext: Boolean = {
while (nextCb.isEmpty && (first || stream.hasNext)) {
if (stream.hasNext) {
-val cb = if (shouldFilterStreamTableForNulls) {
-filterStreamedTable(stream.next())
-} else {
-stream.next()
-}
+val cb = stream.next()
val startTime = System.nanoTime()
nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
numOutputBatches, joinTime, filterTime)
@@ -118,20 +118,20 @@ case class GpuShuffledHashJoinExec(
streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
(streamIter, buildIter) => {
var combinedSize = 0

val startTime = System.nanoTime()
-val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(
-buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
-withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
-val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
-val filtered = filterBuiltTableIfNeeded(combined)
-combinedSize =
-GpuColumnVector.extractColumns(filtered)
-.map(_.getBase.getDeviceMemorySize).sum.toInt
-withResource(filtered) { filtered =>
-GpuColumnVector.from(filtered)
-}
-}
+val buildBatch =
+ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput)
+val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys)
+val builtTable = try {
+// Combine does not inc any reference counting
+val combined = combine(keys, buildBatch)
+combinedSize =
+GpuColumnVector.extractColumns(combined)
+.map(_.getBase.getDeviceMemorySize).sum.toInt
+GpuColumnVector.from(combined)
+} finally {
+keys.close()
+buildBatch.close()
+}

val delta = System.nanoTime() - startTime