Add sequence support [databricks] #4376

Merged: 6 commits, Jan 6, 2022
Changes from 2 commits
88 changes: 88 additions & 0 deletions integration_tests/src/main/python/collection_ops_test.py
@@ -115,3 +115,91 @@ def test_sort_array_lit(data_gen, is_ascending):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, data_gen, length=10).select(
            f.sort_array(f.lit(array_lit), is_ascending)))

# We must restrict the length of the generated sequences, since otherwise we may hit the
# exception "Too long sequence: 2147483745. Should be <= 2147483632", or run out of memory.
sequence_integral_gens = [
ByteGen(nullable=False, min_val=-20, max_val=20, special_cases=[]),
ShortGen(nullable=False, min_val=-20, max_val=20, special_cases=[]),
IntegerGen(nullable=False, min_val=-20, max_val=20, special_cases=[]),
LongGen(nullable=False, min_val=-20, max_val=20, special_cases=[])
]

@pytest.mark.parametrize('data_gen', sequence_integral_gens, ids=idfn)
def test_sequence_without_step(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: three_col_df(spark, data_gen, data_gen, data_gen)
            .selectExpr("sequence(a, b)",
                        "sequence(a, 0)",
                        "sequence(0, b)"))

# This function generates sequence data that satisfies the following constraints:
# (step > num.zero && start <= stop)
# || (step < num.zero && start >= stop)
# || (step == num.zero && start == stop)
def get_sequence_data(data_gen, length=2048):
    rand = random.Random(0)
    data_gen.start(rand)
    data = []
    for index in range(length):
        start = data_gen.gen()
        stop = data_gen.gen()
        step = data_gen.gen()
        # decide the direction of the step
        if start < stop:
            step = abs(step) + 1
        elif start == stop:
            step = 0
        else:
            step = -(abs(step) + 1)
        data.append(tuple([start, stop, step]))
    # add the special case: start == stop with step == 0
    data.append(tuple([2, 2, 0]))
    return data
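As a quick sanity check, the triples emitted by the generator above can be validated against the constraints listed in its comment. This is an illustrative sketch only; the helper check_sequence_triples is hypothetical and not part of the PR:

# Hypothetical helper: verify every (start, stop, step) triple satisfies the
# precondition of Spark's sequence().
def check_sequence_triples(triples):
    for start, stop, step in triples:
        assert (step > 0 and start <= stop) \
            or (step < 0 and start >= stop) \
            or (step == 0 and start == stop), (start, stop, step)

check_sequence_triples(get_sequence_data(
    IntegerGen(nullable=False, min_val=-20, max_val=20, special_cases=[])))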

def get_sequence_df(spark, data, data_type):
    return spark.createDataFrame(
        SparkContext.getOrCreate().parallelize(data),
        StructType([StructField('a', data_type), StructField('b', data_type), StructField('c', data_type)]))

# Test (start, stop, step) triples such as:
# (2, -1, -1)
# (2, 5, 2)
# (2, 2, 0)
@pytest.mark.parametrize('data_gen', sequence_integral_gens, ids=idfn)
def test_sequence_with_step_case1(data_gen):
    data = get_sequence_data(data_gen)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: get_sequence_df(spark, data, data_gen.data_type)
            .selectExpr("sequence(a, b, c)"))

sequence_three_cols_integral_gens = [
(ByteGen(nullable=False, min_val=-10, max_val=10, special_cases=[]),
ByteGen(nullable=False, min_val=30, max_val=50, special_cases=[]),
ByteGen(nullable=False, min_val=1, max_val=10, special_cases=[])),
(ShortGen(nullable=False, min_val=-10, max_val=10, special_cases=[]),
ShortGen(nullable=False, min_val=30, max_val=50, special_cases=[]),
ShortGen(nullable=False, min_val=1, max_val=10, special_cases=[])),
(IntegerGen(nullable=False, min_val=-10, max_val=10, special_cases=[]),
IntegerGen(nullable=False, min_val=30, max_val=50, special_cases=[]),
IntegerGen(nullable=False, min_val=1, max_val=10, special_cases=[])),
(LongGen(nullable=False, min_val=-10, max_val=10, special_cases=[-10, 10]),
LongGen(nullable=False, min_val=30, max_val=50, special_cases=[30, 50]),
LongGen(nullable=False, min_val=1, max_val=10, special_cases=[1, 10])),
]

# Test scalar and columnar combinations where start < stop and step > 0
@pytest.mark.parametrize('start_gen,stop_gen,step_gen', sequence_three_cols_integral_gens, ids=idfn)
def test_sequence_with_step_case2(start_gen, stop_gen, step_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: three_col_df(spark, start_gen, stop_gen, step_gen)
            .selectExpr("sequence(a, b, c)",
                        "sequence(a, b, 2)",
                        "sequence(a, 20, c)",
                        "sequence(a, 20, 2)",
                        "sequence(0, b, c)",
                        "sequence(0, 4, c)",
                        "sequence(0, b, 3)"))
@@ -3293,6 +3293,18 @@ object GpuOverrides extends Logging {
(a, conf, p, r) => new ExprMeta[CreateMap](a, conf, p, r) {
override def convertToGpu(): GpuExpression = GpuCreateMap(childExprs.map(_.convertToGpu()))
}
),
expr[Sequence](
desc = "Sequence",
ExprChecks.projectOnly(
TypeSig.ARRAY.nested(TypeSig.integral), TypeSig.ARRAY.nested(TypeSig.integral +
TypeSig.TIMESTAMP + TypeSig.DATE),
Seq(ParamCheck("start", TypeSig.integral, TypeSig.integral + TypeSig.TIMESTAMP +
TypeSig.DATE),
ParamCheck("stop", TypeSig.integral, TypeSig.integral + TypeSig.TIMESTAMP +
TypeSig.DATE)),
Some(RepeatingParamCheck("step", TypeSig.integral, TypeSig.integral + TypeSig.CALENDAR))),
(a, conf, p, r) => new GpuSequenceMeta(a, conf, p, r)
)
).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap

@@ -19,13 +19,13 @@ package org.apache.spark.sql.rapids
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf
-import ai.rapids.cudf.{ColumnView, GroupByAggregation, GroupByOptions, Scalar}
-import com.nvidia.spark.rapids.{GpuBinaryExpression, GpuColumnVector, GpuComplexTypeMergingExpression, GpuLiteral, GpuMapUtils, GpuScalar, GpuUnaryExpression}
+import ai.rapids.cudf.{BinaryOperable, ColumnVector, ColumnView, GroupByAggregation, GroupByOptions, Scalar}
+import com.nvidia.spark.rapids.{DataFromReplacementRule, ExprMeta, GpuBinaryExpression, GpuColumnVector, GpuComplexTypeMergingExpression, GpuExpression, GpuLiteral, GpuMapUtils, GpuScalar, GpuTernaryExpression, GpuUnaryExpression, RapidsConf, RapidsMeta}
import com.nvidia.spark.rapids.GpuExpressionsUtils.columnarEvalToColumn
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ImplicitCastInputTypes, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ImplicitCastInputTypes, RowOrdering, Sequence, TimeZoneAwareExpression}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String
@@ -392,4 +392,267 @@ case class GpuArrayMax(child: Expression) extends GpuBaseArrayAgg with ImplicitC
override def prettyName: String = "array_max"

override protected def agg: GroupByAggregation = GroupByAggregation.max()
}

class GpuSequenceMeta(
expr: Sequence,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends ExprMeta[Sequence](expr, conf, parent, rule) {

override def tagExprForGpu(): Unit = {
// We have to fall back to the CPU if the timeZoneId is not UTC when
// we are processing date/timestamp.
// Date/Timestamp are not enabled right now so this is probably fine.
}

override def convertToGpu(): GpuExpression = {
if (expr.stepOpt.isDefined) {
val Seq(start, stop, step) = childExprs.map(_.convertToGpu())
GpuSequenceWithStep(start, stop, step, expr.timeZoneId)
} else {
val Seq(start, stop) = childExprs.map(_.convertToGpu())
GpuSequence(start, stop, expr.timeZoneId)
}
}
}

trait GpuSequenceTrait {

def numberScalar(dt: DataType, value: Int): Scalar = dt match {
case ByteType => Scalar.fromByte(value.toByte)
case ShortType => Scalar.fromShort(value.toShort)
case IntegerType => Scalar.fromInt(value)
case LongType => Scalar.fromLong(value.toLong)
case _ =>
throw new IllegalArgumentException("wrong data type: " + dt)
}

}

/** GpuSequence without step */
case class GpuSequence(start: Expression, stop: Expression, timeZoneId: Option[String] = None)
extends GpuBinaryExpression with TimeZoneAwareExpression with GpuSequenceTrait {

override def left: Expression = start

override def right: Expression = stop

override def dataType: DataType = ArrayType(start.dataType, containsNull = false)

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Some(timeZoneId))

/**
 * Calculate the number of elements between start and stop, both inclusive,
 * i.e. |stop - start| + 1, together with the implied step (+1 or -1).
 * @param start the start values
 * @param stop the stop values
 * @param dt the integral element type
 * @return Seq(size, step)
 */
private def calculateSizeAndStep(start: BinaryOperable, stop: BinaryOperable, dt: DataType):
Seq[ColumnVector] = {
withResource(stop.sub(start)) { difference =>
withResource(numberScalar(dt, 1)) { one =>
val step = withResource(numberScalar(dt, -1)) { negativeOne =>
withResource(numberScalar(dt, 0)) { scalarZero =>
withResource(difference.greaterOrEqualTo(scalarZero)) { pred =>
pred.ifElse(one, negativeOne)
}
}
}
val size = closeOnExcept(step) { _ =>
withResource(difference.abs()) { absDifference =>
absDifference.add(one)
}
}
Seq(size, step)
}
}
}

override def doColumnar(start: GpuColumnVector, stop: GpuColumnVector): ColumnVector = {
withResource(calculateSizeAndStep(start.getBase, stop.getBase, start.dataType())) { ret =>
ColumnVector.sequence(start.getBase, ret(0), ret(1))
}
}

override def doColumnar(start: GpuScalar, stop: GpuColumnVector): ColumnVector = {
withResource(calculateSizeAndStep(start.getBase, stop.getBase, stop.dataType())) { ret =>
withResource(ColumnVector.fromScalar(start.getBase, stop.getRowCount.toInt)) { startV =>
ColumnVector.sequence(startV, ret(0), ret(1))
}
}
}

override def doColumnar(start: GpuColumnVector, stop: GpuScalar): ColumnVector = {
withResource(calculateSizeAndStep(start.getBase, stop.getBase, start.dataType())) { ret =>
ColumnVector.sequence(start.getBase, ret(0), ret(1))
}
}

override def doColumnar(numRows: Int, lhs: GpuScalar, rhs: GpuScalar): ColumnVector =
throw new IllegalStateException("This is not supported yet")

}
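The calculateSizeAndStep logic above reduces to size = |stop - start| + 1 with an implied step of +1 or -1 depending on the direction. A small Python model of the same arithmetic, for illustration only (not part of the PR):

# Illustrative model of GpuSequence.calculateSizeAndStep (not part of the PR).
def size_and_step(start, stop):
    step = 1 if stop - start >= 0 else -1   # direction of the implied step
    size = abs(stop - start) + 1            # both endpoints are inclusive
    return size, step

assert size_and_step(0, 3) == (4, 1)    # sequence(0, 3)  -> [0, 1, 2, 3]
assert size_and_step(3, 0) == (4, -1)   # sequence(3, 0)  -> [3, 2, 1, 0]
assert size_and_step(2, 2) == (1, 1)    # sequence(2, 2)  -> [2]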

/** GpuSequence with step */
case class GpuSequenceWithStep(start: Expression, stop: Expression, step: Expression,
timeZoneId: Option[String] = None) extends GpuTernaryExpression with TimeZoneAwareExpression
with GpuSequenceTrait {

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Some(timeZoneId))

override def first: Expression = start

override def second: Expression = stop

override def third: Expression = step

override def dataType: DataType = ArrayType(start.dataType, containsNull = false)

/* TODO: do we need to check the conditions below?
 * (step > num.zero && start <= stop)
 * || (step < num.zero && start >= stop)
 * || (step == num.zero && start == stop)
 */
private def calculateSize(
start: BinaryOperable,
stop: BinaryOperable,
step: BinaryOperable,
rows: Int,
dt: DataType): ColumnVector = {
// First, calculate sizeWithNegative = floor((stop - start) / step) + 1.
//   If step == 0, cudf's div yields MIN_VALUE, which is fine here, since cudf will not
//   generate a sequence when size < 0.
// Second, calculate size = if (sizeWithNegative < 0) 0 else sizeWithNegative.
// Third, if (start == stop && step == 0), let size = 1.
withResource(stop.sub(start)) { difference =>
withResource(difference.floorDiv(step)) { quotient =>
withResource(numberScalar(dt, 1)) { one =>
withResource(quotient.add(one)) { sizeWithNegative =>
jlowe (Member) commented:
When withResource nests this deeply, that's usually an indication that we're holding onto one or more GPU results longer than necessary, adding undesired and avoidable memory pressure. For example, we compute quotient here and only need it to compute sizeWithNegative, yet we hold onto the GPU memory for the quotient result until after the entire calculation completes. The memory can be freed earlier with something like this:

  withResource(numberScalar(dt, 1)) { one =>
    val sizeWithNegative = withResource(difference.floorDiv(step)) { quotient =>
      quotient.add(one)
    }
    withResource(sizeWithNegative) { sizeWithNegative =>
      ....

The PR author (Collaborator) replied:
Thx @jlowe. Changed this accordingly.

withResource(numberScalar(dt, 0)) { zero =>
withResource(sizeWithNegative.greaterOrEqualTo(zero)) { pred =>
withResource(pred.ifElse(sizeWithNegative, zero)) { tmpSize =>
// when start==stop && step==0, size will be 0.
// but we should change size to 1
withResource(difference.equalTo(zero)) { diffHasZero =>
step match {
case stepScalar: Scalar =>
withResource(ColumnVector.fromScalar(stepScalar, rows)) { stepV =>
withResource(stepV.equalTo(zero)) { stepHasZero =>
withResource(diffHasZero.and(stepHasZero)) { predWithZero =>
predWithZero.ifElse(one, tmpSize)
}
}
}
case _ =>
withResource(step.equalTo(zero)) { stepHasZero =>
withResource(diffHasZero.and(stepHasZero)) { predWithZero =>
predWithZero.ifElse(one, tmpSize)
}
}
}
}
}
}
}
}
}
}
}
}

override def doColumnar(
start: GpuColumnVector,
stop: GpuColumnVector,
step: GpuColumnVector): ColumnVector = {
withResource(calculateSize(start.getBase, stop.getBase, step.getBase, start.getRowCount.toInt,
start.dataType())) { size =>
ColumnVector.sequence(start.getBase, size, step.getBase)
}
}

override def doColumnar(
start: GpuScalar,
stop: GpuColumnVector,
step: GpuColumnVector): ColumnVector = {
withResource(calculateSize(start.getBase, stop.getBase, step.getBase, stop.getRowCount.toInt,
start.dataType)) { size =>
withResource(ColumnVector.fromScalar(start.getBase, stop.getRowCount.toInt)) { startV =>
ColumnVector.sequence(startV, size, step.getBase)
}
}
}

override def doColumnar(
start: GpuScalar,
stop: GpuScalar,
step: GpuColumnVector): ColumnVector = {
withResource(ColumnVector.fromScalar(start.getBase, step.getRowCount.toInt)) { startV =>
withResource(calculateSize(startV, stop.getBase, step.getBase, step.getRowCount.toInt,
start.dataType)) { size =>
ColumnVector.sequence(startV, size, step.getBase)
}
}
}

override def doColumnar(
start: GpuScalar,
stop: GpuColumnVector,
step: GpuScalar): ColumnVector = {
withResource(calculateSize(start.getBase, stop.getBase, step.getBase, stop.getRowCount.toInt,
start.dataType)) { size =>
withResource(ColumnVector.fromScalar(start.getBase, stop.getRowCount.toInt)) { startV =>
withResource(ColumnVector.fromScalar(step.getBase, stop.getRowCount.toInt)) { stepV =>
ColumnVector.sequence(startV, size, stepV)
}
}
}
}

override def doColumnar(
start: GpuColumnVector,
stop: GpuScalar,
step: GpuColumnVector): ColumnVector = {
withResource(calculateSize(start.getBase, stop.getBase, step.getBase, start.getRowCount.toInt,
start.dataType())) { size =>
ColumnVector.sequence(start.getBase, size, step.getBase)
}
}

override def doColumnar(
start: GpuColumnVector,
stop: GpuScalar,
step: GpuScalar): ColumnVector = {
withResource(calculateSize(start.getBase, stop.getBase, step.getBase, start.getRowCount.toInt,
start.dataType())) { size =>
withResource(ColumnVector.fromScalar(step.getBase, start.getRowCount.toInt)) { stepV =>
ColumnVector.sequence(start.getBase, size, stepV)
}
}
}

override def doColumnar(
start: GpuColumnVector,
stop: GpuColumnVector,
step: GpuScalar): ColumnVector =
withResource(calculateSize(start.getBase, stop.getBase, step.getBase, start.getRowCount.toInt,
start.dataType())) { size =>
withResource(ColumnVector.fromScalar(step.getBase, start.getRowCount.toInt)) { stepV =>
ColumnVector.sequence(start.getBase, size, stepV)
}
}

override def doColumnar(
numRows: Int,
val0: GpuScalar,
val1: GpuScalar,
val2: GpuScalar): ColumnVector =
throw new IllegalStateException("This is not supported yet")

}
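Similarly, the size formula described in the comment inside GpuSequenceWithStep.calculateSize can be checked by hand with a small Python model. This is illustrative only and not part of the PR; int_min stands in for the MIN_VALUE result that cudf's floorDiv produces on division by zero:

# Illustrative model of the size computation in GpuSequenceWithStep.calculateSize.
def sequence_size(start, stop, step, int_min=-(2 ** 31)):
    diff = stop - start
    # cudf's floorDiv by zero yields the type's minimum value instead of raising
    size_with_negative = (diff // step if step != 0 else int_min) + 1
    size = size_with_negative if size_with_negative >= 0 else 0
    # special case: start == stop with step == 0 still yields a single element
    return 1 if (diff == 0 and step == 0) else size

assert sequence_size(2, 5, 2) == 2     # sequence(2, 5, 2)   -> [2, 4]
assert sequence_size(2, -1, -1) == 4   # sequence(2, -1, -1) -> [2, 1, 0, -1]
assert sequence_size(2, 2, 0) == 1     # sequence(2, 2, 0)   -> [2]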