spark ExecutorPodsAllocator source code
File path: /resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
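ExecutorPodsAllocator is the pods allocator that the Kubernetes cluster scheduler backend uses to keep the number of executor pods at the requested target. As the listing below shows, it subscribes to pod snapshots from the ExecutorPodsSnapshotsStore, requests new executor pods in batches bounded by the allocation batch size and the maximum number of pending pods, deletes pod requests that were never observed within the creation timeout or that exceed the per-ResourceProfile target when dynamic allocation scales down, and can reuse driver-owned PersistentVolumeClaims when building new executors.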
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import java.time.Instant
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal
import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, Pod, PodBuilder}
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.util.{Clock, Utils}
class ExecutorPodsAllocator(
conf: SparkConf,
secMgr: SecurityManager,
executorBuilder: KubernetesExecutorBuilder,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
clock: Clock) extends AbstractPodsAllocator() with Logging {
private val EXECUTOR_ID_COUNTER = new AtomicInteger(0)
// ResourceProfile id -> total expected executors per profile, currently we don't remove
// any resource profiles - https://issues.apache.org/jira/browse/SPARK-30749
private val totalExpectedExecutorsPerResourceProfileId = new ConcurrentHashMap[Int, Int]()
private val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]
private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
private val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)
private val podCreationTimeout = math.max(
podAllocationDelay * 5,
conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
private val driverPodReadinessTimeout = conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT)
private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000
private val namespace = conf.get(KUBERNETES_NAMESPACE)
private val kubernetesDriverPodName = conf
.get(KUBERNETES_DRIVER_POD_NAME)
private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
val driverPod = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
.inNamespace(namespace)
.withName(name)
.get())
.getOrElse(throw new SparkException(
s"No pod was found named $name in the cluster in the " +
s"namespace $namespace (this was supposed to be the driver pod.).")))
// Executor IDs that have been requested from Kubernetes but have not been detected in any
// snapshot yet. Mapped to the (ResourceProfile id, timestamp) when they were created.
private val newlyCreatedExecutors = mutable.LinkedHashMap.empty[Long, (Int, Long)]
// Executor IDs that have been requested from Kubernetes but have not been detected in any POD
// snapshot yet but already known by the scheduler backend. Mapped to the ResourceProfile id.
private val schedulerKnownNewlyCreatedExecs = mutable.LinkedHashMap.empty[Long, Int]
private val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(conf)
// visible for tests
private[k8s] val numOutstandingPods = new AtomicInteger()
private var lastSnapshot = ExecutorPodsSnapshot()
private var appId: String = _
// Executors that have been deleted by this allocator but not yet detected as deleted in
// a snapshot from the API server. This is used to deny registration from these executors
// if they happen to come up before the deletion takes effect.
@volatile private var deletedExecutorIds = Set.empty[Long]
def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
appId = applicationId
driverPod.foreach { pod =>
// Wait until the driver pod is ready before starting executors, as the headless service won't
// be resolvable by DNS until the driver pod is ready.
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withName(pod.getMetadata.getName)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
}
}
snapshotsStore.addSubscriber(podAllocationDelay) {
onNewSnapshots(applicationId, schedulerBackend, _)
}
}
def setTotalExpectedExecutors(resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
resourceProfileToTotalExecs.foreach { case (rp, numExecs) =>
rpIdToResourceProfile.getOrElseUpdate(rp.id, rp)
totalExpectedExecutorsPerResourceProfileId.put(rp.id, numExecs)
}
logDebug(s"Set total expected execs to $totalExpectedExecutorsPerResourceProfileId")
if (numOutstandingPods.get() == 0) {
snapshotsStore.notifySubscribers()
}
}
def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)
private def onNewSnapshots(
applicationId: String,
schedulerBackend: KubernetesClusterSchedulerBackend,
snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys)
newlyCreatedExecutors --= k8sKnownExecIds
schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds
// Although we are going to delete some executors due to timeout in this function,
// it takes undefined time before the actual deletion. Hence, we should collect all PVCs
// in use at the beginning. False positive is okay in this context in order to be safe.
val k8sKnownPVCNames = snapshots.flatMap(_.executorPods.values.map(_.pod)).flatMap { pod =>
pod.getSpec.getVolumes.asScala
.flatMap { v => Option(v.getPersistentVolumeClaim).map(_.getClaimName) }
}
// transfer the scheduler backend known executor requests from the newlyCreatedExecutors
// to the schedulerKnownNewlyCreatedExecs
val schedulerKnownExecs = schedulerBackend.getExecutorIds().map(_.toLong).toSet
schedulerKnownNewlyCreatedExecs ++=
newlyCreatedExecutors.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1)
newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet
// For all executors we've created against the API but have not seen in a snapshot
// yet - check the current time. If the current time has exceeded some threshold,
// assume that the pod was either never created (the API server never properly
// handled the creation request), or the API server created the pod but we missed
// both the creation and deletion events. In either case, delete the missing pod
// if possible, and mark such a pod to be rescheduled below.
val currentTime = clock.getTimeMillis()
val timedOut = newlyCreatedExecutors.flatMap { case (execId, (_, timeCreated)) =>
if (currentTime - timeCreated > podCreationTimeout) {
Some(execId)
} else {
logDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
s" was created ${currentTime - timeCreated} milliseconds ago.")
None
}
}
if (timedOut.nonEmpty) {
logWarning(s"Executors with ids ${timedOut.mkString(",")} were not detected in the" +
s" Kubernetes cluster after $podCreationTimeout ms despite the fact that a previous" +
" allocation attempt tried to create them. The executors may have been deleted but the" +
" application missed the deletion event.")
newlyCreatedExecutors --= timedOut
if (shouldDeleteExecutors) {
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
.delete()
}
}
}
if (snapshots.nonEmpty) {
lastSnapshot = snapshots.last
}
// Make a local, non-volatile copy of the reference since it's used multiple times. This
// is the only method that modifies the list, so this is safe.
var _deletedExecutorIds = deletedExecutorIds
if (snapshots.nonEmpty) {
val existingExecs = lastSnapshot.executorPods.keySet
_deletedExecutorIds = _deletedExecutorIds.intersect(existingExecs)
}
val notDeletedPods = lastSnapshot.executorPods.filterKeys(!_deletedExecutorIds.contains(_))
// Map the pods into per ResourceProfile id so we can check per ResourceProfile,
// add a fast path if not using other ResourceProfiles.
val rpIdToExecsAndPodState =
mutable.HashMap[Int, mutable.HashMap[Long, ExecutorPodState]]()
if (totalExpectedExecutorsPerResourceProfileId.size <= 1) {
rpIdToExecsAndPodState(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) =
mutable.HashMap.empty ++= notDeletedPods
} else {
notDeletedPods.foreach { case (execId, execPodState) =>
val rpId = execPodState.pod.getMetadata.getLabels.get(SPARK_RESOURCE_PROFILE_ID_LABEL).toInt
val execPods = rpIdToExecsAndPodState.getOrElseUpdate(rpId,
mutable.HashMap[Long, ExecutorPodState]())
execPods(execId) = execPodState
}
}
// sum of all the pending pods unknown by the scheduler (total for all the resources)
var totalPendingCount = 0
// total not running pods (including scheduler known & unknown, pending & newly requested ones)
var totalNotRunningPodCount = 0
val podsToAllocateWithRpId = totalExpectedExecutorsPerResourceProfileId
.asScala
.toSeq
.sortBy(_._1)
.flatMap { case (rpId, targetNum) =>
val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, mutable.HashMap.empty)
val currentRunningCount = podsForRpId.values.count {
case PodRunning(_) => true
case _ => false
}
val (schedulerKnownPendingExecsForRpId, currentPendingExecutorsForRpId) = podsForRpId.filter {
case (_, PodPending(_)) => true
case _ => false
}.partition { case (k, _) =>
schedulerKnownExecs.contains(k)
}
// This variable is used later to print some debug logs. It's updated when cleaning up
// excess pod requests, since currentPendingExecutorsForRpId is immutable.
var pendingCountForRpId = currentPendingExecutorsForRpId.size
val newlyCreatedExecutorsForRpId =
newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) =>
rpId == waitingRpId
}
val schedulerKnownNewlyCreatedExecsForRpId =
schedulerKnownNewlyCreatedExecs.filter { case (_, waitingRpId) =>
rpId == waitingRpId
}
if (podsForRpId.nonEmpty) {
logDebug(s"ResourceProfile Id: $rpId (" +
s"pod allocation status: $currentRunningCount running, " +
s"${currentPendingExecutorsForRpId.size} unknown pending, " +
s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known pending, " +
s"${newlyCreatedExecutorsForRpId.size} unknown newly created, " +
s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created)")
}
// It's possible that we have outstanding pods that are outdated when dynamic allocation
// decides to downscale the application. So check if we can release any pending pods early
// instead of waiting for them to time out. Drop them first from the unacknowledged list,
// then from the pending. However, in order to prevent too frequent fluctuation, newly
// requested pods are protected during executorIdleTimeout period.
//
// TODO: with dynamic allocation off, handle edge cases if we end up with more running
// executors than expected.
var notRunningPodCountForRpId =
currentPendingExecutorsForRpId.size + schedulerKnownPendingExecsForRpId.size +
newlyCreatedExecutorsForRpId.size + schedulerKnownNewlyCreatedExecsForRpId.size
val podCountForRpId = currentRunningCount + notRunningPodCountForRpId
if (podCountForRpId > targetNum) {
val excess = podCountForRpId - targetNum
val newlyCreatedToDelete = newlyCreatedExecutorsForRpId
.filter { case (_, (_, createTime)) =>
currentTime - createTime > executorIdleTimeout
}.keys.take(excess).toList
val pendingToDelete = currentPendingExecutorsForRpId
.filter(x => isExecutorIdleTimedOut(x._2, currentTime))
.take(excess - newlyCreatedToDelete.size)
.map { case (id, _) => id }
val toDelete = newlyCreatedToDelete ++ pendingToDelete
if (toDelete.nonEmpty) {
logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
_deletedExecutorIds = _deletedExecutorIds ++ toDelete
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withField("status.phase", "Pending")
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
.delete()
newlyCreatedExecutors --= newlyCreatedToDelete
pendingCountForRpId -= pendingToDelete.size
notRunningPodCountForRpId -= toDelete.size
}
}
}
totalPendingCount += pendingCountForRpId
totalNotRunningPodCount += notRunningPodCountForRpId
// The code below just prints debug messages, which are only useful when there's a change
// in the snapshot state. Since the messages are a little spammy, avoid them when we know
// there are no useful updates.
if (log.isDebugEnabled && snapshots.nonEmpty) {
val outstanding = pendingCountForRpId + newlyCreatedExecutorsForRpId.size
if (currentRunningCount >= targetNum && !dynamicAllocationEnabled) {
logDebug(s"Current number of running executors for ResourceProfile Id $rpId is " +
"equal to the number of requested executors. Not scaling up further.")
} else {
if (newlyCreatedExecutorsForRpId.nonEmpty) {
logDebug(s"Still waiting for ${newlyCreatedExecutorsForRpId.size} executors for " +
s"ResourceProfile Id $rpId before requesting more.")
}
}
}
if (newlyCreatedExecutorsForRpId.isEmpty && podCountForRpId < targetNum) {
Some(rpId, podCountForRpId, targetNum)
} else {
// for this resource profile we do not request more PODs
None
}
}
val remainingSlotFromPendingPods = maxPendingPods - totalNotRunningPodCount
if (remainingSlotFromPendingPods > 0 && podsToAllocateWithRpId.size > 0) {
ExecutorPodsAllocator.splitSlots(podsToAllocateWithRpId, remainingSlotFromPendingPods)
.foreach { case ((rpId, podCountForRpId, targetNum), sharedSlotFromPendingPods) =>
val numMissingPodsForRpId = targetNum - podCountForRpId
val numExecutorsToAllocate =
math.min(math.min(numMissingPodsForRpId, podAllocationSize), sharedSlotFromPendingPods)
logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
s"ResourceProfile Id: $rpId, target: $targetNum, known: $podCountForRpId, " +
s"sharedSlotFromPendingPods: $sharedSlotFromPendingPods.")
requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, k8sKnownPVCNames)
}
}
deletedExecutorIds = _deletedExecutorIds
// Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
// update method when not needed. PODs known by the scheduler backend are not counted here as
they are considered running PODs and they should not block upscaling.
numOutstandingPods.set(totalPendingCount + newlyCreatedExecutors.size)
}
private def getReusablePVCs(applicationId: String, pvcsInUse: Seq[String]) = {
if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && conf.get(KUBERNETES_DRIVER_REUSE_PVC) &&
driverPod.nonEmpty) {
try {
val createdPVCs = kubernetesClient
.persistentVolumeClaims
.inNamespace(namespace)
.withLabel("spark-app-selector", applicationId)
.list()
.getItems
.asScala
val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
reusablePVCs
} catch {
case _: KubernetesClientException =>
logInfo("Cannot list PVC resources. Please check account permissions.")
mutable.Buffer.empty[PersistentVolumeClaim]
}
} else {
mutable.Buffer.empty[PersistentVolumeClaim]
}
}
private def requestNewExecutors(
numExecutorsToAllocate: Int,
applicationId: String,
resourceProfileId: Int,
pvcsInUse: Seq[String]): Unit = {
// Check reusable PVCs for this executor allocation batch
val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
for ( _ <- 0 until numExecutorsToAllocate) {
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
val executorConf = KubernetesConf.createExecutorConf(
conf,
newExecutorId.toString,
applicationId,
driverPod,
resourceProfileId)
val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,
kubernetesClient, rpIdToResourceProfile(resourceProfileId))
val executorPod = resolvedExecutorSpec.pod
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
.editOrNewSpec()
.addToContainers(executorPod.container)
.endSpec()
.build()
val resources = replacePVCsIfNeeded(
podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
val createdExecutorPod =
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
try {
addOwnerReference(createdExecutorPod, resources)
resources
.filter(_.getKind == "PersistentVolumeClaim")
.foreach { resource =>
if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
addOwnerReference(driverPod.get, Seq(resource))
}
val pvc = resource.asInstanceOf[PersistentVolumeClaim]
logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
s"StorageClass ${pvc.getSpec.getStorageClassName}")
kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
}
newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
} catch {
case NonFatal(e) =>
kubernetesClient.pods()
.inNamespace(namespace)
.resource(createdExecutorPod)
.delete()
throw e
}
}
}
private def replacePVCsIfNeeded(
pod: Pod,
resources: Seq[HasMetadata],
reusablePVCs: mutable.Buffer[PersistentVolumeClaim]): Seq[HasMetadata] = {
val replacedResources = mutable.Set[HasMetadata]()
resources.foreach {
case pvc: PersistentVolumeClaim =>
// Find one with the same storage class and size.
val index = reusablePVCs.indexWhere { p =>
p.getSpec.getStorageClassName == pvc.getSpec.getStorageClassName &&
p.getSpec.getResources.getRequests.get("storage") ==
pvc.getSpec.getResources.getRequests.get("storage")
}
if (index >= 0) {
val volume = pod.getSpec.getVolumes.asScala.find { v =>
v.getPersistentVolumeClaim != null &&
v.getPersistentVolumeClaim.getClaimName == pvc.getMetadata.getName
}
if (volume.nonEmpty) {
val matchedPVC = reusablePVCs.remove(index)
replacedResources.add(pvc)
logInfo(s"Reuse PersistentVolumeClaim ${matchedPVC.getMetadata.getName}")
volume.get.getPersistentVolumeClaim.setClaimName(matchedPVC.getMetadata.getName)
}
}
case _ => // no-op
}
resources.filterNot(replacedResources.contains)
}
private def isExecutorIdleTimedOut(state: ExecutorPodState, currentTime: Long): Boolean = {
try {
val creationTime = Instant.parse(state.pod.getMetadata.getCreationTimestamp).toEpochMilli()
currentTime - creationTime > executorIdleTimeout
} catch {
case e: Exception =>
logError(s"Cannot get the creationTimestamp of the pod: ${state.pod}", e)
true
}
}
override def stop(applicationId: String): Unit = {
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
}
}
}
private[spark] object ExecutorPodsAllocator {
// A utility function to split the available slots among the specified consumers
def splitSlots[T](consumers: Seq[T], slots: Int): Seq[(T, Int)] = {
val d = slots / consumers.size
val r = slots % consumers.size
consumers.take(r).map((_, d + 1)) ++ consumers.takeRight(consumers.size - r).map((_, d))
}
}
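The splitSlots helper above divides the remaining pending-pod budget (maxPendingPods minus the pods that are not yet running) across the resource profiles that still need executors, giving the first slots % n consumers one extra slot. A standalone copy of the same logic with made-up values shows the distribution:

// Standalone copy of ExecutorPodsAllocator.splitSlots, for illustration only.
def splitSlots[T](consumers: Seq[T], slots: Int): Seq[(T, Int)] = {
  val d = slots / consumers.size
  val r = slots % consumers.size
  consumers.take(r).map((_, d + 1)) ++ consumers.takeRight(consumers.size - r).map((_, d))
}

// Hypothetical case: resource profiles 0, 1 and 2 share 7 remaining pending-pod slots.
// The first (7 % 3) = 1 profile gets one slot on top of the base 7 / 3 = 2.
println(splitSlots(Seq(0, 1, 2), 7)) // List((0,3), (1,2), (2,2))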
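Note on PVC reuse: replacePVCsIfNeeded only swaps in a leftover claim when the new executor pod actually mounts a volume referencing the requested claim name and a reusable claim with the same storage class and requested size exists. Ignoring the volume check, the matching rule reduces to roughly the following sketch; the PvcSpec class and all values are hypothetical stand-ins for the fabric8 PersistentVolumeClaim objects:

import scala.collection.mutable

// Hypothetical simplified stand-in for a PersistentVolumeClaim's spec.
case class PvcSpec(storageClassName: String, storageRequest: String)

// Claims created for earlier executors of this application, no longer in use.
val reusable = mutable.Buffer(
  PvcSpec("standard", "10Gi"),
  PvcSpec("fast-ssd", "50Gi"))

// Claim requested by the newly built executor pod.
val wanted = PvcSpec("fast-ssd", "50Gi")

// Same matching rule as replacePVCsIfNeeded: equal storage class and equal storage request.
val index = reusable.indexWhere { p =>
  p.storageClassName == wanted.storageClassName &&
    p.storageRequest == wanted.storageRequest
}
val matched = if (index >= 0) Some(reusable.remove(index)) else None
// matched == Some(PvcSpec("fast-ssd", "50Gi")); the "standard" claim stays in the buffer.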