1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapred;
20 
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.metrics.MetricsContext;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.SAXException;
49 
50 /**
51  * Maintains a list of pools as well as scheduling parameters for each pool,
52  * such as guaranteed share allocations, from the fair scheduler config file.
53  */
54 public class PoolManager {
55   public static final Log LOG = LogFactory.getLog(
56     "org.apache.hadoop.mapred.PoolManager");
57 
58   /** Time to wait between checks of the allocation file */
59   public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
60 
61   /**
62    * Time to wait after the allocation has been modified before reloading it
63    * (this is done to prevent loading a file that hasn't been fully written).
64    */
65   public static final long ALLOC_RELOAD_WAIT = 5 * 1000;
66 
67   public static final String EXPLICIT_POOL_PROPERTY = "mapred.fairscheduler.pool";
68 
69   private final FairScheduler scheduler;
70 
71   // Map and reduce minimum allocations for each pool
72   private Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
73   private Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
74 
75   // If set, cap number of map and reduce tasks in a pool
76   private Map<String, Integer> poolMaxMaps = new HashMap<String, Integer>();
77   private Map<String, Integer> poolMaxReduces = new HashMap<String, Integer>();
78 
79   // Sharing weights for each pool
80   private Map<String, Double> poolWeights = new HashMap<String, Double>();
81 
82   // Max concurrent running jobs for each pool and for each user; in addition,
83   // for users that have no max specified, we use the userMaxJobsDefault.
84   private Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
85   private Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
86   private int userMaxJobsDefault = Integer.MAX_VALUE;
87   private int poolMaxJobsDefault = Integer.MAX_VALUE;
88 
89   // Min share preemption timeout for each pool in seconds. If a job in the pool
90   // waits this long without receiving its guaranteed share, it is allowed to
91   // preempt other jobs' tasks.
92   private Map<String, Long> minSharePreemptionTimeouts =
93     new HashMap<String, Long>();
94 
95   // Default min share preemption timeout for pools where it is not set
96   // explicitly.
97   private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
98 
99   // Preemption timeout for jobs below fair share in seconds. If a job remains
100   // below half its fair share for this long, it is allowed to preempt tasks.
101   private long fairSharePreemptionTimeout = Long.MAX_VALUE;
102 
103   SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
104 
105   private Object allocFile; // Path to XML file containing allocations. This
106                             // is either a URL to specify a classpath resource
107                             // (if the fair-scheduler.xml on the classpath is
108                             // used) or a String to specify an absolute path (if
109                             // mapred.fairscheduler.allocation.file is used).
110   private String poolNameProperty; // Jobconf property to use for determining a
111                                    // job's pool name (default: user.name)
112 
113   private Map<String, Pool> pools = new HashMap<String, Pool>();
114 
115   private long lastReloadAttempt; // Last time we tried to reload the pools file
116   private long lastSuccessfulReload; // Last time we successfully reloaded pools
117   private boolean lastReloadAttemptFailed = false;
118   private Set<String> declaredPools = new TreeSet<String>();
119 
PoolManager(FairScheduler scheduler)120   public PoolManager(FairScheduler scheduler) {
121     this.scheduler = scheduler;
122   }
123 
initialize()124   public void initialize() throws IOException, SAXException,
125       AllocationConfigurationException, ParserConfigurationException {
126     Configuration conf = scheduler.getConf();
127     this.poolNameProperty = conf.get(
128         "mapred.fairscheduler.poolnameproperty", "user.name");
129     this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
130     if (allocFile == null) {
131       // No allocation file specified in jobconf. Use the default allocation
132       // file, fair-scheduler.xml, looking for it on the classpath.
133       allocFile = new Configuration().getResource("fair-scheduler.xml");
134       if (allocFile == null) {
135         LOG.error("The fair scheduler allocation file fair-scheduler.xml was "
136             + "not found on the classpath, and no other config file is given "
137             + "through mapred.fairscheduler.allocation.file.");
138       }
139     }
140     reloadAllocs();
141     lastSuccessfulReload = System.currentTimeMillis();
142     lastReloadAttempt = System.currentTimeMillis();
143     // Create the default pool so that it shows up in the web UI
144     getPool(Pool.DEFAULT_POOL_NAME);
145   }
146 
147   /**
148    * Get a pool by name, creating it if necessary
149    */
getPool(String name)150   public synchronized Pool getPool(String name) {
151     Pool pool = pools.get(name);
152     if (pool == null) {
153       pool = new Pool(scheduler, name);
154       pool.setSchedulingMode(defaultSchedulingMode);
155       pools.put(name, pool);
156     }
157     return pool;
158   }
159 
160   /**
161    * Get the pool that a given job is in.
162    */
getPool(JobInProgress job)163   public Pool getPool(JobInProgress job) {
164     return getPool(getPoolName(job));
165   }
166 
167   /**
168    * Reload allocations file if it hasn't been loaded in a while
169    */
reloadAllocsIfNecessary()170   public void reloadAllocsIfNecessary() {
171     long time = System.currentTimeMillis();
172     if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) {
173       lastReloadAttempt = time;
174       if (null == allocFile) {
175         return;
176       }
177       try {
178         // Get last modified time of alloc file depending whether it's a String
179         // (for a path name) or an URL (for a classloader resource)
180         long lastModified;
181         if (allocFile instanceof String) {
182           File file = new File((String) allocFile);
183           lastModified = file.lastModified();
184         } else { // allocFile is an URL
185           URLConnection conn = ((URL) allocFile).openConnection();
186           lastModified = conn.getLastModified();
187         }
188         if (lastModified > lastSuccessfulReload &&
189             time > lastModified + ALLOC_RELOAD_WAIT) {
190           reloadAllocs();
191           lastSuccessfulReload = time;
192           lastReloadAttemptFailed = false;
193         }
194       } catch (Exception e) {
195         // Throwing the error further out here won't help - the RPC thread
196         // will catch it and report it in a loop. Instead, just log it and
197         // hope somebody will notice from the log.
198         // We log the error only on the first failure so we don't fill up the
199         // JobTracker's log with these messages.
200         if (!lastReloadAttemptFailed) {
201           LOG.error("Failed to reload fair scheduler config file - " +
202               "will use existing allocations.", e);
203         }
204         lastReloadAttemptFailed = true;
205       }
206     }
207   }
208 
209   /**
210    * Updates the allocation list from the allocation config file. This file is
211    * expected to be in the following whitespace-separated format:
212    *
213    * <code>
214    * poolName1 mapAlloc reduceAlloc
215    * poolName2 mapAlloc reduceAlloc
216    * ...
217    * </code>
218    *
219    * Blank lines and lines starting with # are ignored.
220    *
221    * @throws IOException if the config file cannot be read.
222    * @throws AllocationConfigurationException if allocations are invalid.
223    * @throws ParserConfigurationException if XML parser is misconfigured.
224    * @throws SAXException if config file is malformed.
225    */
reloadAllocs()226   public void reloadAllocs() throws IOException, ParserConfigurationException,
227       SAXException, AllocationConfigurationException {
228     if (allocFile == null) return;
229     // Create some temporary hashmaps to hold the new allocs, and we only save
230     // them in our fields if we have parsed the entire allocs file successfully.
231     Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
232     Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
233     Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
234     Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
235     Map<String, Integer> poolMaxMaps = new HashMap<String, Integer>();
236     Map<String, Integer> poolMaxReduces = new HashMap<String, Integer>();
237     Map<String, Double> poolWeights = new HashMap<String, Double>();
238     Map<String, SchedulingMode> poolModes = new HashMap<String, SchedulingMode>();
239     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
240     int userMaxJobsDefault = Integer.MAX_VALUE;
241     int poolMaxJobsDefault = Integer.MAX_VALUE;
242     long fairSharePreemptionTimeout = Long.MAX_VALUE;
243     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
244     SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
245 
246     // Remember all pool names so we can display them on web UI, etc.
247     List<String> poolNamesInAllocFile = new ArrayList<String>();
248 
249     // Read and parse the allocations file.
250     DocumentBuilderFactory docBuilderFactory =
251       DocumentBuilderFactory.newInstance();
252     docBuilderFactory.setIgnoringComments(true);
253     DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
254     Document doc;
255     if (allocFile instanceof String) {
256       doc = builder.parse(new File((String) allocFile));
257     } else {
258       doc = builder.parse(allocFile.toString());
259     }
260     Element root = doc.getDocumentElement();
261     if (!"allocations".equals(root.getTagName()))
262       throw new AllocationConfigurationException("Bad fair scheduler config " +
263           "file: top-level element not <allocations>");
264     NodeList elements = root.getChildNodes();
265     for (int i = 0; i < elements.getLength(); i++) {
266       Node node = elements.item(i);
267       if (!(node instanceof Element))
268         continue;
269       Element element = (Element)node;
270       if ("pool".equals(element.getTagName())) {
271         String poolName = element.getAttribute("name");
272         poolNamesInAllocFile.add(poolName);
273         NodeList fields = element.getChildNodes();
274         for (int j = 0; j < fields.getLength(); j++) {
275           Node fieldNode = fields.item(j);
276           if (!(fieldNode instanceof Element))
277             continue;
278           Element field = (Element) fieldNode;
279           if ("minMaps".equals(field.getTagName())) {
280             String text = ((Text)field.getFirstChild()).getData().trim();
281             int val = Integer.parseInt(text);
282             mapAllocs.put(poolName, val);
283           } else if ("minReduces".equals(field.getTagName())) {
284             String text = ((Text)field.getFirstChild()).getData().trim();
285             int val = Integer.parseInt(text);
286             reduceAllocs.put(poolName, val);
287           } else if ("maxMaps".equals(field.getTagName())) {
288             String text = ((Text)field.getFirstChild()).getData().trim();
289             int val = Integer.parseInt(text);
290             poolMaxMaps.put(poolName, val);
291           } else if ("maxReduces".equals(field.getTagName())) {
292             String text = ((Text)field.getFirstChild()).getData().trim();
293             int val = Integer.parseInt(text);
294             poolMaxReduces.put(poolName, val);
295           } else if ("maxRunningJobs".equals(field.getTagName())) {
296             String text = ((Text)field.getFirstChild()).getData().trim();
297             int val = Integer.parseInt(text);
298             poolMaxJobs.put(poolName, val);
299           } else if ("weight".equals(field.getTagName())) {
300             String text = ((Text)field.getFirstChild()).getData().trim();
301             double val = Double.parseDouble(text);
302             poolWeights.put(poolName, val);
303           } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
304             String text = ((Text)field.getFirstChild()).getData().trim();
305             long val = Long.parseLong(text) * 1000L;
306             minSharePreemptionTimeouts.put(poolName, val);
307           } else if ("schedulingMode".equals(field.getTagName())) {
308             String text = ((Text)field.getFirstChild()).getData().trim();
309             poolModes.put(poolName, parseSchedulingMode(text));
310           }
311         }
312         if (poolMaxMaps.containsKey(poolName) && mapAllocs.containsKey(poolName)
313             && poolMaxMaps.get(poolName) < mapAllocs.get(poolName)) {
314           LOG.warn(String.format("Pool %s has max maps %d less than min maps %d",
315               poolName, poolMaxMaps.get(poolName), mapAllocs.get(poolName)));
316         }
317         if(poolMaxReduces.containsKey(poolName) && reduceAllocs.containsKey(poolName)
318             && poolMaxReduces.get(poolName) < reduceAllocs.get(poolName)) {
319           LOG.warn(String.format("Pool %s has max reduces %d less than min reduces %d",
320               poolName, poolMaxReduces.get(poolName), reduceAllocs.get(poolName)));
321         }
322       } else if ("user".equals(element.getTagName())) {
323         String userName = element.getAttribute("name");
324         NodeList fields = element.getChildNodes();
325         for (int j = 0; j < fields.getLength(); j++) {
326           Node fieldNode = fields.item(j);
327           if (!(fieldNode instanceof Element))
328             continue;
329           Element field = (Element) fieldNode;
330           if ("maxRunningJobs".equals(field.getTagName())) {
331             String text = ((Text)field.getFirstChild()).getData().trim();
332             int val = Integer.parseInt(text);
333             userMaxJobs.put(userName, val);
334           }
335         }
336       } else if ("userMaxJobsDefault".equals(element.getTagName())) {
337         String text = ((Text)element.getFirstChild()).getData().trim();
338         int val = Integer.parseInt(text);
339         userMaxJobsDefault = val;
340       } else if ("poolMaxJobsDefault".equals(element.getTagName())) {
341         String text = ((Text)element.getFirstChild()).getData().trim();
342         int val = Integer.parseInt(text);
343         poolMaxJobsDefault = val;
344       } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
345         String text = ((Text)element.getFirstChild()).getData().trim();
346         long val = Long.parseLong(text) * 1000L;
347         fairSharePreemptionTimeout = val;
348       } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
349         String text = ((Text)element.getFirstChild()).getData().trim();
350         long val = Long.parseLong(text) * 1000L;
351         defaultMinSharePreemptionTimeout = val;
352       } else if ("defaultPoolSchedulingMode".equals(element.getTagName())) {
353         String text = ((Text)element.getFirstChild()).getData().trim();
354         defaultSchedulingMode = parseSchedulingMode(text);
355       } else {
356         LOG.warn("Bad element in allocations file: " + element.getTagName());
357       }
358     }
359 
360     // Commit the reload; also create any pool defined in the alloc file
361     // if it does not already exist, so it can be displayed on the web UI.
362     synchronized(this) {
363       this.mapAllocs = mapAllocs;
364       this.reduceAllocs = reduceAllocs;
365       this.poolMaxMaps = poolMaxMaps;
366       this.poolMaxReduces = poolMaxReduces;
367       this.poolMaxJobs = poolMaxJobs;
368       this.userMaxJobs = userMaxJobs;
369       this.poolWeights = poolWeights;
370       this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
371       this.userMaxJobsDefault = userMaxJobsDefault;
372       this.poolMaxJobsDefault = poolMaxJobsDefault;
373       this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
374       this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
375       this.defaultSchedulingMode = defaultSchedulingMode;
376       this.declaredPools = Collections.unmodifiableSet(new TreeSet<String>(
377           poolNamesInAllocFile));
378       for (String name: poolNamesInAllocFile) {
379         Pool pool = getPool(name);
380         if (poolModes.containsKey(name)) {
381           pool.setSchedulingMode(poolModes.get(name));
382         } else {
383           pool.setSchedulingMode(defaultSchedulingMode);
384         }
385       }
386     }
387   }
388 
389   /**
390    * Does the pool have incompatible max and min allocation set.
391    *
392    * @param type
393    *          {@link TaskType#MAP} or {@link TaskType#REDUCE}
394    * @param pool
395    *          the pool name
396    * @return true if the max is less than the min
397    */
invertedMinMax(TaskType type, String pool)398   boolean invertedMinMax(TaskType type, String pool) {
399     Map<String, Integer> max = TaskType.MAP == type ? poolMaxMaps : poolMaxReduces;
400     Map<String, Integer> min = TaskType.MAP == type ? mapAllocs : reduceAllocs;
401     if (max.containsKey(pool) && min.containsKey(pool)
402         && max.get(pool) < min.get(pool)) {
403       return true;
404     }
405     return false;
406   }
407 
parseSchedulingMode(String text)408   private SchedulingMode parseSchedulingMode(String text)
409       throws AllocationConfigurationException {
410     text = text.toLowerCase();
411     if (text.equals("fair")) {
412       return SchedulingMode.FAIR;
413     } else if (text.equals("fifo")) {
414       return SchedulingMode.FIFO;
415     } else {
416       throw new AllocationConfigurationException(
417           "Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'");
418     }
419   }
420 
421   /**
422    * Get the allocation for a particular pool
423    */
getAllocation(String pool, TaskType taskType)424   public int getAllocation(String pool, TaskType taskType) {
425     Map<String, Integer> allocationMap = (taskType == TaskType.MAP ?
426         mapAllocs : reduceAllocs);
427     Integer alloc = allocationMap.get(pool);
428     return (alloc == null ? 0 : alloc);
429   }
430 
431   /**
432    * Get the maximum map or reduce slots for the given pool.
433    * @return the cap set on this pool, or Integer.MAX_VALUE if not set.
434    */
getMaxSlots(String poolName, TaskType taskType)435   int getMaxSlots(String poolName, TaskType taskType) {
436     Map<String, Integer> maxMap = (taskType == TaskType.MAP ? poolMaxMaps : poolMaxReduces);
437     if (maxMap.containsKey(poolName)) {
438       return maxMap.get(poolName);
439     } else {
440       return Integer.MAX_VALUE;
441     }
442   }
443 
444   /**
445    * Add a job in the appropriate pool
446    */
addJob(JobInProgress job)447   public synchronized void addJob(JobInProgress job) {
448     getPool(getPoolName(job)).addJob(job);
449   }
450 
451   /**
452    * Remove a job
453    */
removeJob(JobInProgress job)454   public synchronized void removeJob(JobInProgress job) {
455     getPool(getPoolName(job)).removeJob(job);
456   }
457 
458   /**
459    * Change the pool of a particular job
460    */
setPool(JobInProgress job, String pool)461   public synchronized void setPool(JobInProgress job, String pool) {
462     removeJob(job);
463     job.getJobConf().set(EXPLICIT_POOL_PROPERTY, pool);
464     addJob(job);
465   }
466 
467   /**
468    * Get a collection of all pools
469    */
getPools()470   public synchronized Collection<Pool> getPools() {
471     return pools.values();
472   }
473 
474   /**
475    * Get the pool name for a JobInProgress from its configuration.  This uses
476    * the value of mapred.fairscheduler.pool if specified, otherwise the value
477    * of the property named in mapred.fairscheduler.poolnameproperty if that is
478    * specified.  Otherwise if neither is specified it uses the "user.name" property
479    * in the jobconf by default.
480    */
getPoolName(JobInProgress job)481   public String getPoolName(JobInProgress job) {
482     Configuration conf = job.getJobConf();
483     return conf.get(EXPLICIT_POOL_PROPERTY,
484       conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME)).trim();
485   }
486 
487   /**
488    * Get all pool names that have been seen either in the allocation file or in
489    * a MapReduce job.
490    */
getPoolNames()491   public synchronized Collection<String> getPoolNames() {
492     List<String> list = new ArrayList<String>();
493     for (Pool pool: getPools()) {
494       list.add(pool.getName());
495     }
496     Collections.sort(list);
497     return list;
498   }
499 
getUserMaxJobs(String user)500   public int getUserMaxJobs(String user) {
501     if (userMaxJobs.containsKey(user)) {
502       return userMaxJobs.get(user);
503     } else {
504       return userMaxJobsDefault;
505     }
506   }
507 
getPoolMaxJobs(String pool)508   public int getPoolMaxJobs(String pool) {
509     if (poolMaxJobs.containsKey(pool)) {
510       return poolMaxJobs.get(pool);
511     } else {
512       return poolMaxJobsDefault;
513     }
514   }
515 
getPoolWeight(String pool)516   public double getPoolWeight(String pool) {
517     if (poolWeights.containsKey(pool)) {
518       return poolWeights.get(pool);
519     } else {
520       return 1.0;
521     }
522   }
523 
524   /**
525    * Get a pool's min share preemption timeout, in milliseconds. This is the
526    * time after which jobs in the pool may kill other pools' tasks if they
527    * are below their min share.
528    */
getMinSharePreemptionTimeout(String pool)529   public long getMinSharePreemptionTimeout(String pool) {
530     if (minSharePreemptionTimeouts.containsKey(pool)) {
531       return minSharePreemptionTimeouts.get(pool);
532     } else {
533       return defaultMinSharePreemptionTimeout;
534     }
535   }
536 
537   /**
538    * Get the fair share preemption, in milliseconds. This is the time
539    * after which any job may kill other jobs' tasks if it is below half
540    * its fair share.
541    */
getFairSharePreemptionTimeout()542   public long getFairSharePreemptionTimeout() {
543     return fairSharePreemptionTimeout;
544   }
545 
updateMetrics()546   synchronized void updateMetrics() {
547     for (Pool pool : pools.values()) {
548       pool.updateMetrics();
549     }
550   }
551 
getDeclaredPools()552   public synchronized Set<String> getDeclaredPools() {
553     return declaredPools;
554   }
555 
556 }
557