hadoop AbfsPerfTracker 源码

  • 2022-10-20
  • 浏览 (185)

hadoop AbfsPerfTracker 代码

文件路径:/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfTracker.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;

/**
 * {@code AbfsPerfTracker} keeps track of service latencies observed by {@code AbfsClient}. Every request hands over
 * its perf-related information as a {@code AbfsPerfInfo} object (contains success/failure, latency etc) to the
 * {@code AbfsPerfTracker}'s queue. When a request is made, we check {@code AbfsPerfTracker} to see if there are
 * any latency numbers to be reported. If there are any, the stats are added to an HTTP header
 * ({@code x-ms-abfs-client-latency}) on the next request.
 *
 * A typical perf log line appears like:
 *
 * h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
 * c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 ls=32 lc=1 s=200
 * e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
 * u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
 *
 * The fields have the following definitions:
 *
 * h: host name
 * t: time when this request was logged
 * a: Azure storage account name
 * c: container name
 * cr: name of the caller method
 * ce: name of the callee method
 * r: result (Succeeded/Failed)
 * l: latency (time spent in callee)
 * ls: latency sum (aggregate time spent in caller; logged when there are multiple callees;
 *     logged with the last callee)
 * lc: latency count (number of callees; logged when there are multiple callees;
 *     logged with the last callee)
 * s: HTTP Status code
 * e: Error code
 * ci: client request ID
 * ri: server request ID
 * ct: connection time in milliseconds
 * st: sending time in milliseconds
 * rt: receiving time in milliseconds
 * bs: bytes sent
 * br: bytes received
 * m: HTTP method (GET, PUT etc)
 * u: Encoded HTTP URL
 *
 */
public final class AbfsPerfTracker {

  // the logger.
  private static final Logger LOG = LoggerFactory.getLogger(AbfsPerfTracker.class);

  // the field names of perf log lines.
  private static final String HOST_NAME_KEY = "h";
  private static final String TIMESTAMP_KEY = "t";
  private static final String STORAGE_ACCOUNT_NAME_KEY = "a";
  private static final String CONTAINER_NAME_KEY = "c";
  private static final String CALLER_METHOD_NAME_KEY = "cr";
  private static final String CALLEE_METHOD_NAME_KEY = "ce";
  private static final String RESULT_KEY = "r";
  private static final String LATENCY_KEY = "l";
  private static final String LATENCY_SUM_KEY = "ls";
  private static final String LATENCY_COUNT_KEY = "lc";
  private static final String HTTP_STATUS_CODE_KEY = "s";
  private static final String ERROR_CODE_KEY = "e";
  private static final String CLIENT_REQUEST_ID_KEY = "ci";
  private static final String SERVER_REQUEST_ID_KEY = "ri";
  private static final String CONNECTION_TIME_KEY = "ct";
  private static final String SENDING_TIME_KEY = "st";
  private static final String RECEIVING_TIME_KEY = "rt";
  private static final String BYTES_SENT_KEY = "bs";
  private static final String BYTES_RECEIVED_KEY = "br";
  private static final String HTTP_METHOD_KEY = "m";
  private static final String HTTP_URL_KEY = "u";
  private static final String STRING_PLACEHOLDER = "%s";

  // the queue to hold latency information.
  private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

  // whether the latency tracker has been enabled.
  private final boolean enabled;

  // the host name; null when the tracker is disabled.
  private final String hostName;

  // singleton latency reporting format; null when the tracker is disabled.
  private final String singletonLatencyReportingFormat;

  // aggregate latency reporting format; null when the tracker is disabled.
  private final String aggregateLatencyReportingFormat;

  /**
   * Creates a tracker whose enabled state is read from
   * {@code configuration.shouldTrackLatency()}.
   *
   * @param filesystemName value written to the {@code c} field of every log line
   * @param accountName value written to the {@code a} field of every log line
   * @param configuration configuration queried for the latency tracking switch
   */
  public AbfsPerfTracker(String filesystemName, String accountName, AbfsConfiguration configuration) {
    this(filesystemName, accountName, configuration.shouldTrackLatency());
  }

  /**
   * Creates a tracker with an explicit enabled state. When enabled, the local
   * host name is resolved and the two log-line format strings are pre-built.
   *
   * @param filesystemName value written to the {@code c} field of every log line
   * @param accountName value written to the {@code a} field of every log line
   * @param enabled whether latency information should be recorded
   */
  protected AbfsPerfTracker(String filesystemName, String accountName, boolean enabled) {
    this.enabled = enabled;

    LOG.debug("AbfsPerfTracker configuration: {}", enabled);

    if (enabled) {
      this.hostName = resolveHostName();

      // The host, account and container names are literal parts of a string
      // that is later handed to String.format as the pattern, so any '%' in
      // them must be escaped or it would be parsed as a format specifier.
      String commonReportingFormat = new StringBuilder()
              .append(HOST_NAME_KEY)
              .append(AbfsHttpConstants.EQUAL)
              .append(escapeForFormat(this.hostName))
              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
              .append(TIMESTAMP_KEY)
              .append(AbfsHttpConstants.EQUAL)
              .append(STRING_PLACEHOLDER)
              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
              .append(STORAGE_ACCOUNT_NAME_KEY)
              .append(AbfsHttpConstants.EQUAL)
              .append(escapeForFormat(accountName))
              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
              .append(CONTAINER_NAME_KEY)
              .append(AbfsHttpConstants.EQUAL)
              .append(escapeForFormat(filesystemName))
              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
              .append(CALLER_METHOD_NAME_KEY)
              .append(AbfsHttpConstants.EQUAL)
              .append(STRING_PLACEHOLDER)
              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
              .append(CALLEE_METHOD_NAME_KEY)
              .append(AbfsHttpConstants.EQUAL)
              .append(STRING_PLACEHOLDER)
              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
              .append(RESULT_KEY)
              .append(AbfsHttpConstants.EQUAL)
              .append(STRING_PLACEHOLDER)
              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
              .append(LATENCY_KEY)
              .append(AbfsHttpConstants.EQUAL)
              .append(STRING_PLACEHOLDER)
              .toString();

      /*
       * Example singleton log (no ls or lc field)
       * h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
       * c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 s=200
       * e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
       * u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
       *
       * The trailing placeholder receives the (possibly empty) result details.
       */
      this.singletonLatencyReportingFormat = commonReportingFormat + STRING_PLACEHOLDER;

      /*
       * Example aggregate log
       * h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
       * c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 ls=32 lc=1 s=200
       * e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
       * u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
       *
       * The trailing placeholder receives the (possibly empty) result details.
       */
      this.aggregateLatencyReportingFormat = new StringBuilder()
              .append(commonReportingFormat)
              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
              .append(LATENCY_SUM_KEY)
              .append(AbfsHttpConstants.EQUAL)
              .append(STRING_PLACEHOLDER)
              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
              .append(LATENCY_COUNT_KEY)
              .append(AbfsHttpConstants.EQUAL)
              .append(STRING_PLACEHOLDER)
              .append(STRING_PLACEHOLDER)
              .toString();
    } else {
      // Every method that reads these fields returns early when disabled.
      this.hostName = null;
      this.singletonLatencyReportingFormat = null;
      this.aggregateLatencyReportingFormat = null;
    }
  }

  /**
   * Formats the given perf info as a log line and appends it to the queue.
   * The aggregate format (with {@code ls} and {@code lc}) is used when the
   * info carries a valid aggregate start and a positive aggregate count;
   * otherwise the singleton format is used. Does nothing when the tracker
   * is disabled.
   *
   * @param perfInfo the perf information to record
   */
  public void trackInfo(AbfsPerfInfo perfInfo) {
    if (!enabled) {
      return;
    }

    if (isValidInstant(perfInfo.getAggregateStart()) && perfInfo.getAggregateCount() > 0) {
      recordClientLatency(
              perfInfo.getTrackingStart(),
              perfInfo.getTrackingEnd(),
              perfInfo.getCallerName(),
              perfInfo.getCalleeName(),
              perfInfo.getSuccess(),
              perfInfo.getAggregateStart(),
              perfInfo.getAggregateCount(),
              perfInfo.getResult());
    } else {
      recordClientLatency(
              perfInfo.getTrackingStart(),
              perfInfo.getTrackingEnd(),
              perfInfo.getCallerName(),
              perfInfo.getCalleeName(),
              perfInfo.getSuccess(),
              perfInfo.getResult());
    }
  }

  /**
   * Returns the current instant for use as a latency timestamp.
   *
   * @return {@code Instant.now()}, or {@code null} when the tracker is disabled
   */
  public Instant getLatencyInstant() {
    if (!enabled) {
      return null;
    }

    return Instant.now();
  }

