Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use parser from spark to normalize json path in GetJsonObject #10466

Merged
10 changes: 0 additions & 10 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -479,16 +479,6 @@ The following is a list of known differences.

The following is a list of bugs in either the GPU version or arguably in Apache Spark itself.
* https://github.com/NVIDIA/spark-rapids/issues/10219 non-matching quotes in quoted strings
* https://github.com/NVIDIA/spark-rapids/issues/10213 array index notation works without root
* https://github.com/NVIDIA/spark-rapids/issues/10214 unquoted array index notation is not
  supported
* https://github.com/NVIDIA/spark-rapids/issues/10215 leading spaces can be stripped from named
  keys.
* https://github.com/NVIDIA/spark-rapids/issues/10216 It appears that Spark is flattening some
  output, which is different from other implementations including the GPU version.
* https://github.com/NVIDIA/spark-rapids/issues/10217 a JSON path execution bug
* https://issues.apache.org/jira/browse/SPARK-46761 Apache Spark does not allow the `?` character in
  a quoted JSON path string.

## Avro

Expand Down
54 changes: 37 additions & 17 deletions integration_tests/src/main/python/get_json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,19 @@ def test_get_json_object_single_quotes():
"$['key with spaces']",
"$.store.book",
"$.store.book[0]",
"$.store.book[*]",
pytest.param("$",marks=[
pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10218'),
pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10196'),
pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10194')]),
"$.store.book[0].category",
"$.store.book[*].category",
"$.store.book[*].isbn",
pytest.param("$.store.book[*].reader",marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10216')),
"$.store.basket[0][1]",
"$.store.basket[*]",
"$.store.basket[*][0]",
"$.store.basket[0][*]",
"$.store.basket[*][*]",
"$.store.basket[0][2].b",
pytest.param("$.store.basket[0][*].b",marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10217')),
"$.zip code",
"$.fb:testid",
pytest.param("$.a",marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10196')),
"$.non_exist_key",
pytest.param("$..no_recursive", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10212')),
"$.store.book[0].non_exist_key",
"$.store.basket[*].non_exist_key"])
"$..no_recursive",
"$.store.book[0].non_exist_key"])
def test_get_json_object_spark_unit_tests(query):
schema = StructType([StructField("jsonStr", StringType())])
data = [
Expand All @@ -110,6 +100,26 @@ def test_get_json_object_spark_unit_tests(query):
f.get_json_object('jsonStr', query)),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

# Every path here contains a wildcard form ([*] or .* style access) which the
# GPU implementation of GetJsonObject does not support, so the plan must fall
# back to the CPU (see the wildcard check in GpuGetJsonObjectMeta).
@allow_non_gpu("ProjectExec", "GetJsonObject")
@pytest.mark.parametrize('query',["$.store.basket[0][*].b",
    "$.store.book[*].reader",
    "$.store.book[*]",
    "$.store.book[*].category",
    "$.store.book[*].isbn",
    "$.store.basket[*]",
    "$.store.basket[*][0]",
    "$.store.basket[0][*]",
    "$.store.basket[*][*]",
    "$.store.basket[*].non_exist_key"])
def test_get_json_object_spark_unit_tests_fallback(query):
    # Single-row, single-string-column input: the same JSON document Spark's
    # own get_json_object unit tests use.
    schema = StructType([StructField("jsonStr", StringType())])
    data = [['''{"store":{"fruit":[{"weight":8,"type":"apple"},{"weight":9,"type":"pear"}],"basket":[[1,2,{"b":"y","a":"x"}],[3,4],[5,6]],"book":[{"author":"Nigel Rees","title":"Sayings of the Century","category":"reference","price":8.95},{"author":"Herman Melville","title":"Moby Dick","category":"fiction","price":8.99,"isbn":"0-553-21311-3"},{"author":"J. R. R. Tolkien","title":"The Lord of the Rings","category":"fiction","reader":[{"age":25,"name":"bob"},{"age":26,"name":"jack"}],"price":22.99,"isbn":"0-395-19395-8"}],"bicycle":{"price":19.95,"color":"red"}},"email":"amy@only_for_json_udf_test.net","owner":"amy","zip code":"94025","fb:testid":"1234"}''']]
    # Asserts both that results match the CPU and that the named operator
    # ("GetJsonObject") actually fell back.
    assert_gpu_fallback_collect(
        lambda spark: spark.createDataFrame(data,schema=schema).select(
            f.get_json_object('jsonStr', query)),
        "GetJsonObject",
        conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10218")
def test_get_json_object_normalize_non_string_output():
schema = StructType([StructField("jsonStr", StringType())])
Expand All @@ -132,7 +142,6 @@ def test_get_json_object_normalize_non_string_output():
f.get_json_object('jsonStr', '$')),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

@pytest.mark.xfail(reason="https://issues.apache.org/jira/browse/SPARK-46761")
def test_get_json_object_quoted_question():
schema = StructType([StructField("jsonStr", StringType())])
data = [[r'{"?":"QUESTION"}']]
Expand Down Expand Up @@ -221,7 +230,6 @@ def test_get_json_object_invalid_path():
),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10213")
def test_get_json_object_top_level_array_notation():
# This is a special version of invalid path. It is something that the GPU supports
# but the CPU thinks is invalid
Expand All @@ -239,7 +247,6 @@ def test_get_json_object_top_level_array_notation():
),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10214")
def test_get_json_object_unquoted_array_notation():
# This is a special version of invalid path. It is something that the GPU supports
# but the CPU thinks is invalid
Expand All @@ -257,27 +264,40 @@ def test_get_json_object_unquoted_array_notation():
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})


