diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 3644b09951e..da04b2ddee5 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -128,6 +128,7 @@ Name | Description | Default Value | Applicable at spark.rapids.sql.json.read.decimal.enabled|JSON reading is not 100% compatible when reading decimals.|false|Runtime spark.rapids.sql.json.read.double.enabled|JSON reading is not 100% compatible when reading doubles.|true|Runtime spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|true|Runtime +spark.rapids.sql.json.read.mixedTypesAsString.enabled|JSON reading is not 100% compatible when reading mixed types as string.|false|Runtime spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu|Startup spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime diff --git a/docs/compatibility.md b/docs/compatibility.md index 2644c873e98..36da8800212 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -378,6 +378,14 @@ In particular, the output map is not resulted from a regular JSON parsing but in * If the input JSON is given as multiple rows, any row containing invalid JSON format will be parsed as an empty struct instead of a null value ([#9592](https://github.com/NVIDIA/spark-rapids/issues/9592)). +When a JSON attribute contains mixed types (different types in different rows), such as a mix of dictionaries +and lists, Spark will return a string representation of the JSON, but when running on GPU, the default +behavior is to throw an exception. There is an experimental setting +`spark.rapids.sql.json.read.mixedTypesAsString.enabled` that can be set to true to support reading +mixed types as string, but there are known issues where it could also read structs as string in some cases. There +can also be minor formatting differences. Spark will return a parsed and formatted representation, but the +GPU implementation returns the unparsed JSON string. + ### `to_json` function The `to_json` function is disabled by default because it is experimental and has some known incompatibilities diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index ac76139111d..54605455353 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -380,7 +380,9 @@ def test_read_invalid_json(spark_tmp_table_factory, std_input_path, read_func, f @pytest.mark.parametrize('schema', [_int_schema]) @pytest.mark.parametrize('v1_enabled_list', ["", "json"]) def test_read_valid_json(spark_tmp_table_factory, std_input_path, read_func, filename, schema, v1_enabled_list): - conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf = copy_and_update(_enable_all_types_conf, + {'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.rapids.sql.json.read.mixedTypesAsString.enabled': True}) assert_gpu_and_cpu_are_equal_collect( read_func(std_input_path + '/' + filename, schema, @@ -822,6 +824,18 @@ def test_from_json_struct_of_list(schema): .select(f.from_json('a', schema)), conf={"spark.rapids.sql.expression.JsonToStructs": True}) +@pytest.mark.parametrize('schema', [ + 'struct' +]) +@allow_non_gpu(*non_utc_allow) +def test_from_json_mixed_types_list_struct(schema): + json_string_gen = StringGen(r'{"a": (\[1,2,3\]|{"b":"[a-z]{2}"}) }') + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, json_string_gen) \ + .select('a', f.from_json('a', schema)), + conf={"spark.rapids.sql.expression.JsonToStructs": True, + 'spark.rapids.sql.json.read.mixedTypesAsString.enabled': True}) + @pytest.mark.parametrize('schema', ['struct', 'struct']) @allow_non_gpu(*non_utc_allow) def test_from_json_struct_all_empty_string_input(schema): diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index b330eb5b52d..dbeb7a41461 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3662,7 +3662,8 @@ object GpuOverrides extends Logging { override def convertToGpu(child: Expression): GpuExpression = // GPU implementation currently does not support duplicated json key names in input - GpuJsonToStructs(a.schema, a.options, child, a.timeZoneId) + GpuJsonToStructs(a.schema, a.options, child, conf.isJsonMixedTypesAsStringEnabled, + a.timeZoneId) }).disabledByDefault("parsing JSON from a column has a large number of issues and " + "should be considered beta quality right now."), expr[StructsToJson]( @@ -3850,7 +3851,8 @@ object GpuOverrides extends Logging { a.dataFilters, conf.maxReadBatchSizeRows, conf.maxReadBatchSizeBytes, - conf.maxGpuColumnSizeBytes) + conf.maxGpuColumnSizeBytes, + conf.isJsonMixedTypesAsStringEnabled) })).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap val scans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 4857bde2ac0..cac7b5b13c9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1197,6 +1197,12 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .booleanConf .createWithDefault(false) + val ENABLE_READ_JSON_MIXED_TYPES_AS_STRING = + conf("spark.rapids.sql.json.read.mixedTypesAsString.enabled") + .doc("JSON reading is not 100% compatible when reading mixed types as string.") + .booleanConf + .createWithDefault(false) + val ENABLE_AVRO = conf("spark.rapids.sql.format.avro.enabled") .doc("When set to true enables all avro input and output acceleration. " + "(only input is currently supported anyways)") @@ -2621,6 +2627,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isJsonDecimalReadEnabled: Boolean = get(ENABLE_READ_JSON_DECIMALS) + lazy val isJsonMixedTypesAsStringEnabled: Boolean = get(ENABLE_READ_JSON_MIXED_TYPES_AS_STRING) + lazy val isAvroEnabled: Boolean = get(ENABLE_AVRO) lazy val isAvroReadEnabled: Boolean = get(ENABLE_AVRO_READ) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index 53f86e3d75e..138f99b0c72 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -249,7 +249,8 @@ case class GpuJsonScan( dataFilters: Seq[Expression], maxReaderBatchSizeRows: Integer, maxReaderBatchSizeBytes: Long, - maxGpuColumnSizeBytes: Long) + maxGpuColumnSizeBytes: Long, + mixedTypesAsStringEnabled: Boolean) extends TextBasedFileScan(sparkSession, options) with GpuScan { private lazy val parsedOptions: JSONOptions = new JSONOptions( @@ -272,7 +273,8 @@ case class GpuJsonScan( GpuJsonPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, dataSchema, readDataSchema, readPartitionSchema, parsedOptions, maxReaderBatchSizeRows, - maxReaderBatchSizeBytes, maxGpuColumnSizeBytes, metrics, options.asScala.toMap) + maxReaderBatchSizeBytes, maxGpuColumnSizeBytes, metrics, options.asScala.toMap, + mixedTypesAsStringEnabled) } override def withInputFile(): GpuScan = this @@ -290,7 +292,8 @@ case class GpuJsonPartitionReaderFactory( maxReaderBatchSizeBytes: Long, maxGpuColumnSizeBytes: Long, metrics: Map[String, GpuMetric], - @transient params: Map[String, String]) extends ShimFilePartitionReaderFactory(params) { + @transient params: Map[String, String], + mixedTypesAsStringEnabled: Boolean) extends ShimFilePartitionReaderFactory(params) { override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { throw new IllegalStateException("ROW BASED PARSING IS NOT SUPPORTED ON THE GPU...") @@ -300,7 +303,7 @@ case class GpuJsonPartitionReaderFactory( val conf = broadcastedConf.value.value val reader = new PartitionReaderWithBytesRead(new JsonPartitionReader(conf, partFile, dataSchema, readDataSchema, parsedOptions, maxReaderBatchSizeRows, maxReaderBatchSizeBytes, - metrics)) + metrics, mixedTypesAsStringEnabled)) ColumnarPartitionReaderWithPartitionValues.newReader(partFile, reader, partitionSchema, maxGpuColumnSizeBytes) } @@ -346,7 +349,8 @@ class JsonPartitionReader( parsedOptions: JSONOptions, maxRowsPerChunk: Integer, maxBytesPerChunk: Long, - execMetrics: Map[String, GpuMetric]) + execMetrics: Map[String, GpuMetric], + enableMixedTypesAsString: Boolean) extends GpuTextBasedPartitionReader[HostLineBufferer, HostLineBuffererFactory.type](conf, partFile, dataSchema, readDataSchema, parsedOptions.lineSeparatorInRead, maxRowsPerChunk, maxBytesPerChunk, execMetrics, HostLineBuffererFactory) { @@ -354,7 +358,7 @@ class JsonPartitionReader( def buildJsonOptions(parsedOptions: JSONOptions): cudf.JSONOptions = { cudf.JSONOptions.builder() .withRecoverWithNull(true) - .withMixedTypesAsStrings(true) + .withMixedTypesAsStrings(enableMixedTypesAsString) .build } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala index 464d0a7cb15..3cec8943cf6 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,7 +67,8 @@ class GpuReadJsonFileFormat extends JsonFileFormat with GpuReadFileFormatWithMet rapidsConf.maxReadBatchSizeBytes, rapidsConf.maxGpuColumnSizeBytes, metrics, - options) + options, + rapidsConf.isJsonMixedTypesAsStringEnabled) PartitionReaderIterator.buildReader(factory) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala index 1e3f232c3ab..a6dcb9d8edf 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ case class GpuJsonToStructs( schema: DataType, options: Map[String, String], child: Expression, + enableMixedTypesAsString: Boolean, timeZoneId: Option[String] = None) extends GpuUnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes with NullIntolerant { @@ -177,9 +178,7 @@ case class GpuJsonToStructs( val jsonOptions = cudf.JSONOptions.builder() .withRecoverWithNull(true) - // tracking issue for enabling mixed type as string - // https://github.com/NVIDIA/spark-rapids/issues/10253 - .withMixedTypesAsStrings(false) + .withMixedTypesAsStrings(enableMixedTypesAsString) .build() withResource(cudf.Table.readJSON(jsonOptions, data, start, length)) { tableWithMeta => val names = tableWithMeta.getColumnNames