Fall back to CPU for Delta Lake metadata queries [databricks] (#5912)
andygrove authored Jul 8, 2022
1 parent ba2682c commit 6ee8d0f
Showing 7 changed files with 80 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/configs.md
@@ -71,6 +71,7 @@ Name | Description | Default Value
<a name="sql.csv.read.double.enabled"></a>spark.rapids.sql.csv.read.double.enabled|CSV reading is not 100% compatible when reading doubles.|true
<a name="sql.csv.read.float.enabled"></a>spark.rapids.sql.csv.read.float.enabled|CSV reading is not 100% compatible when reading floats.|true
<a name="sql.decimalOverflowGuarantees"></a>spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
<a name="sql.detectDeltaLogQueries"></a>spark.rapids.sql.detectDeltaLogQueries|Queries against Delta Lake _delta_log JSON files are not efficient on the GPU. When this option is enabled, the plugin will attempt to detect these queries and fall back to the CPU.|true
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were or were not placed on the GPU. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only the parts of a query that did not go on the GPU|NOT_ON_GPU
<a name="sql.fast.sample"></a>spark.rapids.sql.fast.sample|Option to turn on fast sample. If enabled, the sample results may differ from the CPU sample because the GPU sampling algorithm differs from the CPU algorithm.|false
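For readers who want to try the new option, it is set like any other RAPIDS Accelerator conf when the session is created. A minimal PySpark sketch, not part of this commit: the spark-rapids jar and GPU resources are assumed to be configured separately, and the app name is illustrative.

# Illustrative only: start a session with the RAPIDS plugin and the new flag.
# Assumes the spark-rapids jar and GPU setup are already in place.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("delta-log-fallback-demo")                         # illustrative name
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.rapids.sql.detectDeltaLogQueries", "true")   # the default
    .getOrCreate())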
3 changes: 3 additions & 0 deletions integration_tests/conftest.py
@@ -45,3 +45,6 @@ def pytest_addoption(parser):
    parser.addoption(
        "--iceberg", action="store_true", default=False, help="if true enable Iceberg tests"
    )
    parser.addoption(
        "--delta_lake", action="store_true", default=False, help="if true enable Delta Lake tests"
    )
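This diff only registers the command-line flag; the marker-based gating lives elsewhere in the test framework and is not shown here. As a rough sketch of how such a flag is commonly wired up to skip marked tests (illustrative only, not the repository's actual code):

# Illustrative pytest hook, not from this commit: skip delta_lake-marked tests
# unless the suite is invoked with --delta_lake.
import pytest

def pytest_collection_modifyitems(config, items):
    if config.getoption("--delta_lake"):
        return
    skip_delta = pytest.mark.skip(reason="need --delta_lake option to run")
    for item in items:
        if "delta_lake" in item.keywords:
            item.add_marker(skip_delta)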
36 changes: 36 additions & 0 deletions integration_tests/src/main/python/delta_lake_test.py
@@ -0,0 +1,36 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from asserts import assert_gpu_fallback_collect
from marks import allow_non_gpu, delta_lake
from spark_session import with_cpu_session, is_databricks91_or_later

_conf = {'spark.rapids.sql.explain': 'ALL'}

@delta_lake
@allow_non_gpu('FileSourceScanExec')
@pytest.mark.skipif(not is_databricks91_or_later(), reason="Delta Lake is already configured on Databricks so we just run these tests there for now")
def test_delta_metadata_query_fallback(spark_tmp_table_factory):
    table = spark_tmp_table_factory.get()
    def setup_delta_table(spark):
        df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ["id", "data"])
        df.write.format("delta").save("/tmp/delta-table/{}".format(table))
    with_cpu_session(setup_delta_table)
    # Note: this only tests that reads against a Delta log JSON file fall back to the CPU;
    # it does not test the actual metadata queries that the Delta Lake plugin generates,
    # so it does not fully exercise the plugin code.
    assert_gpu_fallback_collect(
        lambda spark : spark.read.json("/tmp/delta-table/{}/_delta_log/00000000000000000000.json".format(table)),
        "FileSourceScanExec", conf = _conf)
1 change: 1 addition & 0 deletions integration_tests/src/main/python/marks.py
@@ -28,3 +28,4 @@
nightly_host_mem_consuming_case = pytest.mark.nightly_host_mem_consuming_case
fuzz_test = pytest.mark.fuzz_test
iceberg = pytest.mark.iceberg
delta_lake = pytest.mark.delta_lake
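Custom markers are usually registered so pytest does not warn about unknown marks; the repository may already handle this elsewhere, but a typical sketch looks like this (illustrative only):

# Illustrative only: register the new marker in a conftest.py.
def pytest_configure(config):
    config.addinivalue_line(
        "markers", "delta_lake: mark tests that read or write Delta Lake tables")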
8 changes: 8 additions & 0 deletions jenkins/databricks/test.sh
@@ -84,6 +84,8 @@ ICEBERG_CONFS="--packages org.apache.iceberg:iceberg-spark-runtime-${ICEBERG_SPA
--conf spark.sql.catalog.spark_catalog.type=hadoop \
--conf spark.sql.catalog.spark_catalog.warehouse=/tmp/spark-warehouse-$$"

DELTA_LAKE_CONFS=""

# Enable event log for qualification & profiling tools testing
export PYSP_TEST_spark_eventLog_enabled=true
mkdir -p /tmp/spark-events
@@ -138,4 +140,10 @@ else
SPARK_SUBMIT_FLAGS="$SPARK_CONF $ICEBERG_CONFS" TEST_PARALLEL=1 \
bash /home/ubuntu/spark-rapids/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" -m iceberg --iceberg --test_type=$TEST_TYPE
fi

if [[ "$TEST_MODE" == "ALL" || "$TEST_MODE" == "DELTA_LAKE_ONLY" ]]; then
## Run Delta Lake tests
SPARK_SUBMIT_FLAGS="$SPARK_CONF $DELTA_LAKE_CONFS" TEST_PARALLEL=1 \
bash /home/ubuntu/spark-rapids/integration_tests/run_pyspark_from_build.sh --runtime_env="databricks" -m "delta_lake" --delta_lake --test_type=$TEST_TYPE
fi
fi
@@ -4275,8 +4275,30 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
}
}

  /** Determine whether the query is running against Delta Lake _delta_log JSON files */
  def isDeltaLakeMetadataQuery(plan: SparkPlan): Boolean = {
    val deltaLogScans = PlanUtils.findOperators(plan, {
      case f: FileSourceScanExec =>
        // example filename: "file:/tmp/delta-table/_delta_log/00000000000000000000.json"
        f.relation.inputFiles.exists(name =>
          name.contains("/_delta_log/") && name.endsWith(".json"))
      case rdd: RDDScanExec =>
        // example rdd name: "Delta Table State #1 - file:///tmp/delta-table/_delta_log"
        rdd.inputRDD != null &&
          rdd.inputRDD.name != null &&
          rdd.inputRDD.name.startsWith("Delta Table State") &&
          rdd.inputRDD.name.endsWith("/_delta_log")
      case _ =>
        false
    })
    deltaLogScans.nonEmpty
  }

  private def applyOverrides(plan: SparkPlan, conf: RapidsConf): SparkPlan = {
    val wrap = GpuOverrides.wrapAndTagPlan(plan, conf)
    if (conf.isDetectDeltaLogQueries && isDeltaLakeMetadataQuery(plan)) {
      wrap.entirePlanWillNotWork("Delta Lake metadata queries are not efficient on GPU")
    }
    val reasonsToNotReplaceEntirePlan = wrap.getReasonsNotToReplaceEntirePlan
    if (conf.allowDisableEntirePlan && reasonsToNotReplaceEntirePlan.nonEmpty) {
      if (conf.shouldExplain) {
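To make the detection predicate above concrete, its two matching rules can be restated in a few lines of plain Python (illustrative only; the plugin's real implementation is the Scala above):

# Illustrative restatement of the detection rules, not plugin code.
def looks_like_delta_log_scan(input_files, rdd_name=None):
    # Rule 1: a file scan whose input files include JSON under a _delta_log directory,
    # e.g. "file:/tmp/delta-table/_delta_log/00000000000000000000.json"
    if any("/_delta_log/" in f and f.endswith(".json") for f in input_files):
        return True
    # Rule 2: an RDD scan whose RDD name looks like
    # "Delta Table State #1 - file:///tmp/delta-table/_delta_log"
    if rdd_name and rdd_name.startswith("Delta Table State") and rdd_name.endswith("/_delta_log"):
        return True
    return False

print(looks_like_delta_log_scan(["file:/tmp/delta-table/_delta_log/00000000000000000000.json"]))   # True
print(looks_like_delta_log_scan([], "Delta Table State #1 - file:///tmp/delta-table/_delta_log"))  # True
print(looks_like_delta_log_scan(["file:/tmp/delta-table/part-00000.parquet"]))                     # False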
@@ -1440,6 +1440,13 @@ object RapidsConf {
.booleanConf
.createWithDefault(value = false)

  val DETECT_DELTA_LOG_QUERIES = conf("spark.rapids.sql.detectDeltaLogQueries")
    .doc("Queries against Delta Lake _delta_log JSON files are not efficient on the GPU. When " +
      "this option is enabled, the plugin will attempt to detect these queries and fall back " +
      "to the CPU.")
    .booleanConf
    .createWithDefault(value = true)

private def printSectionHeader(category: String): Unit =
println(s"\n### $category")

@@ -1927,6 +1934,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isFastSampleEnabled: Boolean = get(ENABLE_FAST_SAMPLE)

lazy val isDetectDeltaLogQueries: Boolean = get(DETECT_DELTA_LOG_QUERIES)

private val optimizerDefaults = Map(
// this is not accurate because CPU projections do have a cost due to appending values
// to each row that is produced, but this needs to be a really small number because
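Because the new setting is exposed as a regular SQL conf, it should also be possible to flip it on a live session, for example when deliberately benchmarking GPU reads of _delta_log JSON files. A hedged sketch (assumes an existing `spark` session with the plugin loaded):

# Illustrative only: opt out of the fallback at runtime, then restore the default.
spark.conf.set("spark.rapids.sql.detectDeltaLogQueries", "false")
spark.conf.set("spark.rapids.sql.detectDeltaLogQueries", "true")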
