diff --git a/docs/configs.md b/docs/configs.md
index 9fee7fa2770..441cab5b9c8 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -71,6 +71,7 @@ Name | Description | Default Value
spark.rapids.sql.csv.read.double.enabled|CSV reading is not 100% compatible when reading doubles.|true
spark.rapids.sql.csv.read.float.enabled|CSV reading is not 100% compatible when reading floats.|true
spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
+spark.rapids.sql.detectDeltaLogQueries|Queries against Delta Lake _delta_log JSON files are not efficient on the GPU. When this option is enabled, the plugin will attempt to detect these queries and fall back to the CPU.|true
spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
spark.rapids.sql.explain|Explain why some parts of a query were or were not placed on the GPU. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NOT_ON_GPU
spark.rapids.sql.fast.sample|Option to turn on fast sample. If enabled, the results are not consistent with the CPU sample because the GPU sampling algorithm differs from the CPU one.|false
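
For context on the new `spark.rapids.sql.detectDeltaLogQueries` entry above: like other `spark.rapids.sql.*` settings it should be settable per session. A minimal sketch (assuming a running SparkSession with the plugin active; not taken from this patch):

```scala
// Hedged example: disable Delta log detection for the current session so that
// _delta_log scans may again be considered for the GPU (the default is true).
spark.conf.set("spark.rapids.sql.detectDeltaLogQueries", "false")
```
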
diff --git a/integration_tests/conftest.py b/integration_tests/conftest.py
index de431183059..cc9805ad0a5 100644
--- a/integration_tests/conftest.py
+++ b/integration_tests/conftest.py
@@ -45,3 +45,6 @@ def pytest_addoption(parser):
parser.addoption(
"--iceberg", action="store_true", default=False, help="if true enable Iceberg tests"
)
+ parser.addoption(
+ "--delta_lake", action="store_true", default=False, help="if true enable Delta Lake tests"
+ )
diff --git a/integration_tests/src/main/python/delta_lake_test.py b/integration_tests/src/main/python/delta_lake_test.py
new file mode 100644
index 00000000000..b087cd675d4
--- /dev/null
+++ b/integration_tests/src/main/python/delta_lake_test.py
@@ -0,0 +1,36 @@
+# Copyright (c) 2022, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+from asserts import assert_gpu_fallback_collect
+from marks import allow_non_gpu, delta_lake
+from spark_session import with_cpu_session, is_databricks91_or_later
+
+_conf = {'spark.rapids.sql.explain': 'ALL'}
+
+@delta_lake
+@allow_non_gpu('FileSourceScanExec')
+@pytest.mark.skipif(not is_databricks91_or_later(), reason="Delta Lake is already configured on Databricks so we just run these tests there for now")
+def test_delta_metadata_query_fallback(spark_tmp_table_factory):
+ table = spark_tmp_table_factory.get()
+ def setup_delta_table(spark):
+ df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ["id", "data"])
+ df.write.format("delta").save("/tmp/delta-table/{}".format(table))
+ with_cpu_session(setup_delta_table)
+    # Note: this only verifies that a read against a Delta log JSON file falls back to the CPU.
+    # It does not exercise the actual metadata queries that the Delta Lake plugin generates,
+    # so it does not fully test the plugin code.
+ assert_gpu_fallback_collect(
+ lambda spark : spark.read.json("/tmp/delta-table/{}/_delta_log/00000000000000000000.json".format(table)),
+ "FileSourceScanExec", conf = _conf)
diff --git a/integration_tests/src/main/python/marks.py b/integration_tests/src/main/python/marks.py
index 1bc0a2353d0..518ad5bdebc 100644
--- a/integration_tests/src/main/python/marks.py
+++ b/integration_tests/src/main/python/marks.py
@@ -28,3 +28,4 @@
nightly_host_mem_consuming_case = pytest.mark.nightly_host_mem_consuming_case
fuzz_test = pytest.mark.fuzz_test
iceberg = pytest.mark.iceberg
+delta_lake = pytest.mark.delta_lake
diff --git a/jenkins/databricks/test.sh b/jenkins/databricks/test.sh
index 0ff4381005c..35885cc4640 100755
--- a/jenkins/databricks/test.sh
+++ b/jenkins/databricks/test.sh
@@ -84,6 +84,8 @@ ICEBERG_CONFS="--packages org.apache.iceberg:iceberg-spark-runtime-${ICEBERG_SPA
--conf spark.sql.catalog.spark_catalog.type=hadoop \
--conf spark.sql.catalog.spark_catalog.warehouse=/tmp/spark-warehouse-$$"
+DELTA_LAKE_CONFS=""
+
# Enable event log for qualification & profiling tools testing
export PYSP_TEST_spark_eventLog_enabled=true
mkdir -p /tmp/spark-events
@@ -138,4 +140,10 @@ else
SPARK_SUBMIT_FLAGS="$SPARK_CONF $ICEBERG_CONFS" TEST_PARALLEL=1 \
bash /home/ubuntu/spark-rapids/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" -m iceberg --iceberg --test_type=$TEST_TYPE
fi
+
+ if [[ "$TEST_MODE" == "ALL" || "$TEST_MODE" == "DELTA_LAKE_ONLY" ]]; then
+ ## Run Delta Lake tests
+ SPARK_SUBMIT_FLAGS="$SPARK_CONF $DELTA_LAKE_CONFS" TEST_PARALLEL=1 \
+ bash /home/ubuntu/spark-rapids/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" -m "delta_lake" --delta_lake --test_type=$TEST_TYPE
+ fi
fi
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 825c481ee37..580fb32b259 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -4275,8 +4275,30 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
}
}
+  /** Determine whether the query is running against Delta Lake _delta_log JSON files */
+ def isDeltaLakeMetadataQuery(plan: SparkPlan): Boolean = {
+ val deltaLogScans = PlanUtils.findOperators(plan, {
+ case f: FileSourceScanExec =>
+ // example filename: "file:/tmp/delta-table/_delta_log/00000000000000000000.json"
+ f.relation.inputFiles.exists(name =>
+ name.contains("/_delta_log/") && name.endsWith(".json"))
+ case rdd: RDDScanExec =>
+ // example rdd name: "Delta Table State #1 - file:///tmp/delta-table/_delta_log"
+ rdd.inputRDD != null &&
+ rdd.inputRDD.name != null &&
+ rdd.inputRDD.name.startsWith("Delta Table State") &&
+ rdd.inputRDD.name.endsWith("/_delta_log")
+ case _ =>
+ false
+ })
+ deltaLogScans.nonEmpty
+ }
+
private def applyOverrides(plan: SparkPlan, conf: RapidsConf): SparkPlan = {
val wrap = GpuOverrides.wrapAndTagPlan(plan, conf)
+ if (conf.isDetectDeltaLogQueries && isDeltaLakeMetadataQuery(plan)) {
+ wrap.entirePlanWillNotWork("Delta Lake metadata queries are not efficient on GPU")
+ }
val reasonsToNotReplaceEntirePlan = wrap.getReasonsNotToReplaceEntirePlan
if (conf.allowDisableEntirePlan && reasonsToNotReplaceEntirePlan.nonEmpty) {
if (conf.shouldExplain) {
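
The detection above reduces to two string-shape checks: one on scanned file paths and one on the names of Delta's cached state RDDs. A self-contained sketch of the same predicates (the names below are illustrative, not the plugin's API):

```scala
object DeltaLogDetect {
  // Matches paths like "file:/tmp/delta-table/_delta_log/00000000000000000000.json"
  def isDeltaLogFile(path: String): Boolean =
    path.contains("/_delta_log/") && path.endsWith(".json")

  // Matches RDD names like "Delta Table State #1 - file:///tmp/delta-table/_delta_log"
  def isDeltaStateRddName(name: String): Boolean =
    name != null && name.startsWith("Delta Table State") && name.endsWith("/_delta_log")
}

// Usage:
//   DeltaLogDetect.isDeltaLogFile("file:/tmp/t/_delta_log/00000000000000000000.json")     // true
//   DeltaLogDetect.isDeltaStateRddName("Delta Table State #1 - file:///tmp/t/_delta_log") // true
```
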
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 714eda201e5..d5e5517e7fc 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1440,6 +1440,13 @@ object RapidsConf {
.booleanConf
.createWithDefault(value = false)
+ val DETECT_DELTA_LOG_QUERIES = conf("spark.rapids.sql.detectDeltaLogQueries")
+ .doc("Queries against Delta Lake _delta_log JSON files are not efficient on the GPU. When " +
+ "this option is enabled, the plugin will attempt to detect these queries and fall back " +
+ "to the CPU.")
+ .booleanConf
+ .createWithDefault(value = true)
+
private def printSectionHeader(category: String): Unit =
println(s"\n### $category")
@@ -1927,6 +1934,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
lazy val isFastSampleEnabled: Boolean = get(ENABLE_FAST_SAMPLE)
+ lazy val isDetectDeltaLogQueries: Boolean = get(DETECT_DELTA_LOG_QUERIES)
+
private val optimizerDefaults = Map(
// this is not accurate because CPU projections do have a cost due to appending values
// to each row that is produced, but this needs to be a really small number because