diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
index 83283e57897..b5fd142ef7b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
@@ -193,6 +193,7 @@ case class GpuRangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range
   val step: Long = range.step
   val numSlices: Int = range.numSlices.getOrElse(sparkContext.defaultParallelism)
   val numElements: BigInt = range.numElements
+  val isEmptyRange: Boolean = start == end || (start < end ^ 0 < step)
 
   override val output: Seq[Attribute] = range.output
 
@@ -224,76 +225,82 @@ case class GpuRangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range
     val totalTime = longMetric(TOTAL_TIME)
     val maxRowCountPerBatch = Math.min(targetSizeBytes/8, Int.MaxValue)
 
-    sqlContext
-      .sparkContext
-      .parallelize(0 until numSlices, numSlices)
-      .mapPartitionsWithIndex { (i, _) =>
-        val partitionStart = (i * numElements) / numSlices * step + start
-        val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
-        def getSafeMargin(bi: BigInt): Long =
-          if (bi.isValidLong) {
-            bi.toLong
-          } else if (bi > 0) {
-            Long.MaxValue
-          } else {
-            Long.MinValue
-          }
-        val safePartitionStart = getSafeMargin(partitionStart) // inclusive
-        val safePartitionEnd = getSafeMargin(partitionEnd) // exclusive, unless start == this
-        val taskContext = TaskContext.get()
-
-        val iter: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
-          private[this] var number: Long = safePartitionStart
-          private[this] var done: Boolean = false
-          private[this] val inputMetrics = taskContext.taskMetrics().inputMetrics
-
-          override def hasNext: Boolean =
-            if (!done) {
-              if (step > 0) {
-                number < safePartitionEnd
-              } else {
-                number > safePartitionEnd
-              }
-            } else false
-
-          override def next(): ColumnarBatch =
-            withResource(new NvtxWithMetrics("GpuRange", NvtxColor.DARK_GREEN, totalTime)){
-              _ =>
-              GpuSemaphore.acquireIfNecessary(taskContext)
-              val start = number
-              val remainingSteps = (safePartitionEnd - start) / step
-              // Start is inclusive so we need to produce at least one row
-              val rowsThisBatch = Math.max(1, Math.min(remainingSteps, maxRowCountPerBatch))
-              val endInclusive = start + ((rowsThisBatch - 1) * step)
-              number = endInclusive + step
-              if (number < endInclusive ^ step < 0) {
-                // we have Long.MaxValue + Long.MaxValue < Long.MaxValue
-                // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a
-                // step back, we are pretty sure that we have an overflow.
-                done = true
-              }
-
-              val ret = withResource(Scalar.fromLong(start)) { startScalar =>
-                withResource(Scalar.fromLong(step)) { stepScalar =>
-                  withResource(
-                    ai.rapids.cudf.ColumnVector.sequence(
-                      startScalar, stepScalar, rowsThisBatch.toInt)) { vec =>
-                    withResource(new Table(vec)) { tab =>
-                      GpuColumnVector.from(tab, Array[DataType](LongType))
-                    }
-                  }
-                }
-              }
-              assert (rowsThisBatch == ret.numRows())
-              numOutputRows += rowsThisBatch
-              TrampolineUtil.incInputRecordsRows(inputMetrics, rowsThisBatch)
-              numOutputBatches += 1
-              ret
-            }
-        }
-        new InterruptibleIterator(taskContext, iter)
-      }
+    if (isEmptyRange) {
+      sparkContext.emptyRDD[ColumnarBatch]
+    } else {
+      sqlContext
+        .sparkContext
+        .parallelize(0 until numSlices, numSlices)
+        .mapPartitionsWithIndex { (i, _) =>
+          val partitionStart = (i * numElements) / numSlices * step + start
+          val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
+
+          def getSafeMargin(bi: BigInt): Long =
+            if (bi.isValidLong) {
+              bi.toLong
+            } else if (bi > 0) {
+              Long.MaxValue
+            } else {
+              Long.MinValue
+            }
+
+          val safePartitionStart = getSafeMargin(partitionStart) // inclusive
+          val safePartitionEnd = getSafeMargin(partitionEnd) // exclusive, unless start == this
+          val taskContext = TaskContext.get()
+
+          val iter: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
+            private[this] var number: Long = safePartitionStart
+            private[this] var done: Boolean = false
+            private[this] val inputMetrics = taskContext.taskMetrics().inputMetrics
+
+            override def hasNext: Boolean =
+              if (!done) {
+                if (step > 0) {
+                  number < safePartitionEnd
+                } else {
+                  number > safePartitionEnd
+                }
+              } else false
+
+            override def next(): ColumnarBatch =
+              withResource(new NvtxWithMetrics("GpuRange", NvtxColor.DARK_GREEN, totalTime)) {
+                _ =>
+                  GpuSemaphore.acquireIfNecessary(taskContext)
+                  val start = number
+                  val remainingSteps = (safePartitionEnd - start) / step
+                  // Start is inclusive so we need to produce at least one row
+                  val rowsThisBatch = Math.max(1, Math.min(remainingSteps, maxRowCountPerBatch))
+                  val endInclusive = start + ((rowsThisBatch - 1) * step)
+                  number = endInclusive + step
+                  if (number < endInclusive ^ step < 0) {
+                    // we have Long.MaxValue + Long.MaxValue < Long.MaxValue
+                    // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a
+                    // step back, we are pretty sure that we have an overflow.
+                    done = true
+                  }
+
+                  val ret = withResource(Scalar.fromLong(start)) { startScalar =>
+                    withResource(Scalar.fromLong(step)) { stepScalar =>
+                      withResource(
+                        ai.rapids.cudf.ColumnVector.sequence(
+                          startScalar, stepScalar, rowsThisBatch.toInt)) { vec =>
+                        withResource(new Table(vec)) { tab =>
+                          GpuColumnVector.from(tab, Array[DataType](LongType))
+                        }
+                      }
+                    }
+                  }
+                  assert(rowsThisBatch == ret.numRows())
+                  numOutputRows += rowsThisBatch
+                  TrampolineUtil.incInputRecordsRows(inputMetrics, rowsThisBatch)
+                  numOutputBatches += 1
+                  ret
+              }
+          }
+          new InterruptibleIterator(taskContext, iter)
+        }
+    }
   }
 
   override def simpleString(maxFields: Int): String = {
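
Note on the first hunk: `isEmptyRange` folds the ways a range can be empty into one expression. The range is empty when `start == end`, or when `step` walks away from `end` (an ascending range with a negative step, or a descending range with a positive step), which the XOR captures. A minimal standalone sketch of the predicate; the object and method names here are illustrative, not part of the patch:

```scala
object RangeEmptinessSketch {
  // Mirrors the patch's expression: empty when there is no distance to cover,
  // or when the step direction disagrees with the range direction (XOR).
  def emptyRange(start: Long, end: Long, step: Long): Boolean =
    start == end || (start < end ^ 0 < step)

  def main(args: Array[String]): Unit = {
    assert(emptyRange(5, 5, 1))    // start == end: nothing to produce
    assert(emptyRange(0, 10, -1))  // ascending range, descending step
    assert(emptyRange(10, 0, 1))   // descending range, ascending step
    assert(!emptyRange(0, 10, 2))  // directions agree: non-empty
    assert(!emptyRange(10, 0, -3)) // directions agree: non-empty
  }
}
```

Short-circuiting to `sparkContext.emptyRDD[ColumnarBatch]` before the `parallelize` call means an empty range schedules no tasks at all, so none of the per-partition work (including the `GpuSemaphore` acquisition) ever runs.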
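
Relatedly, the overflow guard carried over into the re-indented `next()` is worth spelling out: after `number = endInclusive + step`, a wraparound of the signed 64-bit addition is detectable because the sum lands on the wrong side of `endInclusive` relative to the sign of `step`. A sketch under the same convention (the helper name is invented for illustration):

```scala
object OverflowGuardSketch {
  // Mirrors `number < endInclusive ^ step < 0` from the patch: the addition
  // overflowed iff stepping moved the cursor against the direction of `step`.
  def overflowed(endInclusive: Long, step: Long): Boolean = {
    val next = endInclusive + step  // Long addition wraps on overflow
    next < endInclusive ^ step < 0
  }

  def main(args: Array[String]): Unit = {
    assert(overflowed(Long.MaxValue, 1))   // wraps past Long.MaxValue
    assert(overflowed(Long.MinValue, -1))  // wraps past Long.MinValue
    assert(!overflowed(0L, 5L))            // ordinary ascending step
    assert(!overflowed(0L, -5L))           // ordinary descending step
  }
}
```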