Join support for DecimalType (NVIDIA#1475)
* DecimalType support for joins

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* update copyrights

Signed-off-by: Niranjan Artal <nartal@nvidia.com>
nartal1 authored Jan 8, 2021
1 parent d347102 commit c1110c8
Showing 6 changed files with 42 additions and 31 deletions.
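
In practical terms, this commit lets joins whose key or payload columns are DecimalType stay on the GPU instead of falling back to the CPU. Below is a minimal PySpark sketch of the kind of query the updated tests now cover; the session setup and data are illustrative, and the legacy negative-scale conf matters only because some of the new decimal generators produce negative-scale decimals, which Spark 3.x rejects by default:

from decimal import Decimal
from pyspark.sql import SparkSession

# Illustrative session; for actual GPU execution the RAPIDS Accelerator jar and
# spark.plugins=com.nvidia.spark.SQLPlugin would also have to be configured.
spark = SparkSession.builder.appName("decimal-join-sketch").getOrCreate()
# Needed only when negative-scale decimals are involved
# (e.g. decimal_gen_neg_scale in the tests below).
spark.conf.set("spark.sql.legacy.allowNegativeScaleOfDecimal", "true")

left = spark.createDataFrame([(Decimal("1.23"),), (Decimal("4.56"),)], "a decimal(10,2)")
right = spark.createDataFrame([(Decimal("1.23"),), (Decimal("7.89"),)], "r_a decimal(10,2)")

# A decimal-keyed equi-join of the shape exercised by join_test.py below.
left.join(right, left.a == right.r_a, "inner").show()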
35 changes: 23 additions & 12 deletions integration_tests/src/main/python/join_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020, NVIDIA CORPORATION.
+# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,7 +23,9 @@
all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
           BooleanGen(), DateGen(), TimestampGen(), null_gen,
           pytest.param(FloatGen(), marks=[incompat]),
-          pytest.param(DoubleGen(), marks=[incompat])]
+          pytest.param(DoubleGen(), marks=[incompat]),
+          decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision,
+          decimal_gen_neg_scale, decimal_gen_64bit]

all_gen_no_nulls = [StringGen(nullable=False), ByteGen(nullable=False),
                    ShortGen(nullable=False), IntegerGen(nullable=False), LongGen(nullable=False),
@@ -35,9 +37,18 @@

_sortmerge_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '-1',
                        'spark.sql.join.preferSortMergeJoin': 'True',
-                       'spark.sql.shuffle.partitions': '2'
+                       'spark.sql.shuffle.partitions': '2',
+                       'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
                        }
+
+_cartesean_join_conf = {'spark.rapids.sql.exec.CartesianProductExec': 'true',
+                        'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
+                        }
+
+_broadcastnestedloop_join_conf = {'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true',
+                                  'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
+                                  }

def create_df(spark, data_gen, left_length, right_length):
    left = binary_op_df(spark, data_gen, length=left_length)
    right = binary_op_df(spark, data_gen, length=right_length).withColumnRenamed("a", "r_a")\
@@ -67,7 +78,7 @@ def test_broadcast_join_right_table(data_gen, join_type):
    def do_join(spark):
        left, right = create_df(spark, data_gen, 500, 250)
        return left.join(broadcast(right), left.a == right.r_a, join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -77,7 +88,7 @@ def test_cartesean_join(data_gen):
    def do_join(spark):
        left, right = create_df(spark, data_gen, 50, 25)
        return left.crossJoin(right)
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.CartesianProductExec': 'true'})
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_cartesean_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -89,7 +100,7 @@ def test_cartesean_join_special_case(data_gen):
    def do_join(spark):
        left, right = create_df(spark, data_gen, 500, 250)
        return left.crossJoin(right).selectExpr('COUNT(*)')
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.CartesianProductExec': 'true'})
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_cartesean_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -99,7 +110,7 @@ def test_broadcast_nested_loop_join(data_gen):
    def do_join(spark):
        left, right = create_df(spark, data_gen, 50, 25)
        return left.crossJoin(broadcast(right))
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'})
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_broadcastnestedloop_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -109,7 +120,7 @@ def test_broadcast_nested_loop_join_special_case(data_gen):
    def do_join(spark):
        left, right = create_df(spark, data_gen, 50, 25)
        return left.crossJoin(broadcast(right)).selectExpr('COUNT(*)')
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'})
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_broadcastnestedloop_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -125,7 +136,7 @@ def do_join(spark):
        # that do not expose the error
        return left.join(broadcast(right),
                         (left.b >= right.r_b), join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'})
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_broadcastnestedloop_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -138,7 +149,7 @@ def test_broadcast_join_left_table(data_gen, join_type):
    def do_join(spark):
        left, right = create_df(spark, data_gen, 250, 500)
        return broadcast(left).join(right, left.a == right.r_a, join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -150,7 +161,7 @@ def do_join(spark):
        left, right = create_df(spark, data_gen, 500, 250)
        return left.join(broadcast(right),
                         (left.a == right.r_a) & (left.b >= right.r_b), join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)


_mixed_df1_with_nulls = [('a', RepeatSeqGen(LongGen(nullable=(True, 20.0)), length= 10)),
@@ -170,7 +181,7 @@ def do_join(spark):
        right = gen_df(spark, _mixed_df2_with_nulls, length=500).withColumnRenamed("a", "r_a")\
            .withColumnRenamed("b", "r_b").withColumnRenamed("c", "r_c")
        return left.join(broadcast(right), left.a.eqNullSafe(right.r_a), join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

@ignore_order
@allow_non_gpu('DataWritingCommandExec')
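The tests above pass allow_negative_scale_of_decimal_conf without defining it in this file; it presumably comes from the shared integration-test helpers and, judging from the inline conf dicts added above, amounts to something like this sketch:

# Assumed shape of the shared helper (not part of this diff):
allow_negative_scale_of_decimal_conf = {
    'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
}
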
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -179,15 +179,15 @@ class Spark300Shims extends SparkShims {
      }),
    GpuOverrides.exec[SortMergeJoinExec](
      "Sort merge join, replacing with shuffled hash join",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
    GpuOverrides.exec[BroadcastHashJoinExec](
      "Implementation of join using broadcast data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
    GpuOverrides.exec[ShuffledHashJoinExec](
      "Implementation of join using hashed shuffled data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
  ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -42,15 +42,15 @@ class Spark301Shims extends Spark300Shims {
  super.getExecs ++ Seq(
    GpuOverrides.exec[SortMergeJoinExec](
      "Sort merge join, replacing with shuffled hash join",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
    GpuOverrides.exec[BroadcastHashJoinExec](
      "Implementation of join using broadcast data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
    GpuOverrides.exec[ShuffledHashJoinExec](
      "Implementation of join using hashed shuffled data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
  ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r))
}
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -139,15 +139,15 @@ class Spark301dbShims extends Spark301Shims {
      }),
    GpuOverrides.exec[SortMergeJoinExec](
      "Sort merge join, replacing with shuffled hash join",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
    GpuOverrides.exec[BroadcastHashJoinExec](
      "Implementation of join using broadcast data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
    GpuOverrides.exec[ShuffledHashJoinExec](
      "Implementation of join using hashed shuffled data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
  ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -249,15 +249,15 @@ class Spark310Shims extends Spark301Shims {
      }),
    GpuOverrides.exec[SortMergeJoinExec](
      "Sort merge join, replacing with shuffled hash join",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
    GpuOverrides.exec[BroadcastHashJoinExec](
      "Implementation of join using broadcast data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
    GpuOverrides.exec[ShuffledHashJoinExec](
      "Implementation of join using hashed shuffled data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
  ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r))
}
@@ -2332,16 +2332,16 @@ object GpuOverrides {
      }),
    exec[BroadcastExchangeExec](
      "The backend for broadcast exchange of data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (exchange, conf, p, r) => new GpuBroadcastMeta(exchange, conf, p, r)),
    exec[BroadcastNestedLoopJoinExec](
      "Implementation of join using brute force",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new GpuBroadcastNestedLoopJoinMeta(join, conf, p, r))
      .disabledByDefault("large joins can cause out of memory errors"),
    exec[CartesianProductExec](
      "Implementation of join using brute force",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
      (join, conf, p, r) => new SparkPlanMeta[CartesianProductExec](join, conf, p, r) {
        val condition: Option[BaseExprMeta[_]] =
          join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
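A quick way to observe the effect of the widened ExecChecks signatures above: run a decimal join with the plugin enabled and inspect the physical plan. A hedged sketch reusing the illustrative left/right frames from the earlier example; the exact Gpu* operator name in the plan depends on the join strategy Spark picks:

# Assumes a session with the RAPIDS Accelerator enabled and the decimal
# frames from the earlier sketch.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  # avoid a broadcast join
joined = left.join(right, left.a == right.r_a)
# Before this commit, decimal join keys forced a CPU fallback; with it, the
# plan should show a GPU join (e.g. GpuShuffledHashJoinExec) rather than the
# CPU SortMergeJoin.
joined.explain()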
