hadoop TextSplitter 源码

  • 2022-10-20
  • 浏览 (167)

hadoop TextSplitter 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/TextSplitter.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MRJobConfig;

/**
 * Implement DBSplitter over text strings.
 *
 * <p>Each string is treated as a fraction written in base 65536, one digit per
 * {@code char} (UTF-16 code unit). The numeric range between the minimum and
 * maximum values is divided using the BigDecimal splitting logic of the
 * superclass, and the resulting split points are converted back into strings
 * that bound each split.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TextSplitter extends BigDecimalSplitter {

  private static final Logger LOG = LoggerFactory.getLogger(TextSplitter.class);

  /**
   * This method needs to determine the splits between two user-provided strings.
   * In the case where the user's strings are 'A' and 'Z', this is not hard; we
   * could create two splits from ['A', 'M') and ['M', 'Z'], 26 splits for strings
   * beginning with each letter, etc.
   *
   * If a user has provided us with the strings "Ham" and "Haze", however, we need
   * to create splits that differ in the third letter.
   *
   * The algorithm used is as follows:
   * Since a Java char has 2**16 possible values, we interpret chars as digits in
   * base 65536. Given a string 's' containing chars s_0, s_1 .. s_n, we interpret
   * the string as the number: 0.s_0 s_1 s_2.. s_n in base 65536. Having mapped the
   * low and high strings into BigDecimal values, we then use the BigDecimalSplitter
   * to establish the even split points, then map the resulting values back into
   * strings.
   *
   * Every split except the last covers the half-open range [start, end); the last
   * split is closed on both ends so that the maximum value itself is included.
   * If the minimum value is null, an additional "IS NULL" split is appended. If
   * the maximum value is null, only the "IS NULL" split is returned.
   *
   * @param conf configuration from which the desired number of splits
   *        ({@link MRJobConfig#NUM_MAPS}, default 1) is read
   * @param results result set whose first column is read as the minimum value and
   *        whose second column is read as the maximum value
   * @param colName name of the column the generated clauses refer to
   * @return the list of splits, each carrying a lower-bound and upper-bound clause
   * @throws SQLException if reading from the result set fails
   */
  public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
      throws SQLException {

    LOG.warn("Generating splits for a textual index column.");
    LOG.warn("If your database sorts in a case-insensitive order, "
        + "this may result in a partial import or duplicate records.");
    LOG.warn("You are strongly encouraged to choose an integral split column.");

    String minString = results.getString(1);
    String maxString = results.getString(2);

    if (null == maxString) {
      // If the max string is null, then the min string has to be null too.
      // Just return a special split for this case.
      List<InputSplit> onlyNulls = new ArrayList<InputSplit>();
      onlyNulls.add(nullSplit(colName));
      return onlyNulls;
    }

    // If the min value is null, switch it to an empty string instead for purposes
    // of interpolation. The [null, null] special-case split is added at the end.
    boolean minIsNull = (null == minString);
    if (minIsNull) {
      minString = "";
    }

    // Use this as a hint. May need an extra task if the size doesn't
    // divide cleanly.
    int numSplits = conf.getInt(MRJobConfig.NUM_MAPS, 1);

    // NOTE(review): split values are embedded verbatim inside single-quoted SQL
    // literals. A value containing a single quote produces a malformed clause;
    // escaping rules are dialect-specific and are not applied here.
    String lowClausePrefix = colName + " >= '";
    String highClausePrefix = colName + " < '";
    String closedHighClausePrefix = colName + " <= '";

    // If there is a common prefix between minString and maxString, establish it
    // and pull it out of minString and maxString.
    int maxPrefixLen = Math.min(minString.length(), maxString.length());
    int sharedLen = 0;
    while (sharedLen < maxPrefixLen
        && minString.charAt(sharedLen) == maxString.charAt(sharedLen)) {
      sharedLen++;
    }

    // The common prefix has length 'sharedLen'. Extract it from both.
    String commonPrefix = minString.substring(0, sharedLen);
    minString = minString.substring(sharedLen);
    maxString = maxString.substring(sharedLen);

    List<String> splitStrings = split(numSplits, minString, maxString, commonPrefix);
    List<InputSplit> splits = new ArrayList<InputSplit>();

    // Convert the list of split point strings into an actual set of InputSplits.
    int lastIndex = splitStrings.size() - 1;
    String start = splitStrings.get(0);
    for (int i = 1; i <= lastIndex; i++) {
      String end = splitStrings.get(i);

      // The last split uses a closed interval; all others are half-open.
      String upperPrefix = (i == lastIndex) ? closedHighClausePrefix : highClausePrefix;
      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          lowClausePrefix + start + "'", upperPrefix + end + "'"));

      // Advance the lower bound. Without this every split would start at the
      // first split point, making the splits overlap and re-read rows.
      start = end;
    }

    if (minIsNull) {
      // Add the special null split at the end.
      splits.add(nullSplit(colName));
    }

    return splits;
  }

  /**
   * Build the special split that selects rows where the column is null.
   *
   * @param colName name of the split column
   * @return a split whose lower and upper clauses are both "colName IS NULL"
   */
  private static InputSplit nullSplit(String colName) {
    String isNullClause = colName + " IS NULL";
    return new DataDrivenDBInputFormat.DataDrivenDBInputSplit(isNullClause, isNullClause);
  }

  /**
   * Compute the ordered list of split-point strings between two strings that
   * have already had their common prefix removed.
   *
   * @param numSplits desired number of splits, passed on to the BigDecimal splitter
   * @param minString lower bound with the common prefix removed
   * @param maxString upper bound with the common prefix removed
   * @param commonPrefix prefix re-attached to every returned split point
   * @return split points; the first entry is {@code commonPrefix + minString} and
   *         the last entry is {@code commonPrefix + maxString}
   * @throws SQLException declared by the underlying BigDecimal split routine
   */
  List<String> split(int numSplits, String minString, String maxString, String commonPrefix)
      throws SQLException {

    BigDecimal minVal = stringToBigDecimal(minString);
    BigDecimal maxVal = stringToBigDecimal(maxString);

    List<BigDecimal> splitPoints = split(new BigDecimal(numSplits), minVal, maxVal);
    List<String> splitStrings = new ArrayList<String>();

    // Convert the BigDecimal splitPoints into their string representations.
    for (BigDecimal bd : splitPoints) {
      splitStrings.add(commonPrefix + bigDecimalToString(bd));
    }

    // Make sure that our user-specified boundaries are the first and last entries
    // in the array. The numeric round trip only keeps MAX_CHARS chars, so the
    // converted end points may differ from the exact boundary strings.
    String fullMin = commonPrefix + minString;
    String fullMax = commonPrefix + maxString;
    if (splitStrings.isEmpty() || !splitStrings.get(0).equals(fullMin)) {
      splitStrings.add(0, fullMin);
    }
    if (splitStrings.size() == 1
        || !splitStrings.get(splitStrings.size() - 1).equals(fullMax)) {
      splitStrings.add(fullMax);
    }

    return splitStrings;
  }

  // Value of one "digit position": each char is a digit in base 65536.
  private static final BigDecimal ONE_PLACE = new BigDecimal(65536);

  // Maximum number of characters to convert. This is to prevent rounding errors
  // or repeating fractions near the very bottom from getting out of control. Note
  // that this still gives us a huge number of possible splits.
  private static final int MAX_CHARS = 8;

  /**
   * Return a BigDecimal representation of string 'str' suitable for use
   * in a numerically-sorting order.
   *
   * <p>Only the first MAX_CHARS chars contribute. Each {@code char} (UTF-16 code
   * unit) is one base-65536 digit, so the result is always in [0, 1). Reading
   * chars rather than code points keeps every digit below 65536 even when the
   * string contains surrogate pairs.
   *
   * @param str the string to convert
   * @return the fractional value 0.s_0 s_1 .. in base 65536
   */
  BigDecimal stringToBigDecimal(String str) {
    BigDecimal result = BigDecimal.ZERO;
    BigDecimal curPlace = ONE_PLACE; // start with 1/65536 to compute the first digit.

    int len = Math.min(str.length(), MAX_CHARS);

    for (int i = 0; i < len; i++) {
      // Use the UTF-16 code unit, not codePointAt(i): a supplementary code point
      // would exceed the base and its low surrogate would be counted twice.
      int digit = str.charAt(i);
      result = result.add(tryDivide(new BigDecimal(digit), curPlace));
      // advance to the next less significant place. e.g., 1/(65536^2) for the second char.
      curPlace = curPlace.multiply(ONE_PLACE);
    }

    return result;
  }

  /**
   * Return the string encoded in a BigDecimal.
   * Repeatedly multiply the input value by 65536; the integer portion after such a multiplication
   * represents a single character in base 65536. Convert that back into a char and create a
   * string out of these until we have no data left.
   *
   * <p>Conversion stops after MAX_CHARS digits or at the first digit equal to zero.
   *
   * @param bd the value to decode
   * @return the decoded string, at most MAX_CHARS digits long
   */
  String bigDecimalToString(BigDecimal bd) {
    BigDecimal cur = bd.stripTrailingZeros();
    StringBuilder sb = new StringBuilder();

    for (int numConverted = 0; numConverted < MAX_CHARS; numConverted++) {
      cur = cur.multiply(ONE_PLACE);
      int curCodePoint = cur.intValue();
      if (0 == curCodePoint) {
        // A zero digit is treated as the end of the encoded string.
        break;
      }

      // Remove the integer part so the next multiplication exposes the next digit.
      cur = cur.subtract(new BigDecimal(curCodePoint));
      sb.append(Character.toChars(curCodePoint));
    }

    return sb.toString();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BigDecimalSplitter 源码

hadoop BooleanSplitter 源码

hadoop DBConfiguration 源码

hadoop DBInputFormat 源码

hadoop DBOutputFormat 源码

hadoop DBRecordReader 源码

hadoop DBSplitter 源码

hadoop DBWritable 源码

hadoop DataDrivenDBInputFormat 源码

hadoop DataDrivenDBRecordReader 源码

0  赞