hadoop TraceBuilder 源码

  • 2022-10-20
  • 浏览 (182)

hadoop TraceBuilder 代码

文件路径:/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TraceBuilder.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tools.rumen;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * The main driver of the Rumen Parser: reads job history files and job
 * configuration XML files, rebuilds per-job {@link LoggedJob} records plus a
 * {@link LoggedNetworkTopology}, and writes them out through the configured
 * {@link Outputter}s.
 */
public class TraceBuilder extends Configured implements Tool {
  static final private Logger LOG = LoggerFactory.getLogger(TraceBuilder.class);

  /** Exit code reported when {@link #run(String[])} fails with a throwable. */
  static final int RUN_METHOD_FAILED_EXIT_CODE = 3;

  TopologyBuilder topologyBuilder = new TopologyBuilder();
  Outputter<LoggedJob> traceWriter;
  Outputter<LoggedNetworkTopology> topologyWriter;

  /**
   * Parsed command-line options. Accepted form:
   * {@code [-demuxer <class>] [-recursive] <trace-out> <topology-out> <input>...}
   * where each {@code <input>} may be a file, a directory, or a glob.
   */
  static class MyOptions {
    Class<? extends InputDemuxer> inputDemuxerClass = DefaultInputDemuxer.class;

    @SuppressWarnings("unchecked")
    Class<? extends Outputter> clazzTraceOutputter = DefaultOutputter.class;
    Path traceOutput;
    Path topologyOutput;

    List<Path> inputs = new LinkedList<Path>();

    MyOptions(String[] args, Configuration conf) throws FileNotFoundException,
        IOException, ClassNotFoundException {
      int switchTop = 0;

      // to determine if the input paths should be recursively scanned or not
      boolean doRecursiveTraversal = false;

      // Consume leading "-" switches; unknown switches are silently ignored
      // (original behavior preserved).
      while (args[switchTop].startsWith("-")) {
        if (args[switchTop].equalsIgnoreCase("-demuxer")) {
          inputDemuxerClass =
              Class.forName(args[++switchTop]).asSubclass(InputDemuxer.class);
        } else if (args[switchTop].equalsIgnoreCase("-recursive")) {
          doRecursiveTraversal = true;
        }
        ++switchTop;
      }

      // First two positional arguments are the two output locations; the
      // remainder are input paths (possibly globbed).
      traceOutput = new Path(args[0 + switchTop]);
      topologyOutput = new Path(args[1 + switchTop]);

      for (int i = 2 + switchTop; i < args.length; ++i) {
        inputs.addAll(processInputArgument(
            args[i], conf, doRecursiveTraversal));
      }
    }

    /**
     * Compare the history file names, not the full paths.
     * Job history file name format is such that doing lexicographic sort on the
     * history file names should result in the order of jobs' submission times.
     */
    private static class HistoryLogsComparator
        implements Comparator<FileStatus>, Serializable {
      // Explicit serialVersionUID: the class is Serializable, so pin the
      // serialized form rather than rely on the compiler-generated default.
      private static final long serialVersionUID = 1L;

      @Override
      public int compare(FileStatus file1, FileStatus file2) {
        return file1.getPath().getName().compareTo(
                   file2.getPath().getName());
      }
    }

    /**
     * Processes the input file/folder argument. If the input is a file,
     * then it is directly considered for further processing by TraceBuilder.
     * If the input is a folder, then all the history logs in the
     * input folder are considered for further processing.
     *
     * If isRecursive is true, then the input path is recursively scanned
     * for job history logs for further processing by TraceBuilder.
     *
     * NOTE: If the input represents a globbed path, then it is first flattened
     *       and then the individual paths represented by the globbed input
     *       path are considered for further processing.
     *
     * @param input        input path, possibly globbed
     * @param conf         configuration
     * @param isRecursive  whether to recursively traverse the input paths to
     *                     find history logs
     * @return the input history log files' paths; empty if the glob matched
     *         nothing
     * @throws FileNotFoundException
     * @throws IOException
     */
    static List<Path> processInputArgument(String input, Configuration conf,
        boolean isRecursive) throws FileNotFoundException, IOException {
      Path inPath = new Path(input);
      FileSystem fs = inPath.getFileSystem(conf);
      FileStatus[] inStatuses = fs.globStatus(inPath);

      List<Path> inputPaths = new LinkedList<Path>();
      if (inStatuses == null || inStatuses.length == 0) {
        return inputPaths;
      }

      for (FileStatus inStatus : inStatuses) {
        Path thisPath = inStatus.getPath();
        if (inStatus.isDirectory()) {

          // Find list of files in this path(recursively if -recursive option
          // is specified).
          List<FileStatus> historyLogs = new ArrayList<FileStatus>();

          RemoteIterator<LocatedFileStatus> iter =
            fs.listFiles(thisPath, isRecursive);
          while (iter.hasNext()) {
            LocatedFileStatus child = iter.next();
            String fileName = child.getPath().getName();

            // Skip checksum files and hidden/dot files.
            if (!(fileName.endsWith(".crc") || fileName.startsWith("."))) {
              historyLogs.add(child);
            }
          }

          if (!historyLogs.isEmpty()) {
            // Sort by bare file name so jobs are emitted in (approximate)
            // submission-time order, then add them to the inputPaths list.
            historyLogs.sort(new HistoryLogsComparator());

            for (FileStatus historyLog : historyLogs) {
              inputPaths.add(historyLog.getPath());
            }
          }
        } else {
          inputPaths.add(thisPath);
        }
      }

      return inputPaths;
    }
  }

