[SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external shuffle services for merging partitions #30164

Closed: wants to merge 26 commits
Changes from 6 commits
Commits (26):
68fc364
[SPARK-32919][CORE] Driver side changes for coordinating push based s…
venkata91 Aug 6, 2020
2688df2
Empty commit to add Min Shen to authors list
Victsm Oct 27, 2020
0423970
Addressed some of the comments both from Mridul and Thomas
venkata91 Nov 2, 2020
eb93fe1
Address style check issues
venkata91 Nov 3, 2020
f2f61e2
Address style checks and Mridul's comment
venkata91 Nov 3, 2020
3a6219f
Address Tom's comment
venkata91 Nov 3, 2020
ca44d03
Prefer active executors for merger locations
venkata91 Nov 5, 2020
2172f65
Addressed comments from ngone51 and otterc
venkata91 Nov 5, 2020
1e76824
Address ngone51 review comments
venkata91 Nov 6, 2020
3e03109
Address Mridul's review comments
venkata91 Nov 9, 2020
cbb66eb
Merge remote-tracking branch 'upstream/master' into upstream-SPARK-32919
venkata91 Nov 9, 2020
a1c8831
Addressed review comments
venkata91 Nov 10, 2020
047ad0c
Address review comments
venkata91 Nov 11, 2020
2d6d266
Address ngone51 review comments
venkata91 Nov 12, 2020
2b0c073
pick hosts in random order and remove host from shuffle push merger if a
venkata91 Nov 12, 2020
9ba4dfb
Addressed ngone51 comments
venkata91 Nov 13, 2020
5127d8b
Address review comments
venkata91 Nov 15, 2020
e320ac0
Address attila comments
venkata91 Nov 17, 2020
a2d85ef
Addressed ngone51 review comment
venkata91 Nov 17, 2020
affa8a0
Address attilapiros review comments
venkata91 Nov 17, 2020
46f5670
Add test in UtilsSuite to check push based shuffle is enabled or not
venkata91 Nov 18, 2020
2467a61
Merge branch 'upstream-master' into upstream-SPARK-32919
venkata91 Nov 18, 2020
050a5ae
Address Dongjoon comments
venkata91 Nov 18, 2020
1714829
Merge branch 'upstream-master' into upstream-SPARK-32919
venkata91 Nov 18, 2020
1ba7668
Merge remote-tracking branch 'upstream/master' into upstream-SPARK-32919
venkata91 Nov 19, 2020
5ce2934
Merge remote-tracking branch 'upstream/master' into upstream-SPARK-32919
venkata91 Nov 20, 2020
26 changes: 26 additions & 0 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -23,6 +23,8 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor}
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
@@ -95,6 +97,30 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)

// By default, shuffle merge is enabled for a ShuffleDependency if push based shuffle is enabled
private[spark] var _shuffleMergeEnabled =
Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf)

def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
_shuffleMergeEnabled = shuffleMergeEnabled
}

def shuffleMergeEnabled: Boolean = _shuffleMergeEnabled

/**
* Stores the list of chosen external shuffle services for handling the
* shuffle merge requests from mappers in this shuffle map stage.
*/
private[spark] var _mergerLocs: Seq[BlockManagerId] = Nil

def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
if (mergerLocs != null && mergerLocs.nonEmpty) {
_mergerLocs = mergerLocs
}
}

def getMergerLocs: Seq[BlockManagerId] = _mergerLocs

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}
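For illustration, a minimal sketch (not part of this PR) of how driver-side code could consult these new accessors; the helper name is hypothetical and `dep` stands for any existing ShuffleDependency:

// Hedged sketch (not part of the PR): reading the merge-related state of a dependency.
def describeMergeState(dep: org.apache.spark.ShuffleDependency[_, _, _]): String = {
  if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty) {
    s"shuffle merge enabled, pushing to: ${dep.getMergerLocs.map(_.host).mkString(", ")}"
  } else {
    "shuffle merge disabled for this dependency"
  }
}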
30 changes: 30 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1927,4 +1927,34 @@ package object config {
.version("3.0.1")
.booleanConf
.createWithDefault(false)

private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
ConfigBuilder("spark.shuffle.push.enabled")
.doc("Set to 'true' to enable push based shuffle")
.booleanConf
.createWithDefault(false)

private[spark] val MAX_MERGER_LOCATIONS_CACHED =
ConfigBuilder("spark.shuffle.push.retainedMergerLocations")
.doc("Maximum number of shuffle push merger locations cached for push based shuffle. " +
"Currently, shuffle push merger locations are nothing but shuffle services where an " +
"executor is launched in the case of push based shuffle.")
.intConf
.createWithDefault(500)

private[spark] val MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
.doc("Minimum percentage of shuffle push merger locations required to enable push based " +
"shuffle for a stage, with respect to the number of partitions of the child stage. This " +
"is the number of unique Node Manager locations needed to enable push based shuffle.")
.doubleConf
.createWithDefault(0.05)

private[spark] val MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
.doc("Minimum static number of shuffle push merger locations that should be available in " +
"order to enable push based shuffle for a stage. Note this config works in " +
"conjunction with spark.shuffle.push.mergersMinThresholdRatio.")
.doubleConf
.createWithDefault(5)
}
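For context, a driver-side configuration that exercises these new entries might look like the sketch below; the values shown are the documented defaults or illustrative assumptions, and the external shuffle service flag is required by Utils.isPushBasedShuffleEnabled further down in this PR:

// Hedged sketch: enabling push based shuffle plus its prerequisite (values are illustrative).
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")               // prerequisite for push based shuffle
  .set("spark.shuffle.push.enabled", "true")                  // PUSH_BASED_SHUFFLE_ENABLED
  .set("spark.shuffle.push.retainedMergerLocations", "500")   // MAX_MERGER_LOCATIONS_CACHED (default)
  .set("spark.shuffle.push.mergersMinThresholdRatio", "0.05") // MERGER_LOCATIONS_MIN_THRESHOLD_RATIO (default)
  .set("spark.shuffle.push.mergersMinStaticThreshold", "5")   // MERGER_LOCATIONS_MIN_STATIC_THRESHOLD (default)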
34 changes: 34 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -249,6 +249,8 @@ private[spark] class DAGScheduler(
private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf)

/**
* Called by the TaskSetManager to report task's starting.
*/
@@ -1252,6 +1254,32 @@ private[spark] class DAGScheduler(
execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
}

/**
* If push based shuffle is enabled, set the shuffle services to be used for the given
* shuffle map stage for block push/merge.
*
* Even with dynamic resource allocation (DRA) kicking in and significantly reducing the
* number of available active executors, we would still be able to get sufficient shuffle
* service locations for block push/merge by getting the historical locations of past executors.
*/
private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
// TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot
// TODO: disable shuffle merge for the retry/reuse cases
val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)

if (mergerLocs.nonEmpty) {
stage.shuffleDep.setMergerLocs(mergerLocs)
logDebug(s"List of shuffle push merger locations " +
s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
logInfo("Shuffle merge enabled for %s (%s) with %d merger locations"
.format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))
} else {
stage.shuffleDep.setShuffleMergeEnabled(false)
logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name))
}
}

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
@@ -1281,6 +1309,12 @@ private[spark] class DAGScheduler(
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
// Only generate merger location for a given shuffle dependency once. This way, even if
// this stage gets retried, it would still be merging blocks using the same set of
// shuffle services.
if (s.shuffleDep.shuffleMergeEnabled) {
prepareShuffleServicesForShuffleMapStage(s)
}
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import org.apache.spark.resource.ResourceProfile
import org.apache.spark.storage.BlockManagerId

/**
* A backend interface for scheduling systems that allows plugging in different ones under
@@ -92,4 +93,16 @@ private[spark] trait SchedulerBackend {
*/
def maxNumConcurrentTasks(rp: ResourceProfile): Int

/**
* Get the list of host locations of both active and dead executors, to be used as
* external shuffle service locations for push based shuffle.
*
* Currently push based shuffle is disabled for both stage retry and stage reuse cases
* (e.g. when a few partitions are lost due to failure). Hence this method
* should be invoked only once per ShuffleDependency.
* @return List of external shuffle service locations
*/
def getShufflePushMergerLocations(
numPartitions: Int,
resourceProfileId: Int): Seq[BlockManagerId] = Nil

}
@@ -24,7 +24,7 @@ import scala.concurrent.Future
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.storage.BlockManagerMessages.{GetShufflePushMergerLocations, _}
import org.apache.spark.util.{RpcUtils, ThreadUtils}

private[spark]
@@ -125,6 +125,17 @@ class BlockManagerMaster(
driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

/**
* Get the list of unique shuffle service locations where an executor was successfully
* registered in the past, for block push/merge with push based shuffle.
*/
def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
driverEndpoint.askSync[Seq[BlockManagerId]](
GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
}

def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
}
@@ -74,6 +74,14 @@ class BlockManagerMasterEndpoint(
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

// Mapping from host name to the shuffle merger service where the current app
// registered an executor in the past. Older hosts are removed when the
// maxRetainedMergerLocations size is reached, in favor of newer locations.
private val shuffleMergerLocations = new mutable.LinkedHashMap[String, BlockManagerId]()

// Maximum number of merger locations to cache
private val maxRetainedMergerLocations = conf.get(config.MAX_MERGER_LOCATIONS_CACHED)

private val askThreadPool =
ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100)
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
@@ -139,6 +147,9 @@ class BlockManagerMasterEndpoint(
case GetBlockStatus(blockId, askStorageEndpoints) =>
context.reply(blockStatus(blockId, askStorageEndpoints))

case GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter) =>
context.reply(getShufflePushMergerLocations(numMergersNeeded, hostsToFilter))

case IsExecutorAlive(executorId) =>
context.reply(blockManagerIdByExecutor.contains(executorId))

@@ -360,6 +371,17 @@ class BlockManagerMasterEndpoint(

}

private def addMergerLocation(blockManagerId: BlockManagerId): Unit = {
if (!shuffleMergerLocations.contains(blockManagerId.host) && !blockManagerId.isDriver) {
val shuffleServerId = BlockManagerId(blockManagerId.executorId, blockManagerId.host,
StorageUtils.externalShuffleServicePort(conf))
if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
shuffleMergerLocations -= shuffleMergerLocations.head._1
}
shuffleMergerLocations(shuffleServerId.host) = shuffleServerId
}
}

private def removeExecutor(execId: String): Unit = {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
@@ -526,6 +548,8 @@ class BlockManagerMasterEndpoint(

blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus)

addMergerLocation(id)
@Ngone51 (Member) commented on Nov 10, 2020:

I'm thinking about only adding the merger location when we know there are no active executors on it. Since we'd prefer active executor locations first now, I think it's almost a redundant copy of the active executor locations normally.

The idea is that we could add it when we find there are no active executors after removeExecutor, and remove it when new executors register on the same host. This is definitely helpful for static resource allocation, although I'm a little bit hesitant about the possible overhead for dynamic resource allocation.

WDYT?

A Contributor replied:

It is not a copy, but a materialized view of candidate hosts where the external shuffle service has been configured for the current application. It becomes a copy only when there is exactly one executor per host. The cardinality of this map is, by the way, low in comparison to the total number of executors, given multi-tenancy and the maxRetainedMergerLocations threshold.
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
@@ -657,6 +681,13 @@ class BlockManagerMasterEndpoint(
}
}

private def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
val mergers = shuffleMergerLocations.values.filterNot(x => hostsToFilter.contains(x.host)).toSeq
mergers.take(numMergersNeeded)
}
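The merger-location cache above relies on LinkedHashMap preserving insertion order, so the host evicted at the cap is always the oldest one registered. A self-contained sketch of that eviction behavior, with an assumed cap of 3 instead of the configured default of 500:

import scala.collection.mutable

// Hedged sketch of the bounded, insertion-ordered cache used for merger locations;
// the cap of 3 and the String -> Int value type are illustrative simplifications.
val maxRetained = 3
val mergers = new mutable.LinkedHashMap[String, Int]()  // host -> shuffle service port

def addMerger(host: String, port: Int): Unit = {
  if (!mergers.contains(host)) {
    if (mergers.size >= maxRetained) {
      mergers -= mergers.head._1  // evict the oldest host, mirroring addMergerLocation
    }
    mergers(host) = port
  }
}

Seq("hostA", "hostB", "hostC", "hostD").foreach(addMerger(_, 7337))
// mergers now contains hostB, hostC, hostD; hostA, the oldest entry, was evicted.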

/**
* Returns an [[RpcEndpointRef]] of the [[BlockManagerReplicaEndpoint]] for sending RPC messages.
*/
@@ -141,4 +141,8 @@ private[spark] object BlockManagerMessages {
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

case class IsExecutorAlive(executorId: String) extends ToBlockManagerMaster

case class GetShufflePushMergerLocations(numMergersNeeded: Int, hostsToFilter: Set[String])
extends ToBlockManagerMaster

}
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -28,7 +28,7 @@ import java.nio.channels.{Channels, FileChannel, WritableByteChannel}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.security.SecureRandom
import java.util.{Arrays, Locale, Properties, Random, UUID}
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.zip.GZIPInputStream
@@ -2541,6 +2541,15 @@ private[spark] object Utils extends Logging {
master == "local" || master.startsWith("local[")
}

/**
* Push based shuffle can only be enabled when the external shuffle service is enabled.
* In the initial version, we cannot support push based shuffle and adaptive execution
* at the same time. Will improve this in a later version.
*/
def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
conf.get(PUSH_BASED_SHUFFLE_ENABLED) && conf.get(SHUFFLE_SERVICE_ENABLED)
}

A Contributor commented on this check:

I look forward to the day when the second condition will be disabled :-) It will be relevant for both k8s and Spark Streaming!

+CC @dongjoon-hyun you might be interested in this in future.
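A small sketch of the gating behavior, assuming it runs inside the org.apache.spark package since Utils is private[spark]; spark.shuffle.service.enabled defaults to false, so enabling the push flag alone is not enough:

// Hedged sketch: both flags must be set for isPushBasedShuffleEnabled to return true.
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val conf = new SparkConf().set("spark.shuffle.push.enabled", "true")
assert(!Utils.isPushBasedShuffleEnabled(conf))  // external shuffle service still disabled

conf.set("spark.shuffle.service.enabled", "true")
assert(Utils.isPushBasedShuffleEnabled(conf))   // both conditions now hold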

/**
* Return whether dynamic allocation is enabled in the given conf.
*/
@@ -1974,7 +1974,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
}

class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
test("mergerLocations should be bounded with in" +
venkata91 marked this conversation as resolved.
Show resolved Hide resolved
" spark.shuffle.push.retainedMergerLocations") {
assert(master.getShufflePushMergerLocations(10, Set.empty).isEmpty)
makeBlockManager(100, "execA",
transferService = Some(new MockBlockTransferService(10, "hostA")))
makeBlockManager(100, "execB",
transferService = Some(new MockBlockTransferService(10, "hostB")))
makeBlockManager(100, "execC",
transferService = Some(new MockBlockTransferService(10, "hostC")))
makeBlockManager(100, "execD",
transferService = Some(new MockBlockTransferService(10, "hostD")))
makeBlockManager(100, "execE",
transferService = Some(new MockBlockTransferService(10, "hostA")))
assert(master.getShufflePushMergerLocations(10, Set.empty).size == 4)
assert(master.getShufflePushMergerLocations(10, Set.empty)
.exists(x => Seq("hostC", "hostD", "hostA", "hostB").contains(x.host)))
venkata91 marked this conversation as resolved.
Show resolved Hide resolved
assert(master.getShufflePushMergerLocations(10, Set("hostB")).size == 3)
}

class MockBlockTransferService(
val maxFailures: Int,
hostname: String = "MockBlockTransferServiceHost") extends BlockTransferService {
var numCalls = 0
var tempFileManager: DownloadFileManager = null

@@ -1992,7 +2013,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE

override def close(): Unit = {}

override def hostName: String = { "MockBlockTransferServiceHost" }
override def hostName: String = { hostname }

override def port: Int = { 63332 }

@@ -18,7 +18,7 @@
package org.apache.spark.scheduler.cluster

import java.util.EnumSet
import java.util.concurrent.atomic.{AtomicBoolean}
import java.util.concurrent.atomic.AtomicBoolean
import javax.servlet.DispatcherType

import scala.concurrent.{ExecutionContext, Future}
@@ -29,14 +29,15 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}

import org.apache.spark.SparkContext
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES}
import org.apache.spark.internal.config.UI._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{RpcUtils, ThreadUtils}
import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster}
import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}

/**
* Abstract Yarn scheduler backend that contains common logic
@@ -80,6 +81,16 @@ private[spark] abstract class YarnSchedulerBackend(
/** Attempt ID. This is unset for client-mode schedulers */
private var attemptId: Option[ApplicationAttemptId] = None

private val blockManagerMaster: BlockManagerMaster = sc.env.blockManager.master

private val minMergersThresholdRatio = conf.get(config.MERGER_LOCATIONS_MIN_THRESHOLD_RATIO)

private val minMergersStaticThreshold = conf.get(config.MERGER_LOCATIONS_MIN_STATIC_THRESHOLD)

private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)

private val numExecutors = conf.get(EXECUTOR_INSTANCES).getOrElse(0)

/**
* Bind to YARN. This *must* be done before calling [[start()]].
*
@@ -161,6 +172,32 @@ private[spark] abstract class YarnSchedulerBackend(
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}

override def getShufflePushMergerLocations(
numPartitions: Int,
resourceProfileId: Int): Seq[BlockManagerId] = {
// Currently this is a naive way of calculating numMergersNeeded for a stage. In future,
// we can use better heuristics to calculate numMergersNeeded for a stage.
val maxExecutors = if (Utils.isDynamicAllocationEnabled(sc.getConf)) {
maxNumExecutors
} else {
numExecutors
}
val tasksPerExecutor = sc.resourceProfileManager
.resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf)
val numMergersNeeded = math.min(
math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxExecutors)
val minMergersThreshold = math.max(minMergersStaticThreshold,
math.floor(numMergersNeeded * minMergersThresholdRatio).toInt)
val mergerLocations = blockManagerMaster
.getShufflePushMergerLocations(numMergersNeeded, scheduler.nodeBlacklist())
logDebug(s"Num merger locations available ${mergerLocations.length}")
if (mergerLocations.size < numMergersNeeded && mergerLocations.size < minMergersThreshold) {
Seq.empty[BlockManagerId]
} else {
mergerLocations
}
}
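As a worked example of this heuristic (all numbers are illustrative assumptions, not taken from the PR):

// Hedged sketch: the merger-count heuristic above with assumed cluster numbers.
val numPartitions = 1000          // partitions of the child stage
val tasksPerExecutor = 4          // from the stage's ResourceProfile
val maxExecutors = 200            // spark.dynamicAllocation.maxExecutors (DRA on)
val thresholdRatio = 0.05         // spark.shuffle.push.mergersMinThresholdRatio
val staticThreshold = 5.0         // spark.shuffle.push.mergersMinStaticThreshold

val numMergersNeeded = math.min(
  math.max(1, math.ceil(numPartitions / tasksPerExecutor).toInt), maxExecutors)  // 200
val minMergersThreshold = math.max(staticThreshold,
  math.floor(numMergersNeeded * thresholdRatio).toInt)                           // 10.0
// The BlockManagerMaster is asked for 200 merger hosts. Anything from 10 to 199 hosts
// is still accepted; fewer than 10 returns Seq.empty, which disables shuffle merge
// for the stage via setShuffleMergeEnabled(false) in the DAGScheduler.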

/**
* Add filters to the SparkUI.
*/