/*******************************************************************************
 *   Licensed to the Apache Software Foundation (ASF) under one
 *   or more contributor license agreements.  See the NOTICE file
 *   distributed with this work for additional information
 *   regarding copyright ownership.  The ASF licenses this file
 *   to you under the Apache License, Version 2.0 (the
 *   "License"); you may not use this file except in compliance
 *   with the License.  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 *******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import java.util.Date;

import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * This policy enforces a time-extended notion of Capacity. In particular it
 * guarantees that the allocation received in input when combined with all
 * previous allocation for the user does not violate an instantaneous max limit
 * on the resources received, and that for every window of time of length
 * validWindow, the integral of the allocations for a user (sum of the currently
 * submitted allocation and all prior allocations for the user) does not exceed
 * validWindow * maxAvg.
 *
 * This allows flexibility, in the sense that an allocation can instantaneously
 * use large portions of the available capacity, but prevents abuses by bounding
 * the average use over time.
 *
 * By controlling maxInst, maxAvg, validWindow the administrator configuring
 * this policy can obtain a behavior ranging from instantaneously enforced
 * capacity (akin to existing queues), or fully flexible allocations (likely
 * reserved to super-users, or trusted systems).
 */
@LimitedPrivate("yarn")
@Unstable
public class CapacityOverTimePolicy implements SharingPolicy {

  /** Configuration the limits below were read from in {@link #init}. */
  private ReservationSchedulerConfiguration conf;
  /** Length of the window over which the average limit is enforced. */
  private long validWindow;
  /** Instantaneous limit: configured value divided by 100. */
  private float maxInst;
  /** Average-over-window limit: configured value divided by 100. */
  private float maxAvg;

  /**
   * Reads the reservation window and the instantaneous/average capacity limits
   * for the given queue from the configuration.
   *
   * For now this is CapacityScheduler specific, but given a hierarchy in the
   * configuration structure of the schedulers (e.g., SchedulerConfiguration)
   * it should be easy to remove this limitation.
   *
   * @param reservationQueuePath path of the queue whose settings are read
   * @param conf configuration providing the window and capacity settings
   */
  @Override
  public void init(String reservationQueuePath,
      ReservationSchedulerConfiguration conf) {
    this.conf = conf;
    validWindow = this.conf.getReservationWindow(reservationQueuePath);
    // the configured capacities are divided by 100 (presumably percentages
    // turned into fractions of the plan capacity -- confirm against the
    // configuration class)
    maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
    maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;
  }

