Skip to content

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
  • Loading branch information
firestarman committed Nov 16, 2023
1 parent 69f05f1 commit 6907c38
Showing 1 changed file with 38 additions and 32 deletions.
70 changes: 38 additions & 32 deletions integration_tests/src/main/python/join_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,18 +1029,12 @@ def do_join(spark):


@ignore_order(local=True)
@allow_non_gpu('ProjectExec', 'BroadcastHashJoinExec', 'BroadcastExchangeExec',
'FilterExec', 'ColumnarToRowExec')
@pytest.mark.parametrize("is_gpu_parquet", [True, False],
ids=["GpuParquetScan", "ParquetScan"])
@pytest.mark.parametrize("is_gpu_broadcast", [True, False],
ids=["GpuBroadcastExchange", "BroadcastExchange"])
def test_broadcast_hash_join_fix_fallback_by_inputfile(spark_tmp_path, is_gpu_parquet,
is_gpu_broadcast):
@allow_non_gpu("ProjectExec", "FilterExec", "BroadcastHashJoinExec", "ColumnarToRowExec", "BroadcastExchangeExec")
@pytest.mark.parametrize("disable_build", [True, False])
def test_broadcast_hash_join_fix_fallback_by_inputfile(spark_tmp_path, disable_build):
data_path_parquet = spark_tmp_path + "/parquet"
data_path_orc = spark_tmp_path + "/orc"
# The smaller one (orc) will be the build side, when disabling parquet scan,
# the join exec node will be put on CPU by InputFileBlockRule.
# The smaller one (orc) will be the build side (a broadcast)
with_cpu_session(lambda spark: spark.range(100).write.orc(data_path_orc))
with_cpu_session(lambda spark: spark.range(10000).withColumn("id2", col("id") + 10)
.write.parquet(data_path_parquet))
Expand All @@ -1050,30 +1044,33 @@ def do_join(spark):
return left.join(broadcast(right), "id", "inner")\
.selectExpr("*", "input_file_block_length()")

join_class = 'GpuBroadcastHashJoinExec'\
if is_gpu_parquet and is_gpu_broadcast else 'BroadcastHashJoinExec'
assert_cpu_and_gpu_are_equal_collect_with_capture(
if disable_build:
# To reproduce the error
# '''
# java.lang.IllegalStateException: the broadcast must be on the GPU too
# at com.nvidia.spark.rapids.shims.GpuBroadcastJoinMeta.verifyBuildSideWasReplaced...
# '''
scan_name = 'OrcScan'
else:
        # An additional case verifying that an exec containing the input file expression
        # is not mistakenly disabled by InputFileBlockRule. When the stream side scan runs
        # on CPU but the build side scan runs on GPU, InputFileBlockRule will not put the
        # exec on CPU, leading to wrong output.
scan_name = 'ParquetScan'
assert_gpu_and_cpu_are_equal_collect(
do_join,
exist_classes=join_class,
conf={"spark.sql.autoBroadcastJoinThreshold": "10M",
"spark.sql.sources.useV1SourceList": "",
"spark.rapids.sql.input.ParquetScan": is_gpu_parquet,
"spark.rapids.sql.exec.BroadcastExchangeExec": is_gpu_broadcast})
"spark.rapids.sql.input." + scan_name: False})


@ignore_order(local=True)
@allow_non_gpu('ProjectExec', 'BroadcastNestedLoopJoinExec', 'BroadcastExchangeExec',
'FilterExec', 'ColumnarToRowExec')
@pytest.mark.parametrize("is_gpu_parquet", [True, False],
ids=["GpuParquetScan", "ParquetScan"])
@pytest.mark.parametrize("is_gpu_broadcast", [True, False],
ids=["GpuBroadcastExchange", "BroadcastExchange"])
def test_broadcast_nested_join_fix_fallback_by_inputfile(spark_tmp_path, is_gpu_parquet,
is_gpu_broadcast):
@allow_non_gpu("ProjectExec", "BroadcastNestedLoopJoinExec", "ColumnarToRowExec", "BroadcastExchangeExec")
@pytest.mark.parametrize("disable_build", [True, False])
def test_broadcast_nested_join_fix_fallback_by_inputfile(spark_tmp_path, disable_build):
data_path_parquet = spark_tmp_path + "/parquet"
data_path_orc = spark_tmp_path + "/orc"
# The smaller one (orc) will be the build side, when disabling parquet scan,
# the join exec node will be put on CPU by InputFileBlockRule.
# The smaller one (orc) will be the build side (a broadcast)
with_cpu_session(lambda spark: spark.range(50).write.orc(data_path_orc))
with_cpu_session(lambda spark: spark.range(500).withColumn("id2", col("id") + 10)
.write.parquet(data_path_parquet))
Expand All @@ -1082,12 +1079,21 @@ def do_join(spark):
right = spark.read.orc(data_path_orc)
return left.crossJoin(broadcast(right)).selectExpr("*", "input_file_block_length()")

join_class = 'GpuBroadcastNestedLoopJoinExec' \
if is_gpu_parquet and is_gpu_broadcast else 'BroadcastNestedLoopJoinExec'
assert_cpu_and_gpu_are_equal_collect_with_capture(
if disable_build:
# To reproduce the error
# '''
# java.lang.IllegalStateException: the broadcast must be on the GPU too
# at com.nvidia.spark.rapids.shims.GpuBroadcastJoinMeta.verifyBuildSideWasReplaced...
# '''
scan_name = 'OrcScan'
else:
        # An additional case verifying that an exec containing the input file expression
        # is not mistakenly disabled by InputFileBlockRule. When the stream side scan runs
        # on CPU but the build side scan runs on GPU, InputFileBlockRule will not put the
        # exec on CPU, leading to wrong output.
scan_name = 'ParquetScan'
assert_gpu_and_cpu_are_equal_collect(
do_join,
exist_classes=join_class,
conf={"spark.sql.autoBroadcastJoinThreshold": "-1",
"spark.sql.sources.useV1SourceList": "",
"spark.rapids.sql.input.ParquetScan": is_gpu_parquet,
"spark.rapids.sql.exec.BroadcastExchangeExec": is_gpu_broadcast})
"spark.rapids.sql.input." + scan_name: False})

0 comments on commit 6907c38

Please sign in to comment.