Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Use logDebug
Write new functions to return the replaceFunc
Use URI to parse the scheme and bucket

Signed-off-by: Gary Shen <gashen@nvidia.com>
  • Loading branch information
GaryShen2008 committed Jul 18, 2022
1 parent cc5d956 commit 43a06c9
Showing 1 changed file with 66 additions and 58 deletions.
124 changes: 66 additions & 58 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/AlluxioUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ object AlluxioUtils extends Logging {
// /local_path on /
for (line <- output) {
val items = line.trim.split(" +")
logInfo(line)
logDebug(line)
if (items.length >= 3) {
val uri = new URI(items(0))
// record all the mounted point which has scheme like s3 s3a or gs
Expand All @@ -101,13 +101,12 @@ object AlluxioUtils extends Logging {
}

private def getSchemeAndBucketFromPath(path: String) : (String, String) = {
val i = path.split("//")
val scheme = i(0)
if (i.length <= 1) {
val uri = new URI(path)
// the bucket is the host in URI
if (uri.getScheme == null || uri.getHost == null) {
throw new RuntimeException(s"path $path is not expected for Alluxio auto mount")
}
val bucket = i(1).split("/")(0)
(scheme, bucket)
(uri.getScheme, uri.getHost)
}

private def runAlluxioCmd(param : String) : (Int,
Expand All @@ -131,7 +130,7 @@ object AlluxioUtils extends Logging {
secret_key: Option[String]): Unit = {
// to match the output of alluxio fs mount, append / to remote_path
// and add / before bucket name for absolute path in Alluxio
val remote_path = scheme + "//" + bucket + "/"
val remote_path = scheme + "://" + bucket + "/"
val local_bucket = "/" + bucket
this.synchronized {
if (!mountedBuckets.contains(local_bucket)) {
Expand Down Expand Up @@ -181,6 +180,64 @@ object AlluxioUtils extends Logging {
}
}

def genFuncForPathReplacement(alluxioPathsReplace: Option[Seq[String]]) : Option[Path => Path] = {
// alluxioPathsReplace: Seq("key->value", "key1->value1")
// turn the rules to the Map with eg
// { s3://foo -> alluxio://0.1.2.3:19998/foo,
// gs://bar -> alluxio://0.1.2.3:19998/bar }
val replaceMapOption = alluxioPathsReplace.map(rules => {
rules.map(rule => {
val split = rule.split("->")
if (split.size == 2) {
split(0).trim -> split(1).trim
} else {
throw new IllegalArgumentException(s"Invalid setting for " +
s"${RapidsConf.ALLUXIO_PATHS_REPLACE.key}")
}
}).toMap
})
if (replaceMapOption.isDefined) {
Some((f: Path) => {
val pathStr = f.toString
val matchedSet = replaceMapOption.get.filter(a => pathStr.startsWith(a._1))
if (matchedSet.size > 1) {
// never reach here since replaceMap is a Map
throw new IllegalArgumentException(s"Found ${matchedSet.size} same replacing rules " +
s"from ${RapidsConf.ALLUXIO_PATHS_REPLACE.key} which requires only 1 rule " +
s"for each file path")
} else if (matchedSet.size == 1) {
new Path(pathStr.replaceFirst(matchedSet.head._1, matchedSet.head._2))
} else {
f
}
})
} else {
None
}
}

def genFuncForAutoMountReplacement(conf: RapidsConf, relation: HadoopFsRelation,
alluxioBucketRegex: String) : Option[Path => Path] = {
Some((f: Path) => {
val pathStr = f.toString
if (pathStr.matches(alluxioBucketRegex)) {
initAlluxioInfo(conf)
val (access_key, secret_key) = getKeyAndSecret(relation)

val (scheme, bucket) = getSchemeAndBucketFromPath(pathStr)
autoMountBucket(scheme, bucket, access_key, secret_key)

// replace s3://foo/.. to alluxio://alluxioMasterHost/foo/...
val newPath = new Path(pathStr.replaceFirst(
scheme + "/", "alluxio://" + alluxioMasterHost.get))
logInfo(s"Replace $pathStr to ${newPath.toString}")
newPath
} else {
f
}
})
}

def replacePathIfNeeded(
conf: RapidsConf,
relation: HadoopFsRelation,
Expand All @@ -192,58 +249,9 @@ object AlluxioUtils extends Logging {
val alluxioBucketRegex: String = conf.getAlluxioBucketRegex

val replaceFunc = if (alluxioPathsReplace.isDefined) {
// alluxioPathsReplace: Seq("key->value", "key1->value1")
// turn the rules to the Map with eg
// { s3://foo -> alluxio://0.1.2.3:19998/foo,
// gs://bar -> alluxio://0.1.2.3:19998/bar }
val replaceMapOption = alluxioPathsReplace.map(rules => {
rules.map(rule => {
val split = rule.split("->")
if (split.size == 2) {
split(0).trim -> split(1).trim
} else {
throw new IllegalArgumentException(s"Invalid setting for " +
s"${RapidsConf.ALLUXIO_PATHS_REPLACE.key}")
}
}).toMap
})
if (replaceMapOption.isDefined) {
Some((f: Path) => {
val pathStr = f.toString
val matchedSet = replaceMapOption.get.filter(a => pathStr.startsWith(a._1))
if (matchedSet.size > 1) {
// never reach here since replaceMap is a Map
throw new IllegalArgumentException(s"Found ${matchedSet.size} same replacing rules " +
s"from ${RapidsConf.ALLUXIO_PATHS_REPLACE.key} which requires only 1 rule " +
s"for each file path")
} else if (matchedSet.size == 1) {
new Path(pathStr.replaceFirst(matchedSet.head._1, matchedSet.head._2))
} else {
f
}
})
} else {
None
}
genFuncForPathReplacement(alluxioPathsReplace)
} else if (alluxioAutoMountEnabled) {
Some((f: Path) => {
val pathStr = f.toString
if (pathStr.matches(alluxioBucketRegex)) {
initAlluxioInfo(conf)
val (access_key, secret_key) = getKeyAndSecret(relation)

val (scheme, bucket) = getSchemeAndBucketFromPath(pathStr)
autoMountBucket(scheme, bucket, access_key, secret_key)

// replace s3://foo/.. to alluxio://alluxioMasterHost/foo/...
val newPath = new Path(pathStr.replaceFirst(
scheme + "/", "alluxio://" + alluxioMasterHost.get))
logInfo(s"Replace $pathStr to ${newPath.toString}")
newPath
} else {
f
}
})
genFuncForAutoMountReplacement(conf, relation, alluxioBucketRegex)
} else {
None
}
Expand Down

0 comments on commit 43a06c9

Please sign in to comment.