-
Notifications
You must be signed in to change notification settings - Fork 28.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARK-46946][SQL] Supporting broadcast of multiple filtering keys in…
… DynamicPruning ### What changes were proposed in this pull request? This PR extends `DynamicPruningSubquery` to support broadcasting of multiple filtering keys (instead of one as before). The majority of the PR is to simply generalise singularity to plurality. **Note:** We actually do not use the multiple filtering keys `DynamicPruningSubquery` in this PR, we are doing this to make supporting DPP Null Safe Equality or multiple Equality predicates easier in the future. In Null Safe Equality JOIN, the JOIN condition `a <=> b` is transformed to `Coalesce(key1, Literal(key1.dataType)) = Coalesce(key2, Literal(key2.dataType)) AND IsNull(key1) = IsNull(key2)`. In order to have the highest pruning efficiency, we broadcast the 2 keys `Coalesce(key, Literal(key.dataType))` and `IsNull(key)` and use them to prune the other side at the same time. Before, the `DynamicPruningSubquery` only has one broadcasting key and we only supports DPP for one `EqualTo` JOIN predicate, now we are extending the subquery to multiple broadcasting keys. Please note that DPP has not been supported for multiple JOIN predicates. Put it in another way, at the moment, we don't insert a DPP Filter for multiple JOIN predicates at the same time, only potentially insert a DPP Filter for a given Equality JOIN predicate. ### Why are the changes needed? To make supporting DPP Null Safe Equality or DPP multiple Equality predicates easier in the future. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44988 from longvu-db/multiple-broadcast-filtering-keys. Authored-by: Thang Long VU <long.vu@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
- Loading branch information
Showing
9 changed files
with
138 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
89 changes: 89 additions & 0 deletions
89
...rc/test/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruningSubquerySuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalyst.expressions | ||
|
||
import org.apache.spark.SparkFunSuite | ||
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project} | ||
import org.apache.spark.sql.types.IntegerType | ||
|
||
class DynamicPruningSubquerySuite extends SparkFunSuite { | ||
private val pruningKeyExpression = Literal(1) | ||
|
||
private val validDynamicPruningSubquery = DynamicPruningSubquery( | ||
pruningKey = pruningKeyExpression, | ||
buildQuery = Project(Seq(AttributeReference("id", IntegerType)()), | ||
LocalRelation(AttributeReference("id", IntegerType)())), | ||
buildKeys = Seq(pruningKeyExpression), | ||
broadcastKeyIndices = Seq(0), | ||
onlyInBroadcast = false | ||
) | ||
|
||
test("pruningKey data type matches single buildKey") { | ||
val dynamicPruningSubquery = validDynamicPruningSubquery | ||
.copy(buildKeys = Seq(Literal(2023))) | ||
assert(dynamicPruningSubquery.resolved == true) | ||
} | ||
|
||
test("pruningKey data type is a Struct and matches with Struct buildKey") { | ||
val dynamicPruningSubquery = validDynamicPruningSubquery | ||
.copy(pruningKey = CreateStruct(Seq(Literal(1), Literal.FalseLiteral)), | ||
buildKeys = Seq(CreateStruct(Seq(Literal(2), Literal.TrueLiteral)))) | ||
assert(dynamicPruningSubquery.resolved == true) | ||
} | ||
|
||
test("multiple buildKeys but only one broadcastKeyIndex") { | ||
val dynamicPruningSubquery = validDynamicPruningSubquery | ||
.copy(buildKeys = Seq(Literal(0), Literal(2), Literal(0), Literal(9)), | ||
broadcastKeyIndices = Seq(1)) | ||
assert(dynamicPruningSubquery.resolved == true) | ||
} | ||
|
||
test("pruningKey data type does not match the single buildKey") { | ||
val dynamicPruningSubquery = validDynamicPruningSubquery.copy( | ||
pruningKey = Literal.TrueLiteral, | ||
buildKeys = Seq(Literal(2013))) | ||
assert(dynamicPruningSubquery.resolved == false) | ||
} | ||
|
||
test("pruningKey data type is a Struct but mismatch with Struct buildKey") { | ||
val dynamicPruningSubquery = validDynamicPruningSubquery | ||
.copy(pruningKey = CreateStruct(Seq(Literal(1), Literal.FalseLiteral)), | ||
buildKeys = Seq(CreateStruct(Seq(Literal.TrueLiteral, Literal(2))))) | ||
assert(dynamicPruningSubquery.resolved == false) | ||
} | ||
|
||
test("DynamicPruningSubquery should only have a single broadcasting key") { | ||
val dynamicPruningSubquery = validDynamicPruningSubquery | ||
.copy(buildKeys = Seq(Literal(2025), Literal(2), Literal(1809)), | ||
broadcastKeyIndices = Seq(0, 2)) | ||
assert(dynamicPruningSubquery.resolved == false) | ||
} | ||
|
||
test("duplicates in broadcastKeyIndices, and also should not be allowed") { | ||
val dynamicPruningSubquery = validDynamicPruningSubquery | ||
.copy(buildKeys = Seq(Literal(2)), | ||
broadcastKeyIndices = Seq(0, 0)) | ||
assert(dynamicPruningSubquery.resolved == false) | ||
} | ||
|
||
test("broadcastKeyIndex out of bounds") { | ||
val dynamicPruningSubquery = validDynamicPruningSubquery | ||
.copy(broadcastKeyIndices = Seq(1)) | ||
assert(dynamicPruningSubquery.resolved == false) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters