1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; 20 21 import java.util.ArrayList; 22 import java.util.Collection; 23 import java.util.List; 24 import java.util.Set; 25 26 import org.apache.hadoop.classification.InterfaceAudience.Private; 27 import org.apache.hadoop.classification.InterfaceStability.Unstable; 28 import org.apache.hadoop.security.UserGroupInformation; 29 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 30 import org.apache.hadoop.yarn.api.records.Priority; 31 import org.apache.hadoop.yarn.api.records.QueueACL; 32 import org.apache.hadoop.yarn.api.records.QueueInfo; 33 import org.apache.hadoop.yarn.api.records.QueueState; 34 import org.apache.hadoop.yarn.api.records.Resource; 35 import org.apache.hadoop.yarn.factories.RecordFactory; 36 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; 37 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; 38 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; 39 import org.apache.hadoop.yarn.util.resource.Resources; 40 41 @Private 42 @Unstable 43 public abstract class FSQueue implements Queue, Schedulable { 44 private Resource fairShare = Resources.createResource(0, 0); 45 private Resource steadyFairShare = Resources.createResource(0, 0); 46 private final String name; 47 protected final FairScheduler scheduler; 48 private final FSQueueMetrics metrics; 49 50 protected final FSParentQueue parent; 51 protected final RecordFactory recordFactory = 52 RecordFactoryProvider.getRecordFactory(null); 53 54 protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY; 55 56 private long fairSharePreemptionTimeout = Long.MAX_VALUE; 57 private long minSharePreemptionTimeout = Long.MAX_VALUE; 58 private float fairSharePreemptionThreshold = 0.5f; 59 FSQueue(String name, FairScheduler scheduler, FSParentQueue parent)60 public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { 61 this.name = name; 62 this.scheduler = scheduler; 63 this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf()); 64 metrics.setMinShare(getMinShare()); 65 metrics.setMaxShare(getMaxShare()); 66 this.parent = parent; 67 } 68 getName()69 public String getName() { 70 return name; 71 } 72 73 @Override getQueueName()74 public String getQueueName() { 75 return name; 76 } 77 getPolicy()78 public SchedulingPolicy getPolicy() { 79 return policy; 80 } 81 getParent()82 public FSParentQueue getParent() { 83 return parent; 84 } 85 throwPolicyDoesnotApplyException(SchedulingPolicy policy)86 protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy) 87 throws AllocationConfigurationException { 88 throw new AllocationConfigurationException("SchedulingPolicy " + policy 89 + " does not apply to queue " + getName()); 90 } 91 setPolicy(SchedulingPolicy policy)92 public abstract void setPolicy(SchedulingPolicy policy) 93 throws AllocationConfigurationException; 94 95 @Override getWeights()96 public ResourceWeights getWeights() { 97 return scheduler.getAllocationConfiguration().getQueueWeight(getName()); 98 } 99 100 @Override getMinShare()101 public Resource getMinShare() { 102 return scheduler.getAllocationConfiguration().getMinResources(getName()); 103 } 104 105 @Override getMaxShare()106 public Resource getMaxShare() { 107 return scheduler.getAllocationConfiguration().getMaxResources(getName()); 108 } 109 110 @Override getStartTime()111 public long getStartTime() { 112 return 0; 113 } 114 115 @Override getPriority()116 public Priority getPriority() { 117 Priority p = recordFactory.newRecordInstance(Priority.class); 118 p.setPriority(1); 119 return p; 120 } 121 122 @Override getQueueInfo(boolean includeChildQueues, boolean recursive)123 public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) { 124 QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); 125 queueInfo.setQueueName(getQueueName()); 126 127 if (scheduler.getClusterResource().getMemory() == 0) { 128 queueInfo.setCapacity(0.0f); 129 } else { 130 queueInfo.setCapacity((float) getFairShare().getMemory() / 131 scheduler.getClusterResource().getMemory()); 132 } 133 134 if (getFairShare().getMemory() == 0) { 135 queueInfo.setCurrentCapacity(0.0f); 136 } else { 137 queueInfo.setCurrentCapacity((float) getResourceUsage().getMemory() / 138 getFairShare().getMemory()); 139 } 140 141 ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>(); 142 if (includeChildQueues) { 143 Collection<FSQueue> childQueues = getChildQueues(); 144 for (FSQueue child : childQueues) { 145 childQueueInfos.add(child.getQueueInfo(recursive, recursive)); 146 } 147 } 148 queueInfo.setChildQueues(childQueueInfos); 149 queueInfo.setQueueState(QueueState.RUNNING); 150 return queueInfo; 151 } 152 153 @Override getMetrics()154 public FSQueueMetrics getMetrics() { 155 return metrics; 156 } 157 158 /** Get the fair share assigned to this Schedulable. */ getFairShare()159 public Resource getFairShare() { 160 return fairShare; 161 } 162 163 @Override setFairShare(Resource fairShare)164 public void setFairShare(Resource fairShare) { 165 this.fairShare = fairShare; 166 metrics.setFairShare(fairShare); 167 } 168 169 /** Get the steady fair share assigned to this Schedulable. */ getSteadyFairShare()170 public Resource getSteadyFairShare() { 171 return steadyFairShare; 172 } 173 setSteadyFairShare(Resource steadyFairShare)174 public void setSteadyFairShare(Resource steadyFairShare) { 175 this.steadyFairShare = steadyFairShare; 176 metrics.setSteadyFairShare(steadyFairShare); 177 } 178 hasAccess(QueueACL acl, UserGroupInformation user)179 public boolean hasAccess(QueueACL acl, UserGroupInformation user) { 180 return scheduler.getAllocationConfiguration().hasAccess(name, acl, user); 181 } 182 getFairSharePreemptionTimeout()183 public long getFairSharePreemptionTimeout() { 184 return fairSharePreemptionTimeout; 185 } 186 setFairSharePreemptionTimeout(long fairSharePreemptionTimeout)187 public void setFairSharePreemptionTimeout(long fairSharePreemptionTimeout) { 188 this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; 189 } 190 getMinSharePreemptionTimeout()191 public long getMinSharePreemptionTimeout() { 192 return minSharePreemptionTimeout; 193 } 194 setMinSharePreemptionTimeout(long minSharePreemptionTimeout)195 public void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) { 196 this.minSharePreemptionTimeout = minSharePreemptionTimeout; 197 } 198 getFairSharePreemptionThreshold()199 public float getFairSharePreemptionThreshold() { 200 return fairSharePreemptionThreshold; 201 } 202 setFairSharePreemptionThreshold(float fairSharePreemptionThreshold)203 public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) { 204 this.fairSharePreemptionThreshold = fairSharePreemptionThreshold; 205 } 206 207 /** 208 * Recomputes the shares for all child queues and applications based on this 209 * queue's current share 210 */ recomputeShares()211 public abstract void recomputeShares(); 212 213 /** 214 * Update the min/fair share preemption timeouts and threshold for this queue. 215 */ updatePreemptionVariables()216 public void updatePreemptionVariables() { 217 // For min share timeout 218 minSharePreemptionTimeout = scheduler.getAllocationConfiguration() 219 .getMinSharePreemptionTimeout(getName()); 220 if (minSharePreemptionTimeout == -1 && parent != null) { 221 minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout(); 222 } 223 // For fair share timeout 224 fairSharePreemptionTimeout = scheduler.getAllocationConfiguration() 225 .getFairSharePreemptionTimeout(getName()); 226 if (fairSharePreemptionTimeout == -1 && parent != null) { 227 fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout(); 228 } 229 // For fair share preemption threshold 230 fairSharePreemptionThreshold = scheduler.getAllocationConfiguration() 231 .getFairSharePreemptionThreshold(getName()); 232 if (fairSharePreemptionThreshold < 0 && parent != null) { 233 fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold(); 234 } 235 } 236 237 /** 238 * Gets the children of this queue, if any. 239 */ getChildQueues()240 public abstract List<FSQueue> getChildQueues(); 241 242 /** 243 * Adds all applications in the queue and its subqueues to the given collection. 244 * @param apps the collection to add the applications to 245 */ collectSchedulerApplications( Collection<ApplicationAttemptId> apps)246 public abstract void collectSchedulerApplications( 247 Collection<ApplicationAttemptId> apps); 248 249 /** 250 * Return the number of apps for which containers can be allocated. 251 * Includes apps in subqueues. 252 */ getNumRunnableApps()253 public abstract int getNumRunnableApps(); 254 255 /** 256 * Helper method to check if the queue should attempt assigning resources 257 * 258 * @return true if check passes (can assign) or false otherwise 259 */ assignContainerPreCheck(FSSchedulerNode node)260 protected boolean assignContainerPreCheck(FSSchedulerNode node) { 261 if (!Resources.fitsIn(getResourceUsage(), 262 scheduler.getAllocationConfiguration().getMaxResources(getName())) 263 || node.getReservedContainer() != null) { 264 return false; 265 } 266 return true; 267 } 268 269 /** 270 * Returns true if queue has at least one app running. 271 */ isActive()272 public boolean isActive() { 273 return getNumRunnableApps() > 0; 274 } 275 276 /** Convenient toString implementation for debugging. */ 277 @Override toString()278 public String toString() { 279 return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", 280 getName(), getDemand(), getResourceUsage(), fairShare, getWeights()); 281 } 282 283 @Override getAccessibleNodeLabels()284 public Set<String> getAccessibleNodeLabels() { 285 // TODO, add implementation for FS 286 return null; 287 } 288 289 @Override getDefaultNodeLabelExpression()290 public String getDefaultNodeLabelExpression() { 291 // TODO, add implementation for FS 292 return null; 293 } 294 } 295