Skip to content

Commit

Permalink
Add avro reader support [databricks] (#4956)
Browse files Browse the repository at this point in the history
Add avro reader support 

Co-authored-by: remzi <13716567376yh@gmail.com>
Co-authored-by: Bobby Wang <wbo498@gmail.com>
  • Loading branch information
3 people authored Mar 23, 2022
1 parent e0e8c17 commit 19124fe
Show file tree
Hide file tree
Showing 21 changed files with 1,177 additions and 8 deletions.
5 changes: 5 additions & 0 deletions dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
Expand Down
12 changes: 12 additions & 0 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,18 @@ parse some variants of `NaN` and `Infinity` even when this option is disabled
([SPARK-38060](https://issues.apache.org/jira/browse/SPARK-38060)). The RAPIDS Accelerator behavior is consistent with
Spark version 3.3.0 and later.

## Avro

The Avro format read is a very experimental feature which is expected to have some issues, so we disable
it by default. If you would like to test it, you need to enable `spark.rapids.sql.format.avro.enabled` and
`spark.rapids.sql.format.avro.read.enabled`.

Currently, the GPU accelerated Avro reader doesn't support reading the Avro version 1.2 files.

### Supported types

The boolean, byte, short, int, long, float, double, string are supported in current version.

## Regular Expressions

Regular expression evaluation on the GPU can potentially have high memory overhead and cause out-of-memory errors so
Expand Down
3 changes: 3 additions & 0 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ Name | Description | Default Value
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.fast.sample"></a>spark.rapids.sql.fast.sample|Option to turn on fast sample. If enable it is inconsistent with CPU sample because of GPU sample algorithm is inconsistent with CPU.|false
<a name="sql.format.avro.enabled"></a>spark.rapids.sql.format.avro.enabled|When set to true enables all avro input and output acceleration. (only input is currently supported anyways)|false
<a name="sql.format.avro.read.enabled"></a>spark.rapids.sql.format.avro.read.enabled|When set to true enables avro input acceleration|false
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
<a name="sql.format.json.enabled"></a>spark.rapids.sql.format.json.enabled|When set to true enables all json input and output acceleration. (only input is currently supported anyways)|false
Expand Down Expand Up @@ -390,6 +392,7 @@ Name | Description | Default Value | Notes
<a name="sql.input.JsonScan"></a>spark.rapids.sql.input.JsonScan|Json parsing|true|None|
<a name="sql.input.OrcScan"></a>spark.rapids.sql.input.OrcScan|ORC parsing|true|None|
<a name="sql.input.ParquetScan"></a>spark.rapids.sql.input.ParquetScan|Parquet parsing|true|None|
<a name="sql.input.AvroScan"></a>spark.rapids.sql.input.AvroScan|Avro parsing|true|None|

### Partitioning

Expand Down
43 changes: 43 additions & 0 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -17914,6 +17914,49 @@ dates or timestamps, or for a lack of type coercion support.
<th>UDT</th>
</tr>
<tr>
<th rowSpan="2">Avro</th>
<th>Read</th>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td>S</td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<th>Write</th>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<th rowSpan="2">CSV</th>
<th>Read</th>
<td>S</td>
Expand Down
9 changes: 7 additions & 2 deletions integration_tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,19 @@
<goals>
<goal>copy</goal>
</goals>
<configuration>
<useBaseVersion>true</useBaseVersion>
<configuration>
<useBaseVersion>true</useBaseVersion>
<artifactItems>
<artifactItem>
<groupId>ai.rapids</groupId>
<artifactId>cudf</artifactId>
<classifier>${cuda.version}</classifier>
</artifactItem>
<artifactItem>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
Expand Down
19 changes: 18 additions & 1 deletion integration_tests/run_pyspark_from_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,35 @@ else
# support alternate local jars NOT building from the source code
if [ -d "$LOCAL_JAR_PATH" ]; then
CUDF_JARS=$(echo "$LOCAL_JAR_PATH"/cudf-*.jar)
AVRO_JARS=$(echo "$LOCAL_JAR_PATH"/spark-avro*.jar)
PLUGIN_JARS=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark_*.jar)
# the integration-test-spark3xx.jar, should not include the integration-test-spark3xxtest.jar
TEST_JARS=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark-integration-tests*-$SPARK_SHIM_VER.jar)
else
CUDF_JARS=$(echo "$SCRIPTPATH"/target/dependency/cudf-*.jar)
AVRO_JARS=$(echo "$SCRIPTPATH"/target/dependency/spark-avro*.jar)
PLUGIN_JARS=$(echo "$SCRIPTPATH"/../dist/target/rapids-4-spark_*.jar)
# the integration-test-spark3xx.jar, should not include the integration-test-spark3xxtest.jar
TEST_JARS=$(echo "$SCRIPTPATH"/target/rapids-4-spark-integration-tests*-$SPARK_SHIM_VER.jar)
fi

# `./run_pyspark_from_build.sh` runs all tests including avro_test.py with spark-avro.jar
# in the classpath.
#
# `./run_pyspark_from_build.sh -k xxx ` runs all xxx tests with spark-avro.jar in the classpath
#
# `INCLUDE_SPARK_AVRO_JAR=true ./run_pyspark_from_build.sh` run all tests (except the marker skipif())
# without spark-avro.jar
if [[ $( echo ${INCLUDE_SPARK_AVRO_JAR} | tr [:upper:] [:lower:] ) == "true" ]];
then
export INCLUDE_SPARK_AVRO_JAR=true
else
export INCLUDE_SPARK_AVRO_JAR=false
AVRO_JARS=""
fi

# Only 3 jars: cudf.jar dist.jar integration-test.jar
ALL_JARS="$CUDF_JARS $PLUGIN_JARS $TEST_JARS"
ALL_JARS="$CUDF_JARS $PLUGIN_JARS $TEST_JARS $AVRO_JARS"
echo "AND PLUGIN JARS: $ALL_JARS"
if [[ "${TEST}" != "" ]];
then
Expand Down
90 changes: 90 additions & 0 deletions integration_tests/src/main/python/avro_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

from spark_session import with_cpu_session
import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from marks import *
from pyspark.sql.types import *

if os.environ.get('INCLUDE_SPARK_AVRO_JAR', 'false') == 'false':
pytestmark = pytest.mark.skip(reason=str("INCLUDE_SPARK_AVRO_JAR is disabled"))

support_gens = numeric_gens + [string_gen, boolean_gen]

_enable_all_types_conf = {
'spark.rapids.sql.format.avro.enabled': 'true',
'spark.rapids.sql.format.avro.read.enabled': 'true'}


@pytest.mark.parametrize('gen', support_gens)
@pytest.mark.parametrize('v1_enabled_list', ["avro", ""])
def test_basic_read(spark_tmp_path, gen, v1_enabled_list):
data_path = spark_tmp_path + '/AVRO_DATA'
with_cpu_session(
lambda spark: unary_op_df(spark, gen).write.format("avro").save(data_path)
)

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.format("avro").load(data_path),
conf=all_confs)


