Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable regular expressions on GPU by default [databricks] #4740

Merged
merged 9 commits into from
Feb 10, 2022
15 changes: 4 additions & 11 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -499,18 +499,11 @@ The following Apache Spark regular expression functions and expressions are supp
- `regexp_like`
- `regexp_replace`

These operations are disabled by default because of known incompatibilities between the Java regular expression
engine that Spark uses and the cuDF regular expression engine on the GPU, and also because the regular expression
kernels can potentially have high memory overhead.
Regular expression evaluation on the GPU can potentially have high memory overhead and cause out-of-memory errors. To
disable regular expressions on the GPU, set `spark.rapids.sql.regexp.enabled=false`.

These operations can be enabled on the GPU with the following configuration settings:

- `spark.rapids.sql.expression.RLike=true` (for `RLIKE`, `regexp`, and `regexp_like`)
- `spark.rapids.sql.expression.RegExpReplace=true` for `regexp_replace`
- `spark.rapids.sql.expression.RegExpExtract=true` for `regexp_extract`

Even when these expressions are enabled, there are instances where regular expression operations will fall back to
CPU when the RAPIDS Accelerator determines that a pattern is either unsupported or would produce incorrect results on the GPU.
There are instances where regular expression operations will fall back to CPU when the RAPIDS Accelerator determines
that a pattern is either unsupported or would produce incorrect results on the GPU.

Here are some examples of regular expression patterns that are not supported on the GPU and will fall back to the CPU.

