hadoop NamenodeHeartbeatService 源码

  • 2022-10-20
  • 浏览 (187)

hadoop NamenodeHeartbeatService 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.net.NetUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;

/**
 * The {@link Router} periodically checks the state of a Namenode (usually on
 * the same server) and reports its high availability (HA) state and
 * load/space status to the
 * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService}.
 * Note that this is an optional role as a Router can be independent of any
 * subcluster.
 * <p>
 * For performance with Namenode HA, the Router uses the high availability state
 * information in the State Store to forward the request to the Namenode that is
 * most likely to be active.
 * <p>
 * Note that this service can be embedded into the Namenode itself to simplify
 * the operation.
 */
public class NamenodeHeartbeatService extends PeriodicService {

  private static final Logger LOG =
      LoggerFactory.getLogger(NamenodeHeartbeatService.class);


  /** Configuration for the heartbeat (with the security settings added). */
  private Configuration conf;

  /** Resolver where the status of the Namenode is registered. */
  private final ActiveNamenodeResolver resolver;

  /** Identifier of the nameservice of the tracked NN. */
  private final String nameserviceId;
  /**
   * Identifier of the tracked NN. When a resolved host is provided, this is
   * the original namenode id followed by "-" and the resolved host.
   */
  private final String namenodeId;

  /** Namenode HA target. */
  private NNHAServiceTarget localTarget;
  /** Cache HA protocol. */
  private HAServiceProtocol localTargetHAProtocol;
  /** Cache NN protocol. */
  private NamenodeProtocol namenodeProtocol;
  /** Cache Client protocol. */
  private ClientProtocol clientProtocol;
  /** RPC address for the namenode. */
  private String rpcAddress;
  /** Service RPC address for the namenode. */
  private String serviceAddress;
  /** Lifeline RPC address for the namenode. */
  private String lifelineAddress;
  /** HTTP address for the namenode. */
  private String webAddress;
  /** Connection factory for JMX calls. */
  private URLConnectionFactory connectionFactory;
  /** URL scheme to use for JMX calls. */
  private String scheme;

  /**
   * Resolved hostname of the namenode. When set, it replaces the host part of
   * the addresses read from the configuration.
   */
  private String resolvedHost;
  /** Namenode id used to look up the addresses in the configuration. */
  private String originalNnId;

  /** Timeout (ms) used when creating the HA health monitor proxy. */
  private int healthMonitorTimeoutMs = (int) DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;

  /**
   * Create a new Namenode status updater.
   * <p>
   * The service name is built by the same helper used by the constructor that
   * takes a resolved host, so both constructors name the service identically.
   *
   * @param resolver Namenode resolver service to handle NN registration.
   * @param nsId Identifier of the nameservice.
   * @param nnId Identifier of the namenode in HA.
   */
  public NamenodeHeartbeatService(
      ActiveNamenodeResolver resolver, String nsId, String nnId) {
    super(getNnHeartBeatServiceName(nsId, nnId));

    this.resolver = resolver;

    this.nameserviceId = nsId;
    this.namenodeId = nnId;
  }

  /**
   * Create a new Namenode status updater for a Namenode that is reached
   * through a resolved host.
   *
   * @param resolver Namenode resolver service to handle NN registration.
   * @param nsId Identifier of the nameservice.
   * @param nnId Identifier of the namenode in HA.
   * @param resolvedHost Resolved hostname for this specific namenode.
   */
  public NamenodeHeartbeatService(
      ActiveNamenodeResolver resolver, String nsId, String nnId, String resolvedHost) {
    super(getNnHeartBeatServiceName(nsId, nnId));

    this.resolver = resolver;
    this.nameserviceId = nsId;

    // Save the original nnId: it is the one used to get the ports from the
    // configuration.
    this.originalNnId = nnId;
    this.resolvedHost = resolvedHost;

    // Build a unique id out of the original nnId and the resolved host.
    this.namenodeId = nnId + "-" + resolvedHost;
  }

