Commit

init commit

sunchao committed Mar 16, 2022
1 parent 6aea907 commit f968791

Showing 2 changed files with 120 additions and 12 deletions.
ParquetColumnVector.java

@@ -60,7 +60,16 @@ final class ParquetColumnVector {
       int capacity,
       MemoryMode memoryMode,
       Set<ParquetColumn> missingColumns) {
+    this(column, vector, capacity, memoryMode, missingColumns, true);
+  }
+
+  ParquetColumnVector(
+      ParquetColumn column,
+      WritableColumnVector vector,
+      int capacity,
+      MemoryMode memoryMode,
+      Set<ParquetColumn> missingColumns,
+      boolean isTopLevel) {
     DataType sparkType = column.sparkType();
     if (!sparkType.sameType(vector.dataType())) {
       throw new IllegalArgumentException("Spark type: " + sparkType +
@@ -78,28 +87,35 @@ final class ParquetColumnVector {
     }

     if (isPrimitive) {
-      // TODO: avoid allocating these if not necessary, for instance, the node is of top-level
-      // and is not repeated, or the node is not top-level but its max repetition level is 0.
-      repetitionLevels = allocateLevelsVector(capacity, memoryMode);
-      definitionLevels = allocateLevelsVector(capacity, memoryMode);
+      if (column.repetitionLevel() > 0) {
+        repetitionLevels = allocateLevelsVector(capacity, memoryMode);
+      }
+      // We don't need to create and store definition levels if the column is top-level.
+      if (!isTopLevel) {
+        definitionLevels = allocateLevelsVector(capacity, memoryMode);
+      }
     } else {
       Preconditions.checkArgument(column.children().size() == vector.getNumChildren());
+      boolean allChildrenAreMissing = true;

       for (int i = 0; i < column.children().size(); i++) {
         ParquetColumnVector childCv = new ParquetColumnVector(column.children().apply(i),
-          vector.getChild(i), capacity, memoryMode, missingColumns);
+          vector.getChild(i), capacity, memoryMode, missingColumns, false);
         children.add(childCv);

         // Only use levels from non-missing child, this can happen if only some but not all
         // fields of a struct are missing.
         if (!childCv.vector.isAllNull()) {
+          allChildrenAreMissing = false;
           this.repetitionLevels = childCv.repetitionLevels;
           this.definitionLevels = childCv.definitionLevels;
         }
       }

       // This can happen if all the fields of a struct are missing, in which case we should mark
       // the struct itself as a missing column
-      if (repetitionLevels == null) {
+      if (allChildrenAreMissing) {
         vector.setAllNull();
       }
     }
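A note on the hunk above: with level vectors now allocated conditionally, repetitionLevels == null no longer implies a missing column, so the struct case tracks presence with an explicit allChildrenAreMissing flag. A minimal, self-contained sketch of that check (plain Java; the list of booleans is a hypothetical stand-in for childCv.vector.isAllNull()):

import java.util.List;

final class MissingStructSketch {
  // Mirrors the allChildrenAreMissing flag: a struct is marked all-null only
  // when every one of its child columns is missing from the file.
  static boolean allChildrenAreMissing(List<Boolean> childIsAllNull) {
    boolean allMissing = true;
    for (boolean childMissing : childIsAllNull) {
      if (!childMissing) {
        allMissing = false; // at least one child is present in the file
      }
    }
    return allMissing;
  }

  public static void main(String[] args) {
    System.out.println(allChildrenAreMissing(List.of(true, true)));  // true: setAllNull()
    System.out.println(allChildrenAreMissing(List.of(true, false))); // false: keep child levels
  }
}

Only when every child is absent is the struct itself marked all-null; otherwise the levels of any present child are reused for the struct, as the diff's comments describe.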
@@ -162,8 +178,12 @@ void reset() {
     if (vector.isAllNull()) return;

     vector.reset();
-    repetitionLevels.reset();
-    definitionLevels.reset();
+    if (repetitionLevels != null) {
+      repetitionLevels.reset();
+    }
+    if (definitionLevels != null) {
+      definitionLevels.reset();
+    }
     for (ParquetColumnVector child : children) {
       child.reset();
     }
@@ -287,7 +307,8 @@ private void calculateStructOffsets() {
     vector.reserve(definitionLevels.getElementsAppended());

     int rowId = 0;
-    boolean hasRepetitionLevels = repetitionLevels.getElementsAppended() > 0;
+    boolean hasRepetitionLevels =
+      repetitionLevels != null && repetitionLevels.getElementsAppended() > 0;
     for (int i = 0; i < definitionLevels.getElementsAppended(); i++) {
       // If repetition level > maxRepetitionLevel, the value is a nested element (e.g., an array
       // element in struct<array<int>>), and we should skip the definition level since it doesn't
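To make the allocation rule in the constructor hunk concrete: repetition levels are only needed when the column sits under a repeated field (column.repetitionLevel() > 0), and definition levels only for non-top-level columns, since a top-level column's nulls can be written straight into the value vector. A self-contained sketch, with hypothetical names, of which buffers each kind of column gets:

final class LevelAllocationSketch {
  // repetitionLevel mirrors column.repetitionLevel() in the diff; everything
  // else here is a hypothetical stand-in used only for illustration.
  static String buffersNeeded(int repetitionLevel, boolean isTopLevel) {
    boolean needsRepetition = repetitionLevel > 0; // lives under a repeated field
    boolean needsDefinition = !isTopLevel;         // nested columns still need them
    return "repetitionLevels=" + needsRepetition + " definitionLevels=" + needsDefinition;
  }

  public static void main(String[] args) {
    System.out.println(buffersNeeded(0, true));  // top-level int: no level buffers
    System.out.println(buffersNeeded(0, false)); // field of a struct: definition only
    System.out.println(buffersNeeded(1, false)); // element of array<int>: both
  }
}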
VectorizedRleValuesReader.java

@@ -170,7 +170,11 @@ public void readBatch(
       WritableColumnVector defLevels,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) {
-    readBatchInternal(state, values, values, defLevels, valueReader, updater);
+    if (defLevels == null) {
+      readBatchInternal(state, values, values, valueReader, updater);
+    } else {
+      readBatchInternalWithDefLevels(state, values, values, defLevels, valueReader, updater);
+    }
   }

   /**
@@ -183,11 +187,94 @@ public void readIntegers(
       WritableColumnVector nulls,
       WritableColumnVector defLevels,
       VectorizedValuesReader valueReader) {
-    readBatchInternal(state, values, nulls, defLevels, valueReader,
-      new ParquetVectorUpdaterFactory.IntegerUpdater());
+    if (defLevels == null) {
+      readBatchInternal(state, values, nulls, valueReader,
+        new ParquetVectorUpdaterFactory.IntegerUpdater());
+    } else {
+      readBatchInternalWithDefLevels(state, values, nulls, defLevels, valueReader,
+        new ParquetVectorUpdaterFactory.IntegerUpdater());
+    }
   }

+  private void readBatchInternal(
+      ParquetReadState state,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      VectorizedValuesReader valueReader,
+      ParquetVectorUpdater updater) {
+
+    long rowId = state.rowId;
+    int leftInBatch = state.rowsToReadInBatch;
+    int leftInPage = state.valuesToReadInPage;
+
+    while (leftInBatch > 0 && leftInPage > 0) {
+      if (currentCount == 0 && !readNextGroup()) break;
+      int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
+
+      long rangeStart = state.currentRangeStart();
+      long rangeEnd = state.currentRangeEnd();
+
+      if (rowId + n < rangeStart) {
+        skipValues(n, state, valueReader, updater);
+        rowId += n;
+        leftInPage -= n;
+      } else if (rowId > rangeEnd) {
+        state.nextRange();
+      } else {
+        // The range [rowId, rowId + n) overlaps with the current row range in state
+        long start = Math.max(rangeStart, rowId);
+        long end = Math.min(rangeEnd, rowId + n - 1);
+
+        // Skip the part [rowId, start)
+        int toSkip = (int) (start - rowId);
+        if (toSkip > 0) {
+          skipValues(toSkip, state, valueReader, updater);
+          rowId += toSkip;
+          leftInPage -= toSkip;
+        }
+
+        // Read the part [start, end]
+        n = (int) (end - start + 1);
+
+        switch (mode) {
+          case RLE:
+            if (currentValue == state.maxDefinitionLevel) {
+              updater.readValues(n, state.valueOffset, values, valueReader);
+              state.valueOffset += n;
+            } else if (!state.isRequired && currentValue == state.maxDefinitionLevel - 1) {
+              // Only add null if this represents a null element, but not for the case where a
+              // struct itself is null
+              nulls.putNulls(state.valueOffset, n);
+              state.valueOffset += n;
+            }
+            break;
+          case PACKED:
+            for (int i = 0; i < n; ++i) {
+              int value = currentBuffer[currentBufferIdx++];
+              if (value == state.maxDefinitionLevel) {
+                updater.readValue(state.valueOffset++, values, valueReader);
+              } else {
+                // Only add null if this represents a null element, but not for the case where a
+                // struct itself is null
+                nulls.putNull(state.valueOffset++);
+              }
+            }
+            break;
+        }
+        state.levelOffset += n;
+        leftInBatch -= n;
+        rowId += n;
+        leftInPage -= n;
+        currentCount -= n;
+      }
+    }
+
+    state.rowsToReadInBatch = leftInBatch;
+    state.valuesToReadInPage = leftInPage;
+    state.rowId = rowId;
+  }
+
   private void readBatchInternalWithDefLevels(
       ParquetReadState state,
       WritableColumnVector values,
       WritableColumnVector nulls,
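The new readBatchInternal also interleaves decoding with the row ranges tracked by ParquetReadState, so that only rows inside the current range are read and the rest are skipped. A self-contained sketch of the overlap arithmetic, with hypothetical local values:

final class RowRangeSketch {
  public static void main(String[] args) {
    long rowId = 95;                        // next run covers rows [95, 115)
    long n = 20;
    long rangeStart = 100, rangeEnd = 109;  // only rows [100, 109] are wanted

    if (rowId + n < rangeStart) {
      System.out.println("skip the whole run");    // run is entirely before the range
    } else if (rowId > rangeEnd) {
      System.out.println("advance to next range"); // run is entirely after the range
    } else {
      long start = Math.max(rangeStart, rowId);
      long end = Math.min(rangeEnd, rowId + n - 1);
      System.out.println("skip " + (start - rowId) + " rows");   // skip [95, 100)
      System.out.println("read " + (end - start + 1) + " rows"); // read [100, 109]
      // Rows [110, 115) are reconsidered on the next loop iteration, after the
      // state advances to its next row range.
    }
  }
}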
