Skip to content

Commit

Permalink
DRILL-7919: Fix reading parquet with decimal dictionary encoding (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
vvysotskyi committed May 28, 2021
1 parent f73496d commit 2c0e718
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,38 +177,8 @@ static ColumnReader<?> createFixedColumnReader(ParquetRecordReader recordReader,
}
}
} else { // if the column is nullable
if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
return new NullableBitReader(recordReader, descriptor, columnChunkMetaData,
fixedLength, (NullableBitVector) v, schemaElement);
} else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE) {
switch(recordReader.getDateCorruptionStatus()) {
case META_SHOWS_CORRUPTION:
return new NullableFixedByteAlignedReaders.NullableCorruptDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector)v, schemaElement);
case META_SHOWS_NO_CORRUPTION:
return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement);
case META_UNCLEAR_TEST_VALUES:
return new NullableFixedByteAlignedReaders.CorruptionDetectingNullableDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement);
default:
throw new ExecutionSetupException(
String.format("Issue setting up parquet reader for date type, " +
"unrecognized date corruption status %s. See DRILL-4203 for more info.",
recordReader.getDateCorruptionStatus()));
}
} else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
if (convertedType == ConvertedType.DECIMAL) {
return new NullableFixedByteAlignedReaders.NullableVarDecimalReader(recordReader,
descriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) v, schemaElement);
} else if (convertedType == ConvertedType.INTERVAL) {
return new NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (NullableIntervalVector) v, schemaElement);
} else {
return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
}
} else {
return getNullableColumnReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, v, schemaElement);
}
return getNullableColumnReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, v, schemaElement);
}
}

Expand Down Expand Up @@ -252,91 +222,110 @@ static VarLengthValuesColumn<?> getReader(ParquetRecordReader parentReader, Colu
}
}

public static NullableColumnReader<?> getNullableColumnReader(ParquetRecordReader parentReader,
public static ColumnReader<?> getNullableColumnReader(ParquetRecordReader parentReader,
ColumnDescriptor columnDescriptor,
ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength,
ValueVector valueVec,
SchemaElement schemaElement) throws ExecutionSetupException {
ConvertedType convertedType = schemaElement.getConverted_type();

if (! columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT96) {
// TODO: check convertedType once parquet support TIMESTAMP_NANOS type annotation.
switch (columnDescriptor.getPrimitiveType().getPrimitiveTypeName()) {
case BOOLEAN:
return new NullableBitReader(parentReader, columnDescriptor, columnChunkMetaData,
fixedLength, (NullableBitVector) valueVec, schemaElement);
case INT32:
if (convertedType == null) {
return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement);
}
switch (convertedType) {
case INT_8:
case INT_16:
case INT_32:
return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement);
case UINT_8:
case UINT_16:
case UINT_32:
return new NullableFixedByteAlignedReaders.NullableDictionaryUInt4Reader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableUInt4Vector) valueVec, schemaElement);
case DECIMAL:
return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
case TIME_MILLIS:
return new NullableFixedByteAlignedReaders.NullableDictionaryTimeReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector) valueVec, schemaElement);
case DATE:
switch (parentReader.getDateCorruptionStatus()) {
case META_SHOWS_CORRUPTION:
return new NullableFixedByteAlignedReaders.NullableCorruptDateReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableDateVector) valueVec, schemaElement);
case META_SHOWS_NO_CORRUPTION:
return new NullableFixedByteAlignedReaders.NullableDateReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableDateVector) valueVec, schemaElement);
case META_UNCLEAR_TEST_VALUES:
return new NullableFixedByteAlignedReaders.CorruptionDetectingNullableDateReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableDateVector) valueVec, schemaElement);
default:
throw new ExecutionSetupException(
String.format("Issue setting up parquet reader for date type, " +
"unrecognized date corruption status %s. See DRILL-4203 for more info.",
parentReader.getDateCorruptionStatus()));
}
default:
throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT32");
}
case INT64:
if (convertedType == null) {
return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector)valueVec, schemaElement);
}
switch (convertedType) {
case UINT_64:
return new NullableFixedByteAlignedReaders.NullableDictionaryUInt8Reader(parentReader, columnDescriptor,
columnChunkMetaData, fixedLength, (NullableUInt8Vector) valueVec, schemaElement);
case DECIMAL:
return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
case TIMESTAMP_MILLIS:
return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector)valueVec, schemaElement);
// DRILL-6670: handle TIMESTAMP_MICROS as INT64 with no logical type
case TIMESTAMP_MICROS:
case INT_64:
return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector) valueVec, schemaElement);
default:
throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT64");
}
case INT96:
// TODO: check convertedType once parquet support TIMESTAMP_NANOS type annotation.
if (parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement);
} else {
return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
}
} else if (convertedType == ConvertedType.DECIMAL) {
// NullableVarDecimalVector allows storing of values with different width,
// so every time when the value is added, offset vector should be updated.
// Therefore NullableVarDecimalReader is used here instead of NullableFixedByteAlignedReader.
return new NullableFixedByteAlignedReaders.NullableVarDecimalReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
} else {
return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement);
}
} else {
switch (columnDescriptor.getType()) {
case INT32:
if (convertedType == null) {
return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement);
}
case FLOAT:
return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
case DOUBLE:
return new NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
case FIXED_LEN_BYTE_ARRAY:
if (convertedType != null) {
switch (convertedType) {
case INT_8:
case INT_16:
case INT_32:
return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement);
case UINT_8:
case UINT_16:
case UINT_32:
return new NullableFixedByteAlignedReaders.NullableDictionaryUInt4Reader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableUInt4Vector) valueVec, schemaElement);
case DECIMAL:
return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
case TIME_MILLIS:
return new NullableFixedByteAlignedReaders.NullableDictionaryTimeReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector)valueVec, schemaElement);
case INTERVAL:
return new NullableFixedByteAlignedReaders.NullableIntervalReader(parentReader, columnDescriptor,
columnChunkMetaData, fixedLength, (NullableIntervalVector) valueVec, schemaElement);
default:
throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT32");
}
case INT64:
if (convertedType == null) {
return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector)valueVec, schemaElement);
}
switch (convertedType) {
case UINT_64:
return new NullableFixedByteAlignedReaders.NullableDictionaryUInt8Reader(parentReader, columnDescriptor,
columnChunkMetaData, fixedLength, (NullableUInt8Vector) valueVec, schemaElement);
case DECIMAL:
return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
case TIMESTAMP_MILLIS:
return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector)valueVec, schemaElement);
// DRILL-6670: handle TIMESTAMP_MICROS as INT64 with no logical type
case TIMESTAMP_MICROS:
case INT_64:
return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector) valueVec, schemaElement);
default:
throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT64");
}
case INT96:
// TODO: check convertedType once parquet support TIMESTAMP_NANOS type annotation.
if (parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement);
} else {
return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
if (!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader,columnDescriptor,
columnChunkMetaData, fixedLength, valueVec, schemaElement);
}
}
case FLOAT:
return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
case DOUBLE:
return new NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
default:
throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name() );
}
}
return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, columnDescriptor,
columnChunkMetaData, fixedLength, (NullableVarBinaryVector) valueVec, schemaElement);
default:
throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getPrimitiveType().getPrimitiveTypeName().name());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,27 @@ protected void readField(long recordsToReadInThisPass) {
}
}
break;
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
if (usingDictionary) {
NullableVarDecimalVector.Mutator mutator = valueVec.getMutator();
for (int i = 0; i < recordsReadInThisIteration; i++) {
Binary currDictValToWrite = pageReader.dictionaryValueReader.readBytes();
mutator.setSafe(valuesReadInCurrentPass + i, currDictValToWrite.toByteBuffer().slice(), 0,
currDictValToWrite.length());
}
// Set the write Index. The next page that gets read might be a page that does not use dictionary encoding
// and we will go into the else condition below. The readField method of the parent class requires the
// writer index to be set correctly.
int writerIndex = valueVec.getBuffer().writerIndex();
valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength);
} else {
for (int i = 0; i < recordsToReadInThisPass; i++) {
Binary valueToWrite = pageReader.valueReader.readBytes();
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valueToWrite.toByteBuffer().slice(), 0,
valueToWrite.length());
}
}
}
}

