Add Spark multi-user support for standalone mode #750

Closed · wants to merge 1 commit
44 changes: 41 additions & 3 deletions core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
@@ -16,24 +16,63 @@
*/

package spark.deploy

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

import java.io.IOException
import java.security.PrivilegedExceptionAction


/**
* Contains utility methods for interacting with Hadoop from Spark.
*/
object SparkHadoopUtil {
val HDFS_TOKEN_KEY = "SPARK_HDFS_TOKEN"
val conf = newConfiguration()
UserGroupInformation.setConfiguration(conf)

def getUserNameFromEnvironment(): String = {
// Defaults to the JVM's -Duser.name system property
System.getProperty("user.name")
}

def runAsUser(func: (Product) => Unit, args: Product) {
runAsUser(func, args, getUserNameFromEnvironment())
}

def runAsUser(func: (Product) => Unit, args: Product, user: String) {
val ugi = UserGroupInformation.createRemoteUser(user)
if (UserGroupInformation.isSecurityEnabled) {
Option(System.getenv(HDFS_TOKEN_KEY)) match {

[Review comment] An environment variable isn't very secure for passing the token, as anyone on that machine could simply do a ps and get it. Perhaps this is OK for the first cut, and if you are limiting access to the machines, but I think this will eventually need to be made more secure.

[Review comment, follow-up] Sorry, I was wrong: environment variables are readable only by the user owning the process.

case Some(s) =>
ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.TOKEN)
val token = new Token[TokenIdentifier]()
token.decodeFromUrlString(s)
ugi.addToken(token)
case None => throw new IOException("Failed to get token in security environment")
}
}

// Run func under the constructed UGI so Hadoop sees the target user's identity
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = {
func(args)
}
})
}

def createSerializedToken(): Option[String] = {
if (UserGroupInformation.isSecurityEnabled) {
val fs = FileSystem.get(conf)
val user = UserGroupInformation.getCurrentUser.getShortUserName
Option(fs.getDelegationToken(user).asInstanceOf[Token[_ <: TokenIdentifier]])
.map(_.encodeToUrlString())
} else {
None
}
}

// Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems.
@@ -43,5 +82,4 @@ object SparkHadoopUtil {
def addCredentials(conf: JobConf) {}

def isYarnMode(): Boolean = { false }

}
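For illustration (a sketch, not code from this PR), the two methods above form a round trip: createSerializedToken encodes an HDFS delegation token into a URL-safe string on the driver, and the secure branch of runAsUser decodes that string on the executor and attaches the token to a remote UGI before any Hadoop access happens. Reduced to its essentials, using only the Hadoop APIs already imported in this file:

// Driver side: serialize the token for transport (mirrors createSerializedToken).
def encodeForTransport(token: Token[_ <: TokenIdentifier]): String =
  token.encodeToUrlString()

// Executor side: rebuild the token and bind it to a remote user's UGI
// (mirrors the isSecurityEnabled branch of runAsUser).
def ugiWithToken(encoded: String, user: String): UserGroupInformation = {
  val ugi = UserGroupInformation.createRemoteUser(user)
  ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.TOKEN)
  val token = new Token[TokenIdentifier]()
  token.decodeFromUrlString(encoded)  // inverse of encodeToUrlString
  ugi.addToken(token)
  ugi
}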
core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
@@ -48,6 +48,8 @@ object SparkHadoopUtil {
func(args)
}

def createSerializedToken(): Option[String] = None

// Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
def isYarnMode(): Boolean = {
val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))
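Note that this variant returns None from createSerializedToken(): the standalone env-var hand-off is not used here, presumably because in YARN mode token distribution goes through the resource manager's own submission path rather than the executor environment.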
44 changes: 41 additions & 3 deletions core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
@@ -16,24 +16,63 @@
*/

package spark.deploy

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

import java.io.IOException
import java.security.PrivilegedExceptionAction

/**
* Contains utility methods for interacting with Hadoop from Spark.
*/
object SparkHadoopUtil {
val HDFS_TOKEN_KEY = "SPARK_HDFS_TOKEN"
val conf = newConfiguration()
UserGroupInformation.setConfiguration(conf)

def getUserNameFromEnvironment(): String = {
// Defaults to the JVM's -Duser.name system property
System.getProperty("user.name")
}

def runAsUser(func: (Product) => Unit, args: Product) {
runAsUser(func, args, getUserNameFromEnvironment())
}

def runAsUser(func: (Product) => Unit, args: Product, user: String) {
val ugi = UserGroupInformation.createRemoteUser(user)
if (UserGroupInformation.isSecurityEnabled) {
Option(System.getenv(HDFS_TOKEN_KEY)) match {
case Some(s) =>
ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.TOKEN)
val token = new Token[TokenIdentifier]()
token.decodeFromUrlString(s)
ugi.addToken(token)
case None => throw new IOException("Failed to get token in security environment")

}
}

// Run func under the constructed UGI so Hadoop sees the target user's identity
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = {
func(args)
}
})
}

def createSerializedToken(): Option[String] = {
if (UserGroupInformation.isSecurityEnabled) {
val fs = FileSystem.get(conf)
val user = UserGroupInformation.getCurrentUser.getShortUserName
Option(fs.getDelegationToken(user).asInstanceOf[Token[_ <: TokenIdentifier]])
.map(_.encodeToUrlString())
} else {
None
}
}

// Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems.
@@ -43,5 +82,4 @@ object SparkHadoopUtil {
def addCredentials(conf: JobConf) {}

def isYarnMode(): Boolean = { false }

}
6 changes: 5 additions & 1 deletion core/src/main/scala/spark/SparkContext.scala
@@ -53,7 +53,6 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.security.UserGroupInformation

import org.apache.mesos.MesosNativeLibrary

@@ -145,6 +144,11 @@ class SparkContext(
executorEnvs ++= environment
}

// Pass the serialized HDFS delegation token to executors through their environment for secure Hadoop access
SparkHadoopUtil.createSerializedToken() foreach { e =>
executorEnvs(SparkHadoopUtil.HDFS_TOKEN_KEY) = e
}

// Create and start the scheduler
private var taskScheduler: TaskScheduler = {
// Regular expression used for local[N] master format
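As a hedged illustration (not code from this PR): the worker exports every executorEnvs entry into the executor's process environment, so on the executor side the token comes back through a plain environment lookup, which is exactly what runAsUser performs:

// Executor-side sketch: recover the serialized token the driver exported.
Option(System.getenv(SparkHadoopUtil.HDFS_TOKEN_KEY)) match {
  case Some(encoded) => println("token received, " + encoded.length + " chars")  // decoded via Token#decodeFromUrlString
  case None => println("no token: insecure cluster, or the driver had none to pass")
}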
core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -91,6 +91,7 @@ private[spark] class ExecutorRunner(
case "{{EXECUTOR_ID}}" => execId.toString
case "{{HOSTNAME}}" => Utils.parseHostPort(hostPort)._1
case "{{CORES}}" => cores.toString
case "{{USER}}" => appDesc.user
case other => other
}

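The new {{USER}} case joins the placeholder substitution that ExecutorRunner applies to each command argument before launching the executor. A small self-contained sketch of that pattern, where appUser stands in for appDesc.user:

// Sketch of ExecutorRunner-style template substitution for command arguments.
def substitute(appUser: String)(arg: String): String = arg match {
  case "{{USER}}" => appUser
  case other => other  // the real method also rewrites EXECUTOR_ID, HOSTNAME and CORES
}

// Seq("{{CORES}}", "{{USER}}").map(substitute("alice")) yields Seq("{{CORES}}", "alice")
// in this reduced sketch, since only {{USER}} is handled here.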
22 changes: 12 additions & 10 deletions core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -33,7 +33,8 @@ private[spark] class StandaloneExecutorBackend(
driverUrl: String,
executorId: String,
hostPort: String,
cores: Int,
user: String)
extends Actor
with ExecutorBackend
with Logging {
@@ -81,20 +82,21 @@
}

private[spark] object StandaloneExecutorBackend {
def run(driverUrl: String, executorId: String, hostname: String, cores: Int, user: String) {
SparkHadoopUtil.runAsUser(run0, Tuple5[Any, Any, Any, Any, Any](driverUrl, executorId, hostname, cores, user), user)
}

// This will be run 'as' the user
def run0(args: Product) {
assert(5 == args.productArity)
runImpl(args.productElement(0).asInstanceOf[String],
args.productElement(1).asInstanceOf[String],
args.productElement(2).asInstanceOf[String],
args.productElement(3).asInstanceOf[Int],
args.productElement(4).asInstanceOf[String])
}

private def runImpl(driverUrl: String, executorId: String, hostname: String, cores: Int, user: String) {
// Debug code
Utils.checkHost(hostname)

@@ -105,17 +107,17 @@
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
val actor = actorSystem.actorOf(
Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores, user)),
name = "Executor")
actorSystem.awaitTermination()
}

def main(args: Array[String]) {
if (args.length < 5) {
// The reason we allow the last frameworkId (appid) argument is to make it easy to kill rogue executors
System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores> [<appid>]")
System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores> <user> [<appid>]")
System.exit(1)
}
run(args(0), args(1), args(2), args(3).toInt, args(4))
}
}
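An illustrative invocation under the new five-argument contract; the driver URL, host names, and the user alice are made-up example values:

// The worker now appends the submitting user after the core count.
StandaloneExecutorBackend.main(Array(
  "akka://spark@driver-host:7077/user/StandaloneScheduler",  // driverUrl
  "0",            // executorId
  "worker-host",  // hostname
  "4",            // cores
  "alice"         // user (new in this PR)
))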
core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -44,7 +44,7 @@ private[spark] class SparkDeploySchedulerBackend(
val driverUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{USER}}")
val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(
throw new IllegalArgumentException("must supply spark home for spark standalone"))
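Taken together (a summary, not text from the PR): the driver serializes its HDFS delegation token into executorEnvs and puts a literal {{USER}} placeholder in the executor command; the worker's ExecutorRunner rewrites that placeholder to the application's submitting user; and StandaloneExecutorBackend wraps all executor work in SparkHadoopUtil.runAsUser, so Hadoop sees the submitting user's identity and token rather than the worker daemon's.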