diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 17ceb5f1887c6..aae340953c5b2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1929,7 +1929,7 @@ class SparkContext(config: SparkConf) extends Logging {
   }

   private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
-    def addLocalJarFile(file: File): String = {
+    def addLocalJarFile(file: File): Seq[String] = {
       try {
         if (!file.exists()) {
           throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")
@@ -1938,15 +1938,15 @@ class SparkContext(config: SparkConf) extends Logging {
           throw new IllegalArgumentException(
             s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
         }
-        env.rpcEnv.fileServer.addJar(file)
+        Seq(env.rpcEnv.fileServer.addJar(file))
       } catch {
         case NonFatal(e) =>
           logError(s"Failed to add $path to Spark environment", e)
-          null
+          Nil
       }
     }

-    def checkRemoteJarFile(path: String): String = {
+    def checkRemoteJarFile(path: String): Seq[String] = {
       val hadoopPath = new Path(path)
       val scheme = hadoopPath.toUri.getScheme
       if (!Array("http", "https", "ftp").contains(scheme)) {
@@ -1959,28 +1959,29 @@ class SparkContext(config: SparkConf) extends Logging {
           throw new IllegalArgumentException(
             s"Directory ${path} is not allowed for addJar")
         }
-        path
+        Seq(path)
       } catch {
         case NonFatal(e) =>
           logError(s"Failed to add $path to Spark environment", e)
-          null
+          Nil
       }
     } else {
-      path
+      Seq(path)
     }
   }

   if (path == null || path.isEmpty) {
     logWarning("null or empty path specified as parameter to addJar")
   } else {
-    val key = if (path.contains("\\") && Utils.isWindows) {
+    val (keys, scheme) = if (path.contains("\\") && Utils.isWindows) {
       // For local paths with backslashes on Windows, URI throws an exception
-      addLocalJarFile(new File(path))
+      (addLocalJarFile(new File(path)), "local")
     } else {
       val uri = new Path(path).toUri
       // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
       Utils.validateURL(uri)
-      uri.getScheme match {
+      val uriScheme = uri.getScheme
+      val jarPaths = uriScheme match {
         // A JAR file which exists only on the driver node
         case null =>
           // SPARK-22585 path without schema is not url encoded
@@ -1988,18 +1989,28 @@ class SparkContext(config: SparkConf) extends Logging {
         // A JAR file which exists only on the driver node
         case "file" => addLocalJarFile(new File(uri.getPath))
         // A JAR file which exists locally on every worker node
-        case "local" => "file:" + uri.getPath
+        case "local" => Seq("file:" + uri.getPath)
+        case "ivy" =>
+          // `new Path(path).toUri` drops the query part of the URI,
+          // so use `URI.create(path)` here to keep it
+          DependencyUtils.resolveMavenDependencies(URI.create(path))
+            .flatMap(jar => addLocalJarFile(new File(jar)))
         case _ => checkRemoteJarFile(path)
       }
+      (jarPaths, uriScheme)
     }
-    if (key != null) {
+    if (keys.nonEmpty) {
       val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
-      if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
-        logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+      val (added, existed) = keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)
+      if (added.nonEmpty) {
+        val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI"
+        logInfo(s"Added $jarMessage $path at ${added.mkString(",")} with timestamp $timestamp")
         postEnvironmentUpdate()
-      } else {
-        logWarning(s"The jar $path has been added already. Overwriting of added jars " +
-          "is not supported in the current version.")
+      }
+      if (existed.nonEmpty) {
+        val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI"
+        logInfo(s"The $jarMessage $path at ${existed.mkString(",")} has been added already." +
+          " Overwriting of added jars is not supported in the current version.")
      }
    }
  }
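Returning `Seq[String]` instead of a nullable `String` is what lets a single `ivy://` path expand to several registered jars, and the `partition` over `putIfAbsent` feeds the two log branches above. A minimal standalone sketch of that bookkeeping (the map and method names here are stand-ins, not `SparkContext` internals):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.concurrent

object AddJarKeysSketch {
  // Stand-in for SparkContext.addedJars: a concurrent map of jar key -> timestamp.
  val addedJars: concurrent.Map[String, Long] =
    new ConcurrentHashMap[String, Long]().asScala

  // putIfAbsent returns None for a fresh key, so `added` holds the new jars
  // and `existed` holds the ones that were registered earlier.
  def register(keys: Seq[String], timestamp: Long): (Seq[String], Seq[String]) =
    keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)

  def main(args: Array[String]): Unit = {
    println(register(Seq("a.jar", "b.jar"), 1L)) // (List(a.jar, b.jar), List())
    println(register(Seq("b.jar", "c.jar"), 2L)) // (List(c.jar), List(b.jar))
  }
}
```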
Overwriting of added jars " + - "is not supported in the current version.") + } + if (existed.nonEmpty) { + val jarMessage = if (scheme != "ivy") "JAR" else "dependency jars of Ivy URI" + logInfo(s"The $jarMessage $path at ${existed.mkString(",")} has been added already." + + " Overwriting of added jar is not supported in the current version.") } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index bb3a20dce2da4..ad95b18ecaeb0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -304,8 +304,8 @@ private[spark] class SparkSubmit extends Logging { // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files // too for packages that include Python code val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies( - args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath, - args.ivySettingsPath) + packagesTransitive = true, args.packagesExclusions, args.packages, + args.repositories, args.ivyRepoPath, args.ivySettingsPath) if (!StringUtils.isBlank(resolvedMavenCoordinates)) { // In K8s client mode, when in the driver, add resolved jars early as we might need @@ -1360,6 +1360,7 @@ private[spark] object SparkSubmitUtils { * Resolves any dependencies that were supplied through maven coordinates * @param coordinates Comma-delimited string of maven coordinates * @param ivySettings An IvySettings containing resolvers to use + * @param transitive Whether resolving transitive dependencies, default is true * @param exclusions Exclusions to apply when resolving transitive dependencies * @return The comma-delimited path to the jars of the given maven artifacts including their * transitive dependencies @@ -1367,6 +1368,7 @@ private[spark] object SparkSubmitUtils { def resolveMavenCoordinates( coordinates: String, ivySettings: IvySettings, + transitive: Boolean, exclusions: Seq[String] = Nil, isTest: Boolean = false): String = { if (coordinates == null || coordinates.trim.isEmpty) { @@ -1396,7 +1398,7 @@ private[spark] object SparkSubmitUtils { val ivy = Ivy.newInstance(ivySettings) // Set resolve options to download transitive dependencies as well val resolveOptions = new ResolveOptions - resolveOptions.setTransitive(true) + resolveOptions.setTransitive(transitive) val retrieveOptions = new RetrieveOptions // Turn downloading and logging off for testing if (isTest) { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index 45ffdde58d6c3..c1288d64c53f7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -22,7 +22,7 @@ import java.io.File import org.apache.commons.lang3.StringUtils import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{config, Logging} import org.apache.spark.rpc.RpcEnv import org.apache.spark.util._ @@ -79,17 +79,11 @@ object DriverWrapper extends Logging { val secMgr = new SecurityManager(sparkConf) val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) - val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = - Seq( - "spark.jars.excludes", - 
"spark.jars.packages", - "spark.jars.repositories", - "spark.jars.ivy", - "spark.jars.ivySettings" - ).map(sys.props.get(_).orNull) + val ivyProperties = DependencyUtils.getIvyProperties() - val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions, - packages, repositories, ivyRepoPath, Option(ivySettingsPath)) + val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(true, + ivyProperties.packagesExclusions, ivyProperties.packages, ivyProperties.repositories, + ivyProperties.ivyRepoPath, Option(ivyProperties.ivySettingsPath)) val jars = { val jarsProp = sys.props.get(config.JARS.key).orNull if (!StringUtils.isBlank(resolvedMavenCoordinates)) { diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala similarity index 54% rename from core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala rename to core/src/main/scala/org/apache/spark/util/DependencyUtils.scala index 5a17a6b6e169c..9956ccedf5842 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.deploy +package org.apache.spark.util import java.io.File import java.net.URI @@ -25,12 +25,140 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{SecurityManager, SparkConf, SparkException} +import org.apache.spark.deploy.SparkSubmitUtils import org.apache.spark.internal.Logging -import org.apache.spark.util.{MutableURLClassLoader, Utils} -private[deploy] object DependencyUtils extends Logging { +case class IvyProperties( + packagesExclusions: String, + packages: String, + repositories: String, + ivyRepoPath: String, + ivySettingsPath: String) + +private[spark] object DependencyUtils extends Logging { + + def getIvyProperties(): IvyProperties = { + val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq( + "spark.jars.excludes", + "spark.jars.packages", + "spark.jars.repositories", + "spark.jars.ivy", + "spark.jars.ivySettings" + ).map(sys.props.get(_).orNull) + IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) + } + + private def isInvalidQueryString(tokens: Array[String]): Boolean = { + tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1)) + } + + /** + * Parse URI query string's parameter value of `transitive` and `exclude`. + * Other invalid parameters will be ignored. + * + * @param uri Ivy URI need to be downloaded. + * @return Tuple value of parameter `transitive` and `exclude` value. + * + * 1. transitive: whether to download dependency jar of Ivy URI, default value is false + * and this parameter value is case-sensitive. Invalid value will be treat as false. + * Example: Input: exclude=org.mortbay.jetty:jetty&transitive=true + * Output: true + * + * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies, + * consists of `group:module` pairs separated by commas. 
+   *    Example: Input:  exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   *             Output: [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+   */
+  private def parseQueryParams(uri: URI): (Boolean, String) = {
+    val uriQuery = uri.getQuery
+    if (uriQuery == null) {
+      (false, "")
+    } else {
+      val mapTokens = uriQuery.split("&").map(_.split("="))
+      if (mapTokens.exists(isInvalidQueryString)) {
+        throw new IllegalArgumentException(
+          s"Invalid query string in Ivy URI ${uri.toString}: $uriQuery")
+      }
+      val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+
+      // Parse the transitive parameter (e.g., transitive=true) of an Ivy URI; default is false
+      val transitiveParams = groupedParams.get("transitive")
+      if (transitiveParams.map(_.size).getOrElse(0) > 1) {
+        logWarning("It's best to specify the `transitive` parameter in an Ivy URI query only" +
+          " once. If it is specified multiple times, the last occurrence is selected.")
+      }
+      val transitive =
+        transitiveParams.flatMap(_.takeRight(1).map(_._2 == "true").headOption).getOrElse(false)
+
+      // Parse the exclusion list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+      // of an Ivy URI. When downloading jars for an Ivy URI, Spark skips any transitive jar
+      // that appears in the exclusion list.
+      val exclusionList = groupedParams.get("exclude").map { params =>
+        params.map(_._2).flatMap { excludeString =>
+          val excludes = excludeString.split(",")
+          if (excludes.map(_.split(":")).exists(isInvalidQueryString)) {
+            throw new IllegalArgumentException(
+              s"Invalid exclude string in Ivy URI ${uri.toString}:" +
+                " expected 'org:module,org:module,..', found " + excludeString)
+          }
+          excludes
+        }.mkString(",")
+      }.getOrElse("")
+
+      val validParams = Set("transitive", "exclude")
+      val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq
+      if (invalidParams.nonEmpty) {
+        logWarning(s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " +
+          s"in Ivy URI query `$uriQuery`.")
+      }
+
+      (transitive, exclusionList)
+    }
+  }
+
+  /**
+   * Download the jars of an Ivy URI, including its dependency jars.
+   *
+   * @param uri Ivy URI to be downloaded. The URI format should be:
+   *   `ivy://group:module:version[?query]`
+   *   The Ivy URI query part format should be:
+   *   `parameter=value&parameter=value...`
+   *   Note that the query part currently supports two parameters:
+   *   1. transitive: whether to download the jars the Ivy URI depends on.
+   *      `transitive=false` or `transitive=true`; if not set, the default value is false.
+   *   2. exclude: an exclusion list applied when downloading the Ivy URI jar and its
+   *      dependency jars. The `exclude` parameter content is a comma-separated string of
+   *      `group:module` pairs: `exclude=group:module,group:module...`
+   * @return Comma-separated list of the jars that were downloaded.
+   */
+  def resolveMavenDependencies(uri: URI): Seq[String] = {
+    val ivyProperties = DependencyUtils.getIvyProperties()
+    val authority = uri.getAuthority
+    if (authority == null) {
+      throw new IllegalArgumentException(
+        s"Invalid Ivy URI authority in uri ${uri.toString}:" +
+          " Expected 'org:module:version', found null.")
+    }
+    if (authority.split(":").length != 3) {
+      throw new IllegalArgumentException(
+        s"Invalid Ivy URI authority in uri ${uri.toString}:" +
+          s" Expected 'org:module:version', found $authority.")
+    }
+
+    val (transitive, exclusionList) = parseQueryParams(uri)
+
+    resolveMavenDependencies(
+      transitive,
+      exclusionList,
+      authority,
+      ivyProperties.repositories,
+      ivyProperties.ivyRepoPath,
+      Option(ivyProperties.ivySettingsPath)
+    ).split(",")
+  }

   def resolveMavenDependencies(
+      packagesTransitive: Boolean,
       packagesExclusions: String,
       packages: String,
       repositories: String,
@@ -51,7 +179,8 @@ private[deploy] object DependencyUtils extends Logging {
       SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
     }

-    SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
+    SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings,
+      transitive = packagesTransitive, exclusions = exclusions)
   }

   def resolveAndDownloadJars(
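A usage sketch of the new single-URI entry point, using coordinates that appear in the tests below. The package declaration is an assumption to make the snippet compile, since `DependencyUtils` is `private[spark]`:

```scala
package org.apache.spark.util

import java.net.URI

object ResolveIvyUriSketch {
  def main(args: Array[String]): Unit = {
    // Resolves the artifact plus its transitive closure, minus the excluded module.
    // Repositories and the ivy path come from the spark.jars.* system properties.
    val jars: Seq[String] = DependencyUtils.resolveMavenDependencies(
      URI.create("ivy://org.apache.hive:hive-storage-api:2.7.0" +
        "?exclude=commons-lang:commons-lang&transitive=true"))
    jars.foreach(println) // local filesystem paths of the downloaded jars
  }
}
```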
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 55bfa70f21fc2..770ffeef4106f 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1034,6 +1034,122 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       .set(EXECUTOR_ALLOW_SPARK_CONTEXT, true)).stop()
     }
   }
+
+  test("SPARK-33084: Add jar support Ivy URI -- default transitive = false") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+    sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0")
+    assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+    assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+
+    sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
+    assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support Ivy URI -- invalid transitive use default false") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+    sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=foo")
+    assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+    assert(!sc.listJars().exists(_.contains("org.slf4j_slf4j-api-1.7.10.jar")))
+    assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support Ivy URI -- transitive=true will download dependency jars") {
+    val logAppender = new LogAppender("transitive=true will download dependency jars")
+    withLogAppender(logAppender) {
+      sc = new SparkContext(
+        new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+      sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
+      val dependencyJars = Array(
+        "org.apache.hive_hive-storage-api-2.7.0.jar",
+        "org.slf4j_slf4j-api-1.7.10.jar",
+        "commons-lang_commons-lang-2.6.jar")
+
+      dependencyJars.foreach(jar => assert(sc.listJars().exists(_.contains(jar))))
+      assert(logAppender.loggingEvents.count(_.getRenderedMessage.contains(
+        "Added dependency jars of Ivy URI" +
+          " ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")) == 1)
+
+      // test that the dependency jars are reported as already added
+      sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
+      assert(logAppender.loggingEvents.count(_.getRenderedMessage.contains(
+        "The dependency jars of Ivy URI" +
+          " ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")) == 1)
+      val existMsg = logAppender.loggingEvents.filter(_.getRenderedMessage.contains(
+        "The dependency jars of Ivy URI" +
+          " ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true"))
+        .head.getRenderedMessage
+      dependencyJars.foreach(jar => assert(existMsg.contains(jar)))
+    }
+  }
+
+  test("SPARK-33084: Add jar support Ivy URI -- test exclude param when transitive=true") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+    sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0" +
+      "?exclude=commons-lang:commons-lang&transitive=true")
+    assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+    assert(sc.listJars().exists(_.contains("org.slf4j_slf4j-api-1.7.10.jar")))
+    assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+
+  test("SPARK-33084: Add jar support Ivy URI -- test different version") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+    sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0")
+    sc.addJar("ivy://org.apache.hive:hive-storage-api:2.6.0")
+    assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+    assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.6.0.jar")))
+  }
+
+  test("SPARK-33084: Add jar support Ivy URI -- test invalid param") {
+    val logAppender = new LogAppender("test log when have invalid parameter")
+    withLogAppender(logAppender) {
+      sc = new SparkContext(
+        new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+      sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?" +
+        "invalidParam1=foo&invalidParam2=boo")
+      assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+      assert(logAppender.loggingEvents.exists(_.getRenderedMessage.contains(
+        "Invalid parameters `invalidParam1,invalidParam2` found in Ivy URI query" +
+        " `invalidParam1=foo&invalidParam2=boo`.")))
+    }
+  }
+
+  test("SPARK-33084: Add jar support Ivy URI -- test multiple transitive params") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+    // the last occurrence, transitive=invalidValue, wins and is treated as false
+    sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?" +
+      "transitive=true&transitive=invalidValue")
+    assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+    assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+
+    // the last occurrence, transitive=true, wins
+    sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?" +
+      "transitive=false&transitive=invalidValue&transitive=true")
+    assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
+    assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
+  }
+ + "transitive=false&transitive=invalidValue&transitive=true") + assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar"))) + assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar"))) + } + + test("SPARK-33084: Add jar support Ivy URI -- test param key case sensitive") { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]")) + sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?TRANSITIVE=true") + assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar"))) + assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar"))) + + sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true") + assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar"))) + assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar"))) + } + + test("SPARK-33084: Add jar support Ivy URI -- test transitive value case sensitive") { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]")) + sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=TRUE") + assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar"))) + assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar"))) + + sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true") + assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar"))) + assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar"))) + } } object SparkContextSuite { diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index dcd35f3f6b93f..c64f1b5814c20 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -47,7 +47,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.launcher.SparkLauncher -import org.apache.spark.util.{CommandLineUtils, ResetSystemProperties, Utils} +import org.apache.spark.util.{CommandLineUtils, DependencyUtils, ResetSystemProperties, Utils} trait TestPrematureExit { suite: SparkFunSuite => diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 2a37f75d86a41..eaa06ce2aa057 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -135,6 +135,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { val jarPath = SparkSubmitUtils.resolveMavenCoordinates( main.toString, SparkSubmitUtils.buildIvySettings(Option(repo), Some(tempIvyPath)), + transitive = true, isTest = true) assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path") } @@ -148,6 +149,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { val jarPath = SparkSubmitUtils.resolveMavenCoordinates( main.toString, SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)), + transitive = true, isTest = true) assert(jarPath.indexOf("mylib") >= 0, "should find artifact") assert(jarPath.indexOf("mydep") >= 0, "should find dependency") @@ -159,6 +161,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite 
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 2a37f75d86a41..eaa06ce2aa057 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -135,6 +135,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
       main.toString,
       SparkSubmitUtils.buildIvySettings(Option(repo), Some(tempIvyPath)),
+      transitive = true,
       isTest = true)
     assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
   }
@@ -148,6 +149,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
       main.toString,
       SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+      transitive = true,
       isTest = true)
     assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
     assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -159,6 +161,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
       main.toString,
       SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+      transitive = true,
       isTest = true)
     assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
     assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -171,6 +174,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
       main.toString,
       SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+      transitive = true,
       isTest = true)
     assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
     assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
@@ -183,6 +187,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
       SparkSubmitUtils.resolveMavenCoordinates(
         "a:b:c",
         SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+        transitive = true,
         isTest = true)
     }
   }
@@ -195,6 +200,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val path = SparkSubmitUtils.resolveMavenCoordinates(
       coordinates,
       SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+      transitive = true,
       isTest = true)
     assert(path === "", "should return empty path")
     val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.12", "1.2.0")
@@ -202,6 +208,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val files = SparkSubmitUtils.resolveMavenCoordinates(
       coordinates + "," + main.toString,
       SparkSubmitUtils.buildIvySettings(Some(repo), Some(tempIvyPath)),
+      transitive = true,
       isTest = true)
     assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
   }
@@ -214,7 +221,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val files = SparkSubmitUtils.resolveMavenCoordinates(
       main.toString,
       SparkSubmitUtils.buildIvySettings(Some(repo), Some(tempIvyPath)),
-      Seq("my.great.dep:mydep"),
+      exclusions = Seq("my.great.dep:mydep"),
+      transitive = true,
       isTest = true)
     assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
     assert(files.indexOf("my.great.dep") < 0, "Returned excluded artifact")
@@ -250,7 +258,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     testUtilSettings.setDefaultIvyUserDir(new File(tempIvyPath))
     IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true,
       ivySettings = testUtilSettings) { repo =>
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, settings, isTest = true)
+      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, settings,
+        transitive = true, isTest = true)
       assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
       assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
       assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
@@ -265,6 +274,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
       main.toString,
       ivySettings,
+      transitive = true,
       isTest = true)
     val r = """.*org.apache.spark-spark-submit-parent-.*""".r
     assert(!ivySettings.getDefaultCache.listFiles.map(_.getName)
diff --git a/core/src/test/scala/org/apache/spark/util/DependencyUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/DependencyUtilsSuite.scala
new file mode 100644
index 0000000000000..d181d4d8ce669
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/DependencyUtilsSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.net.URI
+
+import org.apache.spark.SparkFunSuite
+
+class DependencyUtilsSuite extends SparkFunSuite {
+
+  test("SPARK-33084: Add jar support Ivy URI -- test invalid ivy uri") {
+    val e1 = intercept[IllegalArgumentException] {
+      DependencyUtils.resolveMavenDependencies(URI.create("ivy://"))
+    }.getMessage
+    assert(e1.contains("Expected authority at index 6: ivy://"))
+
+    val e2 = intercept[IllegalArgumentException] {
+      DependencyUtils.resolveMavenDependencies(URI.create("ivy://org.apache.hive:hive-contrib"))
+    }.getMessage
+    assert(e2.contains("Invalid Ivy URI authority in uri ivy://org.apache.hive:hive-contrib:" +
+      " Expected 'org:module:version', found org.apache.hive:hive-contrib."))
+
+    val e3 = intercept[IllegalArgumentException] {
+      DependencyUtils.resolveMavenDependencies(
+        URI.create("ivy://org.apache.hive:hive-contrib:2.3.7?foo="))
+    }.getMessage
+    assert(e3.contains("Invalid query string in Ivy URI" +
+      " ivy://org.apache.hive:hive-contrib:2.3.7?foo=:"))
+
+    val e4 = intercept[IllegalArgumentException] {
+      DependencyUtils.resolveMavenDependencies(
+        URI.create("ivy://org.apache.hive:hive-contrib:2.3.7?bar=&baz=foo"))
+    }.getMessage
+    assert(e4.contains("Invalid query string in Ivy URI" +
+      " ivy://org.apache.hive:hive-contrib:2.3.7?bar=&baz=foo: bar=&baz=foo"))
+
+    val e5 = intercept[IllegalArgumentException] {
+      DependencyUtils.resolveMavenDependencies(
+        URI.create("ivy://org.apache.hive:hive-contrib:2.3.7?exclude=org.pentaho"))
+    }.getMessage
+    assert(e5.contains("Invalid exclude string in Ivy URI" +
+      " ivy://org.apache.hive:hive-contrib:2.3.7?exclude=org.pentaho:" +
+      " expected 'org:module,org:module,..', found org.pentaho"))
+  }
+}
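For reference, the behavior that the `case "ivy"` comment in `SparkContext` relies on can be seen directly: Hadoop's `Path` builds its URI without a query component, while `java.net.URI` keeps it. A quick demonstration:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

object UriQuerySketch {
  def main(args: Array[String]): Unit = {
    val raw = "ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true"
    // Path does not treat '?' specially, so no query survives the round-trip:
    println(new Path(raw).toUri.getQuery) // null
    // java.net.URI preserves it, which is why addJar parses ivy:// paths with URI.create:
    println(URI.create(raw).getQuery)     // transitive=true
  }
}
```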
diff --git a/docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md b/docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
index 4694bff99daf5..6d31125fd612d 100644
--- a/docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
+++ b/docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
@@ -33,8 +33,18 @@ ADD JAR file_name

 * **file_name**

-    The name of the JAR file to be added. It could be either on a local file system or a distributed file system.
+    The name of the JAR file to be added. It could be on a local file system, on a distributed file system, or an Ivy URI.
+    Apache Ivy is a popular dependency manager focusing on flexibility and simplicity. Two parameters are supported in the URI query string:
+
+    * transitive: whether to download the jars the Ivy URI depends on. It is case-sensitive, and only the last occurrence is taken if it is specified multiple times.
+    * exclude: an exclusion list applied when downloading the Ivy URI jar and its dependent jars.
+
+    Users can write an Ivy URI such as:
+
+        ivy://group:module:version
+        ivy://group:module:version?transitive=[true|false]
+        ivy://group:module:version?transitive=[true|false]&exclude=group:module,group:module

 ### Examples

```sql
ADD JAR /tmp/test.jar;
ADD JAR "/path/to/some.jar";
ADD JAR '/some/other.jar';
ADD JAR "/path with space/abc.jar";
+ADD JAR "ivy://group:module:version";
+ADD JAR "ivy://group:module:version?transitive=false";
+ADD JAR "ivy://group:module:version?transitive=true";
+ADD JAR "ivy://group:module:version?exclude=group:module&transitive=true";
```

### Related Statements
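The SQL layer reuses the same resolution through the `resolveJars` dispatch added to `SessionResourceLoader` below. Shown standalone here as a sketch that mirrors the patch rather than new API; the package declaration is assumed so the `private[spark]` `DependencyUtils` is reachable:

```scala
package org.apache.spark.util

import java.net.URI

object ResolveJarsDispatchSketch {
  // ivy:// paths fan out into the downloaded jar list; every other scheme passes through.
  def resolveJars(path: URI): Seq[String] = path.getScheme match {
    case "ivy" => DependencyUtils.resolveMavenDependencies(path)
    case _     => path.toString :: Nil
  }

  def main(args: Array[String]): Unit = {
    // A plain local path has no scheme, so it is returned unchanged:
    println(resolveJars(URI.create("/tmp/test.jar"))) // List(/tmp/test.jar)
  }
}
```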
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 48d8c3d325347..60ca06dbe0d52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.internal

 import java.io.File
+import java.net.URI

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -34,6 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.streaming.StreamingQueryManager
 import org.apache.spark.sql.util.ExecutionListenerManager
+import org.apache.spark.util.DependencyUtils

 /**
  * A class that holds all session-specific state in a given [[SparkSession]].
@@ -159,6 +161,13 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoade
     }
   }

+  def resolveJars(path: URI): Seq[String] = {
+    path.getScheme match {
+      case "ivy" => DependencyUtils.resolveMavenDependencies(path)
+      case _ => path.toString :: Nil
+    }
+  }
+
   /**
    * Add a jar path to [[SparkContext]] and the classloader.
    *
@@ -167,16 +176,19 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoade
    * [[SessionState]].
    */
   def addJar(path: String): Unit = {
-    session.sparkContext.addJar(path)
-    val uri = new Path(path).toUri
-    val jarURL = if (uri.getScheme == null) {
-      // `path` is a local file path without a URL scheme
-      new File(path).toURI.toURL
-    } else {
-      // `path` is a URL with a scheme
-      uri.toURL
+    val uri = URI.create(path)
+    resolveJars(uri).foreach { p =>
+      session.sparkContext.addJar(p)
+      val jarUri = new Path(p).toUri
+      val jarURL = if (jarUri.getScheme == null) {
+        // `p` is a local file path without a URL scheme
+        new File(p).toURI.toURL
+      } else {
+        // `p` is a URL with a scheme
+        jarUri.toURL
+      }
+      session.sharedState.jarClassLoader.addURL(jarURL)
     }
-    session.sharedState.jarClassLoader.addURL(jarURL)
     Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
   }
 }
diff --git a/sql/core/src/test/resources/SPARK-33084.jar b/sql/core/src/test/resources/SPARK-33084.jar
new file mode 100644
index 0000000000000..1dc5e9303b707
Binary files /dev/null and b/sql/core/src/test/resources/SPARK-33084.jar differ
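The UDF test below resolves `ivy://org.apache.spark:SPARK-33084:1.0` from a local Ivy cache rather than a remote repository. A sketch of the setup it relies on, assuming Ivy's default local resolver layout (`<ivy.home>/local/<org>/<module>/<revision>/jars/`); the directory is a placeholder:

```scala
import java.io.File
import org.apache.commons.io.FileUtils

object LocalIvyCacheSketch {
  def main(args: Array[String]): Unit = {
    val ivyHome = new File("/tmp/ivy-test") // placeholder directory
    System.setProperty("ivy.home", ivyHome.getAbsolutePath)
    // Plant the jar where the "local" resolver will look for org.apache.spark:SPARK-33084:1.0
    val cacheDir = new File(ivyHome, "local/org.apache.spark/SPARK-33084/1.0/jars/")
    cacheDir.mkdirs()
    FileUtils.copyFileToDirectory(new File("SPARK-33084.jar"), cacheDir)
  }
}
```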
+ + "/local/org.apache.spark/SPARK-33084/1.0/jars/") + targetCacheJarDir.mkdir() + // copy jar to local cache + FileUtils.copyFileToDirectory(sourceJar, targetCacheJarDir) + withTempView("v1") { + withUserDefinedFunction( + s"default.$functionName" -> false, + functionName -> true) { + // create temporary function without class + val e = intercept[AnalysisException] { + sql(s"CREATE TEMPORARY FUNCTION $functionName AS '$sumFuncClass'") + }.getMessage + assert(e.contains("Can not load class 'org.apache.spark.examples.sql.Spark33084")) + sql("ADD JAR ivy://org.apache.spark:SPARK-33084:1.0") + sql(s"CREATE TEMPORARY FUNCTION $functionName AS '$sumFuncClass'") + // create a view using a function in 'default' database + sql(s"CREATE TEMPORARY VIEW v1 AS SELECT $functionName(col1) FROM VALUES (1), (2), (3)") + // view v1 should still using function defined in `default` database + checkAnswer(sql("SELECT * FROM v1"), Seq(Row(2.0))) + } + } + System.clearProperty("ivy.home") + } + } } case class Foo(bar: Option[String]) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index da37b61688951..4ed21c91b8023 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive +import java.net.URI + import org.apache.spark.annotation.Unstable import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{Analyzer, ResolveSessionCatalog} @@ -127,7 +129,10 @@ class HiveSessionResourceLoader( extends SessionResourceLoader(session) { private lazy val client = clientBuilder() override def addJar(path: String): Unit = { - client.addJar(path) - super.addJar(path) + val uri = URI.create(path) + resolveJars(uri).foreach { p => + client.addJar(p) + super.addJar(p) + } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index c0758dcdfc879..97e685efd27de 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -124,6 +124,7 @@ private[hive] object IsolatedClientLoader extends Logging { SparkSubmitUtils.buildIvySettings( Some(remoteRepos), ivyPath), + transitive = true, exclusions = version.exclusions) } val allFiles = classpath.split(",").map(new File(_)).toSet diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 1cabf6033e8d8..21cc6af398eec 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1219,6 +1219,23 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd } } } + + test("SPARK-33084: Add jar support Ivy URI in SQL") { + val testData = TestHive.getHiveFile("data/files/sample.json").toURI + withTable("t") { + sql("ADD JAR ivy://org.apache.hive.hcatalog:hive-hcatalog-core:2.3.7") + sql( + """CREATE TABLE t(a string, b string) + |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'""".stripMargin) + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE t""") + 
sql("SELECT * FROM src JOIN t on src.key = t.a") + assert(sql("LIST JARS").filter(_.getString(0).contains( + "org.apache.hive.hcatalog_hive-hcatalog-core-2.3.7.jar")).count() > 0) + assert(sql("LIST JAR"). + filter(_.getString(0).contains( + "org.apache.hive.hcatalog_hive-hcatalog-core-2.3.7.jar")).count() > 0) + } + } } // for SPARK-2180 test