diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
index 936fa2ce78cd8..b8d375ee25592 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
@@ -39,16 +39,18 @@ public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
   private final VectorizedDeltaBinaryPackedReader prefixLengthReader;
   private final VectorizedDeltaLengthByteArrayReader suffixReader;
   private WritableColumnVector prefixLengthVector;
-  private ByteBuffer previous = null;
+  private ByteBuffer previous;
   private int currentRow = 0;
 
   // temporary variable used by getBinary
   private final WritableColumnVector binaryValVector;
+  private final WritableColumnVector tempBinaryValVector;
 
   VectorizedDeltaByteArrayReader() {
     this.prefixLengthReader = new VectorizedDeltaBinaryPackedReader();
     this.suffixReader = new VectorizedDeltaLengthByteArrayReader();
     binaryValVector = new OnHeapColumnVector(1, BinaryType);
+    tempBinaryValVector = new OnHeapColumnVector(1, BinaryType);
   }
 
   @Override
@@ -62,12 +64,11 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
 
   @Override
   public Binary readBinary(int len) {
-    readValues(1, binaryValVector, 0, ByteBufferOutputWriter::writeArrayByteBuffer);
+    readValues(1, binaryValVector, 0);
     return Binary.fromConstantByteArray(binaryValVector.getBinary(0));
   }
 
-  private void readValues(int total, WritableColumnVector c, int rowId,
-      ByteBufferOutputWriter outputWriter) {
+  private void readValues(int total, WritableColumnVector c, int rowId) {
     for (int i = 0; i < total; i++) {
       // NOTE: due to PARQUET-246, it is important that we
       // respect prefixLength which was read from prefixLengthReader,
@@ -81,29 +82,21 @@ private void readValues(int total, WritableColumnVector c, int rowId,
       int length = prefixLength + suffixLength;
 
       // We have to do this to materialize the output
+      WritableColumnVector arrayData = c.arrayData();
+      int offset = arrayData.getElementsAppended();
       if (prefixLength != 0) {
-        // We could do
-        //  c.putByteArray(rowId + i, previous, 0, prefixLength);
-        //  c.putByteArray(rowId+i, suffix, prefixLength, suffix.length);
-        //  previous = c.getBinary(rowId+1);
-        // but it incurs the same cost of copying the values twice _and_ c.getBinary
-        // is a _slow_ byte by byte copy
-        // The following always uses the faster system arraycopy method
-        byte[] out = new byte[length];
-        System.arraycopy(previous.array(), previous.position(), out, 0, prefixLength);
-        System.arraycopy(suffixArray, suffix.position(), out, prefixLength, suffixLength);
-        previous = ByteBuffer.wrap(out);
-      } else {
-        previous = suffix;
+        arrayData.appendBytes(prefixLength, previous.array(), previous.position());
       }
-      outputWriter.write(c, rowId + i, previous, previous.limit() - previous.position());
+      arrayData.appendBytes(suffixLength, suffixArray, suffix.position());
+      c.putArray(rowId + i, offset, length);
+      previous = arrayData.getBytesUnsafe(offset, length);
       currentRow++;
     }
   }
 
   @Override
   public void readBinary(int total, WritableColumnVector c, int rowId) {
-    readValues(total, c, rowId, ByteBufferOutputWriter::writeArrayByteBuffer);
+    readValues(total, c, rowId);
   }
 
   /**
@@ -121,9 +114,29 @@ public void setPreviousReader(ValuesReader reader) {
   }
 
   @Override
   public void skipBinary(int total) {
-    // we have to read all the values so that we always have the correct 'previous'
-    // we just don't write it to the output vector
-    readValues(total, null, currentRow, ByteBufferOutputWriter::skipWrite);
+    WritableColumnVector c1 = tempBinaryValVector;
+    WritableColumnVector c2 = binaryValVector;
+
+    for (int i = 0; i < total; i++) {
+      int prefixLength = prefixLengthVector.getInt(currentRow);
+      ByteBuffer suffix = suffixReader.getBytes(currentRow);
+      byte[] suffixArray = suffix.array();
+      int suffixLength = suffix.limit() - suffix.position();
+      int length = prefixLength + suffixLength;
+
+      WritableColumnVector arrayData = c1.arrayData();
+      c1.reset();
+      if (prefixLength != 0) {
+        arrayData.appendBytes(prefixLength, previous.array(), previous.position());
+      }
+      arrayData.appendBytes(suffixLength, suffixArray, suffix.position());
+      previous = arrayData.getBytesUnsafe(0, length);
+      currentRow++;
+
+      WritableColumnVector tmp = c1;
+      c1 = c2;
+      c2 = tmp;
+    }
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index bbe96819a618b..a7abc5a53bddd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -221,6 +221,13 @@ protected UTF8String getBytesAsUTF8String(int rowId, int count) {
     return UTF8String.fromAddress(null, data + rowId, count);
   }
 
+  @Override
+  public ByteBuffer getBytesUnsafe(int rowId, int count) {
+    byte[] array = new byte[count];
+    Platform.copyMemory(null, data + rowId, array, Platform.BYTE_ARRAY_OFFSET, count);
+    return ByteBuffer.wrap(array);
+  }
+
   //
   // APIs dealing with shorts
   //
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 833a93f2a2bdb..10ff78e38de49 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -219,6 +219,12 @@ protected UTF8String getBytesAsUTF8String(int rowId, int count) {
     return UTF8String.fromBytes(byteData, rowId, count);
   }
 
+  @Override
+  public ByteBuffer getBytesUnsafe(int rowId, int count) {
+    return ByteBuffer.wrap(byteData, rowId, count);
+  }
+
+
   //
   // APIs dealing with Shorts
   //
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 5e01c372793f1..4c0d58dc5be45 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -18,6 +18,7 @@
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -443,6 +444,8 @@ public byte[] getBinary(int rowId) {
     }
   }
 
+  public abstract ByteBuffer getBytesUnsafe(int rowId, int count);
+
   /**
    * Append APIs. These APIs all behave similarly and will append data to the current vector. It
    * is not valid to mix the put and append APIs. The append APIs are slower and should only be
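
For context on the first file: the readValues loop decodes Parquet's DELTA_BYTE_ARRAY encoding, where each value shares a leading prefix with the previously decoded value and only the suffix is stored on the page. A minimal self-contained sketch of that scheme follows; the class name and the sample prefix/suffix data are illustrative only, not part of the patch:

    import java.nio.charset.StandardCharsets;

    // Sketch of DELTA_BYTE_ARRAY (incremental) decoding:
    // value[i] = previous[0..prefixLength) + suffix[i]
    public class DeltaByteArraySketch {
      public static void main(String[] args) {
        int[] prefixLengths = {0, 5, 4};          // bytes reused from the previous value
        String[] suffixes = {"apple", "t", "y"};  // bytes stored on the page

        byte[] previous = new byte[0];
        for (int i = 0; i < suffixes.length; i++) {
          byte[] suffix = suffixes[i].getBytes(StandardCharsets.UTF_8);
          byte[] out = new byte[prefixLengths[i] + suffix.length];
          // Copy the shared prefix from the previously decoded value...
          System.arraycopy(previous, 0, out, 0, prefixLengths[i]);
          // ...then append the stored suffix.
          System.arraycopy(suffix, 0, out, prefixLengths[i], suffix.length);
          previous = out;
          System.out.println(new String(out, StandardCharsets.UTF_8));
        }
        // prints: apple, applet, apply
      }
    }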
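The rewritten readValues materializes each value exactly once: the bytes are appended to the column's child arrayData() vector, and putArray records the (offset, length) pair for the row. A small sketch of that layout, assuming Spark's sql/core classes are on the classpath; the class name PutArraySketch is hypothetical, but every call it makes appears in the diff above:

    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
    import static org.apache.spark.sql.types.DataTypes.BinaryType;

    // A binary column vector stores (offset, length) per row; the bytes
    // themselves live in the child arrayData() vector.
    public class PutArraySketch {
      public static void main(String[] args) {
        WritableColumnVector c = new OnHeapColumnVector(2, BinaryType);
        WritableColumnVector arrayData = c.arrayData();

        byte[] bytes = {'h', 'i'};
        int offset = arrayData.getElementsAppended();  // where the bytes will land
        arrayData.appendBytes(bytes.length, bytes, 0); // materialize the bytes once
        c.putArray(0, offset, bytes.length);           // row 0 points into arrayData
        System.out.println(new String(c.getBinary(0))); // prints "hi"
      }
    }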
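A caveat on the new getBytesUnsafe, which earns the "unsafe" in its name: the OnHeapColumnVector implementation wraps the vector's backing array without copying, so the returned ByteBuffer aliases live vector storage and is clobbered by a later reset() and append. That appears to be why skipBinary ping-pongs between tempBinaryValVector and binaryValVector: 'previous' must stay readable while the next value is assembled into the other scratch vector. A plain-Java sketch of the hazard, with hypothetical names mirroring the wrap-without-copy behavior:

    import java.nio.ByteBuffer;

    // ByteBuffer.wrap shares the backing array: mutating the array also
    // changes what the previously obtained buffer reads.
    public class UnsafeBytesAliasingSketch {
      public static void main(String[] args) {
        byte[] storage = {'a', 'p', 'p', 'l', 'e'};          // stands in for byteData
        ByteBuffer previous = ByteBuffer.wrap(storage, 0, 5); // a view, not a copy

        storage[0] = 'X';  // stands in for reset() + appendBytes() reusing the vector
        System.out.println((char) previous.get(0));  // prints 'X', not 'a'
      }
    }

The OffHeapColumnVector variant, by contrast, must copy (Platform.copyMemory into a fresh array), since off-heap memory cannot back a heap ByteBuffer; only the on-heap path carries the aliasing hazard.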