Expand Down Expand Up @@ -471,31 +492,6 @@ void addNext(int start, int index) {
}
}

// Reads nullable DECIMAL values into a NullableVarDecimalVector, which stores
// decimals as variable-width byte arrays. Handles the three physical Parquet
// encodings a DECIMAL column may use: INT32, INT64, and (FIXED_LEN_)BINARY.
public static class NullableVarDecimalReader extends NullableConvertedReader<NullableVarDecimalVector> {
NullableVarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, NullableVarDecimalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}

// TODO: allow reading page instead of reading every record separately
@Override
void addNext(int start, int index) {
// Dispatch on the column chunk's physical Parquet type.
switch (columnChunkMetaData.getType()) {
case INT32:
// Guava's Ints.toByteArray produces big-endian bytes, the layout the
// var-decimal vector expects for integer-backed decimals.
valueVec.getMutator().setSafe(index, Ints.toByteArray(bytebuf.getInt(start)), 0, dataTypeLengthInBytes);
break;
case INT64:
// Same as INT32 but for 8-byte values via Longs.toByteArray.
valueVec.getMutator().setSafe(index, Longs.toByteArray(bytebuf.getLong(start)), 0, dataTypeLengthInBytes);
break;
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
// Copy dataTypeLengthInBytes bytes straight out of the page buffer;
// the leading 1 is presumably the isSet (non-null) flag — confirm against
// NullableVarDecimalVector.Mutator.setSafe.
valueVec.getMutator().setSafe(index, 1, start, start + dataTypeLengthInBytes, bytebuf);
break;
// NOTE(review): any other physical type falls through silently. This looks
// unreachable because the factory only builds this reader for DECIMAL
// columns of the types above — verify in ColumnReaderFactory.
}

}
}

public static class NullableIntervalReader extends NullableConvertedReader<NullableIntervalVector> {
NullableIntervalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, NullableIntervalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.drill.test.TestBuilder.mapOf;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -697,4 +698,14 @@ public void testNullableVarBinaryUUID() throws Exception {
.baselineValues(new Object[]{null})
.go();
}

@Test
public void testDecimalDictionaryEncoding() throws Exception {
  // DRILL-7919: decimal column read from a parquet file that uses dictionary
  // encoding must round-trip with its full declared scale.
  String query = "select RegHrs from cp.`parquet/dict_dec.parquet`";
  BigDecimal expectedValue = new BigDecimal("8.000000");
  testBuilder()
      .sqlQuery(query)
      .ordered()
      .baselineColumns("RegHrs")
      .baselineValues(expectedValue)
      .go();
}
}
Binary file not shown.

0 comments on commit 2c0e718

Please sign in to comment.