Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added in Decimal support to ParquetCachedBatchSerializer #1596

Merged
merged 7 commits into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Name | Description | Default Value
<a name="memory.gpu.direct.storage.spill.enabled"></a>spark.rapids.memory.gpu.direct.storage.spill.enabled|Should GPUDirect Storage (GDS) be used to spill GPU memory buffers directly to disk. GDS must be enabled and the directory `spark.local.dir` must support GDS. This is an experimental feature. For more information on GDS, see https://docs.nvidia.com/gpudirect-storage/.|false
<a name="memory.gpu.maxAllocFraction"></a>spark.rapids.memory.gpu.maxAllocFraction|The fraction of total GPU memory that limits the maximum size of the RMM pool. The value must be greater than or equal to the setting for spark.rapids.memory.gpu.allocFraction. Note that this limit will be reduced by the reserve memory configured in spark.rapids.memory.gpu.reserve.|1.0
<a name="memory.gpu.oomDumpDir"></a>spark.rapids.memory.gpu.oomDumpDir|The path to a local directory where a heap dump will be created if the GPU encounters an unrecoverable out-of-memory (OOM) error. The filename will be of the form: "gpu-oom-<pid>.hprof" where <pid> is the process ID.|None
<a name="memory.gpu.pool"></a>spark.rapids.memory.gpu.pool|Select the RMM pooling allocator to use. Valid values are "DEFAULT", "ARENA", and "NONE". With "DEFAULT", `rmm::mr::pool_memory_resource` is used; with "ARENA", `rmm::mr::arena_memory_resource` is used. If set to "NONE", pooling is disabled and RMM just passes through to CUDA memory allocation directly. Note: "ARENA" is the recommended pool allocator if CUDF is built with Per-thread Default Stream (PTDS), as "DEFAULT" is known to be unstable (https://github.com/NVIDIA/spark-rapids/issues/1141)|ARENA
<a name="memory.gpu.pool"></a>spark.rapids.memory.gpu.pool|Select the RMM pooling allocator to use. Valid values are "DEFAULT", "ARENA", and "NONE". With "DEFAULT", `rmm::mr::pool_memory_resource` is used; with "ARENA", `rmm::mr::arena_memory_resource` is used. If set to "NONE", pooling is disabled and RMM just passes through to CUDA memory allocation directly. Note: "ARENA" is the recommended pool allocator if CUDF is built with Per-Thread Default Stream (PTDS), as "DEFAULT" is known to be unstable (https://github.com/NVIDIA/spark-rapids/issues/1141)|ARENA
<a name="memory.gpu.pooling.enabled"></a>spark.rapids.memory.gpu.pooling.enabled|Should RMM act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly. DEPRECATED: please use spark.rapids.memory.gpu.pool instead.|true
<a name="memory.gpu.reserve"></a>spark.rapids.memory.gpu.reserve|The amount of GPU memory that should remain unallocated by RMM and left for system use such as memory needed for kernels, kernel launches or JIT compilation.|1073741824
<a name="memory.host.spillStorageSize"></a>spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk|1073741824
Expand Down
19 changes: 4 additions & 15 deletions integration_tests/src/main/python/cache_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,17 +52,6 @@ def test_passing_gpuExpr_as_Expr(enableVectorizedConf):
all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]), pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]), BooleanGen(), DateGen(), TimestampGen()]

all_gen_filters = [(StringGen(), "rlike(a, '^(?=.{1,5}$).*')"),
(ByteGen(), "a < 100"),
(ShortGen(), "a < 100"),
(IntegerGen(), "a < 1000"),
(LongGen(), "a < 1000"),
(BooleanGen(), "a == false"),
(DateGen(), "a > '1/21/2012'"),
(TimestampGen(), "a > '1/21/2012'"),
pytest.param((FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"), marks=[incompat]),
pytest.param((DoubleGen(special_cases=double_special_cases),"a < 1000"), marks=[incompat])]

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@pytest.mark.parametrize('enableVectorizedConf', enableVectorizedConf, ids=idfn)
Expand All @@ -79,7 +68,7 @@ def do_join(spark):

assert_gpu_and_cpu_are_equal_collect(do_join, conf = enableVectorizedConf)

@pytest.mark.parametrize('data_gen', all_gen_filters, ids=idfn)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@pytest.mark.parametrize('enableVectorizedConf', enableVectorizedConf, ids=idfn)
# We are OK running everything on CPU until we complete 'https://github.com/NVIDIA/spark-rapids/issues/360'
Expand All @@ -88,15 +77,15 @@ def do_join(spark):
@allow_non_gpu(any=True)
@ignore_order
def test_cached_join_filter(data_gen, join_type, enableVectorizedConf):
data, filter = data_gen
data = data_gen
if is_spark_300() and data.data_type == BooleanType():
pytest.xfail("https://issues.apache.org/jira/browse/SPARK-32672")

def do_join(spark):
left, right = create_df(spark, data, 500, 500)
cached = left.join(right, left.a == right.r_a, join_type).cache()
cached.count() #populates the cache
return cached.filter(filter)
return cached.filter("a is not null")

assert_gpu_and_cpu_are_equal_collect(do_join, conf = enableVectorizedConf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

import ai.rapids.cudf._
import ai.rapids.cudf.ParquetWriterOptions.StatisticsFrequency
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
import com.nvidia.spark.rapids.RapidsPluginImplicits._
Expand Down Expand Up @@ -318,7 +319,9 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
if (batch.numCols() == 0) {
List(ParquetCachedBatch(batch.numRows(), new Array[Byte](0)))
} else {
withResource(putOnGpuIfNeeded(batch))(gpuCB => compressColumnarBatchWithParquet(gpuCB))
withResource(putOnGpuIfNeeded(batch)) { gpuCB =>
compressColumnarBatchWithParquet(gpuCB, schema.toStructType)
}
}
})
} else {
Expand All @@ -339,7 +342,8 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
val BYTES_ALLOWED_PER_BATCH: Long = _2GB - APPROX_PAR_META_DATA

private[rapids] def compressColumnarBatchWithParquet(
gpuCB: ColumnarBatch): List[ParquetCachedBatch] = {
gpuCB: ColumnarBatch,
schema: StructType): List[ParquetCachedBatch]= {
val estimatedRowSize = scala.Range(0, gpuCB.numCols()).map { idx =>
gpuCB.column(idx).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize / gpuCB.numRows()
}.sum
Expand Down Expand Up @@ -367,24 +371,31 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {

for (i <- splitVectors.head.indices) {
withResource(makeTableForIndex(i)) { table =>
buffers += ParquetCachedBatch(writeTableToCachedBatch(table))
buffers += ParquetCachedBatch(writeTableToCachedBatch(table, schema))
}
}
} finally {
splitVectors.foreach(array => array.safeClose())
}
} else {
withResource(GpuColumnVector.from(gpuCB)) { table =>
buffers += ParquetCachedBatch(writeTableToCachedBatch(table))
buffers += ParquetCachedBatch(writeTableToCachedBatch(table, schema))
}
}

buffers.toList
}

