Skip to content

Commit

Permalink
Use libcudf mixed joins for conditional hash joins
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
  • Loading branch information
jlowe committed Jan 10, 2022
1 parent 54c0f94 commit 1751759
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 90 deletions.
132 changes: 123 additions & 9 deletions integration_tests/src/main/python/join_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,12 +58,18 @@
nested_3d_struct_gens = StructGen([['child0', nested_2d_struct_gens]], nullable=False)
struct_gens = [basic_struct_gen, basic_struct_gen_with_no_null_child, nested_2d_struct_gens, nested_3d_struct_gens]

double_gen = [pytest.param(DoubleGen(), marks=[incompat])]

basic_nested_gens = single_level_array_gens + map_string_string_gen + [all_basic_struct_gen]

# data types supported by AST expressions
ast_gen = [boolean_gen, byte_gen, short_gen, int_gen, long_gen, timestamp_gen]
# data types supported by AST expressions in joins
join_ast_gen = [
boolean_gen, byte_gen, short_gen, int_gen, long_gen, date_gen, timestamp_gen
]

# data types not supported by AST expressions in joins
join_no_ast_gen = [
pytest.param(FloatGen(), marks=[incompat]), pytest.param(DoubleGen(), marks=[incompat]),
string_gen, null_gen, decimal_gen_default, decimal_gen_64bit
]

