
Enable Strings as a supported type for GpuColumnarToRow transitions #5998

Merged: 27 commits merged into NVIDIA:branch-22.08 on Jul 27, 2022

Conversation

@amahussein (Collaborator):

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

fixes #5633, fixes #5634, fixes #5635, fixes #5636

Only fixed-width schemas were supported by the GpuColumnarToRow transitions.
This PR adds StringType to the list of supported column types.

Changes:

Addressing points in rapidsai/cudf#10033 (comment)

  1. Added a new JCudfUtil. The goal is for this Util class to hide the details of how data types are aligned in the unsafeRow, so the same code can be reused to estimate sizes, set offsets, and build the unsafeRow.
  2. Added String types to the supported transitions.
  3. Updated AcceleratedColumnarToRowIterator and how the packMap is constructed.
  4. Added a new variable to CudfUnsafeRow that represents an initial rough estimate of the unsafeRow size. This estimate can later be used to decide whether the schema fits the CUDF optimized conversion.
  5. Changed the way the plugin decides between calling convertToRowsFixedWidthOptimized and convertToRows, because the data may now contain Strings. Since we already visit the schema to estimate the row size, we have more accurate width information and know whether the schema is variable- or fixed-width (see the sketch after this list).
  6. Added variable-width schemas to the integration test row_conversion_test.py.
  7. Added unit tests to verify the correctness of the offset calculations.
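
To make point 5 concrete, here is a hedged sketch of that dispatch. The two Table methods are the real cudf Java APIs named in this PR; everything else (the wrapper class, the threshold constant) is hypothetical illustration, with the 184-longs limit borrowed from the codegen comment quoted further down this page:

import ai.rapids.cudf.ColumnVector;
import ai.rapids.cudf.Table;

class ConversionDispatchSketch {
  // Hypothetical cutoff: the optimized kernel handles rows of at most
  // ~184 long/double values (see the codegen comment quoted below).
  static final int FIXED_WIDTH_OPT_MAX_ROW_BYTES = 184 * 8;

  // Only schemas with no variable-width (e.g. String) columns and rows
  // below the size limit can take the optimized fixed-width path.
  static ColumnVector[] convertBatch(Table table, boolean isFixedWidthOnly,
                                     int estimatedRowSizeBytes) {
    if (isFixedWidthOnly && estimatedRowSizeBytes <= FIXED_WIDTH_OPT_MAX_ROW_BYTES) {
      return table.convertToRowsFixedWidthOptimized();
    }
    return table.convertToRows();
  }
}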

fix the type of the dataOffsetTmp

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
@amahussein added the feature request and performance labels on Jul 13, 2022
@amahussein (author) commented:

build

@amahussein amahussein self-assigned this Jul 13, 2022
@revans2 (Collaborator) left a comment:

Just some nits. It is looking good.

@hyperbolic2346 (Collaborator) left a comment:

Looking good to me. Nice to see that it isn't a massive change, but it is dense in parts. Sorry for being so pedantic; I was just writing things down as I went, and looking back at the number of comments it appears I have huge requests for this code, but I really don't feel that way. Thanks for doing this work!


/**
* Calculates the offset of the variable width section.
* @return Total bytes used by the fixed width and the validity bytes.
A collaborator commented:

A note about alignment here would be nice. In this case we are byte-aligned at the end of the validity data and require no further alignment, but calling that out would be useful for when we forget, get worried about alignment, and go look it up in the format spec.

@amahussein (author) replied:

Done!
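
For readers following along, a minimal sketch of the calculation under discussion, assuming the JCudf row layout of fixed-width fields followed by one validity bit per column; the method and parameter names here are hypothetical:

class JCudfOffsetSketch {
  // The variable-width section starts right after the validity bytes.
  // The validity data is byte-aligned and no extra padding is required here,
  // which is the alignment note this thread asked for.
  static int getVarDataSectionOffset(int fixedWidthSectionBytes, int numColumns) {
    int validityBytes = (numColumns + 7) / 8; // validity bits rounded up to bytes
    return fixedWidthSectionBytes + validityBytes;
  }
}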

@@ -111,9 +102,10 @@ class AcceleratedColumnarToRowIterator(
// most 184 double/long values. Spark by default limits codegen to 100 fields
A collaborator commented:

Are these comments still correct?

val cudfColOff = jcudfRowVisitor.getColOffset()
val colLength = jcudfRowVisitor.getColLength()
val ret = colLength match {
// strings return -15
A collaborator commented:

This seems very magical. Why -15? Should this be some sort of named constant?

@amahussein (author) replied:

Refactored; not needed any more. The Util now returns the generated code.


/**
* A helper class to calculate the columns, validity, and variable width offsets.
* This helper is used to get an estimate size for the row including variable-sized rows.
A collaborator commented:
Suggested change
* This helper is used to get an estimate size for the row including variable-sized rows.
* This helper is used to get an estimate size for the row including variable-sized data.

Comment on lines 288 to 290
for (int i = 0; i < attributes.length; i++) {
calcColOffset(i);
}
A collaborator commented:

It seems odd to throw away the offsets you're calculating from the return value, but now I see the goal is the side effect in the function: setting varSizeColIndex. I wouldn't have thought much of this had it been called setColumnOffsets, and I would have missed the side effect entirely. I wonder if something should change around here to better name it, indicate the side effect, or avoid recalculating offsets entirely if it has been done before.

@amahussein (author) replied:

Done! Changed the name to advanceColCursorAndGet.

return byteCursor;
}
private int addVarSizeData(DType rapidsType) {
int length = getEstimateSizeForVarLengthTypes(rapidsType);
A collaborator commented:

What happens if this estimate is low? Is this used for buffer allocation?

@amahussein (author) replied:

This is handled as described in #5998 (comment).

* @param ind index of the column
* @return the offset of the column.
*/
private int calcColOffset(int ind) {
A collaborator commented:

This seems very familiar. Would it make sense to wrap the class above in a caching class, or is it different enough?

@amahussein (author) replied:

I was avoiding inheritance to reduce overhead; that's why I opted to repeat the code rather than use overrides/interfaces. In the most recent commit, the two inner classes have different functionality (a toy sketch follows the list below):

  • JCudfUtil.RowOffsetsCalculator: calculates the offsets and estimates the row size for the buffer allocation.
  • JCudfUtil.RowBuilder: iterates over the columns to generate the code used to copy each column field.
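
As a toy illustration of the offsets-calculator half of that split (not the PR's API; the method is invented here, and the 20-byte string estimate comes from the discussion further below), the row-size estimate could look like:

class RowSizeEstimateSketch {
  // Packs fixed-width columns with natural alignment, then adds validity
  // bytes and a rough per-string estimate for the variable-width data.
  static int estimateRowSize(int[] fixedWidthSizes, int numStringCols) {
    int offset = 0;
    for (int size : fixedWidthSizes) {
      offset = ((offset + size - 1) / size) * size; // align field to its own size
      offset += size;
    }
    int numCols = fixedWidthSizes.length + numStringCols;
    offset += (numCols + 7) / 8;  // one validity bit per column, in whole bytes
    offset += numStringCols * 20; // rough 20-byte estimate per string
    return offset;
  }
}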

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
@amahussein (author) commented:

build

@amahussein (author) commented on Jul 15, 2022:

I see the following failures in the Pre-merge:

 [2022-07-15T02:36:43.516Z] - test with small input batches *** FAILED ***
[2022-07-15T02:36:43.516Z]   java.lang.AssertionError: End address is too high for copy range src 0x7f2168058510 < 0x7f2168058500
[2022-07-15T02:36:43.516Z]   at ai.rapids.cudf.MemoryBuffer.addressOutOfBoundsCheck(MemoryBuffer.java:138)
[2022-07-15T02:36:43.516Z]   at ai.rapids.cudf.BaseDeviceMemoryBuffer.copyFromHostBuffer(BaseDeviceMemoryBuffer.java:42)
[2022-07-15T02:36:43.516Z]   at ai.rapids.cudf.BaseDeviceMemoryBuffer.copyFromHostBuffer(BaseDeviceMemoryBuffer.java:105)
[2022-07-15T02:36:43.517Z]   at ai.rapids.cudf.ColumnView$NestedColumnVector.createNestedColumnVector(ColumnView.java:4470)
[2022-07-15T02:36:43.517Z]   at ai.rapids.cudf.ColumnView$NestedColumnVector.createNewNestedColumnVector(ColumnView.java:4396)
[2022-07-15T02:36:43.517Z]   at ai.rapids.cudf.ColumnView$NestedColumnVector.createColumnVector(ColumnView.java:4334)
[2022-07-15T02:36:43.517Z]   at ai.rapids.cudf.HostColumnVector.copyToDevice(HostColumnVector.java:220)
[2022-07-15T02:36:43.517Z]   at com.nvidia.spark.rapids.InternalRowToColumnarBatchIterator.next(InternalRowToColumnarBatchIterator.java:168)
[2022-07-15T02:36:43.517Z]   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificInternalRowToColumnarBatchIterator.next(Unknown Source)
[2022-07-15T02:36:43.517Z]   at com.nvidia.spark.rapids.CollectTimeIterator.$anonfun$next$1(GpuExec.scala:199)
[2022-07-15T02:36:43.517Z]   ...
[2022-07-15T02:36:43.517Z] - test multiple output batches *** FAILED ***
[2022-07-15T02:36:43.517Z]   java.lang.AssertionError: End address is too high for copy range src 0x7f2264f04c30 < 0x7f2264f04c20
[2022-07-15T02:36:43.517Z]   at ai.rapids.cudf.MemoryBuffer.addressOutOfBoundsCheck(MemoryBuffer.java:138)
[2022-07-15T02:36:43.517Z]   at ai.rapids.cudf.BaseDeviceMemoryBuffer.copyFromHostBuffer(BaseDeviceMemoryBuffer.java:42)
[2022-07-15T02:36:43.517Z]   at ai.rapids.cudf.BaseDeviceMemoryBuffer.copyFromHostBuffer(BaseDeviceMemoryBuffer.java:105)
[2022-07-15T02:36:43.517Z]   at ai.rapids.cudf.ColumnView$NestedColumnVector.createNestedColumnVector(ColumnView.java:4470)
[2022-07-15T02:36:43.517Z]   at ai.rapids.cudf.ColumnView$NestedColumnVector.createNewNestedColumnVector(ColumnView.java:4396)
[2022-07-15T02:36:43.517Z]   at ai.rapids.cudf.ColumnView$NestedColumnVector.createColumnVector(ColumnView.java:4334)
[2022-07-15T02:36:43.517Z]   at ai.rapids.cudf.HostColumnVector.copyToDevice(HostColumnVector.java:220)
[2022-07-15T02:36:43.517Z]   at com.nvidia.spark.rapids.InternalRowToColumnarBatchIterator.next(InternalRowToColumnarBatchIterator.java:168)
[2022-07-15T02:36:43.517Z]   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificInternalRowToColumnarBatchIterator.next(Unknown Source)
[2022-07-15T02:36:43.517Z]   at com.nvidia.spark.rapids.CollectTimeIterator.$anonfun$next$1(GpuExec.scala:199)
[2022-07-15T02:36:43.517Z]   ...
[2022-07-15T02:36:43.775Z] - require single batch
[2022-07-15T02:36:43.775Z] munmap_chunk(): invalid pointer

@amahussein (author) commented:

build

, " long cudfDstAddress = startAddress + dataDstOffset;"
, " long newDataOffset = cudfDstAddress + strSize;"
, " if (newDataOffset > bufferEndAddress) {"
, " throw new java.lang.RuntimeException("
A collaborator commented:

As it is now, any time we try to copy a String that does not fit, this will throw an exception. I am not okay with that, because there can be many cases where we want to copy more than one batch to the GPU. I am fine with throwing an exception if the first batch cannot fit, but we need a way to detect that later rows don't fit and just go on.

@amahussein (author) replied:

Thanks Bobby! I modified the behavior to handle the corner case.

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
@amahussein (author) commented:

Reasons for the old failure in GpuCoalesceBatchesSuite:

  • InternalRowToColumnarBatchIterator is initialized with a small goalTargetSize (1 byte).
  • dataLength is set to the estimate of the row size (20 bytes for a string).
  • The crash happens when one row exceeds that size and we cannot fit it into the dataBuffer.

The fix:

The following changes fix the unit tests:

  • Modify the generated method copyUTF8StringInto to return a negative value if the strData exceeds the endAddress.
  • Modify the method copyInto(UnsafeRow input, long startAddress, long endAddress) to return a negative value if copyUTF8StringInto returns a negative value.
  • Inside InternalRowToColumnarBatchIterator.next(), check whether no rows were copied while there is still a pending row to copy; in that case, throw an exception.
  • Handle the exception by increasing the dataLength.
  • Retry.
batchDone = false;
int retryCount = 0;
while (!batchDone) {
   try (HostMemoryBuffer dataBuffer = HostMemoryBuffer.allocate(dataLength);
        HostMemoryBuffer offsetsBuffer =
           HostMemoryBuffer.allocate(((long) numRowsEstimate + 1) * BYTES_PER_OFFSET)) {
     int[] used = fillBatch(dataBuffer, offsetsBuffer);
     int dataOffset = used[0];
     int currentRow = used[1];
     // if we fail to copy at least one row then we need to throw an exception
     if (currentRow == 0 && pending != null) {
       throw new BufferOverflowException();
     }
     batchDone = true;
     if (retryCount > 0) {
       // restore the original dataLength and numRowsEstimate
       // so that we do not continue using outliers
     }
     // rest of the code that creates and copies the column vectors goes here
   } catch (BufferOverflowException ex) { // the buffer does not fit a single row
     // increase dataLength by 25%, capped at Integer.MAX_VALUE
     int newDataLength = (int) Math.min(Integer.MAX_VALUE, ((long) dataLength * 125) / 100);
     if (newDataLength <= dataLength) { // we already reached the limit
       throw new RuntimeException(ex);
     }
     dataLength = newDataLength;
     // numRowsEstimate is recalculated from the new dataLength
     retryCount++;
   }
}

@revans2 and @hyperbolic2346 let me know if you are fine with the new fix.

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
@amahussein (author) commented:

build

@hyperbolic2346 (Collaborator) commented on Jul 21, 2022:

  • InternalRowToColumnarBatchIterator is initialized with small goalTargetSize = 1 byte

Why wouldn't we start with a target size of a single page of memory? Do we really need to get it down to the byte if we increase it by 25% each time?

  • the dataLength is set to be the estimate of the row size (20 bytes for string).

Is this a valid assumption for strings? cudf limits a single column to 2 GB, but that could be a single row. Further, there could be any number of string columns in a table. This limits a single row to ~50% of the available GPU memory before we are unable to convert it, due to the double allocation, etc. If we assume 20 bytes per string and the string is even remotely large, adding 25% each iteration will take a long time to get large enough.

  • modify the generated method copyUTF8StringInto to return a
    negative value if the strData exceeds the endAddress

Can the failure case return the number of bytes required so we can better tune the allocation?

  • modify the method copyInto(UnsafeRow input, long startAddress, long endAddress) to return a
    negative value if copyUTF8StringInto returns a negative value

Would this process have to repeat for each string then? Say we have a row with 10 strings that are each 1 MB. Would we have to call into it 10 times even in the best case, where we fail once and are able to resize to a buffer large enough to hold the failed allocation?
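
To put the growth-rate concern in numbers, a quick self-contained check (illustrative only) of how many 25% retries it takes to grow a 20-byte estimate to 1 MiB:

// Counts 25% growth steps (integer math, as in the retry sketch above)
// from 20 bytes to 1 MiB; prints 50 (continuous math predicts ~49,
// and integer flooring costs one extra step).
public class GrowthCheck {
  public static void main(String[] args) {
    long size = 20;
    int retries = 0;
    while (size < 1_048_576) {
      size = (size * 125) / 100;
      retries++;
    }
    System.out.println(retries);
  }
}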

revans2
revans2 previously approved these changes Jul 22, 2022
@hyperbolic2346 (Collaborator) left a comment:

Getting there for sure. These are just nits and questions.

Comment on lines 191 to 192
long newRowSizeEst = dataLength << 1 ;
newRowSizeEst = Math.min(newRowSizeEst, JCudfUtil.JCUDF_MAX_DATA_BUFFER_LENGTH);
A collaborator commented:
Suggested change
long newRowSizeEst = dataLength << 1 ;
newRowSizeEst = Math.min(newRowSizeEst, JCudfUtil.JCUDF_MAX_DATA_BUFFER_LENGTH);
long newRowSizeEst = Math.min(dataLength << 1, JCudfUtil.JCUDF_MAX_DATA_BUFFER_LENGTH);

It looks odd to me to write to newRowSizeEst and then immediately assign to it again. This got me looking more at this code, and now I have questions too.

So we have a buffer of size dataLength and it is too small, so we double the buffer size into newRowSizeEst and then clamp it to a maximum. Then we compare the new desired size to the old buffer size, and if it is smaller or the same, we bail. The only time this could happen is if dataLength is already JCudfUtil.JCUDF_MAX_DATA_BUFFER_LENGTH, right? If that is the case, why don't we just check for that directly? This way seems a little convoluted:

        if (dataLength == JCudfUtil.JCUDF_MAX_DATA_BUFFER_LENGTH) { // this won't work...
        // double buffer size and try again
        long newRowSizeEst = Math.min(dataLength << 1, JCudfUtil.JCUDF_MAX_DATA_BUFFER_LENGTH);

@amahussein (author) replied:

OK, sure. Easy fix.

@@ -62,7 +62,8 @@ public final class JCudfUtil {
/**
* The maximum buffer size allocated to copy JCudf row.
*/
public static final long JCUDF_MAX_DATA_BUFFER_LENGTH = Integer.MAX_VALUE;
public static final long JCUDF_MAX_DATA_BUFFER_LENGTH =
Integer.MAX_VALUE - (JCUDF_ROW_ALIGNMENT - 1);
A collaborator commented:

Why are we using Integer.MAX_VALUE as a limit? It is a limit on a cudf column because cudf::size_type is a signed int, but is there a reason to limit the row here to an int? I'm not against this, just making sure it isn't being coupled unnecessarily to cudf::size_type.

@amahussein (author) replied:

During the transitions the rows are 8-byte aligned, so Integer.MAX_VALUE - 7 is 8-byte aligned. I should probably rename this to JCUDF_MAX_ROW_SIZE_LENGTH, because the buffer size, which covers multiple rows, can be larger.
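
For reference, the usual power-of-two align-up arithmetic behind that reasoning, as a minimal sketch rather than the PR's actual code:

class AlignSketch {
  // Rounds offset up to the next multiple of a power-of-two alignment.
  static long alignUp(long offset, long alignment) {
    return (offset + alignment - 1) & -alignment;
  }
  // e.g. alignUp(13, 8) == 16, while Integer.MAX_VALUE - 7 (= 2147483640)
  // is already a multiple of 8, so aligning it up leaves it unchanged.
}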

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
@amahussein (author) commented:

build

@amahussein (author) commented:

The failure in CI is unrelated and is reported in #6054.

@jlowe (Member) commented on Jul 25, 2022:

build
@sameerz (Collaborator) commented on Jul 26, 2022:

build

@hyperbolic2346 (Collaborator) left a comment:

Looking good. Most of my comments are about comments, so I think we can declare this very close.

return attributes.length > 0 && varSizeColIndex != attributes.length;
}

public int getValidityBytesOffset() {
A collaborator commented:

I'm noticing some new functions without comments. I'm not sure what the comment-on-function policy is for the Java code, so I'm just noting that I see it. :)

amahussein and others added 9 commits on July 26, 2022

Co-authored-by: Mike Wilson <hyperbolic2346@users.noreply.github.com>
Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
@amahussein (author) commented:

build

@amahussein (author) commented:

build

@hyperbolic2346 (Collaborator) left a comment:

Tiniest of nits left. I'm happy with how this has turned out. Thank you for humoring me through so many tiny changes.

@amahussein amahussein merged commit 452e7ba into NVIDIA:branch-22.08 Jul 27, 2022
jlowe added a commit to jlowe/spark-rapids that referenced this pull request Aug 18, 2022
…itions (NVIDIA#5998)"

This reverts commit 452e7ba.

Signed-off-by: Jason Lowe <jlowe@nvidia.com>
jlowe added a commit that referenced this pull request Aug 19, 2022
…itions (#5998)" (#6367)

This reverts commit 452e7ba.

Signed-off-by: Jason Lowe <jlowe@nvidia.com>
Labels: feature request, performance