Fix auto merge conflict 5166 [databricks] #5168

Merged 3 commits on Apr 8, 2022
16 changes: 16 additions & 0 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -1279,6 +1279,22 @@ def test_hash_groupby_approx_percentile_reduction(aqe_enabled):
lambda spark: gen_df(spark, [('v', DoubleGen())], length=100),
[0.05, 0.25, 0.5, 0.75, 0.95], conf, reduction = True)

@incompat
@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
def test_hash_groupby_approx_percentile_reduction_single_row(aqe_enabled):
conf = {'spark.sql.adaptive.enabled': aqe_enabled}
compare_percentile_approx(
lambda spark: gen_df(spark, [('v', DoubleGen())], length=1),
[0.05, 0.25, 0.5, 0.75, 0.95], conf, reduction = True)

@incompat
@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
def test_hash_groupby_approx_percentile_reduction_no_rows(aqe_enabled):
conf = {'spark.sql.adaptive.enabled': aqe_enabled}
compare_percentile_approx(
lambda spark: gen_df(spark, [('v', DoubleGen())], length=0),
[0.05, 0.25, 0.5, 0.75, 0.95], conf, reduction = True)

@incompat
@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
def test_hash_groupby_approx_percentile_byte(aqe_enabled):
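As an aside on what the two new tests exercise: approx_percentile must behave sensibly when a reduction sees only one input row or none at all. The sketch below (not part of this PR; plain Spark SQL in Scala, with an illustrative object name and local master) shows the CPU behavior the GPU reduction path is expected to match: a single row yields that value for every requested percentile, and an empty input yields null.

import org.apache.spark.sql.SparkSession

object ApproxPercentileEdgeCases {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("approx-percentile-edge-cases")
      .master("local[*]")
      .getOrCreate()

    // One row: every requested percentile collapses to that single value.
    val single = spark.range(1).selectExpr("cast(id as double) as v")
    // Zero rows: approx_percentile returns null.
    val empty = single.limit(0)

    Seq(single, empty).foreach { df =>
      df.selectExpr("approx_percentile(v, array(0.05, 0.25, 0.5, 0.75, 0.95)) as p")
        .show(truncate = false)
    }

    spark.stop()
  }
}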
4 changes: 2 additions & 2 deletions jenkins/databricks/build.sh
@@ -72,12 +72,12 @@ case "$BASE_SPARK_VERSION" in
"3.2.1")
COMMONS_LANG3_VERSION=3.12.0
COMMONS_IO_VERSION=2.8.0
DB_VERSION=-0003
DB_VERSION=-0004
FASTERXML_JACKSON_VERSION=2.12.3
HADOOP_VERSION=3.2
HIVE_FULL_VERSION=2.3.9
JSON4S_VERSION=3.7.0-M11
ORC_VERSION=1.6.12
ORC_VERSION=1.6.13
PARQUET_VERSION=1.12.0
;;
"3.1.2")
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids.{GpuExpression, GpuPartitioning}

import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, OrderedDistribution}
import org.apache.spark.sql.types.{DataType, IntegerType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* A GPU accelerated `org.apache.spark.sql.catalyst.plans.physical.Partitioning` that partitions
* sortable records by range into roughly equal ranges. The ranges are determined by sampling
* the content of the RDD passed in.
*
* @note The actual number of partitions created might not be the same
* as the `numPartitions` parameter, in the case where the number of sampled records is less than
* the value of `numPartitions`.
*
* The GpuRangePartitioner is where all of the processing actually happens.
*/
case class GpuRangePartitioning(
gpuOrdering: Seq[SortOrder],
numPartitions: Int) extends GpuExpression with ShimExpression with GpuPartitioning {

override def children: Seq[SortOrder] = gpuOrdering
override def nullable: Boolean = false
override def dataType: DataType = IntegerType

override def satisfies0(required: Distribution): Boolean = {
super.satisfies0(required) || {
required match {
case OrderedDistribution(requiredOrdering) =>
// If `ordering` is a prefix of `requiredOrdering`:
// Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. According to the
// RangePartitioning definition, any [a, b] in a previous partition must be smaller
// than any [a, b] in the following partition. This also means any [a, b, c] in a
// previous partition must be smaller than any [a, b, c] in the following partition.
// Thus `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`.
//
// If `requiredOrdering` is a prefix of `ordering`:
// Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. According to the
// RangePartitioning definition, any [a, b, c] in a previous partition must be smaller
// than any [a, b, c] in the following partition. If there were an [a1, b1] from a
// previous partition larger than an [a2, b2] from the following partition,
// then there would be an [a1, b1, c1] larger than [a2, b2, c2], which violates the
// RangePartitioning definition. So it is guaranteed that any [a, b] in a previous
// partition is not greater than (i.e. is smaller than or equal to) any [a, b] in the
// following partition. Thus `RangePartitioning(a, b, c)` satisfies
// `OrderedDistribution(a, b)`.
val minSize = Seq(requiredOrdering.size, gpuOrdering.size).min
requiredOrdering.take(minSize) == gpuOrdering.take(minSize)
case ClusteredDistribution(requiredClustering, _) =>
gpuOrdering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x)))
case _ => false
}
}
}

override def columnarEval(batch: ColumnarBatch): Any =
throw new IllegalStateException("This cannot be executed")
}
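The prefix rule implemented in satisfies0 above can be checked in isolation. The following is a minimal sketch, using plain strings in place of SortOrder expressions; the object and helper names are hypothetical and not part of the plugin.

object OrderingPrefixSketch {
  // The shorter ordering must be a prefix of the longer one for the
  // range partitioning to satisfy the required ordered distribution.
  def orderingPrefixMatches(partitionOrdering: Seq[String], requiredOrdering: Seq[String]): Boolean = {
    val minSize = math.min(partitionOrdering.size, requiredOrdering.size)
    requiredOrdering.take(minSize) == partitionOrdering.take(minSize)
  }

  def main(args: Array[String]): Unit = {
    // RangePartitioning(a, b) satisfies OrderedDistribution(a, b, c) ...
    assert(orderingPrefixMatches(Seq("a", "b"), Seq("a", "b", "c")))
    // ... and RangePartitioning(a, b, c) satisfies OrderedDistribution(a, b).
    assert(orderingPrefixMatches(Seq("a", "b", "c"), Seq("a", "b")))
    // A mismatch anywhere in the shared prefix fails.
    assert(!orderingPrefixMatches(Seq("a", "x"), Seq("a", "b")))
  }
}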
@@ -22,8 +22,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.python.WindowInPandasExec

/**
@@ -44,4 +45,13 @@ trait Spark320PlusNonDBShims extends SparkShims {
}

def getWindowExpressions(winPy: WindowInPandasExec): Seq[NamedExpression] = winPy.windowExpression

/**
* The case class ShuffleQueryStageExec holds an additional field, shuffleOrigin,
* which affects the signature of its unapply method
*/
override def reusedExchangeExecPfn: PartialFunction[SparkPlan, ReusedExchangeExec] = {
case ShuffleQueryStageExec(_, e: ReusedExchangeExec, _) => e
case BroadcastQueryStageExec(_, e: ReusedExchangeExec, _) => e
}
}
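For readers wondering why this partial function has to be duplicated per shim: adding a constructor field to a case class changes the arity of its generated unapply, so every pattern that deconstructs it needs another wildcard. A small sketch with made-up stand-in classes (StageV1/StageV2 are hypothetical, not Spark types):

object UnapplyAritySketch {
  // Each extra constructor field adds a position to the generated unapply,
  // so every pattern that matches the class needs another wildcard.
  case class StageV1(id: Int, plan: String)
  case class StageV2(id: Int, plan: String, origin: String)

  val pfnV1: PartialFunction[Any, String] = { case StageV1(_, p) => p }
  val pfnV2: PartialFunction[Any, String] = { case StageV2(_, p, _) => p }

  def main(args: Array[String]): Unit = {
    assert(pfnV1(StageV1(0, "reused")) == "reused")
    assert(pfnV2(StageV2(0, "reused", "ENSURE_REQUIREMENTS")) == "reused")
  }
}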
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.window.WindowExecBase
@@ -439,15 +439,6 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging {
(a, conf, p, r) => new RapidsCsvScanMeta(a, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

/**
* Case class ShuffleQueryStageExec holds an additional field shuffleOrigin
* affecting the unapply method signature
*/
override def reusedExchangeExecPfn: PartialFunction[SparkPlan, ReusedExchangeExec] = {
case ShuffleQueryStageExec(_, e: ReusedExchangeExec, _) => e
case BroadcastQueryStageExec(_, e: ReusedExchangeExec, _) => e
}

/** dropped by SPARK-34234 */
override def attachTreeIfSupported[TreeType <: TreeNode[_], A](
tree: TreeType,
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

/** Utility methods for manipulating Catalyst classes involved in Adaptive Query Execution */
object AQEUtils {
/** Return a new QueryStageExec reuse instance with updated output attributes */
def newReuseInstance(sqse: ShuffleQueryStageExec, newOutput: Seq[Attribute]): QueryStageExec = {
val reusedExchange = ReusedExchangeExec(newOutput, sqse.shuffle)
ShuffleQueryStageExec(sqse.id, reusedExchange, sqse.originalPlan, sqse.isSparkExchange)
}
}
@@ -31,8 +31,12 @@ case class GpuHashPartitioning(expressions: Seq[Expression], numPartitions: Int)
expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
case (l, r) => l.semanticEquals(r)
}
case ClusteredDistribution(requiredClustering, _) =>
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) =>
if (requireAllClusterKeys) {
c.areAllClusterKeysMatched(expressions)
} else {
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
}
case _ => false
}
}
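The new requireAllClusterKeys branch tightens the match: previously the partitioning expressions only had to be a subset of the required clustering keys, whereas areAllClusterKeysMatched (as understood here) demands that they cover all of the clustering keys, in order. A rough sketch with strings standing in for expressions; the helper names are illustrative, and the real check compares expressions semantically.

object ClusterKeysSketch {
  // Old/subset semantics: every partitioning key just has to appear among the clustering keys.
  def subsetOfClusterKeys(partKeys: Seq[String], clusterKeys: Seq[String]): Boolean =
    partKeys.forall(clusterKeys.contains)

  // "All cluster keys" semantics: same keys, same order, nothing missing.
  def allClusterKeysMatched(partKeys: Seq[String], clusterKeys: Seq[String]): Boolean =
    partKeys.size == clusterKeys.size &&
      partKeys.zip(clusterKeys).forall { case (p, c) => p == c }

  def main(args: Array[String]): Unit = {
    val clusterKeys = Seq("a", "b")
    assert(subsetOfClusterKeys(Seq("a"), clusterKeys))        // accepted under the subset rule
    assert(!allClusterKeysMatched(Seq("a"), clusterKeys))     // rejected when all keys are required
    assert(allClusterKeysMatched(Seq("a", "b"), clusterKeys)) // an exact match passes either way
  }
}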
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids.{GpuExpression, GpuPartitioning}

import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, OrderedDistribution}
import org.apache.spark.sql.types.{DataType, IntegerType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* A GPU accelerated `org.apache.spark.sql.catalyst.plans.physical.Partitioning` that partitions
* sortable records by range into roughly equal ranges. The ranges are determined by sampling
* the content of the RDD passed in.
*
* @note The actual number of partitions created might not be the same
* as the `numPartitions` parameter, in the case where the number of sampled records is less than
* the value of `numPartitions`.
*
* The GpuRangePartitioner is where all of the processing actually happens.
*/
case class GpuRangePartitioning(
gpuOrdering: Seq[SortOrder],
numPartitions: Int) extends GpuExpression with ShimExpression with GpuPartitioning {

override def children: Seq[SortOrder] = gpuOrdering
override def nullable: Boolean = false
override def dataType: DataType = IntegerType
override def satisfies0(required: Distribution): Boolean = {
super.satisfies0(required) || {
required match {
case OrderedDistribution(requiredOrdering) =>
// If `ordering` is a prefix of `requiredOrdering`:
// Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. According to the
// RangePartitioning definition, any [a, b] in a previous partition must be smaller
// than any [a, b] in the following partition. This also means any [a, b, c] in a
// previous partition must be smaller than any [a, b, c] in the following partition.
// Thus `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`.
//
// If `requiredOrdering` is a prefix of `ordering`:
// Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. According to the
// RangePartitioning definition, any [a, b, c] in a previous partition must be smaller
// than any [a, b, c] in the following partition. If there were an [a1, b1] from a
// previous partition larger than an [a2, b2] from the following partition,
// then there would be an [a1, b1, c1] larger than [a2, b2, c2], which violates the
// RangePartitioning definition. So it is guaranteed that any [a, b] in a previous
// partition is not greater than (i.e. is smaller than or equal to) any [a, b] in the
// following partition. Thus `RangePartitioning(a, b, c)` satisfies
// `OrderedDistribution(a, b)`.
val minSize = Seq(requiredOrdering.size, gpuOrdering.size).min
requiredOrdering.take(minSize) == gpuOrdering.take(minSize)
case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) =>
val expressions = gpuOrdering.map(_.child)
if (requireAllClusterKeys) {
c.areAllClusterKeysMatched(expressions)
} else {
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
}
case _ => false
}
}
}

override def columnarEval(batch: ColumnarBatch): Any =
throw new IllegalStateException("This cannot be executed")
}
@@ -28,9 +28,10 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.python.WindowInPandasExec
import org.apache.spark.sql.execution.window.WindowExecBase
import org.apache.spark.sql.rapids.GpuFileSourceScanExec
@@ -65,7 +66,7 @@ object SparkShimImpl extends Spark321PlusShims {
override def newBroadcastQueryStageExec(
old: BroadcastQueryStageExec,
newPlan: SparkPlan): BroadcastQueryStageExec =
BroadcastQueryStageExec(old.id, newPlan, old.originalPlan)
BroadcastQueryStageExec(old.id, newPlan, old.originalPlan, old.isSparkExchange)

override def filesFromFileIndex(fileCatalog: PartitioningAwareFileIndex): Seq[FileStatus] = {
fileCatalog.allFiles().map(_.toFileStatus)
@@ -183,6 +184,15 @@ object SparkShimImpl extends Spark321PlusShims {

override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] =
super.getExecs ++ shimExecs

/**
* The case class ShuffleQueryStageExec holds an additional field, shuffleOrigin,
* which affects the signature of its unapply method
*/
override def reusedExchangeExecPfn: PartialFunction[SparkPlan, ReusedExchangeExec] = {
case ShuffleQueryStageExec(_, e: ReusedExchangeExec, _, _) => e
case BroadcastQueryStageExec(_, e: ReusedExchangeExec, _, _) => e
}
}

// Fallback to the default definition of `deterministic`
@@ -21,6 +21,7 @@ import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation}
import com.nvidia.spark.rapids.GpuCast.doCast
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.catalyst.util.ArrayData
@@ -64,9 +65,12 @@ case class GpuApproximatePercentile (
// Attributes of fields in the aggregation buffer.
override def aggBufferAttributes: Seq[AttributeReference] = outputBuf :: Nil

// initialValues is only used in reduction and this is not currently supported
override lazy val initialValues: Seq[GpuExpression] = throw new UnsupportedOperationException(
"approx_percentile does not support reduction")
override lazy val initialValues: Seq[GpuLiteral] = Seq(GpuLiteral(
InternalRow(
ArrayData.toArrayData(Array.empty), // centroids (mean, weight)
0d, // min
0d), // max
CudfTDigest.dataType))

// the update expression will create a t-digest (List[Struct[Double, Double]])
override lazy val updateAggregates: Seq[CudfAggregate] =
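To make the new initialValues concrete: the reduction buffer for approx_percentile is a t-digest which, per the comments above, holds a list of (mean, weight) centroids plus a running min and max. Below is a rough sketch of that buffer layout as a Spark schema together with an empty instance mirroring the initial value added here; the field names are assumptions, not necessarily what CudfTDigest.dataType uses.

object TDigestBufferSketch {
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.util.ArrayData
  import org.apache.spark.sql.types._

  // Assumed shape of the t-digest reduction buffer: centroid list plus running min/max.
  val centroidType: StructType = StructType(Seq(
    StructField("mean", DoubleType, nullable = false),
    StructField("weight", DoubleType, nullable = false)))

  val tDigestBufferType: StructType = StructType(Seq(
    StructField("centroids", ArrayType(centroidType, containsNull = false), nullable = false),
    StructField("min", DoubleType, nullable = false),
    StructField("max", DoubleType, nullable = false)))

  // Mirrors the new initialValues: no centroids yet, min and max zeroed until the first update.
  val emptyBuffer: InternalRow = InternalRow(ArrayData.toArrayData(Array.empty[Double]), 0d, 0d)
}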