[SPARK-34863][SQL] Support complex types for Parquet vectorized reader #34659
Conversation
Kubernetes integration test starting
cc @sadikovi
Kubernetes integration test status failure
ACK. I will review it tomorrow or next week if that's okay 🙂.
Test build #145429 has finished for PR 34659 at commit
Thanks @sadikovi! Take your time on this one :)
Thanks for opening a PR. It looks very good, but I need to re-review a couple of files. Tests look good; I remember there were a couple of tricky examples with array + struct + null nesting. Let me find them and check — maybe we could add them to the tests.
On a more style-related note: could you make all of the comments start with an uppercase letter? I find it easier to read them this way, as they indicate the beginning of a paragraph. Thank you.
val PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED =
  buildConf("spark.sql.parquet.enableNestedColumnVectorizedReader")
    .doc("Enables vectorized Parquet decoding for nested columns (e.g., struct, list, map). " +
      s"Note to enable this ${PARQUET_VECTORIZED_READER_ENABLED.key} also needs to be enabled.")
I am not sure what the guidelines are for conf text, but maybe this would sound a bit crisper:
Enables vectorized Parquet decoding for nested columns (e.g., struct, list, map). Requires ${PARQUET_VECTORIZED_READER_ENABLED.key} to be enabled.
|
||
void setColumnReader(VectorizedColumnReader reader) {
  if (!isPrimitive) {
    throw new IllegalStateException("can't set reader for non-primitive column");
nit: Can't set...
  }
}
void reset() { |
I understand it requires a lot of typing, but it would be good if we could add a small javadoc here and below to highlight what the methods do.
Sure, added.
vector.putArray(rowId, offset, 0);
rowId++;
} else if (definitionLevel > maxDefinitionLevel) {
  // collection is defined and non-empty: find out how many top element there is till the
nit: find out how many elements are there until the start of the next array. Alternatively, find out the number of elements until the start of the next array.
  return idx;
}

private int getCollectionSize(int maxRepetitionLevel, int idx) {
We might need javadoc for this and a couple of methods above to go along with the inline comments.
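As context for the javadoc request, here is a minimal standalone sketch (hypothetical class and method names, not the PR's actual code) of the idea behind getCollectionSize: starting from an element, keep counting while the repetition level stays at or above the column's maximum repetition level, since a lower level marks the start of the next record or of an outer collection.

```java
// Hypothetical sketch of deriving a collection's size from decoded
// repetition levels; not Spark's actual implementation.
class CollectionSizeSketch {
    // repetitionLevels holds the decoded levels for the current page;
    // idx points at the first element of the collection.
    static int collectionSize(int[] repetitionLevels, int maxRepetitionLevel, int idx) {
        int size = 1; // the element at idx itself
        for (int i = idx + 1; i < repetitionLevels.length; i++) {
            // A level >= maxRepetitionLevel continues the innermost collection;
            // a lower level starts a new record or a new outer collection.
            if (repetitionLevels[i] >= maxRepetitionLevel) {
                size++;
            } else {
                break;
            }
        }
        return size;
    }
}
```

For levels [0, 1, 1, 0, 1] with maxRepetitionLevel = 1, the first collection has three elements and the second has two.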
/* The following fields are only used when reading repeated values */

/** When processing repeated values, whether we've found the beginning of the first list after the
Javadoc should be updated.
Update what?
/** When processing repeated types, the number of accumulated definition levels to process */
int numBatchedDefLevels;

/** When processing repeated types, whether we should skip the current batch of definition
Same here.
@@ -39,12 +42,13 @@
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConverters;
Is this import required?
Yes, it's used below in the checkColumn method, for example.
That should probably be moved after java imports 🙂. I am probably nit-picking here but maybe it would be a good nit to fix - it is up to you.
if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
  throw new UnsupportedOperationException("Complex types not supported.");
}
missingColumns = new HashSet<>();
I don't know if Spark runs Java lint. Is it allowed to use <> with generic types?
Yes, it's allowed. This usage pattern is pretty standard in Java (in fact, IntelliJ will warn you if you spell out the generic type there).
All good. I remember Java 6 used to complain if you used the <> diamond type.
 * conforms to the type of the file schema.
 */
private void checkColumn(ParquetColumn column) throws IOException {
  String[] path = JavaConverters.seqAsJavaList(column.path()).toArray(new String[0]);
Maybe the original path could be returned as an array...
It could, although I'd have to change a lot of places in ParquetSchemaSuite. Since this is a relatively small issue, maybe let me address it separately?
Yes, let's address it separately.
@@ -176,8 +176,7 @@ public void initBatch(
   // Initialize the missing columns once.
   if (colId == -1) {
     OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
-    missingCol.putNulls(0, capacity);
-    missingCol.setIsConstant();
+    missingCol.setAllNull();
Oh, almost forgot to ask. Is missingCol.setAllNull() equivalent to missingCol.putNulls(0, capacity) followed by missingCol.setIsConstant()? I thought setAllNull was an alias for the two method calls, but it does not appear to be the case.
Not quite the same. setIsConstant requires allocating space for the constant elements, but setAllNull doesn't. This is useful for missing columns that are of nested types.
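To make that distinction concrete, here is a toy illustration (deliberately simplified; not Spark's actual WritableColumnVector): putNulls has to allocate and fill per-row null flags, while an all-null marker is a single column-wide flag, which is why it is cheap for missing columns of nested types.

```java
// Toy column vector illustrating the two null-marking strategies discussed
// above; the class and its fields are hypothetical, not Spark's.
class ToyVector {
    private boolean[] isNull;  // per-row flags, allocated lazily
    private boolean allNull;   // column-wide marker, no per-row storage
    private final int capacity;

    ToyVector(int capacity) { this.capacity = capacity; }

    // Analogous to putNulls(rowId, count): needs per-row storage.
    void putNulls(int rowId, int count) {
        if (isNull == null) isNull = new boolean[capacity];
        for (int i = 0; i < count; i++) isNull[rowId + i] = true;
    }

    // Analogous to setAllNull(): constant-time, no allocation at all.
    void setAllNull() { allNull = true; }

    boolean isNullAt(int rowId) {
        return allNull || (isNull != null && isNull[rowId]);
    }
}
```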
Is this related to this PR, or is it new test coverage? This change looks like it relies on the existing ORC tests only. In that case, shall we make a separate PR for this ORC change?
This change is actually not related. I think we can have a separate small PR for this.
@sunchao Let me know if you would like me to revisit the code in this PR. Thanks.
Thanks @sadikovi, I really appreciate your feedback! I will address the comments soon, and then you can revisit the PR.
Thanks! This would be a great speedup for Uber, given its heavy reliance on nested data. I am curious: how often are nested fields selected in the resultSchema with nested column pruning in effect?
Would it be possible to retain the benchmark from the older version of this PR (#33695), and perhaps also add a case where some of the struct column values are NULL?
How do we confirm that this hasn't slowed down the non-nested code path?
leftInBatch -= n;
if (n > 0) {
  repLevels.appendInts(n, 0);
Not sure, but should repLevels be immutable for the purposes of this function?
Hmm, what do you mean? We need to read in repetition levels in this method.
    values, nulls, valuesReused, valueReader, updater);

while ((leftInBatch > 0 || !state.lastListCompleted) && leftInPage > 0) {
  if (currentCount == 0 && !readNextGroup()) break;
The original non-nested code refers to currentCount as this.currentCount in certain places. Should we pick that style as well?
Yeah, good point, let me change the non-nested code, although I think in that case readNextGroup will never return false, since we know exactly how many elements to read beforehand.
defLevelProcessor.skipValues(n);
rowId += n;
currentCount -= n;
leftInPage -= n;
Should leftInBatch be updated here as well?
It shouldn't, since we don't add anything to the batch here (we are skipping rows).
  defLevelProcessor.readValues(n);
}

rowId += n;
Let's move the leftInBatch update here.
I am not sure why some of these are local variables vs member vars :-)
Yes, better to move it here. The member vars (e.g., currentCount) track the state of the RLE/BitPacked block being read, while the local vars track the state of the input vectors.
updater.readValue(offset + i, values, valueReader);
} else {
  nulls.putNull(offset + i);
  int v = currentBuffer[currentBufferIdx++];
v -> value ?
s"Note to enable this ${PARQUET_VECTORIZED_READER_ENABLED.key} also needs to be enabled.")
  .version("3.3.0")
  .booleanConf
  .createWithDefault(true)
I am liking the confidence :-)
This is to make sure we can pass all Parquet related tests :) I plan to turn it off later separately.
Make sure we remember to turn it off as default before merging. 😄
@@ -227,30 +230,340 @@ private void readBatchInternal(
switch (mode) {
  case RLE:
I am not too familiar with this code, but where do we assert that these are the only two encoding choices? There is no default arm in this switch.
The MODE enum is private to this class, and mode is initialized in this class, so I guess it should be fine. This maps to the Parquet spec for RLE/BitPacked encoding.
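For readers unfamiliar with the encoding, a sketch of why only two arms are possible: per the Parquet RLE/bit-packed hybrid spec, every run starts with a varint header whose lowest bit selects the run type, so a decoded run is necessarily one or the other. The class and method names below are illustrative, not from the PR.

```java
// Illustrative decoder for the run header of Parquet's RLE/bit-packed
// hybrid encoding; names are hypothetical.
class RunHeaderSketch {
    enum Mode { RLE, PACKED }

    // Bit 0 of the (already varint-decoded) header selects the mode:
    // 1 => bit-packed run, 0 => RLE run. There is no third possibility.
    static Mode modeOf(int header) {
        return (header & 1) == 1 ? Mode.PACKED : Mode.RLE;
    }

    // The remaining bits encode the run length: for bit-packed runs the
    // number of 8-value groups, for RLE runs the repeat count of one value.
    static int runLength(int header) {
        return (header & 1) == 1 ? (header >>> 1) * 8 : (header >>> 1);
    }
}
```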
Thanks @agrawaldevesh. I will address your comments soon.
I've run the
Thanks @sadikovi and @agrawaldevesh! I've addressed most of your comments, while some are left to be answered. Please let me know what you think.
Sure, it's done.
This is orthogonal, I think. With or without nested column pruning, users may still want to define nested schemas and read their data efficiently. From my experience, struct types are commonly used in data schemas, and this PR should help a lot.
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #145831 has finished for PR 34659 at commit
@sadikovi do you have more comments?
// Column is missing in data but the required data is non-nullable. This file is invalid.
throw new IOException("Required column is missing in data file. Col: " +
  Arrays.toString(colPath));
for (ParquetColumn childColumn : JavaConverters.seqAsJavaList(column.children())) {
Same here, I think.
Thanks, yeah, I can raise another PR to use a list in ParquetColumn after this one is done.
LGTM, thanks for making the changes!
 *
 * NOTE: this MUST be called after new elements are appended to child vectors of a struct vector.
 */
public abstract void putStruct(int rowId, int offset);
I want to point out that this is a behavior change in WritableColumnVector, so anyone who's using this to write structs will need to adapt to the new API.
I want to see if you have any concerns about this, @cloud-fan @sadikovi.
IMHO, it is fine since the PR goes to master; this is supposed to be an internal API anyway, unless I am mistaken 🙂.
Yeah, WritableColumnVector is an internal API.
cool, thanks for confirming!
@@ -169,8 +170,8 @@ class ParquetFileFormat
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
     conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
-      schema.length <= conf.wholeStageMaxNumFields &&
-      schema.forall(_.dataType.isInstanceOf[AtomicType])
+      ParquetFileFormat.isBatchReadSupported(conf, schema) &&
When spark.sql.parquet.enableNestedColumnVectorizedReader=false, which will be the normal case initially, I think returningBatch will always be false, even for non-complex cases, since this line always passes a struct (resultSchema) to supportBatch.
At least, that's what my testing seems to show. With this PR and spark.sql.parquet.enableNestedColumnVectorizedReader=false, returningBatch is false for a non-complex case. But on master, returningBatch is true for the same test:
spark.range(0, 10).map { x => (x, x + 1, x + 3) }.toDF("a", "b", "c").
write.mode("overwrite").format("parquet").save("simple_parquet")
sql("select a, b from `parquet`.`simple_parquet`").collect
and...
bash-3.2$ git diff | cat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index bd8d11d827..96182aafa9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -340,6 +340,7 @@ class ParquetFileFormat
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+ print(s"returningBatch is $returningBatch\n")
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
bash-3.2$
Good catch! I think it should be:
schema.forall(f => ParquetFileFormat.isBatchReadSupported(conf, f.dataType))
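A toy model of that fix (the types here are hypothetical stand-ins, not Spark's classes): support is checked per top-level field of the result schema, recursing into nested types, so a flat schema no longer trips the nested-column check just because it is wrapped in a struct.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of the per-field support check; DataType/StructType/ArrayType
// are simplified stand-ins for Spark's types.
class SupportSketch {
    interface DataType {}
    static class AtomicType implements DataType {}
    static class ArrayType implements DataType {
        final DataType element;
        ArrayType(DataType element) { this.element = element; }
    }
    static class StructType implements DataType {
        final List<DataType> fields;
        StructType(DataType... fields) { this.fields = Arrays.asList(fields); }
    }

    // nestedEnabled models spark.sql.parquet.enableNestedColumnVectorizedReader.
    static boolean isBatchReadSupported(boolean nestedEnabled, DataType t) {
        if (t instanceof AtomicType) return true;
        if (t instanceof ArrayType) {
            return nestedEnabled && isBatchReadSupported(nestedEnabled, ((ArrayType) t).element);
        }
        StructType st = (StructType) t;
        return nestedEnabled
            && st.fields.stream().allMatch(f -> isBatchReadSupported(nestedEnabled, f));
    }

    // The fix: iterate over the result schema's fields instead of passing
    // the wrapping struct itself to the support check.
    static boolean supportBatch(boolean nestedEnabled, StructType schema) {
        return schema.fields.stream().allMatch(f -> isBatchReadSupported(nestedEnabled, f));
    }
}
```

With the flag off, a struct of atomic fields still batch-reads via supportBatch, while an actual nested field disables it.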
Test build #146411 has finished for PR 34659 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #146490 has finished for PR 34659 at commit
this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
StructType sparkRequestedSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration);
this.parquetColumn = converter.convertParquetColumn(requestedSchema,
This line has a performance impact on parquet files with very wide records.
Take, for example, a parquet dataset with:
- 6000 bigint columns
- 25 files
- 5000 records per file (total of 125000 records)
Reading all the records in this parquet dataset with a single executor thread is 42% slower with this PR than with the baseline (commit f361ad8 on the master branch).
The main culprit is the n**2 issue here in ParquetToSparkSchemaConverter#convertInternal, which was not exercised anywhere except by unit tests until your PR (as far as I can tell).
By the way, even with a patch for the above n**2 issue, this PR is still 10% slower than the baseline when testing the wide row case. I haven't confirmed it, but I think this is due to the extra OnHeapColumnVector instances created in ParquetColumnVector, for which you already have a TODO.
Thanks! Let me try to reproduce the above scenario locally (it'd be great if you already have a code snippet to share) and see what's causing the perf issue in ParquetToSparkSchemaConverter#convertInternal. Ideally we can skip that code path if the schema doesn't contain complex types.
see what's causing the perf issue in ParquetToSparkSchemaConverter#convertInternal
It's definitely the n*n (or n*(n/2) on average) loop here:
val converted = (0 until groupColumn.getChildrenCount).map { i =>
val field = groupColumn.getChild(i)
val fieldFromReadSchema = sparkReadSchema.flatMap { schema =>
schema.find(f => isSameFieldName(f.name, field.getName, caseSensitive))
}
schema.find should instead be a map lookup, or something like that.
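One way to realize that suggestion, sketched with a hypothetical helper (this is not the actual patch): build a name-to-index map once per group so that each child lookup becomes O(1) instead of a linear scan, honoring case sensitivity the same way the scan does.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical O(n) replacement for the repeated schema.find scan:
// precompute a (possibly case-normalized) field-name index once.
class FieldLookupSketch {
    // caseSensitive mirrors Spark's spark.sql.caseSensitive behavior.
    static Map<String, Integer> buildIndex(String[] fieldNames, boolean caseSensitive) {
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            String key = caseSensitive ? fieldNames[i] : fieldNames[i].toLowerCase(Locale.ROOT);
            index.putIfAbsent(key, i); // first match wins, like schema.find
        }
        return index;
    }

    static Integer lookup(Map<String, Integer> index, String name, boolean caseSensitive) {
        return index.get(caseSensitive ? name : name.toLowerCase(Locale.ROOT));
    }
}
```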
it'd be great if you already have some code snippet to share
Sure, in a verbose state (edit: I ran with --master "local[1]"):
// test for wide rows
val home_candidate = sys.env("HOME")
val home = if (!home_candidate.endsWith("/")) {
s"${home_candidate}/"
} else {
home_candidate
}
val width = 6000
val selectExpr = (1 to width).map { i =>
s"value as c$i"
}
import scala.util.Random
val r = new Random(65657652L)
val loopCount = 25
for (i <- 0 until loopCount) {
// println(s"iteration $i")
val df = spark.range(5000).map(_ => r.nextLong).toDF().selectExpr(selectExpr: _*)
val mode = if (i == 0) {
"overwrite"
} else {
"append"
}
df.coalesce(1).write.mode(mode).format("parquet").save(s"${home}data/wide_row_parquet")
}
// read test
val home_candidate = sys.env("HOME")
val home = if (!home_candidate.endsWith("/")) {
s"${home_candidate}/"
} else {
home_candidate
}
spark.read.parquet(s"${home}data/wide_row_parquet").createOrReplaceTempView("df")
val startTime = System.currentTimeMillis
sql("select * from df where (c5*c12) == 12").collect
(System.currentTimeMillis - startTime)/60.0/1000
Thanks again @bersprockets! Let me address the issue and update the PR.
Updated the PR. I also re-ran the above test, which used to take 1.05m before the fix and improved to 0.58m afterwards, which is very close to the number on the master branch.
@bersprockets, it'd be great if you can verify it too. Thanks.
Yes, you've eliminated the schema lookup penalty.
There still seems to be the remaining 10-15% performance penalty. Earlier, I speculated on the cause of this particular penalty:
this PR is still 10% slower than the baseline when testing the wide row case. I haven't confirmed it, but I think this is due to the extra OnHeapColumnVector instances created in ParquetColumnVector, for which you already have a TODO.
So maybe that will get resolved once the TODO is resolved.
Thanks, yes I'm aware of the other one: we should not need to allocate the vectors for definition & repetition levels when the schema is flat. I'm hoping to address this separately with another PR though - don't want to make this one too bloated :)
BTW @bersprockets: how can I reproduce the 10-15% performance penalty? I was using the above code snippet and got almost the same numbers on my machine with the latest fix.
how can I reproduce the 10-15% performance penalty
Maybe my old 2015 2.5 GHz MBP just feels it more strongly.
I launched spark-shell with this command line:
bin/spark-shell --driver-memory 5g --master "local[1]"
Maybe the 5g makes a difference (e.g., more pressure on the garbage collector when there are lots of OnHeapColumnVector instances).
Force-push: 11d7a00 to cd6ec99 (compare)
Hi, @sunchao. After rebasing and force-pushing, the PR branch is broken. Could you take a look?
    isBatchReadSupported(sqlConf, mt.valueType)
case st: StructType =>
  sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
    st.fields.forall(f => isBatchReadSupported(sqlConf, f.dataType))
nit: should be 2 spaces here
true
case at: ArrayType =>
  sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
    isBatchReadSupported(sqlConf, at.elementType)
nit: should be 2 spaces here
    isBatchReadSupported(sqlConf, at.elementType)
case mt: MapType =>
  sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
    isBatchReadSupported(sqlConf, mt.keyType) &&
ditto
Let me check all the places with 4-space indentation... I need to fix my IntelliJ settings.
  nulls.putNulls(state.valueOffset, num);
  state.valueOffset += num;
}
defLevels.putInts(state.levelOffset, num, currentValue);
Lines 556 and 561 (state.valueOffset += num;) can move to before line 563.
👍 Actually, the comments here can also be removed.
}
defLevels.putInts(state.levelOffset, num, currentValue);
break;
case PACKED:
Lines 565-575 look the same as lines 247-257, except for the comments.
Let me split this into a util method.
/**
 * Marks this column only contains null values.
 */
public final void setAllNull() {
column.numNulls() may still be 0 after we call setAllNull().
And if we call column.setAllNull() and then column.putXXX, column.isAllNull is still true.
These two scenarios look strange.
This is similar to setIsConstant though: you can also call column.putXXX after the call.
OK, let's ignore this for the time being.
@dongjoon-hyun I think it's unrelated to this PR - looks like it's due to #36012
FYI I plan to open a follow-up PR to disable the config and also update related tests.
Sounds good to me.
LGTM
Thanks all. Merging to master/3.3. @sunchao Please remember to open a follow-up PR to disable the config and update related tests before the 3.3 release. Thanks.
### What changes were proposed in this pull request?
This PR adds support for complex types (e.g., list, map, array) for Spark's vectorized Parquet reader. In particular, this introduces the following changes:
1. Added a new class `ParquetColumnVector` which encapsulates all the necessary information needed when reading a Parquet column, including the `ParquetColumn` for the Parquet column, the repetition & definition levels (only allocated for a leaf node of a complex type), as well as the reader for the column. In addition, it also contains logic for assembling nested columnar batches, via interpreting Parquet repetition & definition levels.
2. Changes are made in `VectorizedParquetRecordReader` to initialize a list of `ParquetColumnVector` for the columns read.
3. `VectorizedColumnReader` now also creates a reader for the repetition column. Depending on whether the maximum repetition level is 0, the batch read is now split into two code paths, e.g., `readBatch` versus `readBatchNested`.
4. Added logic to handle complex types in `VectorizedRleValuesReader`. For data types involving only structs or primitive types, it still goes with the old `readBatch` method, which now also saves definition levels into a vector for later assembly. Otherwise, for data types involving arrays or maps, a separate code path `readBatchNested` is introduced to handle repetition levels.
This PR also introduced a new flag `spark.sql.parquet.enableNestedColumnVectorizedReader` which turns the feature on or off. By default it is on, to facilitate all the Parquet-related test coverage.
### Why are the changes needed?
Whenever the read schema contains complex types, Spark currently falls back to the row-based reader in parquet-mr, which is much slower. As the benchmark shows, by adding support to the vectorized reader, we get a ~15x average speedup on reading struct fields, and ~1.5x when reading arrays of structs and maps.
### Does this PR introduce _any_ user-facing change?
With this PR Spark should now support reading complex types in its vectorized Parquet reader. A new config `spark.sql.parquet.enableNestedColumnVectorizedReader` is introduced to turn the feature on or off.
### How was this patch tested?
Added new unit tests.
Closes #34659 from sunchao/SPARK-34863-new.
Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit deac8f9)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
…olumnVectorizedReader` by default

### What changes were proposed in this pull request?

This PR disables `spark.sql.parquet.enableNestedColumnVectorizedReader` by default.

### Why are the changes needed?

In #34659 the config was turned on mainly for testing reasons. As the feature is new, we should turn it off by default.

### Does this PR introduce _any_ user-facing change?

The config `spark.sql.parquet.enableNestedColumnVectorizedReader` is now turned off by default.

### How was this patch tested?

Existing tests.

Closes #36055 from sunchao/disable.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 1b08673)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
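With the flag off by default, the feature becomes opt-in. One way to re-enable it for a single application is via submit-time configuration; the application name below is a placeholder, and note that the flag only takes effect when the vectorized reader itself is enabled (`spark.sql.parquet.enableVectorizedReader`, which defaults to true):

```
spark-submit \
  --conf spark.sql.parquet.enableVectorizedReader=true \
  --conf spark.sql.parquet.enableNestedColumnVectorizedReader=true \
  your_app.py
```

The same key can also be set session-scoped, e.g. with `SET spark.sql.parquet.enableNestedColumnVectorizedReader=true` in spark-sql.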