Support faster copy for a custom DataSource V2 which supplies Arrow data (#1622)

* Add data source v2, CSV file, and test for Arrow copy
* Remove commented-out line
tgravescs authored Jan 29, 2021
1 parent d4f5753 commit f4c912a
Showing 12 changed files with 1,109 additions and 39 deletions.
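For context, a minimal sketch (not part of this commit) of how an application might exercise the new fast path, based on the integration test added below: register a DataSource V2 catalog whose scans return Arrow-backed columnar batches and read a table through it. The catalog class name and config keys are taken from the test; the table name "people" and the session setup are illustrative assumptions.

// Sketch only, assuming the RAPIDS Accelerator and a DSv2 catalog that returns
// ArrowColumnVectors (the test uses org.apache.spark.sql.connector.InMemoryTableCatalog
// from a test jar) are on the classpath.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("arrow-copy-example")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  // register the columnar catalog under the name "columnar", as the test does
  .config("spark.sql.catalog.columnar",
    "org.apache.spark.sql.connector.InMemoryTableCatalog")
  // the test toggles this key to compare the fast Arrow copy with the fallback path
  .config("spark.rapids.arrowCopyOptmizationEnabled", "true")
  .getOrCreate()

// "people" is a hypothetical table previously saved into the columnar catalog;
// its Arrow data can now be copied to the GPU in bulk instead of row by row.
spark.table("columnar.people").orderBy("name", "age").show()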
1 change: 1 addition & 0 deletions docs/configs.md
@@ -29,6 +29,7 @@ scala> spark.conf.set("spark.rapids.sql.incompatibleOps.enabled", true)

Name | Description | Default Value
-----|-------------|--------------
<a name="alluxio.pathsToReplace"></a>spark.rapids.alluxio.pathsToReplace|List of paths to be replaced with corresponding alluxio scheme. Eg, when configureis set to "s3:/foo->alluxio://0.1.2.3:19998/foo,gcs:/bar->alluxio://0.1.2.3:19998/bar", which means: s3:/foo/a.csv will be replaced to alluxio://0.1.2.3:19998/foo/a.csv and gcs:/bar/b.csv will be replaced to alluxio://0.1.2.3:19998/bar/b.csv|None
<a name="cloudSchemes"></a>spark.rapids.cloudSchemes|Comma separated list of additional URI schemes that are to be considered cloud based filesystems. Schemes already included: dbfs, s3, s3a, s3n, wasbs, gs. Cloud based stores generally would be total separate from the executors and likely have a higher I/O read cost. Many times the cloud filesystems also get better throughput when you have multiple readers in parallel. This is used with spark.rapids.sql.format.parquet.reader.type|None
<a name="memory.gpu.allocFraction"></a>spark.rapids.memory.gpu.allocFraction|The fraction of total GPU memory that should be initially allocated for pooled memory. Extra memory will be allocated as needed, but it may result in more fragmentation. This must be less than or equal to the maximum limit configured via spark.rapids.memory.gpu.maxAllocFraction.|0.9
<a name="memory.gpu.debug"></a>spark.rapids.memory.gpu.debug|Provides a log of GPU memory allocations and frees. If set to STDOUT or STDERR the logging will go there. Setting it to NONE disables logging. All other values are reserved for possible future expansion and in the mean time will disable logging.|NONE
104 changes: 104 additions & 0 deletions integration_tests/src/main/python/datasourcev2_read.py
@@ -0,0 +1,104 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from marks import *
from pyspark.sql.types import *
from spark_session import with_cpu_session

# These tests require that a datasource v2 jar containing the class
# org.apache.spark.sql.connector.InMemoryTableCatalog, which returns
# ArrowColumnVectors, be on the classpath in order for them to run.
# If that class is not present the tests are skipped.

catalogName = "columnar"
columnarClass = 'org.apache.spark.sql.connector.InMemoryTableCatalog'

def createPeopleCSVDf(spark, peopleCSVLocation):
    return spark.read.format("csv")\
        .option("header", "false")\
        .option("inferSchema", "true")\
        .load(peopleCSVLocation)\
        .withColumnRenamed("_c0", "name")\
        .withColumnRenamed("_c1", "age")\
        .withColumnRenamed("_c2", "job")

def setupInMemoryTableWithPartitioning(spark, csv, tname, column_and_table):
    peopleCSVDf = createPeopleCSVDf(spark, csv)
    peopleCSVDf.createOrReplaceTempView(tname)
    spark.table(tname).write.partitionBy("job").saveAsTable(column_and_table)

def setupInMemoryTableNoPartitioning(spark, csv, tname, column_and_table):
    peopleCSVDf = createPeopleCSVDf(spark, csv)
    peopleCSVDf.createOrReplaceTempView(tname)
    spark.table(tname).write.saveAsTable(column_and_table)

def readTable(csvPath, tableToRead):
    return lambda spark: spark.table(tableToRead)\
        .orderBy("name", "age")

def createDatabase(spark):
    try:
        spark.sql("create database IF NOT EXISTS " + catalogName)
        spark.sql("use " + catalogName)
    except Exception:
        pytest.skip("Failed to load catalog for datasource v2 {}, jar is probably missing".format(columnarClass))

def cleanupDatabase(spark):
    spark.sql("drop database IF EXISTS " + catalogName)

@pytest.fixture(autouse=True)
def setupAndCleanUp():
    with_cpu_session(lambda spark: createDatabase(spark),
                     conf={'spark.sql.catalog.columnar': columnarClass})
    yield
    with_cpu_session(lambda spark: cleanupDatabase(spark),
                     conf={'spark.sql.catalog.columnar': columnarClass})

@allow_non_gpu('ShowTablesExec', 'DropTableExec')
@pytest.mark.parametrize('csv', ['people.csv'])
def test_read_round_trip_partitioned(std_input_path, csv, spark_tmp_table_factory):
    csvPath = std_input_path + "/" + csv
    tableName = spark_tmp_table_factory.get()
    columnarTableName = catalogName + "." + tableName
    with_cpu_session(lambda spark: setupInMemoryTableWithPartitioning(spark, csvPath, tableName, columnarTableName),
                     conf={'spark.sql.catalog.columnar': columnarClass})
    assert_gpu_and_cpu_are_equal_collect(readTable(csvPath, columnarTableName),
                                         conf={'spark.sql.catalog.columnar': columnarClass})

@allow_non_gpu('ShowTablesExec', 'DropTableExec')
@pytest.mark.parametrize('csv', ['people.csv'])
def test_read_round_trip_no_partitioned(std_input_path, csv, spark_tmp_table_factory):
    csvPath = std_input_path + "/" + csv
    tableNameNoPart = spark_tmp_table_factory.get()
    columnarTableNameNoPart = catalogName + "." + tableNameNoPart
    with_cpu_session(lambda spark: setupInMemoryTableNoPartitioning(spark, csvPath, tableNameNoPart, columnarTableNameNoPart),
                     conf={'spark.sql.catalog.columnar': columnarClass})
    assert_gpu_and_cpu_are_equal_collect(readTable(csvPath, columnarTableNameNoPart),
                                         conf={'spark.sql.catalog.columnar': columnarClass})

@allow_non_gpu('ShowTablesExec', 'DropTableExec')
@pytest.mark.parametrize('csv', ['people.csv'])
def test_read_round_trip_no_partitioned_arrow_off(std_input_path, csv, spark_tmp_table_factory):
    csvPath = std_input_path + "/" + csv
    tableNameNoPart = spark_tmp_table_factory.get()
    columnarTableNameNoPart = catalogName + "." + tableNameNoPart
    with_cpu_session(lambda spark: setupInMemoryTableNoPartitioning(spark, csvPath, tableNameNoPart, columnarTableNameNoPart),
                     conf={'spark.sql.catalog.columnar': columnarClass,
                           'spark.rapids.arrowCopyOptmizationEnabled': 'false'})
    assert_gpu_and_cpu_are_equal_collect(readTable(csvPath, columnarTableNameNoPart),
                                         conf={'spark.sql.catalog.columnar': columnarClass,
                                               'spark.rapids.arrowCopyOptmizationEnabled': 'false'})
66 changes: 66 additions & 0 deletions integration_tests/src/test/resources/people.csv
@@ -0,0 +1,66 @@
Jackelyn,23,Engineer
Jeannie,40,Banker
Mariella,20,Professor
Ardith,33,Professor
Elena,41,Banker
Noma,39,Student
Corliss,24,Student
Denae,24,Banker
Ned,44,Professor
Karolyn,,Engineer
Cornelia,45,Sales Manager
Kiyoko,41,Banker
Denisha,45,Engineer
Hilton,21,Sales Manager
Becky,32,Sales Manager
Wendie,29,Banker
Veronica,25,Sales Manager
Carolina,26,Student
Laquita,47,Banker
Stephani,,Professor
Emile,29,Professor
Octavio,35,Banker
Florencio,34,Banker
Elna,38,Banker
Cherri,23,Banker
Raleigh,47,Banker
Hollis,32,Professor
Charlette,35,Professor
Yetta,31,Student
Alfreda,,Engineer
Brigette,45,Banker
Maryann,40,Sales Manager
Miki,37,Banker
Pearle,37,Sales Manager
Damian,21,Sales Manager
Theodore,34,Student
Kristi,46,Engineer
Izetta,45,Professor
Tammi,43,Engineer
Star,,Sales Manager
Kaylee,27,Professor
Lakeshia,48,Professor
Elba,43,Sales Manager
Valencia,20,Engineer
Randa,35,Banker
Lourie,36,Professor
Tracie,31,Banker
Antwan,40,Professor
Gerry,23,Student
Jason,,Banker
Steve,51,CEO
Faker,25,Gamer
가나다라마바사,30,Banker
가나다마바사,20,Banker
가나다나다라마바,21,Professor
가나다나라마바사,22,Engineer
가나나나가나나,23,Sales Manager
가나다나다나나다,24,Banker
나다나다가나다,25,Student
Elon,,CEO
Yohwan,38,Gamer
Kitaek,48,Jobless
Kijung,22,Tutor
KiWoo,23,Tutor
Dongik,43,CEO
Dasong,9,Student
Spark300Shims.scala
@@ -16,10 +16,12 @@

package com.nvidia.spark.rapids.shims.spark300

import java.nio.ByteBuffer
import java.time.ZoneId

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.spark300.RapidsShuffleManager
import org.apache.arrow.vector.ValueVector
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkEnv
@@ -450,6 +452,19 @@ class Spark300Shims extends SparkShims {
InMemoryFileIndex.shouldFilterOut(path)
}

// Arrow version changed between Spark versions
override def getArrowDataBuf(vec: ValueVector): ByteBuffer = {
vec.getDataBuffer().nioBuffer()
}

override def getArrowValidityBuf(vec: ValueVector): ByteBuffer = {
vec.getValidityBuffer().nioBuffer()
}

override def getArrowOffsetsBuf(vec: ValueVector): ByteBuffer = {
vec.getOffsetBuffer().nioBuffer()
}

override def replaceWithAlluxioPathIfNeeded(
conf: RapidsConf,
relation: HadoopFsRelation,
Spark311Shims.scala
@@ -16,9 +16,12 @@

package com.nvidia.spark.rapids.shims.spark311

import java.nio.ByteBuffer

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.shims.spark301.Spark301Shims
import com.nvidia.spark.rapids.spark311.RapidsShuffleManager
import org.apache.arrow.vector.ValueVector

import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
@@ -387,4 +390,17 @@ class Spark311Shims extends Spark301Shims {
override def shouldIgnorePath(path: String): Boolean = {
HadoopFSUtilsShim.shouldIgnorePath(path)
}

// Arrow version changed between Spark versions
override def getArrowDataBuf(vec: ValueVector): ByteBuffer = {
vec.getDataBuffer.nioBuffer()
}

override def getArrowValidityBuf(vec: ValueVector): ByteBuffer = {
vec.getValidityBuffer.nioBuffer()
}

override def getArrowOffsetsBuf(vec: ValueVector): ByteBuffer = {
vec.getOffsetBuffer.nioBuffer()
}
}
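These shim methods exist because the Arrow Java API bundled with Spark changed between versions, but both variants ultimately expose a vector's data, validity and offset buffers as NIO ByteBuffers. A small standalone sketch (illustrative only, not plugin code) of what a caller gets back:

import java.nio.ByteBuffer

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VarCharVector

val allocator = new RootAllocator(Long.MaxValue)
val names = new VarCharVector("name", allocator)
names.allocateNew()
names.setSafe(0, "Jackelyn".getBytes("UTF-8"))
names.setSafe(1, "Jeannie".getBytes("UTF-8"))
names.setValueCount(2)

// Equivalent to what getArrowDataBuf / getArrowValidityBuf / getArrowOffsetsBuf return:
// raw Arrow buffers viewed as ByteBuffers, which can be copied to the GPU without
// walking individual rows.
val data: ByteBuffer = names.getDataBuffer.nioBuffer()
val validity: ByteBuffer = names.getValidityBuffer.nioBuffer()
val offsets: ByteBuffer = names.getOffsetBuffer.nioBuffer()
println(s"data=${data.remaining()} validity=${validity.remaining()} offsets=${offsets.remaining()}")

names.close()
allocator.close()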