[SPARK-25935][SQL] Prevent null rows from JSON parser #22938
Conversation
add to whitelist
ok to test
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
Test build #98439 has finished for PR 22938 at commit
@@ -240,16 +240,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
      Seq(Row("1"), Row("2")))
  }
test("SPARK-11226 Skip empty line in json file") {
I removed the test because it is not relevant to the default mode `PERMISSIVE` any more, and `SQLQuerySuite` is not the perfect place for it.
Where is it moved to then? Does that mean we don't have a regression test for SPARK-11226 anymore?
Test build #98444 has finished for PR 22938 at commit
Test build #98448 has finished for PR 22938 at commit
Test build #98452 has finished for PR 22938 at commit
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
Test build #98527 has finished for PR 22938 at commit
@HyukjinKwon Are you ok with the changes?
@MaxGekk I have checked out your PR and played a little bit with it: I created a new unit test as a copy of "from_json - input=array, schema=array, output=array" with invalid JSON. I expected to get an InternalRow(null) for an array schema but I got null. After debugging a little I have found the reason. Please fix these and add a small unit test for each.
Yea, looks fine in general. Will take a look within this week or on the weekend.
@attilapiros, mind showing rough small test code for it please? Just want to see if this is something we should fix or not.
@HyukjinKwon Sure, the test would be for an invalid JSON array:

```scala
test("from_json - input=invalid JSON array, schema=array, output=array") {
  val input = """[{"a": 1}, {a": 2}]"""
  val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
  val output = InternalRow(1) :: InternalRow(2) :: Nil
  checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), InternalRow(null))
}
```

I have corrupted the JSON by removing the opening quote before the second `a`. Running this test fails.
I guess the problem belongs to `FailureSafeParser`:

spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala, line 36 in 57eddc7

`FailureSafeParser` was created to safely parse structs, not arrays and maps. I think we need to properly prepare `nullResult` for arrays and maps. I will look at it. Thank you @attilapiros for the example.
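The `nullResult` idea discussed here can be illustrated with a small self-contained sketch (the class and names below are hypothetical simplifications, not Spark's actual `FailureSafeParser`): on a bad record, `PERMISSIVE` substitutes a pre-built all-null result instead of dropping the record, while `FAILFAST` rethrows.

```scala
import scala.util.{Failure, Success, Try}

sealed trait ParseMode
case object Permissive extends ParseMode
case object FailFast extends ParseMode

// Hypothetical, simplified model of FailureSafeParser's idea (not the
// real Spark class): wrap a raw parser and decide what a bad record
// becomes based on the mode.
class SafeParser[T](rawParser: String => Seq[T], nullResult: T, mode: ParseMode) {
  def parse(input: String): Seq[T] = Try(rawParser(input)) match {
    case Success(rows) => rows
    case Failure(e) => mode match {
      case Permissive => Seq(nullResult) // e.g. Row(null, null) for `key STRING, value INT`
      case FailFast   => throw e
    }
  }
}

// A toy "parser" that fails when no root JSON token is found.
val raw: String => Seq[Vector[Any]] =
  s => if (s.trim.startsWith("{")) Seq(Vector(s)) else sys.error("no root token")

val permissive = new SafeParser(raw, Vector[Any](null, null), Permissive)
println(permissive.parse(" "))  // List(Vector(null, null))
```

The point of the PR discussion is that the real `nullResult` is built for a struct schema only, which is why array and map schemas need separate handling.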
At least it doesn't fail on the cases https://github.com/apache/spark/pull/22938/files#diff-6626026091295ad8c0dfb66ecbcd04b1R568 and https://github.com/apache/spark/pull/22938/files#diff-6626026091295ad8c0dfb66ecbcd04b1R565 which this PR actually addresses. So, I am getting exactly one row from the parser.
I made a fix for the broken array and map cases.
Test build #98555 has finished for PR 22938 at commit
Test build #98565 has finished for PR 22938 at commit
docs/sql-migration-guide-upgrade.md
@@ -15,6 +15,8 @@ displayTitle: Spark SQL Upgrading Guide

- Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, the behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.

- In Spark version 2.4 and earlier, the JSON data source and the `from_json` function produced `null`s if there is no valid root JSON token in the input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.
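The behavior change in the migration note can be modeled outside Spark with a tiny sketch (the regex-based `parseField` below is a hypothetical stand-in for the real JSON parser, not Spark API): malformed input such as `{"a" 1}` used to collapse to `null`, while in the `PERMISSIVE` mode it now becomes a row of `null`s.

```scala
import scala.util.Try

// Hypothetical stand-in for parsing `{"a": <int>}` against schema `a INT`;
// throws on malformed input such as {"a" 1} (missing colon).
def parseField(json: String): Int =
  """\{\s*"a"\s*:\s*(\d+)\s*\}""".r.findFirstMatchIn(json) match {
    case Some(m) => m.group(1).toInt
    case None    => sys.error(s"Malformed record: $json")
  }

// Spark 2.4-style: malformed input yields no row at all (null result).
def oldFromJson(json: String): Option[Int] = Try(parseField(json)).toOption

// Spark 3.0-style PERMISSIVE: malformed input yields Row(null),
// modeled here as Some(None).
def newFromJson(json: String): Option[Option[Int]] =
  Try(parseField(json)).toOption match {
    case Some(v) => Some(Some(v)) // Row(v)
    case None    => Some(None)    // Row(null)
  }

println(oldFromJson("""{"a" 1}"""))  // None
println(newFromJson("""{"a" 1}"""))  // Some(None)
```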
Just for curiosity, how can the JSON data source return null rows?
When we use the data source, we can specify the schema as `StructType` only. In that case, we get a `Seq[InternalRow]` or `Nil` from `JacksonParser` which is `flatMap`ped, or a `BadRecordException` which is converted to `Iterator[InternalRow]`. It seems there is no way to get `null` rows. The difference between the JSON datasource and the JSON functions is that the former doesn't (and cannot) do flattening. So, the `Nil` case should be handled specially (this PR addresses that case).
> In Spark version 2.4 and earlier, JSON data source and the `from_json` function produced `null`s

Shall we update this? According to what you said, the JSON data source can't produce null.
case _: StructType => (row: InternalRow) => row
case _: ArrayType => (row: InternalRow) =>
  if (row.isNullAt(0)) {
    new GenericArrayData(Array())
I think this is the place where `from_json` is different from the JSON data source. A data source must produce data as rows, while `from_json` can return an array or map.
I think the previous behavior also makes sense. For array/map, we don't have the corrupted column, and returning null is reasonable. Actually I prefer null over an empty array/map, but we need more discussion about this behavior.
I also thought about what is better to return here - `null` or an empty `ArrayData`/`MapData`. In the case of `StructType` we return a `Row` in the `PERMISSIVE` mode. For consistency, should we return an empty array/map in this mode too?
Maybe we can consider a special mode in which we return `null` for the bad record? For now it is easy to do since we use `FailureSafeParser`.
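The two candidate behaviors under discussion - `null` vs an empty array for a bad record with an array schema - can be sketched side by side in plain Scala (illustrative names, not Spark API):

```scala
import scala.util.Try

// Toy parser for a JSON array of ints; throws on malformed input.
def parseArray(json: String): Seq[Int] =
  if (json.matches("""\[\s*(\d+\s*(,\s*\d+\s*)*)?\]"""))
    "\\d+".r.findAllIn(json).map(_.toInt).toSeq
  else sys.error(s"Malformed record: $json")

// Option A (what the reviewers lean toward): a bad record produces null.
def permissiveNull(json: String): Option[Seq[Int]] =
  Try(parseArray(json)).toOption

// Option B (considered above, then discarded): a bad record produces an empty array.
def permissiveEmpty(json: String): Seq[Int] =
  Try(parseArray(json)).getOrElse(Seq.empty)

println(permissiveNull("[1, 2"))   // None, i.e. null
println(permissiveEmpty("[1, 2"))  // List()
```

Option A keeps malformed records distinguishable from genuinely empty arrays, which is the consistency argument made in the thread.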
> but we need more discussion about this behavior.

@cloud-fan Should I send an email to the dev list, or can we discuss this here?
I could revert the recent commits and prepare a separate PR for the behaviour change. WDYT?
I think it's okay to return `null` for map and array. Let's make some changes so that it is `null` for map and array.
I have discarded the recent changes.
Test build #98655 has finished for PR 22938 at commit
Test build #98660 has finished for PR 22938 at commit
Test build #98701 has finished for PR 22938 at commit
  assert(!rows.hasNext)
  castRow(result)
} else {
  throw new IllegalArgumentException("Expected one row from JSON parser.")
This can only happen when we have a bug, right?
Right, it must not happen.
@@ -1115,6 +1115,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
      Row(null, null, null),
      Row(null, null, null),
      Row(null, null, null),
+     Row(null, null, null),
So for the JSON data source, the previous behavior is that we would skip the row even in PERMISSIVE mode. Shall we clearly mention it in the migration guide?
> so for json data source, previous behavior is, we would skip the row even it's in PERMISSIVE mode.

Yes, we skipped such rows if the Jackson parser wasn't able to find any root tokens. So, not only empty strings and gaps fell into that category.

> Shall we clearly mention it in the migration guide?

Sure.
@@ -1813,6 +1817,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
      val path = dir.getCanonicalPath
      primitiveFieldAndType
        .toDF("value")
+       .repartition(1)
Why is the `repartition` required?
As far as I remember, I added the `repartition(1)` here and in other places to eliminate empty files. Such empty files are produced by empty partitions. Probably we could avoid writing empty files, at least in the case of text-based datasources, but in any case let's look at `TextOutputWriter`, for example. It creates an output stream for a file in its constructor:

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala, line 151 in 46110a5:
private val writer = CodecStreams.createOutputStream(context, new Path(path))

and closes the empty file in:

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala, line 162 in 46110a5:
writer.close()

From the read side, when the Jackson parser tries to read the empty file, it cannot detect any JSON tokens at the root level and returns null from `nextToken()`, for which I throw a bad record exception for now -> `Row(...)` in `PERMISSIVE` mode.
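The empty-file effect described above can be reproduced with a self-contained sketch (plain Scala file I/O, no Spark; the writer model is a deliberate simplification of `TextOutputWriter`): a writer opens one output file per partition, so an empty partition still leaves a zero-byte file behind, which is exactly what `repartition(1)` avoids.

```scala
import java.nio.file.{Files, Path}

// One output file per partition, created even when the partition is
// empty - a simplified model of how TextOutputWriter opens its stream
// in the constructor and closes it unconditionally.
def writePartitions(dir: Path, partitions: Seq[Seq[String]]): Seq[Path] =
  partitions.zipWithIndex.map { case (rows, i) =>
    val file = dir.resolve(f"part-$i%05d.json")
    Files.write(file, rows.mkString("\n").getBytes("UTF-8"))
    file
  }

val dir = Files.createTempDirectory("json-parts")
// Two partitions, one of them empty (as after an uneven repartition).
val files = writePartitions(dir, Seq(Seq("""{"a": 1}"""), Seq.empty))
println(files.count(f => Files.size(f) == 0))  // 1

// After the equivalent of repartition(1), everything lands in one partition:
val single = writePartitions(dir, Seq(Seq("""{"a": 1}""")))
println(single.count(f => Files.size(f) == 0))  // 0
</imports>
```

A reader that treats an empty file as a bad record then sees one extra record per empty partition, which is the test-count change discussed below.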
@cloud-fan @HyukjinKwon Do you agree with the proposed changes, or is there anything that blocks the PR for now?
@HyukjinKwon @cloud-fan May I ask you to look at this PR one more time?
LGTM except the migration guide. The JSON data source can't produce null rows, but skips them even in permissive mode.
Test build #99143 has finished for PR 22938 at commit
thanks, merging to master!
Test build #99144 has finished for PR 22938 at commit
@@ -1892,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
      .text(path)

    val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
-   assert(jsonDF.count() === corruptRecordCount)
+   assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
Wait, does this mean that it reads an empty record from an empty file after this change?
If that's true, we should not do this. Empty files can be generated in many cases for now and the behaviour is not currently well defined. If we rely on this behaviour, it will cause some weird behaviours or bugs hard to fix.
Shall we skip empty files for all the file-based data sources?
Sorry for the late response. The change looks good to me too in general, but I had two questions (see also #22938 (comment)).
## What changes were proposed in this pull request?

An input without valid JSON tokens on the root level will be treated as a bad record, and handled according to `mode`. Previously such input was converted to `null`. After the changes, the input is converted to a row with `null`s in the `PERMISSIVE` mode according to the schema. This allows removing code in the `from_json` function which could produce `null` as result rows.

## How was this patch tested?

It was tested by existing test suites. Some of them I had to modify (`JsonSuite` for example) because previously bad input was just silently ignored. Now such input is handled according to the specified `mode`.

Closes apache#22938 from MaxGekk/json-nulls.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

This PR reverts apache#22938 per discussion in apache#23325.

Closes apache#23325
Closes apache#23543 from MaxGekk/return-nulls-from-json-parser.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>