1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapred; 20 21 import java.io.File; 22 import java.io.IOException; 23 import java.net.URL; 24 import java.net.URLConnection; 25 import java.util.ArrayList; 26 import java.util.Collection; 27 import java.util.Collections; 28 import java.util.HashMap; 29 import java.util.List; 30 import java.util.Map; 31 import java.util.Set; 32 import java.util.TreeSet; 33 34 import javax.xml.parsers.DocumentBuilder; 35 import javax.xml.parsers.DocumentBuilderFactory; 36 import javax.xml.parsers.ParserConfigurationException; 37 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 import org.apache.hadoop.conf.Configuration; 41 import org.apache.hadoop.mapreduce.TaskType; 42 import org.apache.hadoop.metrics.MetricsContext; 43 import org.w3c.dom.Document; 44 import org.w3c.dom.Element; 45 import org.w3c.dom.Node; 46 import org.w3c.dom.NodeList; 47 import org.w3c.dom.Text; 48 import org.xml.sax.SAXException; 49 50 /** 51 * Maintains a list of pools as well as scheduling parameters for each pool, 52 * such as guaranteed share allocations, from the fair scheduler config file. 53 */ 54 public class PoolManager { 55 public static final Log LOG = LogFactory.getLog( 56 "org.apache.hadoop.mapred.PoolManager"); 57 58 /** Time to wait between checks of the allocation file */ 59 public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000; 60 61 /** 62 * Time to wait after the allocation has been modified before reloading it 63 * (this is done to prevent loading a file that hasn't been fully written). 64 */ 65 public static final long ALLOC_RELOAD_WAIT = 5 * 1000; 66 67 public static final String EXPLICIT_POOL_PROPERTY = "mapred.fairscheduler.pool"; 68 69 private final FairScheduler scheduler; 70 71 // Map and reduce minimum allocations for each pool 72 private Map<String, Integer> mapAllocs = new HashMap<String, Integer>(); 73 private Map<String, Integer> reduceAllocs = new HashMap<String, Integer>(); 74 75 // If set, cap number of map and reduce tasks in a pool 76 private Map<String, Integer> poolMaxMaps = new HashMap<String, Integer>(); 77 private Map<String, Integer> poolMaxReduces = new HashMap<String, Integer>(); 78 79 // Sharing weights for each pool 80 private Map<String, Double> poolWeights = new HashMap<String, Double>(); 81 82 // Max concurrent running jobs for each pool and for each user; in addition, 83 // for users that have no max specified, we use the userMaxJobsDefault. 84 private Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>(); 85 private Map<String, Integer> userMaxJobs = new HashMap<String, Integer>(); 86 private int userMaxJobsDefault = Integer.MAX_VALUE; 87 private int poolMaxJobsDefault = Integer.MAX_VALUE; 88 89 // Min share preemption timeout for each pool in seconds. If a job in the pool 90 // waits this long without receiving its guaranteed share, it is allowed to 91 // preempt other jobs' tasks. 92 private Map<String, Long> minSharePreemptionTimeouts = 93 new HashMap<String, Long>(); 94 95 // Default min share preemption timeout for pools where it is not set 96 // explicitly. 97 private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; 98 99 // Preemption timeout for jobs below fair share in seconds. If a job remains 100 // below half its fair share for this long, it is allowed to preempt tasks. 101 private long fairSharePreemptionTimeout = Long.MAX_VALUE; 102 103 SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR; 104 105 private Object allocFile; // Path to XML file containing allocations. This 106 // is either a URL to specify a classpath resource 107 // (if the fair-scheduler.xml on the classpath is 108 // used) or a String to specify an absolute path (if 109 // mapred.fairscheduler.allocation.file is used). 110 private String poolNameProperty; // Jobconf property to use for determining a 111 // job's pool name (default: user.name) 112 113 private Map<String, Pool> pools = new HashMap<String, Pool>(); 114 115 private long lastReloadAttempt; // Last time we tried to reload the pools file 116 private long lastSuccessfulReload; // Last time we successfully reloaded pools 117 private boolean lastReloadAttemptFailed = false; 118 private Set<String> declaredPools = new TreeSet<String>(); 119 PoolManager(FairScheduler scheduler)120 public PoolManager(FairScheduler scheduler) { 121 this.scheduler = scheduler; 122 } 123 initialize()124 public void initialize() throws IOException, SAXException, 125 AllocationConfigurationException, ParserConfigurationException { 126 Configuration conf = scheduler.getConf(); 127 this.poolNameProperty = conf.get( 128 "mapred.fairscheduler.poolnameproperty", "user.name"); 129 this.allocFile = conf.get("mapred.fairscheduler.allocation.file"); 130 if (allocFile == null) { 131 // No allocation file specified in jobconf. Use the default allocation 132 // file, fair-scheduler.xml, looking for it on the classpath. 133 allocFile = new Configuration().getResource("fair-scheduler.xml"); 134 if (allocFile == null) { 135 LOG.error("The fair scheduler allocation file fair-scheduler.xml was " 136 + "not found on the classpath, and no other config file is given " 137 + "through mapred.fairscheduler.allocation.file."); 138 } 139 } 140 reloadAllocs(); 141 lastSuccessfulReload = System.currentTimeMillis(); 142 lastReloadAttempt = System.currentTimeMillis(); 143 // Create the default pool so that it shows up in the web UI 144 getPool(Pool.DEFAULT_POOL_NAME); 145 } 146 147 /** 148 * Get a pool by name, creating it if necessary 149 */ getPool(String name)150 public synchronized Pool getPool(String name) { 151 Pool pool = pools.get(name); 152 if (pool == null) { 153 pool = new Pool(scheduler, name); 154 pool.setSchedulingMode(defaultSchedulingMode); 155 pools.put(name, pool); 156 } 157 return pool; 158 } 159 160 /** 161 * Get the pool that a given job is in. 162 */ getPool(JobInProgress job)163 public Pool getPool(JobInProgress job) { 164 return getPool(getPoolName(job)); 165 } 166 167 /** 168 * Reload allocations file if it hasn't been loaded in a while 169 */ reloadAllocsIfNecessary()170 public void reloadAllocsIfNecessary() { 171 long time = System.currentTimeMillis(); 172 if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) { 173 lastReloadAttempt = time; 174 if (null == allocFile) { 175 return; 176 } 177 try { 178 // Get last modified time of alloc file depending whether it's a String 179 // (for a path name) or an URL (for a classloader resource) 180 long lastModified; 181 if (allocFile instanceof String) { 182 File file = new File((String) allocFile); 183 lastModified = file.lastModified(); 184 } else { // allocFile is an URL 185 URLConnection conn = ((URL) allocFile).openConnection(); 186 lastModified = conn.getLastModified(); 187 } 188 if (lastModified > lastSuccessfulReload && 189 time > lastModified + ALLOC_RELOAD_WAIT) { 190 reloadAllocs(); 191 lastSuccessfulReload = time; 192 lastReloadAttemptFailed = false; 193 } 194 } catch (Exception e) { 195 // Throwing the error further out here won't help - the RPC thread 196 // will catch it and report it in a loop. Instead, just log it and 197 // hope somebody will notice from the log. 198 // We log the error only on the first failure so we don't fill up the 199 // JobTracker's log with these messages. 200 if (!lastReloadAttemptFailed) { 201 LOG.error("Failed to reload fair scheduler config file - " + 202 "will use existing allocations.", e); 203 } 204 lastReloadAttemptFailed = true; 205 } 206 } 207 } 208 209 /** 210 * Updates the allocation list from the allocation config file. This file is 211 * expected to be in the following whitespace-separated format: 212 * 213 * <code> 214 * poolName1 mapAlloc reduceAlloc 215 * poolName2 mapAlloc reduceAlloc 216 * ... 217 * </code> 218 * 219 * Blank lines and lines starting with # are ignored. 220 * 221 * @throws IOException if the config file cannot be read. 222 * @throws AllocationConfigurationException if allocations are invalid. 223 * @throws ParserConfigurationException if XML parser is misconfigured. 224 * @throws SAXException if config file is malformed. 225 */ reloadAllocs()226 public void reloadAllocs() throws IOException, ParserConfigurationException, 227 SAXException, AllocationConfigurationException { 228 if (allocFile == null) return; 229 // Create some temporary hashmaps to hold the new allocs, and we only save 230 // them in our fields if we have parsed the entire allocs file successfully. 231 Map<String, Integer> mapAllocs = new HashMap<String, Integer>(); 232 Map<String, Integer> reduceAllocs = new HashMap<String, Integer>(); 233 Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>(); 234 Map<String, Integer> userMaxJobs = new HashMap<String, Integer>(); 235 Map<String, Integer> poolMaxMaps = new HashMap<String, Integer>(); 236 Map<String, Integer> poolMaxReduces = new HashMap<String, Integer>(); 237 Map<String, Double> poolWeights = new HashMap<String, Double>(); 238 Map<String, SchedulingMode> poolModes = new HashMap<String, SchedulingMode>(); 239 Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>(); 240 int userMaxJobsDefault = Integer.MAX_VALUE; 241 int poolMaxJobsDefault = Integer.MAX_VALUE; 242 long fairSharePreemptionTimeout = Long.MAX_VALUE; 243 long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; 244 SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR; 245 246 // Remember all pool names so we can display them on web UI, etc. 247 List<String> poolNamesInAllocFile = new ArrayList<String>(); 248 249 // Read and parse the allocations file. 250 DocumentBuilderFactory docBuilderFactory = 251 DocumentBuilderFactory.newInstance(); 252 docBuilderFactory.setIgnoringComments(true); 253 DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); 254 Document doc; 255 if (allocFile instanceof String) { 256 doc = builder.parse(new File((String) allocFile)); 257 } else { 258 doc = builder.parse(allocFile.toString()); 259 } 260 Element root = doc.getDocumentElement(); 261 if (!"allocations".equals(root.getTagName())) 262 throw new AllocationConfigurationException("Bad fair scheduler config " + 263 "file: top-level element not <allocations>"); 264 NodeList elements = root.getChildNodes(); 265 for (int i = 0; i < elements.getLength(); i++) { 266 Node node = elements.item(i); 267 if (!(node instanceof Element)) 268 continue; 269 Element element = (Element)node; 270 if ("pool".equals(element.getTagName())) { 271 String poolName = element.getAttribute("name"); 272 poolNamesInAllocFile.add(poolName); 273 NodeList fields = element.getChildNodes(); 274 for (int j = 0; j < fields.getLength(); j++) { 275 Node fieldNode = fields.item(j); 276 if (!(fieldNode instanceof Element)) 277 continue; 278 Element field = (Element) fieldNode; 279 if ("minMaps".equals(field.getTagName())) { 280 String text = ((Text)field.getFirstChild()).getData().trim(); 281 int val = Integer.parseInt(text); 282 mapAllocs.put(poolName, val); 283 } else if ("minReduces".equals(field.getTagName())) { 284 String text = ((Text)field.getFirstChild()).getData().trim(); 285 int val = Integer.parseInt(text); 286 reduceAllocs.put(poolName, val); 287 } else if ("maxMaps".equals(field.getTagName())) { 288 String text = ((Text)field.getFirstChild()).getData().trim(); 289 int val = Integer.parseInt(text); 290 poolMaxMaps.put(poolName, val); 291 } else if ("maxReduces".equals(field.getTagName())) { 292 String text = ((Text)field.getFirstChild()).getData().trim(); 293 int val = Integer.parseInt(text); 294 poolMaxReduces.put(poolName, val); 295 } else if ("maxRunningJobs".equals(field.getTagName())) { 296 String text = ((Text)field.getFirstChild()).getData().trim(); 297 int val = Integer.parseInt(text); 298 poolMaxJobs.put(poolName, val); 299 } else if ("weight".equals(field.getTagName())) { 300 String text = ((Text)field.getFirstChild()).getData().trim(); 301 double val = Double.parseDouble(text); 302 poolWeights.put(poolName, val); 303 } else if ("minSharePreemptionTimeout".equals(field.getTagName())) { 304 String text = ((Text)field.getFirstChild()).getData().trim(); 305 long val = Long.parseLong(text) * 1000L; 306 minSharePreemptionTimeouts.put(poolName, val); 307 } else if ("schedulingMode".equals(field.getTagName())) { 308 String text = ((Text)field.getFirstChild()).getData().trim(); 309 poolModes.put(poolName, parseSchedulingMode(text)); 310 } 311 } 312 if (poolMaxMaps.containsKey(poolName) && mapAllocs.containsKey(poolName) 313 && poolMaxMaps.get(poolName) < mapAllocs.get(poolName)) { 314 LOG.warn(String.format("Pool %s has max maps %d less than min maps %d", 315 poolName, poolMaxMaps.get(poolName), mapAllocs.get(poolName))); 316 } 317 if(poolMaxReduces.containsKey(poolName) && reduceAllocs.containsKey(poolName) 318 && poolMaxReduces.get(poolName) < reduceAllocs.get(poolName)) { 319 LOG.warn(String.format("Pool %s has max reduces %d less than min reduces %d", 320 poolName, poolMaxReduces.get(poolName), reduceAllocs.get(poolName))); 321 } 322 } else if ("user".equals(element.getTagName())) { 323 String userName = element.getAttribute("name"); 324 NodeList fields = element.getChildNodes(); 325 for (int j = 0; j < fields.getLength(); j++) { 326 Node fieldNode = fields.item(j); 327 if (!(fieldNode instanceof Element)) 328 continue; 329 Element field = (Element) fieldNode; 330 if ("maxRunningJobs".equals(field.getTagName())) { 331 String text = ((Text)field.getFirstChild()).getData().trim(); 332 int val = Integer.parseInt(text); 333 userMaxJobs.put(userName, val); 334 } 335 } 336 } else if ("userMaxJobsDefault".equals(element.getTagName())) { 337 String text = ((Text)element.getFirstChild()).getData().trim(); 338 int val = Integer.parseInt(text); 339 userMaxJobsDefault = val; 340 } else if ("poolMaxJobsDefault".equals(element.getTagName())) { 341 String text = ((Text)element.getFirstChild()).getData().trim(); 342 int val = Integer.parseInt(text); 343 poolMaxJobsDefault = val; 344 } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) { 345 String text = ((Text)element.getFirstChild()).getData().trim(); 346 long val = Long.parseLong(text) * 1000L; 347 fairSharePreemptionTimeout = val; 348 } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) { 349 String text = ((Text)element.getFirstChild()).getData().trim(); 350 long val = Long.parseLong(text) * 1000L; 351 defaultMinSharePreemptionTimeout = val; 352 } else if ("defaultPoolSchedulingMode".equals(element.getTagName())) { 353 String text = ((Text)element.getFirstChild()).getData().trim(); 354 defaultSchedulingMode = parseSchedulingMode(text); 355 } else { 356 LOG.warn("Bad element in allocations file: " + element.getTagName()); 357 } 358 } 359 360 // Commit the reload; also create any pool defined in the alloc file 361 // if it does not already exist, so it can be displayed on the web UI. 362 synchronized(this) { 363 this.mapAllocs = mapAllocs; 364 this.reduceAllocs = reduceAllocs; 365 this.poolMaxMaps = poolMaxMaps; 366 this.poolMaxReduces = poolMaxReduces; 367 this.poolMaxJobs = poolMaxJobs; 368 this.userMaxJobs = userMaxJobs; 369 this.poolWeights = poolWeights; 370 this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; 371 this.userMaxJobsDefault = userMaxJobsDefault; 372 this.poolMaxJobsDefault = poolMaxJobsDefault; 373 this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; 374 this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout; 375 this.defaultSchedulingMode = defaultSchedulingMode; 376 this.declaredPools = Collections.unmodifiableSet(new TreeSet<String>( 377 poolNamesInAllocFile)); 378 for (String name: poolNamesInAllocFile) { 379 Pool pool = getPool(name); 380 if (poolModes.containsKey(name)) { 381 pool.setSchedulingMode(poolModes.get(name)); 382 } else { 383 pool.setSchedulingMode(defaultSchedulingMode); 384 } 385 } 386 } 387 } 388 389 /** 390 * Does the pool have incompatible max and min allocation set. 391 * 392 * @param type 393 * {@link TaskType#MAP} or {@link TaskType#REDUCE} 394 * @param pool 395 * the pool name 396 * @return true if the max is less than the min 397 */ invertedMinMax(TaskType type, String pool)398 boolean invertedMinMax(TaskType type, String pool) { 399 Map<String, Integer> max = TaskType.MAP == type ? poolMaxMaps : poolMaxReduces; 400 Map<String, Integer> min = TaskType.MAP == type ? mapAllocs : reduceAllocs; 401 if (max.containsKey(pool) && min.containsKey(pool) 402 && max.get(pool) < min.get(pool)) { 403 return true; 404 } 405 return false; 406 } 407 parseSchedulingMode(String text)408 private SchedulingMode parseSchedulingMode(String text) 409 throws AllocationConfigurationException { 410 text = text.toLowerCase(); 411 if (text.equals("fair")) { 412 return SchedulingMode.FAIR; 413 } else if (text.equals("fifo")) { 414 return SchedulingMode.FIFO; 415 } else { 416 throw new AllocationConfigurationException( 417 "Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'"); 418 } 419 } 420 421 /** 422 * Get the allocation for a particular pool 423 */ getAllocation(String pool, TaskType taskType)424 public int getAllocation(String pool, TaskType taskType) { 425 Map<String, Integer> allocationMap = (taskType == TaskType.MAP ? 426 mapAllocs : reduceAllocs); 427 Integer alloc = allocationMap.get(pool); 428 return (alloc == null ? 0 : alloc); 429 } 430 431 /** 432 * Get the maximum map or reduce slots for the given pool. 433 * @return the cap set on this pool, or Integer.MAX_VALUE if not set. 434 */ getMaxSlots(String poolName, TaskType taskType)435 int getMaxSlots(String poolName, TaskType taskType) { 436 Map<String, Integer> maxMap = (taskType == TaskType.MAP ? poolMaxMaps : poolMaxReduces); 437 if (maxMap.containsKey(poolName)) { 438 return maxMap.get(poolName); 439 } else { 440 return Integer.MAX_VALUE; 441 } 442 } 443 444 /** 445 * Add a job in the appropriate pool 446 */ addJob(JobInProgress job)447 public synchronized void addJob(JobInProgress job) { 448 getPool(getPoolName(job)).addJob(job); 449 } 450 451 /** 452 * Remove a job 453 */ removeJob(JobInProgress job)454 public synchronized void removeJob(JobInProgress job) { 455 getPool(getPoolName(job)).removeJob(job); 456 } 457 458 /** 459 * Change the pool of a particular job 460 */ setPool(JobInProgress job, String pool)461 public synchronized void setPool(JobInProgress job, String pool) { 462 removeJob(job); 463 job.getJobConf().set(EXPLICIT_POOL_PROPERTY, pool); 464 addJob(job); 465 } 466 467 /** 468 * Get a collection of all pools 469 */ getPools()470 public synchronized Collection<Pool> getPools() { 471 return pools.values(); 472 } 473 474 /** 475 * Get the pool name for a JobInProgress from its configuration. This uses 476 * the value of mapred.fairscheduler.pool if specified, otherwise the value 477 * of the property named in mapred.fairscheduler.poolnameproperty if that is 478 * specified. Otherwise if neither is specified it uses the "user.name" property 479 * in the jobconf by default. 480 */ getPoolName(JobInProgress job)481 public String getPoolName(JobInProgress job) { 482 Configuration conf = job.getJobConf(); 483 return conf.get(EXPLICIT_POOL_PROPERTY, 484 conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME)).trim(); 485 } 486 487 /** 488 * Get all pool names that have been seen either in the allocation file or in 489 * a MapReduce job. 490 */ getPoolNames()491 public synchronized Collection<String> getPoolNames() { 492 List<String> list = new ArrayList<String>(); 493 for (Pool pool: getPools()) { 494 list.add(pool.getName()); 495 } 496 Collections.sort(list); 497 return list; 498 } 499 getUserMaxJobs(String user)500 public int getUserMaxJobs(String user) { 501 if (userMaxJobs.containsKey(user)) { 502 return userMaxJobs.get(user); 503 } else { 504 return userMaxJobsDefault; 505 } 506 } 507 getPoolMaxJobs(String pool)508 public int getPoolMaxJobs(String pool) { 509 if (poolMaxJobs.containsKey(pool)) { 510 return poolMaxJobs.get(pool); 511 } else { 512 return poolMaxJobsDefault; 513 } 514 } 515 getPoolWeight(String pool)516 public double getPoolWeight(String pool) { 517 if (poolWeights.containsKey(pool)) { 518 return poolWeights.get(pool); 519 } else { 520 return 1.0; 521 } 522 } 523 524 /** 525 * Get a pool's min share preemption timeout, in milliseconds. This is the 526 * time after which jobs in the pool may kill other pools' tasks if they 527 * are below their min share. 528 */ getMinSharePreemptionTimeout(String pool)529 public long getMinSharePreemptionTimeout(String pool) { 530 if (minSharePreemptionTimeouts.containsKey(pool)) { 531 return minSharePreemptionTimeouts.get(pool); 532 } else { 533 return defaultMinSharePreemptionTimeout; 534 } 535 } 536 537 /** 538 * Get the fair share preemption, in milliseconds. This is the time 539 * after which any job may kill other jobs' tasks if it is below half 540 * its fair share. 541 */ getFairSharePreemptionTimeout()542 public long getFairSharePreemptionTimeout() { 543 return fairSharePreemptionTimeout; 544 } 545 updateMetrics()546 synchronized void updateMetrics() { 547 for (Pool pool : pools.values()) { 548 pool.updateMetrics(); 549 } 550 } 551 getDeclaredPools()552 public synchronized Set<String> getDeclaredPools() { 553 return declaredPools; 554 } 555 556 } 557