private def writeTableToCachedBatch(table: Table): ParquetBufferConsumer = {
private def writeTableToCachedBatch(
table: Table,
schema: StructType): ParquetBufferConsumer = {
val buffer = new ParquetBufferConsumer(table.getRowCount.toInt)
withResource(Table.writeParquetChunked(ParquetWriterOptions.DEFAULT, buffer)) { writer =>
val options = ParquetWriterOptions.builder()
.withPrecisionValues(GpuParquetFileFormat.getFlatPrecisionList(schema):_*)
.withStatisticsFrequency(StatisticsFrequency.ROWGROUP)
.withTimestampInt96(false)
.build()
withResource(Table.writeParquetChunked(options, buffer)) { writer =>
writer.write(table)
}
buffer
Expand Down Expand Up @@ -1249,7 +1260,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, converters)
})
columnarBatchRdd.flatMap(cb => {
withResource(cb)(cb => compressColumnarBatchWithParquet(cb))
withResource(cb)(cb => compressColumnarBatchWithParquet(cb, structSchema))
})
} else {
val broadcastedHadoopConf = getBroadcastedHadoopConf(conf, parquetSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,15 @@ class Spark311Shims extends Spark301Shims {
}),
GpuOverrides.exec[InMemoryTableScanExec](
"Implementation of InMemoryTableScanExec to use GPU accelerated Caching",
ExecChecks(TypeSig.commonCudfTypes, TypeSig.all),
ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT).nested(),
TypeSig.all),
(scan, conf, p, r) => new SparkPlanMeta[InMemoryTableScanExec](scan, conf, p, r) {
override def tagPlanForGpu(): Unit = {
if (!scan.relation.cacheBuilder.serializer.isInstanceOf[ParquetCachedBatchSerializer]) {
willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used")
}
}

/**
* Convert InMemoryTableScanExec to a GPU enabled version.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@ object GpuParquetFileFormat {
}
}

def getFlatPrecisionList(schema: StructType): Seq[Int] = {
def precisionsList(t: DataType): Seq[Int] = {
t match {
case d: DecimalType => List(d.precision)
case s: StructType => s.flatMap(f => precisionsList(f.dataType))
case ArrayType(elementType, _) => precisionsList(elementType)
case _ => List.empty
}
}
schema.flatMap(f => precisionsList(f.dataType))
}

def parseCompressionType(compressionType: String): Option[CompressionType] = {
compressionType match {
case "NONE" | "UNCOMPRESSED" => Some(CompressionType.NONE)
Expand Down Expand Up @@ -277,22 +289,15 @@ class GpuParquetWriter(

super.write(newBatch, statsTrackers)
}
override val tableWriter: TableWriter = {
def precisionsList(t: DataType): Seq[Int] = {
t match {
case d: DecimalType => List(d.precision)
case s: StructType => s.flatMap(f => precisionsList(f.dataType))
case ArrayType(elementType, _) => precisionsList(elementType)
case _ => List.empty
}
}


override val tableWriter: TableWriter = {
val writeContext = new ParquetWriteSupport().init(conf)
val builder = ParquetWriterOptions.builder()
.withMetadata(writeContext.getExtraMetaData)
.withCompressionType(compressionType)
.withTimestampInt96(outputTimestampType == ParquetOutputTimestampType.INT96)
.withPrecisionValues(dataSchema.flatMap(f => precisionsList(f.dataType)):_*)
.withPrecisionValues(GpuParquetFileFormat.getFlatPrecisionList(dataSchema):_*)
dataSchema.foreach(entry => {
if (entry.nullable) {
builder.withColumnNames(entry.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.mockito.invocation.InvocationOnMock
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.ByteType
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch


Expand Down Expand Up @@ -146,7 +146,8 @@ class Spark310ParquetWriterSuite extends SparkQueryCompareTestSuite {
val cb = new ColumnarBatch(gpuCols, ROWS)
whenSplitCalled(cb)
val ser = new ParquetCachedBatchSerializer
ser.compressColumnarBatchWithParquet(cb)
val dummySchema = new StructType(Array(new StructField("empty", BooleanType, false)))
ser.compressColumnarBatchWithParquet(cb, dummySchema)
theTableMock.close()
}
}
Expand Down