[SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

### What changes were proposed in this pull request?
Based on a follow-up comment in #28123, we can coalesce buckets for shuffled hash join as well. Note that we only coalesce buckets on the shuffled hash join stream side (i.e. the side not building the hash map), so we don't need to worry about OOM when coalescing multiple buckets into one task for building the hash map.

> If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.

Refactor the existing physical plan rule `CoalesceBucketsInSortMergeJoin` into `CoalesceBucketsInJoin`, to cover shuffled hash join as well.
Refactor the existing unit test `CoalesceBucketsInSortMergeJoinSuite` into `CoalesceBucketsInJoinSuite`, to cover shuffled hash join as well.
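
Per the quoted review note, here is a sketch of the resulting shape (abridged; the no-op body is a placeholder — the real rule in this diff rewrites the plan):

```
// Abridged hierarchy sketch: a single Rule[SparkPlan] now covers both join
// types, which it matches through their shared parent BaseJoinExec.
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf

case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
  // Real implementation: match SortMergeJoinExec and ShuffledHashJoinExec
  // (both BaseJoinExec) and coalesce the scan on the larger-bucketed side.
  override def apply(plan: SparkPlan): SparkPlan = plan // placeholder body
}
```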

### Why are the changes needed?
Avoiding the shuffle when joining tables with different bucket counts is also useful for shuffled hash join. In production we see users join bucketed tables with shuffled hash join (setting `spark.sql.join.preferSortMergeJoin`=false to avoid the sort), and this change lets them avoid the shuffle when the numbers of buckets differ.
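
As a sketch, opting in looks like this (the two conf keys are the ones touched by this PR; the table and column names are placeholders):

```
// Hypothetical usage: prefer shuffled hash join over sort-merge join, and
// allow the planner to coalesce mismatched bucket counts instead of shuffling.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")

// If bucketed_a has 8 buckets and bucketed_b has 4 on the join key, the
// 8-bucket side is read as 4 coalesced buckets and no shuffle is inserted.
val joined = spark.table("bucketed_a").join(spark.table("bucketed_b"), "key")
```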

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added unit tests in `CoalesceBucketsInJoinSuite` to verify the shuffled hash join physical plan.
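
A minimal sketch of the kind of check those tests make (not the exact suite code; it follows the pattern of the `ExplainSuite` change in this diff):

```
// Hedged test sketch: force shuffled hash join over two differently bucketed
// tables and assert that one scan reports coalesced buckets.
withTable("t1", "t2") {
  withSQLConf(
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
      SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
      SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
    Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1")
    Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2")
    val plan = spark.table("t1").join(spark.table("t2"), "i")
      .queryExecution.executedPlan
    val coalesced = plan.collect {
      case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
    }
    assert(coalesced.nonEmpty) // the 8-bucket side should be read as 4 buckets
  }
}
```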

### Performance numbers (requested by maropu)

I looked at TPC-DS per maropu's suggestion, but most TPC-DS queries do aggregation and only a few do joins, and none of the input tables are bucketed. So I instead tested a modified version of `TPCDS q93`:

```
SELECT ss_ticket_number, sr_ticket_number
FROM store_sales
JOIN store_returns
ON ss_ticket_number = sr_ticket_number
```

I made `store_sales` and `store_returns` bucketed tables.
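
The table setup was roughly as follows (a sketch: `storeSalesDf` and `storeReturnsDf` are placeholders for however the TPC-DS data was loaded; the bucket counts 2 and 4 match the `SelectedBucketsCount` entries in the plans below):

```
// Hypothetical benchmark-table setup: bucket both fact tables on the join key
// with different, divisible bucket counts.
storeSalesDf.write.format("parquet")
  .bucketBy(2, "ss_ticket_number")
  .saveAsTable("store_sales")
storeReturnsDf.write.format("parquet")
  .bucketBy(4, "sr_ticket_number")
  .saveAsTable("store_returns")
```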

Physical query plan without coalesce:

```
ShuffledHashJoin [ss_ticket_number#109L], [sr_ticket_number#120L], Inner, BuildLeft
:- Exchange hashpartitioning(ss_ticket_number#109L, 4), true, [id=#67]
:  +- *(1) Project [ss_ticket_number#109L]
:     +- *(1) Filter isnotnull(ss_ticket_number#109L)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.store_sales[ss_ticket_number#109L] Batched: true, DataFilters: [isnotnull(ss_ticket_number#109L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_ticket_number)], ReadSchema: struct<ss_ticket_number:bigint>, SelectedBucketsCount: 2 out of 2
+- *(2) Project [sr_returned_date_sk#111L, sr_return_time_sk#112L, sr_item_sk#113L, sr_customer_sk#114L, sr_cdemo_sk#115L, sr_hdemo_sk#116L, sr_addr_sk#117L, sr_store_sk#118L, sr_reason_sk#119L, sr_ticket_number#120L, sr_return_quantity#121L, sr_return_amt#122, sr_return_tax#123, sr_return_amt_inc_tax#124, sr_fee#125, sr_return_ship_cost#126, sr_refunded_cash#127, sr_reversed_charge#128, sr_store_credit#129, sr_net_loss#130]
   +- *(2) Filter isnotnull(sr_ticket_number#120L)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.store_returns[sr_returned_date_sk#111L,sr_return_time_sk#112L,sr_item_sk#113L,sr_customer_sk#114L,sr_cdemo_sk#115L,sr_hdemo_sk#116L,sr_addr_sk#117L,sr_store_sk#118L,sr_reason_sk#119L,sr_ticket_number#120L,sr_return_quantity#121L,sr_return_amt#122,sr_return_tax#123,sr_return_amt_inc_tax#124,sr_fee#125,sr_return_ship_cost#126,sr_refunded_cash#127,sr_reversed_charge#128,sr_store_credit#129,sr_net_loss#130] Batched: true, DataFilters: [isnotnull(sr_ticket_number#120L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_returns], PartitionFilters: [], PushedFilters: [IsNotNull(sr_ticket_number)], ReadSchema: struct<sr_returned_date_sk:bigint,sr_return_time_sk:bigint,sr_item_sk:bigint,sr_customer_sk:bigin..., SelectedBucketsCount: 4 out of 4
```

Physical query plan with coalesce:

```
ShuffledHashJoin [ss_ticket_number#109L], [sr_ticket_number#120L], Inner, BuildLeft
:- *(1) Project [ss_ticket_number#109L]
:  +- *(1) Filter isnotnull(ss_ticket_number#109L)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.store_sales[ss_ticket_number#109L] Batched: true, DataFilters: [isnotnull(ss_ticket_number#109L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_ticket_number)], ReadSchema: struct<ss_ticket_number:bigint>, SelectedBucketsCount: 2 out of 2
+- *(2) Project [sr_returned_date_sk#111L, sr_return_time_sk#112L, sr_item_sk#113L, sr_customer_sk#114L, sr_cdemo_sk#115L, sr_hdemo_sk#116L, sr_addr_sk#117L, sr_store_sk#118L, sr_reason_sk#119L, sr_ticket_number#120L, sr_return_quantity#121L, sr_return_amt#122, sr_return_tax#123, sr_return_amt_inc_tax#124, sr_fee#125, sr_return_ship_cost#126, sr_refunded_cash#127, sr_reversed_charge#128, sr_store_credit#129, sr_net_loss#130]
   +- *(2) Filter isnotnull(sr_ticket_number#120L)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.store_returns[sr_returned_date_sk#111L,sr_return_time_sk#112L,sr_item_sk#113L,sr_customer_sk#114L,sr_cdemo_sk#115L,sr_hdemo_sk#116L,sr_addr_sk#117L,sr_store_sk#118L,sr_reason_sk#119L,sr_ticket_number#120L,sr_return_quantity#121L,sr_return_amt#122,sr_return_tax#123,sr_return_amt_inc_tax#124,sr_fee#125,sr_return_ship_cost#126,sr_refunded_cash#127,sr_reversed_charge#128,sr_store_credit#129,sr_net_loss#130] Batched: true, DataFilters: [isnotnull(sr_ticket_number#120L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_returns], PartitionFilters: [], PushedFilters: [IsNotNull(sr_ticket_number)], ReadSchema: struct<sr_returned_date_sk:bigint,sr_return_time_sk:bigint,sr_item_sk:bigint,sr_customer_sk:bigin..., SelectedBucketsCount: 4 out of 4 (Coalesced to 2)
```

Runtime improved by roughly 1.5x in wall-clock time:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join coalesce bucket off              1541           1664         106          1.9         535.1       1.0X
shuffle hash join coalesce bucket on               1060           1169          81          2.7         368.1       1.5X
```

Closes #29079 from c21/split-bucket.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
c21 authored and maropu committed Jul 21, 2020
1 parent 0432379 commit 39181ff
Showing 7 changed files with 309 additions and 178 deletions.
@@ -2638,21 +2638,24 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
val COALESCE_BUCKETS_IN_JOIN_ENABLED =
buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
.doc("When true, if two bucketed tables with the different number of buckets are joined, " +
"the side with a bigger number of buckets will be coalesced to have the same number " +
"of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " +
"and only when the bigger number of buckets is divisible by the smaller number of buckets.")
"of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
"number of buckets. Bucket coalescing is applied to sort-merge joins and " +
"shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " +
"in join, but it also reduces parallelism and could possibly cause OOM for " +
"shuffled hash join.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
.doc("The ratio of the number of two buckets being coalesced should be less than or " +
"equal to this value for bucket coalescing to be applied. This configuration only " +
s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
.version("3.1.0")
.intConf
.checkValue(_ > 0, "The difference must be positive.")
@@ -3269,6 +3272,11 @@ class SQLConf extends Serializable with Logging {

def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)

def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED)

def coalesceBucketsInJoinMaxBucketRatio: Int =
getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInSortMergeJoin
import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -332,7 +332,7 @@ object QueryExecution {
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
adaptiveExecutionRule.toSeq ++
Seq(
CoalesceBucketsInSortMergeJoin(sparkSession.sessionState.conf),
CoalesceBucketsInJoin(sparkSession.sessionState.conf),
PlanDynamicPruningFilters(sparkSession),
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.bucketing

import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf

/**
* This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
* if the following conditions are met:
* - Two bucketed tables are joined.
* - Join keys match with output partition expressions on their respective sides.
* - The larger bucket number is divisible by the smaller bucket number.
* - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
* - The ratio of the number of buckets is less than the value set in
* COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
*/
case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
private def updateNumCoalescedBucketsInScan(
plan: SparkPlan,
numCoalescedBuckets: Int): SparkPlan = {
plan transformUp {
case f: FileSourceScanExec =>
f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
}
}

private def updateNumCoalescedBuckets(
join: BaseJoinExec,
numLeftBuckets: Int,
numRightBucket: Int,
numCoalescedBuckets: Int): BaseJoinExec = {
if (numCoalescedBuckets != numLeftBuckets) {
val leftCoalescedChild =
updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets)
join match {
case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
}
} else {
val rightCoalescedChild =
updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets)
join match {
case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
}
}
}

private def isCoalesceSHJStreamSide(
join: ShuffledHashJoinExec,
numLeftBuckets: Int,
numRightBucket: Int,
numCoalescedBuckets: Int): Boolean = {
if (numCoalescedBuckets == numLeftBuckets) {
join.buildSide != BuildRight
} else {
join.buildSide != BuildLeft
}
}

def apply(plan: SparkPlan): SparkPlan = {
if (!conf.coalesceBucketsInJoinEnabled) {
return plan
}

plan transform {
case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
conf.coalesceBucketsInJoinMaxBucketRatio =>
val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
join match {
case j: SortMergeJoinExec =>
updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
case j: ShuffledHashJoinExec
// Only coalesce the buckets for shuffled hash join stream side,
// to avoid OOM for build side.
if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
case other => other
}
case other => other
}
}
}

/**
* An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoin`,
* where both sides of the join have the bucketed tables,
* consist of only the scan operation (possibly under filter/project),
* and numbers of buckets are not equal but divisible.
*/
object ExtractJoinWithBuckets {
@tailrec
private def hasScanOperation(plan: SparkPlan): Boolean = plan match {
case f: FilterExec => hasScanOperation(f.child)
case p: ProjectExec => hasScanOperation(p.child)
case _: FileSourceScanExec => true
case _ => false
}

private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
plan.collectFirst {
case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
f.optionalNumCoalescedBuckets.isEmpty =>
f.relation.bucketSpec.get
}
}

/**
* The join keys should match with expressions for output partitioning. Note that
* the ordering does not matter because it will be handled in `EnsureRequirements`.
*/
private def satisfiesOutputPartitioning(
keys: Seq[Expression],
partitioning: Partitioning): Boolean = {
partitioning match {
case HashPartitioning(exprs, _) if exprs.length == keys.length =>
exprs.forall(e => keys.exists(_.semanticEquals(e)))
case _ => false
}
}

private def isApplicable(j: BaseJoinExec): Boolean = {
(j.isInstanceOf[SortMergeJoinExec] ||
j.isInstanceOf[ShuffledHashJoinExec]) &&
hasScanOperation(j.left) &&
hasScanOperation(j.right) &&
satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) &&
satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning)
}

private def isDivisible(numBuckets1: Int, numBuckets2: Int): Boolean = {
val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
// number of buckets because bucket id is calculated by modding the total number of buckets.
numBuckets1 != numBuckets2 && large % small == 0
}

def unapply(plan: SparkPlan): Option[(BaseJoinExec, Int, Int)] = {
plan match {
case j: BaseJoinExec if isApplicable(j) =>
val leftBucket = getBucketSpec(j.left)
val rightBucket = getBucketSpec(j.right)
if (leftBucket.isDefined && rightBucket.isDefined &&
isDivisible(leftBucket.get.numBuckets, rightBucket.get.numBuckets)) {
Some(j, leftBucket.get.numBuckets, rightBucket.get.numBuckets)
} else {
None
}
case _ => None
}
}
}

This file was deleted.

@@ -347,7 +347,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
test("Coalesced bucket info should be a part of explain string") {
withTable("t1", "t2") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1")
Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2")
val df1 = spark.table("t1")
