Enable Delta Write fallback tests on Databricks 12.2 [databricks] #8628

Merged
6 commits merged on Jun 30, 2023
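In short: on Databricks 12.2 and later the CPU fallback for a Delta Lake write shows up as DataWritingCommandExec rather than ExecutedCommandExec, so the expected fallback class is now computed from the runtime version instead of being hardcoded, and the Databricks 12.2 skip markers (issue #8423) are dropped. A minimal, self-contained sketch of that pattern follows; the environment-variable stand-in for is_databricks122_or_later is illustrative only and is not the real helper the tests import.

import os

def is_databricks122_or_later():
    # Stand-in for the integration-test helper of the same name, faked with an
    # environment flag so this sketch runs outside the test harness.
    return os.environ.get("FAKE_DBR_122_OR_LATER", "0") == "1"

# On DBR 12.2+ the CPU plan for a fallen-back Delta write contains
# DataWritingCommandExec, so it must be allowed and is also the class the
# fallback assertion checks for; older runtimes still use ExecutedCommandExec.
delta_write_fallback_allow = ("ExecutedCommandExec,DataWritingCommandExec"
                              if is_databricks122_or_later() else "ExecutedCommandExec")
delta_write_fallback_check = ("DataWritingCommandExec"
                              if is_databricks122_or_later() else "ExecutedCommandExec")

print(delta_write_fallback_allow, delta_write_fallback_check)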
integration_tests/src/main/python/delta_lake_merge_test.py: 10 changes (6 additions, 4 deletions)
@@ -30,6 +30,9 @@
{"spark.rapids.sql.command.MergeIntoCommand": "true",
"spark.rapids.sql.command.MergeIntoCommandEdge": "true"})

delta_write_fallback_allow = "ExecutedCommandExec,DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"
delta_write_fallback_check = "DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"

def read_delta_path(spark, path):
return spark.read.format("delta").load(path)

@@ -101,7 +104,7 @@ def checker(data_path, do_merge):
delta_sql_merge_test(spark_tmp_path, spark_tmp_table_factory, use_cdf,
src_table_func, dest_table_func, merge_sql, checker, partition_columns)

@allow_non_gpu("ExecutedCommandExec", *delta_meta_allow)
@allow_non_gpu(delta_write_fallback_allow, *delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("disable_conf",
@@ -112,11 +115,10 @@ def checker(data_path, do_merge):
delta_writes_enabled_conf # Test disabled by default
], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
-@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_merge_disabled_fallback(spark_tmp_path, spark_tmp_table_factory, disable_conf):
def checker(data_path, do_merge):
assert_gpu_fallback_write(do_merge, read_delta_path, data_path,
"ExecutedCommandExec", conf=disable_conf)
delta_write_fallback_check, conf=disable_conf)
merge_sql = "MERGE INTO {dest_table} USING {src_table} ON {dest_table}.a == {src_table}.a" \
" WHEN NOT MATCHED THEN INSERT *"
delta_sql_merge_test(spark_tmp_path, spark_tmp_table_factory,
@@ -129,7 +131,7 @@ def checker(data_path, do_merge):
@allow_non_gpu("ExecutedCommandExec,BroadcastHashJoinExec,ColumnarToRowExec,BroadcastExchangeExec,DataWritingCommandExec", *delta_meta_allow)
@delta_lake
@ignore_order
-@pytest.mark.skipif(not is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
+@pytest.mark.skipif(not is_databricks122_or_later(), reason="NOT MATCHED BY SOURCE only since DBR 12.2")
def test_delta_merge_not_matched_by_source_fallback(spark_tmp_path, spark_tmp_table_factory):
def checker(data_path, do_merge):
assert_gpu_fallback_write(do_merge, read_delta_path, data_path, "ExecutedCommandExec")
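An aside on the allow/check split encoded by the two new constants: allow_non_gpu names every exec class that is permitted to stay on the CPU, while assert_gpu_fallback_write additionally asserts that one specific class really appears in the CPU plan. The toy helper below only illustrates that relationship; it is not the spark-rapids harness implementation, and the plan-node names fed to it are made up.

def check_fallback(plan_node_names, allowed, checked):
    # Toy model: tolerate any CPU exec named in `allowed`, but require that
    # `checked` is actually present, mirroring the allow/check constant pair.
    allowed_set = set(allowed.split(","))
    cpu_execs = [n for n in plan_node_names
                 if n.endswith("Exec") and not n.startswith("Gpu")]
    unexpected = [n for n in cpu_execs if n not in allowed_set]
    assert not unexpected, f"unexpected CPU execs: {unexpected}"
    assert checked in cpu_execs, f"expected a fallback to {checked}"

# Hypothetical plan for a disabled Delta write on Databricks 12.2 or later.
check_fallback(["GpuProjectExec", "DataWritingCommandExec"],
               allowed="ExecutedCommandExec,DataWritingCommandExec",
               checked="DataWritingCommandExec")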
integration_tests/src/main/python/delta_lake_update_test.py: 8 changes (5 additions, 3 deletions)
@@ -25,6 +25,9 @@
{"spark.rapids.sql.command.UpdateCommand": "true",
"spark.rapids.sql.command.UpdateCommandEdge": "true"})

delta_write_fallback_allow = "ExecutedCommandExec,DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"
delta_write_fallback_check = "DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"

def delta_sql_update_test(spark_tmp_path, use_cdf, dest_table_func, update_sql,
check_func, partition_columns=None):
data_path = spark_tmp_path + "/DELTA_DATA"
@@ -60,7 +63,7 @@ def checker(data_path, do_update):
delta_sql_update_test(spark_tmp_path, use_cdf, dest_table_func, update_sql, checker,
partition_columns)

@allow_non_gpu("ExecutedCommandExec", *delta_meta_allow)
@allow_non_gpu(delta_write_fallback_allow, *delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("disable_conf",
@@ -70,7 +73,6 @@ def checker(data_path, do_update):
delta_writes_enabled_conf # Test disabled by default
], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
-@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_update_disabled_fallback(spark_tmp_path, disable_conf):
data_path = spark_tmp_path + "/DELTA_DATA"
def setup_tables(spark):
@@ -82,7 +84,7 @@ def write_func(spark, path):
spark.sql(update_sql)
with_cpu_session(setup_tables)
assert_gpu_fallback_write(write_func, read_delta_path, data_path,
"ExecutedCommandExec", disable_conf)
delta_write_fallback_check, disable_conf)

@allow_non_gpu(*delta_meta_allow)
@delta_lake
integration_tests/src/main/python/delta_lake_write_test.py: 43 changes (21 additions, 22 deletions)
@@ -42,6 +42,11 @@

delta_writes_enabled_conf = {"spark.rapids.sql.format.delta.write.enabled": "true"}

delta_write_fallback_allow = "ExecutedCommandExec,DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"
delta_write_fallback_check = "DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"

delta_optimized_write_fallback_allow = "ExecutedCommandExec,DataWritingCommandExec,DeltaOptimizedWriterExec" if is_databricks122_or_later() else "ExecutedCommandExec"

def fixup_path(d):
"""Modify the 'path' value to remove random IDs in the pathname"""
parts = d["path"].split("-")
@@ -146,22 +151,21 @@ def get_last_operation_metrics(path):
.selectExpr("operationMetrics")\
.head()[0])

@allow_non_gpu("ExecutedCommandExec", *delta_meta_allow)
@allow_non_gpu(delta_write_fallback_allow, *delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("disable_conf",
[{"spark.rapids.sql.format.delta.write.enabled": "false"},
{"spark.rapids.sql.format.parquet.enabled": "false"},
{"spark.rapids.sql.format.parquet.write.enabled": "false"}], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
-@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_write_disabled_fallback(spark_tmp_path, disable_conf):
data_path = spark_tmp_path + "/DELTA_DATA"
assert_gpu_fallback_write(
lambda spark, path: unary_op_df(spark, int_gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=copy_and_update(writer_confs, disable_conf))

@allow_non_gpu(*delta_meta_allow)
@@ -321,12 +325,11 @@ def setup_tables(spark):
conf=confs)
with_cpu_session(lambda spark: assert_gpu_and_cpu_delta_logs_equivalent(spark, data_path))

-@allow_non_gpu(*delta_meta_allow, "ExecutedCommandExec")
+@allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("ts_write", ["INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS"], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
-@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_write_legacy_timestamp_fallback(spark_tmp_path, ts_write):
gen = TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))
data_path = spark_tmp_path + "/DELTA_DATA"
@@ -339,17 +342,16 @@ def test_delta_write_legacy_timestamp_fallback(spark_tmp_path, ts_write):
lambda spark, path: unary_op_df(spark, gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=all_confs)

-@allow_non_gpu(*delta_meta_allow, "ExecutedCommandExec")
+@allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("write_options", [{"parquet.encryption.footer.key": "k1"},
{"parquet.encryption.column.keys": "k2:a"},
{"parquet.encryption.footer.key": "k1", "parquet.encryption.column.keys": "k2:a"}])
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
-@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_write_encryption_option_fallback(spark_tmp_path, write_options):
def write_func(spark, path):
writer = unary_op_df(spark, int_gen).coalesce(1).write.format("delta")
@@ -361,10 +363,10 @@ def write_func(spark, path):
write_func,
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=delta_writes_enabled_conf)

-@allow_non_gpu(*delta_meta_allow, "ExecutedCommandExec")
+@allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("write_options", [{"parquet.encryption.footer.key": "k1"},
@@ -378,10 +380,10 @@ def test_delta_write_encryption_runtimeconfig_fallback(spark_tmp_path, write_opt
lambda spark, path: unary_op_df(spark, int_gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=copy_and_update(write_options, delta_writes_enabled_conf))

-@allow_non_gpu(*delta_meta_allow, "ExecutedCommandExec")
+@allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("write_options", [{"parquet.encryption.footer.key": "k1"},
@@ -403,40 +405,38 @@ def reset_hadoop_confs(spark):
lambda spark, path: unary_op_df(spark, int_gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=delta_writes_enabled_conf)
finally:
with_cpu_session(reset_hadoop_confs)

-@allow_non_gpu(*delta_meta_allow, "ExecutedCommandExec")
+@allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize('codec', ['gzip'])
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
-@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_write_compression_fallback(spark_tmp_path, codec):
data_path = spark_tmp_path + "/DELTA_DATA"
confs=copy_and_update(delta_writes_enabled_conf, {"spark.sql.parquet.compression.codec": codec})
assert_gpu_fallback_write(
lambda spark, path: unary_op_df(spark, int_gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=confs)

-@allow_non_gpu(*delta_meta_allow, "ExecutedCommandExec")
+@allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
-@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_write_legacy_format_fallback(spark_tmp_path):
data_path = spark_tmp_path + "/DELTA_DATA"
confs=copy_and_update(delta_writes_enabled_conf, {"spark.sql.parquet.writeLegacyFormat": "true"})
assert_gpu_fallback_write(
lambda spark, path: unary_op_df(spark, int_gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=confs)

@allow_non_gpu(*delta_meta_allow)
@@ -801,12 +801,11 @@ def test_delta_write_optimized_supported_types_partitioned(spark_tmp_path):
data_path,
conf=confs)

@allow_non_gpu("ExecutedCommandExec", *delta_meta_allow)
@allow_non_gpu(delta_optimized_write_fallback_allow, *delta_meta_allow)
@delta_lake
@ignore_order(local=True)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(not is_databricks_runtime(), reason="Delta Lake optimized writes are only supported on Databricks")
-@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
@pytest.mark.parametrize("gen", [
simple_string_to_string_map_gen,
StructGen([("x", ArrayGen(int_gen))]),
@@ -821,7 +820,7 @@ def test_delta_write_optimized_unsupported_sort_fallback(spark_tmp_path, gen):
lambda spark, path: unary_op_df(spark, gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=confs)

@allow_non_gpu(*delta_meta_allow)