hadoop SLSUtils 源码

  • 2022-10-20
  • 浏览 (214)

hadoop SLSUtils 代码

文件路径:/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.sls.utils;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.tools.rumen.JobTraceReader;
import org.apache.hadoop.tools.rumen.LoggedJob;
import org.apache.hadoop.tools.rumen.LoggedTask;
import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.sls.SLSRunner.NodeDetails;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;

@Private
@Unstable
public class SLSUtils {
  private static final String LABEL_FORMAT_ERR_MSG =
      "Input format for adding node-labels is not correct, it should be "
          + "labelName1[(exclusive=true/false)],labelName2[] ..";

  // hostname includes the network path and the host name. for example
  // "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar".
  // the function returns two Strings, the first element is the network
  // location without "/", the second element is the host name. for example,
  // {"default-rack", "hostFoo"} or "coreSwitchA/TORSwitchB", "hostBar"
  public static String[] getRackHostName(String hostname) {
    NodeBase node = new NodeBase(hostname);
    return new String[] {node.getNetworkLocation().substring(1),
        node.getName()};
  }

  /**
   * parse the rumen trace file, return each host name
   */
  public static Set<NodeDetails> parseNodesFromRumenTrace(
      String jobTrace) throws IOException {
    Set<NodeDetails> nodeSet = new HashSet<>();

    File fin = new File(jobTrace);
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");
    JobTraceReader reader = new JobTraceReader(
            new Path(fin.getAbsolutePath()), conf);
    try {
      LoggedJob job;
      while ((job = reader.getNext()) != null) {
        for(LoggedTask mapTask : job.getMapTasks()) {
          // select the last attempt
          if (mapTask.getAttempts().size() == 0) {
            continue;
          }
          LoggedTaskAttempt taskAttempt = mapTask.getAttempts()
                  .get(mapTask.getAttempts().size() - 1);
          nodeSet.add(new NodeDetails(
              taskAttempt.getHostName().getValue()));
        }
        for(LoggedTask reduceTask : job.getReduceTasks()) {
          if (reduceTask.getAttempts().size() == 0) {
            continue;
          }
          LoggedTaskAttempt taskAttempt = reduceTask.getAttempts()
                  .get(reduceTask.getAttempts().size() - 1);
          nodeSet.add(new NodeDetails(
              taskAttempt.getHostName().getValue()));
        }
      }
    } finally {
      reader.close();
    }

    return nodeSet;
  }

  /**
   * parse the sls trace file, return each host name
   */
  public static Set<NodeDetails> parseNodesFromSLSTrace(
      String jobTrace) throws IOException {
    Set<NodeDetails> nodeSet = new HashSet<>();
    JsonFactory jsonF = new JsonFactory();
    ObjectMapper mapper = new ObjectMapper();
    Reader input =
        new InputStreamReader(new FileInputStream(jobTrace), StandardCharsets.UTF_8);
    try {
      Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class);
      while (i.hasNext()) {
        addNodes(nodeSet, i.next());
      }
    } finally {
      input.close();
    }
    return nodeSet;
  }

  private static void addNodes(Set<NodeDetails> nodeSet,
      Map jsonEntry) {
    if (jsonEntry.containsKey(SLSConfiguration.NUM_NODES)) {
      int numNodes = Integer.parseInt(
          jsonEntry.get(SLSConfiguration.NUM_NODES).toString());
      int numRacks = 1;
      if (jsonEntry.containsKey(SLSConfiguration.NUM_RACKS)) {
        numRacks = Integer.parseInt(
            jsonEntry.get(SLSConfiguration.NUM_RACKS).toString());
      }
      nodeSet.addAll(generateNodes(numNodes, numRacks));
    }

    if (jsonEntry.containsKey(SLSConfiguration.JOB_TASKS)) {
      List tasks = (List) jsonEntry.get(SLSConfiguration.JOB_TASKS);
      for (Object o : tasks) {
        Map jsonTask = (Map) o;
        String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);
        if (hostname != null) {
          nodeSet.add(new NodeDetails(hostname));
        }
      }
    }
  }

  /**
   * parse the input node file, return each host name
   * sample input: label1(exclusive=true),label2(exclusive=false),label3
   */
  public static Set<NodeDetails> parseNodesFromNodeFile(
      String nodeFile, Resource nmDefaultResource) throws IOException {
    Set<NodeDetails> nodeSet = new HashSet<>();
    JsonFactory jsonF = new JsonFactory();
    ObjectMapper mapper = new ObjectMapper();
    Reader input =
        new InputStreamReader(new FileInputStream(nodeFile), StandardCharsets.UTF_8);
    try {
      Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class);
      while (i.hasNext()) {
        Map jsonE = i.next();
        String rack = "/" + jsonE.get("rack");
        List tasks = (List) jsonE.get("nodes");
        for (Object o : tasks) {
          Map jsonNode = (Map) o;
          NodeDetails nodeDetails = new NodeDetails(
              rack + "/" + jsonNode.get("node"));
          Resource nodeResource = Resources.clone(nmDefaultResource);
          ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
          for (ResourceInformation info : infors) {
            if (jsonNode.get(info.getName()) != null) {
              nodeResource.setResourceValue(info.getName(),
                  Integer.parseInt(jsonNode.get(info.getName()).toString()));
            }
          }
          nodeDetails.setNodeResource(nodeResource);
          if (jsonNode.get("labels") != null) {
            Set<NodeLabel> nodeLabels =  new HashSet<>(
                YarnClientUtils.buildNodeLabelsFromStr(
                    jsonNode.get("labels").toString()));
            nodeDetails.setLabels(nodeLabels);
          }
          nodeSet.add(nodeDetails);
        }
      }
    } finally {
      input.close();
    }
    return nodeSet;
  }

  public static Set<NodeDetails> generateNodes(int numNodes,
      int numRacks){
    Set<NodeDetails> nodeSet = new HashSet<>();
    if (numRacks < 1) {
      numRacks = 1;
    }

    if (numRacks > numNodes) {
      numRacks = numNodes;
    }

    for (int i = 0; i < numNodes; i++) {
      nodeSet.add(new NodeDetails(
          "/rack" + i % numRacks + "/node" + i));
    }
    return nodeSet;
  }

  /**
   * Generates a node to rack mapping file based on node details.
   * This file is then being used by TableMapping to resolve rack names.
   * The format required by TableMapping is a two column text file
   * where first column specifies node name
   * and second column specifies rack name.
   * @param nodeDetails Set of node details.
   * @param filePath File path where to write table mapping.
   * @throws IOException
   */
  public static void generateNodeTableMapping(Set<NodeDetails> nodeDetails,
      String filePath) throws IOException {
    List<String> entries = new ArrayList<>();
    for (NodeDetails nodeDetail : nodeDetails) {
      if (nodeDetail.getHostname().contains("/")) {
        String hostname = nodeDetail.getHostname();
        int lIndex = hostname.lastIndexOf("/");
        String node = hostname.substring(lIndex + 1);
        String rack = hostname.substring(0, lIndex);
        entries.add(node + " " + rack);
      }
    }
    Files.write(Paths.get(filePath),
        entries,
        StandardCharsets.UTF_8,
        StandardOpenOption.CREATE);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop NodeUsageRanges 源码

hadoop package-info 源码

0  赞