/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * Represents an application attempt from the viewpoint of the Fair Scheduler.
 */
@Private
@Unstable
public class FSAppAttempt extends SchedulerApplicationAttempt
    implements Schedulable {

  private static final Log LOG = LogFactory.getLog(FSAppAttempt.class);
  private static final DefaultResourceCalculator RESOURCE_CALCULATOR
      = new DefaultResourceCalculator();

  private long startTime;
  private Priority priority;
  private ResourceWeights resourceWeights;
  private Resource demand = Resources.createResource(0);
  private FairScheduler scheduler;
  private Resource fairShare = Resources.createResource(0, 0);
  private Resource preemptedResources = Resources.createResource(0);
  private RMContainerComparator comparator = new RMContainerComparator();
  private final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();

  /**
   * Delay scheduling: We often want to prioritize scheduling of node-local
   * containers over rack-local or off-switch containers. To achieve this
   * we first allow only node-local assignments for a given priority level,
   * then relax the locality constraint once we've gone long enough without
   * successfully scheduling at that level. We track both the number of
   * "missed" scheduling opportunities since the last container was scheduled
   * at the current allowed level and the time since the last container was
   * scheduled: the former is used when containers are assigned on node
   * heartbeats, the latter when continuous scheduling is enabled.
   */
  private final Map<Priority, NodeType> allowedLocalityLevel =
      new HashMap<Priority, NodeType>();
  public FSAppAttempt(FairScheduler scheduler,
      ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue,
      ActiveUsersManager activeUsersManager, RMContext rmContext) {
    super(applicationAttemptId, user, queue, activeUsersManager, rmContext);

    this.scheduler = scheduler;
    this.startTime = scheduler.getClock().getTime();
    this.priority = Priority.newInstance(1);
    this.resourceWeights = new ResourceWeights();
  }

  public ResourceWeights getResourceWeights() {
    return resourceWeights;
  }

  /**
   * Get metrics reference from containing queue.
   */
  public QueueMetrics getMetrics() {
    return queue.getMetrics();
  }
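  /**
   * Clean up after a completed container: inform the {@link RMContainer} of
   * the finish event, remove it from the newly-allocated and live container
   * lists, audit-log the release, and update queue metrics and this
   * attempt's current consumption.
   */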
  synchronized public void containerCompleted(RMContainer rmContainer,
      ContainerStatus containerStatus, RMContainerEventType event) {

    Container container = rmContainer.getContainer();
    ContainerId containerId = container.getId();

    // Remove from the list of newly allocated containers if found
    newlyAllocatedContainers.remove(rmContainer);

    // Inform the container
    rmContainer.handle(
        new RMContainerFinishedEvent(
            containerId,
            containerStatus,
            event)
        );
    LOG.info("Completed container: " + rmContainer.getContainerId() +
        " in state: " + rmContainer.getState() + " event:" + event);

    // Remove from the list of containers
    liveContainers.remove(rmContainer.getContainerId());

    RMAuditLogger.logSuccess(getUser(),
        AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
        getApplicationId(), containerId);

    // Update usage metrics
    Resource containerResource = rmContainer.getContainer().getResource();
    queue.getMetrics().releaseResources(getUser(), 1, containerResource);
    Resources.subtractFrom(currentConsumption, containerResource);

    // remove from preemption map if it is completed
    preemptionMap.remove(rmContainer);

    // Clear resource utilization metrics cache.
    lastMemoryAggregateAllocationUpdateTime = -1;
  }
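  /**
   * Remove this attempt's reservation on the given node at the given
   * priority and update the reservation bookkeeping (re-reservation count
   * and currentReservation).
   */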
  private synchronized void unreserveInternal(
      Priority priority, FSSchedulerNode node) {
    Map<NodeId, RMContainer> reservedContainers =
        this.reservedContainers.get(priority);
    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
    if (reservedContainers.isEmpty()) {
      this.reservedContainers.remove(priority);
    }

    // Reset the re-reservation count
    resetReReservations(priority);

    Resource resource = reservedContainer.getContainer().getResource();
    Resources.subtractFrom(currentReservation, resource);

    LOG.info("Application " + getApplicationId() + " unreserved container on node "
        + node + ", currently has " + reservedContainers.size()
        + " reservations at priority " + priority
        + "; currentReservation " + currentReservation);
  }

  /**
   * Headroom depends on resources in the cluster, current usage of the
   * queue, queue's fair-share and queue's max-resources.
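   * Roughly, headroom = min(queueFairShare - queueUsage, maxAvailableResource),
   * where maxAvailableResource = min(cluster free capacity,
   * queue maxShare - queueUsage); the exact combination is delegated to the
   * queue's {@link SchedulingPolicy#getHeadroom}.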
   */
  @Override
  public Resource getHeadroom() {
    final FSQueue queue = (FSQueue) this.queue;
    SchedulingPolicy policy = queue.getPolicy();

    Resource queueFairShare = queue.getFairShare();
    Resource queueUsage = queue.getResourceUsage();
    Resource clusterResource = this.scheduler.getClusterResource();
    Resource clusterUsage = this.scheduler.getRootQueueMetrics()
        .getAllocatedResources();

    Resource clusterAvailableResources =
        Resources.subtract(clusterResource, clusterUsage);
    Resource queueMaxAvailableResources =
        Resources.subtract(queue.getMaxShare(), queueUsage);
    Resource maxAvailableResource = Resources.componentwiseMin(
        clusterAvailableResources, queueMaxAvailableResources);

    Resource headroom = policy.getHeadroom(queueFairShare,
        queueUsage, maxAvailableResource);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Headroom calculation for " + this.getName() + ": " +
          "min((queueFairShare=" + queueFairShare +
          " - queueUsage=" + queueUsage + "), " +
          "maxAvailableResource=" + maxAvailableResource + "); " +
          "headroom=" + headroom);
    }
    return headroom;
  }
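  /**
   * Estimate the fraction of the cluster covered by this priority's requests,
   * based on the number of unique resource names (hosts and racks) requested;
   * the result is capped at 1.
   */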
  public synchronized float getLocalityWaitFactor(
      Priority priority, int clusterNodes) {
    // Estimate: Required unique resources (i.e. hosts + racks)
    int requiredResources =
        Math.max(this.getResourceRequests(priority).size() - 1, 0);

    // waitFactor can't be more than '1'
    // i.e. no point in skipping more than cluster-size opportunities
    return Math.min(((float)requiredResources / clusterNodes), 1.0f);
  }

  /**
   * Return the level at which we are allowed to schedule containers, given the
   * current size of the cluster and thresholds indicating how many scheduling
   * opportunities (as a fraction of cluster size) to pass up before relaxing
   * scheduling constraints.
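   * For example (illustrative): with numNodes = 100 and a nodeLocalityThreshold
   * of 0.5, more than 50 missed opportunities at NODE_LOCAL are needed before
   * the allowed level for a priority is relaxed to RACK_LOCAL; the same logic
   * applies to the RACK_LOCAL to OFF_SWITCH transition with
   * rackLocalityThreshold.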
   */
  public synchronized NodeType getAllowedLocalityLevel(Priority priority,
      int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) {
    // upper limit on threshold
    if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
    if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }

    // If delay scheduling is not being used, can schedule anywhere
    if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
      return NodeType.OFF_SWITCH;
    }

    // Default level is NODE_LOCAL
    if (!allowedLocalityLevel.containsKey(priority)) {
      allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
      return NodeType.NODE_LOCAL;
    }

    NodeType allowed = allowedLocalityLevel.get(priority);

    // If level is already most liberal, we're done
    if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH;

    double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold :
        rackLocalityThreshold;

    // Relax locality constraints once we've surpassed threshold.
    if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
      if (allowed.equals(NodeType.NODE_LOCAL)) {
        allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
        resetSchedulingOpportunities(priority);
      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
        allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
        resetSchedulingOpportunities(priority);
      }
    }
    return allowedLocalityLevel.get(priority);
  }
  /**
   * Return the level at which we are allowed to schedule containers,
   * given the delay thresholds indicating how much time must pass before
   * relaxing scheduling constraints.
   */
  public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
          long nodeLocalityDelayMs, long rackLocalityDelayMs,
          long currentTimeMs) {

    // if not being used, can schedule anywhere
    if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
      return NodeType.OFF_SWITCH;
    }

    // default level is NODE_LOCAL
    if (!allowedLocalityLevel.containsKey(priority)) {
      allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
      return NodeType.NODE_LOCAL;
    }

    NodeType allowed = allowedLocalityLevel.get(priority);

    // if level is already most liberal, we're done
    if (allowed.equals(NodeType.OFF_SWITCH)) {
      return NodeType.OFF_SWITCH;
    }

    // check waiting time
    long waitTime = currentTimeMs;
    if (lastScheduledContainer.containsKey(priority)) {
      waitTime -= lastScheduledContainer.get(priority);
    } else {
      waitTime -= getStartTime();
    }

    long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
            nodeLocalityDelayMs : rackLocalityDelayMs;

    if (waitTime > thresholdTime) {
      if (allowed.equals(NodeType.NODE_LOCAL)) {
        allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
        resetSchedulingOpportunities(priority, currentTimeMs);
      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
        allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
        resetSchedulingOpportunities(priority, currentTimeMs);
      }
    }
    return allowedLocalityLevel.get(priority);
  }
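  /**
   * Create and start an {@link RMContainer} for a container that has been
   * allocated to this attempt on the given node. Resets the allowed locality
   * level for the priority to a more local level when the assignment is more
   * local than currently allowed, and updates resource consumption and the
   * stored resource requests. Returns null if the request at this priority
   * no longer needs containers.
   */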
  synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
      Priority priority, ResourceRequest request,
      Container container) {
    // Update allowed locality level
    NodeType allowed = allowedLocalityLevel.get(priority);
    if (allowed != null) {
      if (allowed.equals(NodeType.OFF_SWITCH) &&
          (type.equals(NodeType.NODE_LOCAL) ||
              type.equals(NodeType.RACK_LOCAL))) {
        this.resetAllowedLocalityLevel(priority, type);
      } else if (allowed.equals(NodeType.RACK_LOCAL) &&
          type.equals(NodeType.NODE_LOCAL)) {
        this.resetAllowedLocalityLevel(priority, type);
      }
    }

    // Required sanity check - the AM can call 'allocate' to update resource
    // requests without locking the scheduler, hence we need to re-check that
    // the request is still outstanding
    if (getTotalRequiredResources(priority) <= 0) {
      return null;
    }

    // Create RMContainer
    RMContainer rmContainer = new RMContainerImpl(container,
        getApplicationAttemptId(), node.getNodeID(),
        appSchedulingInfo.getUser(), rmContext);

    // Track it in the newly-allocated and live container lists.
    newlyAllocatedContainers.add(rmContainer);
    liveContainers.put(container.getId(), rmContainer);

    // Update consumption and track allocations
    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
        type, node, priority, request, container);
    Resources.addTo(currentConsumption, container.getResource());

    // Update resource requests related to "request" and store in RMContainer
    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);

    // Inform the container
    rmContainer.handle(
        new RMContainerEvent(container.getId(), RMContainerEventType.START));

    if (LOG.isDebugEnabled()) {
      LOG.debug("allocate: applicationAttemptId="
          + container.getId().getApplicationAttemptId()
          + " container=" + container.getId() + " host="
          + container.getNodeId().getHost() + " type=" + type);
    }
    RMAuditLogger.logSuccess(getUser(),
        AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
        getApplicationId(), container.getId());

    return rmContainer;
  }
  /**
   * Should be called when the scheduler assigns a container at a higher
   * degree of locality than the currently allowed level. Resets the allowed
   * locality level to that higher degree of locality.
   */
  public synchronized void resetAllowedLocalityLevel(Priority priority,
      NodeType level) {
    NodeType old = allowedLocalityLevel.get(priority);
    LOG.info("Raising locality level from " + old + " to " + level +
        " at priority " + priority);
    allowedLocalityLevel.put(priority, level);
  }

  // Preemption-related bookkeeping
  public void addPreemption(RMContainer container, long time) {
    assert preemptionMap.get(container) == null;
    preemptionMap.put(container, time);
    Resources.addTo(preemptedResources, container.getAllocatedResource());
  }

  public Long getContainerPreemptionTime(RMContainer container) {
    return preemptionMap.get(container);
  }

  public Set<RMContainer> getPreemptionContainers() {
    return preemptionMap.keySet();
  }

  @Override
  public FSLeafQueue getQueue() {
    return (FSLeafQueue)super.getQueue();
  }

  public Resource getPreemptedResources() {
    return preemptedResources;
  }

  public void resetPreemptedResources() {
    preemptedResources = Resources.createResource(0);
    for (RMContainer container : getPreemptionContainers()) {
      Resources.addTo(preemptedResources, container.getAllocatedResource());
    }
  }

  public void clearPreemptedResources() {
    preemptedResources.setMemory(0);
    preemptedResources.setVirtualCores(0);
  }
  /**
   * Create and return a container object reflecting an allocation for the
   * given application on the given node with the given capability and
   * priority.
   */
  public Container createContainer(
      FSSchedulerNode node, Resource capability, Priority priority) {

    NodeId nodeId = node.getRMNode().getNodeID();
    ContainerId containerId = BuilderUtils.newContainerId(
        getApplicationAttemptId(), getNewContainerId());

    // Create the container
    Container container =
        BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
            .getHttpAddress(), capability, priority, null);

    return container;
  }

  /**
   * Reserve a spot for {@code container} on this {@code node}. If
   * the container is {@code alreadyReserved} on the node, simply
   * update relevant bookkeeping. This dispatches to relevant handlers
   * in {@link FSSchedulerNode}.
   */
  private void reserve(Priority priority, FSSchedulerNode node,
      Container container, boolean alreadyReserved) {
    LOG.info("Making reservation: node=" + node.getNodeName() +
        " app_id=" + getApplicationId());

    if (!alreadyReserved) {
      getMetrics().reserveResource(getUser(), container.getResource());
      RMContainer rmContainer =
          super.reserve(node, priority, null, container);
      node.reserveResource(this, priority, rmContainer);
    } else {
      RMContainer rmContainer = node.getReservedContainer();
      super.reserve(node, priority, rmContainer, container);
      node.reserveResource(this, priority, rmContainer);
    }
  }

  /**
   * Remove the reservation on {@code node} at the given {@link Priority}.
   * This dispatches SchedulerNode handlers as well.
   */
  public void unreserve(Priority priority, FSSchedulerNode node) {
    RMContainer rmContainer = node.getReservedContainer();
    unreserveInternal(priority, node);
    node.unreserveResource(this);
    getMetrics().unreserveResource(
        getUser(), rmContainer.getContainer().getResource());
  }
  /**
   * Assign a container to this node to facilitate {@code request}. If the
   * node does not have enough resources, create a reservation. This is called
   * once we are sure the particular request should be facilitated by this
   * node.
   *
   * @param node
   *     The node to try placing the container on.
   * @param request
   *     The ResourceRequest we're trying to satisfy.
   * @param type
   *     The locality of the assignment.
   * @param reserved
   *     Whether there's already a container reserved for this app on the node.
   * @return
   *     If an assignment was made, returns the resources allocated to the
   *     container.  If a reservation was made, returns
   *     FairScheduler.CONTAINER_RESERVED.  If no assignment or reservation was
   *     made, returns an empty resource.
   */
  private Resource assignContainer(
      FSSchedulerNode node, ResourceRequest request, NodeType type,
      boolean reserved) {

    // How much does this request need?
    Resource capability = request.getCapability();

    // How much does the node have?
    Resource available = node.getAvailableResource();

    Container container = null;
    if (reserved) {
      container = node.getReservedContainer().getContainer();
    } else {
      container = createContainer(node, capability, request.getPriority());
    }

    // Can we allocate a container on this node?
    if (Resources.fitsIn(capability, available)) {
      // Inform the application of the new container for this request
      RMContainer allocatedContainer =
          allocate(type, node, request.getPriority(), request, container);
      if (allocatedContainer == null) {
        // Did the application need this resource?
        if (reserved) {
          unreserve(request.getPriority(), node);
        }
        return Resources.none();
      }

      // If we had previously made a reservation, delete it
      if (reserved) {
        unreserve(request.getPriority(), node);
      }

      // Inform the node
      node.allocateContainer(allocatedContainer);

      // If this container is used to run the AM, update the leaf queue's AM usage
      if (getLiveContainers().size() == 1 && !getUnmanagedAM()) {
        getQueue().addAMResourceUsage(container.getResource());
        setAmRunning(true);
      }

      return container.getResource();
    } else {
      if (!FairScheduler.fitsInMaxShare(getQueue(), capability)) {
        return Resources.none();
      }

      // The desired container won't fit here, so reserve
      reserve(request.getPriority(), node, container, reserved);

      return FairScheduler.CONTAINER_RESERVED;
    }
  }
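  /**
   * Whether this attempt has any node-local or rack-local requests
   * outstanding at the given priority. The request map for a priority
   * normally contains the off-switch (ANY) entry, so a size greater than one
   * indicates more specific requests exist.
   */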
  private boolean hasNodeOrRackLocalRequests(Priority priority) {
    return getResourceRequests(priority).size() > 1;
  }
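  /**
   * Try to assign a container on the given node for one of this attempt's
   * outstanding priorities, preferring node-local over rack-local over
   * off-switch requests, subject to the currently allowed locality level.
   * Returns the assigned resource, FairScheduler.CONTAINER_RESERVED if a
   * reservation was made, or an empty resource if nothing was assigned.
   */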
  private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
    }

    Collection<Priority> prioritiesToTry = (reserved) ?
        Arrays.asList(node.getReservedContainer().getReservedPriority()) :
        getPriorities();

    // For each priority, see if we can schedule a node-local, rack-local,
    // or off-switch request. Rack-local and off-switch requests may be delayed
    // (not scheduled) in order to promote better locality.
    synchronized (this) {
      for (Priority priority : prioritiesToTry) {
        if (getTotalRequiredResources(priority) <= 0 ||
            !hasContainerForNode(priority, node)) {
          continue;
        }

        addSchedulingOpportunity(priority);

        // Check the AM resource usage for the leaf queue
        if (getLiveContainers().size() == 0 && !getUnmanagedAM()) {
          if (!getQueue().canRunAppAM(getAMResource())) {
            return Resources.none();
          }
        }

        ResourceRequest rackLocalRequest = getResourceRequest(priority,
            node.getRackName());
        ResourceRequest localRequest = getResourceRequest(priority,
            node.getNodeName());

        if (localRequest != null && !localRequest.getRelaxLocality()) {
          LOG.warn("Relax locality off is not supported on local request: "
              + localRequest);
        }

        NodeType allowedLocality;
        if (scheduler.isContinuousSchedulingEnabled()) {
          allowedLocality = getAllowedLocalityLevelByTime(priority,
              scheduler.getNodeLocalityDelayMs(),
              scheduler.getRackLocalityDelayMs(),
              scheduler.getClock().getTime());
        } else {
          allowedLocality = getAllowedLocalityLevel(priority,
              scheduler.getNumClusterNodes(),
              scheduler.getNodeLocalityThreshold(),
              scheduler.getRackLocalityThreshold());
        }

        if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
            && localRequest != null && localRequest.getNumContainers() != 0) {
          return assignContainer(node, localRequest,
              NodeType.NODE_LOCAL, reserved);
        }

        if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
          continue;
        }

        if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
            && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
            allowedLocality.equals(NodeType.OFF_SWITCH))) {
          return assignContainer(node, rackLocalRequest,
              NodeType.RACK_LOCAL, reserved);
        }

        ResourceRequest offSwitchRequest =
            getResourceRequest(priority, ResourceRequest.ANY);
        if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
          continue;
        }

        if (offSwitchRequest != null &&
            offSwitchRequest.getNumContainers() != 0) {
          if (!hasNodeOrRackLocalRequests(priority) ||
              allowedLocality.equals(NodeType.OFF_SWITCH)) {
            return assignContainer(
                node, offSwitchRequest, NodeType.OFF_SWITCH, reserved);
          }
        }
      }
    }
    return Resources.none();
  }
  /**
   * Called when this application already has an existing reservation on the
   * given node.  Checks whether we can turn the reservation into an
   * allocation. Also checks whether the application still needs the
   * reservation, and releases it if not.
   *
   * @param node
   *     Node that the application has an existing reservation on
   */
  public Resource assignReservedContainer(FSSchedulerNode node) {
    RMContainer rmContainer = node.getReservedContainer();
    Priority priority = rmContainer.getReservedPriority();

    // Make sure the application still needs requests at this priority
    if (getTotalRequiredResources(priority) == 0) {
      unreserve(priority, node);
      return Resources.none();
    }

    // Fail early if the reserved container won't fit.
    // Note that we have an assumption here that there's only one container size
    // per priority.
    if (!Resources.fitsIn(node.getReservedContainer().getReservedResource(),
        node.getAvailableResource())) {
      return Resources.none();
    }

    return assignContainer(node, true);
  }

  /**
   * Whether this app has container requests that could be satisfied on the
   * given node, if the node had full space.
   */
  public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
    ResourceRequest anyRequest = getResourceRequest(prio, ResourceRequest.ANY);
    ResourceRequest rackRequest = getResourceRequest(prio, node.getRackName());
    ResourceRequest nodeRequest = getResourceRequest(prio, node.getNodeName());

    return
        // There must be outstanding requests at the given priority:
        anyRequest != null && anyRequest.getNumContainers() > 0 &&
            // If locality relaxation is turned off at the ANY level, there must
            // be a non-zero request for the node's rack:
            (anyRequest.getRelaxLocality() ||
                (rackRequest != null && rackRequest.getNumContainers() > 0)) &&
            // If locality relaxation is turned off at rack-level, there must be
            // a non-zero request at the node:
            (rackRequest == null || rackRequest.getRelaxLocality() ||
                (nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
            // The requested container must be able to fit on the node:
            Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
                anyRequest.getCapability(), node.getRMNode().getTotalCapability());
  }
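  /**
   * Orders containers by priority and, for equal priorities, by container id
   * in reverse order; preemptContainer() selects the smallest container under
   * this ordering as the preemption candidate.
   */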
  static class RMContainerComparator implements Comparator<RMContainer>,
      Serializable {
    @Override
    public int compare(RMContainer c1, RMContainer c2) {
      int ret = c1.getContainer().getPriority().compareTo(
          c2.getContainer().getPriority());
      if (ret == 0) {
        return c2.getContainerId().compareTo(c1.getContainerId());
      }
      return ret;
    }
  }
  /* Schedulable methods implementation */

  @Override
  public String getName() {
    return getApplicationId().toString();
  }

  @Override
  public Resource getDemand() {
    return demand;
  }

  @Override
  public long getStartTime() {
    return startTime;
  }

  @Override
  public Resource getMinShare() {
    return Resources.none();
  }

  @Override
  public Resource getMaxShare() {
    return Resources.unbounded();
  }

  @Override
  public Resource getResourceUsage() {
    // Here getPreemptedResources() always returns zero, except during
    // a preemption round
    return Resources.subtract(getCurrentConsumption(), getPreemptedResources());
  }

  @Override
  public ResourceWeights getWeights() {
    return scheduler.getAppWeight(this);
  }

  @Override
  public Priority getPriority() {
    // Right now per-app priorities are not passed to the scheduler,
    // so everyone has the same priority.
    return priority;
  }

  @Override
  public Resource getFairShare() {
    return this.fairShare;
  }

  @Override
  public void setFairShare(Resource fairShare) {
    this.fairShare = fairShare;
  }

  @Override
  public void updateDemand() {
    demand = Resources.createResource(0);
    // Demand is current consumption plus outstanding requests
    Resources.addTo(demand, getCurrentConsumption());

    // Add up outstanding resource requests
    synchronized (this) {
      for (Priority p : getPriorities()) {
        for (ResourceRequest r : getResourceRequests(p).values()) {
          Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
          Resources.addTo(demand, total);
        }
      }
    }
  }

  @Override
  public Resource assignContainer(FSSchedulerNode node) {
    return assignContainer(node, false);
  }
  /**
   * Select a running container as the preemption candidate, chosen according
   * to the ordering defined by {@link RMContainerComparator}.
   */
  @Override
  public RMContainer preemptContainer() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("App " + getName() + " is going to preempt a running " +
          "container");
    }

    RMContainer toBePreempted = null;
    for (RMContainer container : getLiveContainers()) {
      if (!getPreemptionContainers().contains(container) &&
          (toBePreempted == null ||
              comparator.compare(toBePreempted, container) > 0)) {
        toBePreempted = container;
      }
    }
    return toBePreempted;
  }
}