hadoop TempQueuePerPartition 源码

  • 2022-10-20
  • 浏览 (193)

hadoop TempQueuePerPartition 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * Temporary data-structure tracking resource availability, pending resource
 * need and current utilization. This is a per-queue-per-partition data
 * structure. Entries form a tree: a parent entry aggregates the pending
 * resources of the children added through
 * {@link #addChild(TempQueuePerPartition)}.
 */
public class TempQueuePerPartition extends AbstractPreemptionEntity {
  // Following fields are copied from scheduler
  final String partition;

  private final Resource killable;
  private final float absCapacity;
  private final float absMaxCapacity;
  final Resource totalPartitionResource;

  // Following fields are settled and used by candidate selection policies
  Resource untouchableExtra;
  Resource preemptableExtra;

  double[] normalizedGuarantee;

  // Effective min / max resources. A value equal to Resources.none() means
  // "not set": getGuaranteed() / getMax() then fall back to the absolute
  // capacity fractions of the partition resource.
  private Resource effMinRes;
  private Resource effMaxRes;

  final ArrayList<TempQueuePerPartition> children;
  private Collection<TempAppPerPartition> apps;
  AbstractLeafQueue leafQueue;
  ParentQueue parentQueue;
  boolean preemptionDisabled;

  // Pending resource with reserved resource deducted. For a leaf it is read
  // from the leaf queue; for a parent it is the sum over its added children.
  protected Resource pendingDeductReserved;

  // Relative priority of this queue to its parent
  // If parent queue's ordering policy doesn't respect priority,
  // this will be always 0
  int relativePriority = 0;
  TempQueuePerPartition parent = null;

  // This will hold a temp user data structure and will hold userlimit,
  // idealAssigned, used etc.
  Map<String, TempUserPerPartition> usersPerPartition = new LinkedHashMap<>();

  /**
   * Creates the temporary entry for one queue in one partition.
   *
   * @param queueName name of the queue
   * @param current resource currently used by the queue in this partition
   * @param preemptionDisabled whether preemption is disabled for the queue
   * @param partition the partition this entry describes
   * @param killable resource subtracted from used when computing
   *          {@code toBePreempted} in {@link #assignPreemption}
   * @param absCapacity absolute capacity, used when {@code effMinRes} is unset
   * @param absMaxCapacity absolute max capacity, used when {@code effMaxRes}
   *          is unset
   * @param totalPartitionResource total resource of the partition
   * @param reserved reserved resource of the queue in this partition
   * @param queue the scheduler queue; for an {@link AbstractLeafQueue} the
   *          pending resources are read from it, otherwise they start at zero
   *          and are aggregated from children
   * @param effMinRes effective minimum resource, or {@code Resources.none()}
   * @param effMaxRes effective maximum resource, or {@code Resources.none()}
   */
  @SuppressWarnings("checkstyle:parameternumber")
  public TempQueuePerPartition(String queueName, Resource current,
      boolean preemptionDisabled, String partition, Resource killable,
      float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
      Resource reserved, CSQueue queue, Resource effMinRes,
      Resource effMaxRes) {
    super(queueName, current, Resource.newInstance(0, 0), reserved,
        Resource.newInstance(0, 0));

    if (queue instanceof AbstractLeafQueue) {
      AbstractLeafQueue l = (AbstractLeafQueue) queue;
      pending = l.getTotalPendingResourcesConsideringUserLimit(
          totalPartitionResource, partition, false);
      pendingDeductReserved = l.getTotalPendingResourcesConsideringUserLimit(
          totalPartitionResource, partition, true);
      leafQueue = l;
    } else {
      // Parent (or unknown) queue: pending is aggregated via addChild().
      pending = Resources.createResource(0);
      pendingDeductReserved = Resources.createResource(0);
    }

    // instanceof is null-safe and matches ParentQueue and its subclasses.
    if (queue instanceof ParentQueue) {
      parentQueue = (ParentQueue) queue;
    }

    this.normalizedGuarantee = new double[ResourceUtils
        .getNumberOfCountableResourceTypes()];
    this.children = new ArrayList<>();
    this.apps = new ArrayList<>();
    this.untouchableExtra = Resource.newInstance(0, 0);
    this.preemptableExtra = Resource.newInstance(0, 0);
    this.preemptionDisabled = preemptionDisabled;
    this.partition = partition;
    this.killable = killable;
    this.absCapacity = absCapacity;
    this.absMaxCapacity = absMaxCapacity;
    this.totalPartitionResource = totalPartitionResource;
    this.effMinRes = effMinRes;
    this.effMaxRes = effMaxRes;
  }

  /**
   * Sets the leaf queue backing this entry. Must only be called on an entry
   * without children (checked by an assertion).
   *
   * @param l the leaf queue
   */
  public void setLeafQueue(AbstractLeafQueue l) {
    assert children.size() == 0;
    this.leafQueue = l;
  }

  /**
   * When adding a child we also aggregate its pending resource needs.
   *
   * @param q
   *          the child queue to add to this queue
   */
  public void addChild(TempQueuePerPartition q) {
    assert leafQueue == null;
    children.add(q);
    // Mutates this entry's own pending objects in place.
    Resources.addTo(pending, q.pending);
    Resources.addTo(pendingDeductReserved, q.pendingDeductReserved);
  }

  /**
   * @return the live (mutable) list of child entries
   */
  public ArrayList<TempQueuePerPartition> getChildren() {
    return children;
  }

  /**
   * This function "accepts" all the resources it can (pending) and returns
   * the unused ones. The accepted amount is added to {@code idealAssigned}.
   *
   * @param avail resource offered to this queue
   * @param rc resource calculator used for comparisons
   * @param clusterResource cluster resource used as comparison context
   * @param considersReservedResource if true the demand uses {@code pending},
   *          otherwise {@code pendingDeductReserved}
   * @param allowQueueBalanceAfterAllSafisfied if true, skip
   *          {@link #filterByMaxDeductAssigned} so queues can keep growing and
   *          balancing even if all queues are satisfied
   * @return {@code avail - accepted}
   */
  Resource offer(Resource avail, ResourceCalculator rc,
      Resource clusterResource, boolean considersReservedResource,
      boolean allowQueueBalanceAfterAllSafisfied) {
    // Headroom below the max capacity, clamped component-wise at zero.
    Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
        Resources.subtract(getMax(), idealAssigned),
        Resource.newInstance(0, 0));

    /*
     * When we're using FifoPreemptionSelector (considerReservedResource
     * = false), we should deduct reserved resource from pending to avoid
     * excessive preemption.
     *
     * For example, if an under-utilized queue has used = reserved = 20, the
     * preemption policy will try to preempt 20 containers (which is not
     * satisfied) from different hosts.
     *
     * In FifoPreemptionSelector, there's no guarantee that preempted
     * resource can be used by pending request, so the policy would preempt
     * resources repeatedly.
     */
    Resource pendingToConsider =
        considersReservedResource ? pending : pendingDeductReserved;
    Resource demandDeductAssigned = Resources.subtract(
        Resources.add(getUsed(), pendingToConsider), idealAssigned);

    // accepted = min{avail,
    //               max - assigned,
    //               current + pending - assigned,
    //               # Make sure a queue will not get more than max of its
    //               # used/guaranteed, this is to make sure preemption won't
    //               # happen if all active queues are beyond their guaranteed
    //               # This is for leaf queue only.
    //               max(guaranteed, used) - assigned}
    // remain = avail - accepted
    Resource accepted = Resources.componentwiseMin(
        absMaxCapIdealAssignedDelta,
        Resources.min(rc, clusterResource, avail, demandDeductAssigned));

    // For leaf queue: accept = min(accept, max(guaranteed, used) - assigned)
    // Why only for leaf queue?
    // Because for a satisfied parent queue, it could have some under-utilized
    // leaf queues. Such under-utilized leaf queues could preempt resources
    // from over-utilized leaf queues located at other hierarchies.

    // Allow queues to continue to grow and balance even if all queues are
    // satisfied.
    if (!allowQueueBalanceAfterAllSafisfied) {
      accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted);
    }

    // accepted so far contains the "quota acceptable" amount, we now filter by
    // locality acceptable
    accepted = acceptedByLocality(rc, accepted);

    // accept should never be < 0
    accepted = Resources.componentwiseMax(accepted, Resources.none());

    // or more than offered
    accepted = Resources.componentwiseMin(accepted, avail);

    Resource remain = Resources.subtract(avail, accepted);
    Resources.addTo(idealAssigned, accepted);
    return remain;
  }

  /**
   * @return the absolute capacity passed at construction
   */
  public float getAbsCapacity() {
    return absCapacity;
  }

  /**
   * @return a copy of the effective minimum resource when it is set (not equal
   *         to {@code Resources.none()}), otherwise
   *         {@code totalPartitionResource * absCapacity}
   */
  public Resource getGuaranteed() {
    return effectiveOrFractionOfPartition(effMinRes, absCapacity);
  }

  /**
   * @return a copy of the effective maximum resource when it is set (not equal
   *         to {@code Resources.none()}), otherwise
   *         {@code totalPartitionResource * absMaxCapacity}
   */
  public Resource getMax() {
    return effectiveOrFractionOfPartition(effMaxRes, absMaxCapacity);
  }

  /**
   * Shared logic of {@link #getGuaranteed()} and {@link #getMax()}: returns a
   * fresh copy of {@code effective} unless it equals {@code Resources.none()},
   * in which case the given fraction of the partition resource is returned.
   */
  private Resource effectiveOrFractionOfPartition(Resource effective,
      float fraction) {
    if (!effective.equals(Resources.none())) {
      return Resources.clone(effective);
    }
    return Resources.multiply(totalPartitionResource, fraction);
  }

  /**
   * Recomputes {@code untouchableExtra} and {@code preemptableExtra} from
   * {@code extra = used - guaranteed}. Extra is replaced by
   * {@code Resources.none()} when {@code rc} says it is less than none.
   *
   * <p>For a leaf entry, all of {@code extra} is untouchable when preemption is
   * disabled and preemptable otherwise. For a parent entry,
   * {@code preemptableExtra = min(sum of children's preemptableExtra, extra)}
   * and the remainder of {@code extra} (if any) is untouchable. Children's
   * current values are read, so children should be updated before the parent.
   *
   * @param rc resource calculator used for comparisons
   */
  public void updatePreemptableExtras(ResourceCalculator rc) {
    // Reset untouchableExtra and preemptableExtra
    untouchableExtra = Resources.none();
    preemptableExtra = Resources.none();

    Resource extra = Resources.subtract(getUsed(), getGuaranteed());
    if (Resources.lessThan(rc, totalPartitionResource, extra,
        Resources.none())) {
      extra = Resources.none();
    }

    if (isLeafEntry()) {
      // If it is a leaf queue
      if (preemptionDisabled) {
        untouchableExtra = extra;
      } else {
        preemptableExtra = extra;
      }
    } else {
      // If it is a parent queue
      Resource childrensPreemptable = Resource.newInstance(0, 0);
      for (TempQueuePerPartition child : children) {
        Resources.addTo(childrensPreemptable, child.preemptableExtra);
      }
      // untouchableExtra = max(extra - childrenPreemptable, 0)
      if (Resources.greaterThanOrEqual(rc, totalPartitionResource,
          childrensPreemptable, extra)) {
        untouchableExtra = Resource.newInstance(0, 0);
      } else {
        untouchableExtra = Resources.subtract(extra, childrensPreemptable);
      }
      preemptableExtra = Resources.min(rc, totalPartitionResource,
          childrensPreemptable, extra);
    }
  }

  /**
   * Whether this entry has no children. {@code children} is final and always
   * initialised by the constructor, so no null check is needed.
   */
  private boolean isLeafEntry() {
    return children.isEmpty();
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append(" NAME: ").append(queueName).append(" CUR: ").append(current)
        .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
        .append(" GAR: ").append(getGuaranteed()).append(" NORM: ")
        .append(Arrays.toString(normalizedGuarantee))
        .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
        .append(" IDEAL_PREEMPT: ").append(toBePreempted)
        .append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted())
        .append(" UNTOUCHABLE: ").append(untouchableExtra)
        .append(" PREEMPTABLE: ").append(preemptableExtra).append("\n");

    return sb.toString();
  }

  /**
   * Computes {@code toBePreempted}. When {@code used - killable} is greater
   * (per {@code rc}) than
   * {@code minimumQueueResource = max(idealAssigned, min(used + pending,
   * guaranteed))}, the result is
   * {@code (used - killable - minimumQueueResource) * scalingFactor}.
   * Otherwise the result is {@code Resources.none()}.
   *
   * @param scalingFactor multiplier applied to the preemptable amount
   * @param rc resource calculator used for comparisons
   * @param clusterResource cluster resource used as comparison context
   */
  public void assignPreemption(float scalingFactor, ResourceCalculator rc,
      Resource clusterResource) {
    Resource usedDeductKillable = Resources.subtract(getUsed(), killable);
    Resource totalResource = Resources.add(getUsed(), pending);

    // The minimum resource that we need to keep for a queue is:
    // max(idealAssigned, min(used + pending, guaranteed)).
    //
    // Doing this because when we calculate ideal allocation doesn't consider
    // reserved resource, ideal-allocation calculated could be less than
    // guaranteed and total. We should avoid preempting from a queue if it is
    // already <= its guaranteed resource.
    Resource minimumQueueResource = Resources.max(rc, clusterResource,
        Resources.min(rc, clusterResource, totalResource, getGuaranteed()),
        idealAssigned);

    if (Resources.greaterThan(rc, clusterResource, usedDeductKillable,
        minimumQueueResource)) {
      toBePreempted = Resources.multiply(
          Resources.subtract(usedDeductKillable, minimumQueueResource),
          scalingFactor);
    } else {
      toBePreempted = Resources.none();
    }
  }

  /**
   * Deducts {@code toBeDeduct} from the actually-to-be-preempted resource,
   * mutating the object returned by {@code getActuallyToBePreempted()} in
   * place.
   *
   * <p>The subtraction only happens when the current value is strictly greater
   * than {@code toBeDeduct} according to {@code rc}. Otherwise the value is
   * left unchanged; it is not reset to zero. The value is then replaced by
   * {@code Resources.max(rc, cluster, value, Resources.none())}.
   * NOTE(review): confirm that leaving the value untouched when
   * {@code toBeDeduct} is not smaller is the intended behaviour.
   *
   * @param rc resource calculator used for comparisons
   * @param cluster cluster resource used as comparison context
   * @param toBeDeduct resource to deduct
   */
  public void deductActuallyToBePreempted(ResourceCalculator rc,
      Resource cluster, Resource toBeDeduct) {
    if (Resources.greaterThan(rc, cluster, getActuallyToBePreempted(),
        toBeDeduct)) {
      Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);
    }
    setActuallyToBePreempted(Resources.max(rc, cluster,
        getActuallyToBePreempted(), Resources.none()));
  }

  /**
   * Appends a comma-separated line: name, current mem/vcores, pending
   * mem/vcores, guaranteed mem/vcores, ideal-assigned mem/vcores,
   * to-be-preempted mem/vcores, actually-to-be-preempted mem/vcores.
   *
   * @param sb builder to append to
   */
  void appendLogString(StringBuilder sb) {
    // Computed once: getGuaranteed() allocates a new Resource on every call.
    Resource guaranteed = getGuaranteed();
    Resource actuallyToBePreempted = getActuallyToBePreempted();
    sb.append(queueName).append(", ").append(current.getMemorySize())
        .append(", ").append(current.getVirtualCores()).append(", ")
        .append(pending.getMemorySize()).append(", ")
        .append(pending.getVirtualCores()).append(", ")
        .append(guaranteed.getMemorySize()).append(", ")
        .append(guaranteed.getVirtualCores()).append(", ")
        .append(idealAssigned.getMemorySize()).append(", ")
        .append(idealAssigned.getVirtualCores()).append(", ")
        .append(toBePreempted.getMemorySize()).append(", ")
        .append(toBePreempted.getVirtualCores()).append(", ")
        .append(actuallyToBePreempted.getMemorySize()).append(", ")
        .append(actuallyToBePreempted.getVirtualCores());
  }

  /**
   * Replaces the app collection of this entry. The collection is stored by
   * reference, not copied.
   *
   * @param orderedApps the apps to associate with this entry
   */
  public void addAllApps(Collection<TempAppPerPartition> orderedApps) {
    this.apps = orderedApps;
  }

  /**
   * @return the (live) app collection of this entry
   */
  public Collection<TempAppPerPartition> getApps() {
    return apps;
  }

  /**
   * Registers (or replaces) the temporary per-partition data for a user.
   *
   * @param userName the user name key
   * @param tmpUser the user's temporary data
   */
  public void addUserPerPartition(String userName,
      TempUserPerPartition tmpUser) {
    this.usersPerPartition.put(userName, tmpUser);
  }

  /**
   * @return the live, insertion-ordered map of user name to user data
   */
  public Map<String, TempUserPerPartition> getUsersPerPartition() {
    return usersPerPartition;
  }

  /**
   * Replaces the pending resource reference of this entry.
   *
   * @param pending the new pending resource
   */
  public void setPending(Resource pending) {
    this.pending = pending;
  }

  /**
   * @return the (live) ideal-assigned resource of this entry
   */
  public Resource getIdealAssigned() {
    return idealAssigned;
  }

  /**
   * @return this entry's {@link #toString()} followed recursively by those of
   *         all descendants (pre-order), each prefixed with a newline
   */
  public String toGlobalString() {
    StringBuilder sb = new StringBuilder();
    sb.append("\n").append(toString());
    for (TempQueuePerPartition c : children) {
      sb.append(c.toGlobalString());
    }
    return sb.toString();
  }

  /**
   * This method is visible to allow sub-classes to override the behavior,
   * specifically to take into account locality-based limitations of how much
   * the queue can consume.
   *
   * @param rc the ResourceCalculator to be used.
   * @param offered the input amount of Resource offered to this queue.
   *
   * @return  the subset of Resource(s) that the queue can consume after
   *          accounting for locality effects. The default implementation
   *          returns {@code offered} unchanged.
   */
  protected Resource acceptedByLocality(ResourceCalculator rc,
      Resource offered) {
    return offered;
  }

  /**
   * This method is visible to allow sub-classes to override the behavior,
   * specifically for federation purposes we do not want to cap resources as it
   * is done here.
   *
   * <p>For a leaf entry (no children) the result is
   * {@code min(offered, max(max(used, guaranteed) - idealAssigned, 0))};
   * for a parent entry {@code offered} is returned unchanged.
   *
   * @param rc the {@code ResourceCalculator} to be used
   * @param clusterResource the total cluster resources
   * @param offered the resources offered to this queue
   * @return the amount of resources accepted after considering max and
   *         deducting assigned.
   */
  protected Resource filterByMaxDeductAssigned(ResourceCalculator rc,
      Resource clusterResource, Resource offered) {
    if (isLeafEntry()) {
      Resource maxOfGuranteedAndUsedDeductAssigned = Resources.subtract(
          Resources.max(rc, clusterResource, getUsed(), getGuaranteed()),
          idealAssigned);
      maxOfGuranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource,
          maxOfGuranteedAndUsedDeductAssigned, Resources.none());
      offered = Resources.min(rc, clusterResource, offered,
          maxOfGuranteedAndUsedDeductAssigned);
    }
    return offered;
  }

  /**
   * This method is visible to allow sub-classes to override the behavior,
   * specifically for federation purposes we need to initialize per-sub-cluster
   * roots as well as the global one. The default sets {@code idealAssigned} to
   * a copy of {@link #getGuaranteed()}. (The method name's misspelling is kept
   * for compatibility with existing callers and overrides.)
   */
  protected void initializeRootIdealWithGuarangeed() {
    idealAssigned = Resources.clone(getGuaranteed());
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractPreemptableResourceCalculator 源码

hadoop AbstractPreemptionEntity 源码

hadoop CapacitySchedulerPreemptionContext 源码

hadoop CapacitySchedulerPreemptionUtils 源码

hadoop FifoCandidatesSelector 源码

hadoop FifoIntraQueuePreemptionPlugin 源码

hadoop IntraQueueCandidatesSelector 源码

hadoop IntraQueuePreemptionComputePlugin 源码

hadoop PreemptableResourceCalculator 源码

hadoop PreemptionCandidatesSelector 源码

0  赞