1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.tools.rumen; 19 20 import java.util.ArrayList; 21 import java.util.Collections; 22 import java.util.List; 23 import java.util.Set; 24 import java.util.TreeSet; 25 26 import org.apache.hadoop.mapreduce.TaskID; 27 import org.apache.hadoop.mapreduce.jobhistory.JhCounter; 28 import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup; 29 import org.apache.hadoop.mapreduce.jobhistory.JhCounters; 30 31 import org.apache.hadoop.util.StringUtils; 32 import org.codehaus.jackson.annotate.JsonAnySetter; 33 34 /** 35 * A {@link LoggedTask} represents a [hadoop] task that is part of a hadoop job. 36 * It knows about the [pssibly empty] sequence of attempts, its I/O footprint, 37 * and its runtime. 38 * 39 * All of the public methods are simply accessors for the instance variables we 40 * want to write out in the JSON files. 41 * 42 */ 43 public class LoggedTask implements DeepCompare { 44 long inputBytes = -1L; 45 long inputRecords = -1L; 46 long outputBytes = -1L; 47 long outputRecords = -1L; 48 TaskID taskID; 49 long startTime = -1L; 50 long finishTime = -1L; 51 Pre21JobHistoryConstants.Values taskType; 52 Pre21JobHistoryConstants.Values taskStatus; 53 List<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>(); 54 List<LoggedLocation> preferredLocations = Collections.emptyList(); 55 56 static private Set<String> alreadySeenAnySetterAttributes = 57 new TreeSet<String>(); 58 59 // for input parameter ignored. 60 @JsonAnySetter setUnknownAttribute(String attributeName, Object ignored)61 public void setUnknownAttribute(String attributeName, Object ignored) { 62 if (!alreadySeenAnySetterAttributes.contains(attributeName)) { 63 alreadySeenAnySetterAttributes.add(attributeName); 64 System.err.println("In LoggedJob, we saw the unknown attribute " 65 + attributeName + "."); 66 } 67 } 68 LoggedTask()69 LoggedTask() { 70 super(); 71 } 72 adjustTimes(long adjustment)73 void adjustTimes(long adjustment) { 74 startTime += adjustment; 75 finishTime += adjustment; 76 77 for (LoggedTaskAttempt attempt : attempts) { 78 attempt.adjustTimes(adjustment); 79 } 80 } 81 getInputBytes()82 public long getInputBytes() { 83 return inputBytes; 84 } 85 setInputBytes(long inputBytes)86 void setInputBytes(long inputBytes) { 87 this.inputBytes = inputBytes; 88 } 89 getInputRecords()90 public long getInputRecords() { 91 return inputRecords; 92 } 93 setInputRecords(long inputRecords)94 void setInputRecords(long inputRecords) { 95 this.inputRecords = inputRecords; 96 } 97 getOutputBytes()98 public long getOutputBytes() { 99 return outputBytes; 100 } 101 setOutputBytes(long outputBytes)102 void setOutputBytes(long outputBytes) { 103 this.outputBytes = outputBytes; 104 } 105 getOutputRecords()106 public long getOutputRecords() { 107 return outputRecords; 108 } 109 setOutputRecords(long outputRecords)110 void setOutputRecords(long outputRecords) { 111 this.outputRecords = outputRecords; 112 } 113 getTaskID()114 public TaskID getTaskID() { 115 return taskID; 116 } 117 setTaskID(String taskID)118 void setTaskID(String taskID) { 119 this.taskID = TaskID.forName(taskID); 120 } 121 getStartTime()122 public long getStartTime() { 123 return startTime; 124 } 125 setStartTime(long startTime)126 void setStartTime(long startTime) { 127 this.startTime = startTime; 128 } 129 getFinishTime()130 public long getFinishTime() { 131 return finishTime; 132 } 133 setFinishTime(long finishTime)134 void setFinishTime(long finishTime) { 135 this.finishTime = finishTime; 136 } 137 getAttempts()138 public List<LoggedTaskAttempt> getAttempts() { 139 return attempts; 140 } 141 setAttempts(List<LoggedTaskAttempt> attempts)142 void setAttempts(List<LoggedTaskAttempt> attempts) { 143 if (attempts == null) { 144 this.attempts = new ArrayList<LoggedTaskAttempt>(); 145 } else { 146 this.attempts = attempts; 147 } 148 } 149 getPreferredLocations()150 public List<LoggedLocation> getPreferredLocations() { 151 return preferredLocations; 152 } 153 setPreferredLocations(List<LoggedLocation> preferredLocations)154 void setPreferredLocations(List<LoggedLocation> preferredLocations) { 155 if (preferredLocations == null || preferredLocations.isEmpty()) { 156 this.preferredLocations = Collections.emptyList(); 157 } else { 158 this.preferredLocations = preferredLocations; 159 } 160 } 161 getTaskStatus()162 public Pre21JobHistoryConstants.Values getTaskStatus() { 163 return taskStatus; 164 } 165 setTaskStatus(Pre21JobHistoryConstants.Values taskStatus)166 void setTaskStatus(Pre21JobHistoryConstants.Values taskStatus) { 167 this.taskStatus = taskStatus; 168 } 169 getTaskType()170 public Pre21JobHistoryConstants.Values getTaskType() { 171 return taskType; 172 } 173 setTaskType(Pre21JobHistoryConstants.Values taskType)174 void setTaskType(Pre21JobHistoryConstants.Values taskType) { 175 this.taskType = taskType; 176 } 177 incorporateMapCounters(JhCounters counters)178 private void incorporateMapCounters(JhCounters counters) { 179 incorporateCounter(new SetField(this) { 180 @Override 181 void set(long val) { 182 task.inputBytes = val; 183 } 184 }, counters, "HDFS_BYTES_READ"); 185 incorporateCounter(new SetField(this) { 186 @Override 187 void set(long val) { 188 task.outputBytes = val; 189 } 190 }, counters, "FILE_BYTES_WRITTEN"); 191 incorporateCounter(new SetField(this) { 192 @Override 193 void set(long val) { 194 task.inputRecords = val; 195 } 196 }, counters, "MAP_INPUT_RECORDS"); 197 incorporateCounter(new SetField(this) { 198 @Override 199 void set(long val) { 200 task.outputRecords = val; 201 } 202 }, counters, "MAP_OUTPUT_RECORDS"); 203 } 204 incorporateReduceCounters(JhCounters counters)205 private void incorporateReduceCounters(JhCounters counters) { 206 incorporateCounter(new SetField(this) { 207 @Override 208 void set(long val) { 209 task.inputBytes = val; 210 } 211 }, counters, "REDUCE_SHUFFLE_BYTES"); 212 incorporateCounter(new SetField(this) { 213 @Override 214 void set(long val) { 215 task.outputBytes = val; 216 } 217 }, counters, "HDFS_BYTES_WRITTEN"); 218 incorporateCounter(new SetField(this) { 219 @Override 220 void set(long val) { 221 task.inputRecords = val; 222 } 223 }, counters, "REDUCE_INPUT_RECORDS"); 224 incorporateCounter(new SetField(this) { 225 @Override 226 void set(long val) { 227 task.outputRecords = val; 228 } 229 }, counters, "REDUCE_OUTPUT_RECORDS"); 230 } 231 232 // incorporate event counters 233 // LoggedTask MUST KNOW ITS TYPE BEFORE THIS CALL incorporateCounters(JhCounters counters)234 public void incorporateCounters(JhCounters counters) { 235 switch (taskType) { 236 case MAP: 237 incorporateMapCounters(counters); 238 return; 239 case REDUCE: 240 incorporateReduceCounters(counters); 241 return; 242 // NOT exhaustive 243 } 244 } 245 canonicalizeCounterName(String nonCanonicalName)246 private static String canonicalizeCounterName(String nonCanonicalName) { 247 String result = StringUtils.toLowerCase(nonCanonicalName); 248 249 result = result.replace(' ', '|'); 250 result = result.replace('-', '|'); 251 result = result.replace('_', '|'); 252 result = result.replace('.', '|'); 253 254 return result; 255 } 256 257 private abstract class SetField { 258 LoggedTask task; 259 SetField(LoggedTask task)260 SetField(LoggedTask task) { 261 this.task = task; 262 } 263 set(long value)264 abstract void set(long value); 265 } 266 incorporateCounter(SetField thunk, JhCounters counters, String counterName)267 private static void incorporateCounter(SetField thunk, JhCounters counters, 268 String counterName) { 269 counterName = canonicalizeCounterName(counterName); 270 271 for (JhCounterGroup group : counters.groups) { 272 for (JhCounter counter : group.counts) { 273 if (counterName 274 .equals(canonicalizeCounterName(counter.name.toString()))) { 275 thunk.set(counter.value); 276 return; 277 } 278 } 279 } 280 } 281 compare1(long c1, long c2, TreePath loc, String eltname)282 private void compare1(long c1, long c2, TreePath loc, String eltname) 283 throws DeepInequalityException { 284 if (c1 != c2) { 285 throw new DeepInequalityException(eltname + " miscompared", new TreePath( 286 loc, eltname)); 287 } 288 } 289 compare1(String c1, String c2, TreePath loc, String eltname)290 private void compare1(String c1, String c2, TreePath loc, String eltname) 291 throws DeepInequalityException { 292 if (c1 == null && c2 == null) { 293 return; 294 } 295 if (c1 == null || c2 == null || !c1.equals(c2)) { 296 throw new DeepInequalityException(eltname + " miscompared", new TreePath( 297 loc, eltname)); 298 } 299 } 300 compare1(Pre21JobHistoryConstants.Values c1, Pre21JobHistoryConstants.Values c2, TreePath loc, String eltname)301 private void compare1(Pre21JobHistoryConstants.Values c1, 302 Pre21JobHistoryConstants.Values c2, TreePath loc, String eltname) 303 throws DeepInequalityException { 304 if (c1 == null && c2 == null) { 305 return; 306 } 307 if (c1 == null || c2 == null || !c1.equals(c2)) { 308 throw new DeepInequalityException(eltname + " miscompared", new TreePath( 309 loc, eltname)); 310 } 311 } 312 compareLoggedLocations(List<LoggedLocation> c1, List<LoggedLocation> c2, TreePath loc, String eltname)313 private void compareLoggedLocations(List<LoggedLocation> c1, 314 List<LoggedLocation> c2, TreePath loc, String eltname) 315 throws DeepInequalityException { 316 if (c1 == null && c2 == null) { 317 return; 318 } 319 320 if (c1 == null || c2 == null || c1.size() != c2.size()) { 321 throw new DeepInequalityException(eltname + " miscompared", new TreePath( 322 loc, eltname)); 323 } 324 325 for (int i = 0; i < c1.size(); ++i) { 326 c1.get(i).deepCompare(c2.get(i), new TreePath(loc, eltname, i)); 327 } 328 } 329 compareLoggedTaskAttempts(List<LoggedTaskAttempt> c1, List<LoggedTaskAttempt> c2, TreePath loc, String eltname)330 private void compareLoggedTaskAttempts(List<LoggedTaskAttempt> c1, 331 List<LoggedTaskAttempt> c2, TreePath loc, String eltname) 332 throws DeepInequalityException { 333 if (c1 == null && c2 == null) { 334 return; 335 } 336 337 if (c1 == null || c2 == null || c1.size() != c2.size()) { 338 throw new DeepInequalityException(eltname + " miscompared", new TreePath( 339 loc, eltname)); 340 } 341 342 for (int i = 0; i < c1.size(); ++i) { 343 c1.get(i).deepCompare(c2.get(i), new TreePath(loc, eltname, i)); 344 } 345 } 346 deepCompare(DeepCompare comparand, TreePath loc)347 public void deepCompare(DeepCompare comparand, TreePath loc) 348 throws DeepInequalityException { 349 if (!(comparand instanceof LoggedTask)) { 350 throw new DeepInequalityException("comparand has wrong type", loc); 351 } 352 353 LoggedTask other = (LoggedTask) comparand; 354 355 compare1(inputBytes, other.inputBytes, loc, "inputBytes"); 356 compare1(inputRecords, other.inputRecords, loc, "inputRecords"); 357 compare1(outputBytes, other.outputBytes, loc, "outputBytes"); 358 compare1(outputRecords, other.outputRecords, loc, "outputRecords"); 359 360 compare1(taskID.toString(), other.taskID.toString(), loc, "taskID"); 361 362 compare1(startTime, other.startTime, loc, "startTime"); 363 compare1(finishTime, other.finishTime, loc, "finishTime"); 364 365 compare1(taskType, other.taskType, loc, "taskType"); 366 compare1(taskStatus, other.taskStatus, loc, "taskStatus"); 367 368 compareLoggedTaskAttempts(attempts, other.attempts, loc, "attempts"); 369 compareLoggedLocations(preferredLocations, other.preferredLocations, loc, 370 "preferredLocations"); 371 } 372 } 373