Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi-threaded reading for avro #5421

Merged
merged 9 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ Name | Description | Default Value
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.fast.sample"></a>spark.rapids.sql.fast.sample|Option to turn on fast sample. If enable it is inconsistent with CPU sample because of GPU sample algorithm is inconsistent with CPU.|false
<a name="sql.format.avro.enabled"></a>spark.rapids.sql.format.avro.enabled|When set to true enables all avro input and output acceleration. (only input is currently supported anyways)|false
<a name="sql.format.avro.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.avro.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.avro.reader.type|2147483647
<a name="sql.format.avro.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.avro.multiThreadedRead.numThreads|The maximum number of threads, on one executor, to use for reading small avro files in parallel. This can not be changed at runtime after the executor has started. Used with MULTITHREADED reader, see spark.rapids.sql.format.avro.reader.type.|20
<a name="sql.format.avro.read.enabled"></a>spark.rapids.sql.format.avro.read.enabled|When set to true enables avro input acceleration|false
<a name="sql.format.avro.reader.type"></a>spark.rapids.sql.format.avro.reader.type|Sets the avro reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using COALESCING is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.format.avro.multiThreadedRead.numThreads. By default this is set to AUTO so we select the reader we think is best. This will be COALESCING.|AUTO
<a name="sql.format.avro.reader.type"></a>spark.rapids.sql.format.avro.reader.type|Sets the avro reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using either COALESCING or MULTITHREADED is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.format.avro.multiThreadedRead.numThreads. MULTITHREADED is good for cloud environments where you are reading from a blobstore that is totally separate and likely has a higher I/O read cost. Many times the cloud environments also get better throughput when you have multiple readers in parallel. This reader uses multiple threads to read each file in parallel and each file is sent to the GPU separately. This allows the CPU to keep reading while GPU is also doing work. See spark.rapids.sql.format.avro.multiThreadedRead.numThreads and spark.rapids.sql.format.avro.multiThreadedRead.maxNumFilesParallel to control the number of threads and amount of memory used. By default this is set to AUTO so we select the reader we think is best. This will either be the COALESCING or the MULTITHREADED based on whether we think the file is in the cloud. See spark.rapids.cloudSchemes.|AUTO
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
<a name="sql.format.json.enabled"></a>spark.rapids.sql.format.json.enabled|When set to true enables all json input and output acceleration. (only input is currently supported anyways)|false
Expand Down
22 changes: 21 additions & 1 deletion integration_tests/src/main/python/avro_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
'spark.rapids.sql.format.avro.enabled': 'true',
'spark.rapids.sql.format.avro.read.enabled': 'true'}

rapids_reader_types = ['PERFILE', 'COALESCING']
rapids_reader_types = ['PERFILE', 'COALESCING', 'MULTITHREADED']

# 50 files for the coalescing reading case
coalescingPartitionNum = 50
Expand Down Expand Up @@ -117,3 +117,23 @@ def test_coalescing_uniform_sync(spark_tmp_path, v1_enabled_list):
# read the coalesced files by CPU
with_cpu_session(
lambda spark: spark.read.format("avro").load(dump_path).collect())


@ignore_order(local=True)
@pytest.mark.parametrize('v1_enabled_list', ["", "avro"], ids=["v1", "v2"])
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_avro_read_with_corrupt_files(spark_tmp_path, reader_type, v1_enabled_list):
first_dpath = spark_tmp_path + '/AVRO_DATA/first'
with_cpu_session(lambda spark : spark.range(1).toDF("a").write.format("avro").save(first_dpath))
second_dpath = spark_tmp_path + '/AVRO_DATA/second'
with_cpu_session(lambda spark : spark.range(1, 2).toDF("a").write.format("avro").save(second_dpath))
third_dpath = spark_tmp_path + '/AVRO_DATA/third'
with_cpu_session(lambda spark : spark.range(2, 3).toDF("a").write.json(third_dpath))

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.sql.files.ignoreCorruptFiles': "true",
'spark.sql.sources.useV1SourceList': v1_enabled_list})

assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.format("avro").load([first_dpath, second_dpath, third_dpath]),
conf=all_confs)
Loading