spark KubernetesExecutorBackend source code
File path: /resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler.cluster.k8s

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.executor.CoarseGrainedExecutorBackend
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.Utils
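
// Kubernetes-specific executor entry point: it reuses CoarseGrainedExecutorBackend for the actual
// executor and RPC behavior, and only customizes how the command-line arguments are parsed and how
// the executor id is resolved (see run() and parseArguments() below).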
private[spark] object KubernetesExecutorBackend extends Logging {

  // Message used internally to start the executor when the driver successfully accepted the
  // registration request.
  case object RegisteredExecutor

  case class Arguments(
      driverUrl: String,
      executorId: String,
      bindAddress: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      resourcesFileOpt: Option[String],
      resourceProfileId: Int,
      podName: String)
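
  // main() builds a factory that instantiates the stock CoarseGrainedExecutorBackend from the
  // parsed Arguments, then hands both to run() below.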
  def main(args: Array[String]): Unit = {
    val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, String) =>
      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile, execId) =>
      new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, execId,
        arguments.bindAddress, arguments.hostname, arguments.cores,
        env, arguments.resourcesFileOpt, resourceProfile)
    }
    run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
    System.exit(0)
  }
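
  // run() performs the executor bootstrap: fetch the driver's Spark properties over a temporary
  // RPC env, resolve the executor id (asking the driver via GenerateExecID when a placeholder was
  // passed), build the SparkEnv, and register the backend as the "Executor" RPC endpoint.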
  def run(
      arguments: Arguments,
      backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, String) =>
        CoarseGrainedExecutorBackend): Unit = {

    Utils.initDaemon(log)

    SparkHadoopUtil.get.runAsSparkUser { () =>
      // Debug code
      assert(arguments.hostname != null &&
        (arguments.hostname.indexOf(':') == -1 || arguments.hostname.split(":").length > 2))

      // Bootstrap to fetch the driver's Spark properties.
      val executorConf = new SparkConf
      val fetcher = RpcEnv.create(
        "driverPropsFetcher",
        arguments.bindAddress,
        arguments.hostname,
        -1,
        executorConf,
        new SecurityManager(executorConf),
        numUsableCores = 0,
        clientMode = true)

      var driver: RpcEndpointRef = null
      val nTries = 3
      for (i <- 0 until nTries if driver == null) {
        try {
          driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
        } catch {
          case e: Throwable => if (i == nTries - 1) {
            throw e
          }
        }
      }

      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId))
      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
      val execId: String = arguments.executorId match {
        case null | "EXECID" | "" =>
          // We need to resolve the exec id dynamically
          driver.askSync[String](GenerateExecID(arguments.podName))
        case id =>
          id
      }
      fetcher.shutdown()

      // Create SparkEnv using properties we fetched from the driver.
      val driverConf = new SparkConf()
      for ((key, value) <- props) {
        // this is required for SSL in standalone mode
        if (SparkConf.isExecutorStartupConf(key)) {
          driverConf.setIfMissing(key, value)
        } else {
          driverConf.set(key, value)
        }
      }

      cfg.hadoopDelegationCreds.foreach { tokens =>
        SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
      }

      driverConf.set(EXECUTOR_ID, execId)
      val env = SparkEnv.createExecutorEnv(driverConf, execId, arguments.bindAddress,
        arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

      val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile, execId)
      env.rpcEnv.setupEndpoint("Executor", backend)
      arguments.workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher",
          new WorkerWatcher(env.rpcEnv, url, isChildProcessStopping = backend.stopping))
      }
      env.rpcEnv.awaitTermination()
    }
  }
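
  // parseArguments() walks the raw argv list and fills the Arguments case class; unrecognized flags
  // or missing required values (--driver-url, --executor-id, --cores, --app-id) abort via
  // printUsageAndExit().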
  def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
    var driverUrl: String = null
    var executorId: String = null
    var bindAddress: String = null
    var hostname: String = null
    var cores: Int = 0
    var resourcesFileOpt: Option[String] = None
    var appId: String = null
    var workerUrl: Option[String] = None
    var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID
    var podName: String = null

    var argv = args.toList
    while (!argv.isEmpty) {
      argv match {
        case ("--driver-url") :: value :: tail =>
          driverUrl = value
          argv = tail
        case ("--executor-id") :: value :: tail =>
          executorId = value
          argv = tail
        case ("--bind-address") :: value :: tail =>
          bindAddress = value
          argv = tail
        case ("--hostname") :: value :: tail =>
          // entrypoint.sh sets SPARK_EXECUTOR_POD_IP without '[]'
          hostname = Utils.addBracketsIfNeeded(value)
          argv = tail
        case ("--cores") :: value :: tail =>
          cores = value.toInt
          argv = tail
        case ("--resourcesFile") :: value :: tail =>
          resourcesFileOpt = Some(value)
          argv = tail
        case ("--app-id") :: value :: tail =>
          appId = value
          argv = tail
        case ("--worker-url") :: value :: tail =>
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          workerUrl = Some(value)
          argv = tail
        case ("--resourceProfileId") :: value :: tail =>
          resourceProfileId = value.toInt
          argv = tail
        case ("--podName") :: value :: tail =>
          podName = value
          argv = tail
        case Nil =>
        case tail =>
          // scalastyle:off println
          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
          // scalastyle:on println
          printUsageAndExit(classNameForEntry)
      }
    }

    if (hostname == null) {
      hostname = Utils.localHostName()
      log.info(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
    }

    if (driverUrl == null || executorId == null || cores <= 0 || appId == null) {
      printUsageAndExit(classNameForEntry)
    }

    if (bindAddress == null) {
      bindAddress = hostname
    }

    Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
      resourcesFileOpt, resourceProfileId, podName)
  }
  private def printUsageAndExit(classNameForEntry: String): Unit = {
    // scalastyle:off println
    System.err.println(
      s"""
      |Usage: $classNameForEntry [options]
      |
      | Options are:
      |   --driver-url <driverUrl>
      |   --executor-id <executorId>
      |   --bind-address <bindAddress>
      |   --hostname <hostname>
      |   --cores <cores>
      |   --resourcesFile <fileWithJSONResourceInformation>
      |   --app-id <appid>
      |   --worker-url <workerUrl>
      |   --resourceProfileId <id>
      |   --podName <podName>
      |""".stripMargin)
    // scalastyle:on println
    System.exit(1)
  }
}
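
For reference, below is a minimal, hypothetical sketch of exercising parseArguments directly with the flags defined above. The object name ParseArgumentsSketch and every flag value are placeholders invented for illustration, not values from a real cluster; in an actual pod, the container entrypoint passes equivalent flags when it launches KubernetesExecutorBackend.main, and run() then resolves the final executor id and builds the SparkEnv.

package org.apache.spark.scheduler.cluster.k8s

// Hypothetical sketch: feed placeholder flags through parseArguments and print the result.
// Declared in the same package because the enclosing object is private[spark].
object ParseArgumentsSketch {
  def main(args: Array[String]): Unit = {
    val sampleArgs = Array(
      "--driver-url", "spark://CoarseGrainedScheduler@driver-svc.default.svc:7078",
      "--executor-id", "EXECID", // placeholder; run() resolves the real id via GenerateExecID
      "--cores", "2",
      "--app-id", "spark-app-0001",
      "--hostname", "10.0.0.12",
      "--podName", "demo-exec-1",
      "--resourceProfileId", "0")
    val parsed = KubernetesExecutorBackend.parseArguments(sampleArgs, "KubernetesExecutorBackend")
    // Arguments is a case class, so this prints all resolved fields
    // (note that bindAddress defaults to hostname when --bind-address is omitted).
    println(parsed)
  }
}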
Related articles
spark AbstractPodsAllocator source code
spark ExecutorPodsAllocator source code
spark ExecutorPodsLifecycleManager source code
spark ExecutorPodsPollingSnapshotSource source code
spark ExecutorPodsSnapshotsStore source code
spark ExecutorPodsSnapshotsStoreImpl source code