  /**
   * Builds a singleton log line (no {@code ls}/{@code lc} fields) and queues it.
   */
  private void recordClientLatency(
          Instant operationStart,
          Instant operationStop,
          String callerName,
          String calleeName,
          boolean success,
          AbfsPerfLoggable res) {

    Instant trackerStart = Instant.now();
    long latency = latencyMillis(operationStart, operationStop);

    String latencyDetails = String.format(singletonLatencyReportingFormat,
            Instant.now(),
            callerName,
            calleeName,
            resultLabel(success),
            latency,
            resultDetails(res));

    offerToQueue(trackerStart, latencyDetails);
  }

  /**
   * Builds an aggregate log line (with {@code ls}/{@code lc} fields) and queues it.
   */
  private void recordClientLatency(
          Instant operationStart,
          Instant operationStop,
          String callerName,
          String calleeName,
          boolean success,
          Instant aggregateStart,
          long aggregateCount,
          AbfsPerfLoggable res) {

    Instant trackerStart = Instant.now();
    long latency = latencyMillis(operationStart, operationStop);
    long aggregateLatency = latencyMillis(aggregateStart, operationStop);

    String latencyDetails = String.format(aggregateLatencyReportingFormat,
            Instant.now(),
            callerName,
            calleeName,
            resultLabel(success),
            latency,
            aggregateLatency,
            aggregateCount,
            resultDetails(res));

    offerToQueue(trackerStart, latencyDetails);
  }

  /**
   * Removes and returns the oldest queued log line without blocking.
   *
   * @return the dequeued log line, or {@code null} when the tracker is
   *         disabled or the queue is empty
   */
  public String getClientLatency() {
    if (!enabled) {
      return null;
    }

    Instant trackerStart = Instant.now();
    String latencyDetails = queue.poll(); // non-blocking pop

    if (LOG.isDebugEnabled()) {
      Instant stop = Instant.now();
      long elapsed = Duration.between(trackerStart, stop).toMillis();
      LOG.debug("Dequeued latency info [{} ms]: {}", elapsed, latencyDetails);
    }

    return latencyDetails;
  }

  /**
   * Appends a log line to the queue and, at debug level, logs how long the
   * tracker spent on it since {@code trackerStart}.
   */
  private void offerToQueue(Instant trackerStart, String latencyDetails) {
    queue.offer(latencyDetails); // non-blocking append

    if (LOG.isDebugEnabled()) {
      Instant trackerStop = Instant.now();
      long elapsed = Duration.between(trackerStart, trackerStop).toMillis();
      LOG.debug("Queued latency info [{} ms]: {}", elapsed, latencyDetails);
    }
  }

  /**
   * Checks that an instant is usable for latency computation.
   * Compares by value: an {@code Instant} equal to {@code MIN}/{@code MAX}
   * is not necessarily the same object as the constant.
   */
  private boolean isValidInstant(Instant testInstant) {
    return testInstant != null
            && !Instant.MIN.equals(testInstant)
            && !Instant.MAX.equals(testInstant);
  }

  /**
   * Returns the milliseconds between two instants, or -1 if either is not valid.
   */
  private long latencyMillis(Instant start, Instant stop) {
    return isValidInstant(start) && isValidInstant(stop)
            ? Duration.between(start, stop).toMillis() : -1;
  }

  /** Maps the success flag to the value of the {@code r} field. */
  private static String resultLabel(boolean success) {
    return success ? "Succeeded" : "Failed";
  }

  /**
   * Returns the result's log string prefixed with a space, or the empty
   * string when there is no result.
   */
  private static String resultDetails(AbfsPerfLoggable res) {
    return res == null ? "" : (" " + res.getLogString());
  }

  /** Resolves the local host name, falling back to "UnknownHost". */
  private static String resolveHostName() {
    try {
      return InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      return "UnknownHost";
    }
  }

  /**
   * Escapes a literal value for embedding in a {@code String.format} pattern
   * by doubling every percent sign. A {@code null} value becomes "null",
   * which is what appending it to a {@code StringBuilder} would produce.
   */
  private static String escapeForFormat(String value) {
    return String.valueOf(value).replace("%", "%%");
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbfsAclHelper 源码

hadoop AbfsClient 源码

hadoop AbfsClientContext 源码

hadoop AbfsClientContextBuilder 源码

hadoop AbfsClientRenameResult 源码

hadoop AbfsClientThrottlingAnalyzer 源码

hadoop AbfsClientThrottlingIntercept 源码

hadoop AbfsCounters 源码

hadoop AbfsErrors 源码

hadoop AbfsHttpHeader 源码

0  赞