diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 14e2df4ed0702..5a73b1ad1ea29 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -42,11 +42,12 @@ are set up as described above:
       --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
       examples/jars/spark_examples_2.11-2.2.0.jar
-
 The Spark master, specified either via passing the `--master` command line argument to `spark-submit` or by setting
 `spark.master` in the application's configuration, must be a URL with the format `k8s://<api_server_url>`. Prefixing the
 master string with `k8s://` will cause the Spark application to launch on the Kubernetes cluster, with the API server
-being contacted at `api_server_url`. The HTTP protocol must also be specified.
+being contacted at `api_server_url`. If no HTTP protocol is specified in the URL, it defaults to `https`. For example,
+setting the master to `k8s://example.com:443` is equivalent to setting it to `k8s://https://example.com:443`, but to
+connect without SSL on a different port, the master would be set to `k8s://http://example.com:8443`.
 
 Note that applications can currently only be executed in cluster mode, where the driver and its executors are running on
 the cluster.
 
@@ -58,17 +59,18 @@ disk of the submitter's machine. These two types of dependencies are specified v
 `spark-submit`:
 
 * Local jars provided by specifying the `--jars` command line argument to `spark-submit`, or by setting `spark.jars` in
-  the application's configuration, will be treated as jars that are located on the *disk of the driver Docker
-  container*. This only applies to jar paths that do not specify a scheme or that have the scheme `file://`. Paths with
-  other schemes are fetched from their appropriate locations.
+  the application's configuration, will be treated as jars that are located on the *disk of the driver container*. This
+  only applies to jar paths that do not specify a scheme or that have the scheme `file://`. Paths with other schemes are
+  fetched from their appropriate locations.
 * Local jars provided by specifying the `--upload-jars` command line argument to `spark-submit`, or by setting
   `spark.kubernetes.driver.uploads.jars` in the application's configuration, will be treated as jars that are located on
   the *disk of the submitting machine*. These jars are uploaded to the driver docker container before executing the
   application.
-
 * A main application resource path that does not have a scheme or that has the scheme `file://` is assumed to be on the
   *disk of the submitting machine*. This resource is uploaded to the driver docker container before executing the
   application. A remote path can still be specified and the resource will be fetched from the appropriate location.
+* A main application resource path that has the scheme `container://` is assumed to be on the *disk of the driver
+  container*.
 
 In all of these cases, the jars are placed on the driver's classpath, and are also sent to the executors. Below are some
 examples of providing application dependencies.
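The scheme-based dispatch in the dependency rules above can be pictured with plain `java.net.URI` handling. This is an illustrative sketch, not code from the patch; the object and method names are invented for the example:

```scala
import java.net.URI

// Rough sketch of how a resource path's scheme decides where the file is
// expected to live, mirroring the dependency rules documented above.
object ResourceSchemeExample {
  def describe(path: String): String =
    Option(new URI(path).getScheme).getOrElse("file") match {
      case "file" => s"uploaded from the submitting machine: ${new URI(path).getPath}"
      case "container" => s"read from the driver container's disk: ${new URI(path).getPath}"
      case other => s"fetched remotely over '$other'"
    }

  def main(args: Array[String]): Unit = {
    println(describe("/home/exampleuser/exampleapplication/app.jar"))  // no scheme -> submitting machine
    println(describe("container:///opt/spark-plugins/app-plugin.jar")) // driver container disk
    println(describe("http://example.com:8080/applications/app.jar"))  // remote fetch
  }
}
```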
@@ -78,8 +80,7 @@ To submit an application with both the main resource and two other jars living o
     bin/spark-submit \
       --deploy-mode cluster \
       --class com.example.applications.SampleApplication \
-      --master k8s://https://192.168.99.100 \
-      --kubernetes-namespace default \
+      --master k8s://192.168.99.100 \
       --upload-jars /home/exampleuser/exampleapplication/dep1.jar,/home/exampleuser/exampleapplication/dep2.jar \
       --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \
       --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
@@ -91,8 +92,7 @@ Note that since passing the jars through the `--upload-jars` command line argume
     bin/spark-submit \
       --deploy-mode cluster \
       --class com.example.applications.SampleApplication \
-      --master k8s://https://192.168.99.100 \
-      --kubernetes-namespace default \
+      --master k8s://192.168.99.100 \
       --conf spark.kubernetes.driver.uploads.jars=/home/exampleuser/exampleapplication/dep1.jar,/home/exampleuser/exampleapplication/dep2.jar \
       --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \
       --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
@@ -104,8 +104,7 @@ is located in the jar `/opt/spark-plugins/app-plugin.jar` on the docker image's
     bin/spark-submit \
       --deploy-mode cluster \
       --class com.example.applications.PluggableApplication \
-      --master k8s://https://192.168.99.100 \
-      --kubernetes-namespace default \
+      --master k8s://192.168.99.100 \
       --jars /opt/spark-plugins/app-plugin.jar \
       --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver-custom:latest \
       --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
@@ -117,13 +116,22 @@ Spark property, the above will behave identically to this command:
     bin/spark-submit \
       --deploy-mode cluster \
       --class com.example.applications.PluggableApplication \
-      --master k8s://https://192.168.99.100 \
-      --kubernetes-namespace default \
+      --master k8s://192.168.99.100 \
       --conf spark.jars=file:///opt/spark-plugins/app-plugin.jar \
       --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver-custom:latest \
       --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
       http://example.com:8080/applications/sparkpluggable/app.jar
 
+To specify a main application resource that is already in the Docker image and has no other dependencies:
+
+    bin/spark-submit \
+      --deploy-mode cluster \
+      --class com.example.applications.PluggableApplication \
+      --master k8s://192.168.99.100:8443 \
+      --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver-custom:latest \
+      --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
+      container:///home/applications/examples/example.jar
+
 ### Spark Properties
 
 Below are some other common properties that are specific to Kubernetes. Most of the other configurations are the same
@@ -133,10 +141,9 @@ from the other deployment modes. See the [configuration page](configuration.html
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.kubernetes.namespace</code></td>
-  <td>(none)</td>
+  <td><code>default</code></td>
   <td>
-    The namespace that will be used for running the driver and executor pods. Must be specified. When using
+    The namespace that will be used for running the driver and executor pods. When using
     <code>spark-submit</code> in cluster mode, this can also be passed to <code>spark-submit</code> via the
     <code>--kubernetes-namespace</code> command line argument.
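With `spark.kubernetes.namespace` now carrying a default, callers can read it through `SparkConf.get(key, defaultValue)` instead of failing when it is unset. A minimal sketch; the property values are examples only:

```scala
import org.apache.spark.SparkConf

// Minimal sketch: the namespace falls back to "default" when the property is
// not set, matching the table entry above.
object NamespaceDefaultExample {
  def main(args: Array[String]): Unit = {
    val unset = new SparkConf(false)
    println(unset.get("spark.kubernetes.namespace", "default"))    // prints: default

    val explicit = new SparkConf(false).set("spark.kubernetes.namespace", "spark-jobs")
    println(explicit.get("spark.kubernetes.namespace", "default")) // prints: spark-jobs
  }
}
```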
@@ -196,14 +203,6 @@ from the other deployment modes. See the [configuration page](configuration.html
       mode. Refer to <a href="#adding-other-jars">adding other jars</a> for more information.
     </td>
   </tr>
-  <tr>
-    <td><code>spark.kubernetes.driver.uploads.driverExtraClasspath</code></td>
-    <td>(none)</td>
-    <td>
-      Comma-separated list of jars to be sent to the driver only when submitting the application in cluster mode.
-    </td>
-  </tr>
   <tr>
     <td><code>spark.kubernetes.executor.memoryOverhead</code></td>
     <td>executorMemory * 0.10, with minimum of 384 </td>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 30eaa6269cf47..fe3256b9e12be 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -35,7 +35,7 @@ import scala.concurrent.duration.DurationInt
 import scala.util.Success
 
 import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException}
-import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
+import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
 import org.apache.spark.deploy.rest.kubernetes._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
@@ -47,13 +47,8 @@ private[spark] class Client(
     appArgs: Array[String]) extends Logging {
   import Client._
 
-  private val namespace = sparkConf.getOption("spark.kubernetes.namespace").getOrElse(
-    throw new IllegalArgumentException("Namespace must be provided in spark.kubernetes.namespace"))
-  private val rawMaster = sparkConf.get("spark.master")
-  if (!rawMaster.startsWith("k8s://")) {
-    throw new IllegalArgumentException("Master should be a URL with scheme k8s://")
-  }
-  private val master = rawMaster.replaceFirst("k8s://", "")
+  private val namespace = sparkConf.get("spark.kubernetes.namespace", "default")
+  private val master = resolveK8sMaster(sparkConf.get("spark.master"))
 
   private val launchTime = System.currentTimeMillis
   private val appName = sparkConf.getOption("spark.app.name")
@@ -64,8 +59,6 @@ private[spark] class Client(
   private val driverLauncherSelectorValue = s"driver-launcher-$launchTime"
   private val driverDockerImage = sparkConf.get(
     "spark.kubernetes.driver.docker.image", s"spark-driver:$SPARK_VERSION")
-  private val uploadedDriverExtraClasspath = sparkConf
-    .getOption("spark.kubernetes.driver.uploads.driverExtraClasspath")
   private val uploadedJars = sparkConf.getOption("spark.kubernetes.driver.uploads.jars")
 
   private val secretBase64String = {
@@ -112,12 +105,15 @@ private[spark] class Client(
       .withType("Opaque")
       .done()
     try {
-      val resolvedSelectors = (Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue)
+      val resolvedSelectors = (Map(
+          DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue,
+          SPARK_APP_NAME_LABEL -> appName)
         ++ parsedCustomLabels).asJava
       val (servicePorts, containerPorts) = configurePorts()
       val service = kubernetesClient.services().createNew()
         .withNewMetadata()
           .withName(kubernetesAppId)
+          .withLabels(Map(SPARK_APP_NAME_LABEL -> appName).asJava)
           .endMetadata()
         .withNewSpec()
           .withSelector(resolvedSelectors)
@@ -355,10 +351,10 @@ private[spark] class Client(
         val fileBytes = Files.toByteArray(appFile)
         val fileBase64 = Base64.encodeBase64String(fileBytes)
         UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName)
+      case "container" => ContainerAppResource(appResourceUri.getPath)
       case other => RemoteAppResource(other)
     }
-    val uploadDriverExtraClasspathBase64Contents = compressJars(uploadedDriverExtraClasspath)
     val uploadJarsBase64Contents = compressJars(uploadedJars)
     KubernetesCreateSubmissionRequest(
       appResource = resolvedAppResource,
@@ -366,7 +362,6 @@ private[spark] class Client(
       appArgs = appArgs,
       secret = secretBase64String,
       sparkProperties = sparkConf.getAll.toMap,
-      uploadedDriverExtraClasspathBase64Contents = uploadDriverExtraClasspathBase64Contents,
       uploadedJarsBase64Contents = uploadJarsBase64Contents)
   }
 
@@ -414,7 +409,7 @@ private[spark] class Client(
     }
   }
 
-private object Client {
+private[spark] object Client extends Logging {
 
   private val SUBMISSION_SERVER_SECRET_NAME = "spark-submission-server-secret"
   private val DRIVER_LAUNCHER_SELECTOR_LABEL = "driver-launcher-selector"
@@ -430,6 +425,7 @@ private object Client {
   private val SECURE_RANDOM = new SecureRandom()
   private val SPARK_SUBMISSION_SECRET_BASE_DIR = "/var/run/secrets/spark-submission"
   private val LAUNCH_TIMEOUT_SECONDS = 30
+  private val SPARK_APP_NAME_LABEL = "spark-app-name"
 
   def main(args: Array[String]): Unit = {
     require(args.length >= 2, s"Too few arguments. Usage: ${getClass.getName} " +
@@ -444,4 +440,20 @@ private object Client {
       sparkConf = sparkConf,
       appArgs = appArgs).run()
   }
+
+  def resolveK8sMaster(rawMasterString: String): String = {
+    if (!rawMasterString.startsWith("k8s://")) {
+      throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
+    }
+    val masterWithoutK8sPrefix = rawMasterString.replaceFirst("k8s://", "")
+    if (masterWithoutK8sPrefix.startsWith("http://")
+        || masterWithoutK8sPrefix.startsWith("https://")) {
+      masterWithoutK8sPrefix
+    } else {
+      val resolvedURL = s"https://$masterWithoutK8sPrefix"
+      logDebug(s"No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +
+        s" URL is $resolvedURL")
+      resolvedURL
+    }
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
index 6da1a848b25e7..813d070e0f876 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
@@ -27,7 +27,6 @@ case class KubernetesCreateSubmissionRequest(
     val appArgs: Array[String],
     val sparkProperties: Map[String, String],
     val secret: String,
-    val uploadedDriverExtraClasspathBase64Contents: Option[TarGzippedData],
     val uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
   message = "create"
   clientSparkVersion = SPARK_VERSION
@@ -46,6 +45,7 @@ case class TarGzippedData(
   property = "type")
 @JsonSubTypes(value = Array(
   new JsonSubTypes.Type(value = classOf[UploadedAppResource], name = "UploadedAppResource"),
+  new JsonSubTypes.Type(value = classOf[ContainerAppResource], name = "ContainerLocalAppResource"),
   new JsonSubTypes.Type(value = classOf[RemoteAppResource], name = "RemoteAppResource")))
 abstract class AppResource
 
@@ -53,6 +53,8 @@ case class UploadedAppResource(
     resourceBase64Contents: String,
     name: String = "spark-app-resource") extends AppResource
 
+case class ContainerAppResource(resourcePath: String) extends AppResource
+
 case class RemoteAppResource(resource: String) extends AppResource
 
 class PingResponse extends SubmitRestProtocolResponse {
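The `type` discriminator configured by the `@JsonTypeInfo`/`@JsonSubTypes` annotations above is what lets the submission server tell uploaded, container-local, and remote resources apart when a request is deserialized. A self-contained sketch with a stand-in hierarchy (not the actual `AppResource` classes), assuming the Jackson Scala module that Spark already depends on:

```scala
import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Stand-in hierarchy mirroring the annotations on AppResource above.
@JsonTypeInfo(
  use = JsonTypeInfo.Id.NAME,
  include = JsonTypeInfo.As.PROPERTY,
  property = "type")
@JsonSubTypes(value = Array(
  new JsonSubTypes.Type(value = classOf[DemoContainerResource], name = "ContainerLocalAppResource"),
  new JsonSubTypes.Type(value = classOf[DemoRemoteResource], name = "RemoteAppResource")))
abstract class DemoAppResource

case class DemoContainerResource(resourcePath: String) extends DemoAppResource
case class DemoRemoteResource(resource: String) extends DemoAppResource

object AppResourceJsonExample {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

    val json = mapper.writeValueAsString(DemoContainerResource("/opt/spark-plugins/app-plugin.jar"))
    // Roughly: {"type":"ContainerLocalAppResource","resourcePath":"/opt/spark-plugins/app-plugin.jar"}
    println(json)

    // The discriminator routes the payload back to the right subtype.
    val parsed = mapper.readValue(json, classOf[DemoAppResource])
    println(parsed) // DemoContainerResource(/opt/spark-plugins/app-plugin.jar)
  }
}
```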
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
index 837706ca9f5a8..08ddbaf5e50dc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
@@ -18,7 +18,6 @@ package org.apache.spark.deploy.rest.kubernetes
 
 import java.io.File
 import java.net.URI
-import java.nio.file.Paths
 import java.util.concurrent.CountDownLatch
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
@@ -30,12 +29,12 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.rest._
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}
 
 private case class KubernetesSparkRestServerArguments(
-  val host: Option[String] = None,
-  val port: Option[Int] = None,
-  val secretFile: Option[String] = None) {
+    val host: Option[String] = None,
+    val port: Option[Int] = None,
+    val secretFile: Option[String] = None) {
   def validate(): KubernetesSparkRestServerArguments = {
     require(host.isDefined, "Hostname not set via --hostname.")
     require(port.isDefined, "Port not set via --port")
@@ -68,13 +67,21 @@ private object KubernetesSparkRestServerArguments {
   }
 }
 
+/**
+ * Runs in the driver pod and receives a request to run an application. Note that
+ * unlike the submission rest server in standalone mode, this server is expected
+ * to be used to run one application only, and then shut down once that application
+ * is complete.
+ */
 private[spark] class KubernetesSparkRestServer(
     host: String,
     port: Int,
     conf: SparkConf,
-    expectedApplicationSecret: Array[Byte])
+    expectedApplicationSecret: Array[Byte],
+    shutdownLock: CountDownLatch)
   extends RestSubmissionServer(host, port, conf) {
 
+  private val SERVLET_LOCK = new Object
   private val javaExecutable = s"${System.getenv("JAVA_HOME")}/bin/java"
   private val sparkHome = System.getenv("SPARK_HOME")
   private val securityManager = new SecurityManager(conf)
@@ -99,87 +106,105 @@ private[spark] class KubernetesSparkRestServer(
 
   private class KubernetesSubmitRequestServlet extends SubmitRequestServlet {
 
+    private val waitForProcessCompleteExecutor = ThreadUtils
+      .newDaemonSingleThreadExecutor("wait-for-spark-app-complete")
+    private var startedApplication = false
+
     // TODO validating the secret should be done as part of a header of the request.
     // Instead here we have to specify the secret in the body.
     override protected def handleSubmit(
-      requestMessageJson: String,
-      requestMessage: SubmitRestProtocolMessage,
-      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
-      requestMessage match {
-        case KubernetesCreateSubmissionRequest(
+        requestMessageJson: String,
+        requestMessage: SubmitRestProtocolMessage,
+        responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
+      SERVLET_LOCK.synchronized {
+        if (startedApplication) {
+          throw new IllegalStateException("Application has already been submitted.")
+        } else {
+          requestMessage match {
+            case KubernetesCreateSubmissionRequest(
                 appResource,
                 mainClass,
                 appArgs,
                 sparkProperties,
                 secret,
-                uploadedDriverExtraClasspath,
                 uploadedJars) =>
-          val decodedSecret = Base64.decodeBase64(secret)
-          if (!expectedApplicationSecret.sameElements(decodedSecret)) {
-            responseServlet.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
-            handleError("Unauthorized to submit application.")
-          } else {
-            val tempDir = Utils.createTempDir()
-            val appResourcePath = resolvedAppResource(appResource, tempDir)
-            val driverClasspathDirectory = new File(tempDir, "driver-extra-classpath")
-            if (!driverClasspathDirectory.mkdir) {
-              throw new IllegalStateException("Failed to create driver extra classpath" +
-                s" dir at ${driverClasspathDirectory.getAbsolutePath}")
-            }
-            val jarsDirectory = new File(tempDir, "jars")
-            if (!jarsDirectory.mkdir) {
-              throw new IllegalStateException("Failed to create jars dir at" +
-                s"${jarsDirectory.getAbsolutePath}")
-            }
-            val writtenDriverExtraClasspath = writeBase64ContentsToFiles(
-              uploadedDriverExtraClasspath, driverClasspathDirectory)
-            val writtenJars = writeBase64ContentsToFiles(uploadedJars, jarsDirectory)
-            val originalDriverExtraClasspath = sparkProperties.get("spark.driver.extraClassPath")
-              .map(_.split(","))
-              .getOrElse(Array.empty[String])
-            val resolvedDriverExtraClasspath = writtenDriverExtraClasspath ++
-              originalDriverExtraClasspath
-            val originalJars = sparkProperties.get("spark.jars")
-              .map(_.split(","))
-              .getOrElse(Array.empty[String])
-            val resolvedJars = writtenJars ++ originalJars ++ Array(appResourcePath)
-            val sparkJars = new File(sparkHome, "jars").listFiles().map(_.getAbsolutePath)
-            val driverClasspath = resolvedDriverExtraClasspath ++
-              resolvedJars ++
-              sparkJars ++
-              Array(appResourcePath)
-            val resolvedSparkProperties = new mutable.HashMap[String, String]
-            resolvedSparkProperties ++= sparkProperties
-            resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",")
-
-            val command = new ArrayBuffer[String]
-            command += javaExecutable
-            command += "-cp"
-            command += s"${driverClasspath.mkString(":")}"
-            for (prop <- resolvedSparkProperties) {
-              command += s"-D${prop._1}=${prop._2}"
-            }
-            val driverMemory = resolvedSparkProperties.getOrElse("spark.driver.memory", "1g")
-            command += s"-Xms$driverMemory"
-            command += s"-Xmx$driverMemory"
-            command += mainClass
-            command ++= appArgs
-            val pb = new ProcessBuilder(command: _*).inheritIO()
-            val process = pb.start()
-            ShutdownHookManager.addShutdownHook(() => {
-              logInfo("Received stop command, shutting down the running Spark application...")
-              process.destroy()
-            })
-            val response = new CreateSubmissionResponse
-            response.success = true
-            response.submissionId = null
-            response.message = "success"
-            response.serverSparkVersion = SPARK_VERSION
-            response
+              val decodedSecret = Base64.decodeBase64(secret)
+              if (!expectedApplicationSecret.sameElements(decodedSecret)) {
+                responseServlet.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+                handleError("Unauthorized to submit application.")
+              } else {
+                val tempDir = Utils.createTempDir()
+                val appResourcePath = resolvedAppResource(appResource, tempDir)
+                val driverClasspathDirectory = new File(tempDir, "driver-extra-classpath")
+                if (!driverClasspathDirectory.mkdir) {
+                  throw new IllegalStateException("Failed to create driver extra classpath" +
+                    s" dir at ${driverClasspathDirectory.getAbsolutePath}")
+                }
+                val jarsDirectory = new File(tempDir, "jars")
+                if (!jarsDirectory.mkdir) {
+                  throw new IllegalStateException("Failed to create jars dir at" +
+                    s"${jarsDirectory.getAbsolutePath}")
+                }
+                val writtenJars = writeBase64ContentsToFiles(uploadedJars, jarsDirectory)
+                val driverExtraClasspath = sparkProperties
+                  .get("spark.driver.extraClassPath")
+                  .map(_.split(","))
+                  .getOrElse(Array.empty[String])
+                val originalJars = sparkProperties.get("spark.jars")
+                  .map(_.split(","))
+                  .getOrElse(Array.empty[String])
+                val resolvedJars = writtenJars ++ originalJars ++ Array(appResourcePath)
+                val sparkJars = new File(sparkHome, "jars").listFiles().map(_.getAbsolutePath)
+                val driverClasspath = driverExtraClasspath ++
+                  resolvedJars ++
+                  sparkJars ++
+                  Array(appResourcePath)
+                val resolvedSparkProperties = new mutable.HashMap[String, String]
+                resolvedSparkProperties ++= sparkProperties
+                resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",")
+
+                val command = new ArrayBuffer[String]
+                command += javaExecutable
+                command += "-cp"
+                command += s"${driverClasspath.mkString(":")}"
+                for (prop <- resolvedSparkProperties) {
+                  command += s"-D${prop._1}=${prop._2}"
+                }
+                val driverMemory = resolvedSparkProperties.getOrElse("spark.driver.memory", "1g")
+                command += s"-Xms$driverMemory"
+                command += s"-Xmx$driverMemory"
+                command += mainClass
+                command ++= appArgs
+                val pb = new ProcessBuilder(command: _*).inheritIO()
+                val process = pb.start()
+                ShutdownHookManager.addShutdownHook(() => {
+                  logInfo("Received stop command, shutting down the running Spark application...")
+                  process.destroy()
+                  shutdownLock.countDown()
+                })
+                waitForProcessCompleteExecutor.submit(new Runnable {
+                  override def run(): Unit = {
+                    process.waitFor
+                    SERVLET_LOCK.synchronized {
+                      logInfo("Spark application complete. Shutting down submission server...")
+                      KubernetesSparkRestServer.this.stop
+                      shutdownLock.countDown()
+                    }
+                  }
+                })
+                startedApplication = true
+                val response = new CreateSubmissionResponse
+                response.success = true
+                response.submissionId = null
+                response.message = "success"
+                response.serverSparkVersion = SPARK_VERSION
+                response
+              }
+            case unexpected =>
+              responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+              handleError(s"Received message of unexpected type ${unexpected.messageType}.")
           }
-        case unexpected =>
-          responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
-          handleError(s"Received message of unexpected type ${unexpected.messageType}.")
+        }
       }
     }
 
@@ -196,6 +221,7 @@ private[spark] class KubernetesSparkRestServer(
           throw new IllegalStateException(s"Failed to write main app resource file" +
             s" to $resourceFilePath")
         }
+      case ContainerAppResource(resource) => resource
       case RemoteAppResource(resource) =>
         Utils.fetchFile(resource, tempDir, conf,
           securityManager, SparkHadoopUtil.get.newConfiguration(conf),
@@ -237,7 +263,8 @@ private[spark] object KubernetesSparkRestServer {
         parsedArguments.host.get,
         parsedArguments.port.get,
         sparkConf,
-        secretBytes)
+        secretBytes,
+        barrier)
       server.start()
       ShutdownHookManager.addShutdownHook(() => {
         try {
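The lifecycle added above — accept exactly one submission, have a background thread wait for the launched driver process, then stop the server and release a latch the launcher is blocked on — boils down to the generic pattern sketched here. The process, executor, and latch names are illustrative, and a `java` binary on the PATH is assumed:

```scala
import java.util.concurrent.{CountDownLatch, Executors}

// Generic sketch of the run-once-then-shut-down pattern used by the
// submission server: launch one child process, wait for it off the request
// thread, and release a latch so the hosting JVM knows it can exit.
object RunOnceAndShutdownExample {
  def main(args: Array[String]): Unit = {
    val shutdownLock = new CountDownLatch(1)
    val waiter = Executors.newSingleThreadExecutor()

    // Stand-in for the launched Spark driver JVM.
    val process = new ProcessBuilder("java", "-version").inheritIO().start()

    waiter.submit(new Runnable {
      override def run(): Unit = {
        process.waitFor()
        println("Child process finished; releasing shutdown latch...")
        shutdownLock.countDown() // the real server also stops its REST endpoint here
      }
    })

    shutdownLock.await() // block until the one and only application completes
    waiter.shutdown()
  }
}
```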
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 2717d2f37d910..b7110ba901842 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -27,7 +27,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ExecutionContext, Future}
 
 import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder
+import org.apache.spark.deploy.kubernetes.{Client, KubernetesClientBuilder}
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -43,15 +43,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val EXECUTOR_MODIFICATION_LOCK = new Object
   private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod]
 
-  private val kubernetesMaster = sc.master.replaceFirst("k8s://", "")
+  private val kubernetesMaster = Client.resolveK8sMaster(sc.master)
 
   private val executorDockerImage = conf
     .get("spark.kubernetes.executor.docker.image", s"spark-executor:${sc.version}")
 
-  private val kubernetesNamespace = conf
-    .getOption("spark.kubernetes.namespace")
-    .getOrElse(
-      throw new SparkException("Kubernetes namespace must be specified in kubernetes mode."))
+  private val kubernetesNamespace = conf.get("spark.kubernetes.namespace", "default")
 
   private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
 
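For context, the resolved master URL and the defaulted namespace are ultimately what the fabric8 Kubernetes client used by this module gets configured with. The sketch below is a simplified illustration rather than the actual `KubernetesClientBuilder`, and it sits in the same package only so it can reach the `private[spark]` `Client.resolveK8sMaster` helper:

```scala
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}

import org.apache.spark.SparkConf

// Simplified sketch of wiring the resolved master and namespace into a
// fabric8 client; the real code path goes through KubernetesClientBuilder.
object SchedulerClientSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false).set("spark.master", "k8s://192.168.99.100:8443")
    val master = Client.resolveK8sMaster(conf.get("spark.master"))
    val namespace = conf.get("spark.kubernetes.namespace", "default")

    val config = new ConfigBuilder()
      .withMasterUrl(master)
      .withNamespace(namespace)
      .build()
    val client = new DefaultKubernetesClient(config)
    try {
      println(s"Talking to $master in namespace $namespace") // e.g. https://192.168.99.100:8443 / default
    } finally {
      client.close()
    }
  }
}
```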
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/pom.xml b/resource-managers/kubernetes/docker-minimal-bundle/pom.xml
index c20e51c93e7c7..0ec2f36075db3 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/pom.xml
+++ b/resource-managers/kubernetes/docker-minimal-bundle/pom.xml
@@ -43,6 +43,13 @@
       <version>${project.version}</version>
       <type>pom</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-examples_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>