
[SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter #36047

Closed

Conversation

@Flyangz commented Apr 2, 2022

What changes were proposed in this pull request?

Add `ColumnPruning` in `InjectRuntimeFilter.injectBloomFilter` to optimize the BloomFilter creation query.

Why are the changes needed?

It seems the BloomFilter subqueries injected by `InjectRuntimeFilter` read as many columns as `filterCreationSidePlan` does. This does not match the design goal of "only scan the required columns". We can check this with a simple case in `InjectRuntimeFilterSuite`:

```scala
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true",
  SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
  val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62"
  sql(query).explain()
}
```

The reason is that these subqueries have not been optimized by `ColumnPruning`; this PR fixes that.
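
For reference, a minimal sketch of what the fix looks like, based on the review suggestion below (`alias` and `filterCreationSidePlan` are the names used in `InjectRuntimeFilter.injectBloomFilter`; the wrapper function is illustrative, not the exact merged code):

```scala
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, ConstantFolding}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}

// Illustrative sketch: instead of wrapping the raw creation-side plan in the
// bloom filter aggregate, run ColumnPruning (and ConstantFolding) on the
// aggregate so the subquery scans only the join key column.
def buildBloomFilterAggregate(
    alias: Alias,
    filterCreationSidePlan: LogicalPlan): LogicalPlan = {
  // Before the fix: Aggregate(Nil, Seq(alias), filterCreationSidePlan),
  // which keeps every column of filterCreationSidePlan in the scan.
  ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan)))
}
```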

Does this PR introduce any user-facing change?

No, the feature has not been released yet.

How was this patch tested?

Improved the test by adding `columnPruningTakesEffect` to check the optimized plan of the bloom filter join.
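
A hypothetical shape for such a check (not the suite's exact code): if pruning already took effect during optimization, re-applying `ColumnPruning` to the optimized plan must be a no-op.

```scala
import org.apache.spark.sql.catalyst.optimizer.ColumnPruning
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper: ColumnPruning is idempotent on an already-pruned plan,
// so a changed plan here means pruning had NOT taken effect.
def columnPruningTakesEffect(optimizedPlan: LogicalPlan): Boolean =
  ColumnPruning(optimizedPlan).fastEquals(optimizedPlan)
```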

@github-actions bot added the SQL label Apr 2, 2022
@wangyum (Member) commented Apr 2, 2022

cc @sigmod

```diff
@@ -45,7 +45,8 @@ class SparkOptimizer(
       PartitionPruning) :+
     Batch("InjectRuntimeFilter", FixedPoint(1),
       InjectRuntimeFilter,
-      RewritePredicateSubquery) :+
+      RewritePredicateSubquery,
+      OptimizeSubqueries) :+
```
Contributor:

Thanks for the great catch, @Flyangz!

Does it work if we do the following here:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala#L88

```scala
ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan)))
```

`OptimizeSubqueries` seems a bit of an overkill because:

- it runs all rules again for the other, non-BloomFilter subqueries as well
- the BloomFilter creation query's input to the `BloomFilterAggregate` has already been optimized

Author:

Great idea, done.

@Flyangz changed the title from "[SPARK-32268][SQL][FOLLOWUP] Add OptimizeSubqueries below the InjectRuntimeFilter" to "[SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter" on Apr 3, 2022
@AmplabJenkins commented:

Can one of the admins verify this patch?

@sigmod (Contributor) commented Apr 3, 2022

LGTM. Can we merge it to branch-3.3 as well?

@wangyum closed this in c98725a Apr 4, 2022
wangyum pushed a commit that referenced this pull request Apr 4, 2022
[SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter

Closes #36047 from Flyangz/SPARK-32268-FOllOWUP.

Authored-by: Yang Liu <yintai@xiaohongshu.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit c98725a)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
@wangyum (Member) commented Apr 4, 2022

Merged to master and branch-3.3.

songzhxlh-max pushed a commit to songzhxlh-max/spark that referenced this pull request Oct 12, 2022
* [SPARK-32268][SQL] Row-level Runtime Filtering

This PR proposes row-level runtime filters in Spark to reduce intermediate data volume for operators like shuffle, join, and aggregate, and hence improve performance. We propose two mechanisms: semi-join filters and bloom filters; both co-exist side by side behind feature configs.
[Design Doc](https://docs.google.com/document/d/16IEuyLeQlubQkH8YuVuXWKo2-grVIoDJqQpHZrE7q04/edit?usp=sharing) with more details.

With Semi-Join, we see 9 queries improve on the TPC-DS 3TB benchmark, with no regressions.
With Bloom Filter, we see 10 queries improve on the TPC-DS 3TB benchmark, with no regressions.
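
As a hedged illustration, the two mechanisms sit behind independent feature configs (key names as defined in `SQLConf` for Spark 3.3; verify them against your Spark version):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("runtime-filter-demo")  // hypothetical app name
  .master("local[*]")
  .getOrCreate()

// Bloom filter based runtime filtering (SQLConf.RUNTIME_BLOOM_FILTER_ENABLED).
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
// Semi-join reduction based runtime filtering (assumed key per SQLConf).
spark.conf.set("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled", "true")
```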

No

Added tests

Closes apache#35789 from somani/rf.

Lead-authored-by: Abhishek Somani <abhishek.somani@databricks.com>
Co-authored-by: Abhishek Somani <abh.somani@gmail.com>
Co-authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 1f4e4c8)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-32268][TESTS][FOLLOWUP] Fix `BloomFilterAggregateQuerySuite` failed in ansi mode

`Test that might_contain errors out non-constant Bloom filter` in `BloomFilterAggregateQuerySuite` failed in ANSI mode because `Numeric <=> Binary` casts are [not allowed in ANSI mode](apache#30260), so the content of `exception.getMessage` differs from that of non-ANSI mode.

This PR changes the test case to ensure that the error messages of ANSI and non-ANSI modes are consistent.
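
For context, a hedged illustration of the divergence, assuming an active `SparkSession` named `spark` (the error text is taken from the failure log below):

```scala
// With ANSI mode on, the bigint-to-binary cast in the test query is rejected
// during analysis, so might_contain's own argument check is never reached and
// the expected error message differs between the two modes.
spark.conf.set("spark.sql.ansi.enabled", "true")
spark.sql("SELECT CAST(id AS BINARY) FROM range(1)").collect()
// => AnalysisException: cannot cast bigint to binary with ANSI mode on.
```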

Bug fix.

No

- Pass GA
- Local Test

**Before**

```
export SPARK_ANSI_SQL_MODE=false
mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite
```

```
Run completed in 23 seconds, 537 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

```
export SPARK_ANSI_SQL_MODE=true
mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite
```

```
- Test that might_contain errors out non-constant Bloom filter *** FAILED ***
  "cannot resolve 'CAST(t.a AS BINARY)' due to data type mismatch:
   cannot cast bigint to binary with ANSI mode on.
   If you have to cast bigint to binary, you can set spark.sql.ansi.enabled as false.
  ; line 2 pos 21;
  'Project [unresolvedalias('might_contain(cast(a#2424L as binary), cast(5 as bigint)), None)]
  +- SubqueryAlias t
     +- LocalRelation [a#2424L]
  " did not contain "The Bloom filter binary input to might_contain should be either a constant value or a scalar subquery expression" (BloomFilterAggregateQuerySuite.scala:171)
```

**After**
```
export SPARK_ANSI_SQL_MODE=false
mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite
```

```
Run completed in 26 seconds, 544 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

```

```
export SPARK_ANSI_SQL_MODE=true
mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite
```

```
Run completed in 25 seconds, 289 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Closes apache#35953 from LuciferYang/SPARK-32268-FOLLOWUP.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 7165123)
Signed-off-by: Yuming Wang <yumwang@ebay.com>

* [SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the InjectRuntimeFilter

Add `RewritePredicateSubquery` below the `InjectRuntimeFilter` in `SparkOptimizer`.

It seems that if the runtime filter uses the in-subquery code path, it is not converted to a semi-join as the design intended.

This PR fixes the issue.

No, the feature has not been released yet.

Improved the test by ensuring the semi-join exists when the runtime filter uses the in-subquery code path.

Closes apache#35998 from ulysses-you/SPARK-32268-FOllOWUP.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit c0c52dd)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter

Closes apache#36047 from Flyangz/SPARK-32268-FOllOWUP.

Authored-by: Yang Liu <yintai@xiaohongshu.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit c98725a)
Signed-off-by: Yuming Wang <yumwang@ebay.com>

* [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry in the SparkSession

This PR proposes:
1. Use the function registry in the Spark Session being used
2. Move function registration into `beforeAll`

Registering the function in the `builtin` registry outside `beforeAll` can affect other tests. See also https://lists.apache.org/thread/jp0ccqv10ht716g9xldm2ohdv3mpmmz1.
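
A hedged sketch of the pattern inside a test suite (names and builder illustrative, not the suite's exact code):

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, Expression, ExpressionInfo}

override def beforeAll(): Unit = {
  super.beforeAll()
  // Register against the session under test, not the global builtin registry,
  // so the temp function cannot leak into other suites.
  spark.sessionState.functionRegistry.registerFunction(
    FunctionIdentifier("might_contain"),
    new ExpressionInfo(classOf[BloomFilterMightContain].getName, "might_contain"),
    (children: Seq[Expression]) => BloomFilterMightContain(children.head, children(1)))
}
```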

No, test-only.

Unit tests fixed.

Closes apache#36576 from HyukjinKwon/SPARK-32268-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit c5351f8)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
songzhxlh-max added a commit to Kyligence/spark that referenced this pull request Oct 13, 2022
* [SPARK-39857][SQL] V2ExpressionBuilder uses the wrong LiteralValue data type for In predicate (#535)

### What changes were proposed in this pull request?
When building V2 `In` Predicate in `V2ExpressionBuilder`, `InSet.dataType` (which is `BooleanType`) is used to build the `LiteralValue`, `InSet.child.dataType` should be used instead.
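
A hedged sketch of the fix's shape (the helper name is hypothetical; `InSet` and `LiteralValue` are the classes involved):

```scala
import org.apache.spark.sql.catalyst.expressions.InSet
import org.apache.spark.sql.connector.expressions.LiteralValue

// Each element literal must carry the child's data type. InSet.dataType is
// BooleanType, because InSet is a predicate, and using it was the bug.
def inSetValueLiterals(inSet: InSet): Seq[LiteralValue[Any]] =
  inSet.hset.toSeq.map(v => LiteralValue(v, inSet.child.dataType))
```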

### Why are the changes needed?
bug fix

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
new test

Closes apache#37271 from huaxingao/inset.

Authored-by: huaxingao <huaxin_gao@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Co-authored-by: huaxingao <huaxin_gao@apple.com>

* [SPARK-32268][SQL] Row-level Runtime Filtering

* [SPARK-32268][TESTS][FOLLOWUP] Fix `BloomFilterAggregateQuerySuite` failed in ansi mode

* [SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the InjectRuntimeFilter

* [SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter

* [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry in the SparkSession

* KE-29673 Add segment prune function for bloom runtime filter

Fix min/max for UTF8String collections.

Validate the runtime filter if needed when the broadcast join is valid.

* AL-6084 In `Cast.canCast`, add logic to allow casting `DecimalType` to `DoubleType` (#542)

Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Zhixiong Chen <chen@apache.org>
Co-authored-by: huaxingao <huaxin_gao@apple.com>
Co-authored-by: Bowen Song <bowen.song@kyligence.io>
leejaywei pushed a commit to Kyligence/spark that referenced this pull request Oct 18, 2022
* [SPARK-32268][SQL] Row-level Runtime Filtering

* [SPARK-32268][TESTS][FOLLOWUP] Fix `BloomFilterAggregateQuerySuite` failed in ansi mode

* [SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the InjectRuntimeFilter

* [SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter

* [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry in the SparkSession
cloud-fan pushed a commit that referenced this pull request Nov 15, 2022
…ith in-subquery filter

### What changes were proposed in this pull request?

Apply `ColumnPruning` to the in-subquery filter.

Note that the bloom filter side has already been fixed by #36047.

### Why are the changes needed?

The inferred in-subquery filter should have `ColumnPruning` applied before we get plan statistics and check whether it can be broadcast. Otherwise, the final physical plan differs from the expected one.
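
A hedged sketch of the idea (function and local names hypothetical):

```scala
import org.apache.spark.sql.catalyst.optimizer.ColumnPruning
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf

// Prune before sizing, so the broadcast decision is made against the
// single-column plan rather than the full creation-side schema.
def canBroadcastAfterPruning(plan: LogicalPlan, conf: SQLConf): Boolean = {
  val pruned = ColumnPruning(plan)
  pruned.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}
```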

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

add test

Closes #38619 from ulysses-you/SPARK-41112.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
…ith in-subquery filter

Closes apache#38619 from ulysses-you/SPARK-41112.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>