Support faster copy for a custom DataSource V2 which supplies Arrow data (NVIDIA#1622)

* Add in data source v2, csv file and test for arrow copy
* remove commented out line
Showing 12 changed files with 1,109 additions and 39 deletions.
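The optimization this commit tests: when a DataSource V2 scan returns ColumnarBatches whose columns are ArrowColumnVectors, the plugin can copy the underlying Arrow buffers to the GPU in bulk rather than copying values one element at a time. The snippet below is an illustrative sketch only, using pyarrow (which is not part of this commit), to show the contiguous-buffer layout that makes such a bulk copy possible.

import pyarrow as pa

# An Arrow array is just a few contiguous buffers (a validity bitmap
# plus data), which is what lets a reader transfer it with a handful
# of bulk copies instead of per-element copies.
ages = pa.array([23, 40, None, 33], type=pa.int32())
for buf in ages.buffers():
    if buf is not None:
        print("{} bytes at address {}".format(buf.size, buf.address))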
@@ -0,0 +1,104 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from marks import *
from pyspark.sql.types import *
from spark_session import with_cpu_session

# These tests require a DataSource V2 jar containing the class
# org.apache.spark.sql.connector.InMemoryTableCatalog, which returns
# ArrowColumnVectors, to be on the classpath. If that class is not
# present, the tests are skipped.
catalogName = "columnar"
columnarClass = 'org.apache.spark.sql.connector.InMemoryTableCatalog'

def createPeopleCSVDf(spark, peopleCSVLocation):
    return spark.read.format("csv")\
        .option("header", "false")\
        .option("inferSchema", "true")\
        .load(peopleCSVLocation)\
        .withColumnRenamed("_c0", "name")\
        .withColumnRenamed("_c1", "age")\
        .withColumnRenamed("_c2", "job")

def setupInMemoryTableWithPartitioning(spark, csv, tname, column_and_table):
    peopleCSVDf = createPeopleCSVDf(spark, csv)
    peopleCSVDf.createOrReplaceTempView(tname)
    spark.table(tname).write.partitionBy("job").saveAsTable(column_and_table)

def setupInMemoryTableNoPartitioning(spark, csv, tname, column_and_table):
    peopleCSVDf = createPeopleCSVDf(spark, csv)
    peopleCSVDf.createOrReplaceTempView(tname)
    spark.table(tname).write.saveAsTable(column_and_table)

def readTable(csvPath, tableToRead):
    return lambda spark: spark.table(tableToRead)\
        .orderBy("name", "age")

def createDatabase(spark):
    try:
        spark.sql("create database IF NOT EXISTS " + catalogName)
        spark.sql("use " + catalogName)
    except Exception:
        pytest.skip("Failed to load catalog for datasource v2 {}, jar is probably missing".format(columnarClass))

def cleanupDatabase(spark):
    spark.sql("drop database IF EXISTS " + catalogName)

@pytest.fixture(autouse=True)
def setupAndCleanUp():
    with_cpu_session(lambda spark: createDatabase(spark),
                     conf={'spark.sql.catalog.columnar': columnarClass})
    yield
    with_cpu_session(lambda spark: cleanupDatabase(spark),
                     conf={'spark.sql.catalog.columnar': columnarClass})

@allow_non_gpu('ShowTablesExec', 'DropTableExec')
@pytest.mark.parametrize('csv', ['people.csv'])
def test_read_round_trip_partitioned(std_input_path, csv, spark_tmp_table_factory):
    csvPath = std_input_path + "/" + csv
    tableName = spark_tmp_table_factory.get()
    columnarTableName = catalogName + "." + tableName
    with_cpu_session(lambda spark: setupInMemoryTableWithPartitioning(spark, csvPath, tableName, columnarTableName),
                     conf={'spark.sql.catalog.columnar': columnarClass})
    assert_gpu_and_cpu_are_equal_collect(readTable(csvPath, columnarTableName),
                                         conf={'spark.sql.catalog.columnar': columnarClass})

@allow_non_gpu('ShowTablesExec', 'DropTableExec')
@pytest.mark.parametrize('csv', ['people.csv'])
def test_read_round_trip_no_partitioned(std_input_path, csv, spark_tmp_table_factory):
    csvPath = std_input_path + "/" + csv
    tableNameNoPart = spark_tmp_table_factory.get()
    columnarTableNameNoPart = catalogName + "." + tableNameNoPart
    with_cpu_session(lambda spark: setupInMemoryTableNoPartitioning(spark, csvPath, tableNameNoPart, columnarTableNameNoPart),
                     conf={'spark.sql.catalog.columnar': columnarClass})
    assert_gpu_and_cpu_are_equal_collect(readTable(csvPath, columnarTableNameNoPart),
                                         conf={'spark.sql.catalog.columnar': columnarClass})

@allow_non_gpu('ShowTablesExec', 'DropTableExec')
@pytest.mark.parametrize('csv', ['people.csv'])
def test_read_round_trip_no_partitioned_arrow_off(std_input_path, csv, spark_tmp_table_factory):
    csvPath = std_input_path + "/" + csv
    tableNameNoPart = spark_tmp_table_factory.get()
    columnarTableNameNoPart = catalogName + "." + tableNameNoPart
    with_cpu_session(lambda spark: setupInMemoryTableNoPartitioning(spark, csvPath, tableNameNoPart, columnarTableNameNoPart),
                     conf={'spark.sql.catalog.columnar': columnarClass,
                           'spark.rapids.arrowCopyOptmizationEnabled': 'false'})
    assert_gpu_and_cpu_are_equal_collect(readTable(csvPath, columnarTableNameNoPart),
                                         conf={'spark.sql.catalog.columnar': columnarClass,
                                               'spark.rapids.arrowCopyOptmizationEnabled': 'false'})
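For context, assert_gpu_and_cpu_are_equal_collect (imported from asserts above) runs the query produced by the supplied lambda once with the RAPIDS plugin enabled and once on the CPU, then compares the collected rows, which is why readTable applies a deterministic orderBy. Below is a minimal sketch of that contract, under the assumption that with_gpu_session from spark_session mirrors with_cpu_session; the real helper in asserts.py does more (e.g. richer diffs on mismatch), and the name here is hypothetical.

def _assert_equal_collect_sketch(df_fun, conf={}):
    # Sketch only: collect on a CPU session and a GPU session and
    # require identical rows; the framework's real assertion is more
    # thorough than a bare equality check.
    from spark_session import with_cpu_session, with_gpu_session
    cpu = with_cpu_session(lambda spark: df_fun(spark).collect(), conf=conf)
    gpu = with_gpu_session(lambda spark: df_fun(spark).collect(), conf=conf)
    assert cpu == gpu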
@@ -0,0 +1,66 @@
Jackelyn,23,Engineer
Jeannie,40,Banker
Mariella,20,Professor
Ardith,33,Professor
Elena,41,Banker
Noma,39,Student
Corliss,24,Student
Denae,24,Banker
Ned,44,Professor
Karolyn,,Engineer
Cornelia,45,Sales Manager
Kiyoko,41,Banker
Denisha,45,Engineer
Hilton,21,Sales Manager
Becky,32,Sales Manager
Wendie,29,Banker
Veronica,25,Sales Manager
Carolina,26,Student
Laquita,47,Banker
Stephani,,Professor
Emile,29,Professor
Octavio,35,Banker
Florencio,34,Banker
Elna,38,Banker
Cherri,23,Banker
Raleigh,47,Banker
Hollis,32,Professor
Charlette,35,Professor
Yetta,31,Student
Alfreda,,Engineer
Brigette,45,Banker
Maryann,40,Sales Manager
Miki,37,Banker
Pearle,37,Sales Manager
Damian,21,Sales Manager
Theodore,34,Student
Kristi,46,Engineer
Izetta,45,Professor
Tammi,43,Engineer
Star,,Sales Manager
Kaylee,27,Professor
Lakeshia,48,Professor
Elba,43,Sales Manager
Valencia,20,Engineer
Randa,35,Banker
Lourie,36,Professor
Tracie,31,Banker
Antwan,40,Professor
Gerry,23,Student
Jason,,Banker
Steve,51,CEO
Faker,25,Gamer
가나다라마바사,30,Banker
가나다마바사,20,Banker
가나다나다라마바,21,Professor
가나다나라마바사,22,Engineer
가나나나가나나,23,Sales Manager
가나다나다나나다,24,Banker
나다나다가나다,25,Student
Elon,,CEO
Yohwan,38,Gamer
Kitaek,48,Jobless
Kijung,22,Tutor
KiWoo,23,Tutor
Dongik,43,CEO
Dasong,9,Student