Struct to string casting functionality #1814

Merged: 24 commits, Mar 30, 2021
Changes from 16 commits
4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -17991,7 +17991,7 @@ and the accelerator produces the same result.
<td> </td>
<td> </td>
<td> </td>
-<td><b>NS</b></td>
+<td>S</td>
<td> </td>
<td> </td>
<td> </td>
@@ -18395,7 +18395,7 @@ and the accelerator produces the same result.
<td> </td>
<td> </td>
<td> </td>
-<td><b>NS</b></td>
+<td>S</td>
<td> </td>
<td> </td>
<td> </td>
17 changes: 17 additions & 0 deletions integration_tests/src/main/python/struct_test.py
@@ -57,3 +57,20 @@ def test_orderby_struct_2(data_gen):
        lambda spark : append_unique_int_col_to_df(spark, unary_op_df(spark, data_gen)),
        'struct_table',
        'select struct_table.a, struct_table.uniq_int from struct_table order by uniq_int')

# conf with legacy cast to string on
legacy_complex_types_to_string = {'spark.sql.legacy.castComplexTypesToString.enabled': 'true'}
# @pytest.mark.parametrize('data_gen', [StructGen([["first", boolean_gen], ["second", byte_gen], ["third", short_gen], ["fourth", int_gen], ["fifth", long_gen], ["sixth", string_gen], ["seventh", date_gen], ["eighth", float_gen], ["ninth", double_gen], ["tenth", timestamp_gen]])], ids=idfn)
@pytest.mark.parametrize('data_gen', [StructGen([["first", boolean_gen], ["second", byte_gen], ["third", short_gen], ["fourth", int_gen], ["fifth", long_gen], ["sixth", string_gen], ["seventh", date_gen]])], ids=idfn)
def test_legacy_cast_struct_to_string(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, data_gen).select(
            f.col('a').cast("STRING")),
        conf = legacy_complex_types_to_string)

# @pytest.mark.parametrize('data_gen', [StructGen([["first", boolean_gen], ["second", byte_gen], ["third", short_gen], ["fourth", int_gen], ["fifth", long_gen], ["sixth", string_gen], ["seventh", date_gen], ["eighth", float_gen], ["ninth", double_gen], ["tenth", timestamp_gen]])], ids=idfn)
@pytest.mark.parametrize('data_gen', [StructGen([["first", boolean_gen], ["second", byte_gen], ["third", short_gen], ["fourth", int_gen], ["fifth", long_gen], ["sixth", string_gen], ["seventh", date_gen]])], ids=idfn)
def test_cast_struct_to_string(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, data_gen).select(
            f.col('a').cast("STRING")))
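For reference, a minimal PySpark sketch (an illustration, not part of this PR; the expected strings follow the Spark 3.1 format change that the legacy conf reverts) of the two formats these tests compare between CPU and GPU:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as f

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, "a"), (2, None)], "first INT, second STRING") \
        .select(f.struct("first", "second").alias("a"))

    # Default Spark 3.1+ format: curly braces and an explicit "null" for null
    # fields, e.g. {1, a} and {2, null}
    df.select(f.col("a").cast("STRING")).show(truncate=False)

    # Legacy format: square brackets and empty text for null fields,
    # e.g. [1, a] and [2,]
    spark.conf.set("spark.sql.legacy.castComplexTypesToString.enabled", "true")
    df.select(f.col("a").cast("STRING")).show(truncate=False)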
@@ -478,6 +478,8 @@ class Spark300Shims extends SparkShims {
InMemoryFileIndex.shouldFilterOut(path)
}

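// Spark 3.0.x predates the Spark 3.1 change to the complex-type-to-string cast
// format, so the legacy format is the only behavior on these versions.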
override def getLegacyComplexTypeToString(): Boolean = true

// Arrow version changed between Spark versions
override def getArrowDataBuf(vec: ValueVector): (ByteBuffer, ReferenceManager) = {
val arrowBuf = vec.getDataBuffer()
@@ -424,6 +424,10 @@ class Spark311Shims extends Spark301Shims {
HadoopFSUtilsShim.shouldIgnorePath(path)
}

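// Spark 3.1.1 exposes spark.sql.legacy.castComplexTypesToString.enabled, so defer
// to the user's setting instead of hard-coding the legacy behavior.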
override def getLegacyComplexTypeToString(): Boolean = {
SQLConf.get.getConf(SQLConf.LEGACY_COMPLEX_TYPES_TO_STRING)
}

// Arrow version changed between Spark versions
override def getArrowDataBuf(vec: ValueVector): (ByteBuffer, ReferenceManager) = {
val arrowBuf = vec.getDataBuffer()
137 changes: 130 additions & 7 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -19,10 +19,12 @@ package com.nvidia.spark.rapids
import java.text.SimpleDateFormat
import java.time.DateTimeException

-import ai.rapids.cudf.{ColumnVector, DType, Scalar}
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, Scalar}

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.expressions.{Cast, CastBase, Expression, NullIntolerant, TimeZoneAwareExpression}
+import org.apache.spark.sql.catalyst.expressions.{AnsiCast, Cast, CastBase, Expression, NullIntolerant, TimeZoneAwareExpression}
import org.apache.spark.sql.types._

/** Meta-data for cast and ansi_cast. */
@@ -37,17 +39,22 @@ class CastExprMeta[INPUT <: CastBase](
private val castExpr = if (ansiEnabled) "ansi_cast" else "cast"
val fromType = cast.child.dataType
val toType = cast.dataType
var legacyCastToString = ShimLoader.getSparkShims.getLegacyComplexTypeToString()

override def tagExprForGpu(): Unit = {
recursiveTagExprForGpuCheck(fromType)
}

def recursiveTagExprForGpuCheck(fromDataType: DataType) {
if (!conf.isCastFloatToDecimalEnabled && toType.isInstanceOf[DecimalType] &&
-(fromType == DataTypes.FloatType || fromType == DataTypes.DoubleType)) {
+(fromDataType == DataTypes.FloatType || fromDataType == DataTypes.DoubleType)) {
willNotWorkOnGpu("the GPU will use a different strategy from Java's BigDecimal to convert " +
"floating point data types to decimals and this can produce results that slightly " +
"differ from the default behavior in Spark. To enable this operation on the GPU, set " +
s"${RapidsConf.ENABLE_CAST_FLOAT_TO_DECIMAL} to true.")
}
if (!conf.isCastFloatToStringEnabled && toType == DataTypes.StringType &&
-(fromType == DataTypes.FloatType || fromType == DataTypes.DoubleType)) {
+(fromDataType == DataTypes.FloatType || fromDataType == DataTypes.DoubleType)) {
willNotWorkOnGpu("the GPU will use different precision than Java's toString method when " +
"converting floating point data types to strings and this can produce results that " +
"differ from the default behavior in Spark. To enable this operation on the GPU, set" +
@@ -71,21 +78,35 @@ class CastExprMeta[INPUT <: CastBase](
"operation on the GPU, set" +
s" ${RapidsConf.ENABLE_CAST_STRING_TO_INTEGER} to true.")
}
-if (!conf.isCastStringToTimestampEnabled && fromType == DataTypes.StringType
+if (!conf.isCastStringToTimestampEnabled && fromDataType == DataTypes.StringType
&& toType == DataTypes.TimestampType) {
willNotWorkOnGpu("the GPU only supports a subset of formats " +
"when casting strings to timestamps. Refer to the CAST documentation " +
"for more details. To enable this operation on the GPU, set" +
s" ${RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP} to true.")
}
if (fromDataType.isInstanceOf[StructType]) {
val key = if (ansiEnabled) classOf[AnsiCast] else classOf[Cast]
val checks = GpuOverrides.expressions(key).getChecks.get.asInstanceOf[CastChecks]
fromDataType.asInstanceOf[StructType].foreach { field =>
recursiveTagExprForGpuCheck(field.dataType)
if (toType == StringType) {
if (!checks.gpuCanCast(field.dataType, toType)) {
willNotWorkOnGpu(s"Unsupported type ${field.dataType} found in Struct column. " +
s"Casting ${field.dataType} to ${toType} not currently supported. Refer to " +
"CAST documentation for more details.")
}
}
}
}
}

def buildTagMessage(entry: ConfEntry[_]): String = {
s"${entry.doc}. To enable this operation on the GPU, set ${entry.key} to true."
}

override def convertToGpu(child: Expression): GpuExpression =
-GpuCast(child, toType, ansiEnabled, cast.timeZoneId)
+GpuCast(child, toType, ansiEnabled, cast.timeZoneId, legacyCastToString)
}

object GpuCast {
@@ -134,7 +155,8 @@ case class GpuCast(
child: Expression,
dataType: DataType,
ansiMode: Boolean = false,
-timeZoneId: Option[String] = None)
+timeZoneId: Option[String] = None,
+legacyCastToString: Boolean = false)
extends GpuUnaryExpression with TimeZoneAwareExpression with NullIntolerant {

import GpuCast._
@@ -240,6 +262,8 @@ case class GpuCast(
}
case (TimestampType, StringType) =>
castTimestampToString(input)
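// Struct columns are cast to string by concatenating the casted child columns
// column-wise; see castStructToString below.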
case (StructType(fields), StringType) =>
castStructToString(input, legacyCastToString, fields)

// ansi cast from larger-than-integer integral types, to integer
case (LongType, IntegerType) if ansiMode =>
@@ -485,6 +509,105 @@ case class GpuCast(
}
}

private def castStructToString(input: GpuColumnVector,
legacyCastToString: Boolean, inputSchema: Array[StructField]): ColumnVector = {
// The brackets that are used in casting structs and maps to strings
val (leftBracket, rightBracket) = if (legacyCastToString) ("[", "]") else ("{", "}")
var separatorColumn: ColumnVector = null
var spaceColumn: ColumnVector = null
val columns: ArrayBuffer[ColumnVector] = new ArrayBuffer[ColumnVector]()
// coreColumns tracks the casted child columns
val coreColumns: ArrayBuffer[ColumnVector] = new ArrayBuffer[ColumnVector]()

try {
withResource(GpuScalar.from(leftBracket, StringType)) { bracketScalar =>
columns += ColumnVector.fromScalar(bracketScalar, input.getRowCount().toInt)
}
withResource(input.getBase().getChildColumnView(0)) { childView =>
withResource(childView.copyToColumnVector()) { childVector =>
[Review thread on the copyToColumnVector call above]

revans2 (Collaborator):
Why do we need to copy this to a column vector? I understand because of the APIs we have right now it is needed, but it would be nice to avoid the copy if possible.

rwlee (Collaborator, Author):
Ideally we would not copy this column vector, which is why I had been using toString in earlier iterations. How might I avoid this copy?

revans2 (Collaborator):
The signature of doColumnar takes a GpuColumnVector. It uses the GpuColumnVector to extract its Spark data type so it can do the cast. A ColumnVector is a ColumnView. As such I would propose that we insert another API in between that we could use for recursive calls:

    override def doColumnar(input: GpuColumnVector): ColumnVector =
        doColumnar(input.getBase, input.dataType())

    private def doColumnar(input: ColumnView, sparkType: DataType): ColumnVector = {
      (sparkType, dataType) match {
        ...
      }
    }

Then you can call the new doColumnar without making a copy of the data:

    withResource(input.getBase().getChildColumnView(0)) { childView =>
      columns += doColumnar(childView, inputSchema(0).dataType)
    }

rwlee (Collaborator, Author):
There are a few things around decimal casting that didn't translate nicely. In some cases they want to do a zero-copy, so I've filtered them out in the GpuColumnVector wrapper API. When those cases come up during a ColumnView cast, the view is copied to a ColumnVector. Reworking the decimal support to be ColumnView-friendly can be done in a follow-up PR if necessary, but it seemed a bit out of scope and was non-trivial.

revans2 (Collaborator):
Could we have a follow-on issue to clean this up? The issue appears to be because of incRefCount, and there should be ways for us to work around that without too much difficulty.

[End review thread]
columns += doColumnar(GpuColumnVector.from(childVector, inputSchema(0).dataType))
}
}

if (legacyCastToString) {
coreColumns += columns.last
withResource(GpuScalar.from(",", StringType)) { separatorScalar =>
separatorColumn = ColumnVector.fromScalar(separatorScalar, input.getRowCount().toInt)
}
withResource(GpuScalar.from(" ", StringType)) { separatorScalar =>
spaceColumn = ColumnVector.fromScalar(separatorScalar, input.getRowCount().toInt)
}
for (childIndex <- 1 until input.getBase().getNumChildren()) {
withResource(input.getBase().getChildColumnView(childIndex)) { childView =>
columns += separatorColumn
// Merge the whitespace column's validity with the current child column's validity.
// This mimics Spark's null behavior of consecutive commas with no space between them.
columns += spaceColumn.mergeAndSetValidity(BinaryOp.BITWISE_AND, childView)
withResource(childView.copyToColumnVector()) { childVector =>
columns += doColumnar(GpuColumnVector.from(childVector,
inputSchema(childIndex).dataType))
coreColumns += columns.last
}
}
}
withResource(GpuScalar.from(rightBracket, StringType)) { bracketScalar =>
columns += ColumnVector.fromScalar(bracketScalar, input.getRowCount().toInt)
}

// Merge casted child columns
withResource(GpuScalar.from("", StringType)) { emptyStrScalar =>
withResource(ColumnVector.stringConcatenate(emptyStrScalar, emptyStrScalar,
columns.toArray[ColumnView])) { fullResult =>
// Merge the validity of all child columns, fully null rows are null in the result
withResource(fullResult.mergeAndSetValidity(BinaryOp.BITWISE_OR,
coreColumns: _*)) { nulledResult =>
// Reflect the struct column's validity vector in the result
nulledResult.mergeAndSetValidity(BinaryOp.BITWISE_AND, input.getBase(), nulledResult)
}
}
}
} else {
withResource(GpuScalar.from(", ", StringType)) { separatorScalar =>
separatorColumn = ColumnVector.fromScalar(separatorScalar, input.getRowCount().toInt)
}
for (childIndex <- 1 until input.getBase().getNumChildren()) {
withResource(input.getBase().getChildColumnView(childIndex)) { childView =>
columns += separatorColumn
withResource(childView.copyToColumnVector()) { childVector =>
columns += doColumnar(GpuColumnVector.from(childVector,
inputSchema(childIndex).dataType))
}
}
}
withResource(GpuScalar.from(rightBracket, StringType)) { bracketScalar =>
columns += ColumnVector.fromScalar(bracketScalar, input.getRowCount().toInt)
}

// Merge casted child columns
withResource(GpuScalar.from("", StringType)) { emptyStrScalar =>
withResource(GpuScalar.from("null", StringType)) { nullStringScalar =>
withResource(ColumnVector.stringConcatenate(emptyStrScalar, nullStringScalar,
columns.toArray[ColumnView])) { fullResult =>
// Reflect the struct column's validity vector in the result
fullResult.mergeAndSetValidity(BinaryOp.BITWISE_AND, input.getBase())
}
}
}
}
} finally {
if (separatorColumn != null) {
columns.foreach(col =>
if (col.getNativeView() != separatorColumn.getNativeView()) {
col.close()
})
separatorColumn.close()
}
if (spaceColumn != null) {
spaceColumn.close()
}
}
}

private def castFloatingTypeToString(input: GpuColumnVector): ColumnVector = {
withResource(input.getBase.castTo(DType.STRING)) { cudfCast =>

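As a cross-check on castStructToString above, here is a hypothetical row-wise Python reference (a sketch inferred from the column-wise code, not part of the PR; the function name and the exact null handling are assumptions) of the string layout the concatenation is intended to produce:

    def struct_to_string(fields, legacy):
        """Row-wise reference for the format built column-wise in castStructToString.

        fields: the casted-to-string child values of one struct row, None for null.
        """
        lb, rb = ("[", "]") if legacy else ("{", "}")
        parts = []
        for i, v in enumerate(fields):
            sep = "" if i == 0 else ","
            if v is not None:
                # Non-first fields get a separator and a space before the value.
                parts.append(v if i == 0 else sep + " " + v)
            elif legacy:
                # Legacy: a null field leaves a bare comma with no space and no text.
                parts.append(sep)
            else:
                # Non-legacy: null fields print as "null".
                parts.append("null" if i == 0 else sep + " null")
        return lb + "".join(parts) + rb

    assert struct_to_string(["1", "a"], legacy=True) == "[1, a]"
    assert struct_to_string(["1", None], legacy=True) == "[1,]"
    assert struct_to_string(["1", None], legacy=False) == "{1, null}"

Rows where the struct value itself is null (and, in the legacy path, rows where every child is null) come back as null rather than as a bracketed string; that is what the mergeAndSetValidity calls on the concatenated result implement.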
@@ -187,6 +187,8 @@ trait SparkShims {

def shouldIgnorePath(path: String): Boolean

def getLegacyComplexTypeToString(): Boolean

def getArrowDataBuf(vec: ValueVector): (ByteBuffer, ReferenceManager)
def getArrowValidityBuf(vec: ValueVector): (ByteBuffer, ReferenceManager)
def getArrowOffsetsBuf(vec: ValueVector): (ByteBuffer, ReferenceManager)
@@ -766,7 +766,7 @@ class CastChecks extends ExprChecks {
val mapChecks: TypeSig = none
val sparkMapSig: TypeSig = STRING + MAP.nested(all)

-val structChecks: TypeSig = none
+val structChecks: TypeSig = STRING
val sparkStructSig: TypeSig = STRING + STRUCT.nested(all)

val udtChecks: TypeSig = none
@@ -840,8 +840,8 @@
}

def gpuCanCast(from: DataType, to: DataType, allowDecimal: Boolean = true): Boolean = {
-val (_, sparkSig) = getChecksAndSigs(from)
-sparkSig.isSupportedByPlugin(to, allowDecimal)
+val (checks, _) = getChecksAndSigs(from)
+checks.isSupportedByPlugin(to, allowDecimal)
}
}
