spark MesosClusterPersistenceEngine source code

File path: /resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos

import scala.collection.JavaConverters._

import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.KeeperException.NoNodeException

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkCuratorUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Deploy._
import org.apache.spark.util.Utils
/**
 * Persistence engine factory that is responsible for creating new persistence engines
 * to store Mesos cluster mode state.
 */
private[spark] abstract class MesosClusterPersistenceEngineFactory(conf: SparkConf) {
  def createEngine(path: String): MesosClusterPersistenceEngine
}
/**
 * Mesos cluster persistence engine is responsible for persisting Mesos cluster mode
 * specific state, so that on failover all the state can be recovered and the scheduler
 * can resume managing the drivers.
 */
private[spark] trait MesosClusterPersistenceEngine {
  /** Serializes and stores `obj` under the given name. */
  def persist(name: String, obj: Object): Unit

  /** Removes the state stored under the given name, if any. */
  def expunge(name: String): Unit

  /** Returns the deserialized state stored under the given name, or None. */
  def fetch[T](name: String): Option[T]

  /** Returns all state persisted by this engine. */
  def fetchAll[T](): Iterable[T]
}
/**
 * Zookeeper backed persistence engine factory.
 * All Zk engines created from this factory share the same Zookeeper client, so
 * all of them reuse the same connection pool.
 */
private[spark] class ZookeeperMesosClusterPersistenceEngineFactory(conf: SparkConf)
  extends MesosClusterPersistenceEngineFactory(conf) with Logging {

  // Created lazily and shared by every engine this factory creates.
  lazy val zk = SparkCuratorUtil.newClient(conf)

  def createEngine(path: String): MesosClusterPersistenceEngine = {
    new ZookeeperMesosClusterPersistenceEngine(path, zk, conf)
  }
}
/**
 * Black hole persistence engine factory that creates black hole
 * persistence engines, which store nothing.
 */
private[spark] class BlackHoleMesosClusterPersistenceEngineFactory
  extends MesosClusterPersistenceEngineFactory(null) {

  def createEngine(path: String): MesosClusterPersistenceEngine = {
    new BlackHoleMesosClusterPersistenceEngine
  }
}
/**
 * Black hole persistence engine that stores nothing.
 */
private[spark] class BlackHoleMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine {
  override def persist(name: String, obj: Object): Unit = {}
  override def fetch[T](name: String): Option[T] = None
  override def expunge(name: String): Unit = {}
  override def fetchAll[T](): Iterable[T] = Iterable.empty[T]
}
/**
 * Zookeeper based Mesos cluster persistence engine that stores cluster mode state
 * in Zookeeper. Each engine object operates under one folder in Zookeeper, but
 * reuses a shared Zookeeper client.
 */
private[spark] class ZookeeperMesosClusterPersistenceEngine(
    baseDir: String,
    zk: CuratorFramework,
    conf: SparkConf)
  extends MesosClusterPersistenceEngine with Logging {

  private val workingDir =
    conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark_mesos_dispatcher") + "/" + baseDir

  // Ensure the engine's working folder exists before any operation runs.
  SparkCuratorUtil.mkdir(zk, workingDir)

  def path(name: String): String = {
    workingDir + "/" + name
  }

  override def expunge(name: String): Unit = {
    zk.delete().forPath(path(name))
  }

  override def persist(name: String, obj: Object): Unit = {
    val serialized = Utils.serialize(obj)
    val zkPath = path(name)
    zk.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, serialized)
  }

  override def fetch[T](name: String): Option[T] = {
    val zkPath = path(name)
    try {
      val fileData = zk.getData().forPath(zkPath)
      Some(Utils.deserialize[T](fileData))
    } catch {
      case e: NoNodeException => None
      case e: Exception =>
        // An unreadable node is treated as corrupt: delete it and report nothing.
        logWarning("Exception while reading persisted file, deleting", e)
        zk.delete().forPath(zkPath)
        None
    }
  }

  override def fetchAll[T](): Iterable[T] = {
    // List every child node under the working folder and deserialize each one.
    zk.getChildren.forPath(workingDir).asScala.flatMap(fetch[T])
  }
}
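
For reference, here is a minimal sketch of how a caller might drive this API, assuming the spark-mesos module is on the classpath. The demo object, its name, and the "launchedDrivers" folder name are illustrative, not part of Spark; the object sits in the same package because the engine classes are private[spark]. The recovery-mode switch mirrors the pattern the Mesos dispatcher uses: a Zookeeper-backed factory when spark.deploy.recoveryMode is ZOOKEEPER, and a black hole factory otherwise.

package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.SparkConf

// Hypothetical demo object, not part of Spark. Placed in this package so it
// can see the private[spark] engine classes.
object PersistenceEngineDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    // Choose a factory based on the configured recovery mode.
    val factory: MesosClusterPersistenceEngineFactory =
      conf.get("spark.deploy.recoveryMode", "NONE").toUpperCase match {
        case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf)
        case _ => new BlackHoleMesosClusterPersistenceEngineFactory
      }

    // Each engine works under its own sub-folder of the dispatcher's root.
    val engine = factory.createEngine("launchedDrivers")

    engine.persist("driver-001", "example state")   // serialize and write a node
    val state = engine.fetch[String]("driver-001")  // Some(...) with ZK, None with the black hole
    println(s"recovered: $state")
    engine.expunge("driver-001")                    // delete the node again
  }
}

With the defaults above, createEngine("launchedDrivers") makes the engine work under /spark_mesos_dispatcher/launchedDrivers (the fallback used when spark.deploy.zookeeper.dir is unset), so persist("driver-001", ...) writes the znode /spark_mesos_dispatcher/launchedDrivers/driver-001.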
Related articles

spark MesosClusterScheduler source code
spark MesosClusterSchedulerSource source code
spark MesosCoarseGrainedSchedulerBackend source code
spark MesosFineGrainedSchedulerBackend source code