1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; 19 20 import org.apache.hadoop.classification.InterfaceAudience.Public; 21 import org.apache.hadoop.classification.InterfaceStability.Evolving; 22 import org.apache.hadoop.util.ReflectionUtils; 23 import org.apache.hadoop.util.StringUtils; 24 import org.apache.hadoop.yarn.api.records.Resource; 25 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; 26 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; 27 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; 28 29 import java.util.Collection; 30 import java.util.Comparator; 31 import java.util.concurrent.ConcurrentHashMap; 32 33 @Public 34 @Evolving 35 public abstract class SchedulingPolicy { 36 private static final ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy> instances = 37 new ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy>(); 38 39 public static final SchedulingPolicy DEFAULT_POLICY = 40 getInstance(FairSharePolicy.class); 41 42 public static final byte DEPTH_LEAF = (byte) 1; 43 public static final byte DEPTH_INTERMEDIATE = (byte) 2; 44 public static final byte DEPTH_ROOT = (byte) 4; 45 public static final byte DEPTH_PARENT = (byte) 6; // Root and Intermediate 46 public static final byte DEPTH_ANY = (byte) 7; 47 48 /** 49 * Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz 50 */ getInstance(Class<? extends SchedulingPolicy> clazz)51 public static SchedulingPolicy getInstance(Class<? extends SchedulingPolicy> clazz) { 52 SchedulingPolicy policy = ReflectionUtils.newInstance(clazz, null); 53 SchedulingPolicy policyRet = instances.putIfAbsent(clazz, policy); 54 if(policyRet != null) { 55 return policyRet; 56 } 57 return policy; 58 } 59 60 /** 61 * Returns {@link SchedulingPolicy} instance corresponding to the 62 * {@link SchedulingPolicy} passed as a string. The policy can be "fair" for 63 * FairSharePolicy, "fifo" for FifoPolicy, or "drf" for 64 * DominantResourceFairnessPolicy. For a custom 65 * {@link SchedulingPolicy}s in the RM classpath, the policy should be 66 * canonical class name of the {@link SchedulingPolicy}. 67 * 68 * @param policy canonical class name or "drf" or "fair" or "fifo" 69 * @throws AllocationConfigurationException 70 */ 71 @SuppressWarnings("unchecked") parse(String policy)72 public static SchedulingPolicy parse(String policy) 73 throws AllocationConfigurationException { 74 @SuppressWarnings("rawtypes") 75 Class clazz; 76 String text = StringUtils.toLowerCase(policy); 77 if (text.equalsIgnoreCase(FairSharePolicy.NAME)) { 78 clazz = FairSharePolicy.class; 79 } else if (text.equalsIgnoreCase(FifoPolicy.NAME)) { 80 clazz = FifoPolicy.class; 81 } else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) { 82 clazz = DominantResourceFairnessPolicy.class; 83 } else { 84 try { 85 clazz = Class.forName(policy); 86 } catch (ClassNotFoundException cnfe) { 87 throw new AllocationConfigurationException(policy 88 + " SchedulingPolicy class not found!"); 89 } 90 } 91 if (!SchedulingPolicy.class.isAssignableFrom(clazz)) { 92 throw new AllocationConfigurationException(policy 93 + " does not extend SchedulingPolicy"); 94 } 95 return getInstance(clazz); 96 } 97 initialize(Resource clusterCapacity)98 public void initialize(Resource clusterCapacity) {} 99 100 /** 101 * @return returns the name of {@link SchedulingPolicy} 102 */ getName()103 public abstract String getName(); 104 105 /** 106 * Specifies the depths in the hierarchy, this {@link SchedulingPolicy} 107 * applies to 108 * 109 * @return depth equal to one of fields {@link SchedulingPolicy}#DEPTH_* 110 */ getApplicableDepth()111 public abstract byte getApplicableDepth(); 112 113 /** 114 * Checks if the specified {@link SchedulingPolicy} can be used for a queue at 115 * the specified depth in the hierarchy 116 * 117 * @param policy {@link SchedulingPolicy} we are checking the 118 * depth-applicability for 119 * @param depth queue's depth in the hierarchy 120 * @return true if policy is applicable to passed depth, false otherwise 121 */ isApplicableTo(SchedulingPolicy policy, byte depth)122 public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) { 123 return ((policy.getApplicableDepth() & depth) == depth) ? true : false; 124 } 125 126 /** 127 * The comparator returned by this method is to be used for sorting the 128 * {@link Schedulable}s in that queue. 129 * 130 * @return the comparator to sort by 131 */ getComparator()132 public abstract Comparator<Schedulable> getComparator(); 133 134 /** 135 * Computes and updates the shares of {@link Schedulable}s as per 136 * the {@link SchedulingPolicy}, to be used later for scheduling decisions. 137 * The shares computed are instantaneous and only consider queues with 138 * running applications. 139 * 140 * @param schedulables {@link Schedulable}s whose shares are to be updated 141 * @param totalResources Total {@link Resource}s in the cluster 142 */ computeShares( Collection<? extends Schedulable> schedulables, Resource totalResources)143 public abstract void computeShares( 144 Collection<? extends Schedulable> schedulables, Resource totalResources); 145 146 /** 147 * Computes and updates the steady shares of {@link FSQueue}s as per the 148 * {@link SchedulingPolicy}. The steady share does not differentiate 149 * between queues with and without running applications under them. The 150 * steady share is not used for scheduling, it is displayed on the Web UI 151 * for better visibility. 152 * 153 * @param queues {@link FSQueue}s whose shares are to be updated 154 * @param totalResources Total {@link Resource}s in the cluster 155 */ computeSteadyShares( Collection<? extends FSQueue> queues, Resource totalResources)156 public abstract void computeSteadyShares( 157 Collection<? extends FSQueue> queues, Resource totalResources); 158 159 /** 160 * Check if the resource usage is over the fair share under this policy 161 * 162 * @param usage {@link Resource} the resource usage 163 * @param fairShare {@link Resource} the fair share 164 * @return true if check passes (is over) or false otherwise 165 */ checkIfUsageOverFairShare( Resource usage, Resource fairShare)166 public abstract boolean checkIfUsageOverFairShare( 167 Resource usage, Resource fairShare); 168 169 /** 170 * Check if a leaf queue's AM resource usage over its limit under this policy 171 * 172 * @param usage {@link Resource} the resource used by application masters 173 * @param maxAMResource {@link Resource} the maximum allowed resource for 174 * application masters 175 * @return true if AM resource usage is over the limit 176 */ checkIfAMResourceUsageOverLimit( Resource usage, Resource maxAMResource)177 public abstract boolean checkIfAMResourceUsageOverLimit( 178 Resource usage, Resource maxAMResource); 179 180 /** 181 * Get headroom by calculating the min of <code>clusterAvailable</code> and 182 * (<code>queueFairShare</code> - <code>queueUsage</code>) resources that are 183 * applicable to this policy. For eg if only memory then leave other 184 * resources such as CPU to same as clusterAvailable. 185 * 186 * @param queueFairShare fairshare in the queue 187 * @param queueUsage resources used in the queue 188 * @param maxAvailable available resource in cluster for this queue 189 * @return calculated headroom 190 */ getHeadroom(Resource queueFairShare, Resource queueUsage, Resource maxAvailable)191 public abstract Resource getHeadroom(Resource queueFairShare, 192 Resource queueUsage, Resource maxAvailable); 193 194 } 195