-
Notifications
You must be signed in to change notification settings - Fork 232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support unix_timestamp and to_unix_timestamp with non-UTC timezones (non-DST) #9816
Changes from 17 commits
3faacd8
9ed56ea
c7722f9
a37d1ce
e26e299
3696466
ce90809
3566f1c
749cfbf
f239624
465f0f3
877c9eb
83fee5b
fa439da
8ed6ff0
3b72923
003cf3a
0752ffc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -244,11 +244,16 @@ def test_dayofyear(data_gen): | |
assert_gpu_and_cpu_are_equal_collect( | ||
lambda spark : unary_op_df(spark, data_gen).select(f.dayofyear(f.col('a')))) | ||
|
||
|
||
non_utc_unix_time_allow = ['ProjectExec'] if not is_supported_time_zone() else [] | ||
|
||
|
||
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) | ||
@allow_non_gpu(*non_utc_allow) | ||
@allow_non_gpu(*non_utc_unix_time_allow) | ||
def test_unix_timestamp(data_gen): | ||
assert_gpu_and_cpu_are_equal_collect( | ||
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a')))) | ||
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))), | ||
{"spark.rapids.sql.nonUTC.enabled": "true"}) | ||
|
||
|
||
@allow_non_gpu('ProjectExec') | ||
|
@@ -381,30 +386,30 @@ def fun(spark): | |
|
||
@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF']) | ||
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) | ||
@allow_non_gpu(*non_utc_allow) | ||
@allow_non_gpu(*non_utc_unix_time_allow) | ||
def test_unix_timestamp_improved(data_gen, ansi_enabled): | ||
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true", | ||
"spark.sql.legacy.timeParserPolicy": "CORRECTED"} | ||
assert_gpu_and_cpu_are_equal_collect( | ||
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))), | ||
copy_and_update({'spark.sql.ansi.enabled': ansi_enabled}, conf)) | ||
copy_and_update({'spark.sql.ansi.enabled': ansi_enabled, "spark.rapids.sql.nonUTC.enabled": "true"}, conf)) | ||
|
||
@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF']) | ||
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) | ||
@allow_non_gpu(*non_utc_allow) | ||
@allow_non_gpu(*non_utc_unix_time_allow) | ||
def test_unix_timestamp(data_gen, ansi_enabled): | ||
assert_gpu_and_cpu_are_equal_collect( | ||
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col("a"))), | ||
{'spark.sql.ansi.enabled': ansi_enabled}) | ||
{'spark.sql.ansi.enabled': ansi_enabled, "spark.rapids.sql.nonUTC.enabled": "true"}) | ||
|
||
@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF']) | ||
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) | ||
@allow_non_gpu(*non_utc_allow) | ||
@allow_non_gpu(*non_utc_unix_time_allow) | ||
def test_to_unix_timestamp_improved(data_gen, ansi_enabled): | ||
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"} | ||
assert_gpu_and_cpu_are_equal_collect( | ||
lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"), | ||
copy_and_update({'spark.sql.ansi.enabled': ansi_enabled}, conf)) | ||
copy_and_update({'spark.sql.ansi.enabled': ansi_enabled, "spark.rapids.sql.nonUTC.enabled": "true"}, conf)) | ||
|
||
str_date_and_format_gen = [pytest.param(StringGen('[0-9]{4}/[01][0-9]'),'yyyy/MM', marks=pytest.mark.xfail(reason="cudf does no checks")), | ||
(StringGen('[0-9]{4}/[01][12]/[0-2][1-8]'),'yyyy/MM/dd'), | ||
|
@@ -417,12 +422,13 @@ def invalid_date_string_df(spark): | |
return spark.createDataFrame([['invalid_date_string']], "a string") | ||
|
||
@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF']) | ||
@pytest.mark.parametrize('non_utc_timezone_enabled', [True, False], ids=['NOT_UTC_ON', 'NOT_UTC_OFF']) | ||
@pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) | ||
@allow_non_gpu(*non_utc_allow) | ||
def test_string_to_unix_timestamp(data_gen, date_form, ansi_enabled): | ||
@allow_non_gpu('ProjectExec') | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't want to have a hard-coded ProjectExec that does not know if we should fall back to the CPU or not! |
||
def test_string_to_unix_timestamp(data_gen, date_form, ansi_enabled, non_utc_timezone_enabled): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This parameter is not used and nonUTC.enabled is going away. |
||
assert_gpu_and_cpu_are_equal_collect( | ||
lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)), | ||
{'spark.sql.ansi.enabled': ansi_enabled}) | ||
{'spark.sql.ansi.enabled': ansi_enabled, "spark.rapids.sql.nonUTC.enabled": "true"}) | ||
|
||
def test_string_to_unix_timestamp_ansi_exception(): | ||
assert_gpu_and_cpu_error( | ||
|
@@ -431,12 +437,13 @@ def test_string_to_unix_timestamp_ansi_exception(): | |
conf=ansi_enabled_conf) | ||
|
||
@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF']) | ||
@pytest.mark.parametrize('non_utc_timezone_enabled', [True, False], ids=['NOT_UTC_ON', 'NOT_UTC_OFF']) | ||
@pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) | ||
@allow_non_gpu(*non_utc_allow) | ||
def test_string_unix_timestamp(data_gen, date_form, ansi_enabled): | ||
@allow_non_gpu('ProjectExec') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here too. |
||
def test_string_unix_timestamp(data_gen, date_form, ansi_enabled, non_utc_timezone_enabled): | ||
assert_gpu_and_cpu_are_equal_collect( | ||
lambda spark : unary_op_df(spark, data_gen, seed=1).select(f.unix_timestamp(f.col('a'), date_form)), | ||
{'spark.sql.ansi.enabled': ansi_enabled}) | ||
{'spark.sql.ansi.enabled': ansi_enabled, "spark.rapids.sql.nonUTC.enabled": non_utc_timezone_enabled}) | ||
|
||
def test_string_unix_timestamp_ansi_exception(): | ||
assert_gpu_and_cpu_error( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -824,25 +824,41 @@ abstract class GpuToTimestamp | |
val failOnError: Boolean = SQLConf.get.ansiEnabled | ||
|
||
override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = { | ||
val tmp = if (lhs.dataType == StringType) { | ||
// rhs is ignored we already parsed the format | ||
if (getTimeParserPolicy == LegacyTimeParserPolicy) { | ||
parseStringAsTimestampWithLegacyParserPolicy( | ||
lhs, | ||
sparkFormat, | ||
strfFormat, | ||
DType.TIMESTAMP_MICROSECONDS, | ||
(col, strfFormat) => col.asTimestampMicroseconds(strfFormat)) | ||
} else { | ||
parseStringAsTimestamp( | ||
lhs, | ||
sparkFormat, | ||
strfFormat, | ||
DType.TIMESTAMP_MICROSECONDS, | ||
failOnError) | ||
} | ||
} else { // Timestamp or DateType | ||
lhs.getBase.asTimestampMicroseconds() | ||
val tmp = lhs.dataType match { | ||
case _: StringType => | ||
// rhs is ignored we already parsed the format | ||
if (getTimeParserPolicy == LegacyTimeParserPolicy) { | ||
parseStringAsTimestampWithLegacyParserPolicy( | ||
lhs, | ||
sparkFormat, | ||
strfFormat, | ||
DType.TIMESTAMP_MICROSECONDS, | ||
(col, strfFormat) => col.asTimestampMicroseconds(strfFormat)) | ||
} else { | ||
parseStringAsTimestamp( | ||
lhs, | ||
sparkFormat, | ||
strfFormat, | ||
DType.TIMESTAMP_MICROSECONDS, | ||
failOnError) | ||
} | ||
case _: DateType => | ||
timeZoneId match { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still |
||
case Some(_) => | ||
if (GpuOverrides.isUTCTimezone(zoneId)) { | ||
lhs.getBase.asTimestampMicroseconds() | ||
} else { | ||
assert(GpuTimeZoneDB.isSupportedTimeZone(zoneId)) | ||
withResource(lhs.getBase.asTimestampMicroseconds) { tsInMs => | ||
GpuTimeZoneDB.fromTimestampToUtcTimestamp(tsInMs, zoneId) | ||
} | ||
} | ||
case None => lhs.getBase.asTimestampMicroseconds() | ||
} | ||
case _ => | ||
// Consistent with Spark's behavior which ignores timeZone for other types like timestamp | ||
// and timestampNtz. | ||
lhs.getBase.asTimestampMicroseconds() | ||
} | ||
// Return Timestamp value if dataType it is expecting is of TimestampType | ||
if (dataType.equals(TimestampType)) { | ||
|
@@ -892,8 +908,19 @@ abstract class GpuToTimestampImproved extends GpuToTimestamp { | |
failOnError) | ||
} | ||
} else if (lhs.dataType() == DateType){ | ||
lhs.getBase.asTimestampSeconds() | ||
} else { // Timestamp | ||
timeZoneId match { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
case Some(_) => | ||
if (GpuOverrides.isUTCTimezone(zoneId)) { | ||
lhs.getBase.asTimestampSeconds() | ||
} else { | ||
assert(GpuTimeZoneDB.isSupportedTimeZone(zoneId)) | ||
withResource(lhs.getBase.asTimestampSeconds) { tsInMs => | ||
GpuTimeZoneDB.fromTimestampToUtcTimestamp(tsInMs, zoneId) | ||
} | ||
} | ||
case None => lhs.getBase.asTimestampSeconds() | ||
} | ||
} else { // Timestamp. Note: no need to consider timezone which is consistent with Spark | ||
// https://github.com/rapidsai/cudf/issues/5166 | ||
// The time is off by 1 second if the result is < 0 | ||
val longSecs = withResource(lhs.getBase.asTimestampSeconds()) { secs => | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add cases for not supported Time Zone.
e.g.:
And test TZs locally:
export TZ=Iran
export TZ=America/Los_Angeles
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated