hadoop MembershipNamenodeResolver 源码

  • 2022-10-20
  • 浏览 (251)

hadoop MembershipNamenodeResolver 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.resolver;

import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.ACTIVE;
import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.EXPIRED;
import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.OBSERVER;
import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.UNAVAILABLE;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implements a cached lookup of the most recently active namenode for a
 * particular nameservice. Relies on the {@link StateStoreService} to
 * discover available nameservices and namenodes.
 *
 * <p>Two lookup caches are kept: one keyed by nameservice (plus the observer
 * ordering flag) and one keyed by block pool. Both are cleared whenever
 * {@link #loadCache(boolean)} runs. A local namenode state update drops the
 * affected nameservice entries and the whole block pool cache. Cached lists
 * are stored as unmodifiable views so callers cannot alter shared cache
 * entries.
 */
public class MembershipNamenodeResolver
    implements ActiveNamenodeResolver, StateStoreCache {

  private static final Logger LOG =
      LoggerFactory.getLogger(MembershipNamenodeResolver.class);

  /** Reference to the State Store. */
  private final StateStoreService stateStore;
  /** Membership State Store interface; resolved lazily on first use. */
  private MembershipStore membershipInterface;
  /** Disabled Nameservice State Store interface; resolved lazily. */
  private DisabledNameserviceStore disabledNameserviceInterface;

  /** Parent router ID. */
  private String routerId;

  /**
   * Cached lookup of namenodes for nameservice. The keys are a pair of the
   * nameservice name and a boolean indicating if observer namenodes should be
   * listed first. If true, observer namenodes are listed first. If false,
   * active namenodes are listed first. Values are unmodifiable lists.
   * Invalidated on cache refresh.
   */
  private final Map<Pair<String, Boolean>,
      List<? extends FederationNamenodeContext>> cacheNS;
  /**
   * Cached lookup of NN for block pool. Values are unmodifiable lists.
   * Invalidated on cache refresh.
   */
  private final Map<String, List<? extends FederationNamenodeContext>> cacheBP;


  /**
   * Create a resolver backed by the given State Store.
   *
   * @param conf Configuration; not read by this constructor.
   * @param store State Store to query. If non-null, this resolver registers
   *              itself with the store to receive cache refresh callbacks.
   * @throws IOException as declared by the constructor signature.
   */
  public MembershipNamenodeResolver(
      Configuration conf, StateStoreService store) throws IOException {
    this.stateStore = store;

    this.cacheNS = new ConcurrentHashMap<>();
    this.cacheBP = new ConcurrentHashMap<>();

    if (this.stateStore != null) {
      // Request cache updates from the state store
      this.stateStore.registerCacheExternal(this);
    }
  }

  /**
   * Lazily resolve and memoize the membership record store.
   *
   * @return the registered {@link MembershipStore}.
   * @throws IOException if the State Store has no such interface registered.
   */
  private synchronized MembershipStore getMembershipStore() throws IOException {
    if (this.membershipInterface == null) {
      this.membershipInterface = getStoreInterface(MembershipStore.class);
    }
    return this.membershipInterface;
  }

  /**
   * Lazily resolve and memoize the disabled-nameservice record store.
   *
   * @return the registered {@link DisabledNameserviceStore}.
   * @throws IOException if the State Store has no such interface registered.
   */
  private synchronized DisabledNameserviceStore getDisabledNameserviceStore()
      throws IOException {
    if (this.disabledNameserviceInterface == null) {
      this.disabledNameserviceInterface =
          getStoreInterface(DisabledNameserviceStore.class);
    }
    return this.disabledNameserviceInterface;
  }

  /**
   * Look up a record store of the given type in the State Store.
   *
   * @param clazz record store type to look up.
   * @param <T> record store type.
   * @return the registered store, never null.
   * @throws IOException if no store of that type is registered.
   */
  private <T extends RecordStore<?>> T getStoreInterface(Class<T> clazz)
      throws IOException {
    T store = this.stateStore.getRegisteredRecordStore(clazz);
    if (store == null) {
      throw new IOException("State Store does not have an interface for " +
          clazz.getSimpleName());
    }
    return store;
  }

  @Override
  public boolean loadCache(boolean force) {
    // Our cache depends on the store, update it first
    try {
      MembershipStore membership = getMembershipStore();
      membership.loadCache(force);
      DisabledNameserviceStore disabled = getDisabledNameserviceStore();
      disabled.loadCache(force);
    } catch (IOException e) {
      LOG.error("Cannot update membership from the State Store", e);
    }

    // Force refresh of active NN cache. This is done even when the store
    // refresh above failed, and the method always reports success.
    cacheBP.clear();
    cacheNS.clear();
    return true;
  }

  @Override
  public void updateUnavailableNamenode(String nsId,
      InetSocketAddress address) throws IOException {
    updateNameNodeState(nsId, address, UNAVAILABLE);
  }

  @Override
  public void updateActiveNamenode(
      final String nsId, final InetSocketAddress address) throws IOException {
    updateNameNodeState(nsId, address, ACTIVE);
  }


  /**
   * Override the state of a single namenode registration in the State Store
   * and invalidate the local caches that may reference it.
   *
   * <p>The update is applied only when exactly one registration matches the
   * nameservice and RPC address. An unavailable State Store is logged and
   * otherwise ignored.
   *
   * @param nsId nameservice the namenode belongs to.
   * @param address RPC address of the namenode.
   * @param state new state to record.
   * @throws IOException if the membership store cannot be resolved or queried.
   */
  private void updateNameNodeState(final String nsId,
      final InetSocketAddress address, FederationNamenodeServiceState state)
      throws IOException {
    // Temporarily update our cache, it will be overwritten on the next update.
    try {
      MembershipState partial = MembershipState.newInstance();
      String rpcAddress = address.getHostName() + ":" + address.getPort();
      partial.setRpcAddress(rpcAddress);
      partial.setNameserviceId(nsId);

      GetNamenodeRegistrationsRequest request =
          GetNamenodeRegistrationsRequest.newInstance(partial);

      MembershipStore membership = getMembershipStore();
      GetNamenodeRegistrationsResponse response =
          membership.getNamenodeRegistrations(request);
      List<MembershipState> records = response.getNamenodeMemberships();

      if (records != null && records.size() == 1) {
        MembershipState record = records.get(0);
        UpdateNamenodeRegistrationRequest updateRequest =
            UpdateNamenodeRegistrationRequest.newInstance(
                record.getNameserviceId(), record.getNamenodeId(), state);
        membership.updateNamenodeRegistration(updateRequest);

        cacheNS.remove(Pair.of(nsId, Boolean.TRUE));
        cacheNS.remove(Pair.of(nsId, Boolean.FALSE));
        // Invalidating the full cacheBp since getting the blockpool id from
        // namespace id is quite costly.
        cacheBP.clear();
      }
    } catch (StateStoreUnavailableException e) {
      // Report the state actually requested; this path serves both the
      // ACTIVE and the UNAVAILABLE updates.
      LOG.error("Cannot update {} as {}, State Store unavailable",
          address, state);
    }
  }

  @Override
  public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
      final String nsId, boolean listObserversFirst) throws IOException {

    final Pair<String, Boolean> cacheKey = Pair.of(nsId, listObserversFirst);
    List<? extends FederationNamenodeContext> ret = cacheNS.get(cacheKey);
    if (ret != null) {
      return ret;
    }

    // Not cached, generate the value
    final List<MembershipState> result;
    try {
      MembershipState partial = MembershipState.newInstance();
      partial.setNameserviceId(nsId);
      GetNamenodeRegistrationsRequest request =
          GetNamenodeRegistrationsRequest.newInstance(partial);
      result = getRecentRegistrationForQuery(request, true,
          false, listObserversFirst);
    } catch (StateStoreUnavailableException e) {
      LOG.error("Cannot get active NN for {}, State Store unavailable", nsId);
      return null;
    }
    if (result == null || result.isEmpty()) {
      LOG.error("Cannot locate eligible NNs for {}", nsId);
      return null;
    }

    // Mark disabled name services
    try {
      Set<String> disabled =
          getDisabledNameserviceStore().getDisabledNameservices();
      if (disabled == null) {
        LOG.error("Cannot get disabled name services");
      } else {
        for (MembershipState nn : result) {
          if (disabled.contains(nn.getNameserviceId())) {
            nn.setState(FederationNamenodeServiceState.DISABLED);
          }
        }
      }
    } catch (StateStoreUnavailableException e) {
      LOG.error("Cannot get disabled name services, State Store unavailable");
    }

    // Cache the read-only view (not the backing list) so that later cache
    // hits, which return the cached value directly, cannot be mutated.
    ret = Collections.unmodifiableList(result);
    cacheNS.put(cacheKey, ret);
    return ret;
  }

  @Override
  public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
      final String bpId) throws IOException {

    List<? extends FederationNamenodeContext> ret = cacheBP.get(bpId);
    if (ret != null) {
      return ret;
    }

    final List<MembershipState> result;
    try {
      MembershipState partial = MembershipState.newInstance();
      partial.setBlockPoolId(bpId);
      GetNamenodeRegistrationsRequest request =
          GetNamenodeRegistrationsRequest.newInstance(partial);
      result = getRecentRegistrationForQuery(request, true, false, false);
    } catch (StateStoreUnavailableException e) {
      LOG.error("Cannot get active NN for {}, State Store unavailable", bpId);
      return null;
    }
    if (result == null || result.isEmpty()) {
      LOG.error("Cannot locate eligible NNs for {}", bpId);
      return null;
    }

    // Store the read-only view once instead of re-wrapping on every hit.
    ret = Collections.unmodifiableList(result);
    cacheBP.put(bpId, ret);
    return ret;
  }

  @Override
  public boolean registerNamenode(NamenodeStatusReport report)
      throws IOException {

    if (this.routerId == null) {
      LOG.warn("Cannot register namenode, router ID is not known {}", report);
      return false;
    }

    MembershipState record = MembershipState.newInstance(
        routerId, report.getNameserviceId(), report.getNamenodeId(),
        report.getClusterId(), report.getBlockPoolId(),
        NetUtils.normalizeIP2HostName(report.getRpcAddress()),
        report.getServiceAddress(), report.getLifelineAddress(),
        report.getWebScheme(), report.getWebAddress(), report.getState(),
        report.getSafemode());

    if (report.statsValid()) {
      MembershipStats stats = MembershipStats.newInstance();
      stats.setNumOfFiles(report.getNumFiles());
      stats.setNumOfBlocks(report.getNumBlocks());
      stats.setNumOfBlocksMissing(report.getNumBlocksMissing());
      stats.setNumOfBlocksPendingReplication(
          report.getNumOfBlocksPendingReplication());
      stats.setNumOfBlocksUnderReplicated(
          report.getNumOfBlocksUnderReplicated());
      stats.setNumOfBlocksPendingDeletion(
          report.getNumOfBlocksPendingDeletion());
      stats.setAvailableSpace(report.getAvailableSpace());
      stats.setTotalSpace(report.getTotalSpace());
      stats.setProvidedSpace(report.getProvidedSpace());
      stats.setNumOfDecommissioningDatanodes(
          report.getNumDecommissioningDatanodes());
      stats.setNumOfActiveDatanodes(report.getNumLiveDatanodes());
      stats.setNumOfDeadDatanodes(report.getNumDeadDatanodes());
      stats.setNumOfStaleDatanodes(report.getNumStaleDatanodes());
      stats.setNumOfDecomActiveDatanodes(report.getNumDecomLiveDatanodes());
      stats.setNumOfDecomDeadDatanodes(report.getNumDecomDeadDatanodes());
      stats.setNumOfInMaintenanceLiveDataNodes(
          report.getNumInMaintenanceLiveDataNodes());
      stats.setNumOfInMaintenanceDeadDataNodes(
          report.getNumInMaintenanceDeadDataNodes());
      stats.setNumOfEnteringMaintenanceDataNodes(
          report.getNumEnteringMaintenanceDataNodes());
      stats.setCorruptFilesCount(report.getCorruptFilesCount());
      stats.setScheduledReplicationBlocks(
          report.getScheduledReplicationBlocks());
      stats.setNumberOfMissingBlocksWithReplicationFactorOne(
          report.getNumberOfMissingBlocksWithReplicationFactorOne());
      stats.setHighestPriorityLowRedundancyReplicatedBlocks(
          report.getHighestPriorityLowRedundancyReplicatedBlocks());
      stats.setHighestPriorityLowRedundancyECBlocks(
          report.getHighestPriorityLowRedundancyECBlocks());
      stats.setPendingSPSPaths(report.getPendingSPSPaths());
      record.setStats(stats);
    }

    if (report.getState() != UNAVAILABLE) {
      // Set/update our last contact time
      record.setLastContact(Time.now());
    }

    NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance();
    request.setNamenodeMembership(record);
    return getMembershipStore().namenodeHeartbeat(request).getResult();
  }

  @Override
  public Set<FederationNamespaceInfo> getNamespaces() throws IOException {
    GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
    GetNamespaceInfoResponse response =
        getMembershipStore().getNamespaceInfo(request);
    Set<FederationNamespaceInfo> nss = response.getNamespaceInfo();

    // Filter disabled namespaces. A null disabled set is treated as "none
    // disabled", matching how getNamenodesForNameserviceId tolerates it.
    Set<FederationNamespaceInfo> ret = new TreeSet<>();
    Set<String> disabled = getDisabledNamespaces();
    for (FederationNamespaceInfo ns : nss) {
      if (disabled == null || !disabled.contains(ns.getNameserviceId())) {
        ret.add(ns);
      }
    }

    return ret;
  }

  @Override
  public Set<String> getDisabledNamespaces() throws IOException {
    DisabledNameserviceStore store = getDisabledNameserviceStore();
    return store.getDisabledNameservices();
  }

  /**
   * Returns every registration that matches the query, filtered by state and
   * ordered by preference.
   *
   * <p>Ordering of the non-observer entries is delegated to
   * {@link NamenodePriorityComparator}. If not observer read, registrations
   * are intended to come in this preference:
   * 1) Most recently updated ACTIVE registration
   * 2) Most recently updated Observer registration
   * 3) Most recently updated STANDBY registration (if showStandby)
   * 4) Most recently updated UNAVAILABLE registration (if showUnavailable).
   *
   * <p>If observer read, registrations matching the query come in this
   * preference:
   * 1) Observer registrations, shuffled to disperse queries.
   * 2) Most recently updated ACTIVE registration
   * 3) Most recently updated STANDBY registration (if showStandby)
   * 4) Most recently updated UNAVAILABLE registration (if showUnavailable).
   *
   * <p>EXPIRED registrations are dropped unless {@code addExpired} is set.
   * UNAVAILABLE registrations are dropped unless {@code addUnavailable} is set.
   *
   * @param request The select query for NN registrations.
   * @param addUnavailable include UNAVAILABLE registrations.
   * @param addExpired include EXPIRED registrations.
   * @param observerRead  Observer read case, observer NN will be ranked first
   * @return List of matching memberships in preference order; empty (never
   *         null) if no registration matches both the query and the selected
   *         states.
   * @throws IOException if the membership store cannot be resolved or queried.
   */
  private List<MembershipState> getRecentRegistrationForQuery(
      GetNamenodeRegistrationsRequest request, boolean addUnavailable,
      boolean addExpired, boolean observerRead) throws IOException {

    // Retrieve a list of all registrations that match this query.
    // This may include all NN records for a namespace/blockpool, including
    // duplicate records for the same NN from different routers.
    MembershipStore membershipStore = getMembershipStore();
    GetNamenodeRegistrationsResponse response =
        membershipStore.getNamenodeRegistrations(request);

    List<MembershipState> memberships = response.getNamenodeMemberships();
    if (memberships == null) {
      // Same defensive null check as updateNameNodeState applies to this
      // call; treat it as "no matching registrations".
      return new ArrayList<>();
    }

    // Drop unwanted states in place and pull observers aside when they must
    // be ranked ahead of everything else.
    List<MembershipState> observerMemberships = new ArrayList<>();
    Iterator<MembershipState> iterator = memberships.iterator();
    while (iterator.hasNext()) {
      MembershipState membership = iterator.next();
      if (membership.getState() == EXPIRED && !addExpired) {
        iterator.remove();
      } else if (membership.getState() == UNAVAILABLE && !addUnavailable) {
        iterator.remove();
      } else if (membership.getState() == OBSERVER && observerRead) {
        iterator.remove();
        observerMemberships.add(membership);
      }
    }

    memberships.sort(new NamenodePriorityComparator());
    if (observerRead) {
      List<MembershipState> ret = new ArrayList<>(
          memberships.size() + observerMemberships.size());
      if (observerMemberships.size() > 1) {
        // Randomize observer order to spread read load across observers.
        Collections.shuffle(observerMemberships);
      }
      ret.addAll(observerMemberships);
      ret.addAll(memberships);
      memberships = ret;
    }

    LOG.debug("Selected most recent NN {} for query", memberships);
    return memberships;
  }

  @Override
  public void setRouterId(String router) {
    this.routerId = router;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ActiveNamenodeResolver 源码

hadoop FederationNamenodeContext 源码

hadoop FederationNamenodeServiceState 源码

hadoop FederationNamespaceInfo 源码

hadoop FileSubclusterResolver 源码

hadoop MountTableManager 源码

hadoop MountTableResolver 源码

hadoop MultipleDestinationMountTableResolver 源码

hadoop NamenodePriorityComparator 源码

hadoop NamenodeStatusReport 源码

0  赞