1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.tools.rumen; 19 20 import java.util.ArrayList; 21 import java.util.List; 22 import java.util.Map; 23 import java.util.Random; 24 import java.util.HashMap; 25 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 import org.apache.hadoop.fs.Path; 29 import org.apache.hadoop.mapred.JobConf; 30 import org.apache.hadoop.mapred.TaskStatus.State; 31 import org.apache.hadoop.mapreduce.ID; 32 import org.apache.hadoop.mapreduce.InputSplit; 33 import org.apache.hadoop.mapreduce.JobID; 34 import org.apache.hadoop.mapreduce.TaskAttemptID; 35 import org.apache.hadoop.mapreduce.TaskID; 36 import org.apache.hadoop.mapreduce.TaskType; 37 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 38 import org.apache.hadoop.tools.rumen.datatypes.*; 39 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values; 40 41 /** 42 * {@link ZombieJob} is a layer above {@link LoggedJob} raw JSON objects. 43 * 44 * Each {@link ZombieJob} object represents a job in job history. For everything 45 * that exists in job history, contents are returned unchanged faithfully. To 46 * get input splits of a non-exist task, a non-exist task attempt, or an 47 * ill-formed task attempt, proper objects are made up from statistical 48 * sketches. 49 */ 50 @SuppressWarnings("deprecation") 51 public class ZombieJob implements JobStory { 52 static final Log LOG = LogFactory.getLog(ZombieJob.class); 53 private final LoggedJob job; 54 private Map<TaskID, LoggedTask> loggedTaskMap; 55 private Map<TaskAttemptID, LoggedTaskAttempt> loggedTaskAttemptMap; 56 private final Random random; 57 private InputSplit[] splits; 58 private final ClusterStory cluster; 59 private JobConf jobConf; 60 61 private long seed; 62 private long numRandomSeeds = 0; 63 private boolean hasRandomSeed = false; 64 65 private Map<LoggedDiscreteCDF, CDFRandomGenerator> interpolatorMap = 66 new HashMap<LoggedDiscreteCDF, CDFRandomGenerator>(); 67 68 // TODO: Fix ZombieJob to initialize this correctly from observed data 69 double rackLocalOverNodeLocal = 1.5; 70 double rackRemoteOverNodeLocal = 3.0; 71 72 /** 73 * This constructor creates a {@link ZombieJob} with the same semantics as the 74 * {@link LoggedJob} passed in this parameter 75 * 76 * @param job 77 * The dead job this ZombieJob instance is based on. 78 * @param cluster 79 * The cluster topology where the dead job ran on. This argument can 80 * be null if we do not have knowledge of the cluster topology. 81 * @param seed 82 * Seed for the random number generator for filling in information 83 * not available from the ZombieJob. 84 */ ZombieJob(LoggedJob job, ClusterStory cluster, long seed)85 public ZombieJob(LoggedJob job, ClusterStory cluster, long seed) { 86 if (job == null) { 87 throw new IllegalArgumentException("job is null"); 88 } 89 this.job = job; 90 this.cluster = cluster; 91 random = new Random(seed); 92 this.seed = seed; 93 hasRandomSeed = true; 94 } 95 96 /** 97 * This constructor creates a {@link ZombieJob} with the same semantics as the 98 * {@link LoggedJob} passed in this parameter 99 * 100 * @param job 101 * The dead job this ZombieJob instance is based on. 102 * @param cluster 103 * The cluster topology where the dead job ran on. This argument can 104 * be null if we do not have knowledge of the cluster topology. 105 */ ZombieJob(LoggedJob job, ClusterStory cluster)106 public ZombieJob(LoggedJob job, ClusterStory cluster) { 107 this(job, cluster, System.nanoTime()); 108 } 109 convertState(Values status)110 private static State convertState(Values status) { 111 if (status == Values.SUCCESS) { 112 return State.SUCCEEDED; 113 } else if (status == Values.FAILED) { 114 return State.FAILED; 115 } else if (status == Values.KILLED) { 116 return State.KILLED; 117 } else { 118 throw new IllegalArgumentException("unknown status " + status); 119 } 120 } 121 122 @Override getJobConf()123 public synchronized JobConf getJobConf() { 124 if (jobConf == null) { 125 jobConf = new JobConf(); 126 127 // Add parameters from the configuration in the job trace 128 // 129 // The reason why the job configuration parameters, as seen in the jobconf 130 // file, are added first because the specialized values obtained from 131 // Rumen should override the job conf values. 132 // 133 for (Map.Entry<Object, Object> entry : job.getJobProperties().getValue().entrySet()) { 134 jobConf.set(entry.getKey().toString(), entry.getValue().toString()); 135 } 136 137 //TODO Eliminate parameters that are already copied from the job's 138 // configuration file. 139 jobConf.setJobName(getName()); 140 jobConf.setUser(getUser()); 141 jobConf.setNumMapTasks(getNumberMaps()); 142 jobConf.setNumReduceTasks(getNumberReduces()); 143 jobConf.setQueueName(getQueueName()); 144 } 145 return jobConf; 146 } 147 148 @Override getInputSplits()149 public InputSplit[] getInputSplits() { 150 if (splits == null) { 151 List<InputSplit> splitsList = new ArrayList<InputSplit>(); 152 Path emptyPath = new Path("/"); 153 int totalHosts = 0; // use to determine avg # of hosts per split. 154 for (LoggedTask mapTask : job.getMapTasks()) { 155 Pre21JobHistoryConstants.Values taskType = mapTask.getTaskType(); 156 if (taskType != Pre21JobHistoryConstants.Values.MAP) { 157 LOG.warn("TaskType for a MapTask is not Map. task=" 158 + mapTask.getTaskID() + " type=" 159 + ((taskType == null) ? "null" : taskType.toString())); 160 continue; 161 } 162 List<LoggedLocation> locations = mapTask.getPreferredLocations(); 163 List<String> hostList = new ArrayList<String>(); 164 if (locations != null) { 165 for (LoggedLocation location : locations) { 166 List<NodeName> layers = location.getLayers(); 167 if (layers.size() == 0) { 168 LOG.warn("Bad location layer format for task "+mapTask.getTaskID()); 169 continue; 170 } 171 String host = layers.get(layers.size() - 1).getValue(); 172 if (host == null) { 173 LOG.warn("Bad location layer format for task "+mapTask.getTaskID() + ": " + layers); 174 continue; 175 } 176 hostList.add(host); 177 } 178 } 179 String[] hosts = hostList.toArray(new String[hostList.size()]); 180 totalHosts += hosts.length; 181 long mapInputBytes = getTaskInfo(mapTask).getInputBytes(); 182 if (mapInputBytes < 0) { 183 LOG.warn("InputBytes for task "+mapTask.getTaskID()+" is not defined."); 184 mapInputBytes = 0; 185 } 186 187 splitsList.add(new FileSplit(emptyPath, 0, mapInputBytes, hosts)); 188 } 189 190 // If not all map tasks are in job trace, should make up some splits 191 // for missing map tasks. 192 int totalMaps = job.getTotalMaps(); 193 if (totalMaps < splitsList.size()) { 194 LOG.warn("TotalMaps for job " + job.getJobID() 195 + " is less than the total number of map task descriptions (" 196 + totalMaps + "<" + splitsList.size() + ")."); 197 } 198 199 int avgHostPerSplit; 200 if (splitsList.size() == 0) { 201 avgHostPerSplit = 3; 202 } else { 203 avgHostPerSplit = totalHosts / splitsList.size(); 204 if (avgHostPerSplit == 0) { 205 avgHostPerSplit = 3; 206 } 207 } 208 209 for (int i = splitsList.size(); i < totalMaps; i++) { 210 if (cluster == null) { 211 splitsList.add(new FileSplit(emptyPath, 0, 0, new String[0])); 212 } else { 213 MachineNode[] mNodes = cluster.getRandomMachines(avgHostPerSplit, 214 random); 215 String[] hosts = new String[mNodes.length]; 216 for (int j = 0; j < hosts.length; ++j) { 217 hosts[j] = mNodes[j].getName(); 218 } 219 // TODO set size of a split to 0 now. 220 splitsList.add(new FileSplit(emptyPath, 0, 0, hosts)); 221 } 222 } 223 224 splits = splitsList.toArray(new InputSplit[splitsList.size()]); 225 } 226 return splits; 227 } 228 229 @Override getName()230 public String getName() { 231 JobName jobName = job.getJobName(); 232 if (jobName == null || jobName.getValue() == null) { 233 return "(name unknown)"; 234 } else { 235 return jobName.getValue(); 236 } 237 } 238 239 @Override getJobID()240 public JobID getJobID() { 241 return getLoggedJob().getJobID(); 242 } 243 sanitizeValue(int oldVal, int defaultVal, String name, JobID id)244 private int sanitizeValue(int oldVal, int defaultVal, String name, JobID id) { 245 if (oldVal == -1) { 246 LOG.warn(name +" not defined for "+id); 247 return defaultVal; 248 } 249 return oldVal; 250 } 251 252 @Override getNumberMaps()253 public int getNumberMaps() { 254 return sanitizeValue(job.getTotalMaps(), 0, "NumberMaps", job.getJobID()); 255 } 256 257 @Override getNumberReduces()258 public int getNumberReduces() { 259 return sanitizeValue(job.getTotalReduces(), 0, "NumberReduces", job.getJobID()); 260 } 261 262 @Override getOutcome()263 public Values getOutcome() { 264 return job.getOutcome(); 265 } 266 267 @Override getSubmissionTime()268 public long getSubmissionTime() { 269 return job.getSubmitTime() - job.getRelativeTime(); 270 } 271 272 @Override getQueueName()273 public String getQueueName() { 274 QueueName queue = job.getQueue(); 275 return (queue == null || queue.getValue() == null) 276 ? JobConf.DEFAULT_QUEUE_NAME 277 : queue.getValue(); 278 } 279 280 /** 281 * Getting the number of map tasks that are actually logged in the trace. 282 * @return The number of map tasks that are actually logged in the trace. 283 */ getNumLoggedMaps()284 public int getNumLoggedMaps() { 285 return job.getMapTasks().size(); 286 } 287 288 289 /** 290 * Getting the number of reduce tasks that are actually logged in the trace. 291 * @return The number of map tasks that are actually logged in the trace. 292 */ getNumLoggedReduces()293 public int getNumLoggedReduces() { 294 return job.getReduceTasks().size(); 295 } 296 297 /** 298 * Mask the job ID part in a {@link TaskID}. 299 * 300 * @param taskId 301 * raw {@link TaskID} read from trace 302 * @return masked {@link TaskID} with empty {@link JobID}. 303 */ maskTaskID(TaskID taskId)304 private TaskID maskTaskID(TaskID taskId) { 305 JobID jobId = new JobID(); 306 TaskType taskType = taskId.getTaskType(); 307 return new TaskID(jobId, taskType, taskId.getId()); 308 } 309 310 /** 311 * Mask the job ID part in a {@link TaskAttemptID}. 312 * 313 * @param attemptId 314 * raw {@link TaskAttemptID} read from trace 315 * @return masked {@link TaskAttemptID} with empty {@link JobID}. 316 */ maskAttemptID(TaskAttemptID attemptId)317 private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) { 318 JobID jobId = new JobID(); 319 TaskType taskType = attemptId.getTaskType(); 320 TaskID taskId = attemptId.getTaskID(); 321 return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), taskType, 322 taskId.getId(), attemptId.getId()); 323 } 324 sanitizeLoggedTask(LoggedTask task)325 private LoggedTask sanitizeLoggedTask(LoggedTask task) { 326 if (task == null) { 327 return null; 328 } 329 if (task.getTaskType() == null) { 330 LOG.warn("Task " + task.getTaskID() + " has nulll TaskType"); 331 return null; 332 } 333 if (task.getTaskStatus() == null) { 334 LOG.warn("Task " + task.getTaskID() + " has nulll TaskStatus"); 335 return null; 336 } 337 return task; 338 } 339 sanitizeLoggedTaskAttempt(LoggedTaskAttempt attempt)340 private LoggedTaskAttempt sanitizeLoggedTaskAttempt(LoggedTaskAttempt attempt) { 341 if (attempt == null) { 342 return null; 343 } 344 if (attempt.getResult() == null) { 345 LOG.warn("TaskAttempt " + attempt.getResult() + " has nulll Result"); 346 return null; 347 } 348 349 return attempt; 350 } 351 352 /** 353 * Build task mapping and task attempt mapping, to be later used to find 354 * information of a particular {@link TaskID} or {@link TaskAttemptID}. 355 */ buildMaps()356 private synchronized void buildMaps() { 357 if (loggedTaskMap == null) { 358 loggedTaskMap = new HashMap<TaskID, LoggedTask>(); 359 loggedTaskAttemptMap = new HashMap<TaskAttemptID, LoggedTaskAttempt>(); 360 361 for (LoggedTask map : job.getMapTasks()) { 362 map = sanitizeLoggedTask(map); 363 if (map != null) { 364 loggedTaskMap.put(maskTaskID(map.taskID), map); 365 366 for (LoggedTaskAttempt mapAttempt : map.getAttempts()) { 367 mapAttempt = sanitizeLoggedTaskAttempt(mapAttempt); 368 if (mapAttempt != null) { 369 TaskAttemptID id = mapAttempt.getAttemptID(); 370 loggedTaskAttemptMap.put(maskAttemptID(id), mapAttempt); 371 } 372 } 373 } 374 } 375 for (LoggedTask reduce : job.getReduceTasks()) { 376 reduce = sanitizeLoggedTask(reduce); 377 if (reduce != null) { 378 loggedTaskMap.put(maskTaskID(reduce.taskID), reduce); 379 380 for (LoggedTaskAttempt reduceAttempt : reduce.getAttempts()) { 381 reduceAttempt = sanitizeLoggedTaskAttempt(reduceAttempt); 382 if (reduceAttempt != null) { 383 TaskAttemptID id = reduceAttempt.getAttemptID(); 384 loggedTaskAttemptMap.put(maskAttemptID(id), reduceAttempt); 385 } 386 } 387 } 388 } 389 390 // TODO: do not care about "other" tasks, "setup" or "clean" 391 } 392 } 393 394 @Override getUser()395 public String getUser() { 396 UserName retval = job.getUser(); 397 return (retval == null || retval.getValue() == null) 398 ? "(unknown)" 399 : retval.getValue(); 400 } 401 402 /** 403 * Get the underlining {@link LoggedJob} object read directly from the trace. 404 * This is mainly for debugging. 405 * 406 * @return the underlining {@link LoggedJob} object 407 */ getLoggedJob()408 public LoggedJob getLoggedJob() { 409 return job; 410 } 411 412 /** 413 * Get a {@link TaskAttemptInfo} with a {@link TaskAttemptID} associated with 414 * taskType, taskNumber, and taskAttemptNumber. This function does not care 415 * about locality, and follows the following decision logic: 1. Make up a 416 * {@link TaskAttemptInfo} if the task attempt is missing in trace, 2. Make up 417 * a {@link TaskAttemptInfo} if the task attempt has a KILLED final status in 418 * trace, 3. Otherwise (final state is SUCCEEDED or FAILED), construct the 419 * {@link TaskAttemptInfo} from the trace. 420 */ getTaskAttemptInfo(TaskType taskType, int taskNumber, int taskAttemptNumber)421 public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber, 422 int taskAttemptNumber) { 423 // does not care about locality. assume default locality is NODE_LOCAL. 424 // But if both task and task attempt exist in trace, use logged locality. 425 int locality = 0; 426 LoggedTask loggedTask = getLoggedTask(taskType, taskNumber); 427 if (loggedTask == null) { 428 // TODO insert parameters 429 TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); 430 return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber, 431 taskNumber, locality); 432 } 433 434 LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType, 435 taskNumber, taskAttemptNumber); 436 if (loggedAttempt == null) { 437 // Task exists, but attempt is missing. 438 TaskInfo taskInfo = getTaskInfo(loggedTask); 439 return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber, 440 taskNumber, locality); 441 } else { 442 // TODO should we handle killed attempts later? 443 if (loggedAttempt.getResult()== Values.KILLED) { 444 TaskInfo taskInfo = getTaskInfo(loggedTask); 445 return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber, 446 taskNumber, locality); 447 } else { 448 return getTaskAttemptInfo(loggedTask, loggedAttempt); 449 } 450 } 451 } 452 453 @Override getTaskInfo(TaskType taskType, int taskNumber)454 public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) { 455 return getTaskInfo(getLoggedTask(taskType, taskNumber)); 456 } 457 458 /** 459 * Get a {@link TaskAttemptInfo} with a {@link TaskAttemptID} associated with 460 * taskType, taskNumber, and taskAttemptNumber. This function considers 461 * locality, and follows the following decision logic: 1. Make up a 462 * {@link TaskAttemptInfo} if the task attempt is missing in trace, 2. Make up 463 * a {@link TaskAttemptInfo} if the task attempt has a KILLED final status in 464 * trace, 3. If final state is FAILED, construct a {@link TaskAttemptInfo} 465 * from the trace, without considering locality. 4. If final state is 466 * SUCCEEDED, construct a {@link TaskAttemptInfo} from the trace, with runtime 467 * scaled according to locality in simulation and locality in trace. 468 */ 469 @Override getMapTaskAttemptInfoAdjusted(int taskNumber, int taskAttemptNumber, int locality)470 public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber, 471 int taskAttemptNumber, int locality) { 472 TaskType taskType = TaskType.MAP; 473 LoggedTask loggedTask = getLoggedTask(taskType, taskNumber); 474 if (loggedTask == null) { 475 // TODO insert parameters 476 TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); 477 return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber, 478 taskNumber, locality); 479 } 480 LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType, 481 taskNumber, taskAttemptNumber); 482 if (loggedAttempt == null) { 483 // Task exists, but attempt is missing. 484 TaskInfo taskInfo = getTaskInfo(loggedTask); 485 return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber, 486 taskNumber, locality); 487 } else { 488 // Task and TaskAttempt both exist. 489 if (loggedAttempt.getResult() == Values.KILLED) { 490 TaskInfo taskInfo = getTaskInfo(loggedTask); 491 return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber, 492 taskNumber, locality); 493 } else if (loggedAttempt.getResult() == Values.FAILED) { 494 /** 495 * FAILED attempt is not affected by locality however, made-up FAILED 496 * attempts ARE affected by locality, since statistics are present for 497 * attempts of different locality. 498 */ 499 return getTaskAttemptInfo(loggedTask, loggedAttempt); 500 } else if (loggedAttempt.getResult() == Values.SUCCESS) { 501 int loggedLocality = getLocality(loggedTask, loggedAttempt); 502 if (locality == loggedLocality) { 503 return getTaskAttemptInfo(loggedTask, loggedAttempt); 504 } else { 505 // attempt succeeded in trace. It is scheduled in simulation with 506 // a different locality. 507 return scaleInfo(loggedTask, loggedAttempt, locality, loggedLocality, 508 rackLocalOverNodeLocal, rackRemoteOverNodeLocal); 509 } 510 } else { 511 throw new IllegalArgumentException( 512 "attempt result is not SUCCEEDED, FAILED or KILLED: " 513 + loggedAttempt.getResult()); 514 } 515 } 516 } 517 sanitizeTaskRuntime(long time, ID id)518 private long sanitizeTaskRuntime(long time, ID id) { 519 if (time < 0) { 520 LOG.warn("Negative running time for task "+id+": "+time); 521 return 100L; // set default to 100ms. 522 } 523 return time; 524 } 525 526 @SuppressWarnings("hiding") scaleInfo(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt, int locality, int loggedLocality, double rackLocalOverNodeLocal, double rackRemoteOverNodeLocal)527 private TaskAttemptInfo scaleInfo(LoggedTask loggedTask, 528 LoggedTaskAttempt loggedAttempt, int locality, int loggedLocality, 529 double rackLocalOverNodeLocal, double rackRemoteOverNodeLocal) { 530 TaskInfo taskInfo = getTaskInfo(loggedTask); 531 double[] factors = new double[] { 1.0, rackLocalOverNodeLocal, 532 rackRemoteOverNodeLocal }; 533 double scaleFactor = factors[locality] / factors[loggedLocality]; 534 State state = convertState(loggedAttempt.getResult()); 535 if (loggedTask.getTaskType() == Values.MAP) { 536 long taskTime = 0; 537 if (loggedAttempt.getStartTime() == 0) { 538 taskTime = makeUpMapRuntime(state, locality); 539 } else { 540 taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime(); 541 } 542 taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID()); 543 taskTime *= scaleFactor; 544 return new MapTaskAttemptInfo 545 (state, taskInfo, taskTime, loggedAttempt.allSplitVectors()); 546 } else { 547 throw new IllegalArgumentException("taskType can only be MAP: " 548 + loggedTask.getTaskType()); 549 } 550 } 551 getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt)552 private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) { 553 int distance = cluster.getMaximumDistance(); 554 String rackHostName = loggedAttempt.getHostName().getValue(); 555 if (rackHostName == null) { 556 return distance; 557 } 558 MachineNode mn = getMachineNode(rackHostName); 559 if (mn == null) { 560 return distance; 561 } 562 List<LoggedLocation> locations = loggedTask.getPreferredLocations(); 563 if (locations != null) { 564 for (LoggedLocation location : locations) { 565 List<NodeName> layers = location.getLayers(); 566 if ((layers == null) || (layers.isEmpty())) { 567 continue; 568 } 569 String dataNodeName = layers.get(layers.size()-1).getValue(); 570 MachineNode dataNode = cluster.getMachineByName(dataNodeName); 571 if (dataNode != null) { 572 distance = Math.min(distance, cluster.distance(mn, dataNode)); 573 } 574 } 575 } 576 return distance; 577 } 578 getMachineNode(String rackHostName)579 private MachineNode getMachineNode(String rackHostName) { 580 ParsedHost parsedHost = ParsedHost.parse(rackHostName); 581 String hostName = (parsedHost == null) ? rackHostName 582 : parsedHost.getNodeName(); 583 if (hostName == null) { 584 return null; 585 } 586 return (cluster == null) ? null : cluster.getMachineByName(hostName); 587 } 588 getTaskAttemptInfo(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt)589 private TaskAttemptInfo getTaskAttemptInfo(LoggedTask loggedTask, 590 LoggedTaskAttempt loggedAttempt) { 591 TaskInfo taskInfo = getTaskInfo(loggedTask); 592 593 List<List<Integer>> allSplitVectors = loggedAttempt.allSplitVectors(); 594 595 State state = convertState(loggedAttempt.getResult()); 596 if (loggedTask.getTaskType() == Values.MAP) { 597 long taskTime; 598 if (loggedAttempt.getStartTime() == 0) { 599 int locality = getLocality(loggedTask, loggedAttempt); 600 taskTime = makeUpMapRuntime(state, locality); 601 } else { 602 taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime(); 603 } 604 taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID()); 605 return new MapTaskAttemptInfo(state, taskInfo, taskTime, allSplitVectors); 606 } else if (loggedTask.getTaskType() == Values.REDUCE) { 607 long startTime = loggedAttempt.getStartTime(); 608 long mergeDone = loggedAttempt.getSortFinished(); 609 long shuffleDone = loggedAttempt.getShuffleFinished(); 610 long finishTime = loggedAttempt.getFinishTime(); 611 if (startTime <= 0 || startTime >= finishTime) { 612 // have seen startTime>finishTime. 613 // haven't seen reduce task with startTime=0 ever. But if this happens, 614 // make up a reduceTime with no shuffle/merge. 615 long reduceTime = makeUpReduceRuntime(state); 616 return new ReduceTaskAttemptInfo 617 (state, taskInfo, 0, 0, reduceTime, allSplitVectors); 618 } else { 619 if (shuffleDone <= 0) { 620 shuffleDone = startTime; 621 } 622 if (mergeDone <= 0) { 623 mergeDone = finishTime; 624 } 625 long shuffleTime = shuffleDone - startTime; 626 long mergeTime = mergeDone - shuffleDone; 627 long reduceTime = finishTime - mergeDone; 628 reduceTime = sanitizeTaskRuntime(reduceTime, loggedAttempt.getAttemptID()); 629 630 return new ReduceTaskAttemptInfo(state, taskInfo, shuffleTime, 631 mergeTime, reduceTime, allSplitVectors); 632 } 633 } else { 634 throw new IllegalArgumentException("taskType for " 635 + loggedTask.getTaskID() + " is neither MAP nor REDUCE: " 636 + loggedTask.getTaskType()); 637 } 638 } 639 getTaskInfo(LoggedTask loggedTask)640 private TaskInfo getTaskInfo(LoggedTask loggedTask) { 641 if (loggedTask == null) { 642 return new TaskInfo(0, 0, 0, 0, 0); 643 } 644 List<LoggedTaskAttempt> attempts = loggedTask.getAttempts(); 645 646 long inputBytes = -1; 647 long inputRecords = -1; 648 long outputBytes = -1; 649 long outputRecords = -1; 650 long heapMegabytes = -1; 651 ResourceUsageMetrics metrics = new ResourceUsageMetrics(); 652 653 Values type = loggedTask.getTaskType(); 654 if ((type != Values.MAP) && (type != Values.REDUCE)) { 655 throw new IllegalArgumentException( 656 "getTaskInfo only supports MAP or REDUCE tasks: " + type.toString() 657 + " for task = " + loggedTask.getTaskID()); 658 } 659 660 for (LoggedTaskAttempt attempt : attempts) { 661 attempt = sanitizeLoggedTaskAttempt(attempt); 662 // ignore bad attempts or unsuccessful attempts. 663 if ((attempt == null) || (attempt.getResult() != Values.SUCCESS)) { 664 continue; 665 } 666 667 if (type == Values.MAP) { 668 inputBytes = attempt.getHdfsBytesRead(); 669 inputRecords = attempt.getMapInputRecords(); 670 outputBytes = 671 (job.getTotalReduces() > 0) ? attempt.getMapOutputBytes() : attempt 672 .getHdfsBytesWritten(); 673 outputRecords = attempt.getMapOutputRecords(); 674 heapMegabytes = 675 (job.getJobMapMB() > 0) ? job.getJobMapMB() : job 676 .getHeapMegabytes(); 677 } else { 678 inputBytes = attempt.getReduceShuffleBytes(); 679 inputRecords = attempt.getReduceInputRecords(); 680 outputBytes = attempt.getHdfsBytesWritten(); 681 outputRecords = attempt.getReduceOutputRecords(); 682 heapMegabytes = 683 (job.getJobReduceMB() > 0) ? job.getJobReduceMB() : job 684 .getHeapMegabytes(); 685 } 686 // set the resource usage metrics 687 metrics = attempt.getResourceUsageMetrics(); 688 break; 689 } 690 691 TaskInfo taskInfo = 692 new TaskInfo(inputBytes, (int) inputRecords, outputBytes, 693 (int) outputRecords, (int) heapMegabytes, 694 metrics); 695 return taskInfo; 696 } 697 makeTaskAttemptID(TaskType taskType, int taskNumber, int taskAttemptNumber)698 private TaskAttemptID makeTaskAttemptID(TaskType taskType, int taskNumber, 699 int taskAttemptNumber) { 700 return new TaskAttemptID(new TaskID(job.getJobID(), taskType, taskNumber), 701 taskAttemptNumber); 702 } 703 makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo, int taskAttemptNumber, int taskNumber, int locality)704 private TaskAttemptInfo makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo, 705 int taskAttemptNumber, int taskNumber, int locality) { 706 if (taskType == TaskType.MAP) { 707 State state = State.SUCCEEDED; 708 long runtime = 0; 709 710 // make up state 711 state = makeUpState(taskAttemptNumber, job.getMapperTriesToSucceed()); 712 runtime = makeUpMapRuntime(state, locality); 713 runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType, 714 taskNumber, taskAttemptNumber)); 715 TaskAttemptInfo tai 716 = new MapTaskAttemptInfo(state, taskInfo, runtime, null); 717 return tai; 718 } else if (taskType == TaskType.REDUCE) { 719 State state = State.SUCCEEDED; 720 long shuffleTime = 0; 721 long sortTime = 0; 722 long reduceTime = 0; 723 724 // TODO make up state 725 // state = makeUpState(taskAttemptNumber, job.getReducerTriesToSucceed()); 726 reduceTime = makeUpReduceRuntime(state); 727 TaskAttemptInfo tai = new ReduceTaskAttemptInfo 728 (state, taskInfo, shuffleTime, sortTime, reduceTime, null); 729 return tai; 730 } 731 732 throw new IllegalArgumentException("taskType is neither MAP nor REDUCE: " 733 + taskType); 734 } 735 makeUpReduceRuntime(State state)736 private long makeUpReduceRuntime(State state) { 737 long reduceTime = 0; 738 for (int i = 0; i < 5; i++) { 739 reduceTime = doMakeUpReduceRuntime(state); 740 if (reduceTime >= 0) { 741 return reduceTime; 742 } 743 } 744 return 0; 745 } 746 doMakeUpReduceRuntime(State state)747 private long doMakeUpReduceRuntime(State state) { 748 long reduceTime; 749 try { 750 if (state == State.SUCCEEDED) { 751 reduceTime = makeUpRuntime(job.getSuccessfulReduceAttemptCDF()); 752 } else if (state == State.FAILED) { 753 reduceTime = makeUpRuntime(job.getFailedReduceAttemptCDF()); 754 } else { 755 throw new IllegalArgumentException( 756 "state is neither SUCCEEDED nor FAILED: " + state); 757 } 758 return reduceTime; 759 } catch (NoValueToMakeUpRuntime e) { 760 return 0; 761 } 762 } 763 makeUpMapRuntime(State state, int locality)764 private long makeUpMapRuntime(State state, int locality) { 765 long runtime; 766 // make up runtime 767 if (state == State.SUCCEEDED || state == State.FAILED) { 768 List<LoggedDiscreteCDF> cdfList = 769 state == State.SUCCEEDED ? job.getSuccessfulMapAttemptCDFs() : job 770 .getFailedMapAttemptCDFs(); 771 // XXX MapCDFs is a ArrayList of 4 possible groups: distance=0, 1, 2, and 772 // the last group is "distance cannot be determined". All pig jobs 773 // would have only the 4th group, and pig tasks usually do not have 774 // any locality, so this group should count as "distance=2". 775 // However, setup/cleanup tasks are also counted in the 4th group. 776 // These tasks do not make sense. 777 if(cdfList==null) { 778 runtime = -1; 779 return runtime; 780 } 781 try { 782 runtime = makeUpRuntime(cdfList.get(locality)); 783 } catch (NoValueToMakeUpRuntime e) { 784 runtime = makeUpRuntime(cdfList); 785 } 786 } else { 787 throw new IllegalArgumentException( 788 "state is neither SUCCEEDED nor FAILED: " + state); 789 } 790 return runtime; 791 } 792 793 /** 794 * Perform a weighted random selection on a list of CDFs, and produce a random 795 * variable using the selected CDF. 796 * 797 * @param mapAttemptCDFs 798 * A list of CDFs for the distribution of runtime for the 1st, 2nd, 799 * ... map attempts for the job. 800 */ makeUpRuntime(List<LoggedDiscreteCDF> mapAttemptCDFs)801 private long makeUpRuntime(List<LoggedDiscreteCDF> mapAttemptCDFs) { 802 int total = 0; 803 if(mapAttemptCDFs == null) { 804 return -1; 805 } 806 for (LoggedDiscreteCDF cdf : mapAttemptCDFs) { 807 total += cdf.getNumberValues(); 808 } 809 if (total == 0) { 810 return -1; 811 } 812 int index = random.nextInt(total); 813 for (LoggedDiscreteCDF cdf : mapAttemptCDFs) { 814 if (index >= cdf.getNumberValues()) { 815 index -= cdf.getNumberValues(); 816 } else { 817 if (index < 0) { 818 throw new IllegalStateException("application error"); 819 } 820 return makeUpRuntime(cdf); 821 } 822 } 823 throw new IllegalStateException("not possible to get here"); 824 } 825 makeUpRuntime(LoggedDiscreteCDF loggedDiscreteCDF)826 private long makeUpRuntime(LoggedDiscreteCDF loggedDiscreteCDF) { 827 /* 828 * We need this odd-looking code because if a seed exists we need to ensure 829 * that only one interpolator is generated per LoggedDiscreteCDF, but if no 830 * seed exists then the potentially lengthy process of making an 831 * interpolator can happen outside the lock. makeUpRuntimeCore only locks 832 * around the two hash map accesses. 833 */ 834 if (hasRandomSeed) { 835 synchronized (interpolatorMap) { 836 return makeUpRuntimeCore(loggedDiscreteCDF); 837 } 838 } 839 840 return makeUpRuntimeCore(loggedDiscreteCDF); 841 } 842 getNextRandomSeed()843 private synchronized long getNextRandomSeed() { 844 numRandomSeeds++; 845 return RandomSeedGenerator.getSeed("forZombieJob" + job.getJobID(), 846 numRandomSeeds); 847 } 848 makeUpRuntimeCore(LoggedDiscreteCDF loggedDiscreteCDF)849 private long makeUpRuntimeCore(LoggedDiscreteCDF loggedDiscreteCDF) { 850 CDFRandomGenerator interpolator; 851 852 synchronized (interpolatorMap) { 853 interpolator = interpolatorMap.get(loggedDiscreteCDF); 854 } 855 856 if (interpolator == null) { 857 if (loggedDiscreteCDF.getNumberValues() == 0) { 858 throw new NoValueToMakeUpRuntime("no value to use to make up runtime"); 859 } 860 861 interpolator = 862 hasRandomSeed ? new CDFPiecewiseLinearRandomGenerator( 863 loggedDiscreteCDF, getNextRandomSeed()) 864 : new CDFPiecewiseLinearRandomGenerator(loggedDiscreteCDF); 865 866 /* 867 * It doesn't matter if we compute and store an interpolator twice because 868 * the two instances will be semantically identical and stateless, unless 869 * we're seeded, in which case we're not stateless but this code will be 870 * called synchronizedly. 871 */ 872 synchronized (interpolatorMap) { 873 interpolatorMap.put(loggedDiscreteCDF, interpolator); 874 } 875 } 876 877 return interpolator.randomValue(); 878 } 879 880 static private class NoValueToMakeUpRuntime extends IllegalArgumentException { 881 static final long serialVersionUID = 1L; 882 NoValueToMakeUpRuntime()883 NoValueToMakeUpRuntime() { 884 super(); 885 } 886 NoValueToMakeUpRuntime(String detailMessage)887 NoValueToMakeUpRuntime(String detailMessage) { 888 super(detailMessage); 889 } 890 NoValueToMakeUpRuntime(String detailMessage, Throwable cause)891 NoValueToMakeUpRuntime(String detailMessage, Throwable cause) { 892 super(detailMessage, cause); 893 } 894 NoValueToMakeUpRuntime(Throwable cause)895 NoValueToMakeUpRuntime(Throwable cause) { 896 super(cause); 897 } 898 } 899 makeUpState(int taskAttemptNumber, double[] numAttempts)900 private State makeUpState(int taskAttemptNumber, double[] numAttempts) { 901 902 // if numAttempts == null we are returning FAILED. 903 if(numAttempts == null) { 904 return State.FAILED; 905 } 906 if (taskAttemptNumber >= numAttempts.length - 1) { 907 // always succeed 908 return State.SUCCEEDED; 909 } else { 910 double pSucceed = numAttempts[taskAttemptNumber]; 911 double pFail = 0; 912 for (int i = taskAttemptNumber + 1; i < numAttempts.length; i++) { 913 pFail += numAttempts[i]; 914 } 915 return (random.nextDouble() < pSucceed / (pSucceed + pFail)) ? State.SUCCEEDED 916 : State.FAILED; 917 } 918 } 919 getMaskedTaskID(TaskType taskType, int taskNumber)920 private TaskID getMaskedTaskID(TaskType taskType, int taskNumber) { 921 return new TaskID(new JobID(), taskType, taskNumber); 922 } 923 getLoggedTask(TaskType taskType, int taskNumber)924 private LoggedTask getLoggedTask(TaskType taskType, int taskNumber) { 925 buildMaps(); 926 return loggedTaskMap.get(getMaskedTaskID(taskType, taskNumber)); 927 } 928 getLoggedTaskAttempt(TaskType taskType, int taskNumber, int taskAttemptNumber)929 private LoggedTaskAttempt getLoggedTaskAttempt(TaskType taskType, 930 int taskNumber, int taskAttemptNumber) { 931 buildMaps(); 932 TaskAttemptID id = 933 new TaskAttemptID(getMaskedTaskID(taskType, taskNumber), 934 taskAttemptNumber); 935 return loggedTaskAttemptMap.get(id); 936 } 937 938 } 939