DRILL-7948: Unable to query file with required fixed_len_byte_array decimal columns (#2254)
vvysotskyi committed Jun 16, 2021
1 parent 111e345 commit f056ea7
Showing 11 changed files with 312 additions and 174 deletions.
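Note: the commit title refers to files containing a non-nullable (required) DECIMAL column stored as a fixed-length byte array. A minimal sketch of such a schema, assuming parquet-mr's MessageTypeParser; the class name and field name below are illustrative and not part of this commit:

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class RequiredFixedDecimalSchemaExample {
  public static void main(String[] args) {
    // A required (non-nullable) DECIMAL stored as FIXED_LEN_BYTE_ARRAY,
    // the column shape named in the commit title. 16 bytes covers precision up to 38.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example {\n"
            + "  required fixed_len_byte_array(16) price (DECIMAL(38, 2));\n"
            + "}");
    System.out.println(schema);
  }
}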
@@ -441,12 +441,24 @@ public Void call() throws IOException {
bytesRead = compressedSize;

synchronized (parent) {
if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
readStatus.setIsDictionaryPage(true);
valuesRead += pageHeader.getDictionary_page_header().getNum_values();
} else {
valuesRead += pageHeader.getData_page_header().getNum_values();
parent.totalPageValuesRead += valuesRead;
PageType type = pageHeader.getType() == null ? PageType.DATA_PAGE : pageHeader.getType();
switch (type) {
case DICTIONARY_PAGE:
readStatus.setIsDictionaryPage(true);
valuesRead += pageHeader.getDictionary_page_header().getNum_values();
break;
case DATA_PAGE_V2:
valuesRead += pageHeader.getData_page_header_v2().getNum_values();
parent.totalPageValuesRead += valuesRead;
break;
case DATA_PAGE:
valuesRead += pageHeader.getData_page_header().getNum_values();
parent.totalPageValuesRead += valuesRead;
break;
default:
throw UserException.unsupportedError()
.message("Page type is not supported yet: " + type)
.build(logger);
}
long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
readStatus.setPageHeader(pageHeader);

Large diffs are not rendered by default.

@@ -17,14 +17,11 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;

import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
import org.apache.drill.shaded.guava.com.google.common.primitives.Longs;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.vector.DateVector;
import org.apache.drill.exec.vector.IntervalVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarDecimalVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
@@ -195,30 +192,6 @@ void addNext(int start, int index) {

}

public static class VarDecimalReader extends ConvertedReader<VarDecimalVector> {

VarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, VarDecimalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}

@Override
void addNext(int start, int index) {
switch (columnChunkMetaData.getType()) {
case INT32:
valueVec.getMutator().setSafe(index, Ints.toByteArray(bytebuf.getInt(start)), 0, dataTypeLengthInBytes);
break;
case INT64:
valueVec.getMutator().setSafe(index, Longs.toByteArray(bytebuf.getLong(start)), 0, dataTypeLengthInBytes);
break;
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
valueVec.getMutator().setSafe(index, start, start + dataTypeLengthInBytes, bytebuf);
break;
}
}
}

public static class IntervalReader extends ConvertedReader<IntervalVector> {
IntervalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, IntervalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
@@ -229,12 +202,13 @@ public static class IntervalReader extends ConvertedReader<IntervalVector> {
void addNext(int start, int index) {
if (usingDictionary) {
byte[] input = pageReader.dictionaryValueReader.readBytes().getBytes();
valueVec.getMutator().setSafe(index * 12,
valueVec.getMutator().setSafe(index,
ParquetReaderUtility.getIntFromLEBytes(input, 0),
ParquetReaderUtility.getIntFromLEBytes(input, 4),
ParquetReaderUtility.getIntFromLEBytes(input, 8));
} else {
valueVec.getMutator().setSafe(index, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
}
valueVec.getMutator().setSafe(index, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
}
}
}
@@ -504,12 +504,13 @@ public static class NullableIntervalReader extends NullableConvertedReader<Nulla
void addNext(int start, int index) {
if (usingDictionary) {
byte[] input = pageReader.dictionaryValueReader.readBytes().getBytes();
valueVec.getMutator().setSafe(index * 12, 1,
valueVec.getMutator().setSafe(index, 1,
ParquetReaderUtility.getIntFromLEBytes(input, 0),
ParquetReaderUtility.getIntFromLEBytes(input, 4),
ParquetReaderUtility.getIntFromLEBytes(input, 8));
} else {
valueVec.getMutator().set(index, 1, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
}
valueVec.getMutator().set(index, 1, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
}
}
}
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;

import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import io.netty.buffer.ByteBufUtil;
@@ -39,6 +40,8 @@
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
import org.apache.parquet.format.DataPageHeader;
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.PageType;
import org.apache.parquet.format.Util;
@@ -313,11 +316,12 @@ public boolean next() throws IOException {
}

timer.start();
currentPageCount = pageHeader.data_page_header.num_values;
PageHeaderInfoProvider pageHeaderInfoProvider = pageHeaderInfoProviderBuilder(pageHeader);
currentPageCount = pageHeaderInfoProvider.getNumValues();

final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
final Encoding valueEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.encoding);
final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getRepetitionLevelEncoding());
final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getDefinitionLevelEncoding());
final Encoding valueEncoding = METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getEncoding());

byteLength = pageHeader.uncompressed_page_size;

@@ -453,8 +457,9 @@ void resetDefinitionLevelReader(int skipCount) throws IOException {
Preconditions.checkState(parentColumnReader.columnDescriptor.getMaxDefinitionLevel() == 1);
Preconditions.checkState(currentPageCount > 0);

final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
PageHeaderInfoProvider pageHeaderInfoProvider = pageHeaderInfoProviderBuilder(pageHeader);
final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getRepetitionLevelEncoding());
final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getDefinitionLevelEncoding());

final ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));

@@ -475,4 +480,84 @@ void resetDefinitionLevelReader(int skipCount) throws IOException {
definitionLevels.skip();
}
}

/**
* Common interface for wrappers of {@link DataPageHeader} and {@link DataPageHeaderV2} classes.
*/
private interface PageHeaderInfoProvider {
int getNumValues();

org.apache.parquet.format.Encoding getEncoding();

org.apache.parquet.format.Encoding getDefinitionLevelEncoding();

org.apache.parquet.format.Encoding getRepetitionLevelEncoding();
}

private static class DataPageHeaderV1InfoProvider implements PageHeaderInfoProvider {
private final DataPageHeader dataPageHeader;

private DataPageHeaderV1InfoProvider(DataPageHeader dataPageHeader) {
this.dataPageHeader = dataPageHeader;
}

@Override
public int getNumValues() {
return dataPageHeader.getNum_values();
}

@Override
public org.apache.parquet.format.Encoding getEncoding() {
return dataPageHeader.getEncoding();
}

@Override
public org.apache.parquet.format.Encoding getDefinitionLevelEncoding() {
return dataPageHeader.getDefinition_level_encoding();
}

@Override
public org.apache.parquet.format.Encoding getRepetitionLevelEncoding() {
return dataPageHeader.getRepetition_level_encoding();
}
}

private static class DataPageHeaderV2InfoProvider implements PageHeaderInfoProvider {
private final DataPageHeaderV2 dataPageHeader;

private DataPageHeaderV2InfoProvider(DataPageHeaderV2 dataPageHeader) {
this.dataPageHeader = dataPageHeader;
}

@Override
public int getNumValues() {
return dataPageHeader.getNum_values();
}

@Override
public org.apache.parquet.format.Encoding getEncoding() {
return dataPageHeader.getEncoding();
}

@Override
public org.apache.parquet.format.Encoding getDefinitionLevelEncoding() {
return org.apache.parquet.format.Encoding.PLAIN;
}

@Override
public org.apache.parquet.format.Encoding getRepetitionLevelEncoding() {
return org.apache.parquet.format.Encoding.PLAIN;
}
}

private static PageHeaderInfoProvider pageHeaderInfoProviderBuilder(PageHeader pageHeader) {
switch (pageHeader.getType()) {
case DATA_PAGE:
return new DataPageHeaderV1InfoProvider(pageHeader.getData_page_header());
case DATA_PAGE_V2:
return new DataPageHeaderV2InfoProvider(pageHeader.getData_page_header_v2());
default:
throw new DrillRuntimeException("Unsupported page header type:" + pageHeader.getType());
}
}
}
@@ -240,11 +240,12 @@ static class DictionaryVarDecimalReader extends FixedByteAlignedReader<VarDecima
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
int dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
recordsReadInThisIteration =
Math.min(pageReader.currentPageCount - pageReader.valuesRead,
recordsToReadInThisPass - valuesReadInCurrentPass);

switch (columnDescriptor.getType()) {
switch (columnDescriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
if (usingDictionary) {
for (int i = 0; i < recordsReadInThisIteration; i++) {
@@ -253,7 +254,10 @@ protected void readField(long recordsToReadInThisPass) {
}
setWriteIndex();
} else {
super.readField(recordsToReadInThisPass);
for (int i = 0; i < recordsReadInThisIteration; i++) {
byte[] bytes = Ints.toByteArray(pageReader.pageData.getInt((int) readStartInBytes + i * dataTypeLengthInBytes));
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, bytes, 0, dataTypeLengthInBytes);
}
}
break;
case INT64:
@@ -264,9 +268,33 @@ protected void readField(long recordsToReadInThisPass) {
}
setWriteIndex();
} else {
super.readField(recordsToReadInThisPass);
for (int i = 0; i < recordsReadInThisIteration; i++) {
byte[] bytes = Longs.toByteArray(pageReader.pageData.getLong((int) readStartInBytes + i * dataTypeLengthInBytes));
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, bytes, 0, dataTypeLengthInBytes);
}
}
break;
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
if (usingDictionary) {
VarDecimalVector.Mutator mutator = valueVec.getMutator();
for (int i = 0; i < recordsReadInThisIteration; i++) {
Binary currDictValToWrite = pageReader.dictionaryValueReader.readBytes();
mutator.setSafe(valuesReadInCurrentPass + i,
currDictValToWrite.toByteBuffer().slice(), 0, currDictValToWrite.length());
}
// Set the write Index. The next page that gets read might be a page that does not use dictionary encoding
// and we will go into the else condition below. The readField method of the parent class requires the
// writer index to be set correctly.
int writerIndex = valueVec.getBuffer().writerIndex();
valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength);
} else {
for (int i = 0; i < recordsReadInThisIteration; i++) {
int start = (int) readStartInBytes + i * dataTypeLengthInBytes;
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i,
start, start + dataTypeLengthInBytes, pageReader.pageData);
}
}
}
}

@@ -25,6 +25,7 @@
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.Binary;
@@ -46,16 +47,15 @@
* that are supported by Drill. Embedded types specified in the Parquet specification are not covered by the
* examples but can be added.
* To create a new parquet file, define a schema, create a GroupWriter based on the schema, then add values
* for individual records to the GroupWriter.<br>
* TODO: DRILL-7904. To run this tool please use 28.2-jre <guava.version> instead of 19.0 in main POM file
* for individual records to the GroupWriter.
* @see org.apache.drill.exec.store.parquet.TestFileGenerator TestFileGenerator
* @see org.apache.parquet.hadoop.example.GroupWriteSupport GroupWriteSupport
* @see org.apache.parquet.example.Paper Dremel Example
*/
public class ParquetSimpleTestFileGenerator {

public enum EnumType {
RANDOM_VALUE, MAX_VALUE, MIN_VALUE;
RANDOM_VALUE, MAX_VALUE, MIN_VALUE
}

public static Path root = new Path("file:/tmp/parquet/");
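The class Javadoc above outlines the workflow: define a schema, create a writer based on that schema, then append values for each record. A minimal standalone sketch of that workflow, assuming parquet-mr's example Group API (the same ExampleParquetWriter used by the initWriter change below); the class name, schema fields, and output path are illustrative and not part of this commit:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class MinimalParquetWriteExample {
  public static void main(String[] args) throws Exception {
    // 1. Define a schema.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int32 id; required binary name (UTF8); }");
    // A group factory produces records (Groups) conforming to the schema.
    SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
    // 2. Create a ParquetWriter<Group> for the schema (cf. initWriter in the hunk below).
    try (ParquetWriter<Group> writer = ExampleParquetWriter
        .builder(new Path("file:/tmp/parquet/minimal_example.parquet"))
        .withType(schema)
        .build()) {
      // 3. Append values for a record to a Group and write it out.
      Group record = groupFactory.newGroup()
          .append("id", 1)
          .append("name", "drill");
      writer.write(record);
    }
  }
}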
@@ -221,20 +221,16 @@ public static ParquetWriter<Group> initWriter(MessageType schema, String fileNam

GroupWriteSupport.setSchema(schema, conf);

ParquetWriter<Group> writer =
new ParquetWriter<Group>(initFile(fileName),
ParquetFileWriter.Mode.OVERWRITE,
new GroupWriteSupport(),
CompressionCodecName.SNAPPY,
1024,
1024,
512,
dictEncoding, // enable dictionary encoding,
false,
ParquetProperties.WriterVersion.PARQUET_1_0, conf
);

return writer;
return ExampleParquetWriter.builder(initFile(fileName))
.withDictionaryEncoding(dictEncoding)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(1024)
.withDictionaryPageSize(512)
.withValidation(false)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
.withConf(conf)
.build();
}

public static void writeComplexValues(GroupFactory gf, ParquetWriter<Group> complexWriter, boolean writeNulls) throws IOException {
