From 69d566bb0f1749e75b3167d811da88c50fb76779 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 14 Aug 2020 15:04:17 -0500 Subject: [PATCH 1/8] Make GpuFileSourceScanExec work with Databricks Signed-off-by: Thomas Graves --- .../rapids/shims/spark300/Spark300Shims.scala | 50 ++++++++++++++++- shims/spark300db/pom.xml | 13 +++++ .../shims/spark300db/Spark300dbShims.scala | 53 ++++++++++++++++++- .../com/nvidia/spark/rapids/SparkShims.scala | 15 ++++++ .../sql/rapids/GpuFileSourceScanExec.scala | 42 +++++---------- 5 files changed, 141 insertions(+), 32 deletions(-) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index effe6ff3c38..230bad51b1b 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala @@ -20,9 +20,12 @@ import java.time.ZoneId import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.spark300.RapidsShuffleManager +import org.apache.hadoop.fs.Path import org.apache.spark.SparkEnv +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} @@ -31,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec -import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, FileScanRDD, HadoopFsRelation, PartitionDirectory, PartitionedFile} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec} import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec @@ -236,4 +239,49 @@ class Spark300Shims extends SparkShims { override def getShuffleManagerShims(): ShuffleManagerShimBase = { new ShuffleManagerShim } + + override def getFileStatusSize(partitions: Seq[PartitionDirectory]): Long = { + partitions.map(_.files.map(_.getLen).sum).sum + } + + override def getFilesGroupedToBuckets( + partitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { + partitions.flatMap { p => + p.files.map { f => + PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) + } + }.groupBy { f => + BucketingUtils + .getBucketId(new Path(f.filePath).getName) + .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) + } + } + + override def getSplitFiles( + partitions: Array[PartitionDirectory], + maxSplitBytes: Long, + relation: HadoopFsRelation): Array[PartitionedFile] = { + partitions.flatMap { partition => + partition.files.flatMap { file => + // getPath() is very expensive so we only want to call it once in this block: + val filePath = file.getPath + val isSplitable = relation.fileFormat.isSplitable( + relation.sparkSession, relation.options, filePath) + PartitionedFileUtil.splitFiles( + sparkSession = relation.sparkSession, + file = file, + filePath = filePath, + isSplitable = 
isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partition.values + ) + } + } + } + override def getFileScanRDD( + sparkSession: SparkSession, + readFunction: (PartitionedFile) => Iterator[InternalRow], + filePartitions: Seq[FilePartition]): RDD[InternalRow] = { + new FileScanRDD(sparkSession, readFunction, filePartitions) + } } diff --git a/shims/spark300db/pom.xml b/shims/spark300db/pom.xml index c3f1e919a0f..bc205d58998 100644 --- a/shims/spark300db/pom.xml +++ b/shims/spark300db/pom.xml @@ -32,6 +32,7 @@ 0.2.0-SNAPSHOT + 1.10.1 3.0.0-databricks @@ -59,6 +60,18 @@ ${spark30db.version} provided + + org.apache.spark + spark-annotation_${scala.binary.version} + ${spark30db.version} + provided + + + org.apache.parquet + parquet-column + ${parquet.version} + provided + diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala index 97e4ad83a1d..c1fae18cf8f 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala @@ -20,13 +20,18 @@ import java.time.ZoneId import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.shims.spark300.Spark300Shims +import org.apache.spark.sql.rapids.shims.spark300db._ +import org.apache.hadoop.fs.Path +import org.apache.spark.rdd.RDD import org.apache.spark.SparkEnv +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec} import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuTimeSub} @@ -118,4 +123,50 @@ class Spark300dbShims extends Spark300Shims { override def getBuildSide(join: BroadcastNestedLoopJoinExec): GpuBuildSide = { GpuJoinUtils.getGpuBuildSide(join.buildSide) } + + override def getFileStatusSize(partitions: Seq[PartitionDirectory]): Long = { + partitions.map(_.files.map(_.getLen).sum).sum + } + + override def getFilesGroupedToBuckets( + partitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { + partitions.flatMap { p => + p.files.map { f => + PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) + } + }.groupBy { f => + BucketingUtils + .getBucketId(new Path(f.filePath).getName) + .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) + } + } + + override def getSplitFiles( + partitions: Array[PartitionDirectory], + maxSplitBytes: Long, + relation: HadoopFsRelation): Array[PartitionedFile] = { + partitions.flatMap { partition => + partition.files.flatMap { file => + // getPath() is very expensive so we only want to call it once in this block: + val filePath = file.getPath + val isSplitable = relation.fileFormat.isSplitable( + relation.sparkSession, relation.options, filePath) + PartitionedFileUtil.splitFiles( + sparkSession = relation.sparkSession, + file = file, + 
filePath = filePath, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partition.values + ) + } + } + } + + override def getFileScanRDD( + sparkSession: SparkSession, + readFunction: (PartitionedFile) => Iterator[InternalRow], + filePartitions: Seq[FilePartition]): RDD[InternalRow] = { + new GpuFileScanRDD(sparkSession, readFunction, filePartitions) + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index 7678dad4dca..d3dcfd7c8e7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -16,7 +16,9 @@ package com.nvidia.spark.rapids +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.JoinType @@ -24,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec +import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile} import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.rapids.ShuffleManagerShimBase import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase} @@ -105,4 +108,16 @@ trait SparkShims { rule: SparkSession => Rule[SparkPlan]) def getShuffleManagerShims(): ShuffleManagerShimBase + def getFileStatusSize(partitions: Seq[PartitionDirectory]): Long + def getFilesGroupedToBuckets( + partitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] + def getSplitFiles( + partitions: Array[PartitionDirectory], + maxSplitBytes: Long, + relation: HadoopFsRelation): Array[PartitionedFile] + def getFileScanRDD( + sparkSession: SparkSession, + readFunction: (PartitionedFile) => Iterator[InternalRow], + filePartitions: Seq[FilePartition]): RDD[InternalRow] } + diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala index a74c5021075..5aac1c443b8 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala @@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import scala.collection.mutable.HashMap -import com.nvidia.spark.rapids.{GpuExec, GpuMetricNames, GpuReadCSVFileFormat, GpuReadFileFormatWithMetrics, GpuReadOrcFileFormat, GpuReadParquetFileFormat, SparkPlanMeta} +import com.nvidia.spark.rapids.{GpuExec, GpuMetricNames, GpuReadCSVFileFormat, GpuReadFileFormatWithMetrics, GpuReadOrcFileFormat, GpuReadParquetFileFormat, ShimLoader, SparkPlanMeta} import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Ascending, Attribute, Att import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import 
org.apache.spark.sql.execution.{ExecSubqueryExpression, ExplainUtils, FileSourceScanExec, PartitionedFileUtil, SQLExecution} -import org.apache.spark.sql.execution.datasources.{BucketingUtils, DataSourceStrategy, DataSourceUtils, FileFormat, FilePartition, FileScanRDD, HadoopFsRelation, PartitionDirectory, PartitionedFile} +import org.apache.spark.sql.execution.datasources.{BucketingUtils, DataSourceStrategy, DataSourceUtils, FileFormat, FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile} import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat @@ -311,7 +311,7 @@ case class GpuFileSourceScanExec( partitions: Seq[PartitionDirectory], static: Boolean): Unit = { val filesNum = partitions.map(_.files.size.toLong).sum - val filesSize = partitions.map(_.files.map(_.getLen).sum).sum + val filesSize = ShimLoader.getSparkShims.getFileStatusSize(partitions) if (!static || partitionFilters.filter(isDynamicPruningFilter).isEmpty) { driverMetrics("numFiles") = filesNum driverMetrics("filesSize") = filesSize @@ -390,16 +390,9 @@ case class GpuFileSourceScanExec( selectedPartitions: Array[PartitionDirectory], fsRelation: HadoopFsRelation): RDD[InternalRow] = { logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") + val filesGroupedToBuckets = - selectedPartitions.flatMap { p => - p.files.map { f => - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) - } - }.groupBy { f => - BucketingUtils - .getBucketId(new Path(f.filePath).getName) - .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) - } + ShimLoader.getSparkShims.getFilesGroupedToBuckets(selectedPartitions) val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { val bucketSet = optionalBucketSet.get @@ -425,7 +418,8 @@ case class GpuFileSourceScanExec( } } - new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + // new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + ShimLoader.getSparkShims.getFileScanRDD(fsRelation.sparkSession, readFile, filePartitions) } /** @@ -446,27 +440,15 @@ case class GpuFileSourceScanExec( logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") - val splitFiles = selectedPartitions.flatMap { partition => - partition.files.flatMap { file => - // getPath() is very expensive so we only want to call it once in this block: - val filePath = file.getPath - val isSplitable = relation.fileFormat.isSplitable( - relation.sparkSession, relation.options, filePath) - PartitionedFileUtil.splitFiles( - sparkSession = relation.sparkSession, - file = file, - filePath = filePath, - isSplitable = isSplitable, - maxSplitBytes = maxSplitBytes, - partitionValues = partition.values - ) - } - }.sortBy(_.length)(implicitly[Ordering[Long]].reverse) + val splitFiles = ShimLoader.getSparkShims + .getSplitFiles(selectedPartitions, maxSplitBytes, relation) + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) val partitions = FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) - new FileScanRDD(fsRelation.sparkSession, readFile, partitions) + // new FileScanRDD(fsRelation.sparkSession, readFile, partitions) + ShimLoader.getSparkShims.getFileScanRDD(fsRelation.sparkSession, readFile, partitions) } // Filters unused DynamicPruningExpression expressions - one which has been replaced From 
41ff6c1718ab84c13b5a7277ae9168e4c28bd013 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 14 Aug 2020 16:10:01 -0500 Subject: [PATCH 2/8] Add in GpuFileScanRDD Signed-off-by: Thomas Graves --- .../shims/spark300db/GpuFileScanRDD.scala | 196 ++++++++++++++++++ 1 file changed, 196 insertions(+) create mode 100644 shims/spark300db/src/main/scala/org/apache/spark/sql/rapids/shims/spark300db/GpuFileScanRDD.scala diff --git a/shims/spark300db/src/main/scala/org/apache/spark/sql/rapids/shims/spark300db/GpuFileScanRDD.scala b/shims/spark300db/src/main/scala/org/apache/spark/sql/rapids/shims/spark300db/GpuFileScanRDD.scala new file mode 100644 index 00000000000..c63d79f8e31 --- /dev/null +++ b/shims/spark300db/src/main/scala/org/apache/spark/sql/rapids/shims/spark300db/GpuFileScanRDD.scala @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.rapids.shims.spark300db + +import java.io.{FileNotFoundException, IOException} + +import org.apache.parquet.io.ParquetDecodingException + +import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.{InputFileBlockHolder, RDD} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.QueryExecutionException +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.NextIterator + +/** + * An RDD that scans a list of file partitions. + * Databricks has different versions of FileScanRDD so we copy the + * Apache Spark version. + */ +class GpuFileScanRDD( + @transient private val sparkSession: SparkSession, + readFunction: (PartitionedFile) => Iterator[InternalRow], + @transient val filePartitions: Seq[FilePartition]) + extends RDD[InternalRow](sparkSession.sparkContext, Nil) { + + private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles + private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles + + override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = { + val iterator = new Iterator[Object] with AutoCloseable { + private val inputMetrics = context.taskMetrics().inputMetrics + private val existingBytesRead = inputMetrics.bytesRead + + // Find a function that will return the FileSystem bytes read by this thread. Do this before + // apply readFunction, because it might read some bytes. + private val getBytesReadCallback = + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() + + // We get our input bytes from thread-local Hadoop FileSystem statistics. + // If we do a coalesce, however, we are likely to compute multiple partitions in the same + // task and in the same thread, in which case we need to avoid override values written by + // previous partitions (SPARK-13071). 
+ private def incTaskInputMetricsBytesRead(): Unit = { + inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback()) + } + + private[this] val files = split.asInstanceOf[FilePartition].files.toIterator + private[this] var currentFile: PartitionedFile = null + private[this] var currentIterator: Iterator[Object] = null + + def hasNext: Boolean = { + // Kill the task in case it has been marked as killed. This logic is from + // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order + // to avoid performance overhead. + context.killTaskIfInterrupted() + (currentIterator != null && currentIterator.hasNext) || nextIterator() + } + def next(): Object = { + val nextElement = currentIterator.next() + // TODO: we should have a better separation of row based and batch based scan, so that we + // don't need to run this `if` for every record. + val preNumRecordsRead = inputMetrics.recordsRead + if (nextElement.isInstanceOf[ColumnarBatch]) { + incTaskInputMetricsBytesRead() + inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows()) + } else { + // too costly to update every record + if (inputMetrics.recordsRead % + SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) { + incTaskInputMetricsBytesRead() + } + inputMetrics.incRecordsRead(1) + } + nextElement + } + + private def readCurrentFile(): Iterator[InternalRow] = { + try { + readFunction(currentFile) + } catch { + case e: FileNotFoundException => + throw new FileNotFoundException( + e.getMessage + "\n" + + "It is possible the underlying files have been updated. " + + "You can explicitly invalidate the cache in Spark by " + + "running 'REFRESH TABLE tableName' command in SQL or " + + "by recreating the Dataset/DataFrame involved.") + } + } + + /** Advances to the next file. Returns true if a new non-empty iterator is available. */ + private def nextIterator(): Boolean = { + if (files.hasNext) { + currentFile = files.next() + logInfo(s"Reading File $currentFile") + // Sets InputFileBlockHolder for the file block's information + InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length) + + if (ignoreMissingFiles || ignoreCorruptFiles) { + currentIterator = new NextIterator[Object] { + // The readFunction may read some bytes before consuming the iterator, e.g., + // vectorized Parquet reader. Here we use lazy val to delay the creation of + // iterator so that we will throw exception in `getNext`. + private lazy val internalIter = readCurrentFile() + + override def getNext(): AnyRef = { + try { + if (internalIter.hasNext) { + internalIter.next() + } else { + finished = true + null + } + } catch { + case e: FileNotFoundException if ignoreMissingFiles => + logWarning(s"Skipped missing file: $currentFile", e) + finished = true + null + // Throw FileNotFoundException even if `ignoreCorruptFiles` is true + case e: FileNotFoundException if !ignoreMissingFiles => throw e + case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => + logWarning( + s"Skipped the rest of the content in the corrupted file: $currentFile", e) + finished = true + null + } + } + + override def close(): Unit = {} + } + } else { + currentIterator = readCurrentFile() + } + + try { + hasNext + } catch { + case e: SchemaColumnConvertNotSupportedException => + val message = "Parquet column cannot be converted in " + + s"file ${currentFile.filePath}. 
Column: ${e.getColumn}, " + + s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}" + throw new QueryExecutionException(message, e) + case e: ParquetDecodingException => + if (e.getCause.isInstanceOf[SparkUpgradeException]) { + throw e.getCause + } else if (e.getMessage.contains("Can not read value at")) { + val message = "Encounter error while reading parquet files. " + + "One possible cause: Parquet column cannot be converted in the " + + "corresponding files. Details: " + throw new QueryExecutionException(message, e) + } + throw e + } + } else { + currentFile = null + InputFileBlockHolder.unset() + false + } + } + + override def close(): Unit = { + incTaskInputMetricsBytesRead() + InputFileBlockHolder.unset() + } + } + + // Register an on-task-completion callback to close the input stream. + context.addTaskCompletionListener[Unit](_ => iterator.close()) + + iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack. + } + + override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray + + override protected def getPreferredLocations(split: RDDPartition): Seq[String] = { + split.asInstanceOf[FilePartition].preferredLocations() + } +} + From d4c4db4e74158e5db1a3963ad58cf210f2630d44 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 14 Aug 2020 16:12:01 -0500 Subject: [PATCH 3/8] cleanup Signed-off-by: Thomas Graves --- .../rapids/shims/spark300/Spark300Shims.scala | 31 ++++++++++++------- .../shims/spark300db/Spark300dbShims.scala | 27 ++++++++++------ .../com/nvidia/spark/rapids/SparkShims.scala | 8 +++-- .../sql/rapids/GpuFileSourceScanExec.scala | 11 +++---- 4 files changed, 46 insertions(+), 31 deletions(-) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index 230bad51b1b..0055cf8b96a 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala @@ -240,24 +240,30 @@ class Spark300Shims extends SparkShims { new ShuffleManagerShim } - override def getFileStatusSize(partitions: Seq[PartitionDirectory]): Long = { + override def getPartitionFileNames( + partitions: Seq[PartitionDirectory]): Seq[String] = { + val files = partitions.flatMap(partition => partition.files) + files.map(_.getPath.getName) + } + + override def getPartitionFileStatusSize(partitions: Seq[PartitionDirectory]): Long = { partitions.map(_.files.map(_.getLen).sum).sum } - override def getFilesGroupedToBuckets( - partitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { - partitions.flatMap { p => - p.files.map { f => - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) - } - }.groupBy { f => - BucketingUtils - .getBucketId(new Path(f.filePath).getName) - .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) + override def getPartitionFilesGroupedToBuckets( + partitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { + partitions.flatMap { p => + p.files.map { f => + PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) } + }.groupBy { f => + BucketingUtils + .getBucketId(new Path(f.filePath).getName) + .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) + } } - override def getSplitFiles( + override def getPartitionSplitFiles( partitions: Array[PartitionDirectory], maxSplitBytes: Long, relation: HadoopFsRelation): 
Array[PartitionedFile] = { @@ -278,6 +284,7 @@ class Spark300Shims extends SparkShims { } } } + override def getFileScanRDD( sparkSession: SparkSession, readFunction: (PartitionedFile) => Iterator[InternalRow], diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala index c1fae18cf8f..6a22ce7c857 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala @@ -124,22 +124,29 @@ class Spark300dbShims extends Spark300Shims { GpuJoinUtils.getGpuBuildSide(join.buildSide) } + // Databricks has a different version of FileStatus + override def getPartitionFileNames( + partitions: Seq[PartitionDirectory]): Seq[String] = { + val files = partitions.flatMap(partition => partition.files) + files.map(_.getPath.getName) + } + override def getFileStatusSize(partitions: Seq[PartitionDirectory]): Long = { partitions.map(_.files.map(_.getLen).sum).sum } override def getFilesGroupedToBuckets( - partitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { - partitions.flatMap { p => - p.files.map { f => - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) - } - }.groupBy { f => - BucketingUtils - .getBucketId(new Path(f.filePath).getName) - .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) + partitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { + partitions.flatMap { p => + p.files.map { f => + PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) } - } + }.groupBy { f => + BucketingUtils + .getBucketId(new Path(f.filePath).getName) + .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) + } + } override def getSplitFiles( partitions: Array[PartitionDirectory], diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index d3dcfd7c8e7..821a2c7d4b5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -108,10 +108,12 @@ trait SparkShims { rule: SparkSession => Rule[SparkPlan]) def getShuffleManagerShims(): ShuffleManagerShimBase - def getFileStatusSize(partitions: Seq[PartitionDirectory]): Long - def getFilesGroupedToBuckets( + + def getPartitionFileNames(partitions: Seq[PartitionDirectory]): Seq[String] + def getPartitionFileStatusSize(partitions: Seq[PartitionDirectory]): Long + def getPartitionFilesGroupedToBuckets( partitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] - def getSplitFiles( + def getPartitionSplitFiles( partitions: Array[PartitionDirectory], maxSplitBytes: Long, relation: HadoopFsRelation): Array[PartitionedFile] diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala index 5aac1c443b8..dba5d986583 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala @@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import scala.collection.mutable.HashMap import com.nvidia.spark.rapids.{GpuExec, GpuMetricNames, GpuReadCSVFileFormat, GpuReadFileFormatWithMetrics, 
GpuReadOrcFileFormat, GpuReadParquetFileFormat, ShimLoader, SparkPlanMeta} -import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} @@ -185,9 +184,9 @@ case class GpuFileSourceScanExec( // the RDD partition will not be sorted even if the relation has sort columns set // Current solution is to check if all the buckets have a single file in it - val files = selectedPartitions.flatMap(partition => partition.files) + val filesPartNames = ShimLoader.getSparkShims.getPartitionFileNames(selectedPartitions) val bucketToFilesGrouping = - files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file)) + filesPartNames.groupBy(file => BucketingUtils.getBucketId(file)) val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1) // TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced. @@ -311,7 +310,7 @@ case class GpuFileSourceScanExec( partitions: Seq[PartitionDirectory], static: Boolean): Unit = { val filesNum = partitions.map(_.files.size.toLong).sum - val filesSize = ShimLoader.getSparkShims.getFileStatusSize(partitions) + val filesSize = ShimLoader.getSparkShims.getPartitionFileStatusSize(partitions) if (!static || partitionFilters.filter(isDynamicPruningFilter).isEmpty) { driverMetrics("numFiles") = filesNum driverMetrics("filesSize") = filesSize @@ -392,7 +391,7 @@ case class GpuFileSourceScanExec( logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val filesGroupedToBuckets = - ShimLoader.getSparkShims.getFilesGroupedToBuckets(selectedPartitions) + ShimLoader.getSparkShims.getPartitionFilesGroupedToBuckets(selectedPartitions) val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { val bucketSet = optionalBucketSet.get @@ -441,7 +440,7 @@ case class GpuFileSourceScanExec( s"open cost is considered as scanning $openCostInBytes bytes.") val splitFiles = ShimLoader.getSparkShims - .getSplitFiles(selectedPartitions, maxSplitBytes, relation) + .getPartitionSplitFiles(selectedPartitions, maxSplitBytes, relation) .sortBy(_.length)(implicitly[Ordering[Long]].reverse) val partitions = From 41d2679fa133c9c96e4109ff04e72a4088e1a07d Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 14 Aug 2020 16:21:44 -0500 Subject: [PATCH 4/8] Rework to get PartitionedFiles only Signed-off-by: Thomas Graves --- .../spark/rapids/shims/spark300/Spark300Shims.scala | 8 ++------ .../rapids/shims/spark300db/Spark300dbShims.scala | 8 ++------ .../scala/com/nvidia/spark/rapids/SparkShims.scala | 3 +-- .../spark/sql/rapids/GpuFileSourceScanExec.scala | 11 +++++++++-- 4 files changed, 14 insertions(+), 16 deletions(-) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index 0055cf8b96a..dbf665fd693 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala @@ -250,16 +250,12 @@ class Spark300Shims extends SparkShims { partitions.map(_.files.map(_.getLen).sum).sum } - override def getPartitionFilesGroupedToBuckets( - partitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { + override def getPartitionedFiles( + partitions: Array[PartitionDirectory]): Array[PartitionedFile] = { partitions.flatMap { p => p.files.map { f => PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) 
} - }.groupBy { f => - BucketingUtils - .getBucketId(new Path(f.filePath).getName) - .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) } } diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala index 6a22ce7c857..da8be91fa5e 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala @@ -135,16 +135,12 @@ class Spark300dbShims extends Spark300Shims { partitions.map(_.files.map(_.getLen).sum).sum } - override def getFilesGroupedToBuckets( - partitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { + override def getPartitionedFiles( + partitions: Array[PartitionDirectory]): Array[PartitionedFile] = { partitions.flatMap { p => p.files.map { f => PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) } - }.groupBy { f => - BucketingUtils - .getBucketId(new Path(f.filePath).getName) - .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index 821a2c7d4b5..fe347eccc65 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -111,8 +111,7 @@ trait SparkShims { def getPartitionFileNames(partitions: Seq[PartitionDirectory]): Seq[String] def getPartitionFileStatusSize(partitions: Seq[PartitionDirectory]): Long - def getPartitionFilesGroupedToBuckets( - partitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] + def getPartitionedFiles(partitions: Array[PartitionDirectory]): Array[PartitionedFile] def getPartitionSplitFiles( partitions: Array[PartitionDirectory], maxSplitBytes: Long, diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala index dba5d986583..d7fadc1bb83 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import scala.collection.mutable.HashMap import com.nvidia.spark.rapids.{GpuExec, GpuMetricNames, GpuReadCSVFileFormat, GpuReadFileFormatWithMetrics, GpuReadOrcFileFormat, GpuReadParquetFileFormat, ShimLoader, SparkPlanMeta} +import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} @@ -390,8 +391,14 @@ case class GpuFileSourceScanExec( fsRelation: HadoopFsRelation): RDD[InternalRow] = { logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") - val filesGroupedToBuckets = - ShimLoader.getSparkShims.getPartitionFilesGroupedToBuckets(selectedPartitions) + val partitionedFiles = + ShimLoader.getSparkShims.getPartitionedFiles(selectedPartitions) + + val filesGroupedToBuckets = partitionedFiles.groupBy { f => + BucketingUtils + .getBucketId(new Path(f.filePath).getName) + .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) + } val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { val bucketSet = optionalBucketSet.get From 2e6b3fad74ab6770364f9a3a7ec5b98e88e11686 Mon Sep 17 
00:00:00 2001 From: Thomas Graves Date: Fri, 14 Aug 2020 16:24:23 -0500 Subject: [PATCH 5/8] remove commented out code Signed-off-by: Thomas Graves --- .../org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala index d7fadc1bb83..ceb9acf039f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala @@ -423,8 +423,6 @@ case class GpuFileSourceScanExec( FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) } } - - // new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) ShimLoader.getSparkShims.getFileScanRDD(fsRelation.sparkSession, readFile, filePartitions) } @@ -453,7 +451,6 @@ case class GpuFileSourceScanExec( val partitions = FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) - // new FileScanRDD(fsRelation.sparkSession, readFile, partitions) ShimLoader.getSparkShims.getFileScanRDD(fsRelation.sparkSession, readFile, partitions) } From 5972f6b414b9819da3ca085461fe06853f6ff1dd Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 17 Aug 2020 09:44:28 -0500 Subject: [PATCH 6/8] Fix spacing in pom Signed-off-by: Thomas Graves --- shims/spark300db/pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/shims/spark300db/pom.xml b/shims/spark300db/pom.xml index bc205d58998..a19a7bf0240 100644 --- a/shims/spark300db/pom.xml +++ b/shims/spark300db/pom.xml @@ -32,7 +32,7 @@ 0.2.0-SNAPSHOT - 1.10.1 + 1.10.1 3.0.0-databricks @@ -63,8 +63,8 @@ org.apache.spark spark-annotation_${scala.binary.version} - ${spark30db.version} - provided + ${spark30db.version} + provided org.apache.parquet From b01a4bc6ff4945753cc617060b1baceee344ee86 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 17 Aug 2020 14:41:08 -0500 Subject: [PATCH 7/8] Add gpu broadcast get function and fix names Signed-off-by: Thomas Graves --- .../shims/spark300/GpuBroadcastExchangeExec.scala | 2 +- .../rapids/shims/spark300/Spark300Shims.scala | 2 +- .../rapids/shims/spark300db/Spark300dbShims.scala | 14 +++++++++++--- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastExchangeExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastExchangeExec.scala index ced3e238c31..ee96db12672 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastExchangeExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastExchangeExec.scala @@ -17,7 +17,7 @@ package com.nvidia.spark.rapids.shims.spark300 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuBroadcastExchangeExecBase} +import org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase case class GpuBroadcastExchangeExec( mode: BroadcastMode, diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index dbf665fd693..7d126781393 100644 --- 
a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
+++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
 import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuTimeSub, ShuffleManagerShimBase}
-import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase}
+import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastMeta, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, GpuShuffleMeta}
 import org.apache.spark.sql.rapids.shims.spark300._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.{BlockId, BlockManagerId}
diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala
index da8be91fa5e..8b171fbb93b 100644
--- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala
+++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala
@@ -30,12 +30,14 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
 import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuTimeSub}
-import org.apache.spark.sql.rapids.execution.GpuBroadcastNestedLoopJoinExecBase
+import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastMeta, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, GpuShuffleMeta}
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.{BlockId, BlockManagerId}

@@ -53,6 +55,12 @@ class Spark300dbShims extends Spark300Shims {
     GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes)
   }

+  override def getGpuBroadcastExchangeExec(
+      mode: BroadcastMode,
+      child: SparkPlan): GpuBroadcastExchangeExecBase = {
+    GpuBroadcastExchangeExec(mode, child)
+  }
+
   override def isGpuHashJoin(plan: SparkPlan): Boolean = {
     plan match {
       case _: GpuHashJoin => true
@@ -131,7 +139,7 @@ class Spark300dbShims extends Spark300Shims {
     files.map(_.getPath.getName)
   }

-  override def getFileStatusSize(partitions: Seq[PartitionDirectory]): Long = {
+  override def getPartitionFileStatusSize(partitions: Seq[PartitionDirectory]): Long = {
     partitions.map(_.files.map(_.getLen).sum).sum
   }

@@ -144,7 +152,7 @@ class Spark300dbShims extends Spark300Shims {
     }
   }

-  override def getSplitFiles(
+  override def getPartitionSplitFiles(
       partitions: Array[PartitionDirectory],
       maxSplitBytes: Long,
       relation: HadoopFsRelation): Array[PartitionedFile] = {

From 6a91ffb44edc994ca90e08969ce320594b2aa4c1 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Mon, 17 Aug 2020 15:15:29 -0500
Subject: [PATCH 8/8] remove unused imports

Signed-off-by: Thomas Graves
---
 .../com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala | 1 -
 .../org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala    | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
index 7d126781393..cb11b3652b9 100644
--- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
+++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
@@ -20,7 +20,6 @@ import java.time.ZoneId

 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.spark300.RapidsShuffleManager
-import org.apache.hadoop.fs.Path

 import org.apache.spark.SparkEnv
 import org.apache.spark.rdd.RDD
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
index ceb9acf039f..69c818aa3a3 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{And, Ascending, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, Literal, PlanExpression, Predicate, SortOrder}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
-import org.apache.spark.sql.execution.{ExecSubqueryExpression, ExplainUtils, FileSourceScanExec, PartitionedFileUtil, SQLExecution}
+import org.apache.spark.sql.execution.{ExecSubqueryExpression, ExplainUtils, FileSourceScanExec, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{BucketingUtils, DataSourceStrategy, DataSourceUtils, FileFormat, FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
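
Note: the sketch below condenses the call pattern this patch series introduces. After these changes GpuFileSourceScanExec no longer constructs FileScanRDD or calls PartitionedFileUtil directly; file listing, split generation, and RDD construction all go through the SparkShims trait so the Databricks 3.0 shim can substitute its own FileStatus handling and its copied GpuFileScanRDD. The shim method names, the sort, and FilePartition.getFilePartitions are taken from the diffs above; the wrapper object ShimScanSketch, the method buildScanRdd, and the readFile parameter are illustrative placeholders, not part of the patches.

import com.nvidia.spark.rapids.ShimLoader

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile}

object ShimScanSketch {
  // Build the scan RDD for a non-bucketed read using only shim entry points,
  // mirroring what GpuFileSourceScanExec.createNonBucketedReadRDD does after this series.
  def buildScanRdd(
      selectedPartitions: Array[PartitionDirectory],
      relation: HadoopFsRelation,
      maxSplitBytes: Long,
      readFile: PartitionedFile => Iterator[InternalRow]): RDD[InternalRow] = {
    val shims = ShimLoader.getSparkShims
    // Split each input file into chunks of at most maxSplitBytes; the shim hides the
    // PartitionedFileUtil/FileStatus differences between Apache Spark and Databricks.
    val splitFiles = shims
      .getPartitionSplitFiles(selectedPartitions, maxSplitBytes, relation)
      .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
    // Bin-pack the sorted splits into FilePartitions using the unchanged Spark utility.
    val partitions =
      FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
    // Returns FileScanRDD on Apache Spark 3.0.0 and GpuFileScanRDD on the Databricks shim.
    shims.getFileScanRDD(relation.sparkSession, readFile, partitions)
  }
}

The design point carried by the series is that only the shim modules touch classes whose shape differs across Spark builds (FileStatus, PartitionedFile, FileScanRDD); GpuFileSourceScanExec itself stays build-agnostic by routing every such call through ShimLoader.getSparkShims.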