1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.tools.rumen;
19 
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Set;
24 import java.util.TreeSet;
25 
26 import org.apache.hadoop.mapreduce.TaskID;
27 import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
28 import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
29 import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
30 
31 import org.apache.hadoop.util.StringUtils;
32 import org.codehaus.jackson.annotate.JsonAnySetter;
33 
/**
 * A {@link LoggedTask} represents a [hadoop] task that is part of a hadoop job.
 * It knows about the [possibly empty] sequence of attempts, its I/O footprint,
 * and its runtime.
 *
 * All of the public methods are simply accessors for the instance variables we
 * want to write out in the JSON files.
 *
 */
43 public class LoggedTask implements DeepCompare {
44   long inputBytes = -1L;
45   long inputRecords = -1L;
46   long outputBytes = -1L;
47   long outputRecords = -1L;
48   TaskID taskID;
49   long startTime = -1L;
50   long finishTime = -1L;
51   Pre21JobHistoryConstants.Values taskType;
52   Pre21JobHistoryConstants.Values taskStatus;
53   List<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>();
54   List<LoggedLocation> preferredLocations = Collections.emptyList();
55 
56   static private Set<String> alreadySeenAnySetterAttributes =
57       new TreeSet<String>();
58 
59   // for input parameter ignored.
60   @JsonAnySetter
setUnknownAttribute(String attributeName, Object ignored)61   public void setUnknownAttribute(String attributeName, Object ignored) {
62     if (!alreadySeenAnySetterAttributes.contains(attributeName)) {
63       alreadySeenAnySetterAttributes.add(attributeName);
64       System.err.println("In LoggedJob, we saw the unknown attribute "
65           + attributeName + ".");
66     }
67   }
68 
LoggedTask()69   LoggedTask() {
70     super();
71   }
72 
adjustTimes(long adjustment)73   void adjustTimes(long adjustment) {
74     startTime += adjustment;
75     finishTime += adjustment;
76 
77     for (LoggedTaskAttempt attempt : attempts) {
78       attempt.adjustTimes(adjustment);
79     }
80   }
81 
getInputBytes()82   public long getInputBytes() {
83     return inputBytes;
84   }
85 
setInputBytes(long inputBytes)86   void setInputBytes(long inputBytes) {
87     this.inputBytes = inputBytes;
88   }
89 
getInputRecords()90   public long getInputRecords() {
91     return inputRecords;
92   }
93 
setInputRecords(long inputRecords)94   void setInputRecords(long inputRecords) {
95     this.inputRecords = inputRecords;
96   }
97 
getOutputBytes()98   public long getOutputBytes() {
99     return outputBytes;
100   }
101 
setOutputBytes(long outputBytes)102   void setOutputBytes(long outputBytes) {
103     this.outputBytes = outputBytes;
104   }
105 
getOutputRecords()106   public long getOutputRecords() {
107     return outputRecords;
108   }
109 
setOutputRecords(long outputRecords)110   void setOutputRecords(long outputRecords) {
111     this.outputRecords = outputRecords;
112   }
113 
getTaskID()114   public TaskID getTaskID() {
115     return taskID;
116   }
117 
setTaskID(String taskID)118   void setTaskID(String taskID) {
119     this.taskID = TaskID.forName(taskID);
120   }
121 
getStartTime()122   public long getStartTime() {
123     return startTime;
124   }
125 
setStartTime(long startTime)126   void setStartTime(long startTime) {
127     this.startTime = startTime;
128   }
129 
getFinishTime()130   public long getFinishTime() {
131     return finishTime;
132   }
133 
setFinishTime(long finishTime)134   void setFinishTime(long finishTime) {
135     this.finishTime = finishTime;
136   }
137 
getAttempts()138   public List<LoggedTaskAttempt> getAttempts() {
139     return attempts;
140   }
141 
setAttempts(List<LoggedTaskAttempt> attempts)142   void setAttempts(List<LoggedTaskAttempt> attempts) {
143     if (attempts == null) {
144       this.attempts = new ArrayList<LoggedTaskAttempt>();
145     } else {
146       this.attempts = attempts;
147     }
148   }
149 
getPreferredLocations()150   public List<LoggedLocation> getPreferredLocations() {
151     return preferredLocations;
152   }
153 
setPreferredLocations(List<LoggedLocation> preferredLocations)154   void setPreferredLocations(List<LoggedLocation> preferredLocations) {
155     if (preferredLocations == null || preferredLocations.isEmpty()) {
156       this.preferredLocations = Collections.emptyList();
157     } else {
158       this.preferredLocations = preferredLocations;
159     }
160   }
161 
getTaskStatus()162   public Pre21JobHistoryConstants.Values getTaskStatus() {
163     return taskStatus;
164   }
165 
setTaskStatus(Pre21JobHistoryConstants.Values taskStatus)166   void setTaskStatus(Pre21JobHistoryConstants.Values taskStatus) {
167     this.taskStatus = taskStatus;
168   }
169 
getTaskType()170   public Pre21JobHistoryConstants.Values getTaskType() {
171     return taskType;
172   }
173 
setTaskType(Pre21JobHistoryConstants.Values taskType)174   void setTaskType(Pre21JobHistoryConstants.Values taskType) {
175     this.taskType = taskType;
176   }
177 
incorporateMapCounters(JhCounters counters)178   private void incorporateMapCounters(JhCounters counters) {
179     incorporateCounter(new SetField(this) {
180       @Override
181       void set(long val) {
182         task.inputBytes = val;
183       }
184     }, counters, "HDFS_BYTES_READ");
185     incorporateCounter(new SetField(this) {
186       @Override
187       void set(long val) {
188         task.outputBytes = val;
189       }
190     }, counters, "FILE_BYTES_WRITTEN");
191     incorporateCounter(new SetField(this) {
192       @Override
193       void set(long val) {
194         task.inputRecords = val;
195       }
196     }, counters, "MAP_INPUT_RECORDS");
197     incorporateCounter(new SetField(this) {
198       @Override
199       void set(long val) {
200         task.outputRecords = val;
201       }
202     }, counters, "MAP_OUTPUT_RECORDS");
203   }
204 
incorporateReduceCounters(JhCounters counters)205   private void incorporateReduceCounters(JhCounters counters) {
206     incorporateCounter(new SetField(this) {
207       @Override
208       void set(long val) {
209         task.inputBytes = val;
210       }
211     }, counters, "REDUCE_SHUFFLE_BYTES");
212     incorporateCounter(new SetField(this) {
213       @Override
214       void set(long val) {
215         task.outputBytes = val;
216       }
217     }, counters, "HDFS_BYTES_WRITTEN");
218     incorporateCounter(new SetField(this) {
219       @Override
220       void set(long val) {
221         task.inputRecords = val;
222       }
223     }, counters, "REDUCE_INPUT_RECORDS");
224     incorporateCounter(new SetField(this) {
225       @Override
226       void set(long val) {
227         task.outputRecords = val;
228       }
229     }, counters, "REDUCE_OUTPUT_RECORDS");
230   }
231 
232   // incorporate event counters
233   // LoggedTask MUST KNOW ITS TYPE BEFORE THIS CALL
incorporateCounters(JhCounters counters)234   public void incorporateCounters(JhCounters counters) {
235     switch (taskType) {
236     case MAP:
237       incorporateMapCounters(counters);
238       return;
239     case REDUCE:
240       incorporateReduceCounters(counters);
241       return;
242       // NOT exhaustive
243     }
244   }
245 
canonicalizeCounterName(String nonCanonicalName)246   private static String canonicalizeCounterName(String nonCanonicalName) {
247     String result = StringUtils.toLowerCase(nonCanonicalName);
248 
249     result = result.replace(' ', '|');
250     result = result.replace('-', '|');
251     result = result.replace('_', '|');
252     result = result.replace('.', '|');
253 
254     return result;
255   }
256 
257   private abstract class SetField {
258     LoggedTask task;
259 
SetField(LoggedTask task)260     SetField(LoggedTask task) {
261       this.task = task;
262     }
263 
set(long value)264     abstract void set(long value);
265   }
266 
incorporateCounter(SetField thunk, JhCounters counters, String counterName)267   private static void incorporateCounter(SetField thunk, JhCounters counters,
268       String counterName) {
269     counterName = canonicalizeCounterName(counterName);
270 
271     for (JhCounterGroup group : counters.groups) {
272       for (JhCounter counter : group.counts) {
273         if (counterName
274             .equals(canonicalizeCounterName(counter.name.toString()))) {
275           thunk.set(counter.value);
276           return;
277         }
278       }
279     }
280   }
281 
compare1(long c1, long c2, TreePath loc, String eltname)282   private void compare1(long c1, long c2, TreePath loc, String eltname)
283       throws DeepInequalityException {
284     if (c1 != c2) {
285       throw new DeepInequalityException(eltname + " miscompared", new TreePath(
286           loc, eltname));
287     }
288   }
289 
compare1(String c1, String c2, TreePath loc, String eltname)290   private void compare1(String c1, String c2, TreePath loc, String eltname)
291       throws DeepInequalityException {
292     if (c1 == null && c2 == null) {
293       return;
294     }
295     if (c1 == null || c2 == null || !c1.equals(c2)) {
296       throw new DeepInequalityException(eltname + " miscompared", new TreePath(
297           loc, eltname));
298     }
299   }
300 
compare1(Pre21JobHistoryConstants.Values c1, Pre21JobHistoryConstants.Values c2, TreePath loc, String eltname)301   private void compare1(Pre21JobHistoryConstants.Values c1,
302       Pre21JobHistoryConstants.Values c2, TreePath loc, String eltname)
303       throws DeepInequalityException {
304     if (c1 == null && c2 == null) {
305       return;
306     }
307     if (c1 == null || c2 == null || !c1.equals(c2)) {
308       throw new DeepInequalityException(eltname + " miscompared", new TreePath(
309           loc, eltname));
310     }
311   }
312 
compareLoggedLocations(List<LoggedLocation> c1, List<LoggedLocation> c2, TreePath loc, String eltname)313   private void compareLoggedLocations(List<LoggedLocation> c1,
314       List<LoggedLocation> c2, TreePath loc, String eltname)
315       throws DeepInequalityException {
316     if (c1 == null && c2 == null) {
317       return;
318     }
319 
320     if (c1 == null || c2 == null || c1.size() != c2.size()) {
321       throw new DeepInequalityException(eltname + " miscompared", new TreePath(
322           loc, eltname));
323     }
324 
325     for (int i = 0; i < c1.size(); ++i) {
326       c1.get(i).deepCompare(c2.get(i), new TreePath(loc, eltname, i));
327     }
328   }
329 
compareLoggedTaskAttempts(List<LoggedTaskAttempt> c1, List<LoggedTaskAttempt> c2, TreePath loc, String eltname)330   private void compareLoggedTaskAttempts(List<LoggedTaskAttempt> c1,
331       List<LoggedTaskAttempt> c2, TreePath loc, String eltname)
332       throws DeepInequalityException {
333     if (c1 == null && c2 == null) {
334       return;
335     }
336 
337     if (c1 == null || c2 == null || c1.size() != c2.size()) {
338       throw new DeepInequalityException(eltname + " miscompared", new TreePath(
339           loc, eltname));
340     }
341 
342     for (int i = 0; i < c1.size(); ++i) {
343       c1.get(i).deepCompare(c2.get(i), new TreePath(loc, eltname, i));
344     }
345   }
346 
deepCompare(DeepCompare comparand, TreePath loc)347   public void deepCompare(DeepCompare comparand, TreePath loc)
348       throws DeepInequalityException {
349     if (!(comparand instanceof LoggedTask)) {
350       throw new DeepInequalityException("comparand has wrong type", loc);
351     }
352 
353     LoggedTask other = (LoggedTask) comparand;
354 
355     compare1(inputBytes, other.inputBytes, loc, "inputBytes");
356     compare1(inputRecords, other.inputRecords, loc, "inputRecords");
357     compare1(outputBytes, other.outputBytes, loc, "outputBytes");
358     compare1(outputRecords, other.outputRecords, loc, "outputRecords");
359 
360     compare1(taskID.toString(), other.taskID.toString(), loc, "taskID");
361 
362     compare1(startTime, other.startTime, loc, "startTime");
363     compare1(finishTime, other.finishTime, loc, "finishTime");
364 
365     compare1(taskType, other.taskType, loc, "taskType");
366     compare1(taskStatus, other.taskStatus, loc, "taskStatus");
367 
368     compareLoggedTaskAttempts(attempts, other.attempts, loc, "attempts");
369     compareLoggedLocations(preferredLocations, other.preferredLocations, loc,
370         "preferredLocations");
371   }
372 }
373