1 /*******************************************************************************
2  *   Licensed to the Apache Software Foundation (ASF) under one
3  *   or more contributor license agreements.  See the NOTICE file
4  *   distributed with this work for additional information
5  *   regarding copyright ownership.  The ASF licenses this file
6  *   to you under the Apache License, Version 2.0 (the
7  *   "License"); you may not use this file except in compliance
8  *   with the License.  You may obtain a copy of the License at
9  *
10  *       http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *   Unless required by applicable law or agreed to in writing, software
13  *   distributed under the License is distributed on an "AS IS" BASIS,
14  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *   See the License for the specific language governing permissions and
16  *   limitations under the License.
17  *******************************************************************************/
18 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
19 
20 import java.util.Date;
21 
22 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
23 import org.apache.hadoop.classification.InterfaceStability.Unstable;
24 import org.apache.hadoop.yarn.api.records.Resource;
25 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
26 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
27 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
28 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
29 import org.apache.hadoop.yarn.util.resource.Resources;
30 
31 /**
32  * This policy enforces a time-extended notion of Capacity. In particular it
33  * guarantees that the allocation received in input when combined with all
34  * previous allocation for the user does not violate an instantaneous max limit
35  * on the resources received, and that for every window of time of length
36  * validWindow, the integral of the allocations for a user (sum of the currently
37  * submitted allocation and all prior allocations for the user) does not exceed
38  * validWindow * maxAvg.
39  *
40  * This allows flexibility, in the sense that an allocation can instantaneously
41  * use large portions of the available capacity, but prevents abuses by bounding
42  * the average use over time.
43  *
44  * By controlling maxInst, maxAvg, validWindow the administrator configuring
45  * this policy can obtain a behavior ranging from instantaneously enforced
46  * capacity (akin to existing queues), or fully flexible allocations (likely
47  * reserved to super-users, or trusted systems).
48  */
49 @LimitedPrivate("yarn")
50 @Unstable
51 public class CapacityOverTimePolicy implements SharingPolicy {
52 
53   private ReservationSchedulerConfiguration conf;
54   private long validWindow;
55   private float maxInst;
56   private float maxAvg;
57 
58   // For now this is CapacityScheduler specific, but given a hierarchy in the
59   // configuration structure of the schedulers (e.g., SchedulerConfiguration)
60   // it should be easy to remove this limitation
61   @Override
init(String reservationQueuePath, ReservationSchedulerConfiguration conf)62   public void init(String reservationQueuePath,
63       ReservationSchedulerConfiguration conf) {
64     this.conf = conf;
65     validWindow = this.conf.getReservationWindow(reservationQueuePath);
66     maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
67     maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;
68   };
69 
70   @Override
validate(Plan plan, ReservationAllocation reservation)71   public void validate(Plan plan, ReservationAllocation reservation)
72       throws PlanningException {
73 
74     // this is entire method invoked under a write-lock on the plan, no need
75     // to synchronize accesses to the plan further
76 
77     // Try to verify whether there is already a reservation with this ID in
78     // the system (remove its contribution during validation to simulate a
79     // try-n-swap
80     // update).
81     ReservationAllocation oldReservation =
82         plan.getReservationById(reservation.getReservationId());
83 
84     // sanity check that the update of a reservation is not changing username
85     if (oldReservation != null
86         && !oldReservation.getUser().equals(reservation.getUser())) {
87       throw new MismatchedUserException(
88           "Updating an existing reservation with mismatched user:"
89               + oldReservation.getUser() + " != " + reservation.getUser());
90     }
91 
92     long startTime = reservation.getStartTime();
93     long endTime = reservation.getEndTime();
94     long step = plan.getStep();
95 
96     Resource planTotalCapacity = plan.getTotalCapacity();
97 
98     Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg);
99     Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);
100 
101     // define variable that will store integral of resources (need diff class to
102     // avoid overflow issues for long/large allocations)
103     IntegralResource runningTot = new IntegralResource(0L, 0L);
104     IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
105     maxAllowed.multiplyBy(validWindow / step);
106 
107     // check that the resources offered to the user during any window of length
108     // "validWindow" overlapping this allocation are within maxAllowed
109     // also enforce instantaneous and physical constraints during this pass
110     for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
111 
112       Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
113       Resource currExistingAllocForUser =
114           plan.getConsumptionForUser(reservation.getUser(), t);
115       Resource currNewAlloc = reservation.getResourcesAtTime(t);
116       Resource currOldAlloc = Resources.none();
117       if (oldReservation != null) {
118         currOldAlloc = oldReservation.getResourcesAtTime(t);
119       }
120 
121       // throw exception if the cluster is overcommitted
122       // tot_allocated - old + new > capacity
123       Resource inst =
124           Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc),
125               currOldAlloc);
126       if (Resources.greaterThan(plan.getResourceCalculator(),
127           planTotalCapacity, inst, planTotalCapacity)) {
128         throw new ResourceOverCommitException(" Resources at time " + t
129             + " would be overcommitted (" + inst + " over "
130             + plan.getTotalCapacity() + ") by accepting reservation: "
131             + reservation.getReservationId());
132       }
133 
134       // throw exception if instantaneous limits are violated
135       // tot_alloc_to_this_user - old + new > inst_limit
136       if (Resources.greaterThan(plan.getResourceCalculator(),
137           planTotalCapacity, Resources.subtract(
138               Resources.add(currExistingAllocForUser, currNewAlloc),
139               currOldAlloc), maxInsRes)) {
140         throw new PlanningQuotaException("Instantaneous quota capacity "
141             + maxInst + " would be passed at time " + t
142             + " by accepting reservation: " + reservation.getReservationId());
143       }
144 
145       // throw exception if the running integral of utilization over validWindow
146       // is violated. We perform a delta check, adding/removing instants at the
147       // boundary of the window from runningTot.
148 
149       // runningTot = previous_runningTot + currExistingAllocForUser +
150       // currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc;
151 
152       // Where:
153       // 1) currNewAlloc, currExistingAllocForUser represent the contribution of
154       // the instant in time added in this pass.
155       // 2) pastNewAlloc, pastOldAlloc are the contributions relative to time
156       // instants that are being retired from the the window
157       // 3) currOldAlloc is the contribution (if any) of the previous version of
158       // this reservation (the one we are updating)
159 
160       runningTot.add(currExistingAllocForUser);
161       runningTot.add(currNewAlloc);
162       runningTot.subtract(currOldAlloc);
163 
164       // expire contributions from instant in time before (t - validWindow)
165       if (t > startTime) {
166         Resource pastOldAlloc =
167             plan.getConsumptionForUser(reservation.getUser(), t - validWindow);
168         Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
169 
170         // runningTot = runningTot - pastExistingAlloc - pastNewAlloc;
171         runningTot.subtract(pastOldAlloc);
172         runningTot.subtract(pastNewAlloc);
173       }
174 
175       // check integral
176       // runningTot > maxAvg * validWindow
177       // NOTE: we need to use comparator of IntegralResource directly, as
178       // Resource and ResourceCalculator assume "int" amount of resources,
179       // which is not sufficient when comparing integrals (out-of-bound)
180       if (maxAllowed.compareTo(runningTot) < 0) {
181         throw new PlanningQuotaException(
182             "Integral (avg over time) quota capacity " + maxAvg
183                 + " over a window of " + validWindow / 1000 + " seconds, "
184                 + " would be passed at time " + t + "(" + new Date(t)
185                 + ") by accepting reservation: "
186                 + reservation.getReservationId());
187       }
188     }
189   }
190 
191   @Override
getValidWindow()192   public long getValidWindow() {
193     return validWindow;
194   }
195 
196   /**
197    * This class provides support for Resource-like book-keeping, based on
198    * long(s), as using Resource to store the "integral" of the allocation over
199    * time leads to integer overflows for large allocations/clusters. (Evolving
200    * Resource to use long is too disruptive at this point.)
201    *
202    * The comparison/multiplication behaviors of IntegralResource are consistent
203    * with the DefaultResourceCalculator.
204    */
205   private static class IntegralResource {
206     long memory;
207     long vcores;
208 
IntegralResource(Resource resource)209     public IntegralResource(Resource resource) {
210       this.memory = resource.getMemory();
211       this.vcores = resource.getVirtualCores();
212     }
213 
IntegralResource(long mem, long vcores)214     public IntegralResource(long mem, long vcores) {
215       this.memory = mem;
216       this.vcores = vcores;
217     }
218 
add(Resource r)219     public void add(Resource r) {
220       memory += r.getMemory();
221       vcores += r.getVirtualCores();
222     }
223 
subtract(Resource r)224     public void subtract(Resource r) {
225       memory -= r.getMemory();
226       vcores -= r.getVirtualCores();
227     }
228 
multiplyBy(long window)229     public void multiplyBy(long window) {
230       memory = memory * window;
231       vcores = vcores * window;
232     }
233 
compareTo(IntegralResource other)234     public long compareTo(IntegralResource other) {
235       long diff = memory - other.memory;
236       if (diff == 0) {
237         diff = vcores - other.vcores;
238       }
239       return diff;
240     }
241 
242     @Override
toString()243     public String toString() {
244       return "<memory:" + memory + ", vCores:" + vcores + ">";
245     }
246   }
247 }
248