Support coalescing reading for avro #5306

Merged
merged 9 commits on May 4, 2022
Changes from 5 commits
3 changes: 3 additions & 0 deletions docs/configs.md
@@ -75,7 +75,10 @@ Name | Description | Default Value
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.fast.sample"></a>spark.rapids.sql.fast.sample|Option to turn on fast sample. If enable it is inconsistent with CPU sample because of GPU sample algorithm is inconsistent with CPU.|false
<a name="sql.format.avro.enabled"></a>spark.rapids.sql.format.avro.enabled|When set to true enables all avro input and output acceleration. (only input is currently supported anyways)|false
<a name="sql.format.avro.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.avro.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.avro.reader.type|2147483647
<a name="sql.format.avro.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.avro.multiThreadedRead.numThreads|The maximum number of threads, on one executor, to use for reading small avro files in parallel. This can not be changed at runtime after the executor has started. Used with MULTITHREADED reader, see spark.rapids.sql.format.avro.reader.type.|20
<a name="sql.format.avro.read.enabled"></a>spark.rapids.sql.format.avro.read.enabled|When set to true enables avro input acceleration|false
<a name="sql.format.avro.reader.type"></a>spark.rapids.sql.format.avro.reader.type|Sets the avro reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using COALESCING is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.format.avro.multiThreadedRead.numThreads. By default this is set to AUTO so we select the reader we think is best. This will be COALESCING.|AUTO
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
<a name="sql.format.json.enabled"></a>spark.rapids.sql.format.json.enabled|When set to true enables all json input and output acceleration. (only input is currently supported anyways)|false
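For readers following along, below is a minimal sketch, not part of this PR, of how the Avro configs documented above could be set on a SparkSession. The config keys come from the table; the application name and data path are illustrative placeholders.

```scala
import org.apache.spark.sql.SparkSession

object AvroCoalescingReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("avro-coalescing-read") // placeholder app name
      // Enable Avro acceleration and pick the coalescing reader explicitly;
      // leaving the reader type at AUTO also resolves to COALESCING by default.
      .config("spark.rapids.sql.format.avro.enabled", "true")
      .config("spark.rapids.sql.format.avro.read.enabled", "true")
      .config("spark.rapids.sql.format.avro.reader.type", "COALESCING")
      .getOrCreate()

    // spark-avro must be on the classpath; the path below is a placeholder.
    spark.read.format("avro").load("/path/to/avro/data").show()
  }
}
```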
9 changes: 3 additions & 6 deletions integration_tests/run_pyspark_from_build.sh
@@ -57,13 +57,10 @@ else
TEST_JARS=$(echo "$SCRIPTPATH"/target/rapids-4-spark-integration-tests*-$INTEGRATION_TEST_VERSION.jar)
fi

# `./run_pyspark_from_build.sh` runs all tests including avro_test.py with spark-avro.jar
# in the classpath.
# `./run_pyspark_from_build.sh` runs all the tests excluding the avro tests in 'avro_test.py'.
#
# `./run_pyspark_from_build.sh -k xxx ` runs all xxx tests with spark-avro.jar in the classpath
#
# `INCLUDE_SPARK_AVRO_JAR=true ./run_pyspark_from_build.sh` run all tests (except the marker skipif())
# without spark-avro.jar
# `INCLUDE_SPARK_AVRO_JAR=true ./run_pyspark_from_build.sh` runs all the tests, including the tests
# in 'avro_test.py'.
if [[ $( echo ${INCLUDE_SPARK_AVRO_JAR} | tr [:upper:] [:lower:] ) == "true" ]];
then
export INCLUDE_SPARK_AVRO_JAR=true
57 changes: 32 additions & 25 deletions integration_tests/src/main/python/avro_test.py
@@ -30,55 +30,62 @@
'spark.rapids.sql.format.avro.enabled': 'true',
'spark.rapids.sql.format.avro.read.enabled': 'true'}

rapids_reader_types = ['PERFILE', 'COALESCING']

@pytest.mark.parametrize('gen', support_gens)
@pytest.mark.parametrize('v1_enabled_list', ["avro", ""])
def test_basic_read(spark_tmp_path, gen, v1_enabled_list):
data_path = spark_tmp_path + '/AVRO_DATA'
# 50 files for the coalescing reading case
coalescingPartitionNum = 50

def gen_avro_files(gen_list, out_path):
with_cpu_session(
lambda spark: unary_op_df(spark, gen).write.format("avro").save(data_path)
lambda spark: gen_df(spark,
gen_list).repartition(coalescingPartitionNum).write.format("avro").save(out_path)
)


@pytest.mark.parametrize('v1_enabled_list', ["avro", ""], ids=["v1", "v2"])
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_basic_read(spark_tmp_path, v1_enabled_list, reader_type):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)]
data_path = spark_tmp_path + '/AVRO_DATA'
gen_avro_files(gen_list, data_path)

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.rapids.sql.format.avro.reader.type': reader_type,
'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.format("avro").load(data_path),
conf=all_confs)


@pytest.mark.parametrize('v1_enabled_list', ["", "avro"])
def test_avro_simple_partitioned_read(spark_tmp_path, v1_enabled_list):
@pytest.mark.parametrize('v1_enabled_list', ["", "avro"], ids=["v1", "v2"])
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_avro_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_type):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)]
first_data_path = spark_tmp_path + '/AVRO_DATA/key=0/key2=20'
with_cpu_session(
lambda spark: gen_df(spark, gen_list).write.format("avro").save(first_data_path))
second_data_path = spark_tmp_path + '/AVRO_DATA/key=1/key2=21'
with_cpu_session(
lambda spark: gen_df(spark, gen_list).write.format("avro").save(second_data_path))
third_data_path = spark_tmp_path + '/AVRO_DATA/key=2/key2=22'
with_cpu_session(
lambda spark: gen_df(spark, gen_list).write.format("avro").save(third_data_path))

data_path = spark_tmp_path + '/AVRO_DATA'
# generate partitioned files
for v in [0, 1, 2]:
out_path = data_path + '/key={}/key2=2{}'.format(v, v)
gen_avro_files(gen_list, out_path)

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.rapids.sql.format.avro.reader.type': reader_type,
'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.format("avro").load(data_path),
conf=all_confs)


@pytest.mark.parametrize('v1_enabled_list', ["", "avro"])
def test_avro_input_meta(spark_tmp_path, v1_enabled_list):
first_data_path = spark_tmp_path + '/AVRO_DATA/key=0'
with_cpu_session(
lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(first_data_path))
second_data_path = spark_tmp_path + '/AVRO_DATA/key=1'
with_cpu_session(
lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(second_data_path))
@pytest.mark.parametrize('v1_enabled_list', ["", "avro"], ids=["v1", "v2"])
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_avro_input_meta(spark_tmp_path, v1_enabled_list, reader_type):
data_path = spark_tmp_path + '/AVRO_DATA'
for v in [0, 1]:
out_path = data_path + '/key={}'.format(v)
with_cpu_session(
lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(out_path))

all_confs = copy_and_update(_enable_all_types_conf, {
'spark.rapids.sql.format.avro.reader.type': reader_type,
'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.format("avro").load(data_path)
6 changes: 6 additions & 0 deletions pom.xml
@@ -882,6 +882,12 @@
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.flatbuffers</groupId>
<artifactId>flatbuffers-java</artifactId>
2 changes: 0 additions & 2 deletions sql-plugin/pom.xml
@@ -59,8 +59,6 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

@@ -19,12 +19,13 @@ package com.nvidia.spark.rapids
import java.io.{InputStream, IOException}
import java.nio.charset.StandardCharsets

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable

import org.apache.avro.Schema
import org.apache.avro.file.{DataFileConstants, SeekableInput}
import org.apache.avro.file.DataFileConstants.{MAGIC, SYNC_SIZE}
import org.apache.avro.file.DataFileConstants._
import org.apache.avro.file.SeekableInput
import org.apache.avro.io.{BinaryData, BinaryDecoder, DecoderFactory}
import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}

private class SeekableInputStream(in: SeekableInput) extends InputStream with SeekableInput {
var oneByte = new Array[Byte](1)
@@ -59,22 +59,62 @@ private class SeekableInputStream(in: SeekableInput) extends InputStream with SeekableInput
}

/**
* The header information of Avro file
* The header information of an Avro file.
*/
class Header {
var meta = Map[String, Array[Byte]]()
var metaKeyList = ArrayBuffer[String]()
var sync = new Array[Byte](DataFileConstants.SYNC_SIZE)
var schema: Schema = _
private var firstBlockStart: Long = _

private[rapids] def update(schemaValue: String, firstBlockStart: Long) = {
schema = new Schema.Parser().setValidate(false).setValidateDefaults(false)
.parse(schemaValue)
this.firstBlockStart = firstBlockStart
case class Header(
meta: Map[String, Array[Byte]],
// Arrays in Scala are mutable, so keep this private to avoid unexpected updates.
private val syncBuffer: Array[Byte]) {

/** Get a copy of the sync marker. */
def sync: Array[Byte] = syncBuffer.clone

@transient
lazy val schema: Schema = {
getMetaString(SCHEMA)
.map(s => new Schema.Parser().setValidateDefaults(false).setValidate(false).parse(s))
.orNull
}

def getFirstBlockStart: Long = firstBlockStart
private def getMetaString(key: String): Option[String] = {
meta.get(key).map(new String(_, StandardCharsets.UTF_8))
}
}

object Header {
/** Compute header size in bytes for serialization */
def headerSizeInBytes(h: Header): Long = {
val out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM)
AvroFileWriter(out).writeHeader(h)
out.getByteCount
}

/**
* Merge the metadata of the given headers.
* Note: It does not check the compatibility of the headers.
* @param headers the headers whose metadata will be merged.
* @return a header with the new merged metadata and the first header's
* sync marker, or None if the input is empty.
*/
def mergeMetadata(headers: Seq[Header]): Option[Header] = {
if (headers.isEmpty) {
None
} else {
val mergedMeta = headers.map(_.meta).reduce { (merged, meta) =>
merged ++ meta
}
Some(Header(mergedMeta, headers.head.sync))
}
}

/**
* Test whether the two headers have conflicts in the metadata.
* A conflict means a key exists in both of the two headers' metadata,
* and maps to different values.
*/
def hasConflictInMetadata(h1: Header, h2: Header): Boolean = h1.meta.exists {
case (k, v) => h2.meta.get(k).exists(!_.sameElements(v))
}
}

/**
@@ -94,31 +94,26 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable {
private val sin = new SeekableInputStream(si)
sin.seek(0) // seek to the start of file and get some meta info.
private var vin: BinaryDecoder = DecoderFactory.get.binaryDecoder(sin, vin);
private val header: Header = new Header()
private var firstBlockStart: Long = 0
// store all blocks info
private val blocks: ArrayBuffer[BlockInfo] = ArrayBuffer.empty

initialize()
val header: Header = initialize()

def getBlocks(): ArrayBuffer[BlockInfo] = {
blocks
}

def getHeader(): Header = header
lazy val headerSize: Long = firstBlockStart
// store all blocks info
lazy val blocks: Seq[BlockInfo] = parseBlocks()

private def initialize() = {
private def initialize(): Header = {
// read magic
val magic = new Array[Byte](MAGIC.length)
vin.readFixed(magic)

magic match {
case Array(79, 98, 106, 1) => // current avro format
case Array(79, 98, 106, 0) => // old format
throw new UnsupportedOperationException("avro 1.2 format is not support by GPU")
case _ => throw new RuntimeException("Not an Avro data file.")
}

// read metadata map
val meta = mutable.Map[String, Array[Byte]]()
var l = vin.readMapStart().toInt
if (l > 0) {
do {
Expand All @@ -127,35 +163,36 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable {
val value = vin.readBytes(null)
val bb = new Array[Byte](value.remaining())
value.get(bb)
header.meta += (key -> bb)
header.metaKeyList += key
meta += (key -> bb)
}
l = vin.mapNext().toInt
} while (l != 0)
}
vin.readFixed(header.sync)
firstBlockStart = sin.tell - vin.inputStream.available // get the first block Start address
header.update(getMetaString(DataFileConstants.SCHEMA), firstBlockStart)
parseBlocks()
// read sync marker
val sync = new Array[Byte](SYNC_SIZE)
vin.readFixed(sync)
firstBlockStart = sin.tell - vin.inputStream.available
Header(meta.toMap, sync)
}

private def seek(position: Long): Unit = {
sin.seek(position)
vin = DecoderFactory.get().binaryDecoder(this.sin, vin);
}

private def parseBlocks(): Unit = {
private def parseBlocks(): Seq[BlockInfo] = {
if (firstBlockStart >= sin.length() || vin.isEnd()) {
// no blocks
return
return Seq.empty
}
val blocks = mutable.ArrayBuffer.empty[BlockInfo]
// buf is used for writing long
val buf = new Array[Byte](12)
var blockStart = firstBlockStart
while (blockStart < sin.length()) {
seek(blockStart)
if (vin.isEnd()) {
return
return blocks.toSeq
}
val blockCount = vin.readLong()
val blockDataSize = vin.readLong()
Expand All @@ -174,15 +211,7 @@ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable {
// Do we need to check the SYNC BUFFER, or just let cudf do it?
blockStart += blockLength
}
}

/** Return the value of a metadata property. */
private def getMeta(key: String): Array[Byte] = header.meta.getOrElse(key, new Array[Byte](1))

private def getMetaString(key: String): String = {
val value = getMeta(key)
if (value == null) return null
new String(value, StandardCharsets.UTF_8)
blocks.toSeq
}

override def close(): Unit = {
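To show how the refactored Header utilities fit together, here is a hedged sketch rather than code from this PR: it reads the header of each candidate file, rejects conflicting metadata, and merges the rest. The `openSeekable` parameter is a hypothetical way to obtain a SeekableInput for a path.

```scala
import org.apache.avro.file.SeekableInput

// Assumes some `openSeekable` helper is available; it is not part of this PR.
def mergedHeaderFor(paths: Seq[String], openSeekable: String => SeekableInput): Header = {
  // Read only the header of each file to be coalesced.
  val headers = paths.map { p =>
    val reader = new AvroDataFileReader(openSeekable(p))
    try reader.header finally reader.close()
  }
  // Files whose metadata conflicts (same key, different value) cannot share one header.
  headers.combinations(2).foreach {
    case Seq(a, b) if Header.hasConflictInMetadata(a, b) =>
      throw new IllegalStateException("Avro files have conflicting metadata; cannot coalesce")
    case _ => // compatible
  }
  Header.mergeMetadata(headers)
    .getOrElse(throw new IllegalArgumentException("No Avro files to merge"))
}
```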
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import java.io.OutputStream

import org.apache.avro.file.DataFileConstants
import org.apache.avro.io.EncoderFactory

/**
* AvroFileWriter, used to write an Avro file header to the output stream.
*/
class AvroFileWriter(os: OutputStream) {

private val vout = new EncoderFactory().directBinaryEncoder(os, null)

final def writeHeader(header: Header): Unit = {
val meta = header.meta
// 1) write magic
vout.writeFixed(DataFileConstants.MAGIC)
// 2) write metadata
vout.writeMapStart()
vout.setItemCount(meta.size)
meta.foreach{ case (key, value) =>
vout.startItem()
vout.writeString(key)
vout.writeBytes(value)
}
vout.writeMapEnd()
// 3) write initial sync
vout.writeFixed(header.sync)
vout.flush()
}
}

object AvroFileWriter {

def apply(os: OutputStream): AvroFileWriter = new AvroFileWriter(os)
}
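A short usage sketch, again an assumption-laden illustration rather than PR code, of how AvroFileWriter pairs with Header.headerSizeInBytes when serializing the merged header that precedes the coalesced blocks; a ByteArrayOutputStream stands in for the real destination buffer.

```scala
import java.io.ByteArrayOutputStream

def serializeHeader(mergedHeader: Header): Array[Byte] = {
  // headerSizeInBytes replays writeHeader against a counting null stream,
  // so the required buffer size is known before any real bytes are written.
  val expectedSize = Header.headerSizeInBytes(mergedHeader)
  val out = new ByteArrayOutputStream(expectedSize.toInt)
  AvroFileWriter(out).writeHeader(mergedHeader)
  assert(out.size() == expectedSize, "serialized header size mismatch")
  out.toByteArray
}
```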