hadoop RouterFederationRename 源码

  • 2022-10-20
  • 浏览 (176)

haddop RouterFederationRename 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs;
import org.apache.hadoop.tools.fedbalance.FedBalanceContext;
import org.apache.hadoop.tools.fedbalance.TrashProcedure;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_MAP;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DELAY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DIFF;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_TRASH;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_TRASH_DEFAULT;
import static org.apache.hadoop.tools.fedbalance.FedBalance.DISTCP_PROCEDURE;
import static org.apache.hadoop.tools.fedbalance.FedBalance.TRASH_PROCEDURE;
import static org.apache.hadoop.tools.fedbalance.FedBalance.NO_MOUNT;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Rename across router federation namespaces based on federation balance. Both
 * the src and the dst coming from different namespaces need to have only one
 * destination. Snapshot paths are not allowed.
 * Users need write privilege of both src parent and dst parent to do router
 * federation rename.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RouterFederationRename {

  private static final Logger LOG =
      LoggerFactory.getLogger(RouterFederationRename.class.getName());
  private final RouterRpcServer rpcServer;
  private final Configuration conf;
  private final AtomicInteger routerRenameCounter = new AtomicInteger();
  public enum RouterRenameOption {
    NONE, DISTCP
  }

  public RouterFederationRename(RouterRpcServer rpcServer, Configuration conf) {
    this.rpcServer = rpcServer;
    this.conf = conf;
  }

  /**
   * Router federation rename across namespaces.
   *
   * @param src the source path. There is no mount point under the src path.
   * @param dst the dst path.
   * @param srcLocations the remote locations of src.
   * @param dstLocations the remote locations of dst.
   * @throws IOException if rename fails.
   * @return true if rename succeeds.
   */
  boolean routerFedRename(final String src, final String dst,
      final List<RemoteLocation> srcLocations,
      final List<RemoteLocation> dstLocations) throws IOException {
    if (!rpcServer.isEnableRenameAcrossNamespace()) {
      throw new IOException("Rename of " + src + " to " + dst
          + " is not allowed, no eligible destination in the same namespace was"
          + " found");
    }
    if (srcLocations.size() != 1 || dstLocations.size() != 1) {
      throw new IOException("Rename of " + src + " to " + dst + " is not"
          + " allowed. The remote location should be exactly one.");
    }
    RemoteLocation srcLoc = srcLocations.get(0);
    RemoteLocation dstLoc = dstLocations.get(0);
    checkSnapshotPath(srcLoc, dstLoc);
    checkPermission(srcLoc, dstLoc);

    UserGroupInformation routerUser = UserGroupInformation.getLoginUser();

    try {
      // as router user with saveJournal and task submission privileges
      return routerUser.doAs((PrivilegedExceptionAction<Boolean>) () -> {
        // Build and submit router federation rename job.
        BalanceJob job = buildRouterRenameJob(srcLoc.getNameserviceId(),
            dstLoc.getNameserviceId(), srcLoc.getDest(), dstLoc.getDest());
        BalanceProcedureScheduler scheduler = rpcServer.getFedRenameScheduler();
        countIncrement();
        try {
          scheduler.submit(job);
          LOG.info("Rename {} to {} from namespace {} to {}. JobId={}.", src,
              dst, srcLoc.getNameserviceId(), dstLoc.getNameserviceId(),
              job.getId());
          scheduler.waitUntilDone(job);
          if (job.getError() != null) {
            throw new IOException("Rename of " + src + " to " + dst +
                " failed.", job.getError());
          }
          return true;
        } finally {
          countDecrement();
        }
      });
    } catch (InterruptedException e) {
      LOG.warn("Fed balance job is interrupted.", e);
      throw new InterruptedIOException(e.getMessage());
    }
  }

  /**
   * Check router federation rename permission.
   */
  private void checkPermission(RemoteLocation src, RemoteLocation dst)
      throws IOException {
    try {
      if (UserGroupInformation.isSecurityEnabled()) {
        // In security mode, check permission as remote user proxy by router
        // user.
        String remoteUserName = NameNode.getRemoteUser().getShortUserName();
        UserGroupInformation proxyUser = UserGroupInformation
            .createProxyUser(remoteUserName,
                UserGroupInformation.getLoginUser());
        proxyUser.doAs((PrivilegedExceptionAction<Object>) () -> {
          checkRenamePermission(src, dst);
          return null;
        });
      } else {
        // In simple mode, check permission as remote user directly.
        checkRenamePermission(src, dst);
      }
    } catch (AccessControlException e) {
      throw new AccessControlException(
          "Permission denied rename " + src.getSrc() + "(" + src + ") to " + dst
              .getSrc() + "(" + dst + ") Reason=" + e.getMessage());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new InterruptedIOException(
          "Router Federation Rename is interrupted while checking permission.");
    }
  }

  private void checkRenamePermission(RemoteLocation srcLoc,
      RemoteLocation dstLoc) throws IOException {
    // check src path permission.
    Path srcPath =
        new Path("hdfs://" + srcLoc.getNameserviceId() + srcLoc.getDest());
    srcPath.getFileSystem(conf).access(srcPath.getParent(), FsAction.WRITE);
    // check dst path permission.
    Path dstPath =
        new Path("hdfs://" + dstLoc.getNameserviceId() + dstLoc.getDest());
    dstPath.getFileSystem(conf).access(dstPath.getParent(), FsAction.WRITE);
  }

  static void checkSnapshotPath(RemoteLocation src, RemoteLocation dst)
      throws AccessControlException {
    if (src.getDest()
        .contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR + Path.SEPARATOR)) {
      throw new AccessControlException(
          "Router federation rename can't rename snapshot path. src=" + src
              .getSrc() + "(" + src + ")");
    }
    if (dst.getDest()
        .contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR + Path.SEPARATOR)) {
      throw new AccessControlException(
          "Router federation rename can't rename snapshot path. dst=" + dst
              .getSrc() + "(" + dst + ")");
    }
  }

  /**
   * Build router federation rename job moving data from src to dst.
   * @param srcNs the source namespace id.
   * @param dstNs the dst namespace id.
   * @param src the source path.
   * @param dst the dst path.
   */
  private BalanceJob buildRouterRenameJob(String srcNs, String dstNs,
      String src, String dst) throws IOException {
    checkConfiguration(conf);
    Path srcPath = new Path("hdfs://" + srcNs + src);
    Path dstPath = new Path("hdfs://" + dstNs + dst);
    boolean forceCloseOpen =
        conf.getBoolean(DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE,
            DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE_DEFAULT);
    int map = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_MAP, -1);
    int bandwidth = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, -1);
    long delay = conf.getLong(DFS_ROUTER_FEDERATION_RENAME_DELAY,
        DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT);
    int diff = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_DIFF,
        DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT);
    String trashPolicy = conf.get(DFS_ROUTER_FEDERATION_RENAME_TRASH,
        DFS_ROUTER_FEDERATION_RENAME_TRASH_DEFAULT);
    FedBalanceConfigs.TrashOption trashOpt =
        FedBalanceConfigs.TrashOption.valueOf(trashPolicy.toUpperCase());
    // Construct job context.
    FedBalanceContext context =
        new FedBalanceContext.Builder(srcPath, dstPath, NO_MOUNT, conf)
            .setForceCloseOpenFiles(forceCloseOpen)
            .setUseMountReadOnly(true)
            .setMapNum(map)
            .setBandwidthLimit(bandwidth)
            .setTrash(trashOpt)
            .setDelayDuration(delay)
            .setDiffThreshold(diff)
            .build();

    LOG.info(context.toString());
    // Construct the balance job.
    BalanceJob.Builder<BalanceProcedure> builder = new BalanceJob.Builder<>();
    DistCpProcedure dcp =
        new DistCpProcedure(DISTCP_PROCEDURE, null, delay, context);
    builder.nextProcedure(dcp);
    TrashProcedure tp =
        new TrashProcedure(TRASH_PROCEDURE, null, delay, context);
    builder.nextProcedure(tp);
    return builder.build();
  }

  public int getRouterFederationRenameCount() {
    return routerRenameCounter.get();
  }

  void countIncrement() {
    routerRenameCounter.incrementAndGet();
  }

  void countDecrement() {
    routerRenameCounter.decrementAndGet();
  }

  static void checkConfiguration(Configuration conf) throws IOException {
    int map = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_MAP, -1);
    int bandwidth = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, -1);
    long delay = conf.getLong(DFS_ROUTER_FEDERATION_RENAME_DELAY,
        DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT);
    int diff = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_DIFF,
        DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT);
    if (map < 0) {
      throw new IOException("map=" + map + " is negative. Please check "
          + DFS_ROUTER_FEDERATION_RENAME_MAP);
    } else if (bandwidth < 0) {
      throw new IOException(
          "bandwidth=" + bandwidth + " is negative. Please check "
              + DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH);
    } else if (delay < 0) {
      throw new IOException("delay=" + delay + " is negative. Please check "
          + DFS_ROUTER_FEDERATION_RENAME_DELAY);
    } else if (diff < 0) {
      throw new IOException("diff=" + diff + " is negative. Please check "
          + DFS_ROUTER_FEDERATION_RENAME_DIFF);
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ConnectionContext 源码

hadoop ConnectionManager 源码

hadoop ConnectionNullException 源码

hadoop ConnectionPool 源码

hadoop ConnectionPoolId 源码

hadoop DFSRouter 源码

hadoop ErasureCoding 源码

hadoop FederationConnectionId 源码

hadoop FederationUtil 源码

hadoop IsRouterActiveServlet 源码

0  赞