hadoop SortedRanges 源码

  • 2022-10-20
  • 浏览 (160)

hadoop SortedRanges 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SortedRanges.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps the Ranges sorted by startIndex.
 * The added ranges are always ensured to be non-overlapping.
 * Provides the SkipRangeIterator, which skips the Ranges 
 * stored in this object.
 */
class SortedRanges implements Writable{
  
  private static final Logger LOG =
      LoggerFactory.getLogger(SortedRanges.class);
  
  private TreeSet<Range> ranges = new TreeSet<Range>();
  private long indicesCount;
  
  /**
   * Get Iterator which skips the stored ranges.
   * The Iterator.next() call return the index starting from 0.
   * @return SkipRangeIterator
   */
  synchronized SkipRangeIterator skipRangeIterator(){
    return new SkipRangeIterator(ranges.iterator());
  }
  
  /**
   * Get the no of indices stored in the ranges.
   * @return indices count
   */
  synchronized long getIndicesCount() {
    return indicesCount;
  }
  
  /**
   * Get the sorted set of ranges.
   * @return ranges
   */
  synchronized SortedSet<Range> getRanges() {
  	return ranges;
 	}
  
  /**
   * Add the range indices. It is ensured that the added range 
   * doesn't overlap the existing ranges. If it overlaps, the 
   * existing overlapping ranges are removed and a single range 
   * having the superset of all the removed ranges and this range 
   * is added. 
   * If the range is of 0 length, doesn't do anything.
   * @param range Range to be added.
   */
  synchronized void add(Range range){
    if(range.isEmpty()) {
      return;
    }
    
    long startIndex = range.getStartIndex();
    long endIndex = range.getEndIndex();
    //make sure that there are no overlapping ranges
    SortedSet<Range> headSet = ranges.headSet(range);
    if(headSet.size()>0) {
      Range previousRange = headSet.last();
      LOG.debug("previousRange "+previousRange);
      if(startIndex<previousRange.getEndIndex()) {
        //previousRange overlaps this range
        //remove the previousRange
        if(ranges.remove(previousRange)) {
          indicesCount-=previousRange.getLength();
        }
        //expand this range
        startIndex = previousRange.getStartIndex();
        endIndex = endIndex>=previousRange.getEndIndex() ?
                          endIndex : previousRange.getEndIndex();
      }
    }
    
    Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
    while(tailSetIt.hasNext()) {
      Range nextRange = tailSetIt.next();
      LOG.debug("nextRange "+nextRange +"   startIndex:"+startIndex+
          "  endIndex:"+endIndex);
      if(endIndex>=nextRange.getStartIndex()) {
        //nextRange overlaps this range
        //remove the nextRange
        tailSetIt.remove();
        indicesCount-=nextRange.getLength();
        if(endIndex<nextRange.getEndIndex()) {
          //expand this range
          endIndex = nextRange.getEndIndex();
          break;
        }
      } else {
        break;
      }
    }
    add(startIndex,endIndex);
  }
  
  /**
   * Remove the range indices. If this range is  
   * found in existing ranges, the existing ranges 
   * are shrunk.
   * If range is of 0 length, doesn't do anything.
   * @param range Range to be removed.
   */
  synchronized void remove(Range range) {
    if(range.isEmpty()) {
      return;
    }
    long startIndex = range.getStartIndex();
    long endIndex = range.getEndIndex();
    //make sure that there are no overlapping ranges
    SortedSet<Range> headSet = ranges.headSet(range);
    if(headSet.size()>0) {
      Range previousRange = headSet.last();
      LOG.debug("previousRange "+previousRange);
      if(startIndex<previousRange.getEndIndex()) {
        //previousRange overlaps this range
        //narrow down the previousRange
        if(ranges.remove(previousRange)) {
          indicesCount-=previousRange.getLength();
          LOG.debug("removed previousRange "+previousRange);
        }
        add(previousRange.getStartIndex(), startIndex);
        if(endIndex<=previousRange.getEndIndex()) {
          add(endIndex, previousRange.getEndIndex());
        }
      }
    }
    
    Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
    while(tailSetIt.hasNext()) {
      Range nextRange = tailSetIt.next();
      LOG.debug("nextRange "+nextRange +"   startIndex:"+startIndex+
          "  endIndex:"+endIndex);
      if(endIndex>nextRange.getStartIndex()) {
        //nextRange overlaps this range
        //narrow down the nextRange
        tailSetIt.remove();
        indicesCount-=nextRange.getLength();
        if(endIndex<nextRange.getEndIndex()) {
          add(endIndex, nextRange.getEndIndex());
          break;
        }
      } else {
        break;
      }
    }
  }
  
  private void add(long start, long end) {
    if(end>start) {
      Range recRange = new Range(start, end-start);
      ranges.add(recRange);
      indicesCount+=recRange.getLength();
      LOG.debug("added "+recRange);
    }
  }
  
