hadoop RouterQuotaManager 源码

  • 2022-10-20
  • 浏览 (205)

haddop RouterQuotaManager 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.hdfs.DFSUtil.isParentEntry;

import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

/**
 * Router quota manager in Router. The manager maintains
 * {@link RouterQuotaUsage} cache of mount tables and do management
 * for the quota caches.
 */
public class RouterQuotaManager {
  /** Quota usage <MountTable Path, Aggregated QuotaUsage> cache. */
  private TreeMap<String, RouterQuotaUsage> cache;

  /** Lock to access the quota cache. */
  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  private final Lock readLock = readWriteLock.readLock();
  private final Lock writeLock = readWriteLock.writeLock();

  public RouterQuotaManager() {
    this.cache = new TreeMap<>();
  }

  /**
   * Get all the mount quota paths.
   * @return All the mount quota paths.
   */
  public Set<String> getAll() {
    readLock.lock();
    try {
      return this.cache.keySet();
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Is the path a mount entry.
   *
   * @param path the path.
   * @return {@code true} if path is a mount entry; {@code false} otherwise.
   */
  boolean isMountEntry(String path) {
    readLock.lock();
    try {
      return this.cache.containsKey(path);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get the nearest ancestor's quota usage, and meanwhile its quota was set.
   * @param path The path being written.
   * @return RouterQuotaUsage Quota usage.
   */
  public RouterQuotaUsage getQuotaUsage(String path) {
    readLock.lock();
    try {
      RouterQuotaUsage quotaUsage = this.cache.get(path);
      if (quotaUsage != null && isQuotaSet(quotaUsage)) {
        return quotaUsage;
      }

      // If not found, look for its parent path usage value.
      int pos = path.lastIndexOf(Path.SEPARATOR);
      if (pos != -1) {
        String parentPath = path.substring(0, pos);
        return getQuotaUsage(parentPath);
      }
    } finally {
      readLock.unlock();
    }

    return null;
  }

  /**
   * Get children paths (can include itself) under specified federation path.
   * @param parentPath Federated path.
   * @return Set of children paths.
   */
  public Set<String> getPaths(String parentPath) {
    readLock.lock();
    try {
      String from = parentPath;
      String to = parentPath + Character.MAX_VALUE;
      SortedMap<String, RouterQuotaUsage> subMap = this.cache.subMap(from, to);

      Set<String> validPaths = new HashSet<>();
      if (subMap != null) {
        for (String path : subMap.keySet()) {
          if (isParentEntry(path, parentPath)) {
            validPaths.add(path);
          }
        }
      }
      return validPaths;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get parent paths (including itself) and quotas of the specified federation
   * path. Only parents containing quota are returned.
   * @param childPath Federated path.
   * @return TreeMap of parent paths and quotas.
   */
  TreeMap<String, RouterQuotaUsage> getParentsContainingQuota(
      String childPath) {
    TreeMap<String, RouterQuotaUsage> res = new TreeMap<>();
    readLock.lock();
    try {
      Entry<String, RouterQuotaUsage> entry = this.cache.floorEntry(childPath);
      while (entry != null) {
        String mountPath = entry.getKey();
        RouterQuotaUsage quota = entry.getValue();
        if (isQuotaSet(quota) && isParentEntry(childPath, mountPath)) {
          res.put(mountPath, quota);
        }
        entry = this.cache.lowerEntry(mountPath);
      }
      return res;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Put new entity into cache.
   * @param path Mount table path.
   * @param quotaUsage Corresponding cache value.
   */
  public void put(String path, RouterQuotaUsage quotaUsage) {
    writeLock.lock();
    try {
      this.cache.put(path, quotaUsage);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Update quota in cache. The usage will be preserved.
   * @param path Mount table path.
   * @param quota Corresponding quota value.
   */
  public void updateQuota(String path, RouterQuotaUsage quota) {
    writeLock.lock();
    try {
      RouterQuotaUsage.Builder builder = new RouterQuotaUsage.Builder()
          .quota(quota.getQuota()).spaceQuota(quota.getSpaceQuota());
      RouterQuotaUsage current = this.cache.get(path);
      if (current != null) {
        builder.fileAndDirectoryCount(current.getFileAndDirectoryCount())
            .spaceConsumed(current.getSpaceConsumed());
      }
      this.cache.put(path, builder.build());
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Remove the entity from cache.
   * @param path Mount table path.
   */
  public void remove(String path) {
    writeLock.lock();
    try {
      this.cache.remove(path);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Clean up the cache.
   */
  public void clear() {
    writeLock.lock();
    try {
      this.cache.clear();
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Check if the quota was set.
   * @param quota the quota usage.
   * @return True if the quota is set.
   */
  public static boolean isQuotaSet(QuotaUsage quota) {
    if (quota != null) {
      long nsQuota = quota.getQuota();
      long ssQuota = quota.getSpaceQuota();

      // once nsQuota or ssQuota was set, this mount table is quota set
      if (nsQuota != HdfsConstants.QUOTA_RESET
          || ssQuota != HdfsConstants.QUOTA_RESET || Quota.orByStorageType(
              t -> quota.getTypeQuota(t) != HdfsConstants.QUOTA_RESET)) {
        return true;
      }
    }

    return false;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ConnectionContext 源码

hadoop ConnectionManager 源码

hadoop ConnectionNullException 源码

hadoop ConnectionPool 源码

hadoop ConnectionPoolId 源码

hadoop DFSRouter 源码

hadoop ErasureCoding 源码

hadoop FederationConnectionId 源码

hadoop FederationUtil 源码

hadoop IsRouterActiveServlet 源码

0  赞