@pytest.mark.parametrize('v1_enabled_list', ["", "avro"])
def test_avro_simple_partitioned_read(spark_tmp_path, v1_enabled_list):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)]
first_data_path = spark_tmp_path + '/AVRO_DATA/key=0/key2=20'
with_cpu_session(
lambda spark: gen_df(spark, gen_list).write.format("avro").save(first_data_path))
second_data_path = spark_tmp_path + '/AVRO_DATA/key=1/key2=21'
with_cpu_session(
lambda spark: gen_df(spark, gen_list).write.format("avro").save(second_data_path))
third_data_path = spark_tmp_path + '/AVRO_DATA/key=2/key2=22'
with_cpu_session(
lambda spark: gen_df(spark, gen_list).write.format("avro").save(third_data_path))

data_path = spark_tmp_path + '/AVRO_DATA'

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.format("avro").load(data_path),
conf=all_confs)


@pytest.mark.parametrize('v1_enabled_list', ["", "avro"])
def test_avro_input_meta(spark_tmp_path, v1_enabled_list):
first_data_path = spark_tmp_path + '/AVRO_DATA/key=0'
with_cpu_session(
lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(first_data_path))
second_data_path = spark_tmp_path + '/AVRO_DATA/key=1'
with_cpu_session(
lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(second_data_path))
data_path = spark_tmp_path + '/AVRO_DATA'

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.format("avro").load(data_path)
.filter(f.col('a') > 0)
.selectExpr('a',
'input_file_name()',
'input_file_block_start()',
'input_file_block_length()'),
conf=all_confs)
35 changes: 35 additions & 0 deletions jenkins/databricks/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ JACKSONANNOTATION=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive
HADOOPCOMMON=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.hadoop--hadoop-common--org.apache.hadoop__hadoop-common__2.7.4.jar
HADOOPMAPRED=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.hadoop--hadoop-mapreduce-client-core--org.apache.hadoop__hadoop-mapreduce-client-core__2.7.4.jar