@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/10215")
def test_get_json_object_white_space_removal():
    # Exercises JSON paths whose keys carry leading/trailing whitespace, in
    # both dotted notation ($. a ) and bracket notation ($[' a ']), against
    # documents whose keys themselves contain spaces. GPU and CPU results
    # must agree on how that whitespace is matched.
    schema = StructType([StructField("jsonStr", StringType())])
    data = [['{" a":" A"," b":" B"}'],
            ['{"a":"A","b":"B"}'],
            ['{"a ":"A ","b ":"B "}'],
            ['{" a ":" A "," b ":" B "}'],
            ['{" a ": {" a ":" A "}," b ": " B "}'],
            ['{" a":"b","a.a":"c","b":{"a":"ab"}}'],
            ['{" a":"b"," a. a":"c","b":{"a":"ab"}}'],
            ['{" a":"b","a .a ":"c","b":{"a":"ab"}}'],
            ['{" a":"b"," a . a ":"c","b":{"a":"ab"}}']
            ]

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.createDataFrame(data,schema=schema).select(
            f.col('jsonStr'),
            f.get_json_object('jsonStr', '$.a').alias('dot_a'),
            f.get_json_object('jsonStr', '$. a').alias('dot_space_a'),
            f.get_json_object('jsonStr', '$.\ta').alias('dot_tab_a'),
            # NOTE(review): the alias suggests three spaces after the dot, but
            # consecutive spaces may have been collapsed in this copy — verify
            # the literal against the repository.
            f.get_json_object('jsonStr', '$. a').alias('dot_spaces_a3'),
            f.get_json_object('jsonStr', '$.a ').alias('dot_a_space'),
            f.get_json_object('jsonStr', '$. a ').alias('dot_space_a_space'),
            f.get_json_object('jsonStr', "$['b']").alias('dot_b'),
            f.get_json_object('jsonStr', "$[' b']").alias('dot_space_b'),
            f.get_json_object('jsonStr', "$['b ']").alias('dot_b_space'),
            f.get_json_object('jsonStr', "$[' b ']").alias('dot_space_b_space'),
            # Multi-step paths: whitespace handling on each component.
            f.get_json_object('jsonStr', "$. a. a").alias('dot_space_a_dot_space_a'),
            f.get_json_object('jsonStr', "$.a .a ").alias('dot_a_space_dot_a_space'),
            f.get_json_object('jsonStr', "$. a . a ").alias('dot_space_a_space_dot_space_a_space'),
            # Bracket notation: the dot is part of the key, not a separator.
            f.get_json_object('jsonStr', "$[' a. a']").alias('space_a_dot_space_a'),
            f.get_json_object('jsonStr', "$['a .a ']").alias('a_space_dot_a_space'),
            f.get_json_object('jsonStr', "$[' a . a ']").alias('space_a_space_dot_space_a_space'),
        ),
        conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,11 +16,115 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{ColumnVector,GetJsonObjectOptions}
