diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/FileSinkDescShim.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/FileSinkDescShim.scala
index 98301d8e9ef..1e9abaf9d30 100644
--- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/FileSinkDescShim.scala
+++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/FileSinkDescShim.scala
@@ -36,6 +36,7 @@
 {"spark": "333"}
 {"spark": "340"}
 {"spark": "341"}
+{"spark": "341db"}
 spark-rapids-shim-json-lines ***/
 package org.apache.spark.sql.hive.rapids.shims
 
diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/HiveInspectorsShim.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/HiveInspectorsShim.scala
index a098b1d64c3..87c588cc858 100644
--- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/HiveInspectorsShim.scala
+++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/HiveInspectorsShim.scala
@@ -36,6 +36,7 @@
 {"spark": "333"}
 {"spark": "340"}
 {"spark": "341"}
+{"spark": "341db"}
 spark-rapids-shim-json-lines ***/
 package org.apache.spark.sql.hive.rapids.shims
 
diff --git a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/v1FallbackWriters.scala b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/v1FallbackWriters.scala
index fc5cafb80d5..9b458287583 100644
--- a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/v1FallbackWriters.scala
+++ b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/v1FallbackWriters.scala
@@ -32,6 +32,7 @@
 {"spark": "333"}
 {"spark": "340"}
 {"spark": "341"}
+{"spark": "341db"}
 {"spark": "350"}
 spark-rapids-shim-json-lines ***/
 package com.nvidia.spark.rapids
diff --git a/sql-plugin/src/main/spark321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala b/sql-plugin/src/main/spark321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala
index a209c9ac689..25aed8adba8 100644
--- a/sql-plugin/src/main/spark321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala
+++ b/sql-plugin/src/main/spark321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala
@@ -22,47 +22,14 @@ package org.apache.spark.rapids.shims
 
 import com.nvidia.spark.rapids.GpuPartitioning
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
-import org.apache.spark.sql.execution.exchange.{ShuffleExchangeLike, ShuffleOrigin}
-import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBaseWithMetrics, ShuffledBatchRDD}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.exchange.ShuffleOrigin
 
 case class GpuShuffleExchangeExec(
     gpuOutputPartitioning: GpuPartitioning,
     child: SparkPlan,
     shuffleOrigin: ShuffleOrigin)(
     cpuOutputPartitioning: Partitioning)
