diff --git a/integration_tests/src/main/python/asserts.py b/integration_tests/src/main/python/asserts.py index c466fd3ccd2..fe15041e9ba 100644 --- a/integration_tests/src/main/python/asserts.py +++ b/integration_tests/src/main/python/asserts.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +13,7 @@ # limitations under the License. from conftest import is_incompat, should_sort_on_spark, should_sort_locally, get_float_check, get_limit, spark_jvm -from datetime import date, datetime +from datetime import date, datetime, timedelta from decimal import Decimal import math from pyspark.sql import Row @@ -92,6 +92,9 @@ def _assert_equal(cpu, gpu, float_check, path): assert cpu == gpu, "GPU and CPU decimal values are different at {}".format(path) elif isinstance(cpu, bytearray): assert cpu == gpu, "GPU and CPU bytearray values are different at {}".format(path) + elif isinstance(cpu, timedelta): + # Used by the DayTimeInterval type for PySpark 3.3.0+ + assert cpu == gpu, "GPU and CPU timedelta values are different at {}".format(path) elif (cpu == None): assert cpu == gpu, "GPU and CPU are not both null at {}".format(path) else: diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index fa162dfc200..819d5b644d6 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -612,6 +612,33 @@ def make_null(): return None self._start(rand, make_null) +# DayTimeIntervalGen is for Spark 3.3.0+ +# DayTimeIntervalType(startField, endField): Represents a day-time interval which is made up of a contiguous subset of the following fields: +# SECOND, seconds within minutes and possibly fractions of a second [0..59.999999], +# MINUTE, minutes within hours [0..59], +# HOUR, hours within days [0..23], +# DAY, days in the range [0..106751991]. +# For more details: https://spark.apache.org/docs/latest/sql-ref-datatypes.html +# Note: 106751991/365 is roughly 292471 years, far beyond year 9999; the bound comes from Long.MaxValue microseconds (about 106751991 days), not from the calendar +class DayTimeIntervalGen(DataGen): + """Generate DayTimeIntervalType values""" + def __init__(self, max_days = None, nullable=True, special_cases =[timedelta(seconds = 0)]): + super().__init__(DayTimeIntervalType(), nullable=nullable, special_cases=special_cases) + if max_days is None: + self._max_days = 106751991 + else: + self._max_days = max_days + def start(self, rand): + self._start(rand, + lambda : timedelta( + microseconds = rand.randint(0, 999999), + seconds = rand.randint(0, 59), + minutes = rand.randint(0, 59), + hours = rand.randint(0, 23), + days = rand.randint(0, self._max_days), + ) + ) + def skip_if_not_utc(): if (not is_tz_utc()): skip_unless_precommit_tests('The java system time zone is not set to UTC') diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index a5f04b30ea2..0b40afdbe57 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License.
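The generator above relies on PySpark 3.3.0+ mapping `datetime.timedelta` to `DayTimeIntervalType`, which Spark stores internally as a single long counting microseconds (see the GpuTypeShims comments later in this patch). Below is a minimal, CPU-only sketch of that encoding; the helper name `to_spark_micros` is made up for illustration and is not part of the test framework.

    from datetime import timedelta

    def to_spark_micros(td: timedelta) -> int:
        # Spark's documented encoding of a day-time interval:
        # -/+ (24*60*60 * DAY + 60*60 * HOUR + 60 * MINUTE + SECOND) * 1000000
        return (td.days * 24 * 60 * 60 + td.seconds) * 1_000_000 + td.microseconds

    assert to_spark_micros(timedelta(days=1, hours=2, minutes=3, seconds=4)) == 93_784_000_000
    # The DAY upper bound used by DayTimeIntervalGen (106751991) is about as many days as fit in that long
    assert to_spark_micros(timedelta(days=106751991)) < 2 ** 63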
@@ -18,7 +18,7 @@ from datetime import date, datetime, timezone from marks import incompat, allow_non_gpu from pyspark.sql.types import * -from spark_session import with_spark_session, is_before_spark_311 +from spark_session import with_spark_session, is_before_spark_311, is_before_spark_330 import pyspark.sql.functions as f # We only support literal intervals for TimeSub @@ -41,6 +41,16 @@ def test_timeadd(data_gen): lambda spark: unary_op_df(spark, TimestampGen(start=datetime(5, 1, 1, tzinfo=timezone.utc), end=datetime(15, 1, 1, tzinfo=timezone.utc)), seed=1) .selectExpr("a + (interval {} days {} seconds)".format(days, seconds))) +@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before PySpark 3.3.0') +def test_timeadd_daytime_column(): + gen_list = [ + # cap the timestamp column at year 1000 + ('t', TimestampGen(end = datetime(1000, 1, 1, tzinfo=timezone.utc))), + # cap the interval at about 8000 years of days, so the added result stays in range + ('d', DayTimeIntervalGen(max_days = 8000 * 365))] + assert_gpu_and_cpu_are_equal_collect( + lambda spark: gen_df(spark, gen_list).selectExpr("t + d", "t + INTERVAL '1 02:03:04' DAY TO SECOND")) + @pytest.mark.parametrize('data_gen', vals, ids=idfn) def test_dateaddinterval(data_gen): days, seconds = data_gen diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index eb52ac34568..aef67dcc46e 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -789,3 +789,21 @@ def test_parquet_read_field_id(spark_tmp_path): lambda spark: spark.read.schema(readSchema).parquet(data_path), 'FileSourceScanExec', {"spark.sql.parquet.fieldId.read.enabled": "true"}) # default is false + +@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before PySpark 3.3.0') +def test_parquet_read_daytime_interval_cpu_file(spark_tmp_path): + data_path = spark_tmp_path + '/PARQUET_DATA' + gen_list = [('_c1', DayTimeIntervalGen())] + # write DayTimeInterval with the CPU + with_cpu_session(lambda spark: gen_df(spark, gen_list).coalesce(1).write.mode("overwrite").parquet(data_path)) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.read.parquet(data_path)) + +@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before PySpark 3.3.0') +def test_parquet_read_daytime_interval_gpu_file(spark_tmp_path): + data_path = spark_tmp_path + '/PARQUET_DATA' + gen_list = [('_c1', DayTimeIntervalGen())] + # write DayTimeInterval with the GPU + with_gpu_session(lambda spark: gen_df(spark, gen_list).coalesce(1).write.mode("overwrite").parquet(data_path)) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.read.parquet(data_path)) \ No newline at end of file diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py index fd556830135..0e9a990652e 100644 --- a/integration_tests/src/main/python/parquet_write_test.py +++ b/integration_tests/src/main/python/parquet_write_test.py @@ -418,3 +418,14 @@ def test_parquet_write_field_id(spark_tmp_path): data_path, 'DataWritingCommandExec', conf = {"spark.sql.parquet.fieldId.write.enabled" : "true"}) # default is true + +@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed +@pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before PySpark 3.3.0') +def test_write_daytime_interval(spark_tmp_path): + gen_list =
[('_c1', DayTimeIntervalGen())] + data_path = spark_tmp_path + '/PARQUET_DATA' + assert_gpu_and_cpu_writes_are_equal_collect( + lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.parquet(path), + lambda spark, path: spark.read.parquet(path), + data_path, + conf=writer_confs) diff --git a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/GpuTypeShims.scala b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/GpuTypeShims.scala new file mode 100644 index 00000000000..4ce006f8938 --- /dev/null +++ b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/GpuTypeShims.scala @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.nvidia.spark.rapids.shims.v2 + +import ai.rapids.cudf.DType +import com.nvidia.spark.rapids.GpuRowToColumnConverter.TypeConverter + +import org.apache.spark.sql.types.DataType + +object GpuTypeShims { + + /** + * Whether this Shim supports the data type for the row-to-column converter + * @param otherType the data type that should be checked in the Shim + * @return true if the Shim supports the otherType, false otherwise. + */ + def hasConverterForType(otherType: DataType) : Boolean = false + + /** + * Get the TypeConverter of the data type for this Shim + * Note: hasConverterForType should be called first + * @param t the data type + * @param nullable is nullable + * @return the row-to-column converter for the data type + */ + def getConverterForType(t: DataType, nullable: Boolean): TypeConverter = { + throw new RuntimeException(s"No converter is found for type $t.") + } + + /** + * Get the cuDF type for the Spark data type + * @param t the Spark data type + * @return the cuDF type if the Shim supports it, null otherwise + */ + def toRapidsOrNull(t: DataType): DType = null +} diff --git a/sql-plugin/src/main/320+/scala/org/apache/spark/sql/rapids/shims/v2/datetimeExpressions.scala b/sql-plugin/src/main/320+/scala/org/apache/spark/sql/rapids/shims/v2/datetimeExpressions.scala index e811cb7bfbf..0336337e368 100644 --- a/sql-plugin/src/main/320+/scala/org/apache/spark/sql/rapids/shims/v2/datetimeExpressions.scala +++ b/sql-plugin/src/main/320+/scala/org/apache/spark/sql/rapids/shims/v2/datetimeExpressions.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids.shims.v2 import java.util.concurrent.TimeUnit -import ai.rapids.cudf.{ColumnVector, ColumnView, DType, Scalar} +import ai.rapids.cudf.{BinaryOp, BinaryOperable, ColumnVector, ColumnView, DType, Scalar} import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuScalar} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.shims.v2.ShimBinaryExpression @@ -59,48 +59,59 @@ case class GpuTimeAdd(start: Expression, override def columnarEval(batch: ColumnarBatch): Any = { withResourceIfAllowed(left.columnarEval(batch)) { lhs => withResourceIfAllowed(right.columnarEval(batch)) { rhs => + // lhs is start, rhs is interval (lhs, rhs) match { - case (l: GpuColumnVector, intvlS: GpuScalar) => - val interval = intvlS.dataType match { + case (l: GpuColumnVector, intervalS: GpuScalar) => + // get the interval as microseconds (a long) + val interval = intervalS.dataType match { case CalendarIntervalType => // Scalar does not support 'CalendarInterval' now, so use // the Scala value instead. // Skip the null check because it will be detected by the following calls. - val intvl = intvlS.getValue.asInstanceOf[CalendarInterval] - if (intvl.months != 0) { + val calendarI = intervalS.getValue.asInstanceOf[CalendarInterval] + if (calendarI.months != 0) { throw new UnsupportedOperationException("Months aren't supported at the moment") } - intvl.days * microSecondsInOneDay + intvl.microseconds + calendarI.days * microSecondsInOneDay + calendarI.microseconds case _: DayTimeIntervalType => - // Scalar does not support 'DayTimeIntervalType' now, so use - // the Scala value instead. - intvlS.getValue.asInstanceOf[Long] + intervalS.getValue.asInstanceOf[Long] case _ => - throw new UnsupportedOperationException("GpuTimeAdd unsupported data type: " + - intvlS.dataType) + throw new UnsupportedOperationException( + "GpuTimeAdd unsupported data type: " + intervalS.dataType) } + // add interval if (interval != 0) { - withResource(Scalar.fromLong(interval)) { us_s => - withResource(l.getBase.bitCastTo(DType.INT64)) { us => - withResource(intervalMath(us_s, us)) { longResult => - GpuColumnVector.from(longResult.castTo(DType.TIMESTAMP_MICROSECONDS), - dataType) - } - } + withResource(Scalar.durationFromLong(DType.DURATION_MICROSECONDS, interval)) { d => + GpuColumnVector.from(timestampAddDuration(l.getBase, d), dataType) } } else { l.incRefCount() } + case (l: GpuColumnVector, r: GpuColumnVector) => + (l.dataType(), r.dataType) match { + case (_: TimestampType, _: DayTimeIntervalType) => + // DayTimeIntervalType is stored as long + // bitCastTo is similar to reinterpret_cast; it is cheap enough that its cost can be ignored.
+ withResource(r.getBase.bitCastTo(DType.DURATION_MICROSECONDS)) { duration => + GpuColumnVector.from(timestampAddDuration(l.getBase, duration), dataType) + } + case _ => + throw new UnsupportedOperationException( + "GpuTimeAdd takes column and interval as an argument only") + } case _ => - throw new UnsupportedOperationException("GpuTimeAdd takes column and interval as an " + - "argument only") + throw new UnsupportedOperationException( + "GpuTimeAdd takes column and interval as an argument only") } } } } - private def intervalMath(us_s: Scalar, us: ColumnView): ColumnVector = { - us.add(us_s) + private def timestampAddDuration(cv: ColumnView, duration: BinaryOperable): ColumnVector = { + // Do not use cv.add(duration), because it goes through BinaryOperable.implicitConversion, + // which currently resolves the result to a plain long column. + // Specify TIMESTAMP_MICROSECONDS as the return type explicitly instead. + cv.binaryOp(BinaryOp.ADD, duration, DType.TIMESTAMP_MICROSECONDS) } } diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/GpuTypeShims.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/GpuTypeShims.scala new file mode 100644 index 00000000000..43b2be236a7 --- /dev/null +++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/GpuTypeShims.scala @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.nvidia.spark.rapids.shims.v2 + +import ai.rapids.cudf.DType +import com.nvidia.spark.rapids.GpuRowToColumnConverter.{LongConverter, NotNullLongConverter, TypeConverter} + +import org.apache.spark.sql.types.{DataType, DayTimeIntervalType} + +/** + * Spark stores ANSI YearMonthIntervalType as int32 and ANSI DayTimeIntervalType as int64 + * internally when computing.
+ * See the comments of YearMonthIntervalType; the following is copied from Spark: + * Internally, values of year-month intervals are stored in `Int` values as amount of months + * that are calculated by the formula: + * -/+ (12 * YEAR + MONTH) + * See the comments of DayTimeIntervalType; the following is copied from Spark: + * Internally, values of day-time intervals are stored in `Long` values as amount of time in terms + * of microseconds that are calculated by the formula: + * -/+ (24*60*60 * DAY + 60*60 * HOUR + 60 * MINUTE + SECOND) * 1000000 + * + * Spark also stores ANSI intervals as int32 and int64 in Parquet files: + * - year-month intervals as `INT32` + * - day-time intervals as `INT64` + * To load the values as intervals back, Spark puts the info about interval types + * into the extra key `org.apache.spark.sql.parquet.row.metadata`: + * $ java -jar parquet-tools-1.12.0.jar meta ./part-...-c000.snappy.parquet + * creator: parquet-mr version 1.12.1 (build 2a5c06c58fa987f85aa22170be14d927d5ff6e7d) + * extra: org.apache.spark.version = 3.3.0 + * extra: org.apache.spark.sql.parquet.row.metadata = + * {"type":"struct","fields":[..., + * {"name":"i","type":"interval year to month","nullable":false,"metadata":{}}]} + * file schema: spark_schema + * -------------------------------------------------------------------------------- + * ... + * i: REQUIRED INT32 R:0 D:0 + * + * For details see https://issues.apache.org/jira/browse/SPARK-36825 + */ +object GpuTypeShims { + + /** + * Whether this Shim supports the data type for the row-to-column converter + * @param otherType the data type that should be checked in the Shim + * @return true if the Shim supports the otherType, false otherwise. + */ + def hasConverterForType(otherType: DataType) : Boolean = { + otherType match { + case DayTimeIntervalType(_, _) => true + case _ => false + } + } + + /** + * Get the TypeConverter of the data type for this Shim + * Note: hasConverterForType should be called first + * @param t the data type + * @param nullable is nullable + * @return the row-to-column converter for the data type + */ + def getConverterForType(t: DataType, nullable: Boolean): TypeConverter = { + (t, nullable) match { + case (DayTimeIntervalType(_, _), true) => LongConverter + case (DayTimeIntervalType(_, _), false) => NotNullLongConverter + case _ => throw new RuntimeException(s"No converter is found for type $t.") + } + } + + /** + * Get the cuDF type for the Spark data type + * @param t the Spark data type + * @return the cuDF type if the Shim supports it, null otherwise + */ + def toRapidsOrNull(t: DataType): DType = { + t match { + case _: DayTimeIntervalType => + // use int64 as Spark does + DType.INT64 + case _ => + null + } + } +} diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/Spark33XShims.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/Spark33XShims.scala index 1e3bd34597c..fb0179b5f4f 100644 --- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/Spark33XShims.scala +++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/Spark33XShims.scala @@ -22,19 +22,22 @@ import org.apache.parquet.schema.MessageType import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MetadataAttribute} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Coalesce, DynamicPruningExpression, Expression, MetadataAttribute, TimeAdd} import
org.apache.spark.sql.catalyst.json.rapids.shims.v2.Spark33XFileOptionsShims -import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} -import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartition, FileScanRDD, PartitionedFile} +import org.apache.spark.sql.execution.{BaseSubqueryExec, CoalesceExec, FileSourceScanExec, InSubqueryExec, ProjectExec, ReusedSubqueryExec, SparkPlan, SubqueryBroadcastExec} +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartition, FileScanRDD, HadoopFsRelation, PartitionedFile} import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType - +import org.apache.spark.sql.rapids.GpuFileSourceScanExec +import org.apache.spark.sql.rapids.shims.v2.GpuTimeAdd +import org.apache.spark.sql.types.{CalendarIntervalType, DayTimeIntervalType, StructType} +import org.apache.spark.unsafe.types.CalendarInterval trait Spark33XShims extends Spark33XFileOptionsShims { /** - * For spark3.3+ optionally return null if element not exists. + * For spark3.3+ optionally return null if element not exists. */ override def shouldFailOnElementNotExists(): Boolean = SQLConf.get.strictIndexOperator @@ -74,6 +77,201 @@ trait Spark33XShims extends Spark33XFileOptionsShims { } super.tagFileSourceScanExec(meta) } + + // 330+ supports DAYTIME interval types + override def getFileFormats: Map[FileFormatType, Map[FileFormatOp, FileFormatChecks]] = { + Map( + (ParquetFormatType, FileFormatChecks( + cudfRead = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT + + TypeSig.ARRAY + TypeSig.MAP + TypeSig.DAYTIME).nested(), + cudfWrite = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT + + TypeSig.ARRAY + TypeSig.MAP + TypeSig.DAYTIME).nested(), + sparkSig = (TypeSig.cpuAtomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + + TypeSig.UDT + TypeSig.DAYTIME).nested()))) + } + + // 330+ supports DAYTIME interval types + override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = { + val _gpuCommonTypes = TypeSig.commonCudfTypes + TypeSig.NULL + val map: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq( + GpuOverrides.expr[Coalesce]( + "Returns the first non-null argument if exists. Otherwise, null", + ExprChecks.projectOnly( + (_gpuCommonTypes + TypeSig.DECIMAL_128 + TypeSig.ARRAY + TypeSig.STRUCT + + TypeSig.DAYTIME).nested(), + TypeSig.all, + repeatingParamCheck = Some(RepeatingParamCheck("param", + (_gpuCommonTypes + TypeSig.DECIMAL_128 + TypeSig.ARRAY + TypeSig.STRUCT + + TypeSig.DAYTIME).nested(), + TypeSig.all))), + (a, conf, p, r) => new ExprMeta[Coalesce](a, conf, p, r) { + override def convertToGpu(): + GpuExpression = GpuCoalesce(childExprs.map(_.convertToGpu())) + }), + GpuOverrides.expr[AttributeReference]( + "References an input column", + ExprChecks.projectAndAst( + TypeSig.astTypes, + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.MAP + TypeSig.ARRAY + + TypeSig.STRUCT + TypeSig.DECIMAL_128 + TypeSig.DAYTIME).nested(), + TypeSig.all), + (att, conf, p, r) => new BaseExprMeta[AttributeReference](att, conf, p, r) { + // This is the only NOOP operator. 
It goes away when things are bound + override def convertToGpu(): Expression = att + + // There are so many of these that we don't need to print them out, unless it + // will not work on the GPU + override def print(append: StringBuilder, depth: Int, all: Boolean): Unit = { + if (!this.canThisBeReplaced || cannotRunOnGpuBecauseOfSparkPlan) { + super.print(append, depth, all) + } + } + }), + GpuOverrides.expr[TimeAdd]( + "Adds interval to timestamp", + ExprChecks.binaryProject(TypeSig.TIMESTAMP, TypeSig.TIMESTAMP, + ("start", TypeSig.TIMESTAMP, TypeSig.TIMESTAMP), + // interval support DAYTIME column or CALENDAR literal + ("interval", TypeSig.DAYTIME + TypeSig.lit(TypeEnum.CALENDAR) + .withPsNote(TypeEnum.CALENDAR, "month intervals are not supported"), + TypeSig.DAYTIME + TypeSig.CALENDAR)), + (timeAdd, conf, p, r) => new BinaryExprMeta[TimeAdd](timeAdd, conf, p, r) { + override def tagExprForGpu(): Unit = { + GpuOverrides.extractLit(timeAdd.interval).foreach { lit => + lit.dataType match { + case CalendarIntervalType => + val intvl = lit.value.asInstanceOf[CalendarInterval] + if (intvl.months != 0) { + willNotWorkOnGpu("interval months isn't supported") + } + case _: DayTimeIntervalType => // Supported + } + } + checkTimeZoneId(timeAdd.timeZoneId) + } + + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = + GpuTimeAdd(lhs, rhs) + }) + ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap + super.getExprs ++ map + } + + // 330+ supports DAYTIME interval types + override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = { + val _gpuCommonTypes = TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 + val map: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = Seq( + GpuOverrides.exec[CoalesceExec]( + "The backend for the dataframe coalesce method", + ExecChecks((_gpuCommonTypes + TypeSig.DECIMAL_128 + TypeSig.STRUCT + TypeSig.ARRAY + + TypeSig.MAP + TypeSig.DAYTIME).nested(), + TypeSig.all), + (coalesce, conf, parent, r) => new SparkPlanMeta[CoalesceExec](coalesce, conf, parent, r) { + override def convertToGpu(): GpuExec = + GpuCoalesceExec(coalesce.numPartitions, childPlans.head.convertIfNeeded()) + }), + GpuOverrides.exec[DataWritingCommandExec]( + "Writing data", + ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128.withPsNote( + TypeEnum.DECIMAL, "128bit decimal only supported for Orc and Parquet") + + TypeSig.STRUCT.withPsNote(TypeEnum.STRUCT, "Only supported for Parquet") + + TypeSig.MAP.withPsNote(TypeEnum.MAP, "Only supported for Parquet") + + TypeSig.ARRAY.withPsNote(TypeEnum.ARRAY, "Only supported for Parquet") + + TypeSig.DAYTIME).nested(), + TypeSig.all), + (p, conf, parent, r) => new SparkPlanMeta[DataWritingCommandExec](p, conf, parent, r) { + override val childDataWriteCmds: scala.Seq[DataWritingCommandMeta[_]] = + Seq(GpuOverrides.wrapDataWriteCmds(p.cmd, conf, Some(this))) + + override def convertToGpu(): GpuExec = + GpuDataWritingCommandExec(childDataWriteCmds.head.convertToGpu(), + childPlans.head.convertIfNeeded()) + }), + // this is copied, only added TypeSig.DAYTIME check + GpuOverrides.exec[FileSourceScanExec]( + "Reading data from files, often from Hive tables", + ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.STRUCT + TypeSig.MAP + + TypeSig.ARRAY + TypeSig.DECIMAL_128 + TypeSig.DAYTIME).nested(), + TypeSig.all), + (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) { + + // Replaces SubqueryBroadcastExec inside dynamic pruning filters with GPU 
counterpart + // if possible. Instead of regarding filters as childExprs of the current Meta, we create + // a new meta for SubqueryBroadcastExec. The reason is that the GPU replacement of + // FileSourceScan is independent from the replacement of the partitionFilters. It is + // possible that the FileSourceScan is on the CPU, while the dynamic partitionFilters + // are on the GPU. And vice versa. + private lazy val partitionFilters = { + val convertBroadcast = (bc: SubqueryBroadcastExec) => { + val meta = GpuOverrides.wrapAndTagPlan(bc, conf) + meta.tagForExplain() + meta.convertIfNeeded().asInstanceOf[BaseSubqueryExec] + } + wrapped.partitionFilters.map { filter => + filter.transformDown { + case dpe@DynamicPruningExpression(inSub: InSubqueryExec) => + inSub.plan match { + case bc: SubqueryBroadcastExec => + dpe.copy(inSub.copy(plan = convertBroadcast(bc))) + case reuse@ReusedSubqueryExec(bc: SubqueryBroadcastExec) => + dpe.copy(inSub.copy(plan = reuse.copy(convertBroadcast(bc)))) + case _ => + dpe + } + } + } + } + + // partition filters and data filters are not run on the GPU + override val childExprs: Seq[ExprMeta[_]] = Seq.empty + + override def tagPlanForGpu(): Unit = tagFileSourceScanExec(this) + + override def convertToCpu(): SparkPlan = { + wrapped.copy(partitionFilters = partitionFilters) + } + + override def convertToGpu(): GpuExec = { + val sparkSession = wrapped.relation.sparkSession + val options = wrapped.relation.options + + val location = replaceWithAlluxioPathIfNeeded( + conf, + wrapped.relation, + partitionFilters, + wrapped.dataFilters) + + val newRelation = HadoopFsRelation( + location, + wrapped.relation.partitionSchema, + wrapped.relation.dataSchema, + wrapped.relation.bucketSpec, + GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), + options)(sparkSession) + + GpuFileSourceScanExec( + newRelation, + wrapped.output, + wrapped.requiredSchema, + partitionFilters, + wrapped.optionalBucketSet, + wrapped.optionalNumCoalescedBuckets, + wrapped.dataFilters, + wrapped.tableIdentifier, + wrapped.disableBucketedScan)(conf) + } + }), + GpuOverrides.exec[ProjectExec]( + "The backend for most select, withColumn and dropColumn statements", + ExecChecks( + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.STRUCT + TypeSig.MAP + + TypeSig.ARRAY + TypeSig.DECIMAL_128 + TypeSig.DAYTIME).nested(), + TypeSig.all), + (proj, conf, p, r) => new GpuProjectExecMeta(proj, conf, p, r)) + ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap + super.getExecs ++ map + } + } // Fallback to the default definition of `deterministic` diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index 6c1fc8e945d..fd91779c0f6 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -25,8 +25,9 @@ import ai.rapids.cudf.Scalar; import ai.rapids.cudf.Schema; import ai.rapids.cudf.Table; - +import com.nvidia.spark.rapids.shims.v2.GpuTypeShims; import org.apache.arrow.memory.ReferenceManager; + import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.types.*; import org.apache.spark.sql.vectorized.ColumnVector; @@ -460,6 +461,13 @@ public void releaseReferences() { } private static DType toRapidsOrNull(DataType type) { + DType ret = toRapidsOrNullCommon(type); + // Check for types that the shim supports, + // e.g. Spark 3.3.0 begins supporting AnsiIntervalType
to/from parquet + return (ret != null) ? ret : GpuTypeShims.toRapidsOrNull(type); + } + + private static DType toRapidsOrNullCommon(DataType type) { if (type instanceof LongType) { return DType.INT64; } else if (type instanceof DoubleType) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index e25a1f08ffb..34b5fc688fc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -802,7 +802,7 @@ object GpuOverrides extends Logging { .map(r => r.wrap(expr, conf, parent, r).asInstanceOf[BaseExprMeta[INPUT]]) .getOrElse(new RuleNotFoundExprMeta(expr, conf, parent)) - lazy val fileFormats: Map[FileFormatType, Map[FileFormatOp, FileFormatChecks]] = Map( + lazy val basicFormats: Map[FileFormatType, Map[FileFormatOp, FileFormatChecks]] = Map( (CsvFormatType, FileFormatChecks( cudfRead = TypeSig.commonCudfTypes + TypeSig.DECIMAL_128, cudfWrite = TypeSig.none, @@ -828,6 +828,8 @@ object GpuOverrides extends Logging { sparkSig = (TypeSig.cpuAtomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + TypeSig.UDT).nested()))) + lazy val fileFormats = basicFormats ++ ShimLoader.getSparkShims.getFileFormats + val commonExpressions: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq( expr[Literal]( "Holds a static value from the query", diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index 9d971b1d18d..e1a9f0e483a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{NvtxColor, NvtxRange} import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder -import com.nvidia.spark.rapids.shims.v2.ShimUnaryExecNode +import com.nvidia.spark.rapids.shims.v2.{GpuTypeShims, ShimUnaryExecNode} import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast @@ -138,6 +138,9 @@ private[rapids] object GpuRowToColumnConverter { getConverterForType(v, vcn)) case (NullType, true) => NullConverter + // check special Shims types, such as DayTimeIntervalType + case (otherType, nullable) if GpuTypeShims.hasConverterForType(otherType) => + GpuTypeShims.getConverterForType(otherType, nullable) case (unknown, _) => throw new UnsupportedOperationException( s"Type $unknown not supported") } @@ -284,7 +287,7 @@ private[rapids] object GpuRowToColumnConverter { override def getNullSize: Double = 4 + VALIDITY } - private object LongConverter extends TypeConverter { + private[rapids] object LongConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { @@ -299,7 +302,7 @@ private[rapids] object GpuRowToColumnConverter { override def getNullSize: Double = 8 + VALIDITY } - private object NotNullLongConverter extends TypeConverter { + private[rapids] object NotNullLongConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index 8b29932e014..84b16b4c8c2 
100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -115,6 +115,7 @@ trait SparkShims { exportColumnRdd: Boolean): GpuColumnarToRowExecParent def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] + def getFileFormats: Map[FileFormatType, Map[FileFormatOp, FileFormatChecks]] = Map() def getScalaUDFAsExpression( function: AnyRef,
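For reference, the Parquet round trip that the new integration tests exercise should also be reproducible with stock PySpark 3.3.0+ on the CPU alone. A standalone sketch is below; the local single-worker session and temporary path are illustrative only, whereas the plugin tests above go through with_cpu_session/with_gpu_session and the data generators.

    import tempfile
    from datetime import timedelta
    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    path = tempfile.mkdtemp() + "/PARQUET_DATA"

    # timedelta maps to DayTimeIntervalType, stored in Parquet as INT64 plus schema metadata
    interval = timedelta(days=1, hours=2, minutes=3, seconds=4)
    spark.createDataFrame([Row(_c1=interval)]).coalesce(1).write.mode("overwrite").parquet(path)

    assert spark.read.parquet(path).collect()[0]["_c1"] == interval
    spark.stop()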