diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 7b6ff992419..b656f8799de 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -611,10 +611,7 @@ def test_read_case_col_name(spark_tmp_path, v1_enabled_list, col_name): StringGen('[A-Za-z0-9]{0,10}', nullable=True), pytest.param(StringGen(nullable=True), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9514')), ], ids=idfn) -@pytest.mark.parametrize('ignore_null_fields', [ - True, - pytest.param(False, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9516')) -]) +@pytest.mark.parametrize('ignore_null_fields', [True, False]) @pytest.mark.parametrize('pretty', [ pytest.param(True, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9517')), False diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 5cfde1c9d15..31752d482c3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -217,7 +217,8 @@ class CastOptions( legacyCastComplexTypesToString: Boolean, ansiMode: Boolean, stringToDateAnsiMode: Boolean, - val castToJsonString: Boolean = false) extends Serializable { + val castToJsonString: Boolean = false, + val ignoreNullFieldsInStructs: Boolean = true) extends Serializable { /** * Retuns the left bracket to use when surrounding brackets when converting @@ -1086,7 +1087,7 @@ object GpuCast { * The main differences are: * * - Struct field names are included - * - Null fields are omitted + * - Null fields are optionally omitted */ def castStructToJsonString(input: ColumnView, inputSchema: Array[StructField], @@ -1098,7 +1099,8 @@ object GpuCast { colon: ColumnVector, quote: ColumnVector): ColumnVector = { val jsonName = 
StringEscapeUtils.escapeJson(inputSchema(fieldIndex).name) - val needsQuoting = inputSchema(fieldIndex).dataType == DataTypes.StringType + val dataType = inputSchema(fieldIndex).dataType + val needsQuoting = dataType == DataTypes.StringType withResource(input.getChildColumnView(fieldIndex)) { cv => withResource(ArrayBuffer.empty[ColumnVector]) { attrColumns => // prefix with quoted column name followed by colon @@ -1106,28 +1108,57 @@ object GpuCast { attrColumns += ColumnVector.fromScalar(name, rowCount) attrColumns += colon.incRefCount() } - // write the value - val attrValue = castToString(cv, inputSchema(fieldIndex).dataType, options) - if (needsQuoting) { - attrColumns += quote.incRefCount() - attrColumns += escapeJsonString(attrValue) - attrColumns += quote.incRefCount() + if (options.ignoreNullFieldsInStructs) { + // write the value + val attrValue = castToString(cv, inputSchema(fieldIndex).dataType, options) + if (needsQuoting) { + attrColumns += quote.incRefCount() + attrColumns += escapeJsonString(attrValue) + attrColumns += quote.incRefCount() + } else { + attrColumns += attrValue + } + // now concatenate + val jsonAttr = withResource(Scalar.fromString("")) { emptyString => + ColumnVector.stringConcatenate(emptyString, emptyString, attrColumns.toArray) + } + // add an empty string or the attribute + withResource(jsonAttr) { _ => + withResource(cv.isNull) { isNull => + withResource(Scalar.fromNull(DType.STRING)) { nullScalar => + isNull.ifElse(nullScalar, jsonAttr) + } + } + } } else { - attrColumns += attrValue - } - // now concatenate - val jsonAttr = withResource(Scalar.fromString("")) { emptyString => - ColumnVector.stringConcatenate(emptyString, emptyString, attrColumns.toArray) - } - // add an empty string or the attribute - val jsonAttrOrEmptyString = withResource(jsonAttr) { _ => - withResource(cv.isNull) { isNull => - withResource(Scalar.fromNull(DType.STRING)) { nullScalar => - isNull.ifElse(nullScalar, jsonAttr) + val jsonAttr = 
withResource(ArrayBuffer.empty[ColumnVector]) { attrValues => + withResource(castToString(cv, inputSchema(fieldIndex).dataType, options)) { + attrValue => + if (needsQuoting) { + attrValues += quote.incRefCount() + attrValues += escapeJsonString(attrValue.incRefCount()) + attrValues += quote.incRefCount() + withResource(Scalar.fromString("")) { emptyString => + ColumnVector.stringConcatenate(emptyString, emptyString, attrValues.toArray) + } + } else { + attrValue.incRefCount() + } } } + // add attribute value, or null literal string if value is null + attrColumns += withResource(jsonAttr) { _ => + withResource(cv.isNull) { isNull => + withResource(Scalar.fromString("null")) { nullScalar => + isNull.ifElse(nullScalar, jsonAttr) + } + } + } + // now concatenate + withResource(Scalar.fromString("")) { emptyString => + ColumnVector.stringConcatenate(emptyString, emptyString, attrColumns.toArray) + } } - jsonAttrOrEmptyString } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index a0585310d37..25a7ca3619b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3604,9 +3604,6 @@ object GpuOverrides extends Logging { ))), (a, conf, p, r) => new UnaryExprMeta[StructsToJson](a, conf, p, r) { override def tagExprForGpu(): Unit = { - if (a.options.get("ignoreNullFields").exists(_.equalsIgnoreCase("false"))) { - willNotWorkOnGpu("to_json option ignore_null_fields=false is not supported") - } if (a.options.get("pretty").exists(_.equalsIgnoreCase("true"))) { willNotWorkOnGpu("to_json option pretty=true is not supported") } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuStructsToJson.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuStructsToJson.scala index 91e98aca323..3e674ecb6d8 100644 --- 
a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuStructsToJson.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuStructsToJson.scala @@ -20,6 +20,7 @@ import ai.rapids.cudf.ColumnVector import com.nvidia.spark.rapids.{CastOptions, GpuCast, GpuColumnVector, GpuUnaryExpression} import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StringType, StructType} case class GpuStructsToJson( @@ -27,8 +28,12 @@ case class GpuStructsToJson( child: Expression, timeZoneId: Option[String] = None) extends GpuUnaryExpression { override protected def doColumnar(input: GpuColumnVector): ColumnVector = { + val ignoreNullFields = options.getOrElse("ignoreNullFields", SQLConf.get.getConfString( + SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key)).toBoolean GpuCast.castStructToJsonString(input.getBase, child.dataType.asInstanceOf[StructType].fields, - new CastOptions(false, false, false, true)) + new CastOptions(legacyCastComplexTypesToString = false, ansiMode = false, + stringToDateAnsiMode = false, castToJsonString = true, + ignoreNullFieldsInStructs = ignoreNullFields)) } override def dataType: DataType = StringType