Expand Down
7 changes: 4 additions & 3 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ Name | Description | Default Value
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false
<a name="sql.reader.batchSizeBytes"></a>spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647
<a name="sql.reader.batchSizeRows"></a>spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647
<a name="sql.regexp.enabled"></a>spark.rapids.sql.regexp.enabled|Specifies whether regular expressions should be evaluated on GPU. Complex expressions can cause out of memory issues. Setting this config to false will make these operations fall back to CPU.|true
<a name="sql.replaceSortMergeJoin.enabled"></a>spark.rapids.sql.replaceSortMergeJoin.enabled|Allow replacing sortMergeJoin with HashJoin|true
<a name="sql.rowBasedUDF.enabled"></a>spark.rapids.sql.rowBasedUDF.enabled|When set to true, optimizes a row-based UDF in a GPU operation by transferring only the data it needs between GPU and CPU inside a query operation, instead of falling this operation back to CPU. This is an experimental feature, and this config might be removed in the future.|false
<a name="sql.shuffle.spillThreads"></a>spark.rapids.sql.shuffle.spillThreads|Number of threads used to spill shuffle data to disk in the background.|6
Expand Down Expand Up @@ -265,11 +266,11 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.PromotePrecision"></a>spark.rapids.sql.expression.PromotePrecision| |PromotePrecision before arithmetic operations between DecimalType data|true|None|
<a name="sql.expression.PythonUDF"></a>spark.rapids.sql.expression.PythonUDF| |UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated|true|None|
<a name="sql.expression.Quarter"></a>spark.rapids.sql.expression.Quarter|`quarter`|Returns the quarter of the year for date, in the range 1 to 4|true|None|
<a name="sql.expression.RLike"></a>spark.rapids.sql.expression.RLike|`rlike`|RLike|false|This is disabled by default because the implementation is not 100% compatible. See the compatibility guide for more information.|
<a name="sql.expression.RLike"></a>spark.rapids.sql.expression.RLike|`rlike`|RLike|true|None|
<a name="sql.expression.Rand"></a>spark.rapids.sql.expression.Rand|`random`, `rand`|Generate a random column with i.i.d. uniformly distributed values in [0, 1)|true|None|
<a name="sql.expression.Rank"></a>spark.rapids.sql.expression.Rank|`rank`|Window function that returns the rank value within the aggregation window|true|None|
<a name="sql.expression.RegExpExtract"></a>spark.rapids.sql.expression.RegExpExtract|`regexp_extract`|RegExpExtract|false|This is disabled by default because the implementation is not 100% compatible. See the compatibility guide for more information.|
<a name="sql.expression.RegExpReplace"></a>spark.rapids.sql.expression.RegExpReplace|`regexp_replace`|RegExpReplace support for string literal input patterns|false|This is disabled by default because the implementation is not 100% compatible. See the compatibility guide for more information.|
<a name="sql.expression.RegExpExtract"></a>spark.rapids.sql.expression.RegExpExtract|`regexp_extract`|RegExpExtract|true|None|
<a name="sql.expression.RegExpReplace"></a>spark.rapids.sql.expression.RegExpReplace|`regexp_replace`|RegExpReplace|true|None|
<a name="sql.expression.Remainder"></a>spark.rapids.sql.expression.Remainder|`%`, `mod`|Remainder or modulo|true|None|
<a name="sql.expression.ReplicateRows"></a>spark.rapids.sql.expression.ReplicateRows| |Given an input row replicates the row N times|true|None|
<a name="sql.expression.Rint"></a>spark.rapids.sql.expression.Rint|`rint`|Rounds up a double value to the nearest double equal to an integer|true|None|
Expand Down
8 changes: 4 additions & 4 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -9850,7 +9850,7 @@ are limited.
<td rowSpan="3">RLike</td>
<td rowSpan="3">`rlike`</td>
<td rowSpan="3">RLike</td>
<td rowSpan="3">This is disabled by default because the implementation is not 100% compatible. See the compatibility guide for more information.</td>
<td rowSpan="3">None</td>
<td rowSpan="3">project</td>
<td>str</td>
<td> </td>
Expand Down Expand Up @@ -10038,7 +10038,7 @@ are limited.
<td rowSpan="4">RegExpExtract</td>
<td rowSpan="4">`regexp_extract`</td>
<td rowSpan="4">RegExpExtract</td>
<td rowSpan="4">This is disabled by default because the implementation is not 100% compatible. See the compatibility guide for more information.</td>
<td rowSpan="4">None</td>
<td rowSpan="4">project</td>
<td>str</td>
<td> </td>
Expand Down Expand Up @@ -10126,8 +10126,8 @@ are limited.
<tr>
<td rowSpan="4">RegExpReplace</td>
<td rowSpan="4">`regexp_replace`</td>
<td rowSpan="4">RegExpReplace support for string literal input patterns</td>
<td rowSpan="4">This is disabled by default because the implementation is not 100% compatible. See the compatibility guide for more information.</td>
<td rowSpan="4">RegExpReplace</td>
<td rowSpan="4">None</td>
<td rowSpan="4">project</td>
<td>str</td>
<td> </td>
Expand Down
6 changes: 2 additions & 4 deletions integration_tests/src/main/python/conditionals_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,7 @@ def test_conditional_with_side_effects_cast(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr(
'IF(a RLIKE "^[0-9]{1,5}\\z", CAST(a AS INT), 0)'),
conf = {'spark.sql.ansi.enabled':True,
'spark.rapids.sql.expression.RLike': True})
conf = {'spark.sql.ansi.enabled':True})

@pytest.mark.parametrize('data_gen', [mk_str_gen('[0-9]{1,9}')], ids=idfn)
def test_conditional_with_side_effects_case_when(data_gen):
Expand All @@ -217,5 +216,4 @@ def test_conditional_with_side_effects_case_when(data_gen):
WHEN a RLIKE "^[0-9]{1,3}\\z" THEN CAST(a AS INT) \
WHEN a RLIKE "^[0-9]{4,6}\\z" THEN CAST(a AS INT) + 123 \
ELSE -1 END'),
conf = {'spark.sql.ansi.enabled':True,
'spark.rapids.sql.expression.RLike': True})
conf = {'spark.sql.ansi.enabled':True})
5 changes: 2 additions & 3 deletions integration_tests/src/main/python/qa_nightly_select_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -147,8 +147,7 @@ def idfn(val):
'spark.rapids.sql.hasNans': 'false',
'spark.rapids.sql.castStringToFloat.enabled': 'true',
'spark.rapids.sql.castFloatToIntegralTypes.enabled': 'true',
'spark.rapids.sql.castFloatToString.enabled': 'true',
'spark.rapids.sql.expression.RegExpReplace': 'true'
'spark.rapids.sql.castFloatToString.enabled': 'true'
}

