1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.IOException;
22import java.util.List;
23import java.util.ArrayList;
24import java.util.LinkedList;
25import java.util.Set;
26
27import org.apache.hadoop.conf.Configuration;
28import org.apache.hadoop.fs.FileSystem;
29import org.apache.hadoop.fs.Path;
30import org.apache.hadoop.mapred.JobConf;
31import org.apache.hadoop.mapred.JobStatus;
32import org.apache.hadoop.mapred.JobHistory.Keys;
33import org.apache.hadoop.mapred.JobTracker.RetireJobInfo;
34import org.apache.hadoop.mapred.Counters;
35import org.apache.hadoop.mapreduce.JobID;
36import org.apache.hadoop.mapreduce.TaskID;
37import org.apache.hadoop.mapreduce.TaskType;
38import org.apache.hadoop.mapred.TaskTrackerStatus;
39import org.apache.hadoop.mapred.StatisticsCollector;
40import org.apache.hadoop.mapred.StatisticsCollectionHandler;
41import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
42import org.apache.hadoop.mapreduce.test.system.JTProtocol;
43import org.apache.hadoop.mapreduce.test.system.JobInfo;
44import org.apache.hadoop.mapreduce.test.system.TTInfo;
45import org.apache.hadoop.mapreduce.test.system.TaskInfo;
46import org.apache.hadoop.mapreduce.ClusterMetrics;
47import org.apache.hadoop.security.UserGroupInformation;
48import org.apache.hadoop.test.system.DaemonProtocol;
49import org.apache.hadoop.util.Shell.ShellCommandExecutor;
50import org.apache.hadoop.util.Shell;
51import org.apache.hadoop.util.StringUtils;
52
/**
 * Aspect that injects system-test helper methods into {@link JobTracker},
 * answers RPC protocol-version queries for the test protocols, and prepares a
 * freshly constructed JobTracker for use by the test framework.
 */
