Spark Client Source Code

  • 2022-10-20

The Spark YARN Client code is reproduced in full below. The Client runs on the submitting machine: it prepares the staging directory and distributed-cache resources, builds the ApplicationMaster launch context, and submits and monitors the application on the YARN ResourceManager.

File path: /resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
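
In cluster mode this class is driven by spark-submit through YarnClusterApplication. A minimal sketch of that wiring (simplified; the real entry point also strips spark.jars / spark.files from the conf before constructing the Client):

    val client = new Client(new ClientArguments(args), sparkConf, rpcEnv = null)
    client.run()  // submits the application, then monitors it (or just reports once, when fire-and-forget)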

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.yarn

import java.io.{FileSystem => _, _}
import java.net.{InetAddress, UnknownHostException, URI, URL}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util.{Locale, Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.immutable.{Map => IMap}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
import scala.util.control.NonFatal

import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.io.{DataOutputBuffer, Text}
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.util.VersionInfo
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
import org.apache.hadoop.yarn.util.Records

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil}
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python._
import org.apache.spark.launcher.{JavaModuleOptions, LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.{CallerContext, Utils, VersionUtils, YarnContainerInfoHelper}

private[spark] class Client(
    val args: ClientArguments,
    val sparkConf: SparkConf,
    val rpcEnv: RpcEnv)
  extends Logging {

  import Client._

  private val yarnClient = YarnClient.createYarnClient
  private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

  private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"

  // ContainerLaunchContext.setTokensConf is only available in Hadoop 2.9+ and 3.x, so we use
  // reflection here to avoid a compilation error with the Hadoop 2.7 profile.
  private val SET_TOKENS_CONF_METHOD = "setTokensConf"

  private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
  private var appMaster: ApplicationMaster = _
  private var stagingDirPath: Path = _

  private val amMemoryOverheadFactor = if (isClusterMode) {
    sparkConf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
  } else {
    AM_MEMORY_OVERHEAD_FACTOR
  }

  // AM related configurations
  private val amMemory = if (isClusterMode) {
    sparkConf.get(DRIVER_MEMORY).toInt
  } else {
    sparkConf.get(AM_MEMORY).toInt
  }
  private val amMemoryOverhead = {
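    // Unless spark.driver.memoryOverhead / spark.yarn.am.memoryOverhead is set explicitly, the
    // overhead resolves to max(overheadFactor * amMemory, 384 MiB); e.g. with the default 0.10
    // factor, a 4 GiB driver yields an AM container request of roughly 4096 + 409 = 4505 MB.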
    val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
    sparkConf.get(amMemoryOverheadEntry).getOrElse(
      math.max((amMemoryOverheadFactor * amMemory).toLong,
        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
  }
  private val amCores = if (isClusterMode) {
    sparkConf.get(DRIVER_CORES)
  } else {
    sparkConf.get(AM_CORES)
  }

  // Executor related configurations
  private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
  // Executor offHeap memory in MiB.
  protected val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(sparkConf)

  private val executorMemoryOvereadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
  private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
    math.max((executorMemoryOvereadFactor * executorMemory).toLong,
      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt

  private val isPython = sparkConf.get(IS_PYTHON_APP)
  private val pysparkWorkerMemory: Int = if (isPython) {
    sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
  } else {
    0
  }

  private val distCacheMgr = new ClientDistributedCacheManager()
  private val cachedResourcesConf = new SparkConf(false)

  private val keytab = sparkConf.get(KEYTAB).orNull
  private val amKeytabFileName: Option[String] = if (keytab != null && isClusterMode) {
    val principal = sparkConf.get(PRINCIPAL).orNull
    require((principal == null) == (keytab == null),
      "Both principal and keytab must be defined, or neither.")
    logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab")
    // Generate a file name that can be used for the keytab file, that does not conflict
    // with any user file.
    Some(new File(keytab).getName() + "-" + UUID.randomUUID().toString)
  } else {
    None
  }

  require(keytab == null || !Utils.isLocalUri(keytab), "Keytab should reference a local file.")

  private val launcherBackend = new LauncherBackend() {
    override protected def conf: SparkConf = sparkConf

    override def onStopRequest(): Unit = {
      if (isClusterMode && appId != null) {
        yarnClient.killApplication(appId)
      } else {
        setState(SparkAppHandle.State.KILLED)
        stop()
      }
    }
  }
  private val fireAndForget = isClusterMode && !sparkConf.get(WAIT_FOR_APP_COMPLETION)

  private var appId: ApplicationId = null

  def getApplicationId(): ApplicationId = {
    appId
  }

  def reportLauncherState(state: SparkAppHandle.State): Unit = {
    launcherBackend.setState(state)
  }

  def stop(): Unit = {
    if (appMaster != null) {
      appMaster.stopUnmanaged(stagingDirPath)
    }
    launcherBackend.close()
    yarnClient.stop()
  }

  /**
   * Submit an application running our ApplicationMaster to the ResourceManager.
   *
   * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
   * creating applications and setting up the application submission context. This was not
   * available in the alpha API.
   */
  def submitApplication(): Unit = {
    ResourceRequestHelper.validateResources(sparkConf)

    try {
      launcherBackend.connect()
      yarnClient.init(hadoopConf)
      yarnClient.start()

      if (log.isDebugEnabled) {
        logDebug("Requesting a new application from cluster with %d NodeManagers"
          .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
      }

      // Get a new application from our RM
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      this.appId = newAppResponse.getApplicationId()

      // The app staging dir is based on the STAGING_DIR configuration if configured,
      // otherwise on the user's home directory.
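      // The resulting path typically looks like <base>/.sparkStaging/application_<clusterTs>_<seq>
      // (illustrative; <base> is either STAGING_DIR or the submitting user's home directory).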
      // scalastyle:off FileSystemGet
      val appStagingBaseDir = sparkConf.get(STAGING_DIR)
        .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
        .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
      stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
      // scalastyle:on FileSystemGet

      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
        Option(appId.toString)).setCurrentContext()

      // Verify whether the cluster has enough resources for our AM
      verifyClusterResources(newAppResponse)

      // Set up the appropriate contexts to launch our AM
      val containerContext = createContainerLaunchContext()
      val appContext = createApplicationSubmissionContext(newApp, containerContext)

      // Finally, submit and monitor the application
      logInfo(s"Submitting application $appId to ResourceManager")
      yarnClient.submitApplication(appContext)
      launcherBackend.setAppId(appId.toString)
      reportLauncherState(SparkAppHandle.State.SUBMITTED)
    } catch {
      case e: Throwable =>
        if (stagingDirPath != null) {
          cleanupStagingDir()
        }
        throw e
    }
  }

  /**
   * Cleanup application staging directory.
   */
  private def cleanupStagingDir(): Unit = {
    if (sparkConf.get(PRESERVE_STAGING_FILES)) {
      return
    }

    def cleanupStagingDirInternal(): Unit = {
      try {
        val fs = stagingDirPath.getFileSystem(hadoopConf)
        if (fs.delete(stagingDirPath, true)) {
          logInfo(s"Deleted staging directory $stagingDirPath")
        }
      } catch {
        case ioe: IOException =>
          logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
      }
    }

    cleanupStagingDirInternal()
  }

  /**
   * Set up the context for submitting our ApplicationMaster.
   * This uses the YarnClientApplication not available in the Yarn alpha API.
   */
  def createApplicationSubmissionContext(
      newApp: YarnClientApplication,
      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {

    val componentName = if (isClusterMode) {
      config.YARN_DRIVER_RESOURCE_TYPES_PREFIX
    } else {
      config.YARN_AM_RESOURCE_TYPES_PREFIX
    }
    val yarnAMResources = getYarnResourcesAndAmounts(sparkConf, componentName)
    val amResources = yarnAMResources ++
      getYarnResourcesFromSparkResources(SPARK_DRIVER_PREFIX, sparkConf)
    logDebug(s"AM resources: $amResources")
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
    appContext.setQueue(sparkConf.get(QUEUE_NAME))
    appContext.setAMContainerSpec(containerContext)
    appContext.setApplicationType(sparkConf.get(APPLICATION_TYPE))

    sparkConf.get(APPLICATION_TAGS).foreach { tags =>
      appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
    }
    sparkConf.get(MAX_APP_ATTEMPTS) match {
      case Some(v) => appContext.setMaxAppAttempts(v)
      case None => logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. " +
          "Cluster's default value will be used.")
    }

    sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
      appContext.setAttemptFailuresValidityInterval(interval)
    }

    val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(amMemory + amMemoryOverhead)
    capability.setVirtualCores(amCores)
    if (amResources.nonEmpty) {
      ResourceRequestHelper.setResourceRequests(amResources, capability)
    }
    logDebug(s"Created resource capability for AM request: $capability")

    sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
      case Some(expr) =>
        val amRequest = Records.newRecord(classOf[ResourceRequest])
        amRequest.setResourceName(ResourceRequest.ANY)
        amRequest.setPriority(Priority.newInstance(0))
        amRequest.setCapability(capability)
        amRequest.setNumContainers(1)
        amRequest.setNodeLabelExpression(expr)
        appContext.setAMContainerResourceRequest(amRequest)
      case None =>
        appContext.setResource(capability)
    }
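    // Note: the ResourceRequest path above is needed because a plain Resource capability cannot
    // carry a node label expression.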

    sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
      try {
        val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
        logAggregationContext.setRolledLogsIncludePattern(includePattern)
        sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
          logAggregationContext.setRolledLogsExcludePattern(excludePattern)
        }
        appContext.setLogAggregationContext(logAggregationContext)
      } catch {
        case NonFatal(e) =>
          logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
            "does not support it", e)
      }
    }
    appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)

    sparkConf.get(APPLICATION_PRIORITY).foreach { appPriority =>
      appContext.setPriority(Priority.newInstance(appPriority))
    }
    appContext
  }

  /**
   * Set up security tokens for launching our ApplicationMaster container.
   *
   * In client mode, a set of credentials has been obtained by the scheduler, so they are copied
   * and sent to the AM. In cluster mode, new credentials are obtained and then sent to the AM,
   * along with whatever credentials the current user already has.
   */
  private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
    val currentUser = UserGroupInformation.getCurrentUser()
    val credentials = currentUser.getCredentials()

    if (isClusterMode) {
      val credentialManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, null)
      credentialManager.obtainDelegationTokens(credentials)
    }

    val serializedCreds = SparkHadoopUtil.get.serialize(credentials)
    amContainer.setTokens(ByteBuffer.wrap(serializedCreds))
  }

  /**
   * Set configurations sent from AM to RM for renewing delegation tokens.
   */
  private def setTokenConf(amContainer: ContainerLaunchContext): Unit = {
    // SPARK-37205: this regex is used to grep a list of configurations and send them to YARN RM
    // for fetching delegation tokens. See YARN-5910 for more details.
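    // For example (hypothetical pattern), setting spark.yarn.am.tokenConfRegex to ".*namenode.*"
    // would capture every matching hadoopConf entry below and ship it to the RM.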
    val regex = sparkConf.get(config.AM_TOKEN_CONF_REGEX)
    // The feature is only supported in Hadoop 2.9+ and 3.x, hence the check below.
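    // e.g. Hadoop 3.3.4 parses to (3, 3) and is supported; Hadoop 2.7.7 parses to (2, 7) and is not.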
    val isSupported = VersionUtils.majorMinorVersion(VersionInfo.getVersion) match {
      case (2, n) if n >= 9 => true
      case (3, _) => true
      case _ => false
    }
    if (regex.nonEmpty && isSupported) {
      logInfo(s"Processing token conf (spark.yarn.am.tokenConfRegex) with regex $regex")
      val dob = new DataOutputBuffer();
      val copy = new Configuration(false);
      copy.clear();
      hadoopConf.asScala.foreach { entry =>
        if (entry.getKey.matches(regex.get)) {
          copy.set(entry.getKey, entry.getValue)
          logInfo(s"Captured key: ${entry.getKey} -> value: ${entry.getValue}")
        }
      }
      copy.write(dob);

      // since this method was added in Hadoop 2.9 and 3.0, we use reflection here to avoid
      // a compilation error with the Hadoop 2.7 profile.
      val setTokensConfMethod = try {
        amContainer.getClass.getMethod(SET_TOKENS_CONF_METHOD, classOf[ByteBuffer])
      } catch {
        case _: NoSuchMethodException =>
          throw new SparkException(s"Cannot find setTokensConf method in ${amContainer.getClass}." +
              s" Please check YARN version and make sure it is 2.9+ or 3.x")
      }
      setTokensConfMethod.invoke(amContainer, ByteBuffer.wrap(dob.getData))
    }
  }

  /** Get the application report from the ResourceManager for an application we have submitted. */
  def getApplicationReport(): ApplicationReport =
    yarnClient.getApplicationReport(appId)

  /**
   * Return the security token used by this client to communicate with the ApplicationMaster.
   * If no security is enabled, the token returned by the report is null.
   */
  private def getClientToken(report: ApplicationReport): String =
    Option(report.getClientToAMToken).map(_.toString).getOrElse("")

  /**
   * Fail fast if we have requested more resources per container than is available in the cluster.
   */
  private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
    logInfo("Verifying our application has not requested more than the maximum " +
      s"memory capability of the cluster ($maxMem MB per container)")
    val executorMem =
      executorMemory + executorOffHeapMemory + executorMemoryOverhead + pysparkWorkerMemory
    if (executorMem > maxMem) {
      throw new IllegalArgumentException(s"Required executor memory ($executorMemory MB), " +
        s"offHeap memory ($executorOffHeapMemory) MB, overhead ($executorMemoryOverhead MB), " +
        s"and PySpark memory ($pysparkWorkerMemory MB) is above the max threshold ($maxMem MB) " +
        "of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' " +
        "and/or 'yarn.nodemanager.resource.memory-mb'.")
    }
    val amMem = amMemory + amMemoryOverhead
    if (amMem > maxMem) {
      throw new IllegalArgumentException(s"Required AM memory ($amMemory" +
        s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
        "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
        "'yarn.nodemanager.resource.memory-mb'.")
    }
    logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
      amMem,
      amMemoryOverhead))

    // We could add checks to make sure the entire cluster has enough resources but that involves
    // getting all the node reports and computing ourselves.
  }

  /**
   * Copy the given file to a remote file system (e.g. HDFS) if needed.
   * The file is only copied if the source and destination file systems are different or the source
   * scheme is "file". This is used for preparing resources for launching the ApplicationMaster
   * container. Exposed for testing.
   */
  private[yarn] def copyFileToRemote(
      destDir: Path,
      srcPath: Path,
      replication: Option[Short],
      symlinkCache: Map[URI, Path],
      force: Boolean = false,
      destName: Option[String] = None): Path = {
    val destFs = destDir.getFileSystem(hadoopConf)
    val srcFs = srcPath.getFileSystem(hadoopConf)
    var destPath = srcPath
    if (force || !compareFs(srcFs, destFs) || "file".equals(srcFs.getScheme)) {
      destPath = new Path(destDir, destName.getOrElse(srcPath.getName()))
      logInfo(s"Uploading resource $srcPath -> $destPath")
      try {
        FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
      } catch {
        // HADOOP-16878 changes the behavior to throw exceptions when src equals to dest
        case e: PathOperationException
            if srcFs.makeQualified(srcPath).equals(destFs.makeQualified(destPath)) =>
      }
      replication.foreach(repl => destFs.setReplication(destPath, repl))
      destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
    } else {
      logInfo(s"Source and destination file systems are the same. Not copying $srcPath")
    }
    // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
    // version shows the specific version in the distributed cache configuration
    val qualifiedDestPath = destFs.makeQualified(destPath)
    val qualifiedDestDir = qualifiedDestPath.getParent
    val resolvedDestDir = symlinkCache.getOrElseUpdate(qualifiedDestDir.toUri(), {
      val fc = FileContext.getFileContext(qualifiedDestDir.toUri(), hadoopConf)
      fc.resolvePath(qualifiedDestDir)
    })
    new Path(resolvedDestDir, qualifiedDestPath.getName())
  }

  /**
   * Upload any resources to the distributed cache if needed. If a resource is intended to be
   * consumed locally, set up the appropriate config for downstream code to handle it properly.
   * This is used for setting up a container launch context for our ApplicationMaster.
   * Exposed for testing.
   */
  def prepareLocalResources(
      destDir: Path,
      pySparkArchives: Seq[String]): HashMap[String, LocalResource] = {
    logInfo("Preparing resources for our AM container")
    // Upload Spark and the application JAR to the remote file system if necessary,
    // and add them as local resources to the application master.
    val fs = destDir.getFileSystem(hadoopConf)

    // Used to keep track of URIs added to the distributed cache. If the same URI is added
    // multiple times, YARN will fail to launch containers for the app with an internal
    // error.
    val distributedUris = new HashSet[String]
    // Used to keep track of the names of files added to the distributed cache. If files with the
    // same name but different paths are added multiple times, YARN will fail to launch
    // containers for the app with an internal error.
    val distributedNames = new HashSet[String]

    val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
    val localResources = HashMap[String, LocalResource]()
    FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))

    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
    val symlinkCache: Map[URI, Path] = HashMap[URI, Path]()

    def addDistributedUri(uri: URI): Boolean = {
      val uriStr = uri.toString()
      val fileName = new File(uri.getPath).getName
      if (distributedUris.contains(uriStr)) {
        logWarning(s"Same path resource $uri added multiple times to distributed cache.")
        false
      } else if (distributedNames.contains(fileName)) {
        logWarning(s"Same name resource $uri added multiple times to distributed cache")
        false
      } else {
        distributedUris += uriStr
        distributedNames += fileName
        true
      }
    }

    /*
     * Distribute a file to the cluster.
     *
     * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
     * to HDFS (if not already there) and added to the application's distributed cache.
     *
     * @param path URI of the file to distribute.
     * @param resType Type of resource being distributed.
     * @param destName Name of the file in the distributed cache.
     * @param targetDir Subdirectory where to place the file.
     * @param appMasterOnly Whether to distribute only to the AM.
     * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
     *         localized path for non-local paths, or the input `path` for local paths.
     *         The localized path will be null if the URI has already been added to the cache.
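     *
     * For illustration (hypothetical paths): distribute("hdfs:///libs/dep.jar") localizes the jar
     * and returns (false, "dep.jar"), whereas distribute("local:/opt/dep.jar") skips the upload
     * and returns (true, "local:/opt/dep.jar").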
     */
    def distribute(
        path: String,
        resType: LocalResourceType = LocalResourceType.FILE,
        destName: Option[String] = None,
        targetDir: Option[String] = None,
        appMasterOnly: Boolean = false): (Boolean, String) = {
      val trimmedPath = path.trim()
      val localURI = Utils.resolveURI(trimmedPath)
      if (localURI.getScheme != Utils.LOCAL_SCHEME) {
        if (addDistributedUri(localURI)) {
          val localPath = getQualifiedLocalPath(localURI, hadoopConf)
          val linkname = targetDir.map(_ + "/").getOrElse("") +
            destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
          val destPath = copyFileToRemote(destDir, localPath, replication, symlinkCache)
          val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
          distCacheMgr.addResource(
            destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
            appMasterOnly = appMasterOnly)
          (false, linkname)
        } else {
          (false, null)
        }
      } else {
        (true, trimmedPath)
      }
    }

    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
    // HDFS, and setup the relevant environment vars, so the AM can login again.
    amKeytabFileName.foreach { kt =>
      logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
        " via the YARN Secure Distributed Cache.")
      val (_, localizedPath) = distribute(keytab,
        destName = Some(kt),
        appMasterOnly = true)
      require(localizedPath != null, "Keytab file already distributed.")
    }

    // If we passed in a ivySettings file, make sure we copy the file to the distributed cache
    // in cluster mode so that the driver can access it
    val ivySettings = sparkConf.getOption("spark.jars.ivySettings")
    val ivySettingsLocalizedPath: Option[String] = ivySettings match {
      case Some(ivySettingsPath) if isClusterMode =>
        val uri = new URI(ivySettingsPath)
        Option(uri.getScheme).getOrElse("file") match {
          case "file" =>
            val ivySettingsFile = new File(uri.getPath)
            require(ivySettingsFile.exists(), s"Ivy settings file $ivySettingsFile not found")
            require(ivySettingsFile.isFile(), s"Ivy settings file $ivySettingsFile is not a " +
              "normal file")
            // Generate a file name that can be used for the ivySettings file, that does not
            // conflict with any user file.
            val localizedFileName = Some(ivySettingsFile.getName() + "-" +
              UUID.randomUUID().toString)
            val (_, localizedPath) = distribute(ivySettingsPath, destName = localizedFileName)
            require(localizedPath != null, "IvySettings file already distributed.")
            Some(localizedPath)
          case scheme =>
            throw new IllegalArgumentException(s"Scheme $scheme not supported in " +
              "spark.jars.ivySettings")
        }
      case _ => None
    }

    /**
     * Add Spark to the cache. There are two settings that control what files to add to the cache:
     * - if a Spark archive is defined, use the archive. The archive is expected to contain
     *   jar files at its root directory.
     * - if a list of jars is provided, filter the non-local ones, resolve globs, and
     *   add the found files to the cache.
     *
     * Note that the archive cannot be a "local" URI. If none of the above settings are found,
     * then upload all files found in $SPARK_HOME/jars.
     */
    val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
    if (sparkArchive.isDefined) {
      val archive = sparkArchive.get
      require(!Utils.isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
      distribute(Utils.resolveURI(archive).toString,
        resType = LocalResourceType.ARCHIVE,
        destName = Some(LOCALIZED_LIB_DIR))
    } else {
      sparkConf.get(SPARK_JARS) match {
        case Some(jars) =>
          // Break the list of jars to upload, and resolve globs.
          val localJars = new ArrayBuffer[String]()
          jars.foreach { jar =>
            if (!Utils.isLocalUri(jar)) {
              val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
              val pathFs = FileSystem.get(path.toUri(), hadoopConf)
              val fss = pathFs.globStatus(path)
              if (fss == null) {
                throw new FileNotFoundException(s"Path ${path.toString} does not exist")
              }
              fss.filter(_.isFile()).foreach { entry =>
                val uri = entry.getPath().toUri()
                statCache.update(uri, entry)
                distribute(uri.toString(), targetDir = Some(LOCALIZED_LIB_DIR))
              }
            } else {
              localJars += jar
            }
          }

          // Propagate the local URIs to the containers using the configuration.
          sparkConf.set(SPARK_JARS, localJars.toSeq)

        case None =>
          // No configuration, so fall back to uploading local jar files.
          logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " +
            "to uploading libraries under SPARK_HOME.")
          val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir(
            sparkConf.getenv("SPARK_HOME")))
          val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
            new File(Utils.getLocalDir(sparkConf)))
          val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))

          try {
            jarsStream.setLevel(0)
            jarsDir.listFiles().foreach { f =>
              if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
                jarsStream.putNextEntry(new ZipEntry(f.getName))
                Files.copy(f.toPath, jarsStream)
                jarsStream.closeEntry()
              }
            }
          } finally {
            jarsStream.close()
          }

          distribute(jarsArchive.toURI.getPath,
            resType = LocalResourceType.ARCHIVE,
            destName = Some(LOCALIZED_LIB_DIR))
          jarsArchive.delete()
      }
    }

    /**
     * Copy the user jar to the distributed cache if its scheme is not "local".
     * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
     */
    Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar =>
      val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME))
      if (isLocal) {
        require(localizedPath != null, s"Path $jar already distributed")
        // If the resource is intended for local use only, handle this downstream
        // by setting the appropriate property
        sparkConf.set(APP_JAR, localizedPath)
      }
    }

    /**
     * Do the same for any additional resources passed in through ClientArguments.
     * Each resource category is represented by a 3-tuple of:
     *   (1) comma separated list of resources in this category,
     *   (2) resource type, and
     *   (3) whether to add these resources to the classpath
     */
    val cachedSecondaryJarLinks = ListBuffer.empty[String]
    List(
      (sparkConf.get(JARS_TO_DISTRIBUTE), LocalResourceType.FILE, true),
      (sparkConf.get(FILES_TO_DISTRIBUTE), LocalResourceType.FILE, false),
      (sparkConf.get(ARCHIVES_TO_DISTRIBUTE), LocalResourceType.ARCHIVE, false)
    ).foreach { case (flist, resType, addToClasspath) =>
      flist.foreach { file =>
        val (_, localizedPath) = distribute(file, resType = resType)
        // If addToClasspath, silently skip jars that have already been added to the distributed cache.
        if (addToClasspath) {
          if (localizedPath != null) {
            cachedSecondaryJarLinks += localizedPath
          }
        } else {
          if (localizedPath == null) {
            throw new IllegalArgumentException(s"Attempt to add ($file) multiple times" +
              " to the distributed cache.")
          }
        }
      }
    }
    if (cachedSecondaryJarLinks.nonEmpty) {
      sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks.toSeq)
    }

    if (isClusterMode && args.primaryPyFile != null) {
      distribute(args.primaryPyFile, appMasterOnly = true)
    }

    pySparkArchives.foreach { f =>
      val uri = Utils.resolveURI(f)
      if (uri.getScheme != Utils.LOCAL_SCHEME) {
        distribute(f)
      }
    }

    // The python files list needs to be treated especially. All files that are not an
    // archive need to be placed in a subdirectory that will be added to PYTHONPATH.
    sparkConf.get(PY_FILES).foreach { f =>
      val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None
      distribute(f, targetDir = targetDir)
    }

    // Update the configuration with all the distributed files, minus the conf archive. The
    // conf archive will be handled by the AM differently so that we avoid having to send
    // this configuration by other means. See SPARK-14602 for one reason of why this is needed.
    distCacheMgr.updateConfiguration(cachedResourcesConf)

    // Upload the conf archive to HDFS manually, and record its location in the configuration.
    // This will allow the AM to know where the conf archive is in HDFS, so that it can be
    // distributed to the containers.
    //
    // This code forces the archive to be copied, so that unit tests pass (since in that case both
    // file systems are the same and the archive wouldn't normally be copied). In most (all?)
    // deployments, the archive would be copied anyway, since it's a temp file in the local file
    // system.
    val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE)
    val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
    cachedResourcesConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())

    val confsToOverride = Map.empty[String, String]
    // If propagating the keytab to the AM, override the keytab name with the name of the
    // distributed file.
    amKeytabFileName.foreach { kt => confsToOverride.put(KEYTAB.key, kt) }

    // If propagating the ivySettings file to the distributed cache, override the ivySettings
    // file name with the name of the distributed file.
    ivySettingsLocalizedPath.foreach { path =>
      confsToOverride.put("spark.jars.ivySettings", path)
    }

    val localConfArchive = new Path(createConfArchive(confsToOverride).toURI())
    copyFileToRemote(destDir, localConfArchive, replication, symlinkCache, force = true,
      destName = Some(LOCALIZED_CONF_ARCHIVE))

    // Manually add the config archive to the cache manager so that the AM is launched with
    // the proper files set up.
    distCacheMgr.addResource(
      remoteFs, hadoopConf, remoteConfArchivePath, localResources, LocalResourceType.ARCHIVE,
      LOCALIZED_CONF_DIR, statCache, appMasterOnly = false)

    localResources
  }

  /**
   * Create an archive with the config files for distribution.
   *
   * These will be used by AM and executors. The files are zipped and added to the job as an
   * archive, so that YARN will explode it when distributing to AM and executors. This directory
   * is then added to the classpath of AM and executor process, just to make sure that everybody
   * is using the same default config.
   *
   * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
   * shows up in the classpath before YARN_CONF_DIR.
   *
   * Currently this makes a shallow copy of the conf directory. If there are cases where a
   * Hadoop config directory contains subdirectories, this code will have to be fixed.
   *
   * The archive also contains some Spark configuration. Namely, it saves the contents of
   * SparkConf in a file to be loaded by the AM process.
   *
   * @param confsToOverride configs that should be overridden when creating the final Spark conf file
   */
  private def createConfArchive(confsToOverride: Map[String, String]): File = {
    val hadoopConfFiles = new HashMap[String, File]()

    // SPARK_CONF_DIR shows up in the classpath before HADOOP_CONF_DIR/YARN_CONF_DIR
    sys.env.get("SPARK_CONF_DIR").foreach { localConfDir =>
      val dir = new File(localConfDir)
      if (dir.isDirectory) {
        val files = dir.listFiles(new FileFilter {
          override def accept(pathname: File): Boolean = {
            pathname.isFile && pathname.getName.endsWith(".xml")
          }
        })
        files.foreach { f => hadoopConfFiles(f.getName) = f }
      }
    }

    // SPARK-23630: during testing, Spark scripts filter out hadoop conf dirs so that user's
    // environments do not interfere with tests. This allows a special env variable during
    // tests so that custom conf dirs can be used by unit tests.
    val confDirs = Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR") ++
      (if (Utils.isTesting) Seq("SPARK_TEST_HADOOP_CONF_DIR") else Nil)

    confDirs.foreach { envKey =>
      sys.env.get(envKey).foreach { path =>
        val dir = new File(path)
        if (dir.isDirectory()) {
          val files = dir.listFiles()
          if (files == null) {
            logWarning("Failed to list files under directory " + dir)
          } else {
            files.foreach { file =>
              if (file.isFile && !hadoopConfFiles.contains(file.getName())) {
                hadoopConfFiles(file.getName()) = file
              }
            }
          }
        }
      }
    }

    val confArchive = File.createTempFile(LOCALIZED_CONF_DIR, ".zip",
      new File(Utils.getLocalDir(sparkConf)))
    val confStream = new ZipOutputStream(new FileOutputStream(confArchive))

    logDebug(s"Creating an archive with the config files for distribution at $confArchive.")
    try {
      confStream.setLevel(0)

      // Upload the $SPARK_CONF_DIR/log4j2 configuration file to the distributed cache so that the
      // executors use the latest configuration instead of the default values. This is required when
      // the user changes the log4j2 configuration directly. If the configuration file is provided
      // through --files, the executors take the configuration from --files instead of the
      // $SPARK_CONF_DIR/log4j2 configuration file.

      // Also upload metrics.properties to the distributed cache if it exists on the classpath.
      // If the user specifies this file using --files, the executors will use that one instead.
      val log4j2ConfigFiles = Seq("log4j2.yaml", "log4j2.yml", "log4j2.json", "log4j2.jsn",
        "log4j2.xml", "log4j2.properties")
      for { prop <- log4j2ConfigFiles ++ Seq("metrics.properties")
            url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
            if url.getProtocol == "file" } {
        val file = new File(url.getPath())
        confStream.putNextEntry(new ZipEntry(file.getName()))
        Files.copy(file.toPath, confStream)
        confStream.closeEntry()
      }

      // Save the Hadoop config files under a separate directory in the archive. This directory
      // is appended to the classpath so that the cluster-provided configuration takes precedence.
      confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/"))
      confStream.closeEntry()
      hadoopConfFiles.foreach { case (name, file) =>
        if (file.canRead()) {
          confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/$name"))
          Files.copy(file.toPath, confStream)
          confStream.closeEntry()
        }
      }

      // Save the YARN configuration into a separate file that will be overlayed on top of the
      // cluster's Hadoop conf.
      confStream.putNextEntry(new ZipEntry(SparkHadoopUtil.SPARK_HADOOP_CONF_FILE))
      hadoopConf.writeXml(confStream)
      confStream.closeEntry()

      // Save Spark configuration to a file in the archive.
      val props = confToProperties(sparkConf)
      confsToOverride.foreach { case (k, v) => props.setProperty(k, v)}
      writePropertiesToArchive(props, SPARK_CONF_FILE, confStream)

      // Write the distributed cache config to the archive.
      writePropertiesToArchive(confToProperties(cachedResourcesConf), DIST_CACHE_CONF_FILE,
        confStream)
    } finally {
      confStream.close()
    }
    confArchive
  }

  /**
   * Set up the environment for launching our ApplicationMaster container.
   */
  private def setupLaunchEnv(
      stagingDirPath: Path,
      pySparkArchives: Seq[String]): HashMap[String, String] = {
    logInfo("Setting up the launch environment for our AM container")
    val env = new HashMap[String, String]()
    populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
    env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
    env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
    env("SPARK_PREFER_IPV6") = Utils.preferIPv6.toString

    // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
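    // For example (hypothetical value), spark.yarn.appMasterEnv.PYSPARK_PYTHON=/usr/bin/python3
    // exports PYSPARK_PYTHON=/usr/bin/python3 into the AM container environment; if a key is set
    // more than once, addPathToEnvironment joins the values with the classpath separator.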
    val amEnvPrefix = "spark.yarn.appMasterEnv."
    sparkConf.getAll
      .filter { case (k, v) => k.startsWith(amEnvPrefix) }
      .map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
      .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) }

    // If pyFiles contains any .py files, we need to add LOCALIZED_PYTHON_DIR to the PYTHONPATH
    // of the container processes too. Add all non-.py files directly to PYTHONPATH.
    //
    // NOTE: the code currently does not handle .py files defined with a "local:" scheme.
    val pythonPath = new ListBuffer[String]()
    val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py"))
    if (pyFiles.nonEmpty) {
      pythonPath += buildPath(Environment.PWD.$$(), LOCALIZED_PYTHON_DIR)
    }
    (pySparkArchives ++ pyArchives).foreach { path =>
      val uri = Utils.resolveURI(path)
      if (uri.getScheme != Utils.LOCAL_SCHEME) {
        pythonPath += buildPath(Environment.PWD.$$(), new Path(uri).getName())
      } else {
        pythonPath += uri.getPath()
      }
    }

    // Finally, update the Spark config to propagate PYTHONPATH to the AM and executors.
    if (pythonPath.nonEmpty) {
      val pythonPathList = (sys.env.get("PYTHONPATH") ++ pythonPath)
      env("PYTHONPATH") = (env.get("PYTHONPATH") ++ pythonPathList)
        .mkString(ApplicationConstants.CLASS_PATH_SEPARATOR)
      val pythonPathExecutorEnv = (sparkConf.getExecutorEnv.toMap.get("PYTHONPATH") ++
        pythonPathList).mkString(ApplicationConstants.CLASS_PATH_SEPARATOR)
      sparkConf.setExecutorEnv("PYTHONPATH", pythonPathExecutorEnv)
    }

    if (isClusterMode) {
      // propagate PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON to driver in cluster mode
      Seq("PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON").foreach { envname =>
        if (!env.contains(envname)) {
          sys.env.get(envname).foreach(env(envname) = _)
        }
      }
      sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _))
    }

    Seq(ENV_DIST_CLASSPATH, SPARK_TESTING).foreach { envVar =>
      sys.env.get(envVar).foreach(value => env(envVar) = value)
    }

    env
  }

  /**
   * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
   * This sets up the launch environment, java options, and the command for launching the AM.
   */
  private def createContainerLaunchContext(): ContainerLaunchContext = {
    logInfo("Setting up container launch context for our AM")
    val pySparkArchives =
      if (sparkConf.get(IS_PYTHON_APP)) {
        findPySparkArchives()
      } else {
        Nil
      }

    val launchEnv = setupLaunchEnv(stagingDirPath, pySparkArchives)
    val localResources = prepareLocalResources(stagingDirPath, pySparkArchives)

    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setLocalResources(localResources.asJava)
    amContainer.setEnvironment(launchEnv.asJava)

    val javaOpts = ListBuffer[String]()

    javaOpts += s"-Djava.net.preferIPv6Addresses=${Utils.preferIPv6}"

    // SPARK-37106: To start AM with Java 17, `JavaModuleOptions.defaultModuleOptions`
    // is added by default. It will not affect Java 8 and Java 11 due to existence of
    // `-XX:+IgnoreUnrecognizedVMOptions`.
    javaOpts += JavaModuleOptions.defaultModuleOptions()

    // Set the environment variable through a command prefix
    // to append to the existing value of the variable
    var prefixEnv: Option[String] = None

    // Add Xmx for AM memory
    javaOpts += "-Xmx" + amMemory + "m"

    val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
    javaOpts += "-Djava.io.tmpdir=" + tmpDir

    // TODO: Remove once cpuset version is pushed out.
    // The context is, default gc for server class machines ends up using all cores to do gc -
    // hence if there are multiple containers in same node, Spark GC affects all other containers'
    // performance (which can be that of other Spark containers)
    // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
    // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
    // of cores on a node.
    val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
    if (useConcurrentAndIncrementalGC) {
      // In our expts, using (default) throughput collector has severe perf ramifications in
      // multi-tenant machines
      javaOpts += "-XX:+UseConcMarkSweepGC"
      javaOpts += "-XX:MaxTenuringThreshold=31"
      javaOpts += "-XX:SurvivorRatio=8"
      javaOpts += "-XX:+CMSIncrementalMode"
      javaOpts += "-XX:+CMSIncrementalPacing"
      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
    }

    // Include driver-specific java options if we are launching a driver
    if (isClusterMode) {
      sparkConf.get(DRIVER_JAVA_OPTIONS).foreach { opts =>
        javaOpts ++= Utils.splitCommandString(opts)
          .map(Utils.substituteAppId(_, this.appId.toString))
          .map(YarnSparkHadoopUtil.escapeForShell)
      }
      val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
        sys.props.get("spark.driver.libraryPath")).flatten
      if (libraryPaths.nonEmpty) {
        prefixEnv = Some(createLibraryPathPrefix(libraryPaths.mkString(File.pathSeparator),
          sparkConf))
      }
      if (sparkConf.get(AM_JAVA_OPTIONS).isDefined) {
        logWarning(s"${AM_JAVA_OPTIONS.key} will not take effect in cluster mode")
      }
    } else {
      // Validate and include yarn am specific java options in yarn-client mode.
      sparkConf.get(AM_JAVA_OPTIONS).foreach { opts =>
        if (opts.contains("-Dspark")) {
          val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to set Spark options (was '$opts')."
          throw new SparkException(msg)
        }
        if (opts.contains("-Xmx")) {
          val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to specify max heap memory settings " +
            s"(was '$opts'). Use spark.yarn.am.memory instead."
          throw new SparkException(msg)
        }
        javaOpts ++= Utils.splitCommandString(opts)
          .map(Utils.substituteAppId(_, this.appId.toString))
          .map(YarnSparkHadoopUtil.escapeForShell)
      }
      sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
        prefixEnv = Some(createLibraryPathPrefix(paths, sparkConf))
      }
    }

    // For log4j2 configuration to reference
    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    val userClass =
      if (isClusterMode) {
        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
      } else {
        Nil
      }
    val userJar =
      if (args.userJar != null) {
        Seq("--jar", args.userJar)
      } else {
        Nil
      }
    val primaryPyFile =
      if (isClusterMode && args.primaryPyFile != null) {
        Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
      } else {
        Nil
      }
    val primaryRFile =
      if (args.primaryRFile != null) {
        Seq("--primary-r-file", args.primaryRFile)
      } else {
        Nil
      }
    val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }
    if (args.primaryRFile != null &&
        (args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) {
      args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
    }
    val userArgs = args.userArgs.flatMap { arg =>
      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
    }
    val amArgs =
      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
      Seq("--properties-file",
        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
      Seq("--dist-cache-conf",
        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))

    // Command for the ApplicationMaster
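    // Once YARN expands the {{...}} placeholders, this renders roughly as (user class and jar
    // names are illustrative):
    //   {{JAVA_HOME}}/bin/java -server -Xmx1024m ... org.apache.spark.deploy.yarn.ApplicationMaster
    //     --class 'com.example.Main' --jar app.jar
    //     --properties-file {{PWD}}/__spark_conf__/__spark_conf__.properties
    //     --dist-cache-conf {{PWD}}/__spark_conf__/__spark_dist_cache__.properties
    //     1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr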
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    amContainer.setCommands(printableCommands.asJava)

    logDebug("===============================================================================")
    logDebug("YARN AM launch context:")
    logDebug(s"    user class: ${Option(args.userClass).getOrElse("N/A")}")
    logDebug("    env:")
    if (log.isDebugEnabled) {
      Utils.redact(sparkConf, launchEnv.toSeq).foreach { case (k, v) =>
        logDebug(s"        $k -> $v")
      }
    }
    logDebug("    resources:")
    localResources.foreach { case (k, v) => logDebug(s"        $k -> $v")}
    logDebug("    command:")
    logDebug(s"        ${printableCommands.mkString(" ")}")
    logDebug("===============================================================================")

    // send the acl settings into YARN to control who has access via YARN interfaces
    val securityManager = new SecurityManager(sparkConf)
    amContainer.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
    setupSecurityToken(amContainer)
    setTokenConf(amContainer)
    amContainer
  }

  /**
   * Report the state of an application until it has exited, either successfully or
   * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
   * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,
   * or KILLED).
   *
   * @param returnOnRunning Whether to also return the application state when it is RUNNING.
   * @param logApplicationReport Whether to log details of the application report every iteration.
   * @param interval How often to poll the YARN RM for application status (in ms).
   * @return A pair of the yarn application state and the final application state.
   */
  def monitorApplication(
      returnOnRunning: Boolean = false,
      logApplicationReport: Boolean = true,
      interval: Long = sparkConf.get(REPORT_INTERVAL)): YarnAppReport = {
    var lastState: YarnApplicationState = null
    while (true) {
      Thread.sleep(interval)
      val report: ApplicationReport =
        try {
          getApplicationReport
        } catch {
          case e: ApplicationNotFoundException =>
            logError(s"Application $appId not found.")
            cleanupStagingDir()
            return YarnAppReport(YarnApplicationState.KILLED, FinalApplicationStatus.KILLED, None)
          case NonFatal(e) if !e.isInstanceOf[InterruptedIOException] =>
            val msg = s"Failed to contact YARN for application $appId."
            logError(msg, e)
            // Don't necessarily clean up staging dir because status is unknown
            return YarnAppReport(YarnApplicationState.FAILED, FinalApplicationStatus.FAILED,
              Some(msg))
        }
      val state = report.getYarnApplicationState

      if (logApplicationReport) {
        logInfo(s"Application report for $appId (state: $state)")

        // If DEBUG is enabled, log report details every iteration
        // Otherwise, log them every time the application changes state
        if (log.isDebugEnabled) {
          logDebug(formatReportDetails(report, getDriverLogsLink(report)))
        } else if (lastState != state) {
          logInfo(formatReportDetails(report, getDriverLogsLink(report)))
        }
      }

      if (lastState != state) {
        state match {
          case YarnApplicationState.RUNNING =>
            reportLauncherState(SparkAppHandle.State.RUNNING)
          case YarnApplicationState.FINISHED =>
            report.getFinalApplicationStatus match {
              case FinalApplicationStatus.FAILED =>
                reportLauncherState(SparkAppHandle.State.FAILED)
              case FinalApplicationStatus.KILLED =>
                reportLauncherState(SparkAppHandle.State.KILLED)
              case _ =>
                reportLauncherState(SparkAppHandle.State.FINISHED)
            }
          case YarnApplicationState.FAILED =>
            reportLauncherState(SparkAppHandle.State.FAILED)
          case YarnApplicationState.KILLED =>
            reportLauncherState(SparkAppHandle.State.KILLED)
          case _ =>
        }
      }

      if (state == YarnApplicationState.FINISHED ||
          state == YarnApplicationState.FAILED ||
          state == YarnApplicationState.KILLED) {
        cleanupStagingDir()
        return createAppReport(report)
      }

      if (returnOnRunning && state == YarnApplicationState.RUNNING) {
        return createAppReport(report)
      }
      if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled &&
          appMaster == null && report.getAMRMToken != null) {
        appMaster = startApplicationMasterService(report)
      }
      lastState = state
    }

    // Never reached, but keeps compiler happy
    throw new SparkException("While loop is depleted! This should never happen...")
  }

  private def startApplicationMasterService(report: ApplicationReport): ApplicationMaster = {
    // Add AMRMToken to establish connection between RM and AM
    val token = report.getAMRMToken
    val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
      new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](
        token.getIdentifier().array(), token.getPassword().array,
        new Text(token.getKind()), new Text(token.getService()))
    val currentUGI = UserGroupInformation.getCurrentUser
    currentUGI.addToken(amRMToken)

    // Start Application Service in a separate thread and continue with application monitoring
    val appMaster = new ApplicationMaster(
      new ApplicationMasterArguments(Array.empty), sparkConf, hadoopConf)
    val amService = new Thread("Unmanaged Application Master Service") {
      override def run(): Unit = {
        appMaster.runUnmanaged(rpcEnv, report.getCurrentApplicationAttemptId,
          stagingDirPath, cachedResourcesConf)
      }
    }
    amService.setDaemon(true)
    amService.start()
    appMaster
  }

  /**
   * Format an application report and optionally, links to driver logs, in a human-friendly manner.
   *
   * @param report The application report from YARN.
   * @param driverLogsLinks A map of driver log files and their links. Keys are the file names
   *                        (e.g. `stdout`), and values are the links. If empty, nothing will be
   *                        printed.
   * @return Human-readable version of the input data.
   */
  private def formatReportDetails(report: ApplicationReport,
    driverLogsLinks: IMap[String, String]): String = {
    val details = Seq[(String, String)](
      ("client token", getClientToken(report)),
      ("diagnostics", report.getDiagnostics),
      ("ApplicationMaster host", report.getHost),
      ("ApplicationMaster RPC port", report.getRpcPort.toString),
      ("queue", report.getQueue),
      ("start time", report.getStartTime.toString),
      ("final status", report.getFinalApplicationStatus.toString),
      ("tracking URL", report.getTrackingUrl),
      ("user", report.getUser)
    ) ++ driverLogsLinks.map { case (fname, link) => (s"Driver Logs ($fname)", link) }

    // Use more loggable format if value is null or empty
    details.map { case (k, v) =>
      val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
      s"\n\t $k: $newValue"
    }.mkString("")
  }

  /**
   * Fetch links to the logs of the driver for the given application report. This requires
   * querying the ResourceManager via RPC. Returns an empty map if the links could not be fetched.
   * If this feature is disabled via [[CLIENT_INCLUDE_DRIVER_LOGS_LINK]], or if the application
   * report indicates that the driver container isn't currently running, an empty map is
   * returned immediately.
   */
  private def getDriverLogsLink(appReport: ApplicationReport): IMap[String, String] = {
    if (!sparkConf.get(CLIENT_INCLUDE_DRIVER_LOGS_LINK)
      || appReport.getYarnApplicationState != YarnApplicationState.RUNNING) {
      return IMap.empty
    }
    try {
      Option(appReport.getCurrentApplicationAttemptId)
        .flatMap(attemptId => Option(yarnClient.getApplicationAttemptReport(attemptId)))
        .flatMap(attemptReport => Option(attemptReport.getAMContainerId))
        .flatMap(amContainerId => Option(yarnClient.getContainerReport(amContainerId)))
        .flatMap(containerReport => Option(containerReport.getLogUrl))
        .map(YarnContainerInfoHelper.getLogUrlsFromBaseUrl)
        .getOrElse(IMap.empty)
    } catch {
      case e: Exception =>
        logWarning(s"Unable to get driver log links for $appId: $e")
        // Include the full stack trace only at DEBUG level to reduce verbosity
        logDebug(s"Unable to get driver log links for $appId", e)
        IMap.empty
    }
  }
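
  // The chain of Option(...).flatMap(...) calls above is a null-safe way to walk the RPC
  // results: application attempt -> attempt report -> AM container id -> container report
  // -> log URL. Any null along the way short-circuits to IMap.empty. A minimal sketch of
  // the same pattern (identifiers below are hypothetical):
  //
  //   Option(javaApiThatMayReturnNull())
  //     .flatMap(x => Option(x.nextNullableValue()))
  //     .map(transform)
  //     .getOrElse(defaultValue)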

  /**
   * Submit an application to the ResourceManager.
   * If spark.yarn.submit.waitAppCompletion is set to true, the client stays alive,
   * reporting the application's status until the application has exited for any reason.
   * Otherwise, the client process exits after submission.
   * If the application finishes with a failed, killed, or undefined status,
   * throw an appropriate SparkException.
   */
  def run(): Unit = {
    submitApplication()
    if (!launcherBackend.isConnected() && fireAndForget) {
      val report = getApplicationReport
      val state = report.getYarnApplicationState
      logInfo(s"Application report for $appId (state: $state)")
      logInfo(formatReportDetails(report, getDriverLogsLink(report)))
      if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
        throw new SparkException(s"Application $appId finished with status: $state")
      }
    } else {
      val YarnAppReport(appState, finalState, diags) = monitorApplication()
      if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
        diags.foreach { err =>
          logError(s"Application diagnostics message: $err")
        }
        throw new SparkException(s"Application $appId finished with failed status")
      }
      if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
        throw new SparkException(s"Application $appId is killed")
      }
      if (finalState == FinalApplicationStatus.UNDEFINED) {
        throw new SparkException(s"The final status of application $appId is undefined")
      }
    }
  }
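
  // The fire-and-forget branch is driven by spark.yarn.submit.waitAppCompletion. A typical
  // cluster-mode submission that returns right after the app is accepted might look like
  // this (values are examples):
  //
  //   spark-submit \
  //     --master yarn --deploy-mode cluster \
  //     --conf spark.yarn.submit.waitAppCompletion=false \
  //     --class org.example.Main app.jar
  //
  // With waitAppCompletion=true (the default), run() instead blocks in monitorApplication()
  // and throws a SparkException if the application ends FAILED, KILLED or UNDEFINED.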

  private def findPySparkArchives(): Seq[String] = {
    sys.env.get("PYSPARK_ARCHIVES_PATH")
      .map(_.split(",").toSeq)
      .getOrElse {
        val pyLibPath = Seq(sys.env("SPARK_HOME"), "python", "lib").mkString(File.separator)
        val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
        require(pyArchivesFile.exists(),
          s"$pyArchivesFile not found; cannot run pyspark application in YARN mode.")
        val py4jFile = new File(pyLibPath, PythonUtils.PY4J_ZIP_NAME)
        require(py4jFile.exists(),
          s"$py4jFile not found; cannot run pyspark application in YARN mode.")
        Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
      }
  }
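
  // Sketch of what findPySparkArchives resolves to when PYSPARK_ARCHIVES_PATH is unset and
  // SPARK_HOME=/opt/spark (paths are illustrative; the py4j file name depends on the version):
  //
  //   Seq("/opt/spark/python/lib/pyspark.zip",
  //       "/opt/spark/python/lib/py4j-<version>-src.zip")
  //
  // Setting PYSPARK_ARCHIVES_PATH to a comma-separated list overrides this lookup entirely.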

}

private[spark] object Client extends Logging {

  // Alias for the user jar
  val APP_JAR_NAME: String = "__app__.jar"

  // Staging directory for any temporary jars or files
  val SPARK_STAGING: String = ".sparkStaging"


  // Staging directory is private! -> rwx------
  val STAGING_DIR_PERMISSION: FsPermission =
    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)

  // App files are world readable and owner writable -> rw-r--r--
  val APP_FILE_PERMISSION: FsPermission =
    FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
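
  // Both masks above are built from octal strings: Integer.parseInt("700", 8) == 448,
  // i.e. rwx------ (owner only), and Integer.parseInt("644", 8) == 420, i.e. rw-r--r--.
  // createImmutable keeps the resulting FsPermission constants from being modified later.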

  // Distribution-defined classpath to add to processes
  val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"

  // Subdirectory where the user's Spark and Hadoop config files will be placed.
  val LOCALIZED_CONF_DIR = "__spark_conf__"

  // Subdirectory in the conf directory containing Hadoop config files.
  val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"

  // File containing the conf archive in the AM. See prepareLocalResources().
  val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip"

  // Name of the file in the conf archive containing Spark configuration.
  val SPARK_CONF_FILE = "__spark_conf__.properties"

  // Name of the file in the conf archive containing the distributed cache info.
  val DIST_CACHE_CONF_FILE = "__spark_dist_cache__.properties"

  // Subdirectory where the user's python files (not archives) will be placed.
  val LOCALIZED_PYTHON_DIR = "__pyfiles__"

  // Subdirectory where Spark libraries will be placed.
  val LOCALIZED_LIB_DIR = "__spark_libs__"

  val SPARK_TESTING = "SPARK_TESTING"

  /**
   * Return the path to the given application's staging directory.
   */
  private def getAppStagingDir(appId: ApplicationId): String = {
    buildPath(SPARK_STAGING, appId.toString())
  }
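
  // Example (appId rendered the way YARN prints it):
  //
  //   getAppStagingDir(appId)   // ".sparkStaging/application_1666243200000_0001"
  //
  // The relative path is later qualified against the staging root, typically the
  // submitting user's home directory on the cluster file system.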

  /**
   * Populate the classpath entry in the given environment map with any application
   * classpath specified through the Hadoop and Yarn configurations.
   */
  private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String])
    : Unit = {
    val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
    classPathElementsToAdd.foreach { c =>
      YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)
    }
  }

  private def getYarnAppClasspath(conf: Configuration): Seq[String] =
    Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match {
      case Some(s) => s.toSeq
      case None => getDefaultYarnApplicationClasspath
    }

  private def getMRAppClasspath(conf: Configuration): Seq[String] =
    Option(conf.getStrings("mapreduce.application.classpath")) match {
      case Some(s) => s.toSeq
      case None => getDefaultMRApplicationClasspath
    }

  private[yarn] def getDefaultYarnApplicationClasspath: Seq[String] =
    YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq

  private[yarn] def getDefaultMRApplicationClasspath: Seq[String] =
    StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH).toSeq

  /**
   * Populate the classpath entry in the given environment map.
   *
   * User jars are generally not added to the JVM's system classpath; those are handled by the AM
   * and executor backend. When the deprecated `spark.yarn.user.classpath.first` is used, user jars
   * are included in the system classpath, though. The extra class path and other uploaded files are
   * always made available through the system class path.
   *
   * @param args Client arguments (when starting the AM) or null (when starting executors).
   */
  private[yarn] def populateClasspath(
      args: ClientArguments,
      conf: Configuration,
      sparkConf: SparkConf,
      env: HashMap[String, String],
      extraClassPath: Option[String] = None): Unit = {
    extraClassPath.foreach { cp =>
      addClasspathEntry(getClusterPath(sparkConf, cp), env)
    }

    val cpSet = extraClassPath match {
      case Some(classPath) if Utils.isTesting => classPath.split(File.pathSeparator).toSet
      case _ => Set.empty[String]
    }

    addClasspathEntry(Environment.PWD.$$(), env)

    addClasspathEntry(Environment.PWD.$$() + Path.SEPARATOR + LOCALIZED_CONF_DIR, env)

    if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
      // In order to properly add the app jar when the user classpath comes first,
      // the main jar has to be handled separately so that the right name is passed
      // into addFileToClasspath.
      val mainJar =
        if (args != null) {
          getMainJarUri(Option(args.userJar))
        } else {
          getMainJarUri(sparkConf.get(APP_JAR))
        }
      mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR_NAME, env))

      val secondaryJars =
        if (args != null) {
          getSecondaryJarUris(Option(sparkConf.get(JARS_TO_DISTRIBUTE)))
        } else {
          getSecondaryJarUris(sparkConf.get(SECONDARY_JARS))
        }
      secondaryJars.foreach { x =>
        addFileToClasspath(sparkConf, conf, x, null, env)
      }
    }

    // Add the Spark jars to the classpath, depending on how they were distributed.
    addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, "*"), env)
    if (sparkConf.get(SPARK_ARCHIVE).isEmpty) {
      sparkConf.get(SPARK_JARS).foreach { jars =>
        jars.filter(Utils.isLocalUri).foreach { jar =>
          val uri = new URI(jar)
          addClasspathEntry(getClusterPath(sparkConf, uri.getPath()), env)
        }
      }
    }

    if (sparkConf.get(POPULATE_HADOOP_CLASSPATH)) {
      populateHadoopClasspath(conf, env)
    }

    sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
      // SPARK-40635: during the test, add a jar de-duplication process to avoid
      // that the startup command can't be executed due to the too long classpath.
      val newCp = if (Utils.isTesting) {
        cp.split(File.pathSeparator)
          .filterNot(cpSet.contains).mkString(File.pathSeparator)
      } else cp
      addClasspathEntry(getClusterPath(sparkConf, newCp), env)
    }

    // Add the localized Hadoop config at the end of the classpath, in case it contains other
    // files (such as configuration files for different services) that are not part of the
    // YARN cluster's config.
    addClasspathEntry(
      buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, LOCALIZED_HADOOP_CONF_DIR), env)
  }
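
  // With default settings, the CLASSPATH entries built above end up roughly in this order
  // (the {{PWD}} placeholder is expanded by the NodeManager at container launch):
  //
  //   [extraClassPath entries, if any]
  //   {{PWD}}
  //   {{PWD}}/__spark_conf__
  //   {{PWD}}/__spark_libs__/*
  //   [local: Spark jars, when no SPARK_ARCHIVE is configured]
  //   [YARN / MR application classpath, when POPULATE_HADOOP_CLASSPATH is enabled]
  //   [SPARK_DIST_CLASSPATH entries, if set]
  //   {{PWD}}/__spark_conf__/__hadoop_conf__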

  /**
   * Returns a list of URIs representing the user classpath.
   *
   * @param conf Spark configuration.
   */
  def getUserClasspath(conf: SparkConf): Array[URI] = {
    val mainUri = getMainJarUri(conf.get(APP_JAR))
    val secondaryUris = getSecondaryJarUris(conf.get(SECONDARY_JARS))
    (mainUri ++ secondaryUris).toArray
  }

  /**
   * Returns a list of local, absolute file URLs representing the user classpath. Note that this
   * must be executed on the same host which will access the URLs, as it will resolve relative
   * paths based on the current working directory, as well as environment variables.
   * See SPARK-35672 for discussion of why it is necessary to do environment variable substitution.
   *
   * @param conf Spark configuration.
   * @param useClusterPath Whether to use the 'cluster' path when resolving paths with the
   *                       `local` scheme. This should be used when running on the cluster, but
   *                       not when running on the gateway (i.e. for the driver in `client` mode).
   * @return Array of local URLs ready to be passed to a [[java.net.URLClassLoader]].
   */
  def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = {
    Client.getUserClasspath(conf).map { uri =>
      val inputPath = uri.getPath
      val replacedFilePath = if (Utils.isLocalUri(uri.toString) && useClusterPath) {
        Client.getClusterPath(conf, inputPath)
      } else {
        // Any other URI schemes should have been resolved by this point
        assert(uri.getScheme == null || uri.getScheme == "file" || Utils.isLocalUri(uri.toString),
          "getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
        inputPath
      }
      val envVarResolvedFilePath = YarnSparkHadoopUtil.replaceEnvVars(replacedFilePath, sys.env)
      Paths.get(envVarResolvedFilePath).toAbsolutePath.toUri.toURL
    }
  }
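
  // Sketch of the resolution performed by getUserClasspathUrls (values are hypothetical):
  //
  //   user jar (APP_JAR)        local:/opt/libs/app.jar
  //   gateway -> cluster path   /opt -> /cluster/opt    (applied only when useClusterPath)
  //   env var substitution      references such as $SPARK_HOME are expanded from sys.env
  //
  //   result: file:/cluster/opt/libs/app.jar  -- an absolute URL usable by a URLClassLoader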

  private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
    mainJar.flatMap { path =>
      val uri = Utils.resolveURI(path)
      if (uri.getScheme == Utils.LOCAL_SCHEME) Some(uri) else None
    }.orElse(Some(new URI(APP_JAR_NAME)))
  }

  private def getSecondaryJarUris(secondaryJars: Option[Seq[String]]): Seq[URI] = {
    secondaryJars.getOrElse(Nil).map(new URI(_))
  }

  /**
   * Adds the given path to the classpath, handling "local:" URIs correctly.
   *
   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
   * name will be added to the classpath (relative to the job's work directory).
   *
   * If not a "local:" file and no alternate name, the linkName will be added to the classpath.
   *
   * @param conf        Spark configuration.
   * @param hadoopConf  Hadoop configuration.
   * @param uri         URI to add to classpath (optional).
   * @param fileName    Alternate name for the file (optional).
   * @param env         Map holding the environment variables.
   */
  private def addFileToClasspath(
      conf: SparkConf,
      hadoopConf: Configuration,
      uri: URI,
      fileName: String,
      env: HashMap[String, String]): Unit = {
    if (uri != null && uri.getScheme == Utils.LOCAL_SCHEME) {
      addClasspathEntry(getClusterPath(conf, uri.getPath), env)
    } else if (fileName != null) {
      addClasspathEntry(buildPath(Environment.PWD.$$(), fileName), env)
    } else if (uri != null) {
      val localPath = getQualifiedLocalPath(uri, hadoopConf)
      val linkName = Option(uri.getFragment()).getOrElse(localPath.getName())
      addClasspathEntry(buildPath(Environment.PWD.$$(), linkName), env)
    }
  }
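
  // How the three branches above play out (URIs and names are hypothetical):
  //
  //   local:/opt/libs/dep.jar            -> cluster path of /opt/libs/dep.jar
  //   hdfs:///user/foo/dep.jar + name    -> {{PWD}}/<name>   (explicit fileName wins)
  //   hdfs:///user/foo/dep.jar#alias     -> {{PWD}}/alias    (URI fragment as link name)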

  /**
   * Add the given path to the classpath entry of the given environment map.
   * If the classpath is already set, this appends the new path to the existing classpath.
   */
  private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit =
    YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)

  /**
   * Returns the path to be sent to the NM for a path that is valid on the gateway.
   *
   * This method uses two configuration values:
   *
   *  - spark.yarn.config.gatewayPath: a string that identifies a portion of the input path that may
   *    only be valid in the gateway node.
   *  - spark.yarn.config.replacementPath: a string with which to replace the gateway path. This may
   *    contain, for example, env variable references, which will be expanded by the NMs when
   *    starting containers.
   *
   * If either config is not available, the input path is returned.
   */
  def getClusterPath(conf: SparkConf, path: String): String = {
    val localPath = conf.get(GATEWAY_ROOT_PATH)
    val clusterPath = conf.get(REPLACEMENT_ROOT_PATH)
    if (localPath != null && clusterPath != null) {
      path.replace(localPath, clusterPath)
    } else {
      path
    }
  }
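
  // Example of the gateway/cluster rewrite (configuration values are hypothetical):
  //
  //   spark.yarn.config.gatewayPath      = /opt/spark-client
  //   spark.yarn.config.replacementPath  = $SPARK_DIST_ROOT
  //
  //   getClusterPath(conf, "/opt/spark-client/lib/foo.jar")
  //     // => "$SPARK_DIST_ROOT/lib/foo.jar"  (env reference expanded by the NM at launch)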

  /**
   * Return whether the file systems represented by two URIs are the same.
   */
  private[spark] def compareUri(srcUri: URI, dstUri: URI): Boolean = {

    if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) {
      return false
    }

    val srcAuthority = srcUri.getAuthority()
    val dstAuthority = dstUri.getAuthority()
    if (srcAuthority != null && !srcAuthority.equalsIgnoreCase(dstAuthority)) {
      return false
    }

    var srcHost = srcUri.getHost()
    var dstHost = dstUri.getHost()

    // In HA or when using viewfs, the host part of the URI may not actually be a host, but the
    // name of the HDFS namespace. Those names won't resolve, so avoid even trying if they
    // match.
    if (srcHost != null && dstHost != null && srcHost != dstHost) {
      try {
        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
      } catch {
        case e: UnknownHostException =>
          return false
      }
    }

    Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()

  }
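
  // Examples of how compareUri behaves (hosts are hypothetical):
  //
  //   hdfs://nn1:8020  vs hdfs://nn1:8020   -> true  (same scheme, host and port)
  //   hdfs://nn1:8020  vs hdfs://nn2:8020   -> false (unless both resolve to one canonical host)
  //   hdfs://ns1       vs hdfs://ns1        -> true  (HA namespace; no DNS resolution attempted)
  //   hdfs://nn1:8020  vs file:///tmp       -> false (different schemes)
  //   (null scheme)    vs hdfs://nn1:8020   -> false (a null source scheme never matches)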

  /**
   * Return whether the two file systems are the same.
   */
  protected def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
    val srcUri = srcFs.getUri()
    val dstUri = destFs.getUri()

    compareUri(srcUri, dstUri)
  }

  /**
   * Given a local URI, resolve it and return a qualified local path that corresponds to the URI.
   * This is used for preparing local resources to be included in the container launch context.
   */
  private def getQualifiedLocalPath(localURI: URI, hadoopConf: Configuration): Path = {
    val qualifiedURI =
      if (localURI.getScheme == null) {
        // If not specified, assume this is in the local filesystem to keep the behavior
        // consistent with that of Hadoop
        new URI(FileSystem.getLocal(hadoopConf).makeQualified(new Path(localURI)).toString)
      } else {
        localURI
      }
    new Path(qualifiedURI)
  }

  /**
   * Whether to consider jars provided by the user to have precedence over the Spark jars when
   * loading user classes.
   */
  def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean = {
    if (isDriver) {
      conf.get(DRIVER_USER_CLASS_PATH_FIRST)
    } else {
      conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)
    }
  }

  /**
   * Joins all the path components using Path.SEPARATOR.
   */
  def buildPath(components: String*): String = {
    components.mkString(Path.SEPARATOR)
  }

  def createAppReport(report: ApplicationReport): YarnAppReport = {
    val diags = report.getDiagnostics()
    val diagsOpt = if (diags != null && diags.nonEmpty) Some(diags) else None
    YarnAppReport(report.getYarnApplicationState(), report.getFinalApplicationStatus(), diagsOpt)
  }

  /**
   * Create a properly quoted and escaped library path string to be added as a prefix to the command
   * executed by YARN. This is different from normal quoting / escaping due to YARN executing the
   * command through "bash -c".
   */
  def createLibraryPathPrefix(libpath: String, conf: SparkConf): String = {
    val cmdPrefix = if (Utils.isWindows) {
      Utils.libraryPathEnvPrefix(Seq(libpath))
    } else {
      val envName = Utils.libraryPathEnvName
      // For quotes, escape both the quote and the escape character when encoding in the command
      // string.
      val quoted = libpath.replace("\"", "\\\\\\\"")
      envName + "=\\\"" + quoted + File.pathSeparator + "$" + envName + "\\\""
    }
    getClusterPath(conf, cmdPrefix)
  }
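
  // Example of the quoting performed on a typical Linux host (path is hypothetical).
  // For libpath = /opt/native, the prefix prepended to the YARN launch command is:
  //
  //   LD_LIBRARY_PATH=\"/opt/native:$LD_LIBRARY_PATH\"
  //
  // The backslash-escaped quotes survive YARN's "bash -c" wrapping, and getClusterPath()
  // is applied afterwards so gateway paths are rewritten for the cluster.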

  def confToProperties(conf: SparkConf): Properties = {
    val props = new Properties()
    conf.getAll.foreach { case (k, v) =>
      props.setProperty(k, v)
    }
    props
  }

  def writePropertiesToArchive(props: Properties, name: String, out: ZipOutputStream): Unit = {
    out.putNextEntry(new ZipEntry(name))
    val writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)
    props.store(writer, "Spark configuration.")
    writer.flush()
    out.closeEntry()
  }
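
  // Minimal sketch of how these two helpers combine when the __spark_conf__.zip archive is
  // assembled elsewhere in this file (stream handling simplified; file name is illustrative):
  //
  //   val out = new ZipOutputStream(new FileOutputStream(confArchiveFile))
  //   writePropertiesToArchive(confToProperties(sparkConf), SPARK_CONF_FILE, out)
  //   out.close()
  //
  // Each call adds one zip entry (e.g. "__spark_conf__.properties") containing the
  // configuration in java.util.Properties format.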
}

private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)
    conf.remove(ARCHIVES)

    new Client(new ClientArguments(args), conf, null).run()
  }

}

private[spark] case class YarnAppReport(
    appState: YarnApplicationState,
    finalState: FinalApplicationStatus,
    diagnostics: Option[String])

Related information

spark source code directory

Related articles

spark ApplicationMaster source code

spark ApplicationMasterArguments source code

spark ApplicationMasterSource source code

spark ClientArguments source code

spark ClientDistributedCacheManager source code

spark ExecutorRunnable source code

spark LocalityPreferredContainerPlacementStrategy source code

spark ResourceRequestHelper source code

spark SparkRackResolver source code

spark YarnAllocator source code