  /**
   * Command-line entry point. Exits with 0 on success,
   * {@link #RUN_METHOD_FAILED_EXIT_CODE} (or the tool's own nonzero result)
   * on failure; writers are always cleaned up via {@link #finish()}.
   */
  public static void main(String[] args) {
    TraceBuilder builder = new TraceBuilder();
    int result = RUN_METHOD_FAILED_EXIT_CODE;

    try {
      result = ToolRunner.run(builder, args);
    } catch (Throwable t) {
      t.printStackTrace(System.err);
    } finally {
      try {
        builder.finish();
      } finally {
        if (result == 0) {
          return;
        }

        System.exit(result);
      }
    }
  }


  @SuppressWarnings("unchecked")
  @Override
  public int run(String[] args) throws Exception {
    MyOptions options = new MyOptions(args, getConf());
    // getDeclaredConstructor().newInstance() instead of the deprecated
    // Class.newInstance(), which propagates undeclared checked exceptions.
    traceWriter = options.clazzTraceOutputter.getDeclaredConstructor()
        .newInstance();
    traceWriter.init(options.traceOutput, getConf());
    topologyWriter = new DefaultOutputter<LoggedNetworkTopology>();
    topologyWriter.init(options.topologyOutput, getConf());

    try {
      // Carries state across input files: history events and the conf XML for
      // one job may arrive as separate, consecutive inputs.
      JobBuilder jobBuilder = null;

      for (Path p : options.inputs) {
        InputDemuxer inputDemuxer =
            options.inputDemuxerClass.getDeclaredConstructor().newInstance();

        try {
          inputDemuxer.bindTo(p, getConf());
        } catch (IOException e) {
          LOG.warn("Unable to bind Path " + p + " .  Skipping...", e);

          continue;
        }

        Pair<String, InputStream> filePair = null;

        try {
          while ((filePair = inputDemuxer.getNext()) != null) {
            RewindableInputStream ris =
                new RewindableInputStream(filePair.second());

            JobHistoryParser parser = null;

            try {
              String jobID = JobHistoryUtils.extractJobID(filePair.first());
              if (jobID == null) {
                LOG.warn("File skipped: Invalid file name: "
                    + filePair.first());
                continue;
              }
              // A new job ID means the previous job is complete: flush it to
              // the trace before starting the next builder.
              if ((jobBuilder == null)
                  || (!jobBuilder.getJobID().equals(jobID))) {
                if (jobBuilder != null) {
                  traceWriter.output(jobBuilder.build());
                }
                jobBuilder = new JobBuilder(jobID);
              }

              if (JobHistoryUtils.isJobConfXml(filePair.first())) {
                processJobConf(JobConfigurationParser.parse(ris.rewind()),
                               jobBuilder);
              } else {
                parser = JobHistoryParserFactory.getParser(ris);
                if (parser == null) {
                  LOG.warn("File skipped: Cannot find suitable parser: "
                      + filePair.first());
                } else {
                  processJobHistory(parser, jobBuilder);
                }
              }
            } finally {
              // The parser wraps ris, so close whichever is outermost.
              if (parser == null) {
                ris.close();
              } else {
                parser.close();
                parser = null;
              }
            }
          }
        } catch (Throwable t) {
          // Best-effort: log and move on to the next input path.
          if (filePair != null) {
            LOG.warn("TraceBuilder got an error while processing the [possibly virtual] file "
                + filePair.first() + " within Path " + p , t);
          }
        } finally {
          inputDemuxer.close();
        }
      }
      // Flush the last in-progress job, if any.
      if (jobBuilder != null) {
        traceWriter.output(jobBuilder.build());
        jobBuilder = null;
      } else {
        LOG.warn("No job found in traces: ");
      }

      topologyWriter.output(topologyBuilder.build());
    } finally {
      traceWriter.close();
      topologyWriter.close();
    }

    return 0;
  }

  /** Feeds one job's configuration properties to the job and topology builders. */
  private void processJobConf(Properties properties, JobBuilder jobBuilder) {
    jobBuilder.process(properties);
    topologyBuilder.process(properties);
  }

  /**
   * Drains all history events from {@code parser} into the job and topology
   * builders, then closes the parser. (The caller's finally block may close
   * it again; close is expected to be idempotent.)
   *
   * @throws IOException if reading an event fails
   */
  void processJobHistory(JobHistoryParser parser, JobBuilder jobBuilder)
      throws IOException {
    HistoryEvent e;
    while ((e = parser.nextEvent()) != null) {
      jobBuilder.process(e);
      topologyBuilder.process(e);
    }

    parser.close();
  }

  /** Closes both writers, logging (not throwing) any close failure. */
  void finish() {
    IOUtils.cleanupWithLogger(LOG, traceWriter, topologyWriter);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractClusterStory 源码

hadoop Anonymizer 源码

hadoop CDFPiecewiseLinearRandomGenerator 源码

hadoop CDFRandomGenerator 源码

hadoop ClusterStory 源码

hadoop ClusterTopologyReader 源码

hadoop CurrentJHParser 源码

hadoop DeepCompare 源码

hadoop DeepInequalityException 源码

hadoop DefaultInputDemuxer 源码

0  赞