From 1df405fb122fa492e2f499b9bb1cf3ba5ecfd060 Mon Sep 17 00:00:00 2001
From: chenzhx
Date: Fri, 8 Jul 2022 11:34:23 +0800
Subject: [PATCH] [SPARK-38899][SQL] DS V2 supports push down datetime functions

### What changes were proposed in this pull request?
Currently, Spark has some datetime functions. Please refer to
https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L577
These functions are shown below:
`DATE_ADD`, `DATEDIFF`, `TRUNC`, `EXTRACT`, `SECOND`, `MINUTE`, `HOUR`, `MONTH`, `QUARTER`, `YEAR`, `DAYOFWEEK`, `DAYOFMONTH`, `DAYOFYEAR`

Mainstream database support for these functions is shown below.

Function|PostgreSQL|ClickHouse|H2|MySQL|Oracle|Presto|Teradata|Snowflake|DB2|Vertica|Exasol|Impala|Mariadb|Druid|Singlestore|ElasticSearch
-- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
`DateAdd`|No|Yes|Yes|Yes|Yes|Yes|No|Yes|No|No|No|Yes|Yes|No|Yes|Yes
`DateDiff`|No|Yes|Yes|Yes|Yes|Yes|No|Yes|No|Yes|No|Yes|Yes|No|Yes|Yes
`DateTrunc`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes
`Hour`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Minute`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Month`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Quarter`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Second`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Year`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`DayOfMonth`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes
`DayOfWeek`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes
`DayOfYear`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes
`WEEK_OF_YEAR`|Yes|No|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes
`YEAR_OF_WEEK`|No|No|Yes|Yes|Yes|Yes|No|Yes|No|No|No|No|Yes|No|No|No

DS V2 should support pushing down these datetime functions.

### Why are the changes needed?
With datetime functions pushed down, a DS V2 data source can evaluate such filters itself instead of returning the rows for Spark to filter.

### Does this PR introduce _any_ user-facing change?
'No'. New feature.

### How was this patch tested?
New tests.

Closes #36663 from chenzhx/datetime.

Authored-by: chenzhx
Signed-off-by: Wenchen Fan
---
 .../sql/connector/expressions/Extract.java    |  62 ++++++++
 .../expressions/GeneralScalarExpression.java  |  18 +++
 .../util/V2ExpressionSQLBuilder.java          |  11 ++
 .../catalyst/util/V2ExpressionBuilder.scala   |  57 ++++++-
 .../org/apache/spark/sql/jdbc/H2Dialect.scala |  26 ++++
 .../apache/spark/sql/jdbc/JDBCV2Suite.scala   | 146 +++++++++++++++---
 6 files changed, 296 insertions(+), 24 deletions(-)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java
new file mode 100644
index 0000000000000..a925f1ee31a98
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions; + +import org.apache.spark.annotation.Evolving; + +import java.io.Serializable; + +/** + * Represent an extract function, which extracts and returns the value of a + * specified datetime field from a datetime or interval value expression. + *

+ * The currently supported field names (following the ISO standard):
+ * <ol>
+ *  <li><code>SECOND</code> Since 3.4.0</li>
+ *  <li><code>MINUTE</code> Since 3.4.0</li>
+ *  <li><code>HOUR</code> Since 3.4.0</li>
+ *  <li><code>MONTH</code> Since 3.4.0</li>
+ *  <li><code>QUARTER</code> Since 3.4.0</li>
+ *  <li><code>YEAR</code> Since 3.4.0</li>
+ *  <li><code>DAY_OF_WEEK</code> Since 3.4.0</li>
+ *  <li><code>DAY</code> Since 3.4.0</li>
+ *  <li><code>DAY_OF_YEAR</code> Since 3.4.0</li>
+ *  <li><code>WEEK</code> Since 3.4.0</li>
+ *  <li><code>YEAR_OF_WEEK</code> Since 3.4.0</li>
+ * </ol>
+ * + * @since 3.4.0 + */ + +@Evolving +public class Extract implements Expression, Serializable { + + private String field; + private Expression source; + + public Extract(String field, Expression source) { + this.field = field; + this.source = source; + } + + public String field() { return field; } + public Expression source() { return source; } + + @Override + public Expression[] children() { return new Expression[]{ source() }; } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java index ab9e33e86be77..53c511a87f691 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java @@ -346,6 +346,24 @@ *
 *    <li>Since version: 3.4.0</li>
 *   </ul>
 *  </li>
+ *  <li>Name: <code>DATE_ADD</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>DATE_ADD(start_date, num_days)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Name: <code>DATE_DIFF</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>DATE_DIFF(end_date, start_date)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
+ *  <li>Name: <code>TRUNC</code>
+ *   <ul>
+ *    <li>SQL semantic: <code>TRUNC(date, format)</code></li>
+ *    <li>Since version: 3.4.0</li>
+ *   </ul>
+ *  </li>
 * </ol>
 *
 * Note: SQL semantic conforms ANSI standard, so some expressions are not supported when ANSI off,
 * including: add, subtract, multiply, divide, remainder, pmod.

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
index 9b62fedcc8055..2a01102614908 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
@@ -23,6 +23,7 @@

 import org.apache.spark.sql.connector.expressions.Cast;
 import org.apache.spark.sql.connector.expressions.Expression;
+import org.apache.spark.sql.connector.expressions.Extract;
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.expressions.GeneralScalarExpression;
 import org.apache.spark.sql.connector.expressions.Literal;
@@ -46,6 +47,9 @@ public String build(Expression expr) {
     } else if (expr instanceof Cast) {
       Cast cast = (Cast) expr;
       return visitCast(build(cast.expression()), cast.dataType());
+    } else if (expr instanceof Extract) {
+      Extract extract = (Extract) expr;
+      return visitExtract(extract.field(), build(extract.source()));
     } else if (expr instanceof GeneralScalarExpression) {
       GeneralScalarExpression e = (GeneralScalarExpression) expr;
       String name = e.name();
@@ -136,6 +140,9 @@ public String build(Expression expr) {
       case "UPPER":
       case "LOWER":
       case "TRANSLATE":
+      case "DATE_ADD":
+      case "DATE_DIFF":
+      case "TRUNC":
         return visitSQLFunction(name,
           Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
       case "CASE_WHEN": {
@@ -327,4 +334,8 @@ protected String visitTrim(String direction, String[] inputs) {
       return "TRIM(" + direction + " " + inputs[1] + " FROM " + inputs[0] + ")";
     }
   }
+
+  protected String visitExtract(String field, String source) {
+    return "EXTRACT(" + field + " FROM " + source + ")";
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
index 163e071f08ead..8bb65a8804471 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
@@ -18,9 +18,9 @@ package org.apache.spark.sql.catalyst.util

 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc}
+import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, Extract => V2Extract, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc}
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate}
-import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.sql.types.{BooleanType, IntegerType}

 /**
  * The builder to generate V2 expressions from catalyst expressions.
@@ -344,6 +344,59 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { } else { None } + case date: DateAdd => + val childrenExpressions = date.children.flatMap(generateExpression(_)) + if (childrenExpressions.length == date.children.length) { + Some(new GeneralScalarExpression("DATE_ADD", childrenExpressions.toArray[V2Expression])) + } else { + None + } + case date: DateDiff => + val childrenExpressions = date.children.flatMap(generateExpression(_)) + if (childrenExpressions.length == date.children.length) { + Some(new GeneralScalarExpression("DATE_DIFF", childrenExpressions.toArray[V2Expression])) + } else { + None + } + case date: TruncDate => + val childrenExpressions = date.children.flatMap(generateExpression(_)) + if (childrenExpressions.length == date.children.length) { + Some(new GeneralScalarExpression("TRUNC", childrenExpressions.toArray[V2Expression])) + } else { + None + } + case Second(child, _) => + generateExpression(child).map(v => new V2Extract("SECOND", v)) + case Minute(child, _) => + generateExpression(child).map(v => new V2Extract("MINUTE", v)) + case Hour(child, _) => + generateExpression(child).map(v => new V2Extract("HOUR", v)) + case Month(child) => + generateExpression(child).map(v => new V2Extract("MONTH", v)) + case Quarter(child) => + generateExpression(child).map(v => new V2Extract("QUARTER", v)) + case Year(child) => + generateExpression(child).map(v => new V2Extract("YEAR", v)) + // DayOfWeek uses Sunday = 1, Monday = 2, ... and ISO standard is Monday = 1, ..., + // so we use the formula ((ISO_standard % 7) + 1) to do translation. + case DayOfWeek(child) => + generateExpression(child).map(v => new GeneralScalarExpression("+", + Array[V2Expression](new GeneralScalarExpression("%", + Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(7, IntegerType))), + LiteralValue(1, IntegerType)))) + // WeekDay uses Monday = 0, Tuesday = 1, ... and ISO standard is Monday = 1, ..., + // so we use the formula (ISO_standard - 1) to do translation. 
+ case WeekDay(child) => + generateExpression(child).map(v => new GeneralScalarExpression("-", + Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(1, IntegerType)))) + case DayOfMonth(child) => + generateExpression(child).map(v => new V2Extract("DAY", v)) + case DayOfYear(child) => + generateExpression(child).map(v => new V2Extract("DAY_OF_YEAR", v)) + case WeekOfYear(child) => + generateExpression(child).map(v => new V2Extract("WEEK", v)) + case YearOfWeek(child) => + generateExpression(child).map(v => new V2Extract("YEAR_OF_WEEK", v)) // TODO supports other expressions case ApplyFunctionExpression(function, children) => val childrenExpressions = children.flatMap(generateExpression(_)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 1202f51ef94b2..f96dd5559f6e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -22,10 +22,12 @@ import java.util.Locale import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.catalog.functions.UnboundFunction +import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc} import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType} @@ -132,4 +134,28 @@ private[sql] object H2Dialect extends JdbcDialect { } super.classifyException(message, e) } + + override def compileExpression(expr: Expression): Option[String] = { + val jdbcSQLBuilder = new H2JDBCSQLBuilder() + try { + Some(jdbcSQLBuilder.build(expr)) + } catch { + case NonFatal(e) => + logWarning("Error occurs while compiling V2 expression", e) + None + } + } + + class H2JDBCSQLBuilder extends JDBCSQLBuilder { + + override def visitExtract(field: String, source: String): String = { + val newField = field match { + case "DAY_OF_WEEK" => "ISO_DAY_OF_WEEK" + case "WEEK" => "ISO_WEEK" + case "YEAR_OF_WEEK" => "ISO_WEEK_YEAR" + case _ => field + } + s"EXTRACT($newField FROM $source)" + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 865d4718d6883..4156ae5b27928 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -181,6 +181,14 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel "(1, 'bottle', 11111111111111111111.123)").executeUpdate() conn.prepareStatement("INSERT INTO \"test\".\"item\" VALUES " + "(1, 'bottle', 99999999999999999999.123)").executeUpdate() + + conn.prepareStatement( + "CREATE TABLE \"test\".\"datetime\" (name TEXT(32), date1 DATE, time1 TIMESTAMP)") + .executeUpdate() + conn.prepareStatement("INSERT INTO \"test\".\"datetime\" VALUES " + + "('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate() + conn.prepareStatement("INSERT INTO \"test\".\"datetime\" VALUES " + + "('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate() } 
H2Dialect.registerFunction("my_avg", IntegralAverage) H2Dialect.registerFunction("my_strlen", StrLen(CharLength)) @@ -199,9 +207,11 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel } private def checkPushedInfo(df: DataFrame, expectedPlanFragment: String*): Unit = { - df.queryExecution.optimizedPlan.collect { - case _: DataSourceV2ScanRelation => - checkKeywordsExistsInExplain(df, expectedPlanFragment: _*) + withSQLConf(SQLConf.MAX_METADATA_STRING_LENGTH.key -> "1000") { + df.queryExecution.optimizedPlan.collect { + case _: DataSourceV2ScanRelation => + checkKeywordsExistsInExplain(df, expectedPlanFragment: _*) + } } } @@ -744,8 +754,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkSortRemoved(df9) checkLimitRemoved(df9) checkPushedInfo(df9, "PushedFilters: [], " + - "PushedTopN: ORDER BY [CASE WHEN (SALARY > 8000.00) AND " + - "(SALARY < 10000.00) THEN SALARY ELSE 0.00 END ASC NULL..., ") + "PushedTopN: " + + "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + + "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,") checkAnswer(df9, Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0))) @@ -762,8 +773,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkSortRemoved(df10, false) checkLimitRemoved(df10, false) checkPushedInfo(df10, "PushedFilters: [], " + - "PushedTopN: ORDER BY [CASE WHEN (SALARY > 8000.00) AND " + - "(SALARY < 10000.00) THEN SALARY ELSE 0.00 END ASC NULL..., ") + "PushedTopN: " + + "ORDER BY [CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END " + + "ASC NULLS FIRST, DEPT ASC NULLS FIRST, SALARY ASC NULLS FIRST] LIMIT 3,") checkAnswer(df10, Seq(Row(1, "amy", 10000, 0), Row(2, "david", 10000, 0), Row(2, "alex", 12000, 0))) } @@ -880,8 +892,9 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .filter(floor($"bonus") === 1200) .filter(ceil($"bonus") === 1200) checkFiltersRemoved(df13) - checkPushedInfo(df13, "PushedFilters: [BONUS IS NOT NULL, LN(BONUS) > 7.0, EXP(BONUS) > 0.0, " + - "(POWER(BONUS, 2.0)) = 1440000.0, SQRT(BONU...,") + checkPushedInfo(df13, "PushedFilters: " + + "[BONUS IS NOT NULL, LN(BONUS) > 7.0, EXP(BONUS) > 0.0, (POWER(BONUS, 2.0)) = 1440000.0, " + + "SQRT(BONUS) > 34.0, FLOOR(BONUS) = 1200, CEIL(BONUS) = 1200],") checkAnswer(df13, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true))) @@ -903,8 +916,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .filter(radians($"bonus") > 20) .filter(signum($"bonus") === 1) checkFiltersRemoved(df15) - checkPushedInfo(df15, "PushedFilters: [BONUS IS NOT NULL, (LOG(2.0, BONUS)) > 10.0, " + - "LOG10(BONUS) > 3.0, (ROUND(BONUS, 0)) = 1200.0, DEG...,") + checkPushedInfo(df15, "PushedFilters: " + + "[BONUS IS NOT NULL, (LOG(2.0, BONUS)) > 10.0, LOG10(BONUS) > 3.0, " + + "(ROUND(BONUS, 0)) = 1200.0, DEGREES(BONUS) > 68754.0, RADIANS(BONUS) > 20.0, " + + "SIGN(BONUS) = 1.0],") checkAnswer(df15, Seq(Row(1, "cathy", 9000, 1200, false), Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true))) @@ -921,8 +936,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel .filter(atan($"bonus") > 1.4) .filter(atan2($"bonus", $"bonus") > 0.7) checkFiltersRemoved(df16) - checkPushedInfo(df16, "PushedFilters: [BONUS IS NOT NULL, SIN(BONUS) < 
-0.08, " +
-      "SINH(BONUS) > 200.0, COS(BONUS) > 0.9, COSH(BONUS) > 200....,")
+    checkPushedInfo(df16, "PushedFilters: [" +
+      "BONUS IS NOT NULL, SIN(BONUS) < -0.08, SINH(BONUS) > 200.0, COS(BONUS) > 0.9, " +
+      "COSH(BONUS) > 200.0, TAN(BONUS) < -0.08, TANH(BONUS) = 1.0, COT(BONUS) < -11.0, " +
+      "ASIN(BONUS) > 0.1, ACOS(BONUS) > 1.4, ATAN(BONUS) > 1.4, (ATAN2(BONUS, BONUS)) > 0.7],")
     checkAnswer(df16, Seq(Row(1, "cathy", 9000, 1200, false),
       Row(2, "alex", 12000, 1200, false), Row(6, "jen", 12000, 1200, true)))

@@ -1025,18 +1042,92 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         |AND cast(dept as short) > 1
         |AND cast(bonus as decimal(20, 2)) > 1200""".stripMargin)
       checkFiltersRemoved(df6, ansiMode)
-      val expectedPlanFragment8 = if (ansiMode) {
+      val expectedPlanFragment6 = if (ansiMode) {
         "PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL, " +
-          "CAST(BONUS AS string) LIKE '%30%', CAST(DEPT AS byte) > 1, ...,"
+          "CAST(BONUS AS string) LIKE '%30%', CAST(DEPT AS byte) > 1, " +
+          "CAST(DEPT AS short) > 1, CAST(BONUS AS decimal(20,2)) > 1200.00]"
       } else {
         "PushedFilters: [BONUS IS NOT NULL, DEPT IS NOT NULL],"
       }
-      checkPushedInfo(df6, expectedPlanFragment8)
+      checkPushedInfo(df6, expectedPlanFragment6)
       checkAnswer(df6, Seq(Row(2, "david", 10000, 1300, true)))
     }
   }
 }

+  test("scan with filter push-down with date time functions") {
+    val df1 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ")
+    checkFiltersRemoved(df1)
+    val expectedPlanFragment1 =
+      "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DAY_OF_YEAR FROM DATE1) > 100, " +
+      "EXTRACT(DAY FROM DATE1) > 10]"
+    checkPushedInfo(df1, expectedPlanFragment1)
+    checkAnswer(df1, Seq(Row("amy"), Row("alex")))
+
+    val df2 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "year(date1) = 2022 AND quarter(date1) = 2")
+    checkFiltersRemoved(df2)
+    val expectedPlanFragment2 =
+      "[DATE1 IS NOT NULL, EXTRACT(YEAR FROM DATE1) = 2022, " +
+      "EXTRACT(QUARTER FROM DATE1) = 2]"
+    checkPushedInfo(df2, expectedPlanFragment2)
+    checkAnswer(df2, Seq(Row("amy"), Row("alex")))
+
+    val df3 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "second(time1) = 0 AND month(date1) = 5")
+    checkFiltersRemoved(df3)
+    val expectedPlanFragment3 =
+      "PushedFilters: [TIME1 IS NOT NULL, DATE1 IS NOT NULL, " +
+      "EXTRACT(SECOND FROM TIME1) = 0, EXTRACT(MONTH FROM DATE1) = 5]"
+    checkPushedInfo(df3, expectedPlanFragment3)
+    checkAnswer(df3, Seq(Row("amy"), Row("alex")))
+
+    val df4 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "hour(time1) = 0 AND minute(time1) = 0")
+    checkFiltersRemoved(df4)
+    val expectedPlanFragment4 =
+      "PushedFilters: [TIME1 IS NOT NULL, EXTRACT(HOUR FROM TIME1) = 0, " +
+      "EXTRACT(MINUTE FROM TIME1) = 0]"
+    checkPushedInfo(df4, expectedPlanFragment4)
+    checkAnswer(df4, Seq(Row("amy"), Row("alex")))
+
+    val df5 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "extract(WEEK from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
+    checkFiltersRemoved(df5)
+    val expectedPlanFragment5 =
+      "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(WEEK FROM DATE1) > 10, " +
+      "EXTRACT(YEAR_OF_WEEK FROM DATE1) = 2022]"
+    checkPushedInfo(df5, expectedPlanFragment5)
+    checkAnswer(df5, Seq(Row("alex"), Row("amy")))
+
+    // H2 does not support TRUNC, DATE_ADD and DATE_DIFF in the form Spark pushes,
+    // so these filters are not pushed down.
+    val df6 = sql("SELECT name FROM h2.test.datetime WHERE " +
+      "trunc(date1, 'week') = date'2022-05-16' AND date_add(date1, 1) = date'2022-05-20' " +
+      "AND datediff(date1, '2022-05-10') > 0")
+    checkFiltersRemoved(df6, false)
+    val
expectedPlanFragment6 = + "PushedFilters: [DATE1 IS NOT NULL]" + checkPushedInfo(df6, expectedPlanFragment6) + checkAnswer(df6, Seq(Row("amy"))) + + val df7 = sql("SELECT name FROM h2.test.datetime WHERE " + + "weekday(date1) = 2") + checkFiltersRemoved(df7) + val expectedPlanFragment7 = + "PushedFilters: [DATE1 IS NOT NULL, (EXTRACT(DAY_OF_WEEK FROM DATE1) - 1) = 2]" + checkPushedInfo(df7, expectedPlanFragment7) + checkAnswer(df7, Seq(Row("alex"))) + + val df8 = sql("SELECT name FROM h2.test.datetime WHERE " + + "dayofweek(date1) = 4") + checkFiltersRemoved(df8) + val expectedPlanFragment8 = + "PushedFilters: [DATE1 IS NOT NULL, ((EXTRACT(DAY_OF_WEEK FROM DATE1) % 7) + 1) = 4]" + checkPushedInfo(df8, expectedPlanFragment8) + checkAnswer(df8, Seq(Row("alex"))) + } + test("scan with filter push-down with UDF") { JdbcDialects.unregisterDialect(H2Dialect) try { @@ -1115,7 +1206,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "people", false), Row("test", "empty_table", false), Row("test", "employee", false), Row("test", "item", false), Row("test", "dept", false), - Row("test", "person", false), Row("test", "view1", false), Row("test", "view2", false))) + Row("test", "person", false), Row("test", "view1", false), Row("test", "view2", false), + Row("test", "datetime", false))) } test("SQL API: create table as select") { @@ -1213,7 +1305,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkFiltersRemoved(df2) val expectedPlanFragment2 = "PushedFilters: [NAME IS NOT NULL, TRIM(BOTH FROM NAME) = 'jen', " + - "(TRIM(BOTH 'j' FROM NAME)) = 'en', (TRANSLATE(NA..." + "(TRIM(BOTH 'j' FROM NAME)) = 'en', (TRANSLATE(NAME, 'e', '1')) = 'j1n']" checkPushedInfo(df2, expectedPlanFragment2) checkAnswer(df2, Seq(Row(6, "jen", 12000, 1200, true))) @@ -1803,10 +1895,20 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel """.stripMargin) checkAggregateRemoved(df) checkPushedInfo(df, - "PushedAggregates: [COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00)" + - " THEN SALARY ELSE 0.00 END), COUNT(CAS..., " + - "PushedFilters: [], " + - "PushedGroupByExpressions: [DEPT], ") + "PushedAggregates: " + + "[COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END), " + + "COUNT(CASE WHEN (SALARY > 8000.00) AND (SALARY <= 13000.00) THEN SALARY ELSE 0.00 END), " + + "COUNT(CASE WHEN (SALARY > 11000.00) OR (SALARY < 10000.00) THEN SALARY ELSE 0.00 END), " + + "COUNT(CASE WHEN (SALARY >= 12000.00) OR (SALARY < 9000.00) THEN SALARY ELSE 0.00 END), " + + "MAX(CASE WHEN (SALARY <= 10000.00) AND (SALARY >= 8000.00) THEN SALARY ELSE 0.00 END), " + + "MAX(CASE WHEN (SALARY <= 9000.00) OR (SALARY > 10000.00) THEN SALARY ELSE 0.00 END), " + + "MAX(CASE WHEN (SALARY = 0.00) OR (SALARY >= 8000.00) THEN SALARY ELSE 0.00 END), " + + "MAX(CASE WHEN (SALARY <= 8000.00) OR (SALARY >= 10000.00) THEN 0.00 ELSE SALARY END), " + + "MIN(CASE WHEN (SALARY <= 8000.00) AND (SALARY IS NOT NULL) THEN SALARY ELSE 0.00 END), " + + "SUM(CASE WHEN SALARY > 10000.00 THEN 2 WHEN SALARY > 8000.00 THEN 1 END), " + + "AVG(CASE WHEN (SALARY <= 8000.00) AND (SALARY IS NULL) THEN SALARY ELSE 0.00 END)], " + + "PushedFilters: [], " + + "PushedGroupByExpressions: [DEPT],") checkAnswer(df, Seq(Row(1, 1, 1, 1, 1, 0d, 12000d, 0d, 12000d, 0d, 0d, 2, 0d), Row(2, 2, 2, 2, 2, 10000d, 12000d, 10000d, 12000d, 0d, 0d, 3, 0d), Row(2, 2, 2, 2, 2, 10000d, 9000d, 10000d, 
10000d, 9000d, 0d, 2, 0d)))
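
For illustration only (this sketch is not part of the patch): a third-party JDBC dialect can opt into the new datetime push-down by compiling the V2 `Extract` and scalar expressions the same way `H2JDBCSQLBuilder` does above. The dialect name `MyDialect`, the `jdbc:mydb` URL prefix, and the `ISODOW` field name are hypothetical; `visitExtract` receives the ISO-style field names listed in `Extract.java` and only needs to rename the ones the target database spells differently.

```scala
import scala.util.control.NonFatal

import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.jdbc.JdbcDialect

// Hypothetical dialect, shown only to illustrate the hooks added by this patch.
object MyDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  class MyDialectSQLBuilder extends JDBCSQLBuilder {
    override def visitExtract(field: String, source: String): String = {
      // Map Spark's ISO-style field names to the (assumed) target-side names;
      // unrecognized fields fall through unchanged, as in H2JDBCSQLBuilder.
      val newField = field match {
        case "DAY_OF_WEEK" => "ISODOW" // assumed field name in the target database
        case _ => field
      }
      s"EXTRACT($newField FROM $source)"
    }
  }

  override def compileExpression(expr: Expression): Option[String] = {
    val jdbcSQLBuilder = new MyDialectSQLBuilder()
    try {
      Some(jdbcSQLBuilder.build(expr))
    } catch {
      // Returning None keeps the filter in Spark instead of pushing it down.
      case NonFatal(_) => None
    }
  }
}
```

Once such a dialect is registered via `JdbcDialects.registerDialect(MyDialect)`, a query like `SELECT name FROM t WHERE dayofweek(date1) = 4` should show `PushedFilters: [..., ((EXTRACT(DAY_OF_WEEK FROM DATE1) % 7) + 1) = 4]` in its plan, as the tests above demonstrate for H2.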