Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow: Add support for TimeType / UUIDType #2739

Merged
merged 1 commit into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ public static Field convert(final NestedField field) {
case TIME:
arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, Long.SIZE);
break;
case UUID:
arrowType = new ArrowType.FixedSizeBinary(16);
break;
case TIMESTAMP:
arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND,
((Types.TimestampType) field.type()).shouldAdjustToUTC() ? "UTC" : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@
* <li>Columns with constant values are physically encoded as a dictionary. The Arrow vector
* type is int32 instead of the type as per the schema.
* See https://github.com/apache/iceberg/issues/2484.</li>
* <li>Data types: {@link Types.TimeType}, {@link Types.ListType}, {@link Types.MapType},
* {@link Types.StructType}, {@link Types.UUIDType}, {@link Types.FixedType} and
* <li>Data types: {@link Types.ListType}, {@link Types.MapType},
* {@link Types.StructType}, {@link Types.FixedType} and
* {@link Types.DecimalType}
* See https://github.com/apache/iceberg/issues/2485 and https://github.com/apache/iceberg/issues/2486.</li>
* <li>Iceberg v2 spec is not supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.ValueVector;
Expand Down Expand Up @@ -112,6 +114,7 @@ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getDict
case BSON:
return new DictionaryStringAccessor<>((IntVector) vector, dictionary, stringFactorySupplier.get());
case INT_64:
case TIME_MICROS:
case TIMESTAMP_MILLIS:
case TIMESTAMP_MICROS:
return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
Expand Down Expand Up @@ -189,6 +192,10 @@ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getDict
} else if (vector instanceof StructVector) {
StructVector structVector = (StructVector) vector;
return new StructAccessor<>(structVector, structChildFactorySupplier.get());
} else if (vector instanceof TimeMicroVector) {
return new TimeMicroAccessor<>((TimeMicroVector) vector);
} else if (vector instanceof FixedSizeBinaryVector) {
return new FixedSizeBinaryAccessor<>((FixedSizeBinaryVector) vector);
}
throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
}
Expand Down Expand Up @@ -469,6 +476,38 @@ public final long getLong(int rowId) {
}
}

/**
 * Accessor for {@link TimeMicroVector} values, exposed to callers as microsecond longs
 * via {@link #getLong(int)}.
 */
private static class TimeMicroAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
    extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {

  private final TimeMicroVector timeVector;

  TimeMicroAccessor(TimeMicroVector vector) {
    super(vector);
    this.timeVector = vector;
  }

  @Override
  public final long getLong(int rowId) {
    return timeVector.get(rowId);
  }
}

/**
 * Accessor for {@link FixedSizeBinaryVector} values, exposed to callers as byte arrays
 * via {@link #getBinary(int)}. Used for Iceberg UUID columns, which are mapped to
 * 16-byte fixed-size binary in the Arrow schema.
 */
private static class FixedSizeBinaryAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
    extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {

  private final FixedSizeBinaryVector vector;

  FixedSizeBinaryAccessor(FixedSizeBinaryVector vector) {
    super(vector);
    this.vector = vector;
  }

  // Marked final for consistency with the sibling accessors (e.g. TimeMicroAccessor#getLong);
  // this class is private, so the modifier cannot affect external callers.
  @Override
  public final byte[] getBinary(int rowId) {
    return vector.get(rowId);
  }
}

private static class ArrayAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.types.FloatingPointPrecision;
Expand Down Expand Up @@ -108,6 +109,8 @@ private enum ReadType {
FLOAT,
DOUBLE,
TIMESTAMP_MILLIS,
TIME_MICROS,
UUID,
DICTIONARY
}

Expand Down Expand Up @@ -169,6 +172,9 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
case TIMESTAMP_MILLIS:
vectorizedColumnIterator.nextBatchTimestampMillis(vec, typeWidth, nullabilityHolder);
break;
case UUID:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come only UUID was added to this switch?

Copy link
Contributor Author

@nastra nastra Jun 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIME_MICROS and TIMESTAMP_MICROS are evaluated as LONG: https://github.com/nastra/iceberg/blob/21af7bcd73f450e997d1af085634567a734a16b9/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java#L240-L255

so both are basically handled in the LONG part of that switch statement. Only UUID needs to be handled differently

