1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; 20 21 import java.util.ArrayList; 22 import java.util.Collection; 23 import java.util.Collections; 24 import java.util.Comparator; 25 import java.util.List; 26 import java.util.concurrent.locks.Lock; 27 import java.util.concurrent.locks.ReadWriteLock; 28 import java.util.concurrent.locks.ReentrantReadWriteLock; 29 30 import com.google.common.annotations.VisibleForTesting; 31 import org.apache.commons.logging.Log; 32 import org.apache.commons.logging.LogFactory; 33 import org.apache.hadoop.classification.InterfaceAudience.Private; 34 import org.apache.hadoop.classification.InterfaceStability.Unstable; 35 import org.apache.hadoop.security.UserGroupInformation; 36 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 37 import org.apache.hadoop.yarn.api.records.QueueACL; 38 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; 39 import org.apache.hadoop.yarn.api.records.Resource; 40 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; 41 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; 42 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; 43 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; 44 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; 45 import org.apache.hadoop.yarn.util.resource.Resources; 46 47 @Private 48 @Unstable 49 public class FSLeafQueue extends FSQueue { 50 private static final Log LOG = LogFactory.getLog( 51 FSLeafQueue.class.getName()); 52 53 private final List<FSAppAttempt> runnableApps = // apps that are runnable 54 new ArrayList<FSAppAttempt>(); 55 private final List<FSAppAttempt> nonRunnableApps = 56 new ArrayList<FSAppAttempt>(); 57 // get a lock with fair distribution for app list updates 58 private final ReadWriteLock rwl = new ReentrantReadWriteLock(true); 59 private final Lock readLock = rwl.readLock(); 60 private final Lock writeLock = rwl.writeLock(); 61 62 private Resource demand = Resources.createResource(0); 63 64 // Variables used for preemption 65 private long lastTimeAtMinShare; 66 private long lastTimeAtFairShareThreshold; 67 68 // Track the AM resource usage for this queue 69 private Resource amResourceUsage; 70 71 private final ActiveUsersManager activeUsersManager; 72 FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent)73 public FSLeafQueue(String name, FairScheduler scheduler, 74 FSParentQueue parent) { 75 super(name, scheduler, parent); 76 this.lastTimeAtMinShare = scheduler.getClock().getTime(); 77 this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime(); 78 activeUsersManager = new ActiveUsersManager(getMetrics()); 79 amResourceUsage = Resource.newInstance(0, 0); 80 } 81 addApp(FSAppAttempt app, boolean runnable)82 public void addApp(FSAppAttempt app, boolean runnable) { 83 writeLock.lock(); 84 try { 85 if (runnable) { 86 runnableApps.add(app); 87 } else { 88 nonRunnableApps.add(app); 89 } 90 } finally { 91 writeLock.unlock(); 92 } 93 } 94 95 // for testing addAppSchedulable(FSAppAttempt appSched)96 void addAppSchedulable(FSAppAttempt appSched) { 97 writeLock.lock(); 98 try { 99 runnableApps.add(appSched); 100 } finally { 101 writeLock.unlock(); 102 } 103 } 104 105 /** 106 * Removes the given app from this queue. 107 * @return whether or not the app was runnable 108 */ removeApp(FSAppAttempt app)109 public boolean removeApp(FSAppAttempt app) { 110 boolean runnable = false; 111 112 // Remove app from runnable/nonRunnable list while holding the write lock 113 writeLock.lock(); 114 try { 115 runnable = runnableApps.remove(app); 116 if (!runnable) { 117 // removeNonRunnableApp acquires the write lock again, which is fine 118 if (!removeNonRunnableApp(app)) { 119 throw new IllegalStateException("Given app to remove " + app + 120 " does not exist in queue " + this); 121 } 122 } 123 } finally { 124 writeLock.unlock(); 125 } 126 127 // Update AM resource usage if needed 128 if (runnable && app.isAmRunning() && app.getAMResource() != null) { 129 Resources.subtractFrom(amResourceUsage, app.getAMResource()); 130 } 131 132 return runnable; 133 } 134 135 /** 136 * Removes the given app if it is non-runnable and belongs to this queue 137 * @return true if the app is removed, false otherwise 138 */ removeNonRunnableApp(FSAppAttempt app)139 public boolean removeNonRunnableApp(FSAppAttempt app) { 140 writeLock.lock(); 141 try { 142 return nonRunnableApps.remove(app); 143 } finally { 144 writeLock.unlock(); 145 } 146 } 147 isRunnableApp(FSAppAttempt attempt)148 public boolean isRunnableApp(FSAppAttempt attempt) { 149 readLock.lock(); 150 try { 151 return runnableApps.contains(attempt); 152 } finally { 153 readLock.unlock(); 154 } 155 } 156 isNonRunnableApp(FSAppAttempt attempt)157 public boolean isNonRunnableApp(FSAppAttempt attempt) { 158 readLock.lock(); 159 try { 160 return nonRunnableApps.contains(attempt); 161 } finally { 162 readLock.unlock(); 163 } 164 } 165 resetPreemptedResources()166 public void resetPreemptedResources() { 167 readLock.lock(); 168 try { 169 for (FSAppAttempt attempt : runnableApps) { 170 attempt.resetPreemptedResources(); 171 } 172 } finally { 173 readLock.unlock(); 174 } 175 } 176 clearPreemptedResources()177 public void clearPreemptedResources() { 178 readLock.lock(); 179 try { 180 for (FSAppAttempt attempt : runnableApps) { 181 attempt.clearPreemptedResources(); 182 } 183 } finally { 184 readLock.unlock(); 185 } 186 } 187 getCopyOfNonRunnableAppSchedulables()188 public List<FSAppAttempt> getCopyOfNonRunnableAppSchedulables() { 189 List<FSAppAttempt> appsToReturn = new ArrayList<FSAppAttempt>(); 190 readLock.lock(); 191 try { 192 appsToReturn.addAll(nonRunnableApps); 193 } finally { 194 readLock.unlock(); 195 } 196 return appsToReturn; 197 } 198 199 @Override collectSchedulerApplications( Collection<ApplicationAttemptId> apps)200 public void collectSchedulerApplications( 201 Collection<ApplicationAttemptId> apps) { 202 readLock.lock(); 203 try { 204 for (FSAppAttempt appSched : runnableApps) { 205 apps.add(appSched.getApplicationAttemptId()); 206 } 207 for (FSAppAttempt appSched : nonRunnableApps) { 208 apps.add(appSched.getApplicationAttemptId()); 209 } 210 } finally { 211 readLock.unlock(); 212 } 213 } 214 215 @Override setPolicy(SchedulingPolicy policy)216 public void setPolicy(SchedulingPolicy policy) 217 throws AllocationConfigurationException { 218 if (!SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)) { 219 throwPolicyDoesnotApplyException(policy); 220 } 221 super.policy = policy; 222 } 223 224 @Override recomputeShares()225 public void recomputeShares() { 226 readLock.lock(); 227 try { 228 policy.computeShares(runnableApps, getFairShare()); 229 } finally { 230 readLock.unlock(); 231 } 232 } 233 234 @Override getDemand()235 public Resource getDemand() { 236 return demand; 237 } 238 239 @Override getResourceUsage()240 public Resource getResourceUsage() { 241 Resource usage = Resources.createResource(0); 242 readLock.lock(); 243 try { 244 for (FSAppAttempt app : runnableApps) { 245 Resources.addTo(usage, app.getResourceUsage()); 246 } 247 for (FSAppAttempt app : nonRunnableApps) { 248 Resources.addTo(usage, app.getResourceUsage()); 249 } 250 } finally { 251 readLock.unlock(); 252 } 253 return usage; 254 } 255 getAmResourceUsage()256 public Resource getAmResourceUsage() { 257 return amResourceUsage; 258 } 259 260 @Override updateDemand()261 public void updateDemand() { 262 // Compute demand by iterating through apps in the queue 263 // Limit demand to maxResources 264 Resource maxRes = scheduler.getAllocationConfiguration() 265 .getMaxResources(getName()); 266 demand = Resources.createResource(0); 267 readLock.lock(); 268 try { 269 for (FSAppAttempt sched : runnableApps) { 270 if (Resources.equals(demand, maxRes)) { 271 break; 272 } 273 updateDemandForApp(sched, maxRes); 274 } 275 for (FSAppAttempt sched : nonRunnableApps) { 276 if (Resources.equals(demand, maxRes)) { 277 break; 278 } 279 updateDemandForApp(sched, maxRes); 280 } 281 } finally { 282 readLock.unlock(); 283 } 284 if (LOG.isDebugEnabled()) { 285 LOG.debug("The updated demand for " + getName() + " is " + demand 286 + "; the max is " + maxRes); 287 } 288 } 289 updateDemandForApp(FSAppAttempt sched, Resource maxRes)290 private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) { 291 sched.updateDemand(); 292 Resource toAdd = sched.getDemand(); 293 if (LOG.isDebugEnabled()) { 294 LOG.debug("Counting resource from " + sched.getName() + " " + toAdd 295 + "; Total resource consumption for " + getName() + " now " 296 + demand); 297 } 298 demand = Resources.add(demand, toAdd); 299 demand = Resources.componentwiseMin(demand, maxRes); 300 } 301 302 @Override assignContainer(FSSchedulerNode node)303 public Resource assignContainer(FSSchedulerNode node) { 304 Resource assigned = Resources.none(); 305 if (LOG.isDebugEnabled()) { 306 LOG.debug("Node " + node.getNodeName() + " offered to queue: " + 307 getName()); 308 } 309 310 if (!assignContainerPreCheck(node)) { 311 return assigned; 312 } 313 314 Comparator<Schedulable> comparator = policy.getComparator(); 315 writeLock.lock(); 316 try { 317 Collections.sort(runnableApps, comparator); 318 } finally { 319 writeLock.unlock(); 320 } 321 // Release write lock here for better performance and avoiding deadlocks. 322 // runnableApps can be in unsorted state because of this section, 323 // but we can accept it in practice since the probability is low. 324 readLock.lock(); 325 try { 326 for (FSAppAttempt sched : runnableApps) { 327 if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { 328 continue; 329 } 330 331 assigned = sched.assignContainer(node); 332 if (!assigned.equals(Resources.none())) { 333 break; 334 } 335 } 336 } finally { 337 readLock.unlock(); 338 } 339 return assigned; 340 } 341 342 @Override preemptContainer()343 public RMContainer preemptContainer() { 344 RMContainer toBePreempted = null; 345 346 // If this queue is not over its fair share, reject 347 if (!preemptContainerPreCheck()) { 348 return toBePreempted; 349 } 350 351 if (LOG.isDebugEnabled()) { 352 LOG.debug("Queue " + getName() + " is going to preempt a container " + 353 "from its applications."); 354 } 355 356 // Choose the app that is most over fair share 357 Comparator<Schedulable> comparator = policy.getComparator(); 358 FSAppAttempt candidateSched = null; 359 readLock.lock(); 360 try { 361 for (FSAppAttempt sched : runnableApps) { 362 if (candidateSched == null || 363 comparator.compare(sched, candidateSched) > 0) { 364 candidateSched = sched; 365 } 366 } 367 } finally { 368 readLock.unlock(); 369 } 370 371 // Preempt from the selected app 372 if (candidateSched != null) { 373 toBePreempted = candidateSched.preemptContainer(); 374 } 375 return toBePreempted; 376 } 377 378 @Override getChildQueues()379 public List<FSQueue> getChildQueues() { 380 return new ArrayList<FSQueue>(1); 381 } 382 383 @Override getQueueUserAclInfo(UserGroupInformation user)384 public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) { 385 QueueUserACLInfo userAclInfo = 386 recordFactory.newRecordInstance(QueueUserACLInfo.class); 387 List<QueueACL> operations = new ArrayList<QueueACL>(); 388 for (QueueACL operation : QueueACL.values()) { 389 if (hasAccess(operation, user)) { 390 operations.add(operation); 391 } 392 } 393 394 userAclInfo.setQueueName(getQueueName()); 395 userAclInfo.setUserAcls(operations); 396 return Collections.singletonList(userAclInfo); 397 } 398 getLastTimeAtMinShare()399 public long getLastTimeAtMinShare() { 400 return lastTimeAtMinShare; 401 } 402 setLastTimeAtMinShare(long lastTimeAtMinShare)403 private void setLastTimeAtMinShare(long lastTimeAtMinShare) { 404 this.lastTimeAtMinShare = lastTimeAtMinShare; 405 } 406 getLastTimeAtFairShareThreshold()407 public long getLastTimeAtFairShareThreshold() { 408 return lastTimeAtFairShareThreshold; 409 } 410 setLastTimeAtFairShareThreshold( long lastTimeAtFairShareThreshold)411 private void setLastTimeAtFairShareThreshold( 412 long lastTimeAtFairShareThreshold) { 413 this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold; 414 } 415 416 @Override getNumRunnableApps()417 public int getNumRunnableApps() { 418 readLock.lock(); 419 try { 420 return runnableApps.size(); 421 } finally { 422 readLock.unlock(); 423 } 424 } 425 getNumNonRunnableApps()426 public int getNumNonRunnableApps() { 427 readLock.lock(); 428 try { 429 return nonRunnableApps.size(); 430 } finally { 431 readLock.unlock(); 432 } 433 } 434 getNumPendingApps()435 public int getNumPendingApps() { 436 int numPendingApps = 0; 437 readLock.lock(); 438 try { 439 for (FSAppAttempt attempt : runnableApps) { 440 if (attempt.isPending()) { 441 numPendingApps++; 442 } 443 } 444 numPendingApps += nonRunnableApps.size(); 445 } finally { 446 readLock.unlock(); 447 } 448 return numPendingApps; 449 } 450 451 /** 452 * TODO: Based on how frequently this is called, we might want to club 453 * counting pending and active apps in the same method. 454 */ getNumActiveApps()455 public int getNumActiveApps() { 456 int numActiveApps = 0; 457 readLock.lock(); 458 try { 459 for (FSAppAttempt attempt : runnableApps) { 460 if (!attempt.isPending()) { 461 numActiveApps++; 462 } 463 } 464 } finally { 465 readLock.unlock(); 466 } 467 return numActiveApps; 468 } 469 470 @Override getActiveUsersManager()471 public ActiveUsersManager getActiveUsersManager() { 472 return activeUsersManager; 473 } 474 475 /** 476 * Check whether this queue can run this application master under the 477 * maxAMShare limit 478 * 479 * @param amResource 480 * @return true if this queue can run 481 */ canRunAppAM(Resource amResource)482 public boolean canRunAppAM(Resource amResource) { 483 float maxAMShare = 484 scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName()); 485 if (Math.abs(maxAMShare - -1.0f) < 0.0001) { 486 return true; 487 } 488 Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare); 489 Resource ifRunAMResource = Resources.add(amResourceUsage, amResource); 490 return !policy 491 .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource); 492 } 493 addAMResourceUsage(Resource amResource)494 public void addAMResourceUsage(Resource amResource) { 495 if (amResource != null) { 496 Resources.addTo(amResourceUsage, amResource); 497 } 498 } 499 500 @Override recoverContainer(Resource clusterResource, SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer)501 public void recoverContainer(Resource clusterResource, 502 SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { 503 // TODO Auto-generated method stub 504 } 505 506 /** 507 * Update the preemption fields for the queue, i.e. the times since last was 508 * at its guaranteed share and over its fair share threshold. 509 */ updateStarvationStats()510 public void updateStarvationStats() { 511 long now = scheduler.getClock().getTime(); 512 if (!isStarvedForMinShare()) { 513 setLastTimeAtMinShare(now); 514 } 515 if (!isStarvedForFairShare()) { 516 setLastTimeAtFairShareThreshold(now); 517 } 518 } 519 520 /** Allows setting weight for a dynamically created queue 521 * Currently only used for reservation based queues 522 * @param weight queue weight 523 */ setWeights(float weight)524 public void setWeights(float weight) { 525 scheduler.getAllocationConfiguration().setQueueWeight(getName(), 526 new ResourceWeights(weight)); 527 } 528 529 /** 530 * Helper method to check if the queue should preempt containers 531 * 532 * @return true if check passes (can preempt) or false otherwise 533 */ preemptContainerPreCheck()534 private boolean preemptContainerPreCheck() { 535 return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(), 536 getFairShare()); 537 } 538 539 /** 540 * Is a queue being starved for its min share. 541 */ 542 @VisibleForTesting isStarvedForMinShare()543 boolean isStarvedForMinShare() { 544 return isStarved(getMinShare()); 545 } 546 547 /** 548 * Is a queue being starved for its fair share threshold. 549 */ 550 @VisibleForTesting isStarvedForFairShare()551 boolean isStarvedForFairShare() { 552 return isStarved( 553 Resources.multiply(getFairShare(), getFairSharePreemptionThreshold())); 554 } 555 isStarved(Resource share)556 private boolean isStarved(Resource share) { 557 Resource desiredShare = Resources.min(scheduler.getResourceCalculator(), 558 scheduler.getClusterResource(), share, getDemand()); 559 return Resources.lessThan(scheduler.getResourceCalculator(), 560 scheduler.getClusterResource(), getResourceUsage(), desiredShare); 561 } 562 } 563