From 43a06c974fc378fdaa8aca1046a810aa9f44cfcf Mon Sep 17 00:00:00 2001 From: Gary Shen Date: Mon, 18 Jul 2022 11:41:02 +0800 Subject: [PATCH] Address comments Use logDebug Write new functions to return the replaceFunc Use URI to parse the scheme and bucket Signed-off-by: Gary Shen --- .../nvidia/spark/rapids/AlluxioUtils.scala | 124 ++++++++++-------- 1 file changed, 66 insertions(+), 58 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AlluxioUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AlluxioUtils.scala index 413289db7743..61074105bdcd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AlluxioUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AlluxioUtils.scala @@ -81,7 +81,7 @@ object AlluxioUtils extends Logging { // /local_path on / for (line <- output) { val items = line.trim.split(" +") - logInfo(line) + logDebug(line) if (items.length >= 3) { val uri = new URI(items(0)) // record all the mounted point which has scheme like s3 s3a or gs @@ -101,13 +101,12 @@ object AlluxioUtils extends Logging { } private def getSchemeAndBucketFromPath(path: String) : (String, String) = { - val i = path.split("//") - val scheme = i(0) - if (i.length <= 1) { + val uri = new URI(path) + // the bucket is the host in URI + if (uri.getScheme == null || uri.getHost == null) { throw new RuntimeException(s"path $path is not expected for Alluxio auto mount") } - val bucket = i(1).split("/")(0) - (scheme, bucket) + (uri.getScheme, uri.getHost) } private def runAlluxioCmd(param : String) : (Int, @@ -131,7 +130,7 @@ object AlluxioUtils extends Logging { secret_key: Option[String]): Unit = { // to match the output of alluxio fs mount, append / to remote_path // and add / before bucket name for absolute path in Alluxio - val remote_path = scheme + "//" + bucket + "/" + val remote_path = scheme + "://" + bucket + "/" val local_bucket = "/" + bucket this.synchronized { if (!mountedBuckets.contains(local_bucket)) { @@ -181,6 +180,64 @@ object AlluxioUtils extends Logging { } } + def genFuncForPathReplacement(alluxioPathsReplace: Option[Seq[String]]) : Option[Path => Path] = { + // alluxioPathsReplace: Seq("key->value", "key1->value1") + // turn the rules to the Map with eg + // { s3://foo -> alluxio://0.1.2.3:19998/foo, + // gs://bar -> alluxio://0.1.2.3:19998/bar } + val replaceMapOption = alluxioPathsReplace.map(rules => { + rules.map(rule => { + val split = rule.split("->") + if (split.size == 2) { + split(0).trim -> split(1).trim + } else { + throw new IllegalArgumentException(s"Invalid setting for " + + s"${RapidsConf.ALLUXIO_PATHS_REPLACE.key}") + } + }).toMap + }) + if (replaceMapOption.isDefined) { + Some((f: Path) => { + val pathStr = f.toString + val matchedSet = replaceMapOption.get.filter(a => pathStr.startsWith(a._1)) + if (matchedSet.size > 1) { + // never reach here since replaceMap is a Map + throw new IllegalArgumentException(s"Found ${matchedSet.size} same replacing rules " + + s"from ${RapidsConf.ALLUXIO_PATHS_REPLACE.key} which requires only 1 rule " + + s"for each file path") + } else if (matchedSet.size == 1) { + new Path(pathStr.replaceFirst(matchedSet.head._1, matchedSet.head._2)) + } else { + f + } + }) + } else { + None + } + } + + def genFuncForAutoMountReplacement(conf: RapidsConf, relation: HadoopFsRelation, + alluxioBucketRegex: String) : Option[Path => Path] = { + Some((f: Path) => { + val pathStr = f.toString + if (pathStr.matches(alluxioBucketRegex)) { + initAlluxioInfo(conf) + val (access_key, secret_key) = getKeyAndSecret(relation) + + val (scheme, bucket) = getSchemeAndBucketFromPath(pathStr) + autoMountBucket(scheme, bucket, access_key, secret_key) + + // replace s3://foo/.. to alluxio://alluxioMasterHost/foo/... + val newPath = new Path(pathStr.replaceFirst( + scheme + "/", "alluxio://" + alluxioMasterHost.get)) + logInfo(s"Replace $pathStr to ${newPath.toString}") + newPath + } else { + f + } + }) + } + def replacePathIfNeeded( conf: RapidsConf, relation: HadoopFsRelation, @@ -192,58 +249,9 @@ object AlluxioUtils extends Logging { val alluxioBucketRegex: String = conf.getAlluxioBucketRegex val replaceFunc = if (alluxioPathsReplace.isDefined) { - // alluxioPathsReplace: Seq("key->value", "key1->value1") - // turn the rules to the Map with eg - // { s3://foo -> alluxio://0.1.2.3:19998/foo, - // gs://bar -> alluxio://0.1.2.3:19998/bar } - val replaceMapOption = alluxioPathsReplace.map(rules => { - rules.map(rule => { - val split = rule.split("->") - if (split.size == 2) { - split(0).trim -> split(1).trim - } else { - throw new IllegalArgumentException(s"Invalid setting for " + - s"${RapidsConf.ALLUXIO_PATHS_REPLACE.key}") - } - }).toMap - }) - if (replaceMapOption.isDefined) { - Some((f: Path) => { - val pathStr = f.toString - val matchedSet = replaceMapOption.get.filter(a => pathStr.startsWith(a._1)) - if (matchedSet.size > 1) { - // never reach here since replaceMap is a Map - throw new IllegalArgumentException(s"Found ${matchedSet.size} same replacing rules " + - s"from ${RapidsConf.ALLUXIO_PATHS_REPLACE.key} which requires only 1 rule " + - s"for each file path") - } else if (matchedSet.size == 1) { - new Path(pathStr.replaceFirst(matchedSet.head._1, matchedSet.head._2)) - } else { - f - } - }) - } else { - None - } + genFuncForPathReplacement(alluxioPathsReplace) } else if (alluxioAutoMountEnabled) { - Some((f: Path) => { - val pathStr = f.toString - if (pathStr.matches(alluxioBucketRegex)) { - initAlluxioInfo(conf) - val (access_key, secret_key) = getKeyAndSecret(relation) - - val (scheme, bucket) = getSchemeAndBucketFromPath(pathStr) - autoMountBucket(scheme, bucket, access_key, secret_key) - - // replace s3://foo/.. to alluxio://alluxioMasterHost/foo/... - val newPath = new Path(pathStr.replaceFirst( - scheme + "/", "alluxio://" + alluxioMasterHost.get)) - logInfo(s"Replace $pathStr to ${newPath.toString}") - newPath - } else { - f - } - }) + genFuncForAutoMountReplacement(conf, relation, alluxioBucketRegex) } else { None }