From c293a1173905670ecdcd85b8ca82a54b199f7109 Mon Sep 17 00:00:00 2001 From: sinkinben Date: Thu, 18 Aug 2022 18:50:31 +0800 Subject: [PATCH] init orc-cast-date Signed-off-by: sinkinben --- .../src/main/python/orc_cast_date_test.py | 41 +++++++++++++++++++ .../com/nvidia/spark/rapids/GpuOrcScan.scala | 34 +++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 integration_tests/src/main/python/orc_cast_date_test.py diff --git a/integration_tests/src/main/python/orc_cast_date_test.py b/integration_tests/src/main/python/orc_cast_date_test.py new file mode 100644 index 00000000000..634d220fc7d --- /dev/null +++ b/integration_tests/src/main/python/orc_cast_date_test.py @@ -0,0 +1,41 @@ +# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest + +from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error +from data_gen import * +from pyspark.sql.types import * +from spark_session import with_cpu_session + + +@pytest.mark.parametrize('to_type', ['timestamp', 'string']) +def test_casting_from_date(spark_tmp_path, to_type): + orc_path = spark_tmp_path + '/orc_casting_from_date' + + # This case is failed + # data_gen = [('date_column', DateGen(end=date(1582, 1, 1)))] + # length = 10 + + # This case is fine + data_gen = [('date_column', DateGen(start=date(1582,10,15)))] + length = 2048 + + with_cpu_session( + func=lambda spark: gen_df(spark, data_gen, length=length).write.orc(orc_path), + ) + + assert_gpu_and_cpu_are_equal_collect( + func=lambda spark: spark.read.schema("date_column {}".format(to_type)).orc(orc_path), + ) \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index 45fa3c3b320..c991bdd5752 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -51,6 +51,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.catalyst.util.RebaseDateTime import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.datasources.{PartitionedFile, PartitioningAwareFileIndex} @@ -200,6 +201,10 @@ object GpuOrcScan extends Arm { if (fromDt == toDt) { return col } + println("-----------") + println(fromDt.toString) + println(toDt.toString) + println("-----------") (fromDt, toDt) match { // integral to integral case (DType.BOOL8 | 
DType.INT8 | DType.INT16 | DType.INT32 | DType.INT64, @@ -211,6 +216,28 @@ object GpuOrcScan extends Arm { } else { downCastAnyInteger(col, toDt) } + + // 'date' type in ORC is actually 'DType.TIMESTAMP_DAYS' + // date -> timestamp + // 'col' is in Proleptic Gregorian calendar, we need to cast it into Julian calendar + case (DType.TIMESTAMP_DAYS, DType.TIMESTAMP_MICROSECONDS) => + println(col.max().toString()) + val gregorianToJulian = RebaseDateTime.rebaseGregorianToJulianDays(col.max().getInt) + val julianToGregorian = RebaseDateTime.rebaseJulianToGregorianDays(col.max().getInt) + printf("gregorianToJulian: %d \n", gregorianToJulian) + printf("julianToGregorian: %d \n", julianToGregorian) + withResource(col.castTo(DType.INT64)) { longVec => + println(longVec.max().toString()) + withResource(Scalar.fromLong(24 * 60 * 60 * 1000 * 1000L)) { toMicroSeconds => + withResource(longVec.mul(toMicroSeconds)) { microSeconds => + microSeconds.castTo(DType.TIMESTAMP_MICROSECONDS) + } + } + } + // date -> string + case (DType.TIMESTAMP_DAYS, DType.STRING) => + col.asStrings() + // TODO more types, tracked in https://github.com/NVIDIA/spark-rapids/issues/5895 case (f, t) => throw new QueryExecutionException(s"Unsupported type casting: $f -> $t") @@ -239,6 +266,13 @@ object GpuOrcScan extends Arm { } case VARCHAR => to.getCategory == STRING + + case DATE => + to.getCategory match { + case STRING | TIMESTAMP => true + case _ => false + } + + // TODO more types, tracked in https://github.com/NVIDIA/spark-rapids/issues/5895 case _ => false