1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
20 
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.Set;
25 
26 import org.apache.hadoop.classification.InterfaceAudience.Private;
27 import org.apache.hadoop.classification.InterfaceStability.Unstable;
28 import org.apache.hadoop.security.UserGroupInformation;
29 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
30 import org.apache.hadoop.yarn.api.records.Priority;
31 import org.apache.hadoop.yarn.api.records.QueueACL;
32 import org.apache.hadoop.yarn.api.records.QueueInfo;
33 import org.apache.hadoop.yarn.api.records.QueueState;
34 import org.apache.hadoop.yarn.api.records.Resource;
35 import org.apache.hadoop.yarn.factories.RecordFactory;
36 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
37 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
38 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
39 import org.apache.hadoop.yarn.util.resource.Resources;
40 
41 @Private
42 @Unstable
43 public abstract class FSQueue implements Queue, Schedulable {
44   private Resource fairShare = Resources.createResource(0, 0);
45   private Resource steadyFairShare = Resources.createResource(0, 0);
46   private final String name;
47   protected final FairScheduler scheduler;
48   private final FSQueueMetrics metrics;
49 
50   protected final FSParentQueue parent;
51   protected final RecordFactory recordFactory =
52       RecordFactoryProvider.getRecordFactory(null);
53 
54   protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY;
55 
56   private long fairSharePreemptionTimeout = Long.MAX_VALUE;
57   private long minSharePreemptionTimeout = Long.MAX_VALUE;
58   private float fairSharePreemptionThreshold = 0.5f;
59 
FSQueue(String name, FairScheduler scheduler, FSParentQueue parent)60   public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
61     this.name = name;
62     this.scheduler = scheduler;
63     this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
64     metrics.setMinShare(getMinShare());
65     metrics.setMaxShare(getMaxShare());
66     this.parent = parent;
67   }
68 
getName()69   public String getName() {
70     return name;
71   }
72 
73   @Override
getQueueName()74   public String getQueueName() {
75     return name;
76   }
77 
getPolicy()78   public SchedulingPolicy getPolicy() {
79     return policy;
80   }
81 
getParent()82   public FSParentQueue getParent() {
83     return parent;
84   }
85 
throwPolicyDoesnotApplyException(SchedulingPolicy policy)86   protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy)
87       throws AllocationConfigurationException {
88     throw new AllocationConfigurationException("SchedulingPolicy " + policy
89         + " does not apply to queue " + getName());
90   }
91 
setPolicy(SchedulingPolicy policy)92   public abstract void setPolicy(SchedulingPolicy policy)
93       throws AllocationConfigurationException;
94 
95   @Override
getWeights()96   public ResourceWeights getWeights() {
97     return scheduler.getAllocationConfiguration().getQueueWeight(getName());
98   }
99 
100   @Override
getMinShare()101   public Resource getMinShare() {
102     return scheduler.getAllocationConfiguration().getMinResources(getName());
103   }
104 
105   @Override
getMaxShare()106   public Resource getMaxShare() {
107     return scheduler.getAllocationConfiguration().getMaxResources(getName());
108   }
109 
110   @Override
getStartTime()111   public long getStartTime() {
112     return 0;
113   }
114 
115   @Override
getPriority()116   public Priority getPriority() {
117     Priority p = recordFactory.newRecordInstance(Priority.class);
118     p.setPriority(1);
119     return p;
120   }
121 
122   @Override
getQueueInfo(boolean includeChildQueues, boolean recursive)123   public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
124     QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
125     queueInfo.setQueueName(getQueueName());
126 
127     if (scheduler.getClusterResource().getMemory() == 0) {
128       queueInfo.setCapacity(0.0f);
129     } else {
130       queueInfo.setCapacity((float) getFairShare().getMemory() /
131           scheduler.getClusterResource().getMemory());
132     }
133 
134     if (getFairShare().getMemory() == 0) {
135       queueInfo.setCurrentCapacity(0.0f);
136     } else {
137       queueInfo.setCurrentCapacity((float) getResourceUsage().getMemory() /
138           getFairShare().getMemory());
139     }
140 
141     ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();
142     if (includeChildQueues) {
143       Collection<FSQueue> childQueues = getChildQueues();
144       for (FSQueue child : childQueues) {
145         childQueueInfos.add(child.getQueueInfo(recursive, recursive));
146       }
147     }
148     queueInfo.setChildQueues(childQueueInfos);
149     queueInfo.setQueueState(QueueState.RUNNING);
150     return queueInfo;
151   }
152 
153   @Override
getMetrics()154   public FSQueueMetrics getMetrics() {
155     return metrics;
156   }
157 
158   /** Get the fair share assigned to this Schedulable. */
getFairShare()159   public Resource getFairShare() {
160     return fairShare;
161   }
162 
163   @Override
setFairShare(Resource fairShare)164   public void setFairShare(Resource fairShare) {
165     this.fairShare = fairShare;
166     metrics.setFairShare(fairShare);
167   }
168 
169   /** Get the steady fair share assigned to this Schedulable. */
getSteadyFairShare()170   public Resource getSteadyFairShare() {
171     return steadyFairShare;
172   }
173 
setSteadyFairShare(Resource steadyFairShare)174   public void setSteadyFairShare(Resource steadyFairShare) {
175     this.steadyFairShare = steadyFairShare;
176     metrics.setSteadyFairShare(steadyFairShare);
177   }
178 
hasAccess(QueueACL acl, UserGroupInformation user)179   public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
180     return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
181   }
182 
getFairSharePreemptionTimeout()183   public long getFairSharePreemptionTimeout() {
184     return fairSharePreemptionTimeout;
185   }
186 
setFairSharePreemptionTimeout(long fairSharePreemptionTimeout)187   public void setFairSharePreemptionTimeout(long fairSharePreemptionTimeout) {
188     this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
189   }
190 
getMinSharePreemptionTimeout()191   public long getMinSharePreemptionTimeout() {
192     return minSharePreemptionTimeout;
193   }
194 
setMinSharePreemptionTimeout(long minSharePreemptionTimeout)195   public void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) {
196     this.minSharePreemptionTimeout = minSharePreemptionTimeout;
197   }
198 
getFairSharePreemptionThreshold()199   public float getFairSharePreemptionThreshold() {
200     return fairSharePreemptionThreshold;
201   }
202 
setFairSharePreemptionThreshold(float fairSharePreemptionThreshold)203   public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) {
204     this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
205   }
206 
207   /**
208    * Recomputes the shares for all child queues and applications based on this
209    * queue's current share
210    */
recomputeShares()211   public abstract void recomputeShares();
212 
213   /**
214    * Update the min/fair share preemption timeouts and threshold for this queue.
215    */
updatePreemptionVariables()216   public void updatePreemptionVariables() {
217     // For min share timeout
218     minSharePreemptionTimeout = scheduler.getAllocationConfiguration()
219         .getMinSharePreemptionTimeout(getName());
220     if (minSharePreemptionTimeout == -1 && parent != null) {
221       minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout();
222     }
223     // For fair share timeout
224     fairSharePreemptionTimeout = scheduler.getAllocationConfiguration()
225         .getFairSharePreemptionTimeout(getName());
226     if (fairSharePreemptionTimeout == -1 && parent != null) {
227       fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout();
228     }
229     // For fair share preemption threshold
230     fairSharePreemptionThreshold = scheduler.getAllocationConfiguration()
231         .getFairSharePreemptionThreshold(getName());
232     if (fairSharePreemptionThreshold < 0 && parent != null) {
233       fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold();
234     }
235   }
236 
237   /**
238    * Gets the children of this queue, if any.
239    */
getChildQueues()240   public abstract List<FSQueue> getChildQueues();
241 
242   /**
243    * Adds all applications in the queue and its subqueues to the given collection.
244    * @param apps the collection to add the applications to
245    */
collectSchedulerApplications( Collection<ApplicationAttemptId> apps)246   public abstract void collectSchedulerApplications(
247       Collection<ApplicationAttemptId> apps);
248 
249   /**
250    * Return the number of apps for which containers can be allocated.
251    * Includes apps in subqueues.
252    */
getNumRunnableApps()253   public abstract int getNumRunnableApps();
254 
255   /**
256    * Helper method to check if the queue should attempt assigning resources
257    *
258    * @return true if check passes (can assign) or false otherwise
259    */
assignContainerPreCheck(FSSchedulerNode node)260   protected boolean assignContainerPreCheck(FSSchedulerNode node) {
261     if (!Resources.fitsIn(getResourceUsage(),
262         scheduler.getAllocationConfiguration().getMaxResources(getName()))
263         || node.getReservedContainer() != null) {
264       return false;
265     }
266     return true;
267   }
268 
269   /**
270    * Returns true if queue has at least one app running.
271    */
isActive()272   public boolean isActive() {
273     return getNumRunnableApps() > 0;
274   }
275 
276   /** Convenient toString implementation for debugging. */
277   @Override
toString()278   public String toString() {
279     return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
280         getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
281   }
282 
283   @Override
getAccessibleNodeLabels()284   public Set<String> getAccessibleNodeLabels() {
285     // TODO, add implementation for FS
286     return null;
287   }
288 
289   @Override
getDefaultNodeLabelExpression()290   public String getDefaultNodeLabelExpression() {
291     // TODO, add implementation for FS
292     return null;
293   }
294 }
295