From d9ea7e82a7487951a7314f3934ef11a0ec92157a Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 2 Feb 2024 08:06:07 -0600 Subject: [PATCH] Fix a memory leak in json tuple (#10360) Signed-off-by: Robert (Bobby) Evans (cherry picked from commit 73b3279330f16f7e1af664e1b7acf5f8762359fb) --- .../nvidia/spark/rapids/GpuJsonTuple.scala | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuJsonTuple.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuJsonTuple.scala index f29490f3d6f..0b6c839ca2b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuJsonTuple.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuJsonTuple.scala @@ -55,30 +55,29 @@ case class GpuJsonTuple(children: Seq[Expression]) extends GpuGenerator generatorOffset: Int, outer: Boolean): Iterator[ColumnarBatch] = { withRetry(inputBatches, splitSpillableInHalfByRows) { attempt => - // this is obviously broken - val inputBatch = attempt.getColumnarBatch() - - val json = inputBatch.column(generatorOffset).asInstanceOf[GpuColumnVector].getBase - val schema = Array.fill[DataType](fieldExpressions.length)(StringType) - - val fieldScalars = fieldExpressions.safeMap { field => - withResourceIfAllowed(field.columnarEvalAny(inputBatch)) { - case fieldScalar: GpuScalar => - // Specials characters like '.', '[', ']' are not supported in field names - Scalar.fromString("$." + fieldScalar.getBase.getJavaString) - case _ => throw new UnsupportedOperationException(s"JSON field must be a scalar value") + withResource(attempt.getColumnarBatch()) { inputBatch => + val json = inputBatch.column(generatorOffset).asInstanceOf[GpuColumnVector].getBase + val schema = Array.fill[DataType](fieldExpressions.length)(StringType) + + val fieldScalars = fieldExpressions.safeMap { field => + withResourceIfAllowed(field.columnarEvalAny(inputBatch)) { + case fieldScalar: GpuScalar => + // Specials characters like '.', '[', ']' are not supported in field names + Scalar.fromString("$." + fieldScalar.getBase.getJavaString) + case _ => throw new UnsupportedOperationException(s"JSON field must be a scalar value") + } } - } - withResource(fieldScalars) { fieldScalars => - withResource(fieldScalars.safeMap(field => json.getJSONObject(field))) { resultCols => - val generatorCols = resultCols.safeMap(_.incRefCount).zip(schema).safeMap { - case (col, dataType) => GpuColumnVector.from(col, dataType) - } - val nonGeneratorCols = (0 until generatorOffset).safeMap { i => - inputBatch.column(i).asInstanceOf[GpuColumnVector].incRefCount + withResource(fieldScalars) { fieldScalars => + withResource(fieldScalars.safeMap(field => json.getJSONObject(field))) { resultCols => + val generatorCols = resultCols.safeMap(_.incRefCount).zip(schema).safeMap { + case (col, dataType) => GpuColumnVector.from(col, dataType) + } + val nonGeneratorCols = (0 until generatorOffset).safeMap { i => + inputBatch.column(i).asInstanceOf[GpuColumnVector].incRefCount + } + new ColumnarBatch((nonGeneratorCols ++ generatorCols).toArray, inputBatch.numRows) } - new ColumnarBatch((nonGeneratorCols ++ generatorCols).toArray, inputBatch.numRows) } } }