Use number of threads on executor instead of driver to set core count #9051

Merged: 7 commits, Aug 16, 2023
@@ -43,6 +43,13 @@ object GpuDeviceManager extends Logging {
java.lang.Boolean.getBoolean("com.nvidia.spark.rapids.memory.gpu.rmm.init.task")
}

+  private var numCores = 0
+
+  /**
+   * Get an approximate count on the number of cores this executor will use.
+   */
+  def getNumCores: Int = numCores
+
// Memory resource used only for cudf::chunked_pack to allocate scratch space
// during spill to host. This is done to set aside some memory for this operation
// from the beginning of the job.
@@ -141,7 +148,8 @@
}

def initializeGpuAndMemory(resources: Map[String, ResourceInformation],
-      conf: RapidsConf): Unit = {
+      conf: RapidsConf, numCores: Int): Unit = {
+    this.numCores = numCores
// as long in execute mode initialize everything because we could enable it after startup
if (conf.isSqlExecuteOnGPU) {
// Set the GPU before RMM is initialized if spark provided the GPU address so that RMM
@@ -191,14 +199,15 @@
/**
* Always set the GPU if it was assigned by Spark and initialize the RMM if its configured
* to do so in the task.
-   * We expect the plugin to be run with 1 task and 1 GPU per executor.
+   * We expect the plugin to be run with 1 GPU per executor.
*/
def initializeFromTask(): Unit = {
if (threadGpuInitialized.get() == false) {
val resources = getResourcesFromTaskContext
val conf = new RapidsConf(SparkEnv.get.conf)
if (rmmTaskInitEnabled) {
-        initializeGpuAndMemory(resources, conf)
+        val numCores = RapidsPluginUtils.estimateCoresOnExec(SparkEnv.get.conf)
+        initializeGpuAndMemory(resources, conf, numCores)
} else {
// just set the device if provided so task thread uses right GPU
initializeGpu(resources, conf)
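One consequence of this change worth calling out: numCores starts at 0, so anything that combines it with a configured value through Math.max (as the reader thread pool in the next file does) is unaffected until initializeGpuAndMemory has actually run. A standalone sketch of that behavior, not code from this PR:

```scala
// Standalone sketch, not plugin code: shows why the 0 default is harmless
// for consumers that take Math.max(configuredValue, getNumCores).
object DefaultCoreCountSketch {
  private var numCores = 0                 // same default as in GpuDeviceManager
  def getNumCores: Int = numCores
  def initialize(cores: Int): Unit = { numCores = cores }

  def main(args: Array[String]): Unit = {
    val fromConf = 20
    println(Math.max(fromConf, getNumCores)) // 20: before init the default 0 has no effect
    initialize(48)
    println(Math.max(fromConf, getNumCores)) // 48: after init the executor core count wins
  }
}
```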
@@ -157,10 +157,19 @@ object MultiFileReaderThreadPool extends Logging {

/**
* Get the existing thread pool or create one with the given thread count if it does not exist.
-   * @note The thread number will be ignored if the thread pool is already created.
+   * @note The thread number will be ignored if the thread pool is already created, or modified
+   *       if it is not the right size compared to the number of cores available.
*/
-  def getOrCreateThreadPool(numThreads: Int): ThreadPoolExecutor = {
-    threadPool.getOrElse(initThreadPool(numThreads))
+  def getOrCreateThreadPool(numThreadsFromConf: Int): ThreadPoolExecutor = {
+    threadPool.getOrElse {
+      val numThreads = Math.max(numThreadsFromConf, GpuDeviceManager.getNumCores)
+
+      if (numThreadsFromConf != numThreads) {
+        logWarning(s"Configuring the file reader thread pool with a max of $numThreads " +
+          s"threads instead of ${RapidsConf.MULTITHREAD_READ_NUM_THREADS} = $numThreadsFromConf")
+      }
+      initThreadPool(numThreads)
+    }
}
}

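To make the sizing rule concrete, here is a small self-contained sketch; the thread and core counts are made up and the object is not part of the plugin:

```scala
object ReaderPoolSizingSketch {
  def main(args: Array[String]): Unit = {
    // Suppose RapidsConf.MULTITHREAD_READ_NUM_THREADS resolved to 20, while the
    // executor reports 48 cores through GpuDeviceManager.getNumCores.
    val numThreadsFromConf = 20
    val numCores = 48
    val poolSize = Math.max(numThreadsFromConf, numCores)
    println(s"file reader pool sized to $poolSize threads") // 48, and the override warning is logged
  }
}
```

If the configured value is already at least the core count, the configuration wins and no warning is emitted.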
38 changes: 24 additions & 14 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
@@ -110,7 +110,13 @@ object RapidsPluginUtils extends Logging {
}
}

-  def fixupConfigs(conf: SparkConf): Unit = {
+  def estimateCoresOnExec(conf: SparkConf): Int = {
+    conf.getOption(RapidsPluginUtils.EXECUTOR_CORES_KEY)
+      .map(_.toInt)
+      .getOrElse(Runtime.getRuntime.availableProcessors)
+  }
+
+  def fixupConfigsOnDriver(conf: SparkConf): Unit = {
// First add in the SQL executor plugin because that is what we need at a minimum
if (conf.contains(SQL_PLUGIN_CONF_KEY)) {
for (pluginName <- Array(SQL_PLUGIN_NAME, UDF_PLUGIN_NAME)){
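A minimal sketch of how estimateCoresOnExec resolves the core count, assuming the object is visible to the caller (in the plugin it may be package-private) and that EXECUTOR_CORES_KEY is spark.executor.cores, as the surrounding comments indicate:

```scala
import org.apache.spark.SparkConf
import com.nvidia.spark.rapids.RapidsPluginUtils

object EstimateCoresSketch {
  def main(args: Array[String]): Unit = {
    // spark.executor.cores set explicitly: the configured value is returned.
    val withCores = new SparkConf().set("spark.executor.cores", "16")
    println(RapidsPluginUtils.estimateCoresOnExec(withCores)) // 16

    // Not set: falls back to Runtime.getRuntime.availableProcessors, which is only
    // a meaningful estimate when evaluated on the executor itself, not on the driver.
    println(RapidsPluginUtils.estimateCoresOnExec(new SparkConf()))
  }
}
```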
Expand Down Expand Up @@ -162,20 +168,21 @@ object RapidsPluginUtils extends Logging {
}
}

// If spark.task.resource.gpu.amount is larger than
// (spark.executor.resource.gpu.amount / spark.executor.cores) then GPUs will be the limiting
-    // resource for task scheduling.
-    if (conf.contains(TASK_GPU_AMOUNT_KEY) && conf.contains(EXECUTOR_GPU_AMOUNT_KEY)) {
+    // resource for task scheduling, but we can only output the warning if executor cores is set
+    // because this is happening on the driver so the number of cores in the runtime is not
+    // relevant
+    if (conf.contains(TASK_GPU_AMOUNT_KEY) &&
+        conf.contains(EXECUTOR_GPU_AMOUNT_KEY) &&
+        conf.contains(EXECUTOR_CORES_KEY)) {
val taskGpuAmountSetByUser = conf.get(TASK_GPU_AMOUNT_KEY).toDouble
-      // get worker's all cores num if spark.executor.cores is not set explicitly
-      val executorCores = conf.getOption(EXECUTOR_CORES_KEY)
-        .map(_.toDouble)
-        .getOrElse(Runtime.getRuntime.availableProcessors.toDouble)
+      val executorCores = conf.get(EXECUTOR_CORES_KEY).toDouble
val executorGpuAmount = conf.get(EXECUTOR_GPU_AMOUNT_KEY).toDouble
if (executorCores != 0 && taskGpuAmountSetByUser > executorGpuAmount / executorCores) {
logWarning("The current setting of spark.task.resource.gpu.amount " +
s"($taskGpuAmountSetByUser) is not ideal to get the best performance from the " +
"RAPIDS Accelerator plugin. It's recommended to be 1/{executor core count} unless " +
logWarning("The current setting of spark.task.resource.gpu.amount " +
s"($taskGpuAmountSetByUser) is not ideal to get the best performance from the " +
"RAPIDS Accelerator plugin. It's recommended to be 1/{executor core count} unless " +
"you have a special use case.")
}
}
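For reference, a worked example of the condition behind this warning, using hypothetical values:

```scala
object GpuAmountWarningSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical job: 1 GPU and 8 cores per executor, task GPU amount left at 0.25.
    val taskGpuAmountSetByUser = 0.25
    val executorGpuAmount = 1.0
    val executorCores = 8.0
    // 0.25 > 1.0 / 8 = 0.125, so the driver logs the warning; the recommended
    // setting would be 1 / {executor core count} = 0.125.
    val wouldWarn =
      executorCores != 0 && taskGpuAmountSetByUser > executorGpuAmount / executorCores
    println(s"warning emitted: $wouldWarn") // true
  }
}
```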
Expand Down Expand Up @@ -269,7 +276,7 @@ class RapidsDriverPlugin extends DriverPlugin with Logging {
override def init(
sc: SparkContext, pluginContext: PluginContext): java.util.Map[String, String] = {
val sparkConf = pluginContext.conf
-    RapidsPluginUtils.fixupConfigs(sparkConf)
+    RapidsPluginUtils.fixupConfigsOnDriver(sparkConf)
val conf = new RapidsConf(sparkConf)
RapidsPluginUtils.logPluginMode(conf)

@@ -314,12 +321,14 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
extraConf: java.util.Map[String, String]): Unit = {
try {
if (Cuda.getComputeCapabilityMajor < 6) {
throw new RuntimeException(s"GPU compute capability ${Cuda.getComputeCapabilityMajor}" +
" is unsupported, requires 6.0+")
}
// if configured, re-register checking leaks hook.
reRegisterCheckLeakHook()

+      val sparkConf = pluginContext.conf()
+      val numCores = RapidsPluginUtils.estimateCoresOnExec(sparkConf)
val conf = new RapidsConf(extraConf.asScala.toMap)

// Compare if the cudf version mentioned in the classpath is equal to the version which
@@ -346,7 +355,8 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
// on executor startup.
if (!GpuDeviceManager.rmmTaskInitEnabled) {
logInfo("Initializing memory from Executor Plugin")
-      GpuDeviceManager.initializeGpuAndMemory(pluginContext.resources().asScala.toMap, conf)
+      GpuDeviceManager.initializeGpuAndMemory(pluginContext.resources().asScala.toMap, conf,
+        numCores)
if (GpuShuffleEnv.isRapidsShuffleAvailable(conf)) {
GpuShuffleEnv.initShuffleManager()
if (GpuShuffleEnv.isUCXShuffleAndEarlyStart(conf)) {