Add Alluxio auto mount feature
Mount the cloud bucket to Alluxio when the driver converts a FileSourceScanExec to a GPU plan.
The Alluxio master should run on the same node as the Spark driver when using this feature.
Introduce new configs (see the usage sketch below):
    spark.rapids.alluxio.automount.enabled
    spark.rapids.alluxio.bucket.regex
    spark.rapids.alluxio.mount.cmd
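
A minimal usage sketch (the regex shown is just the default, and the mount
command/user are illustrative, not required by the plugin):

    scala> spark.conf.set("spark.rapids.alluxio.automount.enabled", true)
    scala> spark.conf.set("spark.rapids.alluxio.bucket.regex", "^s3a{0,1}://.*")
    scala> spark.conf.set("spark.rapids.alluxio.mount.cmd", "su,ubuntu,-c,/opt/alluxio-2.8.0/bin/alluxio fs mount --readonly")

With these set, scanning "s3://bucket-foo/a.csv" auto-mounts "s3://bucket-foo"
at "/bucket-foo" and reads "alluxio://<master-host>:19998/bucket-foo/a.csv" instead.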

Signed-off-by: Gary Shen <gashen@nvidia.com>
GaryShen2008 committed Jun 28, 2022
1 parent 30467f2 commit eeebe92
Showing 3 changed files with 203 additions and 54 deletions.
5 changes: 4 additions & 1 deletion docs/configs.md
@@ -29,7 +29,10 @@ scala> spark.conf.set("spark.rapids.sql.incompatibleOps.enabled", true)

Name | Description | Default Value
-----|-------------|--------------
<a name="alluxio.pathsToReplace"></a>spark.rapids.alluxio.pathsToReplace|List of paths to be replaced with corresponding alluxio scheme. Eg, when configureis set to "s3:/foo->alluxio://0.1.2.3:19998/foo,gcs:/bar->alluxio://0.1.2.3:19998/bar", which means: s3:/foo/a.csv will be replaced to alluxio://0.1.2.3:19998/foo/a.csv and gcs:/bar/b.csv will be replaced to alluxio://0.1.2.3:19998/bar/b.csv|None
<a name="alluxio.automount.enabled"></a>spark.rapids.alluxio.automount.enabled|Enable the feature of auto mounting the cloud storage to Alluxio. It requires the Alluxio master is the same node of Spark driver node. When it's true, it requires an environment variable ALLUXIO_HOME be set properly. The Alluxio master's host and port will be read from alluxio.master.hostname and alluxio.master.rpc.port(default: 19998) from ALLUXIO_HOME/conf/alluxio-site.properties, then replace a cloud path which matches spark.rapids.alluxio.bucket.regex like "s3://bar/b.csv" to "alluxio://0.1.2.3:19998/bar/b.csv", and the bucket "s3://bar" will be mounted to "/bar" in Alluxio automatically.|false
<a name="alluxio.bucket.regex"></a>spark.rapids.alluxio.bucket.regex|A regex to decide which bucket should be auto-mounted to Alluxio. E.g. when setting as "^s3://bucket.*", the bucket which starts with "s3://bucket" will be mounted to Alluxio and the path "s3://bucket-foo/a.csv" will be replaced to "alluxio://0.1.2.3:19998/bucket-foo/a.csv". It's only valid when setting spark.rapids.alluxio.automount.enabled=true. The default value matches all the buckets in "s3://" or "s3a://" scheme.|^s3a{0,1}://.*
<a name="alluxio.mount.cmd"></a>spark.rapids.alluxio.mount.cmd|Provide the command to mount a cloud path to Alluxio. E.g. "su,ubuntu,-c,/opt/alluxio-2.8.0/bin/alluxio fs mount --readonly", it means: run Process(Seq("su", "ubuntu", "-c", "/opt/alluxio-2.8.0/bin/alluxio fs mount --readonly /bucket-foo s3://bucket-foo")), the delimiter "," is used to convert to Seq[String] when you need to use a special user to run the mount command.|None
<a name="alluxio.pathsToReplace"></a>spark.rapids.alluxio.pathsToReplace|List of paths to be replaced with corresponding alluxio scheme. E.g. when configure is set to "s3://foo->alluxio://0.1.2.3:19998/foo,gs://bar->alluxio://0.1.2.3:19998/bar", it means: "s3://foo/a.csv" will be replaced to "alluxio://0.1.2.3:19998/foo/a.csv" and "gs://bar/b.csv" will be replaced to "alluxio://0.1.2.3:19998/bar/b.csv". To use this config, you have to mount the buckets to Alluxio by yourself. If you set this config, spark.rapids.alluxio.automount.enabled won't be valid.|None
<a name="cloudSchemes"></a>spark.rapids.cloudSchemes|Comma separated list of additional URI schemes that are to be considered cloud based filesystems. Schemes already included: abfs, abfss, dbfs, gs, s3, s3a, s3n, wasbs. Cloud based stores generally would be total separate from the executors and likely have a higher I/O read cost. Many times the cloud filesystems also get better throughput when you have multiple readers in parallel. This is used with spark.rapids.sql.format.parquet.reader.type|None
<a name="gpu.resourceName"></a>spark.rapids.gpu.resourceName|The name of the Spark resource that represents a GPU that you want the plugin to use if using custom resources with Spark.|gpu
<a name="memory.gpu.allocFraction"></a>spark.rapids.memory.gpu.allocFraction|The fraction of available (free) GPU memory that should be allocated for pooled memory. This must be less than or equal to the maximum limit configured via spark.rapids.memory.gpu.maxAllocFraction, and greater than or equal to the minimum limit configured via spark.rapids.memory.gpu.minAllocFraction.|1.0
199 changes: 151 additions & 48 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/AlluxioUtils.scala
@@ -16,92 +16,195 @@

package com.nvidia.spark.rapids

import scala.io.Source
import scala.sys.process.Process

import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex}
import org.apache.spark.sql.execution.datasources.rapids.GpuPartitioningUtils

object AlluxioUtils {
object AlluxioUtils extends Logging {
val mountedBuckets: scala.collection.mutable.Map[String, String] = scala.collection.mutable.Map()
var alluxioMountCmd: Option[Seq[String]] = None
var alluxioMasterHost: Option[String] = None

// Read alluxio.master.hostname and alluxio.master.rpc.port
// from Alluxio's conf file alluxio-site.properties.
// It requires the environment variable "ALLUXIO_HOME" to be set.
// This function only reads ALLUXIO_HOME/conf once; the result is cached.
private def getAlluxioMasterHost() : Unit = {
if (alluxioMasterHost.isEmpty) {
var host = ""
var port = "19998"
val alluxio_home = scala.util.Properties.envOrNone("ALLUXIO_HOME")
if (alluxio_home.isEmpty) {
throw new RuntimeException("No environment variable ALLUXIO_HOME is set.")
}
val buffered_source = Source.fromFile(alluxio_home.get + "/conf/alluxio-site.properties")
try {
for (line <- buffered_source.getLines) {
if (line.startsWith("alluxio.master.hostname")) {
host = line.split('=')(1).trim
} else if (line.startsWith("alluxio.master.rpc.port")) {
port = line.split('=')(1).trim
}
}
} finally {
buffered_source.close
}

if (host.isEmpty) {
throw new RuntimeException(
"Can't find alluxio.master.hostname from ALLUXIO_HOME/conf/alluxio-site.properties.")
}
alluxioMasterHost = Some(host + ":" + port)
}
}
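// Worked example (hypothetical values): given an
// ALLUXIO_HOME/conf/alluxio-site.properties containing
//   alluxio.master.hostname=0.1.2.3
//   alluxio.master.rpc.port=19998
// the loop above sets alluxioMasterHost to Some("0.1.2.3:19998");
// if the port line is absent, the default "19998" is used.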

private def getSchemeAndBucketFromPath(path: String) : (String, String) = {
val i = path.split("//")
val scheme = i(0)
if (i.length <= 1) {
throw new RuntimeException(s"path $path is not expected for Alluxio auto mount")
}
val bucket = i(1).split("/")(0)
(scheme, bucket)
}
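// Worked example: for "s3://bucket-foo/a.csv", split("//") yields
// Array("s3:", "bucket-foo/a.csv"), so this returns ("s3:", "bucket-foo").
// Note the returned scheme keeps its trailing colon but not the slashes.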

// For a path like "s3://foo/test...", this mounts bucket "foo" by calling the alluxio CLI
def autoMountBucket(scheme: String, bucket: String): Unit = {
val remote_path = scheme + "//" + bucket
if (!mountedBuckets.contains(bucket)) {
// not mounted yet, call the mount command
val command : Seq[String] = if (alluxioMountCmd.isDefined) {
alluxioMountCmd.get
} else {
Seq("su", "ubuntu", "-c", "/opt/alluxio-2.8.0/bin/alluxio fs mount --readonly")
}

val params = command.tails.collect{
case Seq(first, _, _*) => first
case Seq(last) => last + s" /$bucket $remote_path"
}.toSeq
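// Worked example (default command, bucket "bucket-foo" on scheme "s3:"):
// command.tails yields every suffix of the Seq; the collect keeps each
// element that still has successors unchanged and appends the mount
// arguments to the last one, giving params == Seq("su", "ubuntu", "-c",
//   "/opt/alluxio-2.8.0/bin/alluxio fs mount --readonly /bucket-foo s3://bucket-foo")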
logInfo(s"Run command $params")
val output = Process(params).!
if (output != 0) {
throw new RuntimeException(s"Mount bucket $bucket failed $output")
}
logInfo(s"Mounted remote $remote_path to /$bucket in Alluxio")
mountedBuckets(bucket) = remote_path
} else if (mountedBuckets(bucket).equals(remote_path)) {
logInfo(s"Already mounted remote $remote_path to /$bucket in Alluxio")
} else {
throw new RuntimeException(s"Found a same bucket name in $remote_path " +
s"and ${mountedBuckets(bucket)}")
}
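// Note: mountedBuckets keys on the bare bucket name, so after mounting
// "s3://foo", a later "gs://foo" hits the branch above and fails, since
// both would map to the same Alluxio path "/foo".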
}

def replacePathIfNeeded(
conf: RapidsConf,
relation: HadoopFsRelation,
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): FileIndex = {

val alluxioPathsReplace: Option[Seq[String]] = conf.getAlluxioPathsToReplace
val alluxioAutoMountEnabled = conf.getAlluxioAutoMountEnabled
val alluxioBucketRegex: String = conf.getAlluxioBucketRegex
alluxioMountCmd = conf.getAlluxioMountCmd

if (alluxioPathsReplace.isDefined) {
val replaceFunc = if (alluxioPathsReplace.isDefined) {
// alluxioPathsReplace: Seq("key->value", "key1->value1")
// turn the rules into a Map, e.g.
// { s3:/foo -> alluxio://0.1.2.3:19998/foo,
// gs:/bar -> alluxio://0.1.2.3:19998/bar,
// /baz -> alluxio://0.1.2.3:19998/baz }
// { s3://foo -> alluxio://0.1.2.3:19998/foo,
// gs://bar -> alluxio://0.1.2.3:19998/bar,
// s3://baz -> alluxio://0.1.2.3:19998/baz }
val replaceMapOption = alluxioPathsReplace.map(rules => {
rules.map(rule => {
val split = rule.split("->")
if (split.size == 2) {
split(0).trim -> split(1).trim
} else {
throw new IllegalArgumentException(s"Invalid setting for " +
s"${RapidsConf.ALLUXIO_PATHS_REPLACE.key}")
s"${RapidsConf.ALLUXIO_PATHS_REPLACE.key}")
}
}).toMap
})
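// Worked example: rules Seq("s3://foo->alluxio://0.1.2.3:19998/foo") become
// Some(Map("s3://foo" -> "alluxio://0.1.2.3:19998/foo")); file paths are
// later matched by prefix against the map's keys.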

replaceMapOption.map(replaceMap => {

def isDynamicPruningFilter(e: Expression): Boolean =
e.find(_.isInstanceOf[PlanExpression[_]]).isDefined

val partitionDirs = relation.location.listFiles(
partitionFilters.filterNot(isDynamicPruningFilter), dataFilters)

// replacement func to check if the file path is prefixed with the string user configured
// if yes, replace it
val replaceFunc = (f: Path) => {
if (replaceMapOption.isDefined) {
Some((f: Path) => {
val pathStr = f.toString
val matchedSet = replaceMap.keySet.filter(reg => pathStr.startsWith(reg))
val matchedSet = replaceMapOption.get.filter(a => pathStr.startsWith(a._1))
if (matchedSet.size > 1) {
// never reach here since replaceMap is a Map
throw new IllegalArgumentException(s"Found ${matchedSet.size} same replacing rules " +
s"from ${RapidsConf.ALLUXIO_PATHS_REPLACE.key} which requires only 1 rule " +
s"for each file path")
s"from ${RapidsConf.ALLUXIO_PATHS_REPLACE.key} which requires only 1 rule " +
s"for each file path")
} else if (matchedSet.size == 1) {
new Path(pathStr.replaceFirst(matchedSet.head, replaceMap(matchedSet.head)))
new Path(pathStr.replaceFirst(matchedSet.head._1, matchedSet.head._2))
} else {
f
}
})
} else {
None
}
} else if (alluxioAutoMountEnabled) { // alluxio master host is set
Some((f: Path) => {
val pathStr = f.toString
if (pathStr.matches(alluxioBucketRegex)) {
getAlluxioMasterHost()

val (scheme, bucket) = getSchemeAndBucketFromPath(pathStr)
autoMountBucket(scheme, bucket)

// replace "s3://foo/..." with "alluxio://<alluxioMasterHost>/foo/..."
val newPath = new Path(pathStr.replaceFirst(
scheme + "/", "alluxio://" + alluxioMasterHost.get))
logInfo(s"Replace $pathStr to ${newPath.toString}")
newPath
} else {
f
}
})
} else {
None
}
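// Worked example (hypothetical master "0.1.2.3:19998"): the auto-mount branch
// turns "s3://bucket-foo/a.csv" into "alluxio://0.1.2.3:19998/bucket-foo/a.csv"
// via replaceFirst("s3:/", "alluxio://0.1.2.3:19998"). The pattern consumes the
// scheme plus the first slash; the second slash becomes the separator after
// the Alluxio authority.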

// replace all of input files
val inputFiles: Seq[Path] = partitionDirs.flatMap(partitionDir => {
partitionDir.files.map(f => replaceFunc(f.getPath))
})
if (replaceFunc.isDefined) {
def isDynamicPruningFilter(e: Expression): Boolean =
e.find(_.isInstanceOf[PlanExpression[_]]).isDefined

val partitionDirs = relation.location.listFiles(
partitionFilters.filterNot(isDynamicPruningFilter), dataFilters)

// replace all of rootPaths which are already unique
val rootPaths = relation.location.rootPaths.map(replaceFunc)

val parameters: Map[String, String] = relation.options

// infer PartitionSpec
val partitionSpec = GpuPartitioningUtils.inferPartitioning(
relation.sparkSession,
rootPaths,
inputFiles,
parameters,
Option(relation.dataSchema),
replaceFunc)

// generate a new InMemoryFileIndex holding paths with alluxio schema
new InMemoryFileIndex(
relation.sparkSession,
inputFiles,
parameters,
Option(relation.dataSchema),
userSpecifiedPartitionSpec = Some(partitionSpec))
}).getOrElse(relation.location)
// replace all of the input files
val inputFiles: Seq[Path] = partitionDirs.flatMap(partitionDir => {
partitionDir.files.map(f => replaceFunc.get(f.getPath))
})

// replace all of the rootPaths, which are already unique
val rootPaths = relation.location.rootPaths.map(replaceFunc.get)

val parameters: Map[String, String] = relation.options

// infer PartitionSpec
val partitionSpec = GpuPartitioningUtils.inferPartitioning(
relation.sparkSession,
rootPaths,
inputFiles,
parameters,
Option(relation.dataSchema),
replaceFunc.get)

// generate a new InMemoryFileIndex holding paths with the alluxio scheme
new InMemoryFileIndex(
relation.sparkSession,
inputFiles,
parameters,
Option(relation.dataSchema),
userSpecifiedPartitionSpec = Some(partitionSpec))
} else {
relation.location
}
53 changes: 48 additions & 5 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1264,11 +1264,48 @@ object RapidsConf {
// ALLUXIO CONFIGS

val ALLUXIO_PATHS_REPLACE = conf("spark.rapids.alluxio.pathsToReplace")
.doc("List of paths to be replaced with corresponding alluxio scheme. Eg, when configure" +
"is set to \"s3:/foo->alluxio://0.1.2.3:19998/foo,gcs:/bar->alluxio://0.1.2.3:19998/bar\", " +
"which means: " +
" s3:/foo/a.csv will be replaced to alluxio://0.1.2.3:19998/foo/a.csv and " +
" gcs:/bar/b.csv will be replaced to alluxio://0.1.2.3:19998/bar/b.csv")
.doc("List of paths to be replaced with corresponding alluxio scheme. " +
"E.g. when configure is set to " +
"\"s3://foo->alluxio://0.1.2.3:19998/foo,gs://bar->alluxio://0.1.2.3:19998/bar\", " +
"it means: " +
"\"s3://foo/a.csv\" will be replaced to \"alluxio://0.1.2.3:19998/foo/a.csv\" and " +
"\"gs://bar/b.csv\" will be replaced to \"alluxio://0.1.2.3:19998/bar/b.csv\". " +
"To use this config, you have to mount the buckets to Alluxio by yourself. " +
"If you set this config, spark.rapids.alluxio.automount.enabled won't be valid.")
.stringConf
.toSequence
.createOptional
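
// Worked example: --conf spark.rapids.alluxio.pathsToReplace=
//   "s3://foo->alluxio://0.1.2.3:19998/foo,gs://bar->alluxio://0.1.2.3:19998/bar"
// is split on "," by toSequence, so getAlluxioPathsToReplace yields
// Some(Seq("s3://foo->alluxio://0.1.2.3:19998/foo",
//          "gs://bar->alluxio://0.1.2.3:19998/bar")).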

val ALLUXIO_AUTOMOUNT_ENABLED = conf("spark.rapids.alluxio.automount.enabled")
.doc("Enable the feature of auto mounting the cloud storage to Alluxio. " +
"It requires the Alluxio master is the same node of Spark driver node. " +
"When it's true, it requires an environment variable ALLUXIO_HOME be set properly. " +
"The Alluxio master's host and port will be read from alluxio.master.hostname and " +
"alluxio.master.rpc.port(default: 19998) from ALLUXIO_HOME/conf/alluxio-site.properties, " +
"then replace a cloud path which matches spark.rapids.alluxio.bucket.regex like " +
"\"s3://bar/b.csv\" to \"alluxio://0.1.2.3:19998/bar/b.csv\", " +
"and the bucket \"s3://bar\" will be mounted to \"/bar\" in Alluxio automatically.")
.booleanConf
.createWithDefault(false)

val ALLUXIO_BUCKET_REGEX = conf("spark.rapids.alluxio.bucket.regex")
.doc("A regex to decide which bucket should be auto-mounted to Alluxio. " +
"E.g. when setting as \"^s3://bucket.*\", " +
"the bucket which starts with \"s3://bucket\" will be mounted to Alluxio " +
"and the path \"s3://bucket-foo/a.csv\" will be replaced to " +
"\"alluxio://0.1.2.3:19998/bucket-foo/a.csv\". " +
"It's only valid when setting spark.rapids.alluxio.automount.enabled=true. " +
"The default value matches all the buckets in \"s3://\" or \"s3a://\" scheme.")
.stringConf
.createWithDefault("^s3a{0,1}://.*")

val ALLUXIO_MOUNT_CMD = conf("spark.rapids.alluxio.mount.cmd")
.doc("Provide the command to mount a cloud path to Alluxio. " +
"E.g. \"su,ubuntu,-c,/opt/alluxio-2.8.0/bin/alluxio fs mount --readonly\", it means: " +
"run Process(Seq(\"su\", \"ubuntu\", \"-c\", " +
"\"/opt/alluxio-2.8.0/bin/alluxio fs mount --readonly /bucket-foo s3://bucket-foo\")), " +
"the delimiter \",\" is used to convert to Seq[String] " +
"when you need to use a special user to run the mount command.")
.stringConf
.toSequence
.createOptional
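
// Worked example: the value
//   "su,ubuntu,-c,/opt/alluxio-2.8.0/bin/alluxio fs mount --readonly"
// parses into Seq("su", "ubuntu", "-c",
//   "/opt/alluxio-2.8.0/bin/alluxio fs mount --readonly");
// AlluxioUtils.autoMountBucket then appends " /<bucket> <scheme>//<bucket>"
// to the last element before running it.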
@@ -1909,6 +1946,12 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val getAlluxioPathsToReplace: Option[Seq[String]] = get(ALLUXIO_PATHS_REPLACE)

lazy val getAlluxioAutoMountEnabled: Boolean = get(ALLUXIO_AUTOMOUNT_ENABLED)

lazy val getAlluxioBucketRegex: String = get(ALLUXIO_BUCKET_REGEX)

lazy val getAlluxioMountCmd: Option[Seq[String]] = get(ALLUXIO_MOUNT_CMD)

lazy val driverTimeZone: Option[String] = get(DRIVER_TIMEZONE)

lazy val isRangeWindowByteEnabled: Boolean = get(ENABLE_RANGE_WINDOW_BYTES)
