From 9b345abe213bb0e43cdd3891d5d7cd3ff836de19 Mon Sep 17 00:00:00 2001 From: sinkinben Date: Wed, 27 Jul 2022 11:19:48 +0800 Subject: [PATCH] Automatically adjust `spark.rapids.sql.multiThreadedRead.numThreads` to the same as `spark.executor.cores` (#6016) If `spark.rapids.sql.multiThreadedRead.numThreads` is not set explicitly by users, then we try to derive it from `spark.executor.cores` (it it's set by users). Otherwise, we keep the users' settings. Signed-off-by: sinkinben --- docs/configs.md | 2 +- .../scala/com/nvidia/spark/rapids/Plugin.scala | 15 +++++++++++++++ .../com/nvidia/spark/rapids/RapidsConf.scala | 9 +++++++-- 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 80ecb607409..6e20e2bb73b 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -123,7 +123,7 @@ Name | Description | Default Value spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|true spark.rapids.sql.metrics.level|GPU plans can produce a lot more metrics than CPU plans do. In very large queries this can sometimes result in going over the max result size limit for the driver. Supported values include DEBUG which will enable all metrics supported and typically only needs to be enabled when debugging the plugin. MODERATE which should output enough metrics to understand how long each part of the query is taking and how much data is going to each part of the query. ESSENTIAL which disables most metrics except those Apache Spark CPU plans will also report or their equivalents.|MODERATE spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu -spark.rapids.sql.multiThreadedRead.numThreads|The maximum number of threads on each executor to use for reading small files in parallel. This can not be changed at runtime after the executor has started. Used with COALESCING and MULTITHREADED readers, see spark.rapids.sql.format.parquet.reader.type, spark.rapids.sql.format.orc.reader.type, or spark.rapids.sql.format.avro.reader.type for a discussion of reader types.|20 +spark.rapids.sql.multiThreadedRead.numThreads|The maximum number of threads on each executor to use for reading small files in parallel. This can not be changed at runtime after the executor has started. Used with COALESCING and MULTITHREADED readers, see spark.rapids.sql.format.parquet.reader.type, spark.rapids.sql.format.orc.reader.type, or spark.rapids.sql.format.avro.reader.type for a discussion of reader types. If it is not set explicitly and spark.executor.cores is set, it will be tried to assign value of `max(MULTITHREAD_READ_NUM_THREADS_DEFAULT, spark.executor.cores)`, where MULTITHREAD_READ_NUM_THREADS_DEFAULT = 20.|20 spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647 spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647 diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index 8049b48b901..b238caa8e9c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -69,6 +69,7 @@ object RapidsPluginUtils extends Logging { private val KRYO_SERIALIZER_NAME = classOf[KryoSerializer].getName private val KRYO_REGISTRATOR_KEY = "spark.kryo.registrator" private val KRYO_REGISTRATOR_NAME = classOf[GpuKryoRegistrator].getName + private val EXECUTOR_CORES_KEY = "spark.executor.cores" { val pluginProps = loadProps(RapidsPluginUtils.PLUGIN_PROPS_FILENAME) @@ -146,6 +147,20 @@ object RapidsPluginUtils extends Logging { } // set driver timezone conf.set(RapidsConf.DRIVER_TIMEZONE.key, ZoneId.systemDefault().normalized().toString) + + // If spark.rapids.sql.multiThreadedRead.numThreads is not set explicitly, then we derive it + // from other settings. Otherwise, we keep the users' setting. + val numThreadsKey = RapidsConf.MULTITHREAD_READ_NUM_THREADS.key + if (!conf.contains(numThreadsKey)) { + // Derive it from spark.executor.cores, since spark.executor.cores is not set on all cluster + // managers by default, we should judge whether if it's set explicitly. + if (conf.contains(EXECUTOR_CORES_KEY)) { + val numThreads = Math.max(RapidsConf.MULTITHREAD_READ_NUM_THREADS_DEFAULT, + conf.get(EXECUTOR_CORES_KEY).toInt).toString + conf.set(numThreadsKey, numThreads) + logWarning(s"$numThreadsKey is set to $numThreads.") + } + } } def loadProps(resourceName: String): Properties = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index dc81ddc4056..e6ed4a5ba91 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -293,6 +293,7 @@ object RapidsReaderType extends Enumeration { } object RapidsConf { + val MULTITHREAD_READ_NUM_THREADS_DEFAULT = 20 private val registeredConfs = new ListBuffer[ConfEntry[_]]() private def register(entry: ConfEntry[_]): Unit = { @@ -726,10 +727,14 @@ object RapidsConf { "started. Used with COALESCING and MULTITHREADED readers, see " + "spark.rapids.sql.format.parquet.reader.type, " + "spark.rapids.sql.format.orc.reader.type, or " + - "spark.rapids.sql.format.avro.reader.type for a discussion of reader types.") + "spark.rapids.sql.format.avro.reader.type for a discussion of reader types. " + + "If it is not set explicitly and spark.executor.cores is set, it will be tried to " + + "assign value of `max(MULTITHREAD_READ_NUM_THREADS_DEFAULT, spark.executor.cores)`, " + + s"where MULTITHREAD_READ_NUM_THREADS_DEFAULT = $MULTITHREAD_READ_NUM_THREADS_DEFAULT" + + ".") .integerConf .checkValue(v => v > 0, "The thread count must be greater than zero.") - .createWithDefault(20) + .createWithDefault(MULTITHREAD_READ_NUM_THREADS_DEFAULT) val ENABLE_PARQUET = conf("spark.rapids.sql.format.parquet.enabled") .doc("When set to false disables all parquet input and output acceleration")