spark ExecutorPodsSnapshotsStoreImpl 源码
spark ExecutorPodsSnapshotsStoreImpl 代码
文件路径:/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s
import java.util.ArrayList
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import io.fabric8.kubernetes.api.model.Pod
import org.apache.spark.internal.Logging
import org.apache.spark.util.Clock
import org.apache.spark.util.SystemClock
import org.apache.spark.util.ThreadUtils
/**
* Controls the propagation of the Spark application's executor pods state to subscribers that
* react to that state.
* <br>
* Roughly follows a producer-consumer model. Producers report states of executor pods, and these
* states are then published to consumers that can perform any actions in response to these states.
* <br>
* Producers push updates in one of two ways. An incremental update sent by updatePod() represents
* a known new state of a single executor pod. A full sync sent by replaceSnapshot() indicates that
* the passed pods are all of the most up to date states of all executor pods for the application.
* The combination of the states of all executor pods for the application is collectively known as
* a snapshot. The store keeps track of the most up to date snapshot, and applies updates to that
* most recent snapshot - either by incrementally updating the snapshot with a single new pod state,
* or by replacing the snapshot entirely on a full sync.
* <br>
* Consumers, or subscribers, register that they want to be informed about all snapshots of the
* executor pods. Every time the store replaces its most up to date snapshot from either an
* incremental update or a full sync, the most recent snapshot after the update is posted to the
* subscriber's buffer. Subscribers receive blocks of snapshots produced by the producers in
* time-windowed chunks. Each subscriber can choose to receive their snapshot chunks at different
* time intervals.
* <br>
* The subscriber notification callback is guaranteed to be called from a single thread at a time.
*/
private[spark] class ExecutorPodsSnapshotsStoreImpl(
    subscribersExecutor: ScheduledExecutorService,
    clock: Clock = new SystemClock)
  extends ExecutorPodsSnapshotsStore with Logging {

  // Monitor guarding `currentSnapshot` and the fan-out of each new snapshot to the subscribers.
  private val SNAPSHOT_LOCK = new Object()

  // All registered subscribers; iterated every time a new snapshot is published.
  private val subscribers = new CopyOnWriteArrayList[SnapshotsSubscriber]()

  // Handles of the periodic tasks scheduled by addSubscriber(), kept so stop() can cancel them.
  private val pollingTasks = new CopyOnWriteArrayList[Future[_]]

  @GuardedBy("SNAPSHOT_LOCK")
  private var currentSnapshot = ExecutorPodsSnapshot()

  /**
   * Registers a new subscriber and schedules its periodic processing task.
   *
   * The subscriber's buffer is seeded with the current snapshot, the subscriber is added to the
   * list that receives every later snapshot, and a task calling `processSnapshots()` is
   * scheduled on `subscribersExecutor` with no initial delay and a fixed delay of
   * `processBatchIntervalMillis` between runs.
   *
   * @param processBatchIntervalMillis delay, in milliseconds, between two periodic processing runs
   * @param onNewSnapshots callback receiving the snapshots buffered since the previous run
   */
  override def addSubscriber(
      processBatchIntervalMillis: Long)
      (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
    val newSubscriber = new SnapshotsSubscriber(onNewSnapshots)
    SNAPSHOT_LOCK.synchronized {
      // Seed and register within ONE critical section. updatePod() and replaceSnapshot() publish
      // under this same lock, so no snapshot can be produced after the seed but before the
      // registration; otherwise this subscriber would silently miss that snapshot.
      newSubscriber.addCurrentSnapshot()
      subscribers.add(newSubscriber)
    }
    pollingTasks.add(subscribersExecutor.scheduleWithFixedDelay(
      () => newSubscriber.processSnapshots(),
      0L,
      processBatchIntervalMillis,
      TimeUnit.MILLISECONDS))
  }

  /**
   * Requests an out-of-schedule processing run for every subscriber by submitting one task per
   * subscriber to `subscribersExecutor`. The submission loop runs while holding SNAPSHOT_LOCK.
   */
  override def notifySubscribers(): Unit = SNAPSHOT_LOCK.synchronized {
    subscribers.asScala.foreach { s =>
      subscribersExecutor.submit(new Runnable() {
        override def run(): Unit = s.processSnapshots()
      })
    }
  }

  /**
   * Cancels the periodic polling tasks (without interrupting a run that is already executing)
   * and then shuts down `subscribersExecutor` through `ThreadUtils.shutdown`.
   */
  override def stop(): Unit = {
    pollingTasks.asScala.foreach(_.cancel(false))
    ThreadUtils.shutdown(subscribersExecutor)
  }

  /**
   * Incremental update: replaces the current snapshot with one that includes the new state of
   * `updatedPod`, then pushes the resulting snapshot into every subscriber's buffer.
   *
   * @param updatedPod the latest known state of a single pod
   */
  override def updatePod(updatedPod: Pod): Unit = SNAPSHOT_LOCK.synchronized {
    currentSnapshot = currentSnapshot.withUpdate(updatedPod)
    addCurrentSnapshotToSubscribers()
  }

  /**
   * Full sync: replaces the current snapshot with one built from `newSnapshot` and the current
   * clock time, then pushes the resulting snapshot into every subscriber's buffer.
   *
   * @param newSnapshot the pods that make up the new snapshot
   */
  override def replaceSnapshot(newSnapshot: Seq[Pod]): Unit = SNAPSHOT_LOCK.synchronized {
    currentSnapshot = ExecutorPodsSnapshot(newSnapshot, clock.getTimeMillis())
    addCurrentSnapshotToSubscribers()
  }

  // Appends the current snapshot to every registered subscriber's buffer. Both callers
  // (updatePod and replaceSnapshot) invoke this while holding SNAPSHOT_LOCK.
  private def addCurrentSnapshotToSubscribers(): Unit = {
    subscribers.asScala.foreach(_.addCurrentSnapshot())
  }

  /**
   * Per-subscriber state: a buffer of snapshots not yet delivered, a lock ensuring the callback
   * is not run by two threads at once, and a counter of pending processing requests.
   *
   * @param onNewSnapshots callback invoked with the buffered snapshots on each processing run
   */
  private class SnapshotsSubscriber(onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit) {

    // Snapshots published since the last delivery to the callback.
    private val snapshotsBuffer = new LinkedBlockingQueue[ExecutorPodsSnapshot]()

    // Held while the callback runs; acquired with tryLock(), so contenders never block on it.
    private val lock = new ReentrantLock()

    // Number of processing requests that have been made but not yet accounted for.
    private val notificationCount = new AtomicInteger()

    // Buffers the store's current snapshot for this subscriber. Reads `currentSnapshot`, so it
    // must be called with SNAPSHOT_LOCK held, as every call site in this class does.
    def addCurrentSnapshot(): Unit = {
      snapshotsBuffer.add(currentSnapshot)
    }

    // Records one more pending request, then attempts to process the buffer right away.
    def processSnapshots(): Unit = {
      notificationCount.incrementAndGet()
      processSnapshotsInternal()
    }

    // Drains the buffer into the callback if this thread wins the subscriber lock. A thread that
    // loses the tryLock() returns immediately; its request stays recorded in notificationCount
    // and is picked up by the follow-up task scheduled below.
    private def processSnapshotsInternal(): Unit = {
      if (lock.tryLock()) {
        // Check whether there are pending notifications before calling the subscriber. This
        // is needed to avoid calling the subscriber spuriously when the race described in the
        // comment below happens.
        if (notificationCount.get() > 0) {
          try {
            val snapshots = new ArrayList[ExecutorPodsSnapshot]()
            snapshotsBuffer.drainTo(snapshots)
            onNewSnapshots(snapshots.asScala.toSeq)
          } catch {
            // An IllegalArgumentException from the callback terminates the JVM with exit code 1;
            // any other non-fatal exception is logged and processing carries on.
            case e: IllegalArgumentException =>
              logError("Going to stop due to IllegalArgumentException", e)
              System.exit(1)
            case NonFatal(e) => logWarning("Exception when notifying snapshot subscriber.", e)
          } finally {
            lock.unlock()
          }

          if (notificationCount.decrementAndGet() > 0) {
            // There was another concurrent request for this subscriber. Schedule a task to
            // immediately process snapshots again, so that the subscriber can pick up any
            // changes that may have happened between the time it started looking at snapshots
            // above, and the time the concurrent request arrived.
            //
            // This has to be done outside of the lock, otherwise we might miss a notification
            // arriving after the above check, but before we've released the lock. Flip side is
            // that we may schedule a useless task that will just fail to grab the lock.
            subscribersExecutor.submit(new Runnable() {
              override def run(): Unit = processSnapshotsInternal()
            })
          }
        } else {
          // Nothing pending: just release the lock acquired by tryLock() above.
          lock.unlock()
        }
      }
    }
  }
}
相关信息
相关文章
spark AbstractPodsAllocator 源码
spark ExecutorPodsAllocator 源码
spark ExecutorPodsLifecycleManager 源码
spark ExecutorPodsPollingSnapshotSource 源码
spark ExecutorPodsSnapshotsStore 源码
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