  public synchronized void readFields(DataInput in) throws IOException {
    indicesCount = in.readLong();
    ranges = new TreeSet<Range>();
    int size = in.readInt();
    for(int i=0;i<size;i++) {
      Range range = new Range();
      range.readFields(in);
      ranges.add(range);
    }
  }

  public synchronized void write(DataOutput out) throws IOException {
    out.writeLong(indicesCount);
    out.writeInt(ranges.size());
    Iterator<Range> it = ranges.iterator();
    while(it.hasNext()) {
      Range range = it.next();
      range.write(out);
    }
  }
  
  public String toString() {
    StringBuffer sb = new StringBuffer();
    Iterator<Range> it = ranges.iterator();
    while(it.hasNext()) {
      Range range = it.next();
      sb.append(range.toString()+"\n");
    }
    return sb.toString();
  }
  
  /**
   * Index Range. Comprises of start index and length.
   * A Range can be of 0 length also. The Range stores indices 
   * of type long.
   */
  static class Range implements Comparable<Range>, Writable{
    private long startIndex;
    private long length;
        
    Range(long startIndex, long length) {
      if(length<0) {
        throw new RuntimeException("length can't be negative");
      }
      this.startIndex = startIndex;
      this.length = length;
    }
    
    Range() {
      this(0,0);
    }
    
    /**
     * Get the start index. Start index in inclusive.
     * @return startIndex. 
     */
    long getStartIndex() {
      return startIndex;
    }
    
    /**
     * Get the end index. End index is exclusive.
     * @return endIndex.
     */
    long getEndIndex() {
      return startIndex + length;
    }
    
   /**
    * Get Length.
    * @return length
    */
    long getLength() {
      return length;
    }
    
    /**
     * Range is empty if its length is zero.
     * @return <code>true</code> if empty
     *         <code>false</code> otherwise.
     */
    boolean isEmpty() {
      return length==0;
    }
    
    public boolean equals(Object o) {
      if (o instanceof Range) {
        Range range = (Range)o;
        return startIndex==range.startIndex &&
        length==range.length;
      }
      return false;
    }
    
    public int hashCode() {
      return Long.valueOf(startIndex).hashCode() +
          Long.valueOf(length).hashCode();
    }
    
    public int compareTo(Range o) {
      // Ensure sgn(x.compareTo(y) == -sgn(y.compareTo(x))
      return this.startIndex < o.startIndex ? -1 :
          (this.startIndex > o.startIndex ? 1 :
          (this.length < o.length ? -1 :
          (this.length > o.length ? 1 : 0)));
    }

    public void readFields(DataInput in) throws IOException {
      startIndex = in.readLong();
      length = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
      out.writeLong(startIndex);
      out.writeLong(length);
    }
    
    public String toString() {
      return startIndex +":" + length;
    }    
  }
  
  /**
   * Index Iterator which skips the stored ranges.
   */
  static class SkipRangeIterator implements Iterator<Long> {
    Iterator<Range> rangeIterator;
    Range range = new Range();
    long next = -1;
    
    /**
     * Constructor
     * @param rangeIterator the iterator which gives the ranges.
     */
    SkipRangeIterator(Iterator<Range> rangeIterator) {
      this.rangeIterator = rangeIterator;
      doNext();
    }
    
    /**
     * Returns true till the index reaches Long.MAX_VALUE.
     * @return <code>true</code> next index exists.
     *         <code>false</code> otherwise.
     */
    public synchronized boolean hasNext() {
      return next<Long.MAX_VALUE;
    }
    
    /**
     * Get the next available index. The index starts from 0.
     * @return next index
     */
    public synchronized Long next() {
      long ci = next;
      doNext();
      return ci;
    }
    
    private void doNext() {
      next++;
      LOG.debug("currentIndex "+next +"   "+range);
      skipIfInRange();
      while(next>=range.getEndIndex() && rangeIterator.hasNext()) {
        range = rangeIterator.next();
        skipIfInRange();
      }
    }
    
    private void skipIfInRange() {
      if(next>=range.getStartIndex() && 
          next<range.getEndIndex()) {
        //need to skip the range
        LOG.warn("Skipping index " + next +"-" + range.getEndIndex());
        next = range.getEndIndex();
        
      }
    }
    
    /**
     * Get whether all the ranges have been skipped.
     * @return <code>true</code> if all ranges have been skipped.
     *         <code>false</code> otherwise.
     */
    synchronized boolean skippedAllRanges() {
      return !rangeIterator.hasNext() && next>range.getEndIndex();
    }
    
    /**
     * Remove is not supported. Doesn't apply.
     */
    public void remove() {
      throw new UnsupportedOperationException("remove not supported.");
    }
    
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AMFeedback 源码

hadoop BackupStore 源码

hadoop BasicTypeSorterBase 源码

hadoop BufferSorter 源码

hadoop CleanupQueue 源码

hadoop Clock 源码

hadoop ClusterStatus 源码

hadoop Counters 源码

hadoop CumulativePeriodicStats 源码

hadoop DeprecatedQueueConfigurationParser 源码

0  赞