if [[ $BASE_SPARK_VERSION == "3.2.1" ]]
then
AVROSPARKJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--vendor--avro--avro-hive-2.3__hadoop-3.2_2.12_deploy_shaded.jar
AVROMAPRED=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-3.2--org.apache.avro--avro-mapred--org.apache.avro__avro-mapred__1.10.2.jar
AVROJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-3.2--org.apache.avro--avro--org.apache.avro__avro__1.10.2.jar
else
AVROSPARKJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--vendor--avro--avro_2.12_deploy_shaded.jar
AVROMAPRED=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.avro--avro-mapred-hadoop2--org.apache.avro__avro-mapred-hadoop2__1.8.2.jar
AVROJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.avro--avro--org.apache.avro__avro__1.8.2.jar
fi

# Please note we are installing all of these dependencies using the Spark version (SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS) to make it easier
# to specify the dependencies in the pom files

Expand All @@ -177,6 +188,30 @@ mvn -B install:install-file \
-Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \
-Dpackaging=jar

mvn -B install:install-file \
-Dmaven.repo.local=$M2DIR \
-Dfile=$JARDIR/$AVROSPARKJAR\
-DgroupId=org.apache.spark \
-DartifactId=spark-avro_$SCALA_VERSION \
-Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \
-Dpackaging=jar

mvn -B install:install-file \
-Dmaven.repo.local=$M2DIR \
-Dfile=$JARDIR/$AVROMAPRED\
-DgroupId=org.apache.avro\
-DartifactId=avro-mapred \
-Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \
-Dpackaging=jar

mvn -B install:install-file \
-Dmaven.repo.local=$M2DIR \
-Dfile=$JARDIR/$AVROJAR \
-DgroupId=org.apache.avro\
-DartifactId=avro \
-Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \
-Dpackaging=jar

mvn -B install:install-file \
-Dmaven.repo.local=$M2DIR \
-Dfile=$JARDIR/$ANNOTJAR \
Expand Down
1 change: 1 addition & 0 deletions jenkins/spark-premerge-build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ ci_2() {
TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh
TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \
./integration_tests/run_pyspark_from_build.sh
INCLUDE_SPARK_AVRO_JAR=true TEST='avro_test.py' ./integration_tests/run_pyspark_from_build.sh
}


Expand Down
24 changes: 24 additions & 0 deletions sql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
<groupId>com.google.flatbuffers</groupId>
<artifactId>flatbuffers-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<profiles>
Expand Down Expand Up @@ -119,6 +125,24 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.shims

import com.nvidia.spark.rapids.RapidsMeta

import org.apache.spark.sql.avro.AvroOptions

object AvroUtils {

def tagSupport(
parsedOptions: AvroOptions,
meta: RapidsMeta[_, _, _]): Unit = {

}

}
Loading

0 comments on commit 19124fe

Please sign in to comment.