Use number of threads on executor instead of driver to set core count #9051

Merged · 7 commits · Aug 16, 2023
@@ -43,6 +43,13 @@ object GpuDeviceManager extends Logging {
java.lang.Boolean.getBoolean("com.nvidia.spark.rapids.memory.gpu.rmm.init.task")
}

private var numCores = 0

/**
 * Get an approximate count of the number of cores this executor will use.
*/
def getNumCores: Int = numCores

// Memory resource used only for cudf::chunked_pack to allocate scratch space
// during spill to host. This is done to set aside some memory for this operation
// from the beginning of the job.
@@ -141,7 +148,8 @@
}

def initializeGpuAndMemory(resources: Map[String, ResourceInformation],
conf: RapidsConf): Unit = {
conf: RapidsConf, numCores: Int): Unit = {
this.numCores = numCores
// as long as we are in execute mode, initialize everything because we could enable it after startup
if (conf.isSqlExecuteOnGPU) {
// Set the GPU before RMM is initialized if spark provided the GPU address so that RMM
@@ -191,14 +199,15 @@
/**
 * Always set the GPU if it was assigned by Spark and initialize the RMM if it is configured
* to do so in the task.
* We expect the plugin to be run with 1 task and 1 GPU per executor.
* We expect the plugin to be run with 1 GPU per executor.
*/
def initializeFromTask(): Unit = {
if (threadGpuInitialized.get() == false) {
val resources = getResourcesFromTaskContext
val conf = new RapidsConf(SparkEnv.get.conf)
if (rmmTaskInitEnabled) {
initializeGpuAndMemory(resources, conf)
val numCores = RapidsPluginUtils.estimateCoresOnExec(SparkEnv.get.conf)
initializeGpuAndMemory(resources, conf, numCores)
} else {
// just set the device if provided so task thread uses right GPU
initializeGpu(resources, conf)
@@ -157,10 +157,18 @@ object MultiFileReaderThreadPool extends Logging {

/**
* Get the existing thread pool or create one with the given thread count if it does not exist.
* @note The thread number will be ignored if the thread pool is already created.
 * @note The thread number will be ignored if the thread pool is already created, and may be
 * increased if it is smaller than the number of cores available to the executor.
*/
def getOrCreateThreadPool(numThreads: Int): ThreadPoolExecutor = {
threadPool.getOrElse(initThreadPool(numThreads))
def getOrCreateThreadPool(numThreadsFromConf: Int): ThreadPoolExecutor = {
threadPool.getOrElse {
val numThreads = Math.max(numThreadsFromConf, GpuDeviceManager.getNumCores)

if (numThreadsFromConf != numThreads) {
logWarning(s"Using $numThreads as the number of threads for the thread pool.")
Collaborator (@gerashegalov):
This setting is orthogonal to cores, IMO. But either way: what do we expect the users to do upon seeing this message in the log?

Collaborator Author (@revans2):
@gerashegalov

> This setting is orthogonal to cores

The reason for this originally was to avoid bad performance if a user set the thread pool size too small. The idea was that we should not have a thread pool that is smaller than the number of tasks, or else we would lose parallelism by enabling the multi-threaded reader. In this case the number of cores is being used as a stand-in for tasks, even though it is not necessarily a one-to-one mapping.

> what do we expect the users to do upon seeing this message in the log?

I don't expect a user to do much of anything with this message right now. It is in the executor log and it is unlikely that a user will ever see it. It is more here so we have something that is equivalent to the log message on the driver, which is the one most likely to catch this. I also thought that it would be nice to know that this is happening, but you are right it is unlikely to ever be acted on. If you want me to remove it I will.
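
(For readers skimming the thread: a minimal, self-contained sketch of the sizing rule being discussed here. The object and value names below are illustrative only, not the plugin's actual API.)

```scala
// Illustrative sketch: the reader pool should never be smaller than the number
// of tasks that can run concurrently, and executor cores are used as a rough
// stand-in for that task count.
object PoolSizingSketch {
  def effectivePoolSize(numThreadsFromConf: Int, estimatedExecutorCores: Int): Int =
    math.max(numThreadsFromConf, estimatedExecutorCores)

  def main(args: Array[String]): Unit = {
    // An 8-thread pool on a 16-core executor would throttle the multi-threaded
    // reader, so the pool is grown to 16 (and the warning above is logged).
    println(effectivePoolSize(numThreadsFromConf = 8, estimatedExecutorCores = 16))  // 16
    // A configured value at or above the core count is used as-is.
    println(effectivePoolSize(numThreadsFromConf = 32, estimatedExecutorCores = 16)) // 32
  }
}
```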

Collaborator (@gerashegalov):
thanks for elaborating @revans2.

> It is in the executor log and it is unlikely that a user will ever see it.

I would not count on it: devops look at logs by querying log search engines such as Splunk, Sumo, and Kibana by app id, rather than reading raw individual logs. And they could track metrics for different log categories.

> I also thought that it would be nice to know that this is happening, but you are right it is unlikely to ever be acted on. If you want me to remove it I will.

I think this message can be more useful if you add details to make it more self-contained.

"Configuring the file reader thread pool size with the max of ${CONF1}=$VAL1 and ${CONF2}=${VAL2}"

}
initThreadPool(numThreads)
}
}
}

38 changes: 24 additions & 14 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
@@ -110,7 +110,13 @@ object RapidsPluginUtils extends Logging {
}
}

def fixupConfigs(conf: SparkConf): Unit = {
def estimateCoresOnExec(conf: SparkConf): Int = {
conf.getOption(RapidsPluginUtils.EXECUTOR_CORES_KEY)
.map(_.toInt)
.getOrElse(Runtime.getRuntime.availableProcessors)
}

def fixupConfigsOnDriver(conf: SparkConf): Unit = {
// First add in the SQL executor plugin because that is what we need at a minimum
if (conf.contains(SQL_PLUGIN_CONF_KEY)) {
for (pluginName <- Array(SQL_PLUGIN_NAME, UDF_PLUGIN_NAME)){
@@ -162,20 +168,21 @@
}
}

// If spark.task.resource.gpu.amount is larger than
// (spark.executor.resource.gpu.amount / spark.executor.cores) then GPUs will be the limiting
// resource for task scheduling.
if (conf.contains(TASK_GPU_AMOUNT_KEY) && conf.contains(EXECUTOR_GPU_AMOUNT_KEY)) {
// resource for task scheduling. We can only output the warning if executor cores is set,
// because this check runs on the driver, where the runtime's core count is not relevant.
if (conf.contains(TASK_GPU_AMOUNT_KEY) &&
conf.contains(EXECUTOR_GPU_AMOUNT_KEY) &&
conf.contains(EXECUTOR_CORES_KEY)) {
val taskGpuAmountSetByUser = conf.get(TASK_GPU_AMOUNT_KEY).toDouble
// get worker's all cores num if spark.executor.cores is not set explicitly
val executorCores = conf.getOption(EXECUTOR_CORES_KEY)
.map(_.toDouble)
.getOrElse(Runtime.getRuntime.availableProcessors.toDouble)
val executorCores = conf.get(EXECUTOR_CORES_KEY).toDouble
val executorGpuAmount = conf.get(EXECUTOR_GPU_AMOUNT_KEY).toDouble
if (executorCores != 0 && taskGpuAmountSetByUser > executorGpuAmount / executorCores) {
logWarning("The current setting of spark.task.resource.gpu.amount " +
s"($taskGpuAmountSetByUser) is not ideal to get the best performance from the " +
"RAPIDS Accelerator plugin. It's recommended to be 1/{executor core count} unless " +
logWarning("The current setting of spark.task.resource.gpu.amount " +
s"($taskGpuAmountSetByUser) is not ideal to get the best performance from the " +
"RAPIDS Accelerator plugin. It's recommended to be 1/{executor core count} unless " +
"you have a special use case.")
}
}
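
(To make the ratio in the comment above concrete, here is a small illustrative calculation; the numbers are hypothetical, not defaults.)

```scala
// Hypothetical executor shape: 1 GPU, 16 cores, and a task GPU amount of 0.25.
val executorGpuAmount = 1.0
val executorCores = 16.0
val taskGpuAmountSetByUser = 0.25

// 0.25 > 1.0 / 16 (= 0.0625), so at most 4 tasks can hold a GPU share at once:
// the GPU, not the CPU cores, becomes the limiting resource for scheduling and
// the warning above would be logged.
val gpuIsLimitingResource = taskGpuAmountSetByUser > executorGpuAmount / executorCores // true
```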
@@ -269,7 +276,7 @@ class RapidsDriverPlugin extends DriverPlugin with Logging {
override def init(
sc: SparkContext, pluginContext: PluginContext): java.util.Map[String, String] = {
val sparkConf = pluginContext.conf
RapidsPluginUtils.fixupConfigs(sparkConf)
RapidsPluginUtils.fixupConfigsOnDriver(sparkConf)
val conf = new RapidsConf(sparkConf)
RapidsPluginUtils.logPluginMode(conf)

@@ -314,12 +321,14 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
extraConf: java.util.Map[String, String]): Unit = {
try {
if (Cuda.getComputeCapabilityMajor < 6) {
throw new RuntimeException(s"GPU compute capability ${Cuda.getComputeCapabilityMajor}" +
" is unsupported, requires 6.0+")
}
// if configured, re-register the leak-checking hook.
reRegisterCheckLeakHook()

val sparkConf = pluginContext.conf()
val numCores = RapidsPluginUtils.estimateCoresOnExec(sparkConf)
val conf = new RapidsConf(extraConf.asScala.toMap)

// Compare if the cudf version mentioned in the classpath is equal to the version which
@@ -346,7 +355,8 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
// on executor startup.
if (!GpuDeviceManager.rmmTaskInitEnabled) {
logInfo("Initializing memory from Executor Plugin")
GpuDeviceManager.initializeGpuAndMemory(pluginContext.resources().asScala.toMap, conf)
GpuDeviceManager.initializeGpuAndMemory(pluginContext.resources().asScala.toMap, conf,
numCores)
if (GpuShuffleEnv.isRapidsShuffleAvailable(conf)) {
GpuShuffleEnv.initShuffleManager()
if (GpuShuffleEnv.isUCXShuffleAndEarlyStart(conf)) {