From daff33ca3f3ad2bc855819e73c29cbf05c1bed2a Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 2 Sep 2020 10:16:02 -0500 Subject: [PATCH] Fix issues with cannonicalization (#623) Signed-off-by: Robert (Bobby) Evans --- .../rapids/tests/tpcds/TpcdsLikeSpark.scala | 34 +++++- .../nvidia/spark/rapids/GpuCanonicalize.scala | 112 ++++++++++++++++++ .../com/nvidia/spark/rapids/GpuExec.scala | 32 +++++ .../nvidia/spark/rapids/GpuExpressions.scala | 5 + .../com/nvidia/spark/rapids/aggregate.scala | 47 +++++++- .../spark/sql/rapids/AggregateFunctions.scala | 22 ++-- .../sql/rapids/execution/TrampolineUtil.scala | 2 + 7 files changed, 233 insertions(+), 21 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCanonicalize.scala diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeSpark.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeSpark.scala index 77ab54a5842..2af7e983263 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeSpark.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeSpark.scala @@ -26,8 +26,14 @@ case class Table( partitionColumns: Seq[String], schema: StructType) { - private[this] def path(basePath: String) = - basePath + "/" + name + ".dat" + private[this] def path(basePath: String, appendDat: Boolean = true) = { + val rest = if (appendDat) { + ".dat" + } else { + "" + } + basePath + "/" + name + rest + } def readCSV(spark: SparkSession, basePath: String): DataFrame = spark.read.option("delimiter", "|") @@ -37,12 +43,20 @@ case class Table( def setupCSV(spark: SparkSession, basePath: String): Unit = readCSV(spark, basePath).createOrReplaceTempView(name) - def setupParquet(spark: SparkSession, basePath: String): Unit = - spark.read.parquet(path(basePath)).createOrReplaceTempView(name) + def setupParquet(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = + spark.read.parquet(path(basePath, appendDat)).createOrReplaceTempView(name) def setupOrc(spark: SparkSession, basePath: String): Unit = spark.read.orc(path(basePath)).createOrReplaceTempView(name) + def setup( + spark: SparkSession, + basePath: String, + format: String, + appendDat: Boolean = true): Unit = { + spark.read.format(format).load(path(basePath, appendDat)).createOrReplaceTempView(name) + } + private def setupWrite( spark: SparkSession, inputBase: String, @@ -127,14 +141,22 @@ object TpcdsLikeSpark { tables.foreach(_.setupCSV(spark, basePath)) } - def setupAllParquet(spark: SparkSession, basePath: String): Unit = { - tables.foreach(_.setupParquet(spark, basePath)) + def setupAllParquet(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = { + tables.foreach(_.setupParquet(spark, basePath, appendDat)) } def setupAllOrc(spark: SparkSession, basePath: String): Unit = { tables.foreach(_.setupOrc(spark, basePath)) } + def setupAll( + spark: SparkSession, + basePath: String, + format: String, + appendDat: Boolean = true): Unit = { + tables.foreach(_.setup(spark, basePath, format, appendDat)) + } + private val tables = Array( Table( "catalog_sales", diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCanonicalize.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCanonicalize.scala new file mode 100644 index 00000000000..9c1c6e1ffd4 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCanonicalize.scala @@ -0,0 +1,112 @@ +/* + * Copyright (c) 
2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.rapids._ +import org.apache.spark.sql.rapids.execution.TrampolineUtil + +/** + * Rewrites an expression using rules that are guaranteed preserve the result while attempting + * to remove cosmetic variations. Deterministic expressions that are `equal` after canonicalization + * will always return the same answer given the same input (i.e. false positives should not be + * possible). However, it is possible that two canonical expressions that are not equal will in fact + * return the same answer given any input (i.e. false negatives are possible). + * + * The following rules are applied: + * - Names and nullability hints for [[org.apache.spark.sql.types.DataType]]s are stripped. + * - Names for [[GetStructField]] are stripped. + * - TimeZoneId for [[Cast]] and [[AnsiCast]] are stripped if `needsTimeZone` is false. + * - Commutative and associative operations ([[Add]] and [[Multiply]]) have their children ordered + * by `hashCode`. + * - [[EqualTo]] and [[EqualNullSafe]] are reordered by `hashCode`. + * - Other comparisons ([[GreaterThan]], [[LessThan]]) are reversed by `hashCode`. + * - Elements in [[In]] are reordered by `hashCode`. + * + * This is essentially a copy of the Spark `Canonicalize` class but updated for GPU operators + */ +object GpuCanonicalize { + def execute(e: Expression): Expression = { + expressionReorder(ignoreTimeZone(ignoreNamesTypes(e))) + } + + /** Remove names and nullability from types, and names from `GetStructField`. */ + def ignoreNamesTypes(e: Expression): Expression = e match { + case a: AttributeReference => + AttributeReference("none", TrampolineUtil.asNullable(a.dataType))(exprId = a.exprId) + case GetStructField(child, ordinal, Some(_)) => GetStructField(child, ordinal, None) + case _ => e + } + + /** Remove TimeZoneId for Cast if needsTimeZone return false. */ + def ignoreTimeZone(e: Expression): Expression = e match { + case c: CastBase if c.timeZoneId.nonEmpty && !c.needsTimeZone => + c.withTimeZone(null) + case c: GpuCast if c.timeZoneId.nonEmpty => + // TODO when we start to support time zones check for `&& !c.needsTimeZone` + c.withTimeZone(null) + case _ => e + } + + /** Collects adjacent commutative operations. */ + private def gatherCommutative( + e: Expression, + f: PartialFunction[Expression, Seq[Expression]]): Seq[Expression] = e match { + case c if f.isDefinedAt(c) => f(c).flatMap(gatherCommutative(_, f)) + case other => other :: Nil + } + + /** Orders a set of commutative operations by their hash code. */ + private def orderCommutative( + e: Expression, + f: PartialFunction[Expression, Seq[Expression]]): Seq[Expression] = + gatherCommutative(e, f).sortBy(_.hashCode()) + + /** Rearrange expressions that are commutative or associative. 
*/ + private def expressionReorder(e: Expression): Expression = e match { + case a: GpuAdd => orderCommutative(a, { case GpuAdd(l, r) => Seq(l, r) }).reduce(GpuAdd) + case m: GpuMultiply => + orderCommutative(m, { case GpuMultiply(l, r) => Seq(l, r) }).reduce(GpuMultiply) + case o: GpuOr => + orderCommutative(o, { case GpuOr(l, r) if l.deterministic && r.deterministic => Seq(l, r) }) + .reduce(GpuOr) + case a: GpuAnd => + orderCommutative(a, { case GpuAnd(l, r) if l.deterministic && r.deterministic => Seq(l, r)}) + .reduce(GpuAnd) + + case GpuEqualTo(l, r) if l.hashCode() > r.hashCode() => GpuEqualTo(r, l) + case GpuEqualNullSafe(l, r) if l.hashCode() > r.hashCode() => GpuEqualNullSafe(r, l) + + case GpuGreaterThan(l, r) if l.hashCode() > r.hashCode() => GpuLessThan(r, l) + case GpuLessThan(l, r) if l.hashCode() > r.hashCode() => GpuGreaterThan(r, l) + + case GpuGreaterThanOrEqual(l, r) if l.hashCode() > r.hashCode() => GpuLessThanOrEqual(r, l) + case GpuLessThanOrEqual(l, r) if l.hashCode() > r.hashCode() => GpuGreaterThanOrEqual(r, l) + + // Note in the following `NOT` cases, `l.hashCode() <= r.hashCode()` holds. The reason is that + // canonicalization is conducted bottom-up -- see [[Expression.canonicalized]]. + case GpuNot(GpuGreaterThan(l, r)) => GpuLessThanOrEqual(l, r) + case GpuNot(GpuLessThan(l, r)) => GpuGreaterThanOrEqual(l, r) + case GpuNot(GpuGreaterThanOrEqual(l, r)) => GpuLessThan(l, r) + case GpuNot(GpuLessThanOrEqual(l, r)) => GpuGreaterThan(l, r) + + // order the list in the In operator + case GpuInSet(value, list) if list.length > 1 => GpuInSet(value, list.sortBy(_.hashCode())) + + case _ => e + } +} \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index ceee5b93ea0..5ebfd5cf627 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -19,6 +19,8 @@ package com.nvidia.spark.rapids import com.nvidia.spark.rapids.GpuMetricNames._ import org.apache.spark.SparkContext +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId} +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -88,4 +90,34 @@ trait GpuExec extends SparkPlan with Arm { case c: GpuExpression => c.disableCoalesceUntilInput() case _ => false } + + /** + * Defines how the canonicalization should work for the current plan. + */ + override protected def doCanonicalize(): SparkPlan = { + val canonicalizedChildren = children.map(_.canonicalized) + var id = -1 + mapExpressions { + case a: Alias => + id += 1 + // As the root of the expression, Alias will always take an arbitrary exprId, we need to + // normalize that for equality testing, by assigning expr id from 0 incrementally. The + // alias name doesn't matter and should be erased. + val normalizedChild = QueryPlan.normalizeExpressions(a.child, allAttributes) + Alias(normalizedChild, "")(ExprId(id), a.qualifier) + case a: GpuAlias => + id += 1 + // As the root of the expression, Alias will always take an arbitrary exprId, we need to + // normalize that for equality testing, by assigning expr id from 0 incrementally. The + // alias name doesn't matter and should be erased. 
+ val normalizedChild = QueryPlan.normalizeExpressions(a.child, allAttributes) + GpuAlias(normalizedChild, "")(ExprId(id), a.qualifier) + case ar: AttributeReference if allAttributes.indexOf(ar.exprId) == -1 => + // Top level `AttributeReference` may also be used for output like `Alias`, we should + // normalize the exprId too. + id += 1 + ar.withExprId(ExprId(id)).canonicalized + case other => QueryPlan.normalizeExpressions(other, allAttributes) + }.withNewChildren(canonicalizedChildren) + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala index c473ceec8ab..c24f41985f3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala @@ -91,6 +91,11 @@ trait GpuExpression extends Expression with Unevaluable with Arm { * temporary value. */ def columnarEval(batch: ColumnarBatch): Any + + override lazy val canonicalized: Expression = { + val canonicalizedChildren = children.map(_.canonicalized) + GpuCanonicalize.execute(withNewChildren(canonicalizedChildren)) + } } abstract class GpuLeafExpression extends GpuExpression { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 50b61b24008..24dc395b222 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -26,11 +26,11 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, AttributeSet, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSeq, AttributeSet, Expression, NamedExpression} import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, HashPartitioning, Partitioning, UnspecifiedDistribution} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.rapids.{CudfAggregate, GpuAggregateExpression, GpuDeclarativeAggregate} @@ -233,6 +233,17 @@ case class GpuHashAggregateExec( resultExpressions: Seq[NamedExpression], child: SparkPlan) extends UnaryExecNode with GpuExec with Arm { + override def verboseStringWithOperatorId(): String = { + s""" + |$formattedNodeName + |${ExplainUtils.generateFieldString("Input", child.output)} + |${ExplainUtils.generateFieldString("Keys", groupingExpressions)} + |${ExplainUtils.generateFieldString("Functions", aggregateExpressions)} + |${ExplainUtils.generateFieldString("Aggregate Attributes", aggregateAttributes)} + |${ExplainUtils.generateFieldString("Results", resultExpressions)} + |""".stripMargin + } + case class BoundExpressionsModeAggregates(boundInputReferences: 
Seq[GpuExpression] , boundFinalProjections: Option[scala.Seq[GpuExpression]], boundResultReferences: scala.Seq[Expression] , @@ -834,6 +845,8 @@ case class GpuHashAggregateExec( "concatTime"-> SQLMetrics.createNanoTimingMetric(sparkContext, "time in batch concat") ) + protected def outputExpressions: Seq[NamedExpression] = resultExpressions + // // This section is derived (copied in most cases) from HashAggregateExec // @@ -841,7 +854,33 @@ case class GpuHashAggregateExec( aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) } - override def outputPartitioning: Partitioning = child.outputPartitioning + final override def outputPartitioning: Partitioning = { + if (hasAlias) { + child.outputPartitioning match { + case h: GpuHashPartitioning => h.copy(expressions = replaceAliases(h.expressions)) + case h: HashPartitioning => h.copy(expressions = replaceAliases(h.expressions)) + case other => other + } + } else { + child.outputPartitioning + } + } + + protected def hasAlias: Boolean = outputExpressions.collectFirst { case _: Alias => }.isDefined + + protected def replaceAliases(exprs: Seq[Expression]): Seq[Expression] = { + exprs.map { + case a: AttributeReference => replaceAlias(a).getOrElse(a) + case other => other + } + } + + protected def replaceAlias(attr: AttributeReference): Option[Attribute] = { + outputExpressions.collectFirst { + case a @ Alias(child: AttributeReference, _) if child.semanticEquals(attr) => + a.toAttribute + } + } // Used in de-duping and optimizer rules override def producedAttributes: AttributeSet = diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 3eed37be9fe..69db9c50cc4 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -99,7 +99,7 @@ case class GpuAggregateExpression(origAggregateFunction: GpuAggregateFunction, override def dataType: DataType = aggregateFunction.dataType - override def children: Seq[Expression] = aggregateFunction.children + override def children: Seq[Expression] = Seq(aggregateFunction, filter) override val initialValues: Seq[GpuExpression] = aggregateFunction.asInstanceOf[GpuDeclarativeAggregate].initialValues @@ -136,13 +136,13 @@ case class GpuAggregateExpression(origAggregateFunction: GpuAggregateFunction, normalizedAggFunc.canonicalized.asInstanceOf[GpuAggregateFunction], mode, isDistinct, - filter, + filter.map(_.canonicalized), ExprId(0)) } override def nullable: Boolean = aggregateFunction.nullable override def dataType: DataType = aggregateFunction.dataType - override def children: Seq[Expression] = aggregateFunction.children + override def children: Seq[Expression] = aggregateFunction +: filter.toSeq @transient override lazy val references: AttributeSet = { @@ -285,7 +285,7 @@ abstract class GpuDeclarativeAggregate extends GpuAggregateFunction with GpuUnev } case class GpuMin(child: Expression) extends GpuDeclarativeAggregate { - private lazy val cudfMin = AttributeReference("cudf_min", child.dataType)() + private lazy val cudfMin = AttributeReference("min", child.dataType)() override lazy val inputProjection: Seq[Expression] = Seq(child) override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfMin(cudfMin)) @@ -305,7 +305,7 @@ case class GpuMin(child: Expression) extends GpuDeclarativeAggregate { } case class GpuMax(child: Expression) extends 
GpuDeclarativeAggregate { - private lazy val cudfMax = AttributeReference("cudf_max", child.dataType)() + private lazy val cudfMax = AttributeReference("max", child.dataType)() override lazy val inputProjection: Seq[Expression] = Seq(child) override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfMax(cudfMax)) @@ -331,7 +331,7 @@ case class GpuSum(child: Expression) case _ => LongType } - private lazy val cudfSum = AttributeReference("cudf_sum", resultType)() + private lazy val cudfSum = AttributeReference("sum", resultType)() override lazy val inputProjection: Seq[Expression] = Seq(child) override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfSum)) @@ -353,7 +353,7 @@ case class GpuSum(child: Expression) case class GpuCount(children: Seq[Expression]) extends GpuDeclarativeAggregate { // counts are Long - private lazy val cudfCount = AttributeReference("cudf_count", LongType)() + private lazy val cudfCount = AttributeReference("count", LongType)() override lazy val inputProjection: Seq[Expression] = Seq(children.head) override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfCount(cudfCount)) @@ -371,8 +371,8 @@ case class GpuCount(children: Seq[Expression]) extends GpuDeclarativeAggregate { case class GpuAverage(child: Expression) extends GpuDeclarativeAggregate { // averages are either Decimal or Double. We don't support decimal yet, so making this double. - private lazy val cudfSum = AttributeReference("cudf_sum", DoubleType)() - private lazy val cudfCount = AttributeReference("cudf_count", LongType)() + private lazy val cudfSum = AttributeReference("sum", DoubleType)() + private lazy val cudfCount = AttributeReference("count", LongType)() private def toDoubleLit(v: Any): GpuLiteral = { val litVal = v match { @@ -444,7 +444,7 @@ abstract class GpuFirstBase(child: Expression) val ignoreNulls: Boolean - private lazy val cudfFirst = AttributeReference("cudf_first", child.dataType)() + private lazy val cudfFirst = AttributeReference("first", child.dataType)() private lazy val valueSet = AttributeReference("valueSet", BooleanType)() override lazy val inputProjection: Seq[Expression] = @@ -481,7 +481,7 @@ abstract class GpuLastBase(child: Expression) val ignoreNulls: Boolean - private lazy val cudfLast = AttributeReference("cudf_last", child.dataType)() + private lazy val cudfLast = AttributeReference("last", child.dataType)() private lazy val valueSet = AttributeReference("valueSet", BooleanType)() override lazy val inputProjection: Seq[Expression] = diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala index a38cf364ba1..88190f59f8f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala @@ -73,4 +73,6 @@ object TrampolineUtil { /** Shuts down and cleans up any existing Spark session */ def cleanupAnyExistingSession(): Unit = SparkSession.cleanupAnyExistingSession() + + def asNullable(dt: DataType): DataType = dt.asNullable }
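
Illustrative note (not part of the patch): a minimal sketch of what the canonicalization change enables, using the two-argument GpuAdd constructor that GpuCanonicalize itself relies on above; AttributeReference, IntegerType, and semanticEquals are standard Catalyst API. Because GpuExpression.canonicalized now delegates to GpuCanonicalize, commutative GPU expressions that differ only in child order should compare as semantically equal:

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.IntegerType
    import org.apache.spark.sql.rapids.GpuAdd

    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()

    // GpuCanonicalize.expressionReorder sorts commutative children by hashCode,
    // so both trees canonicalize to the same expression.
    assert(GpuAdd(a, b).semanticEquals(GpuAdd(b, a)))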
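
A possible use of the new TpcdsLikeSpark.setupAll entry point, assuming a live SparkSession named spark and a hypothetical base path; the format string is passed straight to DataFrameReader.format, and appendDat = false reads table directories without the ".dat" suffix:

    import com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeSpark

    // Register every TPC-DS-like table as a temp view, reading Parquet from
    // directories laid out as <basePath>/<tableName> (no ".dat" suffix).
    TpcdsLikeSpark.setupAll(spark, "/data/tpcds", "parquet", appendDat = false)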