hadoop DefaultContainerExecutor 源码

  • 2022-10-20
  • 浏览 (257)

haddop DefaultContainerExecutor 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.numaAwarenessEnabled;

import org.apache.hadoop.classification.VisibleForTesting;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.CommandExecutor;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocator;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@code DefaultContainerExecuter} class offers generic container
 * execution services. Process execution is handled in a platform-independent
 * way via {@link ProcessBuilder}.
 */
public class DefaultContainerExecutor extends ContainerExecutor {

  private static final Logger LOG =
       LoggerFactory.getLogger(DefaultContainerExecutor.class);

  private static final int WIN_MAX_PATH = 260;

  /**
   * A {@link FileContext} for the local file system.
   */
  protected final FileContext lfs;

  private String logDirPermissions = null;

  private NumaResourceAllocator numaResourceAllocator;


  private String numactl;
  /**
   * Default constructor for use in testing.
   */
  @VisibleForTesting
  public DefaultContainerExecutor() {
    try {
      this.lfs = FileContext.getLocalFSFileContext();
    } catch (UnsupportedFileSystemException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Create an instance with a given {@link FileContext}.
   *
   * @param lfs the given {@link FileContext}
   */
  DefaultContainerExecutor(FileContext lfs) {
    this.lfs = lfs;
  }

  /**
   * Copy a file using the {@link #lfs} {@link FileContext}.
   *
   * @param src the file to copy
   * @param dst where to copy the file
   * @param owner the owner of the new copy. Used only in secure Windows
   * clusters
   * @throws IOException when the copy fails
   * @see WindowsSecureContainerExecutor
   */
  protected void copyFile(Path src, Path dst, String owner) throws IOException {
    lfs.util().copy(src, dst, false, true);
  }
  
  /**
   * Make a file executable using the {@link #lfs} {@link FileContext}.
   *
   * @param script the path to make executable
   * @param owner the new owner for the file. Used only in secure Windows
   * clusters
   * @throws IOException when the change mode operation fails
   * @see WindowsSecureContainerExecutor
   */
  protected void setScriptExecutable(Path script, String owner)
      throws IOException {
    lfs.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
  }

  @Override
  public void init(Context nmContext) throws IOException {
    if(numaAwarenessEnabled(getConf())) {
      numaResourceAllocator = new NumaResourceAllocator(nmContext);
      numactl = this.getConf().get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
              YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);
      try {
        numaResourceAllocator.init(this.getConf());
        LOG.info("NUMA resources allocation is enabled in DefaultContainer Executor," +
                " Successfully initialized NUMA resources allocator.");
      } catch (YarnException e) {
        LOG.warn("Improper NUMA configuration provided.", e);
        throw new IOException("Failed to initialize configured numa subsystem!");
      }
    }
  }

  @Override
  public void startLocalizer(LocalizerStartContext ctx)
      throws IOException, InterruptedException {
    Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
    InetSocketAddress nmAddr = ctx.getNmAddr();
    String user = ctx.getUser();
    String appId = ctx.getAppId();
    String locId = ctx.getLocId();
    LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();

    List<String> localDirs = dirsHandler.getLocalDirs();
    List<String> logDirs = dirsHandler.getLogDirs();
    
    createUserLocalDirs(localDirs, user);
    createUserCacheDirs(localDirs, user);
    createAppDirs(localDirs, user, appId);
    createAppLogDirs(appId, logDirs, user);

    // randomly choose the local directory
    Path appStorageDir = getWorkingDir(localDirs, user, appId);

    String tokenFn = String.format(TOKEN_FILE_NAME_FMT, locId);
    Path tokenDst = new Path(appStorageDir, tokenFn);
    copyFile(nmPrivateContainerTokensPath, tokenDst, user);
    LOG.info("Copying from {} to {}", nmPrivateContainerTokensPath, tokenDst);


    FileContext localizerFc =
        FileContext.getFileContext(lfs.getDefaultFileSystem(), getConf());
    localizerFc.setUMask(lfs.getUMask());
    localizerFc.setWorkingDirectory(appStorageDir);
    LOG.info("Localizer CWD set to {} = {}", appStorageDir,
        localizerFc.getWorkingDirectory());

    ContainerLocalizer localizer =
        createContainerLocalizer(user, appId, locId, tokenFn, localDirs,
            localizerFc);
    // TODO: DO it over RPC for maintaining similarity?
    localizer.runLocalization(nmAddr);
  }

  /**
   * Create a new {@link ContainerLocalizer} instance.
   *
   * @param user the user who owns the job for which the localization is being
   * run
   * @param appId the ID of the application for which the localization is being
   * run
   * @param locId the ID of the container for which the localization is being
   * run
   * @param localDirs a list of directories to use as destinations for the
   * localization
   * @param localizerFc the {@link FileContext} to use when localizing files
   * @return the new {@link ContainerLocalizer} instance
   * @throws IOException if {@code user} or {@code locId} is {@code null} or if
   * the container localizer has an initialization failure
   */
  @Private
  @VisibleForTesting
  protected ContainerLocalizer createContainerLocalizer(String user,
      String appId, String locId, String tokenFileName, List<String> localDirs,
      FileContext localizerFc) throws IOException {
    ContainerLocalizer localizer =
        new ContainerLocalizer(localizerFc, user, appId, locId, tokenFileName,
            getPaths(localDirs),
            RecordFactoryProvider.getRecordFactory(getConf()));
    return localizer;
  }

  @Override
  public int launchContainer(ContainerStartContext ctx)
      throws IOException, ConfigurationException {
    Container container = ctx.getContainer();
    Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
    Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
    Path nmPrivateKeystorePath = ctx.getNmPrivateKeystorePath();
    Path nmPrivateTruststorePath = ctx.getNmPrivateTruststorePath();
    String user = ctx.getUser();
    Path containerWorkDir = ctx.getContainerWorkDir();
    List<String> localDirs = ctx.getLocalDirs();
    List<String> logDirs = ctx.getLogDirs();

    FsPermission dirPerm = new FsPermission(APPDIR_PERM);
    ContainerId containerId = container.getContainerId();

    // create container dirs on all disks
    String containerIdStr = containerId.toString();
    String appIdStr =
            containerId.getApplicationAttemptId().
                getApplicationId().toString();
    for (String sLocalDir : localDirs) {
      Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
      Path userdir = new Path(usersdir, user);
      Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
      Path appDir = new Path(appCacheDir, appIdStr);
      Path containerDir = new Path(appDir, containerIdStr);
      createDir(containerDir, dirPerm, true, user);
    }

    // Create the container log-dirs on all disks
    createContainerLogDirs(appIdStr, containerIdStr, logDirs, user);

    Path tmpDir = new Path(containerWorkDir,
        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
    createDir(tmpDir, dirPerm, false, user);


    // copy container tokens to work dir
    Path tokenDst =
      new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
    copyFile(nmPrivateTokensPath, tokenDst, user);

    if (nmPrivateKeystorePath != null) {
      Path keystoreDst =
          new Path(containerWorkDir, ContainerLaunch.KEYSTORE_FILE);
      copyFile(nmPrivateKeystorePath, keystoreDst, user);
    }

    if (nmPrivateTruststorePath != null) {
      Path truststoreDst =
          new Path(containerWorkDir, ContainerLaunch.TRUSTSTORE_FILE);
      copyFile(nmPrivateTruststorePath, truststoreDst, user);
    }

    // copy launch script to work dir
    Path launchDst =
        new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
    copyFile(nmPrivateContainerScriptPath, launchDst, user);

    // Create new local launch wrapper script
    LocalWrapperScriptBuilder sb = getLocalWrapperScriptBuilder(
        containerIdStr, containerWorkDir); 

    // Fail fast if attempting to launch the wrapper script would fail due to
    // Windows path length limitation.
    if (Shell.WINDOWS &&
        sb.getWrapperScriptPath().toString().length() > WIN_MAX_PATH) {
      throw new IOException(String.format(
        "Cannot launch container using script at path %s, because it exceeds " +
        "the maximum supported path length of %d characters.  Consider " +
        "configuring shorter directories in %s.", sb.getWrapperScriptPath(),
        WIN_MAX_PATH, YarnConfiguration.NM_LOCAL_DIRS));
    }

    Path pidFile = getPidFilePath(containerId);
    if (pidFile != null) {
      sb.writeLocalWrapperScript(launchDst, pidFile);
    } else {
      LOG.info("Container {} pid file not set. Returning terminated error",
          containerIdStr);
      return ExitCode.TERMINATED.getExitCode();
    }
    
    // create log dir under app
    // fork script
    Shell.CommandExecutor shExec = null;
    try {
      setScriptExecutable(launchDst, user);
      setScriptExecutable(sb.getWrapperScriptPath(), user);

      // adding numa commands based on configuration
      String[] numaCommands = new String[]{};

      if (numaResourceAllocator != null) {
        try {
          NumaResourceAllocation numaResourceAllocation =
                  numaResourceAllocator.allocateNumaNodes(container);
          if (numaResourceAllocation != null) {
            numaCommands = getNumaCommands(numaResourceAllocation);
          }
        } catch (ResourceHandlerException e) {
          LOG.error("NumaResource Allocation failed!", e);
          throw new IOException("NumaResource Allocation Error!", e);
        }
      }

      shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(),
              containerIdStr, user, pidFile, container.getResource(),
              new File(containerWorkDir.toUri().getPath()),
              container.getLaunchContext().getEnvironment(),
              numaCommands);

      if (isContainerActive(containerId)) {
        shExec.execute();
      } else {
        LOG.info("Container {} was marked as inactive. "
            + "Returning terminated error", containerIdStr);
        return ExitCode.TERMINATED.getExitCode();
      }
    } catch (IOException e) {
      if (null == shExec) {
        return -1;
      }
      int exitCode = shExec.getExitCode();
      LOG.warn("Exit code from container {} is : {}", containerId, exitCode);
      // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
      // terminated/killed forcefully. In all other cases, log the
      // container-executor's output
      if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
          && exitCode != ExitCode.TERMINATED.getExitCode()) {
        LOG.warn("Exception from container-launch with container ID: {}"
            + " and exit code: {}", containerId, exitCode, e);

        StringBuilder builder = new StringBuilder();
        builder.append("Exception from container-launch.\n")
            .append("Container id: ").append(containerId).append("\n")
            .append("Exit code: ").append(exitCode).append("\n");
        if (!Optional.ofNullable(e.getMessage()).orElse("").isEmpty()) {
          builder.append("Exception message: ")
              .append(e.getMessage()).append("\n");
        }

        if (!shExec.getOutput().isEmpty()) {
          builder.append("Shell output: ")
              .append(shExec.getOutput()).append("\n");
        }
        String diagnostics = builder.toString();
        logOutput(diagnostics);
        container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
            diagnostics));
      } else {
        container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
            "Container killed on request. Exit code is " + exitCode));
      }
      return exitCode;
    } finally {
      if (shExec != null) shExec.close();
      postComplete(containerId);
    }
    return 0;
  }

  @Override
  public int relaunchContainer(ContainerStartContext ctx)
      throws IOException, ConfigurationException {
    return launchContainer(ctx);
  }

  /**
   * Create a new {@link ShellCommandExecutor} using the parameters.
   *
   * @param wrapperScriptPath the path to the script to execute
   * @param containerIdStr the container ID
   * @param user the application owner's username
   * @param pidFile the path to the container's PID file
   * @param resource this parameter controls memory and CPU limits.
   * @param workDir If not-null, specifies the directory which should be set
   * as the current working directory for the command. If null,
   * the current working directory is not modified.
   * @param environment the container environment
   * @param numaCommands list of prefix numa commands
   * @return the new {@link ShellCommandExecutor}
   * @see ShellCommandExecutor
   */
  protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
                            String containerIdStr, String user, Path pidFile, Resource resource,
                            File workDir, Map<String, String> environment, String[] numaCommands) {

    String[] command = getRunCommand(wrapperScriptPath,
        containerIdStr, user, pidFile, this.getConf(), resource);

    // check if numa commands are passed and append it as prefix commands
    if(numaCommands != null && numaCommands.length!=0) {
      command = concatStringCommands(numaCommands, command);
    }

    LOG.info("launchContainer: {}", Arrays.toString(command));
    return new ShellCommandExecutor(
        command,
        workDir,
        environment,
        0L,
        false);
  }

  /**
   * Create a {@link LocalWrapperScriptBuilder} for the given container ID
   * and path that is appropriate to the current platform.
   *
   * @param containerIdStr the container ID
   * @param containerWorkDir the container's working directory
   * @return a new {@link LocalWrapperScriptBuilder}
   */
  protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder(
      String containerIdStr, Path containerWorkDir) {
   return  Shell.WINDOWS ?
       new WindowsLocalWrapperScriptBuilder(containerIdStr, containerWorkDir) :
       new UnixLocalWrapperScriptBuilder(containerWorkDir);
  }

  /**
   * This class is a utility to create a wrapper script that is platform
   * appropriate.
   */
  protected abstract class LocalWrapperScriptBuilder {

    private final Path wrapperScriptPath;

    /**
     * Return the path for the wrapper script.
     *
     * @return the path for the wrapper script
     */
    public Path getWrapperScriptPath() {
      return wrapperScriptPath;
    }

    /**
     * Write out the wrapper script for the container launch script. This method
     * will create the script at the configured wrapper script path.
     *
     * @param launchDst the script to launch
     * @param pidFile the file that will hold the PID
     * @throws IOException if the wrapper script cannot be created
     * @see #getWrapperScriptPath
     */
    public void writeLocalWrapperScript(Path launchDst, Path pidFile)
        throws IOException {
      try (DataOutputStream out =
               lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE));
           PrintStream pout =
               new PrintStream(out, false, "UTF-8")) {
        writeLocalWrapperScript(launchDst, pidFile, pout);
      }
    }

    /**
     * Write out the wrapper script for the container launch script.
     *
     * @param launchDst the script to launch
     * @param pidFile the file that will hold the PID
     * @param pout the stream to use to write out the wrapper script
     */
    protected abstract void writeLocalWrapperScript(Path launchDst,
        Path pidFile, PrintStream pout);

    /**
     * Create an instance for the given container working directory.
     *
     * @param containerWorkDir the working directory for the container
     */
    protected LocalWrapperScriptBuilder(Path containerWorkDir) {
      this.wrapperScriptPath = new Path(containerWorkDir,
        Shell.appendScriptExtension("default_container_executor"));
    }
  }

  /**
   * This class is an instance of {@link LocalWrapperScriptBuilder} for
   * non-Windows hosts.
   */
  private final class UnixLocalWrapperScriptBuilder
      extends LocalWrapperScriptBuilder {
    private final Path sessionScriptPath;

    /**
     * Create an instance for the given container path.
     *
     * @param containerWorkDir the container's working directory
     */
    public UnixLocalWrapperScriptBuilder(Path containerWorkDir) {
      super(containerWorkDir);
      this.sessionScriptPath = new Path(containerWorkDir,
          Shell.appendScriptExtension("default_container_executor_session"));
    }

    @Override
    public void writeLocalWrapperScript(Path launchDst, Path pidFile)
        throws IOException {
      writeSessionScript(launchDst, pidFile);
      super.writeLocalWrapperScript(launchDst, pidFile);
    }

    @Override
    public void writeLocalWrapperScript(Path launchDst, Path pidFile,
        PrintStream pout) {
      String exitCodeFile = ContainerLaunch.getExitCodeFile(
          pidFile.toString());
      String tmpFile = exitCodeFile + ".tmp";
      pout.println("#!/bin/bash");
      pout.println("/bin/bash \"" + sessionScriptPath.toString() + "\"");
      pout.println("rc=$?");
      pout.println("echo $rc > \"" + tmpFile + "\"");
      pout.println("/bin/mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\"");
      pout.println("exit $rc");
    }

    private void writeSessionScript(Path launchDst, Path pidFile)
        throws IOException {
      try (DataOutputStream out =
               lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
           PrintStream pout =
               new PrintStream(out, false, "UTF-8")) {
        // We need to do a move as writing to a file is not atomic
        // Process reading a file being written to may get garbled data
        // hence write pid to tmp file first followed by a mv
        pout.println("#!/bin/bash");
        pout.println();
        pout.println("echo $$ > " + pidFile.toString() + ".tmp");
        pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
        String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
        pout.printf("%s /bin/bash \"%s\"", exec, launchDst.toUri().getPath());
      }
      lfs.setPermission(sessionScriptPath,
          ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
    }
  }

  /**
   * This class is an instance of {@link LocalWrapperScriptBuilder} for
   * Windows hosts.
   */
  private final class WindowsLocalWrapperScriptBuilder
      extends LocalWrapperScriptBuilder {

    private final String containerIdStr;

    /**
     * Create an instance for the given container and working directory.
     *
     * @param containerIdStr the container ID
     * @param containerWorkDir the container's working directory
     */
    public WindowsLocalWrapperScriptBuilder(String containerIdStr,
        Path containerWorkDir) {

      super(containerWorkDir);
      this.containerIdStr = containerIdStr;
    }

    @Override
    public void writeLocalWrapperScript(Path launchDst, Path pidFile,
        PrintStream pout) {
      // TODO: exit code script for Windows

      // On Windows, the pid is the container ID, so that it can also serve as
      // the name of the job object created by winutils for task management.
      // Write to temp file followed by atomic move.
      String normalizedPidFile = new File(pidFile.toString()).getPath();
      pout.println("@echo " + containerIdStr + " > " + normalizedPidFile +
        ".tmp");
      pout.println("@move /Y " + normalizedPidFile + ".tmp " +
        normalizedPidFile);
      pout.println("@call " + launchDst.toString());
    }
  }

  @Override
  public boolean signalContainer(ContainerSignalContext ctx)
      throws IOException {
    String user = ctx.getUser();
    String pid = ctx.getPid();
    Signal signal = ctx.getSignal();
    LOG.debug("Sending signal {} to pid {} as user {}",
        signal.getValue(), pid, user);
    if (!containerIsAlive(pid)) {
      return false;
    }
    try {
      killContainer(pid, signal);
    } catch (IOException e) {
      if (!containerIsAlive(pid)) {
        return false;
      }
      throw e;
    }
    return true;
  }

  /**
   * No-op for reaping containers within the DefaultContainerExecutor.
   *
   * @param ctx Encapsulates information necessary for reaping containers.
   * @return true given no operations are needed.
   */
  @Override
  public boolean reapContainer(ContainerReapContext ctx) {
    return true;
  }

  @Override
  public boolean isContainerAlive(ContainerLivenessContext ctx)
      throws IOException {
    String pid = ctx.getPid();

    return containerIsAlive(pid);
  }

  /**
   * Returns true if the process with the specified pid is alive.
   * 
   * @param pid String pid
   * @return boolean true if the process is alive
   * @throws IOException if the command to test process liveliness fails
   */
  @VisibleForTesting
  public static boolean containerIsAlive(String pid) throws IOException {
    try {
      new ShellCommandExecutor(Shell.getCheckProcessIsAliveCommand(pid))
        .execute();
      // successful execution means process is alive
      return true;
    }
    catch (ExitCodeException e) {
      // failure (non-zero exit code) means process is not alive
      return false;
    }
  }

  /**
   * Send a specified signal to the specified pid
   *
   * @param pid the pid of the process [group] to signal.
   * @param signal signal to send
   * @throws IOException if the command to kill the process fails
   */
  protected void killContainer(String pid, Signal signal) throws IOException {
    new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid))
      .execute();
  }

  @Override
  public void deleteAsUser(DeletionAsUserContext ctx)
      throws IOException, InterruptedException {
    Path subDir = ctx.getSubDir();
    List<Path> baseDirs = ctx.getBasedirs();

    if (baseDirs == null || baseDirs.size() == 0) {
      LOG.info("Deleting absolute path : {}", subDir);
      if (!lfs.delete(subDir, true)) {
        //Maybe retry
        LOG.warn("delete returned false for path: [{}]", subDir);
      }
      return;
    }
    for (Path baseDir : baseDirs) {
      Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
      LOG.info("Deleting path : {}", del);
      try {
        if (!lfs.delete(del, true)) {
          LOG.warn("delete returned false for path: [{}]", del);
        }
      } catch (FileNotFoundException e) {
        continue;
      }
    }
  }

  @Override
  public void symLink(String target, String symlink) throws IOException {
    FileUtil.symLink(target, symlink);
  }

  /**
   * Permissions for user dir.
   * $local.dir/usercache/$user
   */
  static final short USER_PERM = (short)0750;
  /**
   * Permissions for user appcache dir.
   * $local.dir/usercache/$user/appcache
   */
  static final short APPCACHE_PERM = (short)0710;
  /**
   * Permissions for user filecache dir.
   * $local.dir/usercache/$user/filecache
   */
  static final short FILECACHE_PERM = (short)0710;
  /**
   * Permissions for user app dir.
   * $local.dir/usercache/$user/appcache/$appId
   */
  static final short APPDIR_PERM = (short)0710;

  private long getDiskFreeSpace(Path base) throws IOException {
    return lfs.getFsStatus(base).getRemaining();
  }

  private Path getApplicationDir(Path base, String user, String appId) {
    return new Path(getAppcacheDir(base, user), appId);
  }

  private Path getUserCacheDir(Path base, String user) {
    return new Path(new Path(base, ContainerLocalizer.USERCACHE), user);
  }

  private Path getAppcacheDir(Path base, String user) {
    return new Path(getUserCacheDir(base, user),
        ContainerLocalizer.APPCACHE);
  }

  private Path getFileCacheDir(Path base, String user) {
    return new Path(getUserCacheDir(base, user),
        ContainerLocalizer.FILECACHE);
  }

  /**
   * Return a randomly chosen application directory from a list of local storage
   * directories. The probability of selecting a directory is proportional to
   * its size.
   *
   * @param localDirs the target directories from which to select
   * @param user the user who owns the application
   * @param appId the application ID
   * @return the selected directory
   * @throws IOException if no application directories for the user can be
   * found
   */
  protected Path getWorkingDir(List<String> localDirs, String user,
      String appId) throws IOException {
    long totalAvailable = 0L;
    long[] availableOnDisk = new long[localDirs.size()];
    int i = 0;
    // randomly choose the app directory
    // the chance of picking a directory is proportional to
    // the available space on the directory.
    // firstly calculate the sum of all available space on these directories
    for (String localDir : localDirs) {
      Path curBase = getApplicationDir(new Path(localDir), user, appId);
      long space = 0L;
      try {
        space = getDiskFreeSpace(curBase);
      } catch (IOException e) {
        LOG.warn("Unable to get Free Space for {}", curBase, e);
      }
      availableOnDisk[i++] = space;
      totalAvailable += space;
    }

    // throw an IOException if totalAvailable is 0.
    if (totalAvailable <= 0L) {
      throw new IOException("Not able to find a working directory for " + user);
    }

    // make probability to pick a directory proportional to
    // the available space on the directory.
    long randomPosition = RandomUtils.nextLong() % totalAvailable;
    int dir = pickDirectory(randomPosition, availableOnDisk);

    return getApplicationDir(new Path(localDirs.get(dir)), user, appId);
  }

  /**
   * Picks a directory based on the input random number and
   * available size at each dir.
   */
  @Private
  @VisibleForTesting
  int pickDirectory(long randomPosition, final long[] availableOnDisk) {
    int dir = 0;
    // skip zero available space directory,
    // because totalAvailable is greater than 0 and randomPosition
    // is less than totalAvailable, we can find a valid directory
    // with nonzero available space.
    while (availableOnDisk[dir] == 0L) {
      dir++;
    }
    while (randomPosition >= availableOnDisk[dir]) {
      randomPosition -= availableOnDisk[dir++];
    }
    return dir;
  }

  /**
   * Use the {@link #lfs} {@link FileContext} to create the target directory.
   *
   * @param dirPath the target directory
   * @param perms the target permissions for the target directory
   * @param createParent whether the parent directories should also be created
   * @param user the user as whom the target directory should be created.
   * Used only on secure Windows hosts.
   * @throws IOException if there's a failure performing a file operation
   * @see WindowsSecureContainerExecutor
   */
  protected void createDir(Path dirPath, FsPermission perms,
      boolean createParent, String user) throws IOException {
    lfs.mkdir(dirPath, perms, createParent);
    if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
      lfs.setPermission(dirPath, perms);
    }
  }

  /**
   * Initialize the local directories for a particular user.
   * <ul>.mkdir
   * <li>$local.dir/usercache/$user</li>
   * </ul>
   *
   * @param localDirs the target directories to create
   * @param user the user whose local cache directories should be initialized
   * @throws IOException if there's an issue initializing the user local
   * directories
   */
  void createUserLocalDirs(List<String> localDirs, String user)
      throws IOException {
    boolean userDirStatus = false;
    FsPermission userperms = new FsPermission(USER_PERM);
    for (String localDir : localDirs) {
      // create $local.dir/usercache/$user and its immediate parent
      try {
        createDir(getUserCacheDir(new Path(localDir), user), userperms, true,
            user);
      } catch (IOException e) {
        LOG.warn("Unable to create the user directory : {}", localDir, e);
        continue;
      }
      userDirStatus = true;
    }
    if (!userDirStatus) {
      throw new IOException("Not able to initialize user directories "
          + "in any of the configured local directories for user " + user);
    }
  }


  /**
   * Initialize the local cache directories for a particular user.
   * <ul>
   * <li>$local.dir/usercache/$user</li>
   * <li>$local.dir/usercache/$user/appcache</li>
   * <li>$local.dir/usercache/$user/filecache</li>
   * </ul>
   *
   * @param localDirs the target directories to create
   * @param user the user whose local cache directories should be initialized
   * @throws IOException if there's an issue initializing the cache
   * directories
   */
  void createUserCacheDirs(List<String> localDirs, String user)
      throws IOException {
    LOG.info("Initializing user {}", user);

    boolean appcacheDirStatus = false;
    boolean distributedCacheDirStatus = false;
    FsPermission appCachePerms = new FsPermission(APPCACHE_PERM);
    FsPermission fileperms = new FsPermission(FILECACHE_PERM);

    for (String localDir : localDirs) {
      // create $local.dir/usercache/$user/appcache
      Path localDirPath = new Path(localDir);
      final Path appDir = getAppcacheDir(localDirPath, user);
      try {
        createDir(appDir, appCachePerms, true, user);
        appcacheDirStatus = true;
      } catch (IOException e) {
        LOG.warn("Unable to create app cache directory : {}", appDir, e);
      }
      // create $local.dir/usercache/$user/filecache
      final Path distDir = getFileCacheDir(localDirPath, user);
      try {
        createDir(distDir, fileperms, true, user);
        distributedCacheDirStatus = true;
      } catch (IOException e) {
        LOG.warn("Unable to create file cache directory : {}", distDir, e);
      }
    }
    if (!appcacheDirStatus) {
      throw new IOException("Not able to initialize app-cache directories "
          + "in any of the configured local directories for user " + user);
    }
    if (!distributedCacheDirStatus) {
      throw new IOException(
          "Not able to initialize distributed-cache directories "
              + "in any of the configured local directories for user "
              + user);
    }
  }

  /**
   * Initialize the local directories for a particular user.
   * <ul>
   * <li>$local.dir/usercache/$user/appcache/$appid</li>
   * </ul>
   *
   * @param localDirs the target directories to create
   * @param user the user whose local cache directories should be initialized
   * @param appId the application ID
   * @throws IOException if there's an issue initializing the application
   * directories
   */
  void createAppDirs(List<String> localDirs, String user, String appId)
      throws IOException {
    boolean initAppDirStatus = false;
    FsPermission appperms = new FsPermission(APPDIR_PERM);
    for (String localDir : localDirs) {
      Path fullAppDir = getApplicationDir(new Path(localDir), user, appId);
      // create $local.dir/usercache/$user/appcache/$appId
      try {
        createDir(fullAppDir, appperms, true, user);
        initAppDirStatus = true;
      } catch (IOException e) {
        LOG.warn("Unable to create app directory {}",
            fullAppDir, e);
      }
    }
    if (!initAppDirStatus) {
      throw new IOException("Not able to initialize app directories "
          + "in any of the configured local directories for app "
          + appId.toString());
    }
  }

  /**
   * Create application log directories on all disks.
   *
   * @param appId the application ID
   * @param logDirs the target directories to create
   * @param user the user whose local cache directories should be initialized
   * @throws IOException if there's an issue initializing the application log
   * directories
   */
  void createAppLogDirs(String appId, List<String> logDirs, String user)
      throws IOException {

    boolean appLogDirStatus = false;
    FsPermission appLogDirPerms = new
        FsPermission(getLogDirPermissions());
    for (String rootLogDir : logDirs) {
      // create $log.dir/$appid
      Path appLogDir = new Path(rootLogDir, appId);
      try {
        createDir(appLogDir, appLogDirPerms, true, user);
      } catch (IOException e) {
        LOG.warn("Unable to create the app-log directory : {}", appLogDir, e);
        continue;
      }
      appLogDirStatus = true;
    }
    if (!appLogDirStatus) {
      throw new IOException("Not able to initialize app-log directories "
          + "in any of the configured local directories for app " + appId);
    }
  }

  /**
   * Create application log directories on all disks.
   *
   * @param appId the application ID
   * @param containerId the container ID
   * @param logDirs the target directories to create
   * @param user the user as whom the directories should be created.
   * Used only on secure Windows hosts.
   * @throws IOException if there's an issue initializing the container log
   * directories
   */
  void createContainerLogDirs(String appId, String containerId,
      List<String> logDirs, String user) throws IOException {
    boolean containerLogDirStatus = false;
    FsPermission containerLogDirPerms = new
        FsPermission(getLogDirPermissions());
    for (String rootLogDir : logDirs) {
      // create $log.dir/$appid/$containerid
      Path appLogDir = new Path(rootLogDir, appId);
      Path containerLogDir = new Path(appLogDir, containerId);
      try {
        createDir(containerLogDir, containerLogDirPerms, true, user);
      } catch (IOException e) {
        LOG.warn("Unable to create the container-log directory : {}",
            appLogDir, e);
        continue;
      }
      containerLogDirStatus = true;
    }
    if (!containerLogDirStatus) {
      throw new IOException(
          "Not able to initialize container-log directories "
              + "in any of the configured local directories for container "
              + containerId);
    }
  }

  /**
   * Return the default container log directory permissions.
   *
   * @return the default container log directory permissions
   */
  @VisibleForTesting
  public String getLogDirPermissions() {
    if (this.logDirPermissions==null) {
      this.logDirPermissions = getConf().get(
          YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR_LOG_DIRS_PERMISSIONS,
          YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR_LOG_DIRS_PERMISSIONS_DEFAULT);
    }
    return this.logDirPermissions;
  }

  /**
   * Clear the internal variable for repeatable testing.
   */
  @VisibleForTesting
  public void clearLogDirPermissions() {
    this.logDirPermissions = null;
  }

  /**
   *
   * @param ctx Encapsulates information necessary for exec containers.
   * @return the input/output stream of interactive docker shell.
   * @throws ContainerExecutionException
   */
  @Override
  public IOStreamPair execContainer(ContainerExecContext ctx)
      throws ContainerExecutionException {
    return null;
  }

  /**
   * Return the list of paths of given local directories.
   *
   * @return the list of paths of given local directories
   */
  private static List<Path> getPaths(List<String> dirs) {
    List<Path> paths = new ArrayList<>(dirs.size());
    for (int i = 0; i < dirs.size(); i++) {
      paths.add(new Path(dirs.get(i)));
    }
    return paths;
  }

  @Override
  public void updateYarnSysFS(Context ctx, String user,
      String appId, String spec) throws IOException {
    throw new ServiceStateException("Implementation unavailable");
  }

  @Override
  public int reacquireContainer(ContainerReacquisitionContext ctx)
          throws IOException, InterruptedException {
    try {
      if (numaResourceAllocator != null) {
        numaResourceAllocator.recoverNumaResource(ctx.getContainerId());
      }
      return super.reacquireContainer(ctx);
    } finally {
      postComplete(ctx.getContainerId());
    }
  }

  /**
   * clean up and release of resources.
   *
   * @param containerId containerId of running container
   */
  public void postComplete(final ContainerId containerId) {
    if (numaResourceAllocator != null) {
      try {
        numaResourceAllocator.releaseNumaResource(containerId);
      } catch (ResourceHandlerException e) {
        LOG.warn("NumaResource release failed for " +
                "containerId: {}. Exception: ", containerId, e);
      }
    }
  }

  /**
   * @param resourceAllocation NonNull NumaResourceAllocation object reference
   * @return Array of numa specific commands
   */
  String[] getNumaCommands(NumaResourceAllocation resourceAllocation) {
    String[] numaCommand = new String[3];
    numaCommand[0] = numactl;
    numaCommand[1] = "--interleave=" + String.join(",", resourceAllocation.getMemNodes());
    numaCommand[2] = "--cpunodebind=" + String.join(",", resourceAllocation.getCpuNodes());
    return numaCommand;

  }

  /**
   * @param firstStringArray  Array of String
   * @param secondStringArray Array of String
   * @return combined array of string where first elements are from firstStringArray
   * and later are the elements from secondStringArray
   */
  String[] concatStringCommands(String[] firstStringArray, String[] secondStringArray) {

    if(firstStringArray == null && secondStringArray == null) {
      return secondStringArray;
    }

    else if(firstStringArray == null || firstStringArray.length == 0) {
      return secondStringArray;
    }

    else if(secondStringArray == null || secondStringArray.length == 0){
      return firstStringArray;
    }

    int len = firstStringArray.length + secondStringArray.length;

    String[] ret = new String[len];
    int idx = 0;
    for (String s : firstStringArray) {
      ret[idx] = s;
      idx++;
    }
    for (String s : secondStringArray) {
      ret[idx] = s;
      idx++;
    }
    return ret;
  }

  @VisibleForTesting
  public void setNumaResourceAllocator(NumaResourceAllocator numaResourceAllocator) {
    this.numaResourceAllocator = numaResourceAllocator;
  }

  @VisibleForTesting
  public void setNumactl(String numactl) {
    this.numactl = numactl;
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop CMgrCompletedAppsEvent 源码

hadoop CMgrCompletedContainersEvent 源码

hadoop CMgrSignalContainersEvent 源码

hadoop CMgrUpdateContainersEvent 源码

hadoop ContainerExecutor 源码

hadoop ContainerManagerEvent 源码

hadoop ContainerManagerEventType 源码

hadoop ContainerStateTransitionListener 源码

hadoop Context 源码

hadoop DeletionService 源码

0  赞