
[SPARK-37974][SQL] Implement vectorized DELTA_BYTE_ARRAY and DELTA_LENGTH_BYTE_ARRAY encodings for Parquet V2 support #35262

Closed · wants to merge 20 commits

Conversation

parthchandra
Contributor

What changes were proposed in this pull request?

This PR provides a vectorized implementation of the DELTA_BYTE_ARRAY encoding of Parquet V2. The PR also implements the DELTA_LENGTH_BYTE_ARRAY encoding which is needed by the former.
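For context, a minimal sketch (illustrative only, not this PR's code) of how the two encodings fit together: DELTA_LENGTH_BYTE_ARRAY stores all value lengths as a delta-binary-packed block followed by the concatenated bytes, and DELTA_BYTE_ARRAY stores, per value, the length of the prefix shared with the previous value plus a suffix that is itself encoded as DELTA_LENGTH_BYTE_ARRAY.

```java
// Illustrative sketch only: reconstructing DELTA_BYTE_ARRAY values from
// already-decoded prefix lengths and suffixes.
class DeltaByteArraySketch {
  // value[i] = first prefixLengths[i] bytes of value[i-1], followed by suffixes[i]
  static byte[][] decode(int[] prefixLengths, byte[][] suffixes) {
    byte[][] out = new byte[suffixes.length][];
    byte[] previous = new byte[0];
    for (int i = 0; i < suffixes.length; i++) {
      byte[] value = new byte[prefixLengths[i] + suffixes[i].length];
      System.arraycopy(previous, 0, value, 0, prefixLengths[i]);
      System.arraycopy(suffixes[i], 0, value, prefixLengths[i], suffixes[i].length);
      out[i] = value;
      previous = value;
    }
    return out;
  }
}
```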

Why are the changes needed?

The current support for Parquet V2 in the vectorized reader uses a non-vectorized version of the above encoding and needs to be vectorized.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Reproduces all the tests for the encodings from the Parquet implementation. Also adds more cases to the Parquet Encoding test suite.

@github-actions github-actions bot added the SQL label Jan 20, 2022
@parthchandra
Contributor Author

@sunchao, @dongjoon-hyun, @viirya, @LuciferYang could you please review?

@sunchao
Member

sunchao commented Jan 20, 2022

Will do, @parthchandra could you use a different JIRA for this?

Member

@dongjoon-hyun dongjoon-hyun left a comment

Thank you for pinging me, @parthchandra .

@viirya
Member

viirya commented Jan 20, 2022

Yea, this looks more than a follow up.

@parthchandra parthchandra changed the title [SPARK-36879][SQL][FOLLOWUP] Support Parquet v2 data page encodings for the vectorized path [SPARK-37974][SQL] Implement vectorized DELTA_BYTE_ARRAY and DELTA_LENGTH_BYTE_ARRAY encodings for Parquet V2 support Jan 20, 2022
@parthchandra
Contributor Author

Created a new JIRA and modified the commit message(s) and the PR title.


@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
deltaByteArrayReader.initFromPage(valueCount, in);
this.valueCount = valueCount;
Contributor

@LuciferYang LuciferYang Jan 21, 2022

hmm... maybe a stupid question, but why does VectorizedDeltaByteArrayReader need to hold valueCount?

Contributor Author

Nope it doesn't. Removed

private int currentRow = 0;

//temporary variable used by getBinary
Binary binaryVal;
Contributor

should be private

Contributor Author

done

}

@Override
public Binary readBinary(int len) {
return deltaByteArrayReader.readBytes();
readValues(1, null, 0,
(w, r, v, l) ->
Contributor

@LuciferYang LuciferYang Jan 21, 2022

Will this lambda create a new object every time readBinary is called?

Contributor Author

I really hope not. AFAIK, lambdas are highly optimized to not incur object creation overhead. I'm not sure if the function call overhead might also be eliminated by inlining.

Contributor

cc @rednaxelafx, can you help check whether one object or multiple objects will be generated in this lambda scenario?

Contributor

I changed the parquet v2 pages - delta encoding test in ParquetEncodingSuite into an endless loop

while (true) {
  val actual = spark.read.parquet(path).collect()
  assert(actual.sortBy(_.getInt(0)) === data.map(Row.fromTuple))
}

and dumped the memory many times.

Then I found many objects of class org.apache.spark.sql.execution.datasources.parquet.VectorizedDeltaByteArrayReader$$Lambda$3232 in the memory dump:

[memory dump screenshots]

It seems that because the lambda references the external variable binaryVal, a new object is generated every time the method is called. @parthchandra

Contributor Author

What you say makes sense: the reference to the external variable will cause multiple object instantiations. Thank you for doing this research!
I tried something similar with the unit test in ParquetEncodingSuite and saw only a single instance of the lambda created (not sure why).
I've changed the code to use a WritableColumnVector of size 1, which eliminates the need to access the variable directly.
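For readers following the allocation question, a minimal illustrative sketch (the class and names are hypothetical, not from this PR) of why this matters: a lambda that captures nothing is typically returned as a cached singleton by the JDK, while a lambda that reads an instance field implicitly captures `this` and is generally allocated anew on each evaluation (unless the JIT elides it).

```java
import java.util.function.Supplier;

class LambdaCaptureSketch {
  private int field = 42;

  Supplier<Integer> nonCapturing() {
    return () -> 1;      // captures nothing: typically a cached singleton instance
  }

  Supplier<Integer> capturing() {
    return () -> field;  // reads an instance field, so it captures `this`:
                         // current JDKs generally allocate a new instance per evaluation
  }
}
```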

interface ByteBufferOutputWriter {
void write(WritableColumnVector c, int rowId, ByteBuffer val, int length);

static void writeArrayByteBuffer(WritableColumnVector c, int rowId, ByteBuffer val,
Contributor

@LuciferYang LuciferYang Jan 21, 2022

Is it good practice to add static methods to an interface? I'm not sure.

Contributor Author

I don't know if it is frowned upon. In this case, not including it in the interface only leads to some code bloat.
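For reference, static methods on interfaces have been legal since Java 8 and are a common way to keep a shared helper next to a functional interface. A hedged sketch loosely modeled on the ByteBufferOutputWriter shape above (names are hypothetical simplifications):

```java
import java.nio.ByteBuffer;

// Hypothetical simplification of the ByteBufferOutputWriter pattern shown above.
interface OutputWriterSketch {
  void write(ByteBuffer val, int length);

  // The shared no-op lives next to the interface and can be passed as a
  // method reference (OutputWriterSketch::skipWrite) wherever values are skipped.
  static void skipWrite(ByteBuffer val, int length) {
    // intentionally empty
  }
}
```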

@@ -283,25 +294,30 @@ private void initDataReader(
} catch (IOException e) {
throw new IOException("could not read page in col " + descriptor, e);
}
if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
previousReader != null && previousReader instanceof RequiresPreviousReader) {
Contributor

Is previousReader != null necessary?
previousReader instanceof RequiresPreviousReader already covers previousReader != null.

Contributor Author

You're right, it is not needed
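A standalone sketch of why the extra check is redundant: `instanceof` already evaluates to false for a null operand.

```java
// instanceof rejects null on its own, so a preceding null check adds nothing.
class InstanceofNullSketch {
  public static void main(String[] args) {
    Object previousReader = null;
    System.out.println(previousReader instanceof CharSequence); // prints: false
  }
}
```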

@LuciferYang
Contributor

LuciferYang commented Jan 21, 2022

Will this PR speed up the string-related benchmark results when using Parquet Data Page V2?

val data = (1 to 8197).map { i =>
( i,
i.toLong, i.toShort, Array[Byte](i.toByte),
if (i % 2 == 1) s"test_${i}" else null,
Contributor

test_$i

Contributor Author

done

VectorizedValuesReader {

private final MemoryMode memoryMode;
private int valueCount;
Contributor

this valueCount can also be removed

Contributor Author

Removed

@LuciferYang
Contributor

LuciferYang commented Jan 25, 2022

[benchmark results screenshot]

@parthchandra Can you rerun the Java 8 benchmark and update the results? The marked data is much slower than before, and I'm not sure whether it is reasonable.

@@ -93,6 +96,12 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
HadoopInputFile.fromPath(file, configuration), options);
this.reader = new ParquetRowGroupReaderImpl(fileReader);
this.fileSchema = fileReader.getFileMetaData().getSchema();
try {
this.writerVersion = VersionParser.parse(fileReader.getFileMetaData().getCreatedBy());
} catch (Exception e) {
Contributor

Will other types of exceptions be thrown here, besides VersionParseException?

Contributor Author

Well, yes. I encountered at least one case where the version information was empty and the version check threw an NPE.

@parthchandra
Contributor Author

Updated the JDK 8 benchmark results as well.

@LuciferYang
Contributor

LuciferYang commented Jan 26, 2022

Updated the JDK 8 benchmark results as well.

After comparing the new benchmark data, I find that the Parquet Data Page V2 numbers for the two test cases String with Nulls Scan (50.0%) and String with Nulls Scan (95.0%) are noticeably slower than before this PR (although the CPU frequency of the testing machine is lower):

                                   before          after
String with Nulls Scan (50.0%)     145.7 ns/row    228.7 ns/row
String with Nulls Scan (95.0%)      25.2 ns/row     77.9 ns/row

@parthchandra
Contributor Author

parthchandra commented Jan 26, 2022

Updated the JDK 8 benchmark results as well.

After comparing the new benchmark data, I find that the Parquet Data Page V2 numbers for the two test cases String with Nulls Scan (50.0%) and String with Nulls Scan (95.0%) are noticeably slower than before this PR (although the CPU frequency of the testing machine is lower):

                                   before          after
String with Nulls Scan (50.0%)     145.7 ns/row    228.7 ns/row
String with Nulls Scan (95.0%)      25.2 ns/row     77.9 ns/row

It's hard to compare the numbers across runs in a meaningful way (even though the difference is substantial) because of the differences in environment.
Incidentally, with nulls the decoder doesn't even get called, so such a precipitous drop is somewhat suspicious. It also appears that the vectorized decoder is being called one record at a time (this may not be a problem, because the decoding has mostly been done already, just not written into the output vector).
I made a change to determine runs of null/non-null values and increase the number of values written to the output vector in each call, but saw no significant change (running the benchmark on a laptop).
See:

Let me do a profile run to see if any obvious bottlenecks stand out.

@LuciferYang
Contributor

@parthchandra I think we should add some UTs similar to String with Nulls Scan, because when I add

sparkSession.conf.set(SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key, "true")

to DataSourceReadBenchmark to make ColumnVector use off-heap memory, the String with Nulls Scan related cases fail as follows:

14:33:29.271 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 5043.0 (TID 3936)
org.apache.spark.sql.execution.QueryExecutionException: Encountered error while reading file file:///private/var/folders/0x/xj61_dbd0dldn793s6cyb7rr0000gp/T/spark-a6065795-c141-43cd-8ec6-359f3f3a0307/parquetV2/part-00000-7c6de322-95b1-4283-9399-8306753c68ab-c000.snappy.parquet. Details: 
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:659) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:283) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116) ~[classes/:?]
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:546) ~[classes/:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[classes/:?]
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760) ~[classes/:?]
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140) ~[classes/:?]
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[classes/:?]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[classes/:?]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[classes/:?]
	at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[classes/:?]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507) ~[classes/:?]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1475) ~[classes/:?]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510) [classes/:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: org.apache.parquet.io.ParquetDecodingException: Failed to read 268435456 bytes
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedDeltaLengthByteArrayReader.readBinary(VectorizedDeltaLengthByteArrayReader.java:79) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedDeltaByteArrayReader.initFromPage(VectorizedDeltaByteArrayReader.java:76) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.initDataReader(VectorizedColumnReader.java:293) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPageV2(VectorizedColumnReader.java:362) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.access$100(VectorizedColumnReader.java:52) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:260) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:247) ~[classes/:?]
	at org.apache.parquet.column.page.DataPageV2.accept(DataPageV2.java:192) ~[parquet-column-1.12.2.jar:1.12.2]
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPage(VectorizedColumnReader.java:247) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:183) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:311) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:209) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116) ~[classes/:?]
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:274) ~[classes/:?]
	... 19 more

I manually verified that there was no such problem before this PR.

@Override
public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
if (memoryMode == MemoryMode.OFF_HEAP) {
lengthsVector = new OffHeapColumnVector(valueCount, IntegerType);
Contributor

@LuciferYang LuciferYang Jan 27, 2022

Maybe we should call lengthsVector.putInts(0, valueCount, 0); to ensure the initial values of the OffHeapColumnVector are zeroed, or find another way to avoid reading unexpected values in line 75 when memoryMode is MemoryMode.OFF_HEAP.

@parthchandra
Contributor Author

Thank you for finding this issue! Let me address this and add the unit test(s) as well
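A minimal sketch of one way the suggestion could be applied (not the merged fix; `allocateLengthsVector` is a hypothetical helper name, and the fields follow the diff excerpt above). Unlike heap arrays, off-heap allocations are not guaranteed to be zero-filled, hence the explicit fill.

```java
// Sketch only: mirrors the diff above plus the suggested zero-fill.
private void allocateLengthsVector(int valueCount) {
  if (memoryMode == MemoryMode.OFF_HEAP) {
    lengthsVector = new OffHeapColumnVector(valueCount, IntegerType);
    // Off-heap memory may contain garbage, so zero the lengths before any row is read back.
    lengthsVector.putInts(0, valueCount, 0);
  } else {
    lengthsVector = new OnHeapColumnVector(valueCount, IntegerType);
  }
}
```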


@Override
public void skipBinary(int total) {
if (total == 0) {
Member

We can remove this too

Contributor Author

removed

prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(),
prefixLengthVector, 0);
suffixReader.initFromPage(valueCount, in);
suffixReader.readBinary(prefixLengthReader.getTotalValueCount(), suffixVector, 0);
Member

Instead of eagerly reading the suffixes, we can have a method in VectorizedDeltaLengthByteArrayReader that just returns the suffix at rowId:

  public ByteBuffer getBytes(int rowId) {
    int length = lengthsVector.getInt(rowId);
    try {
      return in.slice(length);
    } catch (EOFException e) {
      throw new ParquetDecodingException("Failed to read " + length + " bytes");
    }
  }

I tried this approach here, and it can improve the benchmark.

// but it incurs the same cost of copying the values twice _and_ c.getBinary
// is a _slow_ byte by byte copy
// The following always uses the faster system arraycopy method
byte[] out = new byte[length];
Member

We can also potentially skip this copying at least for OnHeapColumnVector. I tried it and it gives some extra performance improvements.

[info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] String with Nulls Scan (0.0%):            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV                                            5721           5727           8          1.8         545.6       1.0X
[info] SQL Json                                           6289           6295           9          1.7         599.7       0.9X
[info] SQL Parquet Vectorized: DataPageV1                  700            800          87         15.0          66.7       8.2X
[info] SQL Parquet Vectorized: DataPageV2                  994           1031          52         10.5          94.8       5.8X
[info] SQL Parquet MR: DataPageV1                         2035           2051          23          5.2         194.1       2.8X
[info] SQL Parquet MR: DataPageV2                         2289           2454         232          4.6         218.3       2.5X
[info] ParquetReader Vectorized: DataPageV1                472            482          15         22.2          45.0      12.1X
[info] ParquetReader Vectorized: DataPageV2                640            645           4         16.4          61.0       8.9X
[info] SQL ORC Vectorized                                  670            694          35         15.7          63.9       8.5X
[info] SQL ORC MR                                         1846           2047         284          5.7         176.0       3.1X

[info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] String with Nulls Scan (50.0%):           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV                                            4825           4890          91          2.2         460.2       1.0X
[info] SQL Json                                           5298           7385        2951          2.0         505.3       0.9X
[info] SQL Parquet Vectorized: DataPageV1                  701            889         169         14.9          66.9       6.9X
[info] SQL Parquet Vectorized: DataPageV2                  684            737          58         15.3          65.2       7.1X
[info] SQL Parquet MR: DataPageV1                         1857           1869          17          5.6         177.1       2.6X
[info] SQL Parquet MR: DataPageV2                         2034           2146         159          5.2         193.9       2.4X
[info] ParquetReader Vectorized: DataPageV1                474            493          11         22.1          45.2      10.2X
[info] ParquetReader Vectorized: DataPageV2                585            586           1         17.9          55.8       8.2X
[info] SQL ORC Vectorized                                  810            845          53         12.9          77.3       6.0X
[info] SQL ORC MR                                         1854           1935         114          5.7         176.8       2.6X

[info] OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] String with Nulls Scan (95.0%):           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV                                            3212           3256          63          3.3         306.3       1.0X
[info] SQL Json                                           3693           3695           3          2.8         352.2       0.9X
[info] SQL Parquet Vectorized: DataPageV1                  147            203          46         71.2          14.0      21.8X
[info] SQL Parquet Vectorized: DataPageV2                  160            286         144         65.4          15.3      20.0X
[info] SQL Parquet MR: DataPageV1                         1229           1351         172          8.5         117.2       2.6X
[info] SQL Parquet MR: DataPageV2                         1074           1099          36          9.8         102.4       3.0X
[info] ParquetReader Vectorized: DataPageV1                107            109           2         97.9          10.2      30.0X
[info] ParquetReader Vectorized: DataPageV2                124            127           2         84.7          11.8      25.9X
[info] SQL ORC Vectorized                                  262            308          86         40.0          25.0      12.3X
[info] SQL ORC MR                                         1002           1070          96         10.5          95.5       3.2X

@parthchandra
Contributor Author

parthchandra commented Mar 7, 2022

@sunchao I merged your changes into the PR. Also updated the benchmarks.

@LuciferYang
Contributor

@sunchao should we continue this?

Member

@sunchao sunchao left a comment

LGTM

cc @cloud-fan @sadikovi if you want to take another look.

private ByteBuffer previous;
private int currentRow = 0;

// temporary variable used by getBinary
Member

nit: getBinary -> readBinary. Also can we add some comments on what tempBinaryValVector is for?

@@ -443,6 +444,8 @@ public UTF8String getUTF8String(int rowId) {
}
}

public abstract ByteBuffer getBytesUnsafe(int rowId, int count);
Member

nit: maybe add a few comments here

Contributor

I agree, the method name is misleading since there is a memory copy involved; it just does not call System.arraycopy in OnHeapColumnVector.

Contributor Author

I agree as well. @sunchao given that this is from your patch, is it ok to change the name to, say, getByteBuffer?

Member

Yea I think we can use getByteBuffer. The "unsafe" here is a bit confusing.
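To make the rename concrete, a hedged sketch of what the two implementations might look like (simplified; the on-heap field name `byteData` is assumed for illustration, and `getBytes` is the existing copy-based accessor mentioned above). The on-heap version can wrap the backing array without an extra copy, while the off-heap version copies into a byte[] once and wraps that.

```java
// OnHeapColumnVector (sketch): wrap the backing byte array directly, no copy.
@Override
public ByteBuffer getByteBuffer(int rowId, int count) {
  return ByteBuffer.wrap(byteData, rowId, count);
}

// OffHeapColumnVector (sketch): off-heap memory has no backing array, so copy the
// requested range into a byte[] via the existing getBytes and wrap the result.
@Override
public ByteBuffer getByteBuffer(int rowId, int count) {
  return ByteBuffer.wrap(getBytes(rowId, count));
}
```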

@@ -16,50 +16,126 @@
*/
package org.apache.spark.sql.execution.datasources.parquet;

import static org.apache.spark.sql.types.DataTypes.BinaryType;
Contributor

Do we have a clear import order definition for static imports? @sunchao @dongjoon-hyun

length = lengthsVector.getInt(currentRow + i);
int remaining = length;
while (remaining > 0) {
remaining -= in.skip(remaining);
Contributor

Did I miss anything? Do we really need length here?

@parthchandra
Contributor Author

Addressed the latest few comments.

Contributor

@LuciferYang LuciferYang left a comment

LGTM +1

Contributor

@sadikovi sadikovi left a comment

I left a few comments, would appreciate it if you could take a look. Thanks.

@@ -283,6 +290,11 @@ private void initDataReader(
} catch (IOException e) {
throw new IOException("could not read page in col " + descriptor, e);
}
if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
Contributor

When does this happen? Can you add a comment on why we need this?

Contributor Author

Added comment. Detailed explanation is in the comment in VectorizedDeltaByteArrayReader.setPreviousValue

@@ -90,13 +90,18 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
Preconditions.checkArgument(miniSize % 8 == 0,
"miniBlockSize must be multiple of 8, but it's " + miniSize);
this.miniBlockSizeInValues = (int) miniSize;
// True value count. May be less than valueCount because of nulls
Contributor

I think it would be more useful to annotate the method getTotalValueCount instead of here.

Contributor Author

Added the comment to getTotalValueCount as well.

@@ -283,6 +290,11 @@ private void initDataReader(
} catch (IOException e) {
throw new IOException("could not read page in col " + descriptor, e);
}
if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
previousReader instanceof RequiresPreviousReader) {
// previous reader can only be set if reading sequentially
Contributor

nit: [P]revious.

private ByteBuffer previous;
private int currentRow = 0;

// temporary variable used by readBinary
Contributor

nit: Upper case.

Contributor Author

Fixed


// temporary variable used by readBinary
private final WritableColumnVector binaryValVector;
// temporary variable used by skipBinary
Contributor

nit: Upper case.

Contributor Author

Fixed

static void skipWrite(WritableColumnVector c, int rowId, ByteBuffer val, int length) { }

}

Contributor

nit: new line.

Contributor Author

Ok

@@ -221,6 +221,13 @@ protected UTF8String getBytesAsUTF8String(int rowId, int count) {
return UTF8String.fromAddress(null, data + rowId, count);
}

@Override
public ByteBuffer getBytesUnsafe(int rowId, int count) {
Contributor

Can we replace it with:

return ByteBuffer.wrap(getBytes(rowId, count));

Contributor Author

We could, but it would incur an additional function call in performance-sensitive code. @sunchao?

Member

It seems fine to use getBytes here also - the function call will be inlined by JIT if this is in a hot path.

@@ -443,6 +444,8 @@ public UTF8String getUTF8String(int rowId) {
}
}

public abstract ByteBuffer getBytesUnsafe(int rowId, int count);
Contributor

I agree, the method name is misleading since there is a memory copy involved; it just does not call System.arraycopy in OnHeapColumnVector.

i += skipCount + 1
}
}

Contributor

ditto.

Contributor Author

Ok

// reads at least twice from the reader). This will catch any issues with state
// maintained by the reader(s)
// Add at least one string with a null
val data = (1 to 81971).map { i =>
Contributor

I don't quite understand how this number was chosen. Can you elaborate? Can we make it 2 * 4096 + 1 - would it work as well?

Contributor Author

Yes, it would. Changed. (Sorry that number sneaked in after I tested something else and forgot to undo it).

@parthchandra
Contributor Author

Updated getBytesUnsafe to getByteBuffer and cleaned up the off-heap implementation.

@sunchao
Member

sunchao commented Mar 17, 2022

Thanks @sadikovi for the review! looks like we need to fix lint. Since Spark 3.3 branch has been cut, I've asked in the dev mailing list to see if we can still include this in the release.

@parthchandra
Contributor Author

@sunchao, @LuciferYang, @sadikovi thank you for your reviews!

@sunchao
Member

sunchao commented Mar 31, 2022

Sorry for the delay. Going to merge this now since the PR is included in the allowed list for Spark 3.3. The linter issue looks unrelated.

@sunchao sunchao closed this in af40145 Mar 31, 2022
sunchao added a commit that referenced this pull request Mar 31, 2022
[SPARK-37974][SQL] Implement vectorized DELTA_BYTE_ARRAY and DELTA_LENGTH_BYTE_ARRAY encodings for Parquet V2 support

### What changes were proposed in this pull request?
This PR provides a vectorized implementation of the DELTA_BYTE_ARRAY encoding of Parquet V2. The PR also implements the DELTA_LENGTH_BYTE_ARRAY encoding which is needed by the former.

### Why are the changes needed?
The current support for Parquet V2 in the vectorized reader uses a non-vectorized version of the above encoding and needs to be vectorized.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Reproduces all the tests for the encodings from the Parquet implementation. Also adds more cases to the Parquet Encoding test suite.

Closes #35262 from parthchandra/SPARK-36879-PR3.

Lead-authored-by: Parth Chandra <parthc@apache.org>
Co-authored-by: Chao Sun <sunchao@apache.org>
Signed-off-by: Chao Sun <sunchao@apple.com>
@sunchao
Member

sunchao commented Mar 31, 2022

Merged to master/3.3, thanks @parthchandra and all!!

@parthchandra
Contributor Author

Thank you @sunchao, @LuciferYang, @sadikovi. I'll submit a few smaller follow-up PRs for the issues that were deferred as a result of the review.
