From 018060fd9c68c582ac8e971deae7f1f05fad420d Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 29 Oct 2020 21:19:42 -0700 Subject: [PATCH 1/9] Sanity checks for cudf jar mismatch Signed-off-by: Niranjan Artal --- build/build-info | 3 +- pom.xml | 1 + .../com/nvidia/spark/rapids/Plugin.scala | 33 +++++++++++++++++++ .../com/nvidia/spark/rapids/RapidsConf.scala | 10 ++++++ 4 files changed, 46 insertions(+), 1 deletion(-) diff --git a/build/build-info b/build/build-info index 057f137892c..a905661920b 100755 --- a/build/build-info +++ b/build/build-info @@ -22,6 +22,7 @@ echo_build_properties() { echo version=$1 + echo cudf_version=$2 echo user=$USER echo revision=$(git rev-parse HEAD) echo branch=$(git rev-parse --abbrev-ref HEAD) @@ -29,4 +30,4 @@ echo_build_properties() { echo url=$(git config --get remote.origin.url) } -echo_build_properties $1 +echo_build_properties $1 $2 diff --git a/pom.xml b/pom.xml index be6767b250e..21c6d28190a 100644 --- a/pom.xml +++ b/pom.xml @@ -538,6 +538,7 @@ + diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index 7f60f3d860b..5d66be0c39a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -17,6 +17,7 @@ package com.nvidia.spark.rapids import java.util +import java.util.Properties import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.collection.JavaConverters._ @@ -128,6 +129,38 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { ShimLoader.setSparkShimProviderClass(conf.shimsProviderOverride.get) } + // Compare if the cudf version mentioned in the classpath is equal to the version which plugin + // expects. If there is a version mismatch, throw error. This check can be disabled by + // setting this config spark.rapids.cudf-version-override=true + if (!conf.cudfVersionOverride) { + val props = new Properties + val cudfClassLoader = classOf[ai.rapids.cudf.ColumnVector].getClassLoader + try { + props.load(cudfClassLoader.getResourceAsStream("cudf-java-version-info.properties")) + } catch { + case t: Throwable => { + logError("cudf properties file not found.", t) + } + } + val cudfVersion = props.get("version").toString + + val pluginClassLoader = classOf[com.nvidia.spark.SQLPlugin].getClassLoader + try { + props.load(pluginClassLoader.getResourceAsStream("rapids4spark-version-info.properties")) + } catch { + case t: Throwable => { + logError("plugin properties file not found.", t) + } + } + val expectedCudfVersion = props.get("cudf_version").toString + + if (cudfVersion != expectedCudfVersion) { + logError("Cudf version in the classpath is different. Found " + cudfVersion + + ", Plugin expects " + expectedCudfVersion) + System.exit(1) + } + } + // we rely on the Rapids Plugin being run with 1 GPU per executor so we can initialize // on executor startup. if (!GpuDeviceManager.rmmTaskInitEnabled) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 9397284527b..3855f47444d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -759,6 +759,14 @@ object RapidsConf { .stringConf .createOptional + val CUDF_VERSION_OVERRIDE = conf("spark.rapids.cudf-version-override") + .internal() + .doc("Overrides the cudf version compatibility check between cudf jar and plugin jar. " + + "If you are sure that the cudf jar which is mentioned in the classpath is compatible with " + + "the plugin version, then set this to true.") + .booleanConf + .createWithDefault(false) + private def printSectionHeader(category: String): Unit = println(s"\n### $category") @@ -1040,6 +1048,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val shimsProviderOverride: Option[String] = get(SHIMS_PROVIDER_OVERRIDE) + lazy val cudfVersionOverride: Boolean = get(CUDF_VERSION_OVERRIDE) + lazy val getCloudSchemes: Option[Seq[String]] = get(CLOUD_SCHEMES) def isOperatorEnabled(key: String, incompat: Boolean, isDisabledByDefault: Boolean): Boolean = { From 6071f98885712a33e389a5ec449dbe4358d2d563 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Sun, 1 Nov 2020 19:27:45 -0800 Subject: [PATCH 2/9] addressed review comments Signed-off-by: Niranjan Artal --- .../com/nvidia/spark/rapids/Plugin.scala | 43 +++++++++++-------- .../com/nvidia/spark/rapids/RapidsConf.scala | 2 +- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index 5d66be0c39a..b3027745f72 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.util.QueryExecutionListener + case class ColumnarOverrideRules() extends ColumnarRule with Logging { val overrides: Rule[SparkPlan] = GpuOverrides() val overrideTransitions: Rule[SparkPlan] = new GpuTransitionOverrides() @@ -131,33 +132,41 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { // Compare if the cudf version mentioned in the classpath is equal to the version which plugin // expects. If there is a version mismatch, throw error. This check can be disabled by - // setting this config spark.rapids.cudf-version-override=true + // setting this config spark.rapids.cudfVersionOverride=true if (!conf.cudfVersionOverride) { val props = new Properties val cudfClassLoader = classOf[ai.rapids.cudf.ColumnVector].getClassLoader - try { - props.load(cudfClassLoader.getResourceAsStream("cudf-java-version-info.properties")) - } catch { - case t: Throwable => { - logError("cudf properties file not found.", t) - } + val cudfProperties = cudfClassLoader. + getResourceAsStream("cudf-java-version-info.properties") + + if (cudfProperties == null) { + throw new RuntimeException("Could not find the properties file in the cudf jar." + + " Cannot verify if the cudf version is same as expected by plugin") + } + props.load(cudfProperties) + if (props.get("version") == null) { + throw new RuntimeException("Property name not found in " + + "cudf-java-version-info.properties file") } val cudfVersion = props.get("version").toString val pluginClassLoader = classOf[com.nvidia.spark.SQLPlugin].getClassLoader - try { - props.load(pluginClassLoader.getResourceAsStream("rapids4spark-version-info.properties")) - } catch { - case t: Throwable => { - logError("plugin properties file not found.", t) - } + val pluginResource = pluginClassLoader. + getResourceAsStream("rapids4spark-version-info.properties") + if (pluginResource == null) { + throw new RuntimeException("Could not find the properties file in the plugin jar." + + " Cannot verify if the cudf version is same as expected by plugin") + } + props.load(pluginResource) + if (props.get("cudf_version") == null) { + throw new RuntimeException("Property name not found in " + + "rapids4spark-version-info.properties file") } val expectedCudfVersion = props.get("cudf_version").toString - if (cudfVersion != expectedCudfVersion) { - logError("Cudf version in the classpath is different. Found " + cudfVersion + - ", Plugin expects " + expectedCudfVersion) - System.exit(1) + if (!cudfVersion.equals(expectedCudfVersion)) { + throw new IllegalArgumentException("Cudf version in the classpath is different. Found " + + cudfVersion + ", Plugin expects " + expectedCudfVersion) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 3855f47444d..a66db978c72 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -759,7 +759,7 @@ object RapidsConf { .stringConf .createOptional - val CUDF_VERSION_OVERRIDE = conf("spark.rapids.cudf-version-override") + val CUDF_VERSION_OVERRIDE = conf("spark.rapids.cudfVersionOverride") .internal() .doc("Overrides the cudf version compatibility check between cudf jar and plugin jar. " + "If you are sure that the cudf jar which is mentioned in the classpath is compatible with " + From 67d7f6ced99c5981de790533649dbe9f1cb271f6 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Mon, 2 Nov 2020 10:59:18 -0800 Subject: [PATCH 3/9] addressed review comments Signed-off-by: Niranjan Artal --- .../com/nvidia/spark/rapids/Plugin.scala | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index b3027745f72..eec730c8c07 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -126,6 +126,8 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { extraConf: util.Map[String, String]): Unit = { try { val conf = new RapidsConf(extraConf.asScala.toMap) + val cudfPropertiesFileName = "cudf-java-version-info.properties" + val pluginPropertiesFileName = "rapids4spark-version-info.properties" if (conf.shimsProviderOverride.isDefined) { ShimLoader.setSparkShimProviderClass(conf.shimsProviderOverride.get) } @@ -136,33 +138,33 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { if (!conf.cudfVersionOverride) { val props = new Properties val cudfClassLoader = classOf[ai.rapids.cudf.ColumnVector].getClassLoader - val cudfProperties = cudfClassLoader. - getResourceAsStream("cudf-java-version-info.properties") + val cudfProperties = cudfClassLoader.getResourceAsStream(cudfPropertiesFileName) if (cudfProperties == null) { throw new RuntimeException("Could not find the properties file in the cudf jar." + " Cannot verify if the cudf version is same as expected by plugin") } props.load(cudfProperties) - if (props.get("version") == null) { - throw new RuntimeException("Property name not found in " + - "cudf-java-version-info.properties file") + val classpathCudfVersion = props.get("version") + if ( classpathCudfVersion == null) { + throw new RuntimeException("Property name `version` not found in " + + cudfPropertiesFileName + " file") } - val cudfVersion = props.get("version").toString + val cudfVersion = classpathCudfVersion.toString val pluginClassLoader = classOf[com.nvidia.spark.SQLPlugin].getClassLoader - val pluginResource = pluginClassLoader. - getResourceAsStream("rapids4spark-version-info.properties") + val pluginResource = pluginClassLoader.getResourceAsStream(pluginPropertiesFileName) if (pluginResource == null) { - throw new RuntimeException("Could not find the properties file in the plugin jar." + - " Cannot verify if the cudf version is same as expected by plugin") + throw new RuntimeException("Could not find the properties file in the Rapids " + + "Accelerator jar. Cannot verify if the cudf version is same as expected by plugin") } props.load(pluginResource) - if (props.get("cudf_version") == null) { - throw new RuntimeException("Property name not found in " + - "rapids4spark-version-info.properties file") + val pluginCudfVersion = props.get("cudf_version") + if (pluginCudfVersion == null) { + throw new RuntimeException("Property name `cudf_version` not found in " + + pluginPropertiesFileName + " file") } - val expectedCudfVersion = props.get("cudf_version").toString + val expectedCudfVersion = pluginCudfVersion.toString if (!cudfVersion.equals(expectedCudfVersion)) { throw new IllegalArgumentException("Cudf version in the classpath is different. Found " + From 31a8e843263de27ce1044f83f444b27f5ff002e9 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Mon, 2 Nov 2020 12:05:40 -0800 Subject: [PATCH 4/9] log warnings if the config is set but versions mismatch Signed-off-by: Niranjan Artal --- .../com/nvidia/spark/rapids/Plugin.scala | 69 ++++++++++++------- 1 file changed, 46 insertions(+), 23 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index eec730c8c07..26d9c96bf92 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -135,40 +135,63 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { // Compare if the cudf version mentioned in the classpath is equal to the version which plugin // expects. If there is a version mismatch, throw error. This check can be disabled by // setting this config spark.rapids.cudfVersionOverride=true - if (!conf.cudfVersionOverride) { - val props = new Properties - val cudfClassLoader = classOf[ai.rapids.cudf.ColumnVector].getClassLoader - val cudfProperties = cudfClassLoader.getResourceAsStream(cudfPropertiesFileName) - - if (cudfProperties == null) { - throw new RuntimeException("Could not find the properties file in the cudf jar." + + val props = new Properties + val cudfClassLoader = classOf[ai.rapids.cudf.ColumnVector].getClassLoader + val cudfProperties = cudfClassLoader.getResourceAsStream(cudfPropertiesFileName) + if (cudfProperties == null) { + if (!conf.cudfVersionOverride) { + throw new RuntimeException("Could not find properties file in the cudf jar." + + " Cannot verify if the cudf version is same as expected by plugin") + } else { + logWarning("Could not find properties file in the cudf jar." + " Cannot verify if the cudf version is same as expected by plugin") } - props.load(cudfProperties) - val classpathCudfVersion = props.get("version") - if ( classpathCudfVersion == null) { + } + props.load(cudfProperties) + + val classpathCudfVersion = props.get("version") + if (classpathCudfVersion == null) { + if (!conf.cudfVersionOverride) { throw new RuntimeException("Property name `version` not found in " + cudfPropertiesFileName + " file") + } else { + logWarning("Property name `version` not found in " + cudfPropertiesFileName + " file") } - val cudfVersion = classpathCudfVersion.toString + } + val cudfVersion = classpathCudfVersion.toString - val pluginClassLoader = classOf[com.nvidia.spark.SQLPlugin].getClassLoader - val pluginResource = pluginClassLoader.getResourceAsStream(pluginPropertiesFileName) - if (pluginResource == null) { - throw new RuntimeException("Could not find the properties file in the Rapids " + + val pluginClassLoader = classOf[com.nvidia.spark.SQLPlugin].getClassLoader + val pluginResource = pluginClassLoader.getResourceAsStream(pluginPropertiesFileName) + if (pluginResource == null) { + if (!conf.cudfVersionOverride) { + throw new RuntimeException("Could not find properties file in the Rapids " + + "Accelerator jar. Cannot verify if the cudf version is same as expected by plugin") + } else { + logWarning("Could not find properties file in the Rapids " + "Accelerator jar. Cannot verify if the cudf version is same as expected by plugin") } - props.load(pluginResource) - val pluginCudfVersion = props.get("cudf_version") - if (pluginCudfVersion == null) { + } + props.load(pluginResource) + + val pluginCudfVersion = props.get("cudf_version") + if (pluginCudfVersion == null) { + if (!conf.cudfVersionOverride) { throw new RuntimeException("Property name `cudf_version` not found in " + pluginPropertiesFileName + " file") + } else { + logWarning("Property name `cudf_version` not found in " + + pluginPropertiesFileName + " file") } - val expectedCudfVersion = pluginCudfVersion.toString - - if (!cudfVersion.equals(expectedCudfVersion)) { - throw new IllegalArgumentException("Cudf version in the classpath is different. Found " + - cudfVersion + ", Plugin expects " + expectedCudfVersion) + } + val expectedCudfVersion = pluginCudfVersion.toString + // compare cudf version in the classpath with the cudf version expected by plugin + if (!cudfVersion.equals(expectedCudfVersion)) { + if (!conf.cudfVersionOverride) { + throw new IllegalArgumentException("Cudf version in the classpath is different. " + + "Found " + cudfVersion + ", Plugin expects " + expectedCudfVersion) + } else { + logWarning("Cudf version in the classpath is different. " + + "Found " + cudfVersion + ", Plugin expects " + expectedCudfVersion) } } From 983f857cf1c10c8b5bb637a631695108082c6888 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Mon, 2 Nov 2020 16:29:39 -0800 Subject: [PATCH 5/9] addressed review comments Signed-off-by: Niranjan Artal --- .../com/nvidia/spark/rapids/Plugin.scala | 38 +++++++++---------- .../com/nvidia/spark/rapids/RapidsConf.scala | 6 +-- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index 26d9c96bf92..d55b3a335a2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -139,23 +139,23 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { val cudfClassLoader = classOf[ai.rapids.cudf.ColumnVector].getClassLoader val cudfProperties = cudfClassLoader.getResourceAsStream(cudfPropertiesFileName) if (cudfProperties == null) { + val errorMsg = s"Could not find properties file $cudfPropertiesFileName in " + + "the cudf jar. Cannot verify cudf version compatibility with RAPIDS Accelerator version." if (!conf.cudfVersionOverride) { - throw new RuntimeException("Could not find properties file in the cudf jar." + - " Cannot verify if the cudf version is same as expected by plugin") + throw new RuntimeException(errorMsg) } else { - logWarning("Could not find properties file in the cudf jar." + - " Cannot verify if the cudf version is same as expected by plugin") + logWarning(errorMsg) } } props.load(cudfProperties) val classpathCudfVersion = props.get("version") if (classpathCudfVersion == null) { + val errorMsg = s"Property name `version` not found in $cudfPropertiesFileName file" if (!conf.cudfVersionOverride) { - throw new RuntimeException("Property name `version` not found in " + - cudfPropertiesFileName + " file") + throw new RuntimeException(errorMsg) } else { - logWarning("Property name `version` not found in " + cudfPropertiesFileName + " file") + logWarning(errorMsg) } } val cudfVersion = classpathCudfVersion.toString @@ -163,35 +163,35 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { val pluginClassLoader = classOf[com.nvidia.spark.SQLPlugin].getClassLoader val pluginResource = pluginClassLoader.getResourceAsStream(pluginPropertiesFileName) if (pluginResource == null) { + val errMsg = s"Could not find properties file $pluginPropertiesFileName in the RAPIDS " + + "Accelerator jar. Cannot verify cudf version compatibility with RAPIDS Accelerator " + + "version." if (!conf.cudfVersionOverride) { - throw new RuntimeException("Could not find properties file in the Rapids " + - "Accelerator jar. Cannot verify if the cudf version is same as expected by plugin") + throw new RuntimeException(errMsg) } else { - logWarning("Could not find properties file in the Rapids " + - "Accelerator jar. Cannot verify if the cudf version is same as expected by plugin") + logWarning(errMsg) } } props.load(pluginResource) val pluginCudfVersion = props.get("cudf_version") if (pluginCudfVersion == null) { + val errorMsg = s"Property name `cudf_version` not found in $pluginPropertiesFileName file" if (!conf.cudfVersionOverride) { - throw new RuntimeException("Property name `cudf_version` not found in " - + pluginPropertiesFileName + " file") + throw new RuntimeException(errorMsg) } else { - logWarning("Property name `cudf_version` not found in " + - pluginPropertiesFileName + " file") + logWarning(errorMsg) } } val expectedCudfVersion = pluginCudfVersion.toString // compare cudf version in the classpath with the cudf version expected by plugin if (!cudfVersion.equals(expectedCudfVersion)) { + val errorMsg = s"Cudf version in the classpath is different. Found $cudfVersion RAPIDS " + + s"Accelerator expects $expectedCudfVersion " if (!conf.cudfVersionOverride) { - throw new IllegalArgumentException("Cudf version in the classpath is different. " + - "Found " + cudfVersion + ", Plugin expects " + expectedCudfVersion) + throw new IllegalArgumentException(errorMsg) } else { - logWarning("Cudf version in the classpath is different. " + - "Found " + cudfVersion + ", Plugin expects " + expectedCudfVersion) + logWarning(errorMsg) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index a66db978c72..e8e6bab0f88 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -761,9 +761,9 @@ object RapidsConf { val CUDF_VERSION_OVERRIDE = conf("spark.rapids.cudfVersionOverride") .internal() - .doc("Overrides the cudf version compatibility check between cudf jar and plugin jar. " + - "If you are sure that the cudf jar which is mentioned in the classpath is compatible with " + - "the plugin version, then set this to true.") + .doc("Overrides the cudf version compatibility check between cudf jar and RAPIDS Accelerator " + + "jar. If you are sure that the cudf jar which is mentioned in the classpath is compatible " + + "with the RAPIDS Accelerator version, then set this to true.") .booleanConf .createWithDefault(false) From e5b2cb8a40012cd1036efa03c8e9cc2ba74dfc2c Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 3 Nov 2020 00:53:52 -0800 Subject: [PATCH 6/9] addressed review comments Signed-off-by: Niranjan Artal --- .../com/nvidia/spark/rapids/Plugin.scala | 123 +++++++++--------- 1 file changed, 65 insertions(+), 58 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index d55b3a335a2..b60b1fcc627 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -126,73 +126,78 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { extraConf: util.Map[String, String]): Unit = { try { val conf = new RapidsConf(extraConf.asScala.toMap) - val cudfPropertiesFileName = "cudf-java-version-info.properties" - val pluginPropertiesFileName = "rapids4spark-version-info.properties" if (conf.shimsProviderOverride.isDefined) { ShimLoader.setSparkShimProviderClass(conf.shimsProviderOverride.get) } - // Compare if the cudf version mentioned in the classpath is equal to the version which plugin - // expects. If there is a version mismatch, throw error. This check can be disabled by - // setting this config spark.rapids.cudfVersionOverride=true - val props = new Properties - val cudfClassLoader = classOf[ai.rapids.cudf.ColumnVector].getClassLoader - val cudfProperties = cudfClassLoader.getResourceAsStream(cudfPropertiesFileName) - if (cudfProperties == null) { - val errorMsg = s"Could not find properties file $cudfPropertiesFileName in " + - "the cudf jar. Cannot verify cudf version compatibility with RAPIDS Accelerator version." - if (!conf.cudfVersionOverride) { - throw new RuntimeException(errorMsg) - } else { - logWarning(errorMsg) + try { + // Compare if the cudf version mentioned in the classpath is equal to the version which + // plugin expects. If there is a version mismatch, throw error. This check can be disabled + // by setting this config spark.rapids.cudfVersionOverride=true + val cudfPropertiesFileName = "cudf-java-version-info.properties" + val pluginPropertiesFileName = "rapids4spark-version-info.properties" + + val props = new Properties + val cudfClassLoader = classOf[ai.rapids.cudf.ColumnVector].getClassLoader + val cudfProperties = cudfClassLoader.getResourceAsStream(cudfPropertiesFileName) + if (cudfProperties == null) { + val errorMsg = s"Could not find properties file $cudfPropertiesFileName in the cudf " + + "jar. Cannot verify cudf version compatibility with RAPIDS Accelerator version." + if (!conf.cudfVersionOverride) { + throw new RuntimeException(errorMsg) + } else { + throw CudfVersionMismatchException(errorMsg) + } } - } - props.load(cudfProperties) - - val classpathCudfVersion = props.get("version") - if (classpathCudfVersion == null) { - val errorMsg = s"Property name `version` not found in $cudfPropertiesFileName file" - if (!conf.cudfVersionOverride) { - throw new RuntimeException(errorMsg) - } else { - logWarning(errorMsg) + props.load(cudfProperties) + + val classpathCudfVersion = props.get("version") + if (classpathCudfVersion == null) { + val errorMsg = s"Property name `version` not found in $cudfPropertiesFileName file" + if (!conf.cudfVersionOverride) { + throw new RuntimeException(errorMsg) + } else { + throw CudfVersionMismatchException(errorMsg) + } } - } - val cudfVersion = classpathCudfVersion.toString - - val pluginClassLoader = classOf[com.nvidia.spark.SQLPlugin].getClassLoader - val pluginResource = pluginClassLoader.getResourceAsStream(pluginPropertiesFileName) - if (pluginResource == null) { - val errMsg = s"Could not find properties file $pluginPropertiesFileName in the RAPIDS " + - "Accelerator jar. Cannot verify cudf version compatibility with RAPIDS Accelerator " + - "version." - if (!conf.cudfVersionOverride) { - throw new RuntimeException(errMsg) - } else { - logWarning(errMsg) + val cudfVersion = classpathCudfVersion.toString + + val pluginClassLoader = classOf[com.nvidia.spark.SQLPlugin].getClassLoader + val pluginResource = pluginClassLoader.getResourceAsStream(pluginPropertiesFileName) + if (pluginResource == null) { + val errorMsg = s"Could not find properties file $pluginPropertiesFileName in the " + + "RAPIDS Accelerator jar. Cannot verify cudf version compatibility with RAPIDS " + + "Accelerator version." + if (!conf.cudfVersionOverride) { + throw new RuntimeException(errorMsg) + } else { + throw CudfVersionMismatchException(errorMsg) + } } - } - props.load(pluginResource) - - val pluginCudfVersion = props.get("cudf_version") - if (pluginCudfVersion == null) { - val errorMsg = s"Property name `cudf_version` not found in $pluginPropertiesFileName file" - if (!conf.cudfVersionOverride) { - throw new RuntimeException(errorMsg) - } else { - logWarning(errorMsg) + props.load(pluginResource) + + val pluginCudfVersion = props.get("cudf_version") + if (pluginCudfVersion == null) { + val errorMsg = s"Property name `cudf_version` not found in $pluginPropertiesFileName file" + if (!conf.cudfVersionOverride) { + throw new RuntimeException(errorMsg) + } else { + throw CudfVersionMismatchException(errorMsg) + } } - } - val expectedCudfVersion = pluginCudfVersion.toString - // compare cudf version in the classpath with the cudf version expected by plugin - if (!cudfVersion.equals(expectedCudfVersion)) { - val errorMsg = s"Cudf version in the classpath is different. Found $cudfVersion RAPIDS " + - s"Accelerator expects $expectedCudfVersion " - if (!conf.cudfVersionOverride) { - throw new IllegalArgumentException(errorMsg) - } else { - logWarning(errorMsg) + val expectedCudfVersion = pluginCudfVersion.toString + // compare cudf version in the classpath with the cudf version expected by plugin + if (!cudfVersion.equals(expectedCudfVersion)) { + val errorMsg = s"Cudf version in the classpath is different. Found $cudfVersion, " + + s"RAPIDS Accelerator expects $expectedCudfVersion " + if (!conf.cudfVersionOverride) { + throw new IllegalArgumentException(errorMsg) + } else { + throw CudfVersionMismatchException(errorMsg) + } } + } catch { + case x: CudfVersionMismatchException => logWarning(s"${x.errorMsg}") } // we rely on the Rapids Plugin being run with 1 GPU per executor so we can initialize @@ -213,6 +218,8 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { } } + case class CudfVersionMismatchException(errorMsg: String) extends Exception + override def shutdown(): Unit = { GpuSemaphore.shutdown() PythonWorkerSemaphore.shutdown() From 8ea5138525a2d85868b414afeff61d162e1d7bfb Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 3 Nov 2020 11:17:51 -0800 Subject: [PATCH 7/9] refactored code and addressed review comments Signed-off-by: Niranjan Artal --- .../com/nvidia/spark/rapids/Plugin.scala | 130 +++++++----------- 1 file changed, 53 insertions(+), 77 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index b60b1fcc627..cc3487f67f9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -130,82 +130,10 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { ShimLoader.setSparkShimProviderClass(conf.shimsProviderOverride.get) } - try { - // Compare if the cudf version mentioned in the classpath is equal to the version which - // plugin expects. If there is a version mismatch, throw error. This check can be disabled - // by setting this config spark.rapids.cudfVersionOverride=true - val cudfPropertiesFileName = "cudf-java-version-info.properties" - val pluginPropertiesFileName = "rapids4spark-version-info.properties" - - val props = new Properties - val cudfClassLoader = classOf[ai.rapids.cudf.ColumnVector].getClassLoader - val cudfProperties = cudfClassLoader.getResourceAsStream(cudfPropertiesFileName) - if (cudfProperties == null) { - val errorMsg = s"Could not find properties file $cudfPropertiesFileName in the cudf " + - "jar. Cannot verify cudf version compatibility with RAPIDS Accelerator version." - if (!conf.cudfVersionOverride) { - throw new RuntimeException(errorMsg) - } else { - throw CudfVersionMismatchException(errorMsg) - } - } - props.load(cudfProperties) - - val classpathCudfVersion = props.get("version") - if (classpathCudfVersion == null) { - val errorMsg = s"Property name `version` not found in $cudfPropertiesFileName file" - if (!conf.cudfVersionOverride) { - throw new RuntimeException(errorMsg) - } else { - throw CudfVersionMismatchException(errorMsg) - } - } - val cudfVersion = classpathCudfVersion.toString - - val pluginClassLoader = classOf[com.nvidia.spark.SQLPlugin].getClassLoader - val pluginResource = pluginClassLoader.getResourceAsStream(pluginPropertiesFileName) - if (pluginResource == null) { - val errorMsg = s"Could not find properties file $pluginPropertiesFileName in the " + - "RAPIDS Accelerator jar. Cannot verify cudf version compatibility with RAPIDS " + - "Accelerator version." - if (!conf.cudfVersionOverride) { - throw new RuntimeException(errorMsg) - } else { - throw CudfVersionMismatchException(errorMsg) - } - } - props.load(pluginResource) - - val pluginCudfVersion = props.get("cudf_version") - if (pluginCudfVersion == null) { - val errorMsg = s"Property name `cudf_version` not found in $pluginPropertiesFileName file" - if (!conf.cudfVersionOverride) { - throw new RuntimeException(errorMsg) - } else { - throw CudfVersionMismatchException(errorMsg) - } - } - val expectedCudfVersion = pluginCudfVersion.toString - // compare cudf version in the classpath with the cudf version expected by plugin - if (!cudfVersion.equals(expectedCudfVersion)) { - val errorMsg = s"Cudf version in the classpath is different. Found $cudfVersion, " + - s"RAPIDS Accelerator expects $expectedCudfVersion " - if (!conf.cudfVersionOverride) { - throw new IllegalArgumentException(errorMsg) - } else { - throw CudfVersionMismatchException(errorMsg) - } - } - } catch { - case x: CudfVersionMismatchException => logWarning(s"${x.errorMsg}") - } - - // we rely on the Rapids Plugin being run with 1 GPU per executor so we can initialize - // on executor startup. - if (!GpuDeviceManager.rmmTaskInitEnabled) { - logInfo("Initializing memory from Executor Plugin") - GpuDeviceManager.initializeGpuAndMemory(pluginContext.resources().asScala.toMap) - } + // Compare if the cudf version mentioned in the classpath is equal to the version which + // plugin expects. If there is a version mismatch, throw error. This check can be disabled + // by setting this config spark.rapids.cudfVersionOverride=true + checkCudfVersion(conf) GpuSemaphore.initialize(conf.concurrentGpuTasks) } catch { @@ -218,7 +146,55 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { } } - case class CudfVersionMismatchException(errorMsg: String) extends Exception + private def checkCudfVersion(conf: RapidsConf): Unit = { + try { + val cudfPropertiesFileName = "cudf-java-version-info.properties" + val pluginPropertiesFileName = "rapids4spark-version-info.properties" + + val props = new Properties + val classLoader = classOf[RapidsExecutorPlugin].getClassLoader + val cudfProperties = classLoader.getResourceAsStream(cudfPropertiesFileName) + if (cudfProperties == null) { + throw CudfVersionMismatchException(s"Could not find properties file " + + s"$cudfPropertiesFileName in the cudf jar. Cannot verify cudf version compatibility " + + s"with RAPIDS Accelerator version.") + } + props.load(cudfProperties) + + val classpathCudfVersion = props.get("version") + if (classpathCudfVersion == null) { + throw CudfVersionMismatchException(s"Property name `version` not found in " + + s"$cudfPropertiesFileName file.") + } + val cudfVersion = classpathCudfVersion.toString + + //val pluginClassLoader = classOf[com.nvidia.spark.SQLPlugin].getClassLoader + val pluginResource = classLoader.getResourceAsStream(pluginPropertiesFileName) + if (pluginResource == null) { + throw CudfVersionMismatchException(s"Could not find properties file " + + s"$pluginPropertiesFileName in the RAPIDS Accelerator jar. Cannot verify cudf " + + s"version compatibility with RAPIDS Accelerator version.") + } + props.load(pluginResource) + + val pluginCudfVersion = props.get("cudf_version") + if (pluginCudfVersion == null) { + throw CudfVersionMismatchException(s"Property name `cudf_version` not found in" + + s"$pluginPropertiesFileName file.") + } + val expectedCudfVersion = pluginCudfVersion.toString + // compare cudf version in the classpath with the cudf version expected by plugin + if (!cudfVersion.equals(expectedCudfVersion)) { + throw CudfVersionMismatchException(s"Cudf version in the classpath is different. " + + s"Found $cudfVersion, RAPIDS Accelerator expects $expectedCudfVersion") + } + } catch { + case x: CudfVersionMismatchException if conf.cudfVersionOverride => + logWarning(s"${x.errorMsg}") + } + } + + case class CudfVersionMismatchException(errorMsg: String) extends RuntimeException(errorMsg) override def shutdown(): Unit = { GpuSemaphore.shutdown() From e084e1f91a8318898f2f3e15a5d3260dbb2685f1 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 3 Nov 2020 11:25:50 -0800 Subject: [PATCH 8/9] remove unwanted comment Signed-off-by: Niranjan Artal --- sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index cc3487f67f9..029ea6e6aaf 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -168,7 +168,6 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { } val cudfVersion = classpathCudfVersion.toString - //val pluginClassLoader = classOf[com.nvidia.spark.SQLPlugin].getClassLoader val pluginResource = classLoader.getResourceAsStream(pluginPropertiesFileName) if (pluginResource == null) { throw CudfVersionMismatchException(s"Could not find properties file " + From c1c2259de574be034b12c793cfebe623e7f62293 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 3 Nov 2020 11:30:23 -0800 Subject: [PATCH 9/9] addressed review comments Signed-off-by: Niranjan Artal --- .../src/main/scala/com/nvidia/spark/rapids/Plugin.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index 029ea6e6aaf..337d9dc2bb5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -135,6 +135,13 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { // by setting this config spark.rapids.cudfVersionOverride=true checkCudfVersion(conf) + // we rely on the Rapids Plugin being run with 1 GPU per executor so we can initialize + // on executor startup. + if (!GpuDeviceManager.rmmTaskInitEnabled) { + logInfo("Initializing memory from Executor Plugin") + GpuDeviceManager.initializeGpuAndMemory(pluginContext.resources().asScala.toMap) + } + GpuSemaphore.initialize(conf.concurrentGpuTasks) } catch { case e: Throwable => @@ -179,7 +186,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { val pluginCudfVersion = props.get("cudf_version") if (pluginCudfVersion == null) { throw CudfVersionMismatchException(s"Property name `cudf_version` not found in" + - s"$pluginPropertiesFileName file.") + s" $pluginPropertiesFileName file.") } val expectedCudfVersion = pluginCudfVersion.toString // compare cudf version in the classpath with the cudf version expected by plugin