Spark YarnAllocator Source Code
File path: /resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn
import java.util.LinkedHashMap
import java.util.Map.Entry
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.control.NonFatal
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{DecommissionExecutorsOnHost, RemoveExecutor, RetrieveLastAllocatedExecutorId}
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
* what to do with containers when YARN fulfills these requests.
*
* This class makes use of YARN's AMRMClient APIs. We interact with the AMRMClient in three ways:
* * Making our resource needs known, which updates local bookkeeping about containers requested.
* * Calling "allocate", which syncs our local container requests with the RM, and returns any
* containers that YARN has granted to us. This also functions as a heartbeat.
* * Processing the containers granted to us to possibly launch executors inside of them.
*
* The public methods of this class are thread-safe. All methods that mutate state are
* synchronized.
*/
private[yarn] class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource],
resolver: SparkRackResolver,
clock: Clock = new SystemClock)
extends Logging {
import YarnAllocator._
// Visible for testing.
@GuardedBy("this")
val allocatedHostToContainersMapPerRPId =
new HashMap[Int, HashMap[String, collection.mutable.Set[ContainerId]]]
@GuardedBy("this")
val allocatedContainerToHostMap = new HashMap[ContainerId, String]
// Containers that we no longer care about. We've either already told the RM to release them or
// will on the next heartbeat. Containers get removed from this map after the RM tells us they've
// completed.
@GuardedBy("this")
private val releasedContainers = collection.mutable.HashSet[ContainerId]()
@GuardedBy("this")
private val runningExecutorsPerResourceProfileId = new HashMap[Int, mutable.Set[String]]()
@GuardedBy("this")
private val numExecutorsStartingPerResourceProfileId = new HashMap[Int, AtomicInteger]
@GuardedBy("this")
private val targetNumExecutorsPerResourceProfileId = new mutable.HashMap[Int, Int]
// Executor loss reason requests that are pending - maps from executor ID for inquiry to a
// list of requesters that should be responded to once we find out why the given executor
// was lost.
@GuardedBy("this")
private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
// Maintain loss reasons for executors that have already been released. An entry is added when
// the loss reason is obtained from the AM-RM call, and removed once the loss reason is queried.
@GuardedBy("this")
private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
@GuardedBy("this")
private[yarn] val executorIdToContainer = new HashMap[String, Container]
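// Count of containers that completed without an explicit release from our side (e.g. node
// failures); each one results in a RemoveExecutor message being sent to the driver.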
@GuardedBy("this")
private var numUnexpectedContainerRelease = 0L
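// Maps a container id to the executor ID running in it and the ResourceProfile id the
// container was requested with; used when processing completed containers.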
@GuardedBy("this")
private val containerIdToExecutorIdAndResourceProfileId = new HashMap[ContainerId, (String, Int)]
// Use a ConcurrentHashMap because this is used in matchContainerToRequest, which is called
// from the rack resolver thread, where synchronizing on this would cause a deadlock.
@GuardedBy("ConcurrentHashMap")
private[yarn] val rpIdToYarnResource = new ConcurrentHashMap[Int, Resource]()
// note currently we don't remove ResourceProfiles
@GuardedBy("this")
private[yarn] val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]
// A map from ResourceProfile id to a map of preferred hostname to the number of tasks that
// could run on it.
@GuardedBy("this")
private var hostToLocalTaskCountPerResourceProfileId: Map[Int, Map[String, Int]] =
Map(DEFAULT_RESOURCE_PROFILE_ID -> Map.empty)
// ResourceProfile Id to number of tasks that have locality preferences in active stages
@GuardedBy("this")
private[yarn] var numLocalityAwareTasksPerResourceProfileId: Map[Int, Int] =
Map(DEFAULT_RESOURCE_PROFILE_ID -> 0)
/**
* Used to generate a unique ID per executor
*
* Initializing `executorIdCounter`: when the AM restarts, `executorIdCounter` is reset to 0, so
* the IDs of new executors would start from 1 again and conflict with executors created by the
* previous attempt. To avoid this, we initialize `executorIdCounter` with the max executor ID
* obtained from the driver.
*
* This executor ID conflict only happens in yarn-client mode; see the JIRA ticket below for
* details.
*
* @see SPARK-12864
*/
@GuardedBy("this")
private var executorIdCounter: Int =
driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
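// failureTracker counts recent executor failures; allocatorNodeHealthTracker uses it, together
// with the scheduler's excluded nodes, to exclude unhealthy nodes from future container requests.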
private[spark] val failureTracker = new FailureTracker(sparkConf, clock)
private val allocatorNodeHealthTracker =
new YarnAllocatorNodeHealthTracker(sparkConf, amClient, failureTracker)
private val isPythonApp = sparkConf.get(IS_PYTHON_APP)
private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
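// Daemon thread pool used to run ExecutorRunnable launches in allocated containers; its size is
// bounded by the CONTAINER_LAUNCH_MAX_THREADS config.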
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
"ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
// For testing
private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
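// Optional YARN node label expression attached to every executor container request
// (see createContainerRequest).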
private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
private val resourceNameMapping = ResourceRequestHelper.getResourceNameMapping(sparkConf)
// A container placement strategy based on pending tasks' locality preference
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resolver)
private val isYarnExecutorDecommissionEnabled: Boolean = {
(sparkConf.get(DECOMMISSION_ENABLED),
sparkConf.get(SHUFFLE_SERVICE_ENABLED)) match {
case (true, false) => true
case (true, true) =>
logWarning(s"Yarn Executor Decommissioning is supported only " +
s"when ${SHUFFLE_SERVICE_ENABLED.key} is set to false. See: SPARK-39018.")
false
case (false, _) => false
}
}
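// A bounded, insertion-ordered cache of hosts already reported to the driver as decommissioning,
// so DecommissionExecutorsOnHost is sent at most once per node; the eldest entry is evicted once
// the size exceeds DECOMMISSIONING_NODES_CACHE_SIZE.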
private val decommissioningNodesCache = new LinkedHashMap[String, Boolean]() {
override def removeEldestEntry(entry: Entry[String, Boolean]): Boolean = {
size() > DECOMMISSIONING_NODES_CACHE_SIZE
}
}
// The default profile is always present, so we need to initialize the data structures keyed by
// ResourceProfile id to ensure it's present if things start running before a request for
// executors could add it. This approach is easier than special-casing it everywhere.
private def initDefaultProfile(): Unit = synchronized {
allocatedHostToContainersMapPerRPId(DEFAULT_RESOURCE_PROFILE_ID) =
new HashMap[String, mutable.Set[ContainerId]]()
runningExecutorsPerResourceProfileId.put(DEFAULT_RESOURCE_PROFILE_ID, mutable.HashSet[String]())
numExecutorsStartingPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = new AtomicInteger(0)
val initTargetExecNum = SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = initTargetExecNum
val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
createYarnResourceForResourceProfile(defaultProfile)
}
initDefaultProfile()
def getNumExecutorsRunning: Int = synchronized {
runningExecutorsPerResourceProfileId.values.map(_.size).sum
}
def getNumLocalityAwareTasks: Int = synchronized {
numLocalityAwareTasksPerResourceProfileId.values.sum
}
def getNumExecutorsStarting: Int = synchronized {
numExecutorsStartingPerResourceProfileId.values.map(_.get()).sum
}
def getNumReleasedContainers: Int = synchronized {
releasedContainers.size
}
def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors
def isAllNodeExcluded: Boolean = allocatorNodeHealthTracker.isAllNodeExcluded
/**
* A sequence of pending container requests that have not yet been fulfilled.
* ResourceProfile id -> pendingAllocate container request
*/
def getPendingAllocate: Map[Int, Seq[ContainerRequest]] = getPendingAtLocation(ANY_HOST)
def getNumContainersPendingAllocate: Int = synchronized {
getPendingAllocate.values.flatten.size
}
// YARN priorities are such that lower number is higher priority.
// We need to allocate a different priority for each ResourceProfile because YARN
// won't allow different container resource requirements within a Priority.
// We could allocate per Stage to make sure earlier stages get priority but Spark
// always finishes a stage before starting a later one and if we have 2 running in parallel
// the priority doesn't matter.
// We are using the ResourceProfile id as the priority.
private def getContainerPriority(rpId: Int): Priority = {
Priority.newInstance(rpId)
}
// The ResourceProfile id is the priority
private def getResourceProfileIdFromPriority(priority: Priority): Int = {
priority.getPriority()
}
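// The getOrUpdate* helpers below lazily create the per-ResourceProfile bookkeeping entries, so a
// ResourceProfile seen for the first time starts with empty collections (or the configured
// initial executor target).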
private def getOrUpdateAllocatedHostToContainersMapForRPId(
rpId: Int): HashMap[String, collection.mutable.Set[ContainerId]] = synchronized {
allocatedHostToContainersMapPerRPId.getOrElseUpdate(rpId,
new HashMap[String, mutable.Set[ContainerId]]())
}
private def getOrUpdateRunningExecutorForRPId(rpId: Int): mutable.Set[String] = synchronized {
runningExecutorsPerResourceProfileId.getOrElseUpdate(rpId, mutable.HashSet[String]())
}
private def getOrUpdateNumExecutorsStartingForRPId(rpId: Int): AtomicInteger = synchronized {
numExecutorsStartingPerResourceProfileId.getOrElseUpdate(rpId, new AtomicInteger(0))
}
private def getOrUpdateTargetNumExecutorsForRPId(rpId: Int): Int = synchronized {
targetNumExecutorsPerResourceProfileId.getOrElseUpdate(rpId,
SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf))
}
/**
* A sequence of pending container requests at the given location for each ResourceProfile id
* that have not yet been fulfilled.
*/
private def getPendingAtLocation(
location: String): Map[Int, Seq[ContainerRequest]] = synchronized {
val allContainerRequests = new mutable.HashMap[Int, Seq[ContainerRequest]]
rpIdToResourceProfile.keys.foreach { id =>
val profResource = rpIdToYarnResource.get(id)
val result = amClient.getMatchingRequests(getContainerPriority(id), location, profResource)
.asScala.flatMap(_.asScala)
allContainerRequests(id) = result.toSeq
}
allContainerRequests.toMap
}
// if a ResourceProfile hasn't been seen yet, create the corresponding YARN Resource for it
private def createYarnResourceForResourceProfile(rp: ResourceProfile): Unit = synchronized {
if (!rpIdToYarnResource.contains(rp.id)) {
// track the resource profile if not already there
getOrUpdateRunningExecutorForRPId(rp.id)
logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
val resourcesWithDefaults =
ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources,
memoryOverheadFactor, sparkConf, isPythonApp, resourceNameMapping)
val customSparkResources =
resourcesWithDefaults.customResources.map { case (name, execReq) =>
(name, execReq.amount.toString)
}
// There is a difference in the way custom resources are handled between
// the base default profile and custom ResourceProfiles. To allow for the user
// to request YARN containers with extra resources without Spark scheduling on
// them, the user can specify resources via the <code>spark.yarn.executor.resource.</code>
// config. Those configs are only used in the base default profile though and do
// not get propagated into any other custom ResourceProfiles. This is because
// there would be no way to remove them if you wanted a stage to not have them.
// This results in your default profile getting custom resources defined in
// <code>spark.yarn.executor.resource.</code> plus spark defined resources of
// GPU or FPGA. Spark converts GPU and FPGA resources into the YARN built in
// types <code>yarn.io/gpu</code> and <code>yarn.io/fpga</code>, but does not
// know the mapping of any other resources. Any other Spark custom resources
// are not propagated to YARN for the default profile. So if you want Spark
// to schedule based off a custom resource and have it requested from YARN, you
// must specify it in both YARN (<code>spark.yarn.{driver/executor}.resource.</code>)
// and Spark (<code>spark.{driver/executor}.resource.</code>) configs. Leave the Spark
// config off if you only want YARN containers with the extra resources but Spark not to
// schedule using them. Now for custom ResourceProfiles, it doesn't currently have a way
// to only specify YARN resources without Spark scheduling off of them. This means for
// custom ResourceProfiles we propagate all the resources defined in the ResourceProfile
// to YARN. We still convert GPU and FPGA to the YARN built-in types as well. This requires
// that the name of any custom resources you specify match what they are defined as in YARN.
val customResources = if (rp.id == DEFAULT_RESOURCE_PROFILE_ID) {
val gpuResource = sparkConf.get(YARN_GPU_DEVICE)
val fpgaResource = sparkConf.get(YARN_FPGA_DEVICE)
getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
customSparkResources.filterKeys { r =>
(r == gpuResource || r == fpgaResource)
}
} else {
customSparkResources
}
assert(resourcesWithDefaults.cores.nonEmpty)
val resource = Resource.newInstance(
resourcesWithDefaults.totalMemMiB.toInt, resourcesWithDefaults.cores.get)
ResourceRequestHelper.setResourceRequests(customResources, resource)
logDebug(s"Created resource capability: $resource")
rpIdToYarnResource.putIfAbsent(rp.id, resource)
rpIdToResourceProfile(rp.id) = rp
}
}
/**
* Request as many executors from the ResourceManager as needed to reach the desired total. If
* the requested total is smaller than the current number of running executors, no executors will
* be killed.
* @param resourceProfileToTotalExecs total number of containers requested for each
* ResourceProfile
* @param numLocalityAwareTasksPerResourceProfileId number of locality aware tasks for each
* ResourceProfile id to be used as container
* placement hint.
* @param hostToLocalTaskCount a map of preferred hostname to possible task counts for each
* ResourceProfile id to be used as container placement hint.
* @param excludedNodes excluded nodes, which is passed in to avoid allocating new containers
* on them. It will be used to update the applications excluded node list.
* @return Whether the new requested total is different than the old value.
*/
def requestTotalExecutorsWithPreferredLocalities(
resourceProfileToTotalExecs: Map[ResourceProfile, Int],
numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
hostToLocalTaskCountPerResourceProfileId: Map[Int, Map[String, Int]],
excludedNodes: Set[String]): Boolean = synchronized {
this.numLocalityAwareTasksPerResourceProfileId = numLocalityAwareTasksPerResourceProfileId
this.hostToLocalTaskCountPerResourceProfileId = hostToLocalTaskCountPerResourceProfileId
val res = resourceProfileToTotalExecs.map { case (rp, numExecs) =>
createYarnResourceForResourceProfile(rp)
if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) {
logInfo(s"Driver requested a total number of $numExecs executor(s) " +
s"for resource profile id: ${rp.id}.")
targetNumExecutorsPerResourceProfileId(rp.id) = numExecs
allocatorNodeHealthTracker.setSchedulerExcludedNodes(excludedNodes)
true
} else {
false
}
}
res.exists(_ == true)
}
/**
* Request that the ResourceManager release the container running the specified executor.
*/
def killExecutor(executorId: String): Unit = synchronized {
executorIdToContainer.get(executorId) match {
case Some(container) if !releasedContainers.contains(container.getId) =>
val (_, rpId) = containerIdToExecutorIdAndResourceProfileId(container.getId)
internalReleaseContainer(container)
getOrUpdateRunningExecutorForRPId(rpId).remove(executorId)
case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
}
}
/**
* Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
* equal to maxExecutors.
*
* Deal with any containers YARN has granted to us by possibly launching executors in them.
*
* This must be synchronized because variables read in this method are mutated by other methods.
*/
def allocateResources(): Unit = synchronized {
updateResourceRequests()
val progressIndicator = 0.1f
// Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
// requests.
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
allocatorNodeHealthTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)
if (isYarnExecutorDecommissionEnabled) {
handleNodesInDecommissioningState(allocateResponse)
}
if (allocatedContainers.size > 0) {
logDebug(("Allocated containers: %d. Current executor count: %d. " +
"Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
getNumExecutorsRunning,
getNumExecutorsStarting,
allocateResponse.getAvailableResources))
handleAllocatedContainers(allocatedContainers.asScala.toSeq)
}
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
logDebug("Completed %d containers".format(completedContainers.size))
processCompletedContainers(completedContainers.asScala.toSeq)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, getNumExecutorsRunning))
}
}
private def handleNodesInDecommissioningState(allocateResponse: AllocateResponse): Unit = {
// Some nodes have been put into the decommissioning state even though the RM allocated
// resources on them for earlier allocateResources calls, so notify the driver to put the
// executors on those nodes into the decommissioning state.
allocateResponse.getUpdatedNodes.asScala.filter (node =>
// SPARK-39491: Hadoop 2.7 does not support `NodeState.DECOMMISSIONING`,
// so use a string comparison here instead so this compiles against Hadoop 2.7.
// Should revert to `node.getNodeState == NodeState.DECOMMISSIONING` when
// Hadoop 2.7 is no longer supported.
node.getNodeState.toString.equals("DECOMMISSIONING") &&
!decommissioningNodesCache.containsKey(getHostAddress(node)))
.foreach { node =>
val host = getHostAddress(node)
driverRef.send(DecommissionExecutorsOnHost(host))
decommissioningNodesCache.put(host, true)
}
}
private def getHostAddress(nodeReport: NodeReport): String = nodeReport.getNodeId.getHost
/**
* Update the set of container requests that we will sync with the RM based on the number of
* executors we have currently running and our target number of executors for each
* ResourceProfile.
*
* Visible for testing.
*/
def updateResourceRequests(): Unit = synchronized {
val pendingAllocatePerResourceProfileId = getPendingAllocate
val missingPerProfile = targetNumExecutorsPerResourceProfileId.map { case (rpId, targetNum) =>
val starting = getOrUpdateNumExecutorsStartingForRPId(rpId).get
val pending = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty).size
val running = getOrUpdateRunningExecutorForRPId(rpId).size
logDebug(s"Updating resource requests for ResourceProfile id: $rpId, target: " +
s"$targetNum, pending: $pending, running: $running, executorsStarting: $starting")
(rpId, targetNum - pending - running - starting)
}.toMap
missingPerProfile.foreach { case (rpId, missing) =>
val hostToLocalTaskCount =
hostToLocalTaskCountPerResourceProfileId.getOrElse(rpId, Map.empty)
val pendingAllocate = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty)
val numPendingAllocate = pendingAllocate.size
// Split the pending container requests into three groups: locality matched, locality
// unmatched, and no locality preference. The locality matched requests are taken into
// account for container placement and treated as allocated containers.
// Locality unmatched and locality-free container requests are cancelled, since the required
// locality preferences have changed, and are recalculated using the container placement
// strategy.
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCount, pendingAllocate)
if (missing > 0) {
val resource = rpIdToYarnResource.get(rpId)
if (log.isInfoEnabled()) {
var requestContainerMessage = s"Will request $missing executor container(s) for " +
s" ResourceProfile Id: $rpId, each with " +
s"${resource.getVirtualCores} core(s) and " +
s"${resource.getMemory} MB memory."
if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
ResourceRequestHelper.isYarnCustomResourcesNonEmpty(resource)) {
requestContainerMessage ++= s" with custom resources: " + resource.toString
}
logInfo(requestContainerMessage)
}
// cancel "stale" requests for locations that are no longer needed
staleRequests.foreach { stale =>
amClient.removeContainerRequest(stale)
}
val cancelledContainers = staleRequests.size
if (cancelledContainers > 0) {
logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
}
// consider the number of new containers and cancelled stale containers available
val availableContainers = missing + cancelledContainers
// to maximize locality, include requests with no locality preference that can be cancelled
val potentialContainers = availableContainers + anyHostRequests.size
val allocatedHostToContainer = getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
val numLocalityAwareTasks = numLocalityAwareTasksPerResourceProfileId.getOrElse(rpId, 0)
val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
potentialContainers, numLocalityAwareTasks, hostToLocalTaskCount,
allocatedHostToContainer, localRequests, rpIdToResourceProfile(rpId))
val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
containerLocalityPreferences.foreach {
case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
newLocalityRequests += createContainerRequest(resource, nodes, racks, rpId)
case _ =>
}
if (availableContainers >= newLocalityRequests.size) {
// more containers are available than needed for locality, fill in requests for any host
for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
newLocalityRequests += createContainerRequest(resource, null, null, rpId)
}
} else {
val numToCancel = newLocalityRequests.size - availableContainers
// cancel some requests without locality preferences to schedule more local containers
anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
amClient.removeContainerRequest(nonLocal)
}
if (numToCancel > 0) {
logInfo(s"Canceled $numToCancel unlocalized container requests to " +
s"resubmit with locality")
}
}
newLocalityRequests.foreach { request =>
amClient.addContainerRequest(request)
}
if (log.isInfoEnabled()) {
val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
if (anyHost.nonEmpty) {
logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
}
localized.foreach { request =>
logInfo(s"Submitted container request for host ${hostStr(request)}.")
}
}
} else if (numPendingAllocate > 0 && missing < 0) {
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new " +
s"desired total ${getOrUpdateTargetNumExecutorsForRPId(rpId)} executors.")
// cancel pending allocate requests by taking locality preference into account
val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
cancelRequests.foreach(amClient.removeContainerRequest)
}
}
}
def stop(): Unit = {
// Forcefully shut down the launcher pool, in case this is being called in the middle of
// container allocation. This will prevent queued executors from being started - and
// potentially interrupt active ExecutorRunnable instances too.
launcherPool.shutdownNow()
}
private def hostStr(request: ContainerRequest): String = {
Option(request.getNodes) match {
case Some(nodes) => nodes.asScala.mkString(",")
case None => "Any"
}
}
/**
* Creates a container request, handling the reflection required to use YARN features that were
* added in recent versions.
*/
private def createContainerRequest(
resource: Resource,
nodes: Array[String],
racks: Array[String],
rpId: Int): ContainerRequest = {
new ContainerRequest(resource, nodes, racks, getContainerPriority(rpId),
true, labelExpression.orNull)
}
/**
* Handle containers granted by the RM by launching executors on them.
*
* Due to the way the YARN allocation protocol works, certain healthy race conditions can result
* in YARN granting containers that we no longer need. In this case, we release them.
*
* Visible for testing.
*/
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
// Match incoming requests by host
val remainingAfterHostMatches = new ArrayBuffer[Container]
for (allocatedContainer <- allocatedContainers) {
matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
containersToUse, remainingAfterHostMatches)
}
// Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
// (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
// a separate thread to perform the operation.
val remainingAfterRackMatches = new ArrayBuffer[Container]
if (remainingAfterHostMatches.nonEmpty) {
var exception: Option[Throwable] = None
val thread = new Thread("spark-rack-resolver") {
override def run(): Unit = {
try {
for (allocatedContainer <- remainingAfterHostMatches) {
val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
matchContainerToRequest(allocatedContainer, rack, containersToUse,
remainingAfterRackMatches)
}
} catch {
case e: Throwable =>
exception = Some(e)
}
}
}
thread.setDaemon(true)
thread.start()
try {
thread.join()
} catch {
case e: InterruptedException =>
thread.interrupt()
throw e
}
if (exception.isDefined) {
throw exception.get
}
}
// Assign remaining that are neither node-local nor rack-local
val remainingAfterOffRackMatches = new ArrayBuffer[Container]
for (allocatedContainer <- remainingAfterRackMatches) {
matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
remainingAfterOffRackMatches)
}
if (remainingAfterOffRackMatches.nonEmpty) {
logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
s"allocated to us")
for (container <- remainingAfterOffRackMatches) {
internalReleaseContainer(container)
}
}
runAllocatedContainers(containersToUse)
logInfo("Received %d containers from YARN, launching executors on %d of them."
.format(allocatedContainers.size, containersToUse.size))
}
/**
* Looks for requests for the given location that match the given container allocation. If it
* finds one, removes the request so that it won't be submitted again. Places the container into
* containersToUse or remaining.
*
* @param allocatedContainer container that was given to us by YARN
* @param location resource name, either a node, rack, or *
* @param containersToUse list of containers that will be used
* @param remaining list of containers that will not be used
*/
private def matchContainerToRequest(
allocatedContainer: Container,
location: String,
containersToUse: ArrayBuffer[Container],
remaining: ArrayBuffer[Container]): Unit = {
// Match on the exact resource we requested so there shouldn't be a mismatch,
// we are relying on YARN to return a container with resources no less than we requested.
// If we change this, or start validating the container, be sure the logic covers SPARK-6050.
val rpId = getResourceProfileIdFromPriority(allocatedContainer.getPriority)
val resourceForRP = rpIdToYarnResource.get(rpId)
logDebug(s"Calling amClient.getMatchingRequests with parameters: " +
s"priority: ${allocatedContainer.getPriority}, " +
s"location: $location, resource: $resourceForRP")
val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
resourceForRP)
// Match the allocation to a request
if (!matchingRequests.isEmpty) {
val containerRequest = matchingRequests.get(0).iterator.next
logDebug(s"Removing container request via AM client: $containerRequest")
amClient.removeContainerRequest(containerRequest)
containersToUse += allocatedContainer
} else {
remaining += allocatedContainer
}
}
/**
* Launches executors in the allocated containers.
*/
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = synchronized {
for (container <- containersToUse) {
val rpId = getResourceProfileIdFromPriority(container.getPriority)
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
val executorId = executorIdCounter.toString
val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
assert(container.getResource.getMemory >= yarnResourceForRpId.getMemory)
logInfo(s"Launching container $containerId on host $executorHostname " +
s"for executor with ID $executorId for ResourceProfile Id $rpId")
def updateInternalState(): Unit = synchronized {
getOrUpdateRunningExecutorForRPId(rpId).add(executorId)
getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
executorIdToContainer(executorId) = container
containerIdToExecutorIdAndResourceProfileId(container.getId) = (executorId, rpId)
val localallocatedHostToContainersMap = getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
val containerSet = localallocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])
containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)
}
val rp = rpIdToResourceProfile(rpId)
val defaultResources = ResourceProfile.getDefaultProfileExecutorResources(sparkConf)
val containerMem = rp.executorResources.get(ResourceProfile.MEMORY).
map(_.amount).getOrElse(defaultResources.executorMemoryMiB).toInt
assert(defaultResources.cores.nonEmpty)
val defaultCores = defaultResources.cores.get
val containerCores = rp.getExecutorCores.getOrElse(defaultCores)
val rpRunningExecs = getOrUpdateRunningExecutorForRPId(rpId).size
if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
if (launchContainers) {
launcherPool.execute(() => {
try {
new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
containerMem,
containerCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources,
rp.id
).run()
updateInternalState()
} catch {
case e: Throwable =>
getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
if (NonFatal(e)) {
logError(s"Failed to launch executor $executorId on container $containerId", e)
// Assigned container should be released immediately
// to avoid unnecessary resource occupation.
amClient.releaseAssignedContainer(containerId)
} else {
throw e
}
}
})
} else {
// For test only
updateInternalState()
}
} else {
logInfo(("Skip launching executorRunnable as running executors count: %d " +
"reached target executors count: %d.").format(rpRunningExecs,
getOrUpdateTargetNumExecutorsForRPId(rpId)))
}
}
}
// Visible for testing.
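// For each completed container reported by the RM, work out why it exited (normal completion,
// preemption, memory limits exceeded, or a failure), update the internal bookkeeping, answer any
// pending loss-reason requests, and tell the driver to remove executors that exited unexpectedly.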
private[yarn] def processCompletedContainers(
completedContainers: Seq[ContainerStatus]): Unit = synchronized {
for (completedContainer <- completedContainers) {
val containerId = completedContainer.getContainerId
val (_, rpId) = containerIdToExecutorIdAndResourceProfileId.getOrElse(containerId,
("", DEFAULT_RESOURCE_PROFILE_ID))
val alreadyReleased = releasedContainers.remove(containerId)
val hostOpt = allocatedContainerToHostMap.get(containerId)
val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
containerIdToExecutorIdAndResourceProfileId.get(containerId) match {
case Some((executorId, _)) =>
getOrUpdateRunningExecutorForRPId(rpId).remove(executorId)
case None => logWarning(s"Cannot find executorId for container: ${containerId.toString}")
}
logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
containerId,
onHostStr,
completedContainer.getState,
completedContainer.getExitStatus))
// Hadoop 2.2.X added a ContainerExitStatus we should switch to use.
// There are some exit statuses we shouldn't necessarily count against us, but for
// now it's OK since none of the containers are expected to exit.
val exitStatus = completedContainer.getExitStatus
val (exitCausedByApp, containerExitReason) = exitStatus match {
case ContainerExitStatus.SUCCESS =>
(false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
"preemption) and not because of an error in the running job.")
case ContainerExitStatus.PREEMPTED =>
// Preemption is not the fault of the running tasks, since YARN preempts containers
// merely to do resource sharing, and tasks that fail due to preempted executors could
// just as easily finish on any other executor. See SPARK-8167.
(false, s"Container ${containerId}${onHostStr} was preempted.")
// Should probably still count memory exceeded exit codes towards task failures
case VMEM_EXCEEDED_EXIT_CODE =>
val vmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX virtual memory used".r
val diag = vmemExceededPattern.findFirstIn(completedContainer.getDiagnostics)
.map(_.concat(".")).getOrElse("")
val message = "Container killed by YARN for exceeding virtual memory limits. " +
s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key} or boosting " +
s"${YarnConfiguration.NM_VMEM_PMEM_RATIO} or disabling " +
s"${YarnConfiguration.NM_VMEM_CHECK_ENABLED} because of YARN-4714."
(true, message)
case PMEM_EXCEEDED_EXIT_CODE =>
val pmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX physical memory used".r
val diag = pmemExceededPattern.findFirstIn(completedContainer.getDiagnostics)
.map(_.concat(".")).getOrElse("")
val message = "Container killed by YARN for exceeding physical memory limits. " +
s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
(true, message)
case other_exit_status =>
// SPARK-26269: follow YARN's behaviour(see https://github
// .com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/had
// oop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/ap
// ache/hadoop/yarn/util/Apps.java#L273 for details)
if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) {
(false, s"Container marked as failed: $containerId$onHostStr" +
s". Exit status: ${completedContainer.getExitStatus}" +
s". Diagnostics: ${completedContainer.getDiagnostics}.")
} else {
// completed container from a bad node
allocatorNodeHealthTracker.handleResourceAllocationFailure(hostOpt)
(true, s"Container from a bad node: $containerId$onHostStr" +
s". Exit status: ${completedContainer.getExitStatus}" +
s". Diagnostics: ${completedContainer.getDiagnostics}.")
}
}
if (exitCausedByApp) {
logWarning(containerExitReason)
} else {
logInfo(containerExitReason)
}
ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)
} else {
// If we have already released this container, then it must mean
// that the driver has explicitly requested it to be killed
ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,
s"Container $containerId exited from explicit termination request.")
}
for {
host <- hostOpt
containerSet <- getOrUpdateAllocatedHostToContainersMapForRPId(rpId).get(host)
} {
containerSet.remove(containerId)
if (containerSet.isEmpty) {
getOrUpdateAllocatedHostToContainersMapForRPId(rpId).remove(host)
} else {
getOrUpdateAllocatedHostToContainersMapForRPId(rpId).update(host, containerSet)
}
allocatedContainerToHostMap.remove(containerId)
}
containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) =>
executorIdToContainer.remove(eid)
pendingLossReasonRequests.remove(eid) match {
case Some(pendingRequests) =>
// Notify application of executor loss reasons so it can decide whether it should abort
pendingRequests.foreach(_.reply(exitReason))
case None =>
// We cannot find the executor among the pending requests. This is because the completed
// container is processed before the loss reason is queried, so we store the reason for a
// later query. This usually happens when a container is explicitly killed: the result is
// returned in the same AM-RM communication, so the loss-reason RPC arrives after this
// completed container has been processed.
releasedExecutorLossReasons.put(eid, exitReason)
}
if (!alreadyReleased) {
// The executor could have gone away (like no route to host, node failure, etc)
// Notify backend about the failure of the executor
numUnexpectedContainerRelease += 1
driverRef.send(RemoveExecutor(eid, exitReason))
}
}
}
}
/**
* Register that some RpcCallContext has asked the AM why the executor was lost. Note that
* we can only find the loss reason to send back in the next call to allocateResources().
*/
private[yarn] def enqueueGetLossReasonRequest(
eid: String,
context: RpcCallContext): Unit = synchronized {
if (executorIdToContainer.contains(eid)) {
pendingLossReasonRequests
.getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
} else if (releasedExecutorLossReasons.contains(eid)) {
// Executor is already released explicitly before getting the loss reason, so directly send
// the pre-stored loss reason
context.reply(releasedExecutorLossReasons.remove(eid).get)
} else {
logWarning(s"Tried to get the loss reason for non-existent executor $eid")
context.sendFailure(
new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
}
}
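// Record the container as released on our side and ask the RM (via the AMRMClient) to release
// it; the id is removed from `releasedContainers` once the RM reports the container as completed.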
private def internalReleaseContainer(container: Container): Unit = synchronized {
releasedContainers.add(container.getId())
amClient.releaseAssignedContainer(container.getId())
}
private[yarn] def getNumUnexpectedContainerRelease: Long = synchronized {
numUnexpectedContainerRelease
}
private[yarn] def getNumPendingLossReasonRequests: Int = synchronized {
pendingLossReasonRequests.size
}
/**
* Split the pending container requests into 3 groups based on current localities of pending
* tasks.
* @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
* container placement hint.
* @param pendingAllocations A sequence of pending allocation container request.
* @return A tuple of 3 sequences, first is a sequence of locality matched container
* requests, second is a sequence of locality unmatched container requests, and third is a
* sequence of locality free container requests.
*/
private def splitPendingAllocationsByLocality(
hostToLocalTaskCount: Map[String, Int],
pendingAllocations: Seq[ContainerRequest]
): (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) = {
val localityMatched = ArrayBuffer[ContainerRequest]()
val localityUnMatched = ArrayBuffer[ContainerRequest]()
val localityFree = ArrayBuffer[ContainerRequest]()
val preferredHosts = hostToLocalTaskCount.keySet
pendingAllocations.foreach { cr =>
val nodes = cr.getNodes
if (nodes == null) {
localityFree += cr
} else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) {
localityMatched += cr
} else {
localityUnMatched += cr
}
}
(localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
}
}
private object YarnAllocator {
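// MEM_REGEX matches memory amounts in YARN diagnostics messages (e.g. "2.1 GB").
// The two exit codes below are the YARN NodeManager exit codes for containers killed for
// exceeding their virtual / physical memory limits.
// NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS lists exit statuses that should not be counted as
// application-caused failures when processing completed containers.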
val MEM_REGEX = "[0-9.]+ [KMG]B"
val VMEM_EXCEEDED_EXIT_CODE = -103
val PMEM_EXCEEDED_EXIT_CODE = -104
val DECOMMISSIONING_NODES_CACHE_SIZE = 200
val NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS = Set(
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
ContainerExitStatus.KILLED_BY_APPMASTER,
ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
ContainerExitStatus.ABORTED,
ContainerExitStatus.DISKS_FAILED
)
}
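For context on how this class is driven: Spark's ApplicationMaster runs a reporter thread that periodically calls allocateResources(), which syncs outstanding container requests with the RM (doubling as a heartbeat), launches executors in granted containers, and processes completed ones. Below is a minimal, hypothetical sketch of such a loop, not the actual ApplicationMaster code; the object name, heartbeat interval, and failure limit are illustrative assumptions.
// Hypothetical helper illustrating how a caller in the same package (YarnAllocator is
// private[yarn]) might drive the allocator; the real loop lives in
// org.apache.spark.deploy.yarn.ApplicationMaster.
private[yarn] object YarnAllocatorLoopSketch {
  def runHeartbeatLoop(
      allocator: YarnAllocator,
      heartbeatIntervalMs: Long = 3000L, // illustrative value, not a Spark default
      maxExecutorFailures: Int = 10 // illustrative value, not a Spark default
    ): Thread = {
    val reporter = new Thread("yarn-allocator-heartbeat") {
      override def run(): Unit = {
        // Keep heartbeating until interrupted or too many executors have failed.
        while (!Thread.currentThread().isInterrupted &&
            allocator.getNumExecutorsFailed < maxExecutorFailures) {
          // Sync container requests with the RM, launch executors in any granted containers,
          // and process completed containers.
          allocator.allocateResources()
          try {
            Thread.sleep(heartbeatIntervalMs)
          } catch {
            case _: InterruptedException => Thread.currentThread().interrupt()
          }
        }
      }
    }
    reporter.setDaemon(true)
    reporter.start()
    reporter
  }
}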
Related articles
Spark ApplicationMasterArguments source code
Spark ApplicationMasterSource source code
Spark ClientDistributedCacheManager source code
Spark LocalityPreferredContainerPlacementStrategy source code