[SPARK-37974][SQL] Implement vectorized DELTA_BYTE_ARRAY and DELTA_LENGTH_BYTE_ARRAY encodings for Parquet V2 support #35262
Conversation
@sunchao, @dongjoon-hyun, @viirya, @LuciferYang could you please review?
Will do. @parthchandra could you use a different JIRA for this?
Thank you for pinging me, @parthchandra.
Yeah, this looks like more than a follow-up.
Force-pushed from 4ada974 to 2c73794.
Created a new JIRA and modified the commit message(s) and the PR title.
@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
  deltaByteArrayReader.initFromPage(valueCount, in);
  this.valueCount = valueCount;
Hmm... maybe a stupid question, but why does VectorizedDeltaByteArrayReader need to hold valueCount?
Nope, it doesn't. Removed.
private int currentRow = 0;

// temporary variable used by getBinary
Binary binaryVal;
This should be private.
done
}

@Override
public Binary readBinary(int len) {
-  return deltaByteArrayReader.readBytes();
+  readValues(1, null, 0,
+    (w, r, v, l) ->
Will this lambda create a new object every time readBinary is called?
I really hope not. AFAIK, lambdas are highly optimized to avoid object-creation overhead. I'm not sure whether the function-call overhead might also be eliminated by inlining.
cc @rednaxelafx, can you help check whether multiple objects or a single object will be generated in this lambda scenario?
I changed the test "parquet v2 pages - delta encoding" in ParquetEncodingSuite into a loop:
while (true) {
  val actual = spark.read.parquet(path).collect()
  assert(actual.sortBy(_.getInt(0)) === data.map(Row.fromTuple))
}
and dumped the memory many times. I then found many objects of class org.apache.spark.sql.execution.datasources.parquet.VectorizedDeltaByteArrayReader$$Lambda$3232 in the memory dump. It seems that because the lambda captures an external variable, binaryVal, a new object is generated every time the method is called. @parthchandra
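(For illustration, a minimal runnable sketch of the behavior described above: on current HotSpot JVMs, a lambda that captures enclosing state is typically instantiated on every evaluation, while a non-capturing lambda may be cached and reused. This is implementation behavior, not a spec guarantee.)

```java
import java.util.function.Supplier;

public class LambdaCaptureDemo {
  // Returns a capturing lambda: it closes over the parameter 'captured',
  // so the JVM typically allocates a fresh instance on each call.
  static Supplier<String> capturing(String captured) {
    return () -> captured;
  }

  // Returns a non-capturing lambda: no enclosing state is referenced,
  // so the instance can be cached and reused across calls.
  static Runnable stateless() {
    return () -> { };
  }

  public static void main(String[] args) {
    System.out.println(capturing("x") == capturing("x"));  // typically false
    System.out.println(stateless() == stateless());        // typically true
  }
}
```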
What you say makes sense: the reference to the external variable will cause multiple object instantiations. Thank you for doing this research!
I tried something similar with the unit test in ParquetEncodingSuite and saw only a single instance of the lambda created (not sure why).
I've changed the code to use a WritableColumnVector of size 1, which eliminates the need to access the variable directly.
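(A sketch of what that fix could look like, reusing names that appear elsewhere in this PR's diff, namely binaryValVector and ByteBufferOutputWriter.writeArrayByteBuffer; the readValues signature here is an assumption. The key point is that a method reference to a static method captures nothing, so no per-call allocation occurs.)

```java
// Reusable size-1 scratch vector; readBinary routes the decoded value through
// it instead of through a local variable that a lambda would have to capture.
private final WritableColumnVector binaryValVector = new OnHeapColumnVector(1, BinaryType);

@Override
public Binary readBinary(int len) {
  // Non-capturing method reference: no new lambda object per call.
  readValues(1, binaryValVector, 0, ByteBufferOutputWriter::writeArrayByteBuffer);
  return Binary.fromConstantByteArray(binaryValVector.getBinary(0));
}
```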
interface ByteBufferOutputWriter {
  void write(WritableColumnVector c, int rowId, ByteBuffer val, int length);

  static void writeArrayByteBuffer(WritableColumnVector c, int rowId, ByteBuffer val,
Is it good practice to add static methods to an interface? I'm not sure.
I don't know if it is frowned upon. In this case, not including them in the interface only leads to some code bloat.
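(For reference, static interface methods have been legal since Java 8 and are a common home for stock implementations of a functional method. A self-contained sketch of the pattern, with the PR's writer signature simplified to avoid Spark dependencies:)

```java
import java.nio.ByteBuffer;

public class StaticInterfaceDemo {
  // The functional method and its stock implementations live on one interface,
  // so callers can pass a method reference like Writer::skipWrite without a
  // separate utility class.
  interface Writer {
    void write(int rowId, ByteBuffer val, int length);

    static void writeArray(int rowId, ByteBuffer val, int length) {
      System.out.println("write " + length + " bytes at row " + rowId);
    }

    // No-op used when values are skipped rather than materialized.
    static void skipWrite(int rowId, ByteBuffer val, int length) { }
  }

  public static void main(String[] args) {
    Writer w = Writer::writeArray;
    w.write(0, ByteBuffer.allocate(4), 4);  // prints: write 4 bytes at row 0
  }
}
```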
@@ -283,25 +294,30 @@ private void initDataReader(
  } catch (IOException e) {
    throw new IOException("could not read page in col " + descriptor, e);
  }
  if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
      previousReader != null && previousReader instanceof RequiresPreviousReader) {
Is previousReader != null necessary? previousReader instanceof RequiresPreviousReader already covers previousReader != null.
You're right, it is not needed.
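(This is guaranteed by the language: instanceof evaluates to false for a null reference, per JLS 15.20.2, so the separate null check is dead code. A quick demonstration:)

```java
public class InstanceofNullDemo {
  public static void main(String[] args) {
    Object previousReader = null;
    // No NPE: instanceof on a null reference simply yields false.
    System.out.println(previousReader instanceof CharSequence);  // prints: false
  }
}
```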
Will this PR speed up string-related benchmark results when using Parquet Data Page V2?
val data = (1 to 8197).map { i =>
  (i,
    i.toLong, i.toShort, Array[Byte](i.toByte),
    if (i % 2 == 1) s"test_${i}" else null,
nit: use test_$i instead of test_${i}.
done
VectorizedValuesReader {

  private final MemoryMode memoryMode;
  private int valueCount;
This valueCount can also be removed.
Removed
@parthchandra Can you update the benchmark run with Java 8 again? The marked data is much slower than before; I'm not sure whether this data is reasonable.
@@ -93,6 +96,12 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
    HadoopInputFile.fromPath(file, configuration), options);
  this.reader = new ParquetRowGroupReaderImpl(fileReader);
  this.fileSchema = fileReader.getFileMetaData().getSchema();
  try {
    this.writerVersion = VersionParser.parse(fileReader.getFileMetaData().getCreatedBy());
  } catch (Exception e) {
Will other types of exceptions be thrown here besides VersionParseException?
Well, yes. I encountered at least one case where the version information was empty and the version check threw an NPE.
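(A sketch of the defensive handling this implies; treating any parse failure as an unknown writer version is an assumption about the surrounding code, not a quote of it:)

```java
try {
  this.writerVersion = VersionParser.parse(fileReader.getFileMetaData().getCreatedBy());
} catch (Exception e) {
  // VersionParser.parse throws VersionParseException for malformed strings, but
  // an empty or absent createdBy value can surface as a NullPointerException
  // instead, so catch broadly. Leaving writerVersion unset just means the
  // corrupt DELTA_BYTE_ARRAY workaround cannot be applied.
}
```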
Updated the JDK 8 benchmark results as well.
After comparing the new bench data, I find that the data corresponding to …
It's hard to reasonably compare the numbers across runs (even though the difference is substantial) because of the difference in the environment. (See line 237 in 6e64e92.)
Let me do a profile run to see if any obvious bottlenecks stand out.
@parthchandra I think we should add some UTs similar to … I manually verified that there was no such problem before this PR.
@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
  if (memoryMode == MemoryMode.OFF_HEAP) {
    lengthsVector = new OffHeapColumnVector(valueCount, IntegerType);
Maybe we should call lengthsVector.putInts(0, valueCount, 0); to ensure the initial values of the OffHeapColumnVector, or find another way to avoid reading unexpected values at line 75 when memoryMode is MemoryMode.OFF_HEAP.
Thank you for finding this issue! Let me address this and add the unit test(s) as well.
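(A minimal sketch of the suggested fix, using the WritableColumnVector API shown in the diff. On-heap int arrays are zero-initialized by the JVM, so only the off-heap branch needs the explicit write:)

```java
@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
  if (memoryMode == MemoryMode.OFF_HEAP) {
    lengthsVector = new OffHeapColumnVector(valueCount, IntegerType);
    // Off-heap allocations are not guaranteed to be zeroed; write the default
    // lengths up front so positions skipped for nulls read back as 0 rather
    // than whatever bytes were left in the buffer.
    lengthsVector.putInts(0, valueCount, 0);
  } else {
    lengthsVector = new OnHeapColumnVector(valueCount, IntegerType);
  }
  // ... remainder of initFromPage as in the diff above
}
```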
@Override
public void skipBinary(int total) {
  if (total == 0) {
We can remove this too.
removed
prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
  prefixLengthVector, 0);
suffixReader.initFromPage(valueCount, in);
suffixReader.readBinary(prefixLengthReader.getTotalValueCount(), suffixVector, 0);
Instead of eagerly reading the suffixes, we can have a method in VectorizedDeltaLengthByteArrayReader that just returns the suffix at rowId:
public ByteBuffer getBytes(int rowId) {
int length = lengthsVector.getInt(rowId);
try {
return in.slice(length);
} catch (EOFException e) {
throw new ParquetDecodingException("Failed to read " + length + " bytes");
}
}
I tried this approach here, and it can improve the benchmark.
// but it incurs the same cost of copying the values twice _and_ c.getBinary
// is a _slow_ byte-by-byte copy.
// The following always uses the faster System.arraycopy method.
byte[] out = new byte[length];
We can also potentially skip this copying, at least for OnHeapColumnVector. I tried it and it gives some extra performance improvement.
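(For illustration, one way to skip the copy on the on-heap path is to hand back a ByteBuffer view over the vector's backing array instead of materializing a fresh byte[]. The method name follows the getByteBuffer rename settled on later in this review, and the byteData field name is an assumption about OnHeapColumnVector's internals:)

```java
@Override
public ByteBuffer getByteBuffer(int rowId, int count) {
  // Wrap a view over the existing on-heap array; no allocation or copy.
  // Callers must treat the buffer as read-only.
  return ByteBuffer.wrap(byteData, rowId, count);
}
```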
[info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] String with Nulls Scan (0.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV 5721 5727 8 1.8 545.6 1.0X
[info] SQL Json 6289 6295 9 1.7 599.7 0.9X
[info] SQL Parquet Vectorized: DataPageV1 700 800 87 15.0 66.7 8.2X
[info] SQL Parquet Vectorized: DataPageV2 994 1031 52 10.5 94.8 5.8X
[info] SQL Parquet MR: DataPageV1 2035 2051 23 5.2 194.1 2.8X
[info] SQL Parquet MR: DataPageV2 2289 2454 232 4.6 218.3 2.5X
[info] ParquetReader Vectorized: DataPageV1 472 482 15 22.2 45.0 12.1X
[info] ParquetReader Vectorized: DataPageV2 640 645 4 16.4 61.0 8.9X
[info] SQL ORC Vectorized 670 694 35 15.7 63.9 8.5X
[info] SQL ORC MR 1846 2047 284 5.7 176.0 3.1X
[info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] String with Nulls Scan (50.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV 4825 4890 91 2.2 460.2 1.0X
[info] SQL Json 5298 7385 2951 2.0 505.3 0.9X
[info] SQL Parquet Vectorized: DataPageV1 701 889 169 14.9 66.9 6.9X
[info] SQL Parquet Vectorized: DataPageV2 684 737 58 15.3 65.2 7.1X
[info] SQL Parquet MR: DataPageV1 1857 1869 17 5.6 177.1 2.6X
[info] SQL Parquet MR: DataPageV2 2034 2146 159 5.2 193.9 2.4X
[info] ParquetReader Vectorized: DataPageV1 474 493 11 22.1 45.2 10.2X
[info] ParquetReader Vectorized: DataPageV2 585 586 1 17.9 55.8 8.2X
[info] SQL ORC Vectorized 810 845 53 12.9 77.3 6.0X
[info] SQL ORC MR 1854 1935 114 5.7 176.8 2.6X
[info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] String with Nulls Scan (95.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV 3212 3256 63 3.3 306.3 1.0X
[info] SQL Json 3693 3695 3 2.8 352.2 0.9X
[info] SQL Parquet Vectorized: DataPageV1 147 203 46 71.2 14.0 21.8X
[info] SQL Parquet Vectorized: DataPageV2 160 286 144 65.4 15.3 20.0X
[info] SQL Parquet MR: DataPageV1 1229 1351 172 8.5 117.2 2.6X
[info] SQL Parquet MR: DataPageV2 1074 1099 36 9.8 102.4 3.0X
[info] ParquetReader Vectorized: DataPageV1 107 109 2 97.9 10.2 30.0X
[info] ParquetReader Vectorized: DataPageV2 124 127 2 84.7 11.8 25.9X
[info] SQL ORC Vectorized 262 308 86 40.0 25.0 12.3X
[info] SQL ORC MR 1002 1070 96 10.5 95.5 3.2X
@sunchao I merged your changes into the PR. Also updated the benchmarks.
@sunchao should we continue this?
LGTM
cc @cloud-fan @sadikovi if you want to take another look.
private ByteBuffer previous;
private int currentRow = 0;

// temporary variable used by getBinary
nit: getBinary -> readBinary. Also, can we add some comments on what tempBinaryValVector is for?
@@ -443,6 +444,8 @@ public UTF8String getUTF8String(int rowId) {
  }
}

public abstract ByteBuffer getBytesUnsafe(int rowId, int count);
nit: maybe add a few comments here
I agree, the method name is misleading since there is a memory copy involved; it just does not call System.arraycopy in OnHeapColumnVector.
I agree as well. @sunchao, given that this is from your patch, is it OK to change the name to, say, getByteBuffer?
Yeah, I think we can use getByteBuffer; the "unsafe" here is a bit confusing.
@@ -16,50 +16,126 @@
 */
package org.apache.spark.sql.execution.datasources.parquet;

import static org.apache.spark.sql.types.DataTypes.BinaryType;
Do we have a clear import-order definition for static imports? @sunchao @dongjoon-hyun
length = lengthsVector.getInt(currentRow + i);
int remaining = length;
while (remaining > 0) {
  remaining -= in.skip(remaining);
Did I miss anything? Do we really need length here?
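(The reviewer's point: the intermediate length variable adds nothing. A minimal equivalent of the loop, keeping the retry because InputStream.skip may skip fewer bytes than requested:)

```java
int remaining = lengthsVector.getInt(currentRow + i);
while (remaining > 0) {
  // skip() may consume fewer bytes than asked; the compound assignment
  // narrows its long return value back to int.
  remaining -= in.skip(remaining);
}
```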
Addressed the latest few comments.
LGTM +1
I left a few comments, would appreciate it if you could take a look. Thanks.
@@ -283,6 +290,11 @@ private void initDataReader(
  } catch (IOException e) {
    throw new IOException("could not read page in col " + descriptor, e);
  }
  if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
When does this happen? Can you add a comment on why we need this?
Added a comment. The detailed explanation is in the comment in VectorizedDeltaByteArrayReader.setPreviousValue.
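(For context, a sketch of the hand-off this guards. parquet-mr versions affected by PARQUET-246 wrote DELTA_BYTE_ARRAY pages whose first value borrows a prefix from the last value of the previous page, so a new page's reader must be seeded with that value. The dataColumn name and the setter call are assumptions based on the RequiresPreviousReader interface named in the diff:)

```java
if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
    previousReader instanceof RequiresPreviousReader) {
  // Seed the new reader with the previous page's reader so the first
  // prefix-delta can be resolved against the last value it decoded.
  ((RequiresPreviousReader) dataColumn).setPreviousReader(previousReader);
}
```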
@@ -90,13 +90,18 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
  Preconditions.checkArgument(miniSize % 8 == 0,
    "miniBlockSize must be multiple of 8, but it's " + miniSize);
  this.miniBlockSizeInValues = (int) miniSize;
  // True value count. May be less than valueCount because of nulls
I think it would be more useful to annotate the method getTotalValueCount instead of here.
Added the comment to getTotalValueCount as well.
@@ -283,6 +290,11 @@ private void initDataReader(
  } catch (IOException e) {
    throw new IOException("could not read page in col " + descriptor, e);
  }
  if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
      previousReader instanceof RequiresPreviousReader) {
    // previous reader can only be set if reading sequentially
nit: [P]revious.
private ByteBuffer previous;
private int currentRow = 0;

// temporary variable used by readBinary
nit: Upper case.
Fixed
// temporary variable used by readBinary
private final WritableColumnVector binaryValVector;
// temporary variable used by skipBinary
nit: Upper case.
Fixed
static void skipWrite(WritableColumnVector c, int rowId, ByteBuffer val, int length) { }

}
nit: new line.
Ok
@@ -221,6 +221,13 @@ protected UTF8String getBytesAsUTF8String(int rowId, int count) {
  return UTF8String.fromAddress(null, data + rowId, count);
}

@Override
public ByteBuffer getBytesUnsafe(int rowId, int count) {
Can we replace it with:
return ByteBuffer.wrap(getBytes(rowId, count));
We could, but it would incur an additional function call in performance-sensitive code. @sunchao?
It seems fine to use getBytes here also; the function call will be inlined by the JIT if this is in a hot path.
    i += skipCount + 1
  }
}
ditto.
Ok
// reads at least twice from the reader). This will catch any issues with state
// maintained by the reader(s).
// Add at least one string with a null
val data = (1 to 81971).map { i =>
I don't quite understand how this number was chosen. Can you elaborate? Can we make it 2 * 4096 + 1 - would it work as well?
Yes, it would. Changed. (Sorry, that number sneaked in after I tested something else and forgot to undo it.)
Updated getBytesUnsafe to getByteBuffer and cleaned up the off-heap implementation.
Thanks @sadikovi for the review! Looks like we need to fix lint. Since the Spark 3.3 branch has been cut, I've asked on the dev mailing list to see if we can still include this in the release.
@sunchao, @LuciferYang, @sadikovi thank you for your reviews!
Sorry for the delay. Going to merge this now since the PR is included in the allowed list for Spark 3.3. The linter issue looks unrelated.
…NGTH_BYTE_ARRAY encodings for Parquet V2 support

### What changes were proposed in this pull request?
This PR provides a vectorized implementation of the DELTA_BYTE_ARRAY encoding of Parquet V2. The PR also implements the DELTA_LENGTH_BYTE_ARRAY encoding, which is needed by the former.

### Why are the changes needed?
The current support for Parquet V2 in the vectorized reader uses a non-vectorized version of the above encodings and needs to be vectorized.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Reproduces all the tests for the encodings from the Parquet implementation. Also adds more cases to the Parquet encoding test suite.

Closes #35262 from parthchandra/SPARK-36879-PR3.

Lead-authored-by: Parth Chandra <parthc@apache.org>
Co-authored-by: Chao Sun <sunchao@apache.org>
Signed-off-by: Chao Sun <sunchao@apple.com>
Merged to master/3.3, thanks @parthchandra and all!
Thank you @sunchao, @LuciferYang, @sadikovi. I'll submit a few smaller follow-up PRs for the issues that were deferred as a result of the review.
What changes were proposed in this pull request?
This PR provides a vectorized implementation of the DELTA_BYTE_ARRAY encoding of Parquet V2. The PR also implements the DELTA_LENGTH_BYTE_ARRAY encoding which is needed by the former.
Why are the changes needed?
The current support for Parquet V2 in the vectorized reader uses a non-vectorized version of the above encoding and needs to be vectorized.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Reproduces all the tests for the encodings from the Parquet implementation. Also adds more cases to the Parquet Encoding test suite.