1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.mapred.gridmix.test.system; 19 20 import java.io.IOException; 21 import java.io.File; 22 import java.util.Iterator; 23 import java.util.List; 24 import java.util.Map; 25 import java.util.HashMap; 26 import java.util.SortedMap; 27 import java.util.TreeMap; 28 import java.util.Collections; 29 import java.util.Set; 30 import java.util.ArrayList; 31 import java.util.Arrays; 32 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 import org.apache.hadoop.fs.Path; 36 import org.apache.hadoop.fs.FileSystem; 37 import org.apache.hadoop.fs.FileStatus; 38 import org.apache.hadoop.fs.permission.FsPermission; 39 import org.apache.hadoop.fs.permission.FsAction; 40 import org.apache.hadoop.conf.Configuration; 41 import org.apache.hadoop.mapred.Counters; 42 import org.apache.hadoop.mapred.Counters.Counter; 43 import org.apache.hadoop.mapred.Counters.Group; 44 import org.apache.hadoop.mapred.Task; 45 import org.apache.hadoop.mapred.DefaultJobHistoryParser; 46 import org.apache.hadoop.mapred.JobHistory; 47 import org.apache.hadoop.mapreduce.JobID; 48 import org.apache.hadoop.mapreduce.TaskType; 49 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 50 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 51 import org.apache.hadoop.mapreduce.test.system.JTClient; 52 import org.apache.hadoop.mapred.JobConf; 53 import org.apache.hadoop.tools.rumen.LoggedJob; 54 import org.apache.hadoop.tools.rumen.ZombieJob; 55 import org.apache.hadoop.tools.rumen.TaskInfo; 56 import org.junit.Assert; 57 import java.text.ParseException; 58 import org.apache.hadoop.security.UserGroupInformation; 59 import org.apache.hadoop.mapred.gridmix.GridmixSystemTestCase; 60 61 /** 62 * Verifying each Gridmix job with corresponding job story in a trace file. 63 */ 64 public class GridmixJobVerification { 65 66 private static Log LOG = LogFactory.getLog(GridmixJobVerification.class); 67 private Path path; 68 private Configuration conf; 69 private JTClient jtClient; 70 private String userResolverVal; 71 static final String origJobIdKey = GridMixConfig.GRIDMIX_ORIGINAL_JOB_ID; 72 static final String jobSubKey = GridMixConfig.GRIDMIX_SUBMISSION_POLICY; 73 static final String jobTypeKey = GridMixConfig.GRIDMIX_JOB_TYPE; 74 static final String mapTaskKey = GridMixConfig.GRIDMIX_SLEEPJOB_MAPTASK_ONLY; 75 static final String usrResolver = GridMixConfig.GRIDMIX_USER_RESOLVER; 76 static final String fileOutputFormatKey = "mapred.output.compress"; 77 static final String fileInputFormatKey = "mapred.input.dir"; 78 static final String compEmulKey = GridMixConfig.GRIDMIX_COMPRESSION_ENABLE; 79 static final String inputDecompKey = 80 GridMixConfig.GRIDMIX_INPUT_DECOMPRESS_ENABLE; 81 static final String mapInputCompRatio = 82 GridMixConfig.GRIDMIX_INPUT_COMPRESS_RATIO; 83 static final String mapOutputCompRatio = 84 GridMixConfig.GRIDMIX_INTERMEDIATE_COMPRESSION_RATIO; 85 static final String reduceOutputCompRatio = 86 GridMixConfig.GRIDMIX_OUTPUT_COMPRESSION_RATIO; 87 private Map<String, List<JobConf>> simuAndOrigJobsInfo = 88 new HashMap<String, List<JobConf>>(); 89 90 /** 91 * Gridmix job verification constructor 92 * @param path - path of the gridmix output directory. 93 * @param conf - cluster configuration. 94 * @param jtClient - jobtracker client. 95 */ GridmixJobVerification(Path path, Configuration conf, JTClient jtClient)96 public GridmixJobVerification(Path path, Configuration conf, 97 JTClient jtClient) { 98 this.path = path; 99 this.conf = conf; 100 this.jtClient = jtClient; 101 } 102 103 /** 104 * It verifies the Gridmix jobs with corresponding job story in a trace file. 105 * @param jobids - gridmix job ids. 106 * @throws IOException - if an I/O error occurs. 107 * @throws ParseException - if an parse error occurs. 108 */ verifyGridmixJobsWithJobStories(List<JobID> jobids)109 public void verifyGridmixJobsWithJobStories(List<JobID> jobids) 110 throws Exception { 111 112 SortedMap <Long, String> origSubmissionTime = new TreeMap <Long, String>(); 113 SortedMap <Long, String> simuSubmissionTime = new TreeMap<Long, String>(); 114 GridmixJobStory gjs = new GridmixJobStory(path, conf); 115 final Iterator<JobID> ite = jobids.iterator(); 116 File destFolder = new File(System.getProperty("java.io.tmpdir") 117 + "/gridmix-st/"); 118 destFolder.mkdir(); 119 while (ite.hasNext()) { 120 JobID simuJobId = ite.next(); 121 122 JobHistory.JobInfo jhInfo = getSimulatedJobHistory(simuJobId); 123 Assert.assertNotNull("Job history not found.", jhInfo); 124 Counters counters = 125 Counters.fromEscapedCompactString(jhInfo.getValues() 126 .get(JobHistory.Keys.COUNTERS)); 127 JobConf simuJobConf = getSimulatedJobConf(simuJobId, destFolder); 128 int cnt = 1; 129 do { 130 if (simuJobConf != null) { 131 break; 132 } 133 Thread.sleep(100); 134 simuJobConf = getSimulatedJobConf(simuJobId, destFolder); 135 cnt++; 136 } while(cnt < 30); 137 138 String origJobId = simuJobConf.get(origJobIdKey); 139 LOG.info("OriginalJobID<->CurrentJobID:" 140 + origJobId + "<->" + simuJobId); 141 142 if (userResolverVal == null) { 143 userResolverVal = simuJobConf.get(usrResolver); 144 } 145 146 ZombieJob zombieJob = gjs.getZombieJob(JobID.forName(origJobId)); 147 Map<String, Long> mapJobCounters = getJobMapCounters(zombieJob); 148 Map<String, Long> reduceJobCounters = getJobReduceCounters(zombieJob); 149 if (simuJobConf.get(jobSubKey).contains("REPLAY")) { 150 origSubmissionTime.put(zombieJob.getSubmissionTime(), 151 origJobId.toString() + "^" + simuJobId); 152 simuSubmissionTime.put(Long.parseLong(jhInfo.getValues().get(JobHistory.Keys.SUBMIT_TIME)), 153 origJobId.toString() + "^" + simuJobId); ; 154 } 155 156 LOG.info("Verifying the job <" + simuJobId + "> and wait for a while..."); 157 verifySimulatedJobSummary(zombieJob, jhInfo, simuJobConf); 158 verifyJobMapCounters(counters, mapJobCounters, simuJobConf); 159 verifyJobReduceCounters(counters, reduceJobCounters, simuJobConf); 160 verifyCompressionEmulation(zombieJob.getJobConf(), simuJobConf, counters, 161 reduceJobCounters, mapJobCounters); 162 verifyDistributeCache(zombieJob,simuJobConf); 163 setJobDistributedCacheInfo(simuJobId.toString(), simuJobConf, 164 zombieJob.getJobConf()); 165 verifyHighRamMemoryJobs(zombieJob, simuJobConf); 166 verifyCPUEmulationOfJobs(zombieJob, jhInfo, simuJobConf); 167 verifyMemoryEmulationOfJobs(zombieJob, jhInfo, simuJobConf); 168 LOG.info("Done."); 169 } 170 verifyDistributedCacheBetweenJobs(simuAndOrigJobsInfo); 171 } 172 173 /** 174 * Verify the job submission order between the jobs in replay mode. 175 * @param origSubmissionTime - sorted map of original jobs submission times. 176 * @param simuSubmissionTime - sorted map of simulated jobs submission times. 177 */ verifyJobSumissionTime(SortedMap<Long, String> origSubmissionTime, SortedMap<Long, String> simuSubmissionTime)178 public void verifyJobSumissionTime(SortedMap<Long, String> origSubmissionTime, 179 SortedMap<Long, String> simuSubmissionTime) { 180 Assert.assertEquals("Simulated job's submission time count has " 181 + "not match with Original job's submission time count.", 182 origSubmissionTime.size(), simuSubmissionTime.size()); 183 for ( int index = 0; index < origSubmissionTime.size(); index ++) { 184 String origAndSimuJobID = origSubmissionTime.get(index); 185 String simuAndorigJobID = simuSubmissionTime.get(index); 186 Assert.assertEquals("Simulated jobs have not submitted in same " 187 + "order as original jobs submitted in REPLAY mode.", 188 origAndSimuJobID, simuAndorigJobID); 189 } 190 } 191 192 /** 193 * It verifies the simulated job map counters. 194 * @param counters - Original job map counters. 195 * @param mapJobCounters - Simulated job map counters. 196 * @param jobConf - Simulated job configuration. 197 * @throws ParseException - If an parser error occurs. 198 */ verifyJobMapCounters(Counters counters, Map<String,Long> mapCounters, JobConf jobConf)199 public void verifyJobMapCounters(Counters counters, 200 Map<String,Long> mapCounters, JobConf jobConf) throws ParseException { 201 if (!jobConf.get(jobTypeKey, "LOADJOB").equals("SLEEPJOB")) { 202 Assert.assertEquals("Map input records have not matched.", 203 mapCounters.get("MAP_INPUT_RECS").longValue(), 204 getCounterValue(counters, "MAP_INPUT_RECORDS")); 205 } else { 206 Assert.assertTrue("Map Input Bytes are zero", 207 getCounterValue(counters,"HDFS_BYTES_READ") != 0); 208 Assert.assertNotNull("Map Input Records are zero", 209 getCounterValue(counters, "MAP_INPUT_RECORDS")!=0); 210 } 211 } 212 213 /** 214 * It verifies the simulated job reduce counters. 215 * @param counters - Original job reduce counters. 216 * @param reduceCounters - Simulated job reduce counters. 217 * @param jobConf - simulated job configuration. 218 * @throws ParseException - if an parser error occurs. 219 */ verifyJobReduceCounters(Counters counters, Map<String,Long> reduceCounters, JobConf jobConf)220 public void verifyJobReduceCounters(Counters counters, 221 Map<String,Long> reduceCounters, JobConf jobConf) throws ParseException { 222 if (jobConf.get(jobTypeKey, "LOADJOB").equals("SLEEPJOB")) { 223 Assert.assertTrue("Reduce output records are not zero for sleep job.", 224 getCounterValue(counters, "REDUCE_OUTPUT_RECORDS") == 0); 225 Assert.assertTrue("Reduce output bytes are not zero for sleep job.", 226 getCounterValue(counters,"HDFS_BYTES_WRITTEN") == 0); 227 } 228 } 229 230 /** 231 * It verifies the gridmix simulated job summary. 232 * @param zombieJob - Original job summary. 233 * @param jhInfo - Simulated job history info. 234 * @param jobConf - simulated job configuration. 235 * @throws IOException - if an I/O error occurs. 236 */ verifySimulatedJobSummary(ZombieJob zombieJob, JobHistory.JobInfo jhInfo, JobConf jobConf)237 public void verifySimulatedJobSummary(ZombieJob zombieJob, 238 JobHistory.JobInfo jhInfo, JobConf jobConf) throws IOException { 239 Assert.assertEquals("Job id has not matched", zombieJob.getJobID(), 240 JobID.forName(jobConf.get(origJobIdKey))); 241 242 Assert.assertEquals("Job maps have not matched", String.valueOf(zombieJob.getNumberMaps()), 243 jhInfo.getValues().get(JobHistory.Keys.TOTAL_MAPS)); 244 245 if (!jobConf.getBoolean(mapTaskKey, false)) { 246 Assert.assertEquals("Job reducers have not matched", 247 String.valueOf(zombieJob.getNumberReduces()), jhInfo.getValues().get(JobHistory.Keys.TOTAL_REDUCES)); 248 } else { 249 Assert.assertEquals("Job reducers have not matched", 250 0, Integer.parseInt(jhInfo.getValues().get(JobHistory.Keys.TOTAL_REDUCES))); 251 } 252 253 Assert.assertEquals("Job status has not matched.", 254 zombieJob.getOutcome().name(), 255 convertJobStatus(jhInfo.getValues().get(JobHistory.Keys.JOB_STATUS))); 256 257 LoggedJob loggedJob = zombieJob.getLoggedJob(); 258 Assert.assertEquals("Job priority has not matched.", 259 loggedJob.getPriority().toString(), 260 jhInfo.getValues().get(JobHistory.Keys.JOB_PRIORITY)); 261 262 if (jobConf.get(usrResolver).contains("RoundRobin")) { 263 String user = UserGroupInformation.getLoginUser().getShortUserName(); 264 Assert.assertTrue(jhInfo.getValues().get(JobHistory.Keys.JOBID).toString() 265 + " has not impersonate with other user.", 266 !jhInfo.getValues().get(JobHistory.Keys.USER).equals(user)); 267 } 268 } 269 270 /** 271 * Get the original job map counters from a trace. 272 * @param zombieJob - Original job story. 273 * @return - map counters as a map. 274 */ getJobMapCounters(ZombieJob zombieJob)275 public Map<String, Long> getJobMapCounters(ZombieJob zombieJob) { 276 long expMapInputBytes = 0; 277 long expMapOutputBytes = 0; 278 long expMapInputRecs = 0; 279 long expMapOutputRecs = 0; 280 Map<String,Long> mapCounters = new HashMap<String,Long>(); 281 for (int index = 0; index < zombieJob.getNumberMaps(); index ++) { 282 TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index); 283 expMapInputBytes += mapTask.getInputBytes(); 284 expMapOutputBytes += mapTask.getOutputBytes(); 285 expMapInputRecs += mapTask.getInputRecords(); 286 expMapOutputRecs += mapTask.getOutputRecords(); 287 } 288 mapCounters.put("MAP_INPUT_BYTES", expMapInputBytes); 289 mapCounters.put("MAP_OUTPUT_BYTES", expMapOutputBytes); 290 mapCounters.put("MAP_INPUT_RECS", expMapInputRecs); 291 mapCounters.put("MAP_OUTPUT_RECS", expMapOutputRecs); 292 return mapCounters; 293 } 294 295 /** 296 * Get the original job reduce counters from a trace. 297 * @param zombieJob - Original job story. 298 * @return - reduce counters as a map. 299 */ getJobReduceCounters(ZombieJob zombieJob)300 public Map<String,Long> getJobReduceCounters(ZombieJob zombieJob) { 301 long expReduceInputBytes = 0; 302 long expReduceOutputBytes = 0; 303 long expReduceInputRecs = 0; 304 long expReduceOutputRecs = 0; 305 Map<String,Long> reduceCounters = new HashMap<String,Long>(); 306 for (int index = 0; index < zombieJob.getNumberReduces(); index ++) { 307 TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index); 308 expReduceInputBytes += reduceTask.getInputBytes(); 309 expReduceOutputBytes += reduceTask.getOutputBytes(); 310 expReduceInputRecs += reduceTask.getInputRecords(); 311 expReduceOutputRecs += reduceTask.getOutputRecords(); 312 } 313 reduceCounters.put("REDUCE_INPUT_BYTES", expReduceInputBytes); 314 reduceCounters.put("REDUCE_OUTPUT_BYTES", expReduceOutputBytes); 315 reduceCounters.put("REDUCE_INPUT_RECS", expReduceInputRecs); 316 reduceCounters.put("REDUCE_OUTPUT_RECS", expReduceOutputRecs); 317 return reduceCounters; 318 } 319 320 /** 321 * Get the simulated job configuration of a job. 322 * @param simulatedJobID - Simulated job id. 323 * @param tmpJHFolder - temporary job history folder location. 324 * @return - simulated job configuration. 325 * @throws IOException - If an I/O error occurs. 326 */ getSimulatedJobConf(JobID simulatedJobID, File tmpJHFolder)327 public JobConf getSimulatedJobConf(JobID simulatedJobID, File tmpJHFolder) 328 throws IOException, InterruptedException { 329 FileSystem fs = null; 330 try { 331 332 String historyFilePath = jtClient.getProxy() 333 .getJobHistoryLocationForRetiredJob(simulatedJobID); 334 int cnt = 0; 335 do { 336 if (historyFilePath != null) { 337 break; 338 } 339 Thread.sleep(100); 340 historyFilePath = jtClient.getProxy() 341 .getJobHistoryLocationForRetiredJob(simulatedJobID); 342 cnt++; 343 } while( cnt < 30 ); 344 Assert.assertNotNull("History file has not available for the job [" 345 + simulatedJobID + "] for 3 secs.", historyFilePath); 346 Path jhpath = new Path(historyFilePath); 347 LOG.info("Parent:" + jhpath.getParent()); 348 fs = jhpath.getFileSystem(conf); 349 fs.copyToLocalFile(jhpath,new Path(tmpJHFolder.toString())); 350 fs.copyToLocalFile(new Path(jhpath.getParent() + "/" + simulatedJobID + "_conf.xml"), 351 new Path(tmpJHFolder.toString())); 352 JobConf jobConf = new JobConf(); 353 jobConf.addResource(new Path(tmpJHFolder.toString() 354 + "/" + simulatedJobID + "_conf.xml")); 355 jobConf.reloadConfiguration(); 356 return jobConf; 357 358 }finally { 359 fs.close(); 360 } 361 } 362 363 /** 364 * Get the simulated job history of a job. 365 * @param simulatedJobID - simulated job id. 366 * @return - simulated job information. 367 * @throws IOException - if an I/O error occurs. 368 */ getSimulatedJobHistory(JobID simulatedJobID)369 public JobHistory.JobInfo getSimulatedJobHistory(JobID simulatedJobID) 370 throws IOException, InterruptedException { 371 FileSystem fs = null; 372 try { 373 String historyFilePath = jtClient.getProxy(). 374 getJobHistoryLocationForRetiredJob(simulatedJobID); 375 int cnt = 0; 376 do { 377 if (historyFilePath != null) { 378 break; 379 } 380 Thread.sleep(100); 381 historyFilePath = jtClient.getProxy() 382 .getJobHistoryLocationForRetiredJob(simulatedJobID); 383 cnt++; 384 } while( cnt < 30 ); 385 LOG.info("HistoryFilePath:" + historyFilePath); 386 Assert.assertNotNull("History file path has not found for a job[" 387 + simulatedJobID + "] for 3 secs."); 388 Path jhpath = new Path(historyFilePath); 389 fs = jhpath.getFileSystem(conf); 390 JobHistory.JobInfo jobInfo = 391 new JobHistory.JobInfo(simulatedJobID.toString()); 392 DefaultJobHistoryParser.parseJobTasks(historyFilePath, jobInfo, fs); 393 return jobInfo; 394 } finally { 395 fs.close(); 396 } 397 } 398 399 /** 400 * It verifies the cpu resource usage of gridmix jobs against 401 * the original job cpu resource usage. 402 * @param origJobHistory - Original job history. 403 * @param simuJobHistoryInfo - Simulated job history. 404 * @param simuJobConf - simulated job configuration. 405 */ verifyCPUEmulationOfJobs(ZombieJob origJobHistory, JobHistory.JobInfo simuJobHistoryInfo, JobConf simuJobConf)406 public void verifyCPUEmulationOfJobs(ZombieJob origJobHistory, 407 JobHistory.JobInfo simuJobHistoryInfo, 408 JobConf simuJobConf) throws Exception { 409 boolean isCPUEmulON = false; 410 if (simuJobConf.get(GridMixConfig.GRIDMIX_CPU_EMULATION) != null) { 411 isCPUEmulON = 412 simuJobConf.get(GridMixConfig.GRIDMIX_CPU_EMULATION). 413 contains(GridMixConfig.GRIDMIX_CPU_EMULATION_PLUGIN); 414 } 415 416 if (isCPUEmulON) { 417 Map<String,Long> origJobMetrics = 418 getOriginalJobCPUMetrics(origJobHistory); 419 Map<String,Long> simuJobMetrics = 420 getSimulatedJobCPUMetrics(simuJobHistoryInfo); 421 422 long origMapUsage = origJobMetrics.get("MAP"); 423 LOG.info("Total cpu usage of Maps for a original job:" + origMapUsage); 424 425 long origReduceUsage = origJobMetrics.get("REDUCE"); 426 LOG.info("Total cpu usage of Reduces for a original job:" 427 + origReduceUsage); 428 429 long simuMapUsage = simuJobMetrics.get("MAP"); 430 LOG.info("Total cpu usage of Maps for a simulated job:" + simuMapUsage); 431 432 long simuReduceUsage = simuJobMetrics.get("REDUCE"); 433 LOG.info("Total cpu usage of Reduces for a simulated job:" 434 + simuReduceUsage); 435 436 int mapCount = Integer.parseInt( 437 simuJobHistoryInfo.getValues().get(JobHistory.Keys.TOTAL_MAPS)); 438 int reduceCount = Integer.parseInt( 439 simuJobHistoryInfo.getValues().get(JobHistory.Keys.TOTAL_REDUCES)); 440 441 if (mapCount > 0) { 442 double mapEmulFactor = (simuMapUsage * 100) / origMapUsage; 443 long mapEmulAccuracy = Math.round(mapEmulFactor); 444 LOG.info("CPU emulation accuracy for maps in job " + 445 simuJobHistoryInfo.getValues().get(JobHistory.Keys.JOBID) + 446 ":"+ mapEmulAccuracy + "%"); 447 Assert.assertTrue("Map-side cpu emulaiton inaccurate!" + 448 " Actual cpu usage: " + simuMapUsage + 449 " Expected cpu usage: " + origMapUsage, mapEmulAccuracy 450 >= GridMixConfig.GRIDMIX_CPU_EMULATION_LOWER_LIMIT 451 && mapEmulAccuracy 452 <= GridMixConfig.GRIDMIX_CPU_EMULATION_UPPER_LIMIT); 453 } 454 455 if (reduceCount >0) { 456 double reduceEmulFactor = (simuReduceUsage * 100) / origReduceUsage; 457 long reduceCPUUsage = simuReduceUsage / 1000; 458 LOG.info("Reduce CPU Usage:" + reduceCPUUsage); 459 LOG.info("Reduce emulation factor:" + reduceEmulFactor); 460 long reduceEmulAccuracy = Math.round(reduceEmulFactor); 461 LOG.info("CPU emulation accuracy for reduces in job " + 462 simuJobHistoryInfo.getValues().get(JobHistory.Keys.JOBID) + 463 ": " + reduceEmulAccuracy + "%"); 464 if ( reduceCPUUsage >= 10 ) { 465 Assert.assertTrue("Reduce side cpu emulaiton inaccurate!" + 466 " Actual cpu usage:" + simuReduceUsage + 467 "Expected cpu usage: " + origReduceUsage, 468 reduceEmulAccuracy 469 >= GridMixConfig.GRIDMIX_CPU_EMULATION_LOWER_LIMIT 470 && reduceEmulAccuracy 471 <= GridMixConfig.GRIDMIX_CPU_EMULATION_UPPER_LIMIT); 472 } else { 473 Assert.assertTrue("Reduce side cpu emulaiton inaccurate!" + 474 " Actual cpu usage:" + simuReduceUsage + 475 "Expected cpu usage: " + origReduceUsage, 476 reduceEmulAccuracy 477 >= 60 && reduceEmulAccuracy <= 100); 478 } 479 } 480 } 481 } 482 483 /** 484 * It verifies the heap memory resource usage of gridmix jobs with 485 * corresponding original job in the trace. 486 * @param zombieJob - Original job history. 487 * @param jhInfo - Simulated job history. 488 * @param simuJobConf - simulated job configuration. 489 */ verifyMemoryEmulationOfJobs(ZombieJob zombieJob, JobHistory.JobInfo jhInfo, JobConf simuJobConf)490 public void verifyMemoryEmulationOfJobs(ZombieJob zombieJob, 491 JobHistory.JobInfo jhInfo, JobConf simuJobConf) throws Exception { 492 long origJobMapsTHU = 0; 493 long origJobReducesTHU = 0; 494 long simuJobMapsTHU = 0; 495 long simuJobReducesTHU = 0; 496 boolean isMemEmulOn = false; 497 String strHeapRatio = "0.3F"; 498 499 if (simuJobConf.get(GridMixConfig.GRIDMIX_MEMORY_EMULATION) != null) { 500 isMemEmulOn = 501 simuJobConf.get(GridMixConfig.GRIDMIX_MEMORY_EMULATION). 502 contains(GridMixConfig.GRIDMIX_MEMORY_EMULATION_PLUGIN); 503 } 504 505 if (isMemEmulOn) { 506 507 for (int index = 0; index < zombieJob.getNumberMaps(); index ++) { 508 TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index); 509 if (mapTask.getResourceUsageMetrics().getHeapUsage() > 0) { 510 origJobMapsTHU += 511 mapTask.getResourceUsageMetrics().getHeapUsage(); 512 } 513 } 514 LOG.info("Total Heap Usage of Maps for original job: " 515 + origJobMapsTHU); 516 517 for (int index = 0; index < zombieJob.getNumberReduces(); index ++) { 518 TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index); 519 if (reduceTask.getResourceUsageMetrics().getHeapUsage() > 0) { 520 origJobReducesTHU += 521 reduceTask.getResourceUsageMetrics().getHeapUsage(); 522 } 523 } 524 LOG.info("Total Heap Usage of Reduces for original job: " 525 + origJobReducesTHU); 526 527 Counters mapCounters = 528 Counters.fromEscapedCompactString(jhInfo.getValues() 529 .get(JobHistory.Keys.MAP_COUNTERS)); 530 531 Counters reduceCounters = 532 Counters.fromEscapedCompactString(jhInfo.getValues() 533 .get(JobHistory.Keys.REDUCE_COUNTERS)); 534 535 simuJobMapsTHU = 536 getCounterValue(mapCounters, 537 Task.Counter.COMMITTED_HEAP_BYTES.toString()); 538 LOG.info("Simulated Job Maps Total Heap Usage: " + simuJobMapsTHU); 539 540 simuJobReducesTHU = 541 getCounterValue(reduceCounters, 542 Task.Counter.COMMITTED_HEAP_BYTES.toString()); 543 LOG.info("Simulated Jobs Reduces Total Heap Usage: " + simuJobReducesTHU); 544 545 long mapCount = 546 Integer.parseInt(jhInfo.getValues() 547 .get(JobHistory.Keys.TOTAL_MAPS)); 548 long reduceCount = 549 Integer.parseInt(jhInfo.getValues() 550 .get(JobHistory.Keys.TOTAL_REDUCES)); 551 552 if (simuJobConf.get(GridMixConfig 553 .GRIDMIX_HEAP_FREE_MEMORY_RATIO) != null) { 554 strHeapRatio = "0.3F"; 555 } 556 557 if (mapCount > 0) { 558 double mapEmulFactor = (simuJobMapsTHU * 100) / origJobMapsTHU; 559 long mapEmulAccuracy = Math.round(mapEmulFactor); 560 LOG.info("Maps memory emulation accuracy of a job:" 561 + mapEmulAccuracy + "%"); 562 Assert.assertTrue("Map phase total memory emulation had crossed the " 563 + "configured max limit.", mapEmulAccuracy 564 <= GridMixConfig.GRIDMIX_MEMORY_EMULATION_UPPER_LIMIT); 565 Assert.assertTrue("Map phase total memory emulation had not crossed " 566 + "the configured min limit.", mapEmulAccuracy 567 >= GridMixConfig.GRIDMIX_MEMORY_EMULATION_LOWER_LIMIT); 568 double expHeapRatio = Double.parseDouble(strHeapRatio); 569 LOG.info("expHeapRatio for maps:" + expHeapRatio); 570 double actHeapRatio = 571 ((double)Math.abs(origJobMapsTHU - simuJobMapsTHU)) ; 572 actHeapRatio /= origJobMapsTHU; 573 LOG.info("actHeapRatio for maps:" + actHeapRatio); 574 Assert.assertTrue("Simulate job maps heap ratio not matched.", 575 actHeapRatio <= expHeapRatio); 576 } 577 578 if (reduceCount >0) { 579 double reduceEmulFactor = (simuJobReducesTHU * 100) / origJobReducesTHU; 580 long reduceEmulAccuracy = Math.round(reduceEmulFactor); 581 LOG.info("Reduces memory emulation accuracy of a job:" 582 + reduceEmulAccuracy + "%"); 583 Assert.assertTrue("Reduce phase total memory emulation had crossed " 584 + "configured max limit.", reduceEmulAccuracy 585 <= GridMixConfig.GRIDMIX_MEMORY_EMULATION_UPPER_LIMIT); 586 Assert.assertTrue("Reduce phase total memory emulation had not " 587 + "crosssed configured min limit.", reduceEmulAccuracy 588 >= GridMixConfig.GRIDMIX_MEMORY_EMULATION_LOWER_LIMIT); 589 double expHeapRatio = Double.parseDouble(strHeapRatio); 590 LOG.info("expHeapRatio for reduces:" + expHeapRatio); 591 double actHeapRatio = 592 ((double)Math.abs(origJobReducesTHU - simuJobReducesTHU)); 593 actHeapRatio /= origJobReducesTHU; 594 LOG.info("actHeapRatio for reduces:" + actHeapRatio); 595 Assert.assertTrue("Simulate job reduces heap ratio not matched.", 596 actHeapRatio <= expHeapRatio); 597 } 598 } 599 } 600 601 /** 602 * Get the simulated job cpu metrics. 603 * @param jhInfo - Simulated job history 604 * @return - cpu metrics as a map. 605 * @throws Exception - if an error occurs. 606 */ getSimulatedJobCPUMetrics( JobHistory.JobInfo jhInfo)607 private Map<String,Long> getSimulatedJobCPUMetrics( 608 JobHistory.JobInfo jhInfo) throws Exception { 609 Map<String, Long> resourceMetrics = new HashMap<String, Long>(); 610 Counters mapCounters = Counters.fromEscapedCompactString( 611 jhInfo.getValues().get(JobHistory.Keys.MAP_COUNTERS)); 612 long mapCPUUsage = 613 getCounterValue(mapCounters, 614 Task.Counter.CPU_MILLISECONDS.toString()); 615 resourceMetrics.put("MAP", mapCPUUsage); 616 617 Counters reduceCounters = Counters.fromEscapedCompactString( 618 jhInfo.getValues().get(JobHistory.Keys.REDUCE_COUNTERS)); 619 long reduceCPUUsage = 620 getCounterValue(reduceCounters, 621 Task.Counter.CPU_MILLISECONDS.toString()); 622 resourceMetrics.put("REDUCE", reduceCPUUsage); 623 return resourceMetrics; 624 } 625 626 /** 627 * Get the original job cpu metrics. 628 * @param zombieJob - original job history. 629 * @return - cpu metrics as map. 630 */ getOriginalJobCPUMetrics(ZombieJob zombieJob)631 private Map<String, Long> getOriginalJobCPUMetrics(ZombieJob zombieJob) { 632 long mapTotalCPUUsage = 0; 633 long reduceTotalCPUUsage = 0; 634 Map<String,Long> resourceMetrics = new HashMap<String,Long>(); 635 636 for (int index = 0; index < zombieJob.getNumberMaps(); index++) { 637 TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index); 638 if (mapTask.getResourceUsageMetrics().getCumulativeCpuUsage() > 0) { 639 mapTotalCPUUsage += 640 mapTask.getResourceUsageMetrics().getCumulativeCpuUsage(); 641 } 642 } 643 resourceMetrics.put("MAP", mapTotalCPUUsage); 644 645 for (int index = 0; index < zombieJob.getNumberReduces(); index++) { 646 TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index); 647 if (reduceTask.getResourceUsageMetrics().getCumulativeCpuUsage() > 0) { 648 reduceTotalCPUUsage += 649 reduceTask.getResourceUsageMetrics().getCumulativeCpuUsage(); 650 } 651 } 652 resourceMetrics.put("REDUCE", reduceTotalCPUUsage); 653 return resourceMetrics; 654 } 655 656 /** 657 * Get the user resolver of a job. 658 */ getJobUserResolver()659 public String getJobUserResolver() { 660 return userResolverVal; 661 } 662 663 /** 664 * It verifies the compression ratios of mapreduce jobs. 665 * @param origJobConf - original job configuration. 666 * @param simuJobConf - simulated job configuration. 667 * @param counters - simulated job counters. 668 * @param origReduceCounters - original job reduce counters. 669 * @param origMapCounters - original job map counters. 670 * @throws ParseException - if a parser error occurs. 671 * @throws IOException - if an I/O error occurs. 672 */ verifyCompressionEmulation(JobConf origJobConf, JobConf simuJobConf,Counters counters, Map<String, Long> origReduceCounters, Map<String, Long> origMapJobCounters)673 public void verifyCompressionEmulation(JobConf origJobConf, 674 JobConf simuJobConf,Counters counters, 675 Map<String, Long> origReduceCounters, 676 Map<String, Long> origMapJobCounters) 677 throws ParseException,IOException { 678 if (simuJobConf.getBoolean(compEmulKey, false)) { 679 String inputDir = origJobConf.get(fileInputFormatKey); 680 Assert.assertNotNull(fileInputFormatKey + " is Null",inputDir); 681 long simMapInputBytes = getCounterValue(counters, "HDFS_BYTES_READ"); 682 long uncompressedInputSize = origMapJobCounters.get("MAP_INPUT_BYTES"); 683 long simReduceInputBytes = 684 getCounterValue(counters, "REDUCE_SHUFFLE_BYTES"); 685 long simMapOutputBytes = getCounterValue(counters, "MAP_OUTPUT_BYTES"); 686 687 // Verify input compression whether it's enable or not. 688 if (inputDir.contains(".gz") || inputDir.contains(".tgz") 689 || inputDir.contains(".bz")) { 690 Assert.assertTrue("Input decompression attribute has been not set for " 691 + "for compressed input", 692 simuJobConf.getBoolean(inputDecompKey, false)); 693 694 float INPUT_COMP_RATIO = 695 getExpectedCompressionRatio(simuJobConf, mapInputCompRatio); 696 float INTERMEDIATE_COMP_RATIO = 697 getExpectedCompressionRatio(simuJobConf, mapOutputCompRatio); 698 699 // Verify Map Input Compression Ratio. 700 assertMapInputCompressionRatio(simMapInputBytes, uncompressedInputSize, 701 INPUT_COMP_RATIO); 702 703 // Verify Map Output Compression Ratio. 704 assertMapOuputCompressionRatio(simReduceInputBytes, simMapOutputBytes, 705 INTERMEDIATE_COMP_RATIO); 706 } else { 707 Assert.assertEquals("MAP input bytes has not matched.", 708 convertBytes(uncompressedInputSize), 709 convertBytes(simMapInputBytes)); 710 } 711 712 Assert.assertEquals("Simulated job output format has not matched with " 713 + "original job output format.", 714 origJobConf.getBoolean(fileOutputFormatKey,false), 715 simuJobConf.getBoolean(fileOutputFormatKey,false)); 716 717 if (simuJobConf.getBoolean(fileOutputFormatKey,false)) { 718 float OUTPUT_COMP_RATIO = 719 getExpectedCompressionRatio(simuJobConf, reduceOutputCompRatio); 720 721 //Verify reduce output compression ratio. 722 long simReduceOutputBytes = 723 getCounterValue(counters, "HDFS_BYTES_WRITTEN"); 724 long origReduceOutputBytes = 725 origReduceCounters.get("REDUCE_OUTPUT_BYTES"); 726 assertReduceOutputCompressionRatio(simReduceOutputBytes, 727 origReduceOutputBytes, 728 OUTPUT_COMP_RATIO); 729 } 730 } 731 } 732 assertMapInputCompressionRatio(long simMapInputBytes, long origMapInputBytes, float expInputCompRatio)733 private void assertMapInputCompressionRatio(long simMapInputBytes, 734 long origMapInputBytes, 735 float expInputCompRatio) { 736 LOG.info("***Verify the map input bytes compression ratio****"); 737 LOG.info("Simulated job's map input bytes(REDUCE_SHUFFLE_BYTES): " 738 + simMapInputBytes); 739 LOG.info("Original job's map input bytes: " + origMapInputBytes); 740 741 final float actInputCompRatio = 742 getActualCompressionRatio(simMapInputBytes, origMapInputBytes); 743 LOG.info("Expected Map Input Compression Ratio:" + expInputCompRatio); 744 LOG.info("Actual Map Input Compression Ratio:" + actInputCompRatio); 745 746 float diffVal = (float)(expInputCompRatio * 0.06); 747 LOG.info("Expected Difference of Map Input Compression Ratio is <= " + 748 + diffVal); 749 float delta = Math.abs(expInputCompRatio - actInputCompRatio); 750 LOG.info("Actual Difference of Map Iput Compression Ratio:" + delta); 751 Assert.assertTrue("Simulated job input compression ratio has mismatched.", 752 delta <= diffVal); 753 LOG.info("******Done******"); 754 } 755 assertMapOuputCompressionRatio(long simReduceInputBytes, long simMapoutputBytes, float expMapOuputCompRatio)756 private void assertMapOuputCompressionRatio(long simReduceInputBytes, 757 long simMapoutputBytes, 758 float expMapOuputCompRatio) { 759 LOG.info("***Verify the map output bytes compression ratio***"); 760 LOG.info("Simulated job reduce input bytes:" + simReduceInputBytes); 761 LOG.info("Simulated job map output bytes:" + simMapoutputBytes); 762 763 final float actMapOutputCompRatio = 764 getActualCompressionRatio(simReduceInputBytes, simMapoutputBytes); 765 LOG.info("Expected Map Output Compression Ratio:" + expMapOuputCompRatio); 766 LOG.info("Actual Map Output Compression Ratio:" + actMapOutputCompRatio); 767 768 float diffVal = 0.05f; 769 LOG.info("Expected Difference Of Map Output Compression Ratio is <= " 770 + diffVal); 771 float delta = Math.abs(expMapOuputCompRatio - actMapOutputCompRatio); 772 LOG.info("Actual Difference Of Map Ouput Compression Ratio :" + delta); 773 774 Assert.assertTrue("Simulated job map output compression ratio " 775 + "has not been matched.", delta <= diffVal); 776 LOG.info("******Done******"); 777 } 778 assertReduceOutputCompressionRatio(long simReduceOutputBytes, long origReduceOutputBytes , float expOutputCompRatio )779 private void assertReduceOutputCompressionRatio(long simReduceOutputBytes, 780 long origReduceOutputBytes , float expOutputCompRatio ) { 781 LOG.info("***Verify the reduce output bytes compression ratio***"); 782 final float actOuputputCompRatio = 783 getActualCompressionRatio(simReduceOutputBytes, origReduceOutputBytes); 784 LOG.info("Simulated job's reduce output bytes:" + simReduceOutputBytes); 785 LOG.info("Original job's reduce output bytes:" + origReduceOutputBytes); 786 LOG.info("Expected output compression ratio:" + expOutputCompRatio); 787 LOG.info("Actual output compression ratio:" + actOuputputCompRatio); 788 long diffVal = (long)(origReduceOutputBytes * 0.15); 789 long delta = Math.abs(origReduceOutputBytes - simReduceOutputBytes); 790 LOG.info("Expected difference of output compressed bytes is <= " 791 + diffVal); 792 LOG.info("Actual difference of compressed ouput bytes:" + delta); 793 Assert.assertTrue("Simulated job reduce output compression ratio " + 794 "has not been matched.", delta <= diffVal); 795 LOG.info("******Done******"); 796 } 797 getExpectedCompressionRatio(JobConf simuJobConf, String RATIO_TYPE)798 private float getExpectedCompressionRatio(JobConf simuJobConf, 799 String RATIO_TYPE) { 800 // Default decompression ratio is 0.50f irrespective of original 801 //job compression ratio. 802 if (simuJobConf.get(RATIO_TYPE) != null) { 803 return Float.parseFloat(simuJobConf.get(RATIO_TYPE)); 804 } else { 805 return 0.50f; 806 } 807 } 808 getActualCompressionRatio(long compressBytes, long uncompessBytes)809 private float getActualCompressionRatio(long compressBytes, 810 long uncompessBytes) { 811 double ratio = ((double)compressBytes) / uncompessBytes; 812 int significant = (int)Math.round(ratio * 100); 813 return ((float)significant)/100; 814 } 815 816 /** 817 * Verify the distributed cache files between the jobs in a gridmix run. 818 * @param jobsInfo - jobConfs of simulated and original jobs as a map. 819 */ verifyDistributedCacheBetweenJobs( Map<String,List<JobConf>> jobsInfo)820 public void verifyDistributedCacheBetweenJobs( 821 Map<String,List<JobConf>> jobsInfo) { 822 if (jobsInfo.size() > 1) { 823 Map<String, Integer> simJobfilesOccurBtnJobs = 824 getDistcacheFilesOccurenceBetweenJobs(jobsInfo, 0); 825 Map<String, Integer> origJobfilesOccurBtnJobs = 826 getDistcacheFilesOccurenceBetweenJobs(jobsInfo, 1); 827 List<Integer> simuOccurList = 828 getMapValuesAsList(simJobfilesOccurBtnJobs); 829 Collections.sort(simuOccurList); 830 List<Integer> origOccurList = 831 getMapValuesAsList(origJobfilesOccurBtnJobs); 832 Collections.sort(origOccurList); 833 Assert.assertEquals("The unique count of distibuted cache files in " 834 + "simulated jobs have not matched with the unique " 835 + "count of original jobs distributed files ", 836 simuOccurList.size(), origOccurList.size()); 837 int index = 0; 838 for (Integer origDistFileCount : origOccurList) { 839 Assert.assertEquals("Distributed cache file reused in simulated " 840 + "jobs has not matched with reused of distributed" 841 + "cache file in original jobs.", 842 origDistFileCount, simuOccurList.get(index)); 843 index ++; 844 } 845 } 846 } 847 848 /** 849 * Get the unique distributed cache files and occurrence between the jobs. 850 * @param jobsInfo - job's configurations as a map. 851 * @param jobConfIndex - 0 for simulated job configuration and 852 * 1 for original jobs configuration. 853 * @return - unique distributed cache files and occurrences as map. 854 */ getDistcacheFilesOccurenceBetweenJobs( Map<String, List<JobConf>> jobsInfo, int jobConfIndex)855 private Map<String, Integer> getDistcacheFilesOccurenceBetweenJobs( 856 Map<String, List<JobConf>> jobsInfo, int jobConfIndex) { 857 Map<String,Integer> filesOccurBtnJobs = new HashMap <String,Integer>(); 858 Set<String> jobIds = jobsInfo.keySet(); 859 Iterator<String > ite = jobIds.iterator(); 860 while (ite.hasNext()) { 861 String jobId = ite.next(); 862 List<JobConf> jobconfs = jobsInfo.get(jobId); 863 String [] distCacheFiles = jobconfs.get(jobConfIndex).get( 864 GridMixConfig.GRIDMIX_DISTCACHE_FILES).split(","); 865 String [] distCacheFileTimeStamps = jobconfs.get(jobConfIndex).get( 866 GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP).split(","); 867 String [] distCacheFileVisib = jobconfs.get(jobConfIndex).get( 868 GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES).split(","); 869 int indx = 0; 870 for (String distCacheFile : distCacheFiles) { 871 String fileAndSize = distCacheFile + "^" 872 + distCacheFileTimeStamps[indx] + "^" 873 + jobconfs.get(jobConfIndex).getUser(); 874 if (filesOccurBtnJobs.get(fileAndSize) != null) { 875 int count = filesOccurBtnJobs.get(fileAndSize); 876 count ++; 877 filesOccurBtnJobs.put(fileAndSize, count); 878 } else { 879 filesOccurBtnJobs.put(fileAndSize, 1); 880 } 881 } 882 } 883 return filesOccurBtnJobs; 884 } 885 886 /** 887 * It verifies the distributed cache emulation of a job. 888 * @param zombieJob - Original job story. 889 * @param simuJobConf - Simulated job configuration. 890 */ verifyDistributeCache(ZombieJob zombieJob, JobConf simuJobConf)891 public void verifyDistributeCache(ZombieJob zombieJob, 892 JobConf simuJobConf) throws IOException { 893 if (simuJobConf.getBoolean(GridMixConfig.GRIDMIX_DISTCACHE_ENABLE, false)) { 894 JobConf origJobConf = zombieJob.getJobConf(); 895 assertFileVisibility(simuJobConf); 896 assertDistcacheFiles(simuJobConf,origJobConf); 897 assertFileSizes(simuJobConf,origJobConf); 898 assertFileStamps(simuJobConf,origJobConf); 899 } else { 900 Assert.assertNull("Configuration has distributed cache visibilites" 901 + "without enabled distributed cache emulation.", 902 simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES)); 903 Assert.assertNull("Configuration has distributed cache files time " 904 + "stamps without enabled distributed cache emulation.", 905 simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP)); 906 Assert.assertNull("Configuration has distributed cache files paths" 907 + "without enabled distributed cache emulation.", 908 simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILES)); 909 Assert.assertNull("Configuration has distributed cache files sizes" 910 + "without enabled distributed cache emulation.", 911 simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE)); 912 } 913 } 914 assertFileStamps(JobConf simuJobConf, JobConf origJobConf)915 private void assertFileStamps(JobConf simuJobConf, JobConf origJobConf) { 916 //Verify simulated jobs against distributed cache files time stamps. 917 String [] origDCFTS = 918 origJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP).split(","); 919 String [] simuDCFTS = 920 simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP).split(","); 921 for (int index = 0; index < origDCFTS.length; index++) { 922 Assert.assertTrue("Invalid time stamps between original " 923 +"and simulated job", Long.parseLong(origDCFTS[index]) 924 < Long.parseLong(simuDCFTS[index])); 925 } 926 } 927 assertFileVisibility(JobConf simuJobConf )928 private void assertFileVisibility(JobConf simuJobConf ) { 929 // Verify simulated jobs against distributed cache files visibilities. 930 String [] distFiles = 931 simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILES).split(","); 932 String [] simuDistVisibilities = 933 simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES).split(","); 934 List<Boolean> expFileVisibility = new ArrayList<Boolean >(); 935 int index = 0; 936 for (String distFile : distFiles) { 937 boolean isLocalDistCache = GridmixSystemTestCase.isLocalDistCache( 938 distFile, 939 simuJobConf.getUser(), 940 Boolean.valueOf(simuDistVisibilities[index])); 941 if (!isLocalDistCache) { 942 expFileVisibility.add(true); 943 } else { 944 expFileVisibility.add(false); 945 } 946 index ++; 947 } 948 index = 0; 949 for (String actFileVisibility : simuDistVisibilities) { 950 Assert.assertEquals("Simulated job distributed cache file " 951 + "visibilities has not matched.", 952 expFileVisibility.get(index), 953 Boolean.valueOf(actFileVisibility)); 954 index ++; 955 } 956 } 957 assertDistcacheFiles(JobConf simuJobConf, JobConf origJobConf)958 private void assertDistcacheFiles(JobConf simuJobConf, JobConf origJobConf) 959 throws IOException { 960 //Verify simulated jobs against distributed cache files. 961 String [] origDistFiles = origJobConf.get( 962 GridMixConfig.GRIDMIX_DISTCACHE_FILES).split(","); 963 String [] simuDistFiles = simuJobConf.get( 964 GridMixConfig.GRIDMIX_DISTCACHE_FILES).split(","); 965 String [] simuDistVisibilities = simuJobConf.get( 966 GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES).split(","); 967 Assert.assertEquals("No. of simulatued job's distcache files mismacted" 968 + "with no.of original job's distcache files", 969 origDistFiles.length, simuDistFiles.length); 970 971 int index = 0; 972 for (String simDistFile : simuDistFiles) { 973 Path distPath = new Path(simDistFile); 974 boolean isLocalDistCache = 975 GridmixSystemTestCase.isLocalDistCache(simDistFile, 976 simuJobConf.getUser(), 977 Boolean.valueOf(simuDistVisibilities[index])); 978 if (!isLocalDistCache) { 979 FileSystem fs = distPath.getFileSystem(conf); 980 FileStatus fstat = fs.getFileStatus(distPath); 981 FsPermission permission = fstat.getPermission(); 982 Assert.assertTrue("HDFS distributed cache file has wrong " 983 + "permissions for users.", 984 FsAction.READ_WRITE.SYMBOL 985 == permission.getUserAction().SYMBOL); 986 Assert.assertTrue("HDFS distributed cache file has wrong " 987 + "permissions for groups.", 988 FsAction.READ.SYMBOL 989 == permission.getGroupAction().SYMBOL); 990 Assert.assertTrue("HDSFS distributed cache file has wrong " 991 + "permissions for others.", 992 FsAction.READ.SYMBOL 993 == permission.getOtherAction().SYMBOL); 994 } 995 index++; 996 } 997 } 998 assertFileSizes(JobConf simuJobConf, JobConf origJobConf)999 private void assertFileSizes(JobConf simuJobConf, JobConf origJobConf) { 1000 // Verify simulated jobs against distributed cache files size. 1001 List<String> origDistFilesSize = 1002 Arrays.asList(origJobConf.get( 1003 GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE).split(",")); 1004 Collections.sort(origDistFilesSize); 1005 1006 List<String> simuDistFilesSize = 1007 Arrays.asList(simuJobConf.get( 1008 GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE).split(",")); 1009 Collections.sort(simuDistFilesSize); 1010 1011 Assert.assertEquals("Simulated job's file size list has not " 1012 + "matched with the Original job's file size list.", 1013 origDistFilesSize.size(), 1014 simuDistFilesSize.size()); 1015 1016 for (int index = 0; index < origDistFilesSize.size(); index ++) { 1017 Assert.assertEquals("Simulated job distcache file size has not " 1018 + "matched with original job distcache file size.", 1019 origDistFilesSize.get(index), 1020 simuDistFilesSize.get(index)); 1021 } 1022 } 1023 setJobDistributedCacheInfo(String jobId, JobConf simuJobConf, JobConf origJobConf)1024 private void setJobDistributedCacheInfo(String jobId, JobConf simuJobConf, 1025 JobConf origJobConf) { 1026 if (simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILES) != null) { 1027 List<JobConf> jobConfs = new ArrayList<JobConf>(); 1028 jobConfs.add(simuJobConf); 1029 jobConfs.add(origJobConf); 1030 simuAndOrigJobsInfo.put(jobId,jobConfs); 1031 } 1032 } 1033 getMapValuesAsList(Map<String,Integer> jobOccurs)1034 private List<Integer> getMapValuesAsList(Map<String,Integer> jobOccurs) { 1035 List<Integer> occursList = new ArrayList<Integer>(); 1036 Set<String> files = jobOccurs.keySet(); 1037 Iterator<String > ite = files.iterator(); 1038 while (ite.hasNext()) { 1039 String file = ite.next(); 1040 occursList.add(jobOccurs.get(file)); 1041 } 1042 return occursList; 1043 } 1044 1045 /** 1046 * It verifies the high ram gridmix jobs. 1047 * @param zombieJob - Original job story. 1048 * @param simuJobConf - Simulated job configuration. 1049 */ 1050 @SuppressWarnings("deprecation") verifyHighRamMemoryJobs(ZombieJob zombieJob, JobConf simuJobConf)1051 public void verifyHighRamMemoryJobs(ZombieJob zombieJob, 1052 JobConf simuJobConf) { 1053 JobConf origJobConf = zombieJob.getJobConf(); 1054 int origMapFactor = getMapFactor(origJobConf); 1055 int origReduceFactor = getReduceFactor(origJobConf); 1056 boolean isHighRamEnable = 1057 simuJobConf.getBoolean(GridMixConfig.GRIDMIX_HIGH_RAM_JOB_ENABLE, 1058 false); 1059 if (isHighRamEnable) { 1060 if (origMapFactor >= 2 && origReduceFactor >= 2) { 1061 assertGridMixHighRamJob(simuJobConf, origJobConf, 1); 1062 } else if(origMapFactor >= 2) { 1063 assertGridMixHighRamJob(simuJobConf, origJobConf, 2); 1064 } else if(origReduceFactor >= 2) { 1065 assertGridMixHighRamJob(simuJobConf, origJobConf, 3); 1066 } 1067 } else { 1068 if (origMapFactor >= 2 && origReduceFactor >= 2) { 1069 assertGridMixHighRamJob(simuJobConf, origJobConf, 4); 1070 } else if(origMapFactor >= 2) { 1071 assertGridMixHighRamJob(simuJobConf, origJobConf, 5); 1072 } else if(origReduceFactor >= 2) { 1073 assertGridMixHighRamJob(simuJobConf, origJobConf, 6); 1074 } 1075 } 1076 } 1077 1078 /** 1079 * Get the value for identifying the slots used by the map. 1080 * @param jobConf - job configuration 1081 * @return - map factor value. 1082 */ getMapFactor(Configuration jobConf)1083 public static int getMapFactor(Configuration jobConf) { 1084 long clusterMapMem = 1085 Long.parseLong(jobConf.get(GridMixConfig.CLUSTER_MAP_MEMORY)); 1086 long jobMapMem = 1087 Long.parseLong(jobConf.get(GridMixConfig.JOB_MAP_MEMORY_MB)); 1088 return (int)Math.ceil((double)jobMapMem / clusterMapMem); 1089 } 1090 1091 /** 1092 * Get the value for identifying the slots used by the reduce. 1093 * @param jobConf - job configuration. 1094 * @return - reduce factor value. 1095 */ getReduceFactor(Configuration jobConf)1096 public static int getReduceFactor(Configuration jobConf) { 1097 long clusterReduceMem = 1098 Long.parseLong(jobConf.get(GridMixConfig.CLUSTER_REDUCE_MEMORY)); 1099 long jobReduceMem = 1100 Long.parseLong(jobConf.get(GridMixConfig.JOB_REDUCE_MEMORY_MB)); 1101 return (int)Math.ceil((double)jobReduceMem / clusterReduceMem); 1102 } 1103 1104 @SuppressWarnings("deprecation") assertGridMixHighRamJob(JobConf simuJobConf, Configuration origConf, int option)1105 private void assertGridMixHighRamJob(JobConf simuJobConf, 1106 Configuration origConf, int option) { 1107 int simuMapFactor = getMapFactor(simuJobConf); 1108 int simuReduceFactor = getReduceFactor(simuJobConf); 1109 /** 1110 * option 1 : Both map and reduce honors the high ram. 1111 * option 2 : Map only honors the high ram. 1112 * option 3 : Reduce only honors the high ram. 1113 * option 4 : Both map and reduce should not honors the high ram 1114 * in disable state. 1115 * option 5 : Map should not honors the high ram in disable state. 1116 * option 6 : Reduce should not honors the high ram in disable state. 1117 */ 1118 switch (option) { 1119 case 1 : 1120 Assert.assertTrue("Gridmix job has not honored the high " 1121 + "ram for map.", simuMapFactor >= 2 1122 && simuMapFactor == getMapFactor(origConf)); 1123 Assert.assertTrue("Gridmix job has not honored the high " 1124 + "ram for reduce.", simuReduceFactor >= 2 1125 && simuReduceFactor 1126 == getReduceFactor(origConf)); 1127 break; 1128 case 2 : 1129 Assert.assertTrue("Gridmix job has not honored the high " 1130 + "ram for map.", simuMapFactor >= 2 1131 && simuMapFactor == getMapFactor(origConf)); 1132 break; 1133 case 3 : 1134 Assert.assertTrue("Girdmix job has not honored the high " 1135 + "ram for reduce.", simuReduceFactor >= 2 1136 && simuReduceFactor 1137 == getReduceFactor(origConf)); 1138 break; 1139 case 4 : 1140 Assert.assertTrue("Gridmix job has honored the high " 1141 + "ram for map in emulation disable state.", 1142 simuMapFactor < 2 1143 && simuMapFactor != getMapFactor(origConf)); 1144 Assert.assertTrue("Gridmix job has honored the high " 1145 + "ram for reduce in emulation disable state.", 1146 simuReduceFactor < 2 1147 && simuReduceFactor 1148 != getReduceFactor(origConf)); 1149 break; 1150 case 5 : 1151 Assert.assertTrue("Gridmix job has honored the high " 1152 + "ram for map in emulation disable state.", 1153 simuMapFactor < 2 1154 && simuMapFactor != getMapFactor(origConf)); 1155 break; 1156 case 6 : 1157 Assert.assertTrue("Girdmix job has honored the high " 1158 + "ram for reduce in emulation disable state.", 1159 simuReduceFactor < 2 1160 && simuReduceFactor 1161 != getReduceFactor(origConf)); 1162 break; 1163 } 1164 } 1165 1166 /** 1167 * Get task memory after scaling based on cluster configuration. 1168 * @param jobTaskKey - Job task key attribute. 1169 * @param clusterTaskKey - Cluster task key attribute. 1170 * @param origConf - Original job configuration. 1171 * @param simuConf - Simulated job configuration. 1172 * @return scaled task memory value. 1173 */ 1174 @SuppressWarnings("deprecation") getScaledTaskMemInMB(String jobTaskKey, String clusterTaskKey, Configuration origConf, Configuration simuConf)1175 public static long getScaledTaskMemInMB(String jobTaskKey, 1176 String clusterTaskKey, 1177 Configuration origConf, 1178 Configuration simuConf) { 1179 long simuClusterTaskValue = 1180 simuConf.getLong(clusterTaskKey, JobConf.DISABLED_MEMORY_LIMIT); 1181 long origClusterTaskValue = 1182 origConf.getLong(clusterTaskKey, JobConf.DISABLED_MEMORY_LIMIT); 1183 long origJobTaskValue = 1184 origConf.getLong(jobTaskKey, JobConf.DISABLED_MEMORY_LIMIT); 1185 double scaleFactor = 1186 Math.ceil((double)origJobTaskValue / origClusterTaskValue); 1187 long simulatedJobValue = (long)(scaleFactor * simuClusterTaskValue); 1188 return simulatedJobValue; 1189 } 1190 1191 /** 1192 * It Verifies the memory limit of a task. 1193 * @param TaskMemInMB - task memory limit. 1194 * @param taskLimitInMB - task upper limit. 1195 */ verifyMemoryLimits(long TaskMemInMB, long taskLimitInMB)1196 public static void verifyMemoryLimits(long TaskMemInMB, long taskLimitInMB) { 1197 if (TaskMemInMB > taskLimitInMB) { 1198 Assert.fail("Simulated job's task memory exceeds the " 1199 + "upper limit of task virtual memory."); 1200 } 1201 } 1202 convertJobStatus(String jobStatus)1203 private String convertJobStatus(String jobStatus) { 1204 if (jobStatus.equals("SUCCEEDED")) { 1205 return "SUCCESS"; 1206 } else { 1207 return jobStatus; 1208 } 1209 } 1210 convertBytes(long bytesValue)1211 private String convertBytes(long bytesValue) { 1212 int units = 1024; 1213 if( bytesValue < units ) { 1214 return String.valueOf(bytesValue)+ "B"; 1215 } else { 1216 // it converts the bytes into either KB or MB or GB or TB etc. 1217 int exp = (int)(Math.log(bytesValue) / Math.log(units)); 1218 return String.format("%1d%sB",(long)(bytesValue / Math.pow(units, exp)), 1219 "KMGTPE".charAt(exp -1)); 1220 } 1221 } 1222 1223 getCounterValue(Counters counters, String key)1224 private long getCounterValue(Counters counters, String key) 1225 throws ParseException { 1226 for (String groupName : counters.getGroupNames()) { 1227 Group totalGroup = counters.getGroup(groupName); 1228 Iterator<Counter> itrCounter = totalGroup.iterator(); 1229 while (itrCounter.hasNext()) { 1230 Counter counter = itrCounter.next(); 1231 if (counter.getName().equals(key)) { 1232 return counter.getValue(); 1233 } 1234 } 1235 } 1236 return 0; 1237 } 1238 } 1239 1240