hadoop AbfsInputStreamStatisticsImpl 源码

  • 2022-10-20
  • 浏览 (185)

haddop AbfsInputStreamStatisticsImpl 代码

文件路径:/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.classification.VisibleForTesting;

import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

/**
 * Stats for the AbfsInputStream.
 */
public class AbfsInputStreamStatisticsImpl
    implements AbfsInputStreamStatistics {

  private final IOStatisticsStore ioStatisticsStore = iostatisticsStore()
      .withCounters(
          StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS,
          StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS,
          StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS,
          StreamStatisticNames.STREAM_READ_BYTES,
          StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED,
          StreamStatisticNames.STREAM_READ_OPERATIONS,
          StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS,
          StreamStatisticNames.SEEK_IN_BUFFER,
          StreamStatisticNames.BYTES_READ_BUFFER,
          StreamStatisticNames.REMOTE_READ_OP,
          StreamStatisticNames.READ_AHEAD_BYTES_READ,
          StreamStatisticNames.REMOTE_BYTES_READ
          )
      .withDurationTracking(ACTION_HTTP_GET_REQUEST)
      .build();

  /* Reference to the atomic counter for frequently updated counters to avoid
   * cost of the map lookup on every increment.
   */
  private final AtomicLong bytesRead =
      ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_BYTES);
  private final AtomicLong readOps =
      ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_OPERATIONS);
  private final AtomicLong seekOps =
      ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS);

  /**
   * Seek backwards, incrementing the seek and backward seek counters.
   *
   * @param negativeOffset how far was the seek?
   *                       This is expected to be negative.
   */
  @Override
  public void seekBackwards(long negativeOffset) {
    seekOps.incrementAndGet();
    ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS);
    ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS, negativeOffset);
  }

  /**
   * Record a forward seek, adding a seek operation, a forward
   * seek operation, and any bytes skipped.
   *
   * @param skipped number of bytes skipped by reading from the stream.
   *                If the seek was implemented by a close + reopen, set this to zero.
   */
  @Override
  public void seekForwards(long skipped) {
    seekOps.incrementAndGet();
    ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
    ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED, skipped);
  }

  /**
   * Record a forward or backward seek, adding a seek operation, a forward or
   * a backward seek operation, and number of bytes skipped.
   * The seek direction will be calculated based on the parameters.
   *
   * @param seekTo     seek to the position.
   * @param currentPos current position.
   */
  @Override
  public void seek(long seekTo, long currentPos) {
    if (seekTo >= currentPos) {
      this.seekForwards(seekTo - currentPos);
    } else {
      this.seekBackwards(currentPos - seekTo);
    }
  }

  /**
   * Increment the bytes read counter by the number of bytes;
   * no-op if the argument is negative.
   *
   * @param bytes number of bytes read.
   */
  @Override
  public void bytesRead(long bytes) {
    bytesRead.addAndGet(bytes);
  }

  /**
   * {@inheritDoc}
   *
   * Total bytes read from the buffer.
   *
   * @param bytes number of bytes that are read from buffer.
   */
  @Override
  public void bytesReadFromBuffer(long bytes) {
    ioStatisticsStore.incrementCounter(StreamStatisticNames.BYTES_READ_BUFFER, bytes);
  }

  /**
   * {@inheritDoc}
   *
   * Increment the number of seeks in the buffer.
   */
  @Override
  public void seekInBuffer() {
    ioStatisticsStore.incrementCounter(StreamStatisticNames.SEEK_IN_BUFFER);
  }

  /**
   * A {@code read(byte[] buf, int off, int len)} operation has started.
   */
  @Override
  public void readOperationStarted() {
    readOps.incrementAndGet();
  }

  /**
   * Total bytes read from readAhead buffer during a read operation.
   *
   * @param bytes the bytes to be incremented.
   */
  @Override
  public void readAheadBytesRead(long bytes) {
    ioStatisticsStore.incrementCounter(StreamStatisticNames.READ_AHEAD_BYTES_READ, bytes);
  }

  /**
   * Total bytes read remotely after nothing was read from readAhead buffer.
   *
   * @param bytes the bytes to be incremented.
   */
  @Override
  public void remoteBytesRead(long bytes) {
    ioStatisticsStore.incrementCounter(StreamStatisticNames.REMOTE_BYTES_READ, bytes);
  }

  /**
   * {@inheritDoc}
   *
   * Increment the counter when a remote read operation occurs.
   */
  @Override
  public void remoteReadOperation() {
    ioStatisticsStore.incrementCounter(StreamStatisticNames.REMOTE_READ_OP);
  }

  /**
   * Getter for IOStatistics instance used.
   * @return IOStatisticsStore instance which extends IOStatistics.
   */
  @Override
  public IOStatistics getIOStatistics() {
    return ioStatisticsStore;
  }

  @VisibleForTesting
  public long getSeekOperations() {
    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS);
  }

  @VisibleForTesting
  public long getForwardSeekOperations() {
    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
  }

  @VisibleForTesting
  public long getBackwardSeekOperations() {
    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS);
  }

  @VisibleForTesting
  public long getBytesRead() {
    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_BYTES);
  }

  @VisibleForTesting
  public long getBytesSkippedOnSeek() {
    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED);
  }

  @VisibleForTesting
  public long getBytesBackwardsOnSeek() {
    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS);
  }

  @VisibleForTesting
  public long getSeekInBuffer() {
    return ioStatisticsStore.counters().get(StreamStatisticNames.SEEK_IN_BUFFER);

  }

  @VisibleForTesting
  public long getReadOperations() {
    return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_OPERATIONS);
  }

  @VisibleForTesting
  public long getBytesReadFromBuffer() {
    return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_READ_BUFFER);
  }

  @VisibleForTesting
  public long getRemoteReadOperations() {
    return ioStatisticsStore.counters().get(StreamStatisticNames.REMOTE_READ_OP);
  }

  @VisibleForTesting
  public long getReadAheadBytesRead() {
    return ioStatisticsStore.counters().get(StreamStatisticNames.READ_AHEAD_BYTES_READ);
  }

  @VisibleForTesting
  public long getRemoteBytesRead() {
    return ioStatisticsStore.counters().get(StreamStatisticNames.REMOTE_BYTES_READ);
  }

  /**
   * Getter for the mean value of the time taken to complete a HTTP GET
   * request by AbfsInputStream.
   * @return mean value.
   */
  @VisibleForTesting
  public double getActionHttpGetRequest() {
    return ioStatisticsStore.meanStatistics().
        get(ACTION_HTTP_GET_REQUEST + SUFFIX_MEAN).mean();
  }

  /**
   * String operator describes all the current statistics.
   * <b>Important: there are no guarantees as to the stability
   * of this value.</b>
   *
   * @return the current values of the stream statistics.
   */
  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder(
        "StreamStatistics{");
    sb.append(ioStatisticsStore.toString());
    sb.append('}');
    return sb.toString();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbfsAclHelper 源码

hadoop AbfsClient 源码

hadoop AbfsClientContext 源码

hadoop AbfsClientContextBuilder 源码

hadoop AbfsClientRenameResult 源码

hadoop AbfsClientThrottlingAnalyzer 源码

hadoop AbfsClientThrottlingIntercept 源码

hadoop AbfsCounters 源码

hadoop AbfsErrors 源码

hadoop AbfsHttpHeader 源码

0  赞