diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index e6553597b33..d78e4ef0588 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -223,14 +223,18 @@ def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs):
     string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
     TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))]
     gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
-    first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
+    first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0/key2=20'
     with_cpu_session(
             lambda spark : gen_df(spark, gen_list).write.parquet(first_data_path),
             conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'LEGACY'})
-    second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1'
+    second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1/key2=21'
     with_cpu_session(
             lambda spark : gen_df(spark, gen_list).write.parquet(second_data_path),
             conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
+    third_data_path = spark_tmp_path + '/PARQUET_DATA/key=2/key2=22'
+    with_cpu_session(
+            lambda spark : gen_df(spark, gen_list).write.parquet(third_data_path),
+            conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
     data_path = spark_tmp_path + '/PARQUET_DATA'
     all_confs = reader_confs.copy()
     all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
index 65a73d8a3ba..d679dabe4ed 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
@@ -83,6 +83,17 @@ trait Arm {
     }
   }
 
+  /** Executes the provided code block, closing the resources only if an exception occurs */
+  def closeOnExcept[T <: AutoCloseable, V](r: Array[T])(block: Array[T] => V): V = {
+    try {
+      block(r)
+    } catch {
+      case t: Throwable =>
+        r.safeClose(t)
+        throw t
+    }
+  }
+
   /** Executes the provided code block, closing the resources only if an exception occurs */
   def closeOnExcept[T <: AutoCloseable, V](r: ArrayBuffer[T])(block: ArrayBuffer[T] => V): V = {
     try {
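The new `Array[T]` overload of `closeOnExcept` matches the existing `ArrayBuffer[T]` one: on success the caller keeps ownership of the array, and on an exception every element is closed before the exception is rethrown. A minimal usage sketch, assuming it runs somewhere the plugin's `Arm` trait is visible; the `Resource` class and `CloseOnExceptDemo` object are hypothetical, not part of this patch:

```scala
object CloseOnExceptDemo extends Arm {
  // Hypothetical stand-in for a GPU-backed resource; only AutoCloseable matters here.
  class Resource(val id: Int) extends AutoCloseable {
    override def close(): Unit = println(s"closed resource $id")
  }

  def demo(): Int = {
    // If the block throws, both resources are closed before the exception
    // propagates; if it succeeds, the caller still owns them.
    closeOnExcept(Array(new Resource(0), new Resource(1))) { rs =>
      rs.map(_.id).sum
    }
  }
}
```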
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarPartitionReaderWithPartitionValues.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarPartitionReaderWithPartitionValues.scala
index 1de50840697..9687fee9294 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarPartitionReaderWithPartitionValues.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarPartitionReaderWithPartitionValues.scala
@@ -74,25 +74,26 @@ object ColumnarPartitionReaderWithPartitionValues extends Arm {
       fileBatch: ColumnarBatch,
       partitionValues: Array[Scalar],
       sparkTypes: Array[DataType]): ColumnarBatch = {
-    var partitionColumns: Array[GpuColumnVector] = null
-    try {
-      partitionColumns = buildPartitionColumns(fileBatch.numRows, partitionValues, sparkTypes)
-      val fileBatchCols = (0 until fileBatch.numCols).map(fileBatch.column)
-      val resultCols = fileBatchCols ++ partitionColumns
-      val result = new ColumnarBatch(resultCols.toArray, fileBatch.numRows)
-      fileBatchCols.foreach(_.asInstanceOf[GpuColumnVector].incRefCount())
-      partitionColumns = null
-      result
-    } finally {
-      if (fileBatch != null) {
-        fileBatch.close()
-      }
-      if (partitionColumns != null) {
-        partitionColumns.safeClose()
-      }
-    }
+    withResource(fileBatch) { _ =>
+      closeOnExcept(buildPartitionColumns(fileBatch.numRows, partitionValues, sparkTypes)) {
+        partitionColumns => addGpuColumVectorsToBatch(fileBatch, partitionColumns)
+      }
+    }
   }
 
+  /**
+   * The caller is responsible for closing the fileBatch passed in.
+   */
+  def addGpuColumVectorsToBatch(
+      fileBatch: ColumnarBatch,
+      partitionColumns: Array[GpuColumnVector]): ColumnarBatch = {
+    val fileBatchCols = (0 until fileBatch.numCols).map(fileBatch.column)
+    val resultCols = fileBatchCols ++ partitionColumns
+    val result = new ColumnarBatch(resultCols.toArray, fileBatch.numRows)
+    fileBatchCols.foreach(_.asInstanceOf[GpuColumnVector].incRefCount())
+    result
+  }
+
   private def buildPartitionColumns(
       numRows: Int,
       partitionValues: Array[Scalar],
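The rewrite above splits the resource handling cleanly: `withResource` always closes the incoming `fileBatch`, `closeOnExcept` drops the freshly built partition columns only on failure, and `addGpuColumVectorsToBatch` bumps the refcount on the file batch's columns because the returned batch now shares them. A toy model of that ownership flow under the same `Arm` helpers; the `Col` class is illustrative, standing in for `GpuColumnVector`:

```scala
object BatchOwnershipDemo extends Arm {
  // Illustrative ref-counted column; GpuColumnVector behaves analogously.
  class Col(name: String) extends AutoCloseable {
    private var refs = 1
    def incRefCount(): this.type = { refs += 1; this }
    override def close(): Unit = { refs -= 1; if (refs == 0) println(s"freed $name") }
  }

  // Mirrors createPartitionValues: the input columns are always closed, but stay
  // alive because the result took an extra reference; the freshly built columns
  // are closed only if building or combining fails.
  def combine(input: Array[Col], buildFresh: => Array[Col]): Array[Col] =
    withResource(input) { _ =>
      closeOnExcept(buildFresh) { fresh =>
        input.foreach(_.incRefCount()) // the result shares the input columns
        input ++ fresh
      }
    }
}
```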
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index 746f49cb4d2..895d2526b03 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -990,23 +990,97 @@ class MultiFileParquetPartitionReader(
     }
   }
 
+  private def buildAndConcatPartitionColumns(
+      rowsPerPartition: Array[Long],
+      inPartitionValues: Array[InternalRow]): Array[GpuColumnVector] = {
+    val numCols = partitionSchema.fields.size
+    val allPartCols = new Array[GpuColumnVector](numCols)
+    // build the partition vectors for all partitions within each column
+    // and concatenate those together, then go to the next column
+    for ((field, colIndex) <- partitionSchema.fields.zipWithIndex) {
+      val dataType = field.dataType
+      withResource(new Array[GpuColumnVector](inPartitionValues.size)) {
+        partitionColumns =>
+          for ((rowsInPart, partIndex) <- rowsPerPartition.zipWithIndex) {
+            val partInternalRow = inPartitionValues(partIndex)
+            val partValueForCol = partInternalRow.get(colIndex, dataType)
+            val partitionScalar = GpuScalar.from(partValueForCol, dataType)
+            withResource(partitionScalar) { scalar =>
+              partitionColumns(partIndex) = GpuColumnVector.from(
+                ai.rapids.cudf.ColumnVector.fromScalar(scalar, rowsInPart.toInt),
+                dataType)
+            }
+          }
+          val baseOfCols = partitionColumns.map(_.getBase)
+          allPartCols(colIndex) = GpuColumnVector.from(
+            ColumnVector.concatenate(baseOfCols: _*), field.dataType)
+      }
+    }
+    allPartCols
+  }
+
+  private def concatAndAddPartitionColsToBatch(
+      cb: ColumnarBatch,
+      rowsPerPartition: Array[Long],
+      inPartitionValues: Array[InternalRow]): ColumnarBatch = {
+    withResource(cb) { _ =>
+      closeOnExcept(buildAndConcatPartitionColumns(rowsPerPartition, inPartitionValues)) {
+        allPartCols =>
+          ColumnarPartitionReaderWithPartitionValues.addGpuColumVectorsToBatch(cb, allPartCols)
+      }
+    }
+  }
+
+  /**
+   * Add all partition values found to the batch. There could be more than one partition
+   * value in the batch so we have to build up columns with the correct number of rows
+   * for each partition value.
+   *
+   * @param batch - columnar batch to append partition values to
+   * @param inPartitionValues - array of partition values
+   * @param rowsPerPartition - the number of rows that require each partition value
+   * @param partitionSchema - schema of the partitions
+   * @return the batch with partition columns appended, or None if there was no batch
+   */
+  protected def addAllPartitionValues(
+      batch: Option[ColumnarBatch],
+      inPartitionValues: Array[InternalRow],
+      rowsPerPartition: Array[Long],
+      partitionSchema: StructType): Option[ColumnarBatch] = {
+    assert(rowsPerPartition.size == inPartitionValues.size)
+    if (partitionSchema.nonEmpty) {
+      batch.map { cb =>
+        val numPartitions = inPartitionValues.size
+        if (numPartitions > 1) {
+          concatAndAddPartitionColsToBatch(cb, rowsPerPartition, inPartitionValues)
+        } else {
+          // single partition, add like other readers
+          addPartitionValues(Some(cb), inPartitionValues.head, partitionSchema).get
+        }
+      }
+    } else {
+      batch
+    }
+  }
+
   private def readBatch(): Option[ColumnarBatch] = {
     withResource(new NvtxRange("Parquet readBatch", NvtxColor.GREEN)) { _ =>
-      val (isCorrectRebaseMode, clippedSchema, partValues, seqPathsAndBlocks) =
-        populateCurrentBlockChunk()
+      val currentChunkMeta = populateCurrentBlockChunk()
       if (readDataSchema.isEmpty) {
         // not reading any data, so return a degenerate ColumnarBatch with the row count
-        val numRows = seqPathsAndBlocks.map(_._2.getRowCount).sum.toInt
-        if (numRows == 0) {
+        if (currentChunkMeta.numTotalRows == 0) {
           None
         } else {
           // Someone is going to process this data, even if it is just a row count
           GpuSemaphore.acquireIfNecessary(TaskContext.get())
-          val emptyBatch = new ColumnarBatch(Array.empty, numRows.toInt)
-          addPartitionValues(Some(emptyBatch), partValues, partitionSchema)
+          val emptyBatch = new ColumnarBatch(Array.empty, currentChunkMeta.numTotalRows.toInt)
+          addAllPartitionValues(Some(emptyBatch), currentChunkMeta.allPartValues,
+            currentChunkMeta.rowsPerPartition, partitionSchema)
         }
       } else {
-        val table = readToTable(seqPathsAndBlocks, clippedSchema, isCorrectRebaseMode)
+        val table = readToTable(currentChunkMeta.currentChunk, currentChunkMeta.clippedSchema,
+          currentChunkMeta.isCorrectRebaseMode)
         try {
           val colTypes = readDataSchema.fields.map(f => f.dataType)
           val maybeBatch = table.map(t => GpuColumnVector.from(t, colTypes))
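To make the layout built by `buildAndConcatPartitionColumns` concrete: per partition column, each partition value becomes a vector of that value repeated `rowsPerPartition(i)` times, and those vectors are concatenated in partition order. A CPU-side sketch with plain collections standing in for the cudf `fromScalar` plus `concatenate` calls:

```scala
// rowsPerPartition and the partition values are parallel arrays, in read order.
val rowsPerPartition = Array(3L, 2L)
val partitionValues = Array("key=0", "key=1")

// Equivalent of fromScalar + concatenate for a single partition column:
val partitionColumn = rowsPerPartition.zip(partitionValues).flatMap {
  case (rows, value) => Seq.fill(rows.toInt)(value)
}
// partitionColumn: Array("key=0", "key=0", "key=0", "key=1", "key=1")
```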
@@ -1015,7 +1089,8 @@ class MultiFileParquetPartitionReader(
           }
           // we have to add partition values here for this batch, we already verified that
           // its not different for all the blocks in this batch
-          addPartitionValues(maybeBatch, partValues, partitionSchema)
+          addAllPartitionValues(maybeBatch, currentChunkMeta.allPartValues,
+            currentChunkMeta.rowsPerPartition, partitionSchema)
         } finally {
           table.foreach(_.close())
         }
@@ -1071,9 +1146,15 @@ class MultiFileParquetPartitionReader(
     }
   }
 
-  private def populateCurrentBlockChunk():
-    (Boolean, MessageType, InternalRow, Seq[(Path, BlockMetaData)]) = {
+  private case class CurrentChunkMeta(
+      isCorrectRebaseMode: Boolean,
+      clippedSchema: MessageType,
+      currentChunk: Seq[(Path, BlockMetaData)],
+      numTotalRows: Long,
+      rowsPerPartition: Array[Long],
+      allPartValues: Array[InternalRow])
 
+  private def populateCurrentBlockChunk(): CurrentChunkMeta = {
     val currentChunk = new ArrayBuffer[(Path, BlockMetaData)]
     var numRows: Long = 0
     var numBytes: Long = 0
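Replacing the four-element tuple return with the private `CurrentChunkMeta` case class is what lets `populateCurrentBlockChunk` grow from four to six results without positional destructuring at every call site. A standalone sketch of the same pattern; the names here are illustrative, with the cudf and Parquet types dropped:

```scala
object ChunkMetaDemo extends App {
  // Mirrors the shape of CurrentChunkMeta without the plugin's types.
  case class ChunkMeta(
      isCorrectRebaseMode: Boolean,
      numTotalRows: Long,
      rowsPerPartition: Array[Long])

  val meta = ChunkMeta(isCorrectRebaseMode = true, numTotalRows = 25L,
    rowsPerPartition = Array(15L, 7L, 3L))

  // Named access survives field additions; the old positional destructuring
  // (val (rebase, schema, partValues, blocks) = ...) breaks at every call site.
  assert(meta.numTotalRows == meta.rowsPerPartition.sum)
}
```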
@@ -1082,6 +1163,9 @@ class MultiFileParquetPartitionReader(
     var currentPartitionValues: InternalRow = null
     var currentClippedSchema: MessageType = null
     var currentIsCorrectRebaseMode: Boolean = false
+    val rowsPerPartition = new ArrayBuffer[Long]()
+    var lastPartRows: Long = 0
+    val allPartValues = new ArrayBuffer[InternalRow]()
 
     @tailrec
     def readNextBatch(): Unit = {
@@ -1089,37 +1173,10 @@ class MultiFileParquetPartitionReader(
       if (currentFile == null) {
         currentFile = blockIterator.head.filePath
         currentPartitionValues = blockIterator.head.partValues
+        allPartValues += currentPartitionValues
         currentClippedSchema = blockIterator.head.schema
         currentIsCorrectRebaseMode = blockIterator.head.isCorrectedRebaseMode
       }
-      if (currentFile != blockIterator.head.filePath) {
-        // We need to ensure all files we are going to combine have the same datetime rebase mode.
-        if (blockIterator.head.isCorrectedRebaseMode != currentIsCorrectRebaseMode) {
-          logInfo("datetime rebase mode for the next file " +
-            s"${blockIterator.head.filePath} is different then current file $currentFile, " +
-            s"splitting into another batch.")
-          return
-        }
-
-        // check to see if partitionValues different, then have to split it
-        if (blockIterator.head.partValues != currentPartitionValues) {
-          logInfo(s"Partition values for the next file ${blockIterator.head.filePath}" +
-            s" doesn't match current $currentFile, splitting it into another batch!")
-          return
-        }
-        val schemaNextfile =
-          blockIterator.head.schema.asGroupType().getFields.asScala.map(_.getName)
-        val schemaCurrentfile =
-          currentClippedSchema.asGroupType().getFields.asScala.map(_.getName)
-        if (!schemaNextfile.sameElements(schemaCurrentfile)) {
-          logInfo(s"File schema for the next file ${blockIterator.head.filePath}" +
-            s" doesn't match current $currentFile, splitting it into another batch!")
-          return
-        }
-        currentFile = blockIterator.head.filePath
-        currentPartitionValues = blockIterator.head.partValues
-        currentClippedSchema = blockIterator.head.schema
-      }
       val peekedRowGroup = blockIterator.head.blockMeta
       if (peekedRowGroup.getRowCount > Integer.MAX_VALUE) {
         throw new UnsupportedOperationException("Too many rows in split")
@@ -1129,6 +1186,41 @@ class MultiFileParquetPartitionReader(
       val estimatedBytes =
         GpuBatchUtils.estimateGpuMemory(readDataSchema, peekedRowGroup.getRowCount)
       if (numBytes == 0 || numBytes + estimatedBytes <= maxReadBatchSizeBytes) {
+        // only need to check the next file if we are actually going to add in its chunk
+        if (currentFile != blockIterator.head.filePath) {
+          // We need to ensure all files we are going to combine have the same datetime
+          // rebase mode.
+          if (blockIterator.head.isCorrectedRebaseMode != currentIsCorrectRebaseMode) {
+            logInfo("datetime rebase mode for the next file " +
+              s"${blockIterator.head.filePath} is different than the current file " +
+              s"$currentFile, splitting into another batch.")
+            return
+          }
+          val schemaNextFile =
+            blockIterator.head.schema.asGroupType().getFields.asScala.map(_.getName)
+          val schemaCurrentFile =
+            currentClippedSchema.asGroupType().getFields.asScala.map(_.getName)
+          if (!schemaNextFile.sameElements(schemaCurrentFile)) {
+            logInfo(s"File schema for the next file ${blockIterator.head.filePath}" +
+              s" doesn't match current $currentFile, splitting it into another batch!")
+            return
+          }
+          // If the partition values are different we have to track the number of rows that get
+          // each partition value and the partition values themselves so that we can build
+          // the full columns with different partition values later.
+          if (blockIterator.head.partValues != currentPartitionValues) {
+            rowsPerPartition += (numRows - lastPartRows)
+            lastPartRows = numRows
+            // we add the partition value itself here, but the number of rows it covers
+            // is only recorded when the partition changes again or at the end
+            allPartValues += blockIterator.head.partValues
+          }
+          currentFile = blockIterator.head.filePath
+          currentPartitionValues = blockIterator.head.partValues
+          currentClippedSchema = blockIterator.head.schema
+        }
+
         val nextBlock = blockIterator.next()
         val nextTuple = (nextBlock.filePath, nextBlock.blockMeta)
         currentChunk += nextTuple
@@ -1141,9 +1233,11 @@ class MultiFileParquetPartitionReader(
       }
     }
     readNextBatch()
+    rowsPerPartition += (numRows - lastPartRows)
     logDebug(s"Loaded $numRows rows from Parquet. Parquet bytes read: $numParquetBytes. " +
-      s"Estimated GPU bytes: $numBytes")
-    (currentIsCorrectRebaseMode, currentClippedSchema, currentPartitionValues, currentChunk)
+      s"Estimated GPU bytes: $numBytes. Number of different partitions: ${allPartValues.size}")
+    CurrentChunkMeta(currentIsCorrectRebaseMode, currentClippedSchema, currentChunk,
+      numRows, rowsPerPartition.toArray, allPartValues.toArray)
   }
 }
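To sanity-check the bookkeeping above: `allPartValues` is seeded with the first block's partition value, each change of partition value flushes the previous partition's row count, and the append after `readNextBatch()` flushes the last one, so both arrays end up the same length (matching the assert in `addAllPartitionValues`). A standalone replay of that accounting, using plain strings in place of `InternalRow` partition values:

```scala
object RowsPerPartitionDemo extends App {
  import scala.collection.mutable.ArrayBuffer

  // Toy replay: each element is (partitionValue, rowsInBlock), in the order
  // the reader would see row-group blocks.
  val blocks = Seq(("p0", 10L), ("p0", 5L), ("p1", 7L), ("p2", 3L))

  val rowsPerPartition = ArrayBuffer[Long]()
  val allPartValues = ArrayBuffer[String]()
  var numRows = 0L
  var lastPartRows = 0L

  for ((part, rows) <- blocks) {
    if (allPartValues.isEmpty) {
      allPartValues += part // first block, like the currentFile == null branch
    } else if (part != allPartValues.last) {
      rowsPerPartition += (numRows - lastPartRows) // close out the previous partition
      lastPartRows = numRows
      allPartValues += part
    }
    numRows += rows
  }
  rowsPerPartition += (numRows - lastPartRows) // final flush after readNextBatch()

  assert(allPartValues == ArrayBuffer("p0", "p1", "p2"))
  assert(rowsPerPartition == ArrayBuffer(15L, 7L, 3L))
}
```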