[SPARK-38899][SQL][FOLLOWUP] Fix bug extract datetime in DS V2 pushdown
### What changes were proposed in this pull request?

[SPARK-38899](#36663) added support for the extract function in the JDBC data source, but the implementation is incorrect: adding the test case below on its own, without the fix in this PR, makes it fail.
```
test("scan with filter push-down with date time functions")  {
    val df9 = sql("SELECT name FROM h2.test.datetime WHERE " +
      "dayofyear(date1) > 100 order by dayofyear(date1) limit 1")
    checkFiltersRemoved(df9)
    val expectedPlanFragment9 =
      "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DAY_OF_YEAR FROM DATE1) > 100], " +
      "PushedTopN: ORDER BY [EXTRACT(DAY_OF_YEAR FROM DATE1) ASC NULLS FIRST] LIMIT 1,"
    checkPushedInfo(df9, expectedPlanFragment9)
    checkAnswer(df9, Seq(Row("alex")))
  }
```

The failure output is shown below. The pushed-down `Extract` in `PushedTopN` renders with the default `Object.toString` form (`org.apache.spark.sql.connector.expressions.Extract3b95fce9`) instead of `EXTRACT(DAY_OF_YEAR FROM DATE1)`, so the expected plan fragment is not found.
```
"== Parsed Logical Plan ==
'GlobalLimit 1
+- 'LocalLimit 1
   +- 'Sort ['dayofyear('date1) ASC NULLS FIRST], true
      +- 'Project ['name]
         +- 'Filter ('dayofyear('date1) > 100)
            +- 'UnresolvedRelation [h2, test, datetime], [], false

== Analyzed Logical Plan ==
name: string
GlobalLimit 1
+- LocalLimit 1
   +- Project [name#x]
      +- Sort [dayofyear(date1#x) ASC NULLS FIRST], true
         +- Project [name#x, date1#x]
            +- Filter (dayofyear(date1#x) > 100)
               +- SubqueryAlias h2.test.datetime
                  +- RelationV2[NAME#x, DATE1#x, TIME1#x] h2.test.datetime test.datetime

== Optimized Logical Plan ==
Project [name#x]
+- RelationV2[NAME#x] test.datetime

== Physical Plan ==
*(1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$145f6181a [NAME#x] PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DAY_OF_YEAR FROM DATE1) > 100], PushedTopN: ORDER BY [org.apache.spark.sql.connector.expressions.Extract3b95fce9 ASC NULLS FIRST] LIMIT 1, ReadSchema: struct<NAME:string>

" did not contain "PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DAY_OF_YEAR FROM DATE1) > 100], PushedTopN: ORDER BY [EXTRACT(DAY_OF_YEAR FROM DATE1) ASC NULLS FIRST] LIMIT 1,"
```

### Why are the changes needed?

Fix an implementation bug: the `Extract` expression does not override the `toString` method, so when it is pushed down to the JDBC data source, the pushed plan information falls back to the default `Object.toString` representation.
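For illustration, here is a minimal, self-contained Java sketch (the class names are hypothetical, not Spark code) of why a missing `toString` override produces output like `Extract3b95fce9`:

```java
// Hypothetical stand-in for a connector expression that, like the old
// Extract, does not override toString().
class PushedExtract {
  private final String field;
  PushedExtract(String field) { this.field = field; }
  // No toString() override, so java.lang.Object#toString applies:
  // getClass().getName() + "@" + Integer.toHexString(hashCode()).
}

public class DefaultToStringDemo {
  public static void main(String[] args) {
    // Prints something like "PushedExtract@3b95fce9" -- the same shape as
    // the "...Extract3b95fce9" fragment in the failing plan output above.
    System.out.println(new PushedExtract("DAY_OF_YEAR"));
  }
}
```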

### Does this PR introduce _any_ user-facing change?

'No'. The bug is in a new feature that has not been released yet.

### How was this patch tested?

New test case.

Closes #37469 from chenzhx/spark-master.

Authored-by: chenzhx <chen@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
chenzhx authored and cloud-fan committed Aug 11, 2022
1 parent e17d8ec commit 5dadf52
Showing 2 changed files with 16 additions and 0 deletions.
sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Extract.java
@@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.internal.connector.ToStringSQLBuilder;

import java.io.Serializable;

@@ -59,4 +60,10 @@ public Extract(String field, Expression source) {

  @Override
  public Expression[] children() { return new Expression[]{ source() }; }

  @Override
  public String toString() {
    ToStringSQLBuilder builder = new ToStringSQLBuilder();
    return builder.build(this);
  }
}
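With this override in place, the pushed expression renders as readable SQL in plan strings. A hedged usage sketch (assuming the public `Expressions.column` factory from the same package; the expected output matches the plan fragment asserted in the test below):

```java
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Extract;

public class ExtractToStringDemo {
  public static void main(String[] args) {
    // The same expression the test pushes down: EXTRACT(DAY_OF_YEAR FROM DATE1).
    Extract extract = new Extract("DAY_OF_YEAR", Expressions.column("DATE1"));
    // Before this patch: something like
    //   "org.apache.spark.sql.connector.expressions.Extract@3b95fce9".
    // After this patch: "EXTRACT(DAY_OF_YEAR FROM DATE1)".
    System.out.println(extract);
  }
}
```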
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -1374,6 +1374,15 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHelper
"PushedFilters: [DATE1 IS NOT NULL, ((EXTRACT(DAY_OF_WEEK FROM DATE1) % 7) + 1) = 4]"
checkPushedInfo(df8, expectedPlanFragment8)
checkAnswer(df8, Seq(Row("alex")))

val df9 = sql("SELECT name FROM h2.test.datetime WHERE " +
"dayofyear(date1) > 100 order by dayofyear(date1) limit 1")
checkFiltersRemoved(df9)
val expectedPlanFragment9 =
"PushedFilters: [DATE1 IS NOT NULL, EXTRACT(DAY_OF_YEAR FROM DATE1) > 100], " +
"PushedTopN: ORDER BY [EXTRACT(DAY_OF_YEAR FROM DATE1) ASC NULLS FIRST] LIMIT 1,"
checkPushedInfo(df9, expectedPlanFragment9)
checkAnswer(df9, Seq(Row("alex")))
}

test("scan with filter push-down with misc functions") {