Skip to content

Commit

Permalink
Move Stack classes to wrapper classes to fix non-deterministic build …
Browse files Browse the repository at this point in the history
…issue (#9576)

* Move Stack classes to wrapper classes around the original scala 2.12/2.13 Stack classes to handle build issues

Signed-off-by: Navin Kumar <navink@nvidia.com>

* Make these classes subclass scala.Proxy

Signed-off-by: Navin Kumar <navink@nvidia.com>

* Clean this up

Signed-off-by: Navin Kumar <navink@nvidia.com>

* Fix bug in premerge build

Signed-off-by: Navin Kumar <navink@nvidia.com>

---------

Signed-off-by: Navin Kumar <navink@nvidia.com>
  • Loading branch information
NVnavkumar authored Oct 31, 2023
1 parent 7bd0998 commit d157306
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,36 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayStack

class ScalaStack[T] extends ArrayStack[T]
class RapidsStack[T] extends Proxy {
private val stack = new ArrayStack[T]()

override def self = stack

def push(elem1: T): Unit = {
self.push(elem1)
}

def pop(): T = {
self.pop()
}

def isEmpty: Boolean = {
self.isEmpty
}

def nonEmpty: Boolean = {
self.nonEmpty
}

def size: Int = {
self.size
}

def toSeq: Seq[T] = {
self.toSeq
}

def clear(): Unit = {
self.clear()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,36 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.Stack

class ScalaStack[T] extends Stack[T]
class RapidsStack[T] extends Proxy {
private val stack = new Stack[T]()
override def self = stack

def push(elem1: T): RapidsStack[T] = {
self.push(elem1)
this
}

def pop(): T = {
self.pop()
}

def isEmpty: Boolean = {
self.isEmpty
}

def nonEmpty: Boolean = {
self.nonEmpty
}

def size(): Int = {
self.size
}

def toSeq(): Seq[T] = {
self.toSeq
}

def clear(): Unit = {
self.clear()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,8 @@ case class GpuOutOfCoreSortIterator(
while (!pending.isEmpty && sortedSize < targetSize) {
// Keep going until we have enough data to return
var bytesLeftToFetch = targetSize
val pendingSort = new ScalaStack[SpillableColumnarBatch]()
closeOnExcept(pendingSort) { _ =>
val pendingSort = new RapidsStack[SpillableColumnarBatch]()
closeOnExcept(pendingSort.toSeq) { _ =>
while (!pending.isEmpty &&
(bytesLeftToFetch - pending.peek().buffer.sizeInBytes >= 0 || pendingSort.isEmpty)) {
val buffer = pending.poll().buffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ class GpuSorter(
* @return the sorted data.
*/
final def mergeSortAndCloseWithRetry(
spillableBatches: ScalaStack[SpillableColumnarBatch],
spillableBatches: RapidsStack[SpillableColumnarBatch],
sortTime: GpuMetric): SpillableColumnarBatch = {
closeOnExcept(spillableBatches) { _ =>
closeOnExcept(spillableBatches.toSeq) { _ =>
assert(spillableBatches.nonEmpty)
}
withResource(new NvtxWithMetrics("merge sort", NvtxColor.DARK_GREEN, sortTime)) { _ =>
Expand Down Expand Up @@ -277,9 +277,9 @@ class GpuSorter(
}
}
} else {
closeOnExcept(spillableBatches) { _ =>
val batchesToMerge = new ScalaStack[SpillableColumnarBatch]()
closeOnExcept(batchesToMerge) { _ =>
closeOnExcept(spillableBatches.toSeq) { _ =>
val batchesToMerge = new RapidsStack[SpillableColumnarBatch]()
closeOnExcept(batchesToMerge.toSeq) { _ =>
while (spillableBatches.nonEmpty || batchesToMerge.size > 1) {
// pop a spillable batch if there is one, and add it to `batchesToMerge`.
if (spillableBatches.nonEmpty) {
Expand All @@ -299,7 +299,7 @@ class GpuSorter(

// we no longer care about the old batches, we closed them
closeOnExcept(merged) { _ =>
batchesToMerge.safeClose()
batchesToMerge.toSeq.safeClose()
batchesToMerge.clear()
}

Expand Down

0 comments on commit d157306

Please sign in to comment.