Added in Decimal support to ParquetCachedBatchSerializer #1596

Merged 7 commits on Feb 10, 2021
Changes from 2 commits
22 changes: 4 additions & 18 deletions integration_tests/src/main/python/cache_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020, NVIDIA CORPORATION.
+# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -49,20 +49,6 @@ def test_passing_gpuExpr_as_Expr(enableVectorizedConf):
NEG_DOUBLE_NAN_MAX_VALUE
]

-all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
-pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]), pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]), BooleanGen(), DateGen(), TimestampGen()]
-
-all_gen_filters = [(StringGen(), "rlike(a, '^(?=.{1,5}$).*')"),
-(ByteGen(), "a < 100"),
-(ShortGen(), "a < 100"),
-(IntegerGen(), "a < 1000"),
-(LongGen(), "a < 1000"),
-(BooleanGen(), "a == false"),
-(DateGen(), "a > '1/21/2012'"),
-(TimestampGen(), "a > '1/21/2012'"),
-pytest.param((FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"), marks=[incompat]),
-pytest.param((DoubleGen(special_cases=double_special_cases),"a < 1000"), marks=[incompat])]
-
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@pytest.mark.parametrize('enableVectorizedConf', enableVectorizedConf, ids=idfn)
@@ -79,7 +65,7 @@ def do_join(spark):

assert_gpu_and_cpu_are_equal_collect(do_join, conf = enableVectorizedConf)

-@pytest.mark.parametrize('data_gen', all_gen_filters, ids=idfn)
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@pytest.mark.parametrize('enableVectorizedConf', enableVectorizedConf, ids=idfn)
# We are OK running everything on CPU until we complete 'https://github.com/NVIDIA/spark-rapids/issues/360'
@@ -88,15 +74,15 @@ def do_join(spark):
@allow_non_gpu(any=True)
@ignore_order
def test_cached_join_filter(data_gen, join_type, enableVectorizedConf):
-data, filter = data_gen
+data = data_gen
if is_spark_300() and data.data_type == BooleanType():
pytest.xfail("https://issues.apache.org/jira/browse/SPARK-32672")

def do_join(spark):
left, right = create_df(spark, data, 500, 500)
cached = left.join(right, left.a == right.r_a, join_type).cache()
cached.count() #populates the cache
-return cached.filter(filter)
+return cached.filter("a is not null")

assert_gpu_and_cpu_are_equal_collect(do_join, conf = enableVectorizedConf)
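
The consolidated test above now drives every generator through the same "a is not null" filter instead of a per-type filter expression. For context, here is a small end-to-end sketch (in Scala, with made-up column names and values) of the scenario this PR enables: caching a DataFrame that contains a DecimalType column through the Parquet-based cache serializer. It assumes a SparkSession `spark` with the RAPIDS plugin and the cache serializer configured, as in the configuration sketch further down.

```scala
import org.apache.spark.sql.functions.col

// Made-up example data: an id column plus a decimal(18,4) column.
val df = spark.range(0, 1000)
  .select(col("id"), (col("id") * 3).cast("decimal(18,4)").as("amount"))

val cached = df.cache()
cached.count()                          // materializes the cache through the serializer
cached.filter("amount > 10").show()     // decimals are read back from the cached Parquet batches
```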

ParquetCachedBatchSerializer.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

import ai.rapids.cudf._
+import ai.rapids.cudf.ParquetWriterOptions.StatisticsFrequency
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
import com.nvidia.spark.rapids.RapidsPluginImplicits._
@@ -318,7 +319,9 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
if (batch.numCols() == 0) {
List(ParquetCachedBatch(batch.numRows(), new Array[Byte](0)))
} else {
-withResource(putOnGpuIfNeeded(batch))(gpuCB => compressColumnarBatchWithParquet(gpuCB))
+withResource(putOnGpuIfNeeded(batch)) { gpuCB =>
+compressColumnarBatchWithParquet(gpuCB, schema.toStructType)
+}
}
})
} else {
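
withResource, used throughout this file, is roughly the following helper from the plugin's Arm trait (a simplified sketch; the real trait has additional overloads for sequences and arrays of resources). It guarantees the GPU-backed batch or table is closed even if the compression step throws.

```scala
// Simplified sketch of the Arm helper: run a block against an AutoCloseable
// and always close it afterwards.
def withResource[T <: AutoCloseable, V](resource: T)(block: T => V): V = {
  try {
    block(resource)
  } finally {
    resource.close()
  }
}
```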
@@ -339,7 +342,8 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
val BYTES_ALLOWED_PER_BATCH: Long = _2GB - APPROX_PAR_META_DATA

private[rapids] def compressColumnarBatchWithParquet(
-gpuCB: ColumnarBatch): List[ParquetCachedBatch] = {
+gpuCB: ColumnarBatch,
+schema: StructType): List[ParquetCachedBatch] = {
val estimatedRowSize = scala.Range(0, gpuCB.numCols()).map { idx =>
gpuCB.column(idx).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize / gpuCB.numRows()
}.sum
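
The per-row size estimate above feeds the roughly 2GB cap on each Parquet-encoded cached batch. A hypothetical sketch (not the PR's exact code; the metadata allowance below is assumed for illustration) of the kind of arithmetic involved:

```scala
// Hypothetical constants and check: batches whose estimated encoded size would
// exceed the allowed bytes per batch are split before being written.
val _2GB: Long = 2L << 30
val APPROX_PAR_META_DATA: Long = 10L << 20          // assumed ~10MB allowance for Parquet metadata
val BYTES_ALLOWED_PER_BATCH: Long = _2GB - APPROX_PAR_META_DATA

def numSplits(numRows: Long, estimatedRowSize: Long): Long = {
  val estimatedTotal = numRows * estimatedRowSize
  // ceiling division; 1 means the batch fits and no split is needed
  (estimatedTotal + BYTES_ALLOWED_PER_BATCH - 1) / BYTES_ALLOWED_PER_BATCH
}
```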
@@ -367,24 +371,31 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {

for (i <- splitVectors.head.indices) {
withResource(makeTableForIndex(i)) { table =>
-buffers += ParquetCachedBatch(writeTableToCachedBatch(table))
+buffers += ParquetCachedBatch(writeTableToCachedBatch(table, schema))
}
}
} finally {
splitVectors.foreach(array => array.safeClose())
}
} else {
withResource(GpuColumnVector.from(gpuCB)) { table =>
-buffers += ParquetCachedBatch(writeTableToCachedBatch(table))
+buffers += ParquetCachedBatch(writeTableToCachedBatch(table, schema))
}
}

buffers.toList
}

-private def writeTableToCachedBatch(table: Table): ParquetBufferConsumer = {
+private def writeTableToCachedBatch(
+table: Table,
+schema: StructType): ParquetBufferConsumer = {
val buffer = new ParquetBufferConsumer(table.getRowCount.toInt)
-withResource(Table.writeParquetChunked(ParquetWriterOptions.DEFAULT, buffer)) { writer =>
+val options = ParquetWriterOptions.builder()
+.withPrecisionValues(GpuParquetFileFormat.getFlatPrecisionList(schema):_*)
+.withStatisticsFrequency(StatisticsFrequency.ROWGROUP)
+.withTimestampInt96(false)
+.build()
+withResource(Table.writeParquetChunked(options, buffer)) { writer =>
writer.write(table)
}
buffer
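
A minimal sketch, assuming only the cudf Java API visible in this hunk, of how the writer options are now derived from the Spark schema instead of ParquetWriterOptions.DEFAULT. The per-column decimal precisions come from the getFlatPrecisionList helper added to GpuParquetFileFormat further down; without them the cudf writer cannot record the decimal precision of each column.

```scala
import ai.rapids.cudf.ParquetWriterOptions
import ai.rapids.cudf.ParquetWriterOptions.StatisticsFrequency
import org.apache.spark.sql.types.StructType

// Builds the options used when compressing one cached batch to Parquet.
def cachedBatchWriterOptions(schema: StructType): ParquetWriterOptions =
  ParquetWriterOptions.builder()
    .withPrecisionValues(GpuParquetFileFormat.getFlatPrecisionList(schema): _*) // decimal precisions, flattened depth-first
    .withStatisticsFrequency(StatisticsFrequency.ROWGROUP)                      // emit statistics per row group
    .withTimestampInt96(false)                                                  // write timestamps as INT64, not legacy INT96
    .build()
```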
@@ -1249,7 +1260,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, converters)
})
columnarBatchRdd.flatMap(cb => {
-withResource(cb)(cb => compressColumnarBatchWithParquet(cb))
+withResource(cb)(cb => compressColumnarBatchWithParquet(cb, structSchema))
})
} else {
val broadcastedHadoopConf = getBroadcastedHadoopConf(conf, parquetSchema)
Spark311Shims.scala
@@ -233,13 +233,15 @@ class Spark311Shims extends Spark301Shims {
}),
GpuOverrides.exec[InMemoryTableScanExec](
"Implementation of InMemoryTableScanExec to use GPU accelerated Caching",
-ExecChecks(TypeSig.commonCudfTypes, TypeSig.all),
+ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT).nested(),
+TypeSig.all),
(scan, conf, p, r) => new SparkPlanMeta[InMemoryTableScanExec](scan, conf, p, r) {
override def tagPlanForGpu(): Unit = {
if (!scan.relation.cacheBuilder.serializer.isInstanceOf[ParquetCachedBatchSerializer]) {
willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used")
}
}

/**
* Convert InMemoryTableScanExec to a GPU enabled version.
*/
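
The tagPlanForGpu check above only lets this override apply when Spark is actually caching through the Parquet serializer. A hedged sketch of how that is typically configured on Spark 3.1.x: spark.sql.cache.serializer is a standard Spark configuration, but the fully-qualified serializer class name below is an assumption based on this shim's package and may differ between plugin releases, so verify it against the docs for your version.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("gpu-cache-example")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  // Serializer class name assumed for illustration; check your spark-rapids release.
  .config("spark.sql.cache.serializer",
    "com.nvidia.spark.rapids.shims.spark311.ParquetCachedBatchSerializer")
  .getOrCreate()
```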
GpuParquetFileFormat.scala
@@ -101,6 +101,18 @@ object GpuParquetFileFormat {
}
}

+def getFlatPrecisionList(schema: StructType): Seq[Int] = {
+def precisionsList(t: DataType): Seq[Int] = {
+t match {
+case d: DecimalType => List(d.precision)
+case s: StructType => s.flatMap(f => precisionsList(f.dataType))
+case ArrayType(elementType, _) => precisionsList(elementType)
+case _ => List.empty
+}
+}
+schema.flatMap(f => precisionsList(f.dataType))
+}

def parseCompressionType(compressionType: String): Option[CompressionType] = {
compressionType match {
case "NONE" | "UNCOMPRESSED" => Some(CompressionType.NONE)
@@ -277,22 +289,15 @@ class GpuParquetWriter(

super.write(newBatch, statsTrackers)
}
-override val tableWriter: TableWriter = {
-def precisionsList(t: DataType): Seq[Int] = {
-t match {
-case d: DecimalType => List(d.precision)
-case s: StructType => s.flatMap(f => precisionsList(f.dataType))
-case ArrayType(elementType, _) => precisionsList(elementType)
-case _ => List.empty
-}
-}


+override val tableWriter: TableWriter = {
val writeContext = new ParquetWriteSupport().init(conf)
val builder = ParquetWriterOptions.builder()
.withMetadata(writeContext.getExtraMetaData)
.withCompressionType(compressionType)
.withTimestampInt96(outputTimestampType == ParquetOutputTimestampType.INT96)
-.withPrecisionValues(dataSchema.flatMap(f => precisionsList(f.dataType)):_*)
+.withPrecisionValues(GpuParquetFileFormat.getFlatPrecisionList(dataSchema):_*)
dataSchema.foreach(entry => {
if (entry.nullable) {
builder.withColumnNames(entry.name)