/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.gridmix;

import java.io.IOException;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

/**
 * Summarizes a {@link Gridmix} run. Statistics that are reported are
 * <ul>
 *   <li>Total number of jobs in the input trace</li>
 *   <li>Trace signature</li>
 *   <li>Total number of jobs processed from the input trace</li>
 *   <li>Total number of jobs submitted</li>
 *   <li>Total number of successful and failed jobs</li>
 *   <li>Total number of map/reduce tasks launched</li>
 *   <li>Gridmix start &amp; end time</li>
 *   <li>Total time for the Gridmix run (data-generation and simulation)</li>
 *   <li>Gridmix Configuration (i.e job-type, submission-type, resolver)</li>
 * </ul>
 */
class ExecutionSummarizer implements StatListener<JobStats> {
  static final Log LOG = LogFactory.getLog(ExecutionSummarizer.class);
  // formatter used for every wall-clock timestamp printed in the summary
  private static final FastDateFormat UTIL = FastDateFormat.getInstance();

  private int numJobsInInputTrace;
  private int totalSuccessfulJobs;
  private int totalFailedJobs;
  private int totalLostJobs;
  private int totalMapTasksLaunched;
  private int totalReduceTasksLaunched;
  private long totalSimulationTime;
  private long totalRuntime;
  private final String commandLineArgs;
  // wall-clock time at which this summarizer was constructed
  private final long startTime;
  private long endTime;
  // stays 0 until start(Configuration) is called
  private long simulationStartTime;
  private String inputTraceLocation;
  private String inputTraceSignature;
  private String jobSubmissionPolicy;
  private String resolver;
  private DataStatistics dataStats;
  private String expectedDataSize;

  /**
   * Basic constructor initialized with the runtime arguments.
   * Records the current time as the start time.
   *
   * @param args the runtime arguments; they are joined with a single space
   *             and kept for reporting
   */
  ExecutionSummarizer(String[] args) {
    startTime = System.currentTimeMillis();
    // flatten the args string and store it
    commandLineArgs =
      org.apache.commons.lang.StringUtils.join(args, ' ');
  }

  /**
   * Default constructor. Records the current time as the start time and
   * uses {@code Summarizer.NA} in place of the runtime arguments.
   */
  ExecutionSummarizer() {
    startTime = System.currentTimeMillis();
    commandLineArgs = Summarizer.NA;
  }

  /**
   * Marks the beginning of the simulation by recording the current time.
   * Job updates are ignored until this has been called.
   *
   * @param conf the configuration (not read by this method)
   */
  void start(Configuration conf) {
    simulationStartTime = System.currentTimeMillis();
  }

  /**
   * Classifies the job as successful, failed or lost and bumps the
   * matching counter.
   */
  private void processJobState(JobStats stats) {
    Job job = stats.getJob();
    try {
      if (job.isSuccessful()) {
        ++totalSuccessfulJobs;
      } else {
        ++totalFailedJobs;
      }
    } catch (Exception e) {
      // this behavior is consistent with job-monitor which marks the job as
      // complete (lost) if the status polling bails out
      ++totalLostJobs;
    }
  }

  /**
   * Adds the job's map and reduce task counts to the running totals.
   */
  private void processJobTasks(JobStats stats) {
    totalMapTasksLaunched += stats.getNoOfMaps();
    totalReduceTasksLaunched += stats.getNoOfReds();
  }

  /**
   * Folds a single job's statistics into the summary.
   */
  private void process(JobStats stats) {
    // process the job run state
    processJobState(stats);

    // process the tasks information
    processJobTasks(stats);
  }

  /**
   * Accounts for the given job and refreshes the total simulation time.
   * The item is ignored if the simulation has not been started yet.
   *
   * @param item the statistics of the job to account for
   */
  @Override
  public void update(JobStats item) {
    // process only if the simulation has started
    if (simulationStartTime > 0) {
      process(item);
      totalSimulationTime =
        System.currentTimeMillis() - getSimulationStartTime();
    }
  }

  /**
   * Generates a signature for the trace file based on
   * <ul>
   *   <li>filename (fully qualified path)</li>
   *   <li>modification time</li>
   *   <li>file length</li>
   *   <li>owner</li>
   * </ul>
   * The file system is looked up using a newly created
   * {@link Configuration}.
   *
   * @param input the location of the trace file
   * @return the MD5 digest of the above attributes, as a string
   * @throws IOException if the file system or the file status cannot be
   *                     obtained
   */
  protected static String getTraceSignature(String input) throws IOException {
    return getTraceSignature(input, new Configuration());
  }