public privileged aspect JobTrackerAspect {

  /**
   * JobTracker most recently returned by a woven
   * {@code new JobTracker(JobConf, String)} call; assigned by the constructor
   * advice in this aspect.
   */
  private static JobTracker tracker;

  /*
   * Positions of the collection windows inside
   * StatisticsCollector.DEFAULT_COLLECT_WINDOWS as assumed by every statistics
   * helper in this aspect: since start, last day, last hour.
   */
  private static final int SINCE_START_WINDOW = 0;
  private static final int LAST_DAY_WINDOW = 1;
  private static final int LAST_HOUR_WINDOW = 2;
  private static final int NUM_COLLECT_WINDOWS = 3;

  /**
   * Translates a tasktracker-page period name into a collection-window index.
   *
   * @param timePeriod "since_start", "last_day" or "last_hour"; may be null
   * @return the window index, or -1 if the name is not recognized
   */
  private static int collectWindowIndex(String timePeriod) {
    if ("since_start".equals(timePeriod)) {
      return SINCE_START_WINDOW;
    }
    if ("last_day".equals(timePeriod)) {
      return LAST_DAY_WINDOW;
    }
    if ("last_hour".equals(timePeriod)) {
      return LAST_HOUR_WINDOW;
    }
    return -1;
  }

  /**
   * Returns the configuration held by this JobTracker.
   *
   * @return the JobTracker's {@code conf} field
   * @throws IOException declared but never thrown by this implementation
   */
  public Configuration JobTracker.getDaemonConf() throws IOException {
    return conf;
  }

  /**
   * Returns a read-only view of a job and its associated information.
   *
   * @param jobID id of the job for which information is required
   * @return JobInfo of the requested job, or null if the JobTracker does not
   *         currently hold the job
   * @throws IOException declared but never thrown by this implementation
   */
  public JobInfo JobTracker.getJobInfo(JobID jobID) throws IOException {
    JobInProgress jip = jobs.get(org.apache.hadoop.mapred.JobID
        .downgrade(jobID));
    if (jip == null) {
      LOG.warn("No job present for : " + jobID);
      return null;
    }
    synchronized (jip) {
      return jip.getJobInfo();
    }
  }

  /**
   * Returns a read-only view of a task and its associated information.
   *
   * @param taskID id of the task for which information is required
   * @return TaskInfo of the requested task, or null if no such task is known
   * @throws IOException declared but never thrown by this implementation
   */
  public TaskInfo JobTracker.getTaskInfo(TaskID taskID) throws IOException {
    TaskInProgress tip = getTip(org.apache.hadoop.mapred.TaskID
        .downgrade(taskID));
    if (tip == null) {
      LOG.warn("No task present for : " + taskID);
      return null;
    }
    return getTaskInfo(tip);
  }

  /**
   * Returns a read-only view of a single task tracker.
   *
   * @param trackerName name under which the tracker is registered
   * @return TTInfo of the tracker, or null if no tracker has that name
   * @throws IOException declared but never thrown by this implementation
   */
  public TTInfo JobTracker.getTTInfo(String trackerName) throws IOException {
    TaskTracker tt;
    // Same lock this aspect holds while iterating taskTrackers.
    synchronized (taskTrackers) {
      tt = taskTrackers.get(trackerName);
    }
    if (tt == null) {
      LOG.warn("No task tracker with name : " + trackerName + " found");
      return null;
    }
    TaskTrackerStatus status = tt.getStatus();
    return new TTInfoImpl(status.trackerName, status);
  }

  // The two bulk getters below deliberately do not delegate to getJobInfo and
  // getTaskInfo: job retirement could remove a job from JobTracker memory
  // while the RPC call is being processed.

  /**
   * Returns read-only views of every job currently held by the JobTracker.
   *
   * @return one JobInfo per job in {@code jobs}; never null
   * @throws IOException declared but never thrown by this implementation
   */
  public JobInfo[] JobTracker.getAllJobInfo() throws IOException {
    List<JobInfo> infoList = new ArrayList<JobInfo>();
    synchronized (jobs) {
      for (JobInProgress jip : jobs.values()) {
        infoList.add(jip.getJobInfo());
      }
    }
    return infoList.toArray(new JobInfo[infoList.size()]);
  }

  /**
   * Returns read-only views of all tasks of a job.
   *
   * @param jobID id of the job
   * @return setup, map, reduce and cleanup task infos, in that order, or null
   *         if the job is unknown
   * @throws IOException declared but never thrown by this implementation
   */
  public TaskInfo[] JobTracker.getTaskInfo(JobID jobID) throws IOException {
    JobInProgress jip = jobs.get(org.apache.hadoop.mapred.JobID
        .downgrade(jobID));
    if (jip == null) {
      LOG.warn("Unable to find job : " + jobID);
      return null;
    }
    List<TaskInfo> infoList = new ArrayList<TaskInfo>();
    synchronized (jip) {
      for (TaskInProgress tip : jip.setup) {
        infoList.add(getTaskInfo(tip));
      }
      for (TaskInProgress tip : jip.maps) {
        infoList.add(getTaskInfo(tip));
      }
      for (TaskInProgress tip : jip.reduces) {
        infoList.add(getTaskInfo(tip));
      }
      for (TaskInProgress tip : jip.cleanup) {
        infoList.add(getTaskInfo(tip));
      }
    }
    return infoList.toArray(new TaskInfo[infoList.size()]);
  }

  /**
   * Returns read-only views of every registered task tracker.
   *
   * @return one TTInfo per tracker; never null
   * @throws IOException declared but never thrown by this implementation
   */
  public TTInfo[] JobTracker.getAllTTInfo() throws IOException {
    List<TaskTrackerStatus> statuses = getTaskTrackerStatusSnapshot();
    TTInfo[] infos = new TTInfo[statuses.size()];
    for (int i = 0; i < infos.length; i++) {
      TaskTrackerStatus status = statuses.get(i);
      infos[i] = new TTInfoImpl(status.trackerName, status);
    }
    return infos;
  }

  /**
   * Copies the status of every registered task tracker while holding the
   * {@code taskTrackers} lock, so callers can iterate without the lock.
   *
   * @return a new list of tracker statuses
   */
  private List<TaskTrackerStatus> JobTracker.getTaskTrackerStatusSnapshot() {
    synchronized (taskTrackers) {
      List<TaskTrackerStatus> statuses =
          new ArrayList<TaskTrackerStatus>(taskTrackers.size());
      for (TaskTracker tt : taskTrackers.values()) {
        statuses.add(tt.getStatus());
      }
      return statuses;
    }
  }

  /**
   * Tells whether a job has an entry in the retired-jobs cache.
   *
   * @param id id of the job
   * @return true if {@code retireJobs} holds an entry for the job
   * @throws IOException declared but never thrown by this implementation
   */
  public boolean JobTracker.isJobRetired(JobID id) throws IOException {
    return retireJobs.get(
        org.apache.hadoop.mapred.JobID.downgrade(id)) != null;
  }

  /**
   * Tells whether a task tracker is blacklisted.
   *
   * @param trackerName name of the tracker
   * @return the result of {@code isBlacklisted(trackerName)}
   * @throws IOException declared but never thrown by this implementation
   */
  public boolean JobTracker.isBlackListed(String trackerName) throws IOException {
    return isBlacklisted(trackerName);
  }

  /**
   * Returns the job history file recorded for a retired job.
   *
   * @param id id of the retired job
   * @return the history file location from the retired-job entry
   * @throws IOException if the job has no retired-job entry
   */
  public String JobTracker.getJobHistoryLocationForRetiredJob(
      JobID id) throws IOException {
    RetireJobInfo retInfo = retireJobs.get(
        org.apache.hadoop.mapred.JobID.downgrade(id));
    if (retInfo == null) {
      throw new IOException("The retired job information for the job : "
          + id +" is not found");
    }
    return retInfo.getHistoryFile();
  }

  /**
   * Matches executions of {@link JobTracker#getProtocolVersion}.
   */
  pointcut getVersionAspect(String protocol, long clientVersion) :
    execution(public long JobTracker.getProtocolVersion(String ,
      long) throws IOException) && args(protocol, clientVersion);

  /*
   * Answers version queries for the injected test protocols; every other
   * protocol is delegated to the JobTracker's own implementation.
   */
  long around(String protocol, long clientVersion) :
    getVersionAspect(protocol, clientVersion) {
    // Constant-first comparisons so a null protocol falls through to proceed().
    if (DaemonProtocol.class.getName().equals(protocol)) {
      return DaemonProtocol.versionID;
    }
    if (JTProtocol.class.getName().equals(protocol)) {
      return JTProtocol.versionID;
    }
    return proceed(protocol, clientVersion);
  }

  /**
   * Matches calls to the {@code JobTracker(JobConf, String)} constructor.
   *
   * @param conf configuration passed to the constructor
   * @param jobTrackerIdentifier identifier passed to the constructor
   */
  pointcut jtConstructorPointCut(JobConf conf, String jobTrackerIdentifier) :
        call(JobTracker.new(JobConf,String))
        && args(conf, jobTrackerIdentifier) ;

  /*
   * After a JobTracker has been constructed, records the current user's short
   * name on it, remembers it in the aspect and marks it ready.
   */
  after(JobConf conf, String jobTrackerIdentifier)
    returning (JobTracker newTracker): jtConstructorPointCut(conf,
        jobTrackerIdentifier) {
    try {
      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
      newTracker.setUser(ugi.getShortUserName());
    } catch (IOException e) {
      // Best effort: the tracker is still marked ready, but keep the cause.
      JobTracker.LOG.warn("Unable to get the user information for the "
          + "Jobtracker", e);
    }
    JobTrackerAspect.tracker = newTracker;
    newTracker.setReady(true);
  }

  /**
   * Builds the read-only view of a single task.
   *
   * @param tip the task
   * @return a TaskInfoImpl describing the task and its active attempts
   */
  private TaskInfo JobTracker.getTaskInfo(TaskInProgress tip) {
    TaskStatus[] status = tip.getTaskStatuses();
    if (status == null) {
      // getTaskStatuses() returned null: use an empty array of the task's kind.
      if (tip.isMapTask()) {
        status = new MapTaskStatus[]{};
      } else {
        status = new ReduceTaskStatus[]{};
      }
    }
    // Read the active attempts once so the tracker list and the
    // running-attempt count come from the same snapshot.
    String[] trackers =
        (String[]) tip.getActiveTasks().values().toArray(new String[0]);
    return new TaskInfoImpl(tip.getTIPId(), tip.getProgress(),
        trackers.length, tip.numKilledTasks(), tip.numTaskFailures(),
        status, (tip.isJobSetupTask() || tip.isJobCleanupTask()), trackers);
  }

  /**
   * Gets the job summary details from the jobtracker log files.
   *
   * @param jobId job id
   * @param filePattern jobtracker log file pattern
   * @return the job summary details of the given job id
   * @throws IOException if any I/O error occurs
   */
  public String JobTracker.getJobSummaryFromLogs(JobID jobId,
      String filePattern) throws IOException {
    String pattern = "JobId=" + jobId.toString() + ",submitTime";
    // NOTE(review): filePattern is interpolated unquoted into a bash command
    // line (so that globs expand); it must only come from trusted callers.
    String[] cmd = new String[] {
        "bash",
        "-c",
        "grep -i "
            + pattern + " "
            + filePattern + " "
            + "| sed s/'JobSummary: '/'^'/g | cut -d'^' -f2"};
    ShellCommandExecutor shexec = new ShellCommandExecutor(cmd);
    shexec.execute();
    return shexec.getOutput();
  }

  /**
   * Gets the job summary information for the given job id.
   *
   * @param jobId job id
   * @return comma-separated key=value job summary, or null if the job is
   *         unknown
   * @throws IOException declared but never thrown by this implementation
   */
  public String JobTracker.getJobSummaryInfo(JobID jobId) throws IOException {
    JobInProgress jip = jobs.
        get(org.apache.hadoop.mapred.JobID.downgrade(jobId));
    if (jip == null) {
      LOG.warn("Job has not been found - " + jobId);
      return null;
    }
    JobProfile profile = jip.getProfile();
    JobStatus status = jip.getStatus();
    final char[] charsToEscape = {StringUtils.COMMA, '=',
        StringUtils.ESCAPE_CHAR};
    String user = StringUtils.escapeString(profile.getUser(),
        StringUtils.ESCAPE_CHAR, charsToEscape);
    String queue = StringUtils.escapeString(profile.getQueueName(),
        StringUtils.ESCAPE_CHAR, charsToEscape);
    Counters jobCounters = jip.getJobCounters();
    // The slot counters are in milliseconds; the summary reports whole seconds.
    long mapSlotSeconds = (jobCounters.getCounter(
        JobInProgress.Counter.SLOTS_MILLIS_MAPS)
        + jobCounters.getCounter(
            JobInProgress.Counter.FALLOW_SLOTS_MILLIS_MAPS)) / 1000;
    long reduceSlotSeconds = (jobCounters.getCounter(
        JobInProgress.Counter.SLOTS_MILLIS_REDUCES)
        + jobCounters.getCounter(
            JobInProgress.Counter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000;
    // Use this JobTracker's own metrics: the aspect's static 'tracker' field
    // may be unset or refer to a different JobTracker instance.
    ClusterMetrics metrics = getClusterMetrics();

    StringBuilder jobSummary = new StringBuilder();
    jobSummary.append("jobId=").append(jip.getJobID())
        .append(",startTime=").append(jip.getStartTime())
        .append(",launchTime=").append(jip.getLaunchTime())
        .append(",finishTime=").append(jip.getFinishTime())
        .append(",numMaps=").append(jip.getTasks(TaskType.MAP).length)
        .append(",numSlotsPerMap=").append(jip.getNumSlotsPerMap())
        .append(",numReduces=").append(jip.getTasks(TaskType.REDUCE).length)
        .append(",numSlotsPerReduce=").append(jip.getNumSlotsPerReduce())
        .append(",user=").append(user)
        .append(",queue=").append(queue)
        .append(",status=")
        .append(JobStatus.getJobRunState(status.getRunState()))
        .append(",mapSlotSeconds=").append(mapSlotSeconds)
        .append(",reduceSlotsSeconds=").append(reduceSlotSeconds)
        .append(",clusterMapCapacity=").append(metrics.getMapSlotCapacity())
        .append(",clusterReduceCapacity=")
        .append(metrics.getReduceSlotCapacity());
    return jobSummary.toString();
  }

  /**
   * Gets the value of one task tracker window on the tasktracker page.
   *
   * @param ttStatus status of the task tracker to inspect
   * @param timePeriod "since_start", "last_day" or "last_hour"
   * @param totalTasksOrSucceededTasks "total_tasks" or "succeeded_tasks"
   * @return the number of tasks in that window, or -1 if either selector is
   *         not recognized
   * @throws IOException declared but never thrown by this implementation
   */
  public int JobTracker.getTaskTrackerLevelStatistics(
      TaskTrackerStatus ttStatus, String timePeriod,
      String totalTasksOrSucceededTasks) throws IOException {
    LOG.info("ttStatus host :" + ttStatus.getHost());
    int windowIndex = JobTrackerAspect.collectWindowIndex(timePeriod);
    if (windowIndex < 0) {
      return -1;
    }
    StatisticsCollector.TimeWindow window = getStatistics().
        collector.DEFAULT_COLLECT_WINDOWS[windowIndex];
    return getNumberOfTasks(window, ttStatus, totalTasksOrSucceededTasks);
  }

  /**
   * Sums one time-period/task-kind column of the tasktracker page over all
   * registered task trackers.
   *
   * @param timePeriod "since_start", "last_day" or "last_hour"
   * @param totalTasksOrSucceededTasks "total_tasks" or "succeeded_tasks"
   * @return the column total, or -1 if either selector is not recognized
   *         (reported once rather than summed per tracker)
   * @throws IOException declared but never thrown by this implementation
   */
  public int JobTracker.getInfoFromAllClients(String timePeriod,
      String totalTasksOrSucceededTasks) throws IOException {
    int totalTasksCount = 0;
    for (TaskTrackerStatus ttStatus : getTaskTrackerStatusSnapshot()) {
      int tasks = getTaskTrackerLevelStatistics(
          ttStatus, timePeriod, totalTasksOrSucceededTasks);
      if (tasks < 0) {
        return -1;
      }
      totalTasksCount += tasks;
    }
    return totalTasksCount;
  }

  /**
   * Reads one statistic of one task tracker for a given collection window.
   *
   * @param window the collection window to read
   * @param ttStatus status identifying the task tracker
   * @param totalTasksOrSucceededTasks "total_tasks" or "succeeded_tasks"
   * @return the statistic's value, or -1 if the selector is not recognized
   */
  private int JobTracker.getNumberOfTasks(StatisticsCollector.TimeWindow
      window, TaskTrackerStatus ttStatus, String totalTasksOrSucceededTasks) {
    JobTrackerStatistics.TaskTrackerStat ttStat = getStatistics().
        getTaskTrackerStat(ttStatus.getTrackerName());
    if ("total_tasks".equals(totalTasksOrSucceededTasks)) {
      return ttStat.totalTasksStat.getValues().get(window).getValue();
    } else if ("succeeded_tasks".equals(totalTasksOrSucceededTasks)) {
      return ttStat.succeededTasksStat.getValues().get(window).getValue();
    }
    return -1;
  }

  /**
   * Gets the values of all windows of the tasktracker page, summed over all
   * registered task trackers.
   *
   * @return a StatisticsCollectionHandler holding the total and succeeded
   *         task counts since start, for the last day and for the last hour
   * @throws Exception declared for callers; not thrown explicitly here
   */
  public StatisticsCollectionHandler JobTracker.
      getInfoFromAllClientsForAllTaskType() throws Exception {
    StatisticsCollector.TimeWindow[] windows =
        getStatistics().collector.DEFAULT_COLLECT_WINDOWS;
    int[] totalTasks = new int[NUM_COLLECT_WINDOWS];
    int[] succeededTasks = new int[NUM_COLLECT_WINDOWS];

    for (TaskTrackerStatus ttStatus : getTaskTrackerStatusSnapshot()) {
      JobTrackerStatistics.TaskTrackerStat ttStat = getStatistics().
          getTaskTrackerStat(ttStatus.getTrackerName());
      for (int i = 0; i < NUM_COLLECT_WINDOWS; i++) {
        totalTasks[i] +=
            ttStat.totalTasksStat.getValues().get(windows[i]).getValue();
        succeededTasks[i] +=
            ttStat.succeededTasksStat.getValues().get(windows[i]).getValue();
      }
    }

    // Map each window by its index (same mapping as
    // getTaskTrackerLevelStatistics); index 1 is last day, index 2 last hour.
    StatisticsCollectionHandler statisticsCollectionHandler =
        new StatisticsCollectionHandler();
    statisticsCollectionHandler.setSinceStartTotalTasks(
        totalTasks[SINCE_START_WINDOW]);
    statisticsCollectionHandler.setSinceStartSucceededTasks(
        succeededTasks[SINCE_START_WINDOW]);
    statisticsCollectionHandler.setLastDayTotalTasks(
        totalTasks[LAST_DAY_WINDOW]);
    statisticsCollectionHandler.setLastDaySucceededTasks(
        succeededTasks[LAST_DAY_WINDOW]);
    statisticsCollectionHandler.setLastHourTotalTasks(
        totalTasks[LAST_HOUR_WINDOW]);
    statisticsCollectionHandler.setLastHourSucceededTasks(
        succeededTasks[LAST_HOUR_WINDOW]);
    return statisticsCollectionHandler;
  }

  /**
   * Gets the TaskTracker heartbeat interval.
   *
   * @return the value of {@code getNextHeartbeatInterval()}
   * @throws Exception declared for callers; not thrown explicitly here
   */
  public int JobTracker.getTaskTrackerHeartbeatInterval()
      throws Exception {
    return getNextHeartbeatInterval();
  }

  /**
   * Parses the history file of a retired job and reads a few fields from it.
   * Nothing is returned on purpose: the caller only needs the read to
   * happen (presumably to exercise concurrent client access), though this
   * could be extended to return the data.
   *
   * @param id id of the retired job
   * @throws Exception if the job is not retired or the history cannot be read
   */
  public void JobTracker.accessHistoryData(JobID id) throws Exception {
    String location = getJobHistoryLocationForRetiredJob(id);
    Path logFile = new Path(location);
    FileSystem fs = logFile.getFileSystem(getConf());
    JobHistory.JobInfo jobInfo  = new JobHistory.JobInfo(id.toString());
    DefaultJobHistoryParser.parseJobTasks(location, jobInfo, fs);
    // Read back some of the parsed data.
    LOG.info("user " +jobInfo.get(Keys.USER));
    LOG.info("jobname "+jobInfo.get(Keys.JOBNAME));
    jobInfo.get(Keys.JOBCONF);
    jobInfo.getJobACLs();
  }

  /**
   * Verifies whether a node is decommissioned.
   *
   * @param ttClientHostName host name of the task tracker client
   * @return true if the host is in the hosts reader's excluded set, false
   *         otherwise
   * @throws IOException declared but never thrown by this implementation
   */
  public boolean JobTracker.isNodeDecommissioned(String ttClientHostName)
      throws IOException {
    Set<String> excludedNodes = hostsReader.getExcludedHosts();
    LOG.info("ttClientHostName is :" + ttClientHostName);
    return excludedNodes.contains(ttClientHostName);
  }
}
556