1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.yarn.server.resourcemanager.scheduler; 19 20 import java.util.ArrayList; 21 import java.util.Collection; 22 import java.util.HashMap; 23 import java.util.HashSet; 24 import java.util.Iterator; 25 import java.util.List; 26 import java.util.Map; 27 import java.util.Set; 28 29 import org.apache.commons.lang.time.DateUtils; 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 import org.apache.hadoop.classification.InterfaceAudience.Private; 33 import org.apache.hadoop.classification.InterfaceStability.Stable; 34 import org.apache.hadoop.classification.InterfaceStability.Unstable; 35 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 36 import org.apache.hadoop.yarn.api.records.ApplicationId; 37 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; 38 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; 39 import org.apache.hadoop.yarn.api.records.Container; 40 import org.apache.hadoop.yarn.api.records.ContainerId; 41 import org.apache.hadoop.yarn.api.records.LogAggregationContext; 42 import org.apache.hadoop.yarn.api.records.NMToken; 43 import org.apache.hadoop.yarn.api.records.NodeId; 44 import org.apache.hadoop.yarn.api.records.Priority; 45 import org.apache.hadoop.yarn.api.records.Resource; 46 import org.apache.hadoop.yarn.api.records.ResourceRequest; 47 import org.apache.hadoop.yarn.server.resourcemanager.RMContext; 48 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; 49 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; 50 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; 51 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; 52 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; 53 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; 54 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; 55 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; 56 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; 57 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; 58 import org.apache.hadoop.yarn.util.resource.Resources; 59 60 import com.google.common.base.Preconditions; 61 import com.google.common.collect.HashMultiset; 62 import com.google.common.collect.Multiset; 63 64 /** 65 * Represents an application attempt from the viewpoint of the scheduler. 66 * Each running app attempt in the RM corresponds to one instance 67 * of this class. 68 */ 69 @Private 70 @Unstable 71 public class SchedulerApplicationAttempt { 72 73 private static final Log LOG = LogFactory 74 .getLog(SchedulerApplicationAttempt.class); 75 76 private static final long MEM_AGGREGATE_ALLOCATION_CACHE_MSECS = 3000; 77 protected long lastMemoryAggregateAllocationUpdateTime = 0; 78 private long lastMemorySeconds = 0; 79 private long lastVcoreSeconds = 0; 80 81 protected final AppSchedulingInfo appSchedulingInfo; 82 protected ApplicationAttemptId attemptId; 83 protected Map<ContainerId, RMContainer> liveContainers = 84 new HashMap<ContainerId, RMContainer>(); 85 protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 86 new HashMap<Priority, Map<NodeId, RMContainer>>(); 87 88 private final Multiset<Priority> reReservations = HashMultiset.create(); 89 90 protected final Resource currentReservation = Resource.newInstance(0, 0); 91 private Resource resourceLimit = Resource.newInstance(0, 0); 92 protected Resource currentConsumption = Resource.newInstance(0, 0); 93 private Resource amResource = Resources.none(); 94 private boolean unmanagedAM = true; 95 private boolean amRunning = false; 96 private LogAggregationContext logAggregationContext; 97 98 protected List<RMContainer> newlyAllocatedContainers = 99 new ArrayList<RMContainer>(); 100 101 // This pendingRelease is used in work-preserving recovery scenario to keep 102 // track of the AM's outstanding release requests. RM on recovery could 103 // receive the release request form AM before it receives the container status 104 // from NM for recovery. In this case, the to-be-recovered containers reported 105 // by NM should not be recovered. 106 private Set<ContainerId> pendingRelease = null; 107 108 /** 109 * Count how many times the application has been given an opportunity 110 * to schedule a task at each priority. Each time the scheduler 111 * asks the application for a task at this priority, it is incremented, 112 * and each time the application successfully schedules a task, it 113 * is reset to 0. 114 */ 115 Multiset<Priority> schedulingOpportunities = HashMultiset.create(); 116 117 // Time of the last container scheduled at the current allowed level 118 protected Map<Priority, Long> lastScheduledContainer = 119 new HashMap<Priority, Long>(); 120 121 protected Queue queue; 122 protected boolean isStopped = false; 123 124 protected final RMContext rmContext; 125 SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext)126 public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, 127 String user, Queue queue, ActiveUsersManager activeUsersManager, 128 RMContext rmContext) { 129 Preconditions.checkNotNull(rmContext, "RMContext should not be null"); 130 this.rmContext = rmContext; 131 this.appSchedulingInfo = 132 new AppSchedulingInfo(applicationAttemptId, user, queue, 133 activeUsersManager, rmContext.getEpoch()); 134 this.queue = queue; 135 this.pendingRelease = new HashSet<ContainerId>(); 136 this.attemptId = applicationAttemptId; 137 if (rmContext.getRMApps() != null && 138 rmContext.getRMApps() 139 .containsKey(applicationAttemptId.getApplicationId())) { 140 ApplicationSubmissionContext appSubmissionContext = 141 rmContext.getRMApps().get(applicationAttemptId.getApplicationId()) 142 .getApplicationSubmissionContext(); 143 if (appSubmissionContext != null) { 144 unmanagedAM = appSubmissionContext.getUnmanagedAM(); 145 this.logAggregationContext = 146 appSubmissionContext.getLogAggregationContext(); 147 } 148 } 149 } 150 151 /** 152 * Get the live containers of the application. 153 * @return live containers of the application 154 */ getLiveContainers()155 public synchronized Collection<RMContainer> getLiveContainers() { 156 return new ArrayList<RMContainer>(liveContainers.values()); 157 } 158 getAppSchedulingInfo()159 public AppSchedulingInfo getAppSchedulingInfo() { 160 return this.appSchedulingInfo; 161 } 162 163 /** 164 * Is this application pending? 165 * @return true if it is else false. 166 */ isPending()167 public boolean isPending() { 168 return appSchedulingInfo.isPending(); 169 } 170 171 /** 172 * Get {@link ApplicationAttemptId} of the application master. 173 * @return <code>ApplicationAttemptId</code> of the application master 174 */ getApplicationAttemptId()175 public ApplicationAttemptId getApplicationAttemptId() { 176 return appSchedulingInfo.getApplicationAttemptId(); 177 } 178 getApplicationId()179 public ApplicationId getApplicationId() { 180 return appSchedulingInfo.getApplicationId(); 181 } 182 getUser()183 public String getUser() { 184 return appSchedulingInfo.getUser(); 185 } 186 getResourceRequests(Priority priority)187 public Map<String, ResourceRequest> getResourceRequests(Priority priority) { 188 return appSchedulingInfo.getResourceRequests(priority); 189 } 190 getPendingRelease()191 public Set<ContainerId> getPendingRelease() { 192 return this.pendingRelease; 193 } 194 getNewContainerId()195 public long getNewContainerId() { 196 return appSchedulingInfo.getNewContainerId(); 197 } 198 getPriorities()199 public Collection<Priority> getPriorities() { 200 return appSchedulingInfo.getPriorities(); 201 } 202 getResourceRequest(Priority priority, String resourceName)203 public synchronized ResourceRequest getResourceRequest(Priority priority, String resourceName) { 204 return this.appSchedulingInfo.getResourceRequest(priority, resourceName); 205 } 206 getTotalRequiredResources(Priority priority)207 public synchronized int getTotalRequiredResources(Priority priority) { 208 return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers(); 209 } 210 getResource(Priority priority)211 public synchronized Resource getResource(Priority priority) { 212 return appSchedulingInfo.getResource(priority); 213 } 214 getQueueName()215 public String getQueueName() { 216 return appSchedulingInfo.getQueueName(); 217 } 218 getAMResource()219 public Resource getAMResource() { 220 return amResource; 221 } 222 setAMResource(Resource amResource)223 public void setAMResource(Resource amResource) { 224 this.amResource = amResource; 225 } 226 isAmRunning()227 public boolean isAmRunning() { 228 return amRunning; 229 } 230 setAmRunning(boolean bool)231 public void setAmRunning(boolean bool) { 232 amRunning = bool; 233 } 234 getUnmanagedAM()235 public boolean getUnmanagedAM() { 236 return unmanagedAM; 237 } 238 getRMContainer(ContainerId id)239 public synchronized RMContainer getRMContainer(ContainerId id) { 240 return liveContainers.get(id); 241 } 242 resetReReservations(Priority priority)243 protected synchronized void resetReReservations(Priority priority) { 244 reReservations.setCount(priority, 0); 245 } 246 addReReservation(Priority priority)247 protected synchronized void addReReservation(Priority priority) { 248 reReservations.add(priority); 249 } 250 getReReservations(Priority priority)251 public synchronized int getReReservations(Priority priority) { 252 return reReservations.count(priority); 253 } 254 255 /** 256 * Get total current reservations. 257 * Used only by unit tests 258 * @return total current reservations 259 */ 260 @Stable 261 @Private getCurrentReservation()262 public synchronized Resource getCurrentReservation() { 263 return currentReservation; 264 } 265 getQueue()266 public Queue getQueue() { 267 return queue; 268 } 269 updateResourceRequests( List<ResourceRequest> requests)270 public synchronized void updateResourceRequests( 271 List<ResourceRequest> requests) { 272 if (!isStopped) { 273 appSchedulingInfo.updateResourceRequests(requests, false); 274 } 275 } 276 recoverResourceRequests( List<ResourceRequest> requests)277 public synchronized void recoverResourceRequests( 278 List<ResourceRequest> requests) { 279 if (!isStopped) { 280 appSchedulingInfo.updateResourceRequests(requests, true); 281 } 282 } 283 stop(RMAppAttemptState rmAppAttemptFinalState)284 public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { 285 // Cleanup all scheduling information 286 isStopped = true; 287 appSchedulingInfo.stop(rmAppAttemptFinalState); 288 } 289 isStopped()290 public synchronized boolean isStopped() { 291 return isStopped; 292 } 293 294 /** 295 * Get the list of reserved containers 296 * @return All of the reserved containers. 297 */ getReservedContainers()298 public synchronized List<RMContainer> getReservedContainers() { 299 List<RMContainer> reservedContainers = new ArrayList<RMContainer>(); 300 for (Map.Entry<Priority, Map<NodeId, RMContainer>> e : 301 this.reservedContainers.entrySet()) { 302 reservedContainers.addAll(e.getValue().values()); 303 } 304 return reservedContainers; 305 } 306 reserve(SchedulerNode node, Priority priority, RMContainer rmContainer, Container container)307 public synchronized RMContainer reserve(SchedulerNode node, Priority priority, 308 RMContainer rmContainer, Container container) { 309 // Create RMContainer if necessary 310 if (rmContainer == null) { 311 rmContainer = 312 new RMContainerImpl(container, getApplicationAttemptId(), 313 node.getNodeID(), appSchedulingInfo.getUser(), rmContext); 314 315 Resources.addTo(currentReservation, container.getResource()); 316 317 // Reset the re-reservation count 318 resetReReservations(priority); 319 } else { 320 // Note down the re-reservation 321 addReReservation(priority); 322 } 323 rmContainer.handle(new RMContainerReservedEvent(container.getId(), 324 container.getResource(), node.getNodeID(), priority)); 325 326 Map<NodeId, RMContainer> reservedContainers = 327 this.reservedContainers.get(priority); 328 if (reservedContainers == null) { 329 reservedContainers = new HashMap<NodeId, RMContainer>(); 330 this.reservedContainers.put(priority, reservedContainers); 331 } 332 reservedContainers.put(node.getNodeID(), rmContainer); 333 334 if (LOG.isDebugEnabled()) { 335 LOG.debug("Application attempt " + getApplicationAttemptId() 336 + " reserved container " + rmContainer + " on node " + node 337 + ". This attempt currently has " + reservedContainers.size() 338 + " reserved containers at priority " + priority 339 + "; currentReservation " + currentReservation.getMemory()); 340 } 341 342 return rmContainer; 343 } 344 345 /** 346 * Has the application reserved the given <code>node</code> at the 347 * given <code>priority</code>? 348 * @param node node to be checked 349 * @param priority priority of reserved container 350 * @return true is reserved, false if not 351 */ isReserved(SchedulerNode node, Priority priority)352 public synchronized boolean isReserved(SchedulerNode node, Priority priority) { 353 Map<NodeId, RMContainer> reservedContainers = 354 this.reservedContainers.get(priority); 355 if (reservedContainers != null) { 356 return reservedContainers.containsKey(node.getNodeID()); 357 } 358 return false; 359 } 360 setHeadroom(Resource globalLimit)361 public synchronized void setHeadroom(Resource globalLimit) { 362 this.resourceLimit = globalLimit; 363 } 364 365 /** 366 * Get available headroom in terms of resources for the application's user. 367 * @return available resource headroom 368 */ getHeadroom()369 public synchronized Resource getHeadroom() { 370 // Corner case to deal with applications being slightly over-limit 371 if (resourceLimit.getMemory() < 0) { 372 resourceLimit.setMemory(0); 373 } 374 375 return resourceLimit; 376 } 377 getNumReservedContainers(Priority priority)378 public synchronized int getNumReservedContainers(Priority priority) { 379 Map<NodeId, RMContainer> reservedContainers = 380 this.reservedContainers.get(priority); 381 return (reservedContainers == null) ? 0 : reservedContainers.size(); 382 } 383 384 @SuppressWarnings("unchecked") containerLaunchedOnNode(ContainerId containerId, NodeId nodeId)385 public synchronized void containerLaunchedOnNode(ContainerId containerId, 386 NodeId nodeId) { 387 // Inform the container 388 RMContainer rmContainer = getRMContainer(containerId); 389 if (rmContainer == null) { 390 // Some unknown container sneaked into the system. Kill it. 391 rmContext.getDispatcher().getEventHandler() 392 .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); 393 return; 394 } 395 396 rmContainer.handle(new RMContainerEvent(containerId, 397 RMContainerEventType.LAUNCHED)); 398 } 399 showRequests()400 public synchronized void showRequests() { 401 if (LOG.isDebugEnabled()) { 402 for (Priority priority : getPriorities()) { 403 Map<String, ResourceRequest> requests = getResourceRequests(priority); 404 if (requests != null) { 405 LOG.debug("showRequests:" + " application=" + getApplicationId() + 406 " headRoom=" + getHeadroom() + 407 " currentConsumption=" + currentConsumption.getMemory()); 408 for (ResourceRequest request : requests.values()) { 409 LOG.debug("showRequests:" + " application=" + getApplicationId() 410 + " request=" + request); 411 } 412 } 413 } 414 } 415 } 416 getCurrentConsumption()417 public Resource getCurrentConsumption() { 418 return currentConsumption; 419 } 420 421 public static class ContainersAndNMTokensAllocation { 422 List<Container> containerList; 423 List<NMToken> nmTokenList; 424 ContainersAndNMTokensAllocation(List<Container> containerList, List<NMToken> nmTokenList)425 public ContainersAndNMTokensAllocation(List<Container> containerList, 426 List<NMToken> nmTokenList) { 427 this.containerList = containerList; 428 this.nmTokenList = nmTokenList; 429 } 430 getContainerList()431 public List<Container> getContainerList() { 432 return containerList; 433 } 434 getNMTokenList()435 public List<NMToken> getNMTokenList() { 436 return nmTokenList; 437 } 438 } 439 440 // Create container token and NMToken altogether, if either of them fails for 441 // some reason like DNS unavailable, do not return this container and keep it 442 // in the newlyAllocatedContainers waiting to be refetched. 443 public synchronized ContainersAndNMTokensAllocation pullNewlyAllocatedContainersAndNMTokens()444 pullNewlyAllocatedContainersAndNMTokens() { 445 List<Container> returnContainerList = 446 new ArrayList<Container>(newlyAllocatedContainers.size()); 447 List<NMToken> nmTokens = new ArrayList<NMToken>(); 448 for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i 449 .hasNext();) { 450 RMContainer rmContainer = i.next(); 451 Container container = rmContainer.getContainer(); 452 try { 453 // create container token and NMToken altogether. 454 container.setContainerToken(rmContext.getContainerTokenSecretManager() 455 .createContainerToken(container.getId(), container.getNodeId(), 456 getUser(), container.getResource(), container.getPriority(), 457 rmContainer.getCreationTime(), this.logAggregationContext)); 458 NMToken nmToken = 459 rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), 460 getApplicationAttemptId(), container); 461 if (nmToken != null) { 462 nmTokens.add(nmToken); 463 } 464 } catch (IllegalArgumentException e) { 465 // DNS might be down, skip returning this container. 466 LOG.error("Error trying to assign container token and NM token to" + 467 " an allocated container " + container.getId(), e); 468 continue; 469 } 470 returnContainerList.add(container); 471 i.remove(); 472 rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), 473 RMContainerEventType.ACQUIRED)); 474 } 475 return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens); 476 } 477 updateBlacklist( List<String> blacklistAdditions, List<String> blacklistRemovals)478 public synchronized void updateBlacklist( 479 List<String> blacklistAdditions, List<String> blacklistRemovals) { 480 if (!isStopped) { 481 this.appSchedulingInfo.updateBlacklist( 482 blacklistAdditions, blacklistRemovals); 483 } 484 } 485 isBlacklisted(String resourceName)486 public boolean isBlacklisted(String resourceName) { 487 return this.appSchedulingInfo.isBlacklisted(resourceName); 488 } 489 addSchedulingOpportunity(Priority priority)490 public synchronized void addSchedulingOpportunity(Priority priority) { 491 schedulingOpportunities.setCount(priority, 492 schedulingOpportunities.count(priority) + 1); 493 } 494 subtractSchedulingOpportunity(Priority priority)495 public synchronized void subtractSchedulingOpportunity(Priority priority) { 496 int count = schedulingOpportunities.count(priority) - 1; 497 this.schedulingOpportunities.setCount(priority, Math.max(count, 0)); 498 } 499 500 /** 501 * Return the number of times the application has been given an opportunity 502 * to schedule a task at the given priority since the last time it 503 * successfully did so. 504 */ getSchedulingOpportunities(Priority priority)505 public synchronized int getSchedulingOpportunities(Priority priority) { 506 return schedulingOpportunities.count(priority); 507 } 508 509 /** 510 * Should be called when an application has successfully scheduled a container, 511 * or when the scheduling locality threshold is relaxed. 512 * Reset various internal counters which affect delay scheduling 513 * 514 * @param priority The priority of the container scheduled. 515 */ resetSchedulingOpportunities(Priority priority)516 public synchronized void resetSchedulingOpportunities(Priority priority) { 517 resetSchedulingOpportunities(priority, System.currentTimeMillis()); 518 } 519 // used for continuous scheduling resetSchedulingOpportunities(Priority priority, long currentTimeMs)520 public synchronized void resetSchedulingOpportunities(Priority priority, 521 long currentTimeMs) { 522 lastScheduledContainer.put(priority, currentTimeMs); 523 schedulingOpportunities.setCount(priority, 0); 524 } 525 getRunningAggregateAppResourceUsage()526 synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() { 527 long currentTimeMillis = System.currentTimeMillis(); 528 // Don't walk the whole container list if the resources were computed 529 // recently. 530 if ((currentTimeMillis - lastMemoryAggregateAllocationUpdateTime) 531 > MEM_AGGREGATE_ALLOCATION_CACHE_MSECS) { 532 long memorySeconds = 0; 533 long vcoreSeconds = 0; 534 for (RMContainer rmContainer : this.liveContainers.values()) { 535 long usedMillis = currentTimeMillis - rmContainer.getCreationTime(); 536 Resource resource = rmContainer.getContainer().getResource(); 537 memorySeconds += resource.getMemory() * usedMillis / 538 DateUtils.MILLIS_PER_SECOND; 539 vcoreSeconds += resource.getVirtualCores() * usedMillis 540 / DateUtils.MILLIS_PER_SECOND; 541 } 542 543 lastMemoryAggregateAllocationUpdateTime = currentTimeMillis; 544 lastMemorySeconds = memorySeconds; 545 lastVcoreSeconds = vcoreSeconds; 546 } 547 return new AggregateAppResourceUsage(lastMemorySeconds, lastVcoreSeconds); 548 } 549 getResourceUsageReport()550 public synchronized ApplicationResourceUsageReport getResourceUsageReport() { 551 AggregateAppResourceUsage resUsage = getRunningAggregateAppResourceUsage(); 552 return ApplicationResourceUsageReport.newInstance(liveContainers.size(), 553 reservedContainers.size(), Resources.clone(currentConsumption), 554 Resources.clone(currentReservation), 555 Resources.add(currentConsumption, currentReservation), 556 resUsage.getMemorySeconds(), resUsage.getVcoreSeconds()); 557 } 558 getLiveContainersMap()559 public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() { 560 return this.liveContainers; 561 } 562 getResourceLimit()563 public synchronized Resource getResourceLimit() { 564 return this.resourceLimit; 565 } 566 getLastScheduledContainer()567 public synchronized Map<Priority, Long> getLastScheduledContainer() { 568 return this.lastScheduledContainer; 569 } 570 transferStateFromPreviousAttempt( SchedulerApplicationAttempt appAttempt)571 public synchronized void transferStateFromPreviousAttempt( 572 SchedulerApplicationAttempt appAttempt) { 573 this.liveContainers = appAttempt.getLiveContainersMap(); 574 // this.reReservations = appAttempt.reReservations; 575 this.currentConsumption = appAttempt.getCurrentConsumption(); 576 this.resourceLimit = appAttempt.getResourceLimit(); 577 // this.currentReservation = appAttempt.currentReservation; 578 // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers; 579 // this.schedulingOpportunities = appAttempt.schedulingOpportunities; 580 this.lastScheduledContainer = appAttempt.getLastScheduledContainer(); 581 this.appSchedulingInfo 582 .transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo); 583 } 584 move(Queue newQueue)585 public synchronized void move(Queue newQueue) { 586 QueueMetrics oldMetrics = queue.getMetrics(); 587 QueueMetrics newMetrics = newQueue.getMetrics(); 588 String user = getUser(); 589 for (RMContainer liveContainer : liveContainers.values()) { 590 Resource resource = liveContainer.getContainer().getResource(); 591 oldMetrics.releaseResources(user, 1, resource); 592 newMetrics.allocateResources(user, 1, resource, false); 593 } 594 for (Map<NodeId, RMContainer> map : reservedContainers.values()) { 595 for (RMContainer reservedContainer : map.values()) { 596 Resource resource = reservedContainer.getReservedResource(); 597 oldMetrics.unreserveResource(user, resource); 598 newMetrics.reserveResource(user, resource); 599 } 600 } 601 602 appSchedulingInfo.move(newQueue); 603 this.queue = newQueue; 604 } 605 recoverContainer(RMContainer rmContainer)606 public synchronized void recoverContainer(RMContainer rmContainer) { 607 // recover app scheduling info 608 appSchedulingInfo.recoverContainer(rmContainer); 609 610 if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { 611 return; 612 } 613 LOG.info("SchedulerAttempt " + getApplicationAttemptId() 614 + " is recovering container " + rmContainer.getContainerId()); 615 liveContainers.put(rmContainer.getContainerId(), rmContainer); 616 Resources.addTo(currentConsumption, rmContainer.getContainer() 617 .getResource()); 618 // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource 619 // is called. 620 // newlyAllocatedContainers.add(rmContainer); 621 // schedulingOpportunities 622 // lastScheduledContainer 623 } 624 incNumAllocatedContainers(NodeType containerType, NodeType requestType)625 public void incNumAllocatedContainers(NodeType containerType, 626 NodeType requestType) { 627 RMAppAttempt attempt = 628 rmContext.getRMApps().get(attemptId.getApplicationId()) 629 .getCurrentAppAttempt(); 630 if (attempt != null) { 631 attempt.getRMAppAttemptMetrics().incNumAllocatedContainers(containerType, 632 requestType); 633 } 634 } 635 setApplicationHeadroomForMetrics(Resource headroom)636 public void setApplicationHeadroomForMetrics(Resource headroom) { 637 RMAppAttempt attempt = 638 rmContext.getRMApps().get(attemptId.getApplicationId()) 639 .getCurrentAppAttempt(); 640 if (attempt != null) { 641 attempt.getRMAppAttemptMetrics().setApplicationAttemptHeadRoom( 642 Resources.clone(headroom)); 643 } 644 } 645 getBlacklistedNodes()646 public Set<String> getBlacklistedNodes() { 647 return this.appSchedulingInfo.getBlackListCopy(); 648 } 649 } 650