[SPARK-34863][SQL] Support complex types for Parquet vectorized reader

### What changes were proposed in this pull request?

This PR adds support for complex types (e.g., struct, list, map) to Spark's vectorized Parquet reader. In particular, it introduces the following changes:
1. Added a new class `ParquetColumnVector` which encapsulates all the information needed when reading a Parquet column: the `ParquetColumn` for the Parquet column, the repetition and definition levels (allocated only for leaf nodes of complex types), and the reader for the column. It also contains the logic for assembling nested columnar batches by interpreting Parquet repetition and definition levels.
2. Changes are made in `VectorizedParquetRecordReader` to initialize a list of `ParquetColumnVector` for the columns read.
3. `VectorizedColumnReader` now also creates a reader for the repetition-level column. Depending on whether the maximum repetition level is 0, the batch read follows one of two code paths: `readBatch` or `readBatchNested`.
4. Added logic to handle complex types in `VectorizedRleValuesReader`. Data types involving only structs or primitive types still go through the existing `readBatch` method, which now also saves definition levels into a vector for later assembly. Data types involving arrays or maps take a separate code path, `readBatchNested`, which handles repetition levels as well (see the sketch below for how these levels drive assembly).
This PR also introduces a new flag, `spark.sql.parquet.enableNestedColumnVectorizedReader`, which turns the feature on or off. It is on by default so that the feature is exercised by all Parquet-related test coverage.
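
To make the level interpretation concrete, below is a minimal, self-contained sketch (not Spark's actual implementation) of how repetition and definition levels can drive the assembly of a nested column. It assumes a single optional list-of-optional-int column, so the maximum repetition level is 1 and the maximum definition level is 3; every name in it is illustrative.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch only. Level meanings under the assumed schema (an optional list of
// optional ints): def 0 = the list is null, def 1 = the list is empty,
// def 2 = the element is null, def 3 = the element is present; rep 0 starts
// a new record, rep 1 continues the current list.
def assemble(
    reps: Array[Int],
    defs: Array[Int],
    values: Iterator[Int]): Seq[Option[Seq[Option[Int]]]] = {
  val records = ArrayBuffer.empty[Option[ArrayBuffer[Option[Int]]]]
  for (i <- reps.indices) {
    // A value is only materialized at the maximum definition level.
    def element: Option[Int] = if (defs(i) == 3) Some(values.next()) else None
    if (reps(i) == 0) {
      defs(i) match {
        case 0 => records += None                      // the list itself is null
        case 1 => records += Some(ArrayBuffer.empty)   // list present but empty
        case _ => records += Some(ArrayBuffer(element))
      }
    } else {
      records.last.get += element                      // continue the open list
    }
  }
  records.toSeq.map(_.map(_.toSeq))
}

// Encodes the four records [1, 2], null, [], [null, 3]:
assemble(Array(0, 1, 0, 0, 0, 1), Array(3, 3, 0, 1, 2, 3), Iterator(1, 2, 3))
```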

### Why are the changes needed?

Whenever the read schema contains complex types, Spark currently falls back to the row-based reader in parquet-mr, which is much slower. Benchmarks show that adding support to the vectorized reader yields a ~15x average speedup when reading struct fields, and ~1.5x when reading arrays of structs and maps.

### Does this PR introduce _any_ user-facing change?

With this PR, Spark supports reading complex types in its vectorized Parquet reader. A new config, `spark.sql.parquet.enableNestedColumnVectorizedReader`, is introduced to turn the feature on or off.
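
For example (the file path below is hypothetical), the flag can be toggled through the standard runtime conf API:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("nested-vectorized-demo").getOrCreate()

// On by default; also requires the existing vectorized reader flag,
// spark.sql.parquet.enableVectorizedReader, to be enabled.
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")

// With the flag on, this scan can take the vectorized path even though the
// schema contains nested columns ("/tmp/nested.parquet" is a hypothetical file).
spark.read.parquet("/tmp/nested.parquet").show()

// Setting it to false falls back to parquet-mr's row-based reader for
// schemas that contain nested columns.
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "false")
```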

### How was this patch tested?

Added new unit tests.

Closes #34659 from sunchao/SPARK-34863-new.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit deac8f9)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
sunchao authored and viirya committed Apr 2, 2022
1 parent 8a072ef commit 09d2b0e
Showing 25 changed files with 1,813 additions and 173 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1009,6 +1009,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED =
buildConf("spark.sql.parquet.enableNestedColumnVectorizedReader")
.doc("Enables vectorized Parquet decoding for nested columns (e.g., struct, list, map). " +
s"Requires ${PARQUET_VECTORIZED_READER_ENABLED.key} to be enabled.")
.version("3.3.0")
.booleanConf
.createWithDefault(true)

val PARQUET_RECORD_FILTER_ENABLED = buildConf("spark.sql.parquet.recordLevelFilter.enabled")
.doc("If true, enables Parquet's native record-level filtering using the pushed down " +
"filters. " +
@@ -3904,6 +3912,9 @@ class SQLConf extends Serializable with Logging {

def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)

def parquetVectorizedReaderNestedColumnEnabled: Boolean =
getConf(PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED)

def parquetVectorizedReaderBatchSize: Int = getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE)

def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
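
The hunk above only adds the conf and its accessor. As an illustration only (this is not the PR's actual gating code), a caller deciding whether a scan can take the vectorized path might combine the two accessors like this, where `hasNestedColumn` is a hypothetical input:

```scala
// Sketch only: combines the two accessors shown in the diff above.
def canUseVectorizedReader(conf: SQLConf, hasNestedColumn: Boolean): Boolean = {
  conf.parquetVectorizedReaderEnabled &&
    (!hasNestedColumn || conf.parquetVectorizedReaderNestedColumnEnabled)
}
```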