
Commit

Support saveAsTable for writing orc and parquet (#1134)
* start saveAsTable

* Add GpuDataSource

* columnar file format

* Update to GpuFileFormat

* fix typo

* logging

* more logging

* change format to parquet

* fix classOf

* fix run to runColumnar

* use the original provider instance

* remove unneeded code and pass in providers so they aren't calculated twice

* create shim for SchemaUtils checkSchemaColumnNameDuplication

Signed-off-by: Thomas Graves <tgraves@apache.org>

* fix typo with checkSchemaColumnNameDuplication

* fix name

* fix calling

* fix another name

* fix none

* Fix provider vs FileFormat

* split read/write tests

* Write a bunch more tests for orc and parquet writing

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* cleanup and csv test

* Add more tests

* Add bucket write test

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* remove debug logs

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Update for spark 3.1.0
tgravescs authored Nov 17, 2020
1 parent 2ac2a62 commit 2112f7c
Showing 14 changed files with 1,333 additions and 130 deletions.
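For context: this change lets the plugin keep DataFrame writes issued through saveAsTable on the GPU for the ORC and Parquet formats. A minimal sketch of the call pattern being added, with an illustrative DataFrame, output path, and table name:

df.write.format("parquet") \
    .mode("overwrite") \
    .option("path", "/tmp/parquet_out") \
    .saveAsTable("example_table")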
36 changes: 36 additions & 0 deletions integration_tests/src/main/python/asserts.py
@@ -238,6 +238,42 @@ def assert_gpu_and_cpu_writes_are_equal_iterator(write_func, read_func, base_pat
"""
_assert_gpu_and_cpu_writes_are_equal(write_func, read_func, base_path, False, conf=conf)

def assert_gpu_fallback_write(write_func,
read_func,
base_path,
cpu_fallback_class_name,
conf={}):
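    """
    Assert that a write falls back to the CPU: run write_func on both CPU and
    GPU, check that the GPU run fell back to cpu_fallback_class_name for the
    write, then read both outputs back and compare the results.
    """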
conf = _prep_incompat_conf(conf)

print('### CPU RUN ###')
cpu_start = time.time()
cpu_path = base_path + '/CPU'
with_cpu_session(lambda spark : write_func(spark, cpu_path), conf=conf)
cpu_end = time.time()
print('### GPU RUN ###')
jvm = spark_jvm()
jvm.com.nvidia.spark.rapids.ExecutionPlanCaptureCallback.startCapture()
gpu_start = time.time()
gpu_path = base_path + '/GPU'
with_gpu_session(lambda spark : write_func(spark, gpu_path), conf=conf)
gpu_end = time.time()
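    # Check that the captured GPU plan fell back to the expected CPU write
    # class; the second argument is presumably a capture timeout in milliseconds.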
jvm.com.nvidia.spark.rapids.ExecutionPlanCaptureCallback.assertCapturedAndGpuFellBack(cpu_fallback_class_name, 2000)
print('### WRITE: GPU TOOK {} CPU TOOK {} ###'.format(
gpu_end - gpu_start, cpu_end - cpu_start))

(cpu_bring_back, cpu_collect_type) = _prep_func_for_compare(
lambda spark: read_func(spark, cpu_path), True)
(gpu_bring_back, gpu_collect_type) = _prep_func_for_compare(
lambda spark: read_func(spark, gpu_path), True)

from_cpu = with_cpu_session(cpu_bring_back, conf=conf)
from_gpu = with_cpu_session(gpu_bring_back, conf=conf)
if should_sort_locally():
from_cpu.sort(key=_RowCmp)
from_gpu.sort(key=_RowCmp)

assert_equal(from_cpu, from_gpu)

def assert_gpu_fallback_collect(func,
cpu_fallback_class_name,
conf={}):
12 changes: 11 additions & 1 deletion integration_tests/src/main/python/csv_test.py
@@ -14,7 +14,7 @@

import pytest

-from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_gpu_fallback_write
from datetime import datetime, timezone
from data_gen import *
from marks import *
@@ -245,3 +245,13 @@ def test_input_meta(spark_tmp_path, v1_enabled_list):
'input_file_block_start()',
'input_file_block_length()'),
conf={'spark.sql.sources.useV1SourceList': v1_enabled_list})

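# The GPU does not write CSV, so a CSV saveAsTable should fall back to the
# CPU's DataWritingCommandExec.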
@allow_non_gpu('DataWritingCommandExec')
def test_csv_save_as_table_fallback(spark_tmp_path, spark_tmp_table_factory):
gen = TimestampGen()
data_path = spark_tmp_path + '/CSV_DATA'
assert_gpu_fallback_write(
lambda spark, path: unary_op_df(spark, gen).coalesce(1).write.format("csv").mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()),
lambda spark, path: spark.read.csv(path),
data_path,
'DataWritingCommandExec')
54 changes: 3 additions & 51 deletions integration_tests/src/main/python/orc_test.py
@@ -14,7 +14,7 @@

import pytest

-from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_writes_are_equal_collect, assert_gpu_fallback_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
from datetime import date, datetime, timezone
from data_gen import *
from marks import *
@@ -50,7 +50,7 @@ def test_orc_fallback(spark_tmp_path, read_func, disable_conf):

gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
gen = StructGen(gen_list, nullable=False)
-    data_path = spark_tmp_path + '/PARQUET_DATA'
+    data_path = spark_tmp_path + '/ORC_DATA'
reader = read_func(data_path)
with_cpu_session(
lambda spark : gen_df(spark, gen).write.orc(data_path))
@@ -151,55 +151,6 @@ def test_merge_schema_read(spark_tmp_path, v1_enabled_list):
lambda spark : spark.read.option('mergeSchema', 'true').orc(data_path),
conf={'spark.sql.sources.useV1SourceList': v1_enabled_list})

orc_write_gens_list = [
[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))],
pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/139')),
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/140'))]

@pytest.mark.parametrize('orc_gens', orc_write_gens_list, ids=idfn)
def test_write_round_trip(spark_tmp_path, orc_gens):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gens)]
data_path = spark_tmp_path + '/ORC_DATA'
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.orc(path),
lambda spark, path: spark.read.orc(path),
data_path)

orc_part_write_gens = [
byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, boolean_gen,
    # Some file systems have issues with UTF8 strings, so use a restricted
    # generator to help the test pass even there
StringGen('(\\w| ){0,50}'),
# Once https://github.com/NVIDIA/spark-rapids/issues/139 is fixed replace this with
# date_gen
DateGen(start=date(1590, 1, 1)),
# Once https://github.com/NVIDIA/spark-rapids/issues/140 is fixed replace this with
# timestamp_gen
TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))]

# There are race conditions around when individual files are read in for partitioned data
@ignore_order
@pytest.mark.parametrize('orc_gen', orc_part_write_gens, ids=idfn)
def test_part_write_round_trip(spark_tmp_path, orc_gen):
gen_list = [('a', RepeatSeqGen(orc_gen, 10)),
('b', orc_gen)]
data_path = spark_tmp_path + '/ORC_DATA'
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.partitionBy('a').orc(path),
lambda spark, path: spark.read.orc(path),
data_path)

orc_write_compress_options = ['none', 'uncompressed', 'snappy']
@pytest.mark.parametrize('compress', orc_write_compress_options)
def test_compress_write_round_trip(spark_tmp_path, compress):
data_path = spark_tmp_path + '/ORC_DATA'
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path : binary_op_df(spark, long_gen).coalesce(1).write.orc(path),
lambda spark, path : spark.read.orc(path),
data_path,
conf={'spark.sql.orc.compression.codec': compress})

@pytest.mark.xfail(
condition=not(is_before_spark_310()),
reason='https://github.com/NVIDIA/spark-rapids/issues/576')
@@ -218,3 +169,4 @@ def test_input_meta(spark_tmp_path):
'input_file_name()',
'input_file_block_start()',
'input_file_block_length()'))

119 changes: 119 additions & 0 deletions integration_tests/src/main/python/orc_write_test.py
@@ -0,0 +1,119 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_writes_are_equal_collect, assert_gpu_fallback_write
from datetime import date, datetime, timezone
from data_gen import *
from marks import *
from pyspark.sql.types import *

orc_write_gens_list = [
[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))],
pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/139')),
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/140'))]

@pytest.mark.parametrize('orc_gens', orc_write_gens_list, ids=idfn)
def test_write_round_trip(spark_tmp_path, orc_gens):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gens)]
data_path = spark_tmp_path + '/ORC_DATA'
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.orc(path),
lambda spark, path: spark.read.orc(path),
data_path)

orc_part_write_gens = [
byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, boolean_gen,
    # Some file systems have issues with UTF8 strings, so use a restricted
    # generator to help the test pass even there
StringGen('(\\w| ){0,50}'),
# Once https://github.com/NVIDIA/spark-rapids/issues/139 is fixed replace this with
# date_gen
DateGen(start=date(1590, 1, 1)),
# Once https://github.com/NVIDIA/spark-rapids/issues/140 is fixed replace this with
# timestamp_gen
TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))]

# There are race conditions around when individual files are read in for partitioned data
@ignore_order
@pytest.mark.parametrize('orc_gen', orc_part_write_gens, ids=idfn)
def test_part_write_round_trip(spark_tmp_path, orc_gen):
gen_list = [('a', RepeatSeqGen(orc_gen, 10)),
('b', orc_gen)]
data_path = spark_tmp_path + '/ORC_DATA'
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.partitionBy('a').orc(path),
lambda spark, path: spark.read.orc(path),
data_path)

orc_write_compress_options = ['none', 'uncompressed', 'snappy']
@pytest.mark.parametrize('compress', orc_write_compress_options)
def test_compress_write_round_trip(spark_tmp_path, compress):
data_path = spark_tmp_path + '/ORC_DATA'
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path : binary_op_df(spark, long_gen).coalesce(1).write.orc(path),
lambda spark, path : spark.read.orc(path),
data_path,
conf={'spark.sql.orc.compression.codec': compress})

@pytest.mark.parametrize('orc_gens', orc_write_gens_list, ids=idfn)
def test_write_save_table(spark_tmp_path, orc_gens, spark_tmp_table_factory):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gens)]
data_path = spark_tmp_path + '/ORC_DATA'
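    # Pin ORC to the V1 datasource path for this test.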
    all_confs = {'spark.sql.sources.useV1SourceList': "orc"}
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.format("orc").mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()),
lambda spark, path: spark.read.orc(path),
data_path,
conf=all_confs)

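# Helper that writes through a SQL CREATE TABLE ... AS SELECT so the table
# write path is also exercised from spark.sql, not just the DataFrameWriter API.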
def write_orc_sql_from(spark, df, data_path, write_to_table):
tmp_view_name = 'tmp_view_{}'.format(random.randint(0, 1000000))
df.createOrReplaceTempView(tmp_view_name)
write_cmd = 'CREATE TABLE `{}` USING ORC location \'{}\' AS SELECT * from `{}`'.format(write_to_table, data_path, tmp_view_name)
spark.sql(write_cmd)

@pytest.mark.parametrize('orc_gens', orc_write_gens_list, ids=idfn)
def test_write_sql_save_table(spark_tmp_path, orc_gens, spark_tmp_table_factory):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gens)]
data_path = spark_tmp_path + '/ORC_DATA'
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: write_orc_sql_from(spark, gen_df(spark, gen_list).coalesce(1), path, spark_tmp_table_factory.get()),
lambda spark, path: spark.read.orc(path),
data_path)

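# Codecs the GPU ORC writer does not handle should push the whole write back
# onto the CPU.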
@allow_non_gpu('DataWritingCommandExec')
@pytest.mark.parametrize('codec', ['zlib', 'lzo'])
def test_orc_write_compression_fallback(spark_tmp_path, codec, spark_tmp_table_factory):
gen = TimestampGen()
    data_path = spark_tmp_path + '/ORC_DATA'
    all_confs = {'spark.sql.orc.compression.codec': codec}
assert_gpu_fallback_write(
lambda spark, path: unary_op_df(spark, gen).coalesce(1).write.format("orc").mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()),
lambda spark, path: spark.read.orc(path),
data_path,
'DataWritingCommandExec',
conf=all_confs)

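# Bucketed writes are not supported on the GPU, so bucketBy should trigger a
# CPU fallback.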
@allow_non_gpu('DataWritingCommandExec')
def test_buckets_write_fallback(spark_tmp_path, spark_tmp_table_factory):
data_path = spark_tmp_path + '/ORC_DATA'
assert_gpu_fallback_write(
lambda spark, path: spark.range(10e4).write.bucketBy(4, "id").sortBy("id").format('orc').mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()),
lambda spark, path: spark.read.orc(path),
data_path,
'DataWritingCommandExec')
75 changes: 0 additions & 75 deletions integration_tests/src/main/python/parquet_test.py
@@ -332,81 +332,6 @@ def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_con
lambda spark : spark.read.parquet(data_path),
conf=all_confs)

parquet_write_gens_list = [
[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, date_gen, timestamp_gen]]

@pytest.mark.parametrize('parquet_gens', parquet_write_gens_list, ids=idfn)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@pytest.mark.parametrize('ts_type', ["TIMESTAMP_MICROS", "TIMESTAMP_MILLIS"])
def test_write_round_trip(spark_tmp_path, parquet_gens, v1_enabled_list, ts_type, reader_confs):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.outputTimestampType': ts_type})
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.parquet(path),
lambda spark, path: spark.read.parquet(path),
data_path,
conf=all_confs)

@pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS', 'TIMESTAMP_MICROS'])
@pytest.mark.parametrize('ts_rebase', ['CORRECTED'])
@ignore_order
def test_write_ts_millis(spark_tmp_path, ts_type, ts_rebase):
gen = TimestampGen()
data_path = spark_tmp_path + '/PARQUET_DATA'
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: unary_op_df(spark, gen).write.parquet(path),
lambda spark, path: spark.read.parquet(path),
data_path,
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase,
'spark.sql.parquet.outputTimestampType': ts_type})

parquet_part_write_gens = [
byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
    # Some file systems have issues with UTF8 strings, so use a restricted
    # generator to help the test pass even there
StringGen('(\\w| ){0,50}'),
boolean_gen, date_gen, timestamp_gen]

# There are race conditions around when individual files are read in for partitioned data
@ignore_order
@pytest.mark.parametrize('parquet_gen', parquet_part_write_gens, ids=idfn)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS', 'TIMESTAMP_MICROS'])
def test_part_write_round_trip(spark_tmp_path, parquet_gen, v1_enabled_list, ts_type, reader_confs):
gen_list = [('a', RepeatSeqGen(parquet_gen, 10)),
('b', parquet_gen)]
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.outputTimestampType': ts_type})
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.partitionBy('a').parquet(path),
lambda spark, path: spark.read.parquet(path),
data_path,
conf=all_confs)

parquet_write_compress_options = ['none', 'uncompressed', 'snappy']
@pytest.mark.parametrize('compress', parquet_write_compress_options)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_compress_write_round_trip(spark_tmp_path, compress, v1_enabled_list, reader_confs):
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.parquet.compression.codec': compress})
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path : binary_op_df(spark, long_gen).coalesce(1).write.parquet(path),
lambda spark, path : spark.read.parquet(path),
data_path,
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_input_meta(spark_tmp_path, v1_enabled_list, reader_confs):