[SPARK-34064][SQL] Cancel the running broadcast sub-jobs when SQL statement is cancelled

### What changes were proposed in this pull request?
#24595 introduced `private val runId: UUID = UUID.randomUUID` in `BroadcastExchangeExec` to cancel the broadcast execution in the Future when a timeout occurs. Because the `runId` is a random UUID instead of inheriting the job group id, the broadcast sub-jobs keep executing after the SQL statement is cancelled. This PR uses the job group id of the calling thread as the `runId`, so that these broadcast sub-jobs are aborted when the SQL statement is cancelled.
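
Below is a minimal sketch of the cancellation flow this change enables, assuming a local `SparkSession` and a UUID statement id similar to what the Thrift Server assigns; the object and variable names are illustrative and not part of this patch:

```scala
import java.util.UUID

import org.apache.spark.sql.SparkSession

object CancelStatementSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("cancel-sketch").getOrCreate()
    val sc = spark.sparkContext

    // The outer thread (e.g. the Thrift Server statement handler) tags every job
    // of a statement with one job group id; the Thrift Server uses a UUID string.
    val statementId = UUID.randomUUID().toString
    sc.setJobGroup(statementId, s"SQL statement $statementId", interruptOnCancel = true)

    // ... run the statement here; with this patch, any BroadcastExchangeExec it
    // spawns reuses statementId as its runId, i.e. as the broadcast job's group id ...

    // Cancelling the statement's job group now also aborts running broadcast sub-jobs.
    sc.cancelJobGroup(statementId)

    spark.stop()
  }
}
```

The key point is that the broadcast future runs its jobs under `runId.toString` as the job group, so with the inherited id a single `cancelJobGroup` call on the statement id now reaches those sub-jobs as well.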

### Why are the changes needed?
When broadcasting a table takes too long, the user may cancel the SQL statement. However, the background broadcast job keeps running and wastes resources.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested.
Broadcasting a table completes too quickly to cancel in a unit test, but the behavior is easy to verify manually:
1. Start a Spark Thrift Server on YARN with limited resources.
2. While the driver is running but no executors have been launched, submit a SQL statement from beeline that broadcasts tables.
3. Cancel the SQL statement in beeline.

Without the patch, broadcast sub-jobs won't be cancelled.
![Screen Shot 2021-01-11 at 12 03 13 PM](https://user-images.githubusercontent.com/1853780/104150975-ab024b00-5416-11eb-8bf9-b5167bdad80a.png)

With this patch, broadcast sub-jobs will be cancelled.
![Screen Shot 2021-01-11 at 11 43 40 AM](https://user-images.githubusercontent.com/1853780/104150994-be151b00-5416-11eb-80ff-313d423c8a2e.png)

Closes #31119 from LantaoJin/SPARK-34064.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit f1b21ba)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
LantaoJin authored and cloud-fan committed Jan 13, 2021
1 parent 6ee2618 commit 6414ba3
Showing 1 changed file with 5 additions and 2 deletions.
@@ -24,7 +24,7 @@ import scala.concurrent.{ExecutionContext, Promise}
 import scala.concurrent.duration.NANOSECONDS
 import scala.util.control.NonFatal
 
-import org.apache.spark.{broadcast, SparkException}
+import org.apache.spark.{broadcast, SparkContext, SparkException}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -74,7 +74,10 @@ case class BroadcastExchangeExec(
     child: SparkPlan) extends BroadcastExchangeLike {
   import BroadcastExchangeExec._
 
-  override val runId: UUID = UUID.randomUUID
+  // Cancelling a SQL statement from Spark ThriftServer needs to cancel
+  // its related broadcast sub-jobs. So set the run id to job group id if exists.
+  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
+    .map(UUID.fromString).getOrElse(UUID.randomUUID)
 
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
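
For reference, a small standalone sketch of how the patched expression resolves the run id, assuming a local `SparkSession`; the literal key `"spark.jobGroup.id"` is used here only because `SparkContext.SPARK_JOB_GROUP_ID` is `private[spark]`, and all names are illustrative:

```scala
import java.util.UUID

import org.apache.spark.sql.SparkSession

object RunIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("runid-sketch").getOrCreate()
    val sc = spark.sparkContext

    // setJobGroup stores the group id in a thread-local property that
    // getLocalProperty can read back from the same thread.
    val statementId = UUID.randomUUID().toString
    sc.setJobGroup(statementId, "some statement")

    // Mirrors the patched expression: reuse the caller's job group id when one is set,
    // otherwise fall back to a fresh random UUID. UUID.fromString assumes the group id
    // is a UUID string, as the Thrift Server's statement ids are.
    val runId: UUID = Option(sc.getLocalProperty("spark.jobGroup.id"))
      .map(UUID.fromString)
      .getOrElse(UUID.randomUUID)

    println(s"statementId=$statementId runId=$runId")  // equal when a job group is set

    spark.stop()
  }
}
```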
