From 4b449034f2a0105c687646176590b349f9901ea7 Mon Sep 17 00:00:00 2001 From: Liangcai Li Date: Mon, 24 Jun 2024 09:32:03 +0800 Subject: [PATCH] Support bucketing write for GPU (#10957) This PR adds the GPU support for the bucketing write. - React the code of the dynamic partition single writer and concurrent writer to try to reuse the code as much as possible, and then add in the bucketing write logic for both of them. - Update the bucket check during the plan overriding for the write commands, including InsertIntoHadoopFsRelationCommand, CreateDataSourceTableAsSelectCommand, InsertIntoHiveTable, CreateHiveTableAsSelectCommand. - From 330, Spark also supports HiveHash to generate the bucket IDs, in addition to Murmur3Hash. So the shim object GpuBucketingUtils is introduced to handle the shim things. - This change also adds two functions (tagForHiveBucketingWrite and tagForBucketing) to do the overriding check for the two hashing functions separately. And the Hive write nodes will fall back to CPU when HiveHash is chosen, because HiveHash is not supported on GPU. --------- Signed-off-by: Firestarman --- integration_tests/src/main/python/asserts.py | 6 +- .../src/main/python/orc_write_test.py | 48 +- .../src/main/python/parquet_write_test.py | 79 +- .../rapids/GpuHashPartitioningBase.scala | 8 +- .../nvidia/spark/rapids/GpuOverrides.scala | 7 +- .../sql/hive/rapids/GpuHiveFileFormat.scala | 6 +- .../sql/rapids/GpuFileFormatDataWriter.scala | 1112 +++++++---------- ...aSourceTableAsSelectCommandMetaShims.scala | 8 +- ...dCreateHiveTableAsSelectCommandShims.scala | 5 +- .../shims/spark311/GpuBucketingUtils.scala | 77 ++ .../GpuCreateHiveTableAsSelectCommand.scala | 9 +- .../rapids/shims/GpuInsertIntoHiveTable.scala | 5 +- .../sql/rapids/GpuFileFormatWriter.scala | 15 +- .../shims/spark330/GpuBucketingUtils.scala | 88 ++ ...aSourceTableAsSelectCommandMetaShims.scala | 12 +- .../rapids/shims/GpuInsertIntoHiveTable.scala | 7 +- ...dCreateHiveTableAsSelectCommandShims.scala | 6 +- .../sql/rapids/GpuFileFormatWriter.scala | 15 +- .../rapids/GpuFileFormatDataWriterSuite.scala | 132 +- 19 files changed, 896 insertions(+), 749 deletions(-) create mode 100644 sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/spark311/GpuBucketingUtils.scala create mode 100644 sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/spark330/GpuBucketingUtils.scala diff --git a/integration_tests/src/main/python/asserts.py b/integration_tests/src/main/python/asserts.py index 32416612d26..b861e89b726 100644 --- a/integration_tests/src/main/python/asserts.py +++ b/integration_tests/src/main/python/asserts.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -101,6 +101,10 @@ def _assert_equal(cpu, gpu, float_check, path): else: assert False, "Found unexpected type {} at {}".format(t, path) +def assert_equal_with_local_sort(cpu, gpu): + _sort_locally(cpu, gpu) + assert_equal(cpu, gpu) + def assert_equal(cpu, gpu): """Verify that the result from the CPU and the GPU are equal""" try: diff --git a/integration_tests/src/main/python/orc_write_test.py b/integration_tests/src/main/python/orc_write_test.py index 8d3013cbe8b..5b5c7b786b6 100644 --- a/integration_tests/src/main/python/orc_write_test.py +++ b/integration_tests/src/main/python/orc_write_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -209,7 +209,7 @@ def test_write_sql_save_table(spark_tmp_path, orc_gens, ts_type, orc_impl, spark @pytest.mark.parametrize('codec', ['zlib', 'lzo']) def test_orc_write_compression_fallback(spark_tmp_path, codec, spark_tmp_table_factory): gen = TimestampGen() - data_path = spark_tmp_path + '/PARQUET_DATA' + data_path = spark_tmp_path + '/ORC_DATA' all_confs={'spark.sql.orc.compression.codec': codec, 'spark.rapids.sql.format.orc.write.enabled': True} assert_gpu_fallback_write( lambda spark, path: unary_op_df(spark, gen).coalesce(1).write.format("orc").mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()), @@ -218,17 +218,45 @@ def test_orc_write_compression_fallback(spark_tmp_path, codec, spark_tmp_table_f 'DataWritingCommandExec', conf=all_confs) -@ignore_order -@allow_non_gpu('DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec') -def test_buckets_write_fallback(spark_tmp_path, spark_tmp_table_factory): +@ignore_order(local=True) +def test_buckets_write_round_trip(spark_tmp_path, spark_tmp_table_factory): data_path = spark_tmp_path + '/ORC_DATA' + gen_list = [["id", int_gen], ["data", long_gen]] + assert_gpu_and_cpu_writes_are_equal_collect( + lambda spark, path: gen_df(spark, gen_list).selectExpr("id % 100 as b_id", "data").write + .bucketBy(4, "b_id").format('orc').mode('overwrite').option("path", path) + .saveAsTable(spark_tmp_table_factory.get()), + lambda spark, path: spark.read.orc(path), + data_path, + conf={'spark.rapids.sql.format.orc.write.enabled': True}) + +@ignore_order(local=True) +@allow_non_gpu('DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec, SortExec') +def test_buckets_write_fallback_unsupported_types(spark_tmp_path, spark_tmp_table_factory): + data_path = spark_tmp_path + '/ORC_DATA' + gen_list = [["id", binary_gen], ["data", long_gen]] assert_gpu_fallback_write( - lambda spark, path: spark.range(10e4).write.bucketBy(4, "id").sortBy("id").format('orc').mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()), - lambda spark, path: spark.read.orc(path), - data_path, - 'DataWritingCommandExec', - conf = {'spark.rapids.sql.format.orc.write.enabled': True}) + lambda spark, path: gen_df(spark, gen_list).selectExpr("id as b_id", "data").write + .bucketBy(4, "b_id").format('orc').mode('overwrite').option("path", path) + .saveAsTable(spark_tmp_table_factory.get()), + lambda spark, path: spark.read.orc(path), + data_path, + 'DataWritingCommandExec', + conf={'spark.rapids.sql.format.orc.write.enabled': True}) +@ignore_order(local=True) +def test_partitions_and_buckets_write_round_trip(spark_tmp_path, spark_tmp_table_factory): + data_path = spark_tmp_path + '/ORC_DATA' + gen_list = [["id", int_gen], ["data", long_gen]] + assert_gpu_and_cpu_writes_are_equal_collect( + lambda spark, path: gen_df(spark, gen_list) + .selectExpr("id % 5 as b_id", "id % 10 as p_id", "data").write + .partitionBy("p_id") + .bucketBy(4, "b_id").format('orc').mode('overwrite').option("path", path) + .saveAsTable(spark_tmp_table_factory.get()), + lambda spark, path: spark.read.orc(path), + data_path, + conf={'spark.rapids.sql.format.orc.write.enabled': True}) @ignore_order @allow_non_gpu('DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec') diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py index 38dab9e84a4..805a0b8137c 100644 --- a/integration_tests/src/main/python/parquet_write_test.py +++ b/integration_tests/src/main/python/parquet_write_test.py @@ -409,16 +409,81 @@ def test_parquet_writeLegacyFormat_fallback(spark_tmp_path, spark_tmp_table_fact 'DataWritingCommandExec', conf=all_confs) -@ignore_order -@allow_non_gpu('DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec') -def test_buckets_write_fallback(spark_tmp_path, spark_tmp_table_factory): +@ignore_order(local=True) +def test_buckets_write_round_trip(spark_tmp_path, spark_tmp_table_factory): + data_path = spark_tmp_path + '/PARQUET_DATA' + gen_list = [["id", int_gen], ["data", long_gen]] + assert_gpu_and_cpu_writes_are_equal_collect( + lambda spark, path: gen_df(spark, gen_list).selectExpr("id % 100 as b_id", "data").write + .bucketBy(4, "b_id").format('parquet').mode('overwrite').option("path", path) + .saveAsTable(spark_tmp_table_factory.get()), + lambda spark, path: spark.read.parquet(path), + data_path, + conf=writer_confs) + + +def test_buckets_write_correctness(spark_tmp_path, spark_tmp_table_factory): + cpu_path = spark_tmp_path + '/PARQUET_DATA/CPU' + gpu_path = spark_tmp_path + '/PARQUET_DATA/GPU' + gen_list = [["id", int_gen], ["data", long_gen]] + num_buckets = 4 + + def do_bucketing_write(spark, path): + df = gen_df(spark, gen_list).selectExpr("id % 100 as b_id", "data") + df.write.bucketBy(num_buckets, "b_id").format('parquet').mode('overwrite') \ + .option("path", path).saveAsTable(spark_tmp_table_factory.get()) + + def read_single_bucket(path, bucket_id): + # Bucket Id string format: f"_$id%05d" + ".c$fileCounter%03d". + # fileCounter is always 0 in this test. For example '_00002.c000' is for + # bucket id being 2. + # We leverage this bucket segment in the file path to filter rows belong + # to a bucket. + bucket_segment = '_' + "{}".format(bucket_id).rjust(5, '0') + '.c000' + return with_cpu_session( + lambda spark: spark.read.parquet(path) + .withColumn('file_name', f.input_file_name()) + .filter(f.col('file_name').contains(bucket_segment)) + .selectExpr('b_id', 'data') # need to drop the file_name column for comparison. + .collect()) + + with_cpu_session(lambda spark: do_bucketing_write(spark, cpu_path), writer_confs) + with_gpu_session(lambda spark: do_bucketing_write(spark, gpu_path), writer_confs) + cur_bucket_id = 0 + while cur_bucket_id < num_buckets: + # Verify the result bucket by bucket + ret_cpu = read_single_bucket(cpu_path, cur_bucket_id) + ret_gpu = read_single_bucket(gpu_path, cur_bucket_id) + assert_equal_with_local_sort(ret_cpu, ret_gpu) + cur_bucket_id += 1 + +@ignore_order(local=True) +@allow_non_gpu('DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec, SortExec') +def test_buckets_write_fallback_unsupported_types(spark_tmp_path, spark_tmp_table_factory): data_path = spark_tmp_path + '/PARQUET_DATA' + gen_list = [["id", binary_gen], ["data", long_gen]] assert_gpu_fallback_write( - lambda spark, path: spark.range(10e4).write.bucketBy(4, "id").sortBy("id").format('parquet').mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()), - lambda spark, path: spark.read.parquet(path), - data_path, - 'DataWritingCommandExec') + lambda spark, path: gen_df(spark, gen_list).selectExpr("id as b_id", "data").write + .bucketBy(4, "b_id").format('parquet').mode('overwrite').option("path", path) + .saveAsTable(spark_tmp_table_factory.get()), + lambda spark, path: spark.read.parquet(path), + data_path, + 'DataWritingCommandExec', + conf=writer_confs) +@ignore_order(local=True) +def test_partitions_and_buckets_write_round_trip(spark_tmp_path, spark_tmp_table_factory): + data_path = spark_tmp_path + '/PARQUET_DATA' + gen_list = [["id", int_gen], ["data", long_gen]] + assert_gpu_and_cpu_writes_are_equal_collect( + lambda spark, path: gen_df(spark, gen_list) + .selectExpr("id % 5 as b_id", "id % 10 as p_id", "data").write + .partitionBy("p_id") + .bucketBy(4, "b_id").format('parquet').mode('overwrite').option("path", path) + .saveAsTable(spark_tmp_table_factory.get()), + lambda spark, path: spark.read.parquet(path), + data_path, + conf=writer_confs) @ignore_order @allow_non_gpu('DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec') diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala index b17b2782e90..baa009d0669 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.shims.ShimExpression import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.rapids.GpuMurmur3Hash +import org.apache.spark.sql.rapids.{GpuMurmur3Hash, GpuPmod} import org.apache.spark.sql.types.{DataType, IntegerType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -59,6 +59,10 @@ abstract class GpuHashPartitioningBase(expressions: Seq[Expression], numPartitio sliceInternalGpuOrCpuAndClose(numRows, partitionIndexes, partitionColumns) } } + + def partitionIdExpression: GpuExpression = GpuPmod( + GpuMurmur3Hash(expressions, GpuHashPartitioningBase.DEFAULT_HASH_SEED), + GpuLiteral(numPartitions)) } object GpuHashPartitioningBase { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 295480d24cc..9e26cf751f4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -322,10 +322,11 @@ final class InsertIntoHadoopFsRelationCommandMeta( private var fileFormat: Option[ColumnarFileFormat] = None override def tagSelfForGpuInternal(): Unit = { - if (cmd.bucketSpec.isDefined) { - willNotWorkOnGpu("bucketing is not supported") + if (GpuBucketingUtils.isHiveHashBucketing(cmd.options)) { + GpuBucketingUtils.tagForHiveBucketingWrite(this, cmd.bucketSpec, cmd.outputColumns, false) + } else { + BucketIdMetaUtils.tagForBucketingWrite(this, cmd.bucketSpec, cmd.outputColumns) } - val spark = SparkSession.active val formatCls = cmd.fileFormat.getClass fileFormat = if (formatCls == classOf[CSVFileFormat]) { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala index 21437a64481..69189b2600c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala @@ -24,6 +24,7 @@ import com.google.common.base.Charsets import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.jni.CastStrings +import com.nvidia.spark.rapids.shims.GpuBucketingUtils import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.internal.Logging @@ -43,9 +44,8 @@ object GpuHiveFileFormat extends Logging { def tagGpuSupport(meta: GpuInsertIntoHiveTableMeta): Option[ColumnarFileFormat] = { val insertCmd = meta.wrapped // Bucketing write - if (insertCmd.table.bucketSpec.isDefined) { - meta.willNotWorkOnGpu("bucketed tables are not supported yet") - } + GpuBucketingUtils.tagForHiveBucketingWrite(meta, insertCmd.table.bucketSpec, + insertCmd.outputColumns, false) // Infer the file format from the serde string, similar as what Spark does in // RelationConversions for Hive. diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala index 4ceac365314..939a421e0b9 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,30 +17,30 @@ package org.apache.spark.sql.rapids import scala.collection.mutable -import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import scala.collection.mutable.ListBuffer +import scala.util.hashing.{MurmurHash3 => ScalaMurmur3Hash} -import ai.rapids.cudf.{ColumnVector, OrderByArg, Table} +import ai.rapids.cudf.{OrderByArg, Table} import com.nvidia.spark.TimingUtils import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit -import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.shims.GpuFileFormatDataWriterShim import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext -import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogUtils} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Cast, Concat, Expression, Literal, NullsFirst, ScalaUDF, SortOrder, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Cast, Concat, Expression, Literal, Murmur3Hash, NullsFirst, ScalaUDF, UnsafeProjection} import org.apache.spark.sql.connector.write.DataWriter import org.apache.spark.sql.execution.datasources.{BucketingUtils, PartitioningUtils, WriteTaskResult} -import org.apache.spark.sql.rapids.GpuFileFormatDataWriter.{shouldSplitToFitMaxRecordsPerFile, splitToFitMaxRecordsAndClose} +import org.apache.spark.sql.rapids.GpuFileFormatDataWriter._ import org.apache.spark.sql.rapids.GpuFileFormatWriter.GpuConcurrentOutputWriterSpec -import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration @@ -50,7 +50,7 @@ object GpuFileFormatDataWriter { } def shouldSplitToFitMaxRecordsPerFile( - maxRecordsPerFile: Long, recordsInFile: Long, numRowsInBatch: Long) = { + maxRecordsPerFile: Long, recordsInFile: Long, numRowsInBatch: Long): Boolean = { maxRecordsPerFile > 0 && (recordsInFile + numRowsInBatch) > maxRecordsPerFile } @@ -88,13 +88,8 @@ object GpuFileFormatDataWriter { maxRecordsPerFile: Long, recordsInFile: Long): Array[SpillableColumnarBatch] = { val (types, splitIndexes) = closeOnExcept(batch) { _ => - val types = GpuColumnVector.extractTypes(batch) - val splitIndexes = - getSplitIndexes( - maxRecordsPerFile, - recordsInFile, - batch.numRows()) - (types, splitIndexes) + val splitIndexes = getSplitIndexes(maxRecordsPerFile, recordsInFile, batch.numRows()) + (GpuColumnVector.extractTypes(batch), splitIndexes) } if (splitIndexes.isEmpty) { // this should never happen, as `splitToFitMaxRecordsAndClose` is called when @@ -124,6 +119,31 @@ abstract class GpuFileFormatDataWriter( description: GpuWriteJobDescription, taskAttemptContext: TaskAttemptContext, committer: FileCommitProtocol) extends DataWriter[ColumnarBatch] { + + protected class WriterAndStatus { + var writer: ColumnarOutputWriter = _ + + /** Number of records in current file. */ + var recordsInFile: Long = 0 + + /** + * File counter for writing current partition or bucket. For same partition or bucket, + * we may have more than one file, due to number of records limit per file. + */ + var fileCounter: Int = 0 + + final def release(): Unit = { + if (writer != null) { + try { + writer.close() + statsTrackers.foreach(_.closeFile(writer.path())) + } finally { + writer = null + } + } + } + } + /** * Max number of files a single task writes out due to file size. In most cases the number of * files written should be very small. This is just a safe guard to protect some really bad @@ -131,28 +151,26 @@ abstract class GpuFileFormatDataWriter( */ protected val MAX_FILE_COUNTER: Int = 1000 * 1000 protected val updatedPartitions: mutable.Set[String] = mutable.Set[String]() - protected var currentWriter: ColumnarOutputWriter = _ + protected var currentWriterStatus: WriterAndStatus = new WriterAndStatus() /** Trackers for computing various statistics on the data as it's being written out. */ protected val statsTrackers: Seq[ColumnarWriteTaskStatsTracker] = description.statsTrackers.map(_.newTaskInstance()) - /** Release resources of `currentWriter`. */ - protected def releaseCurrentWriter(): Unit = { - if (currentWriter != null) { - try { - currentWriter.close() - statsTrackers.foreach(_.closeFile(currentWriter.path())) - } finally { - currentWriter = null - } - } + /** Release resources of a WriterStatus. */ + protected final def releaseOutWriter(status: WriterAndStatus): Unit = { + status.release() + } + + protected final def writeUpdateMetricsAndClose(scb: SpillableColumnarBatch, + writerStatus: WriterAndStatus): Unit = { + writerStatus.recordsInFile += writerStatus.writer.writeSpillableAndClose(scb, statsTrackers) } /** Release all resources. Public for testing */ def releaseResources(): Unit = { - // Call `releaseCurrentWriter()` by default, as this is the only resource to be released. - releaseCurrentWriter() + // Release current writer by default, as this is the only resource to be released. + releaseOutWriter(currentWriterStatus) } /** Write an iterator of column batch. */ @@ -211,8 +229,6 @@ class GpuSingleDirectoryDataWriter( taskAttemptContext: TaskAttemptContext, committer: FileCommitProtocol) extends GpuFileFormatDataWriter(description, taskAttemptContext, committer) { - private var fileCounter: Int = _ - private var recordsInFile: Long = _ // Initialize currentWriter and statsTrackers newOutputWriter() @@ -220,7 +236,8 @@ class GpuSingleDirectoryDataWriter( "msg=method newTaskTempFile in class FileCommitProtocol is deprecated" ) private def newOutputWriter(): Unit = { - recordsInFile = 0 + currentWriterStatus.recordsInFile = 0 + val fileCounter = currentWriterStatus.fileCounter releaseResources() val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext) @@ -229,7 +246,7 @@ class GpuSingleDirectoryDataWriter( None, f"-c$fileCounter%03d" + ext) - currentWriter = description.outputWriterFactory.newInstance( + currentWriterStatus.writer = description.outputWriterFactory.newInstance( path = currentPath, dataSchema = description.dataColumns.toStructType, context = taskAttemptContext) @@ -237,32 +254,30 @@ class GpuSingleDirectoryDataWriter( statsTrackers.foreach(_.newFile(currentPath)) } - private def writeUpdateMetricsAndClose(scb: SpillableColumnarBatch): Unit = { - recordsInFile += currentWriter.writeSpillableAndClose(scb, statsTrackers) - } - override def write(batch: ColumnarBatch): Unit = { val maxRecordsPerFile = description.maxRecordsPerFile + val recordsInFile = currentWriterStatus.recordsInFile if (!shouldSplitToFitMaxRecordsPerFile( maxRecordsPerFile, recordsInFile, batch.numRows())) { writeUpdateMetricsAndClose( - SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY), + currentWriterStatus) } else { val partBatches = splitToFitMaxRecordsAndClose( batch, maxRecordsPerFile, recordsInFile) - var needNewWriter = recordsInFile >= maxRecordsPerFile + val needNewWriterForFirstPart = recordsInFile >= maxRecordsPerFile closeOnExcept(partBatches) { _ => partBatches.zipWithIndex.foreach { case (partBatch, partIx) => - if (needNewWriter) { - fileCounter += 1 + if (partIx > 0 || needNewWriterForFirstPart) { + currentWriterStatus.fileCounter += 1 + val fileCounter = currentWriterStatus.fileCounter assert(fileCounter <= MAX_FILE_COUNTER, s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER") newOutputWriter() } // null out the entry so that we don't double close partBatches(partIx) = null - writeUpdateMetricsAndClose(partBatch) - needNewWriter = true + writeUpdateMetricsAndClose(partBatch, currentWriterStatus) } } } @@ -280,35 +295,44 @@ class GpuDynamicPartitionDataSingleWriter( taskAttemptContext: TaskAttemptContext, committer: FileCommitProtocol) extends GpuFileFormatDataWriter(description, taskAttemptContext, committer) { + /** Wrapper class to index a unique concurrent output writer. */ + protected class WriterIndex( + var partitionPath: Option[String], + var bucketId: Option[Int]) extends Product2[Option[String], Option[Int]] { - /** Wrapper class for status of a unique single output writer. */ - protected class WriterStatus( - // output writer - var outputWriter: ColumnarOutputWriter, + override def hashCode(): Int = ScalaMurmur3Hash.productHash(this) - /** Number of records in current file. */ - var recordsInFile: Long = 0, + override def equals(obj: Any): Boolean = { + if (obj.isInstanceOf[WriterIndex]) { + val otherWI = obj.asInstanceOf[WriterIndex] + partitionPath == otherWI.partitionPath && bucketId == otherWI.bucketId + } else { + false + } + } - /** - * File counter for writing current partition or bucket. For same partition or bucket, - * we may have more than one file, due to number of records limit per file. - */ - var fileCounter: Int = 0 - ) + override def _1: Option[String] = partitionPath + override def _2: Option[Int] = bucketId + override def canEqual(that: Any): Boolean = that.isInstanceOf[WriterIndex] + } - /** Wrapper class for status and caches of a unique concurrent output writer. - * Used by `GpuDynamicPartitionDataConcurrentWriter` + /** + * A case class to hold the batch, the optional partition path and the optional bucket + * ID for a split group. All the rows in the batch belong to the group defined by the + * partition path and the bucket ID. */ - class WriterStatusWithCaches( - // writer status - var writerStatus: WriterStatus, - - // caches for this partition or writer - val tableCaches: ListBuffer[SpillableColumnarBatch] = ListBuffer(), - - // current device bytes for the above caches - var deviceBytes: Long = 0 - ) + private case class SplitPack(split: SpillableColumnarBatch, path: Option[String], + bucketId: Option[Int]) extends AutoCloseable { + override def close(): Unit = { + split.safeClose() + } + } + /** + * The index for current writer. Intentionally make the index mutable and reusable. + * Avoid JVM GC issue when many short-living `WriterIndex` objects are created + * if switching between concurrent writers frequently. + */ + private val currentWriterId: WriterIndex = new WriterIndex(None, None) /** Flag saying whether or not the data to be written out is partitioned. */ protected val isPartitioned: Boolean = description.partitionColumns.nonEmpty @@ -316,25 +340,17 @@ class GpuDynamicPartitionDataSingleWriter( /** Flag saying whether or not the data to be written out is bucketed. */ protected val isBucketed: Boolean = description.bucketSpec.isDefined - private var currentPartPath: String = "" - - private var currentWriterStatus: WriterStatus = _ - - // All data is sorted ascending with default null ordering - private val nullsSmallest = Ascending.defaultNullOrdering == NullsFirst - - if (isBucketed) { - throw new UnsupportedOperationException("Bucketing is not supported on the GPU yet.") - } - assert(isPartitioned || isBucketed, s"""GpuDynamicPartitionWriteTask should be used for writing out data that's either |partitioned or bucketed. In this case neither is true. |GpuWriteJobDescription: $description """.stripMargin) + // All data is sorted ascending with default null ordering + private val nullsSmallest = Ascending.defaultNullOrdering == NullsFirst + /** Extracts the partition values out of an input batch. */ - protected lazy val getPartitionColumnsAsBatch: ColumnarBatch => ColumnarBatch = { + private lazy val getPartitionColumnsAsBatch: ColumnarBatch => ColumnarBatch = { val expressions = GpuBindReferences.bindGpuReferences( description.partitionColumns, description.allColumns) @@ -343,20 +359,9 @@ class GpuDynamicPartitionDataSingleWriter( } } - /** Extracts the output values of an input batch. */ - private lazy val getOutputColumnsAsBatch: ColumnarBatch => ColumnarBatch= { + private lazy val getBucketIdColumnAsBatch: ColumnarBatch => ColumnarBatch = { val expressions = GpuBindReferences.bindGpuReferences( - description.dataColumns, - description.allColumns) - cb => { - GpuProjectExec.project(cb, expressions) - } - } - - /** Extracts the output values of an input batch. */ - protected lazy val getOutputCb: ColumnarBatch => ColumnarBatch = { - val expressions = GpuBindReferences.bindGpuReferences( - description.dataColumns, + Seq(description.bucketSpec.get.bucketIdExpression), description.allColumns) cb => { GpuProjectExec.project(cb, expressions) @@ -379,62 +384,58 @@ class GpuDynamicPartitionDataSingleWriter( /** Evaluates the `partitionPathExpression` above on a row of `partitionValues` and returns * the partition string. */ - protected lazy val getPartitionPath: InternalRow => String = { + private lazy val getPartitionPath: InternalRow => String = { val proj = UnsafeProjection.create(Seq(partitionPathExpression), description.partitionColumns) row => proj(row).getString(0) } - /** Release resources of writer. */ - private def releaseWriter(writer: ColumnarOutputWriter): Unit = { - if (writer != null) { - val path = writer.path() - writer.close() - statsTrackers.foreach(_.closeFile(path)) + /** Extracts the output values of an input batch. */ + protected lazy val getDataColumnsAsBatch: ColumnarBatch => ColumnarBatch = { + val expressions = GpuBindReferences.bindGpuReferences( + description.dataColumns, + description.allColumns) + cb => { + GpuProjectExec.project(cb, expressions) } } - /** - * Opens a new OutputWriter given a partition key and/or a bucket id. - * If bucket id is specified, we will append it to the end of the file name, but before the - * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet - * - * @param partDir the partition directory - * @param bucketId the bucket which all tuples being written by this OutputWriter belong to, - * currently does not support `bucketId`, it's always None - * @param fileCounter integer indicating the number of files to be written to `partDir` - */ - @scala.annotation.nowarn( - "msg=method newTaskTempFile.* in class FileCommitProtocol is deprecated" - ) - def newWriter( - partDir: String, - bucketId: Option[Int], // Currently it's always None - fileCounter: Int - ): ColumnarOutputWriter = { - updatedPartitions.add(partDir) - // Currently will be empty - val bucketIdStr = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") - - // This must be in a form that matches our bucketing format. See BucketingUtils. - val ext = f"$bucketIdStr.c$fileCounter%03d" + - description.outputWriterFactory.getFileExtension(taskAttemptContext) - - val customPath = description.customPartitionLocations - .get(PartitioningUtils.parsePathFragment(partDir)) + protected def getKeysBatch(cb: ColumnarBatch): ColumnarBatch = { + val keysBatch = withResource(getPartitionColumnsAsBatch(cb)) { partCb => + if (isBucketed) { + withResource(getBucketIdColumnAsBatch(cb)) { bucketIdCb => + GpuColumnVector.combineColumns(partCb, bucketIdCb) + } + } else { + GpuColumnVector.incRefCounts(partCb) + } + } + require(keysBatch.numCols() > 0, "No sort key is specified") + keysBatch + } - val currentPath = if (customPath.isDefined) { - committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext) + protected def genGetBucketIdFunc(keyHostCb: ColumnarBatch): Int => Option[Int] = { + if (isBucketed) { + // The last column is the bucket id column + val bucketIdCol = keyHostCb.column(keyHostCb.numCols() - 1) + i => Some(bucketIdCol.getInt(i)) } else { - committer.newTaskTempFile(taskAttemptContext, Option(partDir), ext) + _ => None } + } - val newWriter = description.outputWriterFactory.newInstance( - path = currentPath, - dataSchema = description.dataColumns.toStructType, - context = taskAttemptContext) - - statsTrackers.foreach(_.newFile(currentPath)) - newWriter + protected def genGetPartitionPathFunc(keyHostCb: ColumnarBatch): Int => Option[String] = { + if (isPartitioned) { + // Use the existing code to convert each row into a path. It would be nice to do this + // on the GPU, but the data should be small and there are things we cannot easily + // support on the GPU right now + import scala.collection.JavaConverters._ + val partCols = description.partitionColumns.indices.map(keyHostCb.column) + val iter = new ColumnarBatch(partCols.toArray, keyHostCb.numRows()).rowIterator() + .asScala.map(getPartitionPath) + _ => Some(iter.next) + } else { + _ => None + } } // distinct value sorted the same way the input data is sorted. @@ -461,282 +462,195 @@ class GpuDynamicPartitionDataSingleWriter( } } - override def write(batch: ColumnarBatch): Unit = { - // this single writer always passes `cachesMap` as None - write(batch, cachesMap = None) - } - - private case class SplitAndPath(var split: SpillableColumnarBatch, path: String) - extends AutoCloseable { - override def close(): Unit = { - split.safeClose() - split = null - } - } - /** - * Split a batch according to the sorted keys (partitions). Returns a tuple with an - * array of the splits as `ContiguousTable`'s, and an array of paths to use to - * write each partition. + * Split a batch according to the sorted keys (partitions + bucket ids). + * Returns a tuple with an array of the splits as `ContiguousTable`'s, an array of + * paths and bucket ids to use to write each partition and(or) bucket file. */ - private def splitBatchByKeyAndClose( - batch: ColumnarBatch, - partDataTypes: Array[DataType]): Array[SplitAndPath] = { - val (outputColumnsBatch, partitionColumnsBatch) = withResource(batch) { _ => - closeOnExcept(getOutputColumnsAsBatch(batch)) { outputColumnsBatch => - closeOnExcept(getPartitionColumnsAsBatch(batch)) { partitionColumnsBatch => - (outputColumnsBatch, partitionColumnsBatch) - } + private def splitBatchByKeyAndClose(batch: ColumnarBatch): Array[SplitPack] = { + val (keysCb, dataCb) = withResource(batch) { _ => + closeOnExcept(getDataColumnsAsBatch(batch)) { data => + (getKeysBatch(batch), data) } } - val (cbKeys, partitionIndexes) = closeOnExcept(outputColumnsBatch) { _ => - val partitionColumnsTbl = withResource(partitionColumnsBatch) { _ => - GpuColumnVector.from(partitionColumnsBatch) - } - withResource(partitionColumnsTbl) { _ => - withResource(distinctAndSort(partitionColumnsTbl)) { distinctKeysTbl => - val partitionIndexes = splitIndexes(partitionColumnsTbl, distinctKeysTbl) - val cbKeys = copyToHostAsBatch(distinctKeysTbl, partDataTypes) - (cbKeys, partitionIndexes) + val (keyHostCb, splitIds) = closeOnExcept(dataCb) { _ => + val (splitIds, distinctKeysTbl, keysCbTypes) = withResource(keysCb) { _ => + val keysCbTypes = GpuColumnVector.extractTypes(keysCb) + withResource(GpuColumnVector.from(keysCb)) { keysTable => + closeOnExcept(distinctAndSort(keysTable)) { distinctKeysTbl => + (splitIndexes(keysTable, distinctKeysTbl), distinctKeysTbl, keysCbTypes) + } } } + withResource(distinctKeysTbl) { _ => + (copyToHostAsBatch(distinctKeysTbl, keysCbTypes), splitIds) + } } - - val splits = closeOnExcept(cbKeys) { _ => - val spillableOutputColumnsBatch = - SpillableColumnarBatch(outputColumnsBatch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) - withRetryNoSplit(spillableOutputColumnsBatch) { spillable => - withResource(spillable.getColumnarBatch()) { outCb => + val splits = closeOnExcept(keyHostCb) { _ => + val scbOutput = closeOnExcept(dataCb)( _ => + SpillableColumnarBatch(dataCb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + withRetryNoSplit(scbOutput) { scb => + withResource(scb.getColumnarBatch()) { outCb => withResource(GpuColumnVector.from(outCb)) { outputColumnsTbl => withResource(outputColumnsTbl) { _ => - outputColumnsTbl.contiguousSplit(partitionIndexes: _*) + outputColumnsTbl.contiguousSplit(splitIds: _*) } } } } } - - val paths = closeOnExcept(splits) { _ => - withResource(cbKeys) { _ => - // Use the existing code to convert each row into a path. It would be nice to do this - // on the GPU, but the data should be small and there are things we cannot easily - // support on the GPU right now - import scala.collection.JavaConverters._ - // paths - cbKeys.rowIterator().asScala.map(getPartitionPath).toArray - } - } + // Build the split result withResource(splits) { _ => - // NOTE: the `zip` here has the effect that will remove an extra `ContiguousTable` - // added at the end of `splits` because we use `upperBound` to find the split points, - // and the last split point is the number of rows. - val outDataTypes = description.dataColumns.map(_.dataType).toArray - splits.zip(paths).zipWithIndex.map { case ((split, path), ix) => - splits(ix) = null - withResource(split) { _ => - SplitAndPath( - SpillableColumnarBatch( - split, outDataTypes, SpillPriorities.ACTIVE_BATCHING_PRIORITY), - path) - } + withResource(keyHostCb) { _ => + val getBucketId = genGetBucketIdFunc(keyHostCb) + val getNextPartPath = genGetPartitionPathFunc(keyHostCb) + val outDataTypes = description.dataColumns.map(_.dataType).toArray + (0 until keyHostCb.numRows()).safeMap { idx => + val split = splits(idx) + splits(idx) = null + closeOnExcept(split) { _ => + SplitPack( + SpillableColumnarBatch(split, outDataTypes, + SpillPriorities.ACTIVE_BATCHING_PRIORITY), + getNextPartPath(idx), getBucketId(idx)) + } + }.toArray } } } - private def getBatchToWrite( - partBatch: SpillableColumnarBatch, - savedStatus: Option[WriterStatusWithCaches]): SpillableColumnarBatch = { - val outDataTypes = description.dataColumns.map(_.dataType).toArray - if (savedStatus.isDefined && savedStatus.get.tableCaches.nonEmpty) { - // In the case where the concurrent partition writers fall back, we need to - // incorporate into the current part any pieces that are already cached - // in the `savedStatus`. Adding `partBatch` to what was saved could make a - // concatenated batch with number of rows larger than `maxRecordsPerFile`, - // so this concatenated result could be split later, which is not efficient. However, - // the concurrent writers are default off in Spark, so it is not clear if this - // code path is worth optimizing. - val concat: Table = - withResource(savedStatus.get.tableCaches) { subSpillableBatches => - val toConcat = subSpillableBatches :+ partBatch - - // clear the caches - savedStatus.get.tableCaches.clear() - - withRetryNoSplit(toConcat.toSeq) { spillables => - withResource(spillables.safeMap(_.getColumnarBatch())) { batches => - withResource(batches.map(GpuColumnVector.from)) { subTables => - Table.concatenate(subTables: _*) - } - } - } - } - withResource(concat) { _ => - SpillableColumnarBatch( - GpuColumnVector.from(concat, outDataTypes), - SpillPriorities.ACTIVE_ON_DECK_PRIORITY) - } - } else { - partBatch + /** + * Create a new writer according to the given writer id, and update the given + * writer status. It also closes the old writer in the writer status by default. + */ + protected final def renewOutWriter(newWriterId: WriterIndex, curWriterStatus: WriterAndStatus, + closeOldWriter: Boolean = true): Unit = { + if (closeOldWriter) { + releaseOutWriter(curWriterStatus) } + curWriterStatus.recordsInFile = 0 + curWriterStatus.writer = newWriter(newWriterId.partitionPath, newWriterId.bucketId, + curWriterStatus.fileCounter) + } + + /** + * Set up a writer to the given writer status for the given writer id. + * It will create a new one if needed. This is used when seeing a new partition + * and(or) a new bucket id. + */ + protected def setupCurrentWriter(newWriterId: WriterIndex, curWriterStatus: WriterAndStatus, + closeOldWriter: Boolean = true): Unit = { + renewOutWriter(newWriterId, curWriterStatus, closeOldWriter) } /** - * Write columnar batch. - * If the `cachesMap` is not empty, this single writer should restore the writers and caches in - * the `cachesMap`, this single writer should first combine the caches and current split data - * for a specific partition before write. + * Opens a new OutputWriter given a partition key and/or a bucket id. + * If bucket id is specified, we will append it to the end of the file name, but before the + * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet * - * @param cb the column batch - * @param cachesMap used by `GpuDynamicPartitionDataConcurrentWriter` when fall back to single - * writer, single writer should handle the stored writers and the pending caches + * @param partDir the partition directory + * @param bucketId the bucket which all tuples being written by this OutputWriter belong to, + * currently does not support `bucketId`, it's always None + * @param fileCounter integer indicating the number of files to be written to `partDir` */ - protected def write( - batch: ColumnarBatch, - cachesMap: Option[mutable.HashMap[String, WriterStatusWithCaches]]): Unit = { - assert(isPartitioned) - assert(!isBucketed) + @scala.annotation.nowarn( + "msg=method newTaskTempFile.* in class FileCommitProtocol is deprecated" + ) + def newWriter(partDir: Option[String], bucketId: Option[Int], + fileCounter: Int): ColumnarOutputWriter = { + partDir.foreach(updatedPartitions.add) + // Currently will be empty + val bucketIdStr = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") - val maxRecordsPerFile = description.maxRecordsPerFile - val partDataTypes = description.partitionColumns.map(_.dataType).toArray - - // We have an entire batch that is sorted, so we need to split it up by key - // to get a batch per path - withResource(splitBatchByKeyAndClose(batch, partDataTypes)) { splitsAndPaths => - splitsAndPaths.zipWithIndex.foreach { case (SplitAndPath(partBatch, partPath), ix) => - // If we fall back from `GpuDynamicPartitionDataConcurrentWriter`, we should get the - // saved status - val savedStatus = updateCurrentWriterIfNeeded(partPath, cachesMap) - - // combine `partBatch` with any remnants for this partition for the concurrent - // writer fallback case in `savedStatus` - splitsAndPaths(ix) = null - val batchToWrite = getBatchToWrite(partBatch, savedStatus) - - // if the batch fits, write it as is, else split and write it. - if (!shouldSplitToFitMaxRecordsPerFile(maxRecordsPerFile, - currentWriterStatus.recordsInFile, batchToWrite.numRows())) { - writeUpdateMetricsAndClose(currentWriterStatus, batchToWrite) - } else { - // materialize an actual batch since we are going to split it - // on the GPU - val batchToSplit = withRetryNoSplit(batchToWrite) { _ => - batchToWrite.getColumnarBatch() - } - val maxRecordsPerFileSplits = splitToFitMaxRecordsAndClose( - batchToSplit, - maxRecordsPerFile, - currentWriterStatus.recordsInFile) - writeSplitBatchesAndClose(maxRecordsPerFileSplits, maxRecordsPerFile, partPath) - } - } + // This must be in a form that matches our bucketing format. See BucketingUtils. + val ext = f"$bucketIdStr.c$fileCounter%03d" + + description.outputWriterFactory.getFileExtension(taskAttemptContext) + + val customPath = partDir.flatMap { dir => + description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir)) } + + val currentPath = if (customPath.isDefined) { + committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext) + } else { + committer.newTaskTempFile(taskAttemptContext, partDir, ext) + } + + val outWriter = description.outputWriterFactory.newInstance( + path = currentPath, + dataSchema = description.dataColumns.toStructType, + context = taskAttemptContext) + + statsTrackers.foreach(_.newFile(currentPath)) + outWriter } - private def updateCurrentWriterIfNeeded( - partPath: String, - cachesMap: Option[mutable.HashMap[String, WriterStatusWithCaches]]): - Option[WriterStatusWithCaches] = { - var savedStatus: Option[WriterStatusWithCaches] = None - if (currentPartPath != partPath) { - val previousPartPath = currentPartPath - currentPartPath = partPath - - // see a new partition, close the old writer - val previousWriterStatus = currentWriterStatus - if (previousWriterStatus != null) { - releaseWriter(previousWriterStatus.outputWriter) - } + protected final def writeBatchPerMaxRecordsAndClose(scb: SpillableColumnarBatch, + writerId: WriterIndex, writerStatus: WriterAndStatus): Unit = { + val maxRecordsPerFile = description.maxRecordsPerFile + val recordsInFile = writerStatus.recordsInFile - if (cachesMap.isDefined) { - savedStatus = cachesMap.get.get(currentPartPath) - if (savedStatus.isDefined) { - // first try to restore the saved writer status, - // `GpuDynamicPartitionDataConcurrentWriter` may already opened the writer, and may - // have pending caches - currentWriterStatus = savedStatus.get.writerStatus - // entire batch that is sorted, see a new partition, the old write status is useless - cachesMap.get.remove(previousPartPath) - } else { - // create a new one - val writer = newWriter(partPath, None, 0) - currentWriterStatus = new WriterStatus(writer) - statsTrackers.foreach(_.newPartition()) + if (!shouldSplitToFitMaxRecordsPerFile(maxRecordsPerFile, recordsInFile, scb.numRows())) { + writeUpdateMetricsAndClose(scb, writerStatus) + } else { + val batch = withRetryNoSplit(scb) { scb => + scb.getColumnarBatch() + } + val splits = splitToFitMaxRecordsAndClose(batch, maxRecordsPerFile, recordsInFile) + withResource(splits) { _ => + val needNewWriterForFirstPart = recordsInFile >= maxRecordsPerFile + splits.zipWithIndex.foreach { case (part, partIx) => + if (partIx > 0 || needNewWriterForFirstPart) { + writerStatus.fileCounter += 1 + assert(writerStatus.fileCounter <= MAX_FILE_COUNTER, + s"File counter ${writerStatus.fileCounter} is beyond max value $MAX_FILE_COUNTER") + // will create a new file, so close the old writer + renewOutWriter(writerId, writerStatus) + } + splits(partIx) = null + writeUpdateMetricsAndClose(part, writerStatus) } - } else { - // create a new one - val writer = newWriter(partPath, None, 0) - currentWriterStatus = new WriterStatus(writer) - statsTrackers.foreach(_.newPartition()) } } - savedStatus } /** - * Write an array of spillable batches. + * Called just before updating the current writer status when seeing a new partition + * or a bucket. * - * Note: `spillableBatches` will be closed in this function. - * - * @param batches the SpillableColumnarBatch splits to be written - * @param maxRecordsPerFile the max number of rows per file - * @param partPath the partition directory + * @param curWriterId the current writer index */ - private def writeSplitBatchesAndClose( - spillableBatches: Array[SpillableColumnarBatch], - maxRecordsPerFile: Long, - partPath: String): Unit = { - var needNewWriter = currentWriterStatus.recordsInFile >= maxRecordsPerFile - withResource(spillableBatches) { _ => - spillableBatches.zipWithIndex.foreach { case (part, partIx) => - if (needNewWriter) { - currentWriterStatus.fileCounter += 1 - assert(currentWriterStatus.fileCounter <= MAX_FILE_COUNTER, - s"File counter ${currentWriterStatus.fileCounter} " + - s"is beyond max value $MAX_FILE_COUNTER") - - // will create a new file, close the old writer - if (currentWriterStatus != null) { - releaseWriter(currentWriterStatus.outputWriter) - } + protected def preUpdateCurrentWriterStatus(curWriterId: WriterIndex): Unit ={} - // create a new writer and update the writer in the status - currentWriterStatus.outputWriter = - newWriter(partPath, None, currentWriterStatus.fileCounter) - currentWriterStatus.recordsInFile = 0 + override def write(batch: ColumnarBatch): Unit = { + // The input batch that is entirely sorted, so split it up by partitions and (or) + // bucket ids, and write the split batches one by one. + withResource(splitBatchByKeyAndClose(batch)) { splitPacks => + splitPacks.zipWithIndex.foreach { case (SplitPack(sp, partPath, bucketId), i) => + val hasDiffPart = partPath != currentWriterId.partitionPath + val hasDiffBucket = bucketId != currentWriterId.bucketId + if (hasDiffPart || hasDiffBucket) { + preUpdateCurrentWriterStatus(currentWriterId) + if (hasDiffPart) { + currentWriterId.partitionPath = partPath + statsTrackers.foreach(_.newPartition()) + } + if (hasDiffBucket) { + currentWriterId.bucketId = bucketId + } + currentWriterStatus.fileCounter = 0 + setupCurrentWriter(currentWriterId, currentWriterStatus) } - spillableBatches(partIx) = null - writeUpdateMetricsAndClose(currentWriterStatus, part) - needNewWriter = true - } - } - } - - protected def writeUpdateMetricsAndClose( - writerStatus: WriterStatus, - spillableBatch: SpillableColumnarBatch): Unit = { - writerStatus.recordsInFile += - writerStatus.outputWriter.writeSpillableAndClose(spillableBatch, statsTrackers) - } - - /** Release all resources. */ - override def releaseResources(): Unit = { - // does not use `currentWriter`, single writer use `currentWriterStatus` - assert(currentWriter == null) - - if (currentWriterStatus != null) { - try { - currentWriterStatus.outputWriter.close() - statsTrackers.foreach(_.closeFile(currentWriterStatus.outputWriter.path())) - } finally { - currentWriterStatus = null + splitPacks(i) = null + writeBatchPerMaxRecordsAndClose(sp, currentWriterId, currentWriterStatus) } } } } /** - * Dynamic partition writer with concurrent writers, meaning multiple concurrent writers are opened - * for writing. + * Dynamic partition writer with concurrent writers, meaning multiple concurrent + * writers are opened for writing. * * The process has the following steps: * - Step 1: Maintain a map of output writers per each partition columns. Keep all @@ -754,18 +668,29 @@ class GpuDynamicPartitionDataConcurrentWriter( description: GpuWriteJobDescription, taskAttemptContext: TaskAttemptContext, committer: FileCommitProtocol, - spec: GpuConcurrentOutputWriterSpec, - taskContext: TaskContext) - extends GpuDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) { + spec: GpuConcurrentOutputWriterSpec) + extends GpuDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) + with Logging { - // Keep all the unclosed writers, key is partition directory string. - // Note: if fall back to sort-based mode, also use the opened writers in the map. - private val concurrentWriters = mutable.HashMap[String, WriterStatusWithCaches]() + /** Wrapper class for status and caches of a unique concurrent output writer. */ + private class WriterStatusWithBatches extends WriterAndStatus with AutoCloseable { + // caches for this partition or writer + val tableCaches: ListBuffer[SpillableColumnarBatch] = ListBuffer() - // guarantee to close the caches and writers when task is finished - onTaskCompletion(taskContext)(closeCachesAndWriters()) + // current device bytes for the above caches + var deviceBytes: Long = 0 - private val outDataTypes = description.dataColumns.map(_.dataType).toArray + override def close(): Unit = try { + releaseOutWriter(this) + } finally { + tableCaches.safeClose() + tableCaches.clear() + } + } + + // Keep all the unclosed writers, key is a partition path and(or) bucket id. + // Note: if fall back to sort-based mode, also use the opened writers in the map. + private val concurrentWriters = mutable.HashMap[WriterIndex, WriterStatusWithBatches]() private val partitionFlushSize = if (description.concurrentWriterPartitionFlushSize <= 0) { @@ -777,324 +702,196 @@ class GpuDynamicPartitionDataConcurrentWriter( description.concurrentWriterPartitionFlushSize } - // refer to current batch if should fall back to `single writer` - private var currentFallbackColumnarBatch: ColumnarBatch = _ + // Pending split batches that are not cached for the concurrent write because + // there are too many open writers, and it is going to fall back to the sorted + // sequential write. + private val pendingBatches: mutable.Queue[SpillableColumnarBatch] = mutable.Queue.empty - override def abort(): Unit = { - try { - closeCachesAndWriters() - } finally { - committer.abortTask(taskAttemptContext) + override def writeWithIterator(iterator: Iterator[ColumnarBatch]): Unit = { + // 1: try concurrent writer + while (iterator.hasNext && pendingBatches.isEmpty) { + // concurrent write and update the `concurrentWriters` map. + this.write(iterator.next()) } - } - /** - * State to indicate if we are falling back to sort-based writer. - * Because we first try to use concurrent writers, its initial value is false. - */ - private var fallBackToSortBased: Boolean = false + // 2: fall back to single write if the input is not all consumed. + if (pendingBatches.nonEmpty || iterator.hasNext) { + // sort the all the pending batches and ones in `iterator` + val pendingCbsIter = new Iterator[ColumnarBatch] { + override def hasNext: Boolean = pendingBatches.nonEmpty - private def writeWithSingleWriter(cb: ColumnarBatch): Unit = { - // invoke `GpuDynamicPartitionDataSingleWriter`.write, - // single writer will take care of the unclosed writers and the pending caches - // in `concurrentWriters` - super.write(cb, Some(concurrentWriters)) + override def next(): ColumnarBatch = { + if (!hasNext) { + throw new NoSuchElementException() + } + withResource(pendingBatches.dequeue())(_.getColumnarBatch()) + } + } + val sortIter = GpuOutOfCoreSortIterator(pendingCbsIter ++ iterator, + new GpuSorter(spec.sortOrder, spec.output), GpuSortExec.targetSize(spec.batchSize), + NoopMetric, NoopMetric, NoopMetric, NoopMetric) + while (sortIter.hasNext) { + // write with sort-based sequential writer + super.write(sortIter.next()) + } + } } - private def writeWithConcurrentWriter(cb: ColumnarBatch): Unit = { - this.write(cb) + /** This is for the fallback case, used to clean the writers map. */ + override def preUpdateCurrentWriterStatus(curWriterId: WriterIndex): Unit = { + concurrentWriters.remove(curWriterId) } - /** - * Write an iterator of column batch. - * - * @param iterator the iterator of column batch - */ - override def writeWithIterator(iterator: Iterator[ColumnarBatch]): Unit = { - // 1: try concurrent writer - while (iterator.hasNext && !fallBackToSortBased) { - // concurrently write and update the `concurrentWriters` map - // the `` will be updated - writeWithConcurrentWriter(iterator.next()) + /** This is for the fallback case, try to find the writer from cache first. */ + override def setupCurrentWriter(newWriterId: WriterIndex, writerStatus: WriterAndStatus, + closeOldWriter: Boolean): Unit = { + if (closeOldWriter) { + releaseOutWriter(writerStatus) } - - // 2: fall back to single writer - // Note single writer should restore writer status and handle the pending caches - if (fallBackToSortBased) { - // concat the put back batch and un-coming batches - val newIterator = Iterator.single(currentFallbackColumnarBatch) ++ iterator - // sort the all the batches in `iterator` - - val sortIterator: GpuOutOfCoreSortIterator = getSorted(newIterator) - while (sortIterator.hasNext) { - // write with sort-based single writer - writeWithSingleWriter(sortIterator.next()) - } + val oOpenStatus = concurrentWriters.get(newWriterId) + if (oOpenStatus.isDefined) { + val openStatus = oOpenStatus.get + writerStatus.writer = openStatus.writer + writerStatus.recordsInFile = openStatus.recordsInFile + writerStatus.fileCounter = openStatus.fileCounter + } else { + super.setupCurrentWriter(newWriterId, writerStatus, closeOldWriter = false) } } /** - * Sort the input iterator by out of core sort - * - * @param iterator the input iterator - * @return sorted iterator - */ - private def getSorted(iterator: Iterator[ColumnarBatch]): GpuOutOfCoreSortIterator = { - val gpuSortOrder: Seq[SortOrder] = spec.sortOrder - val output: Seq[Attribute] = spec.output - val sorter = new GpuSorter(gpuSortOrder, output) - - // use noop metrics below - val sortTime = NoopMetric - val opTime = NoopMetric - val outputBatch = NoopMetric - val outputRows = NoopMetric - - val targetSize = GpuSortExec.targetSize(spec.batchSize) - // out of core sort the entire iterator - GpuOutOfCoreSortIterator(iterator, sorter, targetSize, - opTime, sortTime, outputBatch, outputRows) - } - - /** - * concurrent write the columnar batch - * Note: if new partitions number in `cb` plus existing partitions number is greater than - * `maxWriters` limit, will put back the whole `cb` to 'single writer` + * The write path of concurrent writers * - * @param cb the columnar batch + * @param cb the columnar batch to be written */ override def write(cb: ColumnarBatch): Unit = { - assert(isPartitioned) - assert(!isBucketed) - if (cb.numRows() == 0) { // TODO https://github.com/NVIDIA/spark-rapids/issues/6453 // To solve above issue, I assume that an empty batch will be wrote for saving metadata. // If the assumption it's true, this concurrent writer should write the metadata here, // and should not run into below splitting and caching logic + cb.close() return } - // 1. combine partition columns and `cb` columns into a column array - val columnsWithPartition = ArrayBuffer[ColumnVector]() - - // this withResource is here to decrement the refcount of the partition columns - // that are projected out of `cb` - withResource(getPartitionColumnsAsBatch(cb)) { partitionColumnsBatch => - columnsWithPartition.appendAll(GpuColumnVector.extractBases(partitionColumnsBatch)) - } - - val cols = GpuColumnVector.extractBases(cb) - columnsWithPartition ++= cols - - // 2. group by the partition columns - // get sub-groups for each partition and get unique keys for each partition - val groupsAndKeys = withResource( - new Table(columnsWithPartition.toSeq: _*)) { colsWithPartitionTbl => - // [0, partition columns number - 1] - val partitionIndices = description.partitionColumns.indices - - // group by partition columns - val op = colsWithPartitionTbl.groupBy(partitionIndices: _*) - // return groups and uniq keys table - // Each row in uniq keys table is corresponding to a group - op.contiguousSplitGroupsAndGenUniqKeys() - } - - withResource(groupsAndKeys) { _ => - // groups number should equal to uniq keys number - assert(groupsAndKeys.getGroups.length == groupsAndKeys.getUniqKeyTable.getRowCount) - - val (groups, keys) = (groupsAndKeys.getGroups, groupsAndKeys.getUniqKeyTable) - - // 3. generate partition strings for all sub-groups in advance - val partDataTypes = description.partitionColumns.map(_.dataType).toArray - val dataTypes = GpuColumnVector.extractTypes(cb) - // generate partition string list for all groups - val partitionStrList = getPartitionStrList(keys, partDataTypes) - // key table is useless now - groupsAndKeys.closeUniqKeyTable() - - // 4. cache each group according to each partitionStr - withResource(groups) { _ => - - // first update fallBackToSortBased - withResource(cb) { _ => - var newPartitionNum = 0 - var groupIndex = 0 - while (!fallBackToSortBased && groupIndex < groups.length) { - // get the partition string - val partitionStr = partitionStrList(groupIndex) - groupIndex += 1 - if (!concurrentWriters.contains(partitionStr)) { - newPartitionNum += 1 - if (newPartitionNum + concurrentWriters.size >= spec.maxWriters) { - fallBackToSortBased = true - currentFallbackColumnarBatch = cb - // `cb` should be put back to single writer - GpuColumnVector.incRefCounts(cb) - } - } - } - } - - if (!fallBackToSortBased) { - // not fall, collect all caches - var groupIndex = 0 - while (groupIndex < groups.length) { - // get the partition string and group pair - val (partitionStr, group) = (partitionStrList(groupIndex), groups(groupIndex)) - val groupTable = group.getTable - groupIndex += 1 - - // create writer if encounter a new partition and put into `concurrentWriters` map - if (!concurrentWriters.contains(partitionStr)) { - val w = newWriter(partitionStr, None, 0) - val ws = new WriterStatus(w) - concurrentWriters.put(partitionStr, new WriterStatusWithCaches(ws)) - statsTrackers.foreach(_.newPartition()) - } - - // get data columns, tail part is data columns - val dataColumns = ArrayBuffer[ColumnVector]() - for (i <- description.partitionColumns.length until groupTable.getNumberOfColumns) { - dataColumns += groupTable.getColumn(i) - } - withResource(new Table(dataColumns.toSeq: _*)) { dataTable => - withResource(GpuColumnVector.from(dataTable, dataTypes)) { cb => - val outputCb = getOutputCb(cb) - // convert to spillable cache and add to the pending cache - val currWriterStatus = concurrentWriters(partitionStr) - // create SpillableColumnarBatch to take the owner of `outputCb` - currWriterStatus.tableCaches += SpillableColumnarBatch( - outputCb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) - currWriterStatus.deviceBytes += GpuColumnVector.getTotalDeviceMemoryUsed(outputCb) - } - } + // Split the batch and cache the result, along with opening the writers. + splitBatchToCacheAndClose(cb) + // Write the cached batches + val writeFunc: (WriterIndex, WriterStatusWithBatches) => Unit = + if (pendingBatches.nonEmpty) { + // Flush all the caches before going into sorted sequential write + writeOneCacheAndClose + } else { + // Still the concurrent write, so write out only partitions that size > threshold. + (wi, ws) => + if (ws.deviceBytes > partitionFlushSize) { + writeOneCacheAndClose(wi, ws) } - } } - } - - // 5. find all big enough partitions and write - if(!fallBackToSortBased) { - for ((partitionDir, ws) <- findBigPartitions(partitionFlushSize)) { - writeAndCloseCache(partitionDir, ws) - } - } - } - - private def getPartitionStrList( - uniqKeysTable: Table, partDataTypes: Array[DataType]): Array[String] = { - withResource(copyToHostAsBatch(uniqKeysTable, partDataTypes)) { oneRowCb => - import scala.collection.JavaConverters._ - oneRowCb.rowIterator().asScala.map(getPartitionPath).toArray + concurrentWriters.foreach { case (writerIdx, writerStatus) => + writeFunc(writerIdx, writerStatus) } } - private def writeAndCloseCache(partitionDir: String, status: WriterStatusWithCaches): Unit = { + private def writeOneCacheAndClose(writerId: WriterIndex, + status: WriterStatusWithBatches): Unit = { assert(status.tableCaches.nonEmpty) + // Concat tables if needed + val scbToWrite = GpuBatchUtils.concatSpillBatchesAndClose(status.tableCaches.toSeq).get + status.tableCaches.clear() + status.deviceBytes = 0 + writeBatchPerMaxRecordsAndClose(scbToWrite, writerId, status) + } - // get concat table or the single table - val spillableToWrite = if (status.tableCaches.length >= 2) { - // concat the sub batches to write in once. - val concatted = withRetryNoSplit(status.tableCaches.toSeq) { spillableSubBatches => - withResource(spillableSubBatches.safeMap(_.getColumnarBatch())) { subBatches => - withResource(subBatches.map(GpuColumnVector.from)) { subTables => - Table.concatenate(subTables: _*) - } - } + private def splitBatchToCacheAndClose(batch: ColumnarBatch): Unit = { + // Split batch to groups by sort columns, [partition and(or) bucket id column]. + val (keysAndGroups, keyTypes) = withResource(batch) { _ => + val (opBatch, keyTypes) = withResource(getKeysBatch(batch)) { keysBatch => + val combinedCb = GpuColumnVector.combineColumns(keysBatch, batch) + (combinedCb, GpuColumnVector.extractTypes(keysBatch)) } - withResource(concatted) { _ => - SpillableColumnarBatch( - GpuColumnVector.from(concatted, outDataTypes), - SpillPriorities.ACTIVE_ON_DECK_PRIORITY) + withResource(opBatch) { _ => + withResource(GpuColumnVector.from(opBatch)) { opTable => + (opTable.groupBy(keyTypes.indices: _*).contiguousSplitGroupsAndGenUniqKeys(), + keyTypes) + } } - } else { - // only one single table - status.tableCaches.head } - - status.tableCaches.clear() - - val maxRecordsPerFile = description.maxRecordsPerFile - if (!shouldSplitToFitMaxRecordsPerFile( - maxRecordsPerFile, status.writerStatus.recordsInFile, spillableToWrite.numRows())) { - writeUpdateMetricsAndClose(status.writerStatus, spillableToWrite) - } else { - val batchToSplit = withRetryNoSplit(spillableToWrite) { _ => - spillableToWrite.getColumnarBatch() - } - val splits = splitToFitMaxRecordsAndClose( - batchToSplit, - maxRecordsPerFile, - status.writerStatus.recordsInFile) - var needNewWriter = status.writerStatus.recordsInFile >= maxRecordsPerFile - withResource(splits) { _ => - splits.zipWithIndex.foreach { case (split, partIndex) => - if (needNewWriter) { - status.writerStatus.fileCounter += 1 - assert(status.writerStatus.fileCounter <= MAX_FILE_COUNTER, - s"File counter ${status.writerStatus.fileCounter} " + - s"is beyond max value $MAX_FILE_COUNTER") - status.writerStatus.outputWriter.close() - // start a new writer - val w = newWriter(partitionDir, None, status.writerStatus.fileCounter) - status.writerStatus.outputWriter = w - status.writerStatus.recordsInFile = 0L + // Copy keys table to host and make group batches spillable + val (keyHostCb, groups) = withResource(keysAndGroups) { _ => + // groups number should equal to uniq keys number + assert(keysAndGroups.getGroups.length == keysAndGroups.getUniqKeyTable.getRowCount) + closeOnExcept(copyToHostAsBatch(keysAndGroups.getUniqKeyTable, keyTypes)) { keyHostCb => + keysAndGroups.closeUniqKeyTable() + val allTypes = description.allColumns.map(_.dataType).toArray + val allColsIds = allTypes.indices.map(_ + keyTypes.length) + val gps = keysAndGroups.getGroups.safeMap { gp => + withResource(gp.getTable) { gpTable => + withResource(new Table(allColsIds.map(gpTable.getColumn): _*)) { allTable => + SpillableColumnarBatch(GpuColumnVector.from(allTable, allTypes), + SpillPriorities.ACTIVE_BATCHING_PRIORITY) + } } - splits(partIndex) = null - writeUpdateMetricsAndClose(status.writerStatus, split) - needNewWriter = true } + (keyHostCb, gps) } } - status.tableCaches.clear() - status.deviceBytes = 0 - } - - def closeCachesAndWriters(): Unit = { - // collect all caches and writers - val allResources = ArrayBuffer[AutoCloseable]() - allResources ++= concurrentWriters.values.flatMap(ws => ws.tableCaches) - allResources ++= concurrentWriters.values.map { ws => - new AutoCloseable() { - override def close(): Unit = { - ws.writerStatus.outputWriter.close() - statsTrackers.foreach(_.closeFile(ws.writerStatus.outputWriter.path())) + // Cache the result to either the map or the pending queue. + withResource(groups) { _ => + withResource(keyHostCb) { _ => + val getBucketId = genGetBucketIdFunc(keyHostCb) + val getNextPartPath = genGetPartitionPathFunc(keyHostCb) + var idx = 0 + while (idx < groups.length && concurrentWriters.size < spec.maxWriters) { + val writerId = new WriterIndex(getNextPartPath(idx), getBucketId(idx)) + val writerStatus = + concurrentWriters.getOrElseUpdate(writerId, new WriterStatusWithBatches) + if (writerStatus.writer == null) { + // a new partition or bucket, so create a writer + renewOutWriter(writerId, writerStatus, closeOldWriter = false) + } + withResource(groups(idx)) { gp => + groups(idx) = null + withResource(gp.getColumnarBatch()) { cb => + val dataScb = SpillableColumnarBatch(getDataColumnsAsBatch(cb), + SpillPriorities.ACTIVE_BATCHING_PRIORITY) + writerStatus.tableCaches.append(dataScb) + writerStatus.deviceBytes += dataScb.sizeInBytes + } + } + idx += 1 + } + if (idx < groups.length) { + // The open writers number reaches the limit, and still some partitions are + // not cached. Append to the queue for the coming fallback to the sorted + // sequential write. + groups.drop(idx).foreach(g => pendingBatches.enqueue(g)) + // Set to null to avoid double close + (idx until groups.length).foreach(groups(_) = null) + logInfo(s"Number of concurrent writers ${concurrentWriters.size} reaches " + + "the threshold. Fall back from concurrent writers to sort-based sequential" + + " writer.") } } } - - // safe close all the caches and writers - allResources.safeClose() - - // clear `concurrentWriters` map - concurrentWriters.values.foreach(ws => ws.tableCaches.clear()) - concurrentWriters.clear() } /** Release all resources. */ override def releaseResources(): Unit = { - // does not use `currentWriter`, only use the writers in the concurrent writer map - assert(currentWriter == null) - - if (fallBackToSortBased) { - // Note: we should close the last partition writer in the single writer. - super.releaseResources() - } + pendingBatches.safeClose() + pendingBatches.clear() // write all caches - concurrentWriters.filter(pair => pair._2.tableCaches.nonEmpty) - .foreach(pair => writeAndCloseCache(pair._1, pair._2)) + concurrentWriters.foreach { case (wi, ws) => + if (ws.tableCaches.nonEmpty) { + writeOneCacheAndClose(wi, ws) + } + } // close all resources - closeCachesAndWriters() - } - - private def findBigPartitions( - sizeThreshold: Long): mutable.Map[String, WriterStatusWithCaches] = { - concurrentWriters.filter(pair => pair._2.deviceBytes >= sizeThreshold) + concurrentWriters.values.toSeq.safeClose() + concurrentWriters.clear() + super.releaseResources() } } @@ -1105,7 +902,7 @@ class GpuDynamicPartitionDataConcurrentWriter( * @param bucketFileNamePrefix Prefix of output file name based on bucket id. */ case class GpuWriterBucketSpec( - bucketIdExpression: Expression, + bucketIdExpression: GpuExpression, bucketFileNamePrefix: Int => String) /** @@ -1134,4 +931,23 @@ class GpuWriteJobDescription( |Partition columns: ${partitionColumns.mkString(", ")} |Data columns: ${dataColumns.mkString(", ")} """.stripMargin) -} \ No newline at end of file +} + +object BucketIdMetaUtils { + // Tag for the bucketing write using Spark Murmur3Hash + def tagForBucketingWrite(meta: RapidsMeta[_, _, _], bucketSpec: Option[BucketSpec], + outputColumns: Seq[Attribute]): Unit = { + bucketSpec.foreach { bSpec => + // Create a Murmur3Hash expression to leverage the overriding types check. + val expr = Murmur3Hash( + bSpec.bucketColumnNames.map(n => outputColumns.find(_.name == n).get), + GpuHashPartitioningBase.DEFAULT_HASH_SEED) + val hashMeta = GpuOverrides.wrapExpr(expr, meta.conf, None) + hashMeta.tagForGpu() + if(!hashMeta.canThisBeReplaced) { + meta.willNotWorkOnGpu(s"Hashing for generating bucket IDs can not run" + + s" on GPU. Details: ${hashMeta.explain(all=false)}") + } + } + } +} diff --git a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/CreateDataSourceTableAsSelectCommandMetaShims.scala b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/CreateDataSourceTableAsSelectCommandMetaShims.scala index de066a5486d..d1a26dc80fc 100644 --- a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/CreateDataSourceTableAsSelectCommandMetaShims.scala +++ b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/CreateDataSourceTableAsSelectCommandMetaShims.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat -import org.apache.spark.sql.rapids.{GpuDataSourceBase, GpuOrcFileFormat} +import org.apache.spark.sql.rapids.{BucketIdMetaUtils, GpuDataSourceBase, GpuOrcFileFormat} import org.apache.spark.sql.rapids.shims.GpuCreateDataSourceTableAsSelectCommand @@ -56,9 +56,7 @@ final class CreateDataSourceTableAsSelectCommandMeta( private var gpuProvider: Option[ColumnarFileFormat] = None override def tagSelfForGpuInternal(): Unit = { - if (cmd.table.bucketSpec.isDefined) { - willNotWorkOnGpu("bucketing is not supported") - } + BucketIdMetaUtils.tagForBucketingWrite(this, cmd.table.bucketSpec, cmd.outputColumns) if (cmd.table.provider.isEmpty) { willNotWorkOnGpu("provider must be defined") } @@ -94,4 +92,4 @@ final class CreateDataSourceTableAsSelectCommandMeta( conf.stableSort, conf.concurrentWriterPartitionFlushSize) } -} \ No newline at end of file +} diff --git a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuOptimizedCreateHiveTableAsSelectCommandShims.scala b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuOptimizedCreateHiveTableAsSelectCommandShims.scala index 5e2601a0467..55d9bc53704 100644 --- a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuOptimizedCreateHiveTableAsSelectCommandShims.scala +++ b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/GpuOptimizedCreateHiveTableAsSelectCommandShims.scala @@ -184,9 +184,8 @@ final class OptimizedCreateHiveTableAsSelectCommandMeta( willNotWorkOnGpu("partitioned writes are not supported") } - if (tableDesc.bucketSpec.isDefined) { - willNotWorkOnGpu("bucketing is not supported") - } + GpuBucketingUtils.tagForHiveBucketingWrite(this, tableDesc.bucketSpec, + cmd.outputColumns, false) val serde = tableDesc.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) if (serde.contains("parquet")) { diff --git a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/spark311/GpuBucketingUtils.scala b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/spark311/GpuBucketingUtils.scala new file mode 100644 index 00000000000..a604267d1d9 --- /dev/null +++ b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/spark311/GpuBucketingUtils.scala @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids.RapidsMeta + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.rapids.{BucketIdMetaUtils, GpuWriterBucketSpec} + +object GpuBucketingUtils { + + def getWriterBucketSpec( + bucketSpec: Option[BucketSpec], + dataColumns: Seq[Attribute], + options: Map[String, String], + forceHiveHash: Boolean): Option[GpuWriterBucketSpec] = { + bucketSpec.map { spec => + val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get) + if (forceHiveHash) { + // Forcely use HiveHash for Hive write commands for some customized Spark binaries. + // TODO: Cannot support this until we support Hive hash partitioning on the GPU + throw new UnsupportedOperationException("Hive hash partitioning is not supported" + + " on GPU") + } else { + // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id + // expression, so that we can guarantee the data distribution is same between shuffle and + // bucketed data source, which enables us to only shuffle one side when join a bucketed + // table and a normal one. + val bucketIdExpression = GpuHashPartitioning(bucketColumns, spec.numBuckets) + .partitionIdExpression + GpuWriterBucketSpec(bucketIdExpression, (_: Int) => "") + } + } + } + + def isHiveHashBucketing(options: Map[String, String]): Boolean = false + + def getOptionsWithHiveBucketWrite(bucketSpec: Option[BucketSpec]): Map[String, String] = { + Map.empty + } + + def tagForHiveBucketingWrite(meta: RapidsMeta[_, _, _], bucketSpec: Option[BucketSpec], + outColumns: Seq[Attribute], forceHiveHash: Boolean): Unit = { + if (forceHiveHash) { + bucketSpec.foreach(_ => + meta.willNotWorkOnGpu("Hive Hashing for generating bucket IDs is not supported yet") + ) + } else { + BucketIdMetaUtils.tagForBucketingWrite(meta, bucketSpec, outColumns) + } + } +} diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuCreateHiveTableAsSelectCommand.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuCreateHiveTableAsSelectCommand.scala index 034567d60e5..acdd53b74ab 100644 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuCreateHiveTableAsSelectCommand.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuCreateHiveTableAsSelectCommand.scala @@ -36,7 +36,7 @@ spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.hive.rapids.shims import com.nvidia.spark.rapids.{DataFromReplacementRule, DataWritingCommandMeta, GpuDataWritingCommand, GpuOverrides, RapidsConf, RapidsMeta} -import com.nvidia.spark.rapids.shims.GpuCreateHiveTableAsSelectBase +import com.nvidia.spark.rapids.shims.{GpuBucketingUtils, GpuCreateHiveTableAsSelectBase} import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog} @@ -61,9 +61,8 @@ final class GpuCreateHiveTableAsSelectCommandMeta(cmd: CreateHiveTableAsSelectCo willNotWorkOnGpu("partitioned writes are not supported") } - if (tableDesc.bucketSpec.isDefined) { - willNotWorkOnGpu("bucketing is not supported") - } + GpuBucketingUtils.tagForHiveBucketingWrite(this, tableDesc.bucketSpec, + cmd.outputColumns, false) val catalog = spark.sessionState.catalog val tableExists = catalog.tableExists(tableDesc.identifier) @@ -137,4 +136,4 @@ case class GpuCreateHiveTableAsSelectCommand( // Do not support partitioned or bucketed writes override def requireSingleBatch: Boolean = false -} \ No newline at end of file +} diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala index 2ea0301fa2c..3f59d6565a5 100644 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala @@ -38,6 +38,7 @@ package org.apache.spark.sql.hive.rapids.shims import java.util.Locale import com.nvidia.spark.rapids.{ColumnarFileFormat, DataFromReplacementRule, DataWritingCommandMeta, GpuDataWritingCommand, RapidsConf, RapidsMeta} +import com.nvidia.spark.rapids.shims.GpuBucketingUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf @@ -216,7 +217,9 @@ case class GpuInsertIntoHiveTable( hadoopConf = hadoopConf, fileFormat = fileFormat, outputLocation = tmpLocation.toString, - partitionAttributes = partitionAttributes) + partitionAttributes = partitionAttributes, + bucketSpec = table.bucketSpec, + options = GpuBucketingUtils.getOptionsWithHiveBucketWrite(table.bucketSpec)) if (partition.nonEmpty) { if (numDynamicPartitions > 0) { diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala index f788971a85f..4adbd7b2ef5 100644 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala @@ -39,7 +39,7 @@ import java.util.{Date, UUID} import com.nvidia.spark.TimingUtils import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.shims.RapidsFileSourceMetaUtils +import com.nvidia.spark.rapids.shims.{GpuBucketingUtils, RapidsFileSourceMetaUtils} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ @@ -136,13 +136,8 @@ object GpuFileFormatWriter extends Logging { if (projectList.nonEmpty) GpuProjectExec(projectList, plan)() else plan } - val writerBucketSpec: Option[GpuWriterBucketSpec] = bucketSpec.map { spec => - // TODO: Cannot support this until we: - // support Hive hash partitioning on the GPU - throw new UnsupportedOperationException("GPU hash partitioning for bucketed data is not " - + "compatible with the CPU version") - } - + val writerBucketSpec = GpuBucketingUtils.getWriterBucketSpec(bucketSpec, dataColumns, + options, false) val sortColumns = bucketSpec.toSeq.flatMap { spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get) } @@ -328,8 +323,8 @@ object GpuFileFormatWriter extends Logging { } else { concurrentOutputWriterSpec match { case Some(spec) => - new GpuDynamicPartitionDataConcurrentWriter( - description, taskAttemptContext, committer, spec, TaskContext.get()) + new GpuDynamicPartitionDataConcurrentWriter(description, taskAttemptContext, + committer, spec) case _ => new GpuDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) } diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/spark330/GpuBucketingUtils.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/spark330/GpuBucketingUtils.scala new file mode 100644 index 00000000000..feb562fa9b8 --- /dev/null +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/spark330/GpuBucketingUtils.scala @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332cdh"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "334"} +{"spark": "340"} +{"spark": "341"} +{"spark": "341db"} +{"spark": "342"} +{"spark": "343"} +{"spark": "350"} +{"spark": "351"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids.RapidsMeta + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.datasources.BucketingUtils +import org.apache.spark.sql.rapids.GpuWriterBucketSpec + +object GpuBucketingUtils { + + def getWriterBucketSpec( + bucketSpec: Option[BucketSpec], + dataColumns: Seq[Attribute], + options: Map[String, String], + forceHiveHash: Boolean): Option[GpuWriterBucketSpec] = { + bucketSpec.map { spec => + val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get) + val shouldHiveCompatibleWrite = options.getOrElse( + BucketingUtils.optionForHiveCompatibleBucketWrite, "false").toBoolean + if (shouldHiveCompatibleWrite) { + // TODO: Cannot support this until we support Hive hash partitioning on the GPU + throw new UnsupportedOperationException("Hive hash partitioning is not supported" + + " on GPU") + } else { + // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id + // expression, so that we can guarantee the data distribution is same between shuffle and + // bucketed data source, which enables us to only shuffle one side when join a bucketed + // table and a normal one. + val bucketIdExpression = GpuHashPartitioning(bucketColumns, spec.numBuckets) + .partitionIdExpression + GpuWriterBucketSpec(bucketIdExpression, (_: Int) => "") + } + } + } + + def isHiveHashBucketing(options: Map[String, String]): Boolean = { + options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, "false").toBoolean + } + + def getOptionsWithHiveBucketWrite(bucketSpec: Option[BucketSpec]): Map[String, String] = { + bucketSpec + .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true")) + .getOrElse(Map.empty) + } + + def tagForHiveBucketingWrite(meta: RapidsMeta[_, _, _], bucketSpec: Option[BucketSpec], + outColumns: Seq[Attribute], forceHiveHash: Boolean): Unit = { + bucketSpec.foreach(_ => + // From Spark330, Hive write always uses HiveHash to generate bucket IDs. + meta.willNotWorkOnGpu("Hive Hashing for generating bucket IDs is not supported yet") + ) + } +} diff --git a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/CreateDataSourceTableAsSelectCommandMetaShims.scala b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/CreateDataSourceTableAsSelectCommandMetaShims.scala index faa550c0cb6..f51bd984bdc 100644 --- a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/CreateDataSourceTableAsSelectCommandMetaShims.scala +++ b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/CreateDataSourceTableAsSelectCommandMetaShims.scala @@ -30,10 +30,10 @@ package com.nvidia.spark.rapids.shims import com.nvidia.spark.rapids._ import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand +import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat -import org.apache.spark.sql.rapids.{GpuDataSourceBase, GpuOrcFileFormat} +import org.apache.spark.sql.rapids.{BucketIdMetaUtils, GpuDataSourceBase, GpuOrcFileFormat} import org.apache.spark.sql.rapids.shims.GpuCreateDataSourceTableAsSelectCommand final class CreateDataSourceTableAsSelectCommandMeta( @@ -46,9 +46,9 @@ final class CreateDataSourceTableAsSelectCommandMeta( private var origProvider: Class[_] = _ override def tagSelfForGpu(): Unit = { - if (cmd.table.bucketSpec.isDefined) { - willNotWorkOnGpu("bucketing is not supported") - } + val outputColumns = + DataWritingCommand.logicalPlanOutputWithNames(cmd.query, cmd.outputColumnNames) + BucketIdMetaUtils.tagForBucketingWrite(this, cmd.table.bucketSpec, outputColumns) if (cmd.table.provider.isEmpty) { willNotWorkOnGpu("provider must be defined") } @@ -76,4 +76,4 @@ final class CreateDataSourceTableAsSelectCommandMeta( cmd.outputColumnNames, origProvider) } -} \ No newline at end of file +} diff --git a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala index 42fd5941025..b3103c3c76e 100644 --- a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala +++ b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala @@ -30,6 +30,7 @@ package org.apache.spark.sql.hive.rapids.shims import java.util.Locale import com.nvidia.spark.rapids.{ColumnarFileFormat, DataFromReplacementRule, DataWritingCommandMeta, GpuDataWritingCommand, RapidsConf, RapidsMeta} +import com.nvidia.spark.rapids.shims.GpuBucketingUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf @@ -205,7 +206,9 @@ case class GpuInsertIntoHiveTable( hadoopConf = hadoopConf, fileFormat = fileFormat, outputLocation = tmpLocation.toString, - partitionAttributes = partitionAttributes) + partitionAttributes = partitionAttributes, + bucketSpec = table.bucketSpec, + options = GpuBucketingUtils.getOptionsWithHiveBucketWrite(table.bucketSpec)) if (partition.nonEmpty) { if (numDynamicPartitions > 0) { @@ -349,4 +352,4 @@ case class GpuInsertIntoHiveTable( } override def requireSingleBatch: Boolean = false // TODO: Re-evaluate. If partitioned or bucketed? -} \ No newline at end of file +} diff --git a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuOptimizedCreateHiveTableAsSelectCommandShims.scala b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuOptimizedCreateHiveTableAsSelectCommandShims.scala index 53c17d2f946..e74bf979af9 100644 --- a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuOptimizedCreateHiveTableAsSelectCommandShims.scala +++ b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuOptimizedCreateHiveTableAsSelectCommandShims.scala @@ -197,9 +197,9 @@ final class OptimizedCreateHiveTableAsSelectCommandMeta( willNotWorkOnGpu("partitioned writes are not supported") } - if (tableDesc.bucketSpec.isDefined) { - willNotWorkOnGpu("bucketing is not supported") - } + val outputColumns = + DataWritingCommand.logicalPlanOutputWithNames(cmd.query, cmd.outputColumnNames) + GpuBucketingUtils.tagForHiveBucketingWrite(this, tableDesc.bucketSpec, outputColumns, false) val serde = tableDesc.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) if (serde.contains("parquet")) { diff --git a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala index e7b3561f5fd..874d89353aa 100644 --- a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala +++ b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala @@ -31,7 +31,7 @@ import java.util.{Date, UUID} import com.nvidia.spark.TimingUtils import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.shims.RapidsFileSourceMetaUtils +import com.nvidia.spark.rapids.shims.{GpuBucketingUtils, RapidsFileSourceMetaUtils} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ @@ -119,13 +119,8 @@ object GpuFileFormatWriter extends Logging { .map(RapidsFileSourceMetaUtils.cleanupFileSourceMetadataInformation)) val dataColumns = finalOutputSpec.outputColumns.filterNot(partitionSet.contains) - val writerBucketSpec: Option[GpuWriterBucketSpec] = bucketSpec.map { spec => - // TODO: Cannot support this until we: - // support Hive hash partitioning on the GPU - throw new UnsupportedOperationException("GPU hash partitioning for bucketed data is not " - + "compatible with the CPU version") - } - + val writerBucketSpec = GpuBucketingUtils.getWriterBucketSpec(bucketSpec, dataColumns, + options, false) val sortColumns = bucketSpec.toSeq.flatMap { spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get) } @@ -419,8 +414,8 @@ object GpuFileFormatWriter extends Logging { } else { concurrentOutputWriterSpec match { case Some(spec) => - new GpuDynamicPartitionDataConcurrentWriter( - description, taskAttemptContext, committer, spec, TaskContext.get()) + new GpuDynamicPartitionDataConcurrentWriter(description, taskAttemptContext, + committer, spec) case _ => new GpuDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) } diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala index 5aaeae2c7b9..d52c8b47ae7 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ package org.apache.spark.sql.rapids import ai.rapids.cudf.TableWriter -import com.nvidia.spark.rapids.{ColumnarOutputWriter, ColumnarOutputWriterFactory, GpuBoundReference, GpuColumnVector, RapidsBufferCatalog, RapidsDeviceMemoryStore, ScalableTaskCompletion} +import com.nvidia.spark.rapids.{ColumnarOutputWriter, ColumnarOutputWriterFactory, GpuColumnVector, GpuLiteral, RapidsBufferCatalog, RapidsDeviceMemoryStore, ScalableTaskCompletion} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.jni.{GpuRetryOOM, GpuSplitAndRetryOOM} import org.apache.hadoop.conf.Configuration @@ -28,7 +28,6 @@ import org.scalatest.BeforeAndAfterEach import org.scalatest.funsuite.AnyFunSuite import org.scalatestplus.mockito.MockitoSugar.mock -import org.apache.spark.TaskContext import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, ExprId, SortOrder} @@ -39,7 +38,6 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { private var mockJobDescription: GpuWriteJobDescription = _ - private var mockTaskContext: TaskContext = _ private var mockTaskAttemptContext: TaskAttemptContext = _ private var mockCommitter: FileCommitProtocol = _ private var mockOutputWriterFactory: ColumnarOutputWriterFactory = _ @@ -48,6 +46,7 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { private var allCols: Seq[AttributeReference] = _ private var partSpec: Seq[AttributeReference] = _ private var dataSpec: Seq[AttributeReference] = _ + private var bucketSpec: Option[GpuWriterBucketSpec] = None private var includeRetry: Boolean = false class NoTransformColumnarOutputWriter( @@ -102,9 +101,9 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { allCols = null partSpec = null dataSpec = null + bucketSpec = None mockJobDescription = mock[GpuWriteJobDescription] when(mockJobDescription.statsTrackers).thenReturn(Seq.empty) - mockTaskContext = mock[TaskContext] mockTaskAttemptContext = mock[TaskAttemptContext] mockCommitter = mock[FileCommitProtocol] mockOutputWriterFactory = mock[ColumnarOutputWriterFactory] @@ -130,8 +129,12 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { * It is used to setup certain mocks before `body` is executed. After execution, the * columns in the batches are checked for `refCount==0` (e.g. that they were closed). * @note it is assumed that the schema of each batch is identical. + * numBuckets > 0: Bucketing only + * numBuckets == 0: Partition only + * numBuckets < 0: Both partition and bucketing */ - def withColumnarBatchesVerifyClosed[V](cbs: Seq[ColumnarBatch])(body: => V): Unit = { + def withColumnarBatchesVerifyClosed[V]( + cbs: Seq[ColumnarBatch], numBuckets: Int = 0)(body: => V): Unit = { val allTypes = cbs.map(GpuColumnVector.extractTypes) allCols = Seq.empty dataSpec = Seq.empty @@ -140,8 +143,17 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { allCols = allTypes.head.zipWithIndex.map { case (dataType, colIx) => AttributeReference(s"col_$colIx", dataType, nullable = false)(ExprId(colIx)) } - partSpec = Seq(allCols.head) - dataSpec = allCols.tail + if (numBuckets <= 0) { + partSpec = Seq(allCols.head) + dataSpec = allCols.tail + } else { + dataSpec = allCols + } + if (numBuckets != 0) { + bucketSpec = Some(GpuWriterBucketSpec( + GpuPmod(GpuMurmur3Hash(Seq(allCols.last), 42), GpuLiteral(Math.abs(numBuckets))), + _ => "")) + } } val fields = new Array[StructField](allCols.size) allCols.zipWithIndex.foreach { case (col, ix) => @@ -153,6 +165,7 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { } when(mockJobDescription.dataColumns).thenReturn(dataSpec) when(mockJobDescription.partitionColumns).thenReturn(partSpec) + when(mockJobDescription.bucketSpec).thenReturn(bucketSpec) when(mockJobDescription.allColumns).thenReturn(allCols) try { body @@ -187,6 +200,20 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { new ColumnarBatch(cols, rowCount) } + def buildBatchWithPartitionedAndBucketCols( + partInts: Seq[Int], bucketInts: Seq[Int]): ColumnarBatch = { + assert(partInts.length == bucketInts.length) + val rowCount = partInts.size + val cols: Array[ColumnVector] = new Array[ColumnVector](3) + val partCol = ai.rapids.cudf.ColumnVector.fromInts(partInts: _*) + val dataCol = ai.rapids.cudf.ColumnVector.fromStrings(partInts.map(_.toString): _*) + val bucketCol = ai.rapids.cudf.ColumnVector.fromInts(bucketInts: _*) + cols(0) = GpuColumnVector.from(partCol, IntegerType) + cols(1) = GpuColumnVector.from(dataCol, StringType) + cols(2) = GpuColumnVector.from(bucketCol, IntegerType) + new ColumnarBatch(cols, rowCount) + } + def verifyClosed(cbs: Seq[ColumnarBatch]): Unit = { cbs.foreach { cb => val cols = GpuColumnVector.extractBases(cb) @@ -198,7 +225,6 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { def prepareDynamicPartitionSingleWriter(): GpuDynamicPartitionDataSingleWriter = { - when(mockJobDescription.bucketSpec).thenReturn(None) when(mockJobDescription.customPartitionLocations) .thenReturn(Map.empty[TablePartitionSpec, String]) @@ -212,13 +238,10 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { GpuDynamicPartitionDataConcurrentWriter = { val mockConfig = new Configuration() when(mockTaskAttemptContext.getConfiguration).thenReturn(mockConfig) - when(mockJobDescription.bucketSpec).thenReturn(None) when(mockJobDescription.customPartitionLocations) .thenReturn(Map.empty[TablePartitionSpec, String]) - // assume the first column is the partition-by column - val sortExpr = - GpuBoundReference(0, partSpec.head.dataType, nullable = false)(ExprId(0), "") - val sortSpec = Seq(SortOrder(sortExpr, Ascending)) + val sortSpec = (partSpec ++ bucketSpec.map(_.bucketIdExpression)) + .map(SortOrder(_, Ascending)) val concurrentSpec = GpuConcurrentOutputWriterSpec( maxWriters, allCols, batchSize, sortSpec) @@ -226,8 +249,7 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { mockJobDescription, mockTaskAttemptContext, mockCommitter, - concurrentSpec, - mockTaskContext)) + concurrentSpec)) } test("empty directory data writer") { @@ -317,18 +339,6 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { } } - test("dynamic partition data writer doesn't support bucketing") { - resetMocksWithAndWithoutRetry { - withColumnarBatchesVerifyClosed(Seq.empty) { - when(mockJobDescription.bucketSpec).thenReturn(Some(GpuWriterBucketSpec(null, null))) - assertThrows[UnsupportedOperationException] { - new GpuDynamicPartitionDataSingleWriter( - mockJobDescription, mockTaskAttemptContext, mockCommitter) - } - } - } - } - test("dynamic partition data writer without splits") { resetMocksWithAndWithoutRetry { // 4 partitions @@ -353,6 +363,35 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { } } + test("dynamic partition data writer bucketing write without splits") { + Seq(5, -5).foreach { numBuckets => + val (numWrites, numNewWriters) = if (numBuckets > 0) { // Bucket only + (6, 6) // 3 buckets + 3 buckets + } else { // partition and bucket + (10, 10) // 5 pairs + 5 pairs + } + resetMocksWithAndWithoutRetry { + val cb = buildBatchWithPartitionedAndBucketCols( + IndexedSeq(1, 1, 2, 2, 3, 3, 4, 4), + IndexedSeq(1, 1, 1, 1, 2, 2, 2, 3)) + val cb2 = buildBatchWithPartitionedAndBucketCols( + IndexedSeq(1, 2, 3, 4, 5), + IndexedSeq(1, 1, 2, 2, 3)) + val cbs = Seq(spy(cb), spy(cb2)) + withColumnarBatchesVerifyClosed(cbs, numBuckets) { + // setting to 9 then the writer won't split as no group has more than 9 rows + when(mockJobDescription.maxRecordsPerFile).thenReturn(9) + val dynamicSingleWriter = prepareDynamicPartitionSingleWriter() + dynamicSingleWriter.writeWithIterator(cbs.iterator) + dynamicSingleWriter.commit() + verify(mockOutputWriter, times(numWrites)).writeSpillableAndClose(any(), any()) + verify(dynamicSingleWriter, times(numNewWriters)).newWriter(any(), any(), any()) + verify(mockOutputWriter, times(numNewWriters)).close() + } + } + } + } + test("dynamic partition data writer with splits") { resetMocksWithAndWithoutRetry { val cb = buildBatchWithPartitionedCol(1, 1, 2, 2, 3, 3, 4, 4) @@ -399,6 +438,38 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { } } + test("dynamic partition concurrent data writer bucketing write without splits") { + Seq(5, -5).foreach { numBuckets => + val (numWrites, numNewWriters) = if (numBuckets > 0) { // Bucket only + (3, 3) // 3 distinct buckets in total + } else { // partition and bucket + (6, 6) // 6 distinct pairs in total + } + resetMocksWithAndWithoutRetry { + val cb = buildBatchWithPartitionedAndBucketCols( + IndexedSeq(1, 1, 2, 2, 3, 3, 4, 4), + IndexedSeq(1, 1, 1, 1, 2, 2, 2, 3)) + val cb2 = buildBatchWithPartitionedAndBucketCols( + IndexedSeq(1, 2, 3, 4, 5), + IndexedSeq(1, 1, 2, 2, 3)) + val cbs = Seq(spy(cb), spy(cb2)) + withColumnarBatchesVerifyClosed(cbs, numBuckets) { + // setting to 9 then the writer won't split as no group has more than 9 rows + when(mockJobDescription.maxRecordsPerFile).thenReturn(9) + // I would like to not flush on the first iteration of the `write` method + when(mockJobDescription.concurrentWriterPartitionFlushSize).thenReturn(1000) + val dynamicConcurrentWriter = + prepareDynamicPartitionConcurrentWriter(maxWriters = 20, batchSize = 100) + dynamicConcurrentWriter.writeWithIterator(cbs.iterator) + dynamicConcurrentWriter.commit() + verify(mockOutputWriter, times(numWrites)).writeSpillableAndClose(any(), any()) + verify(dynamicConcurrentWriter, times(numNewWriters)).newWriter(any(), any(), any()) + verify(mockOutputWriter, times(numNewWriters)).close() + } + } + } + } + test("dynamic partition concurrent data writer with splits and flush") { resetMocksWithAndWithoutRetry { val cb = buildBatchWithPartitionedCol(1, 1, 2, 2, 3, 3, 4, 4) @@ -438,8 +509,9 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { prepareDynamicPartitionConcurrentWriter(maxWriters = 1, batchSize = 1) dynamicConcurrentWriter.writeWithIterator(cbs.iterator) dynamicConcurrentWriter.commit() - // 5 batches written, one per partition (no splitting) - verify(mockOutputWriter, times(5)) + // 6 batches written, one per partition (no splitting) plus one written by + // the concurrent writer. + verify(mockOutputWriter, times(6)) .writeSpillableAndClose(any(), any()) verify(dynamicConcurrentWriter, times(5)).newWriter(any(), any(), any()) // 5 files written because this is the single writer mode