merge from branch-22.02
Signed-off-by: Andy Grove <andygrove@nvidia.com>
andygrove committed Jan 11, 2022
2 parents f70c0d2 + 7564f22 commit 83cfac6
Showing 17 changed files with 91 additions and 41 deletions.
1 change: 1 addition & 0 deletions .github/workflows/blossom-ci.yml
@@ -64,6 +64,7 @@ jobs:
nvliyuan,\
res-life,\
HaoYang670,\
+NVnavkumar,\
', format('{0},', github.actor)) && github.event.comment.body == 'build'
steps:
- name: Check if comment is issued by authorized person
3 changes: 2 additions & 1 deletion docs/compatibility.md
@@ -473,7 +473,8 @@ Here are some examples of regular expression patterns that are not supported on
- Empty groups: `()`
- Regular expressions containing null characters (unless the pattern is a simple literal string)
- Beginning-of-line and end-of-line anchors (`^` and `$`) are not supported in some contexts, such as when combined
-- with a choice (`^|a`).
+  with a choice (`^|a`).
+- String anchors `\z` and `\Z` are not supported by `regexp_replace`
- Hex and octal digits

In addition to these cases that can be detected, there are also known issues that can cause incorrect results:
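To make the new `\z` / `\Z` bullet concrete, here is an illustrative demonstration (not part of this commit) of how Java's two string anchors differ around a trailing line terminator — the distinction that drives the transpiler changes below:

```scala
import java.util.regex.Pattern

// \z matches only at the absolute end of input; \Z also matches just
// before a final line terminator. These are the Java semantics the
// GPU transpiler has to reproduce.
object StringAnchorDemo extends App {
  println(Pattern.compile("test\\z").matcher("test\n").find()) // false
  println(Pattern.compile("test\\Z").matcher("test\n").find()) // true
  println(Pattern.compile("test\\z").matcher("test").find())   // true
}
```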
11 changes: 8 additions & 3 deletions integration_tests/src/main/python/cast_test.py
@@ -16,9 +16,10 @@

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_and_cpu_error, assert_gpu_fallback_collect, assert_py4j_exception
from data_gen import *
-from spark_session import is_before_spark_311, is_before_spark_320, is_before_spark_330, with_gpu_session
+from spark_session import is_before_spark_320, with_gpu_session
from marks import allow_non_gpu, approximate_float
from pyspark.sql.types import *
+from spark_init_internal import spark_version

def test_cast_empty_string_to_int():
    assert_gpu_and_cpu_are_equal_collect(
@@ -300,8 +301,12 @@ def test_cast_struct_with_unsupported_element_to_string_fallback(data_gen, legac
"spark.sql.legacy.castComplexTypesToString.enabled": legacy,
"spark.sql.legacy.allowNegativeScaleOfDecimal": 'true'}
)

-@pytest.mark.skipif(not is_before_spark_311() and is_before_spark_330(), reason="RAPIDS doesn't support casting string to decimal for negative scale decimal in this version of Spark because of SPARK-37451")

+# The bug SPARK-37451 only affects the following versions
+def is_neg_dec_scale_bug_version():
+    return ("3.1.1" <= spark_version() < "3.1.3") or ("3.2.0" <= spark_version() < "3.2.1")
+
+@pytest.mark.skipif(is_neg_dec_scale_bug_version(), reason="RAPIDS doesn't support casting string to decimal for negative scale decimal in this version of Spark because of SPARK-37451")
def test_cast_string_to_negative_scale_decimal():
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, StringGen("[0-9]{9}")).select(
5 changes: 0 additions & 5 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -1327,7 +1327,6 @@ def test_hash_groupby_approx_percentile_byte_scalar(aqe_enabled):
('v', ByteGen())], length=100),
0.5, conf)

-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/4060")
@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
def test_hash_groupby_approx_percentile_long_repeated_keys(aqe_enabled):
conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
@@ -1336,7 +1335,6 @@ def test_hash_groupby_approx_percentile_long_repeated_keys(aqe_enabled):
('v', LongRangeGen())], length=100),
[0.05, 0.25, 0.5, 0.75, 0.95], conf)

-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/4060")
@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
def test_hash_groupby_approx_percentile_long(aqe_enabled):
conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
@@ -1345,7 +1343,6 @@ def test_hash_groupby_approx_percentile_long(aqe_enabled):
('v', LongRangeGen())], length=100),
[0.05, 0.25, 0.5, 0.75, 0.95], conf)

-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/4060")
@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
def test_hash_groupby_approx_percentile_long_single(aqe_enabled):
conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
@@ -1388,15 +1385,13 @@ def approx_percentile_query(spark):

assert_gpu_fallback_collect(lambda spark: approx_percentile_query(spark), 'ApproximatePercentile', conf)

-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/4060")
@ignore_order(local=True)
def test_hash_groupby_approx_percentile_decimal32():
compare_percentile_approx(
lambda spark: gen_df(spark, [('k', RepeatSeqGen(ByteGen(nullable=False), length=2)),
('v', DecimalGen(6, 2))]),
[0.05, 0.25, 0.5, 0.75, 0.95], conf = _approx_percentile_conf)

-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/4060")
@ignore_order(local=True)
def test_hash_groupby_approx_percentile_decimal32_single():
compare_percentile_approx(
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/spark_session.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2021, NVIDIA CORPORATION.
+# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -42,4 +42,5 @@ class Spark312dbShims extends Spark31XdbShims with Spark30Xuntil33XShims {
new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith,
pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode)
}
+  override def isCastingStringToNegDecimalScaleSupported: Boolean = false
}
@@ -45,4 +45,5 @@ class Spark313Shims extends Spark31XShims with Spark30Xuntil33XShims {

override def hasCastFloatTimestampUpcast: Boolean = true

+  override def isCastingStringToNegDecimalScaleSupported: Boolean = true
}
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,4 +37,6 @@ class Spark320Shims extends Spark320until322Shims with Spark30Xuntil33XShims {
metadataColumns: Seq[AttributeReference]): RDD[InternalRow] = {
new FileScanRDD(sparkSession, readFunction, filePartitions)
}

+  override def isCastingStringToNegDecimalScaleSupported: Boolean = false
}
@@ -37,6 +37,4 @@ class Spark322Shims extends Spark322PlusShims with Spark30Xuntil33XShims {
metadataColumns: Seq[AttributeReference]): RDD[InternalRow] = {
new FileScanRDD(sparkSession, readFunction, filePartitions)
}
-
-  override def isCastingStringToNegDecimalScaleSupported: Boolean = false
}
@@ -21,6 +21,4 @@ import com.nvidia.spark.rapids.shims.v2._

class Spark330Shims extends Spark33XShims {
override def getSparkShimVersion: ShimVersion = SparkShimServiceProvider.VERSION
-
-  override def isCastingStringToNegDecimalScaleSupported: Boolean = true
}
@@ -370,8 +370,6 @@ abstract class Spark30XShims extends Spark301until320Shims with Logging {
adaptivePlan.initialPlan
}
-
-  override def isCastingStringToNegDecimalScaleSupported: Boolean = true

// this is to help with an optimization in Spark 3.1, so we disable it by default in Spark 3.0.x
override def isEmptyRelation(relation: Any): Boolean = false
override def tryTransformIfEmptyRelation(mode: BroadcastMode): Option[Any] = None
@@ -128,6 +128,4 @@ trait Spark31XdbShimsBase extends SparkShims {
}

override def shouldFallbackOnAnsiTimestamp(): Boolean = false
-
-  override def isCastingStringToNegDecimalScaleSupported: Boolean = true
}
@@ -1035,8 +1035,6 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging {
adaptivePlan.initialPlan
}
-
-  override def isCastingStringToNegDecimalScaleSupported: Boolean = false

override def columnarAdaptivePlan(a: AdaptiveSparkPlanExec,
goal: CoalesceSizeGoal): SparkPlan = {
a.copy(supportsColumnar = true)
@@ -265,8 +265,8 @@ class RegexParser(pattern: String) {
throw new RegexUnsupportedException("escape at end of string", Some(pos))
case Some(ch) =>
ch match {
-      case 'A' | 'Z' =>
-        // BOL / EOL anchors
+      case 'A' | 'Z' | 'z' =>
+        // string anchors
consumeExpected(ch)
RegexEscaped(ch)
case 's' | 'S' | 'd' | 'D' | 'w' | 'W' =>
@@ -471,6 +471,25 @@ class CudfRegexTranspiler(replace: Boolean) {
// this needs further analysis to determine why words boundaries behave
// differently between Java and cuDF
throw new RegexUnsupportedException("word boundaries are not supported")
+      case 'z' =>
+        if (replace) {
+          throw new RegexUnsupportedException("string anchor \\z not supported in replace mode")
+        }
+        // cuDF does not support "\z" but supports "$", which is equivalent
+        RegexChar('$')
+      case 'Z' =>
+        if (replace) {
+          throw new RegexUnsupportedException("string anchor \\Z not supported in replace mode")
+        }
+        // We transpile "\\Z" to "(?:[\r\n]?$)" because of the different meanings of "\\Z"
+        // between Java and cuDF:
+        // Java: The end of the input but for the final terminator, if any
+        // cuDF: Matches at the end of the string
+        RegexGroup(capture = false, RegexSequence(
+          ListBuffer(RegexRepetition(
+            RegexCharacterClass(negated = false,
+              ListBuffer(RegexChar('\r'), RegexChar('\n'))), SimpleQuantifier('?')),
+            RegexChar('$'))))
case _ =>
regex
}
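As a side note, the `RegexGroup(...)` tree built for `\Z` above serializes to the cuDF pattern `(?:[\r\n]?$)`. A minimal sketch of that correspondence, using simplified stand-in node types (the real node classes live alongside `RegexParser`):

```scala
// Simplified stand-ins for RegexGroup/RegexSequence/RegexRepetition etc.,
// only to show how the \Z rewrite renders as a pattern string.
sealed trait Node { def render: String }
case class Lit(c: Char) extends Node { def render: String = c.toString }
case class CharClass(cs: Seq[Char]) extends Node {
  def render: String = cs.map(_.toString).mkString("[", "", "]")
}
case class Optional(n: Node) extends Node { def render: String = n.render + "?" }
case class NonCaptureGroup(ns: Seq[Node]) extends Node {
  def render: String = ns.map(_.render).mkString("(?:", "", ")")
}

val transpiledZ = NonCaptureGroup(Seq(Optional(CharClass(Seq('\r', '\n'))), Lit('$')))
assert(transpiledZ.render == "(?:[\r\n]?$)") // with literal CR/LF inside the class
```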
30 changes: 20 additions & 10 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf.{ColumnVector, NvtxColor, OrderByArg, Table}

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Expression, NullsFirst, NullsLast, SortOrder}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.rapids.execution.TrampolineUtil
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType}
import org.apache.spark.sql.vectorized.ColumnarBatch

object SortUtils extends Arm {
@@ -210,6 +211,20 @@ class GpuSorter(
}
}

+  private[this] lazy val hasNestedInKeyColumns = cpuOrderingInternal.exists ( order =>
+    DataTypeUtils.isNestedType(order.child.dataType)
+  )
+
+  /** (This can be removed once https://github.com/rapidsai/cudf/issues/8050 is addressed) */
+  private[this] lazy val hasUnsupportedNestedInRideColumns = {
+    val keyColumnIndices = cpuOrderingInternal.map(_.child.asInstanceOf[BoundReference].ordinal)
+    val rideColumnIndices = projectedBatchTypes.indices.toSet -- keyColumnIndices
+    rideColumnIndices.exists { idx =>
+      TrampolineUtil.dataTypeExistsRecursively(projectedBatchTypes(idx),
+        t => t.isInstanceOf[ArrayType] || t.isInstanceOf[MapType])
+    }
+  }

/**
* Merge multiple batches together. All of these batches should be the output of
* `appendProjectedColumns` and the output of this will also be in that same format.
@@ -226,15 +241,10 @@
batches.foreach { cb =>
tabs += GpuColumnVector.from(cb)
}
-    // In the current version of cudf merge does not work for structs or lists (nested types)
+    // In the current version of cudf merge does not work for lists and maps.
    // This should be fixed by https://github.com/rapidsai/cudf/issues/8050
-    val hasNested = {
-      val tab = tabs.head
-      (0 until tab.getNumberOfColumns).exists { i =>
-        tab.getColumn(i).getType.isNestedType
-      }
-    }
-    if (hasNested) {
+    // Nested types in sort key columns is not supported either.
+    if (hasNestedInKeyColumns || hasUnsupportedNestedInRideColumns) {
// so as a work around we concatenate all of the data together and then sort it.
// It is slower, but it works
withResource(Table.concatenate(tabs: _*)) { concatenated =>
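`TrampolineUtil.dataTypeExistsRecursively` is not shown in this diff; it exposes Spark-internal type traversal to the plugin. A rough, assumed equivalent of the check the ride-column test above relies on:

```scala
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

// Assumed behavior: return true if the predicate holds for the type
// itself or for any type nested inside it.
def existsRecursively(dt: DataType)(f: DataType => Boolean): Boolean = dt match {
  case at: ArrayType  => f(at) || existsRecursively(at.elementType)(f)
  case mt: MapType    => f(mt) || existsRecursively(mt.keyType)(f) ||
    existsRecursively(mt.valueType)(f)
  case st: StructType => f(st) || st.fields.exists(fld => existsRecursively(fld.dataType)(f))
  case other          => f(other)
}

// Usage matching the ride-column check above:
// existsRecursively(colType)(t => t.isInstanceOf[ArrayType] || t.isInstanceOf[MapType])
```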
@@ -97,7 +97,7 @@ trait SparkShims {
def int96ParquetRebaseWrite(conf: SQLConf): String
def int96ParquetRebaseReadKey: String
def int96ParquetRebaseWriteKey: String
-  def isCastingStringToNegDecimalScaleSupported: Boolean
+  def isCastingStringToNegDecimalScaleSupported: Boolean = true

def getParquetFilters(
schema: MessageType,
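Giving `isCastingStringToNegDecimalScaleSupported` a default of `true` in the trait means only the Spark versions affected by SPARK-37451 need an override, as the shim hunks above show. A minimal sketch of the pattern (names simplified, not the plugin's real classes):

```scala
// Simplified illustration of the trait-default refactor in this commit.
trait VersionShims {
  // Most Spark versions support this cast, so default to true.
  def isCastingStringToNegDecimalScaleSupported: Boolean = true
}

// Unaffected versions need no override at all.
class Spark313LikeShims extends VersionShims

// Versions hit by SPARK-37451 opt out explicitly.
class Spark320LikeShims extends VersionShims {
  override def isCastingStringToNegDecimalScaleSupported: Boolean = false
}
```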
@@ -146,6 +146,17 @@ class RegularExpressionTranspilerSuite extends FunSuite with Arm {
assertUnsupported(pattern, replace = false,
"cuDF does not support octal digits consistently with Spark"))
}
test("string anchors - find") {
val patterns = Seq("\\Atest", "test\\z", "test\\Z")
assertCpuGpuMatchesRegexpFind(patterns, Seq("", "test", "atest", "testa",
"\ntest", "test\n", "\ntest\n"))
}

test("string anchors - replace") {
val patterns = Seq("\\Atest")
assertCpuGpuMatchesRegexpReplace(patterns, Seq("", "test", "atest", "testa",
"\ntest", "test\n", "\ntest\n", "\ntest\r\ntest\n"))
}

test("end of line anchor with strings ending in valid newline") {
val pattern = "2$"
@@ -219,6 +230,14 @@
.replaceAll("\\$", Matcher.quoteReplacement("[\r]?[\n]?$")))
}

test("transpile \\z") {
doTranspileTest("\\z", "$")
}

test("transpile \\Z") {
doTranspileTest("\\Z", "(?:[\r\n]?$)")
}

test("compare CPU and GPU: character range including unescaped + and -") {
val patterns = Seq("a[-]+", "a[a-b-]+", "a[-a-b]", "a[-+]", "a[+-]")
val inputs = Seq("a+", "a-", "a", "a-+", "a[a-b-]")
@@ -237,6 +256,14 @@
assertCpuGpuMatchesRegexpFind(patterns, inputs)
}

+  private val REGEXP_LIMITED_CHARS_COMMON = "|()[]{},.^$*+?abc123x\\ \tBsdwSDW"
+
+  // we currently only support \\z and \\Z in find mode
+  // see https://github.com/NVIDIA/spark-rapids/issues/4425
+  private val REGEXP_LIMITED_CHARS_FIND = REGEXP_LIMITED_CHARS_COMMON + "zZ"
+
+  private val REGEXP_LIMITED_CHARS_REPLACE = REGEXP_LIMITED_CHARS_COMMON

test("compare CPU and GPU: find digits") {
val patterns = Seq("\\d", "\\d+", "\\d*", "\\d?",
"\\D", "\\D+", "\\D*", "\\D?")
@@ -252,13 +279,11 @@
assertCpuGpuMatchesRegexpReplace(patterns, inputs)
}

-  private val REGEXP_LIMITED_CHARS = "|()[]{},.^$*+?abc123x\\ \tBsdwSDW"
-
test("compare CPU and GPU: regexp find fuzz test with limited chars") {
// testing with this limited set of characters finds issues much
// faster than using the full ASCII set
// CR and LF has been excluded due to known issues
-    doFuzzTest(Some(REGEXP_LIMITED_CHARS), replace = false)
+    doFuzzTest(Some(REGEXP_LIMITED_CHARS_FIND), replace = false)
}

test("compare CPU and GPU: regexp replace simple regular expressions") {
@@ -296,7 +321,7 @@ class RegularExpressionTranspilerSuite extends FunSuite with Arm {
// testing with this limited set of characters finds issues much
// faster than using the full ASCII set
// LF has been excluded due to known issues
-    doFuzzTest(Some(REGEXP_LIMITED_CHARS), replace = true)
+    doFuzzTest(Some(REGEXP_LIMITED_CHARS_REPLACE), replace = true)
}

test("compare CPU and GPU: regexp find fuzz test printable ASCII chars plus CR and TAB") {
@@ -348,7 +373,7 @@ class RegularExpressionTranspilerSuite extends FunSuite with Arm {
gpuContains(cudfPattern, input)
} catch {
case e: CudfException =>
fail(s"cuDF failed to compile pattern: $cudfPattern", e)
fail(s"cuDF failed to compile pattern: ${toReadableString(cudfPattern)}", e)
}
for (i <- input.indices) {
if (cpu(i) != gpu(i)) {
@@ -371,7 +396,7 @@
gpuReplace(cudfPattern, input)
} catch {
case e: CudfException =>
fail(s"cuDF failed to compile pattern: $cudfPattern", e)
fail(s"cuDF failed to compile pattern: ${toReadableString(cudfPattern)}", e)
}
for (i <- input.indices) {
if (cpu(i) != gpu(i)) {
@@ -435,7 +460,7 @@

private def doTranspileTest(pattern: String, expected: String) {
val transpiled: String = transpile(pattern, replace = false)
-    assert(transpiled === expected)
+    assert(toReadableString(transpiled) === toReadableString(expected))
}
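`toReadableString` itself is outside this diff; presumably it renders control characters visibly so that a failure on a pattern like `(?:[\r\n]?$)` prints as `(?:[\\r\\n]?$)`. A hedged sketch of such a helper as a suite member:

```scala
// Assumed helper (not shown in this diff): make CR/LF/TAB visible in
// assertion messages for transpiled patterns.
private def toReadableString(s: String): String = s.flatMap {
  case '\r'  => "\\r"
  case '\n'  => "\\n"
  case '\t'  => "\\t"
  case other => other.toString
}
```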

private def transpile(pattern: String, replace: Boolean): String = {
