/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.JobHistory.Keys;
import org.apache.hadoop.mapred.JobTracker.RetireJobInfo;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapred.TaskTrackerStatus;
import org.apache.hadoop.mapred.StatisticsCollector;
import org.apache.hadoop.mapred.StatisticsCollectionHandler;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.test.system.JTProtocol;
import org.apache.hadoop.mapreduce.test.system.JobInfo;
import org.apache.hadoop.mapreduce.test.system.TTInfo;
import org.apache.hadoop.mapreduce.test.system.TaskInfo;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.system.DaemonProtocol;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

/**
 * Aspect class which injects the code for {@link JobTracker} class.
 *
 * The aspect is <code>privileged</code> because the injected methods read
 * non-public members of the job tracker and its collaborators (for example
 * <code>jobs</code>, <code>taskTrackers</code>, <code>retireJobs</code>).
 */
public privileged aspect JobTrackerAspect {

  /**
   * The job tracker captured by the constructor advice below. It is only
   * assigned once that advice has run, so injected methods use their own
   * receiver rather than this reference.
   */
  private static JobTracker tracker;

  /**
   * Returns the configuration object held by the job tracker.
   *
   * @return the job tracker's <code>conf</code> field.
   * @throws IOException declared for the RPC interface; not thrown here.
   */
  public Configuration JobTracker.getDaemonConf() throws IOException {
    return conf;
  }

  /**
   * Method to get the read only view of the job and its associated information.
   *
   * @param jobID
   *          id of the job for which information is required.
   * @return JobInfo of the job requested, or <code>null</code> when the job
   *         is not present in the job tracker.
   * @throws IOException declared for the RPC interface.
   */
  public JobInfo JobTracker.getJobInfo(JobID jobID) throws IOException {
    JobInProgress jip = jobs.get(org.apache.hadoop.mapred.JobID
        .downgrade(jobID));
    if (jip == null) {
      LOG.warn("No job present for : " + jobID);
      return null;
    }
    // The snapshot is taken while holding the job's monitor.
    synchronized (jip) {
      return jip.getJobInfo();
    }
  }

  /**
   * Method to get the read only view of the task and its associated
   * information.
   *
   * @param taskID
   *          id of the task for which information is required.
   * @return TaskInfo of the task requested, or <code>null</code> when no
   *         such task is found.
   * @throws IOException declared for the RPC interface.
   */
  public TaskInfo JobTracker.getTaskInfo(TaskID taskID) throws IOException {
    TaskInProgress tip = getTip(org.apache.hadoop.mapred.TaskID
        .downgrade(taskID));
    if (tip == null) {
      LOG.warn("No task present for : " + taskID);
      return null;
    }
    return getTaskInfo(tip);
  }

  /**
   * Method to get the read only view of a task tracker.
   *
   * @param trackerName
   *          name under which the task tracker is registered.
   * @return TTInfo built from the tracker's current status, or
   *         <code>null</code> when no tracker with that name is registered.
   * @throws IOException declared for the RPC interface.
   */
  public TTInfo JobTracker.getTTInfo(String trackerName) throws IOException {
    org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker tt = taskTrackers
        .get(trackerName);
    if (tt == null) {
      LOG.warn("No task tracker with name : " + trackerName + " found");
      return null;
    }
    TaskTrackerStatus status = tt.getStatus();
    return new TTInfoImpl(status.trackerName, status);
  }

  // XXX The two methods below do not reuse getJobInfo and getTaskInfo,
  // because a job could be retired and removed from the job tracker's
  // memory while the RPC call is being processed.

  /**
   * Returns the information of every job currently held by the job tracker.
   *
   * @return one JobInfo per entry in <code>jobs</code>; empty when there are
   *         none.
   * @throws IOException declared for the RPC interface.
   */
  public JobInfo[] JobTracker.getAllJobInfo() throws IOException {
    List<JobInfo> infoList = new ArrayList<JobInfo>();
    synchronized (jobs) {
      for (JobInProgress jip : jobs.values()) {
        infoList.add(jip.getJobInfo());
      }
    }
    return infoList.toArray(new JobInfo[infoList.size()]);
  }

  /**
   * Returns the information of all tasks of a job, in the order setup, map,
   * reduce and cleanup tasks.
   *
   * @param jobID
   *          id of the job whose tasks are required.
   * @return TaskInfo of every task of the job, or <code>null</code> when the
   *         job is not found.
   * @throws IOException declared for the RPC interface.
   */
  public TaskInfo[] JobTracker.getTaskInfo(JobID jobID) throws IOException {
    JobInProgress jip = jobs.get(org.apache.hadoop.mapred.JobID
        .downgrade(jobID));
    if (jip == null) {
      LOG.warn("Unable to find job : " + jobID);
      return null;
    }
    List<TaskInfo> infoList = new ArrayList<TaskInfo>();
    synchronized (jip) {
      for (TaskInProgress tip : jip.setup) {
        infoList.add(getTaskInfo(tip));
      }
      for (TaskInProgress tip : jip.maps) {
        infoList.add(getTaskInfo(tip));
      }
      for (TaskInProgress tip : jip.reduces) {
        infoList.add(getTaskInfo(tip));
      }
      for (TaskInProgress tip : jip.cleanup) {
        infoList.add(getTaskInfo(tip));
      }
    }
    return infoList.toArray(new TaskInfo[infoList.size()]);
  }

  /**
   * Returns the information of every task tracker known to the job tracker.
   *
   * @return one TTInfo per entry in <code>taskTrackers</code>; empty when
   *         there are none.
   * @throws IOException declared for the RPC interface.
   */
  public TTInfo[] JobTracker.getAllTTInfo() throws IOException {
    List<TTInfo> infoList = new ArrayList<TTInfo>();
    synchronized (taskTrackers) {
      for (TaskTracker tt : taskTrackers.values()) {
        TaskTrackerStatus status = tt.getStatus();
        infoList.add(new TTInfoImpl(status.trackerName, status));
      }
    }
    return infoList.toArray(new TTInfo[infoList.size()]);
  }

  /**
   * Tells whether the job tracker holds retired-job information for a job.
   *
   * @param id
   *          id of the job.
   * @return true if <code>retireJobs</code> has an entry for the job.
   * @throws IOException declared for the RPC interface.
   */
  public boolean JobTracker.isJobRetired(JobID id) throws IOException {
    return retireJobs.get(
        org.apache.hadoop.mapred.JobID.downgrade(id)) != null;
  }

  /**
   * Tells whether a task tracker is blacklisted; delegates to
   * <code>isBlacklisted</code>.
   *
   * @param trackerName
   *          name of the task tracker.
   * @return the result of <code>isBlacklisted(trackerName)</code>.
   * @throws IOException declared for the RPC interface.
   */
  public boolean JobTracker.isBlackListed(String trackerName)
      throws IOException {
    return isBlacklisted(trackerName);
  }

  /**
   * Returns the history file recorded for a retired job.
   *
   * @param id
   *          id of the retired job.
   * @return the history file of the retired job.
   * @throws IOException if no retired-job information exists for the job.
   */
  public String JobTracker.getJobHistoryLocationForRetiredJob(
      JobID id) throws IOException {
    RetireJobInfo retInfo = retireJobs.get(
        org.apache.hadoop.mapred.JobID.downgrade(id));
    if (retInfo == null) {
      throw new IOException("The retired job information for the job : "
          + id +" is not found");
    }
    return retInfo.getHistoryFile();
  }

  /**
   * Point cut matching the execution of
   * <code>JobTracker.getProtocolVersion(String, long)</code>.
   */
  pointcut getVersionAspect(String protocol, long clientVersion) :
    execution(public long JobTracker.getProtocolVersion(String ,
      long) throws IOException) && args(protocol, clientVersion);

  /**
   * Answers version queries for the test protocols and hands every other
   * protocol name on to the original method.
   */
  long around(String protocol, long clientVersion) :
    getVersionAspect(protocol, clientVersion) {
    // Constant-first comparison, so a null protocol name simply falls
    // through to the original implementation.
    if (DaemonProtocol.class.getName().equals(protocol)) {
      return DaemonProtocol.versionID;
    } else if (JTProtocol.class.getName().equals(protocol)) {
      return JTProtocol.versionID;
    } else {
      return proceed(protocol, clientVersion);
    }
  }

  /**
   * Point cut which monitors for the start of the jobtracker and sets the right
   * value if the jobtracker is started.
   *
   * @param conf
   *          configuration passed to the job tracker constructor.
   * @param jobtrackerIdentifier
   *          identifier passed to the job tracker constructor.
   */
  pointcut jtConstructorPointCut(JobConf conf, String jobtrackerIdentifier) :
        call(JobTracker.new(JobConf,String))
        && args(conf, jobtrackerIdentifier) ;

  /**
   * After a job tracker has been constructed: records the current user's
   * short name on it, remembers the instance and marks it ready.
   */
  after(JobConf conf, String jobtrackerIdentifier)
    returning (JobTracker tracker): jtConstructorPointCut(conf,
        jobtrackerIdentifier) {
    try {
      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
      tracker.setUser(ugi.getShortUserName());
    } catch (IOException e) {
      // Best effort: the tracker is still marked ready, but the cause is
      // kept in the log.
      tracker.LOG.warn("Unable to get the user information for the " +
          "Jobtracker", e);
    }
    JobTrackerAspect.tracker = tracker;
    tracker.setReady(true);
  }

  /**
   * Builds the read only view of a single task.
   *
   * @param tip
   *          the task to describe.
   * @return TaskInfo carrying the task's id, progress, number of active
   *         attempts, killed and failed counts, attempt statuses, whether
   *         it is a job setup or cleanup task, and the values of its
   *         active-task map.
   */
  private TaskInfo JobTracker.getTaskInfo(TaskInProgress tip) {
    TaskStatus[] status = tip.getTaskStatuses();
    if (status == null) {
      // Substitute an empty array of the matching status type.
      if (tip.isMapTask()) {
        status = new MapTaskStatus[]{};
      } else {
        status = new ReduceTaskStatus[]{};
      }
    }
    // Read the active tasks once, so the tracker list and the active count
    // reported below describe the same snapshot.
    String[] trackers =
        (String[]) tip.getActiveTasks().values().toArray(new String[0]);
    boolean setupOrCleanup = tip.isJobSetupTask() || tip.isJobCleanupTask();
    return new TaskInfoImpl(tip.getTIPId(), tip.getProgress(),
        trackers.length, tip.numKilledTasks(), tip.numTaskFailures(),
        status, setupOrCleanup, trackers);
  }

  /**
   * Get the job summary details from the jobtracker log files.
   * @param jobId - job id
   * @param filePattern - jobtracker log file pattern.
   * @return String - Job summary details of given job id.
   * @throws IOException if any I/O error occurs.
   */
  public String JobTracker.getJobSummaryFromLogs(JobID jobId,
      String filePattern) throws IOException {
    String pattern = "JobId=" + jobId.toString() + ",submitTime";
    // NOTE(review): filePattern is spliced unquoted into a "bash -c"
    // command line. That lets the shell expand a glob, but it also means
    // the value must only ever come from trusted test code.
    String[] cmd = new String[] {
        "bash",
        "-c",
        "grep -i "
            + pattern + " "
            + filePattern + " "
            + "| sed s/'JobSummary: '/'^'/g | cut -d'^' -f2"};
    ShellCommandExecutor shexec = new ShellCommandExecutor(cmd);
    shexec.execute();
    return shexec.getOutput();
  }

  /**
   * Get the job summary information for given job id.
   * @param jobId - job id.
   * @return String - Job summary details as comma separated key=value
   *         pairs, or <code>null</code> when the job is not found.
   * @throws IOException if any I/O error occurs.
   */
  public String JobTracker.getJobSummaryInfo(JobID jobId) throws IOException {
    JobInProgress jip = jobs.
        get(org.apache.hadoop.mapred.JobID.downgrade(jobId));
    if (jip == null) {
      LOG.warn("Job has not been found - " + jobId);
      return null;
    }
    JobProfile profile = jip.getProfile();
    JobStatus status = jip.getStatus();
    // ',' and '=' delimit the summary, so they are escaped inside values.
    final char[] charsToEscape = {StringUtils.COMMA, '=',
        StringUtils.ESCAPE_CHAR};
    String user = StringUtils.escapeString(profile.getUser(),
        StringUtils.ESCAPE_CHAR, charsToEscape);
    String queue = StringUtils.escapeString(profile.getQueueName(),
        StringUtils.ESCAPE_CHAR, charsToEscape);
    Counters jobCounters = jip.getJobCounters();
    // The counters are in milliseconds; the summary reports seconds.
    long mapSlotSeconds = (jobCounters.getCounter(
        JobInProgress.Counter.SLOTS_MILLIS_MAPS) +
        jobCounters.getCounter(JobInProgress.
            Counter.FALLOW_SLOTS_MILLIS_MAPS)) / 1000;
    long reduceSlotSeconds = (jobCounters.getCounter(
        JobInProgress.Counter.SLOTS_MILLIS_REDUCES) +
        jobCounters.getCounter(JobInProgress.
            Counter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000;
    // Read the metrics from the receiver; the aspect's static reference is
    // only set once the constructor advice has run.
    ClusterMetrics clusterMetrics = getClusterMetrics();

    StringBuilder jobSummary = new StringBuilder();
    jobSummary.append("jobId=").append(jip.getJobID()).append(",");
    jobSummary.append("startTime=").append(jip.getStartTime()).append(",");
    jobSummary.append("launchTime=").append(jip.getLaunchTime()).append(",");
    jobSummary.append("finishTime=").append(jip.getFinishTime()).append(",");
    jobSummary.append("numMaps=")
        .append(jip.getTasks(TaskType.MAP).length).append(",");
    jobSummary.append("numSlotsPerMap=")
        .append(jip.getNumSlotsPerMap()).append(",");
    jobSummary.append("numReduces=")
        .append(jip.getTasks(TaskType.REDUCE).length).append(",");
    jobSummary.append("numSlotsPerReduce=")
        .append(jip.getNumSlotsPerReduce()).append(",");
    jobSummary.append("user=").append(user).append(",");
    jobSummary.append("queue=").append(queue).append(",");
    jobSummary.append("status=")
        .append(JobStatus.getJobRunState(status.getRunState())).append(",");
    jobSummary.append("mapSlotSeconds=").append(mapSlotSeconds).append(",");
    jobSummary.append("reduceSlotsSeconds=")
        .append(reduceSlotSeconds).append(",");
    jobSummary.append("clusterMapCapacity=")
        .append(clusterMetrics.getMapSlotCapacity()).append(",");
    jobSummary.append("clusterReduceCapacity=")
        .append(clusterMetrics.getReduceSlotCapacity());
    return jobSummary.toString();
  }

  /**
   * This gets the value of one task tracker window in the tasktracker page.
   *
   * @param ttStatus
   *          status of the task tracker whose statistics are read.
   * @param timePeriod
   *          "since_start", "last_day" or "last_hour"; selects entry 0, 1
   *          or 2 of the collector's default windows respectively.
   * @param totalTasksOrSucceededTasks
   *          "total_tasks" or "succeeded_tasks".
   * @return The number of tasks info in a particular window in
   *         tasktracker page, or -1 when either selector is not recognised.
   * @throws IOException declared for the RPC interface.
   */
  public int JobTracker.getTaskTrackerLevelStatistics(
      TaskTrackerStatus ttStatus, String timePeriod,
      String totalTasksOrSucceededTasks) throws IOException {

    LOG.info("ttStatus host :" + ttStatus.getHost());
    final int windowIndex;
    if ("since_start".equals(timePeriod)) {
      windowIndex = 0;
    } else if ("last_day".equals(timePeriod)) {
      windowIndex = 1;
    } else if ("last_hour".equals(timePeriod)) {
      windowIndex = 2;
    } else {
      return -1;
    }
    StatisticsCollector.TimeWindow window = getStatistics().
        collector.DEFAULT_COLLECT_WINDOWS[windowIndex];
    return getNumberOfTasks(window, ttStatus, totalTasksOrSucceededTasks);
  }

  /**
   * Get Information for Time Period and TaskType box
   * from all tasktrackers
   *
   * @param timePeriod
   *          time window selector, as accepted by
   *          getTaskTrackerLevelStatistics.
   * @param totalTasksOrSucceededTasks
   *          "total_tasks" or "succeeded_tasks".
   * @return The total number of tasks info for a particular column in
   *         tasktracker page, i.e. the sum of the per-tracker values.
   * @throws IOException declared for the RPC interface.
   */
  public int JobTracker.getInfoFromAllClients(String timePeriod,
      String totalTasksOrSucceededTasks) throws IOException {

    int totalTasksCount = 0;
    // NOTE(review): unlike getAllTTInfo, this iteration does not hold the
    // taskTrackers monitor - confirm whether that is intended.
    for (TaskTracker tt : taskTrackers.values()) {
      totalTasksCount += getTaskTrackerLevelStatistics(
          tt.getStatus(), timePeriod, totalTasksOrSucceededTasks);
    }
    return totalTasksCount;
  }

  /**
   * Reads one statistic of one task tracker for the given window.
   *
   * @param window
   *          the time window to read.
   * @param ttStatus
   *          status of the task tracker; its tracker name selects the
   *          statistics entry.
   * @param totalTasksOrSucceededTasks
   *          "total_tasks" or "succeeded_tasks".
   * @return the value of the chosen statistic, or -1 when the selector is
   *         not recognised.
   */
  private int JobTracker.getNumberOfTasks(StatisticsCollector.TimeWindow
    window, TaskTrackerStatus ttStatus, String totalTasksOrSucceededTasks ) {
    JobTrackerStatistics.TaskTrackerStat ttStat = getStatistics().
        getTaskTrackerStat(ttStatus.getTrackerName());
    if ("total_tasks".equals(totalTasksOrSucceededTasks)) {
      return ttStat.totalTasksStat.getValues().
          get(window).getValue();
    } else if ("succeeded_tasks".equals(totalTasksOrSucceededTasks)) {
      return ttStat.succeededTasksStat.getValues().
          get(window).getValue();
    }
    return -1;
  }

  /**
   * This gets the value of all task trackers windows in the tasktracker page.
   *
   * For each of the collector's first three default windows, the total and
   * succeeded task counts of every task tracker are added up. Window 0 is
   * stored as "since start", window 1 as "last day" and window 2 as
   * "last hour", matching the mapping used by getTaskTrackerLevelStatistics.
   *
   * @return StatisticsCollectionHandler class which holds the number
   * of all jobs ran from all tasktrackers:
   *  "since_start - total_tasks"
   *  "since_start - succeeded_tasks"
   *  "last_day - total_tasks"
   *  "last_day - succeeded_tasks"
   *  "last_hour - total_tasks"
   *  "last_hour - succeeded_tasks"
   * @throws Exception declared for the RPC interface.
   */
  public StatisticsCollectionHandler JobTracker.
    getInfoFromAllClientsForAllTaskType() throws Exception {

    final int windowCount = 3;
    // Running sums over all task trackers, indexed by window position.
    int[] totalTasks = new int[windowCount];
    int[] succeededTasks = new int[windowCount];

    for (TaskTracker tt : taskTrackers.values()) {
      TaskTrackerStatus ttStatus = tt.getStatus();
      JobTrackerStatistics.TaskTrackerStat ttStat = getStatistics().
          getTaskTrackerStat(ttStatus.getTrackerName());

      for (int i = 0; i < windowCount; i++) {
        StatisticsCollector.TimeWindow window = getStatistics().
            collector.DEFAULT_COLLECT_WINDOWS[i];
        totalTasks[i] += ttStat.totalTasksStat.getValues().
            get(window).getValue();
        succeededTasks[i] += ttStat.succeededTasksStat.getValues().
            get(window).getValue();
      }
    }

    StatisticsCollectionHandler statisticsCollectionHandler =
        new StatisticsCollectionHandler();
    statisticsCollectionHandler.setSinceStartTotalTasks(totalTasks[0]);
    statisticsCollectionHandler.setSinceStartSucceededTasks(succeededTasks[0]);
    // Window 1 is the "last day" window and window 2 the "last hour" one;
    // the previous code stored these two pairs the other way round.
    statisticsCollectionHandler.setLastDayTotalTasks(totalTasks[1]);
    statisticsCollectionHandler.setLastDaySucceededTasks(succeededTasks[1]);
    statisticsCollectionHandler.setLastHourTotalTasks(totalTasks[2]);
    statisticsCollectionHandler.setLastHourSucceededTasks(succeededTasks[2]);
    return statisticsCollectionHandler;
  }

  /**
   * Get the task tracker heartbeat interval.
   *
   * @return the value of <code>getNextHeartbeatInterval()</code>.
   * @throws Exception declared for the RPC interface.
   */
  public int JobTracker.getTaskTrackerHeartbeatInterval()
      throws Exception {
    return getNextHeartbeatInterval();
  }

  /**
   * Parses the history file of a retired job and reads a few of its fields.
   *
   * The method only does gets on read-only data. It purposely returns
   * nothing, since the test case does not require a result, but it can be
   * extended in future.
   *
   * @param id
   *          id of the retired job.
   * @throws Exception if the retired-job information is missing or the
   *           history file cannot be parsed.
   */
  public void JobTracker.accessHistoryData(JobID id) throws Exception {
    String location = getJobHistoryLocationForRetiredJob(id);
    Path logFile = new Path(location);
    FileSystem fs = logFile.getFileSystem(getConf());
    JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(id.toString());
    DefaultJobHistoryParser.parseJobTasks(location,
        jobInfo, fs);
    // Now read the info, so that two threads can access it at the same
    // time from the client side.
    LOG.info("user " +jobInfo.get(Keys.USER));
    LOG.info("jobname "+jobInfo.get(Keys.JOBNAME));
    jobInfo.get(Keys.JOBCONF);
    jobInfo.getJobACLs();
  }

  /**
   * Verifies whether Node is decommissioned or not
   * @param ttClientHostName
   *          tasktracker Client host name
   * @return boolean true for decommissioned and false for not decommissioned,
   *         i.e. whether the host is among the hosts reader's excluded hosts.
   * @throws IOException declared for the RPC interface.
   */
  public boolean JobTracker.isNodeDecommissioned(String ttClientHostName)
      throws IOException {
    Set<String> excludedNodes = hostsReader.getExcludedHosts();
    LOG.info("ttClientHostName is :" + ttClientHostName);
    return excludedNodes.contains(ttClientHostName);
  }
}