vectorizedColumnIterator.nextBatchFixedSizeBinary(vec, typeWidth, nullabilityHolder);
break;
}
}
}
Expand All @@ -178,6 +184,7 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
nullabilityHolder, icebergField.type());
}

@SuppressWarnings("MethodLength")
private void allocateFieldVector(boolean dictionaryEncodedVector) {
if (dictionaryEncodedVector) {
Field field = new Field(
Expand Down Expand Up @@ -240,6 +247,12 @@ private void allocateFieldVector(boolean dictionaryEncodedVector) {
this.readType = ReadType.LONG;
this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
break;
case TIME_MICROS:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed offline do we want to add support for MILLIs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like Parquet's TimeWriter only writes micros in https://github.com/nastra/iceberg/blob/50f4ecca7711e69f63589fea828d26230fac8d59/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java#L256, so I think the answer would be that we don't need to handle TIME_MILLIS

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will look into supporting TIME_MILLIS as a follow-up tomorrow

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after some investigation, supporting TIME_MILLIS might be a bit more involved. I opened #2755

this.vec = arrowField.createVector(rootAlloc);
((TimeMicroVector) vec).allocateNew(batchSize);
this.readType = ReadType.LONG;
this.typeWidth = 8;
break;
case DECIMAL:
this.vec = arrowField.createVector(rootAlloc);
((DecimalVector) vec).allocateNew(batchSize);
Expand Down Expand Up @@ -269,11 +282,17 @@ private void allocateFieldVector(boolean dictionaryEncodedVector) {
} else {
switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
int len = ((Types.FixedType) icebergField.type()).length();
int len;
if (icebergField.type() instanceof Types.UUIDType) {
len = 16;
this.readType = ReadType.UUID;
} else {
len = ((Types.FixedType) icebergField.type()).length();
this.readType = ReadType.FIXED_WIDTH_BINARY;
}
this.vec = arrowField.createVector(rootAlloc);
vec.setInitialCapacity(batchSize * len);
vec.allocateNew();
this.readType = ReadType.FIXED_WIDTH_BINARY;
this.typeWidth = len;
break;
case BINARY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,22 @@ public void nextBatchFixedLengthDecimal(
}
}

/**
 * Reads up to {@code batchSize} fixed-size binary values (e.g. UUIDs) into
 * {@code fieldVector}, advancing the underlying page iterator across Parquet page
 * boundaries until the batch is full or the column is exhausted.
 *
 * @param fieldVector destination vector; values are appended starting at the current
 *                    row offset ({@code rowsReadSoFar} is passed to the page iterator
 *                    as the write position)
 * @param typeWidth fixed width in bytes of each value
 * @param nullabilityHolder records which rows are null
 */
public void nextBatchFixedSizeBinary(
    FieldVector fieldVector,
    int typeWidth,
    NullabilityHolder nullabilityHolder) {
  int rowsReadSoFar = 0;
  while (rowsReadSoFar < batchSize && hasNext()) {
    // Move to the next page if the current one is exhausted.
    advance();
    int rowsInThisBatch =
        vectorizedPageIterator.nextBatchFixedSizeBinary(fieldVector, batchSize - rowsReadSoFar,
            rowsReadSoFar, typeWidth, nullabilityHolder);
    rowsReadSoFar += rowsInThisBatch;
    this.triplesRead += rowsInThisBatch;
    // Keep the vector's value count in sync after every page read, matching the
    // pattern of the other nextBatch* methods in this class.
    fieldVector.setValueCount(rowsReadSoFar);
  }
}

public void nextBatchVarWidthType(FieldVector fieldVector, NullabilityHolder nullabilityHolder) {
int rowsReadSoFar = 0;
while (rowsReadSoFar < batchSize && hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.IntVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
import org.apache.parquet.column.Dictionary;
Expand Down Expand Up @@ -410,4 +411,42 @@ void readBatchOfDictionaryEncodedLongBackedDecimals(FieldVector vector, int star
currentCount -= num;
}
}

/**
 * Decodes {@code numValuesToRead} dictionary-encoded fixed-size binary values (e.g. UUIDs)
 * into {@code vector}, starting at {@code startOffset}. Handles both RLE runs (a single
 * dictionary id repeated) and bit-packed runs (one id per value).
 *
 * @param vector destination vector; must be a {@link FixedSizeBinaryVector}
 * @param typeWidth fixed width in bytes of each value
 * @param startOffset first row index to write
 * @param numValuesToRead number of values to decode
 * @param dict Parquet dictionary used to resolve ids to binary values
 * @param nullabilityHolder records which rows are non-null
 */
void readBatchOfDictionaryEncodedFixedSizeBinary(
    FieldVector vector, int typeWidth, int startOffset,
    int numValuesToRead, Dictionary dict,
    NullabilityHolder nullabilityHolder) {
  int left = numValuesToRead;
  int idx = startOffset;
  while (left > 0) {
    if (this.currentCount == 0) {
      this.readNextGroup();
    }
    int num = Math.min(left, this.currentCount);
    switch (mode) {
      case RLE:
        // Every value in an RLE run maps to the same dictionary id, so decode and copy
        // it once outside the loop instead of once per row. FixedSizeBinaryVector#set
        // copies the bytes into the Arrow buffer, so reusing the array is safe.
        byte[] runBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
        byte[] runValue = new byte[typeWidth];
        System.arraycopy(runBytes, 0, runValue, 0, typeWidth);
        for (int i = 0; i < num; i++) {
          ((FixedSizeBinaryVector) vector).set(idx, runValue);
          nullabilityHolder.setNotNull(idx);
          idx++;
        }
        break;
      case PACKED:
        for (int i = 0; i < num; i++) {
          byte[] bytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
          byte[] vectorBytes = new byte[typeWidth];
          System.arraycopy(bytes, 0, vectorBytes, 0, typeWidth);
          ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);
          nullabilityHolder.setNotNull(idx);
          idx++;
        }
        break;
    }
    left -= num;
    currentCount -= num;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,36 @@ public int nextBatchFixedLengthDecimal(
return actualBatchSize;
}

/**
 * Reads a batch of fixed-size binary values into {@code vector} at offset
 * {@code numValsInVector}, dispatching to the dictionary-encoded or plain decoder
 * depending on the page's encoding.
 *
 * @return the number of values actually read (0 when the page is exhausted)
 */
public int nextBatchFixedSizeBinary(
    final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
    final int typeWidth, NullabilityHolder nullabilityHolder) {
  final int actualBatchSize = getActualBatchSize(expectedBatchSize);
  if (actualBatchSize <= 0) {
    return 0;
  }
  if (dictionaryDecodeMode != DictionaryDecodeMode.EAGER) {
    // Plain-encoded page: read raw fixed-width bytes directly from the values stream.
    vectorizedDefinitionLevelReader.readBatchOfFixedSizeBinary(
        vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder,
        plainValuesReader);
  } else {
    // Dictionary-encoded page: eagerly resolve dictionary ids to binary values.
    vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFixedSizeBinary(
        vector, numValsInVector, typeWidth, actualBatchSize, nullabilityHolder,
        dictionaryEncodedValuesReader, dictionary);
  }
  triplesRead += actualBatchSize;
  this.hasNext = triplesRead < triplesCount;
  return actualBatchSize;
}

/**
* Method for reading a batch of variable width data type (ENUM, JSON, UTF8, BSON).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
Expand Down Expand Up @@ -656,6 +657,50 @@ public void readBatchOfFixedLengthDecimals(
}
}

/**
 * Reads {@code numValsToRead} plain-encoded fixed-size binary values (e.g. UUIDs) into
 * {@code vector}, starting at {@code startOffset}, applying Parquet definition levels
 * to decide which rows are null. Definition levels arrive in RLE or bit-packed runs.
 *
 * @param vector destination vector; must be a {@link FixedSizeBinaryVector}
 * @param startOffset first row index to write
 * @param typeWidth fixed width in bytes of each value
 * @param numValsToRead number of rows (values + nulls) to process
 * @param nullabilityHolder records null/not-null per row
 * @param valuesReader source of the raw value bytes; only advanced for non-null rows
 */
public void readBatchOfFixedSizeBinary(
    final FieldVector vector, final int startOffset,
    final int typeWidth, final int numValsToRead, NullabilityHolder nullabilityHolder,
    ValuesAsBytesReader valuesReader) {
  int bufferIdx = startOffset;
  int left = numValsToRead;
  while (left > 0) {
    if (this.currentCount == 0) {
      this.readNextGroup();
    }
    int num = Math.min(left, this.currentCount);
    // Scratch buffer reused across the group; FixedSizeBinaryVector#set consumes it
    // before the next iteration overwrites it.
    byte[] byteArray = new byte[typeWidth];
    switch (mode) {
      case RLE:
        // An RLE run has one definition level for the whole run: either all values
        // are present (level == maxDefLevel) or all are null.
        if (currentValue == maxDefLevel) {
          for (int i = 0; i < num; i++) {
            valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
            ((FixedSizeBinaryVector) vector).set(bufferIdx, byteArray);
            nullabilityHolder.setNotNull(bufferIdx);
            bufferIdx++;
          }
        } else {
          // Null run: no value bytes are consumed from valuesReader.
          setNulls(nullabilityHolder, bufferIdx, num, vector.getValidityBuffer());
          bufferIdx += num;
        }
        break;
      case PACKED:
        // Bit-packed run: a distinct definition level per row.
        for (int i = 0; i < num; ++i) {
          if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
            valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
            ((FixedSizeBinaryVector) vector).set(bufferIdx, byteArray);
            nullabilityHolder.setNotNull(bufferIdx);
          } else {
            setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
          }
          bufferIdx++;
        }
        break;
    }
    left -= num;
    currentCount -= num;
  }
}

public void readBatchOfDictionaryEncodedFixedLengthDecimals(
final FieldVector vector,
final int startOffset,
Expand Down Expand Up @@ -701,6 +746,51 @@ public void readBatchOfDictionaryEncodedFixedLengthDecimals(
}
}

/**
 * Reads {@code numValsToRead} dictionary-encoded fixed-size binary values (e.g. UUIDs)
 * into {@code vector}, starting at {@code startOffset}, applying Parquet definition
 * levels to decide which rows are null.
 *
 * @param vector destination vector; must be a {@link FixedSizeBinaryVector}
 * @param startOffset first row index to write
 * @param typeWidth fixed width in bytes of each value
 * @param numValsToRead number of rows (values + nulls) to process
 * @param nullabilityHolder records null/not-null per row
 * @param dictionaryEncodedValuesReader source of dictionary ids; only advanced for
 *                                      non-null rows
 * @param dict Parquet dictionary used to resolve ids to binary values
 */
public void readBatchOfDictionaryEncodedFixedSizeBinary(
    final FieldVector vector,
    final int startOffset,
    final int typeWidth,
    final int numValsToRead,
    NullabilityHolder nullabilityHolder,
    VectorizedDictionaryEncodedParquetValuesReader dictionaryEncodedValuesReader,
    Dictionary dict) {
  int idx = startOffset;
  int left = numValsToRead;
  while (left > 0) {
    if (this.currentCount == 0) {
      this.readNextGroup();
    }
    int num = Math.min(left, this.currentCount);
    switch (mode) {
      case RLE:
        // Definition-level RLE run: either the whole run is present or the whole run
        // is null.
        if (currentValue == maxDefLevel) {
          dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedFixedSizeBinary(vector, typeWidth, idx,
              num, dict, nullabilityHolder);
        } else {
          setNulls(nullabilityHolder, idx, num, vector.getValidityBuffer());
        }
        idx += num;
        break;
      case PACKED:
        for (int i = 0; i < num; i++) {
          if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
            // getBytesUnsafe avoids an extra copy (consistent with the dictionary
            // values reader); the bytes are copied into vectorBytes immediately and
            // never mutated, so the shared backing array is not at risk.
            byte[] bytes = dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).getBytesUnsafe();
            byte[] vectorBytes = new byte[typeWidth];
            System.arraycopy(bytes, 0, vectorBytes, 0, typeWidth);
            ((FixedSizeBinaryVector) vector).set(idx, vectorBytes);
            nullabilityHolder.setNotNull(idx);
          } else {
            setNull(nullabilityHolder, idx, vector.getValidityBuffer());
          }
          idx++;
        }
        break;
    }
    left -= num;
    currentCount -= num;
  }
}

public void readBatchVarWidth(
final FieldVector vector,
final int startOffset,
Expand Down
Loading