spark YarnSparkHadoopUtil source code

  • 2022-10-20

spark YarnSparkHadoopUtil code

File path: /resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.yarn

import java.util.regex.{Matcher, Pattern}

import scala.collection.immutable.{Map => IMap}
import scala.collection.mutable.{HashMap, ListBuffer}
import scala.util.matching.Regex

import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
import org.apache.hadoop.yarn.util.ConverterUtils

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.resource.ExecutorResourceRequest
import org.apache.spark.util.Utils

object YarnSparkHadoopUtil {

  // Additional memory overhead for application masters in client mode.
  // 10% was arrived at experimentally, in the interest of minimizing memory waste while covering
  // the common cases. Memory overhead tends to grow with container size.
  val AM_MEMORY_OVERHEAD_FACTOR = 0.10
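
  // Illustrative example (not part of the upstream file): for a 4096 MiB AM in client mode this
  // factor yields roughly max(4096 * 0.10, 384) = 409 MiB of overhead, assuming Client.scala
  // pairs the factor with the usual 384 MiB minimum overhead.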

  val ANY_HOST = "*"

  // All RM requests are issued with the same priority: we do not (yet) have any distinction
  // between request types (like map/reduce in Hadoop, for example).
  val RM_REQUEST_PRIORITY = Priority.newInstance(1)

  /**
   * Add a path variable to the given environment map.
   * If the map already contains this key, append the value to the existing value instead.
   */
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
    val newValue =
      if (env.contains(key)) {
        env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value
      } else {
        value
      }
    env.put(key, newValue)
  }
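
  // Illustrative example (not part of the upstream file): if env("CLASSPATH") is "a.jar", then
  // addPathToEnvironment(env, "CLASSPATH", "b.jar") stores "a.jar<CPS>b.jar"; the <CPS> token is
  // YARN's cross-platform classpath separator, expanded by the NodeManager at container launch.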

  /**
   * Set zero or more environment variables specified by the given input string.
   * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3".
   */
  def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = {
    if (inputString != null && inputString.length() > 0) {
      val childEnvs = inputString.split(",")
      val p = Pattern.compile(environmentVariableRegex)
      for (cEnv <- childEnvs) {
        val parts = cEnv.split("=") // split on '='
        val m = p.matcher(parts(1))
        val sb = new StringBuffer
        while (m.find()) {
          val variable = m.group(1)
          var replace = ""
          if (env.contains(variable)) {
            replace = env(variable)
          } else {
            // if this key is not configured for the child .. get it from the env
            replace = System.getenv(variable)
            if (replace == null) {
              // the env key is not present anywhere .. simply set it
              replace = ""
            }
          }
          m.appendReplacement(sb, Matcher.quoteReplacement(replace))
        }
        m.appendTail(sb)
        // This treats the environment variable as path variable delimited by `File.pathSeparator`
        // This is kept for backward compatibility and consistency with Hadoop's behavior
        addPathToEnvironment(env, parts(0), sb.toString)
      }
    }
  }
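
  // Illustrative example (not part of the upstream file): on Unix,
  //   setEnvFromInputString(env, "SPARK_LOG_DIR=/logs,EXTRA_PATH=$PATH")
  // sets SPARK_LOG_DIR to "/logs" and EXTRA_PATH to the value of $PATH taken from `env`
  // (falling back to the process environment, then to ""), appending to any existing entries.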

  /**
   * Regex pattern to match the name of an environment variable. Note that Unix variable naming
   * conventions (alphanumeric plus underscore, case-sensitive, can't start with a digit)
   * are used for both Unix and Windows, following the convention of Hadoop's `Shell` class
   * (see specifically [[org.apache.hadoop.util.Shell.getEnvironmentVariableRegex]]).
   */
  private val envVarNameRegex: String = "[A-Za-z_][A-Za-z0-9_]*"

  /**
   * Note that this regex only supports the `$VAR_NAME` and `%VAR_NAME%` syntax, for Unix and
   * Windows respectively, and does not perform any handling of escapes. The Unix `${VAR_NAME}`
   * syntax is not supported.
   */
  private val environmentVariableRegex: String = {
    if (Utils.isWindows) {
      s"%($envVarNameRegex)%"
    } else {
      s"\\$$($envVarNameRegex)"
    }
  }

  // scalastyle:off line.size.limit
  /**
   * Replace environment variables in a string according to the same rules as
   * [[org.apache.hadoop.yarn.api.ApplicationConstants.Environment]]:
   * `$VAR_NAME` for Unix, `%VAR_NAME%` for Windows, and `{{VAR_NAME}}` for all OS.
   * The `${VAR_NAME}` syntax is also supported for Unix.
   * This supports escapes for `$` and `\` (on Unix) and `%` and `^` characters (on Windows), e.g.
   * `\$FOO`, `^%FOO^%`, and `%%FOO%%` will be resolved to `$FOO`, `%FOO%`, and `%FOO%`,
   * respectively, instead of being treated as variable names.
   *
   * @param unresolvedString The unresolved string which may contain variable references.
   * @param env The System environment
   * @param isWindows True iff running in a Windows environment
   * @return The input string with variables replaced with their values from `env`
   * @see [[https://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html Environment Variables (IEEE Std 1003.1-2017)]]
   * @see [[https://en.wikibooks.org/wiki/Windows_Batch_Scripting#Quoting_and_escaping Windows Batch Scripting | Quoting and Escaping]]
   */
  // scalastyle:on line.size.limit
  def replaceEnvVars(
      unresolvedString: String,
      env: IMap[String, String],
      isWindows: Boolean = Utils.isWindows): String = {
    val osResolvedString = if (isWindows) {
      // ^% or %% can both be used as escapes for Windows
      val windowsPattern = ("""(?i)(?:\^\^|\^%|%%|%(""" + envVarNameRegex + ")%)").r
      windowsPattern.replaceAllIn(unresolvedString, m =>
        Regex.quoteReplacement(m.matched match {
          case "^^" => "^"
          case "^%" => "%"
          case "%%" => "%"
          case _ => env.getOrElse(m.group(1), "")
        })
      )
    } else {
      val unixPattern =
        ("""(?i)(?:\\\\|\\\$|\$(""" + envVarNameRegex + """)|\$\{(""" + envVarNameRegex + ")})").r
      unixPattern.replaceAllIn(unresolvedString, m =>
        Regex.quoteReplacement(m.matched match {
          case """\\""" => """\"""
          case """\$""" => """$"""
          case str if str.startsWith("${") => env.getOrElse(m.group(2), "")
          case _ => env.getOrElse(m.group(1), "")
        })
      )
    }

    // YARN uses `{{...}}` to represent OS-agnostic variable expansion strings. Normally the
    // NodeManager would replace this string with an OS-specific replacement before launching
    // the container. Here, it gets directly treated as an additional expansion string, which
    // has the same net result.
    // Ref: Javadoc for org.apache.hadoop.yarn.api.ApplicationConstants.Environment.$$()
    val yarnPattern = ("""(?i)\{\{(""" + envVarNameRegex + ")}}").r
    yarnPattern.replaceAllIn(osResolvedString,
      m => Regex.quoteReplacement(env.getOrElse(m.group(1), "")))
  }
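
  // Illustrative examples (not part of the upstream file),
  // with env = Map("FOO" -> "a", "BAR" -> "b"):
  //   replaceEnvVars("$FOO/${BAR}/{{BAZ}}", env, isWindows = false)  // "a/b/" (BAZ is unset)
  //   replaceEnvVars("\\$FOO and %FOO%", env, isWindows = false)     // "$FOO and %FOO%"
  //   replaceEnvVars("%FOO% and %%FOO%%", env, isWindows = true)     // "a and %FOO%"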

  /**
   * Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
   * Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
   * an inconsistent state.
   * TODO: If the OOM is not recoverable by rescheduling it on different node, then do
   * 'something' to fail job ... akin to unhealthy trackers in mapred ?
   *
   * The handler that runs if the JVM throws an OOM exception must be configured differently on
   * Windows: the 'taskkill' command should be used there, whereas Unix-based systems use 'kill'.
   *
   * As the JVM interprets %p and %%p the same way, we can use either of them. However, some
   * tests on Windows machines suggest that the JVM only accepts '%%p'.
   *
   * Furthermore, the behavior of the character '%' on the Windows command line differs from
   * the behavior of '%' in a .cmd file: it gets interpreted as an incomplete environment
   * variable. Windows .cmd files escape a '%' by '%%'. Thus, the correct way of writing
   * '%%p' in an escaped way is '%%%%p'.
   */
  private[yarn] def addOutOfMemoryErrorArgument(javaOpts: ListBuffer[String]): Unit = {
    if (!javaOpts.exists(_.contains("-XX:OnOutOfMemoryError"))) {
      if (Utils.isWindows) {
        javaOpts += escapeForShell("-XX:OnOutOfMemoryError=taskkill /F /PID %%%%p")
      } else {
        javaOpts += "-XX:OnOutOfMemoryError='kill %p'"
      }
    }
  }
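
  // Illustrative note (not part of the upstream file): on Unix this appends the literal
  // argument -XX:OnOutOfMemoryError='kill %p', so the container's JVM is killed on OOM and
  // YARN's failure handling can reschedule the work elsewhere.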

  /**
   * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands
   * using either
   *
   * (Unix-based) `bash -c "command arg1 arg2"`, which means plain quoting doesn't really work.
   * The argument is therefore enclosed in single quotes and some key characters are escaped.
   *
   * (Windows-based) part of a .cmd file, in which case Windows escaping must be applied to each
   * argument. Windows itself is quite lenient; it is usually Java that causes trouble, since it
   * needs to distinguish between arguments starting with '-' and class names. If an argument is
   * surrounded by ' characters, Java takes the following string as-is, so an argument that
   * happens to start with '-' is mistakenly taken as a class name. The way to avoid this is to
   * surround arguments with " rather than '.
   *
   * @param arg A single argument.
   * @return Argument quoted for execution via Yarn's generated shell script.
   */
  def escapeForShell(arg: String): String = {
    if (arg != null) {
      if (Utils.isWindows) {
        YarnCommandBuilderUtils.quoteForBatchScript(arg)
      } else {
        val escaped = new StringBuilder("'")
        arg.foreach {
          case '$' => escaped.append("\\$")
          case '"' => escaped.append("\\\"")
          case '\'' => escaped.append("'\\''")
          case c => escaped.append(c)
        }
        escaped.append("'").toString()
      }
    } else {
      arg
    }
  }
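
  // Illustrative examples (not part of the upstream file), on Unix:
  //   escapeForShell("-Dfoo=bar baz")  // "'-Dfoo=bar baz'"
  //   escapeForShell("it's $HOME")     // "'it'\''s \$HOME'"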

  // YARN/Hadoop ACLs are specified as "user1,user2 group1,group2".
  // Users and groups are separated by a space, so we need to pass the ACLs in the same format.
  def getApplicationAclsForYarn(securityMgr: SecurityManager)
      : Map[ApplicationAccessType, String] = {
    Map[ApplicationAccessType, String] (
      ApplicationAccessType.VIEW_APP -> (securityMgr.getViewAcls + " " +
        securityMgr.getViewAclsGroups),
      ApplicationAccessType.MODIFY_APP -> (securityMgr.getModifyAcls + " " +
        securityMgr.getModifyAclsGroups)
    )
  }
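
  // Illustrative example (not part of the upstream file): with view ACLs "alice,bob" and view
  // ACL groups "admins", VIEW_APP maps to "alice,bob admins": comma-separated users, a space,
  // then comma-separated groups, which is the format YARN expects.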

  def getContainerId: ContainerId = {
    val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
    ConverterUtils.toContainerId(containerIdString)
  }
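
  // Illustrative example (not part of the upstream file): inside a YARN container the
  // CONTAINER_ID environment variable holds a value such as
  // "container_1410901177871_0001_01_000005", which ConverterUtils parses into a ContainerId.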

  /**
   * Get the off-heap memory size from an [[ExecutorResourceRequest]].
   * Returns 0 if MEMORY_OFFHEAP_ENABLED is false.
   */
  def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
    execRequest: ExecutorResourceRequest): Long = {
    Utils.checkOffHeapEnabled(sparkConf, execRequest.amount)
  }
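
  // Illustrative example (not part of the upstream file): with spark.memory.offHeap.enabled=true
  // and an executor resource request of 2048 MiB of off-heap memory, this returns 2048; with the
  // flag disabled it returns 0.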
}
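
The snippet below is a minimal, self-contained sketch (not part of the upstream file) of how driver-side code might call two of these helpers. It assumes the spark-yarn module and its Hadoop dependencies are on the classpath; the environment string and JVM option are illustrative values only.

import scala.collection.mutable.HashMap

import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

object YarnSparkHadoopUtilExample {
  def main(args: Array[String]): Unit = {
    // Parse a "KEY1=VAL1,KEY2=VAL2" string, expanding $VAR references (on Unix) from the
    // map itself or, failing that, from the process environment.
    val env = new HashMap[String, String]()
    YarnSparkHadoopUtil.setEnvFromInputString(
      env, "SPARK_LOG_DIR=/var/log/spark,EXTRA_PATH=$PATH")
    env.foreach { case (k, v) => println(s"$k -> $v") }

    // Quote an argument so it survives YARN's `bash -c ...` container launch command.
    println(YarnSparkHadoopUtil.escapeForShell("-Dspark.app.name=my app"))
    // prints: '-Dspark.app.name=my app'
  }
}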

Related information

spark source code directory

Related articles

spark ApplicationMaster source code

spark ApplicationMasterArguments source code

spark ApplicationMasterSource source code

spark Client source code

spark ClientArguments source code

spark ClientDistributedCacheManager source code

spark ExecutorRunnable source code

spark LocalityPreferredContainerPlacementStrategy source code

spark ResourceRequestHelper source code

spark SparkRackResolver source code