_first_last_qa_conf = copy_and_update(_qa_conf, {
Expand Down
76 changes: 26 additions & 50 deletions integration_tests/src/main/python/string_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,7 @@ def test_re_replace():
'REGEXP_REPLACE(a, "\\^TEST\\z", "PROD")',
'REGEXP_REPLACE(a, "TEST", "")',
'REGEXP_REPLACE(a, "TEST", "%^[]\ud720")',
'REGEXP_REPLACE(a, "TEST", NULL)'),
conf={'spark.rapids.sql.expression.RegExpReplace': 'true'})
'REGEXP_REPLACE(a, "TEST", NULL)'))

@allow_non_gpu('ProjectExec', 'RegExpReplace')
def test_re_replace_backrefs():
Expand All @@ -355,23 +354,20 @@ def test_re_replace_backrefs():
lambda spark: unary_op_df(spark, gen).selectExpr(
'REGEXP_REPLACE(a, "(TEST)", "[$0]")',
'REGEXP_REPLACE(a, "(TEST)", "[$1]")'),
'RegExpReplace',
conf={'spark.rapids.sql.expression.RegExpReplace': 'true'})
'RegExpReplace')

def test_re_replace_backrefs_escaped():
gen = mk_str_gen('.{0,5}TEST[\ud720 A]{0,5}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'REGEXP_REPLACE(a, "(TEST)", "[\\\\$0]")',
'REGEXP_REPLACE(a, "(TEST)", "[\\\\$1]")'),
conf={'spark.rapids.sql.expression.RegExpReplace': 'true'})
'REGEXP_REPLACE(a, "(TEST)", "[\\\\$1]")'))

def test_re_replace_escaped():
gen = mk_str_gen('.{0,5}TEST[\ud720 A]{0,5}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'REGEXP_REPLACE(a, "[A-Z]+", "\\\\A\\A\\\\t\\\\r\\\\n\\t\\r\\n")'),
conf={'spark.rapids.sql.expression.RegExpReplace': 'true'})
'REGEXP_REPLACE(a, "[A-Z]+", "\\\\A\\A\\\\t\\\\r\\\\n\\t\\r\\n")'))

def test_re_replace_null():
gen = mk_str_gen('[\u0000 ]{0,2}TE[\u0000 ]{0,2}ST[\u0000 ]{0,2}')\
Expand All @@ -390,8 +386,7 @@ def test_re_replace_null():
'REGEXP_REPLACE(a, "\x00", "NULL")',
'REGEXP_REPLACE(a, "\0", "NULL")',
'REGEXP_REPLACE(a, "TE\u0000ST", "PROD")',
'REGEXP_REPLACE(a, "TE\u0000\u0000ST", "PROD")'),
conf={'spark.rapids.sql.expression.RegExpReplace': 'true'})
'REGEXP_REPLACE(a, "TE\u0000\u0000ST", "PROD")'))

def test_length():
gen = mk_str_gen('.{0,5}TEST[\ud720 A]{0,5}')
Expand Down Expand Up @@ -518,8 +513,7 @@ def test_regexp_replace():
'regexp_replace(a, "[^xyz]", "A")',
'regexp_replace(a, "([^x])|([^y])", "A")',
'regexp_replace(a, "(?:aa)+", "A")',
'regexp_replace(a, "a|b|c", "A")'),
conf={'spark.rapids.sql.expression.RegExpReplace': 'true'})
'regexp_replace(a, "a|b|c", "A")'))

@pytest.mark.skipif(is_before_spark_320(), reason='regexp is synonym for RLike starting in Spark 3.2.0')
def test_regexp():
Expand All @@ -529,8 +523,7 @@ def test_regexp():
'regexp(a, "a{2}")',
'regexp(a, "a{1,3}")',
'regexp(a, "a{1,}")',
'regexp(a, "a[bc]d")'),
conf={'spark.rapids.sql.expression.RLike': 'true'})
'regexp(a, "a[bc]d")'))

@pytest.mark.skipif(is_before_spark_320(), reason='regexp_like is synonym for RLike starting in Spark 3.2.0')
def test_regexp_like():
Expand All @@ -540,8 +533,7 @@ def test_regexp_like():
'regexp_like(a, "a{2}")',
'regexp_like(a, "a{1,3}")',
'regexp_like(a, "a{1,}")',
'regexp_like(a, "a[bc]d")'),
conf={'spark.rapids.sql.expression.RLike': 'true'})
'regexp_like(a, "a[bc]d")'))