_sortmerge_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '-1',
'spark.sql.join.preferSortMergeJoin': 'True',
Expand Down Expand Up @@ -349,7 +355,7 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', ast_gen, ids=idfn)
@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross'], ids=idfn)
@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
def test_right_broadcast_nested_loop_join_with_ast_condition(data_gen, join_type, batch_size):
Expand All @@ -366,7 +372,7 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', ast_gen, ids=idfn)
@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn)
@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
def test_left_broadcast_nested_loop_join_with_ast_condition(data_gen, batch_size):
def do_join(spark):
Expand Down Expand Up @@ -497,15 +503,74 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Inner', 'Cross'], ids=idfn)
@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'Inner', 'Cross'], ids=idfn)
def test_broadcast_join_with_conditionals(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(broadcast(right),
(left.a == right.r_a) & (left.b >= right.r_b), join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@allow_non_gpu('BroadcastExchangeExec', 'BroadcastHashJoinExec', 'Cast', 'GreaterThan')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['LeftSemi', 'LeftAnti'], ids=idfn)
def test_broadcast_join_with_condition_join_type_fallback(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
# AST does not support cast or logarithm yet
return left.join(broadcast(right),
(left.a == right.r_a) & (left.b > right.r_b), join_type)
conf = allow_negative_scale_of_decimal_conf
assert_gpu_fallback_collect(do_join, 'BroadcastHashJoinExec', conf=conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@allow_non_gpu('BroadcastExchangeExec', 'BroadcastHashJoinExec', 'Cast', 'GreaterThan', 'Log', 'SortMergeJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_broadcast_join_with_condition_ast_op_fallback(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
# AST does not support cast or logarithm yet
return left.join(broadcast(right),
(left.a == right.r_a) & (left.b > f.log(right.r_b)), join_type)
conf = allow_negative_scale_of_decimal_conf
exec = 'SortMergeJoinExec' if join_type in ['Right', 'FullOuter'] else 'BroadcastHashJoinExec'
assert_gpu_fallback_collect(do_join, exec, conf=conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@allow_non_gpu('BroadcastExchangeExec', 'BroadcastHashJoinExec', 'Cast', 'GreaterThan', 'SortMergeJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', join_no_ast_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_broadcast_join_with_condition_ast_type_fallback(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
# AST does not support cast or logarithm yet
return left.join(broadcast(right),
(left.a == right.r_a) & (left.b > right.r_b), join_type)
conf = allow_negative_scale_of_decimal_conf
exec = 'SortMergeJoinExec' if join_type in ['Right', 'FullOuter'] else 'BroadcastHashJoinExec'
assert_gpu_fallback_collect(do_join, exec, conf=conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', join_no_ast_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Inner', 'Cross'], ids=idfn)
def test_broadcast_join_with_condition_post_filter(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(broadcast(right),
(left.a == right.r_a) & (left.b > right.r_b), join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
Expand All @@ -516,12 +581,61 @@ def do_join(spark):
return left.join(right, (left.a == right.r_a) & (left.b >= right.r_b), 'Inner')
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter'], ids=idfn)
def test_sortmerge_join_with_condition_ast(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(right, (left.a == right.r_a) & (left.b >= right.r_b), join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@allow_non_gpu('GreaterThan', 'ShuffleExchangeExec', 'SortMergeJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['LeftSemi', 'LeftAnti'], ids=idfn)
def test_sortmerge_join_with_condition_join_type_fallback(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(right, (left.a == right.r_a) & (left.b >= right.r_b), join_type)
assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@allow_non_gpu('GreaterThan', 'Log', 'ShuffleExchangeExec', 'SortMergeJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [long_gen], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter'], ids=idfn)
def test_sortmerge_join_with_condition_ast_op_fallback(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
# AST does not support cast or logarithm yet
return left.join(right, (left.a == right.r_a) & (left.b > f.log(right.r_b)), join_type)
assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@allow_non_gpu('GreaterThan', 'ShuffleExchangeExec', 'SortMergeJoinExec')
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', join_no_ast_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'FullOuter', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_sortmerge_join_with_condition_ast_type_fallback(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(right, (left.a == right.r_a) & (left.b > right.r_b), join_type)
assert_gpu_fallback_collect(do_join, 'SortMergeJoinExec', conf=_sortmerge_join_conf)


_mixed_df1_with_nulls = [('a', RepeatSeqGen(LongGen(nullable=(True, 20.0)), length= 10)),
('b', IntegerGen()), ('c', LongGen())]
_mixed_df2_with_nulls = [('a', RepeatSeqGen(LongGen(nullable=(True, 20.0)), length= 10)),
('b', StringGen()), ('c', BooleanGen())]


@ignore_order
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'FullOuter', 'Cross'], ids=idfn)
def test_broadcast_join_mixed(join_type):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ abstract class AbstractGpuJoinIterator(
* Called to setup the next join gatherer instance when the previous instance is done or
* there is no previous instance. Because this is likely to call next or has next on the
* stream side all implementations must track their own opTime metrics.
* @param startNanoTime system nanoseconds timestamp at the top of the iterator loop, useful for
* calculating the time spent producing the next stream batch
* @return some gatherer to use next or None if there is no next gatherer or the loop should try
* to build the gatherer again (e.g.: to skip a degenerate join result batch)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,18 @@ class GpuBroadcastHashJoinMeta(
join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val rightKeys: Seq[BaseExprMeta[_]] =
join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val condition: Option[BaseExprMeta[_]] =
val conditionMeta: Option[BaseExprMeta[_]] =
join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val buildSide: GpuBuildSide = GpuJoinUtils.getGpuBuildSide(join.buildSide)

override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] =
JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition)
JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, conditionMeta)

override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition
override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta

override def tagPlanForGpu(): Unit = {
GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys,
join.condition)
conditionMeta)
val Seq(leftChild, rightChild) = childPlans
val buildSideMeta = buildSide match {
case GpuBuildLeft => leftChild
Expand All @@ -69,6 +69,12 @@ class GpuBroadcastHashJoinMeta(
}

override def convertToGpu(): GpuExec = {
val condition = conditionMeta.map(_.convertToGpu())
val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) {
(condition, None)
} else {
(None, condition)
}
val Seq(left, right) = childPlans.map(_.convertIfNeeded())
// The broadcast part of this must be a BroadcastExchangeExec
val buildSideMeta = buildSide match {
Expand All @@ -81,11 +87,11 @@ class GpuBroadcastHashJoinMeta(
rightKeys.map(_.convertToGpu()),
join.joinType,
buildSide,
None,
joinCondition,
left, right)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
// For inner joins we can apply a post-join condition for any conditions that cannot be
// evaluated directly in a mixed join that leverages a cudf AST expression
filterCondition.map(c => GpuFilterExec(c, joinExec)).getOrElse(joinExec)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,36 +38,42 @@ class GpuShuffledHashJoinMeta(
join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val rightKeys: Seq[BaseExprMeta[_]] =
join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val condition: Option[BaseExprMeta[_]] =
val conditionMeta: Option[BaseExprMeta[_]] =
join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val buildSide: GpuBuildSide = GpuJoinUtils.getGpuBuildSide(join.buildSide)

override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition
override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta

override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] =
JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition)
JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, conditionMeta)

override def tagPlanForGpu(): Unit = {
GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys,
join.condition)
conditionMeta)
}

override def convertToGpu(): GpuExec = {
val condition = conditionMeta.map(_.convertToGpu())
val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) {
(condition, None)
} else {
(None, condition)
}
val Seq(left, right) = childPlans.map(_.convertIfNeeded())
val joinExec = GpuShuffledHashJoinExec(
leftKeys.map(_.convertToGpu()),
rightKeys.map(_.convertToGpu()),
join.joinType,
buildSide,
None,
joinCondition,
left,
right,
isSkewJoin = false)(
join.leftKeys,
join.rightKeys)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
// For inner joins we can apply a post-join condition for any conditions that cannot be
// evaluated directly in a mixed join that leverages a cudf AST expression
filterCondition.map(c => GpuFilterExec(c, joinExec)).getOrElse(joinExec)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class GpuSortMergeJoinMeta(
join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val rightKeys: Seq[BaseExprMeta[_]] =
join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val condition: Option[BaseExprMeta[_]] = join.condition.map(
val conditionMeta: Option[BaseExprMeta[_]] = join.condition.map(
GpuOverrides.wrapExpr(_, conf, Some(this)))
val buildSide: GpuBuildSide = if (GpuHashJoin.canBuildRight(join.joinType)) {
GpuBuildRight
Expand All @@ -41,15 +41,15 @@ class GpuSortMergeJoinMeta(
throw new IllegalStateException(s"Cannot build either side for ${join.joinType} join")
}

override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition
override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta

override val namedChildExprs: Map[String, Seq[BaseExprMeta[_]]] =
JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, condition)
JoinTypeChecks.equiJoinMeta(leftKeys, rightKeys, conditionMeta)

override def tagPlanForGpu(): Unit = {
// Use conditions from Hash Join
GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys,
join.condition)
conditionMeta)

if (!conf.enableReplaceSortMergeJoin) {
willNotWorkOnGpu(s"Not replacing sort merge join with hash join, " +
Expand All @@ -74,20 +74,26 @@ class GpuSortMergeJoinMeta(
}

override def convertToGpu(): GpuExec = {
val condition = conditionMeta.map(_.convertToGpu())
val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) {
(condition, None)
} else {
(None, condition)
}
val Seq(left, right) = childPlans.map(_.convertIfNeeded())
val joinExec = GpuShuffledHashJoinExec(
leftKeys.map(_.convertToGpu()),
rightKeys.map(_.convertToGpu()),
join.joinType,
buildSide,
None,
joinCondition,
left,
right,
join.isSkewJoin)(
join.leftKeys,
join.rightKeys)
// The GPU does not yet support conditional joins, so conditions are implemented
// as a filter after the join when possible.
condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
// For inner joins we can apply a post-join condition for any conditions that cannot be
// evaluated directly in a mixed join that leverages a cudf AST expression
filterCondition.map(c => GpuFilterExec(c, joinExec)).getOrElse(joinExec)
}
}
Loading

0 comments on commit 1751759

Please sign in to comment.