diff --git a/docs/configs.md b/docs/configs.md
index edcf1bcc621..1e5be3dfc7a 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -92,6 +92,7 @@ Name | Description | Default Value
 spark.rapids.sql.format.json.enabled|When set to true enables all json input and output acceleration. (only input is currently supported anyways)|false
 spark.rapids.sql.format.json.read.enabled|When set to true enables json input acceleration|false
 spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true
+spark.rapids.sql.format.orc.floatTypesToString.enable|When reading an ORC file, the source data schemas (the schemas of the ORC file) may differ from the target schemas (the schemas of the reader), so casts from the source types to the target types are needed. Since float/double values on the GPU have different precision than on the CPU, casting float/double to string on the GPU produces results that differ from CPU Spark. The default value is `true` (the string results may differ from the CPU results). If it is set to `false` explicitly and the job contains a cast from float/double to string, an exception will be thrown and the job will fail.|true
 spark.rapids.sql.format.orc.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type.|2147483647
 spark.rapids.sql.format.orc.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small ORC files in parallel. This can not be changed at runtime after the executor has started. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type. DEPRECATED: use spark.rapids.sql.multiThreadedRead.numThreads|None
 spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true
diff --git a/integration_tests/src/main/python/orc_cast_test.py b/integration_tests/src/main/python/orc_cast_test.py
index 6a84407a632..6d5966294a8 100644
--- a/integration_tests/src/main/python/orc_cast_test.py
+++ b/integration_tests/src/main/python/orc_cast_test.py
@@ -66,6 +66,7 @@ def test_casting_from_integer(spark_tmp_path, to_type):
         schema_str.format(*([to_type] * len(data_gen)))).orc(orc_path)
     )
 
+
 @pytest.mark.parametrize('overflow_long_gen', [LongGen(min_val=int(1e16)),
                                                LongGen(max_val=int(-1e16))])
 @pytest.mark.parametrize('to_type', ['timestamp'])
@@ -81,3 +82,54 @@ def test_casting_from_overflow_long(spark_tmp_path, overflow_long_gen,to_type):
         conf={},
         error_message="ArithmeticException"
     )
+
+
+# When casting float/double to double/float, we need to compare the GPU results with the
+# CPU results in an approximate way.
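+# (The approximate_float marker makes the CPU/GPU comparison tolerate small differences
+# in the least significant digits of float/double results.)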
+@pytest.mark.approximate_float
+@pytest.mark.parametrize('to_type', ['float', 'double', 'boolean', 'tinyint', 'smallint', 'int', 'bigint'])
+def test_casting_from_float_and_double(spark_tmp_path, to_type):
+    orc_path = spark_tmp_path + '/orc_casting_from_float_and_double'
+    data_gen = [('float_column', float_gen), ('double_column', double_gen)]
+    create_orc(data_gen, orc_path)
+    schema_str = "float_column {}, double_column {}".format(to_type, to_type)
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: spark.read.schema(schema_str).orc(orc_path)
+    )
+
+
+@pytest.mark.parametrize('data_gen', [DoubleGen(max_exp=32, special_cases=None),
+                                      DoubleGen(max_exp=32, special_cases=[8.88e9, 9.99e10, 1.314e11])])
+def test_casting_from_double_to_timestamp(spark_tmp_path, data_gen):
+    # ORC assumes the original double values are in seconds; we need to convert them to
+    # timestamps (INT64 in microseconds).
+    #
+    # The 'datetime' module in python requires 0 <= year < 10000, and the UTC epoch starts
+    # at 1970/1/1. That is, the minimum valid negative value is about
+    # -1970 * 365 * 24 * 3600 = -62125920000 seconds (~ -6.2e10), so we set max_exp = 32
+    # in DoubleGen (2^32 ~ 4.3e9) to stay safely inside that range.
+    #
+    # The maximum valid positive value is (10000 - 1970) * 365 * 24 * 3600 = 253234080000
+    # seconds (~ 2.5e11), so we add some special cases between 2^33 and 2^37
+    # (about 8e9 to 1e11).
+    #
+    # In DoubleGen, special_cases=None keeps the default special cases, which include some
+    # NaN and INF corner cases.
+
+    orc_path = spark_tmp_path + '/orc_casting_from_double_to_timestamp'
+    with_cpu_session(
+        lambda spark: unary_op_df(spark, data_gen).write.orc(orc_path)
+    )
+    # The unary column is named 'a'; cast it to the timestamp type.
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: spark.read.schema("a timestamp").orc(orc_path)
+    )
+
+
+def test_casting_from_overflow_double_to_timestamp(spark_tmp_path):
+    orc_path = spark_tmp_path + '/orc_casting_from_overflow_double_to_timestamp'
+    with_cpu_session(
+        lambda spark: unary_op_df(spark, DoubleGen(min_exp=38)).write.orc(orc_path)
+    )
+    assert_gpu_and_cpu_error(
+        df_fun=lambda spark: spark.read.schema("a timestamp").orc(orc_path).collect(),
+        conf={},
+        error_message="ArithmeticException"
+    )
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
index 600669ccf55..f5e2f4cbac2 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -989,7 +989,7 @@ object GpuCast extends Arm {
     }
   }
 
-  private def castFloatingTypeToString(input: ColumnView): ColumnVector = {
+  private[rapids] def castFloatingTypeToString(input: ColumnView): ColumnVector = {
     withResource(input.castTo(DType.STRING)) { cudfCast =>
 
       // replace "e+" with "E"
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
index 781009f1121..38ca1d484dd 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -122,6 +122,7 @@ case class GpuOrcScan(
   }
 
 object GpuOrcScan extends Arm {
+
   def tagSupport(scanMeta: ScanMeta[OrcScan]): Unit = {
     val scan = scanMeta.wrapped
     val schema = StructType(scan.readDataSchema ++ scan.readPartitionSchema)
@@ -186,6 +187,78 @@ object GpuOrcScan extends Arm {
     }
   }
 
+  /**
+   * Get the overflow flags as booleans:
+   * true means no overflow, false means an overflow occurred.
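+   *
+   * Illustrative example (values not from the original patch):
+   *   doubleMillis = 9.3e18 exceeds Long.MaxValue, so the range check fails -> false (overflow).
+   *   doubleMillis = -1500.0 rounds to millis = -1500; range and sign checks pass -> true.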
+   *
+   * @param doubleMillis the input double column
+   * @param millis the long column cast from doubleMillis
+   */
+  private def getOverflowFlags(doubleMillis: ColumnView, millis: ColumnView): ColumnView = {
+    // No overflow when
+    //     doubleMillis <= Long.MAX_VALUE &&
+    //     doubleMillis >= Long.MIN_VALUE &&
+    //     ((millis >= 0) == (doubleMillis >= 0))
+    val rangeCheck = withResource(Scalar.fromLong(Long.MaxValue)) { max =>
+      withResource(doubleMillis.lessOrEqualTo(max)) { upperCheck =>
+        withResource(Scalar.fromLong(Long.MinValue)) { min =>
+          withResource(doubleMillis.greaterOrEqualTo(min)) { lowerCheck =>
+            upperCheck.and(lowerCheck)
+          }
+        }
+      }
+    }
+    withResource(rangeCheck) { _ =>
+      val signCheck = withResource(Scalar.fromInt(0)) { zero =>
+        withResource(millis.greaterOrEqualTo(zero)) { longSign =>
+          withResource(doubleMillis.greaterOrEqualTo(zero)) { doubleSign =>
+            longSign.equalTo(doubleSign)
+          }
+        }
+      }
+      withResource(signCheck) { _ =>
+        rangeCheck.and(signCheck)
+      }
+    }
+  }
+
+  /**
+   * Borrowed from ORC "ConvertTreeReaderFactory".
+   * Scala does not support this kind of hexadecimal floating-point literal, so parse it
+   * from a string.
+   */
+  private val MIN_LONG_AS_DOUBLE = java.lang.Double.valueOf("-0x1p63")
+
+  /**
+   * We cannot store Long.MAX_VALUE as a double without losing precision. Instead, we store
+   * Long.MAX_VALUE + 1 == -Long.MIN_VALUE, and then offset all comparisons by 1.
+   */
+  private val MAX_LONG_AS_DOUBLE_PLUS_ONE = java.lang.Double.valueOf("0x1p63")
+
+  /**
+   * Return a boolean column indicating whether the rows in col can fit in a long.
+   * It assumes the input type is float or double.
+   */
+  private def doubleCanFitInLong(col: ColumnView): ColumnVector = {
+    // It is true when
+    //   (MIN_LONG_AS_DOUBLE - doubleValue < 1.0) &&
+    //   (doubleValue < MAX_LONG_AS_DOUBLE_PLUS_ONE)
+    val lowRet = withResource(Scalar.fromDouble(MIN_LONG_AS_DOUBLE)) { sMin =>
+      withResource(Scalar.fromDouble(1.0)) { sOne =>
+        withResource(sMin.sub(col)) { diff =>
+          diff.lessThan(sOne)
+        }
+      }
+    }
+    withResource(lowRet) { _ =>
+      withResource(Scalar.fromDouble(MAX_LONG_AS_DOUBLE_PLUS_ONE)) { sMax =>
+        withResource(col.lessThan(sMax)) { highRet =>
+          lowRet.and(highRet)
+        }
+      }
+    }
+  }
+
+
   /**
    * Cast the column to the target type for ORC schema evolution.
    * It is designed to support all the cases that `canCast` returns true.
@@ -233,6 +306,79 @@ object GpuOrcScan extends Arm {
         DType.TIMESTAMP_MICROSECONDS) =>
       OrcCastingShims.castIntegerToTimestamp(col, fromDt)
 
+    // float to bool/integral
+    case (DType.FLOAT32 | DType.FLOAT64, DType.BOOL8 | DType.INT8 | DType.INT16 | DType.INT32
+        | DType.INT64) =>
+      // Follow the CPU ORC conversion:
+      // first replace rows that cannot fit in a long with nulls,
+      // next convert to long,
+      // then down cast the long to the target integral type.
+      val longDoubles = withResource(doubleCanFitInLong(col)) { fitLongs =>
+        col.copyWithBooleanColumnAsValidity(fitLongs)
+      }
+      withResource(longDoubles) { _ =>
+        withResource(longDoubles.castTo(DType.INT64)) { longs =>
+          toDt match {
+            case DType.BOOL8 => longs.castTo(toDt)
+            case DType.INT64 => longs.incRefCount()
+            case _ => downCastAnyInteger(longs, toDt)
+          }
+        }
+      }
+
+    // float/double to double/float
+    case (DType.FLOAT32 | DType.FLOAT64, DType.FLOAT32 | DType.FLOAT64) =>
+      col.castTo(toDt)
+
+    // float/double to string
+    // cuDF keeps 9 digits after the decimal point, while the CPU keeps more than 10, so
+    // casting float/double to string on the GPU gives results that differ from the CPU.
+    // The conf 'spark.rapids.sql.format.orc.floatTypesToString.enable' controls whether
+    // this cast is enabled or not.
+    case (DType.FLOAT32 | DType.FLOAT64, DType.STRING) =>
+      GpuCast.castFloatingTypeToString(col)
+
+    // float/double -> timestamp
+    case (DType.FLOAT32 | DType.FLOAT64, DType.TIMESTAMP_MICROSECONDS) =>
+      // Follow the CPU ORC conversion:
+      //     val doubleMillis = doubleValue * 1000
+      //     val milliseconds = Math.round(doubleMillis)
+      //     if (noOverflow) { milliseconds } else { null }
+      val milliseconds = withResource(Scalar.fromDouble(DateTimeConstants.MILLIS_PER_SECOND)) {
+        thousand =>
+          // ORC assumes the value is in seconds.
+          withResource(col.mul(thousand, DType.FLOAT64)) { doubleMillis =>
+            withResource(doubleMillis.round()) { millis =>
+              withResource(getOverflowFlags(doubleMillis, millis)) { overflowFlags =>
+                millis.copyWithBooleanColumnAsValidity(overflowFlags)
+              }
+            }
+          }
+      }
+      // Cast milliseconds to microseconds.
+      // Note that converting (milliseconds * 1000) to INT64 may overflow. In this step, the
+      // CPU ORC conversion throws an exception rather than replacing such values with null,
+      // so we follow the CPU behavior here.
+      withResource(milliseconds) { _ =>
+        // Test whether there is a long overflow:
+        // if milliseconds.max() * 1000 > Long.MaxValue, then 'Math.multiplyExact' will
+        // throw an exception (as the CPU code does).
+        withResource(milliseconds.max()) { maxValue =>
+          if (maxValue.isValid) {
+            testLongMultiplicationOverflow(maxValue.getDouble.toLong,
+              DateTimeConstants.MICROS_PER_MILLIS)
+          }
+        }
+        withResource(Scalar.fromDouble(DateTimeConstants.MICROS_PER_MILLIS)) { thousand =>
+          withResource(milliseconds.mul(thousand)) { microseconds =>
+            withResource(microseconds.castTo(DType.INT64)) { longVec =>
+              longVec.castTo(DType.TIMESTAMP_MICROSECONDS)
+            }
+          }
+        }
+      }
+
     // TODO more types, tracked in https://github.com/NVIDIA/spark-rapids/issues/5895
     case (f, t) =>
       throw new QueryExecutionException(s"Unsupported type casting: $f -> $t")
@@ -246,7 +392,8 @@ object GpuOrcScan extends Arm {
    * but the ones between GPU supported types.
    * Each supported casting is implemented in "castColumnTo".
    */
-  def canCast(from: TypeDescription, to: TypeDescription): Boolean = {
+  def canCast(from: TypeDescription, to: TypeDescription,
+      isOrcFloatTypesToStringEnable: Boolean): Boolean = {
     import org.apache.orc.TypeDescription.Category._
     if (!to.getCategory.isPrimitive || !from.getCategory.isPrimitive) {
       // Don't convert from any to complex, or from complex to any.
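The float/double-to-timestamp case in the hunk above follows the CPU ORC rule: multiply the seconds by 1000, round to milliseconds, null out rows that overflow a long, then multiply by 1000 again with an exact multiplication that throws on overflow. A minimal scalar sketch of that same rule in plain Scala is shown below; the object and method names are made up for illustration and are not part of this patch.

object OrcDoubleToTimestampSketch {
  // Convert an ORC double (seconds) to microseconds, mirroring the column-wise GPU logic:
  // round(seconds * 1000) to milliseconds, treat out-of-range rows as null, then use an
  // exact multiplication that throws ArithmeticException on Long overflow.
  def toMicros(seconds: Double): Option[Long] = {
    val doubleMillis = seconds * 1000d
    val millis = Math.round(doubleMillis)
    val noOverflow = doubleMillis <= Long.MaxValue && doubleMillis >= Long.MinValue &&
      ((millis >= 0) == (doubleMillis >= 0))
    if (!noOverflow) {
      None // the reader produces null for such rows
    } else {
      // Math.multiplyExact throws ArithmeticException on overflow, like the CPU reader.
      Some(Math.multiplyExact(millis, 1000L))
    }
  }

  def main(args: Array[String]): Unit = {
    println(toMicros(12.345)) // Some(12345000)
    println(toMicros(1.0e16)) // None: 1.0e19 milliseconds is outside the Long range
    try toMicros(1.0e15) catch {
      case e: ArithmeticException => println(s"overflow: $e") // microseconds overflow a Long
    }
  }
}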
@@ -268,7 +415,16 @@ object GpuOrcScan extends Arm {
         }
 
       case VARCHAR =>
         toType == STRING
-      case _ => false
+
+      case FLOAT | DOUBLE =>
+        toType match {
+          case BOOLEAN | BYTE | SHORT | INT | LONG | FLOAT | DOUBLE | TIMESTAMP => true
+          case STRING => isOrcFloatTypesToStringEnable
+          case _ => false
+        }
+      // TODO more types, tracked in https://github.com/NVIDIA/spark-rapids/issues/5895
+      case _ =>
+        false
     }
   }
 
@@ -366,7 +522,8 @@ case class GpuOrcMultiFilePartitionReaderFactory(
   private val debugDumpPrefix = Option(rapidsConf.orcDebugDumpPrefix)
   private val numThreads = rapidsConf.multiThreadReadNumThreads
   private val maxNumFileProcessed = rapidsConf.maxNumOrcFilesParallel
-  private val filterHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, filters)
+  private val filterHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, filters,
+    rapidsConf.isOrcFloatTypesToStringEnable)
   private val ignoreMissingFiles = sqlConf.ignoreMissingFiles
   private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles
 
@@ -454,7 +611,8 @@ case class GpuOrcPartitionReaderFactory(
   private val debugDumpPrefix = Option(rapidsConf.orcDebugDumpPrefix)
   private val maxReadBatchSizeRows: Integer = rapidsConf.maxReadBatchSizeRows
   private val maxReadBatchSizeBytes: Long = rapidsConf.maxReadBatchSizeBytes
-  private val filterHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, pushedFilters)
+  private val filterHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, pushedFilters,
+    rapidsConf.isOrcFloatTypesToStringEnable)
 
   override def supportColumnarReads(partition: InputPartition): Boolean = true
 
@@ -985,7 +1143,8 @@ private object OrcTools extends Arm {
 private case class GpuOrcFileFilterHandler(
     @transient sqlConf: SQLConf,
     broadcastedConf: Broadcast[SerializableConfiguration],
-    pushedFilters: Array[Filter]) extends Arm {
+    pushedFilters: Array[Filter],
+    isOrcFloatTypesToStringEnable: Boolean) extends Arm {
 
   private[rapids] val isCaseSensitive = sqlConf.caseSensitiveAnalysis
 
@@ -1080,7 +1239,7 @@ private case class GpuOrcFileFilterHandler(
       val isCaseSensitive = readerOpts.getIsSchemaEvolutionCaseAware
 
       val (updatedReadSchema, fileIncluded) = checkSchemaCompatibility(orcReader.getSchema,
-        readerOpts.getSchema, isCaseSensitive)
+        readerOpts.getSchema, isCaseSensitive, isOrcFloatTypesToStringEnable)
       // GPU has its own read schema, so unset the reader include to read all the columns
       // specified by its read schema.
       readerOpts.include(null)
@@ -1260,11 +1419,13 @@ private case class GpuOrcFileFilterHandler(
     private def checkSchemaCompatibility(
         fileSchema: TypeDescription,
         readSchema: TypeDescription,
-        isCaseAware: Boolean): (TypeDescription, Array[Boolean]) = {
+        isCaseAware: Boolean,
+        isOrcFloatTypesToStringEnable: Boolean): (TypeDescription, Array[Boolean]) = {
       // all default to false
       val fileIncluded = new Array[Boolean](fileSchema.getMaximumId + 1)
       val isForcePos = OrcShims.forcePositionalEvolution(conf)
-      (checkTypeCompatibility(fileSchema, readSchema, isCaseAware, fileIncluded, isForcePos),
+      (checkTypeCompatibility(fileSchema, readSchema, isCaseAware, fileIncluded, isForcePos,
+        isOrcFloatTypesToStringEnable),
         fileIncluded)
     }
 
@@ -1278,7 +1439,8 @@ private case class GpuOrcFileFilterHandler(
         readType: TypeDescription,
         isCaseAware: Boolean,
         fileIncluded: Array[Boolean],
-        isForcePos: Boolean): TypeDescription = {
+        isForcePos: Boolean,
+        isOrcFloatTypesToStringEnable: Boolean): TypeDescription = {
       (fileType.getCategory, readType.getCategory) match {
         case (TypeDescription.Category.STRUCT, TypeDescription.Category.STRUCT) =>
           // Check for the top or nested struct types.
@@ -1306,7 +1468,7 @@ private case class GpuOrcFileFilterHandler(
             .zipWithIndex.foreach { case ((fileFieldName, fType), idx) =>
               getReadFieldType(fileFieldName, idx).foreach { case (rField, rType) =>
                 val newChild = checkTypeCompatibility(fType, rType,
-                  isCaseAware, fileIncluded, isForcePos)
+                  isCaseAware, fileIncluded, isForcePos, isOrcFloatTypesToStringEnable)
                 prunedReadSchema.addField(rField, newChild)
               }
           }
@@ -1316,19 +1478,22 @@ private case class GpuOrcFileFilterHandler(
         // for struct children.
         case (TypeDescription.Category.LIST, TypeDescription.Category.LIST) =>
           val newChild = checkTypeCompatibility(fileType.getChildren.get(0),
-            readType.getChildren.get(0), isCaseAware, fileIncluded, isForcePos)
+            readType.getChildren.get(0), isCaseAware, fileIncluded, isForcePos,
+            isOrcFloatTypesToStringEnable)
           fileIncluded(fileType.getId) = true
           TypeDescription.createList(newChild)
         case (TypeDescription.Category.MAP, TypeDescription.Category.MAP) =>
           val newKey = checkTypeCompatibility(fileType.getChildren.get(0),
-            readType.getChildren.get(0), isCaseAware, fileIncluded, isForcePos)
+            readType.getChildren.get(0), isCaseAware, fileIncluded, isForcePos,
+            isOrcFloatTypesToStringEnable)
           val newValue = checkTypeCompatibility(fileType.getChildren.get(1),
-            readType.getChildren.get(1), isCaseAware, fileIncluded, isForcePos)
+            readType.getChildren.get(1), isCaseAware, fileIncluded, isForcePos,
+            isOrcFloatTypesToStringEnable)
           fileIncluded(fileType.getId) = true
           TypeDescription.createMap(newKey, newValue)
         case (ft, rt) if ft.isPrimitive && rt.isPrimitive =>
           if (OrcShims.typeDescriptionEqual(fileType, readType) ||
-            GpuOrcScan.canCast(fileType, readType)) {
+            GpuOrcScan.canCast(fileType, readType, isOrcFloatTypesToStringEnable)) {
             // Since type casting is supported, here should return the file type.
             fileIncluded(fileType.getId) = true
             fileType.clone()
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 9c56b96664a..c228c4ce2b8 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -864,6 +864,19 @@ object RapidsConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ENABLE_ORC_FLOAT_TYPES_TO_STRING =
+    conf("spark.rapids.sql.format.orc.floatTypesToString.enable")
+      .doc("When reading an ORC file, the source data schemas (the schemas of the ORC " +
+        "file) may differ from the target schemas (the schemas of the reader), so casts " +
+        "from the source types to the target types are needed. Since float/double values " +
+        "on the GPU have different precision than on the CPU, casting float/double to " +
+        "string on the GPU produces results that differ from CPU Spark. The default value " +
+        "is `true` (the string results may differ from the CPU results). If it is set to " +
+        "`false` explicitly and the job contains a cast from float/double to string, an " +
+        "exception will be thrown and the job will fail.")
+      .booleanConf
+      .createWithDefault(true)
+
   val ORC_READER_TYPE = conf("spark.rapids.sql.format.orc.reader.type")
     .doc("Sets the ORC reader type. We support different types that are optimized for " +
       "different environments. The original Spark style reader can be selected by setting this " +
@@ -1856,6 +1869,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
 
   lazy val isOrcWriteEnabled: Boolean = get(ENABLE_ORC_WRITE)
 
+  lazy val isOrcFloatTypesToStringEnable: Boolean = get(ENABLE_ORC_FLOAT_TYPES_TO_STRING)
+
   lazy val isOrcPerFileReadEnabled: Boolean =
     RapidsReaderType.withName(get(ORC_READER_TYPE)) ==
       RapidsReaderType.PERFILE
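For context, here is a minimal sketch of how the new spark.rapids.sql.format.orc.floatTypesToString.enable setting could be exercised from an application, assuming a cluster with the RAPIDS plugin enabled; the output path and column name are made up for the example and are not part of this patch.

import org.apache.spark.sql.SparkSession

object OrcFloatToStringConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("orc-float-to-string-sketch")
      // Default is true: the GPU performs the float/double -> string cast, and the string
      // results may differ slightly from CPU Spark.
      .config("spark.rapids.sql.format.orc.floatTypesToString.enable", "true")
      .getOrCreate()

    // Write an ORC file whose file schema has a double column (illustrative path).
    spark.range(10).selectExpr("cast(id as double) as a")
      .write.mode("overwrite").orc("/tmp/orc_double_example")

    // Schema evolution on read: the file stores doubles, the reader asks for strings.
    spark.read.schema("a string").orc("/tmp/orc_double_example").show()

    // If the conf is set to "false" instead, this read is rejected and the job fails
    // rather than returning strings that differ from the CPU results.
    spark.stop()
  }
}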