From f968791635db1d679638511fc4ff01d2b1eccff0 Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Wed, 16 Mar 2022 16:46:45 -0700
Subject: [PATCH] init commit

---
 .../parquet/ParquetColumnVector.java       | 39 ++++++--
 .../parquet/VectorizedRleValuesReader.java | 93 ++++++++++++++++++-
 2 files changed, 120 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
index 2fb0517e7ec95..61900f46b82b1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
@@ -60,7 +60,16 @@ final class ParquetColumnVector {
       int capacity,
       MemoryMode memoryMode,
       Set<ParquetColumn> missingColumns) {
+    this(column, vector, capacity, memoryMode, missingColumns, true);
+  }
+
+  ParquetColumnVector(
+      ParquetColumn column,
+      WritableColumnVector vector,
+      int capacity,
+      MemoryMode memoryMode,
+      Set<ParquetColumn> missingColumns,
+      boolean isTopLevel) {
     DataType sparkType = column.sparkType();
     if (!sparkType.sameType(vector.dataType())) {
       throw new IllegalArgumentException("Spark type: " + sparkType +
@@ -78,20 +87,27 @@ final class ParquetColumnVector {
     }

     if (isPrimitive) {
-      // TODO: avoid allocating these if not necessary, for instance, the node is of top-level
-      // and is not repeated, or the node is not top-level but its max repetition level is 0.
-      repetitionLevels = allocateLevelsVector(capacity, memoryMode);
-      definitionLevels = allocateLevelsVector(capacity, memoryMode);
+      if (column.repetitionLevel() > 0) {
+        repetitionLevels = allocateLevelsVector(capacity, memoryMode);
+      }
+      // We don't need to create and store definition levels if the column is top-level.
+      if (!isTopLevel) {
+        definitionLevels = allocateLevelsVector(capacity, memoryMode);
+      }
     } else {
       Preconditions.checkArgument(column.children().size() == vector.getNumChildren());
+      boolean allChildrenAreMissing = true;
+
       for (int i = 0; i < column.children().size(); i++) {
         ParquetColumnVector childCv = new ParquetColumnVector(column.children().apply(i),
-          vector.getChild(i), capacity, memoryMode, missingColumns);
+          vector.getChild(i), capacity, memoryMode, missingColumns, false);
         children.add(childCv);
+
         // Only use levels from non-missing child, this can happen if only some but not all
         // fields of a struct are missing.
         if (!childCv.vector.isAllNull()) {
+          allChildrenAreMissing = false;
           this.repetitionLevels = childCv.repetitionLevels;
           this.definitionLevels = childCv.definitionLevels;
         }
@@ -99,7 +115,7 @@ final class ParquetColumnVector {

       // This can happen if all the fields of a struct are missing, in which case we should mark
       // the struct itself as a missing column
-      if (repetitionLevels == null) {
+      if (allChildrenAreMissing) {
         vector.setAllNull();
       }
     }
@@ -162,8 +178,12 @@ void reset() {
     if (vector.isAllNull()) return;

     vector.reset();
-    repetitionLevels.reset();
-    definitionLevels.reset();
+    if (repetitionLevels != null) {
+      repetitionLevels.reset();
+    }
+    if (definitionLevels != null) {
+      definitionLevels.reset();
+    }
     for (ParquetColumnVector child : children) {
       child.reset();
     }
@@ -287,7 +307,8 @@ private void calculateStructOffsets() {
     vector.reserve(definitionLevels.getElementsAppended());

     int rowId = 0;
-    boolean hasRepetitionLevels = repetitionLevels.getElementsAppended() > 0;
+    boolean hasRepetitionLevels =
+      repetitionLevels != null && repetitionLevels.getElementsAppended() > 0;
     for (int i = 0; i < definitionLevels.getElementsAppended(); i++) {
       // If repetition level > maxRepetitionLevel, the value is a nested element (e.g., an array
       // element in struct<array<int>>), and we should skip the definition level since it doesn't
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 3eef20577610d..e03b3c20fb3bc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -170,7 +170,11 @@ public void readBatch(
       WritableColumnVector defLevels,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) {
-    readBatchInternal(state, values, values, defLevels, valueReader, updater);
+    if (defLevels == null) {
+      readBatchInternal(state, values, values, valueReader, updater);
+    } else {
+      readBatchInternalWithDefLevels(state, values, values, defLevels, valueReader, updater);
+    }
   }

   /**
@@ -183,11 +187,94 @@ public void readIntegers(
       WritableColumnVector nulls,
       WritableColumnVector defLevels,
       VectorizedValuesReader valueReader) {
-    readBatchInternal(state, values, nulls, defLevels, valueReader,
-      new ParquetVectorUpdaterFactory.IntegerUpdater());
+    if (defLevels == null) {
+      readBatchInternal(state, values, nulls, valueReader,
+        new ParquetVectorUpdaterFactory.IntegerUpdater());
+    } else {
+      readBatchInternalWithDefLevels(state, values, nulls, defLevels, valueReader,
+        new ParquetVectorUpdaterFactory.IntegerUpdater());
+    }
   }

   private void readBatchInternal(
+      ParquetReadState state,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      VectorizedValuesReader valueReader,
+      ParquetVectorUpdater updater) {
+
+    long rowId = state.rowId;
+    int leftInBatch = state.rowsToReadInBatch;
+    int leftInPage = state.valuesToReadInPage;
+
+    while (leftInBatch > 0 && leftInPage > 0) {
+      if (currentCount == 0 && !readNextGroup()) break;
+      int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
+
+      long rangeStart = state.currentRangeStart();
+      long rangeEnd = state.currentRangeEnd();
+
+      if (rowId + n < rangeStart) {
+        skipValues(n, state, valueReader, updater);
+        rowId += n;
+        leftInPage -= n;
+      } else if (rowId > rangeEnd) {
+        state.nextRange();
+      } else {
+        // The range [rowId, rowId + n) overlaps with the current row range in state
+        long start = Math.max(rangeStart, rowId);
+        long end = Math.min(rangeEnd, rowId + n - 1);
+
+        // Skip the part [rowId, start)
+        int toSkip = (int) (start - rowId);
+        if (toSkip > 0) {
+          skipValues(toSkip, state, valueReader, updater);
+          rowId += toSkip;
+          leftInPage -= toSkip;
+        }
+
+        // Read the part [start, end]
+        n = (int) (end - start + 1);
+
+        switch (mode) {
+          case RLE:
+            if (currentValue == state.maxDefinitionLevel) {
+              updater.readValues(n, state.valueOffset, values, valueReader);
+              state.valueOffset += n;
+            } else if (!state.isRequired && currentValue == state.maxDefinitionLevel - 1) {
+              // Only add null if this represents a null element, but not for the case where a
+              // struct itself is null
+              nulls.putNulls(state.valueOffset, n);
+              state.valueOffset += n;
+            }
+            break;
+          case PACKED:
+            for (int i = 0; i < n; ++i) {
+              int value = currentBuffer[currentBufferIdx++];
+              if (value == state.maxDefinitionLevel) {
+                updater.readValue(state.valueOffset++, values, valueReader);
+              } else {
+                // Only add null if this represents a null element, but not for the case where a
+                // struct itself is null
+                nulls.putNull(state.valueOffset++);
+              }
+            }
+            break;
+        }
+        state.levelOffset += n;
+        leftInBatch -= n;
+        rowId += n;
+        leftInPage -= n;
+        currentCount -= n;
+      }
+    }
+
+    state.rowsToReadInBatch = leftInBatch;
+    state.valuesToReadInPage = leftInPage;
+    state.rowId = rowId;
+  }
+
+  private void readBatchInternalWithDefLevels(
       ParquetReadState state,
       WritableColumnVector values,
       WritableColumnVector nulls,
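For intuition, here is a minimal standalone sketch of the idea behind the new defLevels == null fast path. It is not part of the patch, and every identifier in it (Run, decode, TopLevelFastPathSketch) is made up for illustration: for a top-level column, whose max repetition level is 0, a definition level only distinguishes "value" from "null", so nulls can be applied while decoding and the levels themselves never need to be materialized. In the patch, readBatchInternal plays this role, with nulls.putNull/putNulls standing in for out.add(null).

import java.util.ArrayList;
import java.util.List;

// Simplified RLE run of definition levels: `count` repetitions of `level`.
class Run {
  final int level;
  final int count;
  Run(int level, int count) { this.level = level; this.count = count; }
}

public class TopLevelFastPathSketch {
  // Decode a top-level column: convert each run of definition levels into
  // values or nulls on the fly, never storing a definition-level vector.
  static List<Integer> decode(List<Run> defLevelRuns, int maxDefinitionLevel, int[] values) {
    List<Integer> out = new ArrayList<>();
    int valueIdx = 0;
    for (Run run : defLevelRuns) {
      for (int i = 0; i < run.count; i++) {
        if (run.level == maxDefinitionLevel) {
          out.add(values[valueIdx++]); // defined: consume the next decoded value
        } else {
          out.add(null);               // below max level: a top-level null
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Definition levels 1,1,0,1 (maxDefinitionLevel = 1) over values [7, 8, 9].
    List<Run> runs = new ArrayList<>();
    runs.add(new Run(1, 2));
    runs.add(new Run(0, 1));
    runs.add(new Run(1, 1));
    System.out.println(decode(runs, 1, new int[] {7, 8, 9})); // [7, 8, null, 9]
  }
}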