[FEA] Introduce low shuffle merge. (#10979)
* feat: Introduce low shuffle merge.

Signed-off-by: liurenjie1024 <liurenjie2008@gmail.com>

* fix

* Test databricks parallel

* Test more databricks parallel

* Fix comments

* Config && scala 2.13

* Revert

* Fix comments

* scala 2.13

* Revert unnecessary changes

* Revert "Revert unnecessary changes"

This reverts commit 9fa4cf2.

* restore change

---------

Signed-off-by: liurenjie1024 <liurenjie2008@gmail.com>
liurenjie1024 authored Jun 19, 2024
1 parent 0952dea commit 7bac3a6
Showing 20 changed files with 3,061 additions and 154 deletions.
4 changes: 4 additions & 0 deletions aggregator/pom.xml
@@ -94,6 +94,10 @@
<pattern>com.google.flatbuffers</pattern>
<shadedPattern>${rapids.shade.package}.com.google.flatbuffers</shadedPattern>
</relocation>
<relocation>
<pattern>org.roaringbitmap</pattern>
<shadedPattern>${rapids.shade.package}.org.roaringbitmap</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
@@ -0,0 +1,160 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.delta

import ai.rapids.cudf.{ColumnVector => CudfColumnVector, Scalar, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuColumnVector
import org.roaringbitmap.longlong.{PeekableLongIterator, Roaring64Bitmap}

import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}


object GpuDeltaParquetFileFormatUtils {
/**
* Row number of the row in the file. When combined with [[FILE_PATH_COL]], it can be used as a
* unique id of a row within the file. Currently, to calculate this correctly, the caller needs
* to set [[isSplitable]] to false and [[RapidsConf.PARQUET_READER_TYPE]] to "PERFILE".
*/
val METADATA_ROW_IDX_COL: String = "__metadata_row_index"
val METADATA_ROW_IDX_FIELD: StructField = StructField(METADATA_ROW_IDX_COL, LongType,
nullable = false)

val METADATA_ROW_DEL_COL: String = "__metadata_row_del"
val METADATA_ROW_DEL_FIELD: StructField = StructField(METADATA_ROW_DEL_COL, BooleanType,
nullable = false)


/**
* File path of the file that the row came from.
*/
val FILE_PATH_COL: String = "_metadata_file_path"
val FILE_PATH_FIELD: StructField = StructField(FILE_PATH_COL, StringType, nullable = false)

/**
* Add metadata columns to the iterator. Currently only [[METADATA_ROW_IDX_COL]] and
* [[METADATA_ROW_DEL_COL]] are supported.
*/
def addMetadataColumnToIterator(
schema: StructType,
delVector: Option[Roaring64Bitmap],
input: Iterator[ColumnarBatch],
maxBatchSize: Int): Iterator[ColumnarBatch] = {
val metadataRowIndexCol = schema.fieldNames.indexOf(METADATA_ROW_IDX_COL)
val delRowIdx = schema.fieldNames.indexOf(METADATA_ROW_DEL_COL)
if (metadataRowIndexCol == -1 && delRowIdx == -1) {
return input
}
var rowIndex = 0L
input.map { batch =>
withResource(batch) { _ =>
val rowIdxCol = if (metadataRowIndexCol == -1) {
None
} else {
Some(metadataRowIndexCol)
}

val delRowIdxOpt = if (delRowIdx == -1) {
None
} else {
Some(delRowIdx)
}
val newBatch = addMetadataColumns(rowIdxCol, delRowIdxOpt, delVector, maxBatchSize,
rowIndex, batch)
rowIndex += batch.numRows()
newBatch
}
}
}

private def addMetadataColumns(
rowIdxPos: Option[Int],
delRowIdx: Option[Int],
delVec: Option[Roaring64Bitmap],
maxBatchSize: Int,
rowIdxStart: Long,
batch: ColumnarBatch): ColumnarBatch = {
val rowIdxCol = rowIdxPos.map { _ =>
withResource(Scalar.fromLong(rowIdxStart)) { start =>
GpuColumnVector.from(CudfColumnVector.sequence(start, batch.numRows()),
METADATA_ROW_IDX_FIELD.dataType)
}
}

closeOnExcept(rowIdxCol) { rowIdxCol =>

val delVecCol = delVec.map { delVec =>
withResource(Scalar.fromBool(false)) { s =>
withResource(CudfColumnVector.fromScalar(s, batch.numRows())) { c =>
var table = new Table(c)
val posIter = new RoaringBitmapIterator(
delVec.getLongIteratorFrom(rowIdxStart),
rowIdxStart,
rowIdxStart + batch.numRows(),
).grouped(Math.min(maxBatchSize, batch.numRows()))

for (posChunk <- posIter) {
withResource(CudfColumnVector.fromLongs(posChunk: _*)) { poses =>
withResource(Scalar.fromBool(true)) { s =>
table = withResource(table) { _ =>
Table.scatter(Array(s), poses, table)
}
}
}
}

withResource(table) { _ =>
GpuColumnVector.from(table.getColumn(0).incRefCount(),
METADATA_ROW_DEL_FIELD.dataType)
}
}
}
}

closeOnExcept(delVecCol) { delVecCol =>
// Replace row_idx column
val columns = new Array[ColumnVector](batch.numCols())
for (i <- 0 until batch.numCols()) {
if (rowIdxPos.contains(i)) {
columns(i) = rowIdxCol.get
} else if (delRowIdx.contains(i)) {
columns(i) = delVecCol.get
} else {
columns(i) = batch.column(i) match {
case gpuCol: GpuColumnVector => gpuCol.incRefCount()
case col => col
}
}
}

new ColumnarBatch(columns, batch.numRows())
}
}
}
}

class RoaringBitmapIterator(val inner: PeekableLongIterator, val start: Long, val end: Long)
extends Iterator[Long] {

override def hasNext: Boolean = {
inner.hasNext && inner.peekNext() < end
}

override def next(): Long = {
inner.next() - start
}
}
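For illustration, here is a minimal, self-contained sketch of how RoaringBitmapIterator walks a deletion vector for one batch. It assumes the class added above and the org.roaringbitmap dependency (shaded in the pom change) are on the classpath; the bitmap contents are made up. Positions outside the batch's row range are skipped, and the yielded positions are relative to the start of the batch.

import com.nvidia.spark.rapids.delta.RoaringBitmapIterator
import org.roaringbitmap.longlong.Roaring64Bitmap

object RoaringBitmapIteratorDemo {
  def main(args: Array[String]): Unit = {
    // A deletion vector marking file rows 3, 5 and 130 as deleted.
    val delVec = new Roaring64Bitmap()
    delVec.addLong(3L)
    delVec.addLong(5L)
    delVec.addLong(130L)

    // A batch that starts at file row 0 and holds 128 rows.
    val rowIdxStart = 0L
    val numRows = 128L
    val iter = new RoaringBitmapIterator(
      delVec.getLongIteratorFrom(rowIdxStart),
      rowIdxStart,
      rowIdxStart + numRows)

    // Yields batch-relative positions; 130 is dropped because it lies
    // beyond the end of this batch.
    println(iter.toList) // List(3, 5)
  }
}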
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,12 +16,19 @@

package com.nvidia.spark.rapids.delta

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import ai.rapids.cudf.{ColumnVector, Scalar, Table}
import ai.rapids.cudf.Table.DuplicateKeepOption
import com.nvidia.spark.RapidsUDF
import com.nvidia.spark.rapids.Arm.withResource
import org.roaringbitmap.longlong.Roaring64Bitmap

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.types.{BinaryType, DataType, SQLUserDefinedType, UserDefinedType}
import org.apache.spark.util.AccumulatorV2

class GpuDeltaRecordTouchedFileNameUDF(accum: AccumulatorV2[String, java.util.Set[String]])
@@ -73,3 +80,77 @@ class GpuDeltaMetricUpdateUDF(metric: SQLMetric)
}
}
}

class GpuDeltaNoopUDF extends Function1[Boolean, Boolean] with RapidsUDF with Serializable {
override def apply(v1: Boolean): Boolean = v1

override def evaluateColumnar(numRows: Int, args: ColumnVector*): ColumnVector = {
require(args.length == 1)
args(0).incRefCount()
}
}

@SQLUserDefinedType(udt = classOf[RoaringBitmapUDT])
case class RoaringBitmapWrapper(inner: Roaring64Bitmap) {
def serializeToBytes(): Array[Byte] = {
withResource(new ByteArrayOutputStream()) { bout =>
withResource(new DataOutputStream(bout)) { dao =>
inner.serialize(dao)
}
bout.toByteArray
}
}
}

object RoaringBitmapWrapper {
def deserializeFromBytes(bytes: Array[Byte]): RoaringBitmapWrapper = {
withResource(new ByteArrayInputStream(bytes)) { bin =>
withResource(new DataInputStream(bin)) { din =>
val ret = RoaringBitmapWrapper(new Roaring64Bitmap)
ret.inner.deserialize(din)
ret
}
}
}
}

class RoaringBitmapUDT extends UserDefinedType[RoaringBitmapWrapper] {

override def sqlType: DataType = BinaryType

override def serialize(obj: RoaringBitmapWrapper): Any = {
obj.serializeToBytes()
}

override def deserialize(datum: Any): RoaringBitmapWrapper = {
datum match {
case b: Array[Byte] => RoaringBitmapWrapper.deserializeFromBytes(b)
case t => throw new IllegalArgumentException(s"Expected Array[Byte] but got: ${t.getClass}")
}
}

override def userClass: Class[RoaringBitmapWrapper] = classOf[RoaringBitmapWrapper]

override def typeName: String = "RoaringBitmap"
}

object RoaringBitmapUDAF extends Aggregator[Long, RoaringBitmapWrapper, RoaringBitmapWrapper] {
override def zero: RoaringBitmapWrapper = RoaringBitmapWrapper(new Roaring64Bitmap())

override def reduce(b: RoaringBitmapWrapper, a: Long): RoaringBitmapWrapper = {
b.inner.addLong(a)
b
}

override def merge(b1: RoaringBitmapWrapper, b2: RoaringBitmapWrapper): RoaringBitmapWrapper = {
val ret = b1.inner.clone()
ret.or(b2.inner)
RoaringBitmapWrapper(ret)
}

override def finish(reduction: RoaringBitmapWrapper): RoaringBitmapWrapper = reduction

override def bufferEncoder: Encoder[RoaringBitmapWrapper] = ExpressionEncoder()

override def outputEncoder: Encoder[RoaringBitmapWrapper] = ExpressionEncoder()
}
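As a hedged illustration of how the aggregator above could be wired into a Spark query: this excerpt does not show the actual call site in the merge command, so the dataframe, column names, and registration below are assumptions. An Aggregator[Long, _, _] can be wrapped with functions.udaf and applied per file to collect row indexes into one bitmap.

import com.nvidia.spark.rapids.delta.RoaringBitmapUDAF
import org.apache.spark.sql.{functions, SparkSession}

object RoaringBitmapUdafSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical input: one row per touched record, keyed by its file path
    // and row index within that file.
    val touched = Seq(
      ("part-00000.parquet", 0L),
      ("part-00000.parquet", 7L),
      ("part-00001.parquet", 3L)
    ).toDF("_metadata_file_path", "__metadata_row_index")

    // Wrap the Aggregator as a UDAF and build one bitmap per file.
    val toBitmap = functions.udaf(RoaringBitmapUDAF)
    val perFile = touched
      .groupBy($"_metadata_file_path")
      .agg(toBitmap($"__metadata_row_index").as("deletion_vector"))

    perFile.show(truncate = false)
  }
}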
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -74,7 +74,8 @@ object Delta24xProvider extends DeltaIOProvider {

override def getReadFileFormat(format: FileFormat): FileFormat = {
val cpuFormat = format.asInstanceOf[DeltaParquetFileFormat]
GpuDelta24xParquetFileFormat(cpuFormat.metadata, cpuFormat.isSplittable)
GpuDelta24xParquetFileFormat(cpuFormat.metadata, cpuFormat.isSplittable,
cpuFormat.disablePushDowns, cpuFormat.broadcastDvMap)
}

override def convertToGpu(
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,18 +16,32 @@

package com.nvidia.spark.rapids.delta.delta24x

import com.nvidia.spark.rapids.delta.GpuDeltaParquetFileFormat
import java.net.URI

import com.nvidia.spark.rapids.{GpuMetric, RapidsConf}
import com.nvidia.spark.rapids.delta.{GpuDeltaParquetFileFormat, RoaringBitmapWrapper}
import com.nvidia.spark.rapids.delta.GpuDeltaParquetFileFormatUtils.addMetadataColumnToIterator
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.delta.{DeltaColumnMappingMode, IdMapping}
import org.apache.spark.sql.delta.DeltaParquetFileFormat.DeletionVectorDescriptorWithFilterType
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

case class GpuDelta24xParquetFileFormat(
metadata: Metadata,
isSplittable: Boolean) extends GpuDeltaParquetFileFormat {
isSplittable: Boolean,
disablePushDown: Boolean,
broadcastDvMap: Option[Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]]])
extends GpuDeltaParquetFileFormat {

override val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
override val referenceSchema: StructType = metadata.schema
@@ -46,6 +60,47 @@ case class GpuDelta24xParquetFileFormat(
options: Map[String, String],
path: Path): Boolean = isSplittable

override def buildReaderWithPartitionValuesAndMetrics(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration,
metrics: Map[String, GpuMetric],
alluxioPathReplacementMap: Option[Map[String, String]])
: PartitionedFile => Iterator[InternalRow] = {

val dataReader = super.buildReaderWithPartitionValuesAndMetrics(
sparkSession,
dataSchema,
partitionSchema,
requiredSchema,
if (disablePushDown) Seq.empty else filters,
options,
hadoopConf,
metrics,
alluxioPathReplacementMap)

val delVecs = broadcastDvMap
val maxDelVecScatterBatchSize = RapidsConf
.DELTA_LOW_SHUFFLE_MERGE_SCATTER_DEL_VECTOR_BATCH_SIZE
.get(sparkSession.sessionState.conf)

(file: PartitionedFile) => {
val input = dataReader(file)
val dv = delVecs.flatMap(_.value.get(new URI(file.filePath.toString())))
.map(dv => RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner)
addMetadataColumnToIterator(prepareSchema(requiredSchema),
dv,
input.asInstanceOf[Iterator[ColumnarBatch]],
maxDelVecScatterBatchSize)
.asInstanceOf[Iterator[InternalRow]]
}
}

/**
* We sometimes need to replace FileFormat within LogicalPlans, so we have to override
* `equals` to ensure file format changes are captured
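The reader above rebuilds each file's deletion vector from the bytes stored inline in the broadcast descriptor before scattering "deleted" flags into the batch. Below is a minimal round-trip sketch of that serialization path using the wrapper added in this change; the bitmap contents are made up for illustration.

import com.nvidia.spark.rapids.delta.RoaringBitmapWrapper
import org.roaringbitmap.longlong.Roaring64Bitmap

object DeletionVectorRoundTrip {
  def main(args: Array[String]): Unit = {
    // Build a small deletion vector and serialize it to bytes via the wrapper.
    val bitmap = new Roaring64Bitmap()
    bitmap.addLong(1L)
    bitmap.addLong(42L)
    val bytes = RoaringBitmapWrapper(bitmap).serializeToBytes()

    // Rebuild it as buildReaderWithPartitionValuesAndMetrics does for each
    // PartitionedFile.
    val restored = RoaringBitmapWrapper.deserializeFromBytes(bytes).inner
    assert(restored.contains(1L) && restored.contains(42L))
    println(s"restored ${restored.getLongCardinality} deleted rows")
  }
}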