hadoop RBFMetrics 源码

  • 2022-10-20
  • 浏览 (178)

hadoop RBFMetrics 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.metrics;

import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
import static org.apache.hadoop.util.Time.now;

import java.io.IOException;
import java.lang.reflect.Method;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;

import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.codehaus.jettison.json.JSONObject;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;

/**
 * Implementation of the Router metrics collector.
 */
@Metrics(name="RBFActivity", about="RBF metrics", context="dfs")
public class RBFMetrics implements RouterMBean, FederationMBean {

  /** Logger for this class. */
  private static final Logger LOG =
      LoggerFactory.getLogger(RBFMetrics.class);

  /** Metrics registry named "RBFMetrics"; tagged in the constructor. */
  private final MetricsRegistry registry = new MetricsRegistry("RBFMetrics");

  /** Pattern used by getDateString() to render timestamps. */
  private static final String DATE_FORMAT = "yyyy/MM/dd HH:mm:ss";

  /**
   * Time out (read from the configuration in milliseconds) passed to the
   * datanode report request so the page is not held for too long.
   */
  private final long timeOut;

  /** Whether getNodeUsage() is allowed to fetch the datanode report. */
  private boolean enableGetDNUsage;

  /** Router these metrics are collected for. */
  private final Router router;

  /** Name under which the Router JMX bean was registered. */
  private ObjectName routerBeanName;
  /** Name under which the FederationState JMX bean was registered. */
  private ObjectName federationBeanName;

  /** Resolve the namenode for each namespace. */
  private final ActiveNamenodeResolver namenodeResolver;

  /** State store; may be null if the Router does not provide one. */
  private final StateStoreService stateStore;
  /** Membership state store; stays null when there is no State Store. */
  private MembershipStore membershipStore;
  /** Mount table store; stays null when there is no State Store. */
  private MountTableStore mountTableStore;
  /** Router state store; stays null when there is no State Store. */
  private RouterStore routerStore;
  /** The number of top token owners reported in metrics. */
  private int topTokenRealOwners;

  /**
   * Create the metrics collector for a Router. Registers the Router and
   * FederationState JMX beans, looks up the State Store record stores (when a
   * State Store is available), reads the metrics-related configuration and
   * registers this object as a source in the default metrics system.
   *
   * @param router Router to collect the metrics for.
   * @throws IOException declared by the signature; which call may raise it is
   *                     not visible in this class.
   */
  public RBFMetrics(Router router) throws IOException {
    this.router = router;

    // Expose the Router view over JMX
    try {
      StandardMBean routerBean = new StandardMBean(this, RouterMBean.class);
      this.routerBeanName = MBeans.register("Router", "Router", routerBean);
      LOG.info("Registered Router MBean: {}", this.routerBeanName);
    } catch (NotCompliantMBeanException e) {
      throw new RuntimeException("Bad Router MBean setup", e);
    }

    // Expose the federation view over JMX
    try {
      StandardMBean federationBean =
          new StandardMBean(this, FederationMBean.class);
      this.federationBeanName =
          MBeans.register("Router", "FederationState", federationBean);
      LOG.info("Registered FederationState MBean: {}", this.federationBeanName);
    } catch (NotCompliantMBeanException e) {
      throw new RuntimeException("Bad FederationState MBean setup", e);
    }

    // Resolver used to pick the namenode of each nameservice
    this.namenodeResolver = this.router.getNamenodeResolver();

    // Record stores are only available when the Router has a State Store
    this.stateStore = this.router.getStateStore();
    if (this.stateStore != null) {
      this.membershipStore =
          stateStore.getRegisteredRecordStore(MembershipStore.class);
      this.mountTableStore =
          stateStore.getRegisteredRecordStore(MountTableStore.class);
      this.routerStore =
          stateStore.getRegisteredRecordStore(RouterStore.class);
    } else {
      LOG.error("State store not available");
    }

    // Settings for the datanode report and the token owner metrics
    Configuration config = router.getConfig();
    this.timeOut = config.getTimeDuration(
        RBFConfigKeys.DN_REPORT_TIME_OUT,
        RBFConfigKeys.DN_REPORT_TIME_OUT_MS_DEFAULT,
        TimeUnit.MILLISECONDS);
    this.enableGetDNUsage = config.getBoolean(
        RBFConfigKeys.DFS_ROUTER_ENABLE_GET_DN_USAGE_KEY,
        RBFConfigKeys.DFS_ROUTER_ENABLE_GET_DN_USAGE_DEFAULT);
    this.topTokenRealOwners = config.getInt(
        RBFConfigKeys.DFS_ROUTER_METRICS_TOP_NUM_TOKEN_OWNERS_KEY,
        RBFConfigKeys.DFS_ROUTER_METRICS_TOP_NUM_TOKEN_OWNERS_KEY_DEFAULT);

    registry.tag(ProcessName, "Router");
    DefaultMetricsSystem.instance()
        .register(RBFMetrics.class.getName(), "RBFActivity Metrics", this);
  }

  /**
   * Override the flag that controls whether getNodeUsage() fetches the
   * datanode report.
   *
   * @param enableGetDNUsage New value of the flag.
   */
  @VisibleForTesting
  public void setEnableGetDNUsage(final boolean enableGetDNUsage) {
    this.enableGetDNUsage = enableGetDNUsage;
  }

