hadoop AbfsPerfTracker 源码
haddop AbfsPerfTracker 代码
文件路径:/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfTracker.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
/**
* {@code AbfsPerfTracker} keeps track of service latencies observed by {@code AbfsClient}. Every request hands over
* its perf-related information as a {@code AbfsPerfInfo} object (contains success/failure, latency etc) to the
* {@code AbfsPerfTracker}'s queue. When a request is made, we check {@code AbfsPerfTracker} to see if there are
* any latency numbers to be reported. If there are any, the stats are added to an HTTP header
* ({@code x-ms-abfs-client-latency}) on the next request.
*
* A typical perf log line appears like:
*
* h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
* c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 ls=32 lc=1 s=200
* e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
* u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
*
* The fields have the following definitions:
*
* h: host name
* t: time when this request was logged
* a: Azure storage account name
* c: container name
* cr: name of the caller method
* ce: name of the callee method
* r: result (Succeeded/Failed)
* l: latency (time spent in callee)
* ls: latency sum (aggregate time spent in caller; logged when there are multiple callees;
* logged with the last callee)
* lc: latency count (number of callees; logged when there are multiple callees;
* logged with the last callee)
* s: HTTP Status code
* e: Error code
* ci: client request ID
* ri: server request ID
* ct: connection time in milliseconds
* st: sending time in milliseconds
* rt: receiving time in milliseconds
* bs: bytes sent
* br: bytes received
* m: HTTP method (GET, PUT etc)
* u: Encoded HTTP URL
*
*/
public final class AbfsPerfTracker {
// the logger.
private static final Logger LOG = LoggerFactory.getLogger(AbfsPerfTracker.class);
// the field names of perf log lines.
private static final String HOST_NAME_KEY = "h";
private static final String TIMESTAMP_KEY = "t";
private static final String STORAGE_ACCOUNT_NAME_KEY = "a";
private static final String CONTAINER_NAME_KEY = "c";
private static final String CALLER_METHOD_NAME_KEY = "cr";
private static final String CALLEE_METHOD_NAME_KEY = "ce";
private static final String RESULT_KEY = "r";
private static final String LATENCY_KEY = "l";
private static final String LATENCY_SUM_KEY = "ls";
private static final String LATENCY_COUNT_KEY = "lc";
private static final String HTTP_STATUS_CODE_KEY = "s";
private static final String ERROR_CODE_KEY = "e";
private static final String CLIENT_REQUEST_ID_KEY = "ci";
private static final String SERVER_REQUEST_ID_KEY = "ri";
private static final String CONNECTION_TIME_KEY = "ct";
private static final String SENDING_TIME_KEY = "st";
private static final String RECEIVING_TIME_KEY = "rt";
private static final String BYTES_SENT_KEY = "bs";
private static final String BYTES_RECEIVED_KEY = "br";
private static final String HTTP_METHOD_KEY = "m";
private static final String HTTP_URL_KEY = "u";
private static final String STRING_PLACEHOLDER = "%s";
// the queue to hold latency information.
private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
// whether the latency tracker has been enabled.
private boolean enabled = false;
// the host name.
private String hostName;
// singleton latency reporting format.
private String singletonLatencyReportingFormat;
// aggregate latency reporting format.
private String aggregateLatencyReportingFormat;
public AbfsPerfTracker(String filesystemName, String accountName, AbfsConfiguration configuration) {
this(filesystemName, accountName, configuration.shouldTrackLatency());
}
protected AbfsPerfTracker(String filesystemName, String accountName, boolean enabled) {
this.enabled = enabled;
LOG.debug("AbfsPerfTracker configuration: {}", enabled);
if (enabled) {
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
hostName = "UnknownHost";
}
String commonReportingFormat = new StringBuilder()
.append(HOST_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(hostName)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(TIMESTAMP_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(STORAGE_ACCOUNT_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(accountName)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(CONTAINER_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(filesystemName)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(CALLER_METHOD_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(CALLEE_METHOD_NAME_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(RESULT_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(LATENCY_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.toString();
/**
* Example singleton log (no ls or lc field)
* h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
* c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 s=200
* e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
* u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
*/
singletonLatencyReportingFormat = new StringBuilder()
.append(commonReportingFormat)
.append(STRING_PLACEHOLDER)
.toString();
/**
* Example aggregate log
* h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
* c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath r=Succeeded l=32 ls=32 lc=1 s=200
* e= ci=95121dae-70a8-4187-b067-614091034558 ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
* u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
*/
aggregateLatencyReportingFormat = new StringBuilder()
.append(commonReportingFormat)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(LATENCY_SUM_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
.append(LATENCY_COUNT_KEY)
.append(AbfsHttpConstants.EQUAL)
.append(STRING_PLACEHOLDER)
.append(STRING_PLACEHOLDER)
.toString();
}
}
public void trackInfo(AbfsPerfInfo perfInfo) {
if (!enabled) {
return;
}
if (isValidInstant(perfInfo.getAggregateStart()) && perfInfo.getAggregateCount() > 0) {
recordClientLatency(
perfInfo.getTrackingStart(),
perfInfo.getTrackingEnd(),
perfInfo.getCallerName(),
perfInfo.getCalleeName(),
perfInfo.getSuccess(),
perfInfo.getAggregateStart(),
perfInfo.getAggregateCount(),
perfInfo.getResult());
} else {
recordClientLatency(
perfInfo.getTrackingStart(),
perfInfo.getTrackingEnd(),
perfInfo.getCallerName(),
perfInfo.getCalleeName(),
perfInfo.getSuccess(),
perfInfo.getResult());
}
}
public Instant getLatencyInstant() {
if (!enabled) {
return null;
}
return Instant.now();
}
private void recordClientLatency(
Instant operationStart,
Instant operationStop,
String callerName,
String calleeName,
boolean success,
AbfsPerfLoggable res) {
Instant trackerStart = Instant.now();
long latency = isValidInstant(operationStart) && isValidInstant(operationStop)
? Duration.between(operationStart, operationStop).toMillis() : -1;
String latencyDetails = String.format(singletonLatencyReportingFormat,
Instant.now(),
callerName,
calleeName,
success ? "Succeeded" : "Failed",
latency,
res == null ? "" : (" " + res.getLogString()));
this.offerToQueue(trackerStart, latencyDetails);
}
private void recordClientLatency(
Instant operationStart,
Instant operationStop,
String callerName,
String calleeName,
boolean success,
Instant aggregateStart,
long aggregateCount,
AbfsPerfLoggable res){
Instant trackerStart = Instant.now();
long latency = isValidInstant(operationStart) && isValidInstant(operationStop)
? Duration.between(operationStart, operationStop).toMillis() : -1;
long aggregateLatency = isValidInstant(aggregateStart) && isValidInstant(operationStop)
? Duration.between(aggregateStart, operationStop).toMillis() : -1;
String latencyDetails = String.format(aggregateLatencyReportingFormat,
Instant.now(),
callerName,
calleeName,
success ? "Succeeded" : "Failed",
latency,
aggregateLatency,
aggregateCount,
res == null ? "" : (" " + res.getLogString()));
offerToQueue(trackerStart, latencyDetails);
}
public String getClientLatency() {
if (!enabled) {
return null;
}
Instant trackerStart = Instant.now();
String latencyDetails = queue.poll(); // non-blocking pop
if (LOG.isDebugEnabled()) {
Instant stop = Instant.now();
long elapsed = Duration.between(trackerStart, stop).toMillis();
LOG.debug("Dequeued latency info [{} ms]: {}", elapsed, latencyDetails);
}
return latencyDetails;
}
private void offerToQueue(Instant trackerStart, String latencyDetails) {
queue.offer(latencyDetails); // non-blocking append
if (LOG.isDebugEnabled()) {
Instant trackerStop = Instant.now();
long elapsed = Duration.between(trackerStart, trackerStop).toMillis();
LOG.debug("Queued latency info [{} ms]: {}", elapsed, latencyDetails);
}
}
private boolean isValidInstant(Instant testInstant) {
return testInstant != null && testInstant != Instant.MIN && testInstant != Instant.MAX;
}
}
相关信息
相关文章
hadoop AbfsClientContextBuilder 源码
hadoop AbfsClientRenameResult 源码
hadoop AbfsClientThrottlingAnalyzer 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