Spark: Add read/write support for UUIDs #7399

Merged — 3 commits merged into apache:master from the spark-uuid-read-write-support-3.4 branch on May 1, 2023

Conversation

@nastra (Contributor) commented Apr 21, 2023

fixes #4581

@nastra nastra marked this pull request as draft April 21, 2023 14:22
@github-actions github-actions bot added the spark label Apr 21, 2023
@nastra nastra force-pushed the spark-uuid-read-write-support-3.4 branch 2 times, most recently from ef2b760 to 0c81705 on April 21, 2023 15:34
  ThreadLocal.withInitial(
      () -> {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.order(ByteOrder.BIG_ENDIAN);
        return buffer;
      });
Member:

This is the default, right? Just setting it to be sure?

Contributor Author (@nastra):

I was mainly aligning with other places in the code that also use a thread-local ByteBuffer when reading UUIDs.

Contributor:

Big endian is correct. See UUIDUtil for another implementation.

Member:

Not that it's incorrect; it's just the default for all new ByteBuffers. Just wondering why we were setting it explicitly.

Contributor:

I think it's usually good to be explicit. Are we sure this is the default, or is it the default only on certain architectures?

Member:

https://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html#order()

The byte order is used when reading or writing multibyte values, and when creating buffers that are views of this byte buffer. The order of a newly-created byte buffer is always [BIG_ENDIAN](https://docs.oracle.com/javase/7/docs/api/java/nio/ByteOrder.html#BIG_ENDIAN).
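
A quick check confirms this holds regardless of the platform's native order (illustrative snippet, not code from the PR):

  import java.nio.ByteBuffer;
  import java.nio.ByteOrder;

  public class ByteOrderCheck {
    public static void main(String[] args) {
      // Per the javadoc above, a newly created buffer is always big-endian...
      ByteBuffer buffer = ByteBuffer.allocate(16);
      System.out.println(buffer.order());           // BIG_ENDIAN on every JVM
      // ...while the platform's native order may differ.
      System.out.println(ByteOrder.nativeOrder());  // e.g. LITTLE_ENDIAN on x86
    }
  }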

@nastra nastra force-pushed the spark-uuid-read-write-support-3.4 branch from 0c81705 to 461276b on April 24, 2023 15:17
@nastra nastra force-pushed the spark-uuid-read-write-support-3.4 branch from 461276b to dbd7ba4 on April 24, 2023 15:58
@RussellSpitzer (Member) left a comment:

LGTM

@nastra nastra marked this pull request as ready for review April 25, 2023 07:11
@Fokko (Contributor) left a comment:

This is awesome!

@nastra nastra force-pushed the spark-uuid-read-write-support-3.4 branch from dbd7ba4 to b4f2593 on April 25, 2023 08:10
@nastra nastra added this to the Iceberg 1.3.0 milestone Apr 25, 2023
@RussellSpitzer (Member) left a comment:

Lgtm! Thanks for doing this!


  @Override
  public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
    ByteBuffer buffer = UUIDUtil.convertToByteBuffer(UUID.fromString(data.toString()));
Contributor:

This allocates a buffer. We may want to have a buffer here as a thread-local or a field to avoid allocation in a tight loop.

Contributor Author (@nastra):

I agree with that observation. I initially used a thread-local to reduce byte[] allocation, but I couldn't get it to work: ((BytesColumnVector) output).setRef(..) just stores a reference to the passed byte[], so subsequent writes would overwrite previously written values.
Worth mentioning that GenericOrcWriters does the same thing when writing UUIDs.
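
A minimal sketch of the aliasing problem described above (illustrative only, not the PR's code; BytesColumnVector is the Hive vector class that ORC writes into):

  import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;

  byte[] scratch = new byte[16];             // one reused, thread-local-style buffer
  BytesColumnVector vector = new BytesColumnVector();

  for (int rowId = 0; rowId < 2; rowId++) {
    scratch[15] = (byte) rowId;              // overwrite the shared buffer in place
    vector.setRef(rowId, scratch, 0, 16);    // stores a reference, not a copy
  }
  // Both rows now alias the same array, so row 0 reads back row 1's bytes.
  // setVal(..) copies into the vector's own buffers and avoids the aliasing,
  // which is why the PR allocates a fresh buffer per value instead.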

Contributor:

Probably worth mentioning in a comment?

Contributor Author (@nastra):

Makes sense; I've added a comment for this as part of #7496.

    ByteBuffer buffer = BUFFER.get();
    buffer.rewind();
    buffer.putLong(uuid.getMostSignificantBits());
    buffer.putLong(uuid.getLeastSignificantBits());
Contributor:

In other places, like UUIDUtil, we use putLong(int offset, long value) instead of putLong(long value) so that the position is not updated and we don't need to worry about the buffer's internal state. I think that's usually a better approach.

Also, we might want to update UUIDUtil to share this code:

  public static ByteBuffer convertToByteBuffer(UUID value) {
    return convertToByteBuffer(value, null);
  }

  public static ByteBuffer convertToByteBuffer(UUID value, ByteBuffer reuse) {
    ByteBuffer buffer;
    if (reuse != null) {
      buffer = reuse;
    } else {
      buffer = ByteBuffer.allocate(16);
    }

    buffer.order(ByteOrder.BIG_ENDIAN);
    buffer.putLong(0, value.getMostSignificantBits());
    buffer.putLong(8, value.getLeastSignificantBits());
    return buffer;
  }
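
For illustration, a caller could then pair the reuse overload with a thread-local scratch buffer (a sketch that assumes uuid is in scope; because convertToByteBuffer only uses absolute puts, no rewind() is needed and the buffer's position is never touched):

  private static final ThreadLocal<ByteBuffer> BUFFER =
      ThreadLocal.withInitial(() -> ByteBuffer.allocate(16));

  // The same array is handed back on every call, so the bytes must be copied
  // out before the next conversion (see the setRef discussion above).
  ByteBuffer buffer = UUIDUtil.convertToByteBuffer(uuid, BUFFER.get());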

Contributor Author (@nastra):

That makes sense; I've updated it.
We still have a few places in the code that call buffer.putLong(uuid.getMostSignificantBits()); I'll follow up on those and update them independently.

Contributor Author (@nastra):

I've opened #7525 to address those other places in Spark.

@@ -74,6 +76,11 @@ public UTF8String ofRow(VarCharVector vector, int rowId) {
        null, vector.getDataBuffer().memoryAddress() + start, end - start);
  }

  @Override
  public UTF8String ofRow(FixedSizeBinaryVector vector, int rowId) {
    return UTF8String.fromString(UUIDUtil.convert(vector.get(rowId)).toString());
Contributor:

Is there a way to get the underlying array and offset?

Contributor Author (@nastra):

vector.get(rowId) returns the byte[] of length 16 for the given rowId. I think we could get the underlying array and offset from the underlying ArrowBuf, but we would still need to read it into a new byte[], which is what vector.get(rowId) already does underneath.
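
For illustration, going through the underlying buffer directly would still involve the same copy (sketch, assuming ArrowBuf's getBytes(long, byte[]) and 16-byte fixed-width rows):

  // Bypassing vector.get(rowId) does not avoid the allocation or the copy.
  byte[] bytes = new byte[16];
  vector.getDataBuffer().getBytes(rowId * 16L, bytes);  // same 16-byte copy
  return UTF8String.fromString(UUIDUtil.convert(bytes).toString());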

@@ -329,6 +329,8 @@ public Object primitive(Type.PrimitiveType primitive) {
        return UTF8String.fromString((String) obj);
      case DECIMAL:
        return Decimal.apply((BigDecimal) obj);
      case UUID:
        return UTF8String.fromString(UUID.nameUUIDFromBytes((byte[]) obj).toString());
Contributor:

Why does generatePrimitive provide byte[]? Shouldn't it create a String for Spark already?

Contributor Author (@nastra):

My guess would be that RandomUtil.generatePrimitive(..) is used in other places where UUIDs are expected to be byte[].
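
For context, UUID.nameUUIDFromBytes derives a deterministic version-3 (MD5 name-based) UUID from arbitrary input bytes, so any byte[] the generator produces maps to a valid UUID string (a quick illustration, not code from the PR):

  import java.util.UUID;

  byte[] randomBytes = {0x01, 0x02, 0x03};          // any generator output works
  UUID uuid = UUID.nameUUIDFromBytes(randomBytes);  // deterministic version-3 UUID
  System.out.println(uuid);                         // same bytes always yield same UUID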

@nastra nastra requested a review from rdblue April 26, 2023 07:13
@nastra nastra force-pushed the spark-uuid-read-write-support-3.4 branch from b4f2593 to 7fcc85b on April 26, 2023 07:15
@nastra nastra closed this Apr 26, 2023
@nastra nastra reopened this Apr 26, 2023
@rdblue rdblue merged commit fc3cd2e into apache:master May 1, 2023
@rdblue (Contributor) commented May 1, 2023

Thanks, @nastra!

Successfully merging this pull request may close: Spark: Cannot read or write UUID columns (#4581)