  /**
   * Checks that accepting {@code reservation} (possibly as an update of an
   * existing reservation with the same id) keeps the plan within its total
   * capacity, and keeps the user within both the instantaneous limit and the
   * average limit over any window of length {@code validWindow} overlapping
   * the reservation.
   *
   * @param plan the plan the reservation would be added to
   * @param reservation the reservation being validated
   * @throws MismatchedUserException if an existing reservation with the same
   *           id belongs to a different user
   * @throws ResourceOverCommitException if the plan capacity would be exceeded
   *           at some instant
   * @throws PlanningQuotaException if the instantaneous or the average quota
   *           of the user would be exceeded
   * @throws PlanningException declared supertype of the failures above
   */
  @Override
  public void validate(Plan plan, ReservationAllocation reservation)
      throws PlanningException {

    // this entire method is invoked under a write-lock on the plan, no need
    // to synchronize accesses to the plan further

    // Try to verify whether there is already a reservation with this ID in
    // the system (remove its contribution during validation to simulate a
    // try-n-swap update).
    ReservationAllocation oldReservation =
        plan.getReservationById(reservation.getReservationId());

    // sanity check that the update of a reservation is not changing username
    if (oldReservation != null
        && !oldReservation.getUser().equals(reservation.getUser())) {
      throw new MismatchedUserException(
          "Updating an existing reservation with mismatched user:"
              + oldReservation.getUser() + " != " + reservation.getUser());
    }

    long startTime = reservation.getStartTime();
    long endTime = reservation.getEndTime();
    long step = plan.getStep();

    Resource planTotalCapacity = plan.getTotalCapacity();

    Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg);
    Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);

    // define variable that will store integral of resources (need diff class to
    // avoid overflow issues for long/large allocations)
    IntegralResource runningTot = new IntegralResource(0L, 0L);
    IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
    // number of sampled instants in a window (integer division)
    maxAllowed.multiplyBy(validWindow / step);

    // check that the resources offered to the user during any window of length
    // "validWindow" overlapping this allocation are within maxAllowed
    // also enforce instantaneous and physical constraints during this pass
    for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {

      Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
      Resource currExistingAllocForUser =
          plan.getConsumptionForUser(reservation.getUser(), t);
      Resource currNewAlloc = reservation.getResourcesAtTime(t);
      Resource currOldAlloc = Resources.none();
      if (oldReservation != null) {
        currOldAlloc = oldReservation.getResourcesAtTime(t);
      }

      // throw exception if the cluster is overcommitted
      // tot_allocated - old + new > capacity
      Resource inst =
          Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc),
              currOldAlloc);
      if (Resources.greaterThan(plan.getResourceCalculator(),
          planTotalCapacity, inst, planTotalCapacity)) {
        throw new ResourceOverCommitException(" Resources at time " + t
            + " would be overcommitted (" + inst + " over "
            + plan.getTotalCapacity() + ") by accepting reservation: "
            + reservation.getReservationId());
      }

      // throw exception if instantaneous limits are violated
      // tot_alloc_to_this_user - old + new > inst_limit
      if (Resources.greaterThan(plan.getResourceCalculator(),
          planTotalCapacity, Resources.subtract(
              Resources.add(currExistingAllocForUser, currNewAlloc),
              currOldAlloc), maxInsRes)) {
        throw new PlanningQuotaException("Instantaneous quota capacity "
            + maxInst + " would be passed at time " + t
            + " by accepting reservation: " + reservation.getReservationId());
      }

      // throw exception if the running integral of utilization over validWindow
      // is violated. We perform a delta check, adding/removing instants at the
      // boundary of the window from runningTot.

      // Each instant s contributes
      //   net(s) = existingAllocForUser(s) + newAlloc(s) - oldAlloc(s)
      // where:
      // 1) existingAllocForUser is what the plan already holds for the user
      //    (it includes the previous version of this reservation, if any)
      // 2) newAlloc is the contribution of the reservation being validated
      // 3) oldAlloc is the contribution (if any) of the previous version of
      //    this reservation (the one we are updating)
      // net(t) is added for the instant entering the window, and
      // net(t - validWindow) is removed for the instant being retired.

      runningTot.add(currExistingAllocForUser);
      runningTot.add(currNewAlloc);
      runningTot.subtract(currOldAlloc);

      // expire contributions from instant in time before (t - validWindow)
      // NOTE(review): with "t > startTime" the very first instant
      // (startTime - validWindow) is never retired from the window; confirm
      // whether that extra sample is intended.
      if (t > startTime) {
        long retired = t - validWindow;
        Resource pastExistingAlloc =
            plan.getConsumptionForUser(reservation.getUser(), retired);
        Resource pastNewAlloc = reservation.getResourcesAtTime(retired);

        // runningTot = runningTot - pastExistingAlloc - pastNewAlloc
        //              + pastOldAlloc
        runningTot.subtract(pastExistingAlloc);
        runningTot.subtract(pastNewAlloc);
        if (oldReservation != null) {
          // the replaced reservation was discounted when this instant entered
          // the window, so the discount must be undone when it leaves;
          // otherwise the integral is under-counted on updates
          runningTot.add(oldReservation.getResourcesAtTime(retired));
        }
      }

      // check integral
      // runningTot > maxAvg * validWindow
      // NOTE: we need to use comparator of IntegralResource directly, as
      // Resource and ResourceCalculator assume "int" amount of resources,
      // which is not sufficient when comparing integrals (out-of-bound)
      if (maxAllowed.compareTo(runningTot) < 0) {
        throw new PlanningQuotaException(
            "Integral (avg over time) quota capacity " + maxAvg
                + " over a window of " + validWindow / 1000 + " seconds, "
                + " would be passed at time " + t + "(" + new Date(t)
                + ") by accepting reservation: "
                + reservation.getReservationId());
      }
    }
  }

  /**
   * @return the length of the window over which the average limit is
   *         enforced, as read from the configuration in {@link #init}
   */
  @Override
  public long getValidWindow() {
    return validWindow;
  }

  /**
   * This class provides support for Resource-like book-keeping, based on
   * long(s), as using Resource to store the "integral" of the allocation over
   * time leads to integer overflows for large allocations/clusters. (Evolving
   * Resource to use long is too disruptive at this point.)
   *
   * The comparison/multiplication behaviors of IntegralResource are consistent
   * with the DefaultResourceCalculator.
   */
  private static class IntegralResource {
    private long memory;
    private long vcores;

    /** Starts the accumulator from the memory and vcores of a Resource. */
    public IntegralResource(Resource resource) {
      this.memory = resource.getMemory();
      this.vcores = resource.getVirtualCores();
    }

    /** Starts the accumulator from explicit memory and vcores amounts. */
    public IntegralResource(long mem, long vcores) {
      this.memory = mem;
      this.vcores = vcores;
    }

    /** Adds the memory and vcores of {@code r} to this accumulator. */
    public void add(Resource r) {
      memory += r.getMemory();
      vcores += r.getVirtualCores();
    }

    /** Subtracts the memory and vcores of {@code r} from this accumulator. */
    public void subtract(Resource r) {
      memory -= r.getMemory();
      vcores -= r.getVirtualCores();
    }

    /** Scales both memory and vcores by {@code window}. */
    public void multiplyBy(long window) {
      memory = memory * window;
      vcores = vcores * window;
    }

    /**
     * Orders by memory first and by vcores on a memory tie.
     *
     * Uses explicit comparisons rather than a subtraction, so the sign of the
     * result cannot be flipped by arithmetic overflow.
     *
     * @param other the value to compare against
     * @return a negative value, zero, or a positive value when this is
     *         smaller than, equal to, or larger than {@code other}
     */
    public int compareTo(IntegralResource other) {
      if (memory != other.memory) {
        return memory < other.memory ? -1 : 1;
      }
      if (vcores != other.vcores) {
        return vcores < other.vcores ? -1 : 1;
      }
      return 0;
    }

    @Override
    public String toString() {
      return "<memory:" + memory + ", vCores:" + vcores + ">";
    }
  }
}