From 6ee8d0f1bb62690d4a9b53628acdc376039f8be9 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 8 Jul 2022 15:13:49 -0600
Subject: [PATCH] Fall back to CPU for Delta Lake metadata queries
 [databricks] (#5912)

---
 docs/configs.md                               |  1 +
 integration_tests/conftest.py                 |  3 ++
 .../src/main/python/delta_lake_test.py        | 36 +++++++++++++++++++
 integration_tests/src/main/python/marks.py    |  1 +
 jenkins/databricks/test.sh                    |  8 +++++
 .../nvidia/spark/rapids/GpuOverrides.scala    | 22 ++++++++++++
 .../com/nvidia/spark/rapids/RapidsConf.scala  |  9 +++++
 7 files changed, 80 insertions(+)
 create mode 100644 integration_tests/src/main/python/delta_lake_test.py

diff --git a/docs/configs.md b/docs/configs.md
index 9fee7fa2770..441cab5b9c8 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -71,6 +71,7 @@ Name | Description | Default Value
 spark.rapids.sql.csv.read.double.enabled|CSV reading is not 100% compatible when reading doubles.|true
 spark.rapids.sql.csv.read.float.enabled|CSV reading is not 100% compatible when reading floats.|true
 spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
+spark.rapids.sql.detectDeltaLogQueries|Queries against Delta Lake _delta_log JSON files are not efficient on the GPU. When this option is enabled, the plugin will attempt to detect these queries and fall back to the CPU.|true
 spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
 spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NOT_ON_GPU
 spark.rapids.sql.fast.sample|Option to turn on fast sample. If enable it is inconsistent with CPU sample because of GPU sample algorithm is inconsistent with CPU.|false
diff --git a/integration_tests/conftest.py b/integration_tests/conftest.py
index de431183059..cc9805ad0a5 100644
--- a/integration_tests/conftest.py
+++ b/integration_tests/conftest.py
@@ -45,3 +45,6 @@ def pytest_addoption(parser):
     parser.addoption(
         "--iceberg", action="store_true", default=False, help="if true enable Iceberg tests"
     )
+    parser.addoption(
+        "--delta_lake", action="store_true", default=False, help="if true enable Delta Lake tests"
+    )
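The new --delta_lake option follows the same pattern as the existing --iceberg option: tests carry a marker and are skipped unless the flag is passed. The snippet below is only a minimal sketch of that pattern using standard pytest hooks, not the project's actual conftest code, which wires this up alongside the other markers.

    # Sketch only: how a --delta_lake flag typically gates tests marked
    # @pytest.mark.delta_lake, assuming the option was registered via
    # pytest_addoption as in the hunk above.
    import pytest

    def pytest_collection_modifyitems(config, items):
        if config.getoption("--delta_lake"):
            return  # flag present: leave delta_lake-marked tests runnable
        skip_delta = pytest.mark.skip(reason="need --delta_lake option to run")
        for item in items:
            if "delta_lake" in item.keywords:
                item.add_marker(skip_delta)
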
diff --git a/integration_tests/src/main/python/delta_lake_test.py b/integration_tests/src/main/python/delta_lake_test.py
new file mode 100644
index 00000000000..b087cd675d4
--- /dev/null
+++ b/integration_tests/src/main/python/delta_lake_test.py
@@ -0,0 +1,36 @@
+# Copyright (c) 2022, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+from asserts import assert_gpu_fallback_collect
+from marks import allow_non_gpu, delta_lake
+from spark_session import with_cpu_session, is_databricks91_or_later
+
+_conf = {'spark.rapids.sql.explain': 'ALL'}
+
+@delta_lake
+@allow_non_gpu('FileSourceScanExec')
+@pytest.mark.skipif(not is_databricks91_or_later(), reason="Delta Lake is already configured on Databricks, so for now we run these tests only there")
+def test_delta_metadata_query_fallback(spark_tmp_table_factory):
+    table = spark_tmp_table_factory.get()
+    def setup_delta_table(spark):
+        df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ["id", "data"])
+        df.write.format("delta").save("/tmp/delta-table/{}".format(table))
+    with_cpu_session(setup_delta_table)
+    # Note: this only verifies that reads against a Delta log JSON file fall back to the CPU.
+    # It does not exercise the metadata queries that the Delta Lake plugin itself generates,
+    # so it does not fully test the plugin code.
+    assert_gpu_fallback_collect(
+        lambda spark : spark.read.json("/tmp/delta-table/{}/_delta_log/00000000000000000000.json".format(table)),
+        "FileSourceScanExec", conf = _conf)
diff --git a/integration_tests/src/main/python/marks.py b/integration_tests/src/main/python/marks.py
index 1bc0a2353d0..518ad5bdebc 100644
--- a/integration_tests/src/main/python/marks.py
+++ b/integration_tests/src/main/python/marks.py
@@ -28,3 +28,4 @@
 nightly_host_mem_consuming_case = pytest.mark.nightly_host_mem_consuming_case
 fuzz_test = pytest.mark.fuzz_test
 iceberg = pytest.mark.iceberg
+delta_lake = pytest.mark.delta_lake
diff --git a/jenkins/databricks/test.sh b/jenkins/databricks/test.sh
index 0ff4381005c..35885cc4640 100755
--- a/jenkins/databricks/test.sh
+++ b/jenkins/databricks/test.sh
@@ -84,6 +84,8 @@ ICEBERG_CONFS="--packages org.apache.iceberg:iceberg-spark-runtime-${ICEBERG_SPA
 --conf spark.sql.catalog.spark_catalog.type=hadoop \
 --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/spark-warehouse-$$"
 
+DELTA_LAKE_CONFS=""
+
 # Enable event log for qualification & profiling tools testing
 export PYSP_TEST_spark_eventLog_enabled=true
 mkdir -p /tmp/spark-events
@@ -138,4 +140,10 @@ else
     SPARK_SUBMIT_FLAGS="$SPARK_CONF $ICEBERG_CONFS" TEST_PARALLEL=1 \
       bash /home/ubuntu/spark-rapids/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" -m iceberg --iceberg --test_type=$TEST_TYPE
   fi
+
+  if [[ "$TEST_MODE" == "ALL" || "$TEST_MODE" == "DELTA_LAKE_ONLY" ]]; then
+    ## Run Delta Lake tests
+    SPARK_SUBMIT_FLAGS="$SPARK_CONF $DELTA_LAKE_CONFS" TEST_PARALLEL=1 \
+      bash /home/ubuntu/spark-rapids/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" -m "delta_lake" --delta_lake --test_type=$TEST_TYPE
+  fi
 fi
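Outside the Jenkins harness, the fallback that test_delta_metadata_query_fallback asserts can be observed in any PySpark session with the RAPIDS plugin loaded. The sketch below assumes the standard com.nvidia.spark.SQLPlugin entry point; the Delta log path is made up for illustration.

    # Illustrative reproduction of the behavior under test.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
             .config("spark.rapids.sql.explain", "ALL")  # log fallback reasons
             .getOrCreate())

    # Hypothetical path to the first Delta commit file of some table.
    log_path = "/tmp/delta-table/example/_delta_log/00000000000000000000.json"

    # With spark.rapids.sql.detectDeltaLogQueries left at its default (true),
    # the driver log should explain that the entire plan stays on the CPU.
    spark.read.json(log_path).collect()
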
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 825c481ee37..580fb32b259 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -4275,8 +4275,30 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
     }
   }
 
+  /** Determine whether the query is running against Delta Lake _delta_log JSON files */
+  def isDeltaLakeMetadataQuery(plan: SparkPlan): Boolean = {
+    val deltaLogScans = PlanUtils.findOperators(plan, {
+      case f: FileSourceScanExec =>
+        // example filename: "file:/tmp/delta-table/_delta_log/00000000000000000000.json"
+        f.relation.inputFiles.exists(name =>
+          name.contains("/_delta_log/") && name.endsWith(".json"))
+      case rdd: RDDScanExec =>
+        // example rdd name: "Delta Table State #1 - file:///tmp/delta-table/_delta_log"
+        rdd.inputRDD != null &&
+          rdd.inputRDD.name != null &&
+          rdd.inputRDD.name.startsWith("Delta Table State") &&
+          rdd.inputRDD.name.endsWith("/_delta_log")
+      case _ =>
+        false
+    })
+    deltaLogScans.nonEmpty
+  }
+
   private def applyOverrides(plan: SparkPlan, conf: RapidsConf): SparkPlan = {
     val wrap = GpuOverrides.wrapAndTagPlan(plan, conf)
+    if (conf.isDetectDeltaLogQueries && isDeltaLakeMetadataQuery(plan)) {
+      wrap.entirePlanWillNotWork("Delta Lake metadata queries are not efficient on GPU")
+    }
     val reasonsToNotReplaceEntirePlan = wrap.getReasonsNotToReplaceEntirePlan
     if (conf.allowDisableEntirePlan && reasonsToNotReplaceEntirePlan.nonEmpty) {
       if (conf.shouldExplain) {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 714eda201e5..d5e5517e7fc 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1440,6 +1440,13 @@ object RapidsConf {
     .booleanConf
     .createWithDefault(value = false)
 
+  val DETECT_DELTA_LOG_QUERIES = conf("spark.rapids.sql.detectDeltaLogQueries")
+    .doc("Queries against Delta Lake _delta_log JSON files are not efficient on the GPU. When " +
+      "this option is enabled, the plugin will attempt to detect these queries and fall back " +
+      "to the CPU.")
+    .booleanConf
+    .createWithDefault(value = true)
+
   private def printSectionHeader(category: String): Unit =
     println(s"\n### $category")
 
@@ -1927,6 +1934,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
 
   lazy val isFastSampleEnabled: Boolean = get(ENABLE_FAST_SAMPLE)
 
+  lazy val isDetectDeltaLogQueries: Boolean = get(DETECT_DELTA_LOG_QUERIES)
+
   private val optimizerDefaults = Map(
     // this is not accurate because CPU projections do have a cost due to appending values
     // to each row that is produced, but this needs to be a really small number because
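Because the detection defaults to on, users who want _delta_log scans considered for the GPU again need an explicit opt-out. Since applyOverrides reads RapidsConf per query, setting the flag at runtime should take effect on the next action, though setting it at session startup is the safer assumption; the snippet below is a hypothetical usage sketch, not part of the patch.

    # Hypothetical opt-out of the detection added by this patch.
    spark.conf.set("spark.rapids.sql.detectDeltaLogQueries", "false")

    # ... run the metadata query on the GPU, subject to the plugin's usual
    # operator support checks ...

    # Restore the default.
    spark.conf.set("spark.rapids.sql.detectDeltaLogQueries", "true")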