1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.mapred.gridmix;
19 
20 import java.io.IOException;
21 
22 import org.apache.commons.lang.time.FastDateFormat;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.FileStatus;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.io.MD5Hash;
30 import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
31 import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
32 import org.apache.hadoop.mapreduce.Job;
33 import org.apache.hadoop.util.StringUtils;
34 
35 /**
36  * Summarizes a {@link Gridmix} run. Statistics that are reported are
37  * <ul>
38  *   <li>Total number of jobs in the input trace</li>
39  *   <li>Trace signature</li>
40  *   <li>Total number of jobs processed from the input trace</li>
41  *   <li>Total number of jobs submitted</li>
42  *   <li>Total number of successful and failed jobs</li>
43  *   <li>Total number of map/reduce tasks launched</li>
44  *   <li>Gridmix start & end time</li>
45  *   <li>Total time for the Gridmix run (data-generation and simulation)</li>
46  *   <li>Gridmix Configuration (i.e job-type, submission-type, resolver)</li>
47  * </ul>
48  */
49 class ExecutionSummarizer implements StatListener<JobStats> {
50   static final Log LOG = LogFactory.getLog(ExecutionSummarizer.class);
51   private static final FastDateFormat UTIL = FastDateFormat.getInstance();
52 
53   private int numJobsInInputTrace;
54   private int totalSuccessfulJobs;
55   private int totalFailedJobs;
56   private int totalLostJobs;
57   private int totalMapTasksLaunched;
58   private int totalReduceTasksLaunched;
59   private long totalSimulationTime;
60   private long totalRuntime;
61   private final String commandLineArgs;
62   private long startTime;
63   private long endTime;
64   private long simulationStartTime;
65   private String inputTraceLocation;
66   private String inputTraceSignature;
67   private String jobSubmissionPolicy;
68   private String resolver;
69   private DataStatistics dataStats;
70   private String expectedDataSize;
71 
72   /**
73    * Basic constructor initialized with the runtime arguments.
74    */
ExecutionSummarizer(String[] args)75   ExecutionSummarizer(String[] args) {
76     startTime = System.currentTimeMillis();
77     // flatten the args string and store it
78     commandLineArgs =
79       org.apache.commons.lang.StringUtils.join(args, ' ');
80   }
81 
82   /**
83    * Default constructor.
84    */
ExecutionSummarizer()85   ExecutionSummarizer() {
86     startTime = System.currentTimeMillis();
87     commandLineArgs = Summarizer.NA;
88   }
89 
start(Configuration conf)90   void start(Configuration conf) {
91     simulationStartTime = System.currentTimeMillis();
92   }
93 
processJobState(JobStats stats)94   private void processJobState(JobStats stats) {
95     Job job = stats.getJob();
96     try {
97       if (job.isSuccessful()) {
98         ++totalSuccessfulJobs;
99       } else {
100         ++totalFailedJobs;
101       }
102     } catch (Exception e) {
103       // this behavior is consistent with job-monitor which marks the job as
104       // complete (lost) if the status polling bails out
105       ++totalLostJobs;
106     }
107   }
108 
processJobTasks(JobStats stats)109   private void processJobTasks(JobStats stats) {
110     totalMapTasksLaunched += stats.getNoOfMaps();
111     totalReduceTasksLaunched += stats.getNoOfReds();
112   }
113 
process(JobStats stats)114   private void process(JobStats stats) {
115     // process the job run state
116     processJobState(stats);
117 
118     // process the tasks information
119     processJobTasks(stats);
120   }
121 
122   @Override
update(JobStats item)123   public void update(JobStats item) {
124     // process only if the simulation has started
125     if (simulationStartTime > 0) {
126       process(item);
127       totalSimulationTime =
128         System.currentTimeMillis() - getSimulationStartTime();
129     }
130   }
131 
132   // Generates a signature for the trace file based on
133   //   - filename
134   //   - modification time
135   //   - file length
136   //   - owner
getTraceSignature(String input)137   protected static String getTraceSignature(String input) throws IOException {
138     Path inputPath = new Path(input);
139     FileSystem fs = inputPath.getFileSystem(new Configuration());
140     FileStatus status = fs.getFileStatus(inputPath);
141     Path qPath = fs.makeQualified(status.getPath());
142     String traceID = status.getModificationTime() + qPath.toString()
143                      + status.getOwner() + status.getLen();
144     return MD5Hash.digest(traceID).toString();
145   }
146 
147   @SuppressWarnings("unchecked")
finalize(JobFactory factory, String inputPath, long dataSize, UserResolver userResolver, DataStatistics stats, Configuration conf)148   void finalize(JobFactory factory, String inputPath, long dataSize,
149                 UserResolver userResolver, DataStatistics stats,
150                 Configuration conf)
151   throws IOException {
152     numJobsInInputTrace = factory.numJobsInTrace;
153     endTime = System.currentTimeMillis();
154      if ("-".equals(inputPath)) {
155       inputTraceLocation = Summarizer.NA;
156       inputTraceSignature = Summarizer.NA;
157     } else {
158       Path inputTracePath = new Path(inputPath);
159       FileSystem fs = inputTracePath.getFileSystem(conf);
160       inputTraceLocation = fs.makeQualified(inputTracePath).toString();
161       inputTraceSignature = getTraceSignature(inputPath);
162     }
163     jobSubmissionPolicy = Gridmix.getJobSubmissionPolicy(conf).name();
164     resolver = userResolver.getClass().getName();
165     if (dataSize > 0) {
166       expectedDataSize = StringUtils.humanReadableInt(dataSize);
167     } else {
168       expectedDataSize = Summarizer.NA;
169     }
170     dataStats = stats;
171     totalRuntime = System.currentTimeMillis() - getStartTime();
172   }
173 
174   /**
175    * Summarizes the current {@link Gridmix} run.
176    */
177   @Override
toString()178   public String toString() {
179     StringBuilder builder = new StringBuilder();
180     builder.append("Execution Summary:-");
181     builder.append("\nInput trace: ").append(getInputTraceLocation());
182     builder.append("\nInput trace signature: ")
183            .append(getInputTraceSignature());
184     builder.append("\nTotal number of jobs in trace: ")
185            .append(getNumJobsInTrace());
186     builder.append("\nExpected input data size: ")
187            .append(getExpectedDataSize());
188     builder.append("\nInput data statistics: ")
189            .append(getInputDataStatistics());
190     builder.append("\nTotal number of jobs processed: ")
191            .append(getNumSubmittedJobs());
192     builder.append("\nTotal number of successful jobs: ")
193            .append(getNumSuccessfulJobs());
194     builder.append("\nTotal number of failed jobs: ")
195            .append(getNumFailedJobs());
196     builder.append("\nTotal number of lost jobs: ")
197            .append(getNumLostJobs());
198     builder.append("\nTotal number of map tasks launched: ")
199            .append(getNumMapTasksLaunched());
200     builder.append("\nTotal number of reduce task launched: ")
201            .append(getNumReduceTasksLaunched());
202     builder.append("\nGridmix start time: ")
203            .append(UTIL.format(getStartTime()));
204     builder.append("\nGridmix end time: ").append(UTIL.format(getEndTime()));
205     builder.append("\nGridmix simulation start time: ")
206            .append(UTIL.format(getStartTime()));
207     builder.append("\nGridmix runtime: ")
208            .append(StringUtils.formatTime(getRuntime()));
209     builder.append("\nTime spent in initialization (data-gen etc): ")
210            .append(StringUtils.formatTime(getInitTime()));
211     builder.append("\nTime spent in simulation: ")
212            .append(StringUtils.formatTime(getSimulationTime()));
213     builder.append("\nGridmix configuration parameters: ")
214            .append(getCommandLineArgsString());
215     builder.append("\nGridmix job submission policy: ")
216            .append(getJobSubmissionPolicy());
217     builder.append("\nGridmix resolver: ").append(getUserResolver());
218     builder.append("\n\n");
219     return builder.toString();
220   }
221 
222   // Gets the stringified version of DataStatistics
stringifyDataStatistics(DataStatistics stats)223   static String stringifyDataStatistics(DataStatistics stats) {
224     if (stats != null) {
225       StringBuffer buffer = new StringBuffer();
226       String compressionStatus = stats.isDataCompressed()
227                                  ? "Compressed"
228                                  : "Uncompressed";
229       buffer.append(compressionStatus).append(" input data size: ");
230       buffer.append(StringUtils.humanReadableInt(stats.getDataSize()));
231       buffer.append(", ");
232       buffer.append("Number of files: ").append(stats.getNumFiles());
233 
234       return buffer.toString();
235     } else {
236       return Summarizer.NA;
237     }
238   }
239 
240   // Getters
getExpectedDataSize()241   protected String getExpectedDataSize() {
242     return expectedDataSize;
243   }
244 
getUserResolver()245   protected String getUserResolver() {
246     return resolver;
247   }
248 
getInputDataStatistics()249   protected String getInputDataStatistics() {
250     return stringifyDataStatistics(dataStats);
251   }
252 
getInputTraceSignature()253   protected String getInputTraceSignature() {
254     return inputTraceSignature;
255   }
256 
getInputTraceLocation()257   protected String getInputTraceLocation() {
258     return inputTraceLocation;
259   }
260 
getNumJobsInTrace()261   protected int getNumJobsInTrace() {
262     return numJobsInInputTrace;
263   }
264 
getNumSuccessfulJobs()265   protected int getNumSuccessfulJobs() {
266     return totalSuccessfulJobs;
267   }
268 
getNumFailedJobs()269   protected int getNumFailedJobs() {
270     return totalFailedJobs;
271   }
272 
getNumLostJobs()273   protected int getNumLostJobs() {
274     return totalLostJobs;
275   }
276 
getNumSubmittedJobs()277   protected int getNumSubmittedJobs() {
278     return totalSuccessfulJobs + totalFailedJobs + totalLostJobs;
279   }
280 
getNumMapTasksLaunched()281   protected int getNumMapTasksLaunched() {
282     return totalMapTasksLaunched;
283   }
284 
getNumReduceTasksLaunched()285   protected int getNumReduceTasksLaunched() {
286     return totalReduceTasksLaunched;
287   }
288 
getStartTime()289   protected long getStartTime() {
290     return startTime;
291   }
292 
getEndTime()293   protected long getEndTime() {
294     return endTime;
295   }
296 
getInitTime()297   protected long getInitTime() {
298     return simulationStartTime - startTime;
299   }
300 
getSimulationStartTime()301   protected long getSimulationStartTime() {
302     return simulationStartTime;
303   }
304 
getSimulationTime()305   protected long getSimulationTime() {
306     return totalSimulationTime;
307   }
308 
getRuntime()309   protected long getRuntime() {
310     return totalRuntime;
311   }
312 
getCommandLineArgsString()313   protected String getCommandLineArgsString() {
314     return commandLineArgs;
315   }
316 
getJobSubmissionPolicy()317   protected String getJobSubmissionPolicy() {
318     return jobSubmissionPolicy;
319   }
320 }