From 654b7ad52887ef2c76942f0f271e63aef923719c Mon Sep 17 00:00:00 2001
From: "Hongbin Ma (Mahone)"
Date: Tue, 25 Jun 2024 13:18:40 +0800
Subject: [PATCH] add hdfs support

Signed-off-by: Hongbin Ma (Mahone)
---
 .../com/nvidia/spark/rapids/DumpUtils.scala   |  8 ++---
 .../com/nvidia/spark/rapids/GpuExec.scala     | 34 +++++++++++--------
 .../com/nvidia/spark/rapids/RapidsConf.scala  |  9 +++++
 3 files changed, 32 insertions(+), 19 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
index ba19619633b..cb9bafe6ba7 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
@@ -15,7 +15,7 @@
  */
 package com.nvidia.spark.rapids
 
-import java.io.{ByteArrayOutputStream, File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.io.{ByteArrayOutputStream, File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
 import java.util.Random
 
 import scala.collection.mutable
@@ -118,7 +118,7 @@ object DumpUtils extends Logging {
 
   private def dumpToParquetFileImp(table: Table, filePrefix: String): String = {
     val path = genPath(filePrefix)
-    withResource(new ParquetDumper(path, table)) { dumper =>
+    withResource(new ParquetDumper(new FileOutputStream(path), table)) { dumper =>
       dumper.writeTable(table)
       path
     }
@@ -146,9 +146,9 @@ object DumpUtils extends Logging {
 }
 
 // parquet dumper
-class ParquetDumper(path: String, table: Table) extends HostBufferConsumer
+class ParquetDumper(private[this] val outputStream: OutputStream, table: Table)
+  extends HostBufferConsumer
   with AutoCloseable {
-  private[this] val outputStream = new FileOutputStream(path)
   private[this] val tempBuffer = new Array[Byte](128 * 1024)
   private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]()
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
index a3fc1bff0c9..e3489904466 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -122,7 +122,7 @@ object GpuMetric extends Logging {
   val DESCRIPTION_FILECACHE_DATA_RANGE_READ_TIME = "cached data read time"
 
   def unwrap(input: GpuMetric): SQLMetric = input match {
-    case w :WrappedGpuMetric => w.sqlMetric
+    case w: WrappedGpuMetric => w.sqlMetric
     case i => throw new IllegalArgumentException(s"found unsupported GpuMetric ${i.getClass}")
   }
 
@@ -228,6 +228,7 @@ trait GpuExec extends SparkPlan {
   @transient lazy val loreDumpOperator: Option[String] = RapidsConf.LORE_DUMP_OPERATOR.get(conf)
   @transient lazy val loreDumpLOREIds: String = RapidsConf.LORE_DUMP_LORE_IDS.get(conf)
   @transient lazy val loreDumpPartitions: String = RapidsConf.LORE_DUMP_PARTITIONS.get(conf)
+  @transient lazy val loreDumpPath: String = RapidsConf.LORE_DUMP_PATH.get(conf)
 
   // For LORE DumpedExecReplayer, the spark plan is deserialized from the plan.meta file, so
   // some of the transient fields will be null, and we need to workaround this
@@ -273,9 +274,9 @@ trait GpuExec extends SparkPlan {
    */
   def outputBatching: CoalesceGoal = null
 
-  private [this] lazy val metricsConf = MetricsLevel(RapidsConf.METRICS_LEVEL.get(conf))
+  private[this] lazy val metricsConf = MetricsLevel(RapidsConf.METRICS_LEVEL.get(conf))
 
-  private [this] def createMetricInternal(level: MetricsLevel, f: => SQLMetric): GpuMetric = {
+  private[this] def createMetricInternal(level: MetricsLevel, f: => SQLMetric): GpuMetric = {
     if (level >= metricsConf) {
       WrappedGpuMetric(f)
     } else {
@@ -333,7 +334,7 @@ trait GpuExec extends SparkPlan {
   lazy val allMetrics: Map[String, GpuMetric] = Map(
     NUM_OUTPUT_ROWS -> createMetric(outputRowsLevel, DESCRIPTION_NUM_OUTPUT_ROWS),
     NUM_OUTPUT_BATCHES -> createMetric(outputBatchesLevel, DESCRIPTION_NUM_OUTPUT_BATCHES)) ++
-      additionalMetrics
+    additionalMetrics
 
   def gpuLongMetric(name: String): GpuMetric = allMetrics(name)
 
@@ -391,8 +392,8 @@ trait GpuExec extends SparkPlan {
 
   final override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val hadoopConf = new SerializableConfiguration(sparkSession.sparkContext.hadoopConfiguration)
-    def getOutputStream(filePath: String): FSDataOutputStream = {
-      val hadoopPath = new Path(filePath)
+
+    def getOutputStream(hadoopPath: Path): FSDataOutputStream = {
       val fs = hadoopPath.getFileSystem(hadoopConf.value)
       fs.create(hadoopPath, true)
     }
@@ -412,7 +413,7 @@ trait GpuExec extends SparkPlan {
         // dump plan node
         val planBytes = serializeObject(this)
         val fos = getOutputStream(
-          s"file:/tmp/lore/lore_id=${myLoreId}_plan_id=${childPlanId}/plan.meta")
+          new Path(new Path(loreDumpPath), s"lore_id=${myLoreId}_plan_id=${childPlanId}/plan.meta"))
         fos.write(planBytes)
         fos.close()
       }
@@ -454,21 +455,24 @@ trait GpuExec extends SparkPlan {
            val cbTypes = GpuColumnVector.extractTypes(cb)
            val bytes = serializeObject(cbTypes)
            val fos = getOutputStream(
-              s"file:/tmp/lore/lore_id=${dumpForLOREIdToBroadcast}_plan_id=${planId}/" +
-                s"partition_id=${partitionId}/" +
-                s"batch_id=${batchId}/col_types.meta")
+              new Path(new Path(loreDumpPath),
+                s"lore_id=${dumpForLOREIdToBroadcast}_plan_id=${planId}/" +
+                  s"partition_id=${partitionId}/" +
+                  s"batch_id=${batchId}/col_types.meta"))
            fos.write(bytes)
            fos.close()
          }
 
          // dump data for column batch to /tmp dir
          withResource(GpuColumnVector.from(cb)) { table =>
-            val path = s"/tmp/lore/lore_id=${dumpForLOREIdToBroadcast}_plan_id=${planId}/" +
-              s"partition_id=${partitionId}/" +
-              s"batch_id=${batchId}/cb_data.parquet"
-            withResource(new ParquetDumper(path, table)) { dumper =>
+            val fos = getOutputStream(
+              new Path(new Path(loreDumpPath),
+                s"lore_id=${dumpForLOREIdToBroadcast}_plan_id=${planId}/" +
+                  s"partition_id=${partitionId}/" +
+                  s"batch_id=${batchId}/cb_data.parquet"))
+            // ParquetDumper will close the output stream
+            withResource(new ParquetDumper(fos, table)) { dumper =>
              dumper.writeTable(table)
-              path
            }
          }
        }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index d4c7b49e6f8..41a77ad4fbc 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -735,6 +735,13 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
     .stringConf
     .createWithDefault("0")
 
+  val LORE_DUMP_PATH = conf("spark.rapids.LORE.pathPrefix")
+    .doc("Specifies a URI path to use when dumping with LORE. The default path is: " +
+      "file:/tmp/lore/")
+    .internal()
+    .stringConf
+    .createWithDefault("file:/tmp/lore/")
+
   val PROFILE_PATH = conf("spark.rapids.profile.pathPrefix")
     .doc("Enables profiling and specifies a URI path to use when writing profile data")
     .internal()
@@ -2519,6 +2526,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
 
   lazy val loreDumpPartitions: String = get(LORE_DUMP_PARTITIONS)
 
+  lazy val loreDumpPath: String = get(LORE_DUMP_PATH)
+
   lazy val profilePath: Option[String] = get(PROFILE_PATH)
 
   lazy val profileExecutors: String = get(PROFILE_EXECUTORS)
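
Usage sketch (illustrative, not part of the commit): since getOutputStream now resolves the dump
location through Path.getFileSystem with the job's Hadoop configuration, spark.rapids.LORE.pathPrefix
can point at any filesystem the cluster supports rather than only the local default file:/tmp/lore/.
The HDFS namenode URI below is a placeholder, not a value taken from this patch.

    import org.apache.spark.sql.SparkSession

    // Illustrative only: redirect LORE dumps from the default file:/tmp/lore/ to HDFS.
    // hdfs://namenode:8020/tmp/lore/ is a placeholder path, not part of this commit.
    val spark = SparkSession.builder()
      .appName("lore-dump-to-hdfs")
      .config("spark.rapids.LORE.pathPrefix", "hdfs://namenode:8020/tmp/lore/")
      .getOrCreate()

Per the Path composition added in GpuExec.doExecuteColumnar, plan metadata then lands under
<pathPrefix>/lore_id=<id>_plan_id=<plan>/plan.meta and batch data under
<pathPrefix>/lore_id=<id>_plan_id=<plan>/partition_id=<p>/batch_id=<b>/.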