diff --git a/docs/configs.md b/docs/configs.md index 845d741f41f..af91974e42d 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -36,7 +36,7 @@ Name | Description | Default Value spark.rapids.memory.gpu.direct.storage.spill.enabled|Should GPUDirect Storage (GDS) be used to spill GPU memory buffers directly to disk. GDS must be enabled and the directory `spark.local.dir` must support GDS. This is an experimental feature. For more information on GDS, see https://docs.nvidia.com/gpudirect-storage/.|false spark.rapids.memory.gpu.maxAllocFraction|The fraction of total GPU memory that limits the maximum size of the RMM pool. The value must be greater than or equal to the setting for spark.rapids.memory.gpu.allocFraction. Note that this limit will be reduced by the reserve memory configured in spark.rapids.memory.gpu.reserve.|1.0 spark.rapids.memory.gpu.oomDumpDir|The path to a local directory where a heap dump will be created if the GPU encounters an unrecoverable out-of-memory (OOM) error. The filename will be of the form: "gpu-oom-.hprof" where is the process ID.|None -spark.rapids.memory.gpu.pool|Select the RMM pooling allocator to use. Valid values are "DEFAULT", "ARENA", and "NONE". With "DEFAULT", `rmm::mr::pool_memory_resource` is used; with "ARENA", `rmm::mr::arena_memory_resource` is used. If set to "NONE", pooling is disabled and RMM just passes through to CUDA memory allocation directly. Note: "ARENA" is the recommended pool allocator if CUDF is built with Per-thread Default Stream (PTDS), as "DEFAULT" is known to be unstable (https://github.com/NVIDIA/spark-rapids/issues/1141)|ARENA +spark.rapids.memory.gpu.pool|Select the RMM pooling allocator to use. Valid values are "DEFAULT", "ARENA", and "NONE". With "DEFAULT", `rmm::mr::pool_memory_resource` is used; with "ARENA", `rmm::mr::arena_memory_resource` is used. If set to "NONE", pooling is disabled and RMM just passes through to CUDA memory allocation directly. Note: "ARENA" is the recommended pool allocator if CUDF is built with Per-Thread Default Stream (PTDS), as "DEFAULT" is known to be unstable (https://github.com/NVIDIA/spark-rapids/issues/1141)|ARENA spark.rapids.memory.gpu.pooling.enabled|Should RMM act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly. DEPRECATED: please use spark.rapids.memory.gpu.pool instead.|true spark.rapids.memory.gpu.reserve|The amount of GPU memory that should remain unallocated by RMM and left for system use such as memory needed for kernels, kernel launches or JIT compilation.|1073741824 spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk|1073741824 diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 67ab41b0e8f..d6c2c81ad54 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -52,17 +52,6 @@ def test_passing_gpuExpr_as_Expr(enableVectorizedConf): all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(), pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]), pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]), BooleanGen(), DateGen(), TimestampGen()] -all_gen_filters = [(StringGen(), "rlike(a, '^(?=.{1,5}$).*')"), - (ByteGen(), "a < 100"), - (ShortGen(), "a < 100"), - (IntegerGen(), "a < 1000"), - (LongGen(), "a < 1000"), - (BooleanGen(), "a == false"), - (DateGen(), "a > '1/21/2012'"), - (TimestampGen(), "a > '1/21/2012'"), - pytest.param((FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"), marks=[incompat]), - pytest.param((DoubleGen(special_cases=double_special_cases),"a < 1000"), marks=[incompat])] - @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) @pytest.mark.parametrize('enableVectorizedConf', enableVectorizedConf, ids=idfn) @@ -79,7 +68,7 @@ def do_join(spark): assert_gpu_and_cpu_are_equal_collect(do_join, conf = enableVectorizedConf) -@pytest.mark.parametrize('data_gen', all_gen_filters, ids=idfn) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) @pytest.mark.parametrize('enableVectorizedConf', enableVectorizedConf, ids=idfn) # We are OK running everything on CPU until we complete 'https://github.com/NVIDIA/spark-rapids/issues/360' @@ -88,7 +77,7 @@ def do_join(spark): @allow_non_gpu(any=True) @ignore_order def test_cached_join_filter(data_gen, join_type, enableVectorizedConf): - data, filter = data_gen + data = data_gen if is_spark_300() and data.data_type == BooleanType(): pytest.xfail("https://issues.apache.org/jira/browse/SPARK-32672") @@ -96,7 +85,7 @@ def do_join(spark): left, right = create_df(spark, data, 500, 500) cached = left.join(right, left.a == right.r_a, join_type).cache() cached.count() #populates the cache - return cached.filter(filter) + return cached.filter("a is not null") assert_gpu_and_cpu_are_equal_collect(do_join, conf = enableVectorizedConf) diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala index eebec2ebdf9..1075eb3139c 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/ParquetCachedBatchSerializer.scala @@ -25,6 +25,7 @@ import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, ListBuffer} import ai.rapids.cudf._ +import ai.rapids.cudf.ParquetWriterOptions.StatisticsFrequency import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder import com.nvidia.spark.rapids.RapidsPluginImplicits._ @@ -318,7 +319,9 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { if (batch.numCols() == 0) { List(ParquetCachedBatch(batch.numRows(), new Array[Byte](0))) } else { - withResource(putOnGpuIfNeeded(batch))(gpuCB => compressColumnarBatchWithParquet(gpuCB)) + withResource(putOnGpuIfNeeded(batch)) { gpuCB => + compressColumnarBatchWithParquet(gpuCB, schema.toStructType) + } } }) } else { @@ -339,7 +342,8 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { val BYTES_ALLOWED_PER_BATCH: Long = _2GB - APPROX_PAR_META_DATA private[rapids] def compressColumnarBatchWithParquet( - gpuCB: ColumnarBatch): List[ParquetCachedBatch] = { + gpuCB: ColumnarBatch, + schema: StructType): List[ParquetCachedBatch]= { val estimatedRowSize = scala.Range(0, gpuCB.numCols()).map { idx => gpuCB.column(idx).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize / gpuCB.numRows() }.sum @@ -367,7 +371,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { for (i <- splitVectors.head.indices) { withResource(makeTableForIndex(i)) { table => - buffers += ParquetCachedBatch(writeTableToCachedBatch(table)) + buffers += ParquetCachedBatch(writeTableToCachedBatch(table, schema)) } } } finally { @@ -375,16 +379,23 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { } } else { withResource(GpuColumnVector.from(gpuCB)) { table => - buffers += ParquetCachedBatch(writeTableToCachedBatch(table)) + buffers += ParquetCachedBatch(writeTableToCachedBatch(table, schema)) } } buffers.toList } - private def writeTableToCachedBatch(table: Table): ParquetBufferConsumer = { + private def writeTableToCachedBatch( + table: Table, + schema: StructType): ParquetBufferConsumer = { val buffer = new ParquetBufferConsumer(table.getRowCount.toInt) - withResource(Table.writeParquetChunked(ParquetWriterOptions.DEFAULT, buffer)) { writer => + val options = ParquetWriterOptions.builder() + .withPrecisionValues(GpuParquetFileFormat.getFlatPrecisionList(schema):_*) + .withStatisticsFrequency(StatisticsFrequency.ROWGROUP) + .withTimestampInt96(false) + .build() + withResource(Table.writeParquetChunked(options, buffer)) { writer => writer.write(table) } buffer @@ -1249,7 +1260,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, converters) }) columnarBatchRdd.flatMap(cb => { - withResource(cb)(cb => compressColumnarBatchWithParquet(cb)) + withResource(cb)(cb => compressColumnarBatchWithParquet(cb, structSchema)) }) } else { val broadcastedHadoopConf = getBroadcastedHadoopConf(conf, parquetSchema) diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala index 019129bf4a3..682e01069d3 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala @@ -243,13 +243,15 @@ class Spark311Shims extends Spark301Shims { }), GpuOverrides.exec[InMemoryTableScanExec]( "Implementation of InMemoryTableScanExec to use GPU accelerated Caching", - ExecChecks(TypeSig.commonCudfTypes, TypeSig.all), + ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT).nested(), + TypeSig.all), (scan, conf, p, r) => new SparkPlanMeta[InMemoryTableScanExec](scan, conf, p, r) { override def tagPlanForGpu(): Unit = { if (!scan.relation.cacheBuilder.serializer.isInstanceOf[ParquetCachedBatchSerializer]) { willNotWorkOnGpu("ParquetCachedBatchSerializer is not being used") } } + /** * Convert InMemoryTableScanExec to a GPU enabled version. */ diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 5668ac3bcd7..3146c8daed2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -101,6 +101,18 @@ object GpuParquetFileFormat { } } + def getFlatPrecisionList(schema: StructType): Seq[Int] = { + def precisionsList(t: DataType): Seq[Int] = { + t match { + case d: DecimalType => List(d.precision) + case s: StructType => s.flatMap(f => precisionsList(f.dataType)) + case ArrayType(elementType, _) => precisionsList(elementType) + case _ => List.empty + } + } + schema.flatMap(f => precisionsList(f.dataType)) + } + def parseCompressionType(compressionType: String): Option[CompressionType] = { compressionType match { case "NONE" | "UNCOMPRESSED" => Some(CompressionType.NONE) @@ -277,22 +289,15 @@ class GpuParquetWriter( super.write(newBatch, statsTrackers) } - override val tableWriter: TableWriter = { - def precisionsList(t: DataType): Seq[Int] = { - t match { - case d: DecimalType => List(d.precision) - case s: StructType => s.flatMap(f => precisionsList(f.dataType)) - case ArrayType(elementType, _) => precisionsList(elementType) - case _ => List.empty - } - } + + override val tableWriter: TableWriter = { val writeContext = new ParquetWriteSupport().init(conf) val builder = ParquetWriterOptions.builder() .withMetadata(writeContext.getExtraMetaData) .withCompressionType(compressionType) .withTimestampInt96(outputTimestampType == ParquetOutputTimestampType.INT96) - .withPrecisionValues(dataSchema.flatMap(f => precisionsList(f.dataType)):_*) + .withPrecisionValues(GpuParquetFileFormat.getFlatPrecisionList(dataSchema):_*) dataSchema.foreach(entry => { if (entry.nullable) { builder.withColumnNames(entry.name) diff --git a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/Spark310ParquetWriterSuite.scala b/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/Spark310ParquetWriterSuite.scala index 2796b40cd22..a984f488fde 100644 --- a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/Spark310ParquetWriterSuite.scala +++ b/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/Spark310ParquetWriterSuite.scala @@ -29,7 +29,7 @@ import org.mockito.invocation.InvocationOnMock import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.ByteType +import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch @@ -146,7 +146,8 @@ class Spark310ParquetWriterSuite extends SparkQueryCompareTestSuite { val cb = new ColumnarBatch(gpuCols, ROWS) whenSplitCalled(cb) val ser = new ParquetCachedBatchSerializer - ser.compressColumnarBatchWithParquet(cb) + val dummySchema = new StructType(Array(new StructField("empty", BooleanType, false))) + ser.compressColumnarBatchWithParquet(cb, dummySchema) theTableMock.close() } }