
[SPARK-34863][SQL] Support complex types for Parquet vectorized reader #34659

Closed
wants to merge 15 commits

Conversation

sunchao
Member

@sunchao sunchao commented Nov 19, 2021

What changes were proposed in this pull request?

This PR adds support for complex types (e.g., list, map, array) for Spark's vectorized Parquet reader. In particular, this introduces the following changes:

  1. Added a new class ParquetColumnVector which encapsulates all the information needed when reading a Parquet column, including the ParquetColumn for the Parquet column, the repetition & definition levels (only allocated for a leaf node of a complex type), as well as the reader for the column. In addition, it also contains the logic for assembling nested columnar batches by interpreting Parquet repetition & definition levels.
  2. Changes are made in VectorizedParquetRecordReader to initialize a list of ParquetColumnVector for the columns read.
  3. VectorizedColumnReader now also creates a reader for the repetition levels. Depending on whether the maximum repetition level is 0, the batch read is now split into two code paths, e.g., readBatch versus readBatchNested.
  4. Added logic to handle complex types in VectorizedRleValuesReader. For data types involving only structs or primitive types, it still goes through the old readBatch method, which now also saves definition levels into a vector for later assembly. Otherwise, for data types involving arrays or maps, a separate code path readBatchNested is introduced to handle repetition levels.
    This PR also introduces a new flag spark.sql.parquet.enableNestedColumnVectorizedReader which turns the feature on or off. By default it is on, to facilitate all the Parquet-related test coverage.
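For intuition on the level-based assembly (a standard Dremel-style illustration, not code taken from this PR): for an optional column a: array<int> with optional elements, the maximum definition level is 3 and the maximum repetition level is 1. A record with a = [1, null, 3] is encoded as (value, definition, repetition) triples (1, 3, 0), (null, 2, 1), (3, 3, 1); a = [] becomes (def 1, rep 0) with no value, and a = null becomes (def 0, rep 0). The assembly logic interprets these levels to reconstruct offsets, lengths, and nulls for the nested column vectors.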

Why are the changes needed?

Whenever the read schema contains complex types, Spark currently falls back to the row-based reader in parquet-mr, which is much slower. As benchmarks show, adding support to the vectorized reader gives an average ~15x speed-up when reading struct fields, and ~1.5x when reading arrays of structs and maps.

Does this PR introduce any user-facing change?

With this PR, Spark should now support reading complex types in its vectorized Parquet reader. A new config spark.sql.parquet.enableNestedColumnVectorizedReader is introduced to turn the feature on or off.
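For illustration, a minimal sketch of toggling the feature from a Spark session (the nested-reader key comes from this PR; the vectorized-reader key, the path, and the column names are assumptions for the example):

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")

// Nested columns (struct/array/map) should now go through the vectorized path.
val df = spark.read.parquet("/tmp/nested_data")  // hypothetical path
df.selectExpr("structCol.field", "arrayCol[0]").show()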

How was this patch tested?

Added new unit tests.

@SparkQA

SparkQA commented Nov 19, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49902/

@cloud-fan
Contributor

cc @sadikovi

@SparkQA

SparkQA commented Nov 19, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49902/

@sadikovi
Contributor

ACK. I will review it tomorrow or next week if that's okay 🙂.

@SparkQA

SparkQA commented Nov 19, 2021

Test build #145429 has finished for PR 34659 at commit 10c3a45.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sunchao
Member Author

sunchao commented Nov 19, 2021

Thanks @sadikovi! Take your time on this one :)

Contributor

@sadikovi sadikovi left a comment

Thanks for opening a PR. Looks very good, but I need to re-review a couple of files. Tests look good; I remember there were a couple of tricky examples with array + struct + null nesting. Let me find them and check, maybe we could add them to the tests.

On a more style-like note: could you make all of the comments start with an uppercase letter? I find it easier to read them this way, as they indicate the beginning of a paragraph. Thank you.

val PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED =
buildConf("spark.sql.parquet.enableNestedColumnVectorizedReader")
.doc("Enables vectorized Parquet decoding for nested columns (e.g., struct, list, map). " +
s"Note to enable this ${PARQUET_VECTORIZED_READER_ENABLED.key} also needs to be enabled.")
Contributor

I am not sure what the guidelines are on a conf text but maybe this would sound a bit crisper:

Enables vectorized Parquet decoding for nested columns (e.g., struct, list, map). Requires ${PARQUET_VECTORIZED_READER_ENABLED.key} to be enabled.
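For reference, a sketch of the conf definition with this suggested wording applied (assembled from the snippets quoted in this review; the final wording in the merged PR may differ):

val PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED =
  buildConf("spark.sql.parquet.enableNestedColumnVectorizedReader")
    .doc("Enables vectorized Parquet decoding for nested columns (e.g., struct, list, map). " +
      s"Requires ${PARQUET_VECTORIZED_READER_ENABLED.key} to be enabled.")
    .version("3.3.0")
    .booleanConf
    .createWithDefault(true)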


void setColumnReader(VectorizedColumnReader reader) {
if (!isPrimitive) {
throw new IllegalStateException("can't set reader for non-primitive column");
Contributor

nit: Can't set...

}
}

void reset() {
Contributor

I understand it requires a lot of typing, but it would be good if we could add a small javadoc here and below to highlight what the methods do.

Member Author

Sure, added.

vector.putArray(rowId, offset, 0);
rowId++;
} else if (definitionLevel > maxDefinitionLevel) {
// collection is defined and non-empty: find out how many top element there is till the
Contributor

nit: find out how many elements are there until the start of the next array. Alternatively, find out the number of elements until the start of the next array.

return idx;
}

private int getCollectionSize(int maxRepetitionLevel, int idx) {
Contributor

We might need javadoc for this and a couple of methods above to go along with the inline comments.


/* The following fields are only used when reading repeated values */

/** When processing repeated values, whether we've found the beginning of the first list after the
Contributor

Javadoc should be updated.

Member Author

Update what?

/** When processing repeated types, the number of accumulated definition levels to process */
int numBatchedDefLevels;

/** When processing repeated types, whether we should skip the current batch of definition
Contributor

Same here.

@@ -39,12 +42,13 @@
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConverters;
Contributor

Is this import required?

Member Author

Yes, it's used below in the checkColumn method, for example.

Contributor

That should probably be moved after java imports 🙂. I am probably nit-picking here but maybe it would be a good nit to fix - it is up to you.

if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
throw new UnsupportedOperationException("Complex types not supported.");
}
missingColumns = new HashSet<>();
Contributor

I don't know if Spark runs Java lint. Is it allowed to use <> with generic types?

Member Author

Yes, it's allowed. This usage is pretty standard in Java (in fact, IntelliJ will warn you if you spell out the generic type there).

Contributor

All good. I remember Java 6 used to complain if you used the <> diamond operator.

* conforms to the type of the file schema.
*/
private void checkColumn(ParquetColumn column) throws IOException {
String[] path = JavaConverters.seqAsJavaList(column.path()).toArray(new String[0]);
Contributor

Maybe the original path could be returned as an array...

Member Author

It could, although I'd have to change a lot of places in ParquetSchemaSuite. Since this is a relatively small issue, maybe let me address it separately?

Contributor

Yes, let's address separately.

@@ -176,8 +176,7 @@ public void initBatch(
// Initialize the missing columns once.
if (colId == -1) {
OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
missingCol.putNulls(0, capacity);
missingCol.setIsConstant();
missingCol.setAllNull();
Contributor

Oh, almost forgot to ask.

Is missingCol.setAllNull(); equivalent to

missingCol.putNulls(0, capacity);
missingCol.setIsConstant();

? I thought setAllNull was an alias for the two method calls but it does not appear to be the case.

Member Author

Not quite the same. setIsConstant requires allocating space for the constant elements, but setAllNull doesn't. This is useful for missing columns that are of nested types.
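A minimal sketch of the difference, assuming the WritableColumnVector API shown in this PR (the capacity and data types are illustrative):

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.DataTypes

val capacity = 4096

// Pre-existing pattern for a missing flat column: fill `capacity` null slots,
// then mark the vector as constant.
val missingFlatCol = new OnHeapColumnVector(capacity, DataTypes.IntegerType)
missingFlatCol.putNulls(0, capacity)
missingFlatCol.setIsConstant()

// New pattern for a missing nested column: setAllNull() marks the whole column
// as null without filling per-row storage for the (possibly nested) children.
val missingNestedCol =
  new OnHeapColumnVector(capacity, DataTypes.createArrayType(DataTypes.IntegerType))
missingNestedCol.setAllNull()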

Member

Is this related to this PR, or is it new test coverage?
This change seems to rely on the existing ORC tests only.
In that case, shall we make a separate PR for this ORC change?

Member Author

This change is actually not related. I think we can have a separate small PR for this.

@sadikovi
Contributor

@sunchao Let me know if you would like me to revisit the code in this PR. Thanks.

@sunchao
Member Author

sunchao commented Nov 25, 2021

Thanks @sadikovi, really appreciate your feedback! Will address the comments soon, and then you can revisit the PR.

Contributor

@agrawaldevesh agrawaldevesh left a comment

Thanks: this would be a great speed-up for Uber given its heavy reliance on nested data. I am curious how often nested fields are selected in the resultSchema given that nested column pruning is in effect?

Would it be possible to retain the benchmark from the older version of this PR, #33695, and perhaps also add a case where some of the struct column values are NULL?

How do we confirm that this hasn't slowed down the non-nested code path?


leftInBatch -= n;
if (n > 0) {
repLevels.appendInts(n, 0);
Contributor

Not sure, but should repLevels be immutable for the purposes of this function?

Member Author

Hmm, what do you mean? We need to read in repetition levels in this method.

values, nulls, valuesReused, valueReader, updater);

while ((leftInBatch > 0 || !state.lastListCompleted) && leftInPage > 0) {
if (currentCount == 0 && !readNextGroup()) break;
Contributor

The original non-nested code refers to currentCount as this.currentCount in certain places. Should we pick that style as well?

Member Author

Yeah, good point, let me change the non-nested code, although I think for that case readNextGroup will never return false, since we know exactly how many elements to read beforehand.

defLevelProcessor.skipValues(n);
rowId += n;
currentCount -= n;
leftInPage -= n;
Contributor

Should leftInBatch be updated here as well?

Member Author

It shouldn't, since we don't add anything to the batch here (we are skipping rows).

defLevelProcessor.readValues(n);
}

rowId += n;
Contributor

Let's move the leftInBatch update here.

I am not sure why some of these are local variables vs member vars :-)

Member Author

Yes, better to move it here. The member vars (e.g., currentCount) track the state of the RLE/BitPacked block being read, while the local vars track the state of the input vectors.

updater.readValue(offset + i, values, valueReader);
} else {
nulls.putNull(offset + i);
int v = currentBuffer[currentBufferIdx++];
Contributor

v -> value ?

s"Note to enable this ${PARQUET_VECTORIZED_READER_ENABLED.key} also needs to be enabled.")
.version("3.3.0")
.booleanConf
.createWithDefault(true)
Contributor

I am liking the confidence :-)

Member Author

This is to make sure we can pass all Parquet-related tests :) I plan to turn it off later separately.

Member

Make sure we remember to turn it off by default before merging. 😄

@@ -227,30 +230,340 @@ private void readBatchInternal(
switch (mode) {
case RLE:
Contributor

I am not too familiar with this code, but where do we assert that these are the only two encoding choices? There is no default arm in this switch.

Member Author

The MODE enum is private to this class and the mode is initialized in this class, so I guess it should be fine. This maps to the Parquet spec for RLE/BitPacked encoding.

@sunchao
Member Author

sunchao commented Nov 29, 2021

Thanks @agrawaldevesh. Will address your comments soon.

How do we confirm that this hasn't slowed down the non-nested code path?

I've run the DataSourceReadBenchmark through the GitHub workflow with and without the PR, and put the results here. It seems the results don't change much, but there is some variance.

@sunchao
Member Author

sunchao commented Dec 2, 2021

Thanks @sadikovi and @agrawaldevesh! I've addressed most of your comments, while some are left to be answered. Please let me know what you think.

On a more style-like note: could you make all of the comments start with an uppercase letter? I find it easier to read them this way, as they indicate the beginning of a paragraph.

Sure, it's done.

I am curious how often nested fields are selected in the resultSchema given that nested column pruning is in effect?

This is orthogonal, I think. With or without nested column pruning, users may still want to define nested schemas and read their data efficiently. From my experience, struct types are commonly used in data schemas, and this PR should help a lot.

@SparkQA

SparkQA commented Dec 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50306/

@SparkQA

SparkQA commented Dec 2, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50306/

@SparkQA

SparkQA commented Dec 2, 2021

Test build #145831 has finished for PR 34659 at commit 7b6e083.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

@sadikovi do you have more comments?

// Column is missing in data but the required data is non-nullable. This file is invalid.
throw new IOException("Required column is missing in data file. Col: " +
Arrays.toString(colPath));
for (ParquetColumn childColumn : JavaConverters.seqAsJavaList(column.children())) {
Contributor

Same here, I think.

Member Author

@sunchao sunchao Dec 14, 2021

Thanks, yeah, I can raise another PR to use a list in ParquetColumn after this one is done.

Contributor

@sadikovi sadikovi left a comment

LGTM, thanks for making the changes!

*
* NOTE: this MUST be called after new elements are appended to child vectors of a struct vector.
*/
public abstract void putStruct(int rowId, int offset);
Member Author

I want to point out that this is a behavior change in WritableColumnVector, so anyone who's using this to write structs will need to adapt to the new API.

Wanna see if you have any concern on this @cloud-fan @sadikovi .

Contributor

IMHO, it is fine since the PR goes to master; this is supposed to be an internal API anyway, unless I am mistaken 🙂.

Contributor

Yeah, WritableColumnVector is an internal API.

Member Author

cool, thanks for confirming!

@@ -169,8 +170,8 @@ class ParquetFileFormat
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
schema.length <= conf.wholeStageMaxNumFields &&
schema.forall(_.dataType.isInstanceOf[AtomicType])
ParquetFileFormat.isBatchReadSupported(conf, schema) &&
Contributor

When spark.sql.parquet.enableNestedColumnVectorizedReader=false, which will be the normal case initially, I think returningBatch will always be false, even for non-complex cases, since this line always passes a struct (resultSchema) to supportBatch.

At least, that's what my testing seems to show. With this PR and spark.sql.parquet.enableNestedColumnVectorizedReader=false, returningBatch is false for a non-complex case. But on master, returningBatch is true for the same test:

spark.range(0, 10).map { x => (x, x + 1, x + 3) }.toDF("a", "b", "c").
  write.mode("overwrite").format("parquet").save("simple_parquet")

sql("select a, b from `parquet`.`simple_parquet`").collect

and...

bash-3.2$ git diff | cat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index bd8d11d827..96182aafa9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -340,6 +340,7 @@ class ParquetFileFormat
           vectorizedReader.initialize(split, hadoopAttemptContext)
           logDebug(s"Appending $partitionSchema ${file.partitionValues}")
           vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+          print(s"returningBatch is $returningBatch\n")
           if (returningBatch) {
             vectorizedReader.enableReturningBatches()
           }
bash-3.2$ 

Member Author

Good catch! I think it should be:

schema.forall(f => ParquetFileFormat.isBatchReadSupported(conf, f.dataType))
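A sketch of how supportBatch would look with that per-field check applied (based on the body quoted above; not necessarily the exact code that was merged):

override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
  val conf = sparkSession.sessionState.conf
  conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
    schema.length <= conf.wholeStageMaxNumFields &&
    schema.forall(f => ParquetFileFormat.isBatchReadSupported(conf, f.dataType))
}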

@SparkQA

SparkQA commented Dec 20, 2021

Test build #146411 has finished for PR 34659 at commit aff8504.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50886/

@SparkQA

SparkQA commented Dec 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50886/

@SparkQA

SparkQA commented Dec 22, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50965/

@SparkQA

SparkQA commented Dec 22, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50965/

@SparkQA

SparkQA commented Dec 22, 2021

Test build #146490 has finished for PR 34659 at commit 77b2aec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
StructType sparkRequestedSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration);
this.parquetColumn = converter.convertParquetColumn(requestedSchema,
Contributor

This line has a performance impact on parquet files with very wide records.

Take, for example, a parquet dataset with:

  • 6000 bigint columns
  • 25 files
  • 5000 records per file (total of 125000 records)

Reading all the records in this parquet dataset with a single executor thread is 42% slower with this PR than with the baseline (commit f361ad8 on the master branch).

The main culprit is the n**2 issue here in ParquetToSparkSchemaConverter#convertInternal, which was not exercised anywhere except by unit tests until your PR (as far as I can tell).

By the way, even with a patch for the above n**2 issue, this PR is still 10% slower than the baseline when testing the wide row case. I haven't confirmed it, but I think this is due to the extra OnHeapColumnVector instances created in ParquetColumnVector, for which you already have a TODO.

Member Author

Thanks! Let me try to reproduce the above scenario locally (it'd be great if you already have some code snippet to share) and see what's causing the perf issue in ParquetToSparkSchemaConverter#convertInternal. Ideally we can skip that code path if the schema doesn't contain complex types.

Contributor

@bersprockets bersprockets Dec 23, 2021

see what's causing the perf issue in ParquetToSparkSchemaConverter#convertInternal

It's definitely the n*n (or n*(n/2) on average) loop here:

    val converted = (0 until groupColumn.getChildrenCount).map { i =>
      val field = groupColumn.getChild(i)
      val fieldFromReadSchema = sparkReadSchema.flatMap { schema =>
        schema.find(f => isSameFieldName(f.name, field.getName, caseSensitive))
      }

schema.find should instead be a map lookup, or something like that.
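A hypothetical sketch of that map-lookup idea (illustrative only, not the fix that went into the PR): build a name-normalized map from the read schema once per group, then probe it for each child instead of calling schema.find inside the loop.

import java.util.Locale
import org.apache.spark.sql.types.StructField

def normalize(name: String, caseSensitive: Boolean): String =
  if (caseSensitive) name else name.toLowerCase(Locale.ROOT)

// Built once per group, outside the per-child loop.
val fieldMap: Map[String, StructField] = sparkReadSchema
  .map(_.fields.map(f => normalize(f.name, caseSensitive) -> f).toMap)
  .getOrElse(Map.empty[String, StructField])

val converted = (0 until groupColumn.getChildrenCount).map { i =>
  val field = groupColumn.getChild(i)
  // O(1) lookup instead of scanning the read schema for every child.
  val fieldFromReadSchema = fieldMap.get(normalize(field.getName, caseSensitive))
  fieldFromReadSchema  // the original loop then converts the child using this
}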

it'd be great if you already have some code snippet to share

Sure, in a verbose state (edit: I ran with --master "local[1]"):

// test for wide rows

val home_candidate = sys.env("HOME")
val home = if (!home_candidate.endsWith("/")) {
  s"${home_candidate}/"
} else {
  home_candidate
}

val width = 6000

val selectExpr = (1 to width).map { i =>
  s"value as c$i"
}


import scala.util.Random
val r = new Random(65657652L)

val loopCount = 25
for (i <- 0 until loopCount) {
  // println(s"iteration $i")
  val df = spark.range(5000).map(_ => r.nextLong).toDF().selectExpr(selectExpr: _*)
  val mode = if (i == 0) {
    "overwrite"
  } else {
    "append"
  }
  df.coalesce(1).write.mode(mode).format("parquet").save(s"${home}data/wide_row_parquet")
}


// read test

val home_candidate = sys.env("HOME")
val home = if (!home_candidate.endsWith("/")) {
  s"${home_candidate}/"
} else {
  home_candidate
}

spark.read.parquet(s"${home}data/wide_row_parquet").createOrReplaceTempView("df")
val startTime = System.currentTimeMillis
sql("select * from df where (c5*c12) == 12").collect
(System.currentTimeMillis - startTime)/60.0/1000

Member Author

Thanks again @bersprockets! Let me address the issue and update the PR.

Member Author

Updated the PR. I also re-ran the above test, which used to take 1.05m before the fix and improved to 0.58m afterwards, which is very close to the number on the master branch.

@bersprockets it'd be great if you could verify it too. Thanks.

Contributor

Yes, you've eliminated the schema lookup penalty.

There still seems to be the remaining 10-15% performance penalty. Earlier, I speculated on the cause of this particular penalty:

this PR is still 10% slower than the baseline when testing the wide row case. I haven't confirmed it, but I think this is due to the extra OnHeapColumnVector instances created in ParquetColumnVector, for which you already have a TODO.

So maybe that will get resolved once the TODO is resolved.

Member Author

Thanks, yes, I'm aware of the other one: we should not need to allocate the vectors for definition & repetition levels when the schema is flat. I'm hoping to address this separately with another PR though, as I don't want to make this one too bloated :)

BTW @bersprockets: how can I reproduce the 10-15% performance penalty? I was using the above code snippet and got almost the same numbers on my machine with the latest fix.

Contributor

how can I reproduce the 10-15% performance penalty

Maybe my old 2015 2.5 GHz MBP just feels it more strongly.

I launched spark-shell with this command line:

bin/spark-shell --driver-memory 5g --master "local[1]"

Maybe the 5g makes a difference (e.g., more pressure on the garbage collector when there are lots of OnHeapColumnVector instances).

Member Author

I have the follow-up done here; it skips creating these column vectors and reading definition levels when unnecessary. I can include it in this PR if it doesn't complicate the review :) cc @sadikovi

@dongjoon-hyun
Member

Hi, @sunchao. After rebasing and force-pushing, the PR branch is broken. Could you take a look?

[error] /home/runner/work/spark/spark/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala:426:22: not enough arguments for constructor TaskContextImpl: (stageId: Int, stageAttemptNumber: Int, partitionId: Int, taskAttemptId: Long, attemptNumber: Int, numPartitions: Int, taskMemoryManager: org.apache.spark.memory.TaskMemoryManager, localProperties: java.util.Properties, metricsSystem: org.apache.spark.metrics.MetricsSystem, taskMetrics: org.apache.spark.executor.TaskMetrics, cpus: Int, resources: Map[String,org.apache.spark.resource.ResourceInformation])org.apache.spark.TaskContextImpl.
(https://github.com/sunchao/spark/runs/5778352461?check_suite_focus=true#step:9:600)
[error] Unspecified value parameter metricsSystem.
(https://github.com/sunchao/spark/runs/5778352461?check_suite_focus=true#step:9:601)
[error]       val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null, cpus = 0)
(https://github.com/sunchao/spark/runs/5778352461?check_suite_focus=true#step:9:602)
[error]                      ^
(https://github.com/sunchao/spark/runs/5778352461?check_suite_focus=true#step:9:603)
[error] one error found

isBatchReadSupported(sqlConf, mt.valueType)
case st: StructType =>
sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
st.fields.forall(f => isBatchReadSupported(sqlConf, f.dataType))
Contributor

nit: should be 2 spaces here

true
case at: ArrayType =>
sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
isBatchReadSupported(sqlConf, at.elementType)
Contributor

nit: should be 2 spaces here

isBatchReadSupported(sqlConf, at.elementType)
case mt: MapType =>
sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
isBatchReadSupported(sqlConf, mt.keyType) &&
Contributor

ditto

Member Author

Let me check all the places with 4-space indentation... I need to fix my IntelliJ settings.
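Assembled from the fragments quoted in this review, a sketch of the helper with the 2-space continuation indentation applied (the atomic-type and fallback branches are assumptions here; the method lives in ParquetFileFormat):

def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = dt match {
  case _: AtomicType =>
    true
  case at: ArrayType =>
    sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
      isBatchReadSupported(sqlConf, at.elementType)
  case mt: MapType =>
    sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
      isBatchReadSupported(sqlConf, mt.keyType) &&
      isBatchReadSupported(sqlConf, mt.valueType)
  case st: StructType =>
    sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
      st.fields.forall(f => isBatchReadSupported(sqlConf, f.dataType))
  case _ =>
    false
}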

nulls.putNulls(state.valueOffset, num);
state.valueOffset += num;
}
defLevels.putInts(state.levelOffset, num, currentValue);
Contributor

Lines 556 and 561 (state.valueOffset += num;) can move before line 563.

Member Author

👍 Actually, the comments here can also be removed.

}
defLevels.putInts(state.levelOffset, num, currentValue);
break;
case PACKED:
Contributor

Lines 565 ~ 575 look the same as lines 247 ~ 257, except for the comments.

Member Author

Let me split this into a util method.

/**
* Marks this column only contains null values.
*/
public final void setAllNull() {
Contributor

  1. column.numNulls() may still be 0 after we call setAllNull().
  2. If we call column.setAllNull() and then column.putXXX, column.isAllNull is still true.

The above two scenarios look strange.

Member Author

This is similar to setIsConstant though: you can also call column.putXXX after the call.

Contributor

OK, let's ignore this for the time being

@sunchao
Member Author

sunchao commented Apr 1, 2022

@dongjoon-hyun I think it's unrelated to this PR; looks like it's due to #36012.

@sunchao
Member Author

sunchao commented Apr 2, 2022

FYI I plan to open a follow-up PR to disable the config and also update related tests.

@viirya
Member

viirya commented Apr 2, 2022

Sounds good to me.

Contributor

@LuciferYang LuciferYang left a comment

LGTM

@viirya
Member

viirya commented Apr 2, 2022

Thanks all. Merging to master/3.3. @sunchao Please remember to open a follow-up PR to disable the config and update related tests before the 3.3 release. Thanks.

@viirya viirya closed this in deac8f9 Apr 2, 2022
viirya pushed a commit that referenced this pull request Apr 2, 2022
### What changes were proposed in this pull request?

This PR adds support for complex types (e.g., list, map, array) for Spark's vectorized Parquet reader. In particular, this introduces the following changes:
1. Added a new class `ParquetColumnVector` which encapsulates all the information needed when reading a Parquet column, including the `ParquetColumn` for the Parquet column, the repetition & definition levels (only allocated for a leaf node of a complex type), as well as the reader for the column. In addition, it also contains the logic for assembling nested columnar batches by interpreting Parquet repetition & definition levels.
2. Changes are made in `VectorizedParquetRecordReader` to initialize a list of `ParquetColumnVector` for the columns read.
3. `VectorizedColumnReader` now also creates a reader for the repetition levels. Depending on whether the maximum repetition level is 0, the batch read is now split into two code paths, e.g., `readBatch` versus `readBatchNested`.
4. Added logic to handle complex types in `VectorizedRleValuesReader`. For data types involving only structs or primitive types, it still goes through the old `readBatch` method, which now also saves definition levels into a vector for later assembly. Otherwise, for data types involving arrays or maps, a separate code path `readBatchNested` is introduced to handle repetition levels.
This PR also introduces a new flag `spark.sql.parquet.enableNestedColumnVectorizedReader` which turns the feature on or off. By default it is on, to facilitate all the Parquet-related test coverage.

### Why are the changes needed?

Whenever the read schema contains complex types, Spark currently falls back to the row-based reader in parquet-mr, which is much slower. As benchmarks show, adding support to the vectorized reader gives an average ~15x speed-up when reading struct fields, and ~1.5x when reading arrays of structs and maps.

### Does this PR introduce _any_ user-facing change?

With the PR Spark should now support reading complex types in its vectorized Parquet reader. A new config `spark.sql.parquet.enableNestedColumnVectorizedReader` is introduced to turn the feature on or off.

### How was this patch tested?

Added new unit tests.

Closes #34659 from sunchao/SPARK-34863-new.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit deac8f9)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
@sunchao
Member Author

sunchao commented Apr 3, 2022

Thanks all for the review!!! @viirya I just opened #36055 for the follow-up.

@sunchao sunchao deleted the SPARK-34863-new branch April 3, 2022 23:19
viirya pushed a commit that referenced this pull request Apr 4, 2022
…olumnVectorizedReader` by default

### What changes were proposed in this pull request?

This PR disables `spark.sql.parquet.enableNestedColumnVectorizedReader` by default.

### Why are the changes needed?

In #34659 the config was turned on mainly for testing reasons. As the feature is new, we should turn it off by default.

### Does this PR introduce _any_ user-facing change?

The config `spark.sql.parquet.enableNestedColumnVectorizedReader` is turned off by default now.

### How was this patch tested?

Existing tests.

Closes #36055 from sunchao/disable.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 1b08673)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
viirya pushed a commit that referenced this pull request Apr 4, 2022
…olumnVectorizedReader` by default

### What changes were proposed in this pull request?

This PR disables `spark.sql.parquet.enableNestedColumnVectorizedReader` by default.

### Why are the changes needed?

In #34659 the config was turned on mainly for testing reasons. As the feature is new, we should turn it off by default.

### Does this PR introduce _any_ user-facing change?

The config `spark.sql.parquet.enableNestedColumnVectorizedReader` is turned off by default now.

### How was this patch tested?

Existing tests.

Closes #36055 from sunchao/disable.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
dmgburg pushed a commit to joomcode/spark that referenced this pull request Apr 13, 2022
### What changes were proposed in this pull request?

This PR adds support for complex types (e.g., list, map, array) for Spark's vectorized Parquet reader. In particular, this introduces the following changes:
1. Added a new class `ParquetColumnVector` which encapsulates all the information needed when reading a Parquet column, including the `ParquetColumn` for the Parquet column, the repetition & definition levels (only allocated for a leaf node of a complex type), as well as the reader for the column. In addition, it also contains the logic for assembling nested columnar batches by interpreting Parquet repetition & definition levels.
2. Changes are made in `VectorizedParquetRecordReader` to initialize a list of `ParquetColumnVector` for the columns read.
3. `VectorizedColumnReader` now also creates a reader for the repetition levels. Depending on whether the maximum repetition level is 0, the batch read is now split into two code paths, e.g., `readBatch` versus `readBatchNested`.
4. Added logic to handle complex types in `VectorizedRleValuesReader`. For data types involving only structs or primitive types, it still goes through the old `readBatch` method, which now also saves definition levels into a vector for later assembly. Otherwise, for data types involving arrays or maps, a separate code path `readBatchNested` is introduced to handle repetition levels.
This PR also introduces a new flag `spark.sql.parquet.enableNestedColumnVectorizedReader` which turns the feature on or off. By default it is on, to facilitate all the Parquet-related test coverage.

### Why are the changes needed?

Whenever the read schema contains complex types, Spark currently falls back to the row-based reader in parquet-mr, which is much slower. As benchmarks show, adding support to the vectorized reader gives an average ~15x speed-up when reading struct fields, and ~1.5x when reading arrays of structs and maps.

### Does this PR introduce _any_ user-facing change?

With the PR Spark should now support reading complex types in its vectorized Parquet reader. A new config `spark.sql.parquet.enableNestedColumnVectorizedReader` is introduced to turn the feature on or off.

### How was this patch tested?

Added new unit tests.

Closes apache#34659 from sunchao/SPARK-34863-new.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>

(cherry picked from commit deac8f9)
dmgburg pushed a commit to joomcode/spark that referenced this pull request Apr 13, 2022
…olumnVectorizedReader` by default

### What changes were proposed in this pull request?

This PR disables `spark.sql.parquet.enableNestedColumnVectorizedReader` by default.

### Why are the changes needed?

In apache#34659 the config was turned on mainly for testing reasons. As the feature is new, we should turn it off by default.

### Does this PR introduce _any_ user-facing change?

The config `spark.sql.parquet.enableNestedColumnVectorizedReader` is turned off by default now.

### How was this patch tested?

Existing tests.

Closes apache#36055 from sunchao/disable.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 1b08673)