Skip to content

Commit

Permalink
KE-36175 [FOLLOWUP] fix read-write separation mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Yu Gan authored and yugan95 committed Aug 15, 2022
1 parent 752156c commit 854e3b9
Showing 1 changed file with 63 additions and 30 deletions.
93 changes: 63 additions & 30 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import scala.collection.mutable
import scala.collection.mutable.HashMap
import scala.util.control.NonFatal

import com.google.common.collect.Maps
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
Expand Down Expand Up @@ -146,9 +147,6 @@ private[spark] class SparkHadoopUtil extends Logging {
return
}
val cacheKeys = getCacheKeys(fsCache)
if (cacheKeys.isEmpty) {
return
}
val current = UserGroupInformation.getCurrentUser
cacheKeys.map(getUGI) //
.filterNot(_ == null) //
Expand All @@ -162,10 +160,10 @@ private[spark] class SparkHadoopUtil extends Logging {
.filterNot(_.getKind == null) //
.filterNot(_.getService == null)
// For potential private tokens.
updatePrivateTokens(ugi, tokens)
updateTokens(ugi, tokens)
} catch {
case e: Exception =>
logWarning("Failed to update private tokens, hadoop version not supported.", e)
logWarning("Failed to update tokens, hadoop version not supported.", e)
}

// For tgt & public credentials.
Expand Down Expand Up @@ -194,31 +192,49 @@ private[spark] class SparkHadoopUtil extends Logging {
ugiField.get(key).asInstanceOf[UserGroupInformation]
}

private def updatePrivateTokens(ugi: UserGroupInformation, //
tokens: Iterable[Token[_ <: TokenIdentifier]]): Unit = {
private def updateTokens(ugi: UserGroupInformation, //
tokens: Iterable[Token[_ <: TokenIdentifier]]): Unit = {
val creds = getCredentialsInternal(ugi)
if (creds == null) {
return
}
val keys = getTokenKeys(creds)
if (keys.isEmpty) {
return
}
val oldInternalTokens = getTokenMapInternal(creds)
// For potential private tokens.
tokens.foreach { token =>
val kind = token.getKind
val service = token.getService
keys.foreach { key =>
val otk = creds.getToken(key)
if (otk == null || otk.getKind == null || otk.getService == null //
|| !otk.getKind.equals(kind) || otk.getService.equals(service)) {
return
}
val tk = new Token(token)
tk.setService(otk.getService)
ugi.addToken(key, tk)
}
}
tokens.foreach(token =>
getSentinelToken(token, oldInternalTokens)
.foreach(sentinelToken => //
updateTokensInternal(token, ugi, sentinelToken, oldInternalTokens)))
}

private def updateTokensInternal(token: Token[_ <: TokenIdentifier], //
ugi: UserGroupInformation, //
sentinelToken: Token[_ <: TokenIdentifier], //
oldInternalTokens: //
Map[Text, Token[_ <: TokenIdentifier]]): Unit = {
getDelegationTokenIdentifier(token).map(_.getSequenceNumber)
.foreach(dtSeq => getDelegationTokenIdentifier(sentinelToken).map(_.getSequenceNumber)
.foreach(sentinelSeq => //
oldInternalTokens.filter(e => token.getKind.equals(e._2.getKind)) //
.filterNot(e => token.getService.equals(e._2.getService)) //
.foreach { e => //
val key = e._1
val otk = e._2
getDelegationTokenIdentifier(otk).map(_.getSequenceNumber) //
.foreach { odtSeq =>
if (odtSeq == sentinelSeq && odtSeq < dtSeq) {
val tk = new Token(token)
tk.setService(otk.getService)
ugi.addToken(key, tk)
}
}
}))
}

private def getSentinelToken(token: Token[_ <: TokenIdentifier], //
internalTokenMap: Map[Text, Token[_ <: TokenIdentifier]]): //
Option[Token[_ <: TokenIdentifier]] = {
internalTokenMap.values //
.find(otk => token.getKind.equals(otk.getKind) && token.getService.equals(otk.getService))
}

private def getCredentialsInternal(ugi: UserGroupInformation): Credentials = {
Expand All @@ -227,14 +243,31 @@ private[spark] class SparkHadoopUtil extends Logging {
credsMethod.invoke(ugi).asInstanceOf[Credentials]
}

private def getTokenKeys(creds: Credentials): Iterable[Text] = {
private def getTokenMapInternal(creds: Credentials): Map[Text, Token[_ <: TokenIdentifier]] = {
val mapFiled = classOf[Credentials].getDeclaredField("tokenMap")
mapFiled.setAccessible(true)
val tokenMap = mapFiled.get(creds).asInstanceOf[java.util.Map[Text, Object]]
if (tokenMap == null) {
return Iterable.empty
val internalTokenMap = mapFiled.get(creds) //
.asInstanceOf[java.util.Map[Text, Token[_ <: TokenIdentifier]]]
if (internalTokenMap == null) {
return Map.empty
}
val tokenMap = Maps.newHashMap[Text, Token[_ <: TokenIdentifier]](internalTokenMap)
tokenMap.asScala.toMap
}

private def getDelegationTokenIdentifier(token: Token[_ <: TokenIdentifier]): //
Option[AbstractDelegationTokenIdentifier] = {
try {
val ti = token.decodeIdentifier()
ti match {
case dt: AbstractDelegationTokenIdentifier => Some(dt)
case _ => None
}
} catch {
case e: IOException =>
logDebug(s"Failed to decode $token", e)
None
}
Collections.unmodifiableCollection(tokenMap.keySet()).asScala
}

def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
Expand Down

0 comments on commit 854e3b9

Please sign in to comment.