[SPARK-16334] Reusing same dictionary column for decoding consecutive row groups shouldn't throw an error

## What changes were proposed in this pull request?

This patch fixes a bug in the vectorized Parquet reader caused by reusing the same dictionary column vector while reading consecutive row groups. Specifically, the issue manifests for certain distributions of dictionary- and plain-encoded data as we read and populate the underlying bit-packed dictionary data into a column-vector based data structure. The fix skips dictionary decoding for positions the vector already marks as null, since those slots do not hold valid dictionary ids.
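To make the failure mode concrete, here is a minimal, hypothetical sketch of the guard pattern the patch applies. Plain Java arrays stand in for Spark's `ColumnVector` and Parquet's dictionary, and the stale id `999` is fabricated for illustration; this is not the real reader code.

```java
import java.util.Arrays;

// Hypothetical, self-contained illustration of the null guard this patch adds.
public class NullGuardSketch {
  public static void main(String[] args) {
    int[] dictionary = {10, 20, 30};      // decoded dictionary values

    int num = 5;
    int[] dictIds = new int[num];         // per-row dictionary ids
    boolean[] isNull = new boolean[num];  // per-row null flags

    // Simulate a vector reused from a previous row group: row 2 is null, so
    // its dictionary-id slot was never rewritten and still holds stale data.
    dictIds[0] = 0; dictIds[1] = 2; dictIds[3] = 1; dictIds[4] = 0;
    dictIds[2] = 999;                     // stale id left over from the previous batch
    isNull[2] = true;

    int[] out = new int[num];
    for (int i = 0; i < num; ++i) {
      // Without this check, dictionary[999] would throw an
      // ArrayIndexOutOfBoundsException: the failure mode this patch avoids.
      if (!isNull[i]) {
        out[i] = dictionary[dictIds[i]];
      }
    }
    System.out.println(Arrays.toString(out));  // [10, 30, 0, 20, 10]
  }
}
```

The committed fix (in the diff below) applies exactly this check, `!column.isNullAt(i)`, before every dictionary decode in `decodeDictionaryIds`.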

## How was this patch tested?

Manually tested on datasets provided by the community. Thanks to Chris Perluss and Keith Kraus for their invaluable help in tracking down this issue!
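For readers without those datasets, a reproduction could be sketched along the following lines. This is a hypothetical sketch, not the author's test: the scratch path, row-group sizing (`parquet.block.size`), and data distribution are assumptions, and they may or may not trigger the pre-patch crash, which depended on how values split between dictionary- and plain-encoded pages across row groups.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical reproduction sketch for SPARK-16334 (not the committed test).
public class Spark16334Repro {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .appName("spark-16334-repro")
        .getOrCreate();

    String path = "/tmp/spark-16334-repro";  // scratch path (assumption)

    // Low-cardinality values plus nulls encourage dictionary encoding;
    // a small parquet.block.size forces many consecutive row groups.
    spark.range(0, 1_000_000)
        .selectExpr("if(id % 7 = 0, null, cast(id % 10 as int)) as v")
        .write()
        .mode("overwrite")
        .option("parquet.block.size", 64 * 1024)
        .parquet(path);

    // Scanning the column exercises decodeDictionaryIds() on the reused
    // dictionary column vector across row groups.
    Dataset<Row> df = spark.read().parquet(path);
    df.selectExpr("sum(v)").show();
    spark.stop();
  }
}
```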

Author: Sameer Agarwal <sameerag@cs.berkeley.edu>

Closes apache#14941 from sameeragarwal/parquet-exception-2.
sameeragarwal authored and davies committed Sep 2, 2016
1 parent ed9c884 commit a2c9acb
Showing 1 changed file with 38 additions and 16 deletions.
@@ -221,15 +221,21 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
         if (column.dataType() == DataTypes.IntegerType ||
             DecimalType.is32BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+            if (!column.isNullAt(i)) {
+              column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+            }
           }
         } else if (column.dataType() == DataTypes.ByteType) {
           for (int i = rowId; i < rowId + num; ++i) {
-            column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+            if (!column.isNullAt(i)) {
+              column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+            }
           }
         } else if (column.dataType() == DataTypes.ShortType) {
           for (int i = rowId; i < rowId + num; ++i) {
-            column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+            if (!column.isNullAt(i)) {
+              column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+            }
           }
         } else {
           throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -240,7 +246,9 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
         if (column.dataType() == DataTypes.LongType ||
             DecimalType.is64BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
+            if (!column.isNullAt(i)) {
+              column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
+            }
           }
         } else {
           throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -249,21 +257,27 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
 
       case FLOAT:
         for (int i = rowId; i < rowId + num; ++i) {
-          column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i)));
+          if (!column.isNullAt(i)) {
+            column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i)));
+          }
         }
         break;
 
       case DOUBLE:
         for (int i = rowId; i < rowId + num; ++i) {
-          column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i)));
+          if (!column.isNullAt(i)) {
+            column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i)));
+          }
         }
         break;
       case INT96:
         if (column.dataType() == DataTypes.TimestampType) {
           for (int i = rowId; i < rowId + num; ++i) {
             // TODO: Convert dictionary of Binaries to dictionary of Longs
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-            column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
+            if (!column.isNullAt(i)) {
+              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+              column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
+            }
           }
         } else {
           throw new UnsupportedOperationException();
@@ -275,26 +289,34 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
         // and reuse it across batches. This should mean adding a ByteArray would just update
         // the length and offset.
         for (int i = rowId; i < rowId + num; ++i) {
-          Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-          column.putByteArray(i, v.getBytes());
+          if (!column.isNullAt(i)) {
+            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+            column.putByteArray(i, v.getBytes());
+          }
         }
         break;
       case FIXED_LEN_BYTE_ARRAY:
         // DecimalType written in the legacy mode
         if (DecimalType.is32BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-            column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
+            if (!column.isNullAt(i)) {
+              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+              column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
+            }
           }
         } else if (DecimalType.is64BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-            column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
+            if (!column.isNullAt(i)) {
+              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+              column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
+            }
           }
         } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-            column.putByteArray(i, v.getBytes());
+            if (!column.isNullAt(i)) {
+              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+              column.putByteArray(i, v.getBytes());
+            }
           }
         } else {
           throw new UnsupportedOperationException();
