KE-36175 [FOLLOWUP] fix read-write separation mode when application restarts

Yu Gan authored and yugan95 committed Aug 18, 2022
1 parent 854e3b9 commit 0b78093
Showing 1 changed file with 57 additions and 37 deletions.
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
-import java.util.{Arrays, Collections, Date, Locale}
+import java.util.{Arrays, Collections, Date, Locale, Objects}
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable.Map
@@ -138,27 +138,26 @@ private[spark] class SparkHadoopUtil extends Logging {
   }
 
   def addCurrentUserCredentials(creds: Credentials): Unit = {
-    updateUGICreds(UserGroupInformation.getCurrentUser, creds)
+    updateCredentials(UserGroupInformation.getCurrentUser, creds)
   }
 
-  private def addNonCurrentCredentials(creds: Credentials): Unit = {
+  private def backportCredentials(creds: Credentials): Unit = {
     val fsCache = getFileSystemCache
     if (fsCache == null) {
       return
     }
     val cacheKeys = getCacheKeys(fsCache)
     val current = UserGroupInformation.getCurrentUser
     cacheKeys.map(getUGI) //
-      .filterNot(_ == null) //
+      .filter(Objects.nonNull(_))
       .filterNot(_.equals(current)) //
-      .foreach(updateUGICreds(_, creds))
+      .foreach(updateCredentials(_, creds))
   }
 
-  private def updateUGICreds(ugi: UserGroupInformation, creds: Credentials): Unit = {
+  private def updateCredentials(ugi: UserGroupInformation, creds: Credentials): Unit = {
     try {
       val tokens = creds.getAllTokens.asScala //
-        .filterNot(_.getKind == null) //
-        .filterNot(_.getService == null)
+        .filter(tk => Objects.nonNull(tk.getKind) && Objects.nonNull(tk.getService))
       // For potential private tokens.
       updateTokens(ugi, tokens)
     } catch {
@@ -200,41 +199,62 @@
     }
     val oldInternalTokens = getTokenMapInternal(creds)
     // For potential private tokens.
-    tokens.foreach(token =>
-      getSentinelToken(token, oldInternalTokens)
-        .foreach(sentinelToken => //
-          updateTokensInternal(token, ugi, sentinelToken, oldInternalTokens)))
+    tokens.foreach(token => updateTokensInternal(token, ugi, oldInternalTokens))
   }
 
   private def updateTokensInternal(token: Token[_ <: TokenIdentifier], //
       ugi: UserGroupInformation, //
-      sentinelToken: Token[_ <: TokenIdentifier], //
       oldInternalTokens: //
       Map[Text, Token[_ <: TokenIdentifier]]): Unit = {
     getDelegationTokenIdentifier(token).map(_.getSequenceNumber)
-      .foreach(dtSeq => getDelegationTokenIdentifier(sentinelToken).map(_.getSequenceNumber)
-        .foreach(sentinelSeq => //
-          oldInternalTokens.filter(e => token.getKind.equals(e._2.getKind)) //
-            .filterNot(e => token.getService.equals(e._2.getService)) //
-            .foreach { e => //
-              val key = e._1
-              val otk = e._2
-              getDelegationTokenIdentifier(otk).map(_.getSequenceNumber) //
-                .foreach { odtSeq =>
-                  if (odtSeq == sentinelSeq && odtSeq < dtSeq) {
-                    val tk = new Token(token)
-                    tk.setService(otk.getService)
-                    ugi.addToken(key, tk)
-                  }
-                }
-            }))
-  }
-
-  private def getSentinelToken(token: Token[_ <: TokenIdentifier], //
-      internalTokenMap: Map[Text, Token[_ <: TokenIdentifier]]): //
-      Option[Token[_ <: TokenIdentifier]] = {
-    internalTokenMap.values //
-      .find(otk => token.getKind.equals(otk.getKind) && token.getService.equals(otk.getService))
+      .foreach(dtSeq => //
+        oldInternalTokens.foreach { case (key, otk) => //
+          // Skip cached tokens whose kind differs from the fresh token;
+          // a bare `return` in this closure would exit the whole method.
+          if (Objects.equals(token.getKind, otk.getKind)) {
+            getDelegationTokenIdentifier(otk).map(_.getSequenceNumber)
+              .foreach { odtSeq => //
+                if (odtSeq < dtSeq && isPrivateCloneOf(otk, token.getService)) {
+                  privateClone(token, otk.getService).foreach(tk => ugi.addToken(key, tk))
+                }
+              }
+          }
+        })
   }
 
+  private def isPrivateCloneOf(token: Token[_ <: TokenIdentifier], service: Text): Boolean = {
+    try {
+      // The parameter type is required for the reflective lookup to succeed.
+      val privateMethod = token.getClass.getDeclaredMethod("isPrivateCloneOf", classOf[Text])
+      privateMethod.setAccessible(true)
+      val tkObj = privateMethod.invoke(token, service)
+      if (Objects.isNull(tkObj)) {
+        return false
+      }
+      tkObj.asInstanceOf[Boolean]
+    } catch {
+      case e: NoSuchMethodException =>
+        logDebug("Method 'isPrivateCloneOf' not found; " +
+          "this Hadoop version does not support it (available since 2.8.2).", e)
+        false
+    }
+  }
+
+  private def privateClone(token: Token[_ <: TokenIdentifier], //
+      service: Text): Option[Token[_ <: TokenIdentifier]] = {
+    try {
+      val cloneMethod = token.getClass.getDeclaredMethod("privateClone", classOf[Text])
+      cloneMethod.setAccessible(true)
+      val tkObj = cloneMethod.invoke(token, service)
+      if (Objects.isNull(tkObj)) {
+        return None
+      }
+      Some(tkObj.asInstanceOf[Token[_ <: TokenIdentifier]])
+    } catch {
+      case e: NoSuchMethodException =>
+        logDebug("Method 'privateClone' not found; " +
+          "this Hadoop version does not support it (available since 2.8.2).", e)
+        None
+    }
+  }
 
   private def getCredentialsInternal(ugi: UserGroupInformation): Credentials = {
@@ -297,7 +317,7 @@
 
     try {
       // Potential FileSystem$CACHE non current UGIs.
-      addNonCurrentCredentials(creds)
+      backportCredentials(creds)
     } catch {
      case e: Exception =>
        logWarning(s"Failed to update non current user credentials.", e)
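Note on the reflective helpers added above: Token#privateClone(Text) and Token#isPrivateCloneOf(Text) are not public Hadoop API, and per the diff's log messages they only exist since Hadoop 2.8.2, hence the getDeclaredMethod/setAccessible dance. A minimal self-contained sketch of the same technique follows; the object PrivateCloneSketch and the helper clonePrivate are illustrative names, not part of the commit.

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

object PrivateCloneSketch {
  // Re-issue an existing delegation token under a new service name by
  // reflectively calling Token#privateClone(Text), assumed to be non-public
  // and present only in Hadoop 2.8.2+ (hence reflection + setAccessible).
  def clonePrivate(token: Token[_ <: TokenIdentifier],
      newService: Text): Option[Token[_ <: TokenIdentifier]] = {
    try {
      // The parameter type must accompany the method name; a lookup by
      // name alone throws NoSuchMethodException on every Hadoop version.
      val m = classOf[Token[TokenIdentifier]]
        .getDeclaredMethod("privateClone", classOf[Text])
      m.setAccessible(true)
      Option(m.invoke(token, newService))
        .map(_.asInstanceOf[Token[_ <: TokenIdentifier]])
    } catch {
      case _: NoSuchMethodException => None // Hadoop < 2.8.2
    }
  }
}

The sketch resolves the method on classOf[Token[...]] rather than token.getClass as the diff does, which also covers Token subclasses, since getDeclaredMethod does not search superclasses; on Hadoop releases before 2.8.2 the NoSuchMethodException branch simply disables the private-clone backport.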