hadoop RouterQuotaUpdateService 源码

  • 2022-10-20
  • 浏览 (171)

haddop RouterQuotaUpdateService 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Service to periodically update the {@link RouterQuotaUsage}
 * cached information in the {@link Router}.
 */
public class RouterQuotaUpdateService extends PeriodicService {
  private static final Logger LOG =
      LoggerFactory.getLogger(RouterQuotaUpdateService.class);

  private MountTableStore mountTableStore;
  private RouterRpcServer rpcServer;
  /** Router using this Service. */
  private final Router router;
  /** Router Quota manager. */
  private RouterQuotaManager quotaManager;

  public RouterQuotaUpdateService(final Router router) throws IOException {
    super(RouterQuotaUpdateService.class.getName());
    this.router = router;
    this.rpcServer = router.getRpcServer();
    this.quotaManager = router.getQuotaManager();

    if (this.quotaManager == null) {
      throw new IOException("Router quota manager is not initialized.");
    }
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    this.setIntervalMs(conf.getTimeDuration(
        RBFConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPDATE_INTERVAL,
        RBFConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPDATE_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS));

    super.serviceInit(conf);
  }

  @Override
  protected void periodicInvoke() {
    LOG.debug("Start to update quota cache.");
    try {
      List<MountTable> mountTables = getQuotaSetMountTables();
      Map<RemoteLocation, QuotaUsage> remoteQuotaUsage = new HashMap<>();
      for (MountTable entry : mountTables) {
        String src = entry.getSourcePath();
        RouterQuotaUsage oldQuota = entry.getQuota();
        long nsQuota = oldQuota.getQuota();
        long ssQuota = oldQuota.getSpaceQuota();
        long[] typeQuota = new long[StorageType.values().length];
        Quota.eachByStorageType(
            t -> typeQuota[t.ordinal()] = oldQuota.getTypeQuota(t));

        QuotaUsage currentQuotaUsage = null;

        // Check whether destination path exists in filesystem. When the
        // mtime is zero, the destination is not present and reset the usage.
        // This is because mount table does not have mtime.
        // For other mount entry get current quota usage
        HdfsFileStatus ret = this.rpcServer.getFileInfo(src);
        if (ret == null || ret.getModificationTime() == 0) {
          long[] zeroConsume = new long[StorageType.values().length];
          currentQuotaUsage =
              new RouterQuotaUsage.Builder().fileAndDirectoryCount(0)
                  .quota(nsQuota).spaceConsumed(0).spaceQuota(ssQuota)
                  .typeConsumed(zeroConsume)
                  .typeQuota(typeQuota).build();
        } else {
          // Call RouterRpcServer#getQuotaUsage for getting current quota usage.
          // If any exception occurs catch it and proceed with other entries.
          try {
            Quota quotaModule = this.rpcServer.getQuotaModule();
            Map<RemoteLocation, QuotaUsage> usageMap =
                quotaModule.getEachQuotaUsage(src);
            currentQuotaUsage = quotaModule.aggregateQuota(src, usageMap);
            remoteQuotaUsage.putAll(usageMap);
          } catch (IOException ioe) {
            LOG.error("Unable to get quota usage for " + src, ioe);
            continue;
          }
        }

        RouterQuotaUsage newQuota = generateNewQuota(oldQuota,
            currentQuotaUsage);
        this.quotaManager.put(src, newQuota);
        entry.setQuota(newQuota);
      }

      // Fix inconsistent quota.
      for (Entry<RemoteLocation, QuotaUsage> en : remoteQuotaUsage
          .entrySet()) {
        RemoteLocation remoteLocation = en.getKey();
        QuotaUsage currentQuota = en.getValue();
        fixGlobalQuota(remoteLocation, currentQuota);
      }
    } catch (IOException e) {
      LOG.error("Quota cache updated error.", e);
    }
  }

