hadoop MultipleInputs 源码

  • 2022-10-20
  • 浏览 (152)

hadoop MultipleInputs 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.input;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * This class supports MapReduce jobs that have multiple input paths with
 * a different {@link InputFormat} and {@link Mapper} for each path.
 *
 * <p>The per-path choices are recorded in the job {@link Configuration} as
 * comma-separated lists of {@code <path>;<class name>} entries under
 * {@link #DIR_FORMATS} and {@link #DIR_MAPPERS}. Because {@code ','} is the
 * entry separator, input paths whose string form contains a comma are not
 * supported.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleInputs {
  /**
   * Configuration key holding the comma-separated
   * {@code <path>;<InputFormat class name>} entries.
   */
  public static final String DIR_FORMATS = 
    "mapreduce.input.multipleinputs.dir.formats";
  /**
   * Configuration key holding the comma-separated
   * {@code <path>;<Mapper class name>} entries.
   */
  public static final String DIR_MAPPERS = 
    "mapreduce.input.multipleinputs.dir.mappers";
  
  /**
   * Add a {@link Path} with a custom {@link InputFormat} to the list of
   * inputs for the map-reduce job.
   *
   * <p>Side effect: the job's input format class is set to
   * {@link DelegatingInputFormat}.
   * 
   * @param job The {@link Job}
   * @param path {@link Path} to be added to the list of inputs for the job
   * @param inputFormatClass {@link InputFormat} class to use for this path
   */
  @SuppressWarnings("unchecked")
  public static void addInputPath(Job job, Path path,
      Class<? extends InputFormat> inputFormatClass) {
    appendMapping(job.getConfiguration(), DIR_FORMATS,
        path.toString() + ";" + inputFormatClass.getName());

    job.setInputFormatClass(DelegatingInputFormat.class);
  }

  /**
   * Add a {@link Path} with a custom {@link InputFormat} and
   * {@link Mapper} to the list of inputs for the map-reduce job.
   *
   * <p>Side effects: the job's input format class is set to
   * {@link DelegatingInputFormat} and its mapper class to
   * {@link DelegatingMapper}.
   * 
   * @param job The {@link Job}
   * @param path {@link Path} to be added to the list of inputs for the job
   * @param inputFormatClass {@link InputFormat} class to use for this path
   * @param mapperClass {@link Mapper} class to use for this path
   */
  @SuppressWarnings("unchecked")
  public static void addInputPath(Job job, Path path,
      Class<? extends InputFormat> inputFormatClass,
      Class<? extends Mapper> mapperClass) {

    // Registers the InputFormat entry and installs DelegatingInputFormat.
    addInputPath(job, path, inputFormatClass);
    appendMapping(job.getConfiguration(), DIR_MAPPERS,
        path.toString() + ";" + mapperClass.getName());

    job.setMapperClass(DelegatingMapper.class);
  }

  /**
   * Retrieves a map of {@link Path}s to the {@link InputFormat} class
   * that should be used for them.
   * 
   * @param job The {@link JobContext}
   * @see #addInputPath(Job, Path, Class)
   * @return A map of paths to inputformats for the job; empty if no
   *         input format has been registered
   * @throws IllegalArgumentException if a stored entry is not of the form
   *         {@code <path>;<class name>}
   * @throws RuntimeException if a registered InputFormat class cannot be
   *         loaded
   */
  @SuppressWarnings("unchecked")
  static Map<Path, InputFormat> getInputFormatMap(JobContext job) {
    Configuration conf = job.getConfiguration();
    String formats = conf.get(DIR_FORMATS);
    if (formats == null) {
      // Same contract as getMapperTypeMap: nothing registered yields an
      // empty map instead of a NullPointerException from split().
      return Collections.emptyMap();
    }
    Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
    for (String pathMapping : formats.split(",")) {
      String[] split = splitMapping(pathMapping, DIR_FORMATS);
      InputFormat inputFormat;
      try {
        inputFormat = (InputFormat) ReflectionUtils.newInstance(
            conf.getClassByName(split[1]), conf);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(
            "InputFormat class not found for input path " + split[0], e);
      }
      m.put(new Path(split[0]), inputFormat);
    }
    return m;
  }

  /**
   * Retrieves a map of {@link Path}s to the {@link Mapper} class that
   * should be used for them.
   * 
   * @param job The {@link JobContext}
   * @see #addInputPath(Job, Path, Class, Class)
   * @return A map of paths to mappers for the job; empty if no mapper has
   *         been registered
   * @throws IllegalArgumentException if a stored entry is not of the form
   *         {@code <path>;<class name>}
   * @throws RuntimeException if a registered Mapper class cannot be loaded
   */
  @SuppressWarnings("unchecked")
  static Map<Path, Class<? extends Mapper>> 
      getMapperTypeMap(JobContext job) {
    Configuration conf = job.getConfiguration();
    String mappers = conf.get(DIR_MAPPERS);
    if (mappers == null) {
      return Collections.emptyMap();
    }
    Map<Path, Class<? extends Mapper>> m = 
      new HashMap<Path, Class<? extends Mapper>>();
    for (String pathMapping : mappers.split(",")) {
      String[] split = splitMapping(pathMapping, DIR_MAPPERS);
      Class<? extends Mapper> mapClass;
      try {
        // Unchecked: the stored name is trusted to be a Mapper subclass.
        mapClass = 
          (Class<? extends Mapper>) conf.getClassByName(split[1]);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(
            "Mapper class not found for input path " + split[0], e);
      }
      m.put(new Path(split[0]), mapClass);
    }
    return m;
  }

  /**
   * Appends {@code mapping} to the comma-separated list stored under
   * {@code key}, creating the list if the key is not yet set.
   */
  private static void appendMapping(Configuration conf, String key,
      String mapping) {
    String existing = conf.get(key);
    conf.set(key, existing == null ? mapping : existing + "," + mapping);
  }

  /**
   * Splits one {@code <path>;<class name>} entry into {path, className}.
   * The split happens at the LAST {@code ';'}: JVM class names cannot
   * contain {@code ';'}, so any {@code ';'} occurring inside the path is
   * preserved instead of shifting the class name to the wrong index.
   *
   * @param pathMapping a single entry read from the configuration
   * @param key the configuration key the entry came from (for messages)
   * @return a two-element array: path string, class name
   * @throws IllegalArgumentException if the entry has no separator or an
   *         empty path or class name
   */
  private static String[] splitMapping(String pathMapping, String key) {
    int sep = pathMapping.lastIndexOf(';');
    if (sep <= 0 || sep == pathMapping.length() - 1) {
      throw new IllegalArgumentException("Malformed entry '" + pathMapping
          + "' in " + key + "; expected <path>;<class name>");
    }
    return new String[] {
        pathMapping.substring(0, sep), pathMapping.substring(sep + 1)};
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop CombineFileInputFormat 源码

hadoop CombineFileRecordReader 源码

hadoop CombineFileRecordReaderWrapper 源码

hadoop CombineFileSplit 源码

hadoop CombineSequenceFileInputFormat 源码

hadoop CombineTextInputFormat 源码

hadoop CompressedSplitLineReader 源码

hadoop DelegatingInputFormat 源码

hadoop DelegatingMapper 源码

hadoop DelegatingRecordReader 源码

0  赞