diff --git a/integration_tests/pom.xml b/integration_tests/pom.xml index cb3e248ad36..96b197c843d 100644 --- a/integration_tests/pom.xml +++ b/integration_tests/pom.xml @@ -32,7 +32,13 @@ 3.0.0 - + + spark300dbtests + + 3.0.0-databricks + + + spark301tests 3.0.1-SNAPSHOT diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index dc29e63fa8a..bd8726a2fc9 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -97,6 +97,8 @@ def do_join(spark): # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 @ignore_order(local=True) +@pytest.mark.xfail(condition=is_databricks_runtime(), + reason='https://github.com/NVIDIA/spark-rapids/issues/441') @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) def test_broadcast_nested_loop_join_special_case(data_gen): def do_join(spark): diff --git a/jenkins/Jenkinsfile.databricksnightly b/jenkins/Jenkinsfile.databricksnightly index 27cde28a212..6bc6a8bec38 100644 --- a/jenkins/Jenkinsfile.databricksnightly +++ b/jenkins/Jenkinsfile.databricksnightly @@ -44,7 +44,7 @@ pipeline { choice(name: 'DEPLOY_TO', choices: ['Urm', 'Local'], description: 'Where to deploy artifacts to') string(name: 'DATABRICKS_VERSION', - defaultValue: '0.2-databricks-SNAPSHOT', description: 'Version to set') + defaultValue: '0.2.0-SNAPSHOT', description: 'Version to set') string(name: 'CUDF_VERSION', defaultValue: '0.15-SNAPSHOT', description: 'Cudf version to use') string(name: 'CUDA_VERSION', @@ -61,7 +61,7 @@ pipeline { URM_CREDS = credentials("svcngcc_artifactory") DATABRICKS_TOKEN = credentials("SPARK_DATABRICKS_TOKEN") SCALA_VERSION = '2.12' - SPARK_VERSION = '3.0.0' + SPARK_VERSION = '3.0.0-databricks' CI_RAPIDS_JAR = 'rapids-4-spark_2.12-0.1-SNAPSHOT-ci.jar' CI_CUDF_JAR = 'cudf-0.14-cuda10-1.jar' URM_URL = "${urmUrl}" @@ -76,8 +76,7 @@ pipeline { steps { script { sshagent(credentials : ['svcngcc_pubpriv']) { - sh "mvn -B versions:set -DnewVersion=$DATABRICKS_VERSION && git clean -d -f" - sh "patch -p1 < ./jenkins/databricks/dbimports.patch" + sh "rm spark-rapids-ci.tgz" sh "tar -zcvf spark-rapids-ci.tgz *" sh "python3.6 ./jenkins/databricks/run-tests.py -c $CLUSTER_ID -z ./spark-rapids-ci.tgz -t $DATABRICKS_TOKEN -p /home/svcngcc/.ssh/id_rsa -l ./jenkins/databricks/build.sh -j $CI_RAPIDS_JAR -b $DATABRICKS_VERSION -k $SPARK_VERSION -a $SCALA_VERSION -f $CUDF_VERSION -u $CUDA_VERSION -m $CI_CUDF_JAR" sh "./jenkins/databricks/deploy.sh" diff --git a/jenkins/Jenkinsfile.databricksrelease b/jenkins/Jenkinsfile.databricksrelease index b2f15758651..647bab74099 100644 --- a/jenkins/Jenkinsfile.databricksrelease +++ b/jenkins/Jenkinsfile.databricksrelease @@ -45,7 +45,7 @@ pipeline { string(name: 'DEPLOY_TO', defaultValue: 'https://oss.sonatype.org/service/local/staging/deploy/maven2', description: 'The repo URL where to deploy the artifacts') string(name: 'DATABRICKS_VERSION', - defaultValue: '0.2-databricks-SNAPSHOT', description: 'Version to set') + defaultValue: '0.2.0-SNAPSHOT', description: 'Version to set') string(name: 'CUDF_VERSION', defaultValue: '0.15-SNAPSHOT', description: 'Cudf version to use') string(name: 'CUDA_VERSION', @@ -62,7 +62,7 @@ pipeline { DIST_PL='dist' SQL_PL='sql-plugin' SCALA_VERSION = '2.12' - SPARK_VERSION = '3.0.0' + SPARK_VERSION = '3.0.0-databricks' CI_RAPIDS_JAR = 'rapids-4-spark_2.12-0.1-SNAPSHOT-ci.jar' CI_CUDF_JAR = 'cudf-0.14-cuda10-1.jar' LOCAL_URL = "${localUrl}" @@ -73,8 +73,7 @@ pipeline { steps { script
{ sshagent(credentials : ['svcngcc_pubpriv']) { - sh "mvn versions:set -DnewVersion=0.2.0-databricks && git clean -d -f" - sh "patch -p1 < ./jenkins/databricks/dbimports.patch" + sh "rm spark-rapids-ci.tgz" sh "tar -zcvf spark-rapids-ci.tgz * || true" sh "python3.6 ./jenkins/databricks/run-tests.py -z ./spark-rapids-ci.tgz -t $DATABRICKS_TOKEN -p /home/svcngcc/.ssh/id_rsa -l ./jenkins/databricks/build.sh -j $CI_RAPIDS_JAR -b $DATABRICKS_VERSION -k $SPARK_VERSION -a $SCALA_VERSION -f $CUDF_VERSION -u $CUDA_VERSION -m $CI_CUDF_JAR" } diff --git a/jenkins/databricks/build.sh b/jenkins/databricks/build.sh index e6e56df7524..27d44cbe295 100755 --- a/jenkins/databricks/build.sh +++ b/jenkins/databricks/build.sh @@ -25,6 +25,7 @@ SPARK_VERSION=$5 CUDF_VERSION=$6 CUDA_VERSION=$7 CI_CUDF_JAR=$8 +BASE_SPARK_POM_VERSION=$9 echo "Spark version is $SPARK_VERSION" echo "scala version is: $SCALA_VERSION" @@ -40,7 +41,7 @@ rm -rf spark-rapids mkdir spark-rapids tar -zxvf $SPARKTGZ -C spark-rapids cd spark-rapids -mvn -B clean package || true +mvn -B -Pdatabricks clean package -DskipTests || true M2DIR=/home/ubuntu/.m2/repository CUDF_JAR=${M2DIR}/ai/rapids/cudf/${CUDF_VERSION}/cudf-${CUDF_VERSION}-${CUDA_VERSION}.jar @@ -50,13 +51,17 @@ SQLJAR=----workspace_spark_3_0--sql--core--core-hive-2.3__hadoop-2.7_${SCALA_VER CATALYSTJAR=----workspace_spark_3_0--sql--catalyst--catalyst-hive-2.3__hadoop-2.7_${SCALA_VERSION}_deploy.jar ANNOTJAR=----workspace_spark_3_0--common--tags--tags-hive-2.3__hadoop-2.7_${SCALA_VERSION}_deploy.jar COREJAR=----workspace_spark_3_0--core--core-hive-2.3__hadoop-2.7_${SCALA_VERSION}_deploy.jar +# install the 3.0.0 pom file so we get dependencies +COREPOM=spark-core_${SCALA_VERSION}-${BASE_SPARK_POM_VERSION}.pom +COREPOMPATH=$M2DIR/org/apache/spark/spark-core_${SCALA_VERSION}/${BASE_SPARK_POM_VERSION} mvn -B install:install-file \ -Dmaven.repo.local=$M2DIR \ -Dfile=$JARDIR/$COREJAR \ -DgroupId=org.apache.spark \ -DartifactId=spark-core_$SCALA_VERSION \ -Dversion=$SPARK_VERSION \ - -Dpackaging=jar + -Dpackaging=jar \ + -DpomFile=$COREPOMPATH/$COREPOM mvn -B install:install-file \ -Dmaven.repo.local=$M2DIR \ diff --git a/jenkins/databricks/deploy.sh b/jenkins/databricks/deploy.sh index 607db259aff..ed71b949a31 100755 --- a/jenkins/databricks/deploy.sh +++ b/jenkins/databricks/deploy.sh @@ -24,6 +24,6 @@ cd spark-rapids echo "Maven mirror is $MVN_URM_MIRROR" SERVER_ID='snapshots' SERVER_URL="$URM_URL-local" -FPATH=./dist/target/rapids-4-spark_$SCALA_VERSION-$DATABRICKS_VERSION.jar +DBJARFPATH=./shims/spark300db/target/rapids-4-spark-shims-spark300-databricks_$SCALA_VERSION-$DATABRICKS_VERSION.jar mvn -B deploy:deploy-file $MVN_URM_MIRROR -Durl=$SERVER_URL -DrepositoryId=$SERVER_ID \ - -Dfile=$FPATH -DpomFile=dist/pom.xml + -Dfile=$DBJARFPATH -DpomFile=shims/spark300db/pom.xml diff --git a/jenkins/databricks/run-tests.py b/jenkins/databricks/run-tests.py index 975cbe8601d..d0ddc59d13a 100644 --- a/jenkins/databricks/run-tests.py +++ b/jenkins/databricks/run-tests.py @@ -52,19 +52,20 @@ def main(): cudf_version = '0.15-SNAPSHOT' cuda_version = 'cuda10-1' ci_cudf_jar = 'cudf-0.14-cuda10-1.jar' + base_spark_pom_version = '3.0.0' try: - opts, args = getopt.getopt(sys.argv[1:], 'hs:t:c:p:l:nd:z:j:b:k:a:f:u:m:', - ['workspace=', 'token=', 'clusterid=', 'private=', 'nostart=', 'localscript=', 'dest=', 'sparktgz=', 'cirapidsjar=', 'databricksversion=', 'sparkversion=', 'scalaversion=', 'cudfversion=', 'cudaversion=', 'cicudfjar=']) + opts, args = getopt.getopt(sys.argv[1:], 
'hs:t:c:p:l:nd:z:j:b:k:a:f:u:m:v:', + ['workspace=', 'token=', 'clusterid=', 'private=', 'nostart=', 'localscript=', 'dest=', 'sparktgz=', 'cirapidsjar=', 'databricksversion=', 'sparkversion=', 'scalaversion=', 'cudfversion=', 'cudaversion=', 'cicudfjar=', 'basesparkpomversion=']) except getopt.GetoptError: print( - 'run-tests.py -s -t -c -p -n -l -d -z -j -b -k -a -f -u -m ') + 'run-tests.py -s -t -c -p -n -l -d -z -j -b -k -a -f -u -m -v ') sys.exit(2) for opt, arg in opts: if opt == '-h': print( - 'run-tests.py -s -t -c -p -n -l -d , -z -j -b -k -a -f -u -m ') + 'run-tests.py -s -t -c -p -n -l -d , -z -j -b -k -a -f -u -m -v ') sys.exit() elif opt in ('-s', '--workspace'): workspace = arg @@ -96,6 +97,8 @@ def main(): cuda_version = arg elif opt in ('-m', '--cicudfjar'): ci_cudf_jar = arg + elif opt in ('-v', '--basesparkpomversion'): + base_spark_pom_version = arg print('-s is ' + workspace) print('-c is ' + clusterid) @@ -114,6 +117,7 @@ def main(): print('-f is ' + cudf_version) print('-u is ' + cuda_version) print('-m is ' + ci_cudf_jar) + print('-v is ' + base_spark_pom_version) if skip_start is None: jsonout = cluster_state(workspace, clusterid, token) @@ -161,7 +165,7 @@ def main(): print("rsync command: %s" % rsync_command) subprocess.check_call(rsync_command, shell = True) - ssh_command = "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null ubuntu@%s -p 2200 -i %s %s %s %s %s %s %s %s %s %s 2>&1 | tee buildout; if [ `echo ${PIPESTATUS[0]}` -ne 0 ]; then false; else true; fi" % (master_addr, private_key_file, script_dest, tgz_dest, db_version, scala_version, ci_rapids_jar, spark_version, cudf_version, cuda_version, ci_cudf_jar) + ssh_command = "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null ubuntu@%s -p 2200 -i %s %s %s %s %s %s %s %s %s %s %s 2>&1 | tee buildout; if [ `echo ${PIPESTATUS[0]}` -ne 0 ]; then false; else true; fi" % (master_addr, private_key_file, script_dest, tgz_dest, db_version, scala_version, ci_rapids_jar, spark_version, cudf_version, cuda_version, ci_cudf_jar, base_spark_pom_version) print("ssh command: %s" % ssh_command) subprocess.check_call(ssh_command, shell = True) diff --git a/jenkins/deploy.sh b/jenkins/deploy.sh index e77f6195254..8351fd2fbdd 100755 --- a/jenkins/deploy.sh +++ b/jenkins/deploy.sh @@ -60,9 +60,9 @@ if [ "$SIGN_FILE" == true ]; then SQL_ART_VER=`mvn exec:exec -q -pl $SQL_PL -Dexec.executable=echo -Dexec.args='${project.version}'` JS_FPATH="${SQL_PL}/target/${SQL_ART_ID}-${SQL_ART_VER}" SRC_DOC_JARS="-Dsources=${JS_FPATH}-sources.jar -Djavadoc=${JS_FPATH}-javadoc.jar" - DEPLOY_CMD="mvn -B gpg:sign-and-deploy-file -s jenkins/settings.xml -Dgpg.passphrase=$GPG_PASSPHRASE" + DEPLOY_CMD="mvn -B -Pinclude-databricks gpg:sign-and-deploy-file -s jenkins/settings.xml -Dgpg.passphrase=$GPG_PASSPHRASE" else - DEPLOY_CMD="mvn -B deploy:deploy-file -s jenkins/settings.xml" + DEPLOY_CMD="mvn -B -Pinclude-databricks deploy:deploy-file -s jenkins/settings.xml" fi echo "Deploy CMD: $DEPLOY_CMD" diff --git a/jenkins/spark-nightly-build.sh b/jenkins/spark-nightly-build.sh index 33d8ba40ef8..41a7f38b0d6 100755 --- a/jenkins/spark-nightly-build.sh +++ b/jenkins/spark-nightly-build.sh @@ -19,7 +19,7 @@ set -ex . 
jenkins/version-def.sh -mvn -U -B clean deploy $MVN_URM_MIRROR -Dmaven.repo.local=$WORKSPACE/.m2 +mvn -U -B -Pinclude-databricks clean deploy $MVN_URM_MIRROR -Dmaven.repo.local=$WORKSPACE/.m2 # Run unit tests against other spark versions mvn -U -B -Pspark301tests test $MVN_URM_MIRROR -Dmaven.repo.local=$WORKSPACE/.m2 # spark310 unit tests fail - https://github.com/NVIDIA/spark-rapids/issues/382 diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 64149f3051a..9fa312ce6a1 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -37,7 +37,7 @@ export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH" tar zxf $SPARK_HOME.tgz -C $ARTF_ROOT && \ rm -f $SPARK_HOME.tgz -mvn -U -B $MVN_URM_MIRROR clean verify -Dpytest.TEST_TAGS='' +mvn -U -B $MVN_URM_MIRROR -Pinclude-databricks clean verify -Dpytest.TEST_TAGS='' # Run the unit tests for other Spark versions but dont run full python integration tests env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests test -Dpytest.TEST_TAGS='' # spark310 unit tests fail - https://github.com/NVIDIA/spark-rapids/issues/382 diff --git a/shims/aggregator/pom.xml b/shims/aggregator/pom.xml index 70e45d76c69..af023d99989 100644 --- a/shims/aggregator/pom.xml +++ b/shims/aggregator/pom.xml @@ -32,6 +32,32 @@ The RAPIDS SQL plugin for Apache Spark Shim Aggregator 0.2.0-SNAPSHOT + + + databricks + + + com.nvidia + rapids-4-spark-shims-spark300-databricks_${scala.binary.version} + ${project.version} + compile + + + + + + include-databricks + + + com.nvidia + rapids-4-spark-shims-spark300-databricks_${scala.binary.version} + ${project.version} + compile + + + + + com.nvidia diff --git a/shims/pom.xml b/shims/pom.xml index 18ab6dd96e8..2df7c512232 100644 --- a/shims/pom.xml +++ b/shims/pom.xml @@ -32,6 +32,15 @@ The RAPIDS SQL plugin for Apache Spark Shims 0.2.0-SNAPSHOT + + + databricks + + spark300db + + + + spark300 spark301 diff --git a/shims/spark300db/pom.xml b/shims/spark300db/pom.xml new file mode 100644 index 00000000000..c3f1e919a0f --- /dev/null +++ b/shims/spark300db/pom.xml @@ -0,0 +1,64 @@ + + + + 4.0.0 + + + com.nvidia + rapids-4-spark-shims_2.12 + 0.2.0-SNAPSHOT + ../pom.xml + + com.nvidia + rapids-4-spark-shims-spark300-databricks_2.12 + RAPIDS Accelerator for Apache Spark SQL Plugin Spark 3.0.0 Databricks Shim + The RAPIDS SQL plugin for Apache Spark 3.0.0 Databricks Shim + 0.2.0-SNAPSHOT + + + 3.0.0-databricks + + + + + com.nvidia + rapids-4-spark-shims-spark300_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark30db.version} + provided + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark30db.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark30db.version} + provided + + + + diff --git a/shims/spark300db/src/main/resources/META-INF/services/com.nvidia.spark.rapids.SparkShimServiceProvider b/shims/spark300db/src/main/resources/META-INF/services/com.nvidia.spark.rapids.SparkShimServiceProvider new file mode 100644 index 00000000000..e8d964fc951 --- /dev/null +++ b/shims/spark300db/src/main/resources/META-INF/services/com.nvidia.spark.rapids.SparkShimServiceProvider @@ -0,0 +1 @@ +com.nvidia.spark.rapids.shims.spark300db.SparkShimServiceProvider diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala new file 
mode 100644 index 00000000000..94a14cdc5fe --- /dev/null +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.spark300db + +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.GpuMetricNames._ + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, UnspecifiedDistribution} +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, SerializeConcatHostBuffersDeserializeBatch} +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * Spark 3.1 changed packages of BuildLeft, BuildRight, BuildSide + */ +class GpuBroadcastHashJoinMeta( + join: BroadcastHashJoinExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: ConfKeysAndIncompat) + extends SparkPlanMeta[BroadcastHashJoinExec](join, conf, parent, rule) { + + val leftKeys: Seq[BaseExprMeta[_]] = + join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val rightKeys: Seq[BaseExprMeta[_]] = + join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val condition: Option[BaseExprMeta[_]] = + join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + + override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + + override def tagPlanForGpu(): Unit = { + GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) + + val buildSide = join.buildSide match { + case BuildLeft => childPlans(0) + case BuildRight => childPlans(1) + } + + if (!buildSide.canThisBeReplaced) { + willNotWorkOnGpu("the broadcast for this join must be on the GPU too") + } + + if (!canThisBeReplaced) { + buildSide.willNotWorkOnGpu("the BroadcastHashJoin this feeds is not on the GPU") + } + } + + override def convertToGpu(): GpuExec = { + val left = childPlans(0).convertIfNeeded() + val right = childPlans(1).convertIfNeeded() + // The broadcast part of this must be a BroadcastExchangeExec + val buildSide = join.buildSide match { + case BuildLeft => left + case BuildRight => right + } + if (!buildSide.isInstanceOf[GpuBroadcastExchangeExec]) { + throw new IllegalStateException("the broadcast must be on the GPU too") + } + GpuBroadcastHashJoinExec( + leftKeys.map(_.convertToGpu()), + rightKeys.map(_.convertToGpu()), + 
join.joinType, join.buildSide, + condition.map(_.convertToGpu()), + left, right) + } +} + +case class GpuBroadcastHashJoinExec( + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + joinType: JoinType, + buildSide: BuildSide, + condition: Option[Expression], + left: SparkPlan, + right: SparkPlan) extends BinaryExecNode with GpuHashJoin { + + override lazy val additionalMetrics: Map[String, SQLMetric] = Map( + "joinOutputRows" -> SQLMetrics.createMetric(sparkContext, "join output rows"), + "joinTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "join time"), + "filterTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "filter time")) + + override def requiredChildDistribution: Seq[Distribution] = { + val mode = HashedRelationBroadcastMode(buildKeys) + buildSide match { + case BuildLeft => + BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil + case BuildRight => + UnspecifiedDistribution :: BroadcastDistribution(mode) :: Nil + } + } + + def broadcastExchange: GpuBroadcastExchangeExec = buildPlan match { + case gpu: GpuBroadcastExchangeExec => gpu + case reused: ReusedExchangeExec => reused.child.asInstanceOf[GpuBroadcastExchangeExec] + } + + override def doExecute(): RDD[InternalRow] = + throw new IllegalStateException("GpuBroadcastHashJoin does not support row-based processing") + + override def doExecuteColumnar() : RDD[ColumnarBatch] = { + val numOutputRows = longMetric(NUM_OUTPUT_ROWS) + val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES) + val totalTime = longMetric(TOTAL_TIME) + val joinTime = longMetric("joinTime") + val filterTime = longMetric("filterTime") + val joinOutputRows = longMetric("joinOutputRows") + + val broadcastRelation = broadcastExchange + .executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]() + + val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) + + lazy val builtTable = { + // TODO clean up intermediate results... + val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys) + val combined = combine(keys, broadcastRelation.value.batch) + val ret = GpuColumnVector.from(combined) + // Don't warn for a leak, because we cannot control when we are done with this + (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) + ret + } + + val rdd = streamedPlan.executeColumnar() + rdd.mapPartitions(it => + doJoin(builtTable, it, boundCondition, numOutputRows, joinOutputRows, + numOutputBatches, joinTime, filterTime, totalTime)) + } +} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastNestedLoopJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastNestedLoopJoinExec.scala new file mode 100644 index 00000000000..a687a770ce4 --- /dev/null +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastNestedLoopJoinExec.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.spark300db + +import com.nvidia.spark.rapids.GpuBuildSide + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec +import org.apache.spark.sql.rapids.execution._ + +/** + * Spark 3.1 changed packages of BuildLeft, BuildRight, BuildSide + */ +case class GpuBroadcastNestedLoopJoinExec( + left: SparkPlan, + right: SparkPlan, + join: BroadcastNestedLoopJoinExec, + joinType: JoinType, + condition: Option[Expression], + targetSizeBytes: Long) + extends GpuBroadcastNestedLoopJoinExecBase(left, right, join, joinType, condition, + targetSizeBytes) { + + def getGpuBuildSide: GpuBuildSide = { + GpuJoinUtils.getGpuBuildSide(join.buildSide) + } +} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala new file mode 100644 index 00000000000..b80db78fb28 --- /dev/null +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala @@ -0,0 +1,245 @@ +/* + * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.nvidia.spark.rapids.shims.spark300db + +import ai.rapids.cudf.{NvtxColor, Table} +import com.nvidia.spark.rapids._ + +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} +import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} +import org.apache.spark.sql.execution.joins.HashJoin +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} + +object GpuHashJoin { + def tagJoin( + meta: RapidsMeta[_, _, _], + joinType: JoinType, + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression]): Unit = joinType match { + case _: InnerLike => + case FullOuter | RightOuter | LeftOuter | LeftSemi | LeftAnti => + if (condition.isDefined) { + meta.willNotWorkOnGpu(s"$joinType joins currently do not support conditions") + } + case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported") + } +} + +trait GpuHashJoin extends GpuExec with HashJoin { + + override def output: Seq[Attribute] = { + joinType match { + case _: InnerLike => + left.output ++ right.output + case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) + case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output + case j: ExistenceJoin => + left.output :+ j.exists + case LeftExistence(_) => + left.output + case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) + case x => + throw new IllegalArgumentException(s"GpuHashJoin should not take $x as the JoinType") + } + } + + protected lazy val (gpuBuildKeys, gpuStreamedKeys) = { + require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType), + "Join keys from two sides should have same types") + val lkeys = GpuBindReferences.bindGpuReferences(leftKeys, left.output) + val rkeys = GpuBindReferences.bindGpuReferences(rightKeys, right.output) + buildSide match { + case BuildLeft => (lkeys, rkeys) + case BuildRight => (rkeys, lkeys) + } + } + + /** + * Place the columns in left and the columns in right into a single ColumnarBatch + */ + def combine(left: ColumnarBatch, right: ColumnarBatch): ColumnarBatch = { + val l = GpuColumnVector.extractColumns(left) + val r = GpuColumnVector.extractColumns(right) + val c = l ++ r + new ColumnarBatch(c.asInstanceOf[Array[ColumnVector]], left.numRows()) + } + + // TODO eventually dedupe the keys + lazy val joinKeyIndices: Range = gpuBuildKeys.indices + + val localBuildOutput: Seq[Attribute] = buildPlan.output + // The first columns are the ones we joined on and need to remove + lazy val joinIndices: Seq[Int] = joinType match { + case RightOuter => + // The left table and right table are switched in the output + // because we don't support a right join, only left + val numRight = right.output.length + val numLeft = left.output.length + val joinLength = joinKeyIndices.length + def remap(index: Int): Int = { + if (index < numLeft) { + // part of the left table, but is on the right side of the tmp output + index + joinLength + numRight + } else { + // part of the right table, but is on the left side of the tmp output + index + joinLength - numLeft + } + } + output.indices.map (remap) + case _ => + val joinLength = joinKeyIndices.length + output.indices.map (v => v + joinLength) + } + + def doJoin(builtTable: Table, + stream: 
Iterator[ColumnarBatch], + boundCondition: Option[Expression], + numOutputRows: SQLMetric, + joinOutputRows: SQLMetric, + numOutputBatches: SQLMetric, + joinTime: SQLMetric, + filterTime: SQLMetric, + totalTime: SQLMetric): Iterator[ColumnarBatch] = { + new Iterator[ColumnarBatch] { + import scala.collection.JavaConverters._ + var nextCb: Option[ColumnarBatch] = None + var first: Boolean = true + + TaskContext.get().addTaskCompletionListener[Unit](_ => closeCb()) + + def closeCb(): Unit = { + nextCb.foreach(_.close()) + nextCb = None + } + + override def hasNext: Boolean = { + while (nextCb.isEmpty && (first || stream.hasNext)) { + if (stream.hasNext) { + val cb = stream.next() + val startTime = System.nanoTime() + nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, + numOutputBatches, joinTime, filterTime) + totalTime += (System.nanoTime() - startTime) + } else if (first) { + // We have to at least try one in some cases + val startTime = System.nanoTime() + val cb = GpuColumnVector.emptyBatch(streamedPlan.output.asJava) + nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, + numOutputBatches, joinTime, filterTime) + totalTime += (System.nanoTime() - startTime) + } + first = false + } + nextCb.isDefined + } + + override def next(): ColumnarBatch = { + if (!hasNext) { + throw new NoSuchElementException() + } + val ret = nextCb.get + nextCb = None + ret + } + } + } + + private[this] def doJoin(builtTable: Table, + streamedBatch: ColumnarBatch, + boundCondition: Option[Expression], + numOutputRows: SQLMetric, + numJoinOutputRows: SQLMetric, + numOutputBatches: SQLMetric, + joinTime: SQLMetric, + filterTime: SQLMetric): Option[ColumnarBatch] = { + + val streamedTable = try { + val streamedKeysBatch = GpuProjectExec.project(streamedBatch, gpuStreamedKeys) + try { + val combined = combine(streamedKeysBatch, streamedBatch) + GpuColumnVector.from(combined) + } finally { + streamedKeysBatch.close() + } + } finally { + streamedBatch.close() + } + + val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime) + val joined = try { + buildSide match { + case BuildLeft => doJoinLeftRight(builtTable, streamedTable) + case BuildRight => doJoinLeftRight(streamedTable, builtTable) + } + } finally { + streamedTable.close() + nvtxRange.close() + } + + numJoinOutputRows += joined.numRows() + + val tmp = if (boundCondition.isDefined) { + GpuFilter(joined, boundCondition.get, numOutputRows, numOutputBatches, filterTime) + } else { + numOutputRows += joined.numRows() + numOutputBatches += 1 + joined + } + if (tmp.numRows() == 0) { + // Not sure if there is a better way to work around this + numOutputBatches.set(numOutputBatches.value - 1) + tmp.close() + None + } else { + Some(tmp) + } + } + + private[this] def doJoinLeftRight(leftTable: Table, rightTable: Table): ColumnarBatch = { + val joinedTable = joinType match { + case LeftOuter => leftTable.onColumns(joinKeyIndices: _*) + .leftJoin(rightTable.onColumns(joinKeyIndices: _*), false) + case RightOuter => rightTable.onColumns(joinKeyIndices: _*) + .leftJoin(leftTable.onColumns(joinKeyIndices: _*), false) + case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*) + .innerJoin(rightTable.onColumns(joinKeyIndices: _*), false) + case LeftSemi => leftTable.onColumns(joinKeyIndices: _*) + .leftSemiJoin(rightTable.onColumns(joinKeyIndices: _*), false) + case LeftAnti => leftTable.onColumns(joinKeyIndices: _*) + .leftAntiJoin(rightTable.onColumns(joinKeyIndices: _*), false) + case FullOuter => 
leftTable.onColumns(joinKeyIndices: _*) + .fullJoin(rightTable.onColumns(joinKeyIndices: _*), false) + case _ => throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + + s" supported") + } + try { + val result = joinIndices.map(joinIndex => + GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount())) + .toArray[ColumnVector] + + new ColumnarBatch(result, joinedTable.getRowCount.toInt) + } finally { + joinedTable.close() + } + } +} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala new file mode 100644 index 00000000000..ad481219fc6 --- /dev/null +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.spark300db + +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.GpuMetricNames._ + +import org.apache.spark.TaskContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution} +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.vectorized.ColumnarBatch + +object GpuJoinUtils { + def getGpuBuildSide(buildSide: BuildSide): GpuBuildSide = { + buildSide match { + case BuildRight => GpuBuildRight + case BuildLeft => GpuBuildLeft + case _ => throw new Exception("unknown buildSide Type") + } + } +} + +/** + * Spark 3.1 changed packages of BuildLeft, BuildRight, BuildSide + */ +class GpuShuffledHashJoinMeta( + join: ShuffledHashJoinExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: ConfKeysAndIncompat) + extends SparkPlanMeta[ShuffledHashJoinExec](join, conf, parent, rule) { + val leftKeys: Seq[BaseExprMeta[_]] = + join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val rightKeys: Seq[BaseExprMeta[_]] = + join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val condition: Option[BaseExprMeta[_]] = + join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + + override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + + override def tagPlanForGpu(): Unit = { + GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) + } + + override def convertToGpu(): GpuExec = + GpuShuffledHashJoinExec( + leftKeys.map(_.convertToGpu()), + rightKeys.map(_.convertToGpu()), + 
join.joinType, + join.buildSide, + condition.map(_.convertToGpu()), + childPlans(0).convertIfNeeded(), + childPlans(1).convertIfNeeded()) +} + +case class GpuShuffledHashJoinExec( + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + joinType: JoinType, + buildSide: BuildSide, + condition: Option[Expression], + left: SparkPlan, + right: SparkPlan) extends BinaryExecNode with GpuHashJoin { + + override lazy val additionalMetrics: Map[String, SQLMetric] = Map( + "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "build side size"), + "buildTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "build time"), + "joinTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "join time"), + "joinOutputRows" -> SQLMetrics.createMetric(sparkContext, "join output rows"), + "filterTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "filter time")) + + override def requiredChildDistribution: Seq[Distribution] = + HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil + + override protected def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException( + "GpuShuffledHashJoin does not support the execute() code path.") + } + + override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match { + case BuildLeft => Seq(RequireSingleBatch, null) + case BuildRight => Seq(null, RequireSingleBatch) + } + + override def doExecuteColumnar() : RDD[ColumnarBatch] = { + val buildDataSize = longMetric("buildDataSize") + val numOutputRows = longMetric(NUM_OUTPUT_ROWS) + val numOutputBatches = longMetric(NUM_OUTPUT_BATCHES) + val totalTime = longMetric(TOTAL_TIME) + val buildTime = longMetric("buildTime") + val joinTime = longMetric("joinTime") + val filterTime = longMetric("filterTime") + val joinOutputRows = longMetric("joinOutputRows") + + val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) + + streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) { + (streamIter, buildIter) => { + var combinedSize = 0 + val startTime = System.nanoTime() + val buildBatch = + ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput) + val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys) + val builtTable = try { + // Combine does not inc any reference counting + val combined = combine(keys, buildBatch) + combinedSize = + GpuColumnVector.extractColumns(combined) + .map(_.getBase.getDeviceMemorySize).sum.toInt + GpuColumnVector.from(combined) + } finally { + keys.close() + buildBatch.close() + } + + val delta = System.nanoTime() - startTime + buildTime += delta + totalTime += delta + buildDataSize += combinedSize + val context = TaskContext.get() + context.addTaskCompletionListener[Unit](_ => builtTable.close()) + + doJoin(builtTable, streamIter, boundCondition, + numOutputRows, joinOutputRows, numOutputBatches, + joinTime, filterTime, totalTime) + } + } + } +} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuSortMergeJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuSortMergeJoinExec.scala new file mode 100644 index 00000000000..8eaa1e6a222 --- /dev/null +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuSortMergeJoinExec.scala @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.spark300db + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} +import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter} +import org.apache.spark.sql.execution.SortExec +import org.apache.spark.sql.execution.joins.SortMergeJoinExec + +/** + * HashJoin changed in Spark 3.1 requiring Shim + */ +class GpuSortMergeJoinMeta( + join: SortMergeJoinExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: ConfKeysAndIncompat) + extends SparkPlanMeta[SortMergeJoinExec](join, conf, parent, rule) { + + val leftKeys: Seq[BaseExprMeta[_]] = + join.leftKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val rightKeys: Seq[BaseExprMeta[_]] = + join.rightKeys.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val condition: Option[BaseExprMeta[_]] = join.condition.map( + GpuOverrides.wrapExpr(_, conf, Some(this))) + + override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ condition + + override def tagPlanForGpu(): Unit = { + // Use conditions from Hash Join + GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition) + + if (!conf.enableReplaceSortMergeJoin) { + willNotWorkOnGpu(s"Not replacing sort merge join with hash join, " + + s"see ${RapidsConf.ENABLE_REPLACE_SORTMERGEJOIN.key}") + } + + // make sure this is last check - if this is SortMergeJoin, the children can be Sorts and we + // want to validate they can run on GPU and remove them before replacing this with a + // ShuffleHashJoin + if (canThisBeReplaced) { + childPlans.foreach { plan => + if (plan.wrapped.isInstanceOf[SortExec]) { + if (!plan.canThisBeReplaced) { + willNotWorkOnGpu(s"can't replace sortMergeJoin because one of the SortExec's before " + + s"can't be replaced.") + } else { + plan.shouldBeRemoved("removing SortExec as part replacing sortMergeJoin with " + + s"shuffleHashJoin") + } + } + } + } + } + + override def convertToGpu(): GpuExec = { + val buildSide = if (canBuildRight(join.joinType)) { + BuildRight + } else if (canBuildLeft(join.joinType)) { + BuildLeft + } else { + throw new IllegalStateException(s"Cannot build either side for ${join.joinType} join") + } + GpuShuffledHashJoinExec( + leftKeys.map(_.convertToGpu()), + rightKeys.map(_.convertToGpu()), + join.joinType, + buildSide, + condition.map(_.convertToGpu()), + childPlans(0).convertIfNeeded(), + childPlans(1).convertIfNeeded()) + } + + /** + * Determine if this type of join supports using the right side of the join as the build side. + * + * These rules match those in Spark's ShuffleHashJoinExec. + */ + private def canBuildRight(joinType: JoinType): Boolean = joinType match { + case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true + case _ => false + } + + /** + * Determine if this type of join supports using the left side of the join as the build side. + * + * These rules match those in Spark's ShuffleHashJoinExec, with the addition of support for + * full outer joins. 
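+ * (Full outer joins are possible here because GpuHashJoin in this shim handles them via cuDF's fullJoin in doJoinLeftRight.)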
+ */ + private def canBuildLeft(joinType: JoinType): Boolean = joinType match { + case _: InnerLike | RightOuter | FullOuter => true + case _ => false + } +} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala new file mode 100644 index 00000000000..fcf42aff1de --- /dev/null +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.spark300db + +import java.time.ZoneId + +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.shims.spark300.Spark300Shims + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec} +import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec +import org.apache.spark.sql.rapids.GpuTimeSub +import org.apache.spark.sql.rapids.execution.GpuBroadcastNestedLoopJoinExecBase +import org.apache.spark.sql.rapids.shims.spark300db._ +import org.apache.spark.sql.types._ +import org.apache.spark.storage.{BlockId, BlockManagerId} + +class Spark300dbShims extends Spark300Shims { + + override def getGpuBroadcastNestedLoopJoinShim( + left: SparkPlan, + right: SparkPlan, + join: BroadcastNestedLoopJoinExec, + joinType: JoinType, + condition: Option[Expression], + targetSizeBytes: Long): GpuBroadcastNestedLoopJoinExecBase = { + GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes) + } + + override def isGpuHashJoin(plan: SparkPlan): Boolean = { + plan match { + case _: GpuHashJoin => true + case p => false + } + } + + override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = { + plan match { + case _: GpuBroadcastHashJoinExec => true + case p => false + } + } + + override def isGpuShuffledHashJoin(plan: SparkPlan): Boolean = { + plan match { + case _: GpuShuffledHashJoinExec => true + case p => false + } + } + + override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = { + Seq( + GpuOverrides.exec[FileSourceScanExec]( + "Reading data from files, often from Hive tables", + (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) { + // partition filters and data filters are not run on the GPU + override val childExprs: Seq[ExprMeta[_]] = Seq.empty + + override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this) + + override def convertToGpu(): GpuExec = { + val newRelation = HadoopFsRelation( + wrapped.relation.location, + 
wrapped.relation.partitionSchema, + wrapped.relation.dataSchema, + wrapped.relation.bucketSpec, + GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), + wrapped.relation.options)(wrapped.relation.sparkSession) + GpuFileSourceScanExec( + newRelation, + wrapped.output, + wrapped.requiredSchema, + wrapped.partitionFilters, + wrapped.optionalBucketSet, + wrapped.dataFilters, + wrapped.tableIdentifier) + } + }), + GpuOverrides.exec[SortMergeJoinExec]( + "Sort merge join, replacing with shuffled hash join", + (join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)), + GpuOverrides.exec[BroadcastHashJoinExec]( + "Implementation of join using broadcast data", + (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)), + GpuOverrides.exec[ShuffledHashJoinExec]( + "Implementation of join using hashed shuffled data", + (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)), + ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap + } + + override def getBuildSide(join: HashJoin): GpuBuildSide = { + GpuJoinUtils.getGpuBuildSide(join.buildSide) + } + + override def getBuildSide(join: BroadcastNestedLoopJoinExec): GpuBuildSide = { + GpuJoinUtils.getGpuBuildSide(join.buildSide) + } +} diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/SparkShimServiceProvider.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/SparkShimServiceProvider.scala new file mode 100644 index 00000000000..644d30d368d --- /dev/null +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/SparkShimServiceProvider.scala @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.spark300db + +import com.nvidia.spark.rapids.SparkShims + +class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { + + val VERSIONNAMES = Seq("3.0.0-databricks") + + def matchesVersion(version: String): Boolean = { + VERSIONNAMES.contains(version) + } + + def buildShim: SparkShims = { + new Spark300dbShims() + } +} diff --git a/shims/spark300db/src/main/scala/org/apache/spark/sql/rapids/shims/spark300db/GpuFileSourceScanExec.scala b/shims/spark300db/src/main/scala/org/apache/spark/sql/rapids/shims/spark300db/GpuFileSourceScanExec.scala new file mode 100644 index 00000000000..9c63a7e826f --- /dev/null +++ b/shims/spark300db/src/main/scala/org/apache/spark/sql/rapids/shims/spark300db/GpuFileSourceScanExec.scala @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.rapids.shims.spark300db + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import com.nvidia.spark.rapids.{GpuExec, GpuReadCSVFileFormat, GpuReadOrcFileFormat, GpuReadParquetFileFormat, SparkPlanMeta} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{DataSourceScanExec, ExplainUtils, FileSourceScanExec} +import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.rapids.GpuFileSourceScanExecBase +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.collection.BitSet + +case class GpuFileSourceScanExec( + @transient relation: HadoopFsRelation, + output: Seq[Attribute], + requiredSchema: StructType, + partitionFilters: Seq[Expression], + optionalBucketSet: Option[BitSet], + dataFilters: Seq[Expression], + override val tableIdentifier: Option[TableIdentifier]) + extends DataSourceScanExec with GpuFileSourceScanExecBase with GpuExec { + + override val nodeName: String = { + s"GpuScan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}" + } + + private[this] val wrapped: FileSourceScanExec = { + val tclass = classOf[org.apache.spark.sql.execution.FileSourceScanExec] + val constructors = tclass.getConstructors() + if (constructors.size > 1) { + throw new IllegalStateException(s"Only expected 1 constructor for FileSourceScanExec") + } + val constructor = constructors(0) + val instance = if (constructor.getParameterCount() == 8) { + // Some distributions of Spark modified FileSourceScanExec to take an additional parameter + // that is the logicalRelation. We don't know what its used for exactly but haven't + // run into any issues in testing using the one we create here. 
+ @transient val logicalRelation = LogicalRelation(relation) + try { + constructor.newInstance(relation, output, requiredSchema, partitionFilters, + optionalBucketSet, dataFilters, tableIdentifier, + logicalRelation).asInstanceOf[FileSourceScanExec] + } catch { + case il: IllegalArgumentException => + // TODO - workaround until https://github.com/NVIDIA/spark-rapids/issues/354 + constructor.newInstance(relation, output, requiredSchema, partitionFilters, + optionalBucketSet, None, dataFilters, tableIdentifier).asInstanceOf[FileSourceScanExec] + } + } else { + constructor.newInstance(relation, output, requiredSchema, partitionFilters, + optionalBucketSet, dataFilters, tableIdentifier).asInstanceOf[FileSourceScanExec] + } + instance + } + + override lazy val outputPartitioning: Partitioning = wrapped.outputPartitioning + + override lazy val outputOrdering: Seq[SortOrder] = wrapped.outputOrdering + + override lazy val metadata: Map[String, String] = wrapped.metadata + + override lazy val metrics: Map[String, SQLMetric] = wrapped.metrics + + override def verboseStringWithOperatorId(): String = { + val metadataStr = metadata.toSeq.sorted.filterNot { + case (_, value) if (value.isEmpty || value.equals("[]")) => true + case (key, _) if (key.equals("DataFilters") || key.equals("Format")) => true + case (_, _) => false + }.map { + case (key, _) if (key.equals("Location")) => + val location = wrapped.relation.location + val numPaths = location.rootPaths.length + val abbreviatedLocation = if (numPaths <= 1) { + location.rootPaths.mkString("[", ", ", "]") + } else { + "[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]" + } + s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLocation)}" + case (key, value) => s"$key: ${redact(value)}" + } + + s""" + |$formattedNodeName + |${ExplainUtils.generateFieldString("Output", output)} + |${metadataStr.mkString("\n")} + |""".stripMargin + } + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + wrapped.inputRDD :: Nil + } + + override protected def doExecute(): RDD[InternalRow] = + throw new IllegalStateException(s"Row-based execution should not occur for $this") + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + val numOutputRows = longMetric("numOutputRows") + val scanTime = longMetric("scanTime") + wrapped.inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches => + new Iterator[ColumnarBatch] { + + override def hasNext: Boolean = { + // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call.
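+ // Timing around `hasNext` therefore attributes the actual file-scan work to the scanTime metric below.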
+ val startNs = System.nanoTime() + val res = batches.hasNext + scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs) + res + } + + override def next(): ColumnarBatch = { + val batch = batches.next() + numOutputRows += batch.numRows() + batch + } + } + } + } + + override val nodeNamePrefix: String = "Gpu" + wrapped.nodeNamePrefix + + override def doCanonicalize(): GpuFileSourceScanExec = { + val canonical = wrapped.doCanonicalize() + GpuFileSourceScanExec( + canonical.relation, + canonical.output, + canonical.requiredSchema, + canonical.partitionFilters, + canonical.optionalBucketSet, + canonical.dataFilters, + canonical.tableIdentifier) + } +} + +object GpuFileSourceScanExec { + def tagSupport(meta: SparkPlanMeta[FileSourceScanExec]): Unit = { + meta.wrapped.relation.fileFormat match { + case _: CSVFileFormat => GpuReadCSVFileFormat.tagSupport(meta) + case _: OrcFileFormat => GpuReadOrcFileFormat.tagSupport(meta) + case _: ParquetFileFormat => GpuReadParquetFileFormat.tagSupport(meta) + case f => + meta.willNotWorkOnGpu(s"unsupported file format: ${f.getClass.getCanonicalName}") + } + } + + def convertFileFormat(format: FileFormat): FileFormat = { + format match { + case _: CSVFileFormat => new GpuReadCSVFileFormat + case _: OrcFileFormat => new GpuReadOrcFileFormat + case _: ParquetFileFormat => new GpuReadParquetFileFormat + case f => + throw new IllegalArgumentException(s"${f.getClass.getCanonicalName} is not supported") + } + } +} diff --git a/sql-plugin/pom.xml b/sql-plugin/pom.xml index d80721f110a..f450d5483e6 100644 --- a/sql-plugin/pom.xml +++ b/sql-plugin/pom.xml @@ -30,18 +30,6 @@ The RAPIDS SQL plugin for Apache Spark 0.2.0-SNAPSHOT - - - databricks - - - org.apache.spark - spark-annotation_${scala.binary.version} - - - - - ai.rapids diff --git a/tests/pom.xml b/tests/pom.xml index a6e262e7a4e..2037e760991 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -34,6 +34,12 @@ 3.0.0 + + spark300dbtests + + 3.0.0-databricks + + spark301tests