-  extends GpuShuffleExchangeExecBaseWithMetrics(gpuOutputPartitioning, child)
-    with ShuffleExchangeLike {
-
-  override def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil
-
-  override val outputPartitioning: Partitioning = cpuOutputPartitioning
-
-  override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions
-
-  override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions
-
-  override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[_] = {
-    new ShuffledBatchRDD(shuffleDependencyColumnar, metrics ++ readMetrics, partitionSpecs)
-  }
-
-  // DB SPECIFIC - throw if called since we don't know how its used
-  override def withNewOutputPartitioning(outputPartitioning: Partitioning) = {
-    throw new UnsupportedOperationException
-  }
-
-  override def runtimeStatistics: Statistics = {
-    // note that Spark will only use the sizeInBytes statistic but making the rowCount
-    // available here means that we can more easily reference it in GpuOverrides when
-    // planning future query stages when AQE is on
-    Statistics(
-      sizeInBytes = metrics("dataSize").value,
-      rowCount = Some(metrics("numOutputRows").value)
-    )
-  }
-
-  override def shuffleId: Int = shuffleDependencyColumnar.shuffleId
-}
+  extends GpuDatabricksShuffleExchangeExecBase(gpuOutputPartitioning,
+    child, shuffleOrigin)(cpuOutputPartitioning)
diff --git a/sql-plugin/src/main/spark321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/spark321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExecBase.scala
new file mode 100644
index 00000000000..498275fb396
--- /dev/null
+++ b/sql-plugin/src/main/spark321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExecBase.scala
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*** spark-rapids-shim-json-lines
+{"spark": "321db"}
+{"spark": "330db"}
+{"spark": "332db"}
+{"spark": "341db"}
+spark-rapids-shim-json-lines ***/
+package org.apache.spark.rapids.shims
+
+import com.nvidia.spark.rapids.GpuPartitioning
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{ShuffleExchangeLike, ShuffleOrigin}
+import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBaseWithMetrics, ShuffledBatchRDD}
+
+abstract class GpuDatabricksShuffleExchangeExecBase(
+    gpuOutputPartitioning: GpuPartitioning,
+    child: SparkPlan,
+    shuffleOrigin: ShuffleOrigin)(
+    cpuOutputPartitioning: Partitioning)
+  extends GpuShuffleExchangeExecBaseWithMetrics(gpuOutputPartitioning, child)
+    with ShuffleExchangeLike {
+
+  override def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil
+
+  override val outputPartitioning: Partitioning = cpuOutputPartitioning
+
+  override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions
+
+  override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions
+
+  override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[_] = {
+    new ShuffledBatchRDD(shuffleDependencyColumnar, metrics ++ readMetrics, partitionSpecs)
+  }
+
+  // DB SPECIFIC - throw if called since we don't know how it's used
+  override def withNewOutputPartitioning(outputPartitioning: Partitioning) = {
+    throw new UnsupportedOperationException
+  }
+
+  override def runtimeStatistics: Statistics = {
+    // note that Spark will only use the sizeInBytes statistic but making the rowCount
+    // available here means that we can more easily reference it in GpuOverrides when
+    // planning future query stages when AQE is on
+    Statistics(
+      sizeInBytes = metrics("dataSize").value,
+      rowCount = Some(metrics("numOutputRows").value)
+    )
+  }
+
+  override def shuffleId: Int = shuffleDependencyColumnar.shuffleId
+}
diff --git a/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/GpuBroadcastJoinMeta.scala b/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/GpuBroadcastJoinMeta.scala
index 76c4c1968f6..25b8a94ec66 100644
--- a/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/GpuBroadcastJoinMeta.scala
+++ b/sql-plugin/src/main/spark330db/scala/com/nvidia/spark/rapids/shims/GpuBroadcastJoinMeta.scala
@@ -25,7 +25,7 @@ import com.nvidia.spark.rapids._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
-import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuShuffleExchangeExecBase}
+import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuCustomShuffleReaderExec, GpuShuffleExchangeExecBase}
 
 abstract class GpuBroadcastJoinMeta[INPUT <: SparkPlan](plan: INPUT,
     conf: RapidsConf,
@@ -51,18 +51,21 @@ abstract class GpuBroadcastJoinMeta[INPUT <: SparkPlan](plan: INPUT,
   }
 
   def verifyBuildSideWasReplaced(buildSide: SparkPlan): Unit = {
+    def isOnGpu(sqse: ShuffleQueryStageExec): Boolean = sqse.plan match {
+      case _: GpuShuffleExchangeExecBase => true
+      case ReusedExchangeExec(_, _: GpuShuffleExchangeExecBase) => true
+      case _ => false
+    }
     val buildSideOnGpu = buildSide match {
       case bqse: BroadcastQueryStageExec => bqse.plan.isInstanceOf[GpuBroadcastExchangeExec] ||
         bqse.plan.isInstanceOf[ReusedExchangeExec] &&
        bqse.plan.asInstanceOf[ReusedExchangeExec]
          .child.isInstanceOf[GpuBroadcastExchangeExec]
-      case sqse: ShuffleQueryStageExec => sqse.plan.isInstanceOf[GpuShuffleExchangeExecBase] ||
-        sqse.plan.isInstanceOf[ReusedExchangeExec] &&
-        sqse.plan.asInstanceOf[ReusedExchangeExec]
-          .child.isInstanceOf[GpuShuffleExchangeExecBase]
+      case sqse: ShuffleQueryStageExec => isOnGpu(sqse)
       case reused: ReusedExchangeExec => reused.child.isInstanceOf[GpuBroadcastExchangeExec] ||
        reused.child.isInstanceOf[GpuShuffleExchangeExecBase]
       case _: GpuBroadcastExchangeExec | _: GpuShuffleExchangeExecBase => true
+      case GpuCustomShuffleReaderExec(sqse: ShuffleQueryStageExec, _) => isOnGpu(sqse)
       case _ => false
     }
     if (!buildSideOnGpu) {
diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/ParquetCVShims.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/ParquetCVShims.scala
index b4c067a7493..8213f4f4225 100644
--- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/ParquetCVShims.scala
+++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/execution/datasources/parquet/rapids/shims/ParquetCVShims.scala
@@ -19,10 +19,12 @@
 {"spark": "332db"}
 {"spark": "340"}
 {"spark": "341"}
+{"spark": "341db"}
 spark-rapids-shim-json-lines ***/
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector
 import org.apache.spark.sql.types.StructType
 
@@ -38,7 +40,7 @@ object ParquetCVShims {
       missingColumns: java.util.Set[ParquetColumn],
       isTopLevel: Boolean): ParquetColumnVector = {
     val defaultValue = if (sparkSchema != null) {
-      sparkSchema.existenceDefaultValues(idx)
+      getExistenceDefaultValues(sparkSchema)
     } else null
     new ParquetColumnVector(column, vector, capacity, memoryMode, missingColumns, isTopLevel,
       defaultValue)
diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala
index ca297369328..ca4b0dfa31a 100644
--- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala
+++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala
@@ -17,6 +17,7 @@
 /*** spark-rapids-shim-json-lines
 {"spark": "330db"}
 {"spark": "332db"}
+{"spark": "341db"}
 spark-rapids-shim-json-lines ***/
 package org.apache.spark.sql.rapids.execution
 
@@ -112,16 +113,22 @@ case class GpuBroadcastHashJoinExec(
     executorBroadcast
   }
 
-  def shuffleExchange: GpuShuffleExchangeExec = buildPlan match {
-    case bqse: ShuffleQueryStageExec if bqse.plan.isInstanceOf[GpuShuffleExchangeExec] =>
-      bqse.plan.asInstanceOf[GpuShuffleExchangeExec]
-    case bqse: ShuffleQueryStageExec if bqse.plan.isInstanceOf[ReusedExchangeExec] =>
-      bqse.plan.asInstanceOf[ReusedExchangeExec].child.asInstanceOf[GpuShuffleExchangeExec]
-    case gpu: GpuShuffleExchangeExec => gpu
-    case reused: ReusedExchangeExec => reused.child.asInstanceOf[GpuShuffleExchangeExec]
+  def shuffleExchange: GpuShuffleExchangeExec = {
+    def from(p: ShuffleQueryStageExec): GpuShuffleExchangeExec = p.plan match {
+      case g: GpuShuffleExchangeExec => g
+      case ReusedExchangeExec(_, g: GpuShuffleExchangeExec) => g
+      case _ => throw new IllegalStateException(s"cannot locate GPU shuffle in $p")
+    }
+    buildPlan match {
+      case gpu: GpuShuffleExchangeExec => gpu
+      case sqse: ShuffleQueryStageExec => from(sqse)
+      case reused: ReusedExchangeExec => reused.child.asInstanceOf[GpuShuffleExchangeExec]
+      case GpuShuffleCoalesceExec(GpuCustomShuffleReaderExec(sqse: ShuffleQueryStageExec, _), _) =>
+        from(sqse)
+      case GpuCustomShuffleReaderExec(sqse: ShuffleQueryStageExec, _) => from(sqse)
+    }
   }
-
 
   private def getExecutorBuiltBatchAndStreamIter(
       buildRelation: RDD[ColumnarBatch],
       buildSchema: StructType,
@@ -158,7 +165,7 @@ case class GpuBroadcastHashJoinExec(
 
     // Get all the broadcast data from the shuffle coalesced into a single partition
     val partitionSpecs = Seq(CoalescedPartitionSpec(0, shuffleExchange.numPartitions))
-    val buildRelation = shuffleExchange.getShuffleRDD(partitionSpecs.toArray)
+    val buildRelation = ShuffleExchangeShim.getShuffleRDD(shuffleExchange, partitionSpecs)
       .asInstanceOf[RDD[ColumnarBatch]]
 
     val rdd = streamedPlan.executeColumnar()
diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
index 80e766a23cb..6c04a2aeb57 100644
--- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
+++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
@@ -17,6 +17,7 @@
 /*** spark-rapids-shim-json-lines
 {"spark": "330db"}
 {"spark": "332db"}
+{"spark": "341db"}
 spark-rapids-shim-json-lines ***/
 package org.apache.spark.sql.rapids.execution
 
@@ -130,7 +131,7 @@ case class GpuBroadcastNestedLoopJoinExec(
     if (executorBroadcast) {
       // Get all the broadcast data from the shuffle coalesced into a single partition
       val partitionSpecs = Seq(CoalescedPartitionSpec(0, shuffleExchange.numPartitions))
-      shuffleExchange.getShuffleRDD(partitionSpecs.toArray).asInstanceOf[RDD[ColumnarBatch]]
+      ShuffleExchangeShim.getShuffleRDD(shuffleExchange, partitionSpecs)
     } else {
       broadcastExchange.executeColumnarBroadcast[Any]()
     }
diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/ShuffleExchangeShim.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/ShuffleExchangeShim.scala
new file mode 100644
index 00000000000..6b0bf4924a4
--- /dev/null
+++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/ShuffleExchangeShim.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*** spark-rapids-shim-json-lines
+{"spark": "330db"}
+{"spark": "332db"}
+spark-rapids-shim-json-lines ***/
+package org.apache.spark.sql.rapids.execution
+
+import org.apache.spark.rapids.shims.GpuShuffleExchangeExec
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.CoalescedPartitionSpec
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+object ShuffleExchangeShim {
+  def getShuffleRDD(
+      shuffleExchange: GpuShuffleExchangeExec,
+      partitionSpecs: Seq[CoalescedPartitionSpec]): RDD[ColumnarBatch] = {
+    shuffleExchange.getShuffleRDD(partitionSpecs.toArray)
+      .asInstanceOf[RDD[ColumnarBatch]]
+  }
+
+}
diff --git a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/GpuAggregateInPandasExecMeta.scala b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/GpuAggregateInPandasExecMeta.scala
similarity index 99%
rename from sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/GpuAggregateInPandasExecMeta.scala
rename to sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/GpuAggregateInPandasExecMeta.scala
index 1feb2f4e252..4372f3555c1 100644
--- a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/GpuAggregateInPandasExecMeta.scala
+++ b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/GpuAggregateInPandasExecMeta.scala
@@ -15,6 +15,7 @@
  */
 
 /*** spark-rapids-shim-json-lines
+{"spark": "341db"}
 {"spark": "350"}
 spark-rapids-shim-json-lines ***/
 package com.nvidia.spark.rapids.shims
diff --git a/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala
new file mode 100644
index 00000000000..249502f1b49
--- /dev/null
+++ b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*** spark-rapids-shim-json-lines
+{"spark": "341db"}
+spark-rapids-shim-json-lines ***/
+package com.nvidia.spark.rapids.shims
+
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+object PartitionedFileUtilsShim {
+  // Wrapper for case class constructor so Java code can access
+  // the default values across Spark versions.
+  def newPartitionedFile(
+      partitionValues: InternalRow,
+      filePath: String,
+      start: Long,
+      length: Long): PartitionedFile = {
+    PartitionedFile(partitionValues, SparkPath.fromPathString(filePath), start, length)
+  }
+
+  def withNewLocations(pf: PartitionedFile, locations: Seq[String]): PartitionedFile = {
+    pf.copy(locations = locations)
+  }
+}
diff --git a/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/spark341db/RapidsShuffleManager.scala b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/spark341db/RapidsShuffleManager.scala
index daa1c685528..a5500bd018c 100644
--- a/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/spark341db/RapidsShuffleManager.scala
+++ b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/spark341db/RapidsShuffleManager.scala
@@ -20,7 +20,7 @@ spark-rapids-shim-json-lines ***/
 package com.nvidia.spark.rapids.spark341db
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.shims.ProxyRapidsShuffleInternalManagerBase
+import org.apache.spark.sql.rapids.ProxyRapidsShuffleInternalManagerBase
 
 /** A shuffle manager optimized for the RAPIDS Plugin for Apache Spark. */
 sealed class RapidsShuffleManager(
diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala
new file mode 100644
index 00000000000..35d916bd5e8
--- /dev/null
+++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*** spark-rapids-shim-json-lines
+{"spark": "341db"}
+spark-rapids-shim-json-lines ***/
+package org.apache.spark.rapids.shims
+
+import com.nvidia.spark.rapids.GpuPartitioning
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{ShuffleExchangeLike, ShuffleOrigin}
+import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBaseWithMetrics, ShuffledBatchRDD}
+
+case class GpuShuffleExchangeExec(
+    gpuOutputPartitioning: GpuPartitioning,
+    child: SparkPlan,
+    shuffleOrigin: ShuffleOrigin)(
+    cpuOutputPartitioning: Partitioning)
+  extends GpuDatabricksShuffleExchangeExecBase(gpuOutputPartitioning, child, shuffleOrigin)(
+    cpuOutputPartitioning) {
+
+  override def getShuffleRDD(
+      partitionSpecs: Array[ShufflePartitionSpec],
+      lazyFetching: Boolean): RDD[_] = {
+    new ShuffledBatchRDD(shuffleDependencyColumnar, metrics ++ readMetrics, partitionSpecs)
+  }
+
+  // DB SPECIFIC - throw if called since we don't know how it's used
+  override def targetOutputPartitioning: Partitioning = {
+    throw new UnsupportedOperationException
+  }
+}
diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/FilePartitionShims.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/FilePartitionShims.scala
new file mode 100644
index 00000000000..b52d9e4695c
--- /dev/null
+++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/FilePartitionShims.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*** spark-rapids-shim-json-lines
+{"spark": "341db"}
+spark-rapids-shim-json-lines ***/
+package org.apache.spark.sql.execution.rapids.shims
+
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.execution.PartitionedFileUtil
+import org.apache.spark.sql.execution.datasources._
+
+object FilePartitionShims extends SplitFiles {
+  def getPartitions(selectedPartitions: Array[PartitionDirectory]): Array[PartitionedFile] = {
+    selectedPartitions.flatMap { p =>
+      p.files.map { f =>
+        PartitionedFileUtil.getPartitionedFile(f, p.values, Some(SparkPath.fromPath(f.getPath)))
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala
new file mode 100644
index 00000000000..3753f64e10c
--- /dev/null
+++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*** spark-rapids-shim-json-lines
+{"spark": "341db"}
+{"spark": "350"}
+spark-rapids-shim-json-lines ***/
+package org.apache.spark.sql.execution.rapids.shims
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.PartitionedFileUtil
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory, PartitionedFile}
+
+trait SplitFiles {
+  def splitFiles(sparkSession: SparkSession,
+      hadoopConf: Configuration,
+      selectedPartitions: Array[PartitionDirectory],
+      maxSplitBytes: Long): Seq[PartitionedFile] = {
+
+    def canBeSplit(filePath: Path, hadoopConf: Configuration): Boolean = {
+      // Checks if file at path `filePath` can be split.
+      // Uncompressed Hive Text files may be split. GZIP compressed files are not.
+      // Note: This method works on a Path, and cannot take a `FileStatus`.
+      //       partition.files is an Array[FileStatus] on vanilla Apache Spark,
+      //       but an Array[SerializableFileStatus] on Databricks.
+      val codec = new CompressionCodecFactory(hadoopConf).getCodec(filePath)
+      codec == null || codec.isInstanceOf[SplittableCompressionCodec]
+    }
+
+    selectedPartitions.flatMap { partition =>
+      partition.files.flatMap { f =>
+        PartitionedFileUtil.splitFiles(
+          sparkSession,
+          f,
+          isSplitable = canBeSplit(f.getPath, hadoopConf),
+          maxSplitBytes,
+          partition.values
+        )
+      }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    }
+  }
+
+  def splitFiles(
+      selectedPartitions: Array[PartitionDirectory],
+      relation: HadoopFsRelation,
+      maxSplitBytes: Long): Array[PartitionedFile] = {
+
+    selectedPartitions.flatMap { partition =>
+      partition.files.flatMap { file =>
+        // getPath() is very expensive so we only want to call it once in this block:
+        val filePath = file.getPath
+        val isSplitable = relation.fileFormat.isSplitable(
+          relation.sparkSession, relation.options, filePath)
+        PartitionedFileUtil.splitFiles(
+          sparkSession = relation.sparkSession,
+          file = file,
+          isSplitable = isSplitable,
+          maxSplitBytes = maxSplitBytes,
+          partitionValues = partition.values
+        )
+      }
+    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+  }
+}
diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/ShuffleExchangeShim.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/ShuffleExchangeShim.scala
new file mode 100644
index 00000000000..5c6b5b21746
--- /dev/null
+++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/ShuffleExchangeShim.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*** spark-rapids-shim-json-lines
+{"spark": "341db"}
+spark-rapids-shim-json-lines ***/
+package org.apache.spark.sql.rapids.execution
+
+import org.apache.spark.rapids.shims.GpuShuffleExchangeExec
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.CoalescedPartitionSpec
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+object ShuffleExchangeShim {
+  def getShuffleRDD(
+      shuffleExchange: GpuShuffleExchangeExec,
+      partitionSpecs: Seq[CoalescedPartitionSpec]): RDD[ColumnarBatch] = {
+    shuffleExchange.getShuffleRDD(partitionSpecs.toArray, lazyFetching = true)
+      .asInstanceOf[RDD[ColumnarBatch]]
+  }
+
+}
diff --git a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/rapids/shims/FilePartitionShims.scala b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/rapids/shims/FilePartitionShims.scala
index 2319d6e7d73..fdbdc30afb8 100644
--- a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/rapids/shims/FilePartitionShims.scala
+++ b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/rapids/shims/FilePartitionShims.scala
@@ -20,15 +20,10 @@ spark-rapids-shim-json-lines ***/
 
 package org.apache.spark.sql.execution.rapids.shims
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
-
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
 
-object FilePartitionShims {
+object FilePartitionShims extends SplitFiles {
   def getPartitions(selectedPartitions: Array[PartitionDirectory]): Array[PartitionedFile] = {
     selectedPartitions.flatMap { p =>
       p.files.map { f =>
@@ -36,53 +31,4 @@ object FilePartitionShims {
       }
     }
   }
-
-  def splitFiles(sparkSession: SparkSession,
-      hadoopConf: Configuration,
-      selectedPartitions: Array[PartitionDirectory],
-      maxSplitBytes: Long): Seq[PartitionedFile] = {
-
-    def canBeSplit(filePath: Path, hadoopConf: Configuration): Boolean = {
-      // Checks if file at path `filePath` can be split.
-      // Uncompressed Hive Text files may be split. GZIP compressed files are not.
-      // Note: This method works on a Path, and cannot take a `FileStatus`.
-      //       partition.files is an Array[FileStatus] on vanilla Apache Spark,
-      //       but an Array[SerializableFileStatus] on Databricks.
-      val codec = new CompressionCodecFactory(hadoopConf).getCodec(filePath)
-      codec == null || codec.isInstanceOf[SplittableCompressionCodec]
-    }
-
-    selectedPartitions.flatMap { partition =>
-      partition.files.flatMap { f =>
-        PartitionedFileUtil.splitFiles(
-          sparkSession,
-          f,
-          isSplitable = canBeSplit(f.getPath, hadoopConf),
-          maxSplitBytes,
-          partition.values
-        )
-      }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-    }
-  }
-
-  def splitFiles(selectedPartitions: Array[PartitionDirectory],
-      relation: HadoopFsRelation,
-      maxSplitBytes: Long): Array[PartitionedFile] = {
-
-    selectedPartitions.flatMap { partition =>
-      partition.files.flatMap { file =>
-        // getPath() is very expensive so we only want to call it once in this block:
-        val filePath = file.getPath
-        val isSplitable = relation.fileFormat.isSplitable(
-          relation.sparkSession, relation.options, filePath)
-        PartitionedFileUtil.splitFiles(
-          sparkSession = relation.sparkSession,
-          file = file,
-          isSplitable = isSplitable,
-          maxSplitBytes = maxSplitBytes,
-          partitionValues = partition.values
-        )
-      }
-    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-  }
-}
\ No newline at end of file
+}