1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
19 
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashMap;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 
29 import org.apache.commons.lang.time.DateUtils;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.classification.InterfaceAudience.Private;
33 import org.apache.hadoop.classification.InterfaceStability.Stable;
34 import org.apache.hadoop.classification.InterfaceStability.Unstable;
35 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
36 import org.apache.hadoop.yarn.api.records.ApplicationId;
37 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
38 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
39 import org.apache.hadoop.yarn.api.records.Container;
40 import org.apache.hadoop.yarn.api.records.ContainerId;
41 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
42 import org.apache.hadoop.yarn.api.records.NMToken;
43 import org.apache.hadoop.yarn.api.records.NodeId;
44 import org.apache.hadoop.yarn.api.records.Priority;
45 import org.apache.hadoop.yarn.api.records.Resource;
46 import org.apache.hadoop.yarn.api.records.ResourceRequest;
47 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
48 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
49 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
50 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
51 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
52 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
53 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
54 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
55 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
56 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
57 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
58 import org.apache.hadoop.yarn.util.resource.Resources;
59 
60 import com.google.common.base.Preconditions;
61 import com.google.common.collect.HashMultiset;
62 import com.google.common.collect.Multiset;
63 
64 /**
65  * Represents an application attempt from the viewpoint of the scheduler.
66  * Each running app attempt in the RM corresponds to one instance
67  * of this class.
68  */
69 @Private
70 @Unstable
71 public class SchedulerApplicationAttempt {
72 
73   private static final Log LOG = LogFactory
74     .getLog(SchedulerApplicationAttempt.class);
75 
76   private static final long MEM_AGGREGATE_ALLOCATION_CACHE_MSECS = 3000;
77   protected long lastMemoryAggregateAllocationUpdateTime = 0;
78   private long lastMemorySeconds = 0;
79   private long lastVcoreSeconds = 0;
80 
81   protected final AppSchedulingInfo appSchedulingInfo;
82   protected ApplicationAttemptId attemptId;
83   protected Map<ContainerId, RMContainer> liveContainers =
84       new HashMap<ContainerId, RMContainer>();
85   protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
86       new HashMap<Priority, Map<NodeId, RMContainer>>();
87 
88   private final Multiset<Priority> reReservations = HashMultiset.create();
89 
90   protected final Resource currentReservation = Resource.newInstance(0, 0);
91   private Resource resourceLimit = Resource.newInstance(0, 0);
92   protected Resource currentConsumption = Resource.newInstance(0, 0);
93   private Resource amResource = Resources.none();
94   private boolean unmanagedAM = true;
95   private boolean amRunning = false;
96   private LogAggregationContext logAggregationContext;
97 
98   protected List<RMContainer> newlyAllocatedContainers =
99       new ArrayList<RMContainer>();
100 
101   // This pendingRelease is used in work-preserving recovery scenario to keep
102   // track of the AM's outstanding release requests. RM on recovery could
103   // receive the release request form AM before it receives the container status
104   // from NM for recovery. In this case, the to-be-recovered containers reported
105   // by NM should not be recovered.
106   private Set<ContainerId> pendingRelease = null;
107 
108   /**
109    * Count how many times the application has been given an opportunity
110    * to schedule a task at each priority. Each time the scheduler
111    * asks the application for a task at this priority, it is incremented,
112    * and each time the application successfully schedules a task, it
113    * is reset to 0.
114    */
115   Multiset<Priority> schedulingOpportunities = HashMultiset.create();
116 
117   // Time of the last container scheduled at the current allowed level
118   protected Map<Priority, Long> lastScheduledContainer =
119       new HashMap<Priority, Long>();
120 
121   protected Queue queue;
122   protected boolean isStopped = false;
123 
124   protected final RMContext rmContext;
125 
SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext)126   public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
127       String user, Queue queue, ActiveUsersManager activeUsersManager,
128       RMContext rmContext) {
129     Preconditions.checkNotNull(rmContext, "RMContext should not be null");
130     this.rmContext = rmContext;
131     this.appSchedulingInfo =
132         new AppSchedulingInfo(applicationAttemptId, user, queue,
133             activeUsersManager, rmContext.getEpoch());
134     this.queue = queue;
135     this.pendingRelease = new HashSet<ContainerId>();
136     this.attemptId = applicationAttemptId;
137     if (rmContext.getRMApps() != null &&
138         rmContext.getRMApps()
139             .containsKey(applicationAttemptId.getApplicationId())) {
140       ApplicationSubmissionContext appSubmissionContext =
141           rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
142               .getApplicationSubmissionContext();
143       if (appSubmissionContext != null) {
144         unmanagedAM = appSubmissionContext.getUnmanagedAM();
145         this.logAggregationContext =
146             appSubmissionContext.getLogAggregationContext();
147       }
148     }
149   }
150 
151   /**
152    * Get the live containers of the application.
153    * @return live containers of the application
154    */
getLiveContainers()155   public synchronized Collection<RMContainer> getLiveContainers() {
156     return new ArrayList<RMContainer>(liveContainers.values());
157   }
158 
getAppSchedulingInfo()159   public AppSchedulingInfo getAppSchedulingInfo() {
160     return this.appSchedulingInfo;
161   }
162 
163   /**
164    * Is this application pending?
165    * @return true if it is else false.
166    */
isPending()167   public boolean isPending() {
168     return appSchedulingInfo.isPending();
169   }
170 
171   /**
172    * Get {@link ApplicationAttemptId} of the application master.
173    * @return <code>ApplicationAttemptId</code> of the application master
174    */
getApplicationAttemptId()175   public ApplicationAttemptId getApplicationAttemptId() {
176     return appSchedulingInfo.getApplicationAttemptId();
177   }
178 
getApplicationId()179   public ApplicationId getApplicationId() {
180     return appSchedulingInfo.getApplicationId();
181   }
182 
getUser()183   public String getUser() {
184     return appSchedulingInfo.getUser();
185   }
186 
getResourceRequests(Priority priority)187   public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
188     return appSchedulingInfo.getResourceRequests(priority);
189   }
190 
getPendingRelease()191   public Set<ContainerId> getPendingRelease() {
192     return this.pendingRelease;
193   }
194 
getNewContainerId()195   public long getNewContainerId() {
196     return appSchedulingInfo.getNewContainerId();
197   }
198 
getPriorities()199   public Collection<Priority> getPriorities() {
200     return appSchedulingInfo.getPriorities();
201   }
202 
getResourceRequest(Priority priority, String resourceName)203   public synchronized ResourceRequest getResourceRequest(Priority priority, String resourceName) {
204     return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
205   }
206 
getTotalRequiredResources(Priority priority)207   public synchronized int getTotalRequiredResources(Priority priority) {
208     return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
209   }
210 
getResource(Priority priority)211   public synchronized Resource getResource(Priority priority) {
212     return appSchedulingInfo.getResource(priority);
213   }
214 
getQueueName()215   public String getQueueName() {
216     return appSchedulingInfo.getQueueName();
217   }
218 
getAMResource()219   public Resource getAMResource() {
220     return amResource;
221   }
222 
setAMResource(Resource amResource)223   public void setAMResource(Resource amResource) {
224     this.amResource = amResource;
225   }
226 
isAmRunning()227   public boolean isAmRunning() {
228     return amRunning;
229   }
230 
setAmRunning(boolean bool)231   public void setAmRunning(boolean bool) {
232     amRunning = bool;
233   }
234 
getUnmanagedAM()235   public boolean getUnmanagedAM() {
236     return unmanagedAM;
237   }
238 
getRMContainer(ContainerId id)239   public synchronized RMContainer getRMContainer(ContainerId id) {
240     return liveContainers.get(id);
241   }
242 
resetReReservations(Priority priority)243   protected synchronized void resetReReservations(Priority priority) {
244     reReservations.setCount(priority, 0);
245   }
246 
addReReservation(Priority priority)247   protected synchronized void addReReservation(Priority priority) {
248     reReservations.add(priority);
249   }
250 
getReReservations(Priority priority)251   public synchronized int getReReservations(Priority priority) {
252     return reReservations.count(priority);
253   }
254 
255   /**
256    * Get total current reservations.
257    * Used only by unit tests
258    * @return total current reservations
259    */
260   @Stable
261   @Private
getCurrentReservation()262   public synchronized Resource getCurrentReservation() {
263     return currentReservation;
264   }
265 
getQueue()266   public Queue getQueue() {
267     return queue;
268   }
269 
updateResourceRequests( List<ResourceRequest> requests)270   public synchronized void updateResourceRequests(
271       List<ResourceRequest> requests) {
272     if (!isStopped) {
273       appSchedulingInfo.updateResourceRequests(requests, false);
274     }
275   }
276 
recoverResourceRequests( List<ResourceRequest> requests)277   public synchronized void recoverResourceRequests(
278       List<ResourceRequest> requests) {
279     if (!isStopped) {
280       appSchedulingInfo.updateResourceRequests(requests, true);
281     }
282   }
283 
stop(RMAppAttemptState rmAppAttemptFinalState)284   public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
285     // Cleanup all scheduling information
286     isStopped = true;
287     appSchedulingInfo.stop(rmAppAttemptFinalState);
288   }
289 
isStopped()290   public synchronized boolean isStopped() {
291     return isStopped;
292   }
293 
294   /**
295    * Get the list of reserved containers
296    * @return All of the reserved containers.
297    */
getReservedContainers()298   public synchronized List<RMContainer> getReservedContainers() {
299     List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
300     for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
301       this.reservedContainers.entrySet()) {
302       reservedContainers.addAll(e.getValue().values());
303     }
304     return reservedContainers;
305   }
306 
reserve(SchedulerNode node, Priority priority, RMContainer rmContainer, Container container)307   public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
308       RMContainer rmContainer, Container container) {
309     // Create RMContainer if necessary
310     if (rmContainer == null) {
311       rmContainer =
312           new RMContainerImpl(container, getApplicationAttemptId(),
313               node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
314 
315       Resources.addTo(currentReservation, container.getResource());
316 
317       // Reset the re-reservation count
318       resetReReservations(priority);
319     } else {
320       // Note down the re-reservation
321       addReReservation(priority);
322     }
323     rmContainer.handle(new RMContainerReservedEvent(container.getId(),
324         container.getResource(), node.getNodeID(), priority));
325 
326     Map<NodeId, RMContainer> reservedContainers =
327         this.reservedContainers.get(priority);
328     if (reservedContainers == null) {
329       reservedContainers = new HashMap<NodeId, RMContainer>();
330       this.reservedContainers.put(priority, reservedContainers);
331     }
332     reservedContainers.put(node.getNodeID(), rmContainer);
333 
334     if (LOG.isDebugEnabled()) {
335       LOG.debug("Application attempt " + getApplicationAttemptId()
336           + " reserved container " + rmContainer + " on node " + node
337           + ". This attempt currently has " + reservedContainers.size()
338           + " reserved containers at priority " + priority
339           + "; currentReservation " + currentReservation.getMemory());
340     }
341 
342     return rmContainer;
343   }
344 
345   /**
346    * Has the application reserved the given <code>node</code> at the
347    * given <code>priority</code>?
348    * @param node node to be checked
349    * @param priority priority of reserved container
350    * @return true is reserved, false if not
351    */
isReserved(SchedulerNode node, Priority priority)352   public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
353     Map<NodeId, RMContainer> reservedContainers =
354         this.reservedContainers.get(priority);
355     if (reservedContainers != null) {
356       return reservedContainers.containsKey(node.getNodeID());
357     }
358     return false;
359   }
360 
setHeadroom(Resource globalLimit)361   public synchronized void setHeadroom(Resource globalLimit) {
362     this.resourceLimit = globalLimit;
363   }
364 
365   /**
366    * Get available headroom in terms of resources for the application's user.
367    * @return available resource headroom
368    */
getHeadroom()369   public synchronized Resource getHeadroom() {
370     // Corner case to deal with applications being slightly over-limit
371     if (resourceLimit.getMemory() < 0) {
372       resourceLimit.setMemory(0);
373     }
374 
375     return resourceLimit;
376   }
377 
getNumReservedContainers(Priority priority)378   public synchronized int getNumReservedContainers(Priority priority) {
379     Map<NodeId, RMContainer> reservedContainers =
380         this.reservedContainers.get(priority);
381     return (reservedContainers == null) ? 0 : reservedContainers.size();
382   }
383 
384   @SuppressWarnings("unchecked")
containerLaunchedOnNode(ContainerId containerId, NodeId nodeId)385   public synchronized void containerLaunchedOnNode(ContainerId containerId,
386       NodeId nodeId) {
387     // Inform the container
388     RMContainer rmContainer = getRMContainer(containerId);
389     if (rmContainer == null) {
390       // Some unknown container sneaked into the system. Kill it.
391       rmContext.getDispatcher().getEventHandler()
392         .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
393       return;
394     }
395 
396     rmContainer.handle(new RMContainerEvent(containerId,
397         RMContainerEventType.LAUNCHED));
398   }
399 
showRequests()400   public synchronized void showRequests() {
401     if (LOG.isDebugEnabled()) {
402       for (Priority priority : getPriorities()) {
403         Map<String, ResourceRequest> requests = getResourceRequests(priority);
404         if (requests != null) {
405           LOG.debug("showRequests:" + " application=" + getApplicationId() +
406               " headRoom=" + getHeadroom() +
407               " currentConsumption=" + currentConsumption.getMemory());
408           for (ResourceRequest request : requests.values()) {
409             LOG.debug("showRequests:" + " application=" + getApplicationId()
410                 + " request=" + request);
411           }
412         }
413       }
414     }
415   }
416 
getCurrentConsumption()417   public Resource getCurrentConsumption() {
418     return currentConsumption;
419   }
420 
421   public static class ContainersAndNMTokensAllocation {
422     List<Container> containerList;
423     List<NMToken> nmTokenList;
424 
ContainersAndNMTokensAllocation(List<Container> containerList, List<NMToken> nmTokenList)425     public ContainersAndNMTokensAllocation(List<Container> containerList,
426         List<NMToken> nmTokenList) {
427       this.containerList = containerList;
428       this.nmTokenList = nmTokenList;
429     }
430 
getContainerList()431     public List<Container> getContainerList() {
432       return containerList;
433     }
434 
getNMTokenList()435     public List<NMToken> getNMTokenList() {
436       return nmTokenList;
437     }
438   }
439 
440   // Create container token and NMToken altogether, if either of them fails for
441   // some reason like DNS unavailable, do not return this container and keep it
442   // in the newlyAllocatedContainers waiting to be refetched.
443   public synchronized ContainersAndNMTokensAllocation
pullNewlyAllocatedContainersAndNMTokens()444       pullNewlyAllocatedContainersAndNMTokens() {
445     List<Container> returnContainerList =
446         new ArrayList<Container>(newlyAllocatedContainers.size());
447     List<NMToken> nmTokens = new ArrayList<NMToken>();
448     for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i
449       .hasNext();) {
450       RMContainer rmContainer = i.next();
451       Container container = rmContainer.getContainer();
452       try {
453         // create container token and NMToken altogether.
454         container.setContainerToken(rmContext.getContainerTokenSecretManager()
455           .createContainerToken(container.getId(), container.getNodeId(),
456             getUser(), container.getResource(), container.getPriority(),
457             rmContainer.getCreationTime(), this.logAggregationContext));
458         NMToken nmToken =
459             rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
460               getApplicationAttemptId(), container);
461         if (nmToken != null) {
462           nmTokens.add(nmToken);
463         }
464       } catch (IllegalArgumentException e) {
465         // DNS might be down, skip returning this container.
466         LOG.error("Error trying to assign container token and NM token to" +
467             " an allocated container " + container.getId(), e);
468         continue;
469       }
470       returnContainerList.add(container);
471       i.remove();
472       rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
473         RMContainerEventType.ACQUIRED));
474     }
475     return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
476   }
477 
updateBlacklist( List<String> blacklistAdditions, List<String> blacklistRemovals)478   public synchronized void updateBlacklist(
479       List<String> blacklistAdditions, List<String> blacklistRemovals) {
480     if (!isStopped) {
481       this.appSchedulingInfo.updateBlacklist(
482           blacklistAdditions, blacklistRemovals);
483     }
484   }
485 
isBlacklisted(String resourceName)486   public boolean isBlacklisted(String resourceName) {
487     return this.appSchedulingInfo.isBlacklisted(resourceName);
488   }
489 
addSchedulingOpportunity(Priority priority)490   public synchronized void addSchedulingOpportunity(Priority priority) {
491     schedulingOpportunities.setCount(priority,
492         schedulingOpportunities.count(priority) + 1);
493   }
494 
subtractSchedulingOpportunity(Priority priority)495   public synchronized void subtractSchedulingOpportunity(Priority priority) {
496     int count = schedulingOpportunities.count(priority) - 1;
497     this.schedulingOpportunities.setCount(priority, Math.max(count,  0));
498   }
499 
500   /**
501    * Return the number of times the application has been given an opportunity
502    * to schedule a task at the given priority since the last time it
503    * successfully did so.
504    */
getSchedulingOpportunities(Priority priority)505   public synchronized int getSchedulingOpportunities(Priority priority) {
506     return schedulingOpportunities.count(priority);
507   }
508 
509   /**
510    * Should be called when an application has successfully scheduled a container,
511    * or when the scheduling locality threshold is relaxed.
512    * Reset various internal counters which affect delay scheduling
513    *
514    * @param priority The priority of the container scheduled.
515    */
resetSchedulingOpportunities(Priority priority)516   public synchronized void resetSchedulingOpportunities(Priority priority) {
517     resetSchedulingOpportunities(priority, System.currentTimeMillis());
518   }
519   // used for continuous scheduling
resetSchedulingOpportunities(Priority priority, long currentTimeMs)520   public synchronized void resetSchedulingOpportunities(Priority priority,
521       long currentTimeMs) {
522     lastScheduledContainer.put(priority, currentTimeMs);
523     schedulingOpportunities.setCount(priority, 0);
524   }
525 
getRunningAggregateAppResourceUsage()526   synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
527     long currentTimeMillis = System.currentTimeMillis();
528     // Don't walk the whole container list if the resources were computed
529     // recently.
530     if ((currentTimeMillis - lastMemoryAggregateAllocationUpdateTime)
531         > MEM_AGGREGATE_ALLOCATION_CACHE_MSECS) {
532       long memorySeconds = 0;
533       long vcoreSeconds = 0;
534       for (RMContainer rmContainer : this.liveContainers.values()) {
535         long usedMillis = currentTimeMillis - rmContainer.getCreationTime();
536         Resource resource = rmContainer.getContainer().getResource();
537         memorySeconds += resource.getMemory() * usedMillis /
538             DateUtils.MILLIS_PER_SECOND;
539         vcoreSeconds += resource.getVirtualCores() * usedMillis
540             / DateUtils.MILLIS_PER_SECOND;
541       }
542 
543       lastMemoryAggregateAllocationUpdateTime = currentTimeMillis;
544       lastMemorySeconds = memorySeconds;
545       lastVcoreSeconds = vcoreSeconds;
546     }
547     return new AggregateAppResourceUsage(lastMemorySeconds, lastVcoreSeconds);
548   }
549 
getResourceUsageReport()550   public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
551     AggregateAppResourceUsage resUsage = getRunningAggregateAppResourceUsage();
552     return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
553                reservedContainers.size(), Resources.clone(currentConsumption),
554                Resources.clone(currentReservation),
555                Resources.add(currentConsumption, currentReservation),
556                resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
557   }
558 
getLiveContainersMap()559   public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
560     return this.liveContainers;
561   }
562 
getResourceLimit()563   public synchronized Resource getResourceLimit() {
564     return this.resourceLimit;
565   }
566 
getLastScheduledContainer()567   public synchronized Map<Priority, Long> getLastScheduledContainer() {
568     return this.lastScheduledContainer;
569   }
570 
transferStateFromPreviousAttempt( SchedulerApplicationAttempt appAttempt)571   public synchronized void transferStateFromPreviousAttempt(
572       SchedulerApplicationAttempt appAttempt) {
573     this.liveContainers = appAttempt.getLiveContainersMap();
574     // this.reReservations = appAttempt.reReservations;
575     this.currentConsumption = appAttempt.getCurrentConsumption();
576     this.resourceLimit = appAttempt.getResourceLimit();
577     // this.currentReservation = appAttempt.currentReservation;
578     // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
579     // this.schedulingOpportunities = appAttempt.schedulingOpportunities;
580     this.lastScheduledContainer = appAttempt.getLastScheduledContainer();
581     this.appSchedulingInfo
582       .transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
583   }
584 
move(Queue newQueue)585   public synchronized void move(Queue newQueue) {
586     QueueMetrics oldMetrics = queue.getMetrics();
587     QueueMetrics newMetrics = newQueue.getMetrics();
588     String user = getUser();
589     for (RMContainer liveContainer : liveContainers.values()) {
590       Resource resource = liveContainer.getContainer().getResource();
591       oldMetrics.releaseResources(user, 1, resource);
592       newMetrics.allocateResources(user, 1, resource, false);
593     }
594     for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
595       for (RMContainer reservedContainer : map.values()) {
596         Resource resource = reservedContainer.getReservedResource();
597         oldMetrics.unreserveResource(user, resource);
598         newMetrics.reserveResource(user, resource);
599       }
600     }
601 
602     appSchedulingInfo.move(newQueue);
603     this.queue = newQueue;
604   }
605 
recoverContainer(RMContainer rmContainer)606   public synchronized void recoverContainer(RMContainer rmContainer) {
607     // recover app scheduling info
608     appSchedulingInfo.recoverContainer(rmContainer);
609 
610     if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
611       return;
612     }
613     LOG.info("SchedulerAttempt " + getApplicationAttemptId()
614       + " is recovering container " + rmContainer.getContainerId());
615     liveContainers.put(rmContainer.getContainerId(), rmContainer);
616     Resources.addTo(currentConsumption, rmContainer.getContainer()
617       .getResource());
618     // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
619     // is called.
620     // newlyAllocatedContainers.add(rmContainer);
621     // schedulingOpportunities
622     // lastScheduledContainer
623   }
624 
incNumAllocatedContainers(NodeType containerType, NodeType requestType)625   public void incNumAllocatedContainers(NodeType containerType,
626       NodeType requestType) {
627     RMAppAttempt attempt =
628         rmContext.getRMApps().get(attemptId.getApplicationId())
629           .getCurrentAppAttempt();
630     if (attempt != null) {
631       attempt.getRMAppAttemptMetrics().incNumAllocatedContainers(containerType,
632         requestType);
633     }
634   }
635 
setApplicationHeadroomForMetrics(Resource headroom)636   public void setApplicationHeadroomForMetrics(Resource headroom) {
637     RMAppAttempt attempt =
638         rmContext.getRMApps().get(attemptId.getApplicationId())
639             .getCurrentAppAttempt();
640     if (attempt != null) {
641       attempt.getRMAppAttemptMetrics().setApplicationAttemptHeadRoom(
642           Resources.clone(headroom));
643     }
644   }
645 
getBlacklistedNodes()646   public Set<String> getBlacklistedNodes() {
647     return this.appSchedulingInfo.getBlackListCopy();
648   }
649 }
650