Support faster copy for a custom DataSource V2 which supplies Arrow data #1622

Merged: 51 commits from sktDatasourceV2 into branch-0.4 on Jan 29, 2021
Changes from all commits (51 commits):
All 51 commits are by tgravescs:

6509c8a  Add in data source v2, csv file and stub test (Jan 8, 2021)
1777628  update datasource v2 test (Jan 8, 2021)
dbbaf30  fix up test issues (Jan 8, 2021)
927abfe  add no partitioned test (Jan 8, 2021)
39dc434  Fix up test to properly work with in memory table datasource (Jan 8, 2021)
78a54ff  logs in hostcolumnar (Jan 13, 2021)
19d9685  Add test code and AccessibleArrowcolumnVector (Jan 15, 2021)
e012476  Fix accesible retrieval (Jan 15, 2021)
29f3d25  working (Jan 15, 2021)
a983d33  more debug (Jan 20, 2021)
14844f2  first go (Jan 20, 2021)
6e9fdb9  building (Jan 20, 2021)
d3db4ce  more changes (Jan 20, 2021)
fb006cb  fix static (Jan 20, 2021)
eb1dd14  more changes (Jan 21, 2021)
1414079  more cudf changes (Jan 22, 2021)
396a488  debug (Jan 22, 2021)
17e446b  working checkpoint (Jan 25, 2021)
47de41b  working without mem crash on free (Jan 25, 2021)
ecc953a  remove use of HostmMeoryBuffer (Jan 25, 2021)
016bad0  check null count (Jan 26, 2021)
a342c05  changes (Jan 26, 2021)
d72f255  Merge remote-tracking branch 'origin/branch-0.4' into sktDatasourceV2 (Jan 26, 2021)
40e4115  comment (Jan 26, 2021)
0e441ef  working (Jan 26, 2021)
7a7fc22  working (Jan 27, 2021)
2f42c08  Update to the new CUDF code and add a config to toggle arrow copy on and (Jan 27, 2021)
3bdf7f1  have the reflection code to access ArrowColumnVEctor working (Jan 28, 2021)
5b7561c  remove logging and commonize (Jan 28, 2021)
95a9e63  cleanup (Jan 28, 2021)
5527531  formatting (Jan 28, 2021)
ce1d7b0  formatting (Jan 28, 2021)
393ff6f  expand comment (Jan 28, 2021)
1572afc  move getting Arrow vectors into shims since Arrow versions changed be… (Jan 28, 2021)
d1ba0a3  Revert "move getting Arrow vectors into shims since Arrow versions ch… (Jan 28, 2021)
c4c1f61  shims working (Jan 28, 2021)
4e19741  fix test and check types (Jan 29, 2021)
624abbc  Add test for host iterator (Jan 29, 2021)
765d36a  working tests (Jan 29, 2021)
b24d3d5  fix style (Jan 29, 2021)
11dc637  cleanup (Jan 29, 2021)
5dd07bc  comment test (Jan 29, 2021)
5ba6a51  update copyright and wrap exception (Jan 29, 2021)
f5bfeae  comment test (Jan 29, 2021)
5617421  update comment in exception (Jan 29, 2021)
5b5f9db  Merge remote-tracking branch 'origin/branch-0.4' into sktDatasourceV2 (Jan 29, 2021)
c68c26d  fix merge conflicts (Jan 29, 2021)
513cea0  use case, logDebug, update test to use tmp table (Jan 29, 2021)
9b1a2fa  make some of the reflection lazy vals so it only happens once (Jan 29, 2021)
0c28106  fix line length (Jan 29, 2021)
7ce4bef  remove commented out line (Jan 29, 2021)
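A thread running through this log: commits 19d9685, 3bdf7f1, and 9b1a2fa are about reflectively reaching the Arrow ValueVector buried inside Spark's ArrowColumnVector, with the reflective lookups cached in lazy vals so they only happen once. A minimal sketch of that idea, assuming Spark's private field names `accessor` and `vector`; the object name and exact mechanics here are illustrative, not the PR's code:

```scala
import java.lang.reflect.Field

import org.apache.arrow.vector.ValueVector
import org.apache.spark.sql.vectorized.ArrowColumnVector

object ArrowVectorAccess {
  // Search the class hierarchy: private fields are declared on the parent
  // accessor class, not on the per-type accessor subclasses.
  private def findField(cls: Class[_], name: String): Field = {
    var c: Class[_] = cls
    while (c != null) {
      try {
        val f = c.getDeclaredField(name)
        f.setAccessible(true)
        return f
      } catch {
        case _: NoSuchFieldException => c = c.getSuperclass
      }
    }
    throw new NoSuchFieldException(name)
  }

  // Lazy val: the reflective lookup is paid once per JVM, not once per batch.
  private lazy val accessorField: Field =
    findField(classOf[ArrowColumnVector], "accessor")

  // Pull the Arrow ValueVector out of a Spark ArrowColumnVector.
  def valueVectorOf(col: ArrowColumnVector): ValueVector = {
    val accessor = accessorField.get(col)
    findField(accessor.getClass, "vector").get(accessor)
      .asInstanceOf[ValueVector]
  }
}
```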
1 change: 1 addition & 0 deletions docs/configs.md
@@ -29,6 +29,7 @@ scala> spark.conf.set("spark.rapids.sql.incompatibleOps.enabled", true)

Name | Description | Default Value
-----|-------------|--------------
<a name="alluxio.pathsToReplace"></a>spark.rapids.alluxio.pathsToReplace|List of paths to be replaced with corresponding alluxio scheme. Eg, when configureis set to "s3:/foo->alluxio://0.1.2.3:19998/foo,gcs:/bar->alluxio://0.1.2.3:19998/bar", which means: s3:/foo/a.csv will be replaced to alluxio://0.1.2.3:19998/foo/a.csv and gcs:/bar/b.csv will be replaced to alluxio://0.1.2.3:19998/bar/b.csv|None
<a name="cloudSchemes"></a>spark.rapids.cloudSchemes|Comma separated list of additional URI schemes that are to be considered cloud based filesystems. Schemes already included: dbfs, s3, s3a, s3n, wasbs, gs. Cloud based stores generally would be total separate from the executors and likely have a higher I/O read cost. Many times the cloud filesystems also get better throughput when you have multiple readers in parallel. This is used with spark.rapids.sql.format.parquet.reader.type|None
<a name="memory.gpu.allocFraction"></a>spark.rapids.memory.gpu.allocFraction|The fraction of total GPU memory that should be initially allocated for pooled memory. Extra memory will be allocated as needed, but it may result in more fragmentation. This must be less than or equal to the maximum limit configured via spark.rapids.memory.gpu.maxAllocFraction.|0.9
<a name="memory.gpu.debug"></a>spark.rapids.memory.gpu.debug|Provides a log of GPU memory allocations and frees. If set to STDOUT or STDERR the logging will go there. Setting it to NONE disables logging. All other values are reserved for possible future expansion and in the mean time will disable logging.|NONE
…
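Most of these keys are read at startup (the GPU pool, for instance, is sized when the executor comes up), so in practice they belong in spark-defaults.conf or on the command line; the REPL form below just mirrors the doc's own example above to show the key names and value shapes. The scheme list and fraction are illustrative values, not recommendations:

```scala
scala> spark.conf.set("spark.rapids.cloudSchemes", "abfss,oss")
scala> spark.conf.set("spark.rapids.memory.gpu.allocFraction", 0.8)
scala> spark.conf.set("spark.rapids.memory.gpu.debug", "STDOUT")
```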
104 changes: 104 additions & 0 deletions integration_tests/src/main/python/datasourcev2_read.py
@@ -0,0 +1,104 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from marks import *
from pyspark.sql.types import *
from spark_session import with_cpu_session

# These tests require a DataSource V2 jar containing the class
# org.apache.spark.sql.connector.InMemoryTableCatalog, which returns
# ArrowColumnVectors, to be on the classpath. If that class is not
# present, the tests are skipped.

catalogName = "columnar"
columnarClass = 'org.apache.spark.sql.connector.InMemoryTableCatalog'

def createPeopleCSVDf(spark, peopleCSVLocation):
    return spark.read.format("csv")\
        .option("header", "false")\
        .option("inferSchema", "true")\
        .load(peopleCSVLocation)\
        .withColumnRenamed("_c0", "name")\
        .withColumnRenamed("_c1", "age")\
        .withColumnRenamed("_c2", "job")

def setupInMemoryTableWithPartitioning(spark, csv, tname, column_and_table):
    peopleCSVDf = createPeopleCSVDf(spark, csv)
    peopleCSVDf.createOrReplaceTempView(tname)
    spark.table(tname).write.partitionBy("job").saveAsTable(column_and_table)

def setupInMemoryTableNoPartitioning(spark, csv, tname, column_and_table):
    peopleCSVDf = createPeopleCSVDf(spark, csv)
    peopleCSVDf.createOrReplaceTempView(tname)
    spark.table(tname).write.saveAsTable(column_and_table)

def readTable(csvPath, tableToRead):
    return lambda spark: spark.table(tableToRead)\
        .orderBy("name", "age")

def createDatabase(spark):
    try:
        spark.sql("create database IF NOT EXISTS " + catalogName)
        spark.sql("use " + catalogName)
    except Exception:
        pytest.skip("Failed to load catalog for datasource v2 {}, jar is probably missing".format(columnarClass))

def cleanupDatabase(spark):
    spark.sql("drop database IF EXISTS " + catalogName)

@pytest.fixture(autouse=True)
def setupAndCleanUp():
    with_cpu_session(lambda spark: createDatabase(spark),
                     conf={'spark.sql.catalog.columnar': columnarClass})
    yield
    with_cpu_session(lambda spark: cleanupDatabase(spark),
                     conf={'spark.sql.catalog.columnar': columnarClass})

@allow_non_gpu('ShowTablesExec', 'DropTableExec')
@pytest.mark.parametrize('csv', ['people.csv'])
def test_read_round_trip_partitioned(std_input_path, csv, spark_tmp_table_factory):
    csvPath = std_input_path + "/" + csv
    tableName = spark_tmp_table_factory.get()
    columnarTableName = catalogName + "." + tableName
    with_cpu_session(lambda spark: setupInMemoryTableWithPartitioning(spark, csvPath, tableName, columnarTableName),
                     conf={'spark.sql.catalog.columnar': columnarClass})
    assert_gpu_and_cpu_are_equal_collect(readTable(csvPath, columnarTableName),
                                         conf={'spark.sql.catalog.columnar': columnarClass})

@allow_non_gpu('ShowTablesExec', 'DropTableExec')
@pytest.mark.parametrize('csv', ['people.csv'])
def test_read_round_trip_no_partitioned(std_input_path, csv, spark_tmp_table_factory):
    csvPath = std_input_path + "/" + csv
    tableNameNoPart = spark_tmp_table_factory.get()
    columnarTableNameNoPart = catalogName + "." + tableNameNoPart
    with_cpu_session(lambda spark: setupInMemoryTableNoPartitioning(spark, csvPath, tableNameNoPart, columnarTableNameNoPart),
                     conf={'spark.sql.catalog.columnar': columnarClass})
    assert_gpu_and_cpu_are_equal_collect(readTable(csvPath, columnarTableNameNoPart),
                                         conf={'spark.sql.catalog.columnar': columnarClass})

@allow_non_gpu('ShowTablesExec', 'DropTableExec')
@pytest.mark.parametrize('csv', ['people.csv'])
def test_read_round_trip_no_partitioned_arrow_off(std_input_path, csv, spark_tmp_table_factory):
    csvPath = std_input_path + "/" + csv
    tableNameNoPart = spark_tmp_table_factory.get()
    columnarTableNameNoPart = catalogName + "." + tableNameNoPart
    with_cpu_session(lambda spark: setupInMemoryTableNoPartitioning(spark, csvPath, tableNameNoPart, columnarTableNameNoPart),
                     conf={'spark.sql.catalog.columnar': columnarClass,
                           'spark.rapids.arrowCopyOptmizationEnabled': 'false'})
    assert_gpu_and_cpu_are_equal_collect(readTable(csvPath, columnarTableNameNoPart),
                                         conf={'spark.sql.catalog.columnar': columnarClass,
                                               'spark.rapids.arrowCopyOptmizationEnabled': 'false'})
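For anyone poking at this outside pytest, here is a rough spark-shell equivalent of the fixture wiring above. The table name and CSV path are placeholders, and it assumes the InMemoryTableCatalog test jar is on the classpath; without it, catalog resolution fails, just as the tests skip:

```scala
// Register the columnar catalog, mirroring the spark.sql.catalog.columnar
// conf the tests pass to with_cpu_session.
spark.conf.set("spark.sql.catalog.columnar",
  "org.apache.spark.sql.connector.InMemoryTableCatalog")
spark.sql("CREATE DATABASE IF NOT EXISTS columnar")
spark.sql("USE columnar")

// Same shape as createPeopleCSVDf: headerless CSV, inferred schema.
val people = spark.read.format("csv")
  .option("header", "false")
  .option("inferSchema", "true")
  .load("/path/to/people.csv") // placeholder path
  .toDF("name", "age", "job")

// The partitioned setup, then the read the tests compare on CPU vs GPU.
people.write.partitionBy("job").saveAsTable("columnar.people_tab")
spark.table("columnar.people_tab").orderBy("name", "age").show()
```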
66 changes: 66 additions & 0 deletions integration_tests/src/test/resources/people.csv
@@ -0,0 +1,66 @@
Jackelyn,23,Engineer
Jeannie,40,Banker
Mariella,20,Professor
Ardith,33,Professor
Elena,41,Banker
Noma,39,Student
Corliss,24,Student
Denae,24,Banker
Ned,44,Professor
Karolyn,,Engineer
Cornelia,45,Sales Manager
Kiyoko,41,Banker
Denisha,45,Engineer
Hilton,21,Sales Manager
Becky,32,Sales Manager
Wendie,29,Banker
Veronica,25,Sales Manager
Carolina,26,Student
Laquita,47,Banker
Stephani,,Professor
Emile,29,Professor
Octavio,35,Banker
Florencio,34,Banker
Elna,38,Banker
Cherri,23,Banker
Raleigh,47,Banker
Hollis,32,Professor
Charlette,35,Professor
Yetta,31,Student
Alfreda,,Engineer
Brigette,45,Banker
Maryann,40,Sales Manager
Miki,37,Banker
Pearle,37,Sales Manager
Damian,21,Sales Manager
Theodore,34,Student
Kristi,46,Engineer
Izetta,45,Professor
Tammi,43,Engineer
Star,,Sales Manager
Kaylee,27,Professor
Lakeshia,48,Professor
Elba,43,Sales Manager
Valencia,20,Engineer
Randa,35,Banker
Lourie,36,Professor
Tracie,31,Banker
Antwan,40,Professor
Gerry,23,Student
Jason,,Banker
Steve,51,CEO
Faker,25,Gamer
가나다라마바사,30,Banker
가나다마바사,20,Banker
가나다나다라마바,21,Professor
가나다나라마바사,22,Engineer
가나나나가나나,23,Sales Manager
가나다나다나나다,24,Banker
나다나다가나다,25,Student
Elon,,CEO
Yohwan,38,Gamer
Kitaek,48,Jobless
Kijung,22,Tutor
KiWoo,23,Tutor
Dongik,43,CEO
Dasong,9,Student
Spark300Shims.scala
@@ -16,10 +16,12 @@

package com.nvidia.spark.rapids.shims.spark300

import java.nio.ByteBuffer
import java.time.ZoneId

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.spark300.RapidsShuffleManager
import org.apache.arrow.vector.ValueVector
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkEnv
@@ -450,6 +452,19 @@ class Spark300Shims extends SparkShims {
    InMemoryFileIndex.shouldFilterOut(path)
  }

  // Arrow version changed between Spark versions
  override def getArrowDataBuf(vec: ValueVector): ByteBuffer = {
    vec.getDataBuffer().nioBuffer()
  }

  override def getArrowValidityBuf(vec: ValueVector): ByteBuffer = {
    vec.getValidityBuffer().nioBuffer()
  }

  override def getArrowOffsetsBuf(vec: ValueVector): ByteBuffer = {
    vec.getOffsetBuffer().nioBuffer()
  }

  override def replaceWithAlluxioPathIfNeeded(
      conf: RapidsConf,
      relation: HadoopFsRelation,
…
Spark311Shims.scala
@@ -16,9 +16,12 @@

package com.nvidia.spark.rapids.shims.spark311

import java.nio.ByteBuffer

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.shims.spark301.Spark301Shims
import com.nvidia.spark.rapids.spark311.RapidsShuffleManager
import org.apache.arrow.vector.ValueVector

import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
@@ -387,4 +390,17 @@ class Spark311Shims extends Spark301Shims {
  override def shouldIgnorePath(path: String): Boolean = {
    HadoopFSUtilsShim.shouldIgnorePath(path)
  }

  // Arrow version changed between Spark versions
  override def getArrowDataBuf(vec: ValueVector): ByteBuffer = {
    vec.getDataBuffer.nioBuffer()
  }

  override def getArrowValidityBuf(vec: ValueVector): ByteBuffer = {
    vec.getValidityBuffer.nioBuffer()
  }

  override def getArrowOffsetsBuf(vec: ValueVector): ByteBuffer = {
    vec.getOffsetBuffer.nioBuffer()
  }
}
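The two sets of overrides above are textually almost identical, which is the point of the comment "Arrow version changed between Spark versions": each shim module compiles against the Arrow its Spark release ships (roughly Arrow 0.15.x for Spark 3.0.x and Arrow 2.0.0 for Spark 3.1.x), and ArrowBuf moved from io.netty.buffer to org.apache.arrow.memory in between, so a single compiled call to getDataBuffer would not be binary compatible with both. A minimal sketch of the interface slice these methods implement; the trait name here is illustrative (in the plugin they sit on the much larger SparkShims trait):

```scala
import java.nio.ByteBuffer

import org.apache.arrow.vector.ValueVector

// Version-agnostic code asks the active shim for the raw Arrow buffers and
// only ever sees java.nio.ByteBuffer, which is stable across Arrow releases.
trait ArrowBufShims {
  def getArrowDataBuf(vec: ValueVector): ByteBuffer
  def getArrowValidityBuf(vec: ValueVector): ByteBuffer
  def getArrowOffsetsBuf(vec: ValueVector): ByteBuffer
}
```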