From a614dbff529a0b12748de77e10ed53aa5a90900b Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Tue, 17 Nov 2020 20:33:15 -0600
Subject: [PATCH] Remove support for databricks 7.0 runtime - shim spark300db (#1145)

* Remove databricks 3.0.0 shim layer

Signed-off-by: Thomas Graves

* Update docs

Signed-off-by: Thomas Graves
---
 docs/FAQ.md | 1 -
 .../get-started/getting-started-databricks.md | 6 +-
 integration_tests/pom.xml | 4 +-
 jenkins/Jenkinsfile-blossom.premerge | 2 +-
 jenkins/Jenkinsfile.databricksnightly | 116 -------
 jenkins/Jenkinsfile.databricksrelease | 110 ------
 pom.xml | 2 +-
 shims/aggregator/pom.xml | 17 -
 shims/pom.xml | 6 -
 shims/spark300db/pom.xml | 121 -------
 ...idia.spark.rapids.SparkShimServiceProvider | 1 -
 .../spark300db/GpuBroadcastExchangeExec.scala | 29 --
 .../spark300db/GpuBroadcastHashJoinExec.scala | 162 ---------
 .../GpuBroadcastNestedLoopJoinExec.scala | 43 ---
 .../rapids/shims/spark300db/GpuHashJoin.scala | 328 ------------------
 .../spark300db/GpuShuffledHashJoinExec.scala | 151 --------
 .../spark300db/GpuSortMergeJoinExec.scala | 110 ------
 .../spark300db/GpuWindowInPandasExec.scala | 68 ----
 .../shims/spark300db/Spark300dbShims.scala | 223 ------------
 .../spark300db/SparkShimServiceProvider.scala | 35 --
 .../shims/spark300db/GpuFileScanRDD.scala | 196 -----------
 tests/pom.xml | 4 +-
 22 files changed, 9 insertions(+), 1726 deletions(-)
 delete mode 100644 jenkins/Jenkinsfile.databricksnightly
 delete mode 100644 jenkins/Jenkinsfile.databricksrelease
 delete mode 100644 shims/spark300db/pom.xml
 delete mode 100644 shims/spark300db/src/main/resources/META-INF/services/com.nvidia.spark.rapids.SparkShimServiceProvider
 delete mode 100644 shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastExchangeExec.scala
 delete mode 100644 shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala
 delete mode 100644 shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastNestedLoopJoinExec.scala
 delete mode 100644 shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala
 delete mode 100644 shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala
 delete mode 100644 shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuSortMergeJoinExec.scala
 delete mode 100644 shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuWindowInPandasExec.scala
 delete mode 100644 shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala
 delete mode 100644 shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/SparkShimServiceProvider.scala
 delete mode 100644 shims/spark300db/src/main/scala/org/apache/spark/sql/rapids/shims/spark300db/GpuFileScanRDD.scala

diff --git a/docs/FAQ.md b/docs/FAQ.md
index a25c44b6546..567e72232f3 100644
--- a/docs/FAQ.md
+++ b/docs/FAQ.md
@@ -28,7 +28,6 @@ top of these changes and release updates as quickly as possible.
 
 The RAPIDS Accelerator for Apache Spark officially supports
 [Apache Spark](get-started/getting-started-on-prem.md),
-[Databricks Runtime 7.0](get-started/getting-started-databricks.md)
 [Databricks Runtime 7.3](get-started/getting-started-databricks.md)
 and [Google Cloud Dataproc](get-started/getting-started-gcp.md).
 Most distributions based off of Apache Spark 3.0.0 should work, but because the plugin replaces
diff --git a/docs/get-started/getting-started-databricks.md b/docs/get-started/getting-started-databricks.md
index b7a2b8b6a69..99da35e4861 100644
--- a/docs/get-started/getting-started-databricks.md
+++ b/docs/get-started/getting-started-databricks.md
@@ -9,9 +9,9 @@ parent: Getting-Started
 This guide will run through how to set up the RAPIDS Accelerator for Apache Spark 3.0 on Databricks.
 At the end of this guide, the reader will be able to run a sample Apache Spark application that runs on NVIDIA GPUs on Databricks.
 
 ## Prerequisites
- * Apache Spark 3.0 running in DataBricks Runtime 7.0 ML with GPU or Runtime 7.3 ML with GPU
- * AWS: 7.0 ML (includes Apache Spark 3.0.0, GPU, Scala 2.12) or 7.3 LTS ML (includes Apache Spark 3.0.1, GPU, Scala 2.12)
- * Azure: 7.0 ML (GPU, Scala 2.12, Spark 3.0.0) or 7.3 LTS ML (GPU, Scala 2.12, Spark 3.0.1)
+ * Apache Spark 3.0 running in DataBricks Runtime 7.3 ML with GPU
+ * AWS:7.3 LTS ML (includes Apache Spark 3.0.1, GPU, Scala 2.12)
+ * Azure: 7.3 LTS ML (GPU, Scala 2.12, Spark 3.0.1)
 
 The number of GPUs per node dictates the number of Spark executors that can run in that node.
diff --git a/integration_tests/pom.xml b/integration_tests/pom.xml
index 87c8ed96812..4e53108532c 100644
--- a/integration_tests/pom.xml
+++ b/integration_tests/pom.xml
@@ -33,9 +33,9 @@
- spark300dbtests
+ spark301dbtests
- 3.0.0-databricks
+ 3.0.1-databricks
diff --git a/jenkins/Jenkinsfile-blossom.premerge b/jenkins/Jenkinsfile-blossom.premerge
index f3081357b57..357f76a8735 100644
--- a/jenkins/Jenkinsfile-blossom.premerge
+++ b/jenkins/Jenkinsfile-blossom.premerge
@@ -223,7 +223,7 @@ pipeline {
 step([$class : 'JacocoPublisher',
 execPattern : '**/target/jacoco.exec',
 classPattern : 'target/jacoco_classes/',
- sourcePattern : 'shuffle-plugin/src/main/scala/,udf-compiler/src/main/scala/,sql-plugin/src/main/java/,sql-plugin/src/main/scala/,shims/spark310/src/main/scala/,shims/spark300/src/main/scala/,shims/spark300db/src/main/scala/,shims/spark301/src/main/scala/,shims/spark302/src/main/scala/',
+ sourcePattern : 'shuffle-plugin/src/main/scala/,udf-compiler/src/main/scala/,sql-plugin/src/main/java/,sql-plugin/src/main/scala/,shims/spark310/src/main/scala/,shims/spark300/src/main/scala/,shims/spark301db/src/main/scala/,shims/spark301/src/main/scala/,shims/spark302/src/main/scala/',
 sourceInclusionPattern: '**/*.java,**/*.scala'
 ])
 }
diff --git a/jenkins/Jenkinsfile.databricksnightly b/jenkins/Jenkinsfile.databricksnightly
deleted file mode 100644
index e19340c5f8b..00000000000
--- a/jenkins/Jenkinsfile.databricksnightly
+++ /dev/null
@@ -1,116 +0,0 @@
-#!/usr/local/env groovy
-/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -/** -* -* Jenkinsfile for building rapids-plugin on Databricks -* -*/ -@Library(['shared-libs', 'spark-jenkins-shared-lib']) _ - -def urmUrl="https://${ArtifactoryConstants.ARTIFACTORY_NAME}/artifactory/sw-spark-maven" - -pipeline { - agent { - docker { - label 'docker-gpu' - image "${ArtifactoryConstants.ARTIFACTORY_NAME}/sw-spark-docker/plugin:dev-ubuntu16-cuda10.1" - args '--runtime=nvidia -v ${HOME}/.m2:${HOME}/.m2:rw \ - -v ${HOME}/.zinc:${HOME}/.zinc:rw' - } - } - - options { - ansiColor('xterm') - // timeout doesn't seem to work with environment variables so make sure to update below - // IDLE_TIMEOUT config as well - timeout(time: 180, unit: 'MINUTES') - buildDiscarder(logRotator(numToKeepStr: '10')) - } - - parameters { - choice(name: 'DEPLOY_TO', choices: ['Urm', 'Local'], - description: 'Where to deploy artifacts to') - string(name: 'RUNTIME', - defaultValue: '7.0.x-gpu-ml-scala2.12', description: 'databricks runtime') - string(name: 'PUBLIC_KEY', - defaultValue: '\"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDB+ValakyoKn7w+iBRoAi1KlLVH4yVmRXhLCZs1qUECBAhbck2o8Lgjp5wJ+epdT3+EAP2+t/zlK1mU9tTylidapR4fZuIk9ApoQSLOUEXcUtHkPpZulIoGAq78HoyiEs1sKovc6ULvpymjdnQ3ogCZlTlP9uqmL2E4kbtrNCNL0SVj/w10AqzrJ5lqQgO5dIdDRMHW2sv88JI1VLlfiSsofa9RdI7hDRuCnfZ0+dv2URJGGzGt2BkdEmk9t5F1BMpuXvZ8HzOYdACzw0U+THBOk9d4CACUYMyO1XqlXwoYweNKRnigXDCRaTWGFBzTkugRuW/BZBccTR1ON430uRB svcngcc@nvidia.com\"', description: 'public key') - string(name: 'REF', defaultValue: 'branch-0.3', description: 'Commit to build') - string(name: 'BASE_SPARK_VERSION', - defaultValue: '3.0.0', description: 'Databricks base Spark version') - string(name: 'BUILD_PROFILES', - defaultValue: 'databricks,!snapshot-shims', description: 'the mvn build profiles to use when building Databricks') - } - - environment { - IDLE_TIMEOUT = 180 - JENKINS_ROOT = 'jenkins' - MVN_URM_MIRROR='-s jenkins/settings.xml -P mirror-apache-to-urm' - LIBCUDF_KERNEL_CACHE_PATH='/tmp' - URM_CREDS = credentials("svcngcc_artifactory") - DATABRICKS_TOKEN = credentials("SPARK_DATABRICKS_TOKEN") - URM_URL = "${urmUrl}" - } - - triggers { - cron('H 5 * * *') - } - - stages { - stage('Ubuntu16 CUDA10.1') { - steps { - script { - sshagent(credentials : ['svcngcc_pubpriv']) { - sh "rm -rf spark-rapids-ci.tgz" - sh "tar -zcvf spark-rapids-ci.tgz *" - env.CLUSTERID = sh ( - script: "python3.6 ./jenkins/databricks/create.py -t $DATABRICKS_TOKEN -k $PUBLIC_KEY -r $RUNTIME -i $IDLE_TIMEOUT -n CI-GPU-databricks-${BASE_SPARK_VERSION}", - returnStdout: true - ).trim() - sh "python3.6 ./jenkins/databricks/run-tests.py -c ${env.CLUSTERID} -z ./spark-rapids-ci.tgz -t $DATABRICKS_TOKEN -p /home/svcngcc/.ssh/id_rsa -l ./jenkins/databricks/build.sh -v $BASE_SPARK_VERSION -b $BUILD_PROFILES" - sh "./jenkins/databricks/deploy.sh" - } - } - } - } - } // end of stages - post { - always { - script { - sh "python3.6 ./jenkins/databricks/shutdown.py -c ${env.CLUSTERID} -t $DATABRICKS_TOKEN -d || true" - if (currentBuild.currentResult == "SUCCESS") { - slack("#swrapids-spark-cicd", "Success", color: "#33CC33") - } else { - slack("#swrapids-spark-cicd", "Failed", color: "#FF0000") - } - } - } - } -} // end of pipeline - -void slack(Map params = [:], String channel, String message) { - Map defaultParams = [ - color: "#000000", - baseUrl: "${SparkConstants.SLACK_API_ENDPOINT}", - tokenCredentialId: "slack_token" - ] - - params["channel"] = channel - params["message"] = "${BUILD_URL}\n" + message - - slackSend(defaultParams << params) -} diff --git a/jenkins/Jenkinsfile.databricksrelease 
b/jenkins/Jenkinsfile.databricksrelease deleted file mode 100644 index 332dde39a84..00000000000 --- a/jenkins/Jenkinsfile.databricksrelease +++ /dev/null @@ -1,110 +0,0 @@ -#!/usr/local/env groovy -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** -* -* Jenkinsfile for building rapids-plugin on Databricks -* -*/ -@Library(['shared-libs', 'spark-jenkins-shared-lib']) _ - -def urmUrl="https://${ArtifactoryConstants.ARTIFACTORY_NAME}/artifactory/sw-spark-maven" - -pipeline { - agent { - docker { - label 'docker-gpu' - image "${ArtifactoryConstants.ARTIFACTORY_NAME}/sw-spark-docker/plugin:dev-ubuntu16-cuda10.1" - args '--runtime=nvidia -v ${HOME}/.m2:${HOME}/.m2:rw \ - -v ${HOME}/.zinc:${HOME}/.zinc:rw' - } - } - - options { - ansiColor('xterm') - timeout(time: 180, unit: 'MINUTES') - buildDiscarder(logRotator(numToKeepStr: '10')) - } - - parameters { - choice(name: 'DEPLOY_TO', choices: ['Local', 'Urm'], - description: 'Where to deploy artifacts to') - string(name: 'DATABRICKS_VERSION', - defaultValue: '0.2.0', description: 'Version to set') - string(name: 'CUDF_VERSION', - defaultValue: '0.15', description: 'Cudf version to use') - string(name: 'CUDA_VERSION', - defaultValue: 'cuda10-1', description: 'cuda version to use') - string(name: 'CLUSTER_ID', - defaultValue: '0828-071715-knack867', description: 'databricks cluster id') - string(name: 'REF', defaultValue: 'main', description: 'Commit to build') - } - - environment { - JENKINS_ROOT = 'jenkins' - MVN_URM_MIRROR='-s jenkins/settings.xml -P mirror-apache-to-urm' - LIBCUDF_KERNEL_CACHE_PATH='/tmp' - URM_CREDS = credentials("svcngcc_artifactory") - DATABRICKS_TOKEN = credentials("TIM_DATABRICKS_TOKEN") - DATABRICKS_SECRET = credentials("TIM_DATABRICKS_SECRET") - SCALA_VERSION = '2.12' - SPARK_VERSION = '3.0.0-databricks' - CI_RAPIDS_JAR = 'rapids-4-spark_2.12-0.1-SNAPSHOT-ci.jar' - CI_CUDF_JAR = 'cudf-0.14-cuda10-1.jar' - URM_URL = "${urmUrl}" - } - - stages { - stage('Ubuntu16 CUDA10.1') { - steps { - script { - sshagent(credentials : ['svcngcc_pubpriv']) { - sh "rm -rf spark-rapids-ci.tgz" - sh "tar -zvcf spark-rapids-ci.tgz *" - sh "python3.6 ./jenkins/databricks/run-tests.py -c $CLUSTER_ID -z ./spark-rapids-ci.tgz -t $DATABRICKS_TOKEN -p $DATABRICKS_SECRET -l ./jenkins/databricks/build.sh -j $CI_RAPIDS_JAR -b $DATABRICKS_VERSION -k $SPARK_VERSION -a $SCALA_VERSION -f $CUDF_VERSION -u $CUDA_VERSION -m $CI_CUDF_JAR" - sh "./jenkins/databricks/deploy.sh" - } - } - } - } - } // end of stages - post { - always { - script { - sh "python3.6 ./jenkins/databricks/shutdown.py -c $CLUSTER_ID -t $DATABRICKS_TOKEN || true" - if (currentBuild.currentResult == "SUCCESS") { - slack("#swrapids-spark-cicd", "Success", color: "#33CC33") - } else { - slack("#swrapids-spark-cicd", "Failed", color: "#FF0000") - } - } - } - } -} // end of pipeline - -void slack(Map params = [:], String channel, String message) { - Map defaultParams = [ - color: "#000000", - baseUrl: 
"${SparkConstants.SLACK_API_ENDPOINT}", - tokenCredentialId: "slack_token" - ] - - params["channel"] = channel - params["message"] = "${BUILD_URL}\n" + message - - slackSend(defaultParams << params) -} diff --git a/pom.xml b/pom.xml index cc80aa87086..2fac2a74ca2 100644 --- a/pom.xml +++ b/pom.xml @@ -125,7 +125,7 @@ - databricks + databricks301 true diff --git a/shims/aggregator/pom.xml b/shims/aggregator/pom.xml index 757e34b8127..c89fca50aa6 100644 --- a/shims/aggregator/pom.xml +++ b/shims/aggregator/pom.xml @@ -33,17 +33,6 @@ 0.3.0-SNAPSHOT - - databricks - - - com.nvidia - rapids-4-spark-shims-spark300-databricks_${scala.binary.version} - ${project.version} - compile - - - databricks301 @@ -59,12 +48,6 @@ include-databricks - - com.nvidia - rapids-4-spark-shims-spark300-databricks_${scala.binary.version} - ${project.version} - compile - com.nvidia rapids-4-spark-shims-spark301-databricks_${scala.binary.version} diff --git a/shims/pom.xml b/shims/pom.xml index 62dea5686e6..51079c11e2f 100644 --- a/shims/pom.xml +++ b/shims/pom.xml @@ -33,12 +33,6 @@ 0.3.0-SNAPSHOT - - databricks - - spark300db - - databricks301 diff --git a/shims/spark300db/pom.xml b/shims/spark300db/pom.xml deleted file mode 100644 index 341b18dc138..00000000000 --- a/shims/spark300db/pom.xml +++ /dev/null @@ -1,121 +0,0 @@ - - - - 4.0.0 - - - com.nvidia - rapids-4-spark-shims_2.12 - 0.3.0-SNAPSHOT - ../pom.xml - - com.nvidia - rapids-4-spark-shims-spark300-databricks_2.12 - RAPIDS Accelerator for Apache Spark SQL Plugin Spark 3.0.0 Databricks Shim - The RAPIDS SQL plugin for Apache Spark 3.0.0 Databricks Shim - 0.3.0-SNAPSHOT - - - - - - - maven-antrun-plugin - - - dependency - generate-resources - - - - - - - - - - - - - run - - - - - - org.scalastyle - scalastyle-maven-plugin - - - - - - - ${project.build.directory}/extra-resources - - - src/main/resources - - - - - - 1.10.1 - 3.0.0-databricks - - - - - com.nvidia - rapids-4-spark-shims-spark300_${scala.binary.version} - ${project.version} - - - org.apache.spark - spark-sql_${scala.binary.version} - ${spark30db.version} - provided - - - org.apache.spark - spark-catalyst_${scala.binary.version} - ${spark30db.version} - provided - - - org.apache.spark - spark-core_${scala.binary.version} - ${spark30db.version} - provided - - - org.apache.spark - spark-annotation_${scala.binary.version} - ${spark30db.version} - provided - - - org.apache.parquet - parquet-column - ${parquet.version} - provided - - - - diff --git a/shims/spark300db/src/main/resources/META-INF/services/com.nvidia.spark.rapids.SparkShimServiceProvider b/shims/spark300db/src/main/resources/META-INF/services/com.nvidia.spark.rapids.SparkShimServiceProvider deleted file mode 100644 index e8d964fc951..00000000000 --- a/shims/spark300db/src/main/resources/META-INF/services/com.nvidia.spark.rapids.SparkShimServiceProvider +++ /dev/null @@ -1 +0,0 @@ -com.nvidia.spark.rapids.shims.spark300db.SparkShimServiceProvider diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastExchangeExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastExchangeExec.scala deleted file mode 100644 index e43a5257c26..00000000000 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastExchangeExec.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.nvidia.spark.rapids.shims.spark300db - -import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuBroadcastExchangeExecBase} - -case class GpuBroadcastExchangeExec( - mode: BroadcastMode, - child: SparkPlan) extends GpuBroadcastExchangeExecBase(mode, child) { - - override def doCanonicalize(): SparkPlan = { - GpuBroadcastExchangeExec(mode.canonicalized, child.canonicalized) - } -} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala deleted file mode 100644 index f55315025a7..00000000000 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.nvidia.spark.rapids.shims.spark300db - -import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.GpuMetricNames._ - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} -import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, UnspecifiedDistribution} -import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} -import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec -import org.apache.spark.sql.execution.exchange.ReusedExchangeExec -import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode} -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.rapids.execution.SerializeConcatHostBuffersDeserializeBatch -import org.apache.spark.sql.vectorized.ColumnarBatch - -/** - * Spark 3.1 changed packages of BuildLeft, BuildRight, BuildSide - */ -class GpuBroadcastHashJoinMeta( - join: BroadcastHashJoinExec, - conf: RapidsConf, - parent: Option[RapidsMeta[_, _, _]], - rule: ConfKeysAndIncompat) - extends SparkPlanMeta[BroadcastHashJoinExec](join, conf, parent, rule) { - - val leftKeys: Seq[BaseExprMeta[_]] = - join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - val rightKeys: Seq[BaseExprMeta[_]] = - join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - val condition: Option[BaseExprMeta[_]] = - join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - - override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition - - override def tagPlanForGpu(): Unit = { - GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) - - val buildSide = join.buildSide match { - case BuildLeft => childPlans(0) - case BuildRight => childPlans(1) - } - - if (!buildSide.canThisBeReplaced) { - willNotWorkOnGpu("the broadcast for this join must be on the GPU too") - } - - if (!canThisBeReplaced) { - buildSide.willNotWorkOnGpu("the BroadcastHashJoin this feeds is not on the GPU") - } - } - - override def convertToGpu(): GpuExec = { - val left = childPlans(0).convertIfNeeded() - val right = childPlans(1).convertIfNeeded() - // The broadcast part of this must be a BroadcastExchangeExec - val buildSide = join.buildSide match { - case BuildLeft => left - case BuildRight => right - } - if (!buildSide.isInstanceOf[GpuBroadcastExchangeExec]) { - throw new IllegalStateException("the broadcast must be on the GPU too") - } - GpuBroadcastHashJoinExec( - leftKeys.map(_.convertToGpu()), - rightKeys.map(_.convertToGpu()), - join.joinType, join.buildSide, - condition.map(_.convertToGpu()), - left, right) - } -} - -case class GpuBroadcastHashJoinExec( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - joinType: JoinType, - buildSide: BuildSide, - condition: Option[Expression], - left: SparkPlan, - right: SparkPlan) extends BinaryExecNode with GpuHashJoin { - - override lazy val additionalMetrics: Map[String, SQLMetric] = Map( - "joinOutputRows" -> SQLMetrics.createMetric(sparkContext, "join output rows"), - "streamTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "stream time"), - "joinTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "join time"), - "filterTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "filter time")) - - override def 
requiredChildDistribution: Seq[Distribution] = { - val mode = HashedRelationBroadcastMode(buildKeys) - buildSide match { - case BuildLeft => - BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil - case BuildRight => - UnspecifiedDistribution :: BroadcastDistribution(mode) :: Nil - } - } - - def broadcastExchange: GpuBroadcastExchangeExec = buildPlan match { - case BroadcastQueryStageExec(_, gpu: GpuBroadcastExchangeExec, _) => gpu - case BroadcastQueryStageExec(_, reused: ReusedExchangeExec, _) => - reused.child.asInstanceOf[GpuBroadcastExchangeExec] - case gpu: GpuBroadcastExchangeExec => gpu - case reused: ReusedExchangeExec => reused.child.asInstanceOf[GpuBroadcastExchangeExec] - } - - override def doExecute(): RDD[InternalRow] = - throw new IllegalStateException("GpuBroadcastHashJoin does not support row-based processing") - - override def doExecuteColumnar() : RDD[ColumnarBatch] = { - val numOutputRows = longMetric(NUM_OUTPUT_ROWS) - val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES) - val totalTime = longMetric(TOTAL_TIME) - val streamTime = longMetric("streamTime") - val joinTime = longMetric("joinTime") - val filterTime = longMetric("filterTime") - val joinOutputRows = longMetric("joinOutputRows") - - val broadcastRelation = broadcastExchange - .executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]() - - val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) - - lazy val builtTable = { - val ret = withResource( - GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => - val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) - val filtered = filterBuiltTableIfNeeded(combined) - withResource(filtered) { filtered => - GpuColumnVector.from(filtered) - } - } - - // Don't warn for a leak, because we cannot control when we are done with this - (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) - ret - } - - val rdd = streamedPlan.executeColumnar() - rdd.mapPartitions(it => - doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows, - numOutputBatches, streamTime, joinTime, filterTime, totalTime)) - } -} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastNestedLoopJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastNestedLoopJoinExec.scala deleted file mode 100644 index a687a770ce4..00000000000 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastNestedLoopJoinExec.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.nvidia.spark.rapids.shims.spark300db - -import com.nvidia.spark.rapids.GpuBuildSide - -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec -import org.apache.spark.sql.rapids.execution._ - -/** - * Spark 3.1 changed packages of BuildLeft, BuildRight, BuildSide - */ -case class GpuBroadcastNestedLoopJoinExec( - left: SparkPlan, - right: SparkPlan, - join: BroadcastNestedLoopJoinExec, - joinType: JoinType, - condition: Option[Expression], - targetSizeBytes: Long) - extends GpuBroadcastNestedLoopJoinExecBase(left, right, join, joinType, condition, - targetSizeBytes) { - - def getGpuBuildSide: GpuBuildSide = { - GpuJoinUtils.getGpuBuildSide(join.buildSide) - } -} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala deleted file mode 100644 index b703e3fce95..00000000000 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.nvidia.spark.rapids.shims.spark300db - -import ai.rapids.cudf.{NvtxColor, Table} -import com.nvidia.spark.rapids._ - -import org.apache.spark.TaskContext -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} -import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} -import org.apache.spark.sql.execution.joins.HashJoin -import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.rapids.GpuAnd -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} - -object GpuHashJoin { - def tagJoin( - meta: RapidsMeta[_, _, _], - joinType: JoinType, - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - condition: Option[Expression]): Unit = joinType match { - case _: InnerLike => - case FullOuter | RightOuter | LeftOuter | LeftSemi | LeftAnti => - if (condition.isDefined) { - meta.willNotWorkOnGpu(s"$joinType joins currently do not support conditions") - } - case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported") - } - - def incRefCount(cb: ColumnarBatch): ColumnarBatch = { - GpuColumnVector.extractBases(cb).foreach(_.incRefCount()) - cb - } -} - -trait GpuHashJoin extends GpuExec with HashJoin { - - override def output: Seq[Attribute] = { - joinType match { - case _: InnerLike => - left.output ++ right.output - case LeftOuter => - left.output ++ right.output.map(_.withNullability(true)) - case RightOuter => - left.output.map(_.withNullability(true)) ++ right.output - case j: ExistenceJoin => - left.output :+ j.exists - case LeftExistence(_) => - left.output - case FullOuter => - left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) - case x => - throw new IllegalArgumentException(s"GpuHashJoin should not take $x as the JoinType") - } - } - - // If we have a single batch streamed in then we will produce a single batch of output - // otherwise it can get smaller or bigger, we just don't know. 
When we support out of - // core joins this will change - override def outputBatching: CoalesceGoal = { - val batching = buildSide match { - case BuildLeft => GpuExec.outputBatching(right) - case BuildRight => GpuExec.outputBatching(left) - } - if (batching == RequireSingleBatch) { - RequireSingleBatch - } else { - null - } - } - - protected lazy val (gpuBuildKeys, gpuStreamedKeys) = { - require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType), - "Join keys from two sides should have same types") - val lkeys = GpuBindReferences.bindGpuReferences(leftKeys, left.output) - val rkeys = GpuBindReferences.bindGpuReferences(rightKeys, right.output) - buildSide match { - case BuildLeft => (lkeys, rkeys) - case BuildRight => (rkeys, lkeys) - } - } - - /** - * Place the columns in left and the columns in right into a single ColumnarBatch - */ - def combine(left: ColumnarBatch, right: ColumnarBatch): ColumnarBatch = { - val l = GpuColumnVector.extractColumns(left) - val r = GpuColumnVector.extractColumns(right) - val c = l ++ r - new ColumnarBatch(c.asInstanceOf[Array[ColumnVector]], left.numRows()) - } - - // TODO eventually dedupe the keys - lazy val joinKeyIndices: Range = gpuBuildKeys.indices - - val localBuildOutput: Seq[Attribute] = buildPlan.output - // The first columns are the ones we joined on and need to remove - lazy val joinIndices: Seq[Int] = joinType match { - case RightOuter => - // The left table and right table are switched in the output - // because we don't support a right join, only left - val numRight = right.output.length - val numLeft = left.output.length - val joinLength = joinKeyIndices.length - def remap(index: Int): Int = { - if (index < numLeft) { - // part of the left table, but is on the right side of the tmp output - index + joinLength + numRight - } else { - // part of the right table, but is on the left side of the tmp output - index + joinLength - numLeft - } - } - output.indices.map (remap) - case _ => - val joinLength = joinKeyIndices.length - output.indices.map (v => v + joinLength) - } - - // Spark adds in rules to filter out nulls for some types of joins, but it does not - // guarantee 100% that all nulls will be filtered out by the time they get to - // this point, but because of https://github.com/rapidsai/cudf/issues/6052 - // we need to filter out the nulls ourselves until it is fixed. 
- // InnerLike | LeftSemi => - // filter left and right keys - // RightOuter => - // filter left keys - // LeftOuter | LeftAnti => - // filter right keys - - private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = { - val builtAnyNullable = gpuBuildKeys.exists(_.nullable) - (joinType, buildSide) match { - case (_: InnerLike | LeftSemi, _) => builtAnyNullable - case (RightOuter, BuildLeft) => builtAnyNullable - case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable - case _ => false - } - } - - private[this] lazy val shouldFilterStreamTableForNulls: Boolean = { - val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable) - (joinType, buildSide) match { - case (_: InnerLike | LeftSemi, _) => streamedAnyNullable - case (RightOuter, BuildRight) => streamedAnyNullable - case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable - case _ => false - } - } - - private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression = - exprs.zipWithIndex.map { kv => - GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable)) - }.reduce(GpuAnd) - - private[this] lazy val builtTableNullFilterExpression: GpuExpression = - mkNullFilterExpr(gpuBuildKeys) - - private[this] lazy val streamedTableNullFilterExpression: GpuExpression = - mkNullFilterExpr(gpuStreamedKeys) - - /** - * Filter the builtBatch if needed. builtBatch will be closed. - */ - def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch = - if (shouldFilterBuiltTableForNulls) { - GpuFilter(builtBatch, builtTableNullFilterExpression) - } else { - builtBatch - } - - private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch = - if (shouldFilterStreamTableForNulls) { - GpuFilter(streamedBatch, streamedTableNullFilterExpression) - } else { - streamedBatch - } - - def doJoin(builtTable: Table, - stream: Iterator[ColumnarBatch], - boundCondition: Option[Expression], - numOutputRows: SQLMetric, - joinOutputRows: SQLMetric, - numOutputBatches: SQLMetric, - streamTime: SQLMetric, - joinTime: SQLMetric, - filterTime: SQLMetric, - totalTime: SQLMetric): Iterator[ColumnarBatch] = { - new Iterator[ColumnarBatch] { - import scala.collection.JavaConverters._ - var nextCb: Option[ColumnarBatch] = None - var first: Boolean = true - - TaskContext.get().addTaskCompletionListener[Unit](_ => closeCb()) - - def closeCb(): Unit = { - nextCb.foreach(_.close()) - nextCb = None - } - - override def hasNext: Boolean = { - var mayContinue = true - while (nextCb.isEmpty && mayContinue) { - val startTime = System.nanoTime() - if (stream.hasNext) { - val cb = stream.next() - streamTime += (System.nanoTime() - startTime) - nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, - numOutputBatches, joinTime, filterTime) - totalTime += (System.nanoTime() - startTime) - } else if (first) { - // We have to at least try one in some cases - val cb = GpuColumnVector.emptyBatch(streamedPlan.output.asJava) - streamTime += (System.nanoTime() - startTime) - nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, - numOutputBatches, joinTime, filterTime) - totalTime += (System.nanoTime() - startTime) - } else { - mayContinue = false - } - first = false - } - nextCb.isDefined - } - - override def next(): ColumnarBatch = { - if (!hasNext) { - throw new NoSuchElementException() - } - val ret = nextCb.get - nextCb = None - ret - } - } - } - - private[this] def doJoin(builtTable: Table, - streamedBatch: ColumnarBatch, - boundCondition: Option[Expression], - 
numOutputRows: SQLMetric, - numJoinOutputRows: SQLMetric, - numOutputBatches: SQLMetric, - joinTime: SQLMetric, - filterTime: SQLMetric): Option[ColumnarBatch] = { - - val combined = withResource(streamedBatch) { streamedBatch => - withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) { - streamedKeysBatch => - GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch)) - } - } - val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb => - GpuColumnVector.from(cb) - } - - val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime) - val joined = try { - buildSide match { - case BuildLeft => doJoinLeftRight(builtTable, streamedTable) - case BuildRight => doJoinLeftRight(streamedTable, builtTable) - } - } finally { - streamedTable.close() - nvtxRange.close() - } - - numJoinOutputRows += joined.numRows() - - val tmp = if (boundCondition.isDefined) { - GpuFilter(joined, boundCondition.get, numOutputRows, numOutputBatches, filterTime) - } else { - numOutputRows += joined.numRows() - numOutputBatches += 1 - joined - } - if (tmp.numRows() == 0) { - // Not sure if there is a better way to work around this - numOutputBatches.set(numOutputBatches.value - 1) - tmp.close() - None - } else { - Some(tmp) - } - } - - private[this] def doJoinLeftRight(leftTable: Table, rightTable: Table): ColumnarBatch = { - val joinedTable = joinType match { - case LeftOuter => leftTable.onColumns(joinKeyIndices: _*) - .leftJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case RightOuter => rightTable.onColumns(joinKeyIndices: _*) - .leftJoin(leftTable.onColumns(joinKeyIndices: _*), false) - case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*) - .innerJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case LeftSemi => leftTable.onColumns(joinKeyIndices: _*) - .leftSemiJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case LeftAnti => leftTable.onColumns(joinKeyIndices: _*) - .leftAntiJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case FullOuter => leftTable.onColumns(joinKeyIndices: _*) - .fullJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case _ => throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + - s" supported") - } - try { - val result = joinIndices.zip(output).map { case (joinIndex, outAttr) => - GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount(), outAttr.dataType) - }.toArray[ColumnVector] - - new ColumnarBatch(result, joinedTable.getRowCount.toInt) - } finally { - joinedTable.close() - } - } -} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala deleted file mode 100644 index 6ebbdf4f2cf..00000000000 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.shims.spark300db - -import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.GpuMetricNames._ - -import org.apache.spark.TaskContext -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} -import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution} -import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} -import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.vectorized.ColumnarBatch - -object GpuJoinUtils { - def getGpuBuildSide(buildSide: BuildSide): GpuBuildSide = { - buildSide match { - case BuildRight => GpuBuildRight - case BuildLeft => GpuBuildLeft - case _ => throw new Exception("unknown buildSide Type") - } - } -} - -/** - * Spark 3.1 changed packages of BuildLeft, BuildRight, BuildSide - */ -class GpuShuffledHashJoinMeta( - join: ShuffledHashJoinExec, - conf: RapidsConf, - parent: Option[RapidsMeta[_, _, _]], - rule: ConfKeysAndIncompat) - extends SparkPlanMeta[ShuffledHashJoinExec](join, conf, parent, rule) { - val leftKeys: Seq[BaseExprMeta[_]] = - join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - val rightKeys: Seq[BaseExprMeta[_]] = - join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - val condition: Option[BaseExprMeta[_]] = - join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - - override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition - - override def tagPlanForGpu(): Unit = { - GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) - } - - override def convertToGpu(): GpuExec = - GpuShuffledHashJoinExec( - leftKeys.map(_.convertToGpu()), - rightKeys.map(_.convertToGpu()), - join.joinType, - join.buildSide, - condition.map(_.convertToGpu()), - childPlans(0).convertIfNeeded(), - childPlans(1).convertIfNeeded()) -} - -case class GpuShuffledHashJoinExec( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - joinType: JoinType, - buildSide: BuildSide, - condition: Option[Expression], - left: SparkPlan, - right: SparkPlan) extends BinaryExecNode with GpuHashJoin { - - override lazy val additionalMetrics: Map[String, SQLMetric] = Map( - "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "build side size"), - "buildTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "build time"), - "streamTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "stream time"), - "joinTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "join time"), - "joinOutputRows" -> SQLMetrics.createMetric(sparkContext, "join output rows"), - "filterTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "filter time")) - - override def requiredChildDistribution: Seq[Distribution] = - HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil - - override protected def doExecute(): RDD[InternalRow] = { - throw new UnsupportedOperationException( - "GpuShuffledHashJoin does not support the execute() code path.") - } - - override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match { - case BuildLeft => Seq(RequireSingleBatch, null) 
- case BuildRight => Seq(null, RequireSingleBatch) - } - - override def doExecuteColumnar() : RDD[ColumnarBatch] = { - val buildDataSize = longMetric("buildDataSize") - val numOutputRows = longMetric(NUM_OUTPUT_ROWS) - val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES) - val totalTime = longMetric(TOTAL_TIME) - val buildTime = longMetric("buildTime") - val streamTime = longMetric("streamTime") - val joinTime = longMetric("joinTime") - val filterTime = longMetric("filterTime") - val joinOutputRows = longMetric("joinOutputRows") - - val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) - - streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) { - (streamIter, buildIter) => { - var combinedSize = 0 - - val startTime = System.nanoTime() - val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification( - buildIter, localBuildOutput)) { buildBatch: ColumnarBatch => - withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys => - val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch)) - withResource(filterBuiltTableIfNeeded(combined)) { filtered => - combinedSize = - GpuColumnVector.extractColumns(filtered) - .map(_.getBase.getDeviceMemorySize).sum.toInt - GpuColumnVector.from(filtered) - } - } - } - - val delta = System.nanoTime() - startTime - buildTime += delta - totalTime += delta - buildDataSize += combinedSize - val context = TaskContext.get() - context.addTaskCompletionListener[Unit](_ => builtTable.close()) - - doJoin(builtTable, streamIter, boundCondition, - numOutputRows, joinOutputRows, numOutputBatches, - streamTime, joinTime, filterTime, totalTime) - } - } - } -} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuSortMergeJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuSortMergeJoinExec.scala deleted file mode 100644 index 8eaa1e6a222..00000000000 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuSortMergeJoinExec.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.nvidia.spark.rapids.shims.spark300db - -import com.nvidia.spark.rapids._ - -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} -import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter} -import org.apache.spark.sql.execution.SortExec -import org.apache.spark.sql.execution.joins.SortMergeJoinExec - -/** - * HashJoin changed in Spark 3.1 requiring Shim - */ -class GpuSortMergeJoinMeta( - join: SortMergeJoinExec, - conf: RapidsConf, - parent: Option[RapidsMeta[_, _, _]], - rule: ConfKeysAndIncompat) - extends SparkPlanMeta[SortMergeJoinExec](join, conf, parent, rule) { - - val leftKeys: Seq[BaseExprMeta[_]] = - join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - val rightKeys: Seq[BaseExprMeta[_]] = - join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) - val condition: Option[BaseExprMeta[_]] = join.condition.map( - GpuOverrides.wrapExpr(_, conf, Some(this))) - - override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition - - override def tagPlanForGpu(): Unit = { - // Use conditions from Hash Join - GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) - - if (!conf.enableReplaceSortMergeJoin) { - willNotWorkOnGpu(s"Not replacing sort merge join with hash join, " + - s"see ${RapidsConf.ENABLE_REPLACE_SORTMERGEJOIN.key}") - } - - // make sure this is last check - if this is SortMergeJoin, the children can be Sorts and we - // want to validate they can run on GPU and remove them before replacing this with a - // ShuffleHashJoin - if (canThisBeReplaced) { - childPlans.foreach { plan => - if (plan.wrapped.isInstanceOf[SortExec]) { - if (!plan.canThisBeReplaced) { - willNotWorkOnGpu(s"can't replace sortMergeJoin because one of the SortExec's before " + - s"can't be replaced.") - } else { - plan.shouldBeRemoved("removing SortExec as part replacing sortMergeJoin with " + - s"shuffleHashJoin") - } - } - } - } - } - - override def convertToGpu(): GpuExec = { - val buildSide = if (canBuildRight(join.joinType)) { - BuildRight - } else if (canBuildLeft(join.joinType)) { - BuildLeft - } else { - throw new IllegalStateException(s"Cannot build either side for ${join.joinType} join") - } - GpuShuffledHashJoinExec( - leftKeys.map(_.convertToGpu()), - rightKeys.map(_.convertToGpu()), - join.joinType, - buildSide, - condition.map(_.convertToGpu()), - childPlans(0).convertIfNeeded(), - childPlans(1).convertIfNeeded()) - } - - /** - * Determine if this type of join supports using the right side of the join as the build side. - * - * These rules match those in Spark's ShuffleHashJoinExec. - */ - private def canBuildRight(joinType: JoinType): Boolean = joinType match { - case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true - case _ => false - } - - /** - * Determine if this type of join supports using the left side of the join as the build side. - * - * These rules match those in Spark's ShuffleHashJoinExec, with the addition of support for - * full outer joins. 
- */ - private def canBuildLeft(joinType: JoinType): Boolean = joinType match { - case _: InnerLike | RightOuter | FullOuter => true - case _ => false - } -} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuWindowInPandasExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuWindowInPandasExec.scala deleted file mode 100644 index fb480cf239b..00000000000 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuWindowInPandasExec.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.shims.spark300db - -import com.nvidia.spark.rapids.{GpuBindReferences, GpuBoundReference, GpuProjectExec, GpuWindowExpression} -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder} -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.rapids.execution.python.GpuWindowInPandasExecBase -import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} - -/* - * This GpuWindowInPandasExec aims at accelerating the data transfer between - * JVM and Python, and scheduling GPU resources for Python processes - */ -case class GpuWindowInPandasExec( - projectList: Seq[Expression], - partitionSpec: Seq[Expression], - orderSpec: Seq[SortOrder], - child: SparkPlan) extends GpuWindowInPandasExecBase { - - override final def pythonModuleKey: String = "databricks" - - // On Databricks, the projectList contains not only the window expression, but may also contains - // the input attributes. So we need to extract the window expressions from it. - override def windowExpression: Seq[Expression] = projectList.filter { expr => - expr.find(node => node.isInstanceOf[GpuWindowExpression]).isDefined - } - - // On Databricks, the projectList is expected to be the final output, and it is nondeterministic. - // It may contain the input attributes or not, or even part of the input attributes. So - // we need to project the joined batch per this projectList. - // But for the schema, just return it directly. 
- override def output: Seq[Attribute] = projectList - .map(_.asInstanceOf[NamedExpression].toAttribute) - - override def projectResult(joinedBatch: ColumnarBatch): ColumnarBatch = { - // Project the data - withResource(joinedBatch) { joinBatch => - GpuProjectExec.project(joinBatch, outReferences) - } - } - - private val outReferences = { - val references = windowExpression.zipWithIndex.map { case (e, i) => - // Results of window expressions will be on the right side of child's output - GpuBoundReference(child.output.size + i, e.dataType, e.nullable) - } - val unboundToRefMap = windowExpression.zip(references).toMap - // Bound the project list for GPU - GpuBindReferences.bindGpuReferences( - projectList.map(_.transform(unboundToRefMap)), child.output) - } - -} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala deleted file mode 100644 index 0b9db3c3b23..00000000000 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.shims.spark300db - -import java.time.ZoneId - -import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.shims.spark300.Spark300Shims - -import org.apache.spark.sql.rapids.shims.spark300db._ -import org.apache.hadoop.fs.Path - -import org.apache.spark.rdd.RDD -import org.apache.spark.SparkEnv -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile} -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat -import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} -import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec} -import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec -import org.apache.spark.sql.execution.python.WindowInPandasExec -import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuTimeSub} -import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastMeta, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, GpuShuffleMeta} -import org.apache.spark.sql.rapids.execution.python.GpuWindowInPandasExecMetaBase -import org.apache.spark.sql.types._ -import org.apache.spark.storage.{BlockId, BlockManagerId} - -class Spark300dbShims extends Spark300Shims { - - override def 
getSparkShimVersion: ShimVersion = SparkShimServiceProvider.VERSION - - override def getGpuBroadcastNestedLoopJoinShim( - left: SparkPlan, - right: SparkPlan, - join: BroadcastNestedLoopJoinExec, - joinType: JoinType, - condition: Option[Expression], - targetSizeBytes: Long): GpuBroadcastNestedLoopJoinExecBase = { - GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes) - } - - override def getGpuBroadcastExchangeExec( - mode: BroadcastMode, - child: SparkPlan): GpuBroadcastExchangeExecBase = { - GpuBroadcastExchangeExec(mode, child) - } - - override def isGpuHashJoin(plan: SparkPlan): Boolean = { - plan match { - case _: GpuHashJoin => true - case p => false - } - } - - override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = { - plan match { - case _: GpuBroadcastHashJoinExec => true - case p => false - } - } - - override def isGpuShuffledHashJoin(plan: SparkPlan): Boolean = { - plan match { - case _: GpuShuffledHashJoinExec => true - case p => false - } - } - - override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = { - Seq( - GpuOverrides.exec[WindowInPandasExec]( - "The backend for Window Aggregation Pandas UDF, Accelerates the data transfer between" + - " the Java process and the Python process. It also supports scheduling GPU resources" + - " for the Python process when enabled. For now it only supports row based window frame.", - (winPy, conf, p, r) => new GpuWindowInPandasExecMetaBase(winPy, conf, p, r) { - override def convertToGpu(): GpuExec = { - GpuWindowInPandasExec( - windowExpressions.map(_.convertToGpu()), - partitionSpec.map(_.convertToGpu()), - orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]), - childPlans.head.convertIfNeeded() - ) - } - }).disabledByDefault("it only supports row based frame for now"), - GpuOverrides.exec[FileSourceScanExec]( - "Reading data from files, often from Hive tables", - (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) { - - override def isSupportedType(t: DataType): Boolean = - GpuOverrides.isSupportedType(t, - allowArray = true, - allowStringMaps = true, - allowStruct = true, - allowNesting = true) - - // partition filters and data filters are not run on the GPU - override val childExprs: Seq[ExprMeta[_]] = Seq.empty - - override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this) - - override def convertToGpu(): GpuExec = { - val sparkSession = wrapped.relation.sparkSession - val options = wrapped.relation.options - val newRelation = HadoopFsRelation( - wrapped.relation.location, - wrapped.relation.partitionSchema, - wrapped.relation.dataSchema, - wrapped.relation.bucketSpec, - GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), - options)(sparkSession) - - GpuFileSourceScanExec( - newRelation, - wrapped.output, - wrapped.requiredSchema, - wrapped.partitionFilters, - wrapped.optionalBucketSet, - // TODO: Does Databricks have coalesced bucketing implemented? 
- None, - wrapped.dataFilters, - wrapped.tableIdentifier, - conf) - } - }), - GpuOverrides.exec[SortMergeJoinExec]( - "Sort merge join, replacing with shuffled hash join", - (join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)), - GpuOverrides.exec[BroadcastHashJoinExec]( - "Implementation of join using broadcast data", - (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)), - GpuOverrides.exec[ShuffledHashJoinExec]( - "Implementation of join using hashed shuffled data", - (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)) - ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap - } - - override def getBuildSide(join: HashJoin): GpuBuildSide = { - GpuJoinUtils.getGpuBuildSide(join.buildSide) - } - - override def getBuildSide(join: BroadcastNestedLoopJoinExec): GpuBuildSide = { - GpuJoinUtils.getGpuBuildSide(join.buildSide) - } - - // Databricks has a different version of FileStatus - override def getPartitionFileNames( - partitions: Seq[PartitionDirectory]): Seq[String] = { - val files = partitions.flatMap(partition => partition.files) - files.map(_.getPath.getName) - } - - override def getPartitionFileStatusSize(partitions: Seq[PartitionDirectory]): Long = { - partitions.map(_.files.map(_.getLen).sum).sum - } - - override def getPartitionedFiles( - partitions: Array[PartitionDirectory]): Array[PartitionedFile] = { - partitions.flatMap { p => - p.files.map { f => - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) - } - } - } - - override def getPartitionSplitFiles( - partitions: Array[PartitionDirectory], - maxSplitBytes: Long, - relation: HadoopFsRelation): Array[PartitionedFile] = { - partitions.flatMap { partition => - partition.files.flatMap { file => - // getPath() is very expensive so we only want to call it once in this block: - val filePath = file.getPath - val isSplitable = relation.fileFormat.isSplitable( - relation.sparkSession, relation.options, filePath) - PartitionedFileUtil.splitFiles( - sparkSession = relation.sparkSession, - file = file, - filePath = filePath, - isSplitable = isSplitable, - maxSplitBytes = maxSplitBytes, - partitionValues = partition.values - ) - } - } - } - - override def getFileScanRDD( - sparkSession: SparkSession, - readFunction: (PartitionedFile) => Iterator[InternalRow], - filePartitions: Seq[FilePartition]): RDD[InternalRow] = { - new GpuFileScanRDD(sparkSession, readFunction, filePartitions) - } - - override def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition = { - FilePartition(index, files) - } - - override def copyFileSourceScanExec( - scanExec: GpuFileSourceScanExec, - queryUsesInputFile: Boolean): GpuFileSourceScanExec = { - scanExec.copy(queryUsesInputFile=queryUsesInputFile) - } -} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/SparkShimServiceProvider.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/SparkShimServiceProvider.scala deleted file mode 100644 index d897e05e01d..00000000000 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/SparkShimServiceProvider.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.shims.spark300db - -import com.nvidia.spark.rapids.{DatabricksShimVersion, SparkShims} - -object SparkShimServiceProvider { - val VERSION = DatabricksShimVersion(3, 0, 0) - val VERSIONNAMES = Seq(s"$VERSION") -} - -class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { - - def matchesVersion(version: String): Boolean = { - SparkShimServiceProvider.VERSIONNAMES.contains(version) - } - - def buildShim: SparkShims = { - new Spark300dbShims() - } -} diff --git a/shims/spark300db/src/main/scala/org/apache/spark/sql/rapids/shims/spark300db/GpuFileScanRDD.scala b/shims/spark300db/src/main/scala/org/apache/spark/sql/rapids/shims/spark300db/GpuFileScanRDD.scala deleted file mode 100644 index c63d79f8e31..00000000000 --- a/shims/spark300db/src/main/scala/org/apache/spark/sql/rapids/shims/spark300db/GpuFileScanRDD.scala +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.rapids.shims.spark300db - -import java.io.{FileNotFoundException, IOException} - -import org.apache.parquet.io.ParquetDecodingException - -import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rdd.{InputFileBlockHolder, RDD} -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.NextIterator - -/** - * An RDD that scans a list of file partitions. - * Databricks has different versions of FileScanRDD so we copy the - * Apache Spark version. 
- */ -class GpuFileScanRDD( - @transient private val sparkSession: SparkSession, - readFunction: (PartitionedFile) => Iterator[InternalRow], - @transient val filePartitions: Seq[FilePartition]) - extends RDD[InternalRow](sparkSession.sparkContext, Nil) { - - private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles - private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles - - override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = { - val iterator = new Iterator[Object] with AutoCloseable { - private val inputMetrics = context.taskMetrics().inputMetrics - private val existingBytesRead = inputMetrics.bytesRead - - // Find a function that will return the FileSystem bytes read by this thread. Do this before - // apply readFunction, because it might read some bytes. - private val getBytesReadCallback = - SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() - - // We get our input bytes from thread-local Hadoop FileSystem statistics. - // If we do a coalesce, however, we are likely to compute multiple partitions in the same - // task and in the same thread, in which case we need to avoid override values written by - // previous partitions (SPARK-13071). - private def incTaskInputMetricsBytesRead(): Unit = { - inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback()) - } - - private[this] val files = split.asInstanceOf[FilePartition].files.toIterator - private[this] var currentFile: PartitionedFile = null - private[this] var currentIterator: Iterator[Object] = null - - def hasNext: Boolean = { - // Kill the task in case it has been marked as killed. This logic is from - // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order - // to avoid performance overhead. - context.killTaskIfInterrupted() - (currentIterator != null && currentIterator.hasNext) || nextIterator() - } - def next(): Object = { - val nextElement = currentIterator.next() - // TODO: we should have a better separation of row based and batch based scan, so that we - // don't need to run this `if` for every record. - val preNumRecordsRead = inputMetrics.recordsRead - if (nextElement.isInstanceOf[ColumnarBatch]) { - incTaskInputMetricsBytesRead() - inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows()) - } else { - // too costly to update every record - if (inputMetrics.recordsRead % - SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) { - incTaskInputMetricsBytesRead() - } - inputMetrics.incRecordsRead(1) - } - nextElement - } - - private def readCurrentFile(): Iterator[InternalRow] = { - try { - readFunction(currentFile) - } catch { - case e: FileNotFoundException => - throw new FileNotFoundException( - e.getMessage + "\n" + - "It is possible the underlying files have been updated. " + - "You can explicitly invalidate the cache in Spark by " + - "running 'REFRESH TABLE tableName' command in SQL or " + - "by recreating the Dataset/DataFrame involved.") - } - } - - /** Advances to the next file. Returns true if a new non-empty iterator is available. 
*/ - private def nextIterator(): Boolean = { - if (files.hasNext) { - currentFile = files.next() - logInfo(s"Reading File $currentFile") - // Sets InputFileBlockHolder for the file block's information - InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length) - - if (ignoreMissingFiles || ignoreCorruptFiles) { - currentIterator = new NextIterator[Object] { - // The readFunction may read some bytes before consuming the iterator, e.g., - // vectorized Parquet reader. Here we use lazy val to delay the creation of - // iterator so that we will throw exception in `getNext`. - private lazy val internalIter = readCurrentFile() - - override def getNext(): AnyRef = { - try { - if (internalIter.hasNext) { - internalIter.next() - } else { - finished = true - null - } - } catch { - case e: FileNotFoundException if ignoreMissingFiles => - logWarning(s"Skipped missing file: $currentFile", e) - finished = true - null - // Throw FileNotFoundException even if `ignoreCorruptFiles` is true - case e: FileNotFoundException if !ignoreMissingFiles => throw e - case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => - logWarning( - s"Skipped the rest of the content in the corrupted file: $currentFile", e) - finished = true - null - } - } - - override def close(): Unit = {} - } - } else { - currentIterator = readCurrentFile() - } - - try { - hasNext - } catch { - case e: SchemaColumnConvertNotSupportedException => - val message = "Parquet column cannot be converted in " + - s"file ${currentFile.filePath}. Column: ${e.getColumn}, " + - s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}" - throw new QueryExecutionException(message, e) - case e: ParquetDecodingException => - if (e.getCause.isInstanceOf[SparkUpgradeException]) { - throw e.getCause - } else if (e.getMessage.contains("Can not read value at")) { - val message = "Encounter error while reading parquet files. " + - "One possible cause: Parquet column cannot be converted in the " + - "corresponding files. Details: " - throw new QueryExecutionException(message, e) - } - throw e - } - } else { - currentFile = null - InputFileBlockHolder.unset() - false - } - } - - override def close(): Unit = { - incTaskInputMetricsBytesRead() - InputFileBlockHolder.unset() - } - } - - // Register an on-task-completion callback to close the input stream. - context.addTaskCompletionListener[Unit](_ => iterator.close()) - - iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack. - } - - override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray - - override protected def getPreferredLocations(split: RDDPartition): Seq[String] = { - split.asInstanceOf[FilePartition].preferredLocations() - } -} - diff --git a/tests/pom.xml b/tests/pom.xml index 711b5efb840..bdcccc456b9 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -35,9 +35,9 @@ - spark300dbtests + spark301dbtests - 3.0.0-databricks + 3.0.1-databricks
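
The deleted spark300db SparkShimServiceProvider above shows the pattern every shim uses to advertise which Spark build it supports: a DatabricksShimVersion constant, the list of version strings it answers to, and a buildShim factory. The remaining Databricks 7.3 shim is expected to follow the same shape; the sketch below is illustrative only, assuming the spark301db sources (which are not part of this patch) mirror the removed 3.0.0 provider, and the Spark301dbShims class name is a placeholder.

    package com.nvidia.spark.rapids.shims.spark301db

    import com.nvidia.spark.rapids.{DatabricksShimVersion, SparkShims}

    object SparkShimServiceProvider {
      // Databricks 7.3 LTS ML reports itself as Spark 3.0.1 (illustrative value,
      // mirroring the DatabricksShimVersion(3, 0, 0) constant removed above).
      val VERSION = DatabricksShimVersion(3, 0, 1)
      val VERSIONNAMES = Seq(s"$VERSION")
    }

    class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider {

      // The plugin asks each registered provider whether it matches the version
      // string reported by the running cluster and uses the first one that does.
      def matchesVersion(version: String): Boolean =
        SparkShimServiceProvider.VERSIONNAMES.contains(version)

      // Placeholder class name; the real spark301db shim class is not shown in this patch.
      def buildShim: SparkShims = new Spark301dbShims()
    }

Because matching is done on the reported version string, removing the 3.0.0 provider means no shim should match on a Databricks 7.0 cluster, so the plugin would refuse to initialize on that runtime, which is the intent of this change.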