Spark: Add read/write support for UUIDs #7399
Conversation
Force-pushed from ef2b760 to 0c81705.
```java
ThreadLocal.withInitial(
    () -> {
      ByteBuffer buffer = ByteBuffer.allocate(16);
      buffer.order(ByteOrder.BIG_ENDIAN);
```
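For context, the snippet above is the start of a thread-local buffer initializer. A minimal self-contained sketch of the same pattern (the class and method names here are illustrative, not from the PR):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.UUID;

public class ThreadLocalBufferSketch {
  // One 16-byte buffer per thread, so concurrent readers never share state.
  private static final ThreadLocal<ByteBuffer> BUFFER =
      ThreadLocal.withInitial(
          () -> {
            ByteBuffer buffer = ByteBuffer.allocate(16);
            buffer.order(ByteOrder.BIG_ENDIAN);
            return buffer;
          });

  static ByteBuffer serialize(UUID uuid) {
    ByteBuffer buffer = BUFFER.get();
    // Absolute puts: the buffer's position stays at 0, no rewind() needed.
    buffer.putLong(0, uuid.getMostSignificantBits());
    buffer.putLong(8, uuid.getLeastSignificantBits());
    return buffer;
  }

  public static void main(String[] args) {
    UUID uuid = UUID.randomUUID();
    ByteBuffer buf = serialize(uuid);
    UUID roundTrip = new UUID(buf.getLong(0), buf.getLong(8));
    System.out.println(uuid.equals(roundTrip)); // true
  }
}
```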
This is the default right? Just setting it to be sure?
I was mainly aligning with other places in the code that also use a thread-local ByteBuffer when reading UUIDs.
Big endian is correct. See `UUIDUtil` for another implementation.
Not that it's incorrect; it's just the default for all new ByteBuffers. I was just wondering why we were setting it explicitly.
I think it's usually good to be explicit. Are we sure this is the default, or is it only the default on certain architectures?
https://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html#order()

> The byte order is used when reading or writing multibyte values, and when creating buffers that are views of this byte buffer. The order of a newly-created byte buffer is always [BIG_ENDIAN](https://docs.oracle.com/javase/7/docs/api/java/nio/ByteOrder.html#BIG_ENDIAN).
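The quoted Javadoc claim is easy to confirm directly; a small check (class name illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class DefaultOrderCheck {
  public static void main(String[] args) {
    // A freshly allocated buffer is always big-endian, per the Javadoc,
    // regardless of the native byte order of the host architecture.
    System.out.println(ByteBuffer.allocate(16).order()); // BIG_ENDIAN
    System.out.println(ByteOrder.nativeOrder()); // platform-dependent
  }
}
```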
Resolved thread: spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
Force-pushed from 0c81705 to 461276b.
Force-pushed from 461276b to dbd7ba4.
LGTM
This is awesome!
Force-pushed from dbd7ba4 to b4f2593.
LGTM! Thanks for doing this!
```java
@Override
public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
  ByteBuffer buffer = UUIDUtil.convertToByteBuffer(UUID.fromString(data.toString()));
```
This allocates a buffer. We may want to have a buffer here as a thread-local or a field to avoid allocation in a tight loop.
I agree with that observation. I initially used a thread-local to reduce byte[] allocation, but couldn't get it to work because `((BytesColumnVector) output).setRef(..)` would just store a reference to the passed byte[], so subsequent writes would end up overwriting previous values.

Worth mentioning that `GenericOrcWriters` does the same thing when writing UUIDs.
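The aliasing problem described above can be reproduced in plain Java: if a consumer stores a reference to a reused scratch array (the way `setRef` does) rather than copying it, every earlier write is clobbered by the next one. A minimal illustration, using a plain list as a stand-in for the column vector:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class ReusedBufferAliasing {
  public static void main(String[] args) {
    byte[] reused = new byte[16]; // would-be thread-local scratch buffer
    List<byte[]> stored = new ArrayList<>(); // keeps references, like setRef

    for (UUID uuid : new UUID[] {UUID.randomUUID(), UUID.randomUUID()}) {
      ByteBuffer.wrap(reused)
          .putLong(0, uuid.getMostSignificantBits())
          .putLong(8, uuid.getLeastSignificantBits());
      stored.add(reused); // no copy: every entry aliases the same array
    }

    // Both stored "values" now hold the bytes of the second UUID.
    System.out.println(stored.get(0) == stored.get(1)); // true
  }
}
```

This is why each write has to hand the consumer a freshly allocated array, at the cost of allocating in the loop.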
Probably worth mentioning in a comment?
Makes sense, I've added a comment for this as part of #7496.
Resolved threads:
- spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java (outdated)
- spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java (outdated)
- spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
- spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java (outdated)
```java
ByteBuffer buffer = BUFFER.get();
buffer.rewind();
buffer.putLong(uuid.getMostSignificantBits());
buffer.putLong(uuid.getLeastSignificantBits());
```
In other places, like `UUIDUtil`, we use `putLong(int offset, long value)` instead of `putLong(long value)` so that the position is not updated and we don't need to worry about the buffer's internal state. I think that's usually a better approach.

Also, we might want to update `UUIDUtil` to share this code:
```java
public static ByteBuffer convertToByteBuffer(UUID value) {
  return convertToByteBuffer(value, null);
}

public static ByteBuffer convertToByteBuffer(UUID value, ByteBuffer reuse) {
  ByteBuffer buffer;
  if (reuse != null) {
    buffer = reuse;
  } else {
    buffer = ByteBuffer.allocate(16);
  }

  buffer.order(ByteOrder.BIG_ENDIAN);
  buffer.putLong(0, value.getMostSignificantBits());
  buffer.putLong(8, value.getLeastSignificantBits());
  return buffer;
}
```
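The suggested helper can be exercised standalone; a sketch with a round-trip check showing the benefit of absolute puts (the wrapper class name is illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.UUID;

public class UuidBufferRoundTrip {
  // Same shape as the suggested UUIDUtil helper above.
  static ByteBuffer convertToByteBuffer(UUID value, ByteBuffer reuse) {
    ByteBuffer buffer = (reuse != null) ? reuse : ByteBuffer.allocate(16);
    buffer.order(ByteOrder.BIG_ENDIAN);
    // Absolute puts leave the buffer's position and limit unchanged.
    buffer.putLong(0, value.getMostSignificantBits());
    buffer.putLong(8, value.getLeastSignificantBits());
    return buffer;
  }

  public static void main(String[] args) {
    UUID uuid = UUID.randomUUID();
    ByteBuffer buffer = convertToByteBuffer(uuid, null);
    // Position is still 0, so no rewind() is needed before reading.
    System.out.println(buffer.position()); // 0
    UUID back = new UUID(buffer.getLong(0), buffer.getLong(8));
    System.out.println(back.equals(uuid)); // true
  }
}
```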
That makes sense, I've updated it. We still have a few places in the code that do `buffer.putLong(uuid.getMostSignificantBits())`. I'll follow up on those and update them independently.
I've opened #7525 to address those other places in Spark
```diff
@@ -74,6 +76,11 @@ public UTF8String ofRow(VarCharVector vector, int rowId) {
         null, vector.getDataBuffer().memoryAddress() + start, end - start);
   }

+  @Override
+  public UTF8String ofRow(FixedSizeBinaryVector vector, int rowId) {
+    return UTF8String.fromString(UUIDUtil.convert(vector.get(rowId)).toString());
```
Is there a way to get the underlying array and offset?
`vector.get(rowId)` will return the byte[] with a length of 16 for the given `rowId`. I think we could get the underlying array and offset from the underlying `ArrowBuf`, but we would need to read it into a new byte[], which is what `vector.get(rowId)` is doing underneath.
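For reference, converting the 16-byte array back to a UUID is a straightforward big-endian read; a sketch equivalent in spirit to what `UUIDUtil.convert` does (the class and method names are illustrative, not Iceberg's):

```java
import java.nio.ByteBuffer;
import java.util.UUID;

public class BytesToUuid {
  // Interpret a 16-byte big-endian array as a UUID (msb first, then lsb).
  static UUID fromBytes(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes); // wrap: no copy of the array
    return new UUID(buffer.getLong(), buffer.getLong());
  }

  public static void main(String[] args) {
    UUID uuid = UUID.randomUUID();
    byte[] bytes = ByteBuffer.allocate(16)
        .putLong(uuid.getMostSignificantBits())
        .putLong(uuid.getLeastSignificantBits())
        .array();
    System.out.println(fromBytes(bytes).equals(uuid)); // true
  }
}
```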
```diff
@@ -329,6 +329,8 @@ public Object primitive(Type.PrimitiveType primitive) {
         return UTF8String.fromString((String) obj);
       case DECIMAL:
         return Decimal.apply((BigDecimal) obj);
+      case UUID:
+        return UTF8String.fromString(UUID.nameUUIDFromBytes((byte[]) obj).toString());
```
Why does `generatePrimitive` provide `byte[]`? Shouldn't it create a String for Spark already?
My guess would be because `RandomUtil.generatePrimitive(..)` is used in other places where UUIDs are expected to be byte[].
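One thing worth noting about the test-fixture code above: `UUID.nameUUIDFromBytes` hashes its input to build a version-3 (MD5, name-based) UUID rather than interpreting the 16 bytes directly, which is fine for generating deterministic test values from arbitrary-length byte arrays:

```java
import java.util.UUID;

public class NameUuidDemo {
  public static void main(String[] args) {
    byte[] data = {1, 2, 3, 4}; // any length works; the input is hashed
    UUID a = UUID.nameUUIDFromBytes(data);
    UUID b = UUID.nameUUIDFromBytes(data);
    // Same input bytes always yield the same UUID.
    System.out.println(a.equals(b)); // true
    System.out.println(a.version()); // 3
  }
}
```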
Force-pushed from b4f2593 to 7fcc85b.
Thanks, @nastra!
Fixes #4581.