Skip to content

Commit

Permalink
[SPARK-15487][WEB UI] Spark Master UI to reverse proxy Application an…
Browse files Browse the repository at this point in the history
…d Workers UI

## What changes were proposed in this pull request?

This pull request adds the functionality to enable accessing worker and application UI through master UI itself. Thus helps in accessing SparkUI when running spark cluster in closed networks e.g. Kubernetes. Cluster admin needs to expose only spark master UI and rest of the UIs can be in the private network, master UI will reverse proxy the connection request to corresponding resource. It adds the path for workers/application UIs as

WorkerUI: <http/https>://master-publicIP:<port>/target/workerID/
ApplicationUI: <http/https>://master-publicIP:<port>/target/appID/

This makes it easy for users to easily protect the Spark master cluster access by putting some reverse proxy e.g. https://github.com/bitly/oauth2_proxy

## How was this patch tested?

The functionality has been tested manually and there is a unit test too for testing access to worker UI with reverse proxy address.

pwendell bomeng BryanCutler can you please review it, thanks.

Author: Gurvinder Singh <gurvinder.singh@uninett.no>

Closes apache#13950 from gurvindersingh/rproxy.
  • Loading branch information
Gurvinder Singh authored and zsxwing committed Sep 9, 2016
1 parent 722afbb commit 92ce8d4
Show file tree
Hide file tree
Showing 16 changed files with 287 additions and 11 deletions.
12 changes: 11 additions & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@
<artifactId>jetty-servlet</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-proxy</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
Expand Down Expand Up @@ -388,7 +398,7 @@
<overWriteIfNewer>true</overWriteIfNewer>
<useSubDirectoryPerType>true</useSubDirectoryPerType>
<includeArtifactIds>
guava,jetty-io,jetty-servlet,jetty-servlets,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security
guava,jetty-io,jetty-servlet,jetty-servlets,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security,jetty-proxy,jetty-client
</includeArtifactIds>
<silent>true</silent>
</configuration>
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
}
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)