  /**
   * Initialize the heartbeat service: read the addresses of the monitored
   * Namenode from the configuration, create the HA target when a namenode
   * identifier is set, create the connection factory for JMX and read the
   * heartbeat interval and the health monitor timeout.
   *
   * @param configuration Configuration used to initialize the service.
   * @throws Exception If the initialization fails.
   */
  @Override
  protected void serviceInit(Configuration configuration) throws Exception {

    this.conf = DFSHAAdmin.addSecurityConfiguration(configuration);

    // The HA target is only created when a namenode identifier is set
    final boolean hasNamenodeId =
        this.namenodeId != null && !this.namenodeId.isEmpty();
    final String nnDesc =
        hasNamenodeId ? nameserviceId + "-" + namenodeId : nameserviceId;

    if (originalNnId == null) {
      originalNnId = namenodeId;
    }
    // Get the RPC address for the clients to connect
    this.rpcAddress = getRpcAddress(conf, nameserviceId, originalNnId);

    // Get the Service RPC address for monitoring
    this.serviceAddress =
        DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, originalNnId);
    if (this.serviceAddress == null) {
      LOG.error("Cannot locate RPC service address for NN {}, " +
          "using RPC address {}", nnDesc, this.rpcAddress);
      this.serviceAddress = this.rpcAddress;
    }

