From ec965a1ae219f31fb8523adcb70856ee6c1b28bb Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 6 Nov 2020 19:04:23 -0600 Subject: [PATCH] [FEA] Support Databricks 7.3 LTS Runtime (#1076) * Add databricks 301 shim layer * copy rest of 300db files * update to build databricks 300 and 301 separately * update profile name * Fix missing include * fixes * changes * Fixes * databricks changes 3.0.1 * Fix broadcast like * Fix broadcast hash join to be a "like" one * remove functions in the 301 base class * Update build scripts to use more from mvn properties * Fix order of variables * comment out slck for now * Fix missing variables * update docs and build args * use quotes for profeiles * python escape Signed-off-by: Thomas Graves * remove %q * debug * Fix maven location install * rearrange * Fix version parameter * add bck shutdown * increase timeout * update deploy script * fix comment * fix deploy dir * update * add back in tests * minor updates * put slack back * Upmerge, fix copyright, and remove extra import * cleanup imports Signed-off-by: Thomas Graves * remove a couple more imports * remove ZoneId import Signed-off-by: Thomas Graves * upmerge to the latest changes to GpuHashJoin * Add fail on error for version file generation * revert fail on error in pom file --- docs/FAQ.md | 1 + .../get-started/getting-started-databricks.md | 8 +- .../src/main/python/spark_init_internal.py | 3 + jenkins/Jenkinsfile.databricks301nightly | 116 +++++++ jenkins/Jenkinsfile.databricksnightly | 19 +- jenkins/databricks/build.sh | 42 ++- jenkins/databricks/deploy.sh | 11 +- jenkins/databricks/run-tests.py | 43 +-- shims/aggregator/pom.xml | 17 + shims/pom.xml | 6 + shims/spark301db/pom.xml | 117 +++++++ ...idia.spark.rapids.SparkShimServiceProvider | 1 + .../spark301db/GpuBroadcastExchangeExec.scala | 40 +++ .../spark301db/GpuBroadcastHashJoinExec.scala | 158 +++++++++ .../GpuBroadcastNestedLoopJoinExec.scala | 43 +++ .../rapids/shims/spark301db/GpuHashJoin.scala | 328 ++++++++++++++++++ .../spark301db/GpuShuffleExchangeExec.scala | 45 +++ .../spark301db/GpuShuffledHashJoinExec.scala | 151 ++++++++ .../spark301db/GpuSortMergeJoinExec.scala | 110 ++++++ .../shims/spark301db/Spark301dbShims.scala | 210 +++++++++++ .../spark301db/SparkShimServiceProvider.scala | 35 ++ .../shims/spark301db/GpuFileScanRDD.scala | 196 +++++++++++ 22 files changed, 1628 insertions(+), 72 deletions(-) create mode 100644 jenkins/Jenkinsfile.databricks301nightly create mode 100644 shims/spark301db/pom.xml create mode 100644 shims/spark301db/src/main/resources/META-INF/services/com.nvidia.spark.rapids.SparkShimServiceProvider create mode 100644 shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastExchangeExec.scala create mode 100644 shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala create mode 100644 shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastNestedLoopJoinExec.scala create mode 100644 shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuHashJoin.scala create mode 100644 shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffleExchangeExec.scala create mode 100644 shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala create mode 100644 shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala create mode 100644 
shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala create mode 100644 shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/SparkShimServiceProvider.scala create mode 100644 shims/spark301db/src/main/scala/org/apache/spark/sql/rapids/shims/spark301db/GpuFileScanRDD.scala diff --git a/docs/FAQ.md b/docs/FAQ.md index c14beaaeecf..a25c44b6546 100644 --- a/docs/FAQ.md +++ b/docs/FAQ.md @@ -29,6 +29,7 @@ top of these changes and release updates as quickly as possible. The RAPIDS Accelerator for Apache Spark officially supports [Apache Spark](get-started/getting-started-on-prem.md), [Databricks Runtime 7.0](get-started/getting-started-databricks.md) +[Databricks Runtime 7.3](get-started/getting-started-databricks.md) and [Google Cloud Dataproc](get-started/getting-started-gcp.md). Most distributions based off of Apache Spark 3.0.0 should work, but because the plugin replaces parts of the physical plan that Apache Spark considers to be internal the code for those plans diff --git a/docs/get-started/getting-started-databricks.md b/docs/get-started/getting-started-databricks.md index 201413cbb84..b7a2b8b6a69 100644 --- a/docs/get-started/getting-started-databricks.md +++ b/docs/get-started/getting-started-databricks.md @@ -9,15 +9,15 @@ parent: Getting-Started This guide will run through how to set up the RAPIDS Accelerator for Apache Spark 3.0 on Databricks. At the end of this guide, the reader will be able to run a sample Apache Spark application that runs on NVIDIA GPUs on Databricks. ## Prerequisites - * Apache Spark 3.0 running in DataBricks Runtime 7.0 ML with GPU - * AWS: 7.0 ML (includes Apache Spark 3.0.0, GPU, Scala 2.12) - * Azure: 7.0 ML (GPU, Scala 2.12, Spark 3.0.0) + * Apache Spark 3.0 running in Databricks Runtime 7.0 ML with GPU or Runtime 7.3 LTS ML with GPU + * AWS: 7.0 ML (includes Apache Spark 3.0.0, GPU, Scala 2.12) or 7.3 LTS ML (includes Apache Spark 3.0.1, GPU, Scala 2.12) + * Azure: 7.0 ML (GPU, Scala 2.12, Spark 3.0.0) or 7.3 LTS ML (GPU, Scala 2.12, Spark 3.0.1) The number of GPUs per node dictates the number of Spark executors that can run in that node. ## Start a Databricks Cluster Create a Databricks cluster by going to Clusters, then clicking “+ Create Cluster”. Ensure the cluster meets the prerequisites above by configuring it as follows: -1. On AWS, make sure to use 7.0 ML (GPU, Scala 2.12, Spark 3.0.0), or for Azure, choose 7.0 ML (GPU, Scala 2.12, Spark 3.0.0). +1. Select the Databricks Runtime Version from one of the supported runtimes specified in the Prerequisites section. 2. Under Autopilot Options, disable auto scaling. 3. Choose the number of workers that matches the number of GPUs you want to use. 4. Select a worker type. On AWS, use nodes with 1 GPU each such as `p3.xlarge` or `g4dn.xlarge`. p2 nodes do not meet the architecture requirements for the Spark worker (although they can be used for the driver node). For Azure, choose GPU nodes such as Standard_NC6s_v3.
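The cluster described in the getting-started steps above also needs the RAPIDS Accelerator and cuDF jars on every node; the build script later in this patch copies them into /databricks/jars and notes that the location "has to match the Databricks init script". A minimal init-script sketch of that copy step follows; the DBFS source paths are assumptions, and the jar names only echo the 0.3.0-SNAPSHOT plugin and cudf 0.17-SNAPSHOT/cuda10-1 versions referenced elsewhere in this patch:

#!/bin/bash
# Hypothetical cluster init script: stage the plugin and cuDF jars into the
# directory Databricks puts on the driver and executor classpath.
# The source paths and jar file names below are illustrative placeholders.
sudo cp /dbfs/FileStore/jars/rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar /databricks/jars/
sudo cp /dbfs/FileStore/jars/cudf-0.17-SNAPSHOT-cuda10-1.jar /databricks/jars/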
diff --git a/integration_tests/src/main/python/spark_init_internal.py b/integration_tests/src/main/python/spark_init_internal.py index 65e1e0335eb..95c8b66c330 100644 --- a/integration_tests/src/main/python/spark_init_internal.py +++ b/integration_tests/src/main/python/spark_init_internal.py @@ -20,9 +20,12 @@ def _spark__init(): # due to bugs in pyspark/pytest it looks like any configs set here # can be reset in the middle of a test if specific operations are done (some types of cast etc) # enableHiveSupport() is needed for parquet bucket tests + # disable adaptive query execution by default because some CSPs have it on by default and we don't + # support everywhere _s = SparkSession.builder \ .config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \ .config('spark.sql.queryExecutionListeners', 'com.nvidia.spark.rapids.ExecutionPlanCaptureCallback')\ + .config("spark.sql.adaptive.enabled", "false") \ .enableHiveSupport() \ .appName('rapids spark plugin integration tests (python)').getOrCreate() #TODO catch the ClassNotFound error that happens if the classpath is not set up properly and diff --git a/jenkins/Jenkinsfile.databricks301nightly b/jenkins/Jenkinsfile.databricks301nightly new file mode 100644 index 00000000000..5c73d225654 --- /dev/null +++ b/jenkins/Jenkinsfile.databricks301nightly @@ -0,0 +1,116 @@ +#!/usr/local/env groovy +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** +* +* Jenkinsfile for building rapids-plugin on Databricks based on Spark 3.0.1 +* +*/ +@Library(['shared-libs', 'spark-jenkins-shared-lib']) _ + +def urmUrl="https://${ArtifactoryConstants.ARTIFACTORY_NAME}/artifactory/sw-spark-maven" + +pipeline { + agent { + docker { + label 'docker-gpu' + image "${ArtifactoryConstants.ARTIFACTORY_NAME}/sw-spark-docker/plugin:dev-ubuntu16-cuda10.1" + args '--runtime=nvidia -v ${HOME}/.m2:${HOME}/.m2:rw \ + -v ${HOME}/.zinc:${HOME}/.zinc:rw' + } + } + + options { + ansiColor('xterm') + // timeout doesn't seem to work with environment variables so make sure to update below + // IDLE_TIMEOUT config as well + timeout(time: 240, unit: 'MINUTES') + buildDiscarder(logRotator(numToKeepStr: '10')) + } + + parameters { + choice(name: 'DEPLOY_TO', choices: ['Urm', 'Local'], + description: 'Where to deploy artifacts to') + string(name: 'RUNTIME', + defaultValue: '7.3.x-gpu-ml-scala2.12', description: 'databricks runtime') + string(name: 'PUBLIC_KEY', + defaultValue: '\"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDB+ValakyoKn7w+iBRoAi1KlLVH4yVmRXhLCZs1qUECBAhbck2o8Lgjp5wJ+epdT3+EAP2+t/zlK1mU9tTylidapR4fZuIk9ApoQSLOUEXcUtHkPpZulIoGAq78HoyiEs1sKovc6ULvpymjdnQ3ogCZlTlP9uqmL2E4kbtrNCNL0SVj/w10AqzrJ5lqQgO5dIdDRMHW2sv88JI1VLlfiSsofa9RdI7hDRuCnfZ0+dv2URJGGzGt2BkdEmk9t5F1BMpuXvZ8HzOYdACzw0U+THBOk9d4CACUYMyO1XqlXwoYweNKRnigXDCRaTWGFBzTkugRuW/BZBccTR1ON430uRB svcngcc@nvidia.com\"', description: 'public key') + string(name: 'REF', defaultValue: 'branch-0.3', description: 'Commit to build') + string(name: 'BASE_SPARK_VERSION', + defaultValue: '3.0.1', description: 'Databricks base Spark version') + string(name: 'BUILD_PROFILES', + defaultValue: 'databricks301,!snapshot-shims', description: 'the mvn build profiles to use when building Databricks') + } + + environment { + IDLE_TIMEOUT = 240 + JENKINS_ROOT = 'jenkins' + MVN_URM_MIRROR='-s jenkins/settings.xml -P mirror-apache-to-urm' + LIBCUDF_KERNEL_CACHE_PATH='/tmp' + URM_CREDS = credentials("svcngcc_artifactory") + DATABRICKS_TOKEN = credentials("SPARK_DATABRICKS_TOKEN") + URM_URL = "${urmUrl}" + } + + triggers { + cron('H 5 * * *') + } + + stages { + stage('Ubuntu16 CUDA10.1') { + steps { + script { + sshagent(credentials : ['svcngcc_pubpriv']) { + sh "rm -rf spark-rapids-ci.tgz" + sh "tar -zcvf spark-rapids-ci.tgz *" + env.CLUSTERID = sh ( + script: "python3.6 ./jenkins/databricks/create.py -t $DATABRICKS_TOKEN -k $PUBLIC_KEY -r $RUNTIME -i $IDLE_TIMEOUT -n CI-GPU-databricks-${BASE_SPARK_VERSION}", + returnStdout: true + ).trim() + sh "python3.6 ./jenkins/databricks/run-tests.py -c ${env.CLUSTERID} -z ./spark-rapids-ci.tgz -t $DATABRICKS_TOKEN -p /home/svcngcc/.ssh/id_rsa -l ./jenkins/databricks/build.sh -v $BASE_SPARK_VERSION -b $BUILD_PROFILES" + sh "./jenkins/databricks/deploy.sh" + } + } + } + } + } // end of stages + post { + always { + script { + sh "python3.6 ./jenkins/databricks/shutdown.py -c ${env.CLUSTERID} -t $DATABRICKS_TOKEN -d || true" + if (currentBuild.currentResult == "SUCCESS") { + slack("#swrapids-spark-cicd", "Success", color: "#33CC33") + } else { + slack("#swrapids-spark-cicd", "Failed", color: "#FF0000") + } + } + } + } +} // end of pipeline + +void slack(Map params = [:], String channel, String message) { + Map defaultParams = [ + color: "#000000", + baseUrl: "${SparkConstants.SLACK_API_ENDPOINT}", + tokenCredentialId: "slack_token" + ] + + params["channel"] = channel + params["message"] = "${BUILD_URL}\n" + message + + slackSend(defaultParams << params) +} diff --git 
a/jenkins/Jenkinsfile.databricksnightly b/jenkins/Jenkinsfile.databricksnightly index 5c52e549ef3..e19340c5f8b 100644 --- a/jenkins/Jenkinsfile.databricksnightly +++ b/jenkins/Jenkinsfile.databricksnightly @@ -45,17 +45,15 @@ pipeline { parameters { choice(name: 'DEPLOY_TO', choices: ['Urm', 'Local'], description: 'Where to deploy artifacts to') - string(name: 'DATABRICKS_VERSION', - defaultValue: '0.3.0-SNAPSHOT', description: 'Version to use for databricks jar produced') - string(name: 'CUDF_VERSION', - defaultValue: '0.17-SNAPSHOT', description: 'Cudf version to use') - string(name: 'CUDA_VERSION', - defaultValue: 'cuda10-1', description: 'cuda version to use') string(name: 'RUNTIME', defaultValue: '7.0.x-gpu-ml-scala2.12', description: 'databricks runtime') string(name: 'PUBLIC_KEY', defaultValue: '\"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDB+ValakyoKn7w+iBRoAi1KlLVH4yVmRXhLCZs1qUECBAhbck2o8Lgjp5wJ+epdT3+EAP2+t/zlK1mU9tTylidapR4fZuIk9ApoQSLOUEXcUtHkPpZulIoGAq78HoyiEs1sKovc6ULvpymjdnQ3ogCZlTlP9uqmL2E4kbtrNCNL0SVj/w10AqzrJ5lqQgO5dIdDRMHW2sv88JI1VLlfiSsofa9RdI7hDRuCnfZ0+dv2URJGGzGt2BkdEmk9t5F1BMpuXvZ8HzOYdACzw0U+THBOk9d4CACUYMyO1XqlXwoYweNKRnigXDCRaTWGFBzTkugRuW/BZBccTR1ON430uRB svcngcc@nvidia.com\"', description: 'public key') string(name: 'REF', defaultValue: 'branch-0.3', description: 'Commit to build') + string(name: 'BASE_SPARK_VERSION', + defaultValue: '3.0.0', description: 'Databricks base Spark version') + string(name: 'BUILD_PROFILES', + defaultValue: 'databricks,!snapshot-shims', description: 'the mvn build profiles to use when building Databricks') } environment { @@ -65,11 +63,6 @@ pipeline { LIBCUDF_KERNEL_CACHE_PATH='/tmp' URM_CREDS = credentials("svcngcc_artifactory") DATABRICKS_TOKEN = credentials("SPARK_DATABRICKS_TOKEN") - SCALA_VERSION = '2.12' - // the spark version used when we install databricks jars into .m2 directory - SPARK_VERSION_TO_INSTALL_JARS = '3.0.0-databricks' - CI_RAPIDS_JAR = 'rapids-4-spark_2.12-0.1-SNAPSHOT-ci.jar' - CI_CUDF_JAR = 'cudf-0.14-cuda10-1.jar' URM_URL = "${urmUrl}" } @@ -85,10 +78,10 @@ pipeline { sh "rm -rf spark-rapids-ci.tgz" sh "tar -zcvf spark-rapids-ci.tgz *" env.CLUSTERID = sh ( - script: "python3.6 ./jenkins/databricks/create.py -t $DATABRICKS_TOKEN -k $PUBLIC_KEY -r $RUNTIME -i $IDLE_TIMEOUT -n CI-GPU-databricks-${DATABRICKS_VERSION}", + script: "python3.6 ./jenkins/databricks/create.py -t $DATABRICKS_TOKEN -k $PUBLIC_KEY -r $RUNTIME -i $IDLE_TIMEOUT -n CI-GPU-databricks-${BASE_SPARK_VERSION}", returnStdout: true ).trim() - sh "python3.6 ./jenkins/databricks/run-tests.py -c ${env.CLUSTERID} -z ./spark-rapids-ci.tgz -t $DATABRICKS_TOKEN -p /home/svcngcc/.ssh/id_rsa -l ./jenkins/databricks/build.sh -j $CI_RAPIDS_JAR -b $DATABRICKS_VERSION -k $SPARK_VERSION_TO_INSTALL_JARS -a $SCALA_VERSION -f $CUDF_VERSION -u $CUDA_VERSION -m $CI_CUDF_JAR" + sh "python3.6 ./jenkins/databricks/run-tests.py -c ${env.CLUSTERID} -z ./spark-rapids-ci.tgz -t $DATABRICKS_TOKEN -p /home/svcngcc/.ssh/id_rsa -l ./jenkins/databricks/build.sh -v $BASE_SPARK_VERSION -b $BUILD_PROFILES" sh "./jenkins/databricks/deploy.sh" } } diff --git a/jenkins/databricks/build.sh b/jenkins/databricks/build.sh index 074976f402d..081461facb1 100755 --- a/jenkins/databricks/build.sh +++ b/jenkins/databricks/build.sh @@ -18,32 +18,37 @@ set -e SPARKSRCTGZ=$1 -# this should match whatever is in the pom files for the version -SPARK_PLUGIN_JAR_VERSION=$2 -SCALA_VERSION=$3 -CI_RAPIDS_JAR=$4 -# the version of spark used when we install the databricks jars in .m2 
-SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS=$5 -CUDF_VERSION=$6 -CUDA_VERSION=$7 -CI_CUDF_JAR=$8 # version of Apache Spark we are building against -BASE_SPARK_POM_VERSION=$9 +BASE_SPARK_VERSION=$2 +BUILD_PROFILES=$3 + +echo "tgz is $SPARKSRCTGZ" +echo "Base Spark version is $BASE_SPARK_VERSION" +echo "build profiles $BUILD_PROFILES" -echo "Spark version is $SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS" -echo "scala version is: $SCALA_VERSION" +sudo apt install -y maven # this has to match the Databricks init script DB_JAR_LOC=/databricks/jars/ -RAPIDS_BUILT_JAR=rapids-4-spark_$SCALA_VERSION-$SPARK_PLUGIN_JAR_VERSION.jar -sudo apt install -y maven rm -rf spark-rapids mkdir spark-rapids +echo "tar -zxvf $SPARKSRCTGZ -C spark-rapids" tar -zxvf $SPARKSRCTGZ -C spark-rapids cd spark-rapids export WORKSPACE=`pwd` -mvn -B '-Pdatabricks,!snapshot-shims' clean package -DskipTests || true + +SPARK_PLUGIN_JAR_VERSION=`mvn help:evaluate -q -pl dist -Dexpression=project.version -DforceStdout` +CUDF_VERSION=`mvn help:evaluate -q -pl dist -Dexpression=cudf.version -DforceStdout` +SCALA_VERSION=`mvn help:evaluate -q -pl dist -Dexpression=scala.binary.version -DforceStdout` +CUDA_VERSION=`mvn help:evaluate -q -pl dist -Dexpression=cuda.version -DforceStdout` + +# the version of spark used when we install the databricks jars in .m2 +SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS=$BASE_SPARK_VERSION-databricks +RAPIDS_BUILT_JAR=rapids-4-spark_$SCALA_VERSION-$SPARK_PLUGIN_JAR_VERSION.jar + +echo "Scala version is: $SCALA_VERSION" +mvn -B -P${BUILD_PROFILES} clean package -DskipTests || true M2DIR=/home/ubuntu/.m2/repository CUDF_JAR=${M2DIR}/ai/rapids/cudf/${CUDF_VERSION}/cudf-${CUDF_VERSION}-${CUDA_VERSION}.jar @@ -54,8 +59,8 @@ CATALYSTJAR=----workspace_spark_3_0--sql--catalyst--catalyst-hive-2.3__hadoop-2. ANNOTJAR=----workspace_spark_3_0--common--tags--tags-hive-2.3__hadoop-2.7_${SCALA_VERSION}_deploy.jar COREJAR=----workspace_spark_3_0--core--core-hive-2.3__hadoop-2.7_${SCALA_VERSION}_deploy.jar # install the 3.0.0 pom file so we get dependencies -COREPOM=spark-core_${SCALA_VERSION}-${BASE_SPARK_POM_VERSION}.pom -COREPOMPATH=$M2DIR/org/apache/spark/spark-core_${SCALA_VERSION}/${BASE_SPARK_POM_VERSION} +COREPOM=spark-core_${SCALA_VERSION}-${BASE_SPARK_VERSION}.pom +COREPOMPATH=$M2DIR/org/apache/spark/spark-core_${SCALA_VERSION}/${BASE_SPARK_VERSION} mvn -B install:install-file \ -Dmaven.repo.local=$M2DIR \ -Dfile=$JARDIR/$COREJAR \ @@ -89,8 +94,7 @@ mvn -B install:install-file \ -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \ -Dpackaging=jar -mvn -B '-Pdatabricks,!snapshot-shims' clean package -DskipTests - +mvn -B -P${BUILD_PROFILES} clean package -DskipTests # Copy so we pick up new built jar and latest cuDF jar. Note that the jar names have to be # exactly what is in the statically set up Databricks cluster we use.
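The install-file calls in the hunk above all follow one pattern: each Databricks-provided Spark jar is installed into the local .m2 repository under the version ${BASE_SPARK_VERSION}-databricks, so the databricks shim profiles can resolve the modified Spark classes as ordinary Maven dependencies. A condensed sketch of that pattern for a single jar (the catalyst jar); the groupId/artifactId coordinates and the JARDIR value are assumptions inferred from the visible naming, not copied from the full script:

# Illustrative only: map one Databricks jar to Maven coordinates so the
# spark301db shim can depend on org.apache.spark:spark-catalyst_2.12:3.0.1-databricks.
JARDIR=/databricks/jars   # assumed location of the Databricks-provided jars
M2DIR=/home/ubuntu/.m2/repository
CATALYSTJAR=----workspace_spark_3_0--sql--catalyst--catalyst-hive-2.3__hadoop-2.7_2.12_deploy.jar
mvn -B install:install-file \
  -Dmaven.repo.local=$M2DIR \
  -Dfile=$JARDIR/$CATALYSTJAR \
  -DgroupId=org.apache.spark \
  -DartifactId=spark-catalyst_2.12 \
  -Dversion=3.0.1-databricks \
  -Dpackaging=jar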
diff --git a/jenkins/databricks/deploy.sh b/jenkins/databricks/deploy.sh index f64757a91d6..6c8a755ab76 100755 --- a/jenkins/databricks/deploy.sh +++ b/jenkins/databricks/deploy.sh @@ -24,6 +24,13 @@ cd spark-rapids echo "Maven mirror is $MVN_URM_MIRROR" SERVER_ID='snapshots' SERVER_URL="$URM_URL-local" -DBJARFPATH=./shims/spark300db/target/rapids-4-spark-shims-spark300-databricks_$SCALA_VERSION-$DATABRICKS_VERSION.jar +SCALA_VERSION=`mvn help:evaluate -q -pl dist -Dexpression=scala.binary.version -DforceStdout` +# remove the periods so change something like 3.0.0 to 300 +VERSION_NUM=${BASE_SPARK_VERSION//.} +SPARK_VERSION_STR=spark$VERSION_NUM +SPARK_PLUGIN_JAR_VERSION=`mvn help:evaluate -q -pl dist -Dexpression=project.version -DforceStdout` +DB_SHIM_DIRECTORY=${SPARK_VERSION_STR}db +DBJARFPATH=./shims/${DB_SHIM_DIRECTORY}/target/rapids-4-spark-shims-$SPARK_VERSION_STR-databricks_$SCALA_VERSION-$SPARK_PLUGIN_JAR_VERSION.jar +echo "Databricks jar is: $DBJARFPATH" mvn -B deploy:deploy-file $MVN_URM_MIRROR '-P!snapshot-shims' -Durl=$SERVER_URL -DrepositoryId=$SERVER_ID \ - -Dfile=$DBJARFPATH -DpomFile=shims/spark300db/pom.xml + -Dfile=$DBJARFPATH -DpomFile=shims/${DB_SHIM_DIRECTORY}/pom.xml diff --git a/jenkins/databricks/run-tests.py b/jenkins/databricks/run-tests.py index 1bb133f9c0a..0337a2ab311 100644 --- a/jenkins/databricks/run-tests.py +++ b/jenkins/databricks/run-tests.py @@ -29,29 +29,22 @@ def main(): script_dest = '/home/ubuntu/build.sh' source_tgz = 'spark-rapids-ci.tgz' tgz_dest = '/home/ubuntu/spark-rapids-ci.tgz' - ci_rapids_jar = 'rapids-4-spark_2.12-0.1-SNAPSHOT-ci.jar' - # the plugin version to use for the jar we build against databricks - db_version = '0.3.0-SNAPSHOT' - scala_version = '2.12' - spark_version = '3.0.0' - cudf_version = '0.17-SNAPSHOT' - cuda_version = 'cuda10-1' - ci_cudf_jar = 'cudf-0.14-cuda10-1.jar' base_spark_pom_version = '3.0.0' clusterid = '' + build_profiles = 'databricks,!snapshot-shims' try: - opts, args = getopt.getopt(sys.argv[1:], 'hw:t:c:p:l:d:z:j:b:k:a:f:u:m:v:', - ['workspace=', 'token=', 'clusterid=', 'private=', 'localscript=', 'dest=', 'sparktgz=', 'cirapidsjar=', 'databricksversion=', 'sparkversion=', 'scalaversion=', 'cudfversion=', 'cudaversion=', 'cicudfjar=', 'basesparkpomversion=']) + opts, args = getopt.getopt(sys.argv[1:], 'hw:t:c:p:l:d:z:m:v:b:', + ['workspace=', 'token=', 'clusterid=', 'private=', 'localscript=', 'dest=', 'sparktgz=', 'basesparkpomversion=', 'buildprofiles=']) except getopt.GetoptError: print( - 'run-tests.py -s -t -c -p -l -d -z -j -b -k -a -f -u -m -v ') + 'run-tests.py -s -t -c -p -l -d -z -v -b ') sys.exit(2) for opt, arg in opts: if opt == '-h': print( - 'run-tests.py -s -t -c -p -n -l -d , -z -j -b -k -a -f -u -m -v ') + 'run-tests.py -s -t -c -p -n -l -d , -z -v -b ') sys.exit() elif opt in ('-w', '--workspace'): workspace = arg @@ -67,22 +60,10 @@ def main(): script_dest = arg elif opt in ('-z', '--sparktgz'): source_tgz = arg - elif opt in ('-j', '--cirapidsjar'): - ci_rapids_jar = arg - elif opt in ('-b', '--databricksversion'): - db_version = arg - elif opt in ('-k', '--sparkversion'): - spark_version = arg - elif opt in ('-a', '--scalaversion'): - scala_version = arg - elif opt in ('-f', '--cudfversion'): - cudf_version = arg - elif opt in ('-u', '--cudaversion'): - cuda_version = arg - elif opt in ('-m', '--cicudfjar'): - ci_cudf_jar = arg elif opt in ('-v', '--basesparkpomversion'): base_spark_pom_version = arg + elif opt in ('-b', '--bulidprofiles'): + build_profiles = arg print('-w is ' + 
workspace) print('-c is ' + clusterid) @@ -90,14 +71,8 @@ def main(): print('-l is ' + local_script) print('-d is ' + script_dest) print('-z is ' + source_tgz) - print('-j is ' + ci_rapids_jar) - print('-b is ' + db_version) - print('-k is ' + spark_version) - print('-a is ' + scala_version) - print('-f is ' + cudf_version) - print('-u is ' + cuda_version) - print('-m is ' + ci_cudf_jar) print('-v is ' + base_spark_pom_version) + print('-b is ' + build_profiles) master_addr = ClusterUtils.cluster_get_master_addr(workspace, clusterid, token) if master_addr is None: @@ -114,7 +89,7 @@ def main(): print("rsync command: %s" % rsync_command) subprocess.check_call(rsync_command, shell = True) - ssh_command = "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null ubuntu@%s -p 2200 -i %s %s %s %s %s %s %s %s %s %s %s 2>&1 | tee buildout; if [ `echo ${PIPESTATUS[0]}` -ne 0 ]; then false; else true; fi" % (master_addr, private_key_file, script_dest, tgz_dest, db_version, scala_version, ci_rapids_jar, spark_version, cudf_version, cuda_version, ci_cudf_jar, base_spark_pom_version) + ssh_command = "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null ubuntu@%s -p 2200 -i %s %s %s %s %s 2>&1 | tee buildout; if [ `echo ${PIPESTATUS[0]}` -ne 0 ]; then false; else true; fi" % (master_addr, private_key_file, script_dest, tgz_dest, base_spark_pom_version, build_profiles) print("ssh command: %s" % ssh_command) subprocess.check_call(ssh_command, shell = True) diff --git a/shims/aggregator/pom.xml b/shims/aggregator/pom.xml index 92b87779137..757e34b8127 100644 --- a/shims/aggregator/pom.xml +++ b/shims/aggregator/pom.xml @@ -44,6 +44,17 @@ + + databricks301 + + + com.nvidia + rapids-4-spark-shims-spark301-databricks_${scala.binary.version} + ${project.version} + compile + + + include-databricks @@ -54,6 +65,12 @@ ${project.version} compile + + com.nvidia + rapids-4-spark-shims-spark301-databricks_${scala.binary.version} + ${project.version} + compile + diff --git a/shims/pom.xml b/shims/pom.xml index 082d23c068a..62dea5686e6 100644 --- a/shims/pom.xml +++ b/shims/pom.xml @@ -39,6 +39,12 @@ spark300db + + databricks301 + + spark301db + + snapshot-shims diff --git a/shims/spark301db/pom.xml b/shims/spark301db/pom.xml new file mode 100644 index 00000000000..0617393371f --- /dev/null +++ b/shims/spark301db/pom.xml @@ -0,0 +1,117 @@ + + + + 4.0.0 + + + com.nvidia + rapids-4-spark-shims_2.12 + 0.3.0-SNAPSHOT + ../pom.xml + + com.nvidia + rapids-4-spark-shims-spark301-databricks_2.12 + RAPIDS Accelerator for Apache Spark SQL Plugin Spark 3.0.1 Databricks Shim + The RAPIDS SQL plugin for Apache Spark 3.0.1 Databricks Shim + 0.3.0-SNAPSHOT + + + + + + + maven-antrun-plugin + + + dependency + generate-resources + + + + + + + + + + + + + run + + + + + + + + + + ${project.build.directory}/extra-resources + + + src/main/resources + + + + + + 1.10.1 + 3.0.1-databricks + + + + + com.nvidia + rapids-4-spark-shims-spark301_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark301db.version} + provided + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark301db.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark301db.version} + provided + + + org.apache.spark + spark-annotation_${scala.binary.version} + ${spark301db.version} + provided + + + org.apache.parquet + parquet-column + ${parquet.version} + provided + + + + diff --git 
a/shims/spark301db/src/main/resources/META-INF/services/com.nvidia.spark.rapids.SparkShimServiceProvider b/shims/spark301db/src/main/resources/META-INF/services/com.nvidia.spark.rapids.SparkShimServiceProvider new file mode 100644 index 00000000000..d8b3e1a4c52 --- /dev/null +++ b/shims/spark301db/src/main/resources/META-INF/services/com.nvidia.spark.rapids.SparkShimServiceProvider @@ -0,0 +1 @@ +com.nvidia.spark.rapids.shims.spark301db.SparkShimServiceProvider diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastExchangeExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastExchangeExec.scala new file mode 100644 index 00000000000..06184bd8109 --- /dev/null +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastExchangeExec.scala @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.nvidia.spark.rapids.shims.spark301db + +import java.util.UUID + +import org.apache.spark.sql.catalyst.plans.logical.Statistics +import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike +import org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase + +case class GpuBroadcastExchangeExec( + mode: BroadcastMode, + child: SparkPlan) extends GpuBroadcastExchangeExecBase(mode, child) with BroadcastExchangeLike { + + override def runId: UUID = _runId + + override def runtimeStatistics: Statistics = { + val dataSize = metrics("dataSize").value + Statistics(dataSize) + } + + override def doCanonicalize(): SparkPlan = { + GpuBroadcastExchangeExec(mode.canonicalized, child.canonicalized) + } +} diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala new file mode 100644 index 00000000000..8af8617e45f --- /dev/null +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.nvidia.spark.rapids.shims.spark301db + +import com.nvidia.spark.rapids.{BaseExprMeta, ConfKeysAndIncompat, GpuBindReferences, GpuBroadcastJoinMeta, GpuColumnVector, GpuExec, GpuOverrides, GpuProjectExec, RapidsConf, RapidsMeta, SparkPlanMeta} +import com.nvidia.spark.rapids.GpuMetricNames.{NUM_OUTPUT_BATCHES, NUM_OUTPUT_ROWS, TOTAL_TIME} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, UnspecifiedDistribution} +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.rapids.execution.SerializeConcatHostBuffersDeserializeBatch +import org.apache.spark.sql.vectorized.ColumnarBatch + +class GpuBroadcastHashJoinMeta( + join: BroadcastHashJoinExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: ConfKeysAndIncompat) + extends GpuBroadcastJoinMeta[BroadcastHashJoinExec](join, conf, parent, rule) { + + val leftKeys: Seq[BaseExprMeta[_]] = + join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val rightKeys: Seq[BaseExprMeta[_]] = + join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val condition: Option[BaseExprMeta[_]] = + join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + + override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + + override def tagPlanForGpu(): Unit = { + GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) + + val buildSide = join.buildSide match { + case BuildLeft => childPlans(0) + case BuildRight => childPlans(1) + } + + if (!canBuildSideBeReplaced(buildSide)) { + willNotWorkOnGpu("the broadcast for this join must be on the GPU too") + } + + if (!canThisBeReplaced) { + buildSide.willNotWorkOnGpu("the BroadcastHashJoin this feeds is not on the GPU") + } + } + + override def convertToGpu(): GpuExec = { + val left = childPlans(0).convertIfNeeded() + val right = childPlans(1).convertIfNeeded() + // The broadcast part of this must be a BroadcastExchangeExec + val buildSide = join.buildSide match { + case BuildLeft => left + case BuildRight => right + } + verifyBuildSideWasReplaced(buildSide) + GpuBroadcastHashJoinExec( + leftKeys.map(_.convertToGpu()), + rightKeys.map(_.convertToGpu()), + join.joinType, join.buildSide, + condition.map(_.convertToGpu()), + left, right) + } +} + +case class GpuBroadcastHashJoinExec( + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + joinType: JoinType, + buildSide: BuildSide, + condition: Option[Expression], + left: SparkPlan, + right: SparkPlan) extends BinaryExecNode with GpuHashJoin { + + override lazy val additionalMetrics: Map[String, SQLMetric] = Map( + "joinOutputRows" -> SQLMetrics.createMetric(sparkContext, "join output rows"), + "streamTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "stream time"), + "joinTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "join time"), + "filterTime" -> 
SQLMetrics.createNanoTimingMetric(sparkContext, "filter time")) + + override def requiredChildDistribution: Seq[Distribution] = { + val mode = HashedRelationBroadcastMode(buildKeys) + buildSide match { + case BuildLeft => + BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil + case BuildRight => + UnspecifiedDistribution :: BroadcastDistribution(mode) :: Nil + } + } + + def broadcastExchange: GpuBroadcastExchangeExec = buildPlan match { + case BroadcastQueryStageExec(_, gpu: GpuBroadcastExchangeExec, _) => gpu + case BroadcastQueryStageExec(_, reused: ReusedExchangeExec, _) => + reused.child.asInstanceOf[GpuBroadcastExchangeExec] + case gpu: GpuBroadcastExchangeExec => gpu + case reused: ReusedExchangeExec => reused.child.asInstanceOf[GpuBroadcastExchangeExec] + } + + override def doExecute(): RDD[InternalRow] = + throw new IllegalStateException( + "GpuBroadcastHashJoin does not support row-based processing") + + override def doExecuteColumnar() : RDD[ColumnarBatch] = { + val numOutputRows = longMetric(NUM_OUTPUT_ROWS) + val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES) + val totalTime = longMetric(TOTAL_TIME) + val streamTime = longMetric("streamTime") + val joinTime = longMetric("joinTime") + val filterTime = longMetric("filterTime") + val joinOutputRows = longMetric("joinOutputRows") + + val broadcastRelation = broadcastExchange + .executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]() + + val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) + + lazy val builtTable = { + val ret = withResource( + GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => + val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) + val filtered = filterBuiltTableIfNeeded(combined) + withResource(filtered) { filtered => + GpuColumnVector.from(filtered) + } + } + + // Don't warn for a leak, because we cannot control when we are done with this + (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) + ret + } + + val rdd = streamedPlan.executeColumnar() + rdd.mapPartitions(it => + doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows, + numOutputBatches, streamTime, joinTime, filterTime, totalTime)) + } +} + diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastNestedLoopJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastNestedLoopJoinExec.scala new file mode 100644 index 00000000000..54ef6a0278a --- /dev/null +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastNestedLoopJoinExec.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.shims.spark301db + +import com.nvidia.spark.rapids.GpuBuildSide + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec +import org.apache.spark.sql.rapids.execution._ + +/** + * Spark 3.1 changed packages of BuildLeft, BuildRight, BuildSide + */ +case class GpuBroadcastNestedLoopJoinExec( + left: SparkPlan, + right: SparkPlan, + join: BroadcastNestedLoopJoinExec, + joinType: JoinType, + condition: Option[Expression], + targetSizeBytes: Long) + extends GpuBroadcastNestedLoopJoinExecBase(left, right, join, joinType, condition, + targetSizeBytes) { + + def getGpuBuildSide: GpuBuildSide = { + GpuJoinUtils.getGpuBuildSide(join.buildSide) + } +} diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuHashJoin.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuHashJoin.scala new file mode 100644 index 00000000000..74f6626f8c5 --- /dev/null +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuHashJoin.scala @@ -0,0 +1,328 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.nvidia.spark.rapids.shims.spark301db + +import ai.rapids.cudf.{NvtxColor, Table} +import com.nvidia.spark.rapids._ + +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} +import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} +import org.apache.spark.sql.execution.joins.HashJoin +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.rapids.GpuAnd +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} + +object GpuHashJoin { + def tagJoin( + meta: RapidsMeta[_, _, _], + joinType: JoinType, + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression]): Unit = joinType match { + case _: InnerLike => + case FullOuter | RightOuter | LeftOuter | LeftSemi | LeftAnti => + if (condition.isDefined) { + meta.willNotWorkOnGpu(s"$joinType joins currently do not support conditions") + } + case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported") + } + + def incRefCount(cb: ColumnarBatch): ColumnarBatch = { + GpuColumnVector.extractBases(cb).foreach(_.incRefCount()) + cb + } +} + +trait GpuHashJoin extends GpuExec with HashJoin { + + override def output: Seq[Attribute] = { + joinType match { + case _: InnerLike => + left.output ++ right.output + case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) + case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output + case j: ExistenceJoin => + left.output :+ j.exists + case LeftExistence(_) => + left.output + case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) + case x => + throw new IllegalArgumentException(s"GpuHashJoin should not take $x as the JoinType") + } + } + + // If we have a single batch streamed in then we will produce a single batch of output + // otherwise it can get smaller or bigger, we just don't know. 
When we support out of + // core joins this will change + override def outputBatching: CoalesceGoal = { + val batching = buildSide match { + case BuildLeft => GpuExec.outputBatching(right) + case BuildRight => GpuExec.outputBatching(left) + } + if (batching == RequireSingleBatch) { + RequireSingleBatch + } else { + null + } + } + + protected lazy val (gpuBuildKeys, gpuStreamedKeys) = { + require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType), + "Join keys from two sides should have same types") + val lkeys = GpuBindReferences.bindGpuReferences(leftKeys, left.output) + val rkeys = GpuBindReferences.bindGpuReferences(rightKeys, right.output) + buildSide match { + case BuildLeft => (lkeys, rkeys) + case BuildRight => (rkeys, lkeys) + } + } + + /** + * Place the columns in left and the columns in right into a single ColumnarBatch + */ + def combine(left: ColumnarBatch, right: ColumnarBatch): ColumnarBatch = { + val l = GpuColumnVector.extractColumns(left) + val r = GpuColumnVector.extractColumns(right) + val c = l ++ r + new ColumnarBatch(c.asInstanceOf[Array[ColumnVector]], left.numRows()) + } + + // TODO eventually dedupe the keys + lazy val joinKeyIndices: Range = gpuBuildKeys.indices + + val localBuildOutput: Seq[Attribute] = buildPlan.output + // The first columns are the ones we joined on and need to remove + lazy val joinIndices: Seq[Int] = joinType match { + case RightOuter => + // The left table and right table are switched in the output + // because we don't support a right join, only left + val numRight = right.output.length + val numLeft = left.output.length + val joinLength = joinKeyIndices.length + def remap(index: Int): Int = { + if (index < numLeft) { + // part of the left table, but is on the right side of the tmp output + index + joinLength + numRight + } else { + // part of the right table, but is on the left side of the tmp output + index + joinLength - numLeft + } + } + output.indices.map (remap) + case _ => + val joinLength = joinKeyIndices.length + output.indices.map (v => v + joinLength) + } + + // Spark adds in rules to filter out nulls for some types of joins, but it does not + // guarantee 100% that all nulls will be filtered out by the time they get to + // this point, but because of https://github.com/rapidsai/cudf/issues/6052 + // we need to filter out the nulls ourselves until it is fixed. 
+ // InnerLike | LeftSemi => + // filter left and right keys + // RightOuter => + // filter left keys + // LeftOuter | LeftAnti => + // filter right keys + + private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = { + val builtAnyNullable = gpuBuildKeys.exists(_.nullable) + (joinType, buildSide) match { + case (_: InnerLike | LeftSemi, _) => builtAnyNullable + case (RightOuter, BuildLeft) => builtAnyNullable + case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable + case _ => false + } + } + + private[this] lazy val shouldFilterStreamTableForNulls: Boolean = { + val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable) + (joinType, buildSide) match { + case (_: InnerLike | LeftSemi, _) => streamedAnyNullable + case (RightOuter, BuildRight) => streamedAnyNullable + case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable + case _ => false + } + } + + private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression = + exprs.zipWithIndex.map { kv => + GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable)) + }.reduce(GpuAnd) + + private[this] lazy val builtTableNullFilterExpression: GpuExpression = + mkNullFilterExpr(gpuBuildKeys) + + private[this] lazy val streamedTableNullFilterExpression: GpuExpression = + mkNullFilterExpr(gpuStreamedKeys) + + /** + * Filter the builtBatch if needed. builtBatch will be closed. + */ + def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch = + if (shouldFilterBuiltTableForNulls) { + GpuFilter(builtBatch, builtTableNullFilterExpression) + } else { + builtBatch + } + + private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch = + if (shouldFilterStreamTableForNulls) { + GpuFilter(streamedBatch, streamedTableNullFilterExpression) + } else { + streamedBatch + } + + def doJoin(builtTable: Table, + stream: Iterator[ColumnarBatch], + boundCondition: Option[Expression], + numOutputRows: SQLMetric, + joinOutputRows: SQLMetric, + numOutputBatches: SQLMetric, + streamTime: SQLMetric, + joinTime: SQLMetric, + filterTime: SQLMetric, + totalTime: SQLMetric): Iterator[ColumnarBatch] = { + new Iterator[ColumnarBatch] { + import scala.collection.JavaConverters._ + var nextCb: Option[ColumnarBatch] = None + var first: Boolean = true + + TaskContext.get().addTaskCompletionListener[Unit](_ => closeCb()) + + def closeCb(): Unit = { + nextCb.foreach(_.close()) + nextCb = None + } + + override def hasNext: Boolean = { + var mayContinue = true + while (nextCb.isEmpty && mayContinue) { + val startTime = System.nanoTime() + if (stream.hasNext) { + val cb = stream.next() + streamTime += (System.nanoTime() - startTime) + nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, + numOutputBatches, joinTime, filterTime) + totalTime += (System.nanoTime() - startTime) + } else if (first) { + // We have to at least try one in some cases + val cb = GpuColumnVector.emptyBatch(streamedPlan.output.asJava) + streamTime += (System.nanoTime() - startTime) + nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, + numOutputBatches, joinTime, filterTime) + totalTime += (System.nanoTime() - startTime) + } else { + mayContinue = false + } + first = false + } + nextCb.isDefined + } + + override def next(): ColumnarBatch = { + if (!hasNext) { + throw new NoSuchElementException() + } + val ret = nextCb.get + nextCb = None + ret + } + } + } + + private[this] def doJoin(builtTable: Table, + streamedBatch: ColumnarBatch, + boundCondition: Option[Expression], + 
numOutputRows: SQLMetric, + numJoinOutputRows: SQLMetric, + numOutputBatches: SQLMetric, + joinTime: SQLMetric, + filterTime: SQLMetric): Option[ColumnarBatch] = { + + val combined = withResource(streamedBatch) { streamedBatch => + withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) { + streamedKeysBatch => + GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch)) + } + } + val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb => + GpuColumnVector.from(cb) + } + + val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime) + val joined = try { + buildSide match { + case BuildLeft => doJoinLeftRight(builtTable, streamedTable) + case BuildRight => doJoinLeftRight(streamedTable, builtTable) + } + } finally { + streamedTable.close() + nvtxRange.close() + } + + numJoinOutputRows += joined.numRows() + + val tmp = if (boundCondition.isDefined) { + GpuFilter(joined, boundCondition.get, numOutputRows, numOutputBatches, filterTime) + } else { + numOutputRows += joined.numRows() + numOutputBatches += 1 + joined + } + if (tmp.numRows() == 0) { + // Not sure if there is a better way to work around this + numOutputBatches.set(numOutputBatches.value - 1) + tmp.close() + None + } else { + Some(tmp) + } + } + + private[this] def doJoinLeftRight(leftTable: Table, rightTable: Table): ColumnarBatch = { + val joinedTable = joinType match { + case LeftOuter => leftTable.onColumns(joinKeyIndices: _*) + .leftJoin(rightTable.onColumns(joinKeyIndices: _*), false) + case RightOuter => rightTable.onColumns(joinKeyIndices: _*) + .leftJoin(leftTable.onColumns(joinKeyIndices: _*), false) + case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*) + .innerJoin(rightTable.onColumns(joinKeyIndices: _*), false) + case LeftSemi => leftTable.onColumns(joinKeyIndices: _*) + .leftSemiJoin(rightTable.onColumns(joinKeyIndices: _*), false) + case LeftAnti => leftTable.onColumns(joinKeyIndices: _*) + .leftAntiJoin(rightTable.onColumns(joinKeyIndices: _*), false) + case FullOuter => leftTable.onColumns(joinKeyIndices: _*) + .fullJoin(rightTable.onColumns(joinKeyIndices: _*), false) + case _ => throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + + s" supported") + } + try { + val result = joinIndices.zip(output).map { case (joinIndex, outAttr) => + GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount(), outAttr.dataType) + }.toArray[ColumnVector] + + new ColumnarBatch(result, joinedTable.getRowCount.toInt) + } finally { + joinedTable.close() + } + } +} diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffleExchangeExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffleExchangeExec.scala new file mode 100644 index 00000000000..5c92da25aaf --- /dev/null +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffleExchangeExec.scala @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.nvidia.spark.rapids.shims.spark301db + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.plans.logical.Statistics +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike +import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase + +case class GpuShuffleExchangeExec( + override val outputPartitioning: Partitioning, + child: SparkPlan, + canChangeNumPartitions: Boolean) + extends GpuShuffleExchangeExecBase(outputPartitioning, child) with ShuffleExchangeLike { + + override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions + + override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions + + override def getShuffleRDD( + partitionSpecs: Array[ShufflePartitionSpec], + partitionSizes: Option[Array[Long]]): RDD[_] = { + throw new UnsupportedOperationException + } + + override def runtimeStatistics: Statistics = { + val dataSize = metrics("dataSize").value + Statistics(dataSize) + } +} diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala new file mode 100644 index 00000000000..4800ee75abe --- /dev/null +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.shims.spark301db + +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.GpuMetricNames._ + +import org.apache.spark.TaskContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution} +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.vectorized.ColumnarBatch + +object GpuJoinUtils { + def getGpuBuildSide(buildSide: BuildSide): GpuBuildSide = { + buildSide match { + case BuildRight => GpuBuildRight + case BuildLeft => GpuBuildLeft + case _ => throw new Exception("unknown buildSide Type") + } + } +} + +/** + * Spark 3.1 changed packages of BuildLeft, BuildRight, BuildSide + */ +class GpuShuffledHashJoinMeta( + join: ShuffledHashJoinExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: ConfKeysAndIncompat) + extends SparkPlanMeta[ShuffledHashJoinExec](join, conf, parent, rule) { + val leftKeys: Seq[BaseExprMeta[_]] = + join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val rightKeys: Seq[BaseExprMeta[_]] = + join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val condition: Option[BaseExprMeta[_]] = + join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + + override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + + override def tagPlanForGpu(): Unit = { + GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) + } + + override def convertToGpu(): GpuExec = + GpuShuffledHashJoinExec( + leftKeys.map(_.convertToGpu()), + rightKeys.map(_.convertToGpu()), + join.joinType, + join.buildSide, + condition.map(_.convertToGpu()), + childPlans(0).convertIfNeeded(), + childPlans(1).convertIfNeeded()) +} + +case class GpuShuffledHashJoinExec( + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + joinType: JoinType, + buildSide: BuildSide, + condition: Option[Expression], + left: SparkPlan, + right: SparkPlan) extends BinaryExecNode with GpuHashJoin { + + override lazy val additionalMetrics: Map[String, SQLMetric] = Map( + "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "build side size"), + "buildTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "build time"), + "streamTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "stream time"), + "joinTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "join time"), + "joinOutputRows" -> SQLMetrics.createMetric(sparkContext, "join output rows"), + "filterTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "filter time")) + + override def requiredChildDistribution: Seq[Distribution] = + HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil + + override protected def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException( + "GpuShuffledHashJoin does not support the execute() code path.") + } + + override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match { + case BuildLeft => Seq(RequireSingleBatch, null) + case BuildRight => Seq(null, RequireSingleBatch) + } + + override def doExecuteColumnar() : 
RDD[ColumnarBatch] = { + val buildDataSize = longMetric("buildDataSize") + val numOutputRows = longMetric(NUM_OUTPUT_ROWS) + val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES) + val totalTime = longMetric(TOTAL_TIME) + val buildTime = longMetric("buildTime") + val streamTime = longMetric("streamTime") + val joinTime = longMetric("joinTime") + val filterTime = longMetric("filterTime") + val joinOutputRows = longMetric("joinOutputRows") + + val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) + + streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) { + (streamIter, buildIter) => { + var combinedSize = 0 + + val startTime = System.nanoTime() + val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification( + buildIter, localBuildOutput)) { buildBatch: ColumnarBatch => + withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys => + val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch)) + withResource(filterBuiltTableIfNeeded(combined)) { filtered => + combinedSize = + GpuColumnVector.extractColumns(filtered) + .map(_.getBase.getDeviceMemorySize).sum.toInt + GpuColumnVector.from(filtered) + } + } + } + + val delta = System.nanoTime() - startTime + buildTime += delta + totalTime += delta + buildDataSize += combinedSize + val context = TaskContext.get() + context.addTaskCompletionListener[Unit](_ => builtTable.close()) + + doJoin(builtTable, streamIter, boundCondition, + numOutputRows, joinOutputRows, numOutputBatches, + streamTime, joinTime, filterTime, totalTime) + } + } + } +} diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala new file mode 100644 index 00000000000..a5f063b7a72 --- /dev/null +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.nvidia.spark.rapids.shims.spark301db
+
+import com.nvidia.spark.rapids._
+
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter}
+import org.apache.spark.sql.execution.SortExec
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+
+/**
+ * HashJoin changed in Spark 3.1, so a shim is required
+ */
+class GpuSortMergeJoinMeta(
+    join: SortMergeJoinExec,
+    conf: RapidsConf,
+    parent: Option[RapidsMeta[_, _, _]],
+    rule: ConfKeysAndIncompat)
+  extends SparkPlanMeta[SortMergeJoinExec](join, conf, parent, rule) {
+
+  val leftKeys: Seq[BaseExprMeta[_]] =
+    join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
+  val rightKeys: Seq[BaseExprMeta[_]] =
+    join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
+  val condition: Option[BaseExprMeta[_]] = join.condition.map(
+    GpuOverrides.wrapExpr(_, conf, Some(this)))
+
+  override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition
+
+  override def tagPlanForGpu(): Unit = {
+    // Use conditions from Hash Join
+    GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition)
+
+    if (!conf.enableReplaceSortMergeJoin) {
+      willNotWorkOnGpu(s"Not replacing sort merge join with hash join, " +
+        s"see ${RapidsConf.ENABLE_REPLACE_SORTMERGEJOIN.key}")
+    }
+
+    // make sure this is the last check - if this is SortMergeJoin, the children can be Sorts
+    // and we want to validate they can run on GPU and remove them before replacing this with a
+    // ShuffleHashJoin
+    if (canThisBeReplaced) {
+      childPlans.foreach { plan =>
+        if (plan.wrapped.isInstanceOf[SortExec]) {
+          if (!plan.canThisBeReplaced) {
+            willNotWorkOnGpu(s"can't replace sortMergeJoin because one of the SortExecs " +
+              s"before it can't be replaced.")
+          } else {
+            plan.shouldBeRemoved("removing SortExec as part of replacing sortMergeJoin with " +
+              s"shuffleHashJoin")
+          }
+        }
+      }
+    }
+  }
+
+  override def convertToGpu(): GpuExec = {
+    val buildSide = if (canBuildRight(join.joinType)) {
+      BuildRight
+    } else if (canBuildLeft(join.joinType)) {
+      BuildLeft
+    } else {
+      throw new IllegalStateException(s"Cannot build either side for ${join.joinType} join")
+    }
+    GpuShuffledHashJoinExec(
+      leftKeys.map(_.convertToGpu()),
+      rightKeys.map(_.convertToGpu()),
+      join.joinType,
+      buildSide,
+      condition.map(_.convertToGpu()),
+      childPlans(0).convertIfNeeded(),
+      childPlans(1).convertIfNeeded())
+  }
+
+  /**
+   * Determine if this type of join supports using the right side of the join as the build side.
+   *
+   * These rules match those in Spark's ShuffleHashJoinExec.
+   */
+  private def canBuildRight(joinType: JoinType): Boolean = joinType match {
+    case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
+    case _ => false
+  }
+
+  /**
+   * Determine if this type of join supports using the left side of the join as the build side.
+   *
+   * These rules match those in Spark's ShuffleHashJoinExec, with the addition of support for
+   * full outer joins.
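+   * The full outer case is the addition: Spark 3.0.x's CPU ShuffledHashJoinExec cannot
+   * build a side for FULL OUTER joins, while the GPU hash join can build the left side
+   * because the underlying GPU join also produces the unmatched rows from both sides.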
+ */ + private def canBuildLeft(joinType: JoinType): Boolean = joinType match { + case _: InnerLike | RightOuter | FullOuter => true + case _ => false + } +} diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala new file mode 100644 index 00000000000..aceb7479a8e --- /dev/null +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.spark301db + +import com.nvidia.spark.rapids.GpuOverrides.isSupportedType +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.shims.spark301.Spark301Shims +import org.apache.spark.sql.rapids.shims.spark301db._ + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec +import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec} +import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec +import org.apache.spark.sql.rapids.GpuFileSourceScanExec +import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase} +import org.apache.spark.sql.types._ + +class Spark301dbShims extends Spark301Shims { + + override def getSparkShimVersion: ShimVersion = SparkShimServiceProvider.VERSION + + override def getGpuBroadcastNestedLoopJoinShim( + left: SparkPlan, + right: SparkPlan, + join: BroadcastNestedLoopJoinExec, + joinType: JoinType, + condition: Option[Expression], + targetSizeBytes: Long): GpuBroadcastNestedLoopJoinExecBase = { + GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes) + } + + override def getGpuBroadcastExchangeExec( + mode: BroadcastMode, + child: SparkPlan): GpuBroadcastExchangeExecBase = { + GpuBroadcastExchangeExec(mode, child) + } + + override def isGpuHashJoin(plan: SparkPlan): Boolean = { + plan match { + case _: GpuHashJoin => true + case p => false + } + } + + override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = { + plan match { + case _: GpuBroadcastHashJoinExec => true + case p => false + } + } + + override def isGpuShuffledHashJoin(plan: SparkPlan): Boolean = { + plan match { + case _: GpuShuffledHashJoinExec => true + case p => false + } + } + + override 
def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = { + Seq( + GpuOverrides.exec[FileSourceScanExec]( + "Reading data from files, often from Hive tables", + (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) { + def isSupported(t: DataType) = t match { + case MapType(StringType, StringType, _) => true + case _ => isSupportedType(t) + } + override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + // partition filters and data filters are not run on the GPU + override val childExprs: Seq[ExprMeta[_]] = Seq.empty + + override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this) + + override def convertToGpu(): GpuExec = { + val sparkSession = wrapped.relation.sparkSession + val options = wrapped.relation.options + val newRelation = HadoopFsRelation( + wrapped.relation.location, + wrapped.relation.partitionSchema, + wrapped.relation.dataSchema, + wrapped.relation.bucketSpec, + GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), + options)(sparkSession) + + GpuFileSourceScanExec( + newRelation, + wrapped.output, + wrapped.requiredSchema, + wrapped.partitionFilters, + wrapped.optionalBucketSet, + // TODO: Does Databricks have coalesced bucketing implemented? + None, + wrapped.dataFilters, + wrapped.tableIdentifier, + conf) + } + }), + GpuOverrides.exec[SortMergeJoinExec]( + "Sort merge join, replacing with shuffled hash join", + (join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)), + GpuOverrides.exec[BroadcastHashJoinExec]( + "Implementation of join using broadcast data", + (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)), + GpuOverrides.exec[ShuffledHashJoinExec]( + "Implementation of join using hashed shuffled data", + (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)) + ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap + } + + override def getBuildSide(join: HashJoin): GpuBuildSide = { + GpuJoinUtils.getGpuBuildSide(join.buildSide) + } + + override def getBuildSide(join: BroadcastNestedLoopJoinExec): GpuBuildSide = { + GpuJoinUtils.getGpuBuildSide(join.buildSide) + } + + // Databricks has a different version of FileStatus + override def getPartitionFileNames( + partitions: Seq[PartitionDirectory]): Seq[String] = { + val files = partitions.flatMap(partition => partition.files) + files.map(_.getPath.getName) + } + + override def getPartitionFileStatusSize(partitions: Seq[PartitionDirectory]): Long = { + partitions.map(_.files.map(_.getLen).sum).sum + } + + override def getPartitionedFiles( + partitions: Array[PartitionDirectory]): Array[PartitionedFile] = { + partitions.flatMap { p => + p.files.map { f => + PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) + } + } + } + + override def getPartitionSplitFiles( + partitions: Array[PartitionDirectory], + maxSplitBytes: Long, + relation: HadoopFsRelation): Array[PartitionedFile] = { + partitions.flatMap { partition => + partition.files.flatMap { file => + // getPath() is very expensive so we only want to call it once in this block: + val filePath = file.getPath + val isSplitable = relation.fileFormat.isSplitable( + relation.sparkSession, relation.options, filePath) + PartitionedFileUtil.splitFiles( + sparkSession = relation.sparkSession, + file = file, + filePath = filePath, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partition.values + ) + } + } + } + + override def getFileScanRDD( + sparkSession: SparkSession, + 
readFunction: (PartitionedFile) => Iterator[InternalRow], + filePartitions: Seq[FilePartition]): RDD[InternalRow] = { + new GpuFileScanRDD(sparkSession, readFunction, filePartitions) + } + + override def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition = { + FilePartition(index, files) + } + + override def copyFileSourceScanExec( + scanExec: GpuFileSourceScanExec, + queryUsesInputFile: Boolean): GpuFileSourceScanExec = { + scanExec.copy(queryUsesInputFile=queryUsesInputFile) + } + + override def getGpuShuffleExchangeExec( + outputPartitioning: Partitioning, + child: SparkPlan, + canChangeNumPartitions: Boolean): GpuShuffleExchangeExecBase = { + GpuShuffleExchangeExec(outputPartitioning, child, canChangeNumPartitions) + } + + override def getGpuShuffleExchangeExec( + queryStage: ShuffleQueryStageExec): GpuShuffleExchangeExecBase = { + queryStage.shuffle.asInstanceOf[GpuShuffleExchangeExecBase] + } +} diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/SparkShimServiceProvider.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/SparkShimServiceProvider.scala new file mode 100644 index 00000000000..be7508ffcf2 --- /dev/null +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/SparkShimServiceProvider.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.spark301db + +import com.nvidia.spark.rapids.{DatabricksShimVersion, SparkShims} + +object SparkShimServiceProvider { + val VERSION = DatabricksShimVersion(3, 0, 1) + val VERSIONNAMES = Seq(s"$VERSION") +} + +class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { + + def matchesVersion(version: String): Boolean = { + SparkShimServiceProvider.VERSIONNAMES.contains(version) + } + + def buildShim: SparkShims = { + new Spark301dbShims() + } +} diff --git a/shims/spark301db/src/main/scala/org/apache/spark/sql/rapids/shims/spark301db/GpuFileScanRDD.scala b/shims/spark301db/src/main/scala/org/apache/spark/sql/rapids/shims/spark301db/GpuFileScanRDD.scala new file mode 100644 index 00000000000..f7e4950f76e --- /dev/null +++ b/shims/spark301db/src/main/scala/org/apache/spark/sql/rapids/shims/spark301db/GpuFileScanRDD.scala @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.rapids.shims.spark301db + +import java.io.{FileNotFoundException, IOException} + +import org.apache.parquet.io.ParquetDecodingException + +import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.{InputFileBlockHolder, RDD} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.QueryExecutionException +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.NextIterator + +/** + * An RDD that scans a list of file partitions. + * Databricks has different versions of FileScanRDD so we copy the + * Apache Spark version. + */ +class GpuFileScanRDD( + @transient private val sparkSession: SparkSession, + readFunction: (PartitionedFile) => Iterator[InternalRow], + @transient val filePartitions: Seq[FilePartition]) + extends RDD[InternalRow](sparkSession.sparkContext, Nil) { + + private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles + private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles + + override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = { + val iterator = new Iterator[Object] with AutoCloseable { + private val inputMetrics = context.taskMetrics().inputMetrics + private val existingBytesRead = inputMetrics.bytesRead + + // Find a function that will return the FileSystem bytes read by this thread. Do this before + // apply readFunction, because it might read some bytes. + private val getBytesReadCallback = + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() + + // We get our input bytes from thread-local Hadoop FileSystem statistics. + // If we do a coalesce, however, we are likely to compute multiple partitions in the same + // task and in the same thread, in which case we need to avoid override values written by + // previous partitions (SPARK-13071). + private def incTaskInputMetricsBytesRead(): Unit = { + inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback()) + } + + private[this] val files = split.asInstanceOf[FilePartition].files.toIterator + private[this] var currentFile: PartitionedFile = null + private[this] var currentIterator: Iterator[Object] = null + + def hasNext: Boolean = { + // Kill the task in case it has been marked as killed. This logic is from + // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order + // to avoid performance overhead. + context.killTaskIfInterrupted() + (currentIterator != null && currentIterator.hasNext) || nextIterator() + } + def next(): Object = { + val nextElement = currentIterator.next() + // TODO: we should have a better separation of row based and batch based scan, so that we + // don't need to run this `if` for every record. 
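+        // A columnar batch counts all of its rows at once and refreshes bytesRead immediately;
+        // for row-based scans, bytesRead is only refreshed every
+        // UPDATE_INPUT_METRICS_INTERVAL_RECORDS records to keep the per-record overhead low.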
+ val preNumRecordsRead = inputMetrics.recordsRead + if (nextElement.isInstanceOf[ColumnarBatch]) { + incTaskInputMetricsBytesRead() + inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows()) + } else { + // too costly to update every record + if (inputMetrics.recordsRead % + SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) { + incTaskInputMetricsBytesRead() + } + inputMetrics.incRecordsRead(1) + } + nextElement + } + + private def readCurrentFile(): Iterator[InternalRow] = { + try { + readFunction(currentFile) + } catch { + case e: FileNotFoundException => + throw new FileNotFoundException( + e.getMessage + "\n" + + "It is possible the underlying files have been updated. " + + "You can explicitly invalidate the cache in Spark by " + + "running 'REFRESH TABLE tableName' command in SQL or " + + "by recreating the Dataset/DataFrame involved.") + } + } + + /** Advances to the next file. Returns true if a new non-empty iterator is available. */ + private def nextIterator(): Boolean = { + if (files.hasNext) { + currentFile = files.next() + logInfo(s"Reading File $currentFile") + // Sets InputFileBlockHolder for the file block's information + InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length) + + if (ignoreMissingFiles || ignoreCorruptFiles) { + currentIterator = new NextIterator[Object] { + // The readFunction may read some bytes before consuming the iterator, e.g., + // vectorized Parquet reader. Here we use lazy val to delay the creation of + // iterator so that we will throw exception in `getNext`. + private lazy val internalIter = readCurrentFile() + + override def getNext(): AnyRef = { + try { + if (internalIter.hasNext) { + internalIter.next() + } else { + finished = true + null + } + } catch { + case e: FileNotFoundException if ignoreMissingFiles => + logWarning(s"Skipped missing file: $currentFile", e) + finished = true + null + // Throw FileNotFoundException even if `ignoreCorruptFiles` is true + case e: FileNotFoundException if !ignoreMissingFiles => throw e + case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => + logWarning( + s"Skipped the rest of the content in the corrupted file: $currentFile", e) + finished = true + null + } + } + + override def close(): Unit = {} + } + } else { + currentIterator = readCurrentFile() + } + + try { + hasNext + } catch { + case e: SchemaColumnConvertNotSupportedException => + val message = "Parquet column cannot be converted in " + + s"file ${currentFile.filePath}. Column: ${e.getColumn}, " + + s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}" + throw new QueryExecutionException(message, e) + case e: ParquetDecodingException => + if (e.getCause.isInstanceOf[SparkUpgradeException]) { + throw e.getCause + } else if (e.getMessage.contains("Can not read value at")) { + val message = "Encounter error while reading parquet files. " + + "One possible cause: Parquet column cannot be converted in the " + + "corresponding files. Details: " + throw new QueryExecutionException(message, e) + } + throw e + } + } else { + currentFile = null + InputFileBlockHolder.unset() + false + } + } + + override def close(): Unit = { + incTaskInputMetricsBytesRead() + InputFileBlockHolder.unset() + } + } + + // Register an on-task-completion callback to close the input stream. + context.addTaskCompletionListener[Unit](_ => iterator.close()) + + iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack. 
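+    // The cast above compiles only because of type erasure: next() may actually return
+    // ColumnarBatch objects, and columnar-aware parent operators read those batches back
+    // out of this "row" iterator.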
+ } + + override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray + + override protected def getPreferredLocations(split: RDDPartition): Seq[String] = { + split.asInstanceOf[FilePartition].preferredLocations() + } +} +
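To see how the new shim gets selected, here is a minimal sketch that exercises only the version matching added in this patch. It is illustrative, not part of the change set: everything except the ShimMatchCheck object name and the "3.0.0" probe string comes from the code above, and no assumption is made about the exact string DatabricksShimVersion(3, 0, 1) renders to.

package com.nvidia.spark.rapids.shims.spark301db

// Minimal sketch (illustrative only): check that the spark301db shim provider
// recognizes its own Databricks 3.0.1 version string and rejects a plain
// Apache Spark 3.0.0 string. Only SparkShimServiceProvider comes from the patch;
// the ShimMatchCheck object name is hypothetical.
object ShimMatchCheck {
  def main(args: Array[String]): Unit = {
    val provider = new SparkShimServiceProvider()
    // VERSIONNAMES is Seq(DatabricksShimVersion(3, 0, 1).toString), so matching the
    // provider's own VERSION string must succeed whatever its exact format is.
    val dbVersionString = SparkShimServiceProvider.VERSION.toString
    assert(provider.matchesVersion(dbVersionString))
    // A plain Apache Spark 3.0.0 version string should not select this shim.
    assert(!provider.matchesVersion("3.0.0"))
    println(s"spark301db shim matched: $dbVersionString")
  }
}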