merge from branch-22.02
andygrove committed Dec 1, 2021
2 parents 34b9fe4 + 91d3383 commit c85a9a3
Showing 15 changed files with 204 additions and 143 deletions.
46 changes: 46 additions & 0 deletions docs/additional-functionality/rapids-shuffle.md
@@ -340,6 +340,52 @@ non-standard location.
With the RAPIDS Shuffle Manager configured, the setting `spark.rapids.shuffle.enabled` (default on)
can be used to enable or disable the usage of RAPIDS Shuffle Manager during your application.
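
For example, a minimal sketch of turning the RAPIDS shuffle off for a single PySpark application at session creation, assuming the RAPIDS plugin jars and the RapidsShuffleManager are already configured for the cluster:

```
from pyspark.sql import SparkSession

# Sketch only: the plugin jars and spark.shuffle.manager are assumed to be set
# elsewhere (for example in the cluster's Spark config); this just flips the toggle.
spark = (SparkSession.builder
         .config("spark.rapids.shuffle.enabled", "false")
         .getOrCreate())
```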

#### Databricks

Please make sure you follow the [Getting Started](../get-started/getting-started-databricks.md)
guide for Databricks. The following are extra steps required to enable UCX.

1) Create and enable an additional "init script" that installs UCX:

```
#!/bin/bash
sudo apt install -y wget libnuma1 &&
wget https://github.com/openucx/ucx/releases/download/v1.11.2/ucx-v1.11.2-ubuntu18.04-mofed5.x-cuda11.2.deb &&
sudo dpkg -i ucx-v1.11.2-ubuntu18.04-mofed5.x-cuda11.2.deb &&
rm ucx-v1.11.2-ubuntu18.04-mofed5.x-cuda11.2.deb
```

Save the script in DBFS and add it to the "Init Scripts" list:

![Init scripts panel showing UCX init script](../img/Databricks/initscript_ucx.png)
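
If you prefer to stage the script from a notebook instead of the workspace UI, a sketch using `dbutils.fs.put` follows (the DBFS path is only an example, and `dbutils` is only available inside Databricks notebooks and jobs):

```
init_script = """#!/bin/bash
sudo apt install -y wget libnuma1 &&
wget https://github.com/openucx/ucx/releases/download/v1.11.2/ucx-v1.11.2-ubuntu18.04-mofed5.x-cuda11.2.deb &&
sudo dpkg -i ucx-v1.11.2-ubuntu18.04-mofed5.x-cuda11.2.deb &&
rm ucx-v1.11.2-ubuntu18.04-mofed5.x-cuda11.2.deb
"""

# Example destination path; point the cluster's "Init Scripts" entry at the same location.
dbutils.fs.put("dbfs:/databricks/init_scripts/init_ucx.sh", init_script, True)
```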

2) Add the minimum UCX configuration for your cluster.

Databricks 9.1:

```
spark.shuffle.service.enabled false
spark.executorEnv.UCX_MEMTYPE_CACHE n
spark.executorEnv.UCX_ERROR_SIGNALS ""
spark.shuffle.manager com.nvidia.spark.rapids.spark312db.RapidsShuffleManager
```

Databricks 7.3:

```
spark.shuffle.service.enabled false
spark.executorEnv.UCX_MEMTYPE_CACHE n
spark.executorEnv.UCX_ERROR_SIGNALS ""
spark.shuffle.manager com.nvidia.spark.rapids.spark301db.RapidsShuffleManager
```

Example of the configuration panel with the new settings:

![Configurations with UCX](../img/Databricks/sparkconfig_ucx.png)

Please note that, at this time, we have only tested with autoscaling off. It is not clear how an
autoscaled cluster will behave with the RAPIDS Shuffle Manager.

#### UCX Environment Variables
- `UCX_TLS`:
  - `cuda_copy`, and `cuda_ipc`: enables handling of CUDA memory in UCX, both for copy-based transport
Binary file added docs/img/Databricks/initscript_ucx.png
Binary file added docs/img/Databricks/sparkconfig_ucx.png
18 changes: 18 additions & 0 deletions integration_tests/src/main/python/string_test.py
@@ -505,6 +505,16 @@ def test_regexp_like():
            'regexp_like(a, "a{1,}")',
            'regexp_like(a, "a[bc]d")'),
        conf={'spark.rapids.sql.expression.RLike': 'true'})
@allow_non_gpu('ProjectExec', 'RegExpReplace')
def test_regexp_replace_null_pattern_fallback():
    gen = mk_str_gen('[abcd]{0,3}')
    # Spark translates `NULL` to `CAST(NULL as STRING)` and we only support
    # literal expressions for the regex pattern
    assert_gpu_fallback_collect(
        lambda spark: unary_op_df(spark, gen).selectExpr(
            'regexp_replace(a, NULL, "A")'),
        'RegExpReplace',
        conf={'spark.rapids.sql.expression.RegExpReplace': 'true'})

def test_rlike():
    gen = mk_str_gen('[abcd]{1,3}')
@@ -527,6 +537,14 @@ def test_rlike_embedded_null():
            'a rlike "a[bc]d"'),
        conf={'spark.rapids.sql.expression.RLike': 'true'})

def test_rlike_null_pattern():
    gen = mk_str_gen('[abcd]{1,3}')
    # Spark optimizes out `RLIKE NULL` in this test
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, gen).selectExpr(
            'a rlike NULL'),
        conf={'spark.rapids.sql.expression.RLike': 'true'})

@allow_non_gpu('ProjectExec', 'RLike')
def test_rlike_fallback_null_pattern():
    gen = mk_str_gen('[abcd]{1,3}')
48 changes: 39 additions & 9 deletions jenkins/databricks/params.py
@@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Parse input parameters."""

import sys
import getopt
import sys

workspace = 'https://dbc-9ff9942e-a9c4.cloud.databricks.com'
token = ''
@@ -27,22 +28,50 @@
clusterid = ''
build_profiles = 'databricks,!snapshot-shims'
jar_path = ''
# `spark_conf` can take comma seperated mutiple spark configurations, e.g., spark.foo=1,spark.bar=2,...'
# `spark_conf` can take multiple comma-separated spark configurations, e.g., spark.foo=1,spark.bar=2,...
spark_conf = ''


def usage():
    """Define usage."""
    print('Usage: ' + sys.argv[0] +
          ' -s <workspace>'
          ' -t <token>'
          ' -c <clusterid>'
          ' -p <privatekeyfile>'
          ' -l <localscript>'
          ' -d <scriptdestination>'
          ' -z <sparktgz>'
          ' -v <basesparkpomversion>'
          ' -b <buildprofiles>'
          ' -j <jarpath>'
          ' -n <skipstartingcluster>'
          ' -f <sparkconf>'
          ' -i <sparkinstallver>')


try:
    opts, args = getopt.getopt(sys.argv[1:], 'hw:t:c:p:l:d:z:m:v:b:j:f:i:',
        ['workspace=', 'token=', 'clusterid=', 'private=', 'localscript=', 'dest=', 'sparktgz=', 'basesparkpomversion=', 'buildprofiles=', 'jarpath', 'sparkconf', 'sparkinstallver='])
    opts, script_args = getopt.getopt(sys.argv[1:], 'hw:t:c:p:l:d:z:m:v:b:j:f:i:',
                                      ['workspace=',
                                       'token=',
                                       'clusterid=',
                                       'private=',
                                       'localscript=',
                                       'dest=',
                                       'sparktgz=',
                                       'basesparkpomversion=',
                                       'buildprofiles=',
                                       'jarpath',
                                       'sparkconf',
                                       'sparkinstallver='])
except getopt.GetoptError:
    print(
        'run-tests.py -s <workspace> -t <token> -c <clusterid> -p <privatekeyfile> -l <localscript> -d <scriptdestinatino> -z <sparktgz> -v <basesparkpomversion> -b <buildprofiles> -j <jarpath> -f <sparkconf> -i <sparkinstallver>')
    usage()
    sys.exit(2)

for opt, arg in opts:
    if opt == '-h':
        print(
            'run-tests.py -s <workspace> -t <token> -c <clusterid> -p <privatekeyfile> -n <skipstartingcluster> -l <localscript> -d <scriptdestinatino>, -z <sparktgz> -v <basesparkpomversion> -b <buildprofiles> -f <sparkconf> -i <sparkinstallver>')
        sys.exit()
        usage()
        sys.exit(1)
    elif opt in ('-w', '--workspace'):
        workspace = arg
    elif opt in ('-t', '--token'):
@@ -73,6 +102,7 @@
print('-p is ' + private_key_file)
print('-l is ' + local_script)
print('-d is ' + script_dest)
print('script_args is ' + ' '.join(script_args))
print('-z is ' + source_tgz)
print('-v is ' + base_spark_pom_version)
print('-j is ' + jar_path)
43 changes: 24 additions & 19 deletions jenkins/databricks/run-tests.py
@@ -11,33 +11,38 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import requests
import sys
import getopt
import time
import os
"""Upload & run test script on Databricks cluster."""

import subprocess
import sys

from clusterutils import ClusterUtils

import params


def main():
    """Define main function."""
    master_addr = ClusterUtils.cluster_get_master_addr(params.workspace, params.clusterid, params.token)
    if master_addr is None:
        print("Error, didn't get master address")
        sys.exit(1)
    print("Master node address is: %s" % master_addr)

master_addr = ClusterUtils.cluster_get_master_addr(params.workspace, params.clusterid, params.token)
if master_addr is None:
    print("Error, didn't get master address")
    sys.exit(1)
print("Master node address is: %s" % master_addr)
    print("Copying script")
    rsync_command = "rsync -I -Pave \"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -p 2200 -i %s\"" \
        " %s ubuntu@%s:%s" % (params.private_key_file, params.local_script, master_addr, params.script_dest)
    print("rsync command: %s" % rsync_command)
    subprocess.check_call(rsync_command, shell=True)

print("Copying script")
rsync_command = "rsync -I -Pave \"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -p 2200 -i %s\" %s ubuntu@%s:%s" % (params.private_key_file, params.local_script, master_addr, params.script_dest)
print("rsync command: %s" % rsync_command)
subprocess.check_call(rsync_command, shell = True)
    ssh_command = "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null ubuntu@%s -p 2200 -i %s " \
        "'LOCAL_JAR_PATH=%s SPARK_CONF=%s BASE_SPARK_VER=%s bash %s %s 2>&1 | tee testout; " \
        "if [ ${PIPESTATUS[0]} -ne 0 ]; then false; else true; fi'" % \
        (master_addr, params.private_key_file, params.jar_path, params.spark_conf, params.base_spark_pom_version,
         params.script_dest, ' '.join(params.script_args))
    print("ssh command: %s" % ssh_command)
    subprocess.check_call(ssh_command, shell=True)

ssh_command = "ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null ubuntu@%s -p 2200 -i %s 'LOCAL_JAR_PATH=%s SPARK_CONF=%s BASE_SPARK_VER=%s bash %s 2>&1 | tee testout; if [ ${PIPESTATUS[0]} -ne 0 ]; then false; else true; fi'" % (master_addr, params.private_key_file, params.jar_path, params.spark_conf, params.base_spark_pom_version, params.script_dest)
print("ssh command: %s" % ssh_command)
subprocess.check_call(ssh_command, shell = True)

if __name__ == '__main__':
    main()
@@ -29,29 +29,24 @@ class GpuRegExpReplaceMeta(
    rule: DataFromReplacementRule)
  extends TernaryExprMeta[RegExpReplace](expr, conf, parent, rule) {

  private var pattern: Option[String] = None

  override def tagExprForGpu(): Unit = {
    expr.regexp match {
      case Literal(null, _) =>
        willNotWorkOnGpu(s"null pattern is not supported on GPU")
      case Literal(s: UTF8String, DataTypes.StringType) =>
        val pattern = s.toString
        if (pattern.isEmpty) {
          willNotWorkOnGpu(s"empty pattern is not supported on GPU")
        }

      case Literal(s: UTF8String, DataTypes.StringType) if s != null =>
        if (GpuOverrides.isSupportedStringReplacePattern(expr.regexp)) {
          // use GpuStringReplace
        } else {
          try {
            new CudfRegexTranspiler(replace = true).transpile(pattern)
            pattern = Some(new CudfRegexTranspiler(replace = true).transpile(s.toString))
          } catch {
            case e: RegexUnsupportedException =>
              willNotWorkOnGpu(e.getMessage)
          }
        }

      case _ =>
        willNotWorkOnGpu(s"non-literal pattern is not supported on GPU")
        willNotWorkOnGpu(s"only non-null literal strings are supported on GPU")
    }
  }

@@ -62,7 +57,8 @@ class GpuRegExpReplaceMeta(
    if (GpuOverrides.isSupportedStringReplacePattern(expr.regexp)) {
      GpuStringReplace(lhs, regexp, rep)
    } else {
      GpuRegExpReplace(lhs, regexp, rep)
      GpuRegExpReplace(lhs, regexp, rep, pattern.getOrElse(
        throw new IllegalStateException("Expression has not been tagged with cuDF regex pattern")))
    }
  }
}
@@ -29,29 +29,24 @@ class GpuRegExpReplaceMeta(
    rule: DataFromReplacementRule)
  extends TernaryExprMeta[RegExpReplace](expr, conf, parent, rule) {

  private var pattern: Option[String] = None

  override def tagExprForGpu(): Unit = {
    expr.regexp match {
      case Literal(null, _) =>
        willNotWorkOnGpu(s"null pattern is not supported on GPU")
      case Literal(s: UTF8String, DataTypes.StringType) =>
        val pattern = s.toString
        if (pattern.isEmpty) {
          willNotWorkOnGpu(s"empty pattern is not supported on GPU")
        }

      case Literal(s: UTF8String, DataTypes.StringType) if s != null =>
        if (GpuOverrides.isSupportedStringReplacePattern(expr.regexp)) {
          // use GpuStringReplace
        } else {
          try {
            new CudfRegexTranspiler(replace = true).transpile(pattern)
            pattern = Some(new CudfRegexTranspiler(replace = true).transpile(s.toString))
          } catch {
            case e: RegexUnsupportedException =>
              willNotWorkOnGpu(e.getMessage)
          }
        }

      case _ =>
        willNotWorkOnGpu(s"non-literal pattern is not supported on GPU")
        willNotWorkOnGpu(s"only non-null literal strings are supported on GPU")
    }
  }

@@ -62,7 +57,8 @@ class GpuRegExpReplaceMeta(
    if (GpuOverrides.isSupportedStringReplacePattern(expr.regexp)) {
      GpuStringReplace(lhs, regexp, rep)
    } else {
      GpuRegExpReplace(lhs, regexp, rep)
      GpuRegExpReplace(lhs, regexp, rep, pattern.getOrElse(
        throw new IllegalStateException("Expression has not been tagged with cuDF regex pattern")))
    }
  }
}
@@ -29,29 +29,24 @@ class GpuRegExpReplaceMeta(
    rule: DataFromReplacementRule)
  extends QuaternaryExprMeta[RegExpReplace](expr, conf, parent, rule) {

  private var pattern: Option[String] = None

  override def tagExprForGpu(): Unit = {
    expr.regexp match {
      case Literal(null, _) =>
        willNotWorkOnGpu(s"null pattern is not supported on GPU")
      case Literal(s: UTF8String, DataTypes.StringType) =>
        val pattern = s.toString
        if (pattern.isEmpty) {
          willNotWorkOnGpu(s"empty pattern is not supported on GPU")
        }

      case Literal(s: UTF8String, DataTypes.StringType) if s != null =>
        if (GpuOverrides.isSupportedStringReplacePattern(expr.regexp)) {
          // use GpuStringReplace
        } else {
          try {
            new CudfRegexTranspiler(replace = true).transpile(pattern)
            pattern = Some(new CudfRegexTranspiler(replace = true).transpile(s.toString))
          } catch {
            case e: RegexUnsupportedException =>
              willNotWorkOnGpu(e.getMessage)
          }
        }

      case _ =>
        willNotWorkOnGpu(s"non-literal pattern is not supported on GPU")
        willNotWorkOnGpu(s"only non-null literal strings are supported on GPU")
    }

    GpuOverrides.extractLit(expr.pos).foreach { lit =>
@@ -73,7 +68,8 @@
    if (GpuOverrides.isSupportedStringReplacePattern(expr.regexp)) {
      GpuStringReplace(subject, regexp, rep)
    } else {
      GpuRegExpReplace(subject, regexp, rep)
      GpuRegExpReplace(subject, regexp, rep, pattern.getOrElse(
        throw new IllegalStateException("Expression has not been tagged with cuDF regex pattern")))
    }
  }
}