    // Get the Lifeline RPC address for faster monitoring
    this.lifelineAddress =
        DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, originalNnId);
    if (this.lifelineAddress == null) {
      this.lifelineAddress = this.serviceAddress;
    }

    // Get the Web address for UI
    this.webAddress =
        DFSUtil.getNamenodeWebAddr(conf, nameserviceId, originalNnId);

    if (resolvedHost != null) {
      // Get the addresses from resolvedHost plus the configured ports.
      rpcAddress = resolvedHost + ":"
          + NetUtils.getPortFromHostPortString(rpcAddress);
      serviceAddress = resolvedHost + ":"
          + NetUtils.getPortFromHostPortString(serviceAddress);
      lifelineAddress = resolvedHost + ":"
          + NetUtils.getPortFromHostPortString(lifelineAddress);
      webAddress = resolvedHost + ":"
          + NetUtils.getPortFromHostPortString(webAddress);
    }

    LOG.info("{} RPC address: {}", nnDesc, rpcAddress);
    LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress);
    LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress);
    LOG.info("{} Web address: {}", nnDesc, webAddress);

    if (hasNamenodeId) {
      this.localTarget = new NNHAServiceTarget(
          conf, nameserviceId, namenodeId, serviceAddress, lifelineAddress);
    } else {
      this.localTarget = null;
    }

    this.connectionFactory =
        URLConnectionFactory.newDefaultURLConnectionFactory(conf);

    this.scheme =
        DFSUtil.getHttpPolicy(conf).isHttpEnabled() ? "http" : "https";

    this.setIntervalMs(conf.getLong(
        DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
        DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));

    long timeoutMs = conf.getTimeDuration(DFS_ROUTER_HEALTH_MONITOR_TIMEOUT,
        DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
    if (timeoutMs < 0) {
      LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
          "Using value of : 0ms instead.", timeoutMs, DFS_ROUTER_HEALTH_MONITOR_TIMEOUT);
      this.healthMonitorTimeoutMs = 0;
    } else if (timeoutMs > Integer.MAX_VALUE) {
      // The timeout is stored as an int: clamp it instead of letting the
      // narrowing cast wrap around to an unrelated (possibly negative) value.
      LOG.warn("Value {} configured for {} is too large. " +
          "Using value of : {}ms instead.", timeoutMs,
          DFS_ROUTER_HEALTH_MONITOR_TIMEOUT, Integer.MAX_VALUE);
      this.healthMonitorTimeoutMs = Integer.MAX_VALUE;
    } else {
      this.healthMonitorTimeoutMs = (int) timeoutMs;
    }

    super.serviceInit(configuration);
  }

  /**
   * Periodic task of this service: updates the state of the monitored
   * Namenode by delegating to {@code updateState()}.
   */
  @Override
  public void periodicInvoke() {
    updateState();
  }

  /**
   * Get the RPC address for a Namenode.
   * <p>
   * Without identifiers, the regular RPC address setting is returned. With a
   * nameservice and/or namenode identifier, the setting with the matching key
   * suffixes is used; if that is not set, the address is taken from the RPC
   * addresses of the nameservice.
   *
   * @param conf Configuration.
   * @param nsId Name service identifier.
   * @param nnId Name node identifier.
   * @return RPC address in format hostname:1234, or null if it cannot be
   *         determined.
   */
  private static String getRpcAddress(
      Configuration conf, String nsId, String nnId) {

    // Get it from the regular RPC setting
    String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
    String ret = conf.get(confKey);

    if (nsId != null || nnId != null) {
      // Get it for the proper nameservice and namenode
      confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId);
      ret = conf.get(confKey);

      // If not available, get it from the map
      if (ret == null) {
        Map<String, InetSocketAddress> rpcAddresses =
            DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
        InetSocketAddress sockAddr = null;
        if (nnId != null) {
          sockAddr = rpcAddresses.get(nnId);
        } else if (rpcAddresses.size() == 1) {
          // Get the only namenode in the namespace
          sockAddr = rpcAddresses.values().iterator().next();
        }
        if (sockAddr != null) {
          // getAddress() is null for an unresolved socket address: fall back
          // to the host string instead of failing with a NullPointerException
          InetAddress addr = sockAddr.getAddress();
          String host =
              addr != null ? addr.getHostName() : sockAddr.getHostString();
          ret = host + ":" + sockAddr.getPort();
        }
      }
    }
    return ret;
  }

  /**
   * Fetch the current status report of the Namenode and register it with the
   * resolver. The registration is skipped when the Namenode is operational
   * and expected to be in HA, but its HA state could not be fetched.
   */
  private void updateState() {
    final NamenodeStatusReport statusReport = getNamenodeStatusReport();

    if (statusReport.registrationValid()) {
      if (statusReport.haStateValid()) {
        // Both the block info and the HA status are available
        if (LOG.isDebugEnabled()) {
          LOG.debug("Received service state: {} from HA namenode: {}",
              statusReport.getState(), getNamenodeDesc());
        }
      } else if (localTarget != null) {
        // The block info is available and the HA status should be too, but it
        // was not fetched: do nothing and let the current state stand
        return;
      } else {
        // The block info is available and no HA status is expected
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "Reporting non-HA namenode as operational: {}", getNamenodeDesc());
        }
      }
    } else {
      // The Namenode is not operational
      LOG.error("Namenode is not operational: {}", getNamenodeDesc());
    }

    try {
      final boolean registered = resolver.registerNamenode(statusReport);
      if (!registered) {
        LOG.warn("Cannot register namenode {}", statusReport);
      }
    } catch (IOException ioe) {
      LOG.info("Cannot register namenode in the State Store");
    } catch (Exception e) {
      LOG.error("Unhandled exception updating NN registration for {}",
          getNamenodeDesc(), e);
    }
  }

  /**
   * Build the status report for the Namenode monitored by this heartbeater.
   * The namespace information is fetched first; the safe mode, JMX and HA
   * data are only collected if the registration in the report is valid.
   * Failures are logged and the report is returned with whatever could be
   * collected.
   *
   * @return Namenode status report.
   */
  protected NamenodeStatusReport getNamenodeStatusReport() {
    final NamenodeStatusReport statusReport = new NamenodeStatusReport(
        nameserviceId, namenodeId, rpcAddress, serviceAddress,
        lifelineAddress, scheme, webAddress);

    try {
      LOG.debug("Probing NN at service address: {}", serviceAddress);

      final URI nnServiceUri = new URI("hdfs://" + serviceAddress);

      // The filesystem info from RPC is required
      updateNameSpaceInfoParameters(nnServiceUri, statusReport);

      if (statusReport.registrationValid()) {
        // Safe mode from the client protocol. Currently optional, but should
        // be required at some point for QoS
        updateSafeModeParameters(nnServiceUri, statusReport);

        // Stats from JMX (optional)
        updateJMXParameters(webAddress, statusReport);

        // HA status, if it can be fetched
        updateHAStatusParameters(statusReport);
      }
    } catch (IOException ioe) {
      LOG.error("Cannot communicate with {}: {}",
          getNamenodeDesc(), ioe.getMessage());
    } catch (Throwable t) {
      // Generic error that we don't know about
      LOG.error("Unexpected exception while communicating with {}: {}",
          getNamenodeDesc(), t.getMessage(), t);
    }
    return statusReport;
  }

  /**
   * Get the HA target of the monitored Namenode.
   * @return HA target; null if none has been created or if it was cleared
   *         after the Namenode reported that HA is not enabled.
   */
  @VisibleForTesting
  NNHAServiceTarget getLocalTarget(){
    return this.localTarget;
  }

  /**
   * Describe the monitored Namenode as {@code nsId[-nnId]:serviceAddress};
   * the namenode identifier is only included when it is set and not empty.
   * @return Description of the Namenode to monitor.
   */
  public String getNamenodeDesc() {
    final boolean hasNamenodeId = namenodeId != null && !namenodeId.isEmpty();
    final String id =
        hasNamenodeId ? nameserviceId + "-" + namenodeId : nameserviceId;
    return id + ":" + serviceAddress;
  }

  /**
   * Build the name of the heartbeat service: the simple class name followed
   * by the nameservice and namenode identifiers that are not null.
   * @param nsId Identifier of the nameservice; may be null.
   * @param nnId Identifier of the namenode; may be null.
   * @return Name of the service.
   */
  private static String getNnHeartBeatServiceName(String nsId, String nnId) {
    final StringBuilder serviceName =
        new StringBuilder(NamenodeHeartbeatService.class.getSimpleName());
    if (nsId != null) {
      serviceName.append(" ").append(nsId);
    }
    if (nnId != null) {
      serviceName.append(" ").append(nnId);
    }
    return serviceName.toString();
  }

  /**
   * Fetch the namespace information of a Namenode via RPC and store it in the
   * report. The proxy to the Namenode is cached and discarded on failure.
   * @param serviceURI Server address of the Namenode to monitor.
   * @param report Namenode status report to update with the namespace
   *               information.
   * @throws IOException Propagated to the caller: RBF uses the namespace
   *                     information to identify the nameservice, so without
   *                     it there is no point in collecting anything else
   *                     from the Namenode.
   */
  private void updateNameSpaceInfoParameters(URI serviceURI,
      NamenodeStatusReport report) throws IOException {
    try {
      NamenodeProtocol proxy = this.namenodeProtocol;
      if (proxy == null) {
        proxy = NameNodeProxies.createProxy(this.conf, serviceURI,
            NamenodeProtocol.class).getProxy();
        this.namenodeProtocol = proxy;
      }
      if (proxy == null) {
        return;
      }
      final NamespaceInfo namespaceInfo = proxy.versionRequest();
      if (namespaceInfo != null) {
        report.setNamespaceInfo(namespaceInfo);
      }
    } catch (IOException ioe) {
      // Drop the cached proxy so it is created again on the next attempt
      this.namenodeProtocol = null;
      throw ioe;
    }
  }

  /**
   * Fetch the safe mode state of a Namenode via RPC and store it in the
   * report. The safe mode is just one status of the Namenode and is not
   * needed to identify it, so any failure is logged and ignored to let the
   * caller keep collecting the rest of the information.
   * @param serviceURI Server address of the Namenode to monitor.
   * @param report Namenode status report to update with the safe mode state.
   */
  private void updateSafeModeParameters(URI serviceURI, NamenodeStatusReport report) {
    try {
      ClientProtocol proxy = this.clientProtocol;
      if (proxy == null) {
        proxy = NameNodeProxies
            .createProxy(this.conf, serviceURI, ClientProtocol.class)
            .getProxy();
        this.clientProtocol = proxy;
      }
      if (proxy == null) {
        return;
      }
      report.setSafeMode(
          proxy.setSafeMode(SafeModeAction.SAFEMODE_GET, false));
    } catch (Exception ex) {
      LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), ex);
      // Drop the cached proxy so it is created again on the next attempt
      this.clientProtocol = null;
    }
  }

  /**
   * Collect the JMX metrics of a Namenode (FSNamesystem first, NameNodeInfo
   * afterwards) and store them in the report. Failures are logged and not
   * propagated.
   * @param address Web interface of the Namenode to monitor.
   * @param report Namenode status report to update with JMX data.
   */
  private void updateJMXParameters(
      String address, NamenodeStatusReport report) {
    // TODO part of this should be moved to its own utility
    try {
      getFsNamesystemMetrics(address, report);
      getNamenodeInfoMetrics(address, report);
    } catch (Exception ex) {
      LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), ex);
    }
  }

  /**
   * Get the HA status for a Namenode via RPC and add it to the report.
   * Nothing is done when there is no HA target. Failures are logged and not
   * propagated; the cached HA proxy is discarded so it is created again on the
   * next attempt. If the failure states that HA is not enabled, the HA target
   * is cleared as well.
   * @param report Namenode status report updating with HA status information data.
   */
  private void updateHAStatusParameters(NamenodeStatusReport report) {
    if (localTarget == null) {
      return;
    }
    try {
      // Determine if NN is active
      if (localTargetHAProtocol == null) {
        localTargetHAProtocol = localTarget.getHealthMonitorProxy(
            conf, this.healthMonitorTimeoutMs);
        LOG.debug("Get HA status with address {}", lifelineAddress);
      }
      HAServiceStatus status = localTargetHAProtocol.getServiceStatus();
      report.setHAServiceState(status.getState());
    } catch (Throwable e) {
      // The message may be null (e.g., a NullPointerException): guard it so
      // the handler itself cannot fail and skip the proxy reset below
      String message = e.getMessage();
      if (message != null
          && message.startsWith("HA for namenode is not enabled")) {
        LOG.error("HA for {} is not enabled", getNamenodeDesc());
        localTarget = null;
      } else {
        // Failed to fetch HA status, ignoring failure
        LOG.error("Cannot fetch HA status for {}", getNamenodeDesc(), e);
      }
      localTargetHAProtocol = null;
    }
  }

  /**
   * Fetch the NameNodeInfo metrics from the Namenode via JMX and store them
   * in the report. Only the first bean of the reply is considered, and only
   * if its name matches the queried one.
   * @param address Web interface of the Namenode to monitor.
   * @param report Namenode status report to update with JMX data.
   * @throws JSONException If the JMX reply cannot be read as expected.
   */
  private void getNamenodeInfoMetrics(String address,
      NamenodeStatusReport report) throws JSONException {
    final String beanName = "Hadoop:service=NameNode,name=NameNodeInfo";
    final JSONArray beans =
        FederationUtil.getJmx(beanName, address, connectionFactory, scheme);
    if (beans == null || beans.length() <= 0) {
      return;
    }

    final JSONObject bean = beans.getJSONObject(0);
    final String name = bean.getString("name");
    if (!name.equals(beanName)) {
      return;
    }

    final int corruptFiles = bean.optInt("CorruptFilesCount");
    final long missingBlocksReplOne =
        bean.optLong("NumberOfMissingBlocksWithReplicationFactorOne");
    final long lowRedundancyReplicated =
        bean.optLong("HighestPriorityLowRedundancyReplicatedBlocks");
    final long lowRedundancyEc =
        bean.optLong("HighestPriorityLowRedundancyECBlocks");
    report.setNamenodeInfo(corruptFiles, missingBlocksReplOne,
        lowRedundancyReplicated, lowRedundancyEc);
  }

  /**
   * Fetches FSNamesystem* metrics from namenode via JMX and stores them in the
   * report: the datanode counters from the FSNamesystemState bean and the
   * capacity/block counters from the FSNamesystem bean.
   * <p>
   * Attributes read with {@code opt*} default to 0 when the Namenode does not
   * publish them; attributes read with {@code get*} are mandatory.
   * @param address Web interface of the Namenode to monitor.
   * @param report Namenode status report to update with JMX data.
   * @throws JSONException If a mandatory attribute is missing or the JMX reply
   *                       cannot be read as expected.
   */
  private void getFsNamesystemMetrics(String address,
      NamenodeStatusReport report) throws JSONException {
    String query = "Hadoop:service=NameNode,name=FSNamesystem*";
    JSONArray aux = FederationUtil.getJmx(
        query, address, connectionFactory, scheme);
    if (aux != null) {
      for (int i = 0; i < aux.length(); i++) {
        JSONObject jsonObject = aux.getJSONObject(i);
        String name = jsonObject.getString("name");
        if (name.equals("Hadoop:service=NameNode,name=FSNamesystemState")) {
          report.setDatanodeInfo(
              jsonObject.getInt("NumLiveDataNodes"),
              jsonObject.getInt("NumDeadDataNodes"),
              jsonObject.getInt("NumStaleDataNodes"),
              jsonObject.getInt("NumDecommissioningDataNodes"),
              jsonObject.getInt("NumDecomLiveDataNodes"),
              jsonObject.getInt("NumDecomDeadDataNodes"),
              jsonObject.optInt("NumInMaintenanceLiveDataNodes"),
              jsonObject.optInt("NumInMaintenanceDeadDataNodes"),
              jsonObject.optInt("NumEnteringMaintenanceDataNodes"),
              jsonObject.optLong("ScheduledReplicationBlocks"));
        } else if (name.equals(
            "Hadoop:service=NameNode,name=FSNamesystem")) {
          report.setNamesystemInfo(
              jsonObject.getLong("CapacityRemaining"),
              jsonObject.getLong("CapacityTotal"),
              jsonObject.getLong("FilesTotal"),
              jsonObject.getLong("BlocksTotal"),
              jsonObject.getLong("MissingBlocks"),
              jsonObject.getLong("PendingReplicationBlocks"),
              jsonObject.getLong("UnderReplicatedBlocks"),
              jsonObject.getLong("PendingDeletionBlocks"),
              jsonObject.optLong("ProvidedCapacityTotal"),
              // Optional: a Namenode that does not publish this attribute
              // must not make the whole JMX update fail
              jsonObject.optInt("PendingSPSPaths"));
        }
      }
    }
  }

  /**
   * Stop the heartbeat service, destroying the connection factory used for
   * the JMX calls if it was created.
   * @throws Exception If the service cannot be stopped.
   */
  @Override
  protected void serviceStop() throws Exception {
    LOG.info("Stopping NamenodeHeartbeat service for, NS {} NN {} ",
        this.nameserviceId, this.namenodeId);

    final URLConnectionFactory jmxConnectionFactory = this.connectionFactory;
    if (jmxConnectionFactory != null) {
      jmxConnectionFactory.destroy();
    }

    super.serviceStop();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ConnectionContext 源码

hadoop ConnectionManager 源码

hadoop ConnectionNullException 源码

hadoop ConnectionPool 源码

hadoop ConnectionPoolId 源码

hadoop DFSRouter 源码

hadoop ErasureCoding 源码

hadoop FederationConnectionId 源码

hadoop FederationUtil 源码

hadoop IsRouterActiveServlet 源码

0  赞