Add support for deletion vector writes for Delta Lake 2.4
Signed-off-by: Andy Grove <andygrove@nvidia.com>
andygrove committed Jul 8, 2023
1 parent 34f09ae commit 1221464
Showing 5 changed files with 77 additions and 42 deletions.
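For orientation, here is a minimal PySpark sketch of the scenario this commit enables on the GPU. The spark.rapids.sql.command.* and deletionVectors.persistent configuration keys are taken from the test changes below; the delta.enableDeletionVectors table property and the table path are assumptions for illustration, not something this diff sets.

from pyspark.sql import SparkSession

# Config keys mirror those used in delta_lake_delete_test.py below.
spark = (SparkSession.builder
         .appName("dv-delete-sketch")
         .config("spark.rapids.sql.command.DeleteCommand", "true")
         .config("spark.rapids.sql.command.DeleteCommandEdge", "true")
         .config("spark.databricks.delta.delete.deletionVectors.persistent", "true")
         .getOrCreate())

path = "/tmp/delta_dv_table"  # hypothetical path
spark.range(100).withColumnRenamed("id", "a").write.format("delta").save(path)

# Assumed Delta Lake 2.4 table property; enables deletion vectors on this table.
spark.sql("ALTER TABLE delta.`{}` SET TBLPROPERTIES (delta.enableDeletionVectors = true)".format(path))

# With the changes in this commit, a delete like this can be written as a deletion
# vector on the GPU instead of falling back to the CPU.
spark.sql("DELETE FROM delta.`{}` WHERE a < 50".format(path))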
@@ -20,10 +20,9 @@ import com.nvidia.spark.rapids.{DataFromReplacementRule, RapidsConf, RapidsMeta,
import com.nvidia.spark.rapids.delta.RapidsDeltaUtils

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.commands.{DeleteCommand, DeletionVectorUtils}
import org.apache.spark.sql.delta.commands.DeleteCommand
import org.apache.spark.sql.delta.rapids.GpuDeltaLog
import org.apache.spark.sql.delta.rapids.delta24x.GpuDeleteCommand
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.command.RunnableCommand

class DeleteCommandMeta(
@@ -38,13 +37,6 @@ class DeleteCommandMeta(
willNotWorkOnGpu("Delta Lake output acceleration has been disabled. To enable set " +
s"${RapidsConf.ENABLE_DELTA_WRITE} to true")
}
val dvFeatureEnabled = DeletionVectorUtils.deletionVectorsWritable(
deleteCmd.deltaLog.unsafeVolatileSnapshot)
if (dvFeatureEnabled && deleteCmd.conf.getConf(
DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS)) {
// https://github.com/NVIDIA/spark-rapids/issues/8554
willNotWorkOnGpu("Deletion vectors are not supported on GPU")
}
RapidsDeltaUtils.tagForDeltaWrite(this, deleteCmd.target.schema, deleteCmd.deltaLog,
Map.empty, SparkSession.active)
}
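The block removed above is what previously tagged the plan to fall back to the CPU whenever the snapshot had deletion vectors writable and persistent DV deletes were enabled. A rough way to inspect the equivalent table state from PySpark, continuing from the session in the sketch near the top (a sketch only; the property name is an assumption and DESCRIBE DETAIL is standard Delta SQL, not part of this diff):

# Hypothetical check, loosely mirroring DeletionVectorUtils.deletionVectorsWritable.
detail = spark.sql("DESCRIBE DETAIL delta.`/tmp/delta_dv_table`").collect()[0]
dv_enabled = detail.properties.get("delta.enableDeletionVectors") == "true"
print("deletion vectors enabled on this table:", dv_enabled)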
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaOperations, DeltaTableUtils, DeltaUDF, OptimisticTransaction}
import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, FileAction}
import org.apache.spark.sql.delta.commands.{DeleteCommandMetrics, DeleteMetric, DeletionVectorUtils, DeltaCommand}
import org.apache.spark.sql.delta.commands.{DeleteCommandMetrics, DeleteMetric, DeleteWithDeletionVectorsHelper, DeletionVectorUtils, DeltaCommand}
import org.apache.spark.sql.delta.commands.DeleteCommand.{rewritingFilesMsg, FINDING_TOUCHED_FILES_MSG}
import org.apache.spark.sql.delta.commands.MergeIntoCommand.totalBytesAndDistinctPartitionValues
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
@@ -201,11 +201,35 @@ case class GpuDeleteCommand(
sparkSession, "delete", candidateFiles, deltaLog, deltaLog.dataPath, txn.snapshot)

if (shouldWriteDVs) {
// this should be unreachable because we fall back to CPU
// if deletion vectors are enabled. The tracking issue for adding deletion vector
// support is https://github.com/NVIDIA/spark-rapids/issues/8554
throw new IllegalStateException("Deletion vectors are not supported on GPU")

val targetDf = DeleteWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
sparkSession,
target,
fileIndex)

// Does the target table already have DVs enabled? If so, we need to read the table
// with deletion vectors.
val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)

val touchedFiles = DeleteWithDeletionVectorsHelper.findTouchedFiles(
sparkSession,
txn,
mustReadDeletionVectors,
deltaLog,
targetDf,
fileIndex,
cond)

if (touchedFiles.nonEmpty) {
val (actions, metricMap) = DeleteWithDeletionVectorsHelper.processUnmodifiedData(
sparkSession,
touchedFiles,
txn.snapshot)
metrics("numDeletedRows").set(metricMap("numDeletedRows"))
numRemovedFiles = metricMap("numRemovedFiles")
actions
} else {
Nil // Nothing to update
}
} else {
// Keep everything from the resolved target except a new TahoeFileIndex
// that only involves the affected files instead of all files.
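The new branch above delegates to Delta's DeleteWithDeletionVectorsHelper: it builds a scan DataFrame over the target, finds the touched files (reading any existing deletion vectors in the snapshot), and converts the result into new actions and metrics instead of rewriting whole files. A rough way to see the outcome of such a delete in the transaction log from Python (the table path is hypothetical; the deletionVector field names match those referenced in the test changes below):

import glob
import json
import os

log_dir = "/tmp/delta_dv_table/_delta_log"  # hypothetical table path
latest = sorted(glob.glob(os.path.join(log_dir, "*.json")))[-1]

with open(latest) as f:
    for line in f:
        action = json.loads(line)
        # After a DV-based delete, 'add' entries carry a deletionVector descriptor
        # (storageType, pathOrInlineDv, offset, sizeInBytes, cardinality), and the
        # matching 'remove' entries point at the files they supersede.
        for kind in ("add", "remove"):
            if kind in action and action[kind].get("deletionVector"):
                print(kind, action[kind]["path"], action[kind]["deletionVector"])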
49 changes: 26 additions & 23 deletions integration_tests/src/main/python/delta_lake_delete_test.py
@@ -26,10 +26,10 @@
"spark.rapids.sql.command.DeleteCommandEdge": "true"})

def delta_sql_delete_test(spark_tmp_path, use_cdf, dest_table_func, delete_sql,
check_func, partition_columns=None):
check_func, partition_columns=None, enable_deletion_vectors=False):
data_path = spark_tmp_path + "/DELTA_DATA"
def setup_tables(spark):
setup_dest_tables(spark, data_path, dest_table_func, use_cdf, partition_columns)
setup_dest_tables(spark, data_path, dest_table_func, use_cdf, partition_columns, enable_deletion_vectors)
def do_delete(spark, path):
return spark.sql(delete_sql.format(path=path))
with_cpu_session(setup_tables)
@@ -38,7 +38,8 @@ def do_delete(spark, path):
def assert_delta_sql_delete_collect(spark_tmp_path, use_cdf, dest_table_func, delete_sql,
partition_columns=None,
conf=delta_delete_enabled_conf,
skip_sql_result_check=False):
skip_sql_result_check=False,
enable_deletion_vectors=False):
def read_data(spark, path):
read_func = read_delta_path_with_cdf if use_cdf else read_delta_path
df = read_func(spark, path)
@@ -57,7 +58,7 @@ def checker(data_path, do_delete):
assert_equal(cpu_result, gpu_result)
with_cpu_session(lambda spark: assert_gpu_and_cpu_delta_logs_equivalent(spark, data_path))
delta_sql_delete_test(spark_tmp_path, use_cdf, dest_table_func, delete_sql, checker,
partition_columns)
partition_columns, enable_deletion_vectors)

@allow_non_gpu("ExecutedCommandExec", *delta_meta_allow)
@delta_lake
@@ -83,29 +84,31 @@ def write_func(spark, path):
assert_gpu_fallback_write(write_func, read_delta_path, data_path,
"ExecutedCommandExec", disable_conf)

@allow_non_gpu("ExecutedCommandExec", *delta_meta_allow)
@allow_non_gpu("BroadcastHashJoinExec,BroadcastExchangeExec", *delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.skipif(is_before_spark_340(), reason="Deletion vectors new in Delta Lake 2.4 / Apache Spark 3.4")
@pytest.mark.parametrize("enable_deletion_vectors", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8654")
def test_delta_deletion_vector_fallback(spark_tmp_path, use_cdf):
data_path = spark_tmp_path + "/DELTA_DATA"
def setup_tables(spark):
setup_dest_tables(spark, data_path,
dest_table_func=lambda spark: unary_op_df(spark, int_gen),
use_cdf=use_cdf, enable_deletion_vectors=True)
def write_func(spark, path):
delete_sql="DELETE FROM delta.`{}`".format(path)
spark.sql(delete_sql)
with_cpu_session(setup_tables)
disable_conf = copy_and_update(delta_writes_enabled_conf,
{"spark.rapids.sql.command.DeleteCommand": "true",
"spark.rapids.sql.command.DeleteCommandEdge": "true",
"spark.databricks.delta.delete.deletionVectors.persistent": "true"})

assert_gpu_fallback_write(write_func, read_delta_path, data_path,
"ExecutedCommandExec", disable_conf)
def test_delta_deletion_vector(spark_tmp_path, use_cdf, enable_deletion_vectors, partition_columns):
# Databricks changes the number of files being written, so we cannot compare logs unless there's only one slice
num_slices_to_test = 1 if is_databricks_runtime() else 10
def generate_dest_data(spark):
return three_col_df(spark,
SetValuesGen(IntegerType(), range(5)),
SetValuesGen(StringType(), "abcdefg"),
string_gen, num_slices=num_slices_to_test) \
.coalesce(1)\
.sort("a", ascending=True)
delete_sql = "DELETE FROM delta.`{path}` WHERE b < 'd'"
dv_conf = copy_and_update(delta_delete_enabled_conf,
{"spark.databricks.delta.delete.deletionVectors.persistent": enable_deletion_vectors})
assert_delta_sql_delete_collect(spark_tmp_path, use_cdf, generate_dest_data,
delete_sql, partition_columns,
enable_deletion_vectors=enable_deletion_vectors,
conf=dv_conf)

@allow_non_gpu(*delta_meta_allow)
@delta_lake
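A simplified reproduction of how the new test assembles its configuration outside the pytest harness. The two command keys and the persistent-DV key are copied from this file; the real delta_delete_enabled_conf also layers on delta_writes_enabled_conf, which is omitted here, and the destination path is hypothetical:

# Simplified; the actual conf in the test file also includes the Delta write settings.
delta_delete_enabled_conf = {
    "spark.rapids.sql.command.DeleteCommand": "true",
    "spark.rapids.sql.command.DeleteCommandEdge": "true",
}

def make_dv_conf(enable_deletion_vectors):
    # Plain-dict equivalent of copy_and_update(delta_delete_enabled_conf, {...}).
    conf = dict(delta_delete_enabled_conf)
    conf["spark.databricks.delta.delete.deletionVectors.persistent"] = enable_deletion_vectors
    return conf

delete_sql = "DELETE FROM delta.`{path}` WHERE b < 'd'"
print(make_dv_conf(True))
print(delete_sql.format(path="/tmp/DELTA_DATA/dest_table"))  # hypothetical path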
8 changes: 4 additions & 4 deletions integration_tests/src/main/python/delta_lake_merge_test.py
@@ -53,11 +53,11 @@ def setup_dest_tables(spark, data_path, dest_table_func, use_cdf, partition_colu
path = "{}/{}".format(data_path, name)
dest_df = dest_table_func(spark)
writer = dest_df.write.format("delta")
ddl = schema_to_ddl(spark, dest_df.schema)
sql_text = "CREATE TABLE delta.`{path}` ({ddl}) USING DELTA".format(path=path, ddl=ddl)
if partition_columns:
sql_text += " PARTITIONED BY ({})".format(",".join(partition_columns))
if use_cdf:
ddl = schema_to_ddl(spark, dest_df.schema)
sql_text = "CREATE TABLE delta.`{path}` ({ddl}) USING DELTA".format(path=path, ddl=ddl)
if partition_columns:
sql_text += " PARTITIONED BY ({})".format(",".join(partition_columns))
sql_text += " TBLPROPERTIES (delta.enableChangeDataFeed = true)"
spark.sql(sql_text)
writer = writer.mode("append")
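After this change the destination table is always created with an explicit CREATE TABLE, so table properties can be attached whether or not CDF is requested. A self-contained sketch of creating a table with either property via ALTER TABLE rather than the helper's DDL path (the delta.enableDeletionVectors property is an assumption; the handling of enable_deletion_vectors itself is not visible in this hunk):

def create_dest_table_sketch(spark, dest_df, path, use_cdf=False,
                             enable_deletion_vectors=False, partition_columns=None):
    # Write the destination table first, then attach the requested table properties.
    writer = dest_df.write.format("delta")
    if partition_columns:
        writer = writer.partitionBy(*partition_columns)
    writer.save(path)
    props = []
    if use_cdf:
        props.append("delta.enableChangeDataFeed = true")
    if enable_deletion_vectors:
        props.append("delta.enableDeletionVectors = true")  # assumed property name
    if props:
        spark.sql("ALTER TABLE delta.`{}` SET TBLPROPERTIES ({})".format(
            path, ", ".join(props)))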
16 changes: 16 additions & 0 deletions integration_tests/src/main/python/delta_lake_write_test.py
@@ -67,6 +67,12 @@ def fixup_operation_metrics(opm):
for k in metrics_to_remove:
opm.pop(k, None)

def fixup_deletion_vector(dv):
# we remove offset and pathOrInlineDv because they are non-deterministic, but
# we compare other fields such as sizeInBytes, cardinality, and storageType
dv.pop("offset", None)
dv.pop("pathOrInlineDv", None)

TMP_TABLE_PATTERN=re.compile(r"tmp_table_\w+")
TMP_TABLE_PATH_PATTERN=re.compile(r"delta.`[^`]*`")
REF_ID_PATTERN=re.compile(r"#[0-9]+")
@@ -96,6 +102,11 @@ def assert_delta_log_json_equivalent(filename, c_json, g_json):
del_keys(("modificationTime", "size"), c_val, g_val)
fixup_path(c_val)
fixup_path(g_val)
c_dv = c_val.get('deletionVector', {})
g_dv = g_val.get('deletionVector', {})
assert c_dv.keys() == g_dv.keys(), "Delta log {} 'add/deletionVector' keys mismatch:\nCPU: {}\nGPU: {}".format(filename, c_dv, g_dv)
fixup_deletion_vector(c_dv)
fixup_deletion_vector(g_dv)
elif key == "cdc":
assert c_val.keys() == g_val.keys(), "Delta log {} 'cdc' keys mismatch:\nCPU: {}\nGPU: {}".format(filename, c_val, g_val)
del_keys(("size",), c_val, g_val)
@@ -112,6 +123,11 @@ def assert_delta_log_json_equivalent(filename, c_json, g_json):
del_keys(("deletionTimestamp", "size"), c_val, g_val)
fixup_path(c_val)
fixup_path(g_val)
c_dv = c_val.get('deletionVector', {})
g_dv = g_val.get('deletionVector', {})
assert c_dv.keys() == g_dv.keys(), "Delta log {} 'remove/deletionVector' keys mismatch:\nCPU: {}\nGPU: {}".format(filename, c_dv, g_dv)
fixup_deletion_vector(c_dv)
fixup_deletion_vector(g_dv)
assert c_val == g_val, "Delta log {} is different at key '{}':\nCPU: {}\nGPU: {}".format(filename, key, c_val, g_val)

def decode_jsons(json_data):
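A small standalone illustration of the normalization added above. The two deletionVector dicts are made-up examples of what a CPU and a GPU run might record, differing only in the non-deterministic fields:

def fixup_deletion_vector(dv):
    # Same logic as the helper added above: drop the non-deterministic fields and
    # keep sizeInBytes, cardinality, and storageType for comparison.
    dv.pop("offset", None)
    dv.pop("pathOrInlineDv", None)

# Hypothetical entries copied out of a CPU and a GPU _delta_log commit.
cpu_dv = {"storageType": "u", "pathOrInlineDv": "ab^-aaaaa", "offset": 1,
          "sizeInBytes": 34, "cardinality": 5}
gpu_dv = {"storageType": "u", "pathOrInlineDv": "cd^-bbbbb", "offset": 25,
          "sizeInBytes": 34, "cardinality": 5}

fixup_deletion_vector(cpu_dv)
fixup_deletion_vector(gpu_dv)
assert cpu_dv == gpu_dv  # the deterministic fields match between CPU and GPU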
