Log plugin and cudf versions on startup #2132

Merged · 3 commits · Apr 15, 2021

Changes from 2 commits
15 changes: 15 additions & 0 deletions docs/FAQ.md
@@ -34,6 +34,21 @@ CUDA 10.1, 10.2 and 11.0 are currently supported, but you need to download the cudf jar that
corresponds to the version you are using. Please look [here](download.md) for download
links for the latest release.

### How can I check if the RAPIDS Accelerator is installed and which version is running?

On startup the RAPIDS Accelerator logs a warning message on the Spark driver that shows the
plugin and cudf versions. It looks something like this:
```
21/04/14 22:14:55 WARN SQLExecPlugin: RAPIDS Accelerator 0.5.0 using cudf 0.19. To disable GPU support set `spark.rapids.sql.enabled` to false
```
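
The warning also names the configuration key used to turn the plugin off. If you ever need to
disable GPU support for a session, the setting mentioned in the message can be applied like any
other Spark SQL configuration; the snippet below is a minimal illustration for `spark-shell`:
```
// Disables the RAPIDS Accelerator for this session using the key named in the warning.
spark.conf.set("spark.rapids.sql.enabled", "false")
```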

The full RAPIDS Accelerator and cudf build properties are logged at `INFO` level in the
Spark driver and executor logs, with messages similar to the following:
```
21/04/14 17:20:20 INFO RapidsExecutorPlugin: RAPIDS Accelerator build: {version=0.5.0-SNAPSHOT, user=jlowe, url=, date=2021-04-14T22:12:14Z, revision=79a5cf8acd615587b2c7835072b0d8b0d4604f8b, cudf_version=0.19-SNAPSHOT, branch=branch-0.5}
21/04/14 17:20:20 INFO RapidsExecutorPlugin: cudf build: {version=0.19-SNAPSHOT, user=, date=2021-04-13T08:42:40Z, revision=a5d2407b93de444a6a7faf9db4b7dbf4ecbfe9ed, branch=HEAD}
```
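
If you would rather check programmatically than search the logs, the same information can be read
from the version-info properties files that the plugin and cudf jars place on the classpath. The
sketch below (suitable for pasting into `spark-shell`) is illustrative only; the property file
names match those referenced by the plugin code in this change, but verify them against the jars
you deploy:
```
import java.util.Properties

// Reads a version-info properties file from the classpath, if present.
def readVersionInfo(resourceName: String): Option[Properties] =
  Option(getClass.getClassLoader.getResourceAsStream(resourceName)).map { in =>
    val props = new Properties()
    try props.load(in) finally in.close()
    props
  }

val pluginVersion = readVersionInfo("rapids4spark-version-info.properties")
  .map(_.getProperty("version", "UNKNOWN"))
val cudfVersion = readVersionInfo("cudf-java-version-info.properties")
  .map(_.getProperty("version", "UNKNOWN"))

println(s"RAPIDS Accelerator: ${pluginVersion.getOrElse("not found")}, " +
  s"cudf: ${cudfVersion.getOrElse("not found")}")
```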

### What is the right hardware setup to run GPU accelerated Spark?

Reference architectures should be available around Q1 2021.
104 changes: 53 additions & 51 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
@@ -38,6 +38,10 @@ import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.rapids.GpuShuffleEnv
import org.apache.spark.sql.util.QueryExecutionListener

class PluginException(msg: String) extends RuntimeException(msg)

case class CudfVersionMismatchException(errorMsg: String) extends PluginException(errorMsg)

case class ColumnarOverrideRules() extends ColumnarRule with Logging {
val overrides: Rule[SparkPlan] = GpuOverrides()
val overrideTransitions: Rule[SparkPlan] = new GpuTransitionOverrides()
@@ -52,22 +56,31 @@ case class ColumnarOverrideRules() extends ColumnarRule with Logging {
*/
class SQLExecPlugin extends (SparkSessionExtensions => Unit) with Logging {
override def apply(extensions: SparkSessionExtensions): Unit = {
logWarning("Installing extensions to enable rapids GPU SQL support." +
val pluginProps = RapidsPluginUtils.loadProps(RapidsPluginUtils.PLUGIN_PROPS_FILENAME)
logInfo(s"RAPIDS Accelerator build: $pluginProps")
val cudfProps = RapidsPluginUtils.loadProps(RapidsPluginUtils.CUDF_PROPS_FILENAME)
logInfo(s"cudf build: $cudfProps")
val pluginVersion = pluginProps.getProperty("version", "UNKNOWN")
val cudfVersion = cudfProps.getProperty("version", "UNKNOWN")
logWarning(s"RAPIDS Accelerator $pluginVersion using cudf $cudfVersion." +
s" To disable GPU support set `${RapidsConf.SQL_ENABLED}` to false")
extensions.injectColumnar(_ => ColumnarOverrideRules())
ShimLoader.getSparkShims.injectQueryStagePrepRule(extensions, _ => GpuQueryStagePrepOverrides())
}
}

object RapidsPluginUtils extends Logging {
val CUDF_PROPS_FILENAME = "cudf-java-version-info.properties"
val PLUGIN_PROPS_FILENAME = "rapids4spark-version-info.properties"

private val SQL_PLUGIN_NAME = classOf[SQLExecPlugin].getName
private val UDF_PLUGIN_NAME = "com.nvidia.spark.udf.Plugin"
private val SQL_PLUGIN_CONF_KEY = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key
private val SERIALIZER_CONF_KEY = "spark.serializer"
private val JAVA_SERIALIZER_NAME = classOf[JavaSerializer].getName
private val KRYO_SERIALIZER_NAME = classOf[KryoSerializer].getName
private val KRYO_REGISRATOR_KEY = "spark.kryo.registrator"
private val KRYO_REGISRATOR_NAME = classOf[GpuKryoRegistrator].getName
private val KRYO_REGISTRATOR_KEY = "spark.kryo.registrator"
private val KRYO_REGISTRATOR_NAME = classOf[GpuKryoRegistrator].getName

def fixupConfigs(conf: SparkConf): Unit = {
// First add in the SQL executor plugin because that is what we need at a minimum
@@ -86,23 +99,35 @@ object RapidsPluginUtils extends Logging {

val serializer = conf.get(SERIALIZER_CONF_KEY, JAVA_SERIALIZER_NAME)
if (KRYO_SERIALIZER_NAME.equals(serializer)) {
if (conf.contains(KRYO_REGISRATOR_KEY)) {
if (!KRYO_REGISRATOR_NAME.equals(conf.get(KRYO_REGISRATOR_KEY)) ) {
logWarning("Rapids SQL Plugin when used with Kryo needs to register some " +
s"serializers using $KRYO_REGISRATOR_NAME. Please call it from your registrator " +
" to let the plugin work properly.")
if (conf.contains(KRYO_REGISTRATOR_KEY)) {
if (!KRYO_REGISTRATOR_NAME.equals(conf.get(KRYO_REGISTRATOR_KEY)) ) {
logWarning("The RAPIDS Accelerator when used with Kryo needs to register some " +
s"serializers using $KRYO_REGISTRATOR_NAME. Please call it from your registrator " +
" to let the plugin work properly.")
} // else it is set and we are good to go
} else {
// We cannot set the kryo key here, it is not early enough to be picked up everywhere
throw new UnsupportedOperationException("The Rapids SQL Plugin when used with Kryo needs " +
s"to register some serializers. Please set the spark config $KRYO_REGISRATOR_KEY to " +
s"$KRYO_REGISRATOR_NAME or some operations may not work properly.")
throw new UnsupportedOperationException("The RAPIDS Accelerator when used with Kryo " +
"needs to register some serializers. Please set the spark config " +
s"$KRYO_REGISTRATOR_KEY to $KRYO_REGISTRATOR_NAME or some operations may not work " +
"properly.")
}
} else if (!JAVA_SERIALIZER_NAME.equals(serializer)) {
throw new UnsupportedOperationException(s"$serializer is not a supported serializer for " +
s"the Rapids SQL Plugin. Please disable the rapids plugin or use a supported serializer " +
s"serializer ($JAVA_SERIALIZER_NAME, $KRYO_SERIALIZER_NAME).")
s"the RAPIDS Accelerator. Please disable the RAPIDS Accelerator or use a supported " +
s"serializer ($JAVA_SERIALIZER_NAME, $KRYO_SERIALIZER_NAME).")
}
}

def loadProps(resourceName: String): Properties = {
val classLoader = RapidsPluginUtils.getClass.getClassLoader
val resource = classLoader.getResourceAsStream(resourceName)
if (resource == null) {
throw new PluginException(s"Could not find properties file $resourceName in the classpath")
}
val props = new Properties
props.load(resource)
props
}
}

@@ -188,53 +213,30 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {

private def checkCudfVersion(conf: RapidsConf): Unit = {
try {
val cudfPropertiesFileName = "cudf-java-version-info.properties"
val pluginPropertiesFileName = "rapids4spark-version-info.properties"

val props = new Properties
val classLoader = classOf[RapidsExecutorPlugin].getClassLoader
val cudfProperties = classLoader.getResourceAsStream(cudfPropertiesFileName)
if (cudfProperties == null) {
throw CudfVersionMismatchException(s"Could not find properties file " +
s"$cudfPropertiesFileName in the cudf jar. Cannot verify cudf version compatibility " +
s"with RAPIDS Accelerator version.")
val pluginProps = RapidsPluginUtils.loadProps(RapidsPluginUtils.PLUGIN_PROPS_FILENAME)
logInfo(s"RAPIDS Accelerator build: $pluginProps")
val expectedCudfVersion = Option(pluginProps.getProperty("cudf_version")).getOrElse {
throw CudfVersionMismatchException("Could not find cudf version in " +
RapidsPluginUtils.PLUGIN_PROPS_FILENAME)
}
props.load(cudfProperties)

val classpathCudfVersion = props.get("version")
if (classpathCudfVersion == null) {
throw CudfVersionMismatchException(s"Property name `version` not found in " +
s"$cudfPropertiesFileName file.")
}
val cudfVersion = classpathCudfVersion.toString

val pluginResource = classLoader.getResourceAsStream(pluginPropertiesFileName)
if (pluginResource == null) {
throw CudfVersionMismatchException(s"Could not find properties file " +
s"$pluginPropertiesFileName in the RAPIDS Accelerator jar. Cannot verify cudf " +
s"version compatibility with RAPIDS Accelerator version.")
val cudfProps = RapidsPluginUtils.loadProps(RapidsPluginUtils.CUDF_PROPS_FILENAME)
logInfo(s"cudf build: $cudfProps")
val cudfVersion = Option(cudfProps.getProperty("version")).getOrElse {
throw CudfVersionMismatchException("Could not find cudf version in " +
RapidsPluginUtils.CUDF_PROPS_FILENAME)
}
props.load(pluginResource)

val pluginCudfVersion = props.get("cudf_version")
if (pluginCudfVersion == null) {
throw CudfVersionMismatchException(s"Property name `cudf_version` not found in" +
s" $pluginPropertiesFileName file.")
}
val expectedCudfVersion = pluginCudfVersion.toString
// compare cudf version in the classpath with the cudf version expected by plugin
if (!RapidsExecutorPlugin.cudfVersionSatisfied(expectedCudfVersion, cudfVersion)) {
throw CudfVersionMismatchException(s"Cudf version in the classpath is different. " +
s"Found $cudfVersion, RAPIDS Accelerator expects $expectedCudfVersion")
throw CudfVersionMismatchException(s"Found cudf version $cudfVersion, RAPIDS Accelerator " +
s"expects $expectedCudfVersion")
}
} catch {
case x: CudfVersionMismatchException if conf.cudfVersionOverride =>
logWarning(s"${x.errorMsg}")
case x: PluginException if conf.cudfVersionOverride =>
logWarning(s"Ignoring error due to ${RapidsConf.CUDF_VERSION_OVERRIDE.key}=true: " +
s"${x.getMessage}")
}
}

case class CudfVersionMismatchException(errorMsg: String) extends RuntimeException(errorMsg)

override def shutdown(): Unit = {
GpuSemaphore.shutdown()
PythonWorkerSemaphore.shutdown()
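
The compatibility test in `checkCudfVersion` above delegates to
`RapidsExecutorPlugin.cudfVersionSatisfied`, which is not part of this diff. Purely as an
illustration of the kind of check involved (not the plugin's actual rule), a lenient comparison
could treat versions that differ only by a `-SNAPSHOT`-style qualifier as compatible:
```
// Illustrative sketch only -- the real cudfVersionSatisfied logic may differ.
// Treats "0.19" and "0.19-SNAPSHOT" as compatible by comparing the portion
// of each version string before any "-" qualifier.
def cudfVersionSatisfiedSketch(expected: String, actual: String): Boolean = {
  def base(v: String): String = v.takeWhile(_ != '-')
  actual == expected || base(actual) == base(expected)
}

// cudfVersionSatisfiedSketch("0.19-SNAPSHOT", "0.19-SNAPSHOT")  => true
// cudfVersionSatisfiedSketch("0.19", "0.19-SNAPSHOT")           => true (same base version)
// cudfVersionSatisfiedSketch("0.19", "0.18")                    => false
```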