1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package org.apache.hadoop.mapred;
20
21import java.io.IOException;
22import java.util.List;
23import java.util.ArrayList;
24import org.apache.hadoop.conf.Configuration;
25import org.apache.hadoop.mapreduce.test.system.TTProtocol;
26import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
27import org.apache.hadoop.mapred.TTTaskInfoImpl.MapTTTaskInfo;
28import org.apache.hadoop.mapred.TTTaskInfoImpl.ReduceTTTaskInfo;
29import org.apache.hadoop.security.UserGroupInformation;
30import org.apache.hadoop.test.system.DaemonProtocol;
31import org.apache.hadoop.util.Shell;
32import org.apache.hadoop.util.Shell.ShellCommandExecutor;
33import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
34import org.apache.hadoop.mapreduce.TaskAttemptID;
35
36public privileged aspect TaskTrackerAspect {
37
38  declare parents : TaskTracker implements TTProtocol;
39
40  // Add a last sent status field to the Tasktracker class.
41  TaskTrackerStatus TaskTracker.lastSentStatus = null;
42  static String TaskTracker.TASKJARDIR = TaskTracker.JARSDIR;
43
44  public synchronized TaskTrackerStatus TaskTracker.getStatus()
45      throws IOException {
46    return lastSentStatus;
47  }
48
49  public Configuration TaskTracker.getDaemonConf() throws IOException {
50    return fConf;
51  }
52
53  public TTTaskInfo[] TaskTracker.getTasks() throws IOException {
54    List<TTTaskInfo> infoList = new ArrayList<TTTaskInfo>();
55    synchronized (tasks) {
56      for (TaskInProgress tip : tasks.values()) {
57        TTTaskInfo info = getTTTaskInfo(tip);
58        infoList.add(info);
59      }
60    }
61    return (TTTaskInfo[]) infoList.toArray(new TTTaskInfo[infoList.size()]);
62  }
63
64  public TTTaskInfo TaskTracker.getTask(org.apache.hadoop.mapreduce.TaskID id)
65      throws IOException {
66    TaskID old = org.apache.hadoop.mapred.TaskID.downgrade(id);
67    synchronized (tasks) {
68      for(TaskAttemptID ta : tasks.keySet()) {
69        if(old.equals(ta.getTaskID())) {
70          return getTTTaskInfo(tasks.get(ta));
71        }
72      }
73    }
74    return null;
75  }
76
77  private TTTaskInfo TaskTracker.getTTTaskInfo(TaskInProgress tip) {
78    TTTaskInfo info;
79    if (tip.task.isMapTask()) {
80      info = new MapTTTaskInfo(tip.slotTaken, tip.wasKilled,
81          (MapTaskStatus) tip.getStatus(), tip.getJobConf(), tip.getTask()
82              .getUser(), tip.getTask().isTaskCleanupTask(), getPid(tip.getTask().getTaskID()));
83    } else {
84      info = new ReduceTTTaskInfo(tip.slotTaken, tip.wasKilled,
85          (ReduceTaskStatus) tip.getStatus(), tip.getJobConf(), tip.getTask()
86              .getUser(), tip.getTask().isTaskCleanupTask(),getPid(tip.getTask().getTaskID()));
87    }
88    return info;
89  }
90
91  before(TaskTrackerStatus newStatus, TaskTracker tracker) :
92    set(TaskTrackerStatus TaskTracker.status)
93    && args(newStatus) && this(tracker) {
94    if (newStatus == null) {
95      tracker.lastSentStatus = tracker.status;
96    }
97  }
98
99  pointcut ttConstructorPointCut(JobConf conf) :
100    call(TaskTracker.new(JobConf))
101    && args(conf);
102
103  after(JobConf conf) returning (TaskTracker tracker):
104    ttConstructorPointCut(conf) {
105    try {
106      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
107      tracker.setUser(ugi.getShortUserName());
108    } catch (IOException e) {
109      tracker.LOG.warn("Unable to get the user information for the " +
110          "Jobtracker");
111    }
112    tracker.setReady(true);
113  }
114
115  pointcut getVersionAspect(String protocol, long clientVersion) :
116    execution(public long TaskTracker.getProtocolVersion(String ,
117      long) throws IOException) && args(protocol, clientVersion);
118
119  long around(String protocol, long clientVersion) :
120    getVersionAspect(protocol, clientVersion) {
121    if(protocol.equals(DaemonProtocol.class.getName())) {
122      return DaemonProtocol.versionID;
123    } else if(protocol.equals(TTProtocol.class.getName())) {
124      return TTProtocol.versionID;
125    } else {
126      return proceed(protocol, clientVersion);
127    }
128  }
129
130  public boolean TaskTracker.isProcessTreeAlive(String pid) throws IOException {
131    // Command to be executed is as follows :
132    // ps -o pid,ppid,sid,command -e | grep -v ps | grep -v grep | grep
133    // "$pid"
134    String checkerCommand =
135        getDaemonConf().get(
136            "test.system.processgroup_checker_command",
137            "ps -o pid,ppid,sid,command -e "
138                + "| grep -v ps | grep -v grep | grep \"$");
139    String[] command =
140        new String[] { "bash", "-c", checkerCommand + pid + "\"" };
141    ShellCommandExecutor shexec = new ShellCommandExecutor(command);
142    try {
143      shexec.execute();
144    } catch (Shell.ExitCodeException e) {
145      TaskTracker.LOG
146          .info("The process tree grep threw a exitcode exception pointing "
147              + "to process tree not being alive.");
148      return false;
149    }
150    TaskTracker.LOG.info("The task grep command is : "
151        + shexec.toString() + " the output from command is : "
152        + shexec.getOutput());
153    return true;
154  }
155}
156