1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
19 
20 import org.apache.hadoop.classification.InterfaceAudience.Public;
21 import org.apache.hadoop.classification.InterfaceStability.Evolving;
22 import org.apache.hadoop.util.ReflectionUtils;
23 import org.apache.hadoop.util.StringUtils;
24 import org.apache.hadoop.yarn.api.records.Resource;
25 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
26 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
27 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
28 
29 import java.util.Collection;
30 import java.util.Comparator;
31 import java.util.concurrent.ConcurrentHashMap;
32 
33 @Public
34 @Evolving
35 public abstract class SchedulingPolicy {
36   private static final ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy> instances =
37       new ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy>();
38 
39   public static final SchedulingPolicy DEFAULT_POLICY =
40       getInstance(FairSharePolicy.class);
41 
42   public static final byte DEPTH_LEAF = (byte) 1;
43   public static final byte DEPTH_INTERMEDIATE = (byte) 2;
44   public static final byte DEPTH_ROOT = (byte) 4;
45   public static final byte DEPTH_PARENT = (byte) 6; // Root and Intermediate
46   public static final byte DEPTH_ANY = (byte) 7;
47 
48   /**
49    * Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz
50    */
getInstance(Class<? extends SchedulingPolicy> clazz)51   public static SchedulingPolicy getInstance(Class<? extends SchedulingPolicy> clazz) {
52     SchedulingPolicy policy = ReflectionUtils.newInstance(clazz, null);
53     SchedulingPolicy policyRet = instances.putIfAbsent(clazz, policy);
54     if(policyRet != null) {
55       return policyRet;
56     }
57     return policy;
58   }
59 
60   /**
61    * Returns {@link SchedulingPolicy} instance corresponding to the
62    * {@link SchedulingPolicy} passed as a string. The policy can be "fair" for
63    * FairSharePolicy, "fifo" for FifoPolicy, or "drf" for
64    * DominantResourceFairnessPolicy. For a custom
65    * {@link SchedulingPolicy}s in the RM classpath, the policy should be
66    * canonical class name of the {@link SchedulingPolicy}.
67    *
68    * @param policy canonical class name or "drf" or "fair" or "fifo"
69    * @throws AllocationConfigurationException
70    */
71   @SuppressWarnings("unchecked")
parse(String policy)72   public static SchedulingPolicy parse(String policy)
73       throws AllocationConfigurationException {
74     @SuppressWarnings("rawtypes")
75     Class clazz;
76     String text = StringUtils.toLowerCase(policy);
77     if (text.equalsIgnoreCase(FairSharePolicy.NAME)) {
78       clazz = FairSharePolicy.class;
79     } else if (text.equalsIgnoreCase(FifoPolicy.NAME)) {
80       clazz = FifoPolicy.class;
81     } else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) {
82       clazz = DominantResourceFairnessPolicy.class;
83     } else {
84       try {
85         clazz = Class.forName(policy);
86       } catch (ClassNotFoundException cnfe) {
87         throw new AllocationConfigurationException(policy
88             + " SchedulingPolicy class not found!");
89       }
90     }
91     if (!SchedulingPolicy.class.isAssignableFrom(clazz)) {
92       throw new AllocationConfigurationException(policy
93           + " does not extend SchedulingPolicy");
94     }
95     return getInstance(clazz);
96   }
97 
initialize(Resource clusterCapacity)98   public void initialize(Resource clusterCapacity) {}
99 
100   /**
101    * @return returns the name of {@link SchedulingPolicy}
102    */
getName()103   public abstract String getName();
104 
105   /**
106    * Specifies the depths in the hierarchy, this {@link SchedulingPolicy}
107    * applies to
108    *
109    * @return depth equal to one of fields {@link SchedulingPolicy}#DEPTH_*
110    */
getApplicableDepth()111   public abstract byte getApplicableDepth();
112 
113   /**
114    * Checks if the specified {@link SchedulingPolicy} can be used for a queue at
115    * the specified depth in the hierarchy
116    *
117    * @param policy {@link SchedulingPolicy} we are checking the
118    *          depth-applicability for
119    * @param depth queue's depth in the hierarchy
120    * @return true if policy is applicable to passed depth, false otherwise
121    */
isApplicableTo(SchedulingPolicy policy, byte depth)122   public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) {
123     return ((policy.getApplicableDepth() & depth) == depth) ? true : false;
124   }
125 
126   /**
127    * The comparator returned by this method is to be used for sorting the
128    * {@link Schedulable}s in that queue.
129    *
130    * @return the comparator to sort by
131    */
getComparator()132   public abstract Comparator<Schedulable> getComparator();
133 
134   /**
135    * Computes and updates the shares of {@link Schedulable}s as per
136    * the {@link SchedulingPolicy}, to be used later for scheduling decisions.
137    * The shares computed are instantaneous and only consider queues with
138    * running applications.
139    *
140    * @param schedulables {@link Schedulable}s whose shares are to be updated
141    * @param totalResources Total {@link Resource}s in the cluster
142    */
computeShares( Collection<? extends Schedulable> schedulables, Resource totalResources)143   public abstract void computeShares(
144       Collection<? extends Schedulable> schedulables, Resource totalResources);
145 
146   /**
147    * Computes and updates the steady shares of {@link FSQueue}s as per the
148    * {@link SchedulingPolicy}. The steady share does not differentiate
149    * between queues with and without running applications under them. The
150    * steady share is not used for scheduling, it is displayed on the Web UI
151    * for better visibility.
152    *
153    * @param queues {@link FSQueue}s whose shares are to be updated
154    * @param totalResources Total {@link Resource}s in the cluster
155    */
computeSteadyShares( Collection<? extends FSQueue> queues, Resource totalResources)156   public abstract void computeSteadyShares(
157       Collection<? extends FSQueue> queues, Resource totalResources);
158 
159   /**
160    * Check if the resource usage is over the fair share under this policy
161    *
162    * @param usage {@link Resource} the resource usage
163    * @param fairShare {@link Resource} the fair share
164    * @return true if check passes (is over) or false otherwise
165    */
checkIfUsageOverFairShare( Resource usage, Resource fairShare)166   public abstract boolean checkIfUsageOverFairShare(
167       Resource usage, Resource fairShare);
168 
169   /**
170    * Check if a leaf queue's AM resource usage over its limit under this policy
171    *
172    * @param usage {@link Resource} the resource used by application masters
173    * @param maxAMResource {@link Resource} the maximum allowed resource for
174    *                                      application masters
175    * @return true if AM resource usage is over the limit
176    */
checkIfAMResourceUsageOverLimit( Resource usage, Resource maxAMResource)177   public abstract boolean checkIfAMResourceUsageOverLimit(
178       Resource usage, Resource maxAMResource);
179 
180   /**
181    * Get headroom by calculating the min of <code>clusterAvailable</code> and
182    * (<code>queueFairShare</code> - <code>queueUsage</code>) resources that are
183    * applicable to this policy. For eg if only memory then leave other
184    * resources such as CPU to same as clusterAvailable.
185    *
186    * @param queueFairShare fairshare in the queue
187    * @param queueUsage resources used in the queue
188    * @param maxAvailable available resource in cluster for this queue
189    * @return calculated headroom
190    */
getHeadroom(Resource queueFairShare, Resource queueUsage, Resource maxAvailable)191   public abstract Resource getHeadroom(Resource queueFairShare,
192       Resource queueUsage, Resource maxAvailable);
193 
194 }
195