Add support for writing deletion vectors in Delta Lake 2.4 #8674

Closed
DeleteCommandMeta.scala

@@ -20,10 +20,9 @@ import com.nvidia.spark.rapids.{DataFromReplacementRule, RapidsConf, RapidsMeta,
import com.nvidia.spark.rapids.delta.RapidsDeltaUtils

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.commands.{DeleteCommand, DeletionVectorUtils}
import org.apache.spark.sql.delta.commands.DeleteCommand
import org.apache.spark.sql.delta.rapids.GpuDeltaLog
import org.apache.spark.sql.delta.rapids.delta24x.GpuDeleteCommand
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.command.RunnableCommand

class DeleteCommandMeta(
@@ -38,13 +37,6 @@ class DeleteCommandMeta(
willNotWorkOnGpu("Delta Lake output acceleration has been disabled. To enable set " +
s"${RapidsConf.ENABLE_DELTA_WRITE} to true")
}
val dvFeatureEnabled = DeletionVectorUtils.deletionVectorsWritable(
deleteCmd.deltaLog.unsafeVolatileSnapshot)
if (dvFeatureEnabled && deleteCmd.conf.getConf(
DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS)) {
// https://github.com/NVIDIA/spark-rapids/issues/8554
willNotWorkOnGpu("Deletion vectors are not supported on GPU")
}
RapidsDeltaUtils.tagForDeltaWrite(this, deleteCmd.target.schema, deleteCmd.deltaLog,
Map.empty, SparkSession.active)
}
GpuDeleteCommand.scala

@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaOperations, DeltaTableUtils, DeltaUDF, OptimisticTransaction}
import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, FileAction}
import org.apache.spark.sql.delta.commands.{DeleteCommandMetrics, DeleteMetric, DeletionVectorUtils, DeltaCommand}
import org.apache.spark.sql.delta.commands.{DeleteCommandMetrics, DeleteMetric, DeleteWithDeletionVectorsHelper, DeletionVectorUtils, DeltaCommand}
import org.apache.spark.sql.delta.commands.DeleteCommand.{rewritingFilesMsg, FINDING_TOUCHED_FILES_MSG}
import org.apache.spark.sql.delta.commands.MergeIntoCommand.totalBytesAndDistinctPartitionValues
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
@@ -201,11 +201,35 @@ case class GpuDeleteCommand(
sparkSession, "delete", candidateFiles, deltaLog, deltaLog.dataPath, txn.snapshot)

if (shouldWriteDVs) {
// this should be unreachable because we fall back to CPU
// if deletion vectors are enabled. The tracking issue for adding deletion vector
// support is https://github.com/NVIDIA/spark-rapids/issues/8554
throw new IllegalStateException("Deletion vectors are not supported on GPU")

val targetDf = DeleteWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
sparkSession,
target,
fileIndex)

// Does the target table already have DVs enabled? If so, we need to read the table
// with deletion vectors.
val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)

val touchedFiles = DeleteWithDeletionVectorsHelper.findTouchedFiles(
sparkSession,
txn,
mustReadDeletionVectors,
deltaLog,
targetDf,
fileIndex,
cond)

if (touchedFiles.nonEmpty) {
val (actions, metricMap) = DeleteWithDeletionVectorsHelper.processUnmodifiedData(
sparkSession,
touchedFiles,
txn.snapshot)
metrics("numDeletedRows").set(metricMap("numDeletedRows"))
numRemovedFiles = metricMap("numRemovedFiles")
actions
} else {
Nil // Nothing to update
}
} else {
// Keep everything from the resolved target except a new TahoeFileIndex
// that only involves the affected files instead of all files.
49 changes: 26 additions & 23 deletions integration_tests/src/main/python/delta_lake_delete_test.py

@@ -26,10 +26,10 @@
"spark.rapids.sql.command.DeleteCommandEdge": "true"})

def delta_sql_delete_test(spark_tmp_path, use_cdf, dest_table_func, delete_sql,
check_func, partition_columns=None):
check_func, partition_columns=None, enable_deletion_vectors=False):
data_path = spark_tmp_path + "/DELTA_DATA"
def setup_tables(spark):
setup_dest_tables(spark, data_path, dest_table_func, use_cdf, partition_columns)
setup_dest_tables(spark, data_path, dest_table_func, use_cdf, partition_columns, enable_deletion_vectors)
def do_delete(spark, path):
return spark.sql(delete_sql.format(path=path))
with_cpu_session(setup_tables)
@@ -38,7 +38,8 @@ def do_delete(spark, path):
def assert_delta_sql_delete_collect(spark_tmp_path, use_cdf, dest_table_func, delete_sql,
partition_columns=None,
conf=delta_delete_enabled_conf,
skip_sql_result_check=False):
skip_sql_result_check=False,
enable_deletion_vectors=False):
def read_data(spark, path):
read_func = read_delta_path_with_cdf if use_cdf else read_delta_path
df = read_func(spark, path)
@@ -57,7 +58,7 @@ def checker(data_path, do_delete):
assert_equal(cpu_result, gpu_result)
with_cpu_session(lambda spark: assert_gpu_and_cpu_delta_logs_equivalent(spark, data_path))
delta_sql_delete_test(spark_tmp_path, use_cdf, dest_table_func, delete_sql, checker,
partition_columns)
partition_columns, enable_deletion_vectors)

@allow_non_gpu("ExecutedCommandExec", *delta_meta_allow)
@delta_lake
@@ -83,29 +84,31 @@ def write_func(spark, path):
assert_gpu_fallback_write(write_func, read_delta_path, data_path,
"ExecutedCommandExec", disable_conf)

@allow_non_gpu("ExecutedCommandExec", *delta_meta_allow)
@allow_non_gpu("BroadcastHashJoinExec,BroadcastExchangeExec", *delta_meta_allow)
@delta_lake
@ignore_order
@pytest.mark.parametrize("use_cdf", [True, False], ids=idfn)
@pytest.mark.skipif(is_before_spark_340(), reason="Deletion vectors new in Delta Lake 2.4 / Apache Spark 3.4")
@pytest.mark.parametrize("enable_deletion_vectors", [True, False], ids=idfn)
@pytest.mark.parametrize("partition_columns", [None, ["a"]], ids=idfn)
@pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
@pytest.mark.xfail(is_databricks122_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8654")
def test_delta_deletion_vector_fallback(spark_tmp_path, use_cdf):
data_path = spark_tmp_path + "/DELTA_DATA"
def setup_tables(spark):
setup_dest_tables(spark, data_path,
dest_table_func=lambda spark: unary_op_df(spark, int_gen),
use_cdf=use_cdf, enable_deletion_vectors=True)
def write_func(spark, path):
delete_sql="DELETE FROM delta.`{}`".format(path)
spark.sql(delete_sql)
with_cpu_session(setup_tables)
disable_conf = copy_and_update(delta_writes_enabled_conf,
{"spark.rapids.sql.command.DeleteCommand": "true",
"spark.rapids.sql.command.DeleteCommandEdge": "true",
"spark.databricks.delta.delete.deletionVectors.persistent": "true"})

assert_gpu_fallback_write(write_func, read_delta_path, data_path,
"ExecutedCommandExec", disable_conf)
def test_delta_deletion_vector(spark_tmp_path, use_cdf, enable_deletion_vectors, partition_columns):
# Databricks changes the number of files being written, so we cannot compare logs unless there's only one slice
num_slices_to_test = 1 if is_databricks_runtime() else 10
def generate_dest_data(spark):
return three_col_df(spark,
SetValuesGen(IntegerType(), range(5)),
SetValuesGen(StringType(), "abcdefg"),
string_gen, num_slices=num_slices_to_test) \
.coalesce(1)\
.sort("a", ascending=True)
delete_sql = "DELETE FROM delta.`{path}` WHERE b < 'd'"
dv_conf = copy_and_update(delta_delete_enabled_conf,
{"spark.databricks.delta.delete.deletionVectors.persistent": enable_deletion_vectors})
assert_delta_sql_delete_collect(spark_tmp_path, use_cdf, generate_dest_data,
delete_sql, partition_columns,
enable_deletion_vectors=enable_deletion_vectors,
conf=dv_conf)

@allow_non_gpu(*delta_meta_allow)
@delta_lake
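
As a rough standalone illustration of what the new test_delta_deletion_vector case exercises (not part of the PR itself), the snippet below creates a Delta table with deletion vectors enabled and issues a DELETE so the commit records deletion-vector descriptors instead of rewriting the touched files. The local path, data shape, and session settings are invented for the example; the 'delta.enableDeletionVectors' table property and the 'spark.databricks.delta.delete.deletionVectors.persistent' conf are the standard Delta Lake 2.4 switches, the latter also appearing in the test config above.

# Illustrative sketch only; assumes a local Spark 3.4 session with Delta Lake 2.4 on the classpath.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("dv-delete-sketch")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

path = "/tmp/DELTA_DATA"  # hypothetical path, mirroring the tests' spark_tmp_path layout

# Write a small table, then enable deletion vectors via the table property.
spark.range(0, 1000).selectExpr("id", "cast(id % 7 as string) as b") \
    .write.format("delta").save(path)
spark.sql("ALTER TABLE delta.`{}` SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')".format(path))

# With persistent DVs enabled, DELETE adds deletionVector descriptors to the Delta log
# instead of rewriting the files that contain matching rows.
spark.conf.set("spark.databricks.delta.delete.deletionVectors.persistent", "true")
spark.sql("DELETE FROM delta.`{}` WHERE b < '3'".format(path))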
8 changes: 4 additions & 4 deletions integration_tests/src/main/python/delta_lake_merge_test.py

@@ -53,11 +53,11 @@ def setup_dest_tables(spark, data_path, dest_table_func, use_cdf, partition_colu
path = "{}/{}".format(data_path, name)
dest_df = dest_table_func(spark)
writer = dest_df.write.format("delta")
ddl = schema_to_ddl(spark, dest_df.schema)
sql_text = "CREATE TABLE delta.`{path}` ({ddl}) USING DELTA".format(path=path, ddl=ddl)
if partition_columns:
sql_text += " PARTITIONED BY ({})".format(",".join(partition_columns))
if use_cdf:
ddl = schema_to_ddl(spark, dest_df.schema)
sql_text = "CREATE TABLE delta.`{path}` ({ddl}) USING DELTA".format(path=path, ddl=ddl)
if partition_columns:
sql_text += " PARTITIONED BY ({})".format(",".join(partition_columns))
sql_text += " TBLPROPERTIES (delta.enableChangeDataFeed = true)"
spark.sql(sql_text)
writer = writer.mode("append")
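
The refactor above lifts the CREATE TABLE DDL out of the use_cdf branch so that additional table properties can be appended uniformly; the rest of setup_dest_tables is collapsed in this view. Purely as a hypothetical sketch of how the new enable_deletion_vectors flag could feed into the same DDL (this is not the PR's actual code), one option looks like:

def build_create_table_sql(path, ddl, partition_columns=None,
                           use_cdf=False, enable_deletion_vectors=False):
    # Hypothetical helper for illustration only; the real logic stays inside
    # setup_dest_tables and is not shown in full in this diff.
    sql_text = "CREATE TABLE delta.`{path}` ({ddl}) USING DELTA".format(path=path, ddl=ddl)
    if partition_columns:
        sql_text += " PARTITIONED BY ({})".format(",".join(partition_columns))
    props = []
    if use_cdf:
        props.append("delta.enableChangeDataFeed = true")
    if enable_deletion_vectors:
        # assumes the standard Delta table property for enabling deletion vectors
        props.append("delta.enableDeletionVectors = true")
    if props:
        sql_text += " TBLPROPERTIES ({})".format(", ".join(props))
    return sql_text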
16 changes: 16 additions & 0 deletions integration_tests/src/main/python/delta_lake_write_test.py

@@ -67,6 +67,12 @@ def fixup_operation_metrics(opm):
for k in metrics_to_remove:
opm.pop(k, None)

def fixup_deletion_vector(dv):
# we remove offset and pathOrInlineDv because they are non-deterministic, but
# we compare other fields such as sizeInBytes, cardinality, and storageType
dv.pop("offset", None)
dv.pop("pathOrInlineDv", None)

TMP_TABLE_PATTERN=re.compile(r"tmp_table_\w+")
TMP_TABLE_PATH_PATTERN=re.compile(r"delta.`[^`]*`")
REF_ID_PATTERN=re.compile(r"#[0-9]+")
@@ -96,6 +102,11 @@ def assert_delta_log_json_equivalent(filename, c_json, g_json):
del_keys(("modificationTime", "size"), c_val, g_val)
fixup_path(c_val)
fixup_path(g_val)
c_dv = c_val.get('deletionVector', {})
g_dv = g_val.get('deletionVector', {})
assert c_dv.keys() == g_dv.keys(), "Delta log {} 'add/deletionVector' keys mismatch:\nCPU: {}\nGPU: {}".format(filename, c_dv, g_dv)
fixup_deletion_vector(c_dv)
fixup_deletion_vector(g_dv)
elif key == "cdc":
assert c_val.keys() == g_val.keys(), "Delta log {} 'cdc' keys mismatch:\nCPU: {}\nGPU: {}".format(filename, c_val, g_val)
del_keys(("size",), c_val, g_val)
@@ -112,6 +123,11 @@ def assert_delta_log_json_equivalent(filename, c_json, g_json):
del_keys(("deletionTimestamp", "size"), c_val, g_val)
fixup_path(c_val)
fixup_path(g_val)
c_dv = c_val.get('deletionVector', {})
g_dv = g_val.get('deletionVector', {})
assert c_dv.keys() == g_dv.keys(), "Delta log {} 'remove/deletionVector' keys mismatch:\nCPU: {}\nGPU: {}".format(filename, c_dv, g_dv)
fixup_deletion_vector(c_dv)
fixup_deletion_vector(g_dv)
assert c_val == g_val, "Delta log {} is different at key '{}':\nCPU: {}\nGPU: {}".format(filename, key, c_val, g_val)

def decode_jsons(json_data):
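
To make the intent of the new deletionVector handling concrete, here is a small self-contained sketch of the normalization performed before the CPU and GPU Delta logs are compared. The descriptor values are invented for illustration; only the field names (storageType, pathOrInlineDv, offset, sizeInBytes, cardinality) follow the Delta log format referenced above.

# Self-contained sketch; mirrors fixup_deletion_vector from the test above.
def fixup_deletion_vector(dv):
    # offset and pathOrInlineDv differ between runs, so they are dropped before comparison
    dv.pop("offset", None)
    dv.pop("pathOrInlineDv", None)

# Invented example descriptors as they might appear in an 'add' action's deletionVector field.
cpu_dv = {"storageType": "u", "pathOrInlineDv": "ab^-aqEH.-t@S}K{vb[*k^", "offset": 1,
          "sizeInBytes": 36, "cardinality": 6}
gpu_dv = {"storageType": "u", "pathOrInlineDv": "zy%Qw9.-r@T}L{xc[*m^", "offset": 5,
          "sizeInBytes": 36, "cardinality": 6}

fixup_deletion_vector(cpu_dv)
fixup_deletion_vector(gpu_dv)

# Only the deterministic fields remain, and they must match exactly.
assert cpu_dv == gpu_dv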