  /**
   * Same as {@link #getTraceSignature(String)} but looks up the file system
   * using the supplied configuration.
   *
   * @param input the location of the trace file
   * @param conf  the configuration used to obtain the file system
   * @return the MD5 digest of the trace file attributes, as a string
   * @throws IOException if the file system or the file status cannot be
   *                     obtained
   */
  private static String getTraceSignature(String input, Configuration conf)
  throws IOException {
    Path inputPath = new Path(input);
    FileSystem fs = inputPath.getFileSystem(conf);
    FileStatus status = fs.getFileStatus(inputPath);
    Path qPath = fs.makeQualified(status.getPath());
    String traceID = status.getModificationTime() + qPath.toString()
                     + status.getOwner() + status.getLen();
    return MD5Hash.digest(traceID).toString();
  }

  /**
   * Captures the end-of-run information needed for the summary: number of
   * jobs in the trace, end time, trace location and signature, job
   * submission policy, user resolver, expected data size, input data
   * statistics and the total runtime.
   * <p>
   * Note that this is a parameterized overload and not the no-argument
   * {@code Object.finalize()} method.
   *
   * @param factory      the job factory, queried for the number of jobs in
   *                     the trace
   * @param inputPath    the trace location; {@code "-"} results in the
   *                     location and signature being reported as
   *                     {@code Summarizer.NA}
   * @param dataSize     the expected input data size; a non-positive value
   *                     is reported as {@code Summarizer.NA}
   * @param userResolver the user resolver whose class name is reported
   * @param stats        the input data statistics, may be {@code null}
   * @param conf         the configuration used to look up the trace file
   *                     system and the job submission policy
   * @throws IOException if the trace file cannot be inspected
   */
  @SuppressWarnings("unchecked")
  void finalize(JobFactory factory, String inputPath, long dataSize,
                UserResolver userResolver, DataStatistics stats,
                Configuration conf)
  throws IOException {
    numJobsInInputTrace = factory.numJobsInTrace;
    endTime = System.currentTimeMillis();
    if ("-".equals(inputPath)) {
      inputTraceLocation = Summarizer.NA;
      inputTraceSignature = Summarizer.NA;
    } else {
      Path inputTracePath = new Path(inputPath);
      FileSystem fs = inputTracePath.getFileSystem(conf);
      inputTraceLocation = fs.makeQualified(inputTracePath).toString();
      // use the same configuration for the signature so that the location
      // and the signature are derived from the same file system settings
      inputTraceSignature = getTraceSignature(inputPath, conf);
    }
    jobSubmissionPolicy = Gridmix.getJobSubmissionPolicy(conf).name();
    resolver = userResolver.getClass().getName();
    if (dataSize > 0) {
      expectedDataSize = StringUtils.humanReadableInt(dataSize);
    } else {
      expectedDataSize = Summarizer.NA;
    }
    dataStats = stats;
    totalRuntime = System.currentTimeMillis() - getStartTime();
  }

  /**
   * Summarizes the current {@link Gridmix} run.
   *
   * @return a multi-line, human readable execution summary
   */
  @Override
  public String toString() {
    StringBuilder builder = new StringBuilder();
    builder.append("Execution Summary:-");
    builder.append("\nInput trace: ").append(getInputTraceLocation());
    builder.append("\nInput trace signature: ")
           .append(getInputTraceSignature());
    builder.append("\nTotal number of jobs in trace: ")
           .append(getNumJobsInTrace());
    builder.append("\nExpected input data size: ")
           .append(getExpectedDataSize());
    builder.append("\nInput data statistics: ")
           .append(getInputDataStatistics());
    builder.append("\nTotal number of jobs processed: ")
           .append(getNumSubmittedJobs());
    builder.append("\nTotal number of successful jobs: ")
           .append(getNumSuccessfulJobs());
    builder.append("\nTotal number of failed jobs: ")
           .append(getNumFailedJobs());
    builder.append("\nTotal number of lost jobs: ")
           .append(getNumLostJobs());
    builder.append("\nTotal number of map tasks launched: ")
           .append(getNumMapTasksLaunched());
    builder.append("\nTotal number of reduce task launched: ")
           .append(getNumReduceTasksLaunched());
    builder.append("\nGridmix start time: ")
           .append(UTIL.format(getStartTime()));
    builder.append("\nGridmix end time: ").append(UTIL.format(getEndTime()));
    // report when the simulation began, not when Gridmix itself started
    builder.append("\nGridmix simulation start time: ")
           .append(UTIL.format(getSimulationStartTime()));
    builder.append("\nGridmix runtime: ")
           .append(StringUtils.formatTime(getRuntime()));
    builder.append("\nTime spent in initialization (data-gen etc): ")
           .append(StringUtils.formatTime(getInitTime()));
    builder.append("\nTime spent in simulation: ")
           .append(StringUtils.formatTime(getSimulationTime()));
    builder.append("\nGridmix configuration parameters: ")
           .append(getCommandLineArgsString());
    builder.append("\nGridmix job submission policy: ")
           .append(getJobSubmissionPolicy());
    builder.append("\nGridmix resolver: ").append(getUserResolver());
    builder.append("\n\n");
    return builder.toString();
  }

