hadoop BasicTypeSorterBase 源码

  • 2022-10-20
  • 浏览 (153)

haddop BasicTypeSorterBase 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.OutputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile.ValueBytes;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
import org.apache.hadoop.util.Progressable;

/** This class implements the sort interface using primitive int arrays as 
 * the data structures (that is why this class is called 'BasicType'SorterBase)
 */
abstract class BasicTypeSorterBase implements BufferSorter {
  
  protected OutputBuffer keyValBuffer; //the buffer used for storing
                                           //key/values
  protected int[] startOffsets; //the array used to store the start offsets of
                                //keys in keyValBuffer
  protected int[] keyLengths; //the array used to store the lengths of
                              //keys
  protected int[] valueLengths; //the array used to store the value lengths 
  protected int[] pointers; //the array of startOffsets's indices. This will
                            //be sorted at the end to contain a sorted array of
                            //indices to offsets
  protected RawComparator comparator; //the comparator for the map output
  protected int count; //the number of key/values
  //the overhead of the arrays in memory 
  //12 => 4 for keyoffsets, 4 for keylengths, 4 for valueLengths, and
  //4 for indices into startOffsets array in the
  //pointers array (ignored the partpointers list itself)
  static private final int BUFFERED_KEY_VAL_OVERHEAD = 16;
  static private final int INITIAL_ARRAY_SIZE = 5;
  //we maintain the max lengths of the key/val that we encounter.  During 
  //iteration of the sorted results, we will create a DataOutputBuffer to
  //return the keys. The max size of the DataOutputBuffer will be the max
  //keylength that we encounter. Expose this value to model memory more
  //accurately.
  private int maxKeyLength = 0;
  private int maxValLength = 0;

  //Reference to the Progressable object for sending KeepAlive
  protected Progressable reporter;

  //Implementation of methods of the SorterBase interface
  //
  public void configure(JobConf conf) {
    comparator = conf.getOutputKeyComparator();
  }
  
  public void setProgressable(Progressable reporter) {
    this.reporter = reporter;  
  }

  public void addKeyValue(int recordOffset, int keyLength, int valLength) {
    //Add the start offset of the key in the startOffsets array and the
    //length in the keyLengths array.
    if (startOffsets == null || count == startOffsets.length)
      grow();
    startOffsets[count] = recordOffset;
    keyLengths[count] = keyLength;
    if (keyLength > maxKeyLength) {
      maxKeyLength = keyLength;
    }
    if (valLength > maxValLength) {
      maxValLength = valLength;
    }
    valueLengths[count] = valLength;
    pointers[count] = count;
    count++;
  }

  public void setInputBuffer(OutputBuffer buffer) {
    //store a reference to the keyValBuffer that we need to read during sort
    this.keyValBuffer = buffer;
  }

  public long getMemoryUtilized() {
    //the total length of the arrays + the max{Key,Val}Length (this will be the 
    //max size of the DataOutputBuffers during the iteration of the sorted
    //keys).
    if (startOffsets != null) {
      return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD + 
              maxKeyLength + maxValLength;
    }
    else { //nothing from this yet
      return 0;
    }
  }

  public abstract RawKeyValueIterator sort();
  
  public void close() {
    //set count to 0; also, we don't reuse the arrays since we want to maintain
    //consistency in the memory model
    count = 0;
    startOffsets = null;
    keyLengths = null;
    valueLengths = null;
    pointers = null;
    maxKeyLength = 0;
    maxValLength = 0;
    
    //release the large key-value buffer so that the GC, if necessary,
    //can collect it away
    keyValBuffer = null;
  }
  
  private void grow() {
    int currLength = 0;
    if (startOffsets != null) {
      currLength = startOffsets.length;
    }
    int newLength = (int)(currLength * 1.1) + 1;
    startOffsets = grow(startOffsets, newLength);
    keyLengths = grow(keyLengths, newLength);
    valueLengths = grow(valueLengths, newLength);
    pointers = grow(pointers, newLength);
  }
  
  private int[] grow(int[] old, int newLength) {
    int[] result = new int[newLength];
    if(old != null) { 
      System.arraycopy(old, 0, result, 0, old.length);
    }
    return result;
  }
} //BasicTypeSorterBase

//Implementation of methods of the RawKeyValueIterator interface. These
//methods must be invoked to iterate over key/vals after sort is done.
//
class MRSortResultIterator implements RawKeyValueIterator {
  
  private int count;
  private int[] pointers;
  private int[] startOffsets;
  private int[] keyLengths;
  private int[] valLengths;
  private int currStartOffsetIndex;
  private int currIndexInPointers;
  private OutputBuffer keyValBuffer;
  private DataOutputBuffer key = new DataOutputBuffer();
  private InMemUncompressedBytes value = new InMemUncompressedBytes();
  
  public MRSortResultIterator(OutputBuffer keyValBuffer, 
                              int []pointers, int []startOffsets,
                              int []keyLengths, int []valLengths) {
    this.count = pointers.length;
    this.pointers = pointers;
    this.startOffsets = startOffsets;
    this.keyLengths = keyLengths;
    this.valLengths = valLengths;
    this.keyValBuffer = keyValBuffer;
  }
  
  public Progress getProgress() {
    return null;
  }
  
  public DataOutputBuffer getKey() throws IOException {
    int currKeyOffset = startOffsets[currStartOffsetIndex];
    int currKeyLength = keyLengths[currStartOffsetIndex];
    //reuse the same key
    key.reset();
    key.write(keyValBuffer.getData(), currKeyOffset, currKeyLength);
    return key;
  }

  public ValueBytes getValue() throws IOException {
    //value[i] is stored in the following byte range:
    //startOffsets[i] + keyLengths[i] through valLengths[i]
    value.reset(keyValBuffer,
                startOffsets[currStartOffsetIndex] + keyLengths[currStartOffsetIndex],
                valLengths[currStartOffsetIndex]);
    return value;
  }

  public boolean next() throws IOException {
    if (count == currIndexInPointers)
      return false;
    currStartOffsetIndex = pointers[currIndexInPointers];
    currIndexInPointers++;
    return true;
  }
  
  public void close() {
    return;
  }
  
  //An implementation of the ValueBytes interface for the in-memory value
  //buffers. 
  private static class InMemUncompressedBytes implements ValueBytes {
    private byte[] data;
    int start;
    int dataSize;
    private void reset(OutputBuffer d, int start, int length) 
      throws IOException {
      data = d.getData();
      this.start = start;
      dataSize = length;
    }
            
    public int getSize() {
      return dataSize;
    }
            
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, start, dataSize);
    }

    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      throw
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }
  
  } // InMemUncompressedBytes

} //MRSortResultIterator

相关信息

hadoop 源码目录

相关文章

hadoop AMFeedback 源码

hadoop BackupStore 源码

hadoop BufferSorter 源码

hadoop CleanupQueue 源码

hadoop Clock 源码

hadoop ClusterStatus 源码

hadoop Counters 源码

hadoop CumulativePeriodicStats 源码

hadoop DeprecatedQueueConfigurationParser 源码

hadoop FileAlreadyExistsException 源码

0  赞