Support coalescing reading for avro #5306

Merged · 9 commits · May 4, 2022
Changes from 2 commits
3 changes: 3 additions & 0 deletions docs/configs.md
@@ -75,7 +75,10 @@ Name | Description | Default Value
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.fast.sample"></a>spark.rapids.sql.fast.sample|Option to turn on fast sample. If enable it is inconsistent with CPU sample because of GPU sample algorithm is inconsistent with CPU.|false
<a name="sql.format.avro.enabled"></a>spark.rapids.sql.format.avro.enabled|When set to true enables all avro input and output acceleration. (only input is currently supported anyways)|false
<a name="sql.format.avro.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.avro.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.avro.reader.type|2147483647
<a name="sql.format.avro.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.avro.multiThreadedRead.numThreads|The maximum number of threads, on one executor, to use for reading small avro files in parallel. This can not be changed at runtime after the executor has started. Used with MULTITHREADED reader, see spark.rapids.sql.format.avro.reader.type.|20
<a name="sql.format.avro.read.enabled"></a>spark.rapids.sql.format.avro.read.enabled|When set to true enables avro input acceleration|false
<a name="sql.format.avro.reader.type"></a>spark.rapids.sql.format.avro.reader.type|Sets the avro reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using COALESCING is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.format.avro.multiThreadedRead.numThreads. By default this is set to AUTO so we select the reader we think is best. This will be COALESCING.|AUTO
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
<a name="sql.format.json.enabled"></a>spark.rapids.sql.format.json.enabled|When set to true enables all json input and output acceleration. (only input is currently supported anyways)|false
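For orientation, the sketch below shows the Avro settings from the table above applied to a SparkSession. It is a minimal, illustrative configuration, not a recommended production setup: the application name, thread count, and data path are placeholders, and COALESCING is set explicitly even though AUTO would normally select it.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: enable GPU-accelerated Avro reads with the coalescing reader.
// The config keys come from the table above; the chosen values are illustrative.
val spark = SparkSession.builder()
  .appName("avro-coalescing-example")
  .config("spark.rapids.sql.format.avro.enabled", "true")
  .config("spark.rapids.sql.format.avro.read.enabled", "true")
  .config("spark.rapids.sql.format.avro.reader.type", "COALESCING")
  .config("spark.rapids.sql.format.avro.multiThreadedRead.numThreads", "20")
  .getOrCreate()

// Reading stays the standard DataFrame API; the plugin decides whether the
// Avro scan actually runs on the GPU.
val df = spark.read.format("avro").load("/path/to/avro/data")
```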
9 changes: 3 additions & 6 deletions integration_tests/run_pyspark_from_build.sh
@@ -57,13 +57,10 @@ else
TEST_JARS=$(echo "$SCRIPTPATH"/target/rapids-4-spark-integration-tests*-$INTEGRATION_TEST_VERSION.jar)
fi

# `./run_pyspark_from_build.sh` runs all tests including avro_test.py with spark-avro.jar
# in the classpath.
# `./run_pyspark_from_build.sh` runs all the tests excluding the avro tests in 'avro_test.py'.
#
# `./run_pyspark_from_build.sh -k xxx ` runs all xxx tests with spark-avro.jar in the classpath
#
# `INCLUDE_SPARK_AVRO_JAR=true ./run_pyspark_from_build.sh` run all tests (except the marker skipif())
# without spark-avro.jar
# `INCLUDE_SPARK_AVRO_JAR=true ./run_pyspark_from_build.sh` runs all the tests, including the tests
# in 'avro_test.py'.
if [[ $( echo ${INCLUDE_SPARK_AVRO_JAR} | tr [:upper:] [:lower:] ) == "true" ]];
then
export INCLUDE_SPARK_AVRO_JAR=true
33 changes: 22 additions & 11 deletions integration_tests/src/main/python/avro_test.py
@@ -30,46 +30,56 @@
'spark.rapids.sql.format.avro.enabled': 'true',
'spark.rapids.sql.format.avro.read.enabled': 'true'}

rapids_reader_types = ['PERFILE', 'COALESCING']

@pytest.mark.parametrize('gen', support_gens)
@pytest.mark.parametrize('v1_enabled_list', ["avro", ""])
def test_basic_read(spark_tmp_path, gen, v1_enabled_list):
@pytest.mark.parametrize('v1_enabled_list', ["avro", ""], ids=["v1", "v2"])
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_basic_read(spark_tmp_path, v1_enabled_list, reader_type):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)]
data_path = spark_tmp_path + '/AVRO_DATA'
# 50 files for the coalescing reading case
with_cpu_session(
lambda spark: unary_op_df(spark, gen).write.format("avro").save(data_path)
lambda spark: gen_df(spark, gen_list).repartition(50).write.format("avro").save(data_path)
)

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.rapids.sql.format.avro.reader.type': reader_type,
'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.format("avro").load(data_path),
conf=all_confs)


@pytest.mark.parametrize('v1_enabled_list', ["", "avro"])
def test_avro_simple_partitioned_read(spark_tmp_path, v1_enabled_list):
@pytest.mark.parametrize('v1_enabled_list', ["", "avro"], ids=["v1", "v2"])
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_avro_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_type):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)]
first_data_path = spark_tmp_path + '/AVRO_DATA/key=0/key2=20'
with_cpu_session(
lambda spark: gen_df(spark, gen_list).write.format("avro").save(first_data_path))
lambda spark: gen_df(spark,
gen_list).repartition(50).write.format("avro").save(first_data_path))
second_data_path = spark_tmp_path + '/AVRO_DATA/key=1/key2=21'
with_cpu_session(
lambda spark: gen_df(spark, gen_list).write.format("avro").save(second_data_path))
lambda spark: gen_df(spark,
gen_list).repartition(50).write.format("avro").save(second_data_path))
third_data_path = spark_tmp_path + '/AVRO_DATA/key=2/key2=22'
with_cpu_session(
lambda spark: gen_df(spark, gen_list).write.format("avro").save(third_data_path))
lambda spark: gen_df(spark,
gen_list).repartition(50).write.format("avro").save(third_data_path))

data_path = spark_tmp_path + '/AVRO_DATA'

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.rapids.sql.format.avro.reader.type': reader_type,
'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.format("avro").load(data_path),
conf=all_confs)


@pytest.mark.parametrize('v1_enabled_list', ["", "avro"])
def test_avro_input_meta(spark_tmp_path, v1_enabled_list):
@pytest.mark.parametrize('v1_enabled_list', ["", "avro"], ids=["v1", "v2"])
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_avro_input_meta(spark_tmp_path, v1_enabled_list, reader_type):
first_data_path = spark_tmp_path + '/AVRO_DATA/key=0'
with_cpu_session(
lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(first_data_path))
@@ -79,6 +89,7 @@ def test_avro_input_meta(spark_tmp_path, v1_enabled_list):
data_path = spark_tmp_path + '/AVRO_DATA'

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.rapids.sql.format.avro.reader.type': reader_type,
'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.format("avro").load(data_path)
@@ -19,12 +19,12 @@ package com.nvidia.spark.rapids
import java.io.{InputStream, IOException}
import java.nio.charset.StandardCharsets

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable

import org.apache.avro.Schema
import org.apache.avro.file.{DataFileConstants, SeekableInput}
import org.apache.avro.file.DataFileConstants.{MAGIC, SYNC_SIZE}
import org.apache.avro.io.{BinaryData, BinaryDecoder, DecoderFactory}
import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}

private class SeekableInputStream(in: SeekableInput) extends InputStream with SeekableInput {
var oneByte = new Array[Byte](1)
@@ -59,22 +59,68 @@ private class SeekableInputStream(in: SeekableInput) extends InputStream with SeekableInput {
}

/**
* The header information of Avro file
* The header information of an Avro file.
*/
class Header {
var meta = Map[String, Array[Byte]]()
var metaKeyList = ArrayBuffer[String]()
var sync = new Array[Byte](DataFileConstants.SYNC_SIZE)
var schema: Schema = _
private var firstBlockStart: Long = _

private[rapids] def update(schemaValue: String, firstBlockStart: Long) = {
schema = new Schema.Parser().setValidate(false).setValidateDefaults(false)
.parse(schemaValue)
this.firstBlockStart = firstBlockStart
class Header private[rapids] {
private[rapids] val meta = mutable.Map[String, Array[Byte]]()
private[rapids] val sync = new Array[Byte](DataFileConstants.SYNC_SIZE)
private[rapids] var headerSize: Option[Long] = None

def firstBlockStart: Long = headerSize.getOrElse {
val out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM)
AvroFileWriter(out).writeHeader(this)
val newSize = out.getByteCount
headerSize = Some(newSize)
newSize
}

def getFirstBlockStart: Long = firstBlockStart
@transient
lazy val schema: Schema = {
getMetaString(DataFileConstants.SCHEMA)
.map(s => new Schema.Parser().setValidateDefaults(false).setValidate(false).parse(s))
.orNull
}

private def getMetaString(key: String): Option[String] = {
meta.get(key).map(new String(_, StandardCharsets.UTF_8))
}
}

object Header {
/**
* Merge the metadata of the given headers.
* Note: It does not check the compatibility of the headers.
* @param headers the headers whose metadata will be merged.
* @return the first header, updated with the merged metadata, or
* None if the input is empty.
*/
def mergeMetadata(headers: Seq[Header]): Option[Header] = {
if (headers.isEmpty) {
None
} else if (headers.size == 1) {
Some(headers.head)
} else {
val mergedHeader = headers.reduce { (merged, h) =>
merged.meta ++= h.meta
merged
}
// need to re-compute the header size
mergedHeader.headerSize = None
Some(mergedHeader)
}
}

/** Test whether the two headers have the same sync marker */
def hasSameSync(h1: Header, h2: Header): Boolean = h1.sync.sameElements(h2.sync)

/**
* Test whether the two headers have conflicts in the metadata.
* A conflict means a key exists in both of the two headers' metadata,
* and maps to different values.
*/
def hasConflictInMetadata(h1: Header, h2: Header): Boolean = h1.meta.exists {
case (k, v) => h2.meta.contains(k) && !h2.meta.get(k).get.sameElements(v)
}
}
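To show how these helpers fit together, here is a hedged sketch (the function name is ours, not from this PR) of deciding whether a group of Avro files can be coalesced under one merged header: the files must share the same sync marker and must not define the same metadata key with different values.

```scala
// Illustrative only: check header compatibility, then merge the metadata.
def headerForCoalescing(headers: Seq[Header]): Option[Header] = {
  headers.headOption.flatMap { first =>
    val compatible = headers.tail.forall { h =>
      Header.hasSameSync(first, h) && !Header.hasConflictInMetadata(first, h)
    }
    if (compatible) Header.mergeMetadata(headers) else None
  }
}
```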

@@ -98,18 +144,16 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable {
private var firstBlockStart: Long = 0

// store all blocks info
private val blocks: ArrayBuffer[BlockInfo] = ArrayBuffer.empty
private val blocks: mutable.ArrayBuffer[BlockInfo] = mutable.ArrayBuffer.empty

initialize()

def getBlocks(): ArrayBuffer[BlockInfo] = {
blocks
}
def getBlocks(): Seq[BlockInfo] = blocks.toSeq

def getHeader(): Header = header

private def initialize() = {
val magic = new Array[Byte](MAGIC.length)
val magic = new Array[Byte](DataFileConstants.MAGIC.length)
vin.readFixed(magic)

magic match {
Expand All @@ -128,14 +172,13 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable {
val bb = new Array[Byte](value.remaining())
value.get(bb)
header.meta += (key -> bb)
header.metaKeyList += key
}
l = vin.mapNext().toInt
} while (l != 0)
}
vin.readFixed(header.sync)
firstBlockStart = sin.tell - vin.inputStream.available // get the first block Start address
header.update(getMetaString(DataFileConstants.SCHEMA), firstBlockStart)
header.headerSize = Some(firstBlockStart)
parseBlocks()
}

@@ -168,23 +211,15 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable {
val blockDataSizeLen: Int = BinaryData.encodeLong(blockDataSize, buf, 0)

// (len of entries) + (len of block size) + (block size) + (sync size)
val blockLength = blockCountLen + blockDataSizeLen + blockDataSize + SYNC_SIZE
val blockLength = blockCountLen + blockDataSizeLen + blockDataSize +
DataFileConstants.SYNC_SIZE
blocks += BlockInfo(blockStart, blockLength, blockDataSize, blockCount)

// Do we need to check the SYNC BUFFER, or just let cudf do it?
blockStart += blockLength
}
}
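To make the block-length arithmetic above concrete, here is a small example with made-up numbers (they are assumptions, not taken from a real file): Avro prefixes each data block with two zig-zag varints, the record count and the size of the block data, and terminates it with the 16-byte sync marker.

```scala
import org.apache.avro.file.DataFileConstants
import org.apache.avro.io.BinaryData

// Hypothetical block: 1000 records and 65536 bytes of serialized data.
val buf = new Array[Byte](10) // a varint-encoded long needs at most 10 bytes
val blockCountLen = BinaryData.encodeLong(1000L, buf, 0)     // 2 bytes
val blockDataSizeLen = BinaryData.encodeLong(65536L, buf, 0) // 3 bytes

// (len of entries) + (len of block size) + (block size) + (sync size)
// = 2 + 3 + 65536 + 16 = 65557 bytes for this block on disk.
val blockLength = blockCountLen + blockDataSizeLen + 65536 + DataFileConstants.SYNC_SIZE
```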

/** Return the value of a metadata property. */
private def getMeta(key: String): Array[Byte] = header.meta.getOrElse(key, new Array[Byte](1))

private def getMetaString(key: String): String = {
val value = getMeta(key)
if (value == null) return null
new String(value, StandardCharsets.UTF_8)
}

override def close(): Unit = {
vin.inputStream().close()
}
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import java.io.OutputStream

import org.apache.avro.file.DataFileConstants
import org.apache.avro.io.EncoderFactory

/**
* AvroFileWriter, used to write an Avro file header to the output stream.
*/
class AvroFileWriter(os: OutputStream) {

private val vout = new EncoderFactory().directBinaryEncoder(os, null)

final def writeHeader(header: Header): Unit = {
val meta = header.meta
// 1) write magic
vout.writeFixed(DataFileConstants.MAGIC)
// 2) write metadata
vout.writeMapStart()
vout.setItemCount(meta.size)
meta.foreach{ case (key, value) =>
vout.startItem()
vout.writeString(key)
vout.writeBytes(value)
}
vout.writeMapEnd()
// 3) write initial sync
vout.writeFixed(header.sync)
vout.flush()
}
}

object AvroFileWriter {

def apply(os: OutputStream): AvroFileWriter = new AvroFileWriter(os)
}
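This writer is also what lets Header.firstBlockStart be computed without re-reading the original file: serializing the header into a counting stream that discards its bytes yields the header's on-disk size. A minimal sketch of that idea (the helper name is ours, mirroring the code in Header above):

```scala
import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}

// Sketch: measure how many bytes a (possibly merged) header occupies on disk
// by writing it to a stream that counts bytes and throws them away.
def measureHeaderSize(header: Header): Long = {
  val out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM)
  AvroFileWriter(out).writeHeader(header)
  out.getByteCount
}
```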
@@ -114,6 +114,29 @@ object MultiFileThreadPoolUtil {
}
}

/** A thread pool for multi-file reading */
class MultiFileReaderThreadPool {
private var threadPool: Option[ThreadPoolExecutor] = None

private def initThreadPool(
threadTag: String,
numThreads: Int): ThreadPoolExecutor = synchronized {
if (threadPool.isEmpty) {
threadPool = Some(MultiFileThreadPoolUtil.createThreadPool(threadTag, numThreads))
}
threadPool.get
}

/**
* Get the existing internal thread pool or create one with the given tag and thread
* number if it does not exist.
* Note: The tag and thread number will be ignored if the thread pool is already created.
*/
def getOrCreateThreadPool(threadTag: String, numThreads: Int): ThreadPoolExecutor = {
threadPool.getOrElse(initThreadPool(threadTag, numThreads))
}
}
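For context, here is a hedged sketch of how reader code might use this pool. The tag, thread count, file names, and task body are illustrative stand-ins; in the plugin the thread count would come from a setting such as spark.rapids.sql.format.avro.multiThreadedRead.numThreads documented above.

```scala
import java.util.concurrent.{Callable, Future}

// Illustrative only: fetch (or lazily create) the pool, then submit per-file work.
val pools = new MultiFileReaderThreadPool()
val pool = pools.getOrCreateThreadPool("avro multithreaded reader", 20)

val files = Seq("part-00000.avro", "part-00001.avro")
val futures: Seq[Future[String]] = files.map { path =>
  pool.submit(new Callable[String] {
    // A real task would read the file's blocks into a host buffer here.
    override def call(): String = s"read $path"
  })
}
futures.foreach(f => println(f.get()))
```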

/**
* The base multi-file partition reader factory to create the cloud reading or
* coalescing reading respectively.
@@ -692,10 +715,11 @@ abstract class MultiFileCoalescingPartitionReaderBase(
* Write a header for a specific file format. If there is no header for the file format,
* just ignore it and return 0
*
* @param paths the paths of the files to be coalesced into a single batch
* @param buffer where the header will be written
* @return how many bytes written
*/
def writeFileHeader(buffer: HostMemoryBuffer): Long
def writeFileHeader(paths: Seq[Path], buffer: HostMemoryBuffer): Long
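One plausible shape for the Avro side of this new signature, shown only as a hedged sketch and not the PR's actual implementation: merge the per-file headers (already parsed by AvroDataFileReader) and serialize the result into the coalesced host buffer. The helper name is ours, and HostMemoryOutputStream is assumed to be a buffer-backed OutputStream.

```scala
import ai.rapids.cudf.HostMemoryBuffer

// Hypothetical helper: write one merged header for all the files being
// coalesced and report its size, which is also the offset of the first block.
def writeCoalescedAvroHeader(headers: Seq[Header], buffer: HostMemoryBuffer): Long = {
  Header.mergeMetadata(headers) match {
    case Some(merged) =>
      val out = new HostMemoryOutputStream(buffer) // assumed buffer-backed stream
      AvroFileWriter(out).writeHeader(merged)
      merged.firstBlockStart // the serialized header size
    case None => 0L
  }
}
```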

/**
* Write a footer for a specific file format. If there is no footer for the file format,
@@ -823,7 +847,7 @@ abstract class MultiFileCoalescingPartitionReaderBase(
val (buffer, bufferSize, footerOffset, outBlocks) =
closeOnExcept(HostMemoryBuffer.allocate(initTotalSize)) { hmb =>
// Second, write header
var offset = writeFileHeader(hmb)
var offset = writeFileHeader(filesAndBlocks.keys.toSeq, hmb)

val allOutputBlocks = scala.collection.mutable.ArrayBuffer[DataBlockBase]()
val tc = TaskContext.get