  /**
   * Gets the stringified version of {@link DataStatistics}.
   *
   * @param stats the data statistics, may be {@code null}
   * @return the compression status, the human readable data size and the
   *         number of files; {@code Summarizer.NA} if {@code stats} is
   *         {@code null}
   */
  static String stringifyDataStatistics(DataStatistics stats) {
    if (stats == null) {
      return Summarizer.NA;
    }

    StringBuilder buffer = new StringBuilder();
    String compressionStatus = stats.isDataCompressed()
                               ? "Compressed"
                               : "Uncompressed";
    buffer.append(compressionStatus).append(" input data size: ");
    buffer.append(StringUtils.humanReadableInt(stats.getDataSize()));
    buffer.append(", ");
    buffer.append("Number of files: ").append(stats.getNumFiles());

    return buffer.toString();
  }

  // Getters

  /** @return the expected input data size as set by finalize */
  protected String getExpectedDataSize() {
    return expectedDataSize;
  }

  /** @return the class name of the user resolver as set by finalize */
  protected String getUserResolver() {
    return resolver;
  }

  /** @return the stringified input data statistics */
  protected String getInputDataStatistics() {
    return stringifyDataStatistics(dataStats);
  }

  /** @return the input trace signature as set by finalize */
  protected String getInputTraceSignature() {
    return inputTraceSignature;
  }

  /** @return the input trace location as set by finalize */
  protected String getInputTraceLocation() {
    return inputTraceLocation;
  }

  /** @return the number of jobs in the input trace as set by finalize */
  protected int getNumJobsInTrace() {
    return numJobsInInputTrace;
  }

  /** @return the number of jobs counted as successful */
  protected int getNumSuccessfulJobs() {
    return totalSuccessfulJobs;
  }

  /** @return the number of jobs counted as failed */
  protected int getNumFailedJobs() {
    return totalFailedJobs;
  }

  /** @return the number of jobs counted as lost */
  protected int getNumLostJobs() {
    return totalLostJobs;
  }

  /** @return the sum of the successful, failed and lost job counts */
  protected int getNumSubmittedJobs() {
    return totalSuccessfulJobs + totalFailedJobs + totalLostJobs;
  }

  /** @return the total number of map tasks over all the processed jobs */
  protected int getNumMapTasksLaunched() {
    return totalMapTasksLaunched;
  }

  /** @return the total number of reduce tasks over all the processed jobs */
  protected int getNumReduceTasksLaunched() {
    return totalReduceTasksLaunched;
  }

  /** @return the time (in milliseconds) at which this object was created */
  protected long getStartTime() {
    return startTime;
  }

  /** @return the end time (in milliseconds) recorded by finalize */
  protected long getEndTime() {
    return endTime;
  }

  /**
   * @return the difference (in milliseconds) between the simulation start
   *         time and the start time
   */
  protected long getInitTime() {
    return simulationStartTime - startTime;
  }

  /**
   * @return the time (in milliseconds) recorded by start, 0 if start has
   *         not been called
   */
  protected long getSimulationStartTime() {
    return simulationStartTime;
  }

  /**
   * @return the time (in milliseconds) between the simulation start and the
   *         most recently processed job update
   */
  protected long getSimulationTime() {
    return totalSimulationTime;
  }

  /** @return the total runtime (in milliseconds) recorded by finalize */
  protected long getRuntime() {
    return totalRuntime;
  }

  /** @return the flattened runtime arguments */
  protected String getCommandLineArgsString() {
    return commandLineArgs;
  }

  /** @return the name of the job submission policy as set by finalize */
  protected String getJobSubmissionPolicy() {
    return jobSubmissionPolicy;
  }
}