hadoop Utils 源码

  • 2022-10-20
  • 浏览 (176)

hadoop Utils 代码

文件路径:/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/record/Utils.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.record;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

/**
 * Various utility functions for Hadoop record I/O runtime.
 * 
 * @deprecated Replaced by <a href="http://avro.apache.org/">Avro</a>.
 */
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Utils {
  
  /**
   * Private constructor: every member of this class is static, so the
   * class is not meant to be instantiated.
   */
  private Utils() {
  }
  
  /**
   * Upper-case hexadecimal digits, indexed by nibble value (0-15).
   * {@link #toXMLString(String)} uses this table to build its
   * {@code %XXXX} escapes.
   * NOTE(review): the array is public and its elements are writable by any
   * caller; overwriting an element would change what toXMLString emits.
   */
  public static final char[] hexchars = { '0', '1', '2', '3', '4', '5',
                                          '6', '7', '8', '9', 'A', 'B',
                                          'C', 'D', 'E', 'F' };
  /**
   * Escapes a string for use as XML character data.
   * <ul>
   *   <li>{@code <} becomes {@code &lt;} and {@code &} becomes
   *       {@code &amp;};</li>
   *   <li>{@code %} becomes {@code %0025};</li>
   *   <li>chars below 0x20, surrogate chars (0xD800-0xDFFF) and chars above
   *       0xFFFD become {@code %} followed by four upper-case hex digits;</li>
   *   <li>every other char is copied unchanged (note that {@code >} and
   *       quotes are not escaped).</li>
   * </ul>
   *
   * @param s string to escape
   * @return the escaped string
   */
  static String toXMLString(String s) {
    final StringBuilder out = new StringBuilder();
    final int n = s.length();
    for (int pos = 0; pos < n; pos++) {
      final char c = s.charAt(pos);
      switch (c) {
      case '<':
        out.append("&lt;");
        break;
      case '&':
        out.append("&amp;");
        break;
      case '%':
        // '%' introduces the hex escapes below, so it is escaped itself.
        out.append("%0025");
        break;
      default: {
        final boolean control = c < 0x20;
        final boolean surrogate = c >= 0xD800 && c <= 0xDFFF;
        final boolean aboveFffd = c > 0xFFFD;
        if (control || surrogate || aboveFffd) {
          // Emit the UTF-16 code unit as four hex digits, high nibble first.
          out.append('%')
             .append(hexchars[(c >> 12) & 0xF])
             .append(hexchars[(c >> 8) & 0xF])
             .append(hexchars[(c >> 4) & 0xF])
             .append(hexchars[c & 0xF]);
        } else {
          out.append(c);
        }
      }
      }
    }
    return out.toString();
  }
  
  /**
   * Converts one hexadecimal digit character to its numeric value.
   * Accepts {@code 0-9}, {@code A-F} and {@code a-f}; any other character
   * yields 0 rather than an error.
   *
   * @param ch character to convert
   * @return value in the range 0-15, or 0 for a non-hex character
   */
  private static int h2c(char ch) {
    final int fromDigit = ch - '0';
    if (fromDigit >= 0 && fromDigit <= 9) {
      return fromDigit;
    }
    final int fromUpper = ch - 'A';
    if (fromUpper >= 0 && fromUpper <= 5) {
      return 10 + fromUpper;
    }
    final int fromLower = ch - 'a';
    if (fromLower >= 0 && fromLower <= 5) {
      return 10 + fromLower;
    }
    return 0;
  }
  
  /**
   * Reverses the {@code %XXXX} escaping: each {@code %} together with the
   * four characters after it is replaced by the single char whose value is
   * those four hex digits. All other characters are copied unchanged.
   * A non-hex character inside an escape counts as 0 (see {@code h2c}); an
   * escape cut short by the end of the string makes {@code charAt} throw
   * {@link StringIndexOutOfBoundsException}.
   *
   * @param s string to decode
   * @return the decoded string
   */
  static String fromXMLString(String s) {
    final StringBuilder decoded = new StringBuilder();
    int pos = 0;
    while (pos < s.length()) {
      final char cur = s.charAt(pos);
      pos++;
      if (cur != '%') {
        decoded.append(cur);
        continue;
      }
      // Fold the next four hex digits into one 16-bit value, high nibble first.
      int unit = 0;
      for (int k = 0; k < 4; k++) {
        unit = (unit << 4) | h2c(s.charAt(pos++));
      }
      decoded.append((char) unit);
    }
    return decoded.toString();
  }
  
  /**
   * Encodes a string for CSV output: the result starts with a single quote,
   * followed by the input with NUL, line feed, carriage return, comma,
   * {@code }} and {@code %} replaced by {@code %00}, {@code %0A},
   * {@code %0D}, {@code %2C}, {@code %7D} and {@code %25} respectively.
   *
   * @param s string to encode
   * @return the encoded string, including the leading quote
   */
  static String toCSVString(String s) {
    final int n = s.length();
    final StringBuilder out = new StringBuilder(n + 1);
    out.append('\'');
    for (int pos = 0; pos < n; pos++) {
      final char cur = s.charAt(pos);
      if (cur == '\0') {
        out.append("%00");
      } else if (cur == '\n') {
        out.append("%0A");
      } else if (cur == '\r') {
        out.append("%0D");
      } else if (cur == ',') {
        out.append("%2C");
      } else if (cur == '}') {
        out.append("%7D");
      } else if (cur == '%') {
        out.append("%25");
      } else {
        out.append(cur);
      }
    }
    return out.toString();
  }
  
  /**
   * Decodes a string produced by {@link #toCSVString(String)}.
   * The input must start with a single quote. After it, the escapes
   * {@code %00}, {@code %0A}, {@code %0D}, {@code %2C}, {@code %7D} and
   * {@code %25} are turned back into NUL, line feed, carriage return, comma,
   * {@code }} and {@code %}; every other character is copied unchanged.
   *
   * @param s CSV-encoded string, including the leading quote
   * @throws java.io.IOException if {@code s} is empty, does not start with a
   *         single quote, contains an unknown escape, or ends in the middle
   *         of an escape
   * @return the decoded string
   */
  static String fromCSVString(String s) throws IOException {
    // An empty input has no leading quote; report it as a format error
    // instead of letting charAt(0) throw an unchecked exception.
    if (s.isEmpty() || s.charAt(0) != '\'') {
      throw new IOException("Error deserializing string.");
    }
    int len = s.length();
    StringBuilder sb = new StringBuilder(len-1);
    for (int i = 1; i < len; i++) {
      char c = s.charAt(i);
      if (c != '%') {
        sb.append(c);
        continue;
      }
      // An escape needs two more characters; a truncated one is malformed.
      if (i + 2 >= len) {
        throw new IOException("Error deserializing string.");
      }
      char ch1 = s.charAt(i+1);
      char ch2 = s.charAt(i+2);
      i += 2;
      if (ch1 == '0' && ch2 == '0') {
        sb.append('\0');
      } else if (ch1 == '0' && ch2 == 'A') {
        sb.append('\n');
      } else if (ch1 == '0' && ch2 == 'D') {
        sb.append('\r');
      } else if (ch1 == '2' && ch2 == 'C') {
        sb.append(',');
      } else if (ch1 == '7' && ch2 == 'D') {
        sb.append('}');
      } else if (ch1 == '2' && ch2 == '5') {
        sb.append('%');
      } else {
        throw new IOException("Error deserializing string.");
      }
    }
    return sb.toString();
  }
  
  /**
   * Returns the XML string form of a buffer, which is simply
   * {@code s.toString()}.
   * NOTE(review): presumably Buffer.toString() yields the two-hex-digits-
   * per-byte text that {@link #fromXMLBuffer(String)} parses — confirm
   * against Buffer.
   *
   * @param s buffer to convert
   * @return the result of {@code s.toString()}
   */
  static String toXMLBuffer(Buffer s) {
    return s.toString();
  }
  
  /**
   * Parses a string of hexadecimal digit pairs into a new Buffer, one byte
   * per pair. An empty string yields an empty Buffer. If the string has an
   * odd number of characters the final unpaired character is ignored.
   *
   * @param s hex-encoded bytes, two digits per byte
   * @throws java.io.IOException if a character within a pair is not a
   *         hexadecimal digit
   * @return Buffer holding the decoded bytes
   */
  static Buffer fromXMLBuffer(String s)
    throws IOException {
    if (s.length() == 0) { return new Buffer(); }
    int blen = s.length()/2;
    byte[] barr = new byte[blen];
    for (int idx = 0; idx < blen; idx++) {
      // Character.digit returns -1 for anything that is not a hex digit,
      // so sign characters such as '+' or '-' are rejected and malformed
      // input surfaces as the declared IOException.
      int hi = Character.digit(s.charAt(2*idx), 16);
      int lo = Character.digit(s.charAt(2*idx+1), 16);
      if (hi < 0 || lo < 0) {
        throw new IOException("Error deserializing buffer.");
      }
      barr[idx] = (byte) ((hi << 4) | lo);
    }
    return new Buffer(barr);
  }
  
  /**
   * Returns the CSV string form of a buffer: a {@code #} marker followed by
   * {@code buf.toString()}.
   *
   * @param buf buffer to convert
   * @return {@code "#"} concatenated with {@code buf.toString()}
   */
  static String toCSVBuffer(Buffer buf) {
    final String body = buf.toString();
    return "#" + body;
  }
  
  /**
   * Converts a CSV-serialized representation of buffer to a new
   * Buffer. The input must start with {@code #}; the rest is read as
   * hexadecimal digit pairs, one byte per pair. A lone {@code #} yields an
   * empty Buffer. A trailing unpaired character is ignored.
   *
   * @param s CSV-serialized representation of buffer
   * @throws java.io.IOException if {@code s} is empty, does not start with
   *         {@code #}, or a character within a pair is not a hexadecimal
   *         digit
   * @return Deserialized Buffer
   */
  static Buffer fromCSVBuffer(String s)
    throws IOException {
    // An empty input has no '#' marker; report it as a format error instead
    // of letting charAt(0) throw an unchecked exception.
    if (s.isEmpty() || s.charAt(0) != '#') {
      throw new IOException("Error deserializing buffer.");
    }
    if (s.length() == 1) { return new Buffer(); }
    int blen = (s.length()-1)/2;
    byte[] barr = new byte[blen];
    for (int idx = 0; idx < blen; idx++) {
      // Offsets are shifted by one to skip the leading '#'.
      // Character.digit returns -1 for anything that is not a hex digit,
      // so sign characters such as '+' or '-' are rejected.
      int hi = Character.digit(s.charAt(2*idx+1), 16);
      int lo = Character.digit(s.charAt(2*idx+2), 16);
      if (hi < 0 || lo < 0) {
        throw new IOException("Error deserializing buffer.");
      }
      barr[idx] = (byte) ((hi << 4) | lo);
    }
    return new Buffer(barr);
  }
  
  /**
   * Returns how many bytes the UTF-8 encoding of a code point occupies
   * (1 to 4).
   *
   * @param cpt code point to measure
   * @return number of UTF-8 bytes
   * @throws IOException if {@code cpt} is negative, above 0x10FFFF, in the
   *         surrogate range 0xD800-0xDFFF, or is 0xFFFE or 0xFFFF
   */
  private static int utf8LenForCodePoint(final int cpt) throws IOException {
    final boolean outOfRange = cpt < 0 || cpt > 0x10FFFF;
    final boolean surrogate = cpt >= 0xD800 && cpt <= 0xDFFF;
    final boolean excluded = cpt == 0xFFFE || cpt == 0xFFFF;
    if (outOfRange || surrogate || excluded) {
      throw new IOException("Illegal Unicode Codepoint "+
                            Integer.toHexString(cpt)+" in string.");
    }
    // Only encodable values remain, so plain upper bounds pick the length.
    if (cpt <= 0x7F) {
      return 1;
    }
    if (cpt <= 0x07FF) {
      return 2;
    }
    if (cpt <= 0xFFFF) {
      return 3;
    }
    return 4;
  }
  
  // Bit patterns used by the UTF-8 encoder and decoder in this class.
  // B10, B110, B1110 and B11110 are the marker bits of a continuation byte
  // and of the lead byte of a 2-, 3- and 4-byte sequence. B11, B111, B1111
  // and B11111 are the masks that isolate those marker bits.
  private static final int B10 =    0x80; // 1000 0000
  private static final int B110 =   0xC0; // 1100 0000
  private static final int B1110 =  0xE0; // 1110 0000
  private static final int B11110 = 0xF0; // 1111 0000
  private static final int B11 =    0xC0; // 1100 0000
  private static final int B111 =   0xE0; // 1110 0000
  private static final int B1111 =  0xF0; // 1111 0000
  private static final int B11111 = 0xF8; // 1111 1000
  
  /**
   * Writes the UTF-8 encoding of one code point into {@code bytes}, starting
   * at {@code offset}, and reports how many bytes were written.
   *
   * @param cpt code point to encode
   * @param bytes destination array
   * @param offset index of the first byte to write
   * @return number of bytes written (1 to 4)
   * @throws IOException if {@code cpt} is negative, above 0x10FFFF, in the
   *         surrogate range 0xD800-0xDFFF, or is 0xFFFE or 0xFFFF
   */
  private static int writeUtf8(int cpt, final byte[] bytes, final int offset)
    throws IOException {
    final int width;      // total bytes in the encoded sequence
    final int leadMarker; // marker bits of the first byte
    final int leadMask;   // payload bits carried by the first byte
    if (cpt >= 0 && cpt <= 0x7F) {
      width = 1;
      leadMarker = 0;
      leadMask = 0x7F;
    } else if (cpt >= 0x80 && cpt <= 0x07FF) {
      width = 2;
      leadMarker = B110;
      leadMask = 0x1F;
    } else if ((cpt >= 0x0800 && cpt < 0xD800) ||
               (cpt > 0xDFFF && cpt <= 0xFFFD)) {
      width = 3;
      leadMarker = B1110;
      leadMask = 0x0F;
    } else if (cpt >= 0x10000 && cpt <= 0x10FFFF) {
      width = 4;
      leadMarker = B11110;
      leadMask = 0x07;
    } else {
      throw new IOException("Illegal Unicode Codepoint "+
                            Integer.toHexString(cpt)+" in string.");
    }
    // Fill continuation bytes from the last one backwards, six payload bits
    // each, then store what is left in the lead byte.
    int remaining = cpt;
    for (int k = width - 1; k > 0; k--) {
      bytes[offset + k] = (byte) (B10 | (remaining & 0x3F));
      remaining = remaining >> 6;
    }
    bytes[offset] = (byte) (leadMarker | (remaining & leadMask));
    return width;
  }
  
  /**
   * Writes a string to {@code out} as a variable-length int holding the
   * UTF-8 byte count, followed by the UTF-8 bytes themselves.
   *
   * @param out destination stream
   * @param str string to serialize
   * @throws IOException if the string contains a code point that
   *         {@code writeUtf8} rejects, or if writing to {@code out} fails
   */
  static void toBinaryString(final DataOutput out, final String str)
    throws IOException {
    final int charCount = str.length();
    // Worst case allowed for here: four bytes per char.
    final byte[] encoded = new byte[charCount*4];
    int written = 0;
    for (int pos = 0; pos < charCount; ) {
      final int codePoint = str.codePointAt(pos);
      // Step over one char, or two when the code point is a surrogate pair.
      pos += Character.charCount(codePoint);
      written += writeUtf8(codePoint, encoded, written);
    }
    writeVInt(out, written);
    out.write(encoded, 0, written);
  }
  
  /**
   * Tells whether a value is a code point this class accepts: within
   * 0-0x10FFFF, outside the surrogate range 0xD800-0xDFFF, and neither
   * 0xFFFE nor 0xFFFF. Negative values are rejected, which matches the
   * range checks in the UTF-8 encoder of this class.
   *
   * @param cpt candidate code point
   * @return {@code true} if {@code cpt} is accepted, {@code false} otherwise
   */
  static boolean isValidCodePoint(int cpt) {
    if (cpt < 0 || cpt > 0x10FFFF) {
      return false;
    }
    if (cpt >= 0xD800 && cpt <= 0xDFFF) {
      return false;
    }
    return cpt != 0xFFFE && cpt != 0xFFFF;
  }
  
  /**
   * Assembles a code point from a four-byte UTF-8 sequence: the marker bits
   * of each byte are masked off and the remaining payload bits are
   * concatenated, lead byte first.
   */
  private static int utf8ToCodePoint(int b1, int b2, int b3, int b4) {
    final int lead = (b1 & ~B11111) << 18;
    final int second = (b2 & ~B11) << 12;
    final int third = (b3 & ~B11) << 6;
    final int last = b4 & ~B11;
    return lead | second | third | last;
  }
  
  /**
   * Assembles a code point from a three-byte UTF-8 sequence: the marker bits
   * of each byte are masked off and the remaining payload bits are
   * concatenated, lead byte first.
   */
  private static int utf8ToCodePoint(int b1, int b2, int b3) {
    final int lead = (b1 & ~B1111) << 12;
    final int middle = (b2 & ~B11) << 6;
    final int last = b3 & ~B11;
    return lead | middle | last;
  }
  
  /**
   * Assembles a code point from a two-byte UTF-8 sequence: the marker bits
   * of each byte are masked off and the remaining payload bits are
   * concatenated, lead byte first.
   */
  private static int utf8ToCodePoint(int b1, int b2) {
    final int lead = (b1 & ~B111) << 6;
    final int last = b2 & ~B11;
    return lead | last;
  }
  
  /**
   * Verifies that a byte is a UTF-8 continuation byte, i.e. that its two
   * highest bits are {@code 10}.
   *
   * @param b byte value to check
   * @throws IOException if the two highest bits are anything else
   */
  private static void checkB10(int b) throws IOException {
    final boolean continuation = (b & B11) == B10;
    if (continuation) {
      return;
    }
    throw new IOException("Invalid UTF-8 representation.");
  }
  
  /**
   * Reads a string written by {@code toBinaryString}: a variable-length int
   * holding the UTF-8 byte count, followed by that many UTF-8 bytes.
   *
   * @param din stream to read from
   * @return the decoded string
   * @throws IOException if the length is negative, the stream cannot supply
   *         the announced bytes, a multi-byte sequence is cut off by the end
   *         of the data, a byte is not valid where it appears, or a decoded
   *         value fails {@code isValidCodePoint}
   */
  static String fromBinaryString(final DataInput din) throws IOException {
    final int utf8Len = readVInt(din);
    // A negative length can only come from corrupt input; report it as an
    // I/O error rather than failing with NegativeArraySizeException.
    if (utf8Len < 0) {
      throw new IOException("Invalid negative UTF-8 length "+utf8Len+
                            " in stream.");
    }
    final byte[] bytes = new byte[utf8Len];
    din.readFully(bytes);
    int len = 0;
    // For the most common case, i.e. ascii, numChars = utf8Len
    StringBuilder sb = new StringBuilder(utf8Len);
    while(len < utf8Len) {
      int cpt = 0;
      final int b1 = bytes[len++] & 0xFF;
      if (b1 <= 0x7F) {
        cpt = b1;
      } else if ((b1 & B11111) == B11110) {
        requireContinuationBytes(3, len, utf8Len);
        int b2 = bytes[len++] & 0xFF;
        checkB10(b2);
        int b3 = bytes[len++] & 0xFF;
        checkB10(b3);
        int b4 = bytes[len++] & 0xFF;
        checkB10(b4);
        cpt = utf8ToCodePoint(b1, b2, b3, b4);
      } else if ((b1 & B1111) == B1110) {
        requireContinuationBytes(2, len, utf8Len);
        int b2 = bytes[len++] & 0xFF;
        checkB10(b2);
        int b3 = bytes[len++] & 0xFF;
        checkB10(b3);
        cpt = utf8ToCodePoint(b1, b2, b3);
      } else if ((b1 & B111) == B110) {
        requireContinuationBytes(1, len, utf8Len);
        int b2 = bytes[len++] & 0xFF;
        checkB10(b2);
        cpt = utf8ToCodePoint(b1, b2);
      } else {
        throw new IOException("Invalid UTF-8 byte "+Integer.toHexString(b1)+
                              " at offset "+(len-1)+" in length of "+utf8Len);
      }
      if (!isValidCodePoint(cpt)) {
        throw new IOException("Illegal Unicode Codepoint "+
                              Integer.toHexString(cpt)+" in stream.");
      }
      sb.appendCodePoint(cpt);
    }
    return sb.toString();
  }

  /**
   * Ensures that {@code needed} more bytes exist after a lead byte, so that
   * a truncated multi-byte sequence is reported as an IOException instead of
   * an ArrayIndexOutOfBoundsException.
   *
   * @param needed number of continuation bytes the lead byte announces
   * @param pos index of the first continuation byte
   * @param total total number of bytes available
   * @throws IOException if fewer than {@code needed} bytes remain
   */
  private static void requireContinuationBytes(int needed, int pos, int total)
    throws IOException {
    if (total - pos < needed) {
      throw new IOException("Truncated UTF-8 sequence at offset "+(pos-1)+
                            " in length of "+total);
    }
  }
  
  /**
   * Parse a float from a byte array.
   * Delegates to {@code WritableComparator.readFloat(bytes, start)}.
   *
   * @param bytes byte array holding the encoded float
   * @param start starting index
   * @return the parsed float
   */
  public static float readFloat(byte[] bytes, int start) {
    return WritableComparator.readFloat(bytes, start);
  }
  
  /**
   * Parse a double from a byte array.
   * Delegates to {@code WritableComparator.readDouble(bytes, start)}.
   *
   * @param bytes byte array holding the encoded double
   * @param start starting index
   * @return the parsed double
   */
  public static double readDouble(byte[] bytes, int start) {
    return WritableComparator.readDouble(bytes, start);
  }
  
  /**
   * Reads a zero-compressed encoded long from a byte array and returns it.
   * Delegates to {@code WritableComparator.readVLong(bytes, start)}.
   *
   * @param bytes byte array with the encoded long
   * @param start starting index
   * @throws java.io.IOException if the delegate throws it
   * @return deserialized long
   */
  public static long readVLong(byte[] bytes, int start) throws IOException {
    return WritableComparator.readVLong(bytes, start);
  }
  
  /**
   * Reads a zero-compressed encoded integer from a byte array and returns it.
   * Delegates to {@code WritableComparator.readVInt(bytes, start)}.
   *
   * @param bytes byte array with the encoded integer
   * @param start start index
   * @throws java.io.IOException if the delegate throws it
   * @return deserialized integer
   */
  public static int readVInt(byte[] bytes, int start) throws IOException {
    return WritableComparator.readVInt(bytes, start);
  }
  
  /**
   * Reads a zero-compressed encoded long from a stream and returns it.
   * Delegates to {@code WritableUtils.readVLong(in)}.
   *
   * @param in input stream
   * @throws java.io.IOException if the delegate throws it
   * @return deserialized long
   */
  public static long readVLong(DataInput in) throws IOException {
    return WritableUtils.readVLong(in);
  }
  
  /**
   * Reads a zero-compressed encoded integer from a stream and returns it.
   * Delegates to {@code WritableUtils.readVInt(in)}.
   *
   * @param in input stream
   * @throws java.io.IOException if the delegate throws it
   * @return deserialized integer
   */
  public static int readVInt(DataInput in) throws IOException {
    return WritableUtils.readVInt(in);
  }
  
  /**
   * Get the encoded length if an integer is stored in a variable-length
   * format. Delegates to {@code WritableUtils.getVIntSize(i)}.
   *
   * @param i value whose encoded length is wanted
   * @return the encoded length
   */
  public static int getVIntSize(long i) {
    return WritableUtils.getVIntSize(i);
  }
  
  /**
   * Serializes a long to a binary stream with zero-compressed encoding.
   * For {@literal -112 <= i <= 127}, only one byte is used with the actual
   * value. For other values of i, the first byte value indicates whether the
   * long is positive or negative, and the number of bytes that follow.
   * If the first byte value v is between -113 and -120, the following long
   * is positive, and the number of bytes that follow is -(v+112).
   * If the first byte value v is between -121 and -128, the following long
   * is negative, and the number of bytes that follow is -(v+120). Bytes are
   * stored in the high-non-zero-byte-first order.
   * Delegates to {@code WritableUtils.writeVLong(stream, i)}.
   *
   * @param stream Binary output stream
   * @param i Long to be serialized
   * @throws java.io.IOException if the delegate throws it
   */
  public static void writeVLong(DataOutput stream, long i) throws IOException {
    WritableUtils.writeVLong(stream, i);
  }
  
  /**
   * Serializes an int to a binary stream with zero-compressed encoding.
   * Delegates to {@code WritableUtils.writeVInt(stream, i)}.
   *
   * @param stream Binary output stream
   * @param i int to be serialized
   * @throws java.io.IOException if the delegate throws it
   */
  public static void writeVInt(DataOutput stream, int i) throws IOException {
    WritableUtils.writeVInt(stream, i);
  }
  
  /**
   * Lexicographic order of binary data.
   * Delegates to
   * {@code WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2)}.
   * NOTE(review): s1/l1 and s2/l2 are presumably the start offset and length
   * of the region to compare within b1 and b2 — confirm against
   * WritableComparator.
   *
   * @param b1 first byte array
   * @param s1 passed through as the first array's second argument
   * @param l1 passed through as the first array's third argument
   * @param b2 second byte array
   * @param s2 passed through as the second array's second argument
   * @param l2 passed through as the second array's third argument
   * @return the comparison result returned by the delegate
   */
  public static int compareBytes(byte[] b1, int s1, int l1,
                                 byte[] b2, int s2, int l2) {
    return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BinaryRecordInput 源码

hadoop BinaryRecordOutput 源码

hadoop Buffer 源码

hadoop CsvRecordOutput 源码

hadoop Index 源码

hadoop Record 源码

hadoop RecordComparator 源码

hadoop RecordInput 源码

hadoop RecordOutput 源码

0  赞