@pytest.mark.skipif(is_databricks_runtime(),
reason='Databricks optimizes out regexp_replace call in this case')
Expand All @@ -557,8 +549,7 @@ def test_regexp_replace_null_pattern_fallback():
assert_gpu_fallback_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'regexp_replace(a, NULL, "A")'),
'RegExpReplace',
conf={'spark.rapids.sql.expression.RegExpReplace': 'true'})
'RegExpReplace')

def test_regexp_replace_character_set_negated():
gen = mk_str_gen('[abcd]{0,3}[\r\n]{0,2}[abcd]{0,3}')
Expand All @@ -572,17 +563,15 @@ def test_regexp_replace_character_set_negated():
'regexp_replace(a, "[^a\n]", "1")',
'regexp_replace(a, "[^\r\n]", "1")',
'regexp_replace(a, "[^\r]", "1")',
'regexp_replace(a, "[^\n]", "1")'),
conf={'spark.rapids.sql.expression.RegExpReplace': 'true'})
'regexp_replace(a, "[^\n]", "1")'))

def test_regexp_extract():
gen = mk_str_gen('[abcd]{1,3}[0-9]{1,3}[abcd]{1,3}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'regexp_extract(a, "^([a-d]*)([0-9]*)([a-d]*)\\z", 1)',
'regexp_extract(a, "^([a-d]*)([0-9]*)([a-d]*)\\z", 2)',
'regexp_extract(a, "^([a-d]*)([0-9]*)([a-d]*)\\z", 3)'),
conf={'spark.rapids.sql.expression.RegExpExtract': 'true'})
'regexp_extract(a, "^([a-d]*)([0-9]*)([a-d]*)\\z", 3)'))

def test_regexp_extract_no_match():
gen = mk_str_gen('[abcd]{1,3}[0-9]{1,3}[abcd]{1,3}')
Expand All @@ -591,8 +580,7 @@ def test_regexp_extract_no_match():
'regexp_extract(a, "^([0-9]+)([a-z]+)([0-9]+)\\z", 0)',
'regexp_extract(a, "^([0-9]+)([a-z]+)([0-9]+)\\z", 1)',
'regexp_extract(a, "^([0-9]+)([a-z]+)([0-9]+)\\z", 2)',
'regexp_extract(a, "^([0-9]+)([a-z]+)([0-9]+)\\z", 3)'),
conf={'spark.rapids.sql.expression.RegExpExtract': 'true'})
'regexp_extract(a, "^([0-9]+)([a-z]+)([0-9]+)\\z", 3)'))

# if we determine that the index is out of range we fall back to CPU and let
# Spark take care of the error handling
Expand All @@ -603,7 +591,7 @@ def test_regexp_extract_idx_negative():
lambda spark: unary_op_df(spark, gen).selectExpr(
'regexp_extract(a, "^([a-d]*)([0-9]*)([a-d]*)$", -1)').collect(),
error_message = "The specified group index cannot be less than zero",
conf={'spark.rapids.sql.expression.RegExpExtract': 'true'})
conf={})

# if we determine that the index is out of range we fall back to CPU and let
# Spark take care of the error handling
Expand All @@ -614,29 +602,26 @@ def test_regexp_extract_idx_out_of_bounds():
lambda spark: unary_op_df(spark, gen).selectExpr(
'regexp_extract(a, "^([a-d]*)([0-9]*)([a-d]*)$", 4)').collect(),
error_message = "Regex group count is 3, but the specified group index is 4",
conf={'spark.rapids.sql.expression.RegExpExtract': 'true'})
conf={})

def test_regexp_extract_multiline():
gen = mk_str_gen('[abcd]{2}[\r\n]{0,2}[0-9]{2}[\r\n]{0,2}[abcd]{2}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'regexp_extract(a, "^([a-d]*)([\r\n]*)", 2)'),
conf={'spark.rapids.sql.expression.RegExpExtract': 'true'})
'regexp_extract(a, "^([a-d]*)([\r\n]*)", 2)'))

