Added in Decimal support to ParquetCachedBatchSerializer (NVIDIA#1596)
* Added in Decimal support to ParquetCachedBatchSerializer

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* added nested and changed copyrights

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* fixed test failures

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* fixed test error

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* added import

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

* fixed tests for spark311tests profile

Signed-off-by: Raza Jafri <rjafri@nvidia.com>

Co-authored-by: Raza Jafri <rjafri@nvidia.com>
razajafri and razajafri authored Feb 10, 2021
1 parent 932ca6d commit 52a61ec
Showing 6 changed files with 43 additions and 35 deletions.
2 changes: 1 addition & 1 deletion docs/configs.md
@@ -36,7 +36,7 @@ Name | Description | Default Value
 <a name="memory.gpu.direct.storage.spill.enabled"></a>spark.rapids.memory.gpu.direct.storage.spill.enabled|Should GPUDirect Storage (GDS) be used to spill GPU memory buffers directly to disk. GDS must be enabled and the directory `spark.local.dir` must support GDS. This is an experimental feature. For more information on GDS, see https://docs.nvidia.com/gpudirect-storage/.|false
 <a name="memory.gpu.maxAllocFraction"></a>spark.rapids.memory.gpu.maxAllocFraction|The fraction of total GPU memory that limits the maximum size of the RMM pool. The value must be greater than or equal to the setting for spark.rapids.memory.gpu.allocFraction. Note that this limit will be reduced by the reserve memory configured in spark.rapids.memory.gpu.reserve.|1.0
 <a name="memory.gpu.oomDumpDir"></a>spark.rapids.memory.gpu.oomDumpDir|The path to a local directory where a heap dump will be created if the GPU encounters an unrecoverable out-of-memory (OOM) error. The filename will be of the form: "gpu-oom-<pid>.hprof" where <pid> is the process ID.|None
-<a name="memory.gpu.pool"></a>spark.rapids.memory.gpu.pool|Select the RMM pooling allocator to use. Valid values are "DEFAULT", "ARENA", and "NONE". With "DEFAULT", `rmm::mr::pool_memory_resource` is used; with "ARENA", `rmm::mr::arena_memory_resource` is used. If set to "NONE", pooling is disabled and RMM just passes through to CUDA memory allocation directly. Note: "ARENA" is the recommended pool allocator if CUDF is built with Per-thread Default Stream (PTDS), as "DEFAULT" is known to be unstable (https://github.com/NVIDIA/spark-rapids/issues/1141)|ARENA
+<a name="memory.gpu.pool"></a>spark.rapids.memory.gpu.pool|Select the RMM pooling allocator to use. Valid values are "DEFAULT", "ARENA", and "NONE". With "DEFAULT", `rmm::mr::pool_memory_resource` is used; with "ARENA", `rmm::mr::arena_memory_resource` is used. If set to "NONE", pooling is disabled and RMM just passes through to CUDA memory allocation directly. Note: "ARENA" is the recommended pool allocator if CUDF is built with Per-Thread Default Stream (PTDS), as "DEFAULT" is known to be unstable (https://github.com/NVIDIA/spark-rapids/issues/1141)|ARENA
 <a name="memory.gpu.pooling.enabled"></a>spark.rapids.memory.gpu.pooling.enabled|Should RMM act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly. DEPRECATED: please use spark.rapids.memory.gpu.pool instead.|true
 <a name="memory.gpu.reserve"></a>spark.rapids.memory.gpu.reserve|The amount of GPU memory that should remain unallocated by RMM and left for system use such as memory needed for kernels, kernel launches or JIT compilation.|1073741824
 <a name="memory.host.spillStorageSize"></a>spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk|1073741824
17 changes: 3 additions & 14 deletions integration_tests/src/main/python/cache_test.py
@@ -52,17 +52,6 @@ def test_passing_gpuExpr_as_Expr(enableVectorizedConf):
 all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
            pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]), pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]), BooleanGen(), DateGen(), TimestampGen()]
 
-all_gen_filters = [(StringGen(), "rlike(a, '^(?=.{1,5}$).*')"),
-                   (ByteGen(), "a < 100"),
-                   (ShortGen(), "a < 100"),
-                   (IntegerGen(), "a < 1000"),
-                   (LongGen(), "a < 1000"),
-                   (BooleanGen(), "a == false"),
-                   (DateGen(), "a > '1/21/2012'"),
-                   (TimestampGen(), "a > '1/21/2012'"),
-                   pytest.param((FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"), marks=[incompat]),
-                   pytest.param((DoubleGen(special_cases=double_special_cases),"a < 1000"), marks=[incompat])]
-
 @pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
 @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
 @pytest.mark.parametrize('enableVectorizedConf', enableVectorizedConf, ids=idfn)
@@ -79,7 +68,7 @@ def do_join(spark):
 
     assert_gpu_and_cpu_are_equal_collect(do_join, conf = enableVectorizedConf)
 
-@pytest.mark.parametrize('data_gen', all_gen_filters, ids=idfn)
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
 @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
 @pytest.mark.parametrize('enableVectorizedConf', enableVectorizedConf, ids=idfn)
 # We are OK running everything on CPU until we complete 'https://github.com/NVIDIA/spark-rapids/issues/360'
@@ -88,15 +77,15 @@ def do_join(spark):
 @allow_non_gpu(any=True)
 @ignore_order
 def test_cached_join_filter(data_gen, join_type, enableVectorizedConf):
-    data, filter = data_gen
+    data = data_gen
     if is_spark_300() and data.data_type == BooleanType():
         pytest.xfail("https://issues.apache.org/jira/browse/SPARK-32672")
 
     def do_join(spark):
         left, right = create_df(spark, data, 500, 500)
         cached = left.join(right, left.a == right.r_a, join_type).cache()
         cached.count() #populates the cache
-        return cached.filter(filter)
+        return cached.filter("a is not null")
 
     assert_gpu_and_cpu_are_equal_collect(do_join, conf = enableVectorizedConf)
 

@@ -25,6 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import ai.rapids.cudf._
+import ai.rapids.cudf.ParquetWriterOptions.StatisticsFrequency
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
@@ -318,7 +319,9 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
         if (batch.numCols() == 0) {
           List(ParquetCachedBatch(batch.numRows(), new Array[Byte](0)))
         } else {
-          withResource(putOnGpuIfNeeded(batch))(gpuCB => compressColumnarBatchWithParquet(gpuCB))
+          withResource(putOnGpuIfNeeded(batch)) { gpuCB =>
+            compressColumnarBatchWithParquet(gpuCB, schema.toStructType)
+          }
         }
       })
     } else {
@@ -339,7 +342,8 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
   val BYTES_ALLOWED_PER_BATCH: Long = _2GB - APPROX_PAR_META_DATA
 
   private[rapids] def compressColumnarBatchWithParquet(
-      gpuCB: ColumnarBatch): List[ParquetCachedBatch] = {
+      gpuCB: ColumnarBatch,
+      schema: StructType): List[ParquetCachedBatch]= {
     val estimatedRowSize = scala.Range(0, gpuCB.numCols()).map { idx =>
       gpuCB.column(idx).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize / gpuCB.numRows()
     }.sum
@@ -367,24 +371,31 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
 
         for (i <- splitVectors.head.indices) {
           withResource(makeTableForIndex(i)) { table =>
-            buffers += ParquetCachedBatch(writeTableToCachedBatch(table))
+            buffers += ParquetCachedBatch(writeTableToCachedBatch(table, schema))
           }
         }
       } finally {
         splitVectors.foreach(array => array.safeClose())
       }
     } else {
       withResource(GpuColumnVector.from(gpuCB)) { table =>
-        buffers += ParquetCachedBatch(writeTableToCachedBatch(table))
+        buffers += ParquetCachedBatch(writeTableToCachedBatch(table, schema))
       }
     }
 
     buffers.toList
   }
 
-  private def writeTableToCachedBatch(table: Table): ParquetBufferConsumer = {
+  private def writeTableToCachedBatch(
+      table: Table,
+      schema: StructType): ParquetBufferConsumer = {
     val buffer = new ParquetBufferConsumer(table.getRowCount.toInt)
-    withResource(Table.writeParquetChunked(ParquetWriterOptions.DEFAULT, buffer)) { writer =>
+    val options = ParquetWriterOptions.builder()
+        .withPrecisionValues(GpuParquetFileFormat.getFlatPrecisionList(schema):_*)
+        .withStatisticsFrequency(StatisticsFrequency.ROWGROUP)
+        .withTimestampInt96(false)
+        .build()
+    withResource(Table.writeParquetChunked(options, buffer)) { writer =>
       writer.write(table)
     }
     buffer
@@ -1249,7 +1260,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
           new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, converters)
         })
         columnarBatchRdd.flatMap(cb => {
-          withResource(cb)(cb => compressColumnarBatchWithParquet(cb))
+          withResource(cb)(cb => compressColumnarBatchWithParquet(cb, structSchema))
         })
       } else {
         val broadcastedHadoopConf = getBroadcastedHadoopConf(conf, parquetSchema)
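
The size-based split in `compressColumnarBatchWithParquet` is only partly visible in this diff: the per-row size estimate and the `BYTES_ALLOWED_PER_BATCH` budget are shown, but the code that chooses the split points is collapsed. The following is a minimal Python sketch of how such a budget could be turned into split points. `estimated_row_size` mirrors the visible Scala (integer division per column, then a sum); `split_indices` and its chunking rule are assumptions made for illustration and may differ from what the serializer actually does.

```python
from typing import List


def estimated_row_size(column_sizes_bytes: List[int], num_rows: int) -> int:
    """Sum of per-column (device bytes // rows), as in the visible Scala code."""
    return sum(size // num_rows for size in column_sizes_bytes)


def split_indices(num_rows: int, row_size: int, bytes_allowed: int) -> List[int]:
    """Hypothetical rule: cut every `rows_per_chunk` rows so that each chunk's
    estimated size stays within `bytes_allowed`. Not taken from the commit."""
    # Guard against a zero estimate (tiny columns) and a zero chunk length.
    rows_per_chunk = max(1, bytes_allowed // max(1, row_size))
    return list(range(rows_per_chunk, num_rows, rows_per_chunk))


# Two columns of 4000 and 8000 bytes over 1000 rows: 4 + 8 bytes per row.
row_size = estimated_row_size([4000, 8000], 1000)
print(row_size)                              # 12
print(split_indices(1000, row_size, 4800))   # [400, 800]
```

An empty list means the batch already fits in the budget, which corresponds to the unsplit `else` branch in the hunk above.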

@@ -243,13 +243,15 @@ class Spark311Shims extends Spark301Shims {
         }),
       GpuOverrides.exec[InMemoryTableScanExec](
         "Implementation of InMemoryTableScanExec to use GPU accelerated Caching",
-        ExecChecks(TypeSig.commonCudfTypes, TypeSig.all),
+        ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT).nested(),
+          TypeSig.all),
         (scan, conf, p, r) => new SparkPlanMeta[InMemoryTableScanExec](scan, conf, p, r) {
           override def tagPlanForGpu(): Unit = {
             if (!scan.relation.cacheBuilder.serializer.isInstanceOf[ParquetCachedBatchSerializer]) {
               willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used")
             }
           }
+
           /**
            * Convert InMemoryTableScanExec to a GPU enabled version.
            */

@@ -101,6 +101,18 @@ object GpuParquetFileFormat {
     }
   }
 
+  def getFlatPrecisionList(schema: StructType): Seq[Int] = {
+    def precisionsList(t: DataType): Seq[Int] = {
+      t match {
+        case d: DecimalType => List(d.precision)
+        case s: StructType => s.flatMap(f => precisionsList(f.dataType))
+        case ArrayType(elementType, _) => precisionsList(elementType)
+        case _ => List.empty
+      }
+    }
+    schema.flatMap(f => precisionsList(f.dataType))
+  }
+
   def parseCompressionType(compressionType: String): Option[CompressionType] = {
     compressionType match {
       case "NONE" | "UNCOMPRESSED" => Some(CompressionType.NONE)
@@ -277,22 +289,15 @@ class GpuParquetWriter(
 
     super.write(newBatch, statsTrackers)
   }
-  override val tableWriter: TableWriter = {
-    def precisionsList(t: DataType): Seq[Int] = {
-      t match {
-        case d: DecimalType => List(d.precision)
-        case s: StructType => s.flatMap(f => precisionsList(f.dataType))
-        case ArrayType(elementType, _) => precisionsList(elementType)
-        case _ => List.empty
-      }
-    }
 
+
+  override val tableWriter: TableWriter = {
     val writeContext = new ParquetWriteSupport().init(conf)
     val builder = ParquetWriterOptions.builder()
         .withMetadata(writeContext.getExtraMetaData)
         .withCompressionType(compressionType)
         .withTimestampInt96(outputTimestampType == ParquetOutputTimestampType.INT96)
-        .withPrecisionValues(dataSchema.flatMap(f => precisionsList(f.dataType)):_*)
+        .withPrecisionValues(GpuParquetFileFormat.getFlatPrecisionList(dataSchema):_*)
     dataSchema.foreach(entry => {
       if (entry.nullable) {
         builder.withColumnNames(entry.name)
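
The `getFlatPrecisionList` helper that this commit moves into `GpuParquetFileFormat` walks the schema depth-first and collects one precision per decimal leaf, descending into structs and array elements and ignoring every other type. The Python sketch below reproduces that traversal on stand-in classes so the resulting order is easy to see. `Decimal`, `Array`, `Struct`, `Field` and `Other` are hypothetical types defined only for this example; they are not Spark or cuDF APIs.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class Decimal:
    precision: int
    scale: int


@dataclass
class Array:
    element: "DataType"


@dataclass
class Field:
    name: str
    data_type: "DataType"


@dataclass
class Struct:
    fields: List[Field]


@dataclass
class Other:
    """Any non-decimal, non-nested type (int, string, ...)."""
    name: str


DataType = Union[Decimal, Array, Struct, Other]


def flat_precision_list(schema: Struct) -> List[int]:
    """Depth-first list of decimal precisions, same cases as the Scala match."""
    def precisions(t: DataType) -> List[int]:
        if isinstance(t, Decimal):
            return [t.precision]
        if isinstance(t, Struct):
            return [p for f in t.fields for p in precisions(f.data_type)]
        if isinstance(t, Array):
            return precisions(t.element)
        return []  # all other types contribute nothing

    return [p for f in schema.fields for p in precisions(f.data_type)]


schema = Struct([
    Field("id", Other("int")),
    Field("price", Decimal(10, 2)),
    Field("items", Array(Struct([
        Field("qty", Decimal(5, 0)),
        Field("tag", Other("string")),
    ]))),
    Field("total", Decimal(18, 4)),
])
print(flat_precision_list(schema))  # [10, 5, 18]
```

The result follows field order in the schema, including nested fields, which is presumably why both `GpuParquetWriter` and the cached-batch writer can pass it straight to `withPrecisionValues`.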

@@ -29,7 +29,7 @@ import org.mockito.invocation.InvocationOnMock
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.ByteType
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 
@@ -146,7 +146,8 @@ class Spark310ParquetWriterSuite extends SparkQueryCompareTestSuite {
     val cb = new ColumnarBatch(gpuCols, ROWS)
     whenSplitCalled(cb)
     val ser = new ParquetCachedBatchSerializer
-    ser.compressColumnarBatchWithParquet(cb)
+    val dummySchema = new StructType(Array(new StructField("empty", BooleanType, false)))
+    ser.compressColumnarBatchWithParquet(cb, dummySchema)
     theTableMock.close()
   }
 }