Perform explicit UnsafeRow projection in ColumnarToRow transition #5274

Merged · 6 commits · Apr 25, 2022
Changes from 1 commit
File: UnsafeRowToColumnarBatchIterator.java

@@ -25,6 +25,7 @@
 import ai.rapids.cudf.NvtxRange;
 import ai.rapids.cudf.Table;
 import org.apache.spark.TaskContext;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.DataType;
@@ -43,7 +44,7 @@
  * be generated based off of the schema.
  */
 public abstract class UnsafeRowToColumnarBatchIterator implements Iterator<ColumnarBatch> {
-  protected final Iterator<UnsafeRow> input;
+  protected final Iterator<InternalRow> input;
   protected UnsafeRow pending = null;
   protected final int numRowsEstimate;
   protected final long dataLength;
@@ -57,7 +58,7 @@ public abstract class UnsafeRowToColumnarBatchIterator implements Iterator<ColumnarBatch> {
   protected final GpuMetric numOutputBatches;

   protected UnsafeRowToColumnarBatchIterator(
-      Iterator<UnsafeRow> input,
+      Iterator<InternalRow> input,
       Attribute[] schema,
       CoalesceSizeGoal goal,
       GpuMetric semaphoreWaitTime,
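The Java-side change widens the iterator the abstract base class consumes from
Iterator<UnsafeRow> to Iterator<InternalRow>: the transition can now be fed any
InternalRow implementation, and the generated subclass performs the UnsafeRow
projection itself. As background, here is a minimal, self-contained Scala sketch
(not part of the PR) of what applying an UnsafeProjection to a generic
InternalRow does:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    object ProjectionSketch {
      def main(args: Array[String]): Unit = {
        val schema = StructType(Seq(StructField("a", IntegerType, nullable = false)))
        val proj = UnsafeProjection.create(schema)  // generated or interpreted projection
        val generic: InternalRow = InternalRow(42)  // a non-UnsafeRow row
        val unsafe: UnsafeRow = proj(generic)       // contiguous bytes, safe to copy
        println(unsafe.getInt(0))                   // prints 42
      }
    }

The UnsafeRow produced by the projection is backed by a contiguous buffer, which
is the layout the copyInto call in the generated code expects when packing rows
into the CUDF row format.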
File: GpuRowToColumnarExec.scala (name inferred from the classes it defines)

@@ -25,8 +25,8 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder, SpecializedGetters, UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodegenContext, CodeGenerator}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, NamedExpression, SortOrder, SpecializedGetters, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodegenContext, CodeGenerator, GenerateUnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
@@ -680,7 +680,7 @@ class RowToColumnarIterator(
 }

 object GeneratedUnsafeRowToCudfRowIterator extends Logging {
-  def apply(input: Iterator[UnsafeRow],
+  def apply(input: Iterator[InternalRow],
       schema: Array[Attribute],
       goal: CoalesceSizeGoal,
       semaphoreWaitTime: GpuMetric,
@@ -690,16 +690,29 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
       numOutputRows: GpuMetric,
       numOutputBatches: GpuMetric): UnsafeRowToColumnarBatchIterator = {
     val ctx = new CodegenContext
-    ctx.addReferenceObj("iter", input, classOf[Iterator[UnsafeRow]].getName)
-    ctx.addReferenceObj("schema", schema, classOf[Array[Attribute]].getName)
-    ctx.addReferenceObj("goal", goal, classOf[CoalesceSizeGoal].getName)
-    ctx.addReferenceObj("semaphoreWaitTime", semaphoreWaitTime, classOf[GpuMetric].getName)
-    ctx.addReferenceObj("streamTime", streamTime, classOf[GpuMetric].getName)
-    ctx.addReferenceObj("opTime", opTime, classOf[GpuMetric].getName)
-    ctx.addReferenceObj("numInputRows", numInputRows, classOf[GpuMetric].getName)
-    ctx.addReferenceObj("numOutputRows", numOutputRows, classOf[GpuMetric].getName)
-    ctx.addReferenceObj("numOutputBatches", numOutputBatches, classOf[GpuMetric].getName)
+
+    // setup code generation context to use our custom row variable
+    val internalRow = ctx.freshName("internalRow")
+    ctx.currentVars = null
+    ctx.INPUT_ROW = internalRow
+
+    val generateUnsafeProj = GenerateUnsafeProjection.createCode(ctx,
+      schema.zipWithIndex.map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) }
+    )
+
+    val iterRef = ctx.addReferenceObj("iter", input, classOf[Iterator[UnsafeRow]].getName)
+    val schemaRef = ctx.addReferenceObj("schema", schema,
+      classOf[Array[Attribute]].getCanonicalName)
+    val goalRef = ctx.addReferenceObj("goal", goal, classOf[CoalesceSizeGoal].getName)
+    val semaphoreWaitTimeRef = ctx.addReferenceObj("semaphoreWaitTime", semaphoreWaitTime,
+      classOf[GpuMetric].getName)
+    val streamTimeRef = ctx.addReferenceObj("streamTime", streamTime, classOf[GpuMetric].getName)
+    val opTimeRef = ctx.addReferenceObj("opTime", opTime, classOf[GpuMetric].getName)
+    val numInputRowsRef = ctx.addReferenceObj("numInputRows", numInputRows,
+      classOf[GpuMetric].getName)
+    val numOutputRowsRef = ctx.addReferenceObj("numOutputRows", numOutputRows,
+      classOf[GpuMetric].getName)
+    val numOutputBatchesRef = ctx.addReferenceObj("numOutputBatches", numOutputBatches,
+      classOf[GpuMetric].getName)

     val rowBaseObj = ctx.freshName("rowBaseObj")
     val rowBaseOffset = ctx.freshName("rowBaseOffset")
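Two codegen APIs carry this hunk. GenerateUnsafeProjection.createCode returns an
ExprCode whose .code fragment evaluates the projection against the row variable
named by ctx.INPUT_ROW, and whose .value names the resulting UnsafeRow; that is
why INPUT_ROW is pointed at the fresh internalRow name first. Separately, the
return value of ctx.addReferenceObj is now captured: it is the Java accessor
expression for the registered object, which the generated constructor below
splices in directly. A small sketch of the createCode side (illustrative, not
the PR's code):

    import org.apache.spark.sql.catalyst.expressions.BoundReference
    import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, GenerateUnsafeProjection}
    import org.apache.spark.sql.types.LongType

    val ctx = new CodegenContext
    ctx.INPUT_ROW = ctx.freshName("internalRow")  // variable the generated code reads from
    ctx.currentVars = null                        // no pre-evaluated column variables

    val proj = GenerateUnsafeProjection.createCode(
      ctx, Seq(BoundReference(0, LongType, nullable = false)))
    println(proj.code)   // Java statements that build the UnsafeRow
    println(proj.value)  // name of the variable holding that UnsafeRow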
@@ -757,19 +770,22 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
        |}
        |
        |final class SpecificUnsafeRowToColumnarBatchIterator extends ${classOf[UnsafeRowToColumnarBatchIterator].getName} {
-       |  private final org.apache.spark.sql.catalyst.expressions.UnsafeProjection unsafeProj;
        |
+       |  ${ctx.declareMutableStates()}
+       |
        |  public SpecificUnsafeRowToColumnarBatchIterator(Object[] references) {
-       |    super((scala.collection.Iterator<UnsafeRow>)references[0],
-       |        (org.apache.spark.sql.catalyst.expressions.Attribute[])references[1],
-       |        (com.nvidia.spark.rapids.CoalesceSizeGoal)references[2],
-       |        (com.nvidia.spark.rapids.GpuMetric)references[3],
-       |        (com.nvidia.spark.rapids.GpuMetric)references[4],
-       |        (com.nvidia.spark.rapids.GpuMetric)references[5],
-       |        (com.nvidia.spark.rapids.GpuMetric)references[6],
-       |        (com.nvidia.spark.rapids.GpuMetric)references[7],
-       |        (com.nvidia.spark.rapids.GpuMetric)references[8]);
+       |    super(
+       |        $iterRef,
+       |        $schemaRef,
+       |        $goalRef,
+       |        $semaphoreWaitTimeRef,
+       |        $streamTimeRef,
+       |        $opTimeRef,
+       |        $numInputRowsRef,
+       |        $numOutputRowsRef,
+       |        $numOutputBatchesRef);
+       |
+       |    ${ctx.initMutableStates()}
        |  }
        |
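Note how the constructor call is now assembled from the captured $...Ref strings
rather than positional (Type)references[i] casts, so reordering or adding
references cannot silently mismatch an index. A minimal sketch of the
addReferenceObj contract (the printed format is approximate):

    import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext

    val ctx = new CodegenContext
    // Registers the object and returns the Java expression that reads it back
    // out of the references array, already cast to the supplied class name.
    val ref = ctx.addReferenceObj("myObj", Seq(1, 2, 3), classOf[Seq[_]].getName)
    println(ref)  // e.g. ((scala.collection.Seq) references[0] /* myObj */)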
@@ -793,7 +809,9 @@
        |          row = pending;
        |          pending = null;
        |        } else {
-       |          row = (UnsafeRow)input.next();
+       |          InternalRow $internalRow = (InternalRow) input.next();
+       |          ${generateUnsafeProj.code}
+       |          row = ${generateUnsafeProj.value};
        |        }
        |        int numBytesUsedByRow = copyInto(row, dataBaseAddress + dataOffset, endDataAddress);
        |        offsetsBuffer.setInt(offsetIndex, dataOffset);
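This hunk is the behavioral core of the PR: instead of blindly casting
input.next() to UnsafeRow, the generated next() evaluates the projection on
whatever InternalRow arrives and copies the projected result. Rendered as plain
Scala (a sketch; names are illustrative, not from the generated code):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}

    final class RowSource(input: Iterator[InternalRow], proj: UnsafeProjection) {
      private var pending: UnsafeRow = null  // row that did not fit in the last batch

      def nextRow(): UnsafeRow =
        if (pending != null) { val p = pending; pending = null; p }
        else proj(input.next())              // project generic rows to UnsafeRow
    }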
@@ -906,8 +924,7 @@ case class GpuRowToColumnarExec(child: SparkPlan,
         CudfRowTransitions.areAllSupported(output)) {
       val localOutput = output
       rowBased.mapPartitions(rowIter => GeneratedUnsafeRowToCudfRowIterator(
-        rowIter.asInstanceOf[Iterator[UnsafeRow]],
-        localOutput.toArray, localGoal, semaphoreWaitTime, streamTime, opTime,
+        rowIter, localOutput.toArray, localGoal, semaphoreWaitTime, streamTime, opTime,
         numInputRows, numOutputRows, numOutputBatches))
     } else {
       val converters = new GpuRowToColumnConverter(localSchema)
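At the call site the unchecked rowIter.asInstanceOf[Iterator[UnsafeRow]] cast is
gone; the iterator is passed through as Iterator[InternalRow] and the projection
happens inside the generated iterator. The old cast was fragile because generic
casts are erased on the JVM, as this sketch shows (assumption: the element is a
generic InternalRow, not an UnsafeRow):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow

    val rows: Iterator[InternalRow] = Iterator(InternalRow(1L))  // not an UnsafeRow
    val cast = rows.asInstanceOf[Iterator[UnsafeRow]]            // no check happens here
    // val r: UnsafeRow = cast.next()  // would throw ClassCastException only now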