diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index 8247695e088..b33aa910a31 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -29,7 +29,10 @@ def read_parquet_sql(data_path):
 
 parquet_gens_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
     string_gen, boolean_gen, date_gen,
-    TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))],
+    TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)), ArrayGen(byte_gen),
+    ArrayGen(long_gen), ArrayGen(string_gen), ArrayGen(date_gen),
+    ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),
+    ArrayGen(ArrayGen(byte_gen))],
     pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/132'))]
 
 # test with original parquet file reader, the multi-file parallel reader for cloud, and coalesce file reader for
@@ -51,7 +54,9 @@ def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader_confs,
         lambda spark : gen_df(spark, gen_list).write.parquet(data_path),
         conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
     all_confs = reader_confs.copy()
-    all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
+    all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, 'spark.sql.legacy.parquet.datetimeRebaseModeInRead': 'CORRECTED'})
+    # once https://github.com/NVIDIA/spark-rapids/issues/1126 is in we can remove the spark.sql.legacy.parquet.datetimeRebaseModeInRead config,
+    # which is a workaround for nested timestamp/date support
     assert_gpu_and_cpu_are_equal_collect(read_func(data_path),
             conf=all_confs)
@@ -119,14 +124,36 @@ def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled
 
 parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS']
 
+
+# Once https://github.com/NVIDIA/spark-rapids/issues/1126 is fixed, delete this test and merge it
+# into test_ts_read_round_trip; nested timestamps and dates are not supported right now.
+@pytest.mark.parametrize('gen', [ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),
+                                 ArrayGen(ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))))], ids=idfn)
+@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
+@pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY'])
+@pytest.mark.parametrize('reader_confs', reader_opt_confs)
+@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
+@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1126')
+def test_ts_read_round_trip_nested(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
+    data_path = spark_tmp_path + '/PARQUET_DATA'
+    with_cpu_session(
+        lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
+        conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase,
+              'spark.sql.parquet.outputTimestampType': ts_write})
+    all_confs = reader_confs.copy()
+    all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : spark.read.parquet(data_path),
+        conf=all_confs)
+
+# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
+# timestamp_gen
+@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))], ids=idfn)
 @pytest.mark.parametrize('ts_write', parquet_ts_write_options)
 @pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY'])
 @pytest.mark.parametrize('reader_confs', reader_opt_confs)
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
-    # Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
-    # timestamp_gen
-    gen = TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))
+def test_ts_read_round_trip(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
     data_path = spark_tmp_path + '/PARQUET_DATA'
     with_cpu_session(
         lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
@@ -143,14 +170,15 @@ def readParquetCatchException(spark, data_path):
         df = spark.read.parquet(data_path).collect()
     assert e_info.match(r".*SparkUpgradeException.*")
 
+# Once https://github.com/NVIDIA/spark-rapids/issues/1126 is fixed, nested timestamps and dates should be added in
+# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
+# timestamp_gen
+@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))], ids=idfn)
 @pytest.mark.parametrize('ts_write', parquet_ts_write_options)
 @pytest.mark.parametrize('ts_rebase', ['LEGACY'])
 @pytest.mark.parametrize('reader_confs', reader_opt_confs)
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
-    # Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
-    # timestamp_gen
-    gen = TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))
+def test_ts_read_fails_datetime_legacy(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
     data_path = spark_tmp_path + '/PARQUET_DATA'
     with_cpu_session(
         lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
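As a standalone illustration (not part of the patch), here is a minimal Scala sketch of the round trip the Python tests above exercise: an array<timestamp> column is written with the proleptic Gregorian calendar and read back with spark.sql.legacy.parquet.datetimeRebaseModeInRead forced to CORRECTED, the workaround used until https://github.com/NVIDIA/spark-rapids/issues/1126 lets the plugin check nested columns for rebasing. The object name and the /tmp scratch path are assumptions made for the example only.

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

object NestedTimestampRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("nested-timestamp-round-trip")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical scratch location; the integration tests use spark_tmp_path instead.
    val path = "/tmp/PARQUET_DATA"

    // Write an array<timestamp> column using the proleptic Gregorian calendar.
    spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    Seq(Seq(Timestamp.valueOf("1900-01-01 00:00:00")),
        Seq(Timestamp.valueOf("2020-01-01 12:34:56")))
      .toDF("ts_array")
      .write.mode("overwrite").parquet(path)

    // Declare the data as not needing a rebase on read; until issue #1126 is addressed
    // the plugin cannot inspect nested columns to decide whether a rebase is required.
    spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
    spark.read.parquet(path).show(truncate = false)

    spark.stop()
  }
}
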
diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
index 079f9931764..0fd4f9966c2 100644
--- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
+++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
@@ -19,7 +19,6 @@ package com.nvidia.spark.rapids.shims.spark300
 import java.time.ZoneId
 
 import com.nvidia.spark.rapids._
-import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
 import com.nvidia.spark.rapids.spark300.RapidsShuffleManager
 
 import org.apache.spark.SparkEnv
@@ -136,7 +135,10 @@ class Spark300Shims extends SparkShims {
       "Reading data from files, often from Hive tables",
       (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
         override def isSupportedType(t: DataType): Boolean =
-          GpuOverrides.isSupportedType(t, allowStringMaps = true)
+          GpuOverrides.isSupportedType(t,
+            allowArray = true,
+            allowStringMaps = true,
+            allowNesting = true)
 
         // partition filters and data filters are not run on the GPU
         override val childExprs: Seq[ExprMeta[_]] = Seq.empty
diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala
index fbe20aa9a9a..1b3e3ff282f 100644
--- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala
+++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala
@@ -18,9 +18,9 @@ package com.nvidia.spark.rapids.shims.spark300db
 
 import java.time.ZoneId
 
-import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.shims.spark300.Spark300Shims
+
 import org.apache.spark.sql.rapids.shims.spark300db._
 import org.apache.hadoop.fs.Path
 
@@ -91,7 +91,10 @@ class Spark300dbShims extends Spark300Shims {
       (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
 
         override def isSupportedType(t: DataType): Boolean =
-          GpuOverrides.isSupportedType(t, allowStringMaps = true)
+          GpuOverrides.isSupportedType(t,
+            allowArray = true,
+            allowStringMaps = true,
+            allowNesting = true)
 
         // partition filters and data filters are not run on the GPU
         override val childExprs: Seq[ExprMeta[_]] = Seq.empty
diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala
index 9406513340b..852f48b320f 100644
--- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala
+++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala
@@ -16,11 +16,10 @@
 
 package com.nvidia.spark.rapids.shims.spark301db
 
-import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.shims.spark301.Spark301Shims
-import org.apache.spark.sql.rapids.shims.spark301db._
 
+import org.apache.spark.sql.rapids.shims.spark301db._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -84,7 +83,10 @@ class Spark301dbShims extends Spark301Shims {
       "Reading data from files, often from Hive tables",
       (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
         override def isSupportedType(t: DataType): Boolean =
-          GpuOverrides.isSupportedType(t, allowStringMaps = true)
+          GpuOverrides.isSupportedType(t,
+            allowArray = true,
+            allowStringMaps = true,
+            allowNesting = true)
 
         // partition filters and data filters are not run on the GPU
         override val childExprs: Seq[ExprMeta[_]] = Seq.empty
diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala
index d1197d53333..2939ff18534 100644
--- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala
+++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala
@@ -17,7 +17,6 @@ package com.nvidia.spark.rapids.shims.spark310
 
 import com.nvidia.spark.rapids._
-import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
 import com.nvidia.spark.rapids.shims.spark301.Spark301Shims
 import com.nvidia.spark.rapids.spark310.RapidsShuffleManager
 
@@ -137,12 +136,15 @@ class Spark310Shims extends Spark301Shims {
     GpuOverrides.exec[FileSourceScanExec](
       "Reading data from files, often from Hive tables",
       (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
+        override def isSupportedType(t: DataType): Boolean =
+          GpuOverrides.isSupportedType(t,
+            allowArray = true,
+            allowStringMaps = true,
+            allowNesting = true)
+
         // partition filters and data filters are not run on the GPU
         override val childExprs: Seq[ExprMeta[_]] = Seq.empty
 
-        override def isSupportedType(t: DataType): Boolean =
-          GpuOverrides.isSupportedType(t, allowStringMaps = true)
-
         override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this)
 
         override def convertToGpu(): GpuExec = {
@@ -214,9 +216,6 @@ class Spark310Shims extends Spark301Shims {
             a.dataFilters,
             conf)
         }
-
-        override def isSupportedType(t: DataType): Boolean =
-          GpuOverrides.isSupportedType(t, allowStringMaps = true)
       }),
     GpuOverrides.scan[OrcScan](
       "ORC parsing",
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala b/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala
index 3205cbc0585..27c9c85e249 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala
@@ -27,6 +27,8 @@ object RebaseHelper extends Arm {
   private[this] def isDateTimeRebaseNeeded(column: ColumnVector,
       startDay: Int,
       startTs: Long): Boolean = {
+    // TODO update this for nested column checks
+    // https://github.com/NVIDIA/spark-rapids/issues/1126
     val dtype = column.getType
     if (dtype == DType.TIMESTAMP_DAYS) {
       withResource(Scalar.timestampDaysFromInt(startDay)) { minGood =>
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 3ec10c7d8cd..defe9a34ece 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -1845,6 +1845,12 @@ object GpuOverrides {
         override val childScans: scala.Seq[ScanMeta[_]] =
           Seq(GpuOverrides.wrapScan(p.scan, conf, Some(this)))
 
+        override def isSupportedType(t: DataType): Boolean =
+          GpuOverrides.isSupportedType(t,
+            allowStringMaps = true,
+            allowArray = true,
+            allowNesting = true)
+
         override def convertToGpu(): GpuExec =
           GpuBatchScanExec(p.output, childScans(0).convertToGpu())
       }),
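All of the shim and override changes above route through GpuOverrides.isSupportedType, which is now called with allowArray, allowStringMaps, and allowNesting for the file-scan and batch-scan metas. As a standalone illustration (not part of the patch), the sketch below shows one way a flag-driven check of this shape can be written; the object name, the accepted primitive types, and the nesting rules are assumptions for illustration, not the plugin's actual implementation.

import org.apache.spark.sql.types._

object SupportedTypeSketch {
  // A simplified, flag-driven type check in the spirit of GpuOverrides.isSupportedType.
  def isSupportedType(
      t: DataType,
      allowStringMaps: Boolean = false,
      allowArray: Boolean = false,
      allowNesting: Boolean = false): Boolean = t match {
    // Scalar types assumed to be handled directly.
    case BooleanType | ByteType | ShortType | IntegerType | LongType |
         FloatType | DoubleType | DateType | TimestampType | StringType => true
    // String-to-string maps are special-cased behind their own flag.
    case MapType(StringType, StringType, _) => allowStringMaps
    // Arrays are allowed only when requested; complex element types additionally
    // require allowNesting, and the element type itself must be supported.
    case ArrayType(elem, _) if allowArray => elem match {
      case _: ArrayType | _: MapType | _: StructType =>
        allowNesting && isSupportedType(elem,
          allowStringMaps = allowStringMaps,
          allowArray = allowArray,
          allowNesting = allowNesting)
      case _ => isSupportedType(elem)
    }
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    println(isSupportedType(ArrayType(ArrayType(ByteType)),
      allowArray = true, allowNesting = true)) // true
    println(isSupportedType(ArrayType(TimestampType))) // false: arrays not enabled
  }
}
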
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index 34813107963..38bb457eea5 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -62,7 +62,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.rapids.InputFileUtils
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{MapType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.types.{ArrayType, DataType, DateType, MapType, StringType, StructType, TimestampType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration
 
@@ -134,8 +134,11 @@ object GpuParquetScanBase {
     }
 
     for (field <- readSchema) {
-      if (!GpuColumnVector.isSupportedType(field.dataType)
-          && !field.dataType.isInstanceOf[MapType]) {
+      if (!GpuOverrides.isSupportedType(
+        field.dataType,
+        allowStringMaps = true,
+        allowArray = true,
+        allowNesting = true)) {
         meta.willNotWorkOnGpu(s"GpuParquetScan does not support fields of type ${field.dataType}")
       }
     }
@@ -153,6 +156,17 @@ object GpuParquetScanBase {
     val schemaHasTimestamps = readSchema.exists { field =>
       TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType])
     }
+    def isTsOrDate(dt: DataType) : Boolean = dt match {
+      case TimestampType | DateType => true
+      case _ => false
+    }
+    val schemaMightNeedNestedRebase = readSchema.exists { field =>
+      field.dataType match {
+        case MapType(_, _, _) | ArrayType(_, _) | StructType(_) =>
+          TrampolineUtil.dataTypeExistsRecursively(field.dataType, isTsOrDate)
+        case _ => false
+      }
+    }
 
     // Currently timestamp conversion is not supported.
     // If support needs to be added then we need to follow the logic in Spark's
@@ -168,9 +182,16 @@ object GpuParquetScanBase {
     }
 
     sqlConf.get(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key) match {
-      case "EXCEPTION" => // Good
+      case "EXCEPTION" => if (schemaMightNeedNestedRebase) {
+        meta.willNotWorkOnGpu("Nested timestamp and date values are not supported when " +
+            s"${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} is EXCEPTION")
+      }
       case "CORRECTED" => // Good
-      case "LEGACY" => // Good, but it really is EXCEPTION for us...
+      case "LEGACY" => // really is EXCEPTION for us...
+        if (schemaMightNeedNestedRebase) {
+          meta.willNotWorkOnGpu("Nested timestamp and date values are not supported when " +
+              s"${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} is LEGACY")
+        }
       case other =>
         meta.willNotWorkOnGpu(s"$other is not a supported read rebase mode")
     }
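The GpuParquetScanBase change above falls back to the CPU whenever the read schema hides timestamps or dates inside arrays, maps, or structs and the read rebase mode is EXCEPTION or LEGACY, because the rebase check cannot yet look inside nested columns (https://github.com/NVIDIA/spark-rapids/issues/1126). As a standalone illustration (not part of the patch), the sketch below writes that recursive walk out directly against Spark's public type API instead of the plugin's TrampolineUtil helper; the object and function names are made up.

import org.apache.spark.sql.types._

object NestedRebaseCheckSketch {
  // True if the type, or anything nested inside it, is a timestamp or a date.
  def containsTsOrDate(dt: DataType): Boolean = dt match {
    case TimestampType | DateType => true
    case ArrayType(elem, _) => containsTsOrDate(elem)
    case MapType(k, v, _) => containsTsOrDate(k) || containsTsOrDate(v)
    case StructType(fields) => fields.exists(f => containsTsOrDate(f.dataType))
    case _ => false
  }

  // Only nested types matter here: top-level timestamp and date columns can already be
  // checked for rebasing on the GPU, nested ones cannot yet.
  def schemaMightNeedNestedRebase(readSchema: StructType): Boolean =
    readSchema.exists { field =>
      field.dataType match {
        case _: MapType | _: ArrayType | _: StructType => containsTsOrDate(field.dataType)
        case _ => false
      }
    }

  def main(args: Array[String]): Unit = {
    val nested = StructType(Seq(StructField("ts_arr", ArrayType(TimestampType))))
    val flat = StructType(Seq(StructField("ts", TimestampType)))
    println(schemaMightNeedNestedRebase(nested)) // true
    println(schemaMightNeedNestedRebase(flat))   // false
  }
}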