1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.resourcemanager.reservation; 20 21 import java.util.HashMap; 22 import java.util.Map; 23 import java.util.Set; 24 import java.util.concurrent.ScheduledExecutorService; 25 import java.util.concurrent.ScheduledThreadPoolExecutor; 26 import java.util.concurrent.TimeUnit; 27 import java.util.concurrent.atomic.AtomicLong; 28 import java.util.concurrent.locks.Lock; 29 import java.util.concurrent.locks.ReentrantReadWriteLock; 30 31 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; 32 import org.apache.hadoop.classification.InterfaceStability.Unstable; 33 import org.apache.hadoop.conf.Configuration; 34 import org.apache.hadoop.service.AbstractService; 35 import org.apache.hadoop.util.ReflectionUtils; 36 import org.apache.hadoop.yarn.api.records.ReservationId; 37 import org.apache.hadoop.yarn.api.records.Resource; 38 import org.apache.hadoop.yarn.conf.YarnConfiguration; 39 import org.apache.hadoop.yarn.exceptions.YarnException; 40 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 41 import org.apache.hadoop.yarn.server.resourcemanager.RMContext; 42 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; 43 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; 44 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; 45 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; 46 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; 47 import org.apache.hadoop.yarn.util.Clock; 48 import org.apache.hadoop.yarn.util.UTCClock; 49 import org.apache.hadoop.yarn.util.resource.ResourceCalculator; 50 import org.slf4j.Logger; 51 import org.slf4j.LoggerFactory; 52 53 /** 54 * This is the implementation of {@link ReservationSystem} based on the 55 * {@link ResourceScheduler} 56 */ 57 @LimitedPrivate("yarn") 58 @Unstable 59 public abstract class AbstractReservationSystem extends AbstractService 60 implements ReservationSystem { 61 62 private static final Logger LOG = LoggerFactory 63 .getLogger(AbstractReservationSystem.class); 64 65 // private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN 66 67 private final ReentrantReadWriteLock readWriteLock = 68 new ReentrantReadWriteLock(true); 69 private final Lock readLock = readWriteLock.readLock(); 70 private final Lock writeLock = readWriteLock.writeLock(); 71 72 private boolean initialized = false; 73 74 private final Clock clock = new UTCClock(); 75 76 private AtomicLong resCounter = new AtomicLong(); 77 78 private Map<String, Plan> plans = new HashMap<String, Plan>(); 79 80 private Map<ReservationId, String> resQMap = 81 new HashMap<ReservationId, String>(); 82 83 private RMContext rmContext; 84 85 private ResourceScheduler scheduler; 86 87 private ScheduledExecutorService scheduledExecutorService; 88 89 protected Configuration conf; 90 91 protected long planStepSize; 92 93 private PlanFollower planFollower; 94 95 /** 96 * Construct the service. 97 * 98 * @param name service name 99 */ AbstractReservationSystem(String name)100 public AbstractReservationSystem(String name) { 101 super(name); 102 } 103 104 @Override setRMContext(RMContext rmContext)105 public void setRMContext(RMContext rmContext) { 106 writeLock.lock(); 107 try { 108 this.rmContext = rmContext; 109 } finally { 110 writeLock.unlock(); 111 } 112 } 113 114 @Override reinitialize(Configuration conf, RMContext rmContext)115 public void reinitialize(Configuration conf, RMContext rmContext) 116 throws YarnException { 117 writeLock.lock(); 118 try { 119 if (!initialized) { 120 initialize(conf); 121 initialized = true; 122 } else { 123 initializeNewPlans(conf); 124 } 125 } finally { 126 writeLock.unlock(); 127 } 128 } 129 initialize(Configuration conf)130 private void initialize(Configuration conf) throws YarnException { 131 LOG.info("Initializing Reservation system"); 132 this.conf = conf; 133 scheduler = rmContext.getScheduler(); 134 // Get the plan step size 135 planStepSize = 136 conf.getTimeDuration( 137 YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, 138 YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, 139 TimeUnit.MILLISECONDS); 140 if (planStepSize < 0) { 141 planStepSize = 142 YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP; 143 } 144 // Create a plan corresponding to every reservable queue 145 Set<String> planQueueNames = scheduler.getPlanQueues(); 146 for (String planQueueName : planQueueNames) { 147 Plan plan = initializePlan(planQueueName); 148 plans.put(planQueueName, plan); 149 } 150 } 151 initializeNewPlans(Configuration conf)152 private void initializeNewPlans(Configuration conf) { 153 LOG.info("Refreshing Reservation system"); 154 writeLock.lock(); 155 try { 156 // Create a plan corresponding to every new reservable queue 157 Set<String> planQueueNames = scheduler.getPlanQueues(); 158 for (String planQueueName : planQueueNames) { 159 if (!plans.containsKey(planQueueName)) { 160 Plan plan = initializePlan(planQueueName); 161 plans.put(planQueueName, plan); 162 } else { 163 LOG.warn("Plan based on reservation queue {0} already exists.", 164 planQueueName); 165 } 166 } 167 // Update the plan follower with the active plans 168 if (planFollower != null) { 169 planFollower.setPlans(plans.values()); 170 } 171 } catch (YarnException e) { 172 LOG.warn("Exception while trying to refresh reservable queues", e); 173 } finally { 174 writeLock.unlock(); 175 } 176 } 177 createPlanFollower()178 private PlanFollower createPlanFollower() { 179 String planFollowerPolicyClassName = 180 conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER, 181 getDefaultPlanFollower()); 182 if (planFollowerPolicyClassName == null) { 183 return null; 184 } 185 LOG.info("Using PlanFollowerPolicy: " + planFollowerPolicyClassName); 186 try { 187 Class<?> planFollowerPolicyClazz = 188 conf.getClassByName(planFollowerPolicyClassName); 189 if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) { 190 return (PlanFollower) ReflectionUtils.newInstance( 191 planFollowerPolicyClazz, conf); 192 } else { 193 throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName 194 + " not instance of " + PlanFollower.class.getCanonicalName()); 195 } 196 } catch (ClassNotFoundException e) { 197 throw new YarnRuntimeException( 198 "Could not instantiate PlanFollowerPolicy: " 199 + planFollowerPolicyClassName, e); 200 } 201 } 202 getDefaultPlanFollower()203 private String getDefaultPlanFollower() { 204 // currently only capacity scheduler is supported 205 if (scheduler instanceof CapacityScheduler) { 206 return CapacitySchedulerPlanFollower.class.getName(); 207 } else if (scheduler instanceof FairScheduler) { 208 return FairSchedulerPlanFollower.class.getName(); 209 } 210 return null; 211 } 212 213 @Override getPlan(String planName)214 public Plan getPlan(String planName) { 215 readLock.lock(); 216 try { 217 return plans.get(planName); 218 } finally { 219 readLock.unlock(); 220 } 221 } 222 223 /** 224 * @return the planStepSize 225 */ 226 @Override getPlanFollowerTimeStep()227 public long getPlanFollowerTimeStep() { 228 readLock.lock(); 229 try { 230 return planStepSize; 231 } finally { 232 readLock.unlock(); 233 } 234 } 235 236 @Override synchronizePlan(String planName)237 public void synchronizePlan(String planName) { 238 writeLock.lock(); 239 try { 240 Plan plan = plans.get(planName); 241 if (plan != null) { 242 planFollower.synchronizePlan(plan); 243 } 244 } finally { 245 writeLock.unlock(); 246 } 247 } 248 249 @Override serviceInit(Configuration conf)250 public void serviceInit(Configuration conf) throws Exception { 251 Configuration configuration = new Configuration(conf); 252 reinitialize(configuration, rmContext); 253 // Create the plan follower with the active plans 254 planFollower = createPlanFollower(); 255 if (planFollower != null) { 256 planFollower.init(clock, scheduler, plans.values()); 257 } 258 super.serviceInit(conf); 259 } 260 261 @Override serviceStart()262 public void serviceStart() throws Exception { 263 if (planFollower != null) { 264 scheduledExecutorService = new ScheduledThreadPoolExecutor(1); 265 scheduledExecutorService.scheduleWithFixedDelay(planFollower, 0L, 266 planStepSize, TimeUnit.MILLISECONDS); 267 } 268 super.serviceStart(); 269 } 270 271 @Override serviceStop()272 public void serviceStop() { 273 // Stop the plan follower 274 if (scheduledExecutorService != null 275 && !scheduledExecutorService.isShutdown()) { 276 scheduledExecutorService.shutdown(); 277 } 278 // Clear the plans 279 plans.clear(); 280 } 281 282 @Override getQueueForReservation(ReservationId reservationId)283 public String getQueueForReservation(ReservationId reservationId) { 284 readLock.lock(); 285 try { 286 return resQMap.get(reservationId); 287 } finally { 288 readLock.unlock(); 289 } 290 } 291 292 @Override setQueueForReservation(ReservationId reservationId, String queueName)293 public void setQueueForReservation(ReservationId reservationId, 294 String queueName) { 295 writeLock.lock(); 296 try { 297 resQMap.put(reservationId, queueName); 298 } finally { 299 writeLock.unlock(); 300 } 301 } 302 303 @Override getNewReservationId()304 public ReservationId getNewReservationId() { 305 writeLock.lock(); 306 try { 307 ReservationId resId = 308 ReservationId.newInstance(ResourceManager.getClusterTimeStamp(), 309 resCounter.incrementAndGet()); 310 LOG.info("Allocated new reservationId: " + resId); 311 return resId; 312 } finally { 313 writeLock.unlock(); 314 } 315 } 316 317 @Override getAllPlans()318 public Map<String, Plan> getAllPlans() { 319 return plans; 320 } 321 322 /** 323 * Get the default reservation system corresponding to the scheduler 324 * 325 * @param scheduler the scheduler for which the reservation system is required 326 */ getDefaultReservationSystem(ResourceScheduler scheduler)327 public static String getDefaultReservationSystem(ResourceScheduler scheduler) { 328 if (scheduler instanceof CapacityScheduler) { 329 return CapacityReservationSystem.class.getName(); 330 } else if (scheduler instanceof FairScheduler) { 331 return FairReservationSystem.class.getName(); 332 } 333 return null; 334 } 335 initializePlan(String planQueueName)336 protected Plan initializePlan(String planQueueName) throws YarnException { 337 String planQueuePath = getPlanQueuePath(planQueueName); 338 SharingPolicy adPolicy = getAdmissionPolicy(planQueuePath); 339 adPolicy.init(planQueuePath, getReservationSchedulerConfiguration()); 340 // Calculate the max plan capacity 341 Resource minAllocation = getMinAllocation(); 342 Resource maxAllocation = getMaxAllocation(); 343 ResourceCalculator rescCalc = getResourceCalculator(); 344 Resource totCap = getPlanQueueCapacity(planQueueName); 345 Plan plan = 346 new InMemoryPlan(getRootQueueMetrics(), adPolicy, 347 getAgent(planQueuePath), totCap, planStepSize, rescCalc, 348 minAllocation, maxAllocation, planQueueName, 349 getReplanner(planQueuePath), getReservationSchedulerConfiguration() 350 .getMoveOnExpiry(planQueuePath)); 351 LOG.info("Intialized plan {0} based on reservable queue {1}", 352 plan.toString(), planQueueName); 353 return plan; 354 } 355 getReplanner(String planQueueName)356 protected Planner getReplanner(String planQueueName) { 357 ReservationSchedulerConfiguration reservationConfig = 358 getReservationSchedulerConfiguration(); 359 String plannerClassName = reservationConfig.getReplanner(planQueueName); 360 LOG.info("Using Replanner: " + plannerClassName + " for queue: " 361 + planQueueName); 362 try { 363 Class<?> plannerClazz = conf.getClassByName(plannerClassName); 364 if (Planner.class.isAssignableFrom(plannerClazz)) { 365 Planner planner = 366 (Planner) ReflectionUtils.newInstance(plannerClazz, conf); 367 planner.init(planQueueName, reservationConfig); 368 return planner; 369 } else { 370 throw new YarnRuntimeException("Class: " + plannerClazz 371 + " not instance of " + Planner.class.getCanonicalName()); 372 } 373 } catch (ClassNotFoundException e) { 374 throw new YarnRuntimeException("Could not instantiate Planner: " 375 + plannerClassName + " for queue: " + planQueueName, e); 376 } 377 } 378 getAgent(String queueName)379 protected ReservationAgent getAgent(String queueName) { 380 ReservationSchedulerConfiguration reservationConfig = 381 getReservationSchedulerConfiguration(); 382 String agentClassName = reservationConfig.getReservationAgent(queueName); 383 LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName); 384 try { 385 Class<?> agentClazz = conf.getClassByName(agentClassName); 386 if (ReservationAgent.class.isAssignableFrom(agentClazz)) { 387 return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf); 388 } else { 389 throw new YarnRuntimeException("Class: " + agentClassName 390 + " not instance of " + ReservationAgent.class.getCanonicalName()); 391 } 392 } catch (ClassNotFoundException e) { 393 throw new YarnRuntimeException("Could not instantiate Agent: " 394 + agentClassName + " for queue: " + queueName, e); 395 } 396 } 397 getAdmissionPolicy(String queueName)398 protected SharingPolicy getAdmissionPolicy(String queueName) { 399 ReservationSchedulerConfiguration reservationConfig = 400 getReservationSchedulerConfiguration(); 401 String admissionPolicyClassName = 402 reservationConfig.getReservationAdmissionPolicy(queueName); 403 LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName 404 + " for queue: " + queueName); 405 try { 406 Class<?> admissionPolicyClazz = 407 conf.getClassByName(admissionPolicyClassName); 408 if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) { 409 return (SharingPolicy) ReflectionUtils.newInstance( 410 admissionPolicyClazz, conf); 411 } else { 412 throw new YarnRuntimeException("Class: " + admissionPolicyClassName 413 + " not instance of " + SharingPolicy.class.getCanonicalName()); 414 } 415 } catch (ClassNotFoundException e) { 416 throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: " 417 + admissionPolicyClassName + " for queue: " + queueName, e); 418 } 419 } 420 421 protected abstract ReservationSchedulerConfiguration getReservationSchedulerConfiguration()422 getReservationSchedulerConfiguration(); 423 getPlanQueuePath(String planQueueName)424 protected abstract String getPlanQueuePath(String planQueueName); 425 getPlanQueueCapacity(String planQueueName)426 protected abstract Resource getPlanQueueCapacity(String planQueueName); 427 getMinAllocation()428 protected abstract Resource getMinAllocation(); 429 getMaxAllocation()430 protected abstract Resource getMaxAllocation(); 431 getResourceCalculator()432 protected abstract ResourceCalculator getResourceCalculator(); 433 getRootQueueMetrics()434 protected abstract QueueMetrics getRootQueueMetrics(); 435 } 436