  /**
   * Unregister the JMX beans that were registered and remove this object
   * from the default metrics system.
   */
  public void close() {
    final ObjectName[] beanNames = {routerBeanName, federationBeanName};
    for (ObjectName beanName : beanNames) {
      if (beanName != null) {
        MBeans.unregister(beanName);
      }
    }
    DefaultMetricsSystem.instance()
        .unregisterSource(RBFMetrics.class.getName());
  }

  /**
   * Build a JSON object describing every namenode registration found in the
   * membership store. Entries are keyed by the namenode key and ordered with
   * {@code MembershipState.NAME_COMPARATOR}; each entry holds the record
   * fields plus "lastHeartbeat" (seconds since the record was modified) and
   * "used" (total space minus available space).
   *
   * @return JSON string; "{}" if the membership store is not available or
   *         the query fails, and an empty JSON object if there are no
   *         registrations.
   */
  @Override
  public String getNamenodes() {
    if (membershipStore == null) {
      return "{}";
    }

    final Map<String, Map<String, Object>> info = new LinkedHashMap<>();
    try {
      // Get the values from the store
      GetNamenodeRegistrationsRequest request =
          GetNamenodeRegistrationsRequest.newInstance();
      GetNamenodeRegistrationsResponse response =
          membershipStore.getNamenodeRegistrations(request);

      // Order the namenodes
      final List<MembershipState> namenodes = response.getNamenodeMemberships();
      if (namenodes == null || namenodes.isEmpty()) {
        return JSON.toString(info);
      }
      List<MembershipState> namenodesOrder = new ArrayList<>(namenodes);
      Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR);

      // Dump namenodes information into JSON
      for (MembershipState namenode : namenodesOrder) {
        Map<String, Object> innerInfo = new HashMap<>();
        Map<String, Object> map = getJson(namenode);
        innerInfo.putAll(map);
        long dateModified = namenode.getDateModified();
        long lastHeartbeat = getSecondsSince(dateModified);
        innerInfo.put("lastHeartbeat", lastHeartbeat);
        MembershipStats stats = namenode.getStats();
        long used = stats.getTotalSpace() - stats.getAvailableSpace();
        innerInfo.put("used", used);
        info.put(namenode.getNamenodeKey(),
            Collections.unmodifiableMap(innerInfo));
      }
    } catch (IOException e) {
      // Message previously read "Enable to fetch ..." (typo)
      LOG.error("Unable to fetch json representation of namenodes {}",
          e.getMessage());
      return "{}";
    }
    return JSON.toString(info);
  }

  /**
   * Build a JSON object with one entry per registration returned by
   * getActiveNamenodeRegistrations(), keyed by the namenode key and ordered
   * with {@code MembershipState.NAME_COMPARATOR}. Each entry holds the record
   * fields plus "lastHeartbeat" and "used".
   *
   * @return JSON string, or "{}" if the registrations cannot be fetched.
   */
  @Override
  public String getNameservices() {
    final Map<String, Map<String, Object>> nameservices = new LinkedHashMap<>();
    try {
      List<MembershipState> sorted =
          new ArrayList<>(getActiveNamenodeRegistrations());
      Collections.sort(sorted, MembershipState.NAME_COMPARATOR);

      for (MembershipState registration : sorted) {
        Map<String, Object> entry = new HashMap<>();
        entry.putAll(getJson(registration));
        // Seconds elapsed since the record was last modified
        entry.put("lastHeartbeat",
            getSecondsSince(registration.getDateModified()));
        MembershipStats stats = registration.getStats();
        entry.put("used", stats.getTotalSpace() - stats.getAvailableSpace());
        nameservices.put(registration.getNamenodeKey(),
            Collections.unmodifiableMap(entry));
      }
    } catch (IOException e) {
      LOG.error("Cannot retrieve nameservices for JMX: {}", e.getMessage());
      return "{}";
    }
    return JSON.toString(nameservices);
  }

  /**
   * Build a JSON array with one object per mount table entry under "/",
   * ordered with {@code MountTable.SOURCE_COMPARATOR}. Besides the record
   * fields, each object carries formatted creation/modification dates, the
   * comma-joined destination nameservices and paths, the destination order
   * (only when there is more than one nameservice) and the read-only and
   * fault-tolerant flags.
   *
   * @return JSON string, or "[]" if the mount table store is not available
   *         or cannot be queried.
   */
  @Override
  public String getMountTable() {
    if (mountTableStore == null) {
      return "[]";
    }

    final List<Map<String, Object>> entriesJson = new LinkedList<>();
    try {
      // Fetch every mount point and sort them by source path
      GetMountTableEntriesRequest request =
          GetMountTableEntriesRequest.newInstance("/");
      GetMountTableEntriesResponse response =
          mountTableStore.getMountTableEntries(request);
      List<MountTable> sortedMounts = new ArrayList<>(response.getEntries());
      Collections.sort(sortedMounts, MountTable.SOURCE_COMPARATOR);

      for (MountTable mount : sortedMounts) {
        // Collect the distinct destinations keeping their order
        Set<String> nsIds = new LinkedHashSet<>();
        Set<String> destPaths = new LinkedHashSet<>();
        for (RemoteLocation destination : mount.getDestinations()) {
          nsIds.add(destination.getNameserviceId());
          destPaths.add(destination.getDest());
        }

        // Replace the raw timestamps with readable dates
        Map<String, Object> recordJson = getJson(mount);
        recordJson.put("dateCreated", getDateString(mount.getDateCreated()));
        recordJson.put("dateModified", getDateString(mount.getDateModified()));

        Map<String, Object> entryJson = new HashMap<>();
        entryJson.putAll(recordJson);
        entryJson.put("nameserviceId", StringUtils.join(",", nsIds));
        entryJson.put("path", StringUtils.join(",", destPaths));
        // The order is only meaningful with several nameservices
        String order = "";
        if (nsIds.size() > 1) {
          order = mount.getDestOrder().toString();
        }
        entryJson.put("order", order);
        entryJson.put("readonly", mount.isReadOnly());
        entryJson.put("faulttolerant", mount.isFaultTolerant());
        entriesJson.add(Collections.unmodifiableMap(entryJson));
      }
    } catch (IOException e) {
      LOG.error(
          "Cannot generate JSON of mount table from store: {}", e.getMessage());
      return "[]";
    }
    return JSON.toString(entriesJson);
  }

  /**
   * Build a JSON object with one entry per Router registration in the router
   * store, keyed by the record primary key and sorted by the natural order
   * of the records. Each entry holds the record fields, "lastHeartbeat" and,
   * when available, the State Store version dates.
   *
   * @return JSON string, or "{}" if the router store is not available or
   *         cannot be queried.
   */
  @Override
  public String getRouters() {
    if (routerStore == null) {
      return "{}";
    }

    final Map<String, Map<String, Object>> routersJson = new LinkedHashMap<>();
    try {
      // Fetch every router registration and sort them
      GetRouterRegistrationsResponse response = routerStore
          .getRouterRegistrations(GetRouterRegistrationsRequest.newInstance());
      List<RouterState> sortedRouters = new ArrayList<>(response.getRouters());
      Collections.sort(sortedRouters);

      for (RouterState routerState : sortedRouters) {
        Map<String, Object> entry = new HashMap<>();
        entry.putAll(getJson(routerState));
        entry.put("lastHeartbeat",
            getSecondsSince(routerState.getDateModified()));

        StateStoreVersion versions = routerState.getStateStoreVersion();
        if (versions != null) {
          setStateStoreVersions(entry, versions);
        } else {
          LOG.error("Cannot get State Store versions");
        }

        routersJson.put(routerState.getPrimaryKey(),
            Collections.unmodifiableMap(entry));
      }
    } catch (IOException e) {
      LOG.error("Cannot get Routers JSON from the State Store", e);
      return "{}";
    }
    return JSON.toString(routersJson);
  }

  /**
   * Add the State Store versions to a map, rendered as date strings under
   * the keys "lastMembershipUpdate" and "lastMountTableUpdate".
   *
   * @param map Map to add the values to.
   * @param version State Store versions.
   */
  private static void setStateStoreVersions(
      Map<String, Object> map, StateStoreVersion version) {
    map.put("lastMembershipUpdate",
        getDateString(version.getMembershipVersion()));
    map.put("lastMountTableUpdate",
        getDateString(version.getMountTableVersion()));
  }

  /**
   * @return Total space aggregated over the active namenode registrations.
   */
  @Override
  public long getTotalCapacity() {
    return getNameserviceAggregatedLong(stats -> stats.getTotalSpace());
  }

  /**
   * @return Available space aggregated over the active namenode
   *         registrations.
   */
  @Override
  public long getRemainingCapacity() {
    return getNameserviceAggregatedLong(stats -> stats.getAvailableSpace());
  }

  /**
   * @return Total capacity minus remaining capacity.
   */
  @Override
  public long getUsedCapacity() {
    final long total = getTotalCapacity();
    final long remaining = getRemainingCapacity();
    return total - remaining;
  }

  /**
   * @return Total space aggregated over the active namenode registrations,
   *         as a BigInteger.
   */
  @Override
  public BigInteger getTotalCapacityBigInt() {
    return getNameserviceAggregatedBigInt(stats -> stats.getTotalSpace());
  }

  /**
   * @return Available space aggregated over the active namenode
   *         registrations, as a BigInteger.
   */
  @Override
  public BigInteger getRemainingCapacityBigInt() {
    return getNameserviceAggregatedBigInt(stats -> stats.getAvailableSpace());
  }

  /**
   * @return Provided space aggregated over the active namenode
   *         registrations.
   */
  @Override
  public long getProvidedSpace() {
    return getNameserviceAggregatedLong(stats -> stats.getProvidedSpace());
  }

  /**
   * @return Total capacity minus remaining capacity, as a BigInteger.
   */
  @Override
  public BigInteger getUsedCapacityBigInt() {
    final BigInteger total = getTotalCapacityBigInt();
    final BigInteger remaining = getRemainingCapacityBigInt();
    return total.subtract(remaining);
  }

  /**
   * Count the namespaces known to the namenode resolver.
   *
   * @return Number of nameservices, or 0 if the resolver cannot be queried.
   */
  @Override
  public int getNumNameservices() {
    try {
      Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
      return nss.size();
    } catch (IOException e) {
      // The previous message wrongly referred to "expired registrations"
      LOG.error(
          "Cannot fetch number of nameservices from the resolver: {}",
          e.getMessage());
      return 0;
    }
  }

  /**
   * Count the namenode registrations in the membership store.
   *
   * @return Number of registrations, or 0 if the membership store is not
   *         available or cannot be queried.
   */
  @Override
  public int getNumNamenodes() {
    if (membershipStore == null) {
      return 0;
    }
    try {
      GetNamenodeRegistrationsResponse response =
          membershipStore.getNamenodeRegistrations(
              GetNamenodeRegistrationsRequest.newInstance());
      return response.getNamenodeMemberships().size();
    } catch (IOException e) {
      LOG.error("Cannot retrieve numNamenodes for JMX: {}", e.getMessage());
      return 0;
    }
  }

  /**
   * Count the expired namenode registrations in the membership store.
   *
   * @return Number of expired registrations, or 0 if the membership store is
   *         not available or cannot be queried.
   */
  @Override
  public int getNumExpiredNamenodes() {
    if (membershipStore == null) {
      return 0;
    }
    try {
      GetNamenodeRegistrationsResponse response =
          membershipStore.getExpiredNamenodeRegistrations(
              GetNamenodeRegistrationsRequest.newInstance());
      return response.getNamenodeMemberships().size();
    } catch (IOException e) {
      LOG.error(
          "Cannot retrieve numExpiredNamenodes for JMX: {}", e.getMessage());
      return 0;
    }
  }

  /**
   * @return Active datanodes aggregated over the active namenode
   *         registrations.
   */
  @Override
  @Metric({"NumLiveNodes", "Number of live data nodes"})
  public int getNumLiveNodes() {
    return getNameserviceAggregatedInt(
        stats -> stats.getNumOfActiveDatanodes());
  }

  /**
   * @return Dead datanodes aggregated over the active namenode
   *         registrations.
   */
  @Override
  @Metric({"NumDeadNodes", "Number of dead data nodes"})
  public int getNumDeadNodes() {
    return getNameserviceAggregatedInt(
        stats -> stats.getNumOfDeadDatanodes());
  }

  /**
   * @return Stale datanodes aggregated over the active namenode
   *         registrations.
   */
  @Override
  @Metric({"NumStaleNodes", "Number of stale data nodes"})
  public int getNumStaleNodes() {
    return getNameserviceAggregatedInt(
        stats -> stats.getNumOfStaleDatanodes());
  }

  /**
   * @return Decommissioning datanodes aggregated over the active namenode
   *         registrations.
   */
  @Override
  @Metric({"NumDecommissioningNodes", "Number of Decommissioning data nodes"})
  public int getNumDecommissioningNodes() {
    return getNameserviceAggregatedInt(
        stats -> stats.getNumOfDecommissioningDatanodes());
  }

  /**
   * @return Decommissioned live datanodes aggregated over the active
   *         namenode registrations.
   */
  @Override
  @Metric({"NumDecomLiveNodes", "Number of decommissioned Live data nodes"})
  public int getNumDecomLiveNodes() {
    return getNameserviceAggregatedInt(
        stats -> stats.getNumOfDecomActiveDatanodes());
  }

  /**
   * @return Decommissioned dead datanodes aggregated over the active
   *         namenode registrations.
   */
  @Override
  @Metric({"NumDecomDeadNodes", "Number of decommissioned dead data nodes"})
  public int getNumDecomDeadNodes() {
    return getNameserviceAggregatedInt(
        stats -> stats.getNumOfDecomDeadDatanodes());
  }

  /**
   * @return IN_MAINTENANCE live datanodes aggregated over the active
   *         namenode registrations.
   */
  @Override
  @Metric({"NumInMaintenanceLiveDataNodes",
      "Number of IN_MAINTENANCE live data nodes"})
  public int getNumInMaintenanceLiveDataNodes() {
    return getNameserviceAggregatedInt(
        stats -> stats.getNumOfInMaintenanceLiveDataNodes());
  }

  /**
   * @return IN_MAINTENANCE dead datanodes aggregated over the active
   *         namenode registrations.
   */
  @Override
  @Metric({"NumInMaintenanceDeadDataNodes",
      "Number of IN_MAINTENANCE dead data nodes"})
  public int getNumInMaintenanceDeadDataNodes() {
    return getNameserviceAggregatedInt(
        stats -> stats.getNumOfInMaintenanceDeadDataNodes());
  }

  /**
   * @return ENTERING_MAINTENANCE datanodes aggregated over the active
   *         namenode registrations.
   */
  @Override
  @Metric({"NumEnteringMaintenanceDataNodes",
      "Number of ENTERING_MAINTENANCE data nodes"})
  public int getNumEnteringMaintenanceDataNodes() {
    return getNameserviceAggregatedInt(
        stats -> stats.getNumOfEnteringMaintenanceDataNodes());
  }

  /**
   * Summarize the DFS usage percentage of the live datanodes as a JSON
   * object with a single "nodeUsage" entry holding "min", "median", "max"
   * and "stdDev", each formatted as a percentage with two decimals. The
   * median is the element at index length/2 of the sorted usages. All values
   * are 0 when fetching the usage is disabled, the report is empty or the
   * report cannot be obtained.
   *
   * @return JSON string with the usage statistics.
   */
  @Override // NameNodeMXBean
  public String getNodeUsage() {
    double min = 0;
    double median = 0;
    double max = 0;
    double dev = 0;

    try {
      DatanodeInfo[] liveNodes = null;
      if (!this.enableGetDNUsage) {
        LOG.debug("Getting node usage is disabled.");
      } else {
        liveNodes = this.router.getRpcServer()
            .getDatanodeReport(DatanodeReportType.LIVE, false, timeOut);
      }

      if (liveNodes != null && liveNodes.length > 0) {
        final int count = liveNodes.length;
        double[] usages = new double[count];
        for (int idx = 0; idx < count; idx++) {
          usages[idx] = liveNodes[idx].getDfsUsedPercent();
        }
        // Order statistics are read from the sorted array
        Arrays.sort(usages);
        median = usages[count / 2];
        max = usages[count - 1];
        min = usages[0];

        dev = new StandardDeviation().evaluate(usages);
      }
    } catch (IOException e) {
      LOG.error("Cannot get the live nodes: {}", e.getMessage());
    }

    final Map<String, Object> usageStats = new HashMap<>();
    usageStats.put("min", StringUtils.format("%.2f%%", min));
    usageStats.put("median", StringUtils.format("%.2f%%", median));
    usageStats.put("max", StringUtils.format("%.2f%%", max));
    usageStats.put("stdDev", StringUtils.format("%.2f%%", dev));

    final Map<String, Map<String, Object>> info = new HashMap<>();
    info.put("nodeUsage", usageStats);
    return JSON.toString(info);
  }

  /**
   * @return Blocks aggregated over the active namenode registrations.
   */
  @Override
  @Metric({"NumBlocks", "Total number of blocks"})
  public long getNumBlocks() {
    return getNameserviceAggregatedLong(stats -> stats.getNumOfBlocks());
  }

  /**
   * @return Missing blocks aggregated over the active namenode
   *         registrations.
   */
  @Override
  @Metric({"NumOfMissingBlocks", "Number of missing blocks"})
  public long getNumOfMissingBlocks() {
    return getNameserviceAggregatedLong(
        stats -> stats.getNumOfBlocksMissing());
  }

  /**
   * @return Blocks pending replication aggregated over the active namenode
   *         registrations.
   */
  @Override
  @Metric({"NumOfBlocksPendingReplication",
      "Number of blocks pending replication"})
  public long getNumOfBlocksPendingReplication() {
    return getNameserviceAggregatedLong(
        stats -> stats.getNumOfBlocksPendingReplication());
  }

  /**
   * @return Under-replicated blocks aggregated over the active namenode
   *         registrations.
   */
  @Override
  @Metric({"NumOfBlocksUnderReplicated", "Number of blocks under replication"})
  public long getNumOfBlocksUnderReplicated() {
    return getNameserviceAggregatedLong(
        stats -> stats.getNumOfBlocksUnderReplicated());
  }

  /**
   * @return Blocks pending deletion aggregated over the active namenode
   *         registrations.
   */
  @Override
  @Metric({"NumOfBlocksPendingDeletion", "Number of blocks pending deletion"})
  public long getNumOfBlocksPendingDeletion() {
    return getNameserviceAggregatedLong(
        stats -> stats.getNumOfBlocksPendingDeletion());
  }

  /**
   * @return Files aggregated over the active namenode registrations.
   */
  @Override
  @Metric({"NumFiles", "Number of files"})
  public long getNumFiles() {
    return getNameserviceAggregatedLong(stats -> stats.getNumOfFiles());
  }

  /**
   * @return Start time of the Router rendered with {@code Date.toString()}.
   */
  @Override
  public String getRouterStarted() {
    final Date started = new Date(this.router.getStartTime());
    return started.toString();
  }

  /**
   * @return Version and revision in the form "version, rREVISION".
   */
  @Override
  public String getVersion() {
    final String version = VersionInfo.getVersion();
    final String revision = VersionInfo.getRevision();
    return version + ", r" + revision;
  }

  /**
   * @return Date reported by {@code VersionInfo.getDate()}.
   */
  @Override
  public String getCompiledDate() {
    final String compiledDate = VersionInfo.getDate();
    return compiledDate;
  }

  /**
   * @return Compile information in the form "DATE by USER from BRANCH".
   */
  @Override
  public String getCompileInfo() {
    final String date = VersionInfo.getDate();
    final String user = VersionInfo.getUser();
    final String branch = VersionInfo.getBranch();
    return date + " by " + user + " from " + branch;
  }

  /**
   * Combine the local host name with the port of the Router RPC server
   * address.
   *
   * @return "hostname:port", or "Unknown" if the RPC address is not set or
   *         the local host cannot be resolved.
   */
  @Override
  public String getHostAndPort() {
    final InetSocketAddress rpcAddress = this.router.getRpcServerAddress();
    if (rpcAddress == null) {
      return "Unknown";
    }
    try {
      final String localHostName = InetAddress.getLocalHost().getHostName();
      return localHostName + ":" + rpcAddress.getPort();
    } catch (UnknownHostException ignored) {
      // Best effort: report an unknown address instead of failing
      return "Unknown";
    }
  }

  /**
   * @return Identifier reported by the Router.
   */
  @Override
  public String getRouterId() {
    final String routerId = this.router.getRouterId();
    return routerId;
  }

  /**
   * @return String form of the collection of distinct cluster IDs found in
   *         the namespaces, or "" if they cannot be fetched.
   */
  @Override
  public String getClusterId() {
    try {
      return getNamespaceInfo(FederationNamespaceInfo::getClusterId)
          .toString();
    } catch (IOException e) {
      LOG.error("Cannot fetch cluster ID metrics: {}", e.getMessage());
      return "";
    }
  }

  /**
   * @return String form of the collection of distinct block pool IDs found
   *         in the namespaces, or "" if they cannot be fetched.
   */
  @Override
  public String getBlockPoolId() {
    try {
      return getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId)
          .toString();
    } catch (IOException e) {
      LOG.error("Cannot fetch block pool ID metrics: {}", e.getMessage());
      return "";
    }
  }

  /**
   * @return String form of the current Router state.
   */
  @Override
  public String getRouterStatus() {
    final RouterServiceState state = this.router.getRouterState();
    return state.toString();
  }

  /**
   * @return Size of the current tokens held by the secret manager, or -1 if
   *         there is no security manager or secret manager.
   */
  @Override
  @Metric({"CurrentTokensCount", "Number of router's current tokens"})
  public long getCurrentTokensCount() {
    final RouterSecurityManager securityManager =
        this.router.getRpcServer().getRouterSecurityManager();
    if (securityManager == null
        || securityManager.getSecretManager() == null) {
      return -1;
    }
    return securityManager.getSecretManager().getCurrentTokensSize();
  }

  /**
   * @return JSON with the top token real owners (as many as configured for
   *         this collector), or "" if there is no security manager or secret
   *         manager.
   */
  @Override
  public String getTopTokenRealOwners() {
    final RouterSecurityManager securityManager =
        this.router.getRpcServer().getRouterSecurityManager();
    if (securityManager == null
        || securityManager.getSecretManager() == null) {
      return "";
    }
    return JSON.toString(securityManager.getSecretManager()
        .getTopTokenRealOwners(this.topTokenRealOwners));
  }

  /**
   * @return Value of {@code UserGroupInformation.isSecurityEnabled()}.
   */
  @Override
  public boolean isSecurityEnabled() {
    final boolean securityEnabled = UserGroupInformation.isSecurityEnabled();
    return securityEnabled;
  }

  /**
   * @return Corrupt files aggregated over the active namenode registrations.
   */
  @Override
  public int getCorruptFilesCount() {
    return getNameserviceAggregatedInt(
        stats -> stats.getCorruptFilesCount());
  }

  /**
   * @return Scheduled replication blocks aggregated over the active namenode
   *         registrations.
   */
  @Override
  public long getScheduledReplicationBlocks() {
    return getNameserviceAggregatedLong(
        stats -> stats.getScheduledReplicationBlocks());
  }

  /**
   * @return Missing blocks with replication factor one aggregated over the
   *         active namenode registrations.
   */
  @Override
  public long getNumberOfMissingBlocksWithReplicationFactorOne() {
    return getNameserviceAggregatedLong(
        stats -> stats.getNumberOfMissingBlocksWithReplicationFactorOne());
  }

  /**
   * @return Highest priority low redundancy replicated blocks aggregated
   *         over the active namenode registrations.
   */
  @Override
  public long getHighestPriorityLowRedundancyReplicatedBlocks() {
    return getNameserviceAggregatedLong(
        stats -> stats.getHighestPriorityLowRedundancyReplicatedBlocks());
  }

  /**
   * @return Highest priority low redundancy EC blocks aggregated over the
   *         active namenode registrations.
   */
  @Override
  public long getHighestPriorityLowRedundancyECBlocks() {
    return getNameserviceAggregatedLong(
        stats -> stats.getHighestPriorityLowRedundancyECBlocks());
  }

  /**
   * @return Pending SPS paths aggregated over the active namenode
   *         registrations.
   */
  @Override
  public int getPendingSPSPaths() {
    return getNameserviceAggregatedInt(stats -> stats.getPendingSPSPaths());
  }

  /**
   * @return Federation rename count reported by the Router RPC server.
   */
  @Override
  @Metric({"RouterFederationRenameCount", "Number of federation rename"})
  public int getRouterFederationRenameCount() {
    final RouterRpcServer rpcServer = this.router.getRpcServer();
    return rpcServer.getRouterFederationRenameCount();
  }

  /**
   * @return Scheduler job count reported by the Router RPC server.
   */
  @Override
  @Metric({"SchedulerJobCount", "Number of scheduler job"})
  public int getSchedulerJobCount() {
    final RouterRpcServer rpcServer = this.router.getRpcServer();
    return rpcServer.getSchedulerJobCount();
  }

  /**
   * @return "Safe mode is ON. " followed by the safe mode tip while the
   *         Router is in the SAFEMODE state; "" otherwise.
   */
  @Override
  public String getSafemode() {
    if (!this.router.isRouterState(RouterServiceState.SAFEMODE)) {
      return "";
    }
    return "Safe mode is ON. " + this.getSafeModeTip();
  }

  /**
   * Build a hint explaining why the Router is in safe mode and how to leave
   * it.
   *
   * @return Hint for the INITIALIZING, UNINITIALIZED and SAFEMODE states;
   *         "" for any other state.
   */
  private String getSafeModeTip() {
    String cmd = "Use \"hdfs dfsrouteradmin -safemode leave\" "
        + "to turn safe mode off.";
    if (this.router.isRouterState(RouterServiceState.INITIALIZING)
        || this.router.isRouterState(RouterServiceState.UNINITIALIZED)) {
      // Spaces around the state keep it from being glued to its neighbours
      return "Router is in " + this.router.getRouterState()
          + " mode, the router will immediately return to "
          + "normal mode after some time. " + cmd;
    } else if (this.router.isRouterState(RouterServiceState.SAFEMODE)) {
      return "It was turned on manually. " + cmd;
    }
    return "";
  }

  /**
   * Collect the distinct values that a getter yields over all the
   * namespaces reported by the membership store.
   *
   * @param f Method reference of the appropriate FederationNamespaceInfo
   *          getter function
   * @return Set of unique string values; empty if the membership store is
   *         not available.
   * @throws IOException if the query could not be executed.
   */
  private Collection<String> getNamespaceInfo(
      Function<FederationNamespaceInfo, String> f) throws IOException {
    final Set<String> uniqueValues = new HashSet<>();
    if (membershipStore == null) {
      return uniqueValues;
    }
    GetNamespaceInfoResponse response =
        membershipStore.getNamespaceInfo(GetNamespaceInfoRequest.newInstance());
    for (FederationNamespaceInfo namespace : response.getNamespaceInfo()) {
      uniqueValues.add(f.apply(namespace));
    }
    return uniqueValues;
  }

  /**
   * Sum an integer statistic over the registrations returned by
   * getActiveNamenodeRegistrations().
   *
   * @param f Method reference extracting the value from the stats.
   * @return Aggregated integer, or 0 if the registrations cannot be fetched.
   */
  private int getNameserviceAggregatedInt(ToIntFunction<MembershipStats> f) {
    try {
      int total = 0;
      for (MembershipState registration : getActiveNamenodeRegistrations()) {
        total += f.applyAsInt(registration.getStats());
      }
      return total;
    } catch (IOException e) {
      LOG.error("Unable to extract metrics: {}", e.getMessage());
      return 0;
    }
  }

  /**
   * Sum a long statistic over the registrations returned by
   * getActiveNamenodeRegistrations().
   *
   * @param f Method reference extracting the value from the stats.
   * @return Aggregated long, or 0 if the registrations cannot be fetched.
   */
  private long getNameserviceAggregatedLong(ToLongFunction<MembershipStats> f) {
    try {
      long total = 0L;
      for (MembershipState registration : getActiveNamenodeRegistrations()) {
        total += f.applyAsLong(registration.getStats());
      }
      return total;
    } catch (IOException e) {
      LOG.error("Unable to extract metrics: {}", e.getMessage());
      return 0;
    }
  }

  /**
   * Sum a long statistic over the registrations returned by
   * getActiveNamenodeRegistrations(), accumulating into a BigInteger.
   *
   * @param f Method reference extracting the value from the stats.
   * @return Aggregated value, or zero if the registrations cannot be
   *         fetched.
   */
  private BigInteger getNameserviceAggregatedBigInt(
      ToLongFunction<MembershipStats> f) {
    try {
      BigInteger total = BigInteger.ZERO;
      for (MembershipState registration : getActiveNamenodeRegistrations()) {
        final long value = f.applyAsLong(registration.getStats());
        total = total.add(BigInteger.valueOf(value));
      }
      return total;
    } catch (IOException e) {
      LOG.error("Unable to extract metrics: {}", e.getMessage());
      return BigInteger.ZERO;
    }
  }

  /**
   * Fetches the most active namenode memberships for all known nameservices.
   * For every namespace in the membership store, the first namenode returned
   * by the resolver is used; it may or may not be active. Nameservices for
   * which the resolver returns no namenodes, or whose first namenode is not
   * a MembershipState, are skipped.
   *
   * @throws IOException if the query could not be performed.
   * @return List of the most active NNs from each known nameservice; empty
   *         if the membership store is not available.
   */
  private List<MembershipState> getActiveNamenodeRegistrations()
      throws IOException {
    List<MembershipState> resultList = new ArrayList<>();
    if (membershipStore == null) {
      return resultList;
    }

    GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
    GetNamespaceInfoResponse response =
        membershipStore.getNamespaceInfo(request);
    for (FederationNamespaceInfo nsInfo : response.getNamespaceInfo()) {
      // Fetch the most recent namenode registration
      String nsId = nsInfo.getNameserviceId();
      List<? extends FederationNamenodeContext> nns =
          namenodeResolver.getNamenodesForNameserviceId(nsId, false);
      // An empty list would make get(0) throw IndexOutOfBoundsException
      if (nns != null && !nns.isEmpty()) {
        FederationNamenodeContext nn = nns.get(0);
        if (nn instanceof MembershipState) {
          resultList.add((MembershipState) nn);
        }
      }
    }
    return resultList;
  }

  /**
   * Get time as a date string formatted with DATE_FORMAT.
   *
   * @param time Milliseconds since the epoch, as taken by
   *             {@code java.util.Date(long)}.
   * @return String representing the date, or "-" if the time is not
   *         positive.
   */
  @VisibleForTesting
  static String getDateString(long time) {
    return time <= 0
        ? "-"
        : new SimpleDateFormat(DATE_FORMAT).format(new Date(time));
  }

  /**
   * Get the number of whole seconds elapsed between a timestamp and now.
   *
   * @param timeMs Reference time in milliseconds.
   * @return Seconds since the reference time, or -1 if it is negative.
   */
  private static long getSecondsSince(long timeMs) {
    return timeMs < 0 ? -1 : (now() - timeMs) / 1000;
  }

  /**
   * Get JSON for this record. Every field found by getFields() except
   * "proto" is added under its field name; fields that are themselves
   * records are flattened into the same map, and null values are stored as
   * JSONObject.NULL.
   *
   * @param record Record to convert.
   * @return Map representing the data for the JSON representation.
   * @throws IllegalArgumentException if a field cannot be serialized; the
   *         original failure is attached as the cause.
   */
  private static Map<String, Object> getJson(BaseRecord record) {
    Map<String, Object> json = new HashMap<>();
    Map<String, Class<?>> fields = getFields(record);

    for (String fieldName : fields.keySet()) {
      if (!fieldName.equalsIgnoreCase("proto")) {
        try {
          Object value = getField(record, fieldName);
          if (value instanceof BaseRecord) {
            BaseRecord recordField = (BaseRecord) value;
            json.putAll(getJson(recordField));
          } else {
            json.put(fieldName, value == null ? JSONObject.NULL : value);
          }
        } catch (Exception e) {
          // Keep the cause so the failing field can be diagnosed
          throw new IllegalArgumentException(
              "Cannot serialize field " + fieldName + " into JSON", e);
        }
      }
    }
    return json;
  }

  /**
   * Returns the fields of a record, derived from the methods declared by its
   * class whose name starts with "get". The field name is the method name
   * without the "get" prefix and with its first character in lower case.
   *
   * @param record Record to inspect.
   * @return Map from field name to the return type of its getter.
   */
  private static Map<String, Class<?>> getFields(BaseRecord record) {
    final Map<String, Class<?>> fieldTypes = new HashMap<>();
    for (Method method : record.getClass().getDeclaredMethods()) {
      final String methodName = method.getName();
      if (!methodName.startsWith("get")) {
        continue;
      }
      try {
        final Class<?> returnType = method.getReturnType();
        final char[] nameChars = methodName.substring(3).toCharArray();
        nameChars[0] = Character.toLowerCase(nameChars[0]);
        fieldTypes.put(new String(nameChars), returnType);
      } catch (Exception e) {
        LOG.error("Cannot execute getter {} on {}", method.getName(), record);
      }
    }
    return fieldTypes;
  }

  /**
   * Fetches the value for a field name by invoking the matching getter.
   *
   * @param record Record to read the field from.
   * @param fieldName the legacy name of the field.
   * @return The field data, or null if no getter is found or invoking it
   *         fails (the failure is logged).
   */
  private static Object getField(BaseRecord record, String fieldName) {
    Object result = null;
    Method m = locateGetter(record, fieldName);
    if (m != null) {
      try {
        result = m.invoke(record);
      } catch (Exception e) {
        // Trailing throwable is logged by SLF4J with its stack trace
        LOG.error("Cannot get field {} on {}", fieldName, record, e);
      }
    }
    return result;
  }

  /**
   * Finds the appropriate getter for a field name among the public methods
   * of the record. The name is matched ignoring case and only methods
   * without parameters are considered, because the caller invokes the getter
   * with no arguments.
   *
   * @param record Record whose class is searched.
   * @param fieldName The legacy name of the field.
   * @return The matching getter or null if not found.
   */
  private static Method locateGetter(BaseRecord record, String fieldName) {
    final String getterName = "get" + fieldName;
    for (Method m : record.getClass().getMethods()) {
      if (m.getParameterCount() == 0
          && m.getName().equalsIgnoreCase(getterName)) {
        return m;
      }
    }
    return null;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop FederationMBean 源码

hadoop FederationRPCMBean 源码

hadoop FederationRPCMetrics 源码

hadoop FederationRPCPerformanceMonitor 源码

hadoop NamenodeBeanMetrics 源码

hadoop NameserviceRPCMBean 源码

hadoop NameserviceRPCMetrics 源码

hadoop NullStateStoreMetrics 源码

hadoop RouterMBean 源码

hadoop StateStoreMBean 源码

0  赞