Enable Delta Write fallback tests on Databricks 12.2 [databricks] (#8628)

* Enable Delta Write fallback tests on Databricks 12.2

* revert metrics change

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* disable test_delta_delete_rows due to numBytesAdded issue

* enable more tests

* revert accidental change

* disable some tests that are not fallback tests and that fail due to missing deletion vector metrics

---------

Signed-off-by: Andy Grove <andygrove@nvidia.com>
andygrove authored Jun 30, 2023
1 parent 3f7071d commit 68365a2
Showing 3 changed files with 32 additions and 29 deletions.
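
The gist of the change, repeated in each of the three test modules: a pair of module-level constants picks the expected CPU fallback exec for the runtime, because on Databricks 12.2+ a Delta write that falls back to the CPU surfaces in the plan as DataWritingCommandExec rather than ExecutedCommandExec. A minimal sketch of the pattern follows; the helper import paths and the toy test body are assumptions based on the existing integration-test layout, not part of this diff.

# Sketch of the version-conditional fallback constants added by this commit.
# Import locations for the helpers are assumed, not taken from the diff.
from asserts import assert_gpu_fallback_write
from data_gen import unary_op_df, int_gen
from marks import allow_non_gpu, delta_lake, ignore_order
from spark_session import is_databricks122_or_later

# On Databricks 12.2+ the CPU Delta write appears as DataWritingCommandExec, so it
# must be allowed off-GPU and becomes the exec the fallback assertion checks for;
# older runtimes keep ExecutedCommandExec for both roles.
delta_write_fallback_allow = ("ExecutedCommandExec,DataWritingCommandExec"
                              if is_databricks122_or_later() else "ExecutedCommandExec")
delta_write_fallback_check = ("DataWritingCommandExec"
                              if is_databricks122_or_later() else "ExecutedCommandExec")

@allow_non_gpu(delta_write_fallback_allow)  # the real tests also pass *delta_meta_allow
@delta_lake
@ignore_order
def test_write_falls_back_when_delta_writes_disabled(spark_tmp_path):
    # Illustrative body only: with GPU Delta writes disabled, the write must fall
    # back to the expected CPU exec and still round-trip the data correctly.
    data_path = spark_tmp_path + "/DELTA_DATA"
    assert_gpu_fallback_write(
        lambda spark, path: unary_op_df(spark, int_gen).coalesce(1).write.format("delta").save(path),
        lambda spark, path: spark.read.format("delta").load(path),
        data_path,
        delta_write_fallback_check,
        conf={"spark.rapids.sql.format.delta.write.enabled": "false"})
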
10 changes: 6 additions & 4 deletions integration_tests/src/main/python/delta_lake_merge_test.py
@@ -30,6 +30,9 @@
{"spark.rapids.sql.command.MergeIntoCommand": "true",
"spark.rapids.sql.command.MergeIntoCommandEdge": "true"})

delta_write_fallback_allow = "ExecutedCommandExec,DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"
delta_write_fallback_check = "DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"

def read_delta_path(spark, path):
return spark.read.format("delta").load(path)

@@ -101,7 +104,7 @@ def checker(data_path, do_merge):
delta_sql_merge_test(spark_tmp_path, spark_tmp_table_factory, use_cdf,
src_table_func, dest_table_func, merge_sql, checker, partition_columns)

@allow_non_gpu("ExecutedCommandExec", *delta_meta_allow)
@allow_non_gpu(delta_write_fallback_allow, *delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("disable_conf",
@@ -112,11 +115,10 @@ def checker(data_path, do_merge):
delta_writes_enabled_conf # Test disabled by default
], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_merge_disabled_fallback(spark_tmp_path, spark_tmp_table_factory, disable_conf):
def checker(data_path, do_merge):
assert_gpu_fallback_write(do_merge, read_delta_path, data_path,
"ExecutedCommandExec", conf=disable_conf)
delta_write_fallback_check, conf=disable_conf)
merge_sql = "MERGE INTO {dest_table} USING {src_table} ON {dest_table}.a == {src_table}.a" \
" WHEN NOT MATCHED THEN INSERT *"
delta_sql_merge_test(spark_tmp_path, spark_tmp_table_factory,
@@ -129,7 +131,7 @@ def checker(data_path, do_merge):
@allow_non_gpu("ExecutedCommandExec,BroadcastHashJoinExec,ColumnarToRowExec,BroadcastExchangeExec,DataWritingCommandExec", *delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(not is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
@pytest.mark.skipif(not is_databricks122_or_later(), reason="NOT MATCHED BY SOURCE only since DBR 12.2")
def test_delta_merge_not_matched_by_source_fallback(spark_tmp_path, spark_tmp_table_factory):
def checker(data_path, do_merge):
assert_gpu_fallback_write(do_merge, read_delta_path, data_path, "ExecutedCommandExec")
8 changes: 5 additions & 3 deletions integration_tests/src/main/python/delta_lake_update_test.py
@@ -25,6 +25,9 @@
{"spark.rapids.sql.command.UpdateCommand": "true",
"spark.rapids.sql.command.UpdateCommandEdge": "true"})

delta_write_fallback_allow = "ExecutedCommandExec,DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"
delta_write_fallback_check = "DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"

def delta_sql_update_test(spark_tmp_path, use_cdf, dest_table_func, update_sql,
check_func, partition_columns=None):
data_path = spark_tmp_path + "/DELTA_DATA"
@@ -60,7 +63,7 @@ def checker(data_path, do_update):
delta_sql_update_test(spark_tmp_path, use_cdf, dest_table_func, update_sql, checker,
partition_columns)

@allow_non_gpu("ExecutedCommandExec", *delta_meta_allow)
@allow_non_gpu(delta_write_fallback_allow, *delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("disable_conf",
@@ -70,7 +73,6 @@ def checker(data_path, do_update):
delta_writes_enabled_conf # Test disabled by default
], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_update_disabled_fallback(spark_tmp_path, disable_conf):
data_path = spark_tmp_path + "/DELTA_DATA"
def setup_tables(spark):
@@ -82,7 +84,7 @@ def write_func(spark, path):
spark.sql(update_sql)
with_cpu_session(setup_tables)
assert_gpu_fallback_write(write_func, read_delta_path, data_path,
"ExecutedCommandExec", disable_conf)
delta_write_fallback_check, disable_conf)

@allow_non_gpu(*delta_meta_allow)
@delta_lake
43 changes: 21 additions & 22 deletions integration_tests/src/main/python/delta_lake_write_test.py
@@ -42,6 +42,11 @@

delta_writes_enabled_conf = {"spark.rapids.sql.format.delta.write.enabled": "true"}

delta_write_fallback_allow = "ExecutedCommandExec,DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"
delta_write_fallback_check = "DataWritingCommandExec" if is_databricks122_or_later() else "ExecutedCommandExec"

delta_optimized_write_fallback_allow = "ExecutedCommandExec,DataWritingCommandExec,DeltaOptimizedWriterExec" if is_databricks122_or_later() else "ExecutedCommandExec"

def fixup_path(d):
"""Modify the 'path' value to remove random IDs in the pathname"""
parts = d["path"].split("-")
@@ -146,22 +151,21 @@ def get_last_operation_metrics(path):
.selectExpr("operationMetrics")\
.head()[0])

@allow_non_gpu("ExecutedCommandExec", *delta_meta_allow)
@allow_non_gpu(delta_write_fallback_allow, *delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("disable_conf",
[{"spark.rapids.sql.format.delta.write.enabled": "false"},
{"spark.rapids.sql.format.parquet.enabled": "false"},
{"spark.rapids.sql.format.parquet.write.enabled": "false"}], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_write_disabled_fallback(spark_tmp_path, disable_conf):
data_path = spark_tmp_path + "/DELTA_DATA"
assert_gpu_fallback_write(
lambda spark, path: unary_op_df(spark, int_gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=copy_and_update(writer_confs, disable_conf))

@allow_non_gpu(*delta_meta_allow)
@@ -321,12 +325,11 @@ def setup_tables(spark):
conf=confs)
with_cpu_session(lambda spark: assert_gpu_and_cpu_delta_logs_equivalent(spark, data_path))

@allow_non_gpu(*delta_meta_allow, "ExecutedCommandExec")
@allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("ts_write", ["INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS"], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_write_legacy_timestamp_fallback(spark_tmp_path, ts_write):
gen = TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))
data_path = spark_tmp_path + "/DELTA_DATA"
@@ -339,17 +342,16 @@ def test_delta_write_legacy_timestamp_fallback(spark_tmp_path, ts_write):
lambda spark, path: unary_op_df(spark, gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=all_confs)

@allow_non_gpu(*delta_meta_allow, "ExecutedCommandExec")
@allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("write_options", [{"parquet.encryption.footer.key": "k1"},
{"parquet.encryption.column.keys": "k2:a"},
{"parquet.encryption.footer.key": "k1", "parquet.encryption.column.keys": "k2:a"}])
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_write_encryption_option_fallback(spark_tmp_path, write_options):
def write_func(spark, path):
writer = unary_op_df(spark, int_gen).coalesce(1).write.format("delta")
@@ -361,10 +363,10 @@ def write_func(spark, path):
write_func,
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=delta_writes_enabled_conf)

@allow_non_gpu(*delta_meta_allow, "ExecutedCommandExec")
@allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("write_options", [{"parquet.encryption.footer.key": "k1"},
@@ -378,10 +380,10 @@ def test_delta_write_encryption_runtimeconfig_fallback(spark_tmp_path, write_opt
lambda spark, path: unary_op_df(spark, int_gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=copy_and_update(write_options, delta_writes_enabled_conf))

@allow_non_gpu(*delta_meta_allow, "ExecutedCommandExec")
@allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("write_options", [{"parquet.encryption.footer.key": "k1"},
@@ -403,40 +405,38 @@ def reset_hadoop_confs(spark):
lambda spark, path: unary_op_df(spark, int_gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=delta_writes_enabled_conf)
finally:
with_cpu_session(reset_hadoop_confs)

@allow_non_gpu(*delta_meta_allow, "ExecutedCommandExec")
@allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize('codec', ['gzip'])
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_write_compression_fallback(spark_tmp_path, codec):
data_path = spark_tmp_path + "/DELTA_DATA"
confs=copy_and_update(delta_writes_enabled_conf, {"spark.sql.parquet.compression.codec": codec})
assert_gpu_fallback_write(
lambda spark, path: unary_op_df(spark, int_gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=confs)

@allow_non_gpu(*delta_meta_allow, "ExecutedCommandExec")
@allow_non_gpu(*delta_meta_allow, delta_write_fallback_allow)
@delta_lake
@ignore_order
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
def test_delta_write_legacy_format_fallback(spark_tmp_path):
data_path = spark_tmp_path + "/DELTA_DATA"
confs=copy_and_update(delta_writes_enabled_conf, {"spark.sql.parquet.writeLegacyFormat": "true"})
assert_gpu_fallback_write(
lambda spark, path: unary_op_df(spark, int_gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=confs)

@allow_non_gpu(*delta_meta_allow)
@@ -801,12 +801,11 @@ def test_delta_write_optimized_supported_types_partitioned(spark_tmp_path):
data_path,
conf=confs)

@allow_non_gpu("ExecutedCommandExec", *delta_meta_allow)
@allow_non_gpu(delta_optimized_write_fallback_allow, *delta_meta_allow)
@delta_lake
@ignore_order(local=True)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.skipif(not is_databricks_runtime(), reason="Delta Lake optimized writes are only supported on Databricks")
@pytest.mark.skipif(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8423")
@pytest.mark.parametrize("gen", [
simple_string_to_string_map_gen,
StructGen([("x", ArrayGen(int_gen))]),
@@ -821,7 +820,7 @@ def test_delta_write_optimized_unsupported_sort_fallback(spark_tmp_path, gen):
lambda spark, path: unary_op_df(spark, gen).coalesce(1).write.format("delta").save(path),
lambda spark, path: spark.read.format("delta").load(path),
data_path,
"ExecutedCommandExec",
delta_write_fallback_check,
conf=confs)

@allow_non_gpu(*delta_meta_allow)
