1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
20 
21 import java.util.HashMap;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicLong;
28 import java.util.concurrent.locks.Lock;
29 import java.util.concurrent.locks.ReentrantReadWriteLock;
30 
31 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
32 import org.apache.hadoop.classification.InterfaceStability.Unstable;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.service.AbstractService;
35 import org.apache.hadoop.util.ReflectionUtils;
36 import org.apache.hadoop.yarn.api.records.ReservationId;
37 import org.apache.hadoop.yarn.api.records.Resource;
38 import org.apache.hadoop.yarn.conf.YarnConfiguration;
39 import org.apache.hadoop.yarn.exceptions.YarnException;
40 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
41 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
42 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
43 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
44 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
45 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
46 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
47 import org.apache.hadoop.yarn.util.Clock;
48 import org.apache.hadoop.yarn.util.UTCClock;
49 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 
53 /**
54  * This is the implementation of {@link ReservationSystem} based on the
55  * {@link ResourceScheduler}
56  */
57 @LimitedPrivate("yarn")
58 @Unstable
59 public abstract class AbstractReservationSystem extends AbstractService
60     implements ReservationSystem {
61 
62   private static final Logger LOG = LoggerFactory
63       .getLogger(AbstractReservationSystem.class);
64 
65   // private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN
66 
67   private final ReentrantReadWriteLock readWriteLock =
68       new ReentrantReadWriteLock(true);
69   private final Lock readLock = readWriteLock.readLock();
70   private final Lock writeLock = readWriteLock.writeLock();
71 
72   private boolean initialized = false;
73 
74   private final Clock clock = new UTCClock();
75 
76   private AtomicLong resCounter = new AtomicLong();
77 
78   private Map<String, Plan> plans = new HashMap<String, Plan>();
79 
80   private Map<ReservationId, String> resQMap =
81       new HashMap<ReservationId, String>();
82 
83   private RMContext rmContext;
84 
85   private ResourceScheduler scheduler;
86 
87   private ScheduledExecutorService scheduledExecutorService;
88 
89   protected Configuration conf;
90 
91   protected long planStepSize;
92 
93   private PlanFollower planFollower;
94 
95   /**
96    * Construct the service.
97    *
98    * @param name service name
99    */
AbstractReservationSystem(String name)100   public AbstractReservationSystem(String name) {
101     super(name);
102   }
103 
104   @Override
setRMContext(RMContext rmContext)105   public void setRMContext(RMContext rmContext) {
106     writeLock.lock();
107     try {
108       this.rmContext = rmContext;
109     } finally {
110       writeLock.unlock();
111     }
112   }
113 
114   @Override
reinitialize(Configuration conf, RMContext rmContext)115   public void reinitialize(Configuration conf, RMContext rmContext)
116       throws YarnException {
117     writeLock.lock();
118     try {
119       if (!initialized) {
120         initialize(conf);
121         initialized = true;
122       } else {
123         initializeNewPlans(conf);
124       }
125     } finally {
126       writeLock.unlock();
127     }
128   }
129 
initialize(Configuration conf)130   private void initialize(Configuration conf) throws YarnException {
131     LOG.info("Initializing Reservation system");
132     this.conf = conf;
133     scheduler = rmContext.getScheduler();
134     // Get the plan step size
135     planStepSize =
136         conf.getTimeDuration(
137             YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
138             YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
139             TimeUnit.MILLISECONDS);
140     if (planStepSize < 0) {
141       planStepSize =
142           YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP;
143     }
144     // Create a plan corresponding to every reservable queue
145     Set<String> planQueueNames = scheduler.getPlanQueues();
146     for (String planQueueName : planQueueNames) {
147       Plan plan = initializePlan(planQueueName);
148       plans.put(planQueueName, plan);
149     }
150   }
151 
initializeNewPlans(Configuration conf)152   private void initializeNewPlans(Configuration conf) {
153     LOG.info("Refreshing Reservation system");
154     writeLock.lock();
155     try {
156       // Create a plan corresponding to every new reservable queue
157       Set<String> planQueueNames = scheduler.getPlanQueues();
158       for (String planQueueName : planQueueNames) {
159         if (!plans.containsKey(planQueueName)) {
160           Plan plan = initializePlan(planQueueName);
161           plans.put(planQueueName, plan);
162         } else {
163           LOG.warn("Plan based on reservation queue {0} already exists.",
164               planQueueName);
165         }
166       }
167       // Update the plan follower with the active plans
168       if (planFollower != null) {
169         planFollower.setPlans(plans.values());
170       }
171     } catch (YarnException e) {
172       LOG.warn("Exception while trying to refresh reservable queues", e);
173     } finally {
174       writeLock.unlock();
175     }
176   }
177 
createPlanFollower()178   private PlanFollower createPlanFollower() {
179     String planFollowerPolicyClassName =
180         conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER,
181             getDefaultPlanFollower());
182     if (planFollowerPolicyClassName == null) {
183       return null;
184     }
185     LOG.info("Using PlanFollowerPolicy: " + planFollowerPolicyClassName);
186     try {
187       Class<?> planFollowerPolicyClazz =
188           conf.getClassByName(planFollowerPolicyClassName);
189       if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) {
190         return (PlanFollower) ReflectionUtils.newInstance(
191             planFollowerPolicyClazz, conf);
192       } else {
193         throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName
194             + " not instance of " + PlanFollower.class.getCanonicalName());
195       }
196     } catch (ClassNotFoundException e) {
197       throw new YarnRuntimeException(
198           "Could not instantiate PlanFollowerPolicy: "
199               + planFollowerPolicyClassName, e);
200     }
201   }
202 
getDefaultPlanFollower()203   private String getDefaultPlanFollower() {
204     // currently only capacity scheduler is supported
205     if (scheduler instanceof CapacityScheduler) {
206       return CapacitySchedulerPlanFollower.class.getName();
207     } else if (scheduler instanceof FairScheduler) {
208       return FairSchedulerPlanFollower.class.getName();
209     }
210     return null;
211   }
212 
213   @Override
getPlan(String planName)214   public Plan getPlan(String planName) {
215     readLock.lock();
216     try {
217       return plans.get(planName);
218     } finally {
219       readLock.unlock();
220     }
221   }
222 
223   /**
224    * @return the planStepSize
225    */
226   @Override
getPlanFollowerTimeStep()227   public long getPlanFollowerTimeStep() {
228     readLock.lock();
229     try {
230       return planStepSize;
231     } finally {
232       readLock.unlock();
233     }
234   }
235 
236   @Override
synchronizePlan(String planName)237   public void synchronizePlan(String planName) {
238     writeLock.lock();
239     try {
240       Plan plan = plans.get(planName);
241       if (plan != null) {
242         planFollower.synchronizePlan(plan);
243       }
244     } finally {
245       writeLock.unlock();
246     }
247   }
248 
249   @Override
serviceInit(Configuration conf)250   public void serviceInit(Configuration conf) throws Exception {
251     Configuration configuration = new Configuration(conf);
252     reinitialize(configuration, rmContext);
253     // Create the plan follower with the active plans
254     planFollower = createPlanFollower();
255     if (planFollower != null) {
256       planFollower.init(clock, scheduler, plans.values());
257     }
258     super.serviceInit(conf);
259   }
260 
261   @Override
serviceStart()262   public void serviceStart() throws Exception {
263     if (planFollower != null) {
264       scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
265       scheduledExecutorService.scheduleWithFixedDelay(planFollower, 0L,
266           planStepSize, TimeUnit.MILLISECONDS);
267     }
268     super.serviceStart();
269   }
270 
271   @Override
serviceStop()272   public void serviceStop() {
273     // Stop the plan follower
274     if (scheduledExecutorService != null
275         && !scheduledExecutorService.isShutdown()) {
276       scheduledExecutorService.shutdown();
277     }
278     // Clear the plans
279     plans.clear();
280   }
281 
282   @Override
getQueueForReservation(ReservationId reservationId)283   public String getQueueForReservation(ReservationId reservationId) {
284     readLock.lock();
285     try {
286       return resQMap.get(reservationId);
287     } finally {
288       readLock.unlock();
289     }
290   }
291 
292   @Override
setQueueForReservation(ReservationId reservationId, String queueName)293   public void setQueueForReservation(ReservationId reservationId,
294       String queueName) {
295     writeLock.lock();
296     try {
297       resQMap.put(reservationId, queueName);
298     } finally {
299       writeLock.unlock();
300     }
301   }
302 
303   @Override
getNewReservationId()304   public ReservationId getNewReservationId() {
305     writeLock.lock();
306     try {
307       ReservationId resId =
308           ReservationId.newInstance(ResourceManager.getClusterTimeStamp(),
309               resCounter.incrementAndGet());
310       LOG.info("Allocated new reservationId: " + resId);
311       return resId;
312     } finally {
313       writeLock.unlock();
314     }
315   }
316 
317   @Override
getAllPlans()318   public Map<String, Plan> getAllPlans() {
319     return plans;
320   }
321 
322   /**
323    * Get the default reservation system corresponding to the scheduler
324    *
325    * @param scheduler the scheduler for which the reservation system is required
326    */
getDefaultReservationSystem(ResourceScheduler scheduler)327   public static String getDefaultReservationSystem(ResourceScheduler scheduler) {
328     if (scheduler instanceof CapacityScheduler) {
329       return CapacityReservationSystem.class.getName();
330     } else if (scheduler instanceof FairScheduler) {
331       return FairReservationSystem.class.getName();
332     }
333     return null;
334   }
335 
initializePlan(String planQueueName)336   protected Plan initializePlan(String planQueueName) throws YarnException {
337     String planQueuePath = getPlanQueuePath(planQueueName);
338     SharingPolicy adPolicy = getAdmissionPolicy(planQueuePath);
339     adPolicy.init(planQueuePath, getReservationSchedulerConfiguration());
340     // Calculate the max plan capacity
341     Resource minAllocation = getMinAllocation();
342     Resource maxAllocation = getMaxAllocation();
343     ResourceCalculator rescCalc = getResourceCalculator();
344     Resource totCap = getPlanQueueCapacity(planQueueName);
345     Plan plan =
346         new InMemoryPlan(getRootQueueMetrics(), adPolicy,
347             getAgent(planQueuePath), totCap, planStepSize, rescCalc,
348             minAllocation, maxAllocation, planQueueName,
349             getReplanner(planQueuePath), getReservationSchedulerConfiguration()
350             .getMoveOnExpiry(planQueuePath));
351     LOG.info("Intialized plan {0} based on reservable queue {1}",
352         plan.toString(), planQueueName);
353     return plan;
354   }
355 
getReplanner(String planQueueName)356   protected Planner getReplanner(String planQueueName) {
357     ReservationSchedulerConfiguration reservationConfig =
358         getReservationSchedulerConfiguration();
359     String plannerClassName = reservationConfig.getReplanner(planQueueName);
360     LOG.info("Using Replanner: " + plannerClassName + " for queue: "
361         + planQueueName);
362     try {
363       Class<?> plannerClazz = conf.getClassByName(plannerClassName);
364       if (Planner.class.isAssignableFrom(plannerClazz)) {
365         Planner planner =
366             (Planner) ReflectionUtils.newInstance(plannerClazz, conf);
367         planner.init(planQueueName, reservationConfig);
368         return planner;
369       } else {
370         throw new YarnRuntimeException("Class: " + plannerClazz
371             + " not instance of " + Planner.class.getCanonicalName());
372       }
373     } catch (ClassNotFoundException e) {
374       throw new YarnRuntimeException("Could not instantiate Planner: "
375           + plannerClassName + " for queue: " + planQueueName, e);
376     }
377   }
378 
getAgent(String queueName)379   protected ReservationAgent getAgent(String queueName) {
380     ReservationSchedulerConfiguration reservationConfig =
381         getReservationSchedulerConfiguration();
382     String agentClassName = reservationConfig.getReservationAgent(queueName);
383     LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName);
384     try {
385       Class<?> agentClazz = conf.getClassByName(agentClassName);
386       if (ReservationAgent.class.isAssignableFrom(agentClazz)) {
387         return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf);
388       } else {
389         throw new YarnRuntimeException("Class: " + agentClassName
390             + " not instance of " + ReservationAgent.class.getCanonicalName());
391       }
392     } catch (ClassNotFoundException e) {
393       throw new YarnRuntimeException("Could not instantiate Agent: "
394           + agentClassName + " for queue: " + queueName, e);
395     }
396   }
397 
getAdmissionPolicy(String queueName)398   protected SharingPolicy getAdmissionPolicy(String queueName) {
399     ReservationSchedulerConfiguration reservationConfig =
400         getReservationSchedulerConfiguration();
401     String admissionPolicyClassName =
402         reservationConfig.getReservationAdmissionPolicy(queueName);
403     LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName
404         + " for queue: " + queueName);
405     try {
406       Class<?> admissionPolicyClazz =
407           conf.getClassByName(admissionPolicyClassName);
408       if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) {
409         return (SharingPolicy) ReflectionUtils.newInstance(
410             admissionPolicyClazz, conf);
411       } else {
412         throw new YarnRuntimeException("Class: " + admissionPolicyClassName
413             + " not instance of " + SharingPolicy.class.getCanonicalName());
414       }
415     } catch (ClassNotFoundException e) {
416       throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: "
417           + admissionPolicyClassName + " for queue: " + queueName, e);
418     }
419   }
420 
421   protected abstract ReservationSchedulerConfiguration
getReservationSchedulerConfiguration()422       getReservationSchedulerConfiguration();
423 
getPlanQueuePath(String planQueueName)424   protected abstract String getPlanQueuePath(String planQueueName);
425 
getPlanQueueCapacity(String planQueueName)426   protected abstract Resource getPlanQueueCapacity(String planQueueName);
427 
getMinAllocation()428   protected abstract Resource getMinAllocation();
429 
getMaxAllocation()430   protected abstract Resource getMaxAllocation();
431 
getResourceCalculator()432   protected abstract ResourceCalculator getResourceCalculator();
433 
getRootQueueMetrics()434   protected abstract QueueMetrics getRootQueueMetrics();
435 }
436