Expand Down
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ private[deploy] class Master(

// Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
if (defaultCores < 1) {
throw new SparkException("spark.deploy.defaultCores must be positive")
}
Expand All @@ -129,6 +130,11 @@ private[deploy] class Master(
webUi = new MasterWebUI(this, webUiPort)
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
if (reverseProxy) {
masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
s"Applications UIs are available at $masterWebUiUrl")
}
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
Expand Down Expand Up @@ -755,6 +761,9 @@ private[deploy] class Master(
workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
if (reverseProxy) {
webUi.addProxyTargets(worker.id, worker.webUiAddress)
}
true
}

Expand All @@ -763,6 +772,9 @@ private[deploy] class Master(
worker.setState(WorkerState.DEAD)
idToWorker -= worker.id
addressToWorker -= worker.endpoint.address
if (reverseProxy) {
webUi.removeProxyTargets(worker.id)
}
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
Expand Down Expand Up @@ -810,6 +822,9 @@ private[deploy] class Master(
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
if (reverseProxy) {
webUi.addProxyTargets(app.id, app.desc.appUiUrl)
}
}

private def finishApplication(app: ApplicationInfo) {
Expand All @@ -823,6 +838,9 @@ private[deploy] class Master(
idToApp -= app.id
endpointToApp -= app.driver
addressToApp -= app.driver.address
if (reverseProxy) {
webUi.removeProxyTargets(app.id)
}
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach { a =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
<li><strong>State:</strong> {app.state}</li>
{
if (!app.isFinished) {
<li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
<li><strong>
<a href={UIUtils.makeHref(parent.master.reverseProxy,
app.id, app.desc.appUiUrl)}>Application Detail UI</a>
</strong></li>
}
}
</ul>
Expand All @@ -100,19 +103,21 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
}

private def executorRow(executor: ExecutorDesc): Seq[Node] = {
val workerUrlRef = UIUtils.makeHref(parent.master.reverseProxy,
executor.worker.id, executor.worker.webUiAddress)
<tr>
<td>{executor.id}</td>
<td>
<a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
<a href={workerUrlRef}>{executor.worker.id}</a>
</td>
<td>{executor.cores}</td>
<td>{executor.memory}</td>
<td>{executor.state}</td>
<td>
<a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
.format(workerUrlRef, executor.application.id, executor.id)}>stdout</a>
<a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
.format(workerUrlRef, executor.application.id, executor.id)}>stderr</a>
</td>
</tr>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
private def workerRow(worker: WorkerInfo): Seq[Node] = {
<tr>
<td>
<a href={worker.webUiAddress}>{worker.id}</a>
<a href={UIUtils.makeHref(parent.master.reverseProxy,
worker.id, worker.webUiAddress)}>{worker.id}</a>
</td>
<td>{worker.host}:{worker.port}</td>
<td>{worker.state}</td>
Expand Down Expand Up @@ -210,7 +211,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
if (app.isFinished) {
app.desc.name
} else {
<a href={app.desc.appUiUrl}>{app.desc.name}</a>
<a href={UIUtils.makeHref(parent.master.reverseProxy,
app.id, app.desc.appUiUrl)}>{app.desc.name}</a>
}
}
</td>
Expand Down Expand Up @@ -244,7 +246,11 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<tr>
<td>{driver.id} {killLink}</td>
<td>{driver.submitDate}</td>
<td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}
<td>{driver.worker.map(w =>
<a href=
{UIUtils.makeHref(parent.master.reverseProxy, w.id, w.webUiAddress)}>
{w.id.toString}</a>
).getOrElse("None")}
</td>
<td>{driver.state}</td>
<td sorttable_customkey={driver.desc.cores.toString}>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.spark.deploy.master.ui

import scala.collection.mutable.HashMap

import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{SparkUI, WebUI}
Expand All @@ -34,6 +38,7 @@ class MasterWebUI(

val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
private val proxyHandlers = new HashMap[String, ServletContextHandler]

initialize()

Expand All @@ -48,6 +53,17 @@ class MasterWebUI(
attachHandler(createRedirectHandler(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
}

def addProxyTargets(id: String, target: String): Unit = {
var endTarget = target.stripSuffix("/")
val handler = createProxyHandler("/proxy/" + id, endTarget)
attachHandler(handler)
proxyHandlers(id) = handler
}

def removeProxyTargets(id: String): Unit = {
proxyHandlers.remove(id).foreach(detachHandler)
}
}

private[master] object MasterWebUI {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ private[deploy] class ExecutorRunner(

// Add webUI log urls
val baseUrl =
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
} else {
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
}
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ private[deploy] class Worker(
activeMasterWebUiUrl = uiUrl
master = Some(masterRef)
connected = true
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
logInfo(s"WorkerWebUI is available at $activeMasterWebUiUrl/proxy/$workerId")
}
// Cancel any outstanding re-registration attempts because we found a new master
cancelLastRegistrationRetry()
}
Expand Down
85 changes: 85 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.xml.Node

import org.eclipse.jetty.client.api.Response
import org.eclipse.jetty.proxy.ProxyServlet
import org.eclipse.jetty.server.{Request, Server, ServerConnector}
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.servlet._
Expand Down Expand Up @@ -186,6 +188,47 @@ private[spark] object JettyUtils extends Logging {
contextHandler
}

/** Create a handler for proxying request to Workers and Application Drivers */
def createProxyHandler(
prefix: String,
target: String): ServletContextHandler = {
val servlet = new ProxyServlet {
override def rewriteTarget(request: HttpServletRequest): String = {
val rewrittenURI = createProxyURI(
prefix, target, request.getRequestURI(), request.getQueryString())
if (rewrittenURI == null) {
return null
}
if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) {
return null
}
rewrittenURI.toString()
}

override def filterServerResponseHeader(
clientRequest: HttpServletRequest,
serverResponse: Response,
headerName: String,
headerValue: String): String = {
if (headerName.equalsIgnoreCase("location")) {
val newHeader = createProxyLocationHeader(
prefix, headerValue, clientRequest, serverResponse.getRequest().getURI())
if (newHeader != null) {
return newHeader
}
}
super.filterServerResponseHeader(
clientRequest, serverResponse, headerName, headerValue)
}
}

val contextHandler = new ServletContextHandler
val holder = new ServletHolder(servlet)
contextHandler.setContextPath(prefix)
contextHandler.addServlet(holder, "/")
contextHandler
}

/** Add filters, if any, to the given list of ServletContextHandlers */
def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
Expand Down Expand Up @@ -332,6 +375,48 @@ private[spark] object JettyUtils extends Logging {
redirectHandler
}

def createProxyURI(prefix: String, target: String, path: String, query: String): URI = {
if (!path.startsWith(prefix)) {
return null
}

val uri = new StringBuilder(target)
val rest = path.substring(prefix.length())

if (!rest.isEmpty()) {
if (!rest.startsWith("/")) {
uri.append("/")
}
uri.append(rest)
}

val rewrittenURI = URI.create(uri.toString())
if (query != null) {
return new URI(
rewrittenURI.getScheme(),
rewrittenURI.getAuthority(),
rewrittenURI.getPath(),
query,
rewrittenURI.getFragment()
).normalize()
}
rewrittenURI.normalize()
}

def createProxyLocationHeader(
prefix: String,
headerValue: String,
clientRequest: HttpServletRequest,
targetUri: URI): String = {
val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority()
if (headerValue.startsWith(toReplace)) {
clientRequest.getScheme() + "://" + clientRequest.getHeader("host") +
prefix + headerValue.substring(toReplace.length())
} else {
null
}
}

// Create a new URI from the arguments, handling IPv6 host encoding and default ports.
private def createRedirectURI(
scheme: String, server: String, port: Int, path: String, query: String) = {
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -510,4 +510,16 @@ private[spark] object UIUtils extends Logging {

def getTimeZoneOffset() : Int =
TimeZone.getDefault().getOffset(System.currentTimeMillis()) / 1000 / 60

/**
* Return the correct Href after checking if master is running in the
* reverse proxy mode or not.
*/
def makeHref(proxy: Boolean, id: String, origHref: String): String = {
if (proxy) {
s"/proxy/$id"
} else {
origHref
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,33 @@ class MasterSuite extends SparkFunSuite
}
}

test("master/worker web ui available with reverseProxy") {
implicit val formats = org.json4s.DefaultFormats
val reverseProxyUrl = "http://localhost:8080"
val conf = new SparkConf()
conf.set("spark.ui.reverseProxy", "true")
conf.set("spark.ui.reverseProxyUrl", reverseProxyUrl)
val localCluster = new LocalSparkCluster(2, 2, 512, conf)
localCluster.start()
try {
eventually(timeout(5 seconds), interval(100 milliseconds)) {
val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
.getLines().mkString("\n")
val JArray(workers) = (parse(json) \ "workers")
workers.size should be (2)
workers.foreach { workerSummaryJson =>
val JString(workerId) = workerSummaryJson \ "id"
val url = s"http://localhost:${localCluster.masterWebUIPort}/proxy/${workerId}/json"
val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n"))
(workerResponse \ "cores").extract[Int] should be (2)
(workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl)
}
}
} finally {
localCluster.stop()
}
}

test("basic scheduling - spread out") {
basicScheduling(spreadOut = true)
}
Expand Down
Loading

0 comments on commit 92ce8d4

Please sign in to comment.