/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tools.rumen;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.HashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.tools.rumen.datatypes.*;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;

/**
 * {@link ZombieJob} is a layer above the {@link LoggedJob} raw JSON objects.
 *
 * Each {@link ZombieJob} object represents a job in the job history. For
 * everything that exists in the job history, contents are returned faithfully
 * and unchanged. For the input splits of a non-existent task, a non-existent
 * task attempt, or an ill-formed task attempt, plausible objects are made up
 * from statistical sketches.
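 *
 * <p>A minimal usage sketch. How the {@link LoggedJob} and {@link ClusterStory}
 * are obtained here is illustrative only (typically they come from a Rumen
 * trace reader), and the seed value is arbitrary:
 * <pre>{@code
 *   LoggedJob logged = ...;      // one job parsed from a Rumen job trace
 *   ClusterStory cluster = ...;  // may be null if the topology is unknown
 *   ZombieJob zombie = new ZombieJob(logged, cluster, 42L);
 *   JobConf conf = zombie.getJobConf();
 *   InputSplit[] splits = zombie.getInputSplits();
 *   TaskAttemptInfo attempt = zombie.getTaskAttemptInfo(TaskType.MAP, 0, 0);
 * }</pre>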
 */
@SuppressWarnings("deprecation")
public class ZombieJob implements JobStory {
  static final Log LOG = LogFactory.getLog(ZombieJob.class);
  private final LoggedJob job;
  private Map<TaskID, LoggedTask> loggedTaskMap;
  private Map<TaskAttemptID, LoggedTaskAttempt> loggedTaskAttemptMap;
  private final Random random;
  private InputSplit[] splits;
  private final ClusterStory cluster;
  private JobConf jobConf;

  private long seed;
  private long numRandomSeeds = 0;
  private boolean hasRandomSeed = false;

  private Map<LoggedDiscreteCDF, CDFRandomGenerator> interpolatorMap =
      new HashMap<LoggedDiscreteCDF, CDFRandomGenerator>();

  // TODO: Fix ZombieJob to initialize this correctly from observed data
  double rackLocalOverNodeLocal = 1.5;
  double rackRemoteOverNodeLocal = 3.0;

  /**
   * This constructor creates a {@link ZombieJob} with the same semantics as
   * the {@link LoggedJob} passed in as a parameter.
   *
   * @param job
   *          The dead job this ZombieJob instance is based on.
   * @param cluster
   *          The topology of the cluster the dead job ran on. This argument
   *          can be null if we have no knowledge of the cluster topology.
   * @param seed
   *          Seed for the random number generator used to make up information
   *          that is not available in the trace.
   */
  public ZombieJob(LoggedJob job, ClusterStory cluster, long seed) {
    if (job == null) {
      throw new IllegalArgumentException("job is null");
    }
    this.job = job;
    this.cluster = cluster;
    random = new Random(seed);
    this.seed = seed;
    hasRandomSeed = true;
  }

  /**
   * This constructor creates a {@link ZombieJob} with the same semantics as
   * the {@link LoggedJob} passed in as a parameter.
   *
   * @param job
   *          The dead job this ZombieJob instance is based on.
   * @param cluster
   *          The topology of the cluster the dead job ran on. This argument
   *          can be null if we have no knowledge of the cluster topology.
   */
  public ZombieJob(LoggedJob job, ClusterStory cluster) {
    this(job, cluster, System.nanoTime());
  }

  private static State convertState(Values status) {
    if (status == Values.SUCCESS) {
      return State.SUCCEEDED;
    } else if (status == Values.FAILED) {
      return State.FAILED;
    } else if (status == Values.KILLED) {
      return State.KILLED;
    } else {
      throw new IllegalArgumentException("unknown status " + status);
    }
  }

  @Override
  public synchronized JobConf getJobConf() {
    if (jobConf == null) {
      jobConf = new JobConf();

      // Add parameters from the configuration in the job trace.
      //
      // The job configuration parameters, as seen in the jobconf file, are
      // added first so that the specialized values obtained from Rumen below
      // override them.
      //
      for (Map.Entry<Object, Object> entry : job.getJobProperties().getValue().entrySet()) {
        jobConf.set(entry.getKey().toString(), entry.getValue().toString());
      }

      // TODO: Eliminate parameters that are already copied from the job's
      // configuration file.
      jobConf.setJobName(getName());
      jobConf.setUser(getUser());
      jobConf.setNumMapTasks(getNumberMaps());
      jobConf.setNumReduceTasks(getNumberReduces());
      jobConf.setQueueName(getQueueName());
    }
    return jobConf;
  }

  @Override
  public InputSplit[] getInputSplits() {
    if (splits == null) {
      List<InputSplit> splitsList = new ArrayList<InputSplit>();
      Path emptyPath = new Path("/");
      int totalHosts = 0; // used to determine the average # of hosts per split.
      for (LoggedTask mapTask : job.getMapTasks()) {
        Pre21JobHistoryConstants.Values taskType = mapTask.getTaskType();
        if (taskType != Pre21JobHistoryConstants.Values.MAP) {
          LOG.warn("TaskType for a MapTask is not Map. task="
              + mapTask.getTaskID() + " type="
              + ((taskType == null) ? "null" : taskType.toString()));
          continue;
        }
        List<LoggedLocation> locations = mapTask.getPreferredLocations();
        List<String> hostList = new ArrayList<String>();
        if (locations != null) {
          for (LoggedLocation location : locations) {
            List<NodeName> layers = location.getLayers();
            if (layers.size() == 0) {
              LOG.warn("Bad location layer format for task "
                  + mapTask.getTaskID());
              continue;
            }
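            // The last layer of a LoggedLocation names the host; the outer
            // layers name the rack and higher topology levels.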
            String host = layers.get(layers.size() - 1).getValue();
            if (host == null) {
              LOG.warn("Bad location layer format for task "
                  + mapTask.getTaskID() + ": " + layers);
              continue;
            }
            hostList.add(host);
          }
        }
        String[] hosts = hostList.toArray(new String[hostList.size()]);
        totalHosts += hosts.length;
        long mapInputBytes = getTaskInfo(mapTask).getInputBytes();
        if (mapInputBytes < 0) {
          LOG.warn("InputBytes for task " + mapTask.getTaskID()
              + " is not defined.");
          mapInputBytes = 0;
        }

        splitsList.add(new FileSplit(emptyPath, 0, mapInputBytes, hosts));
      }

      // If not all map tasks are in the job trace, make up splits for the
      // missing ones.
      int totalMaps = job.getTotalMaps();
      if (totalMaps < splitsList.size()) {
        LOG.warn("TotalMaps for job " + job.getJobID()
            + " is less than the total number of map task descriptions ("
            + totalMaps + "<" + splitsList.size() + ").");
      }

      int avgHostPerSplit;
      if (splitsList.size() == 0) {
        avgHostPerSplit = 3;
      } else {
        avgHostPerSplit = totalHosts / splitsList.size();
        if (avgHostPerSplit == 0) {
          avgHostPerSplit = 3;
        }
      }

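      // Make up a FileSplit for each map task missing from the trace: each
      // synthetic split points at a placeholder path with length 0 and, when
      // the cluster topology is known, avgHostPerSplit randomly chosen hosts.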
      for (int i = splitsList.size(); i < totalMaps; i++) {
        if (cluster == null) {
          splitsList.add(new FileSplit(emptyPath, 0, 0, new String[0]));
        } else {
          MachineNode[] mNodes = cluster.getRandomMachines(avgHostPerSplit,
                                                           random);
          String[] hosts = new String[mNodes.length];
          for (int j = 0; j < hosts.length; ++j) {
            hosts[j] = mNodes[j].getName();
          }
          // TODO: the size of a made-up split is set to 0 for now.
          splitsList.add(new FileSplit(emptyPath, 0, 0, hosts));
        }
      }

      splits = splitsList.toArray(new InputSplit[splitsList.size()]);
    }
    return splits;
  }

  @Override
  public String getName() {
    JobName jobName = job.getJobName();
    if (jobName == null || jobName.getValue() == null) {
      return "(name unknown)";
    } else {
      return jobName.getValue();
    }
  }

  @Override
  public JobID getJobID() {
    return getLoggedJob().getJobID();
  }

  private int sanitizeValue(int oldVal, int defaultVal, String name, JobID id) {
    if (oldVal == -1) {
      LOG.warn(name + " not defined for " + id);
      return defaultVal;
    }
    return oldVal;
  }

  @Override
  public int getNumberMaps() {
    return sanitizeValue(job.getTotalMaps(), 0, "NumberMaps", job.getJobID());
  }

  @Override
  public int getNumberReduces() {
    return sanitizeValue(job.getTotalReduces(), 0, "NumberReduces", job.getJobID());
  }

  @Override
  public Values getOutcome() {
    return job.getOutcome();
  }

  @Override
  public long getSubmissionTime() {
    return job.getSubmitTime() - job.getRelativeTime();
  }

  @Override
  public String getQueueName() {
    QueueName queue = job.getQueue();
    return (queue == null || queue.getValue() == null)
           ? JobConf.DEFAULT_QUEUE_NAME
           : queue.getValue();
  }

  /**
   * Get the number of map tasks that are actually logged in the trace.
   * @return the number of map tasks that are actually logged in the trace.
   */
  public int getNumLoggedMaps() {
    return job.getMapTasks().size();
  }


  /**
   * Get the number of reduce tasks that are actually logged in the trace.
   * @return the number of reduce tasks that are actually logged in the trace.
   */
  public int getNumLoggedReduces() {
    return job.getReduceTasks().size();
  }

  /**
   * Mask the job ID part in a {@link TaskID}.
   *
   * @param taskId
   *          raw {@link TaskID} read from trace
   * @return masked {@link TaskID} with empty {@link JobID}.
   */
  private TaskID maskTaskID(TaskID taskId) {
    JobID jobId = new JobID();
    TaskType taskType = taskId.getTaskType();
    return new TaskID(jobId, taskType, taskId.getId());
  }

  /**
   * Mask the job ID part in a {@link TaskAttemptID}.
   *
   * @param attemptId
   *          raw {@link TaskAttemptID} read from trace
   * @return masked {@link TaskAttemptID} with empty {@link JobID}.
   */
  private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
    JobID jobId = new JobID();
    TaskType taskType = attemptId.getTaskType();
    TaskID taskId = attemptId.getTaskID();
    return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), taskType,
        taskId.getId(), attemptId.getId());
  }

  private LoggedTask sanitizeLoggedTask(LoggedTask task) {
    if (task == null) {
      return null;
    }
    if (task.getTaskType() == null) {
      LOG.warn("Task " + task.getTaskID() + " has null TaskType");
      return null;
    }
    if (task.getTaskStatus() == null) {
      LOG.warn("Task " + task.getTaskID() + " has null TaskStatus");
      return null;
    }
    return task;
  }

  private LoggedTaskAttempt sanitizeLoggedTaskAttempt(LoggedTaskAttempt attempt) {
    if (attempt == null) {
      return null;
    }
    if (attempt.getResult() == null) {
      LOG.warn("TaskAttempt " + attempt.getAttemptID() + " has null Result");
      return null;
    }

    return attempt;
  }

  /**
   * Build the task mapping and the task attempt mapping, to be used later to
   * find information for a particular {@link TaskID} or {@link TaskAttemptID}.
   */
  private synchronized void buildMaps() {
    if (loggedTaskMap == null) {
      loggedTaskMap = new HashMap<TaskID, LoggedTask>();
      loggedTaskAttemptMap = new HashMap<TaskAttemptID, LoggedTaskAttempt>();

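      // Keys are stored with the job-ID portion masked out (see maskTaskID /
      // maskAttemptID), so that lookups keyed on simulated task numbers find
      // the logged entries regardless of the job ID they carry.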
      for (LoggedTask map : job.getMapTasks()) {
        map = sanitizeLoggedTask(map);
        if (map != null) {
          loggedTaskMap.put(maskTaskID(map.taskID), map);

          for (LoggedTaskAttempt mapAttempt : map.getAttempts()) {
            mapAttempt = sanitizeLoggedTaskAttempt(mapAttempt);
            if (mapAttempt != null) {
              TaskAttemptID id = mapAttempt.getAttemptID();
              loggedTaskAttemptMap.put(maskAttemptID(id), mapAttempt);
            }
          }
        }
      }
      for (LoggedTask reduce : job.getReduceTasks()) {
        reduce = sanitizeLoggedTask(reduce);
        if (reduce != null) {
          loggedTaskMap.put(maskTaskID(reduce.taskID), reduce);

          for (LoggedTaskAttempt reduceAttempt : reduce.getAttempts()) {
            reduceAttempt = sanitizeLoggedTaskAttempt(reduceAttempt);
            if (reduceAttempt != null) {
              TaskAttemptID id = reduceAttempt.getAttemptID();
              loggedTaskAttemptMap.put(maskAttemptID(id), reduceAttempt);
            }
          }
        }
      }

      // TODO: do not care about "other" tasks, "setup" or "clean"
    }
  }

  @Override
  public String getUser() {
    UserName retval = job.getUser();
    return (retval == null || retval.getValue() == null)
           ? "(unknown)"
           : retval.getValue();
  }

  /**
   * Get the underlying {@link LoggedJob} object read directly from the trace.
   * This is mainly for debugging.
   *
   * @return the underlying {@link LoggedJob} object
   */
  public LoggedJob getLoggedJob() {
    return job;
  }

  /**
   * Get a {@link TaskAttemptInfo} with a {@link TaskAttemptID} associated with
   * taskType, taskNumber, and taskAttemptNumber. This function does not care
   * about locality, and follows the following decision logic: 1. Make up a
   * {@link TaskAttemptInfo} if the task attempt is missing in the trace, 2.
   * Make up a {@link TaskAttemptInfo} if the task attempt has a KILLED final
   * status in the trace, 3. Otherwise (final state is SUCCEEDED or FAILED),
   * construct the {@link TaskAttemptInfo} from the trace.
   */
  public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
      int taskAttemptNumber) {
    // Does not care about locality; assume the default locality is NODE_LOCAL.
    // But if both the task and the task attempt exist in the trace, the logged
    // locality is used.
    int locality = 0;
    LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
    if (loggedTask == null) {
      // TODO insert parameters
      TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
      return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
          taskNumber, locality);
    }

    LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
        taskNumber, taskAttemptNumber);
    if (loggedAttempt == null) {
      // Task exists, but attempt is missing.
      TaskInfo taskInfo = getTaskInfo(loggedTask);
      return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
          taskNumber, locality);
    } else {
      // TODO should we handle killed attempts later?
      if (loggedAttempt.getResult() == Values.KILLED) {
        TaskInfo taskInfo = getTaskInfo(loggedTask);
        return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
            taskNumber, locality);
      } else {
        return getTaskAttemptInfo(loggedTask, loggedAttempt);
      }
    }
  }

  @Override
  public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
    return getTaskInfo(getLoggedTask(taskType, taskNumber));
  }

  /**
   * Get a {@link TaskAttemptInfo} with a {@link TaskAttemptID} associated with
   * taskType, taskNumber, and taskAttemptNumber. This function considers
   * locality, and follows the following decision logic: 1. Make up a
   * {@link TaskAttemptInfo} if the task attempt is missing in the trace, 2.
   * Make up a {@link TaskAttemptInfo} if the task attempt has a KILLED final
   * status in the trace, 3. If the final state is FAILED, construct a
   * {@link TaskAttemptInfo} from the trace, without considering locality. 4.
   * If the final state is SUCCEEDED, construct a {@link TaskAttemptInfo} from
   * the trace, with the runtime scaled according to the locality in the
   * simulation and the locality in the trace.
   */
  @Override
  public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
      int taskAttemptNumber, int locality) {
    TaskType taskType = TaskType.MAP;
    LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
    if (loggedTask == null) {
      // TODO insert parameters
      TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
      return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
          taskNumber, locality);
    }
    LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
        taskNumber, taskAttemptNumber);
    if (loggedAttempt == null) {
      // Task exists, but attempt is missing.
      TaskInfo taskInfo = getTaskInfo(loggedTask);
      return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
          taskNumber, locality);
    } else {
      // Task and TaskAttempt both exist.
      if (loggedAttempt.getResult() == Values.KILLED) {
        TaskInfo taskInfo = getTaskInfo(loggedTask);
        return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
            taskNumber, locality);
      } else if (loggedAttempt.getResult() == Values.FAILED) {
        /*
         * A FAILED attempt from the trace is not affected by locality;
         * made-up FAILED attempts, however, ARE affected by locality, since
         * statistics are kept separately for attempts of each locality.
         */
        return getTaskAttemptInfo(loggedTask, loggedAttempt);
      } else if (loggedAttempt.getResult() == Values.SUCCESS) {
        int loggedLocality = getLocality(loggedTask, loggedAttempt);
        if (locality == loggedLocality) {
          return getTaskAttemptInfo(loggedTask, loggedAttempt);
        } else {
          // The attempt succeeded in the trace, but it is scheduled in the
          // simulation with a different locality.
          return scaleInfo(loggedTask, loggedAttempt, locality, loggedLocality,
              rackLocalOverNodeLocal, rackRemoteOverNodeLocal);
        }
      } else {
        throw new IllegalArgumentException(
            "attempt result is not SUCCEEDED, FAILED or KILLED: "
                + loggedAttempt.getResult());
      }
    }
  }

  private long sanitizeTaskRuntime(long time, ID id) {
    if (time < 0) {
      LOG.warn("Negative running time for task " + id + ": " + time);
      return 100L; // set default to 100ms.
    }
    return time;
  }

  @SuppressWarnings("hiding")
  private TaskAttemptInfo scaleInfo(LoggedTask loggedTask,
      LoggedTaskAttempt loggedAttempt, int locality, int loggedLocality,
      double rackLocalOverNodeLocal, double rackRemoteOverNodeLocal) {
    TaskInfo taskInfo = getTaskInfo(loggedTask);
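    // Relative runtime cost per locality level: index 0 = node-local
    // (baseline), 1 = rack-local, 2 = off-rack. The runtime recorded at the
    // logged locality is rescaled to the locality used in the simulation.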
    double[] factors = new double[] { 1.0, rackLocalOverNodeLocal,
        rackRemoteOverNodeLocal };
    double scaleFactor = factors[locality] / factors[loggedLocality];
    State state = convertState(loggedAttempt.getResult());
    if (loggedTask.getTaskType() == Values.MAP) {
      long taskTime = 0;
      if (loggedAttempt.getStartTime() == 0) {
        taskTime = makeUpMapRuntime(state, locality);
      } else {
        taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
      }
      taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
      taskTime *= scaleFactor;
      return new MapTaskAttemptInfo
        (state, taskInfo, taskTime, loggedAttempt.allSplitVectors());
    } else {
      throw new IllegalArgumentException("taskType can only be MAP: "
          + loggedTask.getTaskType());
    }
  }

  private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) {
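    // The locality is the minimum topology distance between the host the
    // attempt ran on and any of the task's preferred (data) locations;
    // unknown hosts or topologies default to the maximum cluster distance.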
    int distance = cluster.getMaximumDistance();
    String rackHostName = loggedAttempt.getHostName().getValue();
    if (rackHostName == null) {
      return distance;
    }
    MachineNode mn = getMachineNode(rackHostName);
    if (mn == null) {
      return distance;
    }
    List<LoggedLocation> locations = loggedTask.getPreferredLocations();
    if (locations != null) {
      for (LoggedLocation location : locations) {
        List<NodeName> layers = location.getLayers();
        if ((layers == null) || (layers.isEmpty())) {
          continue;
        }
        String dataNodeName = layers.get(layers.size() - 1).getValue();
        MachineNode dataNode = cluster.getMachineByName(dataNodeName);
        if (dataNode != null) {
          distance = Math.min(distance, cluster.distance(mn, dataNode));
        }
      }
    }
    return distance;
  }

  private MachineNode getMachineNode(String rackHostName) {
    ParsedHost parsedHost = ParsedHost.parse(rackHostName);
    String hostName = (parsedHost == null) ? rackHostName
                                           : parsedHost.getNodeName();
    if (hostName == null) {
      return null;
    }
    return (cluster == null) ? null : cluster.getMachineByName(hostName);
  }

  private TaskAttemptInfo getTaskAttemptInfo(LoggedTask loggedTask,
      LoggedTaskAttempt loggedAttempt) {
    TaskInfo taskInfo = getTaskInfo(loggedTask);

    List<List<Integer>> allSplitVectors = loggedAttempt.allSplitVectors();

    State state = convertState(loggedAttempt.getResult());
    if (loggedTask.getTaskType() == Values.MAP) {
      long taskTime;
      if (loggedAttempt.getStartTime() == 0) {
        int locality = getLocality(loggedTask, loggedAttempt);
        taskTime = makeUpMapRuntime(state, locality);
      } else {
        taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
      }
      taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
      return new MapTaskAttemptInfo(state, taskInfo, taskTime, allSplitVectors);
    } else if (loggedTask.getTaskType() == Values.REDUCE) {
      long startTime = loggedAttempt.getStartTime();
      long mergeDone = loggedAttempt.getSortFinished();
      long shuffleDone = loggedAttempt.getShuffleFinished();
      long finishTime = loggedAttempt.getFinishTime();
      if (startTime <= 0 || startTime >= finishTime) {
        // startTime > finishTime has been observed in traces; a reduce task
        // with startTime == 0 has not, but if either happens, make up a
        // reduce time with no shuffle/merge phases.
        long reduceTime = makeUpReduceRuntime(state);
        return new ReduceTaskAttemptInfo
          (state, taskInfo, 0, 0, reduceTime, allSplitVectors);
      } else {
        if (shuffleDone <= 0) {
          shuffleDone = startTime;
        }
        if (mergeDone <= 0) {
          mergeDone = finishTime;
        }
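        // Split the attempt's wall-clock time into shuffle, merge (sort) and
        // reduce phases using the recorded phase-completion timestamps.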
        long shuffleTime = shuffleDone - startTime;
        long mergeTime = mergeDone - shuffleDone;
        long reduceTime = finishTime - mergeDone;
        reduceTime = sanitizeTaskRuntime(reduceTime, loggedAttempt.getAttemptID());

        return new ReduceTaskAttemptInfo(state, taskInfo, shuffleTime,
            mergeTime, reduceTime, allSplitVectors);
      }
    } else {
      throw new IllegalArgumentException("taskType for "
          + loggedTask.getTaskID() + " is neither MAP nor REDUCE: "
          + loggedTask.getTaskType());
    }
  }

  private TaskInfo getTaskInfo(LoggedTask loggedTask) {
    if (loggedTask == null) {
      return new TaskInfo(0, 0, 0, 0, 0);
    }
    List<LoggedTaskAttempt> attempts = loggedTask.getAttempts();

    long inputBytes = -1;
    long inputRecords = -1;
    long outputBytes = -1;
    long outputRecords = -1;
    long heapMegabytes = -1;
    ResourceUsageMetrics metrics = new ResourceUsageMetrics();

    Values type = loggedTask.getTaskType();
    if ((type != Values.MAP) && (type != Values.REDUCE)) {
      throw new IllegalArgumentException(
          "getTaskInfo only supports MAP or REDUCE tasks: " + type.toString()
              + " for task = " + loggedTask.getTaskID());
    }

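    // Take the counters from the first well-formed, successful attempt; if
    // none exists, the -1 defaults above are reported.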
    for (LoggedTaskAttempt attempt : attempts) {
      attempt = sanitizeLoggedTaskAttempt(attempt);
      // ignore bad attempts or unsuccessful attempts.
      if ((attempt == null) || (attempt.getResult() != Values.SUCCESS)) {
        continue;
      }

      if (type == Values.MAP) {
        inputBytes = attempt.getHdfsBytesRead();
        inputRecords = attempt.getMapInputRecords();
        outputBytes =
            (job.getTotalReduces() > 0) ? attempt.getMapOutputBytes() : attempt
                .getHdfsBytesWritten();
        outputRecords = attempt.getMapOutputRecords();
        heapMegabytes =
            (job.getJobMapMB() > 0) ? job.getJobMapMB() : job
                .getHeapMegabytes();
      } else {
        inputBytes = attempt.getReduceShuffleBytes();
        inputRecords = attempt.getReduceInputRecords();
        outputBytes = attempt.getHdfsBytesWritten();
        outputRecords = attempt.getReduceOutputRecords();
        heapMegabytes =
            (job.getJobReduceMB() > 0) ? job.getJobReduceMB() : job
                .getHeapMegabytes();
      }
      // set the resource usage metrics
      metrics = attempt.getResourceUsageMetrics();
      break;
    }

    TaskInfo taskInfo =
        new TaskInfo(inputBytes, (int) inputRecords, outputBytes,
            (int) outputRecords, (int) heapMegabytes,
            metrics);
    return taskInfo;
  }

  private TaskAttemptID makeTaskAttemptID(TaskType taskType, int taskNumber,
      int taskAttemptNumber) {
    return new TaskAttemptID(new TaskID(job.getJobID(), taskType, taskNumber),
                             taskAttemptNumber);
  }

  private TaskAttemptInfo makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo,
      int taskAttemptNumber, int taskNumber, int locality) {
    if (taskType == TaskType.MAP) {
      State state = State.SUCCEEDED;
      long runtime = 0;

      // make up state
      state = makeUpState(taskAttemptNumber, job.getMapperTriesToSucceed());
      runtime = makeUpMapRuntime(state, locality);
      runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType,
                                               taskNumber, taskAttemptNumber));
      TaskAttemptInfo tai
        = new MapTaskAttemptInfo(state, taskInfo, runtime, null);
      return tai;
    } else if (taskType == TaskType.REDUCE) {
      State state = State.SUCCEEDED;
      long shuffleTime = 0;
      long sortTime = 0;
      long reduceTime = 0;

      // TODO make up state
      // state = makeUpState(taskAttemptNumber, job.getReducerTriesToSucceed());
      reduceTime = makeUpReduceRuntime(state);
      TaskAttemptInfo tai = new ReduceTaskAttemptInfo
        (state, taskInfo, shuffleTime, sortTime, reduceTime, null);
      return tai;
    }

    throw new IllegalArgumentException("taskType is neither MAP nor REDUCE: "
        + taskType);
  }

  private long makeUpReduceRuntime(State state) {
    long reduceTime = 0;
    for (int i = 0; i < 5; i++) {
      reduceTime = doMakeUpReduceRuntime(state);
      if (reduceTime >= 0) {
        return reduceTime;
      }
    }
    return 0;
  }

  private long doMakeUpReduceRuntime(State state) {
    long reduceTime;
    try {
      if (state == State.SUCCEEDED) {
        reduceTime = makeUpRuntime(job.getSuccessfulReduceAttemptCDF());
      } else if (state == State.FAILED) {
        reduceTime = makeUpRuntime(job.getFailedReduceAttemptCDF());
      } else {
        throw new IllegalArgumentException(
            "state is neither SUCCEEDED nor FAILED: " + state);
      }
      return reduceTime;
    } catch (NoValueToMakeUpRuntime e) {
      return 0;
    }
  }

  private long makeUpMapRuntime(State state, int locality) {
    long runtime;
    // make up the runtime
    if (state == State.SUCCEEDED || state == State.FAILED) {
      List<LoggedDiscreteCDF> cdfList =
          state == State.SUCCEEDED ? job.getSuccessfulMapAttemptCDFs() : job
              .getFailedMapAttemptCDFs();
      // XXX MapCDFs is an ArrayList of 4 possible groups: distance=0, 1, 2,
      // and a last group for "distance cannot be determined". All Pig jobs
      // would have only the 4th group, and Pig tasks usually do not have any
      // locality, so this group should count as "distance=2". However,
      // setup/cleanup tasks are also counted in the 4th group, which does not
      // make sense for them.
      if (cdfList == null) {
        return -1;
      }
      try {
        runtime = makeUpRuntime(cdfList.get(locality));
      } catch (NoValueToMakeUpRuntime e) {
        runtime = makeUpRuntime(cdfList);
      }
    } else {
      throw new IllegalArgumentException(
          "state is neither SUCCEEDED nor FAILED: " + state);
    }
    return runtime;
  }

  /**
   * Perform a weighted random selection on a list of CDFs, and produce a
   * random variable using the selected CDF.
   *
   * @param mapAttemptCDFs
   *          A list of CDFs for the distribution of runtime for the 1st, 2nd,
   *          ... map attempts for the job.
   * @return a runtime sampled from the selected CDF, or -1 if the CDFs hold
   *         no values.
   */
  private long makeUpRuntime(List<LoggedDiscreteCDF> mapAttemptCDFs) {
    int total = 0;
    if (mapAttemptCDFs == null) {
      return -1;
    }
    for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
      total += cdf.getNumberValues();
    }
    if (total == 0) {
      return -1;
    }
    int index = random.nextInt(total);
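    // Walk the list: index falls within the CDF whose cumulative sample count
    // covers it, so each CDF is chosen with probability proportional to the
    // number of values it holds.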
    for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
      if (index >= cdf.getNumberValues()) {
        index -= cdf.getNumberValues();
      } else {
        if (index < 0) {
          throw new IllegalStateException("application error");
        }
        return makeUpRuntime(cdf);
      }
    }
    throw new IllegalStateException("not possible to get here");
  }

  private long makeUpRuntime(LoggedDiscreteCDF loggedDiscreteCDF) {
    /*
     * We need this odd-looking code because if a seed exists we need to ensure
     * that only one interpolator is generated per LoggedDiscreteCDF, but if no
     * seed exists then the potentially lengthy process of making an
     * interpolator can happen outside the lock. makeUpRuntimeCore only locks
     * around the two hash map accesses.
     */
    if (hasRandomSeed) {
      synchronized (interpolatorMap) {
        return makeUpRuntimeCore(loggedDiscreteCDF);
      }
    }

    return makeUpRuntimeCore(loggedDiscreteCDF);
  }

  private synchronized long getNextRandomSeed() {
    numRandomSeeds++;
    return RandomSeedGenerator.getSeed("forZombieJob" + job.getJobID(),
                                       numRandomSeeds);
  }

  private long makeUpRuntimeCore(LoggedDiscreteCDF loggedDiscreteCDF) {
    CDFRandomGenerator interpolator;

    synchronized (interpolatorMap) {
      interpolator = interpolatorMap.get(loggedDiscreteCDF);
    }

    if (interpolator == null) {
      if (loggedDiscreteCDF.getNumberValues() == 0) {
        throw new NoValueToMakeUpRuntime("no value to use to make up runtime");
      }

      interpolator =
          hasRandomSeed ? new CDFPiecewiseLinearRandomGenerator(
              loggedDiscreteCDF, getNextRandomSeed())
              : new CDFPiecewiseLinearRandomGenerator(loggedDiscreteCDF);

      /*
       * It doesn't matter if we compute and store an interpolator twice,
       * because the two instances will be semantically identical and
       * stateless. If we're seeded, we're not stateless, but then this code
       * is only ever called while holding the lock.
       */
      synchronized (interpolatorMap) {
        interpolatorMap.put(loggedDiscreteCDF, interpolator);
      }
    }

    return interpolator.randomValue();
  }

  static private class NoValueToMakeUpRuntime extends IllegalArgumentException {
    static final long serialVersionUID = 1L;

    NoValueToMakeUpRuntime() {
      super();
    }

    NoValueToMakeUpRuntime(String detailMessage) {
      super(detailMessage);
    }

    NoValueToMakeUpRuntime(String detailMessage, Throwable cause) {
      super(detailMessage, cause);
    }

    NoValueToMakeUpRuntime(Throwable cause) {
      super(cause);
    }
  }

  private State makeUpState(int taskAttemptNumber, double[] numAttempts) {

    // If numAttempts is null, return FAILED.
    if (numAttempts == null) {
      return State.FAILED;
    }
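    // numAttempts[i] is the (relative) frequency with which the (i+1)-th
    // attempt is the one that succeeds; given that this attempt is reached,
    // its chance to succeed is its own mass over the mass of this and all
    // later attempts.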
    if (taskAttemptNumber >= numAttempts.length - 1) {
      // always succeed
      return State.SUCCEEDED;
    } else {
      double pSucceed = numAttempts[taskAttemptNumber];
      double pFail = 0;
      for (int i = taskAttemptNumber + 1; i < numAttempts.length; i++) {
        pFail += numAttempts[i];
      }
      return (random.nextDouble() < pSucceed / (pSucceed + pFail)) ? State.SUCCEEDED
          : State.FAILED;
    }
  }

  private TaskID getMaskedTaskID(TaskType taskType, int taskNumber) {
    return new TaskID(new JobID(), taskType, taskNumber);
  }

  private LoggedTask getLoggedTask(TaskType taskType, int taskNumber) {
    buildMaps();
    return loggedTaskMap.get(getMaskedTaskID(taskType, taskNumber));
  }

  private LoggedTaskAttempt getLoggedTaskAttempt(TaskType taskType,
      int taskNumber, int taskAttemptNumber) {
    buildMaps();
    TaskAttemptID id =
        new TaskAttemptID(getMaskedTaskID(taskType, taskNumber),
            taskAttemptNumber);
    return loggedTaskAttemptMap.get(id);
  }

}