Support float/double castings for ORC reading [databricks] #6319

Merged · 14 commits · Sep 7, 2022
Changes from 9 commits

1 change: 1 addition & 0 deletions docs/configs.md
@@ -92,6 +92,7 @@ Name | Description | Default Value
<a name="sql.format.json.enabled"></a>spark.rapids.sql.format.json.enabled|When set to true enables all json input and output acceleration. (only input is currently supported anyways)|false
<a name="sql.format.json.read.enabled"></a>spark.rapids.sql.format.json.read.enabled|When set to true enables json input acceleration|false
<a name="sql.format.orc.enabled"></a>spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true
<a name="sql.format.orc.floatTypesToString.enable"></a>spark.rapids.sql.format.orc.floatTypesToString.enable|When reading an ORC file, the source data schemas(schemas of ORC file) may differ from the target schemas (schemas of the reader), we need to handle the castings from source type to target type. Since float/double numbers in GPU have different precision with CPU, when casting float/double to string, the result of GPU is different from result of CPU spark.|true
<a name="sql.format.orc.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.orc.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type.|2147483647
<a name="sql.format.orc.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.orc.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small ORC files in parallel. This can not be changed at runtime after the executor has started. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type. DEPRECATED: use spark.rapids.sql.multiThreadedRead.numThreads|None
<a name="sql.format.orc.read.enabled"></a>spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true
52 changes: 52 additions & 0 deletions integration_tests/src/main/python/orc_cast_test.py
@@ -67,6 +67,7 @@ def test_casting_from_integer(spark_tmp_path, to_type):
schema_str.format(*([to_type] * len(data_gen)))).orc(orc_path)
)


@pytest.mark.parametrize('overflow_long_gen', [LongGen(min_val=int(1e16)),
LongGen(max_val=int(-1e16))])
@pytest.mark.parametrize('to_type', ['timestamp'])
@@ -82,3 +83,54 @@ def test_casting_from_overflow_long(spark_tmp_path, overflow_long_gen,to_type):
conf={},
error_message="ArithmeticException"
)


# When casting float/double to double/float, we need to compare the GPU results with the
# CPU results in an approximate way.
@pytest.mark.approximate_float
Collaborator:

Are we off if we don't do this? It feels odd that we would get a different answer.

Contributor Author (@sinkinben, Aug 31, 2022):

Yep, it's okay to remove approximate_float; we can still pass the test.

But I think we should pay attention to how we compare floating-point numbers for equality.

For example:

scala> var k = (3.14).toFloat
var k: Float = 3.14
scala> k.toDouble
val res3: Double = 3.140000104904175

I don't know whether the float -> double conversion on the GPU is the same as on the CPU.

We should check whether two floating-point numbers are equal via abs(val1 - val2) < EPSILON, where EPSILON is the allowable accuracy error.
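A minimal sketch of that epsilon-based comparison (EPSILON here is an illustrative tolerance, not a value taken from the plugin or the test framework; a real comparison may also need a relative tolerance and NaN/Inf handling):

// Absolute-tolerance comparison of two floating-point values.
val EPSILON = 1e-6
def approxEqual(a: Double, b: Double): Boolean = math.abs(a - b) < EPSILON

// The widened float from the REPL example differs from the double literal by ~1e-7,
// so the two are treated as equal under this tolerance.
approxEqual(3.14f.toDouble, 3.14)  // true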

@pytest.mark.parametrize('to_type', ['float', 'double', 'boolean', 'tinyint', 'smallint', 'int', 'bigint'])
def test_casting_from_float_and_double(spark_tmp_path, to_type):
orc_path = spark_tmp_path + '/orc_casting_from_float_and_double'
data_gen = [('float_column', float_gen), ('double_column', double_gen)]
create_orc(data_gen, orc_path)
schema_str = "float_column {}, double_column {}".format(to_type, to_type)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.schema(schema_str).orc(orc_path)
)


@pytest.mark.parametrize('data_gen', [DoubleGen(max_exp=32, special_cases=None),
DoubleGen(max_exp=32, special_cases=[8.88e9, 9.99e10, 1.314e11])])
def test_casting_from_double_to_timestamp(spark_tmp_path, data_gen):
# ORC assumes the original double values are in seconds, so we need to convert them to
# timestamps (INT64 in microseconds).
#
# The 'datetime' module in Python requires 1 <= year <= 9999, and UTC timestamps start from 1970/1/1.
# That is, the minimum valid negative value is about -1970 * 365 * 24 * 3600 = -62125920000,
# whose magnitude (~6.2e10) is a bit under 2^36. So we set max_exp = 32 in DoubleGen to keep
# the generated values well inside that range.
#
# The maximum valid positive value is about (10000 - 1970) * 365 * 24 * 3600 = 253234080000
# (~2.5e11, between 2^37 and 2^38), so we add some special cases in the 2^33 - 2^37 range
# (about 8e9 ~ 1.3e11). See the arithmetic sketch after this function.
#
# In DoubleGen, special_cases=None will generate some NaN/INF corner cases.

orc_path = spark_tmp_path + '/orc_casting_from_double_to_timestamp'
with_cpu_session(
lambda spark: unary_op_df(spark, data_gen).write.orc(orc_path)
)
# The name of the single column is 'a'; cast it to timestamp type.
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.schema("a timestamp").orc(orc_path)
)
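A quick arithmetic check of the bounds cited in the comment above (plain Scala, only to show where max_exp = 32 and the special cases sit relative to the valid range; not part of the test code):

// Valid seconds range for Python's datetime (years 1..9999), anchored at 1970-01-01:
val minSeconds = -1970L * 365 * 24 * 3600            // -62,125,920,000, magnitude just under 2^36
val maxSeconds = (10000L - 1970L) * 365 * 24 * 3600  //  253,234,080,000, between 2^37 and 2^38
// max_exp = 32 keeps generated magnitudes at or below ~2^32 (about 4.3e9), well inside the range,
// while the special cases 8.88e9, 9.99e10 and 1.314e11 probe the 2^33..2^37 region.
assert(math.pow(2, 32) < maxSeconds && -math.pow(2, 32) > minSeconds)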


def test_casting_from_overflow_double_to_timestamp(spark_tmp_path):
orc_path = spark_tmp_path + '/orc_casting_from_overflow_double_to_timestamp'
with_cpu_session(
lambda spark: unary_op_df(spark, DoubleGen(min_exp=38)).write.orc(orc_path)
)
assert_gpu_and_cpu_error(
df_fun=lambda spark: spark.read.schema("a timestamp").orc(orc_path).collect(),
conf={},
error_message="ArithmeticException"
)
@@ -989,7 +989,7 @@ object GpuCast extends Arm {
}
}

private def castFloatingTypeToString(input: ColumnView): ColumnVector = {
private[rapids] def castFloatingTypeToString(input: ColumnView): ColumnVector = {
withResource(input.castTo(DType.STRING)) { cudfCast =>

// replace "e+" with "E"
187 changes: 173 additions & 14 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -122,6 +122,7 @@ case class GpuOrcScan(
}

object GpuOrcScan extends Arm {

def tagSupport(scanMeta: ScanMeta[OrcScan]): Unit = {
val scan = scanMeta.wrapped
val schema = StructType(scan.readDataSchema ++ scan.readPartitionSchema)
@@ -186,6 +187,78 @@ object GpuOrcScan extends Arm {
}
}

/**
* Get the overflow flags as booleans:
* true means no overflow, while false means an overflow occurred.
*
* @param doubleMillis the input double column
* @param millis the long column cast from doubleMillis
*/
private def getOverflowFlags(doubleMillis: ColumnView, millis: ColumnView): ColumnView = {
// No overflow when
// doubleMillis <= Long.MAX_VALUE &&
// doubleMillis >= Long.MIN_VALUE &&
// ((millis >= 0) == (doubleMillis >= 0))
val rangeCheck = withResource(Scalar.fromLong(Long.MaxValue)) { max =>
withResource(doubleMillis.lessOrEqualTo(max)) { upperCheck =>
withResource(Scalar.fromLong(Long.MinValue)) { min =>
withResource(doubleMillis.greaterOrEqualTo(min)) { lowerCheck =>
upperCheck.and(lowerCheck)
}
}
}
}
withResource(rangeCheck) { _ =>
val signCheck = withResource(Scalar.fromInt(0)) { zero =>
withResource(millis.greaterOrEqualTo(zero)) { longSign =>
withResource(doubleMillis.greaterOrEqualTo(zero)) { doubleSign =>
longSign.equalTo(doubleSign)
}
}
}
withResource(signCheck) { _ =>
rangeCheck.and(signCheck)
}
}
}
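A scalar restatement of the same predicate, for readability only (the plugin applies it column-wise with cuDF; this sketch is not plugin code):

// True when the rounded long `millis` did not overflow relative to the original double value:
// the double is within Long range and the two signs agree.
def noOverflow(doubleMillis: Double, millis: Long): Boolean =
  doubleMillis <= Long.MaxValue.toDouble &&
  doubleMillis >= Long.MinValue.toDouble &&
  ((millis >= 0) == (doubleMillis >= 0))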

/**
* Borrowed from ORC "ConvertTreeReaderFactory".
* Scala does not support such numeric literals, so we parse them from strings.
*/
private val MIN_LONG_AS_DOUBLE = java.lang.Double.valueOf("-0x1p63")

/**
* We cannot store Long.MAX_VALUE as a double without losing precision. Instead, we store
* Long.MAX_VALUE + 1 == -Long.MIN_VALUE, and then offset all comparisons by 1.
*/
private val MAX_LONG_AS_DOUBLE_PLUS_ONE = java.lang.Double.valueOf("0x1p63")

/**
* Return a boolean column indicating whether the rows in col can fit in a long.
* It assumes the input type is float or double.
*/
private def doubleCanFitInLong(col: ColumnView): ColumnVector = {
// It is true when
// (MIN_LONG_AS_DOUBLE - doubleValue < 1.0) &&
// (doubleValue < MAX_LONG_AS_DOUBLE_PLUS_ONE)
val lowRet = withResource(Scalar.fromDouble(MIN_LONG_AS_DOUBLE)) { sMin =>
withResource(Scalar.fromDouble(1.0)) { sOne =>
withResource(sMin.sub(col)) { diff =>
diff.lessThan(sOne)
}
}
}
withResource(lowRet) { _ =>
withResource(Scalar.fromDouble(MAX_LONG_AS_DOUBLE_PLUS_ONE)) { sMax =>
withResource(col.lessThan(sMax)) { highRet =>
lowRet.and(highRet)
}
}
}
}
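A small JVM-side illustration of the boundary constants and the fits-in-long check above (the hex literals are the same ones parsed from strings; the assertions only demonstrate the precision argument and are not plugin code):

val minLongAsDouble = java.lang.Double.parseDouble("-0x1p63")        // exactly -2^63
val maxLongAsDoublePlusOne = java.lang.Double.parseDouble("0x1p63")  // exactly  2^63

def fitsInLong(d: Double): Boolean =
  (minLongAsDouble - d < 1.0) && (d < maxLongAsDoublePlusOne)

assert(Long.MaxValue.toDouble == maxLongAsDoublePlusOne)  // 2^63 - 1 rounds up to 2^63 as a double
assert(fitsInLong(Long.MinValue.toDouble))                // -2^63 is exactly representable and fits
assert(!fitsInLong(maxLongAsDoublePlusOne))               // 2^63 is one past Long.MaxValue
assert(!fitsInLong(Double.NaN))                           // NaN fails both comparisons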


/**
* Cast the column to the target type for ORC schema evolution.
* It is designed to support all the cases that `canCast` returns true.
@@ -233,6 +306,73 @@ object GpuOrcScan extends Arm {
DType.TIMESTAMP_MICROSECONDS) =>
OrcCastingShims.castIntegerToTimestamp(col, fromDt)

// float to bool/integral
case (DType.FLOAT32 | DType.FLOAT64, DType.BOOL8 | DType.INT8 | DType.INT16 | DType.INT32
| DType.INT64) =>
// Follow the CPU ORC conversion:
// First replace rows that cannot fit in long with nulls,
// next convert to long,
// then down cast long to the target integral type.
val longDoubles = withResource(doubleCanFitInLong(col)) { fitLongs =>
col.copyWithBooleanColumnAsValidity(fitLongs)
}
withResource(longDoubles) { _ =>
withResource(longDoubles.castTo(DType.INT64)) { longs =>
toDt match {
case DType.BOOL8 => longs.castTo(toDt)
case DType.INT64 => longs.incRefCount()
case _ => downCastAnyInteger(longs, toDt)
}
}
}

// float/double to double/float
case (DType.FLOAT32 | DType.FLOAT64, DType.FLOAT32 | DType.FLOAT64) =>
col.castTo(toDt)

// float/double to string
// cuDF keeps 9 decimal digits after the decimal point, while the CPU keeps more than 10.
// So when casting float/double to string, the GPU result differs from the CPU result.
// The conf 'spark.rapids.sql.format.orc.floatTypesToString.enable' controls whether this
// cast is enabled.
case (DType.FLOAT32 | DType.FLOAT64, DType.STRING) =>
GpuCast.castFloatingTypeToString(col)
Collaborator:

Can you please file a follow-on issue for us to go back and see what we can do to fix this?

Contributor Author:

Ok, after merging this, I will file an issue to describe this problem.


// float/double -> timestamp
case (DType.FLOAT32 | DType.FLOAT64, DType.TIMESTAMP_MICROSECONDS) =>
// Follow the CPU ORC conversion.
// val doubleMillis = doubleValue * 1000,
// val millis = Math.round(doubleMillis)
// if (noOverflow) millis else null
val milliSeconds = withResource(Scalar.fromDouble(1000.0)) { thousand =>
// ORC assumes value is in seconds
withResource(col.mul(thousand, DType.FLOAT64)) { doubleMillis =>
withResource(doubleMillis.round()) { millis =>
withResource(getOverflowFlags(doubleMillis, millis)) { overflowFlags =>
millis.copyWithBooleanColumnAsValidity(overflowFlags)
}
}
}
}
// Cast milli-seconds to micro-seconds
// Note that converting (milliSeconds * 1000) to INT64 may overflow INT64.
// In this step, the CPU ORC casting throws an exception rather than replacing such values
// with null, so we follow the CPU code here.
withResource(milliSeconds) { _ =>
// Test whether there is a long overflow.
// If milliSeconds.max() * 1000 > LONG_MAX, then 'Math.multiplyExact' will
// throw an exception (as the CPU code does).
if (milliSeconds.max() != null) {
testLongMultiplicationOverflow(milliSeconds.max().getDouble.toLong, 1000L)
}
withResource(milliSeconds.mul(Scalar.fromDouble(1000.0))) { microSeconds =>
withResource(microSeconds.castTo(DType.INT64)) { longVec =>
longVec.castTo(DType.TIMESTAMP_MICROSECONDS)
}
}
}

// TODO more types, tracked in https://github.com/NVIDIA/spark-rapids/issues/5895
case (f, t) =>
throw new QueryExecutionException(s"Unsupported type casting: $f -> $t")
@@ -246,7 +386,8 @@ object GpuOrcScan extends Arm {
* but the ones between GPU supported types.
* Each supported casting is implemented in "castColumnTo".
*/
def canCast(from: TypeDescription, to: TypeDescription): Boolean = {
def canCast(from: TypeDescription, to: TypeDescription,
isOrcFloatTypesToStringEnable: Boolean): Boolean = {
import org.apache.orc.TypeDescription.Category._
if (!to.getCategory.isPrimitive || !from.getCategory.isPrimitive) {
// Don't convert from any to complex, or from complex to any.
Expand All @@ -268,7 +409,16 @@ object GpuOrcScan extends Arm {
}
case VARCHAR =>
toType == STRING
case _ => false

case FLOAT | DOUBLE =>
toType match {
case BOOLEAN | BYTE | SHORT | INT | LONG | FLOAT | DOUBLE | TIMESTAMP => true
case STRING => isOrcFloatTypesToStringEnable
case _ => false
}
// TODO more types, tracked in https://github.com/NVIDIA/spark-rapids/issues/5895
case _ =>
false
}
}

@@ -313,7 +463,8 @@ case class GpuOrcMultiFilePartitionReaderFactory(
private val debugDumpPrefix = Option(rapidsConf.orcDebugDumpPrefix)
private val numThreads = rapidsConf.multiThreadReadNumThreads
private val maxNumFileProcessed = rapidsConf.maxNumOrcFilesParallel
private val filterHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, filters)
private val filterHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, filters,
rapidsConf.isOrcFloatTypesToStringEnable)
private val ignoreMissingFiles = sqlConf.ignoreMissingFiles
private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles

@@ -400,7 +551,8 @@ case class GpuOrcPartitionReaderFactory(
private val debugDumpPrefix = Option(rapidsConf.orcDebugDumpPrefix)
private val maxReadBatchSizeRows: Integer = rapidsConf.maxReadBatchSizeRows
private val maxReadBatchSizeBytes: Long = rapidsConf.maxReadBatchSizeBytes
private val filterHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, pushedFilters)
private val filterHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, pushedFilters,
rapidsConf.isOrcFloatTypesToStringEnable)

override def supportColumnarReads(partition: InputPartition): Boolean = true

@@ -931,7 +1083,8 @@ private object OrcTools extends Arm {
private case class GpuOrcFileFilterHandler(
@transient sqlConf: SQLConf,
broadcastedConf: Broadcast[SerializableConfiguration],
pushedFilters: Array[Filter]) extends Arm {
pushedFilters: Array[Filter],
isOrcFloatTypesToStringEnable: Boolean) extends Arm {

private[rapids] val isCaseSensitive = sqlConf.caseSensitiveAnalysis

@@ -1026,7 +1179,7 @@ private case class GpuOrcFileFilterHandler(
val isCaseSensitive = readerOpts.getIsSchemaEvolutionCaseAware

val (updatedReadSchema, fileIncluded) = checkSchemaCompatibility(orcReader.getSchema,
readerOpts.getSchema, isCaseSensitive)
readerOpts.getSchema, isCaseSensitive, isOrcFloatTypesToStringEnable)
// GPU has its own read schema, so unset the reader include to read all the columns
// specified by its read schema.
readerOpts.include(null)
@@ -1206,11 +1359,13 @@ private case class GpuOrcFileFilterHandler(
private def checkSchemaCompatibility(
fileSchema: TypeDescription,
readSchema: TypeDescription,
isCaseAware: Boolean): (TypeDescription, Array[Boolean]) = {
isCaseAware: Boolean,
isOrcFloatTypesToStringEnable: Boolean): (TypeDescription, Array[Boolean]) = {
// all default to false
val fileIncluded = new Array[Boolean](fileSchema.getMaximumId + 1)
val isForcePos = OrcShims.forcePositionalEvolution(conf)
(checkTypeCompatibility(fileSchema, readSchema, isCaseAware, fileIncluded, isForcePos),
(checkTypeCompatibility(fileSchema, readSchema, isCaseAware, fileIncluded, isForcePos,
isOrcFloatTypesToStringEnable),
fileIncluded)
}

@@ -1224,7 +1379,8 @@ private case class GpuOrcFileFilterHandler(
readType: TypeDescription,
isCaseAware: Boolean,
fileIncluded: Array[Boolean],
isForcePos: Boolean): TypeDescription = {
isForcePos: Boolean,
isOrcFloatTypesToStringEnable: Boolean): TypeDescription = {
(fileType.getCategory, readType.getCategory) match {
case (TypeDescription.Category.STRUCT, TypeDescription.Category.STRUCT) =>
// Check for the top or nested struct types.
@@ -1252,7 +1408,7 @@ private case class GpuOrcFileFilterHandler(
.zipWithIndex.foreach { case ((fileFieldName, fType), idx) =>
getReadFieldType(fileFieldName, idx).foreach { case (rField, rType) =>
val newChild = checkTypeCompatibility(fType, rType,
isCaseAware, fileIncluded, isForcePos)
isCaseAware, fileIncluded, isForcePos, isOrcFloatTypesToStringEnable)
prunedReadSchema.addField(rField, newChild)
}
}
@@ -1262,19 +1418,22 @@ private case class GpuOrcFileFilterHandler(
// for struct children.
case (TypeDescription.Category.LIST, TypeDescription.Category.LIST) =>
val newChild = checkTypeCompatibility(fileType.getChildren.get(0),
readType.getChildren.get(0), isCaseAware, fileIncluded, isForcePos)
readType.getChildren.get(0), isCaseAware, fileIncluded, isForcePos,
isOrcFloatTypesToStringEnable)
fileIncluded(fileType.getId) = true
TypeDescription.createList(newChild)
case (TypeDescription.Category.MAP, TypeDescription.Category.MAP) =>
val newKey = checkTypeCompatibility(fileType.getChildren.get(0),
readType.getChildren.get(0), isCaseAware, fileIncluded, isForcePos)
readType.getChildren.get(0), isCaseAware, fileIncluded, isForcePos,
isOrcFloatTypesToStringEnable)
val newValue = checkTypeCompatibility(fileType.getChildren.get(1),
readType.getChildren.get(1), isCaseAware, fileIncluded, isForcePos)
readType.getChildren.get(1), isCaseAware, fileIncluded, isForcePos,
isOrcFloatTypesToStringEnable)
fileIncluded(fileType.getId) = true
TypeDescription.createMap(newKey, newValue)
case (ft, rt) if ft.isPrimitive && rt.isPrimitive =>
if (OrcShims.typeDescriptionEqual(fileType, readType) ||
GpuOrcScan.canCast(fileType, readType)) {
GpuOrcScan.canCast(fileType, readType, isOrcFloatTypesToStringEnable)) {
// Since type casting is supported, here should return the file type.
fileIncluded(fileType.getId) = true
fileType.clone()
12 changes: 12 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -864,6 +864,16 @@ object RapidsConf {
.booleanConf
.createWithDefault(true)

val ENABLE_ORC_FLOAT_TYPES_TO_STRING =
Collaborator:

Please add some docs to this that indicate what will happen if we run into this situation. For most configs, when we are in this kind of situation we fall back to the CPU, but here we will throw an exception and the job will fail.

Contributor Author:

Have updated this in configs.md.

conf("spark.rapids.sql.format.orc.floatTypesToString.enable")
.doc("When reading an ORC file, the source data schemas(schemas of ORC file) may differ " +
"from the target schemas (schemas of the reader), we need to handle the castings from " +
"source type to target type. Since float/double numbers in GPU have different precision " +
"with CPU, when casting float/double to string, the result of GPU is different from " +
"result of CPU spark.")
.booleanConf
.createWithDefault(true)

val ORC_READER_TYPE = conf("spark.rapids.sql.format.orc.reader.type")
.doc("Sets the ORC reader type. We support different types that are optimized for " +
"different environments. The original Spark style reader can be selected by setting this " +
@@ -1856,6 +1866,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isOrcWriteEnabled: Boolean = get(ENABLE_ORC_WRITE)

lazy val isOrcFloatTypesToStringEnable: Boolean = get(ENABLE_ORC_FLOAT_TYPES_TO_STRING)

lazy val isOrcPerFileReadEnabled: Boolean =
RapidsReaderType.withName(get(ORC_READER_TYPE)) == RapidsReaderType.PERFILE
