hadoop SLSSchedulerCommons 源码
haddop SLSSchedulerCommons 代码
文件路径:/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSSchedulerCommons.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.scheduler;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class SLSSchedulerCommons {
private static final Logger LOG = LoggerFactory.getLogger(SLSSchedulerCommons.class);
private final AbstractYarnScheduler<?, ?> scheduler;
private boolean metricsON;
private SchedulerMetrics schedulerMetrics;
private final Map<ContainerId, Resource> preemptionContainerMap = new ConcurrentHashMap<>();
private final Map<ApplicationAttemptId, String> appQueueMap = new ConcurrentHashMap<>();
private final Tracker tracker;
public SLSSchedulerCommons(AbstractYarnScheduler<?, ?> scheduler) {
this.scheduler = scheduler;
this.tracker = new Tracker();
}
public void initMetrics(Class<? extends AbstractYarnScheduler<?, ?>> schedulerClass, Configuration conf) {
metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
if (metricsON) {
try {
schedulerMetrics = SchedulerMetrics.getInstance(conf, schedulerClass);
schedulerMetrics.init(scheduler, conf);
} catch (Exception e) {
LOG.error("Caught exception while initializing schedulerMetrics", e);
}
}
}
void stopMetrics() {
try {
if (metricsON) {
schedulerMetrics.tearDown();
}
} catch (Exception e) {
LOG.error("Caught exception while stopping service", e);
}
}
public Allocation allocate(ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests,
List<SchedulingRequest> schedulingRequests,
List<ContainerId> containerIds,
List<String> blacklistAdditions,
List<String> blacklistRemovals,
ContainerUpdates updateRequests) {
if (metricsON) {
final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
.time();
Allocation allocation = null;
try {
allocation = ((SchedulerWrapper)scheduler).allocatePropagated(
attemptId, resourceRequests,
schedulingRequests, containerIds,
blacklistAdditions, blacklistRemovals, updateRequests);
return allocation;
} catch (Exception e) {
LOG.error("Caught exception from allocate", e);
throw e;
} finally {
context.stop();
schedulerMetrics.increaseSchedulerAllocationCounter();
try {
updateQueueWithAllocateRequest(allocation, attemptId,
resourceRequests, containerIds);
} catch (IOException e) {
LOG.error("Caught exception while executing finally block", e);
}
}
} else {
return ((SchedulerWrapper)scheduler).allocatePropagated(
attemptId, resourceRequests, schedulingRequests,
containerIds,
blacklistAdditions, blacklistRemovals, updateRequests);
}
}
private void updateQueueWithAllocateRequest(Allocation allocation,
ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests,
List<ContainerId> containerIds) throws IOException {
// update queue information
Resource pendingResource = Resources.createResource(0, 0);
Resource allocatedResource = Resources.createResource(0, 0);
String queueName = appQueueMap.get(attemptId);
// container requested
for (ResourceRequest request : resourceRequests) {
if (request.getResourceName().equals(ResourceRequest.ANY)) {
Resources.addTo(pendingResource,
Resources.multiply(request.getCapability(),
request.getNumContainers()));
}
}
// container allocated
for (Container container : allocation.getContainers()) {
Resources.addTo(allocatedResource, container.getResource());
Resources.subtractFrom(pendingResource, container.getResource());
}
// container released from AM
SchedulerAppReport report = scheduler.getSchedulerAppInfo(attemptId);
for (ContainerId containerId : containerIds) {
Container container = null;
for (RMContainer c : report.getLiveContainers()) {
if (c.getContainerId().equals(containerId)) {
container = c.getContainer();
break;
}
}
if (container != null) {
// released allocated containers
Resources.subtractFrom(allocatedResource, container.getResource());
} else {
for (RMContainer c : report.getReservedContainers()) {
if (c.getContainerId().equals(containerId)) {
container = c.getContainer();
break;
}
}
if (container != null) {
// released reserved containers
Resources.subtractFrom(pendingResource, container.getResource());
}
}
}
// containers released/preemption from scheduler
Set<ContainerId> preemptionContainers = new HashSet<>();
if (allocation.getContainerPreemptions() != null) {
preemptionContainers.addAll(allocation.getContainerPreemptions());
}
if (allocation.getStrictContainerPreemptions() != null) {
preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
}
if (!preemptionContainers.isEmpty()) {
for (ContainerId containerId : preemptionContainers) {
if (!preemptionContainerMap.containsKey(containerId)) {
Container container = null;
for (RMContainer c : report.getLiveContainers()) {
if (c.getContainerId().equals(containerId)) {
container = c.getContainer();
break;
}
}
if (container != null) {
preemptionContainerMap.put(containerId, container.getResource());
}
}
}
}
// update metrics
schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
queueName);
}
public void handle(SchedulerEvent schedulerEvent) {
SchedulerWrapper wrapper = (SchedulerWrapper) scheduler;
if (!metricsON) {
((SchedulerWrapper)scheduler).propagatedHandle(schedulerEvent);
return;
}
if (!schedulerMetrics.isRunning()) {
schedulerMetrics.setRunning(true);
}
Timer.Context handlerTimer = null;
Timer.Context operationTimer = null;
NodeUpdateSchedulerEventWrapper eventWrapper;
try {
if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
&& schedulerEvent instanceof NodeUpdateSchedulerEvent) {
eventWrapper = new NodeUpdateSchedulerEventWrapper(
(NodeUpdateSchedulerEvent) schedulerEvent);
schedulerEvent = eventWrapper;
updateQueueWithNodeUpdate(eventWrapper);
} else if (
schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
// check if having AM Container, update resource usage information
AppAttemptRemovedSchedulerEvent appRemoveEvent =
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
ApplicationAttemptId appAttemptId =
appRemoveEvent.getApplicationAttemptID();
String queue = appQueueMap.get(appAttemptId);
SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId);
if (!app.getLiveContainers().isEmpty()) { // have 0 or 1
// should have one container which is AM container
RMContainer rmc = app.getLiveContainers().iterator().next();
schedulerMetrics.updateQueueMetricsByRelease(
rmc.getContainer().getResource(), queue);
}
}
handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
operationTimer = schedulerMetrics.getSchedulerHandleTimer(
schedulerEvent.getType()).time();
((SchedulerWrapper)scheduler).propagatedHandle(schedulerEvent);
} finally {
if (handlerTimer != null) {
handlerTimer.stop();
}
if (operationTimer != null) {
operationTimer.stop();
}
schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
wrapper.getSLSRunner().decreaseRemainingApps();
AppAttemptRemovedSchedulerEvent appRemoveEvent =
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
if (wrapper.getSLSRunner().getRemainingApps() == 0) {
try {
schedulerMetrics.tearDown();
SLSRunner.exitSLSRunner();
} catch (Exception e) {
LOG.error("Scheduler Metrics failed to tear down.", e);
}
}
} else if (schedulerEvent.getType() ==
SchedulerEventType.APP_ATTEMPT_ADDED
&& schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
AppAttemptAddedSchedulerEvent appAddEvent =
(AppAttemptAddedSchedulerEvent) schedulerEvent;
SchedulerApplication app =
scheduler.getSchedulerApplications()
.get(appAddEvent.getApplicationAttemptId().getApplicationId());
appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
.getQueueName());
}
}
}
private void updateQueueWithNodeUpdate(
NodeUpdateSchedulerEventWrapper eventWrapper) {
RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
for (UpdatedContainerInfo info : containerList) {
for (ContainerStatus status : info.getCompletedContainers()) {
ContainerId containerId = status.getContainerId();
SchedulerAppReport app = scheduler.getSchedulerAppInfo(
containerId.getApplicationAttemptId());
if (app == null) {
// this happens for the AM container
// The app have already removed when the NM sends the release
// information.
continue;
}
int releasedMemory = 0, releasedVCores = 0;
if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
for (RMContainer rmc : app.getLiveContainers()) {
if (rmc.getContainerId() == containerId) {
Resource resource = rmc.getContainer().getResource();
releasedMemory += resource.getMemorySize();
releasedVCores += resource.getVirtualCores();
break;
}
}
} else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
if (preemptionContainerMap.containsKey(containerId)) {
Resource preResource = preemptionContainerMap.get(containerId);
releasedMemory += preResource.getMemorySize();
releasedVCores += preResource.getVirtualCores();
preemptionContainerMap.remove(containerId);
}
}
// update queue counters
String queue = appQueueMap.get(containerId.getApplicationAttemptId());
schedulerMetrics.updateQueueMetricsByRelease(
Resource.newInstance(releasedMemory, releasedVCores), queue);
}
}
}
public SchedulerMetrics getSchedulerMetrics() {
return schedulerMetrics;
}
public boolean isMetricsON() {
return metricsON;
}
public Tracker getTracker() {
return tracker;
}
}
相关信息
相关文章
hadoop CapacitySchedulerMetrics 源码
hadoop FairSchedulerMetrics 源码
hadoop FifoSchedulerMetrics 源码
hadoop NodeUpdateSchedulerEventWrapper 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