diff --git a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala new file mode 100644 index 00000000000..9eefdf636e7 --- /dev/null +++ b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids.{GpuBatchScanExecMetrics, ScanWithMetrics} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.execution.datasources.v2._ + +case class GpuBatchScanExec( + output: Seq[AttributeReference], + @transient scan: Scan) extends DataSourceV2ScanExecBase with GpuBatchScanExecMetrics { + @transient lazy val batch: Batch = scan.toBatch + + scan match { + case s: ScanWithMetrics => s.metrics = allMetrics ++ additionalMetrics + case _ => + } + + override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions() + + override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory() + + override lazy val inputRDD: RDD[InternalRow] = { + new GpuDataSourceRDD(sparkContext, partitions, readerFactory) + } + + override def doCanonicalize(): GpuBatchScanExec = { + this.copy(output = output.map(QueryPlan.normalizeExpressions(_, output))) + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataSourceRDD.scala b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuDataSourceRDD.scala similarity index 55% rename from sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataSourceRDD.scala rename to sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuDataSourceRDD.scala index d04d469cbb8..25ffd0295db 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataSourceRDD.scala +++ b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuDataSourceRDD.scala @@ -14,15 +14,14 @@ * limitations under the License. 
*/ -package com.nvidia.spark.rapids +package com.nvidia.spark.rapids.shims -import com.nvidia.spark.rapids.shims.ShimDataSourceRDD +import com.nvidia.spark.rapids.{MetricsBatchIterator, PartitionIterator} import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} -import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition -import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition} import org.apache.spark.sql.vectorized.ColumnarBatch /** @@ -36,7 +35,7 @@ class GpuDataSourceRDD( sc: SparkContext, @transient private val inputPartitions: Seq[InputPartition], partitionReaderFactory: PartitionReaderFactory) - extends ShimDataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads = true) { + extends DataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads = true) { private def castPartition(split: Partition): DataSourceRDDPartition = split match { case p: DataSourceRDDPartition => p @@ -53,50 +52,11 @@ class GpuDataSourceRDD( } } -private class PartitionIterator[T](reader: PartitionReader[T]) extends Iterator[T] { - private[this] var valuePrepared = false - - override def hasNext: Boolean = { - if (!valuePrepared) { - valuePrepared = reader.next() - } - valuePrepared - } - - override def next(): T = { - if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") - } - valuePrepared = false - reader.get() - } -} - -private class MetricsBatchIterator(iter: Iterator[ColumnarBatch]) extends Iterator[ColumnarBatch] { - private[this] val inputMetrics = TaskContext.get().taskMetrics().inputMetrics - - override def hasNext: Boolean = iter.hasNext - - override def next(): ColumnarBatch = { - val batch = iter.next() - TrampolineUtil.incInputRecordsRows(inputMetrics, batch.numRows()) - batch +object GpuDataSourceRDD { + def apply( + sc: SparkContext, + inputPartitions: Seq[InputPartition], + partitionReaderFactory: PartitionReaderFactory): GpuDataSourceRDD = { + new GpuDataSourceRDD(sc, inputPartitions, partitionReaderFactory) } } - -/** Wraps a columnar PartitionReader to update bytes read metric based on filesystem statistics. */ -class PartitionReaderWithBytesRead(reader: PartitionReader[ColumnarBatch]) - extends PartitionReader[ColumnarBatch] { - private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics - private[this] val getBytesRead = TrampolineUtil.getFSBytesReadOnThreadCallback() - - override def next(): Boolean = { - val result = reader.next() - TrampolineUtil.incBytesRead(inputMetrics, getBytesRead()) - result - } - - override def get(): ColumnarBatch = reader.get() - - override def close(): Unit = reader.close() -} diff --git a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/ShimDataSourceRDD.scala b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/ShimDataSourceRDD.scala deleted file mode 100644 index 5108aaa65ac..00000000000 --- a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/ShimDataSourceRDD.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2021-2022, NVIDIA CORPORATION. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.shims - -import org.apache.spark.SparkContext -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} -import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD -import org.apache.spark.sql.execution.metric.SQLMetric - -class ShimDataSourceRDD( - sc: SparkContext, - @transient private val inputPartitions: Seq[InputPartition], - partitionReaderFactory: PartitionReaderFactory, - columnarReads: Boolean -) extends DataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads, - Map.empty[String, SQLMetric]) diff --git a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala index 10eb99692e6..30b80d8d10e 100644 --- a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala +++ b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala @@ -417,12 +417,12 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging { override def tagPlanForGpu(): Unit = { if (!p.runtimeFilters.isEmpty) { - willNotWorkOnGpu("Runtime filtering (DPP) on datasource V2 is not supported") + willNotWorkOnGpu("runtime filtering (DPP) on datasource V2 is not supported") } } override def convertToGpu(): GpuExec = - GpuBatchScanExec(p.output, childScans.head.convertToGpu()) + GpuBatchScanExec(p.output, childScans.head.convertToGpu(), p.runtimeFilters) }) ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap } diff --git a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/ShimDataSourceRDD.scala b/sql-plugin/src/main/320+/scala/org/apache/spark/sql/execution/datasources/rapids/DataSourceStrategyUtils.scala similarity index 50% rename from sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/ShimDataSourceRDD.scala rename to sql-plugin/src/main/320+/scala/org/apache/spark/sql/execution/datasources/rapids/DataSourceStrategyUtils.scala index b2abae3693a..7713d863409 100644 --- a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/ShimDataSourceRDD.scala +++ b/sql-plugin/src/main/320+/scala/org/apache/spark/sql/execution/datasources/rapids/DataSourceStrategyUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * Copyright (c) 2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,15 +14,14 @@ * limitations under the License. 
*/ -package com.nvidia.spark.rapids.shims +package org.apache.spark.sql.execution.datasources.rapids -import org.apache.spark.SparkContext -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} -import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.Filter -class ShimDataSourceRDD( - sc: SparkContext, - @transient private val inputPartitions: Seq[InputPartition], - partitionReaderFactory: PartitionReaderFactory, - columnarReads: Boolean -) extends DataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads) +object DataSourceStrategyUtils { + // Trampoline utility to access protected translateRuntimeFilter + def translateRuntimeFilter(expr: Expression): Option[Filter] = + DataSourceStrategy.translateRuntimeFilter(expr) +} diff --git a/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala b/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala new file mode 100644 index 00000000000..5049c3b090b --- /dev/null +++ b/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +import com.google.common.base.Objects +import com.nvidia.spark.rapids.{GpuBatchScanExecMetrics, ScanWithMetrics} + +import org.apache.spark.SparkException +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, Expression, Literal} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.execution.datasources.rapids.DataSourceStrategyUtils +import org.apache.spark.sql.execution.datasources.v2._ + +case class GpuBatchScanExec( + output: Seq[AttributeReference], + @transient scan: Scan, + runtimeFilters: Seq[Expression] = Seq.empty) + extends DataSourceV2ScanExecBase with GpuBatchScanExecMetrics { + @transient lazy val batch: Batch = scan.toBatch + + scan match { + case s: ScanWithMetrics => s.metrics = allMetrics ++ additionalMetrics + case _ => + } + + // TODO: unify the equal/hashCode implementation for all data source v2 query plans. 
+  override def equals(other: Any): Boolean = other match {
+    case other: GpuBatchScanExec =>
+      this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
+    case _ =>
+      false
+  }
+
+  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
+
+  @transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions()
+
+  @transient private lazy val filteredPartitions: Seq[InputPartition] = {
+    val dataSourceFilters = runtimeFilters.flatMap {
+      case DynamicPruningExpression(e) => DataSourceStrategyUtils.translateRuntimeFilter(e)
+      case _ => None
+    }
+
+    if (dataSourceFilters.nonEmpty && scan.isInstanceOf[SupportsRuntimeFiltering]) {
+      val originalPartitioning = outputPartitioning
+
+      // the cast is safe as runtime filters are only assigned if the scan can be filtered
+      val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
+      filterableScan.filter(dataSourceFilters.toArray)
+
+      // call toBatch again to get filtered partitions
+      val newPartitions = scan.toBatch.planInputPartitions()
+
+      originalPartitioning match {
+        case p: DataSourcePartitioning if p.numPartitions != newPartitions.size =>
+          throw new SparkException(
+            "Data source must have preserved the original partitioning during runtime filtering; " +
+              s"reported num partitions: ${p.numPartitions}, " +
+              s"num partitions after runtime filtering: ${newPartitions.size}")
+        case _ =>
+          // no validation is needed as the data source did not report any specific partitioning
+      }
+
+      newPartitions
+    } else {
+      partitions
+    }
+  }
+
+  override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()
+
+  override lazy val inputRDD: RDD[InternalRow] = {
+    if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
+      // return an empty RDD with 1 partition if dynamic filtering removed the only split
+      sparkContext.parallelize(Array.empty[InternalRow], 1)
+    } else {
+      new GpuDataSourceRDD(sparkContext, filteredPartitions, readerFactory)
+    }
+  }
+
+  override def doCanonicalize(): GpuBatchScanExec = {
+    this.copy(
+      output = output.map(QueryPlan.normalizeExpressions(_, output)),
+      runtimeFilters = QueryPlan.normalizePredicates(
+        runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
+        output))
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
+    val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
+    val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString"
+    redact(result)
+  }
+}
diff --git a/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/GpuDataSourceRDD.scala b/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/GpuDataSourceRDD.scala
new file mode 100644
index 00000000000..1dfb97ae7df
--- /dev/null
+++ b/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/GpuDataSourceRDD.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids.{MetricsBatchIterator, PartitionIterator} + +import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition} +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * A replacement for DataSourceRDD that does NOT compute the bytes read input metric. + * DataSourceRDD assumes all reads occur on the task thread, and some GPU input sources + * use multithreaded readers that cannot generate proper metrics with DataSourceRDD. + * @note It is the responsibility of users of this RDD to generate the bytes read input + * metric explicitly! + */ +class GpuDataSourceRDD( + sc: SparkContext, + @transient private val inputPartitions: Seq[InputPartition], + partitionReaderFactory: PartitionReaderFactory +) extends DataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads = true, + Map.empty[String, SQLMetric]) { + + private def castPartition(split: Partition): DataSourceRDDPartition = split match { + case p: DataSourceRDDPartition => p + case _ => throw new SparkException(s"[BUG] Not a DataSourceRDDPartition: $split") + } + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val inputPartition = castPartition(split).inputPartition + val batchReader = partitionReaderFactory.createColumnarReader(inputPartition) + val iter = new MetricsBatchIterator(new PartitionIterator[ColumnarBatch](batchReader)) + context.addTaskCompletionListener[Unit](_ => batchReader.close()) + // TODO: SPARK-25083 remove the type erasure hack in data source scan + new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]]) + } +} + +object GpuDataSourceRDD { + def apply( + sc: SparkContext, + inputPartitions: Seq[InputPartition], + partitionReaderFactory: PartitionReaderFactory): GpuDataSourceRDD = { + new GpuDataSourceRDD(sc, inputPartitions, partitionReaderFactory) + } +} diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala new file mode 100644 index 00000000000..9711bd2cbf1 --- /dev/null +++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.nvidia.spark.rapids.shims
+
+import com.google.common.base.Objects
+import com.nvidia.spark.rapids.{GpuBatchScanExecMetrics, ScanWithMetrics}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.util.InternalRowSet
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.execution.datasources.rapids.DataSourceStrategyUtils
+import org.apache.spark.sql.execution.datasources.v2._
+
+case class GpuBatchScanExec(
+    output: Seq[AttributeReference],
+    @transient scan: Scan,
+    runtimeFilters: Seq[Expression] = Seq.empty,
+    keyGroupedPartitioning: Option[Seq[Expression]] = None)
+  extends DataSourceV2ScanExecBase with GpuBatchScanExecMetrics {
+  @transient lazy val batch: Batch = scan.toBatch
+
+  scan match {
+    case s: ScanWithMetrics => s.metrics = allMetrics ++ additionalMetrics
+    case _ =>
+  }
+
+  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
+  override def equals(other: Any): Boolean = other match {
+    case other: GpuBatchScanExec =>
+      this.batch == other.batch && this.runtimeFilters == other.runtimeFilters
+    case _ =>
+      false
+  }
+
+  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
+
+  @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions()
+
+  @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
+    val dataSourceFilters = runtimeFilters.flatMap {
+      case DynamicPruningExpression(e) => DataSourceStrategyUtils.translateRuntimeFilter(e)
+      case _ => None
+    }
+
+    if (dataSourceFilters.nonEmpty && scan.isInstanceOf[SupportsRuntimeFiltering]) {
+      val originalPartitioning = outputPartitioning
+
+      // the cast is safe as runtime filters are only assigned if the scan can be filtered
+      val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
+      filterableScan.filter(dataSourceFilters.toArray)
+
+      // call toBatch again to get filtered partitions
+      val newPartitions = scan.toBatch.planInputPartitions()
+
+      originalPartitioning match {
+        case p: KeyGroupedPartitioning =>
+          if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
+            throw new SparkException("Data source must have preserved the original partitioning " +
+              "during runtime filtering: not all partitions implement HasPartitionKey after " +
+              "filtering")
+          }
+
+          val newRows = new InternalRowSet(p.expressions.map(_.dataType))
+          newRows ++= newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey())
+          val oldRows = p.partitionValuesOpt.get
+
+          if (oldRows.size != newRows.size) {
+            throw new SparkException("Data source must have preserved the original partitioning " +
+              "during runtime filtering: the number of unique partition values obtained " +
+              s"through HasPartitionKey changed: before ${oldRows.size}, after ${newRows.size}")
+          }
+
+          if (!oldRows.forall(newRows.contains)) {
+            throw new SparkException("Data source must have preserved the original partitioning " +
+              "during runtime filtering: the number of unique partition values obtained " +
+              s"through HasPartitionKey remain the same but do not exactly match")
+          }
+
+
groupPartitions(newPartitions).get.map(_._2) + + case _ => + // no validation is needed as the data source did not report any specific partitioning + newPartitions.map(Seq(_)) + } + + } else { + partitions + } + } + + override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory() + + override lazy val inputRDD: RDD[InternalRow] = { + if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) { + // return an empty RDD with 1 partition if dynamic filtering removed the only split + sparkContext.parallelize(Array.empty[InternalRow], 1) + } else { + new GpuDataSourceRDD(sparkContext, filteredPartitions, readerFactory) + } + } + + override def doCanonicalize(): GpuBatchScanExec = { + this.copy( + output = output.map(QueryPlan.normalizeExpressions(_, output)), + runtimeFilters = QueryPlan.normalizePredicates( + runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)), + output)) + } + + override def simpleString(maxFields: Int): String = { + val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields) + val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}" + val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString" + redact(result) + } +} diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/GpuDataSourceRDD.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/GpuDataSourceRDD.scala new file mode 100644 index 00000000000..35e43bbb598 --- /dev/null +++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/GpuDataSourceRDD.scala @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids.{MetricsBatchIterator, PartitionIterator} + +import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition} +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * A replacement for DataSourceRDD that does NOT compute the bytes read input metric. + * DataSourceRDD assumes all reads occur on the task thread, and some GPU input sources + * use multithreaded readers that cannot generate proper metrics with DataSourceRDD. + * @note It is the responsibility of users of this RDD to generate the bytes read input + * metric explicitly! 
+ */ +class GpuDataSourceRDD( + sc: SparkContext, + @transient private val inputPartitions: Seq[Seq[InputPartition]], + partitionReaderFactory: PartitionReaderFactory +) extends DataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads = true, + Map.empty[String, SQLMetric]) { + private def castPartition(split: Partition): DataSourceRDDPartition = split match { + case p: DataSourceRDDPartition => p + case _ => throw new SparkException(s"[BUG] Not a DataSourceRDDPartition: $split") + } + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + + val iterator = new Iterator[Object] { + private val inputPartitions = castPartition(split).inputPartitions + private var currentIter: Option[Iterator[Object]] = None + private var currentIndex: Int = 0 + + override def hasNext: Boolean = currentIter.exists(_.hasNext) || advanceToNextIter() + + override def next(): Object = { + if (!hasNext) throw new NoSuchElementException("No more elements") + currentIter.get.next() + } + + private def advanceToNextIter(): Boolean = { + if (currentIndex >= inputPartitions.length) { + false + } else { + val inputPartition = inputPartitions(currentIndex) + currentIndex += 1 + + // TODO: SPARK-25083 remove the type erasure hack in data source scan + val (iter, reader) = { + val batchReader = partitionReaderFactory.createColumnarReader(inputPartition) + val iter = new MetricsBatchIterator( + new PartitionIterator[ColumnarBatch](batchReader)) + (iter, batchReader) + } + context.addTaskCompletionListener[Unit] { _ => + reader.close() + } + currentIter = Some(iter) + hasNext + } + } + } + + new InterruptibleIterator(context, iterator).asInstanceOf[Iterator[InternalRow]] + } +} + +object GpuDataSourceRDD { + def apply( + sc: SparkContext, + inputPartitions: Seq[InputPartition], + partitionReaderFactory: PartitionReaderFactory): GpuDataSourceRDD = { + new GpuDataSourceRDD(sc, inputPartitions.map(Seq(_)), partitionReaderFactory) + } +} diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala index 2aa561889db..2fc85d58035 100644 --- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala +++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala @@ -343,8 +343,17 @@ trait Spark33XShims extends Spark321PlusShims with Spark320PlusNonDBShims { override val childScans: scala.Seq[ScanMeta[_]] = Seq(GpuOverrides.wrapScan(p.scan, conf, Some(this))) - override def convertToGpu(): GpuExec = - GpuBatchScanExec(p.output, childScans.head.convertToGpu()) + override def tagPlanForGpu(): Unit = { + if (!p.runtimeFilters.isEmpty) { + willNotWorkOnGpu("runtime filtering (DPP) on datasource V2 is not supported") + } + if (!p.keyGroupedPartitioning.isEmpty) { + willNotWorkOnGpu("key grouped partitioning is not supported") + } + } + + override def convertToGpu(): GpuExec = GpuBatchScanExec(p.output, + childScans.head.convertToGpu(), p.runtimeFilters, p.keyGroupedPartitioning) }), GpuOverrides.exec[CoalesceExec]( "The backend for the dataframe coalesce method", diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExecMetrics.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExecMetrics.scala new file mode 100644 index 00000000000..376a76135c9 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExecMetrics.scala @@ -0,0 +1,31 @@ +/* + * Copyright (c) 
2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +trait GpuBatchScanExecMetrics extends GpuExec { + import GpuMetric._ + + override def supportsColumnar = true + + override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL + override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL + override lazy val additionalMetrics: Map[String, GpuMetric] = Map( + GPU_DECODE_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_GPU_DECODE_TIME), + BUFFER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_BUFFER_TIME), + PEAK_DEVICE_MEMORY -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_PEAK_DEVICE_MEMORY)) ++ + semaphoreMetrics +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCSVScan.scala similarity index 90% rename from sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala rename to sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCSVScan.scala index f41397ec6c3..e34f2b898e0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCSVScan.scala @@ -26,12 +26,10 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.csv.{CSVOptions, GpuCsvUtils} -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} -import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.util.PermissiveMode import org.apache.spark.sql.connector.read._ import org.apache.spark.sql.execution.datasources.{PartitionedFile, PartitioningAwareFileIndex} @@ -45,40 +43,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration -case class GpuBatchScanExec( - output: Seq[AttributeReference], - @transient scan: Scan) extends DataSourceV2ScanExecBase with GpuExec { - import GpuMetric._ - @transient lazy val batch: Batch = scan.toBatch - - override def supportsColumnar = true - - override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL - override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL - override lazy val additionalMetrics: Map[String, GpuMetric] = Map( - GPU_DECODE_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_GPU_DECODE_TIME), - BUFFER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_BUFFER_TIME), - PEAK_DEVICE_MEMORY -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_PEAK_DEVICE_MEMORY)) ++ - semaphoreMetrics - - scan match { - case s: ScanWithMetrics => s.metrics = allMetrics ++ additionalMetrics - case _ => - } - - override lazy val partitions: 
Seq[InputPartition] = batch.planInputPartitions() - - override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory() - - override lazy val inputRDD: RDD[InternalRow] = { - new GpuDataSourceRDD(sparkContext, partitions, readerFactory) - } - - override def doCanonicalize(): GpuBatchScanExec = { - this.copy(output = output.map(QueryPlan.normalizeExpressions(_, output))) - } -} - trait ScanWithMetrics { //this is initialized by the exec post creation var metrics : Map[String, GpuMetric] = Map.empty diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 6f61de21f2c..daf8d56f33d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -24,7 +24,7 @@ import scala.util.control.NonFatal import ai.rapids.cudf.DType import com.nvidia.spark.rapids.RapidsConf.{SUPPRESS_PLANNING_FAILURE, TEST_CONF} -import com.nvidia.spark.rapids.shims.{AQEUtils, GpuHashPartitioning, GpuRangePartitioning, GpuSpecifiedWindowFrameMeta, GpuWindowExpressionMeta, OffsetWindowFunctionMeta, SparkShimImpl} +import com.nvidia.spark.rapids.shims.{AQEUtils, GpuBatchScanExec, GpuHashPartitioning, GpuRangePartitioning, GpuSpecifiedWindowFrameMeta, GpuWindowExpressionMeta, OffsetWindowFunctionMeta, SparkShimImpl} import org.apache.spark.internal.Logging import org.apache.spark.rapids.shims.GpuShuffleExchangeExec diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 06cb6b5b706..8f0c80c7f4e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids import scala.annotation.tailrec -import com.nvidia.spark.rapids.shims.SparkShimImpl +import com.nvidia.spark.rapids.shims.{GpuBatchScanExec, SparkShimImpl} import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, SortOrder} import org.apache.spark.sql.catalyst.rules.Rule diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/dataSourceUtil.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/dataSourceUtil.scala new file mode 100644 index 00000000000..aee11b3c84d --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/dataSourceUtil.scala @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids + +import org.apache.spark.TaskContext +import org.apache.spark.sql.connector.read.PartitionReader +import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.vectorized.ColumnarBatch + +class PartitionIterator[T](reader: PartitionReader[T]) extends Iterator[T] { + private[this] var valuePrepared = false + + override def hasNext: Boolean = { + if (!valuePrepared) { + valuePrepared = reader.next() + } + valuePrepared + } + + override def next(): T = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + valuePrepared = false + reader.get() + } +} + +class MetricsBatchIterator(iter: Iterator[ColumnarBatch]) extends Iterator[ColumnarBatch] { + private[this] val inputMetrics = TaskContext.get().taskMetrics().inputMetrics + + override def hasNext: Boolean = iter.hasNext + + override def next(): ColumnarBatch = { + val batch = iter.next() + TrampolineUtil.incInputRecordsRows(inputMetrics, batch.numRows()) + batch + } +} + +/** Wraps a columnar PartitionReader to update bytes read metric based on filesystem statistics. */ +class PartitionReaderWithBytesRead(reader: PartitionReader[ColumnarBatch]) + extends PartitionReader[ColumnarBatch] { + private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics + private[this] val getBytesRead = TrampolineUtil.getFSBytesReadOnThreadCallback() + + override def next(): Boolean = { + val result = reader.next() + TrampolineUtil.incBytesRead(inputMetrics, getBytesRead()) + result + } + + override def get(): ColumnarBatch = reader.get() + + override def close(): Unit = reader.close() +} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala index 6b9e4e1389f..fa696018dfb 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala @@ -20,8 +20,8 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import scala.collection.mutable.HashMap -import com.nvidia.spark.rapids.{GpuDataSourceRDD, GpuExec, GpuMetric, GpuOrcMultiFilePartitionReaderFactory, GpuParquetMultiFilePartitionReaderFactory, GpuReadCSVFileFormat, GpuReadFileFormatWithMetrics, GpuReadOrcFileFormat, GpuReadParquetFileFormat, RapidsConf, SparkPlanMeta} -import com.nvidia.spark.rapids.shims.SparkShimImpl +import com.nvidia.spark.rapids.{GpuExec, GpuMetric, GpuOrcMultiFilePartitionReaderFactory, GpuParquetMultiFilePartitionReaderFactory, GpuReadCSVFileFormat, GpuReadFileFormatWithMetrics, GpuReadOrcFileFormat, GpuReadParquetFileFormat, RapidsConf, SparkPlanMeta} +import com.nvidia.spark.rapids.shims.{GpuDataSourceRDD, SparkShimImpl} import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD @@ -578,7 +578,7 @@ case class GpuFileSourceScanExec( } // note we use the v2 DataSourceRDD instead of FileScanRDD so we don't have to copy more code - new GpuDataSourceRDD(relation.sparkSession.sparkContext, partitions, factory) + GpuDataSourceRDD(relation.sparkSession.sparkContext, partitions, factory) } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala b/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala index 79ded960a0a..5b2a8047e0f 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala @@ 
-21,7 +21,8 @@ import java.math.RoundingMode import scala.util.Random import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector} -import com.nvidia.spark.rapids.{GpuAlias, GpuBatchScanExec, GpuColumnVector, GpuIsNotNull, GpuIsNull, GpuLiteral, GpuOverrides, GpuScalar, GpuUnitTests, HostColumnarToGpu, RapidsConf} +import com.nvidia.spark.rapids.{GpuAlias, GpuColumnVector, GpuIsNotNull, GpuIsNull, GpuLiteral, GpuOverrides, GpuScalar, GpuUnitTests, HostColumnarToGpu, RapidsConf} +import com.nvidia.spark.rapids.shims.GpuBatchScanExec import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession
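
For reference, each shim above defines a companion GpuDataSourceRDD.apply so common code never needs to know which constructor shape a given Spark version uses. A minimal caller sketch, assuming only the shim classes added in this patch; the helper name buildScanRdd is illustrative, not part of the change:

    import com.nvidia.spark.rapids.shims.GpuDataSourceRDD
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}

    // Build the scan RDD the same way on every supported Spark version.
    // On the 3.1.x/3.2.x shims apply() forwards Seq[InputPartition] unchanged; on the
    // 3.3.0+ shim it wraps each partition as Seq(_) to match the new
    // DataSourceRDD(Seq[Seq[InputPartition]], ...) constructor.
    def buildScanRdd(
        sc: SparkContext,
        partitions: Seq[InputPartition],
        factory: PartitionReaderFactory): GpuDataSourceRDD =
      GpuDataSourceRDD(sc, partitions, factory)

This is the same pattern the GpuFileSourceScanExec hunk above switches to when it replaces new GpuDataSourceRDD(...) with the apply() call.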
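
The GpuDataSourceRDD scaladoc notes that callers are responsible for producing the bytes-read input metric themselves. A sketch of one way a reader factory could do that with PartitionReaderWithBytesRead (now housed in dataSourceUtil.scala); the ExampleColumnarReaderFactory name is hypothetical and not part of this patch:

    import com.nvidia.spark.rapids.PartitionReaderWithBytesRead
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
    import org.apache.spark.sql.vectorized.ColumnarBatch

    class ExampleColumnarReaderFactory(underlying: PartitionReaderFactory)
        extends PartitionReaderFactory {

      override def supportColumnarReads(partition: InputPartition): Boolean = true

      // Row-based reads are never requested by GpuDataSourceRDD, which always asks for a
      // columnar reader in compute().
      override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
        throw new UnsupportedOperationException("only columnar reads are supported")

      // Wrap the real reader so the bytes-read metric is updated from filesystem statistics,
      // since GpuDataSourceRDD intentionally does not compute it.
      override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] =
        new PartitionReaderWithBytesRead(underlying.createColumnarReader(partition))
    }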