Allow for GPUCoalesceBatch to deal with Map #1052

Merged (2 commits) on Nov 2, 2020
4 changes: 2 additions & 2 deletions integration_tests/run_pyspark_from_build.sh
@@ -42,8 +42,8 @@ else
mkdir -p "$RUN_DIR"
cd "$RUN_DIR"
"$SPARK_HOME"/bin/spark-submit --jars "${ALL_JARS// /,}" \
--conf "spark.driver.extraJavaOptions=-Duser.timezone=GMT $COVERAGE_SUBMIT_FLAGS" \
--conf 'spark.executor.extraJavaOptions=-Duser.timezone=GMT' \
--conf "spark.driver.extraJavaOptions=-ea -Duser.timezone=GMT $COVERAGE_SUBMIT_FLAGS" \
--conf 'spark.executor.extraJavaOptions=-ea -Duser.timezone=GMT' \
Collaborator (author) commented:
These are not needed; I just added them in to verify that I didn't get something wrong.

--conf 'spark.sql.session.timeZone=UTC' \
--conf 'spark.sql.shuffle.partitions=12' \
$SPARK_SUBMIT_FLAGS \
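Worth noting: the `-ea` flags above enable Java assertions on the driver and executors, which is what makes the new `typeConversionAllowed` checks in GpuColumnVector (further down in this diff) actually run during the integration tests. A minimal, standalone illustration of the mechanism (hypothetical class, not part of the PR):

```java
// Run with `java -ea AssertionDemo` to see the check fire; without -ea the JVM skips
// the assert entirely, just as the plugin's type checks are skipped in production runs
// that do not enable assertions.
public class AssertionDemo {
  public static void main(String[] args) {
    int actualChildren = 1;    // pretend the cudf struct column has one child
    int expectedChildren = 2;  // a map column needs exactly two: key and value
    assert actualChildren == expectedChildren
        : "Type conversion is not allowed: expected " + expectedChildren
            + " children but found " + actualChildren;
    System.out.println("assertions are disabled or the layout matched");
  }
}
```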
126 changes: 122 additions & 4 deletions sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java
@@ -29,6 +29,7 @@
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@@ -260,18 +261,130 @@ public static final Table from(ColumnarBatch batch) {
return new Table(extractBases(batch));
}

/**
* Get the data types for a batch.
*/
public static final DataType[] extractTypes(ColumnarBatch batch) {
DataType[] ret = new DataType[batch.numCols()];
for (int i = 0; i < batch.numCols(); i++) {
ret[i] = batch.column(i).dataType();
}
return ret;
}

/**
* Get the data types for a struct.
*/
public static final DataType[] extractTypes(StructType st) {
DataType[] ret = new DataType[st.size()];
for (int i = 0; i < st.size(); i++) {
ret[i] = st.apply(i).dataType();
}
return ret;
}

/**
* Convert a Table to a ColumnarBatch. The columns in the table will have their reference counts
* incremented so you will need to close both the table passed in and the batch returned to
* not have any leaks.
* @deprecated Spark data types must be provided along with the table.
*/
@Deprecated
public static final ColumnarBatch from(Table table) {
return from(table, 0, table.getNumberOfColumns());
}

public static final ColumnarBatch from(Table table, List<DataType> colTypes) {
public static final ColumnarBatch from(Table table, DataType[] colTypes) {
return from(table, colTypes, 0, table.getNumberOfColumns());
}

/**
* This should only ever be called from an assertion.
*/
private static boolean typeConversionAllowed(ColumnViewAccess cv, DataType colType) {
DType dt = cv.getDataType();
if (!dt.isNestedType()) {
return getSparkType(dt).equals(colType);
}
if (colType instanceof MapType) {
MapType mType = (MapType) colType;
// list of struct of key/value
if (!(dt.equals(DType.LIST))) {
return false;
}
try (ColumnViewAccess structCv = cv.getChildColumnViewAccess(0)) {
if (!(structCv.getDataType().equals(DType.STRUCT))) {
return false;
}
if (structCv.getNumChildren() != 2) {
return false;
}
try (ColumnViewAccess keyCv = structCv.getChildColumnViewAccess(0)) {
if (!typeConversionAllowed(keyCv, mType.keyType())) {
return false;
}
}
try (ColumnViewAccess valCv = structCv.getChildColumnViewAccess(1)) {
return typeConversionAllowed(valCv, mType.valueType());
}
}
} else if (colType instanceof ArrayType) {
if (!(dt.equals(DType.LIST))) {
return false;
}
try (ColumnViewAccess tmp = cv.getChildColumnViewAccess(0)) {
return typeConversionAllowed(tmp, ((ArrayType) colType).elementType());
}
} else if (colType instanceof StructType) {
if (!(dt.equals(DType.STRUCT))) {
return false;
}
StructType st = (StructType) colType;
final int numChildren = cv.getNumChildren();
if (numChildren != st.size()) {
return false;
}
for (int childIndex = 0; childIndex < numChildren; childIndex++) {
try (ColumnViewAccess tmp = cv.getChildColumnViewAccess(childIndex)) {
StructField entry = ((StructType) colType).apply(childIndex);
if (!typeConversionAllowed(tmp, entry.dataType())) {
return false;
}
}
}
return true;
} else if (colType instanceof BinaryType) {
if (!(dt.equals(DType.LIST))) {
return false;
}
try (ColumnViewAccess tmp = cv.getChildColumnViewAccess(0)) {
DType tmpType = tmp.getDataType();
return tmpType.equals(DType.INT8) || tmpType.equals(DType.UINT8);
}
} else {
// Unexpected type
return false;
}
}

/**
* This should only ever be called from an assertion. This is to avoid the performance overhead
* of doing the complicated check in production. Sadly this means that we don't get to give a
* clear message about what part of the check failed, so the assertions that use this should
* include in the message both types so a user can see what is different about them.
*/
private static boolean typeConversionAllowed(Table table, DataType[] colTypes) {
final int numColumns = table.getNumberOfColumns();
if (numColumns != colTypes.length) {
return false;
}
boolean ret = true;
for (int colIndex = 0; colIndex < numColumns; colIndex++) {
ret = ret && typeConversionAllowed(table.getColumn(colIndex), colTypes[colIndex]);
}
return ret;
}

/**
* Get a ColumnarBatch from a set of columns in the Table. This gets the columns
* starting at startColIndex and going until but not including untilColIndex. This will
@@ -282,8 +395,10 @@ public static final ColumnarBatch from(Table table, List<DataType> colTypes) {
* @param startColIndex index of the first vector you want in the final ColumnarBatch
* @param untilColIndex until index of the columns. (ie doesn't include that column num)
* @return a ColumnarBatch of the vectors from the table
* @deprecated must use the version that takes Spark data types.
*/
public static final ColumnarBatch from(Table table, int startColIndex, int untilColIndex) {
@Deprecated
private static final ColumnarBatch from(Table table, int startColIndex, int untilColIndex) {
assert table != null : "Table cannot be null";
int numColumns = untilColIndex - startColIndex;
ColumnVector[] columns = new ColumnVector[numColumns];
@@ -324,15 +439,17 @@ public static final ColumnarBatch from(Table table, int startColIndex, int until
* @param untilColIndex until index of the columns. (ie doesn't include that column num)
* @return a ColumnarBatch of the vectors from the table
*/
public static final ColumnarBatch from(Table table, List<DataType> colTypes, int startColIndex, int untilColIndex) {
public static final ColumnarBatch from(Table table, DataType[] colTypes, int startColIndex, int untilColIndex) {
assert table != null : "Table cannot be null";
assert typeConversionAllowed(table, colTypes) : "Type conversion is not allowed from " + table +
" to " + colTypes;
int numColumns = untilColIndex - startColIndex;
ColumnVector[] columns = new ColumnVector[numColumns];
int finalLoc = 0;
boolean success = false;
try {
for (int i = startColIndex; i < untilColIndex; i++) {
columns[finalLoc] = from(table.getColumn(i).incRefCount(), colTypes.get(i));
columns[finalLoc] = from(table.getColumn(i).incRefCount(), colTypes[i]);
finalLoc++;
}
long rows = table.getRowCount();
@@ -399,6 +516,7 @@ public static final GpuColumnVector[] extractColumns(ColumnarBatch batch) {
return vectors;
}

@Deprecated
public static final GpuColumnVector[] extractColumns(Table table) {
try (ColumnarBatch batch = from(table)) {
return extractColumns(batch);
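For orientation, a hypothetical usage sketch (not part of the PR) of the new type-aware conversion above. For a Spark `MapType` column, `typeConversionAllowed` expects the backing cudf column to be a `LIST` of a `STRUCT` with exactly two children (key and value), and with `-ea` enabled the `from(Table, DataType[])` call asserts that before building the batch:

```java
import ai.rapids.cudf.Table;
import com.nvidia.spark.rapids.GpuColumnVector;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class FromTableSketch {
  // `table` is assumed to hold an INT32 column plus a LIST<STRUCT<STRING, STRING>> column,
  // i.e. the cudf layout backing a Spark map<string, string>.
  static ColumnarBatch toBatch(Table table) {
    DataType[] colTypes = new DataType[] {
        DataTypes.IntegerType,
        DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)
    };
    try {
      // Increments the column reference counts, so the caller must close both the
      // table (below) and, once finished with it, the returned batch.
      return GpuColumnVector.from(table, colTypes);
    } finally {
      table.close();
    }
  }
}
```

The Parquet reader changes later in this diff follow the same shape, passing `readDataSchema.fields.map(f => f.dataType)` straight through as an array instead of converting to a Java List.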
@@ -43,17 +43,19 @@ object ConcatAndConsumeAll {
* blow up.
* @param arrayOfBatches the batches to concat. This will be consumed and you do not need to
* close any of the batches after this is called.
* @param schema the schema of the output types.
* @return a single batch with all of them concatenated together.
*/
def buildNonEmptyBatch(arrayOfBatches: Array[ColumnarBatch]): ColumnarBatch = {
def buildNonEmptyBatch(arrayOfBatches: Array[ColumnarBatch],
schema: StructType): ColumnarBatch = {
if (arrayOfBatches.length == 1) {
arrayOfBatches(0)
} else {
val tables = arrayOfBatches.map(GpuColumnVector.from)
try {
val combined = Table.concatenate(tables: _*)
try {
GpuColumnVector.from(combined)
GpuColumnVector.from(combined, GpuColumnVector.extractTypes(schema))
} finally {
combined.close()
}
@@ -465,7 +467,7 @@ class GpuCoalesceIteratorForMaps(iter: Iterator[ColumnarBatch],
val tmp = batches.toArray
// Clear the buffer so we don't close it again (buildNonEmptyBatch closed it for us).
batches = ArrayBuffer.empty
val ret = ConcatAndConsumeAll.buildNonEmptyBatch(tmp)
val ret = ConcatAndConsumeAll.buildNonEmptyBatch(tmp, schema)
// sum of current batches and concatenating batches. Approximately sizeof(ret * 2).
maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(ret) * 2
ret
@@ -608,7 +610,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
}

override def concatAllAndPutOnGPU(): ColumnarBatch = {
val ret = ConcatAndConsumeAll.buildNonEmptyBatch(popAllDecompressed())
val ret = ConcatAndConsumeAll.buildNonEmptyBatch(popAllDecompressed(), schema)
// sum of current batches and concatenating batches. Approximately sizeof(ret * 2).
maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(ret) * 2
ret
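The coalesce change above boils down to the same pattern: cudf's `Table.concatenate` only carries cudf types, so the Spark schema has to be supplied again when the combined table is turned back into a `ColumnarBatch` (otherwise a map column would come back as a bare list of structs). A rough, hypothetical Java transliteration of `buildNonEmptyBatch` (simplified resource and error handling, not the real implementation):

```java
import ai.rapids.cudf.Table;
import com.nvidia.spark.rapids.GpuColumnVector;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class ConcatSketch {
  static ColumnarBatch concatToBatch(ColumnarBatch[] batches, StructType schema) {
    Table[] tables = new Table[batches.length];
    try {
      for (int i = 0; i < batches.length; i++) {
        tables[i] = GpuColumnVector.from(batches[i]); // Table view over the batch's columns
      }
      Table combined = Table.concatenate(tables);
      try {
        // The schema, not the cudf table, tells us the Spark-side types (e.g. MapType).
        return GpuColumnVector.from(combined, GpuColumnVector.extractTypes(schema));
      } finally {
        combined.close();
      }
    } finally {
      // Mirror the "inputs are consumed" contract from the scaladoc above.
      for (Table t : tables) {
        if (t != null) {
          t.close();
        }
      }
      for (ColumnarBatch b : batches) {
        b.close();
      }
    }
  }
}
```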
@@ -979,8 +979,8 @@ class MultiFileParquetPartitionReader(
} else {
val table = readToTable(seqPathsAndBlocks, clippedSchema, isCorrectRebaseMode)
try {
val colTypes = readDataSchema.fields.map(f => f.dataType).toList
val maybeBatch = table.map(t => GpuColumnVector.from(t, colTypes.asJava))
val colTypes = readDataSchema.fields.map(f => f.dataType)
val maybeBatch = table.map(t => GpuColumnVector.from(t, colTypes))
maybeBatch.foreach { batch =>
logDebug(s"GPU batch size: ${GpuColumnVector.getTotalDeviceMemoryUsed(batch)} bytes")
}
@@ -1416,8 +1416,8 @@ class MultiFileCloudParquetPartitionReader(
Some(evolveSchemaIfNeededAndClose(table, fileName, clippedSchema))
}
try {
val colTypes = readDataSchema.fields.map(f => f.dataType).toList
val maybeBatch = table.map(t => GpuColumnVector.from(t, colTypes.asJava))
val colTypes = readDataSchema.fields.map(f => f.dataType)
val maybeBatch = table.map(t => GpuColumnVector.from(t, colTypes))
maybeBatch.foreach { batch =>
logDebug(s"GPU batch size: ${GpuColumnVector.getTotalDeviceMemoryUsed(batch)} bytes")
}
@@ -1498,8 +1498,8 @@ class ParquetPartitionReader(
} else {
val table = readToTable(currentChunkedBlocks)
try {
val colTypes = readDataSchema.fields.map(f => f.dataType).toList
val maybeBatch = table.map(t => GpuColumnVector.from(t, colTypes.asJava))
val colTypes = readDataSchema.fields.map(f => f.dataType)
val maybeBatch = table.map(t => GpuColumnVector.from(t, colTypes))
maybeBatch.foreach { batch =>
logDebug(s"GPU batch size: ${GpuColumnVector.getTotalDeviceMemoryUsed(batch)} bytes")
}
@@ -24,7 +24,7 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, OrderedDistribution}
import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}
import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
@@ -119,7 +119,10 @@ case class GpuRangePartitioning(
//get the table for upper bound calculation
slicedSortedTbl = new Table(sortColumns: _*)
//get the final column batch, remove the sort order sortColumns
finalSortedCb = GpuColumnVector.from(sortedTbl, numSortCols, sortedTbl.getNumberOfColumns)
val outputTypes = gpuOrdering.map(_.child.dataType) ++
GpuColumnVector.extractTypes(batch)
finalSortedCb = GpuColumnVector.from(sortedTbl, outputTypes.toArray,
numSortCols, sortedTbl.getNumberOfColumns)
val numRows = finalSortedCb.numRows
partitionColumns = GpuColumnVector.extractColumns(finalSortedCb)
// get the ranges table and get upper bounds if possible
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, NullsFirst, NullsLa
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, Partitioning, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{SortExec, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuSortMeta(
@@ -160,18 +161,22 @@ class GpuColumnarBatchSorter(
private def sortBatch(inputBatch: ColumnarBatch): ColumnarBatch = {
val nvtxRange = initNvtxRange
try {
var outputTypes: Seq[DataType] = Nil
var inputTbl: Table = null
var inputCvs: Seq[GpuColumnVector] = Nil
try {
if (sortOrder.nonEmpty) {
inputCvs = SortUtils.getGpuColVectorsAndBindReferences(inputBatch, sortOrder)
inputTbl = new cudf.Table(inputCvs.map(_.getBase): _*)
outputTypes = sortOrder.map(_.child.dataType) ++
GpuColumnVector.extractTypes(inputBatch)
} else if (inputBatch.numCols() > 0) {
inputTbl = GpuColumnVector.from(inputBatch)
outputTypes = GpuColumnVector.extractTypes(inputBatch)
}
val orderByArgs = getOrderArgs(inputTbl)
val startTimestamp = System.nanoTime()
val batch = doGpuSort(inputTbl, orderByArgs)
val batch = doGpuSort(inputTbl, orderByArgs, outputTypes)
updateMetricValues(inputTbl, startTimestamp, batch)
batch
} finally {
@@ -244,11 +249,12 @@

private def doGpuSort(
tbl: Table,
orderByArgs: Seq[Table.OrderByArg]): ColumnarBatch = {
orderByArgs: Seq[Table.OrderByArg],
types: Seq[DataType]): ColumnarBatch = {
var resultTbl: cudf.Table = null
try {
resultTbl = tbl.orderBy(orderByArgs: _*)
GpuColumnVector.from(resultTbl, numSortCols, resultTbl.getNumberOfColumns)
GpuColumnVector.from(resultTbl, types.toArray, numSortCols, resultTbl.getNumberOfColumns)
} finally {
if (resultTbl != null) {
resultTbl.close()
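One non-obvious detail behind the range-partitioning and sort changes above: `from(Table, DataType[], startColIndex, untilColIndex)` indexes the type array by the absolute table column index and asserts it against every column of the table. The sorted table is laid out as the bound sort keys followed by the original columns, so the callers pass the sort-key types concatenated with the batch types even though the returned batch only keeps the columns from `numSortCols` onward. A hypothetical sketch of that calling convention (names other than the GpuColumnVector methods are made up):

```java
import ai.rapids.cudf.Table;
import com.nvidia.spark.rapids.GpuColumnVector;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class SortedTableSketch {
  // `sortedTable` is assumed to contain the sort-key columns first, then the columns of the
  // original batch described by `batchSchema`, as the sort/range-partition code builds it.
  static ColumnarBatch dropSortKeys(Table sortedTable, DataType[] sortKeyTypes,
      StructType batchSchema) {
    DataType[] batchTypes = GpuColumnVector.extractTypes(batchSchema);
    DataType[] allTypes = new DataType[sortKeyTypes.length + batchTypes.length];
    System.arraycopy(sortKeyTypes, 0, allTypes, 0, sortKeyTypes.length);
    System.arraycopy(batchTypes, 0, allTypes, sortKeyTypes.length, batchTypes.length);
    // The type array covers the whole table; the start index skips the key columns in the output.
    return GpuColumnVector.from(sortedTable, allTypes,
        sortKeyTypes.length, sortedTable.getNumberOfColumns());
  }
}
```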
@@ -119,13 +119,12 @@ object GpuFilter extends Arm {
var tbl: cudf.Table = null
var filtered: cudf.Table = null
try {
import collection.JavaConverters._
filterConditionCv = boundCondition.columnarEval(batch).asInstanceOf[GpuColumnVector]
tbl = GpuColumnVector.from(batch)
val colTypes =
(0 until batch.numCols()).map(i => batch.column(i).dataType())
filtered = tbl.filter(filterConditionCv.getBase)
GpuColumnVector.from(filtered, colTypes.asJava)
GpuColumnVector.from(filtered, colTypes.toArray)
} finally {
Seq(filtered, tbl, filterConditionCv, batch).safeClose()
}
@@ -57,6 +57,7 @@ import org.apache.spark.util.Utils
*/
class RebatchingRoundoffIterator(
wrapped: Iterator[ColumnarBatch],
schema: StructType,
targetRoundoff: Int,
inputRows: SQLMetric,
inputBatches: SQLMetric)
@@ -95,7 +96,7 @@
batches.append(SpillableColumnarBatch(got, SpillPriorities.ACTIVE_BATCHING_PRIORITY))
}
val toConcat = batches.safeMap(_.getColumnarBatch()).toArray
ConcatAndConsumeAll.buildNonEmptyBatch(toConcat)
ConcatAndConsumeAll.buildNonEmptyBatch(toConcat, schema)
}

override def next(): ColumnarBatch = {
@@ -539,6 +540,7 @@ case class GpuArrowEvalPythonExec(

lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)
val inputRDD = child.executeColumnar()
val schema = output.toStructType
inputRDD.mapPartitions { iter =>
val queue: BatchQueue = new BatchQueue()
val context = TaskContext.get()
@@ -573,7 +575,7 @@ })
})

val boundReferences = GpuBindReferences.bindReferences(allInputs, child.output)
val batchedIterator = new RebatchingRoundoffIterator(iter, batchSize,
val batchedIterator = new RebatchingRoundoffIterator(iter, schema, batchSize,
numInputRows, numInputBatches)
val projectedIterator = batchedIterator.map { batch =>
// We have to do the project before we add the batch because the batch might be closed