From 38148e020ae65fbf9687b69efeaeec932fb75d0e Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Thu, 22 Oct 2020 15:53:42 -0500
Subject: [PATCH 1/2] Use BasicColumnarWriteTaskStats instead of
 BasicWriteTaskStats

Signed-off-by: Jason Lowe
---
 .../BasicColumnarWriteStatsTracker.scala      | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/BasicColumnarWriteStatsTracker.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/BasicColumnarWriteStatsTracker.scala
index edce4705e92..57b7c08a0da 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/BasicColumnarWriteStatsTracker.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/BasicColumnarWriteStatsTracker.scala
@@ -24,11 +24,23 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, WriteTaskStats}
+import org.apache.spark.sql.execution.datasources.WriteTaskStats
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration
 
+/**
+ * Simple metrics collected during an instance of [[GpuFileFormatDataWriter]].
+ * These were first introduced in https://github.com/apache/spark/pull/18159 (SPARK-20703).
+ */
+case class BasicColumnarWriteTaskStats(
+    numPartitions: Int,
+    numFiles: Int,
+    numBytes: Long,
+    numRows: Long)
+  extends WriteTaskStats
+
+
 /**
  * Simple metrics collected during an instance of [[GpuFileFormatDataWriter]].
  * This is the columnar version of
@@ -104,7 +116,7 @@ class BasicColumnarWriteTaskStatsTracker(hadoopConf: Configuration)
         "This could be due to the output format not writing empty files, " +
         "or files being not immediately visible in the filesystem.")
     }
-    BasicWriteTaskStats(numPartitions, numFiles, numBytes, numRows)
+    BasicColumnarWriteTaskStats(numPartitions, numFiles, numBytes, numRows)
   }
 }
 
@@ -112,7 +124,7 @@ class BasicColumnarWriteTaskStatsTracker(hadoopConf: Configuration)
 /**
  * Simple [[ColumnarWriteJobStatsTracker]] implementation that's serializable,
  * capable of instantiating [[BasicColumnarWriteTaskStatsTracker]] on executors and processing the
- * `BasicWriteTaskStats` they produce by aggregating the metrics and posting them
+ * `BasicColumnarWriteTaskStats` they produce by aggregating the metrics and posting them
  * as DriverMetricUpdates.
  */
 class BasicColumnarWriteJobStatsTracker(
@@ -131,7 +143,7 @@ class BasicColumnarWriteJobStatsTracker(
     var totalNumBytes: Long = 0L
     var totalNumOutput: Long = 0L
 
-    val basicStats = stats.map(_.asInstanceOf[BasicWriteTaskStats])
+    val basicStats = stats.map(_.asInstanceOf[BasicColumnarWriteTaskStats])
 
     basicStats.foreach { summary =>
       numPartitions += summary.numPartitions
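For context on the first patch: returning the plugin's own stats case class keeps the columnar write path decoupled from Spark's BasicWriteTaskStats, whose shape is not guaranteed to stay stable across Spark releases. The driver-side roll-up that consumes these stats works roughly as sketched below; this is a simplified, hypothetical standalone version of the processStats logic touched by the diff (the aggregate helper and its zero value are illustrative, not plugin API):

    import org.apache.spark.sql.execution.datasources.WriteTaskStats

    // Mirrors the case class added by this patch.
    case class BasicColumnarWriteTaskStats(
        numPartitions: Int,
        numFiles: Int,
        numBytes: Long,
        numRows: Long)
      extends WriteTaskStats

    // Each write task reports a WriteTaskStats; the job-level tracker
    // downcasts to the concrete columnar type and sums the fields, which
    // is why the tracker and the job aggregator must agree on the class.
    def aggregate(stats: Seq[WriteTaskStats]): BasicColumnarWriteTaskStats = {
      val basicStats = stats.map(_.asInstanceOf[BasicColumnarWriteTaskStats])
      basicStats.foldLeft(BasicColumnarWriteTaskStats(0, 0, 0L, 0L)) { (acc, s) =>
        BasicColumnarWriteTaskStats(
          acc.numPartitions + s.numPartitions,
          acc.numFiles + s.numFiles,
          acc.numBytes + s.numBytes,
          acc.numRows + s.numRows)
      }
    }

Had the executor-side tracker kept returning Spark's BasicWriteTaskStats while the aggregator cast to the columnar class, the cast above would fail at runtime, which is what this patch forestalls.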
From 028c0a06274354e2b68ae308769db9cebb6962c6 Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Fri, 23 Oct 2020 09:24:07 -0500
Subject: [PATCH 2/2] Update for new position parameter in Spark 3.1.0
 RegExpReplace

Signed-off-by: Jason Lowe
---
 .../rapids/shims/spark300/Spark300Shims.scala | 20 +++++++--
 .../rapids/shims/spark310/Spark310Shims.scala | 43 +++++++++++++++----
 .../nvidia/spark/rapids/GpuOverrides.scala    | 16 -------
 3 files changed, 51 insertions(+), 28 deletions(-)

diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
index c9eb9e3a4f3..eb49a82d769 100644
--- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
+++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
@@ -18,8 +18,6 @@ package com.nvidia.spark.rapids.shims.spark300
 
 import java.time.ZoneId
 
-import scala.collection.JavaConverters._
-
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.spark300.RapidsShuffleManager
 
@@ -44,7 +42,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
 import org.apache.spark.sql.execution.python.WindowInPandasExec
-import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuTimeSub, ShuffleManagerShimBase}
+import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub, ShuffleManagerShimBase}
 import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase}
 import org.apache.spark.sql.rapids.shims.spark300._
 import org.apache.spark.sql.types._
@@ -231,6 +229,22 @@ class Spark300Shims extends SparkShims {
         override def convertToGpu(): GpuExpression =
           GpuLast(child.convertToGpu(), ignoreNulls.convertToGpu())
       }),
+    GpuOverrides.expr[RegExpReplace](
+      "RegExpReplace support for string literal input patterns",
+      (a, conf, p, r) => new TernaryExprMeta[RegExpReplace](a, conf, p, r) {
+        override def tagExprForGpu(): Unit = {
+          if (!GpuOverrides.isLit(a.rep)) {
+            willNotWorkOnGpu("Only literal values are supported for replacement string")
+          }
+          if (GpuOverrides.isNullOrEmptyOrRegex(a.regexp)) {
+            willNotWorkOnGpu(
+              "Only non-null, non-empty String literals that are not regex patterns " +
+                "are supported by RegExpReplace on the GPU")
+          }
+        }
+        override def convertToGpu(lhs: Expression, regexp: Expression,
+            rep: Expression): GpuExpression = GpuStringReplace(lhs, regexp, rep)
+      })
   ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap
 }
 
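The literal, non-regex restriction in the rule above exists because GpuStringReplace performs plain substring replacement rather than true regular-expression matching, so the translation is only sound when the pattern would match itself verbatim. A rough illustration of the distinction, using a hypothetical isPlainLiteral helper (not the plugin's actual isNullOrEmptyOrRegex implementation):

    // A pattern is only safe to hand to a substring-replace kernel if it
    // contains no regex metacharacters; otherwise regexp semantics differ.
    def isPlainLiteral(pattern: String): Boolean =
      pattern != null && pattern.nonEmpty &&
        !pattern.exists(c => "\\^$.|?*+()[]{}".contains(c))

    isPlainLiteral("abc") // true:  regexp_replace(s, "abc", "x") == s.replace("abc", "x")
    isPlainLiteral("a.c") // false: "." matches any character under regexp semantics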
diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala
index 0958a9275b9..15f9b8fced8 100644
--- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala
+++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala
@@ -16,10 +16,6 @@
 
 package com.nvidia.spark.rapids.shims.spark310
 
-import java.time.ZoneId
-
-import scala.collection.JavaConverters._
-
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.shims.spark301.Spark301Shims
 import com.nvidia.spark.rapids.spark310.RapidsShuffleManager
@@ -28,7 +24,6 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -36,11 +31,10 @@ import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
-import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.python.WindowInPandasExec
 import org.apache.spark.sql.internal.StaticSQLConf
-import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, ShuffleManagerShimBase}
+import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuStringReplace, ShuffleManagerShimBase}
 import org.apache.spark.sql.rapids.execution.GpuBroadcastNestedLoopJoinExecBase
 import org.apache.spark.sql.rapids.shims.spark310._
 import org.apache.spark.sql.types._
@@ -104,8 +98,39 @@ class Spark310Shims extends Spark301Shims {
     }
   }
 
+  def exprs310: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq(
+    GpuOverrides.expr[RegExpReplace](
+      "RegExpReplace support for string literal input patterns",
+      (a, conf, p, r) => new ExprMeta[RegExpReplace](a, conf, p, r) {
+        override def tagExprForGpu(): Unit = {
+          if (!GpuOverrides.isLit(a.rep)) {
+            willNotWorkOnGpu("Only literal values are supported for replacement string")
+          }
+          if (GpuOverrides.isNullOrEmptyOrRegex(a.regexp)) {
+            willNotWorkOnGpu(
+              "Only non-null, non-empty String literals that are not regex patterns " +
+                "are supported by RegExpReplace on the GPU")
+          }
+          if (!a.pos.foldable) {
+            willNotWorkOnGpu("Only foldable expressions are supported for the " +
+              "starting search position")
+          }
+          val posEval = a.pos.eval()
+          if (posEval.asInstanceOf[Int] != 1) {
+            willNotWorkOnGpu("Only a search starting position of 1 is supported")
+          }
+        }
+        override def convertToGpu(): GpuExpression = {
+          GpuStringReplace(
+            childExprs(0).convertToGpu(),
+            childExprs(1).convertToGpu(),
+            childExprs(2).convertToGpu())
+        }
+      })
+  ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap
+
   override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = {
-    super.exprs301
+    super.exprs301 ++ exprs310
   }
 
   override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
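The extra checks in the 3.1.0 shim reflect that Spark 3.1.0 extends RegExpReplace with a fourth child, a starting search position, so the meta class inspects four children instead of three and only accepts position 1, which searches the whole string and therefore matches the pre-3.1.0 behavior that GpuStringReplace implements. A sketch of the SQL-level behavior being guarded against, assuming a Spark 3.1.0 session where the four-argument form is available (expected results shown in comments are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Position 1 starts the search at the first character, matching the
    // classic three-argument form, so it can map onto GpuStringReplace.
    spark.sql("SELECT regexp_replace('aaa', 'a', 'b', 1)").show() // bbb
    // Any other position should leave a prefix untouched, which a plain
    // string replace does not model, hence the willNotWorkOnGpu tag above.
    spark.sql("SELECT regexp_replace('aaa', 'a', 'b', 2)").show() // abb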
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 2db0c556f28..dfb049b2c2c 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -1594,22 +1594,6 @@ object GpuOverrides {
       override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
         GpuLike(lhs, rhs, a.escapeChar)
     }),
-    expr[RegExpReplace](
-      "RegExpReplace support for string literal input patterns",
-      (a, conf, p, r) => new TernaryExprMeta[RegExpReplace](a, conf, p, r) {
-        override def tagExprForGpu(): Unit = {
-          if (!isLit(a.rep)) {
-            willNotWorkOnGpu("Only literal values are supported for replacement string")
-          }
-          if (isNullOrEmptyOrRegex(a.regexp)) {
-            willNotWorkOnGpu(
-              "Only non-null, non-empty String literals that are not regex patterns " +
-                "are supported by RegExpReplace on the GPU")
-          }
-        }
-        override def convertToGpu(lhs: Expression, regexp: Expression,
-            rep: Expression): GpuExpression = GpuStringReplace(lhs, regexp, rep)
-      }),
     expr[Length](
       "String character length",
       (a, conf, p, r) => new UnaryExprMeta[Length](a, conf, p, r) {
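A closing note on how the version-specific rule takes effect: getExprs now returns super.exprs301 ++ exprs310, and Scala's Map concatenation is right-biased, so any rule registered in exprs310 under the same Expression class wins over a base rule, while the common RegExpReplace rule removed from GpuOverrides above no longer competes at all. A minimal sketch of that precedence, with toy keys and values rather than the real rule maps:

    // Right-hand entries win on key collision, so version-specific shims
    // can override or add rules simply by appearing after the base map.
    val base = Map("RegExpReplace" -> "three-child rule (3.0.x)")
    val shim = Map("RegExpReplace" -> "four-child rule (3.1.0)")
    assert((base ++ shim)("RegExpReplace") == "four-child rule (3.1.0)")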