1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.mapred.gridmix.test.system;
19 
20 import java.io.IOException;
21 import java.io.File;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.HashMap;
26 import java.util.SortedMap;
27 import java.util.TreeMap;
28 import java.util.Collections;
29 import java.util.Set;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.FileStatus;
38 import org.apache.hadoop.fs.permission.FsPermission;
39 import org.apache.hadoop.fs.permission.FsAction;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.mapred.Counters;
42 import org.apache.hadoop.mapred.Counters.Counter;
43 import org.apache.hadoop.mapred.Counters.Group;
44 import org.apache.hadoop.mapred.Task;
45 import org.apache.hadoop.mapred.DefaultJobHistoryParser;
46 import org.apache.hadoop.mapred.JobHistory;
47 import org.apache.hadoop.mapreduce.JobID;
48 import org.apache.hadoop.mapreduce.TaskType;
49 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
50 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
51 import org.apache.hadoop.mapreduce.test.system.JTClient;
52 import org.apache.hadoop.mapred.JobConf;
53 import org.apache.hadoop.tools.rumen.LoggedJob;
54 import org.apache.hadoop.tools.rumen.ZombieJob;
55 import org.apache.hadoop.tools.rumen.TaskInfo;
56 import org.junit.Assert;
57 import java.text.ParseException;
58 import org.apache.hadoop.security.UserGroupInformation;
59 import org.apache.hadoop.mapred.gridmix.GridmixSystemTestCase;
60 
61 /**
62  * Verifying each Gridmix job with corresponding job story in a trace file.
63  */
64 public class GridmixJobVerification {
65 
66   private static Log LOG = LogFactory.getLog(GridmixJobVerification.class);
67   private Path path;
68   private Configuration conf;
69   private JTClient jtClient;
70   private String userResolverVal;
71   static final String origJobIdKey = GridMixConfig.GRIDMIX_ORIGINAL_JOB_ID;
72   static final String jobSubKey = GridMixConfig.GRIDMIX_SUBMISSION_POLICY;
73   static final String jobTypeKey = GridMixConfig.GRIDMIX_JOB_TYPE;
74   static final String mapTaskKey = GridMixConfig.GRIDMIX_SLEEPJOB_MAPTASK_ONLY;
75   static final String usrResolver = GridMixConfig.GRIDMIX_USER_RESOLVER;
76   static final String fileOutputFormatKey = "mapred.output.compress";
77   static final String fileInputFormatKey = "mapred.input.dir";
78   static final String compEmulKey = GridMixConfig.GRIDMIX_COMPRESSION_ENABLE;
79   static final String inputDecompKey =
80       GridMixConfig.GRIDMIX_INPUT_DECOMPRESS_ENABLE;
81   static final String mapInputCompRatio =
82       GridMixConfig.GRIDMIX_INPUT_COMPRESS_RATIO;
83   static final String mapOutputCompRatio =
84       GridMixConfig.GRIDMIX_INTERMEDIATE_COMPRESSION_RATIO;
85   static final String reduceOutputCompRatio =
86       GridMixConfig.GRIDMIX_OUTPUT_COMPRESSION_RATIO;
87   private Map<String, List<JobConf>> simuAndOrigJobsInfo =
88       new HashMap<String, List<JobConf>>();
89 
90   /**
91    * Gridmix job verification constructor
92    * @param path - path of the gridmix output directory.
93    * @param conf - cluster configuration.
94    * @param jtClient - jobtracker client.
95    */
GridmixJobVerification(Path path, Configuration conf, JTClient jtClient)96   public GridmixJobVerification(Path path, Configuration conf,
97      JTClient jtClient) {
98     this.path = path;
99     this.conf = conf;
100     this.jtClient = jtClient;
101   }
102 
103   /**
104    * It verifies the Gridmix jobs with corresponding job story in a trace file.
105    * @param jobids - gridmix job ids.
106    * @throws IOException - if an I/O error occurs.
107    * @throws ParseException - if an parse error occurs.
108    */
verifyGridmixJobsWithJobStories(List<JobID> jobids)109   public void verifyGridmixJobsWithJobStories(List<JobID> jobids)
110       throws Exception {
111 
112     SortedMap <Long, String> origSubmissionTime = new TreeMap <Long, String>();
113     SortedMap <Long, String> simuSubmissionTime = new TreeMap<Long, String>();
114     GridmixJobStory gjs = new GridmixJobStory(path, conf);
115     final Iterator<JobID> ite = jobids.iterator();
116     File destFolder = new File(System.getProperty("java.io.tmpdir")
117                               + "/gridmix-st/");
118     destFolder.mkdir();
119     while (ite.hasNext()) {
120       JobID simuJobId = ite.next();
121 
122       JobHistory.JobInfo jhInfo = getSimulatedJobHistory(simuJobId);
123       Assert.assertNotNull("Job history not found.", jhInfo);
124       Counters counters =
125           Counters.fromEscapedCompactString(jhInfo.getValues()
126               .get(JobHistory.Keys.COUNTERS));
127       JobConf simuJobConf = getSimulatedJobConf(simuJobId, destFolder);
128       int cnt = 1;
129       do {
130         if (simuJobConf != null) {
131           break;
132         }
133         Thread.sleep(100);
134         simuJobConf = getSimulatedJobConf(simuJobId, destFolder);
135         cnt++;
136       } while(cnt < 30);
137 
138       String origJobId = simuJobConf.get(origJobIdKey);
139       LOG.info("OriginalJobID<->CurrentJobID:"
140               + origJobId + "<->" + simuJobId);
141 
142       if (userResolverVal == null) {
143         userResolverVal = simuJobConf.get(usrResolver);
144       }
145 
146       ZombieJob zombieJob = gjs.getZombieJob(JobID.forName(origJobId));
147       Map<String, Long> mapJobCounters = getJobMapCounters(zombieJob);
148       Map<String, Long> reduceJobCounters = getJobReduceCounters(zombieJob);
149       if (simuJobConf.get(jobSubKey).contains("REPLAY")) {
150           origSubmissionTime.put(zombieJob.getSubmissionTime(),
151                                  origJobId.toString() + "^" + simuJobId);
152           simuSubmissionTime.put(Long.parseLong(jhInfo.getValues().get(JobHistory.Keys.SUBMIT_TIME)),
153                                  origJobId.toString() + "^" + simuJobId); ;
154       }
155 
156       LOG.info("Verifying the job <" + simuJobId + "> and wait for a while...");
157       verifySimulatedJobSummary(zombieJob, jhInfo, simuJobConf);
158       verifyJobMapCounters(counters, mapJobCounters, simuJobConf);
159       verifyJobReduceCounters(counters, reduceJobCounters, simuJobConf);
160       verifyCompressionEmulation(zombieJob.getJobConf(), simuJobConf, counters,
161                                  reduceJobCounters, mapJobCounters);
162       verifyDistributeCache(zombieJob,simuJobConf);
163       setJobDistributedCacheInfo(simuJobId.toString(), simuJobConf,
164          zombieJob.getJobConf());
165       verifyHighRamMemoryJobs(zombieJob, simuJobConf);
166       verifyCPUEmulationOfJobs(zombieJob, jhInfo, simuJobConf);
167       verifyMemoryEmulationOfJobs(zombieJob, jhInfo, simuJobConf);
168       LOG.info("Done.");
169     }
170     verifyDistributedCacheBetweenJobs(simuAndOrigJobsInfo);
171   }
172 
173   /**
174    * Verify the job submission order between the jobs in replay mode.
175    * @param origSubmissionTime - sorted map of original jobs submission times.
176    * @param simuSubmissionTime - sorted map of simulated jobs submission times.
177    */
verifyJobSumissionTime(SortedMap<Long, String> origSubmissionTime, SortedMap<Long, String> simuSubmissionTime)178   public void verifyJobSumissionTime(SortedMap<Long, String> origSubmissionTime,
179       SortedMap<Long, String> simuSubmissionTime) {
180     Assert.assertEquals("Simulated job's submission time count has "
181                      + "not match with Original job's submission time count.",
182                      origSubmissionTime.size(), simuSubmissionTime.size());
183     for ( int index = 0; index < origSubmissionTime.size(); index ++) {
184         String origAndSimuJobID = origSubmissionTime.get(index);
185         String simuAndorigJobID = simuSubmissionTime.get(index);
186         Assert.assertEquals("Simulated jobs have not submitted in same "
187                            + "order as original jobs submitted in REPLAY mode.",
188                            origAndSimuJobID, simuAndorigJobID);
189     }
190   }
191 
192   /**
193    * It verifies the simulated job map counters.
194    * @param counters - Original job map counters.
195    * @param mapJobCounters - Simulated job map counters.
196    * @param jobConf - Simulated job configuration.
197    * @throws ParseException - If an parser error occurs.
198    */
verifyJobMapCounters(Counters counters, Map<String,Long> mapCounters, JobConf jobConf)199   public void verifyJobMapCounters(Counters counters,
200      Map<String,Long> mapCounters, JobConf jobConf) throws ParseException {
201     if (!jobConf.get(jobTypeKey, "LOADJOB").equals("SLEEPJOB")) {
202       Assert.assertEquals("Map input records have not matched.",
203                           mapCounters.get("MAP_INPUT_RECS").longValue(),
204                           getCounterValue(counters, "MAP_INPUT_RECORDS"));
205     } else {
206       Assert.assertTrue("Map Input Bytes are zero",
207                         getCounterValue(counters,"HDFS_BYTES_READ") != 0);
208       Assert.assertNotNull("Map Input Records are zero",
209                            getCounterValue(counters, "MAP_INPUT_RECORDS")!=0);
210     }
211   }
212 
213   /**
214    *  It verifies the simulated job reduce counters.
215    * @param counters - Original job reduce counters.
216    * @param reduceCounters - Simulated job reduce counters.
217    * @param jobConf - simulated job configuration.
218    * @throws ParseException - if an parser error occurs.
219    */
verifyJobReduceCounters(Counters counters, Map<String,Long> reduceCounters, JobConf jobConf)220   public void verifyJobReduceCounters(Counters counters,
221      Map<String,Long> reduceCounters, JobConf jobConf) throws ParseException {
222     if (jobConf.get(jobTypeKey, "LOADJOB").equals("SLEEPJOB")) {
223       Assert.assertTrue("Reduce output records are not zero for sleep job.",
224           getCounterValue(counters, "REDUCE_OUTPUT_RECORDS") == 0);
225       Assert.assertTrue("Reduce output bytes are not zero for sleep job.",
226           getCounterValue(counters,"HDFS_BYTES_WRITTEN") == 0);
227     }
228   }
229 
230   /**
231    * It verifies the gridmix simulated job summary.
232    * @param zombieJob - Original job summary.
233    * @param jhInfo  - Simulated job history info.
234    * @param jobConf - simulated job configuration.
235    * @throws IOException - if an I/O error occurs.
236    */
verifySimulatedJobSummary(ZombieJob zombieJob, JobHistory.JobInfo jhInfo, JobConf jobConf)237   public void verifySimulatedJobSummary(ZombieJob zombieJob,
238      JobHistory.JobInfo jhInfo, JobConf jobConf) throws IOException {
239     Assert.assertEquals("Job id has not matched", zombieJob.getJobID(),
240                         JobID.forName(jobConf.get(origJobIdKey)));
241 
242     Assert.assertEquals("Job maps have not matched", String.valueOf(zombieJob.getNumberMaps()),
243                         jhInfo.getValues().get(JobHistory.Keys.TOTAL_MAPS));
244 
245     if (!jobConf.getBoolean(mapTaskKey, false)) {
246       Assert.assertEquals("Job reducers have not matched",
247           String.valueOf(zombieJob.getNumberReduces()), jhInfo.getValues().get(JobHistory.Keys.TOTAL_REDUCES));
248     } else {
249       Assert.assertEquals("Job reducers have not matched",
250                           0, Integer.parseInt(jhInfo.getValues().get(JobHistory.Keys.TOTAL_REDUCES)));
251     }
252 
253     Assert.assertEquals("Job status has not matched.",
254                         zombieJob.getOutcome().name(),
255                         convertJobStatus(jhInfo.getValues().get(JobHistory.Keys.JOB_STATUS)));
256 
257     LoggedJob loggedJob = zombieJob.getLoggedJob();
258     Assert.assertEquals("Job priority has not matched.",
259                         loggedJob.getPriority().toString(),
260                         jhInfo.getValues().get(JobHistory.Keys.JOB_PRIORITY));
261 
262     if (jobConf.get(usrResolver).contains("RoundRobin")) {
263        String user = UserGroupInformation.getLoginUser().getShortUserName();
264        Assert.assertTrue(jhInfo.getValues().get(JobHistory.Keys.JOBID).toString()
265                         + " has not impersonate with other user.",
266                         !jhInfo.getValues().get(JobHistory.Keys.USER).equals(user));
267     }
268   }
269 
270   /**
271    * Get the original job map counters from a trace.
272    * @param zombieJob - Original job story.
273    * @return - map counters as a map.
274    */
getJobMapCounters(ZombieJob zombieJob)275   public Map<String, Long> getJobMapCounters(ZombieJob zombieJob) {
276     long expMapInputBytes = 0;
277     long expMapOutputBytes = 0;
278     long expMapInputRecs = 0;
279     long expMapOutputRecs = 0;
280     Map<String,Long> mapCounters = new HashMap<String,Long>();
281     for (int index = 0; index < zombieJob.getNumberMaps(); index ++) {
282       TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index);
283       expMapInputBytes += mapTask.getInputBytes();
284       expMapOutputBytes += mapTask.getOutputBytes();
285       expMapInputRecs += mapTask.getInputRecords();
286       expMapOutputRecs += mapTask.getOutputRecords();
287     }
288     mapCounters.put("MAP_INPUT_BYTES", expMapInputBytes);
289     mapCounters.put("MAP_OUTPUT_BYTES", expMapOutputBytes);
290     mapCounters.put("MAP_INPUT_RECS", expMapInputRecs);
291     mapCounters.put("MAP_OUTPUT_RECS", expMapOutputRecs);
292     return mapCounters;
293   }
294 
295   /**
296    * Get the original job reduce counters from a trace.
297    * @param zombieJob - Original job story.
298    * @return - reduce counters as a map.
299    */
getJobReduceCounters(ZombieJob zombieJob)300   public Map<String,Long> getJobReduceCounters(ZombieJob zombieJob) {
301     long expReduceInputBytes = 0;
302     long expReduceOutputBytes = 0;
303     long expReduceInputRecs = 0;
304     long expReduceOutputRecs = 0;
305     Map<String,Long> reduceCounters = new HashMap<String,Long>();
306     for (int index = 0; index < zombieJob.getNumberReduces(); index ++) {
307       TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index);
308       expReduceInputBytes += reduceTask.getInputBytes();
309       expReduceOutputBytes += reduceTask.getOutputBytes();
310       expReduceInputRecs += reduceTask.getInputRecords();
311       expReduceOutputRecs += reduceTask.getOutputRecords();
312     }
313     reduceCounters.put("REDUCE_INPUT_BYTES", expReduceInputBytes);
314     reduceCounters.put("REDUCE_OUTPUT_BYTES", expReduceOutputBytes);
315     reduceCounters.put("REDUCE_INPUT_RECS", expReduceInputRecs);
316     reduceCounters.put("REDUCE_OUTPUT_RECS", expReduceOutputRecs);
317     return reduceCounters;
318   }
319 
320   /**
321    * Get the simulated job configuration of a job.
322    * @param simulatedJobID - Simulated job id.
323    * @param tmpJHFolder - temporary job history folder location.
324    * @return - simulated job configuration.
325    * @throws IOException - If an I/O error occurs.
326    */
getSimulatedJobConf(JobID simulatedJobID, File tmpJHFolder)327   public JobConf getSimulatedJobConf(JobID simulatedJobID, File tmpJHFolder)
328       throws IOException, InterruptedException {
329     FileSystem fs = null;
330     try {
331 
332       String historyFilePath = jtClient.getProxy()
333           .getJobHistoryLocationForRetiredJob(simulatedJobID);
334       int cnt = 0;
335       do {
336         if (historyFilePath != null) {
337            break;
338         }
339         Thread.sleep(100);
340         historyFilePath = jtClient.getProxy()
341             .getJobHistoryLocationForRetiredJob(simulatedJobID);
342         cnt++;
343       } while( cnt < 30 );
344       Assert.assertNotNull("History file has not available for the job ["
345               + simulatedJobID + "] for 3 secs.", historyFilePath);
346       Path jhpath = new Path(historyFilePath);
347       LOG.info("Parent:" + jhpath.getParent());
348       fs = jhpath.getFileSystem(conf);
349       fs.copyToLocalFile(jhpath,new Path(tmpJHFolder.toString()));
350       fs.copyToLocalFile(new Path(jhpath.getParent() + "/" + simulatedJobID + "_conf.xml"),
351                          new Path(tmpJHFolder.toString()));
352       JobConf jobConf = new JobConf();
353       jobConf.addResource(new Path(tmpJHFolder.toString()
354                          + "/" + simulatedJobID + "_conf.xml"));
355       jobConf.reloadConfiguration();
356       return jobConf;
357 
358     }finally {
359       fs.close();
360     }
361   }
362 
363   /**
364    * Get the simulated job history of a job.
365    * @param simulatedJobID - simulated job id.
366    * @return - simulated job information.
367    * @throws IOException - if an I/O error occurs.
368    */
getSimulatedJobHistory(JobID simulatedJobID)369   public JobHistory.JobInfo getSimulatedJobHistory(JobID simulatedJobID)
370       throws IOException, InterruptedException {
371     FileSystem fs = null;
372     try {
373       String historyFilePath = jtClient.getProxy().
374           getJobHistoryLocationForRetiredJob(simulatedJobID);
375       int cnt = 0;
376       do {
377         if (historyFilePath != null) {
378           break;
379         }
380         Thread.sleep(100);
381         historyFilePath = jtClient.getProxy()
382             .getJobHistoryLocationForRetiredJob(simulatedJobID);
383         cnt++;
384       } while( cnt < 30 );
385       LOG.info("HistoryFilePath:" + historyFilePath);
386       Assert.assertNotNull("History file path has not found for a job["
387               + simulatedJobID + "] for 3 secs.");
388       Path jhpath = new Path(historyFilePath);
389       fs = jhpath.getFileSystem(conf);
390       JobHistory.JobInfo jobInfo =
391           new JobHistory.JobInfo(simulatedJobID.toString());
392       DefaultJobHistoryParser.parseJobTasks(historyFilePath, jobInfo, fs);
393       return jobInfo;
394     } finally {
395       fs.close();
396     }
397   }
398 
399   /**
400    * It verifies the cpu resource usage of gridmix jobs against
401    * the original job cpu resource usage.
402    * @param origJobHistory - Original job history.
403    * @param simuJobHistoryInfo - Simulated job history.
404    * @param simuJobConf - simulated job configuration.
405    */
verifyCPUEmulationOfJobs(ZombieJob origJobHistory, JobHistory.JobInfo simuJobHistoryInfo, JobConf simuJobConf)406   public void verifyCPUEmulationOfJobs(ZombieJob origJobHistory,
407                                    JobHistory.JobInfo simuJobHistoryInfo,
408                                    JobConf simuJobConf) throws Exception {
409     boolean isCPUEmulON = false;
410     if (simuJobConf.get(GridMixConfig.GRIDMIX_CPU_EMULATION) != null) {
411       isCPUEmulON =
412           simuJobConf.get(GridMixConfig.GRIDMIX_CPU_EMULATION).
413               contains(GridMixConfig.GRIDMIX_CPU_EMULATION_PLUGIN);
414     }
415 
416     if (isCPUEmulON) {
417       Map<String,Long> origJobMetrics =
418                        getOriginalJobCPUMetrics(origJobHistory);
419       Map<String,Long> simuJobMetrics =
420                        getSimulatedJobCPUMetrics(simuJobHistoryInfo);
421 
422       long origMapUsage = origJobMetrics.get("MAP");
423       LOG.info("Total cpu usage of Maps for a original job:" + origMapUsage);
424 
425       long origReduceUsage = origJobMetrics.get("REDUCE");
426       LOG.info("Total cpu usage of Reduces for a original job:"
427               + origReduceUsage);
428 
429       long simuMapUsage = simuJobMetrics.get("MAP");
430       LOG.info("Total cpu usage of Maps for a simulated job:" + simuMapUsage);
431 
432       long simuReduceUsage = simuJobMetrics.get("REDUCE");
433       LOG.info("Total cpu usage of Reduces for a simulated job:"
434               + simuReduceUsage);
435 
436       int mapCount = Integer.parseInt(
437           simuJobHistoryInfo.getValues().get(JobHistory.Keys.TOTAL_MAPS));
438       int reduceCount = Integer.parseInt(
439           simuJobHistoryInfo.getValues().get(JobHistory.Keys.TOTAL_REDUCES));
440 
441       if (mapCount > 0) {
442         double mapEmulFactor = (simuMapUsage * 100) / origMapUsage;
443         long mapEmulAccuracy = Math.round(mapEmulFactor);
444         LOG.info("CPU emulation accuracy for maps in job " +
445                  simuJobHistoryInfo.getValues().get(JobHistory.Keys.JOBID) +
446                  ":"+ mapEmulAccuracy + "%");
447         Assert.assertTrue("Map-side cpu emulaiton inaccurate!" +
448                           " Actual cpu usage: " + simuMapUsage +
449                           " Expected cpu usage: " + origMapUsage, mapEmulAccuracy
450                           >= GridMixConfig.GRIDMIX_CPU_EMULATION_LOWER_LIMIT
451                           && mapEmulAccuracy
452                           <= GridMixConfig.GRIDMIX_CPU_EMULATION_UPPER_LIMIT);
453       }
454 
455       if (reduceCount >0) {
456         double reduceEmulFactor = (simuReduceUsage * 100) / origReduceUsage;
457         long reduceCPUUsage = simuReduceUsage / 1000;
458         LOG.info("Reduce CPU Usage:" + reduceCPUUsage);
459         LOG.info("Reduce emulation factor:" + reduceEmulFactor);
460         long reduceEmulAccuracy = Math.round(reduceEmulFactor);
461         LOG.info("CPU emulation accuracy for reduces in job " +
462                  simuJobHistoryInfo.getValues().get(JobHistory.Keys.JOBID) +
463                  ": " + reduceEmulAccuracy + "%");
464         if ( reduceCPUUsage >= 10 ) {
465           Assert.assertTrue("Reduce side cpu emulaiton inaccurate!" +
466                             " Actual cpu usage:" + simuReduceUsage +
467                             "Expected cpu usage: " + origReduceUsage,
468                             reduceEmulAccuracy
469                             >= GridMixConfig.GRIDMIX_CPU_EMULATION_LOWER_LIMIT
470                             && reduceEmulAccuracy
471                             <= GridMixConfig.GRIDMIX_CPU_EMULATION_UPPER_LIMIT);
472         } else {
473           Assert.assertTrue("Reduce side cpu emulaiton inaccurate!" +
474                             " Actual cpu usage:" + simuReduceUsage +
475                             "Expected cpu usage: " + origReduceUsage,
476                             reduceEmulAccuracy
477                             >= 60 && reduceEmulAccuracy <= 100);
478         }
479       }
480     }
481   }
482 
483   /**
484    * It verifies the heap memory resource usage of gridmix jobs with
485    * corresponding original job in the trace.
486    * @param zombieJob - Original job history.
487    * @param jhInfo - Simulated job history.
488    * @param simuJobConf - simulated job configuration.
489    */
verifyMemoryEmulationOfJobs(ZombieJob zombieJob, JobHistory.JobInfo jhInfo, JobConf simuJobConf)490   public void verifyMemoryEmulationOfJobs(ZombieJob zombieJob,
491           JobHistory.JobInfo jhInfo, JobConf simuJobConf) throws Exception {
492     long origJobMapsTHU = 0;
493     long origJobReducesTHU = 0;
494     long simuJobMapsTHU = 0;
495     long simuJobReducesTHU = 0;
496     boolean isMemEmulOn = false;
497     String strHeapRatio = "0.3F";
498 
499     if (simuJobConf.get(GridMixConfig.GRIDMIX_MEMORY_EMULATION) != null) {
500       isMemEmulOn =
501           simuJobConf.get(GridMixConfig.GRIDMIX_MEMORY_EMULATION).
502               contains(GridMixConfig.GRIDMIX_MEMORY_EMULATION_PLUGIN);
503     }
504 
505     if (isMemEmulOn) {
506 
507       for (int index = 0; index < zombieJob.getNumberMaps(); index ++) {
508         TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index);
509         if (mapTask.getResourceUsageMetrics().getHeapUsage() > 0) {
510           origJobMapsTHU +=
511                   mapTask.getResourceUsageMetrics().getHeapUsage();
512         }
513       }
514       LOG.info("Total Heap Usage of Maps for original job: "
515               + origJobMapsTHU);
516 
517       for (int index = 0; index < zombieJob.getNumberReduces(); index ++) {
518         TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index);
519         if (reduceTask.getResourceUsageMetrics().getHeapUsage() > 0) {
520           origJobReducesTHU +=
521                   reduceTask.getResourceUsageMetrics().getHeapUsage();
522         }
523       }
524       LOG.info("Total Heap Usage of Reduces for original job: "
525               + origJobReducesTHU);
526 
527       Counters mapCounters =
528           Counters.fromEscapedCompactString(jhInfo.getValues()
529                   .get(JobHistory.Keys.MAP_COUNTERS));
530 
531       Counters reduceCounters =
532           Counters.fromEscapedCompactString(jhInfo.getValues()
533                   .get(JobHistory.Keys.REDUCE_COUNTERS));
534 
535       simuJobMapsTHU =
536           getCounterValue(mapCounters,
537               Task.Counter.COMMITTED_HEAP_BYTES.toString());
538       LOG.info("Simulated Job Maps Total Heap Usage: " + simuJobMapsTHU);
539 
540       simuJobReducesTHU =
541           getCounterValue(reduceCounters,
542               Task.Counter.COMMITTED_HEAP_BYTES.toString());
543       LOG.info("Simulated Jobs Reduces Total Heap Usage: " + simuJobReducesTHU);
544 
545       long mapCount =
546           Integer.parseInt(jhInfo.getValues()
547                   .get(JobHistory.Keys.TOTAL_MAPS));
548       long reduceCount =
549           Integer.parseInt(jhInfo.getValues()
550                   .get(JobHistory.Keys.TOTAL_REDUCES));
551 
552       if (simuJobConf.get(GridMixConfig
553           .GRIDMIX_HEAP_FREE_MEMORY_RATIO) != null) {
554         strHeapRatio = "0.3F";
555       }
556 
557       if (mapCount > 0) {
558         double mapEmulFactor = (simuJobMapsTHU * 100) / origJobMapsTHU;
559         long mapEmulAccuracy = Math.round(mapEmulFactor);
560         LOG.info("Maps memory emulation accuracy of a job:"
561                 + mapEmulAccuracy + "%");
562         Assert.assertTrue("Map phase total memory emulation had crossed the "
563                          + "configured max limit.", mapEmulAccuracy
564                          <= GridMixConfig.GRIDMIX_MEMORY_EMULATION_UPPER_LIMIT);
565         Assert.assertTrue("Map phase total memory emulation had not crossed "
566                          + "the configured min limit.", mapEmulAccuracy
567                          >= GridMixConfig.GRIDMIX_MEMORY_EMULATION_LOWER_LIMIT);
568         double expHeapRatio = Double.parseDouble(strHeapRatio);
569         LOG.info("expHeapRatio for maps:" + expHeapRatio);
570         double actHeapRatio =
571                 ((double)Math.abs(origJobMapsTHU - simuJobMapsTHU)) ;
572         actHeapRatio /= origJobMapsTHU;
573           LOG.info("actHeapRatio for maps:" + actHeapRatio);
574           Assert.assertTrue("Simulate job maps heap ratio not matched.",
575                             actHeapRatio <= expHeapRatio);
576       }
577 
578       if (reduceCount >0) {
579         double reduceEmulFactor = (simuJobReducesTHU * 100) / origJobReducesTHU;
580         long reduceEmulAccuracy = Math.round(reduceEmulFactor);
581         LOG.info("Reduces memory emulation accuracy of a job:"
582                 + reduceEmulAccuracy + "%");
583         Assert.assertTrue("Reduce phase total memory emulation had crossed "
584                          + "configured max limit.", reduceEmulAccuracy
585                          <= GridMixConfig.GRIDMIX_MEMORY_EMULATION_UPPER_LIMIT);
586         Assert.assertTrue("Reduce phase total memory emulation had not "
587                          + "crosssed configured min limit.", reduceEmulAccuracy
588                          >= GridMixConfig.GRIDMIX_MEMORY_EMULATION_LOWER_LIMIT);
589         double expHeapRatio = Double.parseDouble(strHeapRatio);
590         LOG.info("expHeapRatio for reduces:" + expHeapRatio);
591         double actHeapRatio =
592                 ((double)Math.abs(origJobReducesTHU - simuJobReducesTHU));
593         actHeapRatio /= origJobReducesTHU;
594           LOG.info("actHeapRatio for reduces:" + actHeapRatio);
595           Assert.assertTrue("Simulate job reduces heap ratio not matched.",
596                             actHeapRatio <= expHeapRatio);
597       }
598     }
599   }
600 
601   /**
602    *  Get the simulated job cpu metrics.
603    * @param jhInfo - Simulated job history
604    * @return - cpu metrics as a map.
605    * @throws Exception - if an error occurs.
606    */
getSimulatedJobCPUMetrics( JobHistory.JobInfo jhInfo)607   private Map<String,Long> getSimulatedJobCPUMetrics(
608                            JobHistory.JobInfo jhInfo) throws Exception {
609     Map<String, Long> resourceMetrics = new HashMap<String, Long>();
610     Counters mapCounters = Counters.fromEscapedCompactString(
611         jhInfo.getValues().get(JobHistory.Keys.MAP_COUNTERS));
612     long mapCPUUsage =
613         getCounterValue(mapCounters,
614                         Task.Counter.CPU_MILLISECONDS.toString());
615     resourceMetrics.put("MAP", mapCPUUsage);
616 
617     Counters reduceCounters = Counters.fromEscapedCompactString(
618         jhInfo.getValues().get(JobHistory.Keys.REDUCE_COUNTERS));
619     long reduceCPUUsage =
620         getCounterValue(reduceCounters,
621                         Task.Counter.CPU_MILLISECONDS.toString());
622     resourceMetrics.put("REDUCE", reduceCPUUsage);
623     return resourceMetrics;
624   }
625 
626   /**
627    * Get the original job cpu metrics.
628    * @param zombieJob - original job history.
629    * @return - cpu metrics as map.
630    */
getOriginalJobCPUMetrics(ZombieJob zombieJob)631   private Map<String, Long> getOriginalJobCPUMetrics(ZombieJob zombieJob) {
632     long mapTotalCPUUsage = 0;
633     long reduceTotalCPUUsage = 0;
634     Map<String,Long> resourceMetrics = new HashMap<String,Long>();
635 
636     for (int index = 0; index < zombieJob.getNumberMaps(); index++) {
637       TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index);
638       if (mapTask.getResourceUsageMetrics().getCumulativeCpuUsage() > 0) {
639         mapTotalCPUUsage +=
640             mapTask.getResourceUsageMetrics().getCumulativeCpuUsage();
641       }
642     }
643     resourceMetrics.put("MAP", mapTotalCPUUsage);
644 
645     for (int index = 0; index < zombieJob.getNumberReduces(); index++) {
646       TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index);
647       if (reduceTask.getResourceUsageMetrics().getCumulativeCpuUsage() > 0) {
648         reduceTotalCPUUsage +=
649             reduceTask.getResourceUsageMetrics().getCumulativeCpuUsage();
650       }
651     }
652     resourceMetrics.put("REDUCE", reduceTotalCPUUsage);
653     return resourceMetrics;
654   }
655 
656   /**
657    * Get the user resolver of a job.
658    */
getJobUserResolver()659   public String getJobUserResolver() {
660     return userResolverVal;
661   }
662 
663   /**
664    * It verifies the compression ratios of mapreduce jobs.
665    * @param origJobConf - original job configuration.
666    * @param simuJobConf - simulated job configuration.
667    * @param counters  - simulated job counters.
668    * @param origReduceCounters - original job reduce counters.
669    * @param origMapCounters - original job map counters.
670    * @throws ParseException - if a parser error occurs.
671    * @throws IOException - if an I/O error occurs.
672    */
verifyCompressionEmulation(JobConf origJobConf, JobConf simuJobConf,Counters counters, Map<String, Long> origReduceCounters, Map<String, Long> origMapJobCounters)673   public void verifyCompressionEmulation(JobConf origJobConf,
674                                          JobConf simuJobConf,Counters counters,
675                                          Map<String, Long> origReduceCounters,
676                                          Map<String, Long> origMapJobCounters)
677                                          throws ParseException,IOException {
678     if (simuJobConf.getBoolean(compEmulKey, false)) {
679       String inputDir = origJobConf.get(fileInputFormatKey);
680       Assert.assertNotNull(fileInputFormatKey + " is Null",inputDir);
681       long simMapInputBytes = getCounterValue(counters, "HDFS_BYTES_READ");
682       long uncompressedInputSize = origMapJobCounters.get("MAP_INPUT_BYTES");
683       long simReduceInputBytes =
684             getCounterValue(counters, "REDUCE_SHUFFLE_BYTES");
685         long simMapOutputBytes = getCounterValue(counters, "MAP_OUTPUT_BYTES");
686 
687       // Verify input compression whether it's enable or not.
688       if (inputDir.contains(".gz") || inputDir.contains(".tgz")
689          || inputDir.contains(".bz")) {
690         Assert.assertTrue("Input decompression attribute has been not set for "
691                          + "for compressed input",
692                          simuJobConf.getBoolean(inputDecompKey, false));
693 
694         float INPUT_COMP_RATIO =
695             getExpectedCompressionRatio(simuJobConf, mapInputCompRatio);
696         float INTERMEDIATE_COMP_RATIO =
697             getExpectedCompressionRatio(simuJobConf, mapOutputCompRatio);
698 
699         // Verify Map Input Compression Ratio.
700         assertMapInputCompressionRatio(simMapInputBytes, uncompressedInputSize,
701                                        INPUT_COMP_RATIO);
702 
703         // Verify Map Output Compression Ratio.
704         assertMapOuputCompressionRatio(simReduceInputBytes, simMapOutputBytes,
705                                        INTERMEDIATE_COMP_RATIO);
706       } else {
707         Assert.assertEquals("MAP input bytes has not matched.",
708                             convertBytes(uncompressedInputSize),
709                             convertBytes(simMapInputBytes));
710       }
711 
712       Assert.assertEquals("Simulated job output format has not matched with "
713                          + "original job output format.",
714                          origJobConf.getBoolean(fileOutputFormatKey,false),
715                          simuJobConf.getBoolean(fileOutputFormatKey,false));
716 
717       if (simuJobConf.getBoolean(fileOutputFormatKey,false)) {
718         float OUTPUT_COMP_RATIO =
719             getExpectedCompressionRatio(simuJobConf, reduceOutputCompRatio);
720 
721          //Verify reduce output compression ratio.
722          long simReduceOutputBytes =
723              getCounterValue(counters, "HDFS_BYTES_WRITTEN");
724          long origReduceOutputBytes =
725              origReduceCounters.get("REDUCE_OUTPUT_BYTES");
726          assertReduceOutputCompressionRatio(simReduceOutputBytes,
727                                             origReduceOutputBytes,
728                                             OUTPUT_COMP_RATIO);
729       }
730     }
731   }
732 
assertMapInputCompressionRatio(long simMapInputBytes, long origMapInputBytes, float expInputCompRatio)733   private void assertMapInputCompressionRatio(long simMapInputBytes,
734                                    long origMapInputBytes,
735                                    float expInputCompRatio) {
736     LOG.info("***Verify the map input bytes compression ratio****");
737     LOG.info("Simulated job's map input bytes(REDUCE_SHUFFLE_BYTES): "
738             + simMapInputBytes);
739     LOG.info("Original job's map input bytes: " + origMapInputBytes);
740 
741     final float actInputCompRatio =
742         getActualCompressionRatio(simMapInputBytes, origMapInputBytes);
743     LOG.info("Expected Map Input Compression Ratio:" + expInputCompRatio);
744     LOG.info("Actual Map Input Compression Ratio:" + actInputCompRatio);
745 
746     float diffVal = (float)(expInputCompRatio * 0.06);
747     LOG.info("Expected Difference of Map Input Compression Ratio is <= " +
748             + diffVal);
749     float delta = Math.abs(expInputCompRatio - actInputCompRatio);
750     LOG.info("Actual Difference of Map Iput Compression Ratio:" + delta);
751     Assert.assertTrue("Simulated job input compression ratio has mismatched.",
752                       delta <= diffVal);
753     LOG.info("******Done******");
754   }
755 
assertMapOuputCompressionRatio(long simReduceInputBytes, long simMapoutputBytes, float expMapOuputCompRatio)756   private void assertMapOuputCompressionRatio(long simReduceInputBytes,
757                                               long simMapoutputBytes,
758                                               float expMapOuputCompRatio) {
759     LOG.info("***Verify the map output bytes compression ratio***");
760     LOG.info("Simulated job reduce input bytes:" + simReduceInputBytes);
761     LOG.info("Simulated job map output bytes:" + simMapoutputBytes);
762 
763     final float actMapOutputCompRatio =
764         getActualCompressionRatio(simReduceInputBytes, simMapoutputBytes);
765     LOG.info("Expected Map Output Compression Ratio:" + expMapOuputCompRatio);
766     LOG.info("Actual Map Output Compression Ratio:" + actMapOutputCompRatio);
767 
768     float diffVal = 0.05f;
769     LOG.info("Expected Difference Of Map Output Compression Ratio is <= "
770             + diffVal);
771     float delta = Math.abs(expMapOuputCompRatio - actMapOutputCompRatio);
772     LOG.info("Actual Difference Of Map Ouput Compression Ratio :" + delta);
773 
774     Assert.assertTrue("Simulated job map output compression ratio "
775                      + "has not been matched.", delta <= diffVal);
776     LOG.info("******Done******");
777   }
778 
assertReduceOutputCompressionRatio(long simReduceOutputBytes, long origReduceOutputBytes , float expOutputCompRatio )779   private void assertReduceOutputCompressionRatio(long simReduceOutputBytes,
780       long origReduceOutputBytes , float expOutputCompRatio ) {
781       LOG.info("***Verify the reduce output bytes compression ratio***");
782       final float actOuputputCompRatio =
783           getActualCompressionRatio(simReduceOutputBytes, origReduceOutputBytes);
784       LOG.info("Simulated job's reduce output bytes:" + simReduceOutputBytes);
785       LOG.info("Original job's reduce output bytes:" + origReduceOutputBytes);
786       LOG.info("Expected output compression ratio:" + expOutputCompRatio);
787       LOG.info("Actual output compression ratio:" + actOuputputCompRatio);
788       long diffVal = (long)(origReduceOutputBytes * 0.15);
789       long delta = Math.abs(origReduceOutputBytes - simReduceOutputBytes);
790       LOG.info("Expected difference of output compressed bytes is <= "
791               + diffVal);
792       LOG.info("Actual difference of compressed ouput bytes:" + delta);
793       Assert.assertTrue("Simulated job reduce output compression ratio " +
794          "has not been matched.", delta <= diffVal);
795       LOG.info("******Done******");
796   }
797 
getExpectedCompressionRatio(JobConf simuJobConf, String RATIO_TYPE)798   private float getExpectedCompressionRatio(JobConf simuJobConf,
799                                             String RATIO_TYPE) {
800     // Default decompression ratio is 0.50f irrespective of original
801     //job compression ratio.
802     if (simuJobConf.get(RATIO_TYPE) != null) {
803       return Float.parseFloat(simuJobConf.get(RATIO_TYPE));
804     } else {
805       return 0.50f;
806     }
807   }
808 
getActualCompressionRatio(long compressBytes, long uncompessBytes)809   private float getActualCompressionRatio(long compressBytes,
810                                           long uncompessBytes) {
811     double ratio = ((double)compressBytes) / uncompessBytes;
812     int significant = (int)Math.round(ratio * 100);
813     return ((float)significant)/100;
814   }
815 
816   /**
817    * Verify the distributed cache files between the jobs in a gridmix run.
818    * @param jobsInfo - jobConfs of simulated and original jobs as a map.
819    */
verifyDistributedCacheBetweenJobs( Map<String,List<JobConf>> jobsInfo)820   public void verifyDistributedCacheBetweenJobs(
821       Map<String,List<JobConf>> jobsInfo) {
822      if (jobsInfo.size() > 1) {
823        Map<String, Integer> simJobfilesOccurBtnJobs =
824            getDistcacheFilesOccurenceBetweenJobs(jobsInfo, 0);
825        Map<String, Integer> origJobfilesOccurBtnJobs =
826            getDistcacheFilesOccurenceBetweenJobs(jobsInfo, 1);
827        List<Integer> simuOccurList =
828            getMapValuesAsList(simJobfilesOccurBtnJobs);
829        Collections.sort(simuOccurList);
830        List<Integer> origOccurList =
831            getMapValuesAsList(origJobfilesOccurBtnJobs);
832        Collections.sort(origOccurList);
833        Assert.assertEquals("The unique count of distibuted cache files in "
834                         + "simulated jobs have not matched with the unique "
835                         + "count of original jobs distributed files ",
836                         simuOccurList.size(), origOccurList.size());
837        int index = 0;
838        for (Integer origDistFileCount : origOccurList) {
839          Assert.assertEquals("Distributed cache file reused in simulated "
840                             + "jobs has not matched with reused of distributed"
841                             + "cache file in original jobs.",
842                             origDistFileCount, simuOccurList.get(index));
843          index ++;
844        }
845      }
846   }
847 
848   /**
849    * Get the unique distributed cache files and occurrence between the jobs.
850    * @param jobsInfo - job's configurations as a map.
851    * @param jobConfIndex - 0 for simulated job configuration and
852    *                       1 for original jobs configuration.
853    * @return  - unique distributed cache files and occurrences as map.
854    */
getDistcacheFilesOccurenceBetweenJobs( Map<String, List<JobConf>> jobsInfo, int jobConfIndex)855   private Map<String, Integer> getDistcacheFilesOccurenceBetweenJobs(
856       Map<String, List<JobConf>> jobsInfo, int jobConfIndex) {
857     Map<String,Integer> filesOccurBtnJobs = new HashMap <String,Integer>();
858     Set<String> jobIds = jobsInfo.keySet();
859     Iterator<String > ite = jobIds.iterator();
860     while (ite.hasNext()) {
861       String jobId = ite.next();
862       List<JobConf> jobconfs = jobsInfo.get(jobId);
863       String [] distCacheFiles = jobconfs.get(jobConfIndex).get(
864           GridMixConfig.GRIDMIX_DISTCACHE_FILES).split(",");
865       String [] distCacheFileTimeStamps = jobconfs.get(jobConfIndex).get(
866           GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
867       String [] distCacheFileVisib = jobconfs.get(jobConfIndex).get(
868           GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES).split(",");
869       int indx = 0;
870       for (String distCacheFile : distCacheFiles) {
871         String fileAndSize = distCacheFile + "^"
872                            + distCacheFileTimeStamps[indx] + "^"
873                            + jobconfs.get(jobConfIndex).getUser();
874         if (filesOccurBtnJobs.get(fileAndSize) != null) {
875           int count = filesOccurBtnJobs.get(fileAndSize);
876           count ++;
877           filesOccurBtnJobs.put(fileAndSize, count);
878         } else {
879           filesOccurBtnJobs.put(fileAndSize, 1);
880         }
881       }
882     }
883     return filesOccurBtnJobs;
884   }
885 
886   /**
887    * It verifies the distributed cache emulation of  a job.
888    * @param zombieJob - Original job story.
889    * @param simuJobConf - Simulated job configuration.
890    */
verifyDistributeCache(ZombieJob zombieJob, JobConf simuJobConf)891   public void verifyDistributeCache(ZombieJob zombieJob,
892                                     JobConf simuJobConf) throws IOException {
893     if (simuJobConf.getBoolean(GridMixConfig.GRIDMIX_DISTCACHE_ENABLE, false)) {
894       JobConf origJobConf = zombieJob.getJobConf();
895       assertFileVisibility(simuJobConf);
896       assertDistcacheFiles(simuJobConf,origJobConf);
897       assertFileSizes(simuJobConf,origJobConf);
898       assertFileStamps(simuJobConf,origJobConf);
899     } else {
900       Assert.assertNull("Configuration has distributed cache visibilites"
901           + "without enabled distributed cache emulation.",
902           simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES));
903       Assert.assertNull("Configuration has distributed cache files time "
904           + "stamps without enabled distributed cache emulation.",
905           simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP));
906       Assert.assertNull("Configuration has distributed cache files paths"
907           + "without enabled distributed cache emulation.",
908           simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILES));
909       Assert.assertNull("Configuration has distributed cache files sizes"
910           + "without enabled distributed cache emulation.",
911           simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE));
912     }
913   }
914 
assertFileStamps(JobConf simuJobConf, JobConf origJobConf)915   private void assertFileStamps(JobConf simuJobConf, JobConf origJobConf) {
916     //Verify simulated jobs against distributed cache files time stamps.
917     String [] origDCFTS =
918         origJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
919     String [] simuDCFTS =
920         simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
921     for (int index = 0; index < origDCFTS.length; index++) {
922       Assert.assertTrue("Invalid time stamps between original "
923           +"and simulated job", Long.parseLong(origDCFTS[index])
924           < Long.parseLong(simuDCFTS[index]));
925     }
926   }
927 
assertFileVisibility(JobConf simuJobConf )928   private void assertFileVisibility(JobConf simuJobConf ) {
929     // Verify simulated jobs against distributed cache files visibilities.
930     String [] distFiles =
931         simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILES).split(",");
932     String [] simuDistVisibilities =
933         simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES).split(",");
934     List<Boolean> expFileVisibility = new ArrayList<Boolean >();
935     int index = 0;
936     for (String distFile : distFiles) {
937       boolean isLocalDistCache = GridmixSystemTestCase.isLocalDistCache(
938                                  distFile,
939                                  simuJobConf.getUser(),
940                                  Boolean.valueOf(simuDistVisibilities[index]));
941       if (!isLocalDistCache) {
942         expFileVisibility.add(true);
943       } else {
944         expFileVisibility.add(false);
945       }
946       index ++;
947     }
948     index = 0;
949     for (String actFileVisibility :  simuDistVisibilities) {
950       Assert.assertEquals("Simulated job distributed cache file "
951                          + "visibilities has not matched.",
952                          expFileVisibility.get(index),
953                          Boolean.valueOf(actFileVisibility));
954       index ++;
955     }
956   }
957 
assertDistcacheFiles(JobConf simuJobConf, JobConf origJobConf)958   private void assertDistcacheFiles(JobConf simuJobConf, JobConf origJobConf)
959       throws IOException {
960     //Verify simulated jobs against distributed cache files.
961     String [] origDistFiles = origJobConf.get(
962         GridMixConfig.GRIDMIX_DISTCACHE_FILES).split(",");
963     String [] simuDistFiles = simuJobConf.get(
964         GridMixConfig.GRIDMIX_DISTCACHE_FILES).split(",");
965     String [] simuDistVisibilities = simuJobConf.get(
966         GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES).split(",");
967     Assert.assertEquals("No. of simulatued job's distcache files mismacted"
968                        + "with no.of original job's distcache files",
969                        origDistFiles.length, simuDistFiles.length);
970 
971     int index = 0;
972     for (String simDistFile : simuDistFiles) {
973       Path distPath = new Path(simDistFile);
974       boolean isLocalDistCache =
975           GridmixSystemTestCase.isLocalDistCache(simDistFile,
976               simuJobConf.getUser(),
977               Boolean.valueOf(simuDistVisibilities[index]));
978       if (!isLocalDistCache) {
979         FileSystem fs = distPath.getFileSystem(conf);
980         FileStatus fstat = fs.getFileStatus(distPath);
981         FsPermission permission = fstat.getPermission();
982         Assert.assertTrue("HDFS distributed cache file has wrong "
983                          + "permissions for users.",
984                          FsAction.READ_WRITE.SYMBOL
985                          == permission.getUserAction().SYMBOL);
986         Assert.assertTrue("HDFS distributed cache file has wrong "
987                          + "permissions for groups.",
988                          FsAction.READ.SYMBOL
989                          == permission.getGroupAction().SYMBOL);
990         Assert.assertTrue("HDSFS distributed cache file has wrong "
991                          + "permissions for others.",
992                          FsAction.READ.SYMBOL
993                          == permission.getOtherAction().SYMBOL);
994       }
995       index++;
996     }
997   }
998 
assertFileSizes(JobConf simuJobConf, JobConf origJobConf)999   private void assertFileSizes(JobConf simuJobConf, JobConf origJobConf) {
1000     // Verify simulated jobs against distributed cache files size.
1001     List<String> origDistFilesSize =
1002         Arrays.asList(origJobConf.get(
1003             GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE).split(","));
1004     Collections.sort(origDistFilesSize);
1005 
1006     List<String> simuDistFilesSize =
1007         Arrays.asList(simuJobConf.get(
1008             GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE).split(","));
1009     Collections.sort(simuDistFilesSize);
1010 
1011     Assert.assertEquals("Simulated job's file size list has not "
1012                        + "matched with the Original job's file size list.",
1013                        origDistFilesSize.size(),
1014                        simuDistFilesSize.size());
1015 
1016     for (int index = 0; index < origDistFilesSize.size(); index ++) {
1017        Assert.assertEquals("Simulated job distcache file size has not "
1018                           + "matched with original job distcache file size.",
1019                           origDistFilesSize.get(index),
1020                           simuDistFilesSize.get(index));
1021     }
1022   }
1023 
setJobDistributedCacheInfo(String jobId, JobConf simuJobConf, JobConf origJobConf)1024   private void setJobDistributedCacheInfo(String jobId, JobConf simuJobConf,
1025      JobConf origJobConf) {
1026     if (simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILES) != null) {
1027       List<JobConf> jobConfs = new ArrayList<JobConf>();
1028       jobConfs.add(simuJobConf);
1029       jobConfs.add(origJobConf);
1030       simuAndOrigJobsInfo.put(jobId,jobConfs);
1031     }
1032   }
1033 
getMapValuesAsList(Map<String,Integer> jobOccurs)1034   private List<Integer> getMapValuesAsList(Map<String,Integer> jobOccurs) {
1035     List<Integer> occursList = new ArrayList<Integer>();
1036     Set<String> files = jobOccurs.keySet();
1037     Iterator<String > ite = files.iterator();
1038     while (ite.hasNext()) {
1039       String file = ite.next();
1040       occursList.add(jobOccurs.get(file));
1041     }
1042     return occursList;
1043   }
1044 
1045   /**
1046    * It verifies the high ram gridmix jobs.
1047    * @param zombieJob - Original job story.
1048    * @param simuJobConf - Simulated job configuration.
1049    */
1050   @SuppressWarnings("deprecation")
verifyHighRamMemoryJobs(ZombieJob zombieJob, JobConf simuJobConf)1051   public void verifyHighRamMemoryJobs(ZombieJob zombieJob,
1052                                       JobConf simuJobConf) {
1053     JobConf origJobConf = zombieJob.getJobConf();
1054     int origMapFactor = getMapFactor(origJobConf);
1055     int origReduceFactor = getReduceFactor(origJobConf);
1056     boolean isHighRamEnable =
1057         simuJobConf.getBoolean(GridMixConfig.GRIDMIX_HIGH_RAM_JOB_ENABLE,
1058                                false);
1059     if (isHighRamEnable) {
1060         if (origMapFactor >= 2 && origReduceFactor >= 2) {
1061           assertGridMixHighRamJob(simuJobConf, origJobConf, 1);
1062         } else if(origMapFactor >= 2) {
1063           assertGridMixHighRamJob(simuJobConf, origJobConf, 2);
1064         } else if(origReduceFactor >= 2) {
1065           assertGridMixHighRamJob(simuJobConf, origJobConf, 3);
1066         }
1067     } else {
1068         if (origMapFactor >= 2 && origReduceFactor >= 2) {
1069               assertGridMixHighRamJob(simuJobConf, origJobConf, 4);
1070         } else if(origMapFactor >= 2) {
1071               assertGridMixHighRamJob(simuJobConf, origJobConf, 5);
1072         } else if(origReduceFactor >= 2) {
1073               assertGridMixHighRamJob(simuJobConf, origJobConf, 6);
1074         }
1075     }
1076   }
1077 
1078   /**
1079    * Get the value for identifying the slots used by the map.
1080    * @param jobConf - job configuration
1081    * @return - map factor value.
1082    */
getMapFactor(Configuration jobConf)1083   public static int getMapFactor(Configuration jobConf) {
1084     long clusterMapMem =
1085         Long.parseLong(jobConf.get(GridMixConfig.CLUSTER_MAP_MEMORY));
1086     long jobMapMem =
1087         Long.parseLong(jobConf.get(GridMixConfig.JOB_MAP_MEMORY_MB));
1088     return (int)Math.ceil((double)jobMapMem / clusterMapMem);
1089   }
1090 
1091   /**
1092    * Get the value for identifying the slots used by the reduce.
1093    * @param jobConf - job configuration.
1094    * @return - reduce factor value.
1095    */
getReduceFactor(Configuration jobConf)1096   public static int getReduceFactor(Configuration jobConf) {
1097     long clusterReduceMem =
1098         Long.parseLong(jobConf.get(GridMixConfig.CLUSTER_REDUCE_MEMORY));
1099     long jobReduceMem =
1100         Long.parseLong(jobConf.get(GridMixConfig.JOB_REDUCE_MEMORY_MB));
1101     return (int)Math.ceil((double)jobReduceMem / clusterReduceMem);
1102   }
1103 
1104   @SuppressWarnings("deprecation")
assertGridMixHighRamJob(JobConf simuJobConf, Configuration origConf, int option)1105   private void assertGridMixHighRamJob(JobConf simuJobConf,
1106                                        Configuration origConf, int option) {
1107     int simuMapFactor = getMapFactor(simuJobConf);
1108     int simuReduceFactor = getReduceFactor(simuJobConf);
1109     /**
1110      *  option 1 : Both map and reduce honors the high ram.
1111      *  option 2 : Map only honors the high ram.
1112      *  option 3 : Reduce only honors the high ram.
1113      *  option 4 : Both map and reduce should not honors the high ram
1114      *             in disable state.
1115      *  option 5 : Map should not honors the high ram in disable state.
1116      *  option 6 : Reduce should not honors the high ram in disable state.
1117      */
1118     switch (option) {
1119       case 1 :
1120                Assert.assertTrue("Gridmix job has not honored the high "
1121                                 + "ram for map.", simuMapFactor >= 2
1122                                 && simuMapFactor == getMapFactor(origConf));
1123                Assert.assertTrue("Gridmix job has not honored the high "
1124                                 + "ram for reduce.", simuReduceFactor >= 2
1125                                 && simuReduceFactor
1126                                 == getReduceFactor(origConf));
1127                break;
1128       case 2 :
1129                Assert.assertTrue("Gridmix job has not honored the high "
1130                                 + "ram for map.", simuMapFactor >= 2
1131                                 && simuMapFactor == getMapFactor(origConf));
1132                break;
1133       case 3 :
1134                Assert.assertTrue("Girdmix job has not honored the high "
1135                                 + "ram for reduce.", simuReduceFactor >= 2
1136                                 && simuReduceFactor
1137                                 == getReduceFactor(origConf));
1138                break;
1139       case 4 :
1140                Assert.assertTrue("Gridmix job has honored the high "
1141                                 + "ram for map in emulation disable state.",
1142                                 simuMapFactor < 2
1143                                 && simuMapFactor != getMapFactor(origConf));
1144                Assert.assertTrue("Gridmix job has honored the high "
1145                                 + "ram for reduce in emulation disable state.",
1146                                 simuReduceFactor < 2
1147                                 && simuReduceFactor
1148                                 != getReduceFactor(origConf));
1149                break;
1150       case 5 :
1151                Assert.assertTrue("Gridmix job has honored the high "
1152                                 + "ram for map in emulation disable state.",
1153                                 simuMapFactor < 2
1154                                 && simuMapFactor != getMapFactor(origConf));
1155                break;
1156       case 6 :
1157                Assert.assertTrue("Girdmix job has honored the high "
1158                                 + "ram for reduce in emulation disable state.",
1159                                 simuReduceFactor < 2
1160                                 && simuReduceFactor
1161                                 != getReduceFactor(origConf));
1162                break;
1163     }
1164   }
1165 
1166   /**
1167    * Get task memory after scaling based on cluster configuration.
1168    * @param jobTaskKey - Job task key attribute.
1169    * @param clusterTaskKey - Cluster task key attribute.
1170    * @param origConf - Original job configuration.
1171    * @param simuConf - Simulated job configuration.
1172    * @return scaled task memory value.
1173    */
1174   @SuppressWarnings("deprecation")
getScaledTaskMemInMB(String jobTaskKey, String clusterTaskKey, Configuration origConf, Configuration simuConf)1175   public static long getScaledTaskMemInMB(String jobTaskKey,
1176                                           String clusterTaskKey,
1177                                           Configuration origConf,
1178                                           Configuration simuConf) {
1179     long simuClusterTaskValue =
1180         simuConf.getLong(clusterTaskKey, JobConf.DISABLED_MEMORY_LIMIT);
1181     long origClusterTaskValue =
1182         origConf.getLong(clusterTaskKey, JobConf.DISABLED_MEMORY_LIMIT);
1183     long origJobTaskValue =
1184         origConf.getLong(jobTaskKey, JobConf.DISABLED_MEMORY_LIMIT);
1185     double scaleFactor =
1186         Math.ceil((double)origJobTaskValue / origClusterTaskValue);
1187     long simulatedJobValue = (long)(scaleFactor * simuClusterTaskValue);
1188     return simulatedJobValue;
1189   }
1190 
1191   /**
1192    * It Verifies the memory limit of a task.
1193    * @param TaskMemInMB - task memory limit.
1194    * @param taskLimitInMB - task upper limit.
1195    */
verifyMemoryLimits(long TaskMemInMB, long taskLimitInMB)1196   public static void verifyMemoryLimits(long TaskMemInMB, long taskLimitInMB) {
1197     if (TaskMemInMB > taskLimitInMB) {
1198       Assert.fail("Simulated job's task memory exceeds the "
1199                  + "upper limit of task virtual memory.");
1200     }
1201   }
1202 
convertJobStatus(String jobStatus)1203   private String convertJobStatus(String jobStatus) {
1204     if (jobStatus.equals("SUCCEEDED")) {
1205       return "SUCCESS";
1206     } else {
1207       return jobStatus;
1208     }
1209   }
1210 
convertBytes(long bytesValue)1211   private String convertBytes(long bytesValue) {
1212     int units = 1024;
1213     if( bytesValue < units ) {
1214       return String.valueOf(bytesValue)+ "B";
1215     } else {
1216       // it converts the bytes into either KB or MB or GB or TB etc.
1217       int exp = (int)(Math.log(bytesValue) / Math.log(units));
1218       return String.format("%1d%sB",(long)(bytesValue / Math.pow(units, exp)),
1219           "KMGTPE".charAt(exp -1));
1220     }
1221   }
1222 
1223 
getCounterValue(Counters counters, String key)1224   private long getCounterValue(Counters counters, String key)
1225      throws ParseException {
1226     for (String groupName : counters.getGroupNames()) {
1227        Group totalGroup = counters.getGroup(groupName);
1228        Iterator<Counter> itrCounter = totalGroup.iterator();
1229        while (itrCounter.hasNext()) {
1230          Counter counter = itrCounter.next();
1231          if (counter.getName().equals(key)) {
1232            return counter.getValue();
1233          }
1234        }
1235     }
1236     return 0;
1237   }
1238 }
1239 
1240