1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
20 
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.Comparator;
25 import java.util.List;
26 import java.util.concurrent.locks.Lock;
27 import java.util.concurrent.locks.ReadWriteLock;
28 import java.util.concurrent.locks.ReentrantReadWriteLock;
29 
30 import com.google.common.annotations.VisibleForTesting;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.classification.InterfaceAudience.Private;
34 import org.apache.hadoop.classification.InterfaceStability.Unstable;
35 import org.apache.hadoop.security.UserGroupInformation;
36 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
37 import org.apache.hadoop.yarn.api.records.QueueACL;
38 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
39 import org.apache.hadoop.yarn.api.records.Resource;
40 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
41 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
42 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
43 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
44 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
45 import org.apache.hadoop.yarn.util.resource.Resources;
46 
47 @Private
48 @Unstable
49 public class FSLeafQueue extends FSQueue {
50   private static final Log LOG = LogFactory.getLog(
51       FSLeafQueue.class.getName());
52 
53   private final List<FSAppAttempt> runnableApps = // apps that are runnable
54       new ArrayList<FSAppAttempt>();
55   private final List<FSAppAttempt> nonRunnableApps =
56       new ArrayList<FSAppAttempt>();
57   // get a lock with fair distribution for app list updates
58   private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
59   private final Lock readLock = rwl.readLock();
60   private final Lock writeLock = rwl.writeLock();
61 
62   private Resource demand = Resources.createResource(0);
63 
64   // Variables used for preemption
65   private long lastTimeAtMinShare;
66   private long lastTimeAtFairShareThreshold;
67 
68   // Track the AM resource usage for this queue
69   private Resource amResourceUsage;
70 
71   private final ActiveUsersManager activeUsersManager;
72 
FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent)73   public FSLeafQueue(String name, FairScheduler scheduler,
74       FSParentQueue parent) {
75     super(name, scheduler, parent);
76     this.lastTimeAtMinShare = scheduler.getClock().getTime();
77     this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime();
78     activeUsersManager = new ActiveUsersManager(getMetrics());
79     amResourceUsage = Resource.newInstance(0, 0);
80   }
81 
addApp(FSAppAttempt app, boolean runnable)82   public void addApp(FSAppAttempt app, boolean runnable) {
83     writeLock.lock();
84     try {
85       if (runnable) {
86         runnableApps.add(app);
87       } else {
88         nonRunnableApps.add(app);
89       }
90     } finally {
91       writeLock.unlock();
92     }
93   }
94 
95   // for testing
addAppSchedulable(FSAppAttempt appSched)96   void addAppSchedulable(FSAppAttempt appSched) {
97     writeLock.lock();
98     try {
99       runnableApps.add(appSched);
100     } finally {
101       writeLock.unlock();
102     }
103   }
104 
105   /**
106    * Removes the given app from this queue.
107    * @return whether or not the app was runnable
108    */
removeApp(FSAppAttempt app)109   public boolean removeApp(FSAppAttempt app) {
110     boolean runnable = false;
111 
112     // Remove app from runnable/nonRunnable list while holding the write lock
113     writeLock.lock();
114     try {
115       runnable = runnableApps.remove(app);
116       if (!runnable) {
117         // removeNonRunnableApp acquires the write lock again, which is fine
118         if (!removeNonRunnableApp(app)) {
119           throw new IllegalStateException("Given app to remove " + app +
120               " does not exist in queue " + this);
121         }
122       }
123     } finally {
124       writeLock.unlock();
125     }
126 
127     // Update AM resource usage if needed
128     if (runnable && app.isAmRunning() && app.getAMResource() != null) {
129       Resources.subtractFrom(amResourceUsage, app.getAMResource());
130     }
131 
132     return runnable;
133   }
134 
135   /**
136    * Removes the given app if it is non-runnable and belongs to this queue
137    * @return true if the app is removed, false otherwise
138    */
removeNonRunnableApp(FSAppAttempt app)139   public boolean removeNonRunnableApp(FSAppAttempt app) {
140     writeLock.lock();
141     try {
142       return nonRunnableApps.remove(app);
143     } finally {
144       writeLock.unlock();
145     }
146   }
147 
isRunnableApp(FSAppAttempt attempt)148   public boolean isRunnableApp(FSAppAttempt attempt) {
149     readLock.lock();
150     try {
151       return runnableApps.contains(attempt);
152     } finally {
153       readLock.unlock();
154     }
155   }
156 
isNonRunnableApp(FSAppAttempt attempt)157   public boolean isNonRunnableApp(FSAppAttempt attempt) {
158     readLock.lock();
159     try {
160       return nonRunnableApps.contains(attempt);
161     } finally {
162       readLock.unlock();
163     }
164   }
165 
resetPreemptedResources()166   public void resetPreemptedResources() {
167     readLock.lock();
168     try {
169       for (FSAppAttempt attempt : runnableApps) {
170         attempt.resetPreemptedResources();
171       }
172     } finally {
173       readLock.unlock();
174     }
175   }
176 
clearPreemptedResources()177   public void clearPreemptedResources() {
178     readLock.lock();
179     try {
180       for (FSAppAttempt attempt : runnableApps) {
181         attempt.clearPreemptedResources();
182       }
183     } finally {
184       readLock.unlock();
185     }
186   }
187 
getCopyOfNonRunnableAppSchedulables()188   public List<FSAppAttempt> getCopyOfNonRunnableAppSchedulables() {
189     List<FSAppAttempt> appsToReturn = new ArrayList<FSAppAttempt>();
190     readLock.lock();
191     try {
192       appsToReturn.addAll(nonRunnableApps);
193     } finally {
194       readLock.unlock();
195     }
196     return appsToReturn;
197   }
198 
199   @Override
collectSchedulerApplications( Collection<ApplicationAttemptId> apps)200   public void collectSchedulerApplications(
201       Collection<ApplicationAttemptId> apps) {
202     readLock.lock();
203     try {
204       for (FSAppAttempt appSched : runnableApps) {
205         apps.add(appSched.getApplicationAttemptId());
206       }
207       for (FSAppAttempt appSched : nonRunnableApps) {
208         apps.add(appSched.getApplicationAttemptId());
209       }
210     } finally {
211       readLock.unlock();
212     }
213   }
214 
215   @Override
setPolicy(SchedulingPolicy policy)216   public void setPolicy(SchedulingPolicy policy)
217       throws AllocationConfigurationException {
218     if (!SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)) {
219       throwPolicyDoesnotApplyException(policy);
220     }
221     super.policy = policy;
222   }
223 
224   @Override
recomputeShares()225   public void recomputeShares() {
226     readLock.lock();
227     try {
228       policy.computeShares(runnableApps, getFairShare());
229     } finally {
230       readLock.unlock();
231     }
232   }
233 
234   @Override
getDemand()235   public Resource getDemand() {
236     return demand;
237   }
238 
239   @Override
getResourceUsage()240   public Resource getResourceUsage() {
241     Resource usage = Resources.createResource(0);
242     readLock.lock();
243     try {
244       for (FSAppAttempt app : runnableApps) {
245         Resources.addTo(usage, app.getResourceUsage());
246       }
247       for (FSAppAttempt app : nonRunnableApps) {
248         Resources.addTo(usage, app.getResourceUsage());
249       }
250     } finally {
251       readLock.unlock();
252     }
253     return usage;
254   }
255 
getAmResourceUsage()256   public Resource getAmResourceUsage() {
257     return amResourceUsage;
258   }
259 
260   @Override
updateDemand()261   public void updateDemand() {
262     // Compute demand by iterating through apps in the queue
263     // Limit demand to maxResources
264     Resource maxRes = scheduler.getAllocationConfiguration()
265         .getMaxResources(getName());
266     demand = Resources.createResource(0);
267     readLock.lock();
268     try {
269       for (FSAppAttempt sched : runnableApps) {
270         if (Resources.equals(demand, maxRes)) {
271           break;
272         }
273         updateDemandForApp(sched, maxRes);
274       }
275       for (FSAppAttempt sched : nonRunnableApps) {
276         if (Resources.equals(demand, maxRes)) {
277           break;
278         }
279         updateDemandForApp(sched, maxRes);
280       }
281     } finally {
282       readLock.unlock();
283     }
284     if (LOG.isDebugEnabled()) {
285       LOG.debug("The updated demand for " + getName() + " is " + demand
286           + "; the max is " + maxRes);
287     }
288   }
289 
updateDemandForApp(FSAppAttempt sched, Resource maxRes)290   private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) {
291     sched.updateDemand();
292     Resource toAdd = sched.getDemand();
293     if (LOG.isDebugEnabled()) {
294       LOG.debug("Counting resource from " + sched.getName() + " " + toAdd
295           + "; Total resource consumption for " + getName() + " now "
296           + demand);
297     }
298     demand = Resources.add(demand, toAdd);
299     demand = Resources.componentwiseMin(demand, maxRes);
300   }
301 
302   @Override
assignContainer(FSSchedulerNode node)303   public Resource assignContainer(FSSchedulerNode node) {
304     Resource assigned = Resources.none();
305     if (LOG.isDebugEnabled()) {
306       LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
307           getName());
308     }
309 
310     if (!assignContainerPreCheck(node)) {
311       return assigned;
312     }
313 
314     Comparator<Schedulable> comparator = policy.getComparator();
315     writeLock.lock();
316     try {
317       Collections.sort(runnableApps, comparator);
318     } finally {
319       writeLock.unlock();
320     }
321     // Release write lock here for better performance and avoiding deadlocks.
322     // runnableApps can be in unsorted state because of this section,
323     // but we can accept it in practice since the probability is low.
324     readLock.lock();
325     try {
326       for (FSAppAttempt sched : runnableApps) {
327         if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
328           continue;
329         }
330 
331         assigned = sched.assignContainer(node);
332         if (!assigned.equals(Resources.none())) {
333           break;
334         }
335       }
336     } finally {
337       readLock.unlock();
338     }
339     return assigned;
340   }
341 
342   @Override
preemptContainer()343   public RMContainer preemptContainer() {
344     RMContainer toBePreempted = null;
345 
346     // If this queue is not over its fair share, reject
347     if (!preemptContainerPreCheck()) {
348       return toBePreempted;
349     }
350 
351     if (LOG.isDebugEnabled()) {
352       LOG.debug("Queue " + getName() + " is going to preempt a container " +
353           "from its applications.");
354     }
355 
356     // Choose the app that is most over fair share
357     Comparator<Schedulable> comparator = policy.getComparator();
358     FSAppAttempt candidateSched = null;
359     readLock.lock();
360     try {
361       for (FSAppAttempt sched : runnableApps) {
362         if (candidateSched == null ||
363             comparator.compare(sched, candidateSched) > 0) {
364           candidateSched = sched;
365         }
366       }
367     } finally {
368       readLock.unlock();
369     }
370 
371     // Preempt from the selected app
372     if (candidateSched != null) {
373       toBePreempted = candidateSched.preemptContainer();
374     }
375     return toBePreempted;
376   }
377 
378   @Override
getChildQueues()379   public List<FSQueue> getChildQueues() {
380     return new ArrayList<FSQueue>(1);
381   }
382 
383   @Override
getQueueUserAclInfo(UserGroupInformation user)384   public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
385     QueueUserACLInfo userAclInfo =
386       recordFactory.newRecordInstance(QueueUserACLInfo.class);
387     List<QueueACL> operations = new ArrayList<QueueACL>();
388     for (QueueACL operation : QueueACL.values()) {
389       if (hasAccess(operation, user)) {
390         operations.add(operation);
391       }
392     }
393 
394     userAclInfo.setQueueName(getQueueName());
395     userAclInfo.setUserAcls(operations);
396     return Collections.singletonList(userAclInfo);
397   }
398 
getLastTimeAtMinShare()399   public long getLastTimeAtMinShare() {
400     return lastTimeAtMinShare;
401   }
402 
setLastTimeAtMinShare(long lastTimeAtMinShare)403   private void setLastTimeAtMinShare(long lastTimeAtMinShare) {
404     this.lastTimeAtMinShare = lastTimeAtMinShare;
405   }
406 
getLastTimeAtFairShareThreshold()407   public long getLastTimeAtFairShareThreshold() {
408     return lastTimeAtFairShareThreshold;
409   }
410 
setLastTimeAtFairShareThreshold( long lastTimeAtFairShareThreshold)411   private void setLastTimeAtFairShareThreshold(
412       long lastTimeAtFairShareThreshold) {
413     this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold;
414   }
415 
416   @Override
getNumRunnableApps()417   public int getNumRunnableApps() {
418     readLock.lock();
419     try {
420       return runnableApps.size();
421     } finally {
422       readLock.unlock();
423     }
424   }
425 
getNumNonRunnableApps()426   public int getNumNonRunnableApps() {
427     readLock.lock();
428     try {
429       return nonRunnableApps.size();
430     } finally {
431       readLock.unlock();
432     }
433   }
434 
getNumPendingApps()435   public int getNumPendingApps() {
436     int numPendingApps = 0;
437     readLock.lock();
438     try {
439       for (FSAppAttempt attempt : runnableApps) {
440         if (attempt.isPending()) {
441           numPendingApps++;
442         }
443       }
444       numPendingApps += nonRunnableApps.size();
445     } finally {
446       readLock.unlock();
447     }
448     return numPendingApps;
449   }
450 
451   /**
452    * TODO: Based on how frequently this is called, we might want to club
453    * counting pending and active apps in the same method.
454    */
getNumActiveApps()455   public int getNumActiveApps() {
456     int numActiveApps = 0;
457     readLock.lock();
458     try {
459       for (FSAppAttempt attempt : runnableApps) {
460         if (!attempt.isPending()) {
461           numActiveApps++;
462         }
463       }
464     } finally {
465       readLock.unlock();
466     }
467     return numActiveApps;
468   }
469 
470   @Override
getActiveUsersManager()471   public ActiveUsersManager getActiveUsersManager() {
472     return activeUsersManager;
473   }
474 
475   /**
476    * Check whether this queue can run this application master under the
477    * maxAMShare limit
478    *
479    * @param amResource
480    * @return true if this queue can run
481    */
canRunAppAM(Resource amResource)482   public boolean canRunAppAM(Resource amResource) {
483     float maxAMShare =
484         scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
485     if (Math.abs(maxAMShare - -1.0f) < 0.0001) {
486       return true;
487     }
488     Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
489     Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
490     return !policy
491         .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
492   }
493 
addAMResourceUsage(Resource amResource)494   public void addAMResourceUsage(Resource amResource) {
495     if (amResource != null) {
496       Resources.addTo(amResourceUsage, amResource);
497     }
498   }
499 
500   @Override
recoverContainer(Resource clusterResource, SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer)501   public void recoverContainer(Resource clusterResource,
502       SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
503     // TODO Auto-generated method stub
504   }
505 
506   /**
507    * Update the preemption fields for the queue, i.e. the times since last was
508    * at its guaranteed share and over its fair share threshold.
509    */
updateStarvationStats()510   public void updateStarvationStats() {
511     long now = scheduler.getClock().getTime();
512     if (!isStarvedForMinShare()) {
513       setLastTimeAtMinShare(now);
514     }
515     if (!isStarvedForFairShare()) {
516       setLastTimeAtFairShareThreshold(now);
517     }
518   }
519 
520   /** Allows setting weight for a dynamically created queue
521    * Currently only used for reservation based queues
522    * @param weight queue weight
523    */
setWeights(float weight)524   public void setWeights(float weight) {
525     scheduler.getAllocationConfiguration().setQueueWeight(getName(),
526         new ResourceWeights(weight));
527   }
528 
529   /**
530    * Helper method to check if the queue should preempt containers
531    *
532    * @return true if check passes (can preempt) or false otherwise
533    */
preemptContainerPreCheck()534   private boolean preemptContainerPreCheck() {
535     return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
536         getFairShare());
537   }
538 
539   /**
540    * Is a queue being starved for its min share.
541    */
542   @VisibleForTesting
isStarvedForMinShare()543   boolean isStarvedForMinShare() {
544     return isStarved(getMinShare());
545   }
546 
547   /**
548    * Is a queue being starved for its fair share threshold.
549    */
550   @VisibleForTesting
isStarvedForFairShare()551   boolean isStarvedForFairShare() {
552     return isStarved(
553         Resources.multiply(getFairShare(), getFairSharePreemptionThreshold()));
554   }
555 
isStarved(Resource share)556   private boolean isStarved(Resource share) {
557     Resource desiredShare = Resources.min(scheduler.getResourceCalculator(),
558         scheduler.getClusterResource(), share, getDemand());
559     return Resources.lessThan(scheduler.getResourceCalculator(),
560         scheduler.getClusterResource(), getResourceUsage(), desiredShare);
561   }
562 }
563