Add json reader support #4485

Merged
merged 12 commits into from Jan 20, 2022
3 changes: 3 additions & 0 deletions docs/configs.md
@@ -81,6 +81,8 @@ Name | Description | Default Value
<a name="sql.fast.sample"></a>spark.rapids.sql.fast.sample|Option to turn on fast sample. If enable it is inconsistent with CPU sample because of GPU sample algorithm is inconsistent with CPU.|false
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
<a name="sql.format.json.enabled"></a>spark.rapids.sql.format.json.enabled|When set to false disables all json input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.json.read.enabled"></a>spark.rapids.sql.format.json.read.enabled|When set to false disables json input acceleration|true
<a name="sql.format.orc.enabled"></a>spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true
<a name="sql.format.orc.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.orc.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type|2147483647
<a name="sql.format.orc.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.orc.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small orc files in parallel. This can not be changed at runtime after the executor has started. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type.|20
@@ -382,6 +384,7 @@ Name | Description | Default Value | Notes
Name | Description | Default Value | Notes
-----|-------------|---------------|------------------
<a name="sql.input.CSVScan"></a>spark.rapids.sql.input.CSVScan|CSV parsing|true|None|
<a name="sql.input.JsonScan"></a>spark.rapids.sql.input.JsonScan|Json parsing|true|None|
<a name="sql.input.OrcScan"></a>spark.rapids.sql.input.OrcScan|ORC parsing|true|None|
<a name="sql.input.ParquetScan"></a>spark.rapids.sql.input.ParquetScan|Parquet parsing|true|None|

43 changes: 43 additions & 0 deletions docs/supported_ops.md
@@ -17543,6 +17543,49 @@ dates or timestamps, or for a lack of type coercion support.
<td> </td>
</tr>
<tr>
<th rowSpan="2">JSON</th>
<th>Read</th>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td>S</td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<th>Write</th>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<th rowSpan="2">ORC</th>
<th>Read</th>
<td>S</td>
35 changes: 35 additions & 0 deletions integration_tests/src/main/python/get_json_test.py
@@ -0,0 +1,35 @@
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from pyspark.sql.types import *

def mk_json_str_gen(pattern):
    return StringGen(pattern).with_special_case('').with_special_pattern('.{0,10}')

@pytest.mark.parametrize('json_str_pattern', [r'\{"store": \{"fruit": \[\{"weight":\d,"type":"[a-z]{1,9}"\}\], ' \
r'"bicycle":\{"price":\d\d\.\d\d,"color":"[a-z]{0,4}"\}\},' \
r'"email":"[a-z]{1,5}\@[a-z]{3,10}\.com","owner":"[a-z]{3,8}"\}',
r'\{"a": "[a-z]{1,3}"\}'], ids=idfn)
def test_get_json_object(json_str_pattern):
    gen = mk_json_str_gen(json_str_pattern)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, gen, length=10).selectExpr(
            'get_json_object(a,"$.a")',
            'get_json_object(a, "$.owner")',
            'get_json_object(a, "$.store.fruit[0]")'),
        conf={'spark.sql.parser.escapedStringLiterals': 'true'})
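
For context, the three `get_json_object` calls above are plain Spark SQL JSON-path lookups; a standalone sketch with a made-up input row (column name `a` matches the test, the document contents are illustrative):

```python
# Illustrative only: shows what the JSON-path expressions from the test extract.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
doc = '{"a": "xyz", "owner": "amy", "store": {"fruit": [{"weight": 8, "type": "apple"}]}}'
df = spark.createDataFrame([(doc,)], ['a'])
df.selectExpr(
    'get_json_object(a, "$.a")',              # -> "xyz"
    'get_json_object(a, "$.owner")',          # -> "amy"
    'get_json_object(a, "$.store.fruit[0]")'  # -> the first fruit element as a JSON string
).show(truncate=False)
```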
40 changes: 25 additions & 15 deletions integration_tests/src/main/python/json_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,20 +16,30 @@

from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from pyspark.sql.types import *
from src.main.python.marks import approximate_float

def mk_json_str_gen(pattern):
    return StringGen(pattern).with_special_case('').with_special_pattern('.{0,10}')
from src.main.python.spark_session import with_cpu_session

@pytest.mark.parametrize('json_str_pattern', [r'\{"store": \{"fruit": \[\{"weight":\d,"type":"[a-z]{1,9}"\}\], ' \
r'"bicycle":\{"price":\d\d\.\d\d,"color":"[a-z]{0,4}"\}\},' \
r'"email":"[a-z]{1,5}\@[a-z]{3,10}\.com","owner":"[a-z]{3,8}"\}',
r'\{"a": "[a-z]{1,3}"\}'], ids=idfn)
def test_get_json_object(json_str_pattern):
    gen = mk_json_str_gen(json_str_pattern)
json_supported_gens = [
    byte_gen, short_gen, int_gen, long_gen, boolean_gen,
    # FloatGen(no_nans=True), # Test will fail
    DoubleGen(no_nans=True)
]

_enable_all_types_conf = {
    'spark.rapids.sql.format.json.enabled': 'true',
    'spark.rapids.sql.format.json.read.enabled': 'true'}

@approximate_float
@pytest.mark.parametrize('data_gen', json_supported_gens, ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ["", "json"])
def test_round_trip(spark_tmp_path, data_gen, v1_enabled_list):
    gen = StructGen([('a', data_gen)], nullable=False)
    data_path = spark_tmp_path + '/JSON_DATA'
    schema = gen.data_type
    updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
    with_cpu_session(
        lambda spark : gen_df(spark, gen).write.json(data_path))
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, gen, length=10).selectExpr(
            'get_json_object(a,"$.a")',
            'get_json_object(a, "$.owner")',
            'get_json_object(a, "$.store.fruit[0]")'),
        conf={'spark.sql.parser.escapedStringLiterals': 'true'})
        lambda spark : spark.read.schema(schema).json(data_path),
        conf=updated_conf)
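
Stripped of the test harness, the round trip that `test_round_trip` checks looks roughly like this (paths, schema, and data are illustrative; the CPU writes the file, and the read is what the plugin may accelerate):

```python
# Rough standalone version of the round trip: write JSON from the CPU, then read it
# back with an explicit schema so the GPU and CPU readers can be compared.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField('a', LongType())])
data_path = '/tmp/JSON_DATA'  # illustrative location

spark.range(100).selectExpr('id AS a').write.mode('overwrite').json(data_path)
df = spark.read.schema(schema).json(data_path)  # explicit schema mirrors the test; inference would run on the CPU
df.show()
```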
@@ -31,6 +31,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.rapids.TimeStamp
import org.apache.spark.sql.catalyst.json.rapids.GpuJsonScan
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
@@ -48,6 +49,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python._
@@ -424,6 +426,9 @@ object ParquetFormatType extends FileFormatType {
object OrcFormatType extends FileFormatType {
  override def toString = "ORC"
}
object JsonFormatType extends FileFormatType {
  override def toString = "JSON"
}

sealed trait FileFormatOp
object ReadFileOp extends FileFormatOp {
@@ -836,7 +841,12 @@ object GpuOverrides extends Logging {
        // Note Map is not put into nested, now CUDF only support single level map
        TypeSig.STRUCT + TypeSig.DECIMAL_128).nested() + TypeSig.MAP,
      sparkSig = (TypeSig.cpuAtomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
        TypeSig.UDT).nested())))
        TypeSig.UDT).nested())),
    (JsonFormatType, FileFormatChecks(
      cudfRead = TypeSig.integral + TypeSig.fp + TypeSig.BOOLEAN + TypeSig.STRING,
      cudfWrite = TypeSig.none,
      sparkSig = (TypeSig.cpuAtomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
        TypeSig.UDT).nested())))
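
The `cudfRead` signature above is the practical limit of this change: integral, floating-point, boolean, and string columns can be read on the GPU, and a schema containing anything else is expected to keep the scan on the CPU. A hedged PySpark sketch of that split (hypothetical column names and path; exact fallback reporting depends on the plugin version):

```python
# Hypothetical schemas: 'i' and 's' fall inside the cudfRead signature above, while
# 't' (timestamp) does not, so that scan is expected to fall back to the CPU.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

gpu_ok = StructType([StructField('i', IntegerType()), StructField('s', StringType())])
cpu_only = StructType([StructField('i', IntegerType()), StructField('t', TimestampType())])

# spark.read.schema(gpu_ok).json('/tmp/json_data').explain()    # expect a GPU scan
# spark.read.schema(cpu_only).json('/tmp/json_data').explain()  # expect the CPU JSON scan
```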

val commonExpressions: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq(
expr[Literal](
@@ -3355,6 +3365,23 @@ object GpuOverrides extends Logging {
          a.dataFilters,
          conf.maxReadBatchSizeRows,
          conf.maxReadBatchSizeBytes)
      }),
    GpuOverrides.scan[JsonScan](
      "Json parsing",
      (a, conf, p, r) => new ScanMeta[JsonScan](a, conf, p, r) {
        override def tagSelfForGpu(): Unit = GpuJsonScan.tagSupport(this)

        override def convertToGpu(): Scan =
          GpuJsonScan(a.sparkSession,
            a.fileIndex,
            a.dataSchema,
            a.readDataSchema,
            a.readPartitionSchema,
            a.options,
            a.partitionFilters,
            a.dataFilters,
            conf.maxReadBatchSizeRows,
            conf.maxReadBatchSizeBytes)
      })).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

val scans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] =
15 changes: 15 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -901,6 +901,17 @@
  .booleanConf
  .createWithDefault(false)

val ENABLE_JSON = conf("spark.rapids.sql.format.json.enabled")
  .doc("When set to false disables all json input and output acceleration. " +
    "(only input is currently supported anyways)")
  .booleanConf
  .createWithDefault(true)

val ENABLE_JSON_READ = conf("spark.rapids.sql.format.json.read.enabled")
  .doc("When set to false disables json input acceleration")
  .booleanConf
  .createWithDefault(true)

val ENABLE_RANGE_WINDOW_BYTES = conf("spark.rapids.sql.window.range.byte.enabled")
  .doc("When the order-by column of a range based window is byte type and " +
    "the range boundary calculated for a value has overflow, CPU and GPU will get " +
@@ -1625,6 +1636,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isCsvReadEnabled: Boolean = get(ENABLE_CSV_READ)

lazy val isJsonEnabled: Boolean = get(ENABLE_JSON)

lazy val isJsonReadEnabled: Boolean = get(ENABLE_JSON_READ)
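
Because these surface as ordinary SQL configs, they can also be toggled per session, for example to force the CPU JSON reader while comparing results (a sketch; standard Spark conf semantics govern when the change takes effect):

```python
# Disable JSON read acceleration for the current session while leaving the
# format enabled overall; the keys are the ones defined above.
spark.conf.set('spark.rapids.sql.format.json.read.enabled', 'false')
spark.conf.set('spark.rapids.sql.format.json.enabled', 'true')
```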

lazy val shuffleManagerEnabled: Boolean = get(SHUFFLE_MANAGER_ENABLED)

lazy val shuffleTransportEnabled: Boolean = get(SHUFFLE_TRANSPORT_ENABLE)