[SPARK-38899][SQL] DS V2 supports push down datetime functions
### What changes were proposed in this pull request?

Currently, Spark has some datetime functions. Please refer to
https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L577

These functions are shown below:
`DATE_ADD`,
`DATEDIFF`,
`TRUNC`,
`EXTRACT`,
`SECOND`,
`MINUTE`,
`HOUR`,
`MONTH`,
`QUARTER`,
`YEAR`,
`DAYOFWEEK`,
`DAYOFMONTH`,
`DAYOFYEAR`

Support for these functions in mainstream databases is shown below.
Function|PostgreSQL|ClickHouse|H2|MySQL|Oracle|Presto|Teradata|Snowflake|DB2|Vertica|Exasol|Impala|Mariadb|Druid|Singlestore|ElasticSearch
-- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
`DateAdd`|No|Yes|Yes|Yes|Yes|Yes|No|Yes|No|No|No|Yes|Yes|No|Yes|Yes
`DateDiff`|No|Yes|Yes|Yes|Yes|Yes|No|Yes|No|Yes|No|Yes|Yes|No|Yes|Yes
`DateTrunc`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes
`Hour`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Minute`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Month`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Quarter`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Second`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`Year`|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes|Yes
`DayOfMonth`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes
`DayOfWeek`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes
`DayOfYear`|Yes|Yes|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes
`WEEK_OF_YEAR`|Yes|No|Yes|Yes|Yes|Yes|No|Yes|Yes|Yes|No|Yes|Yes|Yes|Yes|Yes
`YEAR_OF_WEEK`|No|No|Yes|Yes|Yes|Yes|No|Yes|No|No|No|No|Yes|No|No|No

DS V2 should support pushing down these datetime functions.

### Why are the changes needed?

So that DS V2 can push down datetime functions to the data source.

### Does this PR introduce _any_ user-facing change?

'No'.
New feature.

### How was this patch tested?

New tests.

Closes #36663 from chenzhx/datetime.

Authored-by: chenzhx <chen@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
chenzhx authored and cloud-fan committed Jul 8, 2022
1 parent 231d376 commit 1df405f
Showing 6 changed files with 296 additions and 24 deletions.
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Evolving;

import java.io.Serializable;

/**
* Represent an extract function, which extracts and returns the value of a
* specified datetime field from a datetime or interval value expression.
* <p>
* The currently supported field names follow the ISO standard:
* <ol>
* <li> <code>SECOND</code> Since 3.4.0 </li>
* <li> <code>MINUTE</code> Since 3.4.0 </li>
* <li> <code>HOUR</code> Since 3.4.0 </li>
* <li> <code>MONTH</code> Since 3.4.0 </li>
* <li> <code>QUARTER</code> Since 3.4.0 </li>
* <li> <code>YEAR</code> Since 3.4.0 </li>
* <li> <code>DAY_OF_WEEK</code> Since 3.4.0 </li>
* <li> <code>DAY</code> Since 3.4.0 </li>
* <li> <code>DAY_OF_YEAR</code> Since 3.4.0 </li>
* <li> <code>WEEK</code> Since 3.4.0 </li>
* <li> <code>YEAR_OF_WEEK</code> Since 3.4.0 </li>
* </ol>
*
* @since 3.4.0
*/

@Evolving
public class Extract implements Expression, Serializable {

private String field;
private Expression source;

public Extract(String field, Expression source) {
this.field = field;
this.source = source;
}

public String field() { return field; }
public Expression source() { return source; }

@Override
public Expression[] children() { return new Expression[]{ source() }; }
}
@@ -346,6 +346,24 @@
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>DATE_ADD</code>
* <ul>
* <li>SQL semantic: <code>DATE_ADD(start_date, num_days)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>DATE_DIFF</code>
* <ul>
* <li>SQL semantic: <code>DATE_DIFF(end_date, start_date)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>TRUNC</code>
* <ul>
* <li>SQL semantic: <code>TRUNC(date, format)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* </ol>
* Note: SQL semantic conforms ANSI standard, so some expressions are not supported when ANSI off,
* including: add, subtract, multiply, divide, remainder, pmod.
@@ -23,6 +23,7 @@

import org.apache.spark.sql.connector.expressions.Cast;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Extract;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.GeneralScalarExpression;
import org.apache.spark.sql.connector.expressions.Literal;
@@ -46,6 +47,9 @@ public String build(Expression expr) {
} else if (expr instanceof Cast) {
Cast cast = (Cast) expr;
return visitCast(build(cast.expression()), cast.dataType());
} else if (expr instanceof Extract) {
Extract extract = (Extract) expr;
return visitExtract(extract.field(), build(extract.source()));
} else if (expr instanceof GeneralScalarExpression) {
GeneralScalarExpression e = (GeneralScalarExpression) expr;
String name = e.name();
@@ -136,6 +140,9 @@ public String build(Expression expr) {
case "UPPER":
case "LOWER":
case "TRANSLATE":
case "DATE_ADD":
case "DATE_DIFF":
case "TRUNC":
return visitSQLFunction(name,
Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
case "CASE_WHEN": {
@@ -327,4 +334,8 @@ protected String visitTrim(String direction, String[] inputs) {
return "TRIM(" + direction + " " + inputs[1] + " FROM " + inputs[0] + ")";
}
}

protected String visitExtract(String field, String source) {
return "EXTRACT(" + field + " FROM " + source + ")";
}
}
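
The builder change above can be illustrated with a minimal, self-contained sketch. The types below (`Expr`, `Column`, `IntLiteral`, `Extract`, `ScalarFunc`) are hypothetical stand-ins for the Spark connector expression classes, and the double-quoted column names and the `", "` argument separator are assumptions made for illustration only. The sketch shows the recursive shape of `build`: an extract node renders as `EXTRACT(field FROM source)`, while `DATE_ADD`, `DATE_DIFF`, and `TRUNC` go through a generic function-call rendering.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class ExtractPushDownSketch {
    // Hypothetical stand-ins for the V2 expression classes; not Spark's API.
    interface Expr {}

    static final class Column implements Expr {
        final String name;
        Column(String name) { this.name = name; }
    }

    static final class IntLiteral implements Expr {
        final int value;
        IntLiteral(int value) { this.value = value; }
    }

    static final class Extract implements Expr {
        final String field;
        final Expr source;
        Extract(String field, Expr source) { this.field = field; this.source = source; }
    }

    static final class ScalarFunc implements Expr {
        final String name;
        final List<Expr> children;
        ScalarFunc(String name, Expr... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    // Recursively renders an expression tree to a SQL string.
    static String build(Expr expr) {
        if (expr instanceof Column) {
            // Assumption: identifiers are quoted with double quotes.
            return "\"" + ((Column) expr).name + "\"";
        } else if (expr instanceof IntLiteral) {
            return String.valueOf(((IntLiteral) expr).value);
        } else if (expr instanceof Extract) {
            Extract e = (Extract) expr;
            return visitExtract(e.field, build(e.source));
        } else if (expr instanceof ScalarFunc) {
            ScalarFunc f = (ScalarFunc) expr;
            // Assumption: arguments are joined with ", ".
            String args = f.children.stream()
                .map(ExtractPushDownSketch::build)
                .collect(Collectors.joining(", "));
            return f.name + "(" + args + ")";
        }
        throw new IllegalArgumentException("Unsupported expression: " + expr);
    }

    // Same string shape as the visitExtract method added in this commit.
    static String visitExtract(String field, String source) {
        return "EXTRACT(" + field + " FROM " + source + ")";
    }

    public static void main(String[] args) {
        Expr date = new Column("DATE1");
        System.out.println(build(new Extract("YEAR", date)));
        System.out.println(build(new ScalarFunc("DATE_ADD", date, new IntLiteral(1))));
    }
}
```

In this sketch an unsupported node throws, which mirrors how a dialect can catch the failure and decline to push the expression down.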
@@ -18,9 +18,9 @@
package org.apache.spark.sql.catalyst.util

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc}
import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, Extract => V2Extract, FieldReference, GeneralScalarExpression, LiteralValue, UserDefinedScalarFunc}
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate}
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.types.{BooleanType, IntegerType}

/**
* The builder to generate V2 expressions from catalyst expressions.
@@ -344,6 +344,59 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {
} else {
None
}
case date: DateAdd =>
val childrenExpressions = date.children.flatMap(generateExpression(_))
if (childrenExpressions.length == date.children.length) {
Some(new GeneralScalarExpression("DATE_ADD", childrenExpressions.toArray[V2Expression]))
} else {
None
}
case date: DateDiff =>
val childrenExpressions = date.children.flatMap(generateExpression(_))
if (childrenExpressions.length == date.children.length) {
Some(new GeneralScalarExpression("DATE_DIFF", childrenExpressions.toArray[V2Expression]))
} else {
None
}
case date: TruncDate =>
val childrenExpressions = date.children.flatMap(generateExpression(_))
if (childrenExpressions.length == date.children.length) {
Some(new GeneralScalarExpression("TRUNC", childrenExpressions.toArray[V2Expression]))
} else {
None
}
case Second(child, _) =>
generateExpression(child).map(v => new V2Extract("SECOND", v))
case Minute(child, _) =>
generateExpression(child).map(v => new V2Extract("MINUTE", v))
case Hour(child, _) =>
generateExpression(child).map(v => new V2Extract("HOUR", v))
case Month(child) =>
generateExpression(child).map(v => new V2Extract("MONTH", v))
case Quarter(child) =>
generateExpression(child).map(v => new V2Extract("QUARTER", v))
case Year(child) =>
generateExpression(child).map(v => new V2Extract("YEAR", v))
// DayOfWeek uses Sunday = 1, Monday = 2, ... and ISO standard is Monday = 1, ...,
// so we use the formula ((ISO_standard % 7) + 1) to do translation.
case DayOfWeek(child) =>
generateExpression(child).map(v => new GeneralScalarExpression("+",
Array[V2Expression](new GeneralScalarExpression("%",
Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(7, IntegerType))),
LiteralValue(1, IntegerType))))
// WeekDay uses Monday = 0, Tuesday = 1, ... and ISO standard is Monday = 1, ...,
// so we use the formula (ISO_standard - 1) to do translation.
case WeekDay(child) =>
generateExpression(child).map(v => new GeneralScalarExpression("-",
Array[V2Expression](new V2Extract("DAY_OF_WEEK", v), LiteralValue(1, IntegerType))))
case DayOfMonth(child) =>
generateExpression(child).map(v => new V2Extract("DAY", v))
case DayOfYear(child) =>
generateExpression(child).map(v => new V2Extract("DAY_OF_YEAR", v))
case WeekOfYear(child) =>
generateExpression(child).map(v => new V2Extract("WEEK", v))
case YearOfWeek(child) =>
generateExpression(child).map(v => new V2Extract("YEAR_OF_WEEK", v))
// TODO supports other expressions
case ApplyFunctionExpression(function, children) =>
val childrenExpressions = children.flatMap(generateExpression(_))
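
The two day-of-week translations in the hunk above can be checked with a small worked example. This is a sketch, not part of the commit: the class and method names are hypothetical, and it assumes the pushed-down `DAY_OF_WEEK` field returns the ISO numbering (Monday = 1 through Sunday = 7), which is also what `java.time.DayOfWeek.getValue()` returns.

```java
import java.time.LocalDate;

final class DayOfWeekTranslation {
    // ISO numbering (Monday = 1 ... Sunday = 7) to the DayOfWeek numbering
    // described in the diff comment (Sunday = 1, Monday = 2, ..., Saturday = 7).
    static int isoToDayOfWeek(int iso) {
        return (iso % 7) + 1;
    }

    // ISO numbering to the WeekDay numbering described in the diff comment
    // (Monday = 0, Tuesday = 1, ..., Sunday = 6).
    static int isoToWeekDay(int iso) {
        return iso - 1;
    }

    public static void main(String[] args) {
        // 2022-07-08 is a Friday, so the ISO value is 5.
        int iso = LocalDate.of(2022, 7, 8).getDayOfWeek().getValue();
        System.out.println("ISO=" + iso
            + " DayOfWeek=" + isoToDayOfWeek(iso)
            + " WeekDay=" + isoToWeekDay(iso));
    }
}
```

The modulo is what wraps Sunday (ISO 7) around to 1 while shifting every other day up by one; a plain `iso + 1` would map Sunday to 8.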
26 changes: 26 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -22,10 +22,12 @@ import java.util.Locale
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
@@ -132,4 +134,28 @@ private[sql] object H2Dialect extends JdbcDialect {
}
super.classifyException(message, e)
}

override def compileExpression(expr: Expression): Option[String] = {
val jdbcSQLBuilder = new H2JDBCSQLBuilder()
try {
Some(jdbcSQLBuilder.build(expr))
} catch {
case NonFatal(e) =>
logWarning("Error occurs while compiling V2 expression", e)
None
}
}

class H2JDBCSQLBuilder extends JDBCSQLBuilder {

override def visitExtract(field: String, source: String): String = {
val newField = field match {
case "DAY_OF_WEEK" => "ISO_DAY_OF_WEEK"
case "WEEK" => "ISO_WEEK"
case "YEAR_OF_WEEK" => "ISO_WEEK_YEAR"
case _ => field
}
s"EXTRACT($newField FROM $source)"
}
}
}
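
The dialect override above follows a simple pattern: a subclass remaps the generic field names to the names its database expects and leaves the rest of the rendering alone. The Java sketch below restates that pattern with hypothetical class names (`GenericSqlBuilderSketch`, `H2SqlBuilderSketch`); only the three field mappings are taken from the diff, and the methods are made `public` here purely so the example is easy to call.

```java
class GenericSqlBuilderSketch {
    // Default rendering, matching the string shape used by the generic builder.
    public String visitExtract(String field, String source) {
        return "EXTRACT(" + field + " FROM " + source + ")";
    }
}

final class H2SqlBuilderSketch extends GenericSqlBuilderSketch {
    @Override
    public String visitExtract(String field, String source) {
        String newField;
        switch (field) {
            case "DAY_OF_WEEK":
                newField = "ISO_DAY_OF_WEEK";
                break;
            case "WEEK":
                newField = "ISO_WEEK";
                break;
            case "YEAR_OF_WEEK":
                newField = "ISO_WEEK_YEAR";
                break;
            default:
                // Fields such as YEAR or MONTH pass through unchanged.
                newField = field;
        }
        return super.visitExtract(newField, source);
    }

    public static void main(String[] args) {
        GenericSqlBuilderSketch builder = new H2SqlBuilderSketch();
        System.out.println(builder.visitExtract("WEEK", "\"DATE1\""));
        System.out.println(builder.visitExtract("YEAR", "\"DATE1\""));
    }
}
```

Delegating to `super.visitExtract` keeps the `EXTRACT(... FROM ...)` template in one place; the commit's Scala override builds the string directly, which produces the same output.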