  private void fixGlobalQuota(RemoteLocation location, QuotaUsage remoteQuota)
      throws IOException {
    QuotaUsage gQuota =
        this.rpcServer.getQuotaModule().getGlobalQuota(location.getSrc());
    if (remoteQuota.getQuota() != gQuota.getQuota()
        || remoteQuota.getSpaceQuota() != gQuota.getSpaceQuota()) {
      this.rpcServer.getQuotaModule()
          .setQuotaInternal(location.getSrc(), Arrays.asList(location),
              gQuota.getQuota(), gQuota.getSpaceQuota(), null);
      LOG.info("[Fix Quota] src={} dst={} oldQuota={}/{} newQuota={}/{}",
          location.getSrc(), location, remoteQuota.getQuota(),
          remoteQuota.getSpaceQuota(), gQuota.getQuota(),
          gQuota.getSpaceQuota());
    }
    for (StorageType t : StorageType.values()) {
      if (remoteQuota.getTypeQuota(t) != gQuota.getTypeQuota(t)) {
        this.rpcServer.getQuotaModule()
            .setQuotaInternal(location.getSrc(), Arrays.asList(location),
                HdfsConstants.QUOTA_DONT_SET, gQuota.getTypeQuota(t), t);
        LOG.info("[Fix Quota] src={} dst={} type={} oldQuota={} newQuota={}",
            location.getSrc(), location, t, remoteQuota.getTypeQuota(t),
            gQuota.getTypeQuota(t));
      }
    }
  }

  /**
   * Get mount table store management interface.
   * @return MountTableStore instance.
   * @throws IOException
   */
  private MountTableStore getMountTableStore() throws IOException {
    if (this.mountTableStore == null) {
      this.mountTableStore = router.getStateStore().getRegisteredRecordStore(
          MountTableStore.class);
      if (this.mountTableStore == null) {
        throw new IOException("Mount table state store is not available.");
      }
    }
    return this.mountTableStore;
  }

  /**
   * Get all the existing mount tables.
   * @return List of mount tables.
   * @throws IOException
   */
  private List<MountTable> getMountTableEntries() throws IOException {
    // scan mount tables from root path
    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
        .newInstance("/");
    GetMountTableEntriesResponse getResponse = getMountTableStore()
        .getMountTableEntries(getRequest);
    return getResponse.getEntries();
  }

  /**
   * Get mount tables which quota was set.
   * During this time, the quota usage cache will also be updated by
   * quota manager:
   * 1. Stale paths (entries) will be removed.
   * 2. Existing entries will be overridden and updated.
   * @return List of mount tables which quota was set.
   * @throws IOException
   */
  private List<MountTable> getQuotaSetMountTables() throws IOException {
    List<MountTable> mountTables = getMountTableEntries();
    Set<String> allPaths = this.quotaManager.getAll();
    Set<String> stalePaths = new HashSet<>(allPaths);

    List<MountTable> neededMountTables = new LinkedList<>();
    for (MountTable entry : mountTables) {
      // select mount tables which is quota set
      if (isQuotaSet(entry)) {
        neededMountTables.add(entry);
      }

      // update mount table entries info in quota cache
      String src = entry.getSourcePath();
      this.quotaManager.updateQuota(src, entry.getQuota());
      stalePaths.remove(src);
    }

    // remove stale paths that currently cached
    for (String stalePath : stalePaths) {
      this.quotaManager.remove(stalePath);
    }

    return neededMountTables;
  }

  /**
   * Check if the quota was set in given MountTable.
   * @param mountTable Mount table entry.
   */
  private boolean isQuotaSet(MountTable mountTable) {
    if (mountTable != null) {
      return this.quotaManager.isQuotaSet(mountTable.getQuota());
    }
    return false;
  }

  /**
   * Generate a new quota based on old quota and current quota usage value.
   * @param oldQuota Old quota stored in State Store.
   * @param currentQuotaUsage Current quota usage value queried from
   *        subcluster.
   * @return A new RouterQuotaUsage.
   */
  private RouterQuotaUsage generateNewQuota(RouterQuotaUsage oldQuota,
      QuotaUsage currentQuotaUsage) {
    RouterQuotaUsage.Builder newQuotaBuilder = new RouterQuotaUsage.Builder()
        .fileAndDirectoryCount(currentQuotaUsage.getFileAndDirectoryCount())
        .quota(oldQuota.getQuota())
        .spaceConsumed(currentQuotaUsage.getSpaceConsumed())
        .spaceQuota(oldQuota.getSpaceQuota());
    Quota.eachByStorageType(t -> {
      newQuotaBuilder.typeQuota(t, oldQuota.getTypeQuota(t));
      newQuotaBuilder.typeConsumed(t, currentQuotaUsage.getTypeConsumed(t));
    });
    return newQuotaBuilder.build();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ConnectionContext 源码

hadoop ConnectionManager 源码

hadoop ConnectionNullException 源码

hadoop ConnectionPool 源码

hadoop ConnectionPoolId 源码

hadoop DFSRouter 源码

hadoop ErasureCoding 源码

hadoop FederationConnectionId 源码

hadoop FederationUtil 源码

hadoop IsRouterActiveServlet 源码

0  赞