def test_regexp_extract_multiline_negated_character_class():
gen = mk_str_gen('[abcd]{2}[\r\n]{0,2}[0-9]{2}[\r\n]{0,2}[abcd]{2}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'regexp_extract(a, "^([a-d]*)([^a-z]*)([a-d]*)\\z", 2)'),
conf={'spark.rapids.sql.expression.RegExpExtract': 'true'})
'regexp_extract(a, "^([a-d]*)([^a-z]*)([a-d]*)\\z", 2)'))

def test_regexp_extract_idx_0():
gen = mk_str_gen('[abcd]{1,3}[0-9]{1,3}[abcd]{1,3}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'regexp_extract(a, "^([a-d]*)([0-9]*)([a-d]*)\\z", 0)',
'regexp_extract(a, "^([a-d]*)[0-9]*([a-d]*)\\z", 0)'),
conf={'spark.rapids.sql.expression.RegExpExtract': 'true'})
'regexp_extract(a, "^([a-d]*)[0-9]*([a-d]*)\\z", 0)'))

def test_rlike():
gen = mk_str_gen('[abcd]{1,3}')
Expand All @@ -645,8 +630,7 @@ def test_rlike():
'a rlike "a{2}"',
'a rlike "a{1,3}"',
'a rlike "a{1,}"',
'a rlike "a[bc]d"'),
conf={'spark.rapids.sql.expression.RLike': 'true'})
'a rlike "a[bc]d"'))

def test_rlike_embedded_null():
gen = mk_str_gen('[abcd]{1,3}')\
Expand All @@ -656,41 +640,36 @@ def test_rlike_embedded_null():
'a rlike "a{2}"',
'a rlike "a{1,3}"',
'a rlike "a{1,}"',
'a rlike "a[bc]d"'),
conf={'spark.rapids.sql.expression.RLike': 'true'})
'a rlike "a[bc]d"'))

def test_rlike_null_pattern():
gen = mk_str_gen('[abcd]{1,3}')
# Spark optimizes out `RLIKE NULL` in this test
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'a rlike NULL'),
conf={'spark.rapids.sql.expression.RLike': 'true'})
'a rlike NULL'))

@allow_non_gpu('ProjectExec', 'RLike')
def test_rlike_fallback_null_pattern():
gen = mk_str_gen('[abcd]{1,3}')
assert_gpu_fallback_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'a rlike "a\u0000"'),
'RLike',
conf={'spark.rapids.sql.expression.RLike': 'true'})
'RLike')

@allow_non_gpu('ProjectExec', 'RLike')
def test_rlike_fallback_empty_group():
gen = mk_str_gen('[abcd]{1,3}')
assert_gpu_fallback_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'a rlike "a()?"'),
'RLike',
conf={'spark.rapids.sql.expression.RLike': 'true'})
'RLike')

def test_rlike_escape():
gen = mk_str_gen('[ab]{0,2}[\\-\\+]{0,2}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'a rlike "a[\\\\-]"'),
conf={'spark.rapids.sql.expression.RLike': 'true'})
'a rlike "a[\\\\-]"'))

def test_rlike_multi_line():
gen = mk_str_gen('[abc]\n[def]')
Expand All @@ -699,23 +678,20 @@ def test_rlike_multi_line():
'a rlike "^a"',
'a rlike "^d"',
'a rlike "c\\z"',
'a rlike "e\\z"'),
conf={'spark.rapids.sql.expression.RLike': 'true'})
'a rlike "e\\z"'))

def test_rlike_missing_escape():
gen = mk_str_gen('a[\\-\\+]')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'a rlike "a[-]"',
'a rlike "a[+-]"',
'a rlike "a[a-b-]"'),
conf={'spark.rapids.sql.expression.RLike': 'true'})
'a rlike "a[a-b-]"'))

@allow_non_gpu('ProjectExec', 'RLike')
def test_rlike_fallback_possessive_quantifier():
gen = mk_str_gen('(\u20ac|\\w){0,3}a[|b*.$\r\n]{0,2}c\\w{0,3}')
assert_gpu_fallback_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'a rlike "a*+"'),
'RLike',
conf={'spark.rapids.sql.expression.RLike': 'true'})
'RLike')
Loading