From 21e5ec529e1df114e94f81ac8d4abfbd943f1a05 Mon Sep 17 00:00:00 2001 From: Yang Liu Date: Mon, 4 Apr 2022 11:03:20 +0800 Subject: [PATCH] [SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter ### What changes were proposed in this pull request? Add `ColumnPruning` in `InjectRuntimeFilter.injectBloomFilter` to optimize the BloomFilter creation query. ### Why are the changes needed? It seems BloomFilter subqueries injected by `InjectRuntimeFilter` will read as many columns as filterCreationSidePlan. This does not match "Only scan the required columns" as the design said. We can check this by a simple case in `InjectRuntimeFilterSuite`: ```scala withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true", SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") { val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62" sql(query).explain() } ``` The reason is subqueries have not been optimized by `ColumnPruning`, and this PR will fix it. ### Does this PR introduce _any_ user-facing change? No, not released ### How was this patch tested? Improve the test by adding `columnPruningTakesEffect` to check the optimizedPlan of bloom filter join. Closes #36047 from Flyangz/SPARK-32268-FOllOWUP. 
Authored-by: Yang Liu Signed-off-by: Yuming Wang (cherry picked from commit c98725a2b9574ba3c9a10567af740db7467df59d) Signed-off-by: Yuming Wang --- .../catalyst/optimizer/InjectRuntimeFilter.scala | 3 ++- .../spark/sql/InjectRuntimeFilterSuite.scala | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala index 35d0189f64651..a69cda25ef4f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala @@ -85,7 +85,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J } val aggExp = AggregateExpression(bloomFilterAgg, Complete, isDistinct = false, None) val alias = Alias(aggExp, "bloomFilter")() - val aggregate = ConstantFolding(Aggregate(Nil, Seq(alias), filterCreationSidePlan)) + val aggregate = + ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan))) val bloomFilterSubquery = ScalarSubquery(aggregate, Nil) val filter = BloomFilterMightContain(bloomFilterSubquery, new XxHash64(Seq(filterApplicationSideExp))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala index 0da3667382c16..097a18cabd58c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala @@ -255,6 +255,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp planEnabled = sql(query).queryExecution.optimizedPlan checkAnswer(sql(query), expectedAnswer) if (shouldReplace) { + assert(!columnPruningTakesEffect(planEnabled)) 
assert(getNumBloomFilters(planEnabled) > getNumBloomFilters(planDisabled)) } else { assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled)) @@ -288,6 +289,20 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp numMightContains } + def columnPruningTakesEffect(plan: LogicalPlan): Boolean = { + def takesEffect(plan: LogicalPlan): Boolean = { + val result = org.apache.spark.sql.catalyst.optimizer.ColumnPruning.apply(plan) + !result.fastEquals(plan) + } + + plan.collectFirst { + case Filter(condition, _) if condition.collectFirst { + case subquery: org.apache.spark.sql.catalyst.expressions.ScalarSubquery + if takesEffect(subquery.plan) => true + }.nonEmpty => true + }.nonEmpty + } + def assertRewroteSemiJoin(query: String): Unit = { checkWithAndWithoutFeatureEnabled(query, testSemiJoin = true, shouldReplace = true) }