/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * Represents an application attempt from the viewpoint of the Fair Scheduler.
 */
@Private
@Unstable
public class FSAppAttempt extends SchedulerApplicationAttempt
    implements Schedulable {

  private static final Log LOG = LogFactory.getLog(FSAppAttempt.class);
  private static final DefaultResourceCalculator RESOURCE_CALCULATOR
      = new DefaultResourceCalculator();

  private long startTime;
  private Priority priority;
  private ResourceWeights resourceWeights;
  private Resource demand = Resources.createResource(0);
  private FairScheduler scheduler;
  private Resource fairShare = Resources.createResource(0, 0);
  private Resource preemptedResources = Resources.createResource(0);
  private RMContainerComparator comparator = new RMContainerComparator();
  private final Map<RMContainer, Long> preemptionMap =
      new HashMap<RMContainer, Long>();

  /**
   * Delay scheduling: We often want to prioritize scheduling of node-local
   * containers over rack-local or off-switch containers. To achieve this
   * we first only allow node-local assignments for a given priority level,
   * then relax the locality threshold once we've had a long enough period
   * without successfully scheduling. We measure both the number of "missed"
   * scheduling opportunities since the last container was scheduled
   * at the current allowed level and the time since the last container
   * was scheduled. Currently we use only the former.
   */
  private final Map<Priority, NodeType> allowedLocalityLevel =
      new HashMap<Priority, NodeType>();

  public FSAppAttempt(FairScheduler scheduler,
      ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue,
      ActiveUsersManager activeUsersManager, RMContext rmContext) {
    super(applicationAttemptId, user, queue, activeUsersManager, rmContext);

    this.scheduler = scheduler;
    this.startTime = scheduler.getClock().getTime();
    this.priority = Priority.newInstance(1);
    this.resourceWeights = new ResourceWeights();
  }

  public ResourceWeights getResourceWeights() {
    return resourceWeights;
  }

  /**
   * Get metrics reference from containing queue.
   */
  public QueueMetrics getMetrics() {
    return queue.getMetrics();
  }

  public synchronized void containerCompleted(RMContainer rmContainer,
      ContainerStatus containerStatus, RMContainerEventType event) {

    Container container = rmContainer.getContainer();
    ContainerId containerId = container.getId();

    // Remove from the list of newly allocated containers if found
    newlyAllocatedContainers.remove(rmContainer);

    // Inform the container
    rmContainer.handle(
        new RMContainerFinishedEvent(containerId, containerStatus, event));
    LOG.info("Completed container: " + rmContainer.getContainerId() +
        " in state: " + rmContainer.getState() + " event: " + event);

    // Remove from the list of containers
    liveContainers.remove(rmContainer.getContainerId());

    RMAuditLogger.logSuccess(getUser(),
        AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
        getApplicationId(), containerId);

    // Update usage metrics
    Resource containerResource = rmContainer.getContainer().getResource();
    queue.getMetrics().releaseResources(getUser(), 1, containerResource);
    Resources.subtractFrom(currentConsumption, containerResource);

    // Remove from the preemption map now that it is completed
    preemptionMap.remove(rmContainer);

    // Clear the resource utilization metrics cache.
    lastMemoryAggregateAllocationUpdateTime = -1;
  }

  private synchronized void unreserveInternal(
      Priority priority, FSSchedulerNode node) {
    Map<NodeId, RMContainer> reservedContainers =
        this.reservedContainers.get(priority);
    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
    if (reservedContainers.isEmpty()) {
      this.reservedContainers.remove(priority);
    }

    // Reset the re-reservation count
    resetReReservations(priority);

    Resource resource = reservedContainer.getContainer().getResource();
    Resources.subtractFrom(currentReservation, resource);

    LOG.info("Application " + getApplicationId() + " unreserved on node "
        + node + ", currently has " + reservedContainers.size()
        + " at priority " + priority + "; currentReservation "
        + currentReservation);
  }

  /**
   * Headroom depends on resources in the cluster, current usage of the
   * queue, the queue's fair share, and the queue's max resources.
   */
  @Override
  public Resource getHeadroom() {
    final FSQueue queue = (FSQueue) this.queue;
    SchedulingPolicy policy = queue.getPolicy();

    Resource queueFairShare = queue.getFairShare();
    Resource queueUsage = queue.getResourceUsage();
    Resource clusterResource = this.scheduler.getClusterResource();
    Resource clusterUsage = this.scheduler.getRootQueueMetrics()
        .getAllocatedResources();

    Resource clusterAvailableResources =
        Resources.subtract(clusterResource, clusterUsage);
    Resource queueMaxAvailableResources =
        Resources.subtract(queue.getMaxShare(), queueUsage);
    Resource maxAvailableResource = Resources.componentwiseMin(
        clusterAvailableResources, queueMaxAvailableResources);

    Resource headroom = policy.getHeadroom(queueFairShare,
        queueUsage, maxAvailableResource);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Headroom calculation for " + this.getName() + ": " +
          "Min((queueFairShare=" + queueFairShare +
          " - queueUsage=" + queueUsage + "), " +
          "maxAvailableResource=" + maxAvailableResource + "); " +
          "headroom=" + headroom);
    }
    return headroom;
  }

  public synchronized float getLocalityWaitFactor(
      Priority priority, int clusterNodes) {
    // Estimate: Required unique resources (i.e. hosts + racks)
    int requiredResources =
        Math.max(this.getResourceRequests(priority).size() - 1, 0);

    // waitFactor can't be more than '1'
    // i.e. no point skipping more than clusterNodes opportunities
    return Math.min(((float) requiredResources / clusterNodes), 1.0f);
  }

  /**
   * Return the level at which we are allowed to schedule containers, given the
   * current size of the cluster and thresholds indicating how many nodes to
   * fail at (as a fraction of cluster size) before relaxing scheduling
   * constraints.
   */
  public synchronized NodeType getAllowedLocalityLevel(Priority priority,
      int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) {
    // Upper limit on the thresholds
    if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
    if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }

    // If delay scheduling is not being used, we can schedule anywhere
    if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
      return NodeType.OFF_SWITCH;
    }

    // Default level is NODE_LOCAL
    if (!allowedLocalityLevel.containsKey(priority)) {
      allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
      return NodeType.NODE_LOCAL;
    }

    NodeType allowed = allowedLocalityLevel.get(priority);

    // If the level is already the most liberal, we're done
    if (allowed.equals(NodeType.OFF_SWITCH)) {
      return NodeType.OFF_SWITCH;
    }

    double threshold = allowed.equals(NodeType.NODE_LOCAL) ?
        nodeLocalityThreshold : rackLocalityThreshold;

    // Relax locality constraints once we've surpassed the threshold.
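    // For example, with numNodes = 100 and nodeLocalityThreshold = 0.5, an
    // app at NODE_LOCAL must miss more than 50 scheduling opportunities at
    // this priority before it may fall back to RACK_LOCAL.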
    if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
      if (allowed.equals(NodeType.NODE_LOCAL)) {
        allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
        resetSchedulingOpportunities(priority);
      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
        allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
        resetSchedulingOpportunities(priority);
      }
    }
    return allowedLocalityLevel.get(priority);
  }

  /**
   * Return the level at which we are allowed to schedule containers, given the
   * delay thresholds indicating how much time must pass before relaxing
   * scheduling constraints.
   */
  public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
      long nodeLocalityDelayMs, long rackLocalityDelayMs,
      long currentTimeMs) {

    // If delay scheduling is not being used, we can schedule anywhere
    if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
      return NodeType.OFF_SWITCH;
    }

    // Default level is NODE_LOCAL
    if (!allowedLocalityLevel.containsKey(priority)) {
      allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
      return NodeType.NODE_LOCAL;
    }

    NodeType allowed = allowedLocalityLevel.get(priority);

    // If the level is already the most liberal, we're done
    if (allowed.equals(NodeType.OFF_SWITCH)) {
      return NodeType.OFF_SWITCH;
    }

    // Check the waiting time since the last container was scheduled at this
    // priority (or since the app started, if none has been scheduled yet)
    long waitTime = currentTimeMs;
    if (lastScheduledContainer.containsKey(priority)) {
      waitTime -= lastScheduledContainer.get(priority);
    } else {
      waitTime -= getStartTime();
    }

    long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
        nodeLocalityDelayMs : rackLocalityDelayMs;

    if (waitTime > thresholdTime) {
      if (allowed.equals(NodeType.NODE_LOCAL)) {
        allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
        resetSchedulingOpportunities(priority, currentTimeMs);
      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
        allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
        resetSchedulingOpportunities(priority, currentTimeMs);
      }
    }
    return allowedLocalityLevel.get(priority);
  }

  public synchronized RMContainer allocate(NodeType type, FSSchedulerNode node,
      Priority priority, ResourceRequest request,
      Container container) {
    // Update the allowed locality level: if this assignment is more local
    // than the level we had relaxed to, tighten the level back up
    NodeType allowed = allowedLocalityLevel.get(priority);
    if (allowed != null) {
      if (allowed.equals(NodeType.OFF_SWITCH) &&
          (type.equals(NodeType.NODE_LOCAL) ||
              type.equals(NodeType.RACK_LOCAL))) {
        this.resetAllowedLocalityLevel(priority, type);
      } else if (allowed.equals(NodeType.RACK_LOCAL) &&
          type.equals(NodeType.NODE_LOCAL)) {
        this.resetAllowedLocalityLevel(priority, type);
      }
    }

    // Required sanity check - the AM can call 'allocate' to update resource
    // requests without locking the scheduler, hence we need to check
    if (getTotalRequiredResources(priority) <= 0) {
      return null;
    }

    // Create the RMContainer
    RMContainer rmContainer = new RMContainerImpl(container,
        getApplicationAttemptId(), node.getNodeID(),
        appSchedulingInfo.getUser(), rmContext);

    // Add it to the lists of newly allocated and live containers
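    // Note: newlyAllocatedContainers is drained by the base class
    // (SchedulerApplicationAttempt) when building the response to the AM's
    // next allocate call, while liveContainers tracks every container owned
    // by this attempt until containerCompleted() removes it.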
    newlyAllocatedContainers.add(rmContainer);
    liveContainers.put(container.getId(), rmContainer);

    // Update consumption and track allocations
    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
        type, node, priority, request, container);
    Resources.addTo(currentConsumption, container.getResource());

    // Update resource requests related to "request" and store them in the
    // RMContainer
    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);

    // Inform the container
    rmContainer.handle(
        new RMContainerEvent(container.getId(), RMContainerEventType.START));

    if (LOG.isDebugEnabled()) {
      LOG.debug("allocate: applicationAttemptId="
          + container.getId().getApplicationAttemptId()
          + " container=" + container.getId() + " host="
          + container.getNodeId().getHost() + " type=" + type);
    }
    RMAuditLogger.logSuccess(getUser(),
        AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
        getApplicationId(), container.getId());

    return rmContainer;
  }

  /**
   * Should be called when the scheduler assigns a container at a higher
   * degree of locality than the current threshold. Resets the allowed
   * locality level to the higher degree of locality.
   */
  public synchronized void resetAllowedLocalityLevel(Priority priority,
      NodeType level) {
    NodeType old = allowedLocalityLevel.get(priority);
    LOG.info("Raising locality level from " + old + " to " + level +
        " at priority " + priority);
    allowedLocalityLevel.put(priority, level);
  }

  // Preemption-related methods
  public void addPreemption(RMContainer container, long time) {
    assert preemptionMap.get(container) == null;
    preemptionMap.put(container, time);
    Resources.addTo(preemptedResources, container.getAllocatedResource());
  }

  public Long getContainerPreemptionTime(RMContainer container) {
    return preemptionMap.get(container);
  }

  public Set<RMContainer> getPreemptionContainers() {
    return preemptionMap.keySet();
  }

  @Override
  public FSLeafQueue getQueue() {
    return (FSLeafQueue) super.getQueue();
  }

  public Resource getPreemptedResources() {
    return preemptedResources;
  }

  public void resetPreemptedResources() {
    preemptedResources = Resources.createResource(0);
    for (RMContainer container : getPreemptionContainers()) {
      Resources.addTo(preemptedResources, container.getAllocatedResource());
    }
  }

  public void clearPreemptedResources() {
    preemptedResources.setMemory(0);
    preemptedResources.setVirtualCores(0);
  }

  /**
   * Create and return a container object reflecting an allocation for the
   * given application on the given node with the given capability and
   * priority.
   */
  public Container createContainer(
      FSSchedulerNode node, Resource capability, Priority priority) {

    NodeId nodeId = node.getRMNode().getNodeID();
    ContainerId containerId = BuilderUtils.newContainerId(
        getApplicationAttemptId(), getNewContainerId());

    // Create the container
    Container container =
        BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
            .getHttpAddress(), capability, priority, null);

    return container;
  }

  /**
   * Reserve a spot for {@code container} on this {@code node}. If
   * the container is {@code alreadyReserved} on the node, simply
   * update relevant bookkeeping. This dispatches to relevant handlers
   * in {@link FSSchedulerNode}.
   */
  private void reserve(Priority priority, FSSchedulerNode node,
      Container container, boolean alreadyReserved) {
    LOG.info("Making reservation: node=" + node.getNodeName() +
        " app_id=" + getApplicationId());

    if (!alreadyReserved) {
      getMetrics().reserveResource(getUser(), container.getResource());
      RMContainer rmContainer =
          super.reserve(node, priority, null, container);
      node.reserveResource(this, priority, rmContainer);
    } else {
      RMContainer rmContainer = node.getReservedContainer();
      super.reserve(node, priority, rmContainer, container);
      node.reserveResource(this, priority, rmContainer);
    }
  }

  /**
   * Remove the reservation on {@code node} at the given {@link Priority}.
   * This dispatches SchedulerNode handlers as well.
   */
  public void unreserve(Priority priority, FSSchedulerNode node) {
    RMContainer rmContainer = node.getReservedContainer();
    unreserveInternal(priority, node);
    node.unreserveResource(this);
    getMetrics().unreserveResource(
        getUser(), rmContainer.getContainer().getResource());
  }

  /**
   * Assign a container to this node to facilitate {@code request}. If the
   * node does not have enough memory, create a reservation. This is called
   * once we are sure the particular request should be facilitated by this
   * node.
   *
   * @param node
   *     The node to try placing the container on.
   * @param request
   *     The ResourceRequest we're trying to satisfy.
   * @param type
   *     The locality of the assignment.
   * @param reserved
   *     Whether there's already a container reserved for this app on the node.
   * @return
   *     If an assignment was made, returns the resources allocated to the
   *     container. If a reservation was made, returns
   *     FairScheduler.CONTAINER_RESERVED. If no assignment or reservation was
   *     made, returns an empty resource.
   */
  private Resource assignContainer(
      FSSchedulerNode node, ResourceRequest request, NodeType type,
      boolean reserved) {

    // How much does this request need?
    Resource capability = request.getCapability();

    // How much does the node have?
    Resource available = node.getAvailableResource();

    Container container = null;
    if (reserved) {
      container = node.getReservedContainer().getContainer();
    } else {
      container = createContainer(node, capability, request.getPriority());
    }

    // Can we allocate a container on this node?
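    // Resources.fitsIn is a componentwise check: the request fits only if
    // both its memory and its vcores are no larger than what the node
    // currently has free.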
    if (Resources.fitsIn(capability, available)) {
      // Inform the application of the new container for this request
      RMContainer allocatedContainer =
          allocate(type, node, request.getPriority(), request, container);
      if (allocatedContainer == null) {
        // Did the application need this resource?
        if (reserved) {
          unreserve(request.getPriority(), node);
        }
        return Resources.none();
      }

      // If we had previously made a reservation, delete it
      if (reserved) {
        unreserve(request.getPriority(), node);
      }

      // Inform the node
      node.allocateContainer(allocatedContainer);

      // If this container is used to run the AM, update the leaf queue's
      // AM usage
      if (getLiveContainers().size() == 1 && !getUnmanagedAM()) {
        getQueue().addAMResourceUsage(container.getResource());
        setAmRunning(true);
      }

      return container.getResource();
    } else {
      if (!FairScheduler.fitsInMaxShare(getQueue(), capability)) {
        return Resources.none();
      }

      // The desired container won't fit here, so reserve
      reserve(request.getPriority(), node, container, reserved);

      return FairScheduler.CONTAINER_RESERVED;
    }
  }

  private boolean hasNodeOrRackLocalRequests(Priority priority) {
    return getResourceRequests(priority).size() > 1;
  }

  private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
    }

    Collection<Priority> prioritiesToTry = (reserved) ?
        Arrays.asList(node.getReservedContainer().getReservedPriority()) :
        getPriorities();

    // For each priority, see if we can schedule a node-local, rack-local
    // or off-switch request. Rack-local or off-switch requests may be delayed
    // (not scheduled) in order to promote better locality.
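    // The order of the checks below implements the locality fallback: a
    // node-local request is tried first; a rack-local request is tried only
    // once the allowed level has relaxed to RACK_LOCAL or beyond; an
    // off-switch request is tried last, and only if the allowed level is
    // OFF_SWITCH or the app made no node/rack-specific requests at this
    // priority.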
    synchronized (this) {
      for (Priority priority : prioritiesToTry) {
        if (getTotalRequiredResources(priority) <= 0 ||
            !hasContainerForNode(priority, node)) {
          continue;
        }

        addSchedulingOpportunity(priority);

        // Check the AM resource usage for the leaf queue
        if (getLiveContainers().size() == 0 && !getUnmanagedAM()) {
          if (!getQueue().canRunAppAM(getAMResource())) {
            return Resources.none();
          }
        }

        ResourceRequest rackLocalRequest = getResourceRequest(priority,
            node.getRackName());
        ResourceRequest localRequest = getResourceRequest(priority,
            node.getNodeName());

        if (localRequest != null && !localRequest.getRelaxLocality()) {
          LOG.warn("Relax locality off is not supported on local request: "
              + localRequest);
        }

        NodeType allowedLocality;
        if (scheduler.isContinuousSchedulingEnabled()) {
          allowedLocality = getAllowedLocalityLevelByTime(priority,
              scheduler.getNodeLocalityDelayMs(),
              scheduler.getRackLocalityDelayMs(),
              scheduler.getClock().getTime());
        } else {
          allowedLocality = getAllowedLocalityLevel(priority,
              scheduler.getNumClusterNodes(),
              scheduler.getNodeLocalityThreshold(),
              scheduler.getRackLocalityThreshold());
        }

        if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
            && localRequest != null && localRequest.getNumContainers() != 0) {
          return assignContainer(node, localRequest,
              NodeType.NODE_LOCAL, reserved);
        }

        if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
          continue;
        }

        if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
            && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
                allowedLocality.equals(NodeType.OFF_SWITCH))) {
          return assignContainer(node, rackLocalRequest,
              NodeType.RACK_LOCAL, reserved);
        }

        ResourceRequest offSwitchRequest =
            getResourceRequest(priority, ResourceRequest.ANY);
        if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
          continue;
        }

        if (offSwitchRequest != null &&
            offSwitchRequest.getNumContainers() != 0) {
          if (!hasNodeOrRackLocalRequests(priority) ||
              allowedLocality.equals(NodeType.OFF_SWITCH)) {
            return assignContainer(
                node, offSwitchRequest, NodeType.OFF_SWITCH, reserved);
          }
        }
      }
    }
    return Resources.none();
  }

  /**
   * Called when this application already has an existing reservation on the
   * given node. Sees whether we can turn the reservation into an allocation.
   * Also checks whether the application needs the reservation anymore, and
   * releases it if not.
   *
   * @param node
   *     Node that the application has an existing reservation on
   */
  public Resource assignReservedContainer(FSSchedulerNode node) {
    RMContainer rmContainer = node.getReservedContainer();
    Priority priority = rmContainer.getReservedPriority();

    // Make sure the application still needs requests at this priority
    if (getTotalRequiredResources(priority) == 0) {
      unreserve(priority, node);
      return Resources.none();
    }

    // Fail early if the reserved container won't fit.
    // Note that we have an assumption here that there's only one container
    // size per priority.
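    // For example, a 4GB reservation on a node with only 2GB currently free
    // returns an empty resource but keeps the reservation in place, so the
    // app retains its claim on the node until space frees up.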
    if (!Resources.fitsIn(node.getReservedContainer().getReservedResource(),
        node.getAvailableResource())) {
      return Resources.none();
    }

    return assignContainer(node, true);
  }

  /**
   * Whether this app has container requests that could be satisfied on the
   * given node, if the node had full space.
   */
  public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
    ResourceRequest anyRequest = getResourceRequest(prio, ResourceRequest.ANY);
    ResourceRequest rackRequest = getResourceRequest(prio, node.getRackName());
    ResourceRequest nodeRequest = getResourceRequest(prio, node.getNodeName());

    return
        // There must be outstanding requests at the given priority:
        anyRequest != null && anyRequest.getNumContainers() > 0 &&
        // If locality relaxation is turned off at *-level, there must be a
        // non-zero request for the node's rack:
        (anyRequest.getRelaxLocality() ||
            (rackRequest != null && rackRequest.getNumContainers() > 0)) &&
        // If locality relaxation is turned off at rack-level, there must be a
        // non-zero request at the node:
        (rackRequest == null || rackRequest.getRelaxLocality() ||
            (nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
        // The requested container must be able to fit on the node:
        Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
            anyRequest.getCapability(), node.getRMNode().getTotalCapability());
  }

  static class RMContainerComparator implements Comparator<RMContainer>,
      Serializable {
    @Override
    public int compare(RMContainer c1, RMContainer c2) {
      int ret = c1.getContainer().getPriority().compareTo(
          c2.getContainer().getPriority());
      if (ret == 0) {
        return c2.getContainerId().compareTo(c1.getContainerId());
      }
      return ret;
    }
  }

  /* Schedulable methods implementation */

  @Override
  public String getName() {
    return getApplicationId().toString();
  }

  @Override
  public Resource getDemand() {
    return demand;
  }

  @Override
  public long getStartTime() {
    return startTime;
  }

  @Override
  public Resource getMinShare() {
    return Resources.none();
  }

  @Override
  public Resource getMaxShare() {
    return Resources.unbounded();
  }

  @Override
  public Resource getResourceUsage() {
    // Here getPreemptedResources() always returns zero, except during
    // a preemption round
    return Resources.subtract(getCurrentConsumption(), getPreemptedResources());
  }

  @Override
  public ResourceWeights getWeights() {
    return scheduler.getAppWeight(this);
  }

  @Override
  public Priority getPriority() {
    // Right now per-app priorities are not passed to the scheduler,
    // so everyone has the same priority.
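    // The constructor pins this to Priority.newInstance(1) for every attempt.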
    return priority;
  }

  @Override
  public Resource getFairShare() {
    return this.fairShare;
  }

  @Override
  public void setFairShare(Resource fairShare) {
    this.fairShare = fairShare;
  }

  @Override
  public void updateDemand() {
    demand = Resources.createResource(0);
    // Demand is current consumption plus outstanding requests
    Resources.addTo(demand, getCurrentConsumption());

    // Add up outstanding resource requests
    synchronized (this) {
      for (Priority p : getPriorities()) {
        for (ResourceRequest r : getResourceRequests(p).values()) {
          Resource total =
              Resources.multiply(r.getCapability(), r.getNumContainers());
          Resources.addTo(demand, total);
        }
      }
    }
  }

  @Override
  public Resource assignContainer(FSSchedulerNode node) {
    return assignContainer(node, false);
  }

  /**
   * Preempt a running container according to the priority.
   */
  @Override
  public RMContainer preemptContainer() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("App " + getName() + " is going to preempt a running " +
          "container");
    }

    RMContainer toBePreempted = null;
    for (RMContainer container : getLiveContainers()) {
      if (!getPreemptionContainers().contains(container) &&
          (toBePreempted == null ||
              comparator.compare(toBePreempted, container) > 0)) {
        toBePreempted = container;
      }
    }
    return toBePreempted;
  }
}