Add SpillableHostColumnarBatch #9098

Merged — 2 commits merged into NVIDIA:branch-23.10 from spillable_host_batch on Aug 28, 2023

Conversation

@abellina (Collaborator) commented Aug 23, 2023

Closes #8882

This is ready for another pair of eyes to take a look at the approach, especially around serialization/deserialization of host-backed ColumnarBatch.

Note that this doesn't support "unspill", since the path to get a memory buffer is not supported for a batch. We need follow-on work after this, such as unspill and handling the case where GDS is configured. We are also using JCudfSerialization to write to and read from a JVM stream when spilling to and reading from disk; there are likely faster ways of doing this that we can look into, but they should be under the hood.
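As a rough illustration of the JCudfSerialization-based disk path described above (a sketch only; the helper names are made up, and the exact cuDF Java signatures used by the PR may differ):

```scala
import java.io.{BufferedInputStream, BufferedOutputStream, FileInputStream, FileOutputStream}
import ai.rapids.cudf.{HostColumnVector, JCudfSerialization}

object HostBatchSpillSketch {
  // Hypothetical helper: serialize host columns to a spill file using the cuDF
  // JVM serializer. Assumes JCudfSerialization.writeToStream accepts an array
  // of HostColumnVector plus a row offset and row count.
  def writeToDisk(columns: Array[HostColumnVector], numRows: Long, path: String): Unit = {
    val out = new BufferedOutputStream(new FileOutputStream(path))
    try {
      JCudfSerialization.writeToStream(columns, out, 0, numRows)
    } finally {
      out.close()
    }
  }

  // Hypothetical helper: read the serialized batch back. Note that
  // readTableFrom materializes a device Table; the host-read path in the PR
  // may use a different entry point.
  def readFromDisk(path: String): JCudfSerialization.TableAndRowCountPair = {
    val in = new BufferedInputStream(new FileInputStream(path))
    try {
      JCudfSerialization.readTableFrom(in)
    } finally {
      in.close()
    }
  }
}
```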

I am going to run this through NDS just in case, so I'll mark it as a draft for now.

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
@abellina (Collaborator Author):

build

@abellina (Collaborator Author):
I don't see regressions in NDS (not that they were expected).

@abellina abellina marked this pull request as ready for review August 23, 2023 21:33
@sameerz added the task (Work required that improves the product but is not user facing) and reliability (Features to improve reliability or bugs that severely impact the reliability of the plugin) labels on Aug 23, 2023
revans2 previously approved these changes Aug 24, 2023
try {
val channel = fos.getChannel
val fileOffset = channel.position
iter.foreach { bb =>
Collaborator:

Should we close bb when we are done?

@abellina (Collaborator Author) Aug 24, 2023:
These ByteBuffer instances are views on top of hostBuffer. It looks like the JDK instantiates a DirectByteBuffer with a null cleaner in this case, because we call env->NewDirectByteBuffer(addr, len).

As such, I think these views are going to stay around on the heap as it is. We could move this implementation to cuDF so we can write directly from the HostMemoryBuffer to a file, rather than having to work around limitations of the JDK's ByteBuffer implementation with this iterator.
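A sketch of the view-based write path being discussed, assuming HostMemoryBuffer.asByteBuffer(offset, length) (or a similar slice-and-wrap helper) is what produces the bb views; the helper itself is illustrative, not the PR's code:

```scala
import java.io.FileOutputStream
import ai.rapids.cudf.HostMemoryBuffer

// Illustrative helper: write a HostMemoryBuffer to a file by iterating
// fixed-size ByteBuffer views over it. Each view wraps the buffer's native
// memory directly (no copy, no cleaner), so the HostMemoryBuffer remains the
// owner of the memory and must stay alive until the write completes.
def writeBufferToFile(hostBuffer: HostMemoryBuffer, path: String,
    chunkSize: Int = 128 * 1024 * 1024): Unit = {
  val fos = new FileOutputStream(path)
  try {
    val channel = fos.getChannel
    var offset = 0L
    while (offset < hostBuffer.getLength) {
      val len = math.min(chunkSize.toLong, hostBuffer.getLength - offset).toInt
      val bb = hostBuffer.asByteBuffer(offset, len) // a view, not a copy
      while (bb.hasRemaining) {
        channel.write(bb)
      }
      offset += len
    }
  } finally {
    fos.close()
  }
}
```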

Collaborator:
I know that closing them really ends up being a NOOP and the heap cleans them up. The only reason I mention it is that if bb changes in some way to not be backed by a HostMemoryBuffer, we could then potentially leak memory. It is just defensive, and a nit.

@abellina (Collaborator Author):
Adding @jlowe, because it would be great if he could take a look at this change as well.

Comment on lines +289 to +290
def getHostColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = {
throw new IllegalStateException(s"$this does not support host columnar batches.")
Member:
Nit: Similar comment here. It's nice that this doesn't have the sentinel value, but I'd rather see a trait that defines the ability to provide a host ColumnarBatch, and have callers that need it pattern match on the underlying RAPIDS buffer to downcast to that trait, rather than a method that explodes if you don't carefully know what you're doing. Not a must-fix for me.

@abellina (Collaborator Author):
Putting these in a trait is easy, I can do that.
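One possible shape for that trait, sketched here with made-up names (not necessarily what the PR ended up with):

```scala
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical capability trait: only buffer implementations that can actually
// materialize a host-backed ColumnarBatch mix this in, so callers discover the
// capability by pattern matching instead of calling a base method that throws.
trait SupportsHostColumnarBatch {
  def getHostColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch
}

// Caller side: downcast via pattern match, failing loudly only at the call site.
def hostBatchFrom(buffer: AnyRef, sparkTypes: Array[DataType]): ColumnarBatch =
  buffer match {
    case hcb: SupportsHostColumnarBatch => hcb.getHostColumnarBatch(sparkTypes)
    case other =>
      throw new IllegalStateException(s"$other does not support host columnar batches.")
  }
```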

}

override def getHostColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = {
columnSpillability.clear()
Member:
How is this going to interact with another thread that's trying to call onColumnSpillable at the same time? Could we sometimes end up in a state where we built the host batch but still think this is spillable? I think the answer is yes, but maybe we don't care because of the refcounts? In light of that, it seems prudent to first get the refcounts in place before marking this as spillable; otherwise another thread could end up calling releaseResources and closing hostCb on this (now spillable) buffer before we get around to locking down hostCb.

@abellina (Collaborator Author):
getHostColumnarBatch, like getColumnarBatch, must be called after acquiring the RapidsBuffer via catalog.acquireBuffer. That call will already have dealt with the RapidsBuffer reference count, so we are guaranteed that the batch produced isn't pointing at a closed batch.

And yes I see the race you mention. I'll think about that more, to see if we need to lock things further.
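For context, the acquisition contract being described looks roughly like this (a sketch; RapidsBufferCatalog, RapidsBufferHandle, and withResource are the spark-rapids internals referenced in the discussion, and their exact signatures are assumed here):

```scala
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch of the caller-side pattern: acquire the RapidsBuffer (which bumps its
// reference count) before asking it for a host batch, and release it when done.
def readHostBatch(
    catalog: RapidsBufferCatalog,
    handle: RapidsBufferHandle,
    sparkTypes: Array[DataType]): ColumnarBatch = {
  withResource(catalog.acquireBuffer(handle)) { rapidsBuffer =>
    // Safe: the buffer cannot be freed underneath us while it is acquired.
    rapidsBuffer.getHostColumnarBatch(sparkTypes)
  }
}
```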

@abellina (Collaborator Author):
One thing we don't do is mark something as non-spillable when it is acquired; it only becomes non-spillable when the batch or buffer is obtained.

Perhaps we should add a second level of unspillability: while a buffer has RapidsBuffer.refCount > 1, we could mark the buffer as non-spillable, and when RapidsBuffer.refCount goes back to 1 we could return to the cuDF-driven spillability checks.
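A toy sketch of that idea (illustrative only, not code from this PR): spillability is reported only when the store is the sole holder of the reference and the existing column-level checks pass.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Illustrative refcount-gated spillability: refCount == 1 means only the
// store itself holds the buffer, so nothing else would keep the memory
// alive after a spill copy.
class RefCountGatedSpillable(columnsSpillable: () => Boolean) {
  private val refCount = new AtomicInteger(1)

  def acquire(): Unit = refCount.incrementAndGet()
  def release(): Unit = refCount.decrementAndGet()

  // Spillable only when no caller besides the store has acquired the buffer
  // and the cuDF-driven column checks say the columns are spillable.
  def spillable: Boolean = refCount.get() == 1 && columnsSpillable()
}
```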

@abellina (Collaborator Author):
BTW this is an issue with device buffers and batches as well

Member:
While a buffer has RapidsBuffer.refCount > 1, we could mark the buffer as non spillable.

This seems desirable in general, because it's not worth spilling something if we cannot free the memory associated with it after the spill copy. Not something to tackle in this PR.

@abellina (Collaborator Author):
Thanks @jlowe. I filed #9120 to track this.

// the columns (the caller is holding onto a ColumnarBatch that will be closed
// after instantiation, triggering onClosed callbacks)
// This hash set contains the columns that are currently spillable.
private val columnSpillability = new ConcurrentHashMap[HostColumnVector, Boolean]()
Member:
Nit: This could be done more efficiently as a BitSet (especially since most column batches will have fewer than 64 columns), although that would require synchronization when accessing it. We simply need to track the column index in the on-close event handler (we're generating a new handler per column anyway) and have onColumnSpillable take a column index instead.

@abellina (Collaborator Author):
Since this one is a nit, would you mind if I do this as a separate issue? This code is really a copy of the device counterpart, and I would like to fix both if we are going to change this to a BitSet.

Member:
Separate issue is fine.
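For illustration, the BitSet-based tracking suggested above might look roughly like this (names and structure are assumptions, not the eventual fix):

```scala
import scala.collection.mutable

// Hypothetical tracker keyed by column index rather than by HostColumnVector
// reference. Access is synchronized because mutable.BitSet is not thread-safe,
// unlike the ConcurrentHashMap it would replace.
class ColumnSpillabilityBits(numColumns: Int) {
  private val spillable = new mutable.BitSet(numColumns)

  // Called from the per-column on-close event handler with that column's index.
  def onColumnSpillable(columnIx: Int): Unit = synchronized {
    spillable += columnIx
  }

  // Called when a host batch is handed out: mark every column non-spillable.
  def markAllNotSpillable(): Unit = synchronized {
    spillable.clear()
  }

  // The batch as a whole is spillable only when all columns are.
  def allColumnsSpillable: Boolean = synchronized {
    spillable.size == numColumns
  }
}
```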

@revans2 (Collaborator) left a comment:
Would like to hear from others too though.

@abellina (Collaborator Author):
build

@abellina merged commit 5124777 into NVIDIA:branch-23.10 on Aug 28, 2023
27 checks passed
@abellina deleted the spillable_host_batch branch on August 28, 2023 at 21:50
Successfully merging this pull request may close these issues.

[FEA] Add API to make HostMemoryBuffers/HostColumnVectors Spillable