Move HostConcatResultUtil out of unshimmed classes #5614

Merged · 3 commits · May 25, 2022
Changes from 2 commits
8 changes: 8 additions & 0 deletions jenkins/spark-premerge-build.sh
@@ -73,6 +73,14 @@ mvn_verify() {
 
     # Triggering here until we change the jenkins file
     rapids_shuffle_smoke_test
+
+    # non-caller classloader smoke test in pseudo-distributed
+    # standalone cluster
+    echo "Running test_cartesian_join_special_case_count with spark.rapids.force.caller.classloader=false"
+    PYSP_TEST_spark_rapids_force_caller_classloader=false \
+      NUM_LOCAL_EXECS=1 \
+      TEST_PARALLEL=0 \
+      ./integration_tests/run_pyspark_from_build.sh -k 'test_cartesian_join_special_case_count[100]'
 }
 
 rapids_shuffle_smoke_test() {
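The PYSP_TEST_ environment variables above are how run_pyspark_from_build.sh receives Spark configs for the test session: the prefix is stripped and underscores become dots, so PYSP_TEST_spark_rapids_force_caller_classloader=false arrives as spark.rapids.force.caller.classloader=false. Below is a minimal Scala sketch of that naming convention, assuming the plain underscore-to-dot rule (which holds for this key); the PyspTestEnvSketch object is hypothetical, not part of the PR.

// Sketch only: derive a Spark conf key from a PYSP_TEST_ environment
// variable name. Conf keys that themselves contain underscores would
// need more care than this simple rule.
object PyspTestEnvSketch {
  def toConfKey(envName: String): Option[String] = {
    val prefix = "PYSP_TEST_"
    if (envName.startsWith(prefix)) {
      Some(envName.stripPrefix(prefix).replace('_', '.'))
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    // Prints Some(spark.rapids.force.caller.classloader)
    println(toConfKey("PYSP_TEST_spark_rapids_force_caller_classloader"))
  }
}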
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids
 
 import java.util
 
-import ai.rapids.cudf.{HostConcatResultUtil, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange}
+import ai.rapids.cudf.{HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange}
 import ai.rapids.cudf.JCudfSerialization.{HostConcatResult, SerializedTableHeader}
 import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
 
@@ -103,7 +103,7 @@ class HostShuffleCoalesceIterator(
     val firstHeader = serializedTables.peekFirst().header
     if (firstHeader.getNumColumns == 0) {
       (0 until numTablesInBatch).foreach(_ => serializedTables.removeFirst())
-      HostConcatResultUtil.rowsOnlyHostConcatResult(numRowsInBatch)
+      cudf_utils.HostConcatResultUtil.rowsOnlyHostConcatResult(numRowsInBatch)
     } else {
       val headers = new Array[SerializedTableHeader](numTablesInBatch)
       withResource(new Array[HostMemoryBuffer](numTablesInBatch)) { buffers =>
@@ -211,7 +211,7 @@ class GpuShuffleCoalesceIterator(iter: Iterator[HostConcatResult],
       // generate GPU data from batches that are empty.
       GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWaitTime)
       withResource(new MetricRange(opTimeMetric)) { _ =>
-        val batch = HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes)
+        val batch = cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes)
         outputBatchesMetric += 1
         outputRowsMetric += batch.numRows()
         batch
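Note that these call sites can write cudf_utils.HostConcatResultUtil without adding an import: the iterators are declared in package com.nvidia.spark.rapids, so Scala resolves cudf_utils relative to the enclosing package, and the qualified name makes it clear the helper no longer lives in ai.rapids.cudf. A minimal usage sketch of the relocated helper follows; the RelativeRefDemo object and its method are hypothetical, and the real iterators also acquire the GPU semaphore before materializing a batch, as the diff above shows.

package com.nvidia.spark.rapids

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical demo, not part of the PR.
object RelativeRefDemo extends Arm {
  // Build a rows-only (zero-column) batch, as HostShuffleCoalesceIterator
  // does for count-style results. cudf_utils resolves relative to this
  // package, and withResource scopes the host memory held by the result.
  def rowsOnlyBatch(numRows: Int): ColumnarBatch =
    withResource(cudf_utils.HostConcatResultUtil.rowsOnlyHostConcatResult(numRows)) { result =>
      cudf_utils.HostConcatResultUtil.getColumnarBatch(result, Array.empty[DataType])
    }
}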
@@ -16,7 +16,7 @@
 
 package com.nvidia.spark.rapids
 
-import ai.rapids.cudf.{HostConcatResultUtil, NvtxColor, NvtxRange}
+import ai.rapids.cudf.{NvtxColor, NvtxRange}
 import ai.rapids.cudf.JCudfSerialization.HostConcatResult
 import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, ShimBinaryExecNode}
 
@@ -345,7 +345,7 @@ object GpuShuffledHashJoinExec extends Arm {
           // we can bring the build batch to the GPU now
           withResource(hostConcatResult) { _ =>
             buildTime.ns {
-              HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes)
+              cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes)
             }
           }
         }
@@ -14,9 +14,9 @@
  * limitations under the License.
  */
 
-package ai.rapids.cudf
+package com.nvidia.spark.rapids.cudf_utils
 
-import ai.rapids.cudf.JCudfSerialization.HostConcatResult
+import ai.rapids.cudf
 import com.nvidia.spark.rapids.{Arm, GpuColumnVectorFromBuffer}
 
 import org.apache.spark.sql.types.DataType
@@ -26,11 +26,10 @@ object HostConcatResultUtil extends Arm {
   /**
    * Create a rows-only `HostConcatResult`.
    */
-  def rowsOnlyHostConcatResult(numRows: Int): HostConcatResult = {
-    new HostConcatResult(
-      new JCudfSerialization.SerializedTableHeader(
-        Array.empty, numRows, 0L),
-      HostMemoryBuffer.allocate(0, false))
+  def rowsOnlyHostConcatResult(numRows: Int): cudf.JCudfSerialization.HostConcatResult = {
+    new cudf.JCudfSerialization.HostConcatResult(
+      new cudf.JCudfSerialization.SerializedTableHeader(numRows),
+      cudf.HostMemoryBuffer.allocate(0, false))
   }
 
   /**
@@ -41,7 +40,7 @@
    * callers are responsible for closing the resulting `ColumnarBatch`
    */
   def getColumnarBatch(
-      hostConcatResult: HostConcatResult,
+      hostConcatResult: cudf.JCudfSerialization.HostConcatResult,
       sparkSchema: Array[DataType]): ColumnarBatch = {
     if (hostConcatResult.getTableHeader.getNumColumns == 0) {
       // We expect the caller to have acquired the GPU unconditionally before calling
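The change to rowsOnlyHostConcatResult above is what makes the move possible: the old code built the SerializedTableHeader through a constructor apparently only reachable from inside the ai.rapids.cudf package (which is why the file lived there), while the new code uses the single-argument SerializedTableHeader(numRows) constructor, so the utility needs nothing but public cudf API and can sit in the plugin's own cudf_utils package. A minimal sketch of that construction, assuming the single-argument constructor is public in the cudf version this PR builds against; the PublicApiSketch object is hypothetical.

package com.nvidia.spark.rapids.cudf_utils

import ai.rapids.cudf

// Hypothetical sketch, not part of the PR: a rows-only HostConcatResult
// built purely from public cudf API, mirroring rowsOnlyHostConcatResult.
object PublicApiSketch {
  def rowsOnly(numRows: Int): cudf.JCudfSerialization.HostConcatResult =
    new cudf.JCudfSerialization.HostConcatResult(
      new cudf.JCudfSerialization.SerializedTableHeader(numRows), // rows-only header
      cudf.HostMemoryBuffer.allocate(0, false)) // empty, non-pinned host buffer
}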