import scala.util.parsing.combinator.RegexParsers

import ai.rapids.cudf.{ColumnVector, GetJsonObjectOptions, Scalar}
import com.nvidia.spark.rapids.Arm.withResource

import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression}
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, GetJsonObject}
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Copied from Apache Spark org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
// ADT describing one step of a parsed JSON path such as `$.a[0][*]`.
private[this] sealed trait PathInstruction
private[this] object PathInstruction {
  // Marker emitted before an Index/Wildcard produced by `[...]` notation
  case object Subscript extends PathInstruction
  // `[*]`, `.*`, `['*']` or `..` — matches every child element
  case object Wildcard extends PathInstruction
  // Marker emitted before a Named produced by `.name` / `['name']` notation
  case object Key extends PathInstruction
  // Numeric array subscript, e.g. `[123]`
  case class Index(index: Long) extends PathInstruction
  // Named child lookup, e.g. `.name` or `['name']`
  case class Named(name: String) extends PathInstruction
}

/**
 * Parser for the JSON path syntax accepted by get_json_object, copied from
 * Apache Spark's jsonExpressions.scala so the GPU plugin can parse and
 * normalize paths exactly the way the CPU implementation does.
 */
private[this] object JsonPathParser extends RegexParsers {
  import PathInstruction._

  // every path starts at the root: `$`
  def root: Parser[Char] = '$'

  def long: Parser[Long] = "\\d+".r ^? {
    case x => x.toLong
  }

  // parse `[*]` and `[123]` subscripts
  def subscript: Parser[List[PathInstruction]] =
    for {
      operand <- '[' ~> ('*' ^^^ Wildcard | long ^^ Index) <~ ']'
    } yield {
      Subscript :: operand :: Nil
    }

  // parse `.name` or `['name']` child expressions
  def named: Parser[List[PathInstruction]] =
    for {
      name <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']"
    } yield {
      Key :: Named(name) :: Nil
    }

  // child wildcards: `..`, `.*` or `['*']`
  def wildcard: Parser[List[PathInstruction]] =
    (".*" | "['*']") ^^^ List(Wildcard)

  def node: Parser[List[PathInstruction]] =
    wildcard |
      named |
      subscript

  val expression: Parser[List[PathInstruction]] = {
    phrase(root ~> rep(node) ^^ (x => x.flatten))
  }

  /** Parse `str`, returning None when it is not a valid JSON path. */
  def parse(str: String): Option[List[PathInstruction]] = {
    this.parseAll(expression, str) match {
      case Success(result, _) =>
        Some(result)

      case _ =>
        None
    }
  }

  /**
   * True when the path uses any wildcard form, which the GPU version of
   * GetJsonObject cannot execute and therefore must fall back to the CPU.
   */
  def containsUnsupportedPath(instructions: List[PathInstruction]): Boolean = {
    // Gpu GetJsonObject is not supported if JSON path contains wildcard [*]
    // see https://github.com/NVIDIA/spark-rapids/issues/10216
    instructions.exists {
      case Wildcard => true
      case Named(name) if name == "*" => true
      case _ => false
    }
  }

  /** Render instructions back into canonical string form, e.g. `$['a'][0][*]`. */
  def normalize(instructions: List[PathInstruction]): String = {
    // PathInstruction is sealed and every case is handled below, so no
    // catch-all throwing case is needed — the match is exhaustive.
    "$" + instructions.map {
      case Subscript | Key => ""
      case Wildcard => "[*]"
      case Index(index) => s"[$index]"
      case Named(name) => s"['$name']"
    }.mkString
  }
}

/**
 * Replacement-rule metadata for GetJsonObject. Tags the expression as
 * CPU-only when the (literal) JSON path uses wildcard forms the GPU
 * implementation cannot execute.
 */
class GpuGetJsonObjectMeta(
    expr: GetJsonObject,
    conf: RapidsConf,
    parent: Option[RapidsMeta[_, _, _]],
    rule: DataFromReplacementRule
  ) extends BinaryExprMeta[GetJsonObject](expr, conf, parent, rule) {

  override def tagExprForGpu(): Unit = {
    val lit = GpuOverrides.extractLit(expr.right)
    // foreach (not map): this is purely a side-effecting inspection of the
    // literal path; the previous code used map and discarded the result.
    lit.foreach { l =>
      // The path parameter is restricted to string literals by the overrides
      // rule, so the cast to UTF8String is safe here.
      val instructions = JsonPathParser.parse(l.value.asInstanceOf[UTF8String].toString)
      if (instructions.exists(JsonPathParser.containsUnsupportedPath)) {
        willNotWorkOnGpu("get_json_object on GPU does not support wildcard [*] in path")
      }
    }
  }

  override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
    GpuGetJsonObject(lhs, rhs)
}

case class GpuGetJsonObject(json: Expression, path: Expression)
extends GpuBinaryExpressionArgsAnyScalar
Expand All @@ -32,9 +136,30 @@ case class GpuGetJsonObject(json: Expression, path: Expression)
override def nullable: Boolean = true
override def prettyName: String = "get_json_object"

override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = {
lhs.getBase().getJSONObject(rhs.getBase,
GetJsonObjectOptions.builder().allowSingleQuotes(true).build());
private var cachedNormalizedPath: Option[Option[String]] = None

def normalizeJsonPath(path: GpuScalar): Option[String] = {
if (path.isValid) {
val pathStr = path.getValue.toString()
JsonPathParser.parse(pathStr).map(JsonPathParser.normalize)
} else {
None
}
}

  /**
   * Evaluate get_json_object over a column of JSON strings with a scalar path.
   *
   * The normalized path is computed once and cached in `cachedNormalizedPath`:
   * the overrides rule restricts the path argument to a string literal, so it
   * is identical for every batch and re-parsing it each call would be wasted
   * work. The cache stores Some(None) for a path that failed to parse, so the
   * failure is also computed only once.
   * NOTE(review): the cache is a plain `var` with no synchronization — this
   * assumes doColumnar is never invoked concurrently on one expression
   * instance; confirm against the executor threading model.
   */
  override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = {
    cachedNormalizedPath.getOrElse {
      // First call: parse + normalize and remember the outcome.
      val normalizedPath: Option[String] = normalizeJsonPath(rhs)
      cachedNormalizedPath = Some(normalizedPath)
      normalizedPath
    } match {
      case Some(normalizedStr) =>
        // Pass the normalized path to cudf so GPU path semantics match Spark's.
        withResource(Scalar.fromString(normalizedStr)) { scalar =>
          lhs.getBase().getJSONObject(scalar,
            GetJsonObjectOptions.builder().allowSingleQuotes(true).build())
        }
      // Null or unparseable path: every row's result is null.
      case None => GpuColumnVector.columnVectorFromNull(lhs.getRowCount.toInt, StringType)
    }
  }

override def doColumnar(numRows: Int, lhs: GpuScalar, rhs: GpuScalar): ColumnVector = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3646,10 +3646,7 @@ object GpuOverrides extends Logging {
ExprChecks.projectOnly(
TypeSig.STRING, TypeSig.STRING, Seq(ParamCheck("json", TypeSig.STRING, TypeSig.STRING),
ParamCheck("path", TypeSig.lit(TypeEnum.STRING), TypeSig.STRING))),
(a, conf, p, r) => new BinaryExprMeta[GetJsonObject](a, conf, p, r) {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuGetJsonObject(lhs, rhs)
}
(a, conf, p, r) => new GpuGetJsonObjectMeta(a, conf, p, r)
).disabledByDefault("escape sequences are not processed correctly, the input is not " +
"validated, and the output is not normalized the same as Spark"),
expr[JsonToStructs](
Expand Down
Loading