hadoop BackupStore 源码

  • 2022-10-20
  • 浏览 (160)

haddop BackupStore 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.IFile.Reader;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <code>BackupStore</code> is an utility class that is used to support
 * the mark-reset functionality of values iterator
 *
 * <p>It has two caches - a memory cache and a file cache where values are
 * stored as they are iterated, after a mark. On reset, values are retrieved
 * from these caches. Framework moves from the memory cache to the 
 * file cache when the memory cache becomes full.
 * 
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class BackupStore<K,V> {

  private static final Logger LOG =
      LoggerFactory.getLogger(BackupStore.class.getName());
  private static final int MAX_VINT_SIZE = 9;
  private static final int EOF_MARKER_SIZE = 2 * MAX_VINT_SIZE;
  private final TaskAttemptID tid;
 
  private MemoryCache memCache;
  private FileCache fileCache;

  List<Segment<K,V>> segmentList = new LinkedList<Segment<K,V>>();
  private int readSegmentIndex = 0;
  private int firstSegmentOffset = 0;

  private int currentKVOffset = 0;
  private int nextKVOffset = -1;

  private DataInputBuffer currentKey = null;
  private DataInputBuffer currentValue = new DataInputBuffer();
  private DataInputBuffer currentDiskValue = new DataInputBuffer();
 
  private boolean hasMore = false;
  private boolean inReset = false;
  private boolean clearMarkFlag = false;
  private boolean lastSegmentEOF = false;
  
  private Configuration conf;

  public BackupStore(Configuration conf, TaskAttemptID taskid)
  throws IOException {
    
    final float bufferPercent =
      conf.getFloat(JobContext.REDUCE_MARKRESET_BUFFER_PERCENT, 0f);

    if (bufferPercent > 1.0 || bufferPercent < 0.0) {
      throw new IOException(JobContext.REDUCE_MARKRESET_BUFFER_PERCENT +
          bufferPercent);
    }

    int maxSize = (int)Math.min(
        Runtime.getRuntime().maxMemory() * bufferPercent, Integer.MAX_VALUE);

    // Support an absolute size also.
    int tmp = conf.getInt(JobContext.REDUCE_MARKRESET_BUFFER_SIZE, 0);
    if (tmp >  0) {
      maxSize = tmp;
    }

    memCache = new MemoryCache(maxSize);
    fileCache = new FileCache(conf);
    tid = taskid;
    
    this.conf = conf;
    
    LOG.info("Created a new BackupStore with a memory of " + maxSize);

  }

  /**
   * Write the given K,V to the cache. 
   * Write to memcache if space is available, else write to the filecache
   * @param key
   * @param value
   * @throws IOException
   */
  public void write(DataInputBuffer key, DataInputBuffer value)
  throws IOException {

    assert (key != null && value != null);

    if (fileCache.isActive()) {
      fileCache.write(key, value);
      return;
    }

    if (memCache.reserveSpace(key, value)) {
      memCache.write(key, value);
    } else {
      fileCache.activate();
      fileCache.write(key, value);
    }
  }

  public void mark() throws IOException {

    // We read one KV pair in advance in hasNext. 
    // If hasNext has read the next KV pair from a new segment, but the
    // user has not called next() for that KV, then reset the readSegmentIndex
    // to the previous segment

    if (nextKVOffset == 0) {
      assert (readSegmentIndex != 0);
      assert (currentKVOffset != 0);
      readSegmentIndex --;
    }

    // just drop segments before the current active segment

    int i = 0;
    Iterator<Segment<K,V>> itr = segmentList.iterator();
    while (itr.hasNext()) {
      Segment<K,V> s = itr.next();
      if (i == readSegmentIndex) {
        break;
      }
      s.close();
      itr.remove();
      i++;
      LOG.debug("Dropping a segment");
    }

    // FirstSegmentOffset is the offset in the current segment from where we
    // need to start reading on the next reset

    firstSegmentOffset = currentKVOffset;
    readSegmentIndex = 0;

    LOG.debug("Setting the FirsSegmentOffset to " + currentKVOffset);
  }

  public void reset() throws IOException {

    // Create a new segment for the previously written records only if we
    // are not already in the reset mode
    
    if (!inReset) {
      if (fileCache.isActive) {
        fileCache.createInDiskSegment();
      } else {
        memCache.createInMemorySegment();
      }
    } 

    inReset = true;
    
    // Reset the segments to the correct position from where the next read
    // should begin. 
    for (int i = 0; i < segmentList.size(); i++) {
      Segment<K,V> s = segmentList.get(i);
      if (s.inMemory()) {
        int offset = (i == 0) ? firstSegmentOffset : 0;
        s.getReader().reset(offset);
      } else {
        s.closeReader();
        if (i == 0) {
          s.reinitReader(firstSegmentOffset);
          s.getReader().disableChecksumValidation();
        }
      }
    }
    
    currentKVOffset = firstSegmentOffset;
    nextKVOffset = -1;
    readSegmentIndex = 0;
    hasMore = false;
    lastSegmentEOF = false;

    LOG.debug("Reset - First segment offset is " + firstSegmentOffset +
        " Segment List Size is " + segmentList.size());
  }

  public boolean hasNext() throws IOException {
    
    if (lastSegmentEOF) {
      return false;
    }
    
    // We read the next KV from the cache to decide if there is any left.
    // Since hasNext can be called several times before the actual call to 
    // next(), we use hasMore to avoid extra reads. hasMore is set to false
    // when the user actually consumes this record in next()

    if (hasMore) {
      return true;
    }

    Segment<K,V> seg = segmentList.get(readSegmentIndex);
    // Mark the current position. This would be set to currentKVOffset
    // when the user consumes this record in next(). 
    nextKVOffset = (int) seg.getActualPosition();
    if (seg.nextRawKey()) {
      currentKey = seg.getKey();
      seg.getValue(currentValue);
      hasMore = true;
      return true;
    } else {
      if (!seg.inMemory()) {
        seg.closeReader();
      }
    }

    // If this is the last segment, mark the lastSegmentEOF flag and return
    if (readSegmentIndex == segmentList.size() - 1) {
      nextKVOffset = -1;
      lastSegmentEOF = true;
      return false;
    }

    nextKVOffset = 0;
    readSegmentIndex ++;

    Segment<K,V> nextSegment = segmentList.get(readSegmentIndex);
    
    // We possibly are moving from a memory segment to a disk segment.
    // Reset so that we do not corrupt the in-memory segment buffer.
    // See HADOOP-5494
    
    if (!nextSegment.inMemory()) {
      currentValue.reset(currentDiskValue.getData(), 
          currentDiskValue.getLength());
      nextSegment.init(null);
    }
 
    if (nextSegment.nextRawKey()) {
      currentKey = nextSegment.getKey();
      nextSegment.getValue(currentValue);
      hasMore = true;
      return true;
    } else {
      throw new IOException("New segment did not have even one K/V");
    }
  }

  public void next() throws IOException {
    if (!hasNext()) {
      throw new NoSuchElementException("iterate past last value");
    }
    // Reset hasMore. See comment in hasNext()
    hasMore = false;
    currentKVOffset = nextKVOffset;
    nextKVOffset = -1;
  }

  public DataInputBuffer nextValue() {
    return  currentValue;
  }

  public DataInputBuffer nextKey() {
    return  currentKey;
  }

  public void reinitialize() throws IOException {
    if (segmentList.size() != 0) {
      clearSegmentList();
    }
    memCache.reinitialize(true);
    fileCache.reinitialize();
    readSegmentIndex = firstSegmentOffset = 0;
    currentKVOffset = 0;
    nextKVOffset = -1;
    hasMore = inReset = clearMarkFlag = false;
  }

  /**
   * This function is called the ValuesIterator when a mark is called
   * outside of a reset zone.  
   */
  public void exitResetMode() throws IOException { 
    inReset = false;
    if (clearMarkFlag ) {
      // If a flag was set to clear mark, do the reinit now.
      // See clearMark()
      reinitialize();
      return;
    }
    if (!fileCache.isActive) {
      memCache.reinitialize(false);
    }
  }

  /** For writing the first key and value bytes directly from the
   *  value iterators, pass the current underlying output stream
   *  @param length The length of the impending write
   */
  public DataOutputStream getOutputStream(int length) throws IOException {
    if (memCache.reserveSpace(length)) {
      return memCache.dataOut;
    } else {
      fileCache.activate();
      return fileCache.writer.getOutputStream();
    }
  }

  /** This method is called by the valueIterators after writing the first
   *  key and value bytes to the BackupStore
   * @param length 
   */
  public void updateCounters(int length) {
    if (fileCache.isActive) {
      fileCache.writer.updateCountersForExternalAppend(length);
    } else {
      memCache.usedSize += length;
    }
  }

  public void clearMark() throws IOException {
    if (inReset) {
      // If we are in the reset mode, we just mark a flag and come out
      // The actual re initialization would be done when we exit the reset
      // mode
      clearMarkFlag = true;
    } else {
      reinitialize();
    }
  }
  
  private void clearSegmentList() throws IOException {
    for (Segment<K,V> segment: segmentList) {
      long len = segment.getLength();
      segment.close();
      if (segment.inMemory()) {
       memCache.unreserve(len);
      }
    }
    segmentList.clear();
  }

  class MemoryCache {
    private DataOutputBuffer dataOut;
    private int blockSize;
    private int usedSize;
    private final BackupRamManager ramManager;

    // Memory cache is made up of blocks.
    private int defaultBlockSize = 1024 * 1024;

    public MemoryCache(int maxSize) {
      ramManager = new BackupRamManager(maxSize);
      if (maxSize < defaultBlockSize) {
        defaultBlockSize = maxSize;
      }
    }

    public void unreserve(long len) {
      ramManager.unreserve((int)len);
    }

    /**
     * Re-initialize the memory cache.
     * 
     * @param clearAll If true, re-initialize the ramManager also.
     */
    void reinitialize(boolean clearAll) {
      if (clearAll) {
        ramManager.reinitialize();
      }
      int allocatedSize = createNewMemoryBlock(defaultBlockSize, 
          defaultBlockSize);
      assert(allocatedSize == defaultBlockSize || allocatedSize == 0);
      LOG.debug("Created a new mem block of " + allocatedSize);
    }

    private int createNewMemoryBlock(int requestedSize, int minSize) {
      int allocatedSize = ramManager.reserve(requestedSize, minSize);
      usedSize = 0;
      if (allocatedSize == 0) {
        dataOut = null;
        blockSize = 0;
      } else {
        dataOut = new DataOutputBuffer(allocatedSize);
        blockSize = allocatedSize;
      }
      return allocatedSize;
    }

    /**
     * This method determines if there is enough space left in the 
     * memory cache to write to the requested length + space for
     * subsequent EOF makers.
     * @param length
     * @return true if enough space is available
     */
    boolean reserveSpace(int length) throws IOException {
      int availableSize = blockSize - usedSize;
      if (availableSize >= length + EOF_MARKER_SIZE) {
        return true;
      }
      // Not enough available. Close this block 
      assert (!inReset); 

      createInMemorySegment();
      
      // Create a new block
      int tmp = Math.max(length + EOF_MARKER_SIZE, defaultBlockSize);
      availableSize = createNewMemoryBlock(tmp, 
          (length + EOF_MARKER_SIZE));
      
      return (availableSize == 0) ? false : true;
    }

    boolean reserveSpace(DataInputBuffer key, DataInputBuffer value)
    throws IOException {
      int keyLength = key.getLength() - key.getPosition();
      int valueLength = value.getLength() - value.getPosition();

      int requestedSize = keyLength + valueLength + 
        WritableUtils.getVIntSize(keyLength) +
        WritableUtils.getVIntSize(valueLength);
      return reserveSpace(requestedSize);
    }
    
    /**
     * Write the key and value to the cache in the IFile format
     * @param key
     * @param value
     * @throws IOException
     */
    public void write(DataInputBuffer key, DataInputBuffer value)
    throws IOException {
      int keyLength = key.getLength() - key.getPosition();
      int valueLength = value.getLength() - value.getPosition();
      WritableUtils.writeVInt(dataOut, keyLength);
      WritableUtils.writeVInt(dataOut, valueLength);
      dataOut.write(key.getData(), key.getPosition(), keyLength);
      dataOut.write(value.getData(), value.getPosition(), valueLength);
      usedSize += keyLength + valueLength + 
        WritableUtils.getVIntSize(keyLength) +
        WritableUtils.getVIntSize(valueLength);
      LOG.debug("ID: " + segmentList.size() + " WRITE TO MEM");
    }

    /**
     * This method creates a memory segment from the existing buffer
     * @throws IOException
     */
    void createInMemorySegment () throws IOException {

      // If nothing was written in this block because the record size
      // was greater than the allocated block size, just return.
      if (usedSize == 0) {
        ramManager.unreserve(blockSize);
        return;
      }

      // spaceAvailable would have ensured that there is enough space
      // left for the EOF markers.
      assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);
  
      WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
      WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);

      usedSize += EOF_MARKER_SIZE;

      ramManager.unreserve(blockSize - usedSize);

      Reader<K, V> reader = 
        new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null, 
            (org.apache.hadoop.mapred.TaskAttemptID) tid, 
            dataOut.getData(), 0, usedSize, conf);
      Segment<K, V> segment = new Segment<K, V>(reader, false);
      segmentList.add(segment);
      LOG.debug("Added Memory Segment to List. List Size is " + 
          segmentList.size());
    }
  }

  class FileCache {
    private LocalDirAllocator lDirAlloc;
    private final Configuration conf;
    private final FileSystem fs;
    private boolean isActive = false;

    private Path file = null;
    private IFile.Writer<K,V> writer = null;
    private int spillNumber = 0;

    public FileCache(Configuration conf)
    throws IOException {
      this.conf = conf;
      this.fs = FileSystem.getLocal(conf);
      this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
    }

    void write(DataInputBuffer key, DataInputBuffer value)
    throws IOException {
      if (writer == null) {
        // If spillNumber is 0, we should have called activate and not
        // come here at all
        assert (spillNumber != 0); 
        writer = createSpillFile();
      }
      writer.append(key, value);
      LOG.debug("ID: " + segmentList.size() + " WRITE TO DISK");
    }

    void reinitialize() {
      spillNumber = 0;
      writer = null;
      isActive = false;
    }

    void activate() throws IOException {
      isActive = true;
      writer = createSpillFile();
    }

    void createInDiskSegment() throws IOException {
      assert (writer != null);
      writer.close();
      Segment<K,V> s = new Segment<K, V>(conf, fs, file, null, true);
      writer = null;
      segmentList.add(s);
      LOG.debug("Disk Segment added to List. Size is "  + segmentList.size());
    }

    boolean isActive() { return isActive; }

    private Writer<K,V> createSpillFile() throws IOException {
      Path tmp =
          new Path(MRJobConfig.OUTPUT + "/backup_" + tid.getId() + "_"
              + (spillNumber++) + ".out");

      LOG.info("Created file: " + tmp);

      file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(), 
          -1, conf);
      FSDataOutputStream out = fs.create(file);
      out = IntermediateEncryptedStream.wrapIfNecessary(conf, out, tmp);
      return new Writer<K, V>(conf, out, null, null, null, null, true);
    }
  }

  static class BackupRamManager implements RamManager {

    private int availableSize = 0;
    private final int maxSize;

    public BackupRamManager(int size) {
      availableSize = maxSize = size;
    }

    public boolean reserve(int requestedSize, InputStream in) {
      // Not used
      LOG.warn("Reserve(int, InputStream) not supported by BackupRamManager");
      return false;
    }

    int reserve(int requestedSize) {
      if (availableSize == 0) {
        return 0;
      }
      int reservedSize = Math.min(requestedSize, availableSize);
      availableSize -= reservedSize;
      LOG.debug("Reserving: " + reservedSize + " Requested: " + requestedSize);
      return reservedSize;
    }

    int reserve(int requestedSize, int minSize) {
      if (availableSize < minSize) {
        LOG.debug("No space available. Available: " + availableSize + 
            " MinSize: " + minSize);
        return 0;
      } else {
        return reserve(requestedSize);
      }
    }

    public void unreserve(int requestedSize) {
      availableSize += requestedSize;
      LOG.debug("Unreserving: " + requestedSize +
          ". Available: " + availableSize);
    }
    
    void reinitialize() {
      availableSize = maxSize;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AMFeedback 源码

hadoop BasicTypeSorterBase 源码

hadoop BufferSorter 源码

hadoop CleanupQueue 源码

hadoop Clock 源码

hadoop ClusterStatus 源码

hadoop Counters 源码

hadoop CumulativePeriodicStats 源码

hadoop DeprecatedQueueConfigurationParser 源码

hadoop FileAlreadyExistsException 源码

0  赞