/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.test.system.TTProtocol;
import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
import org.apache.hadoop.mapred.TTTaskInfoImpl.MapTTTaskInfo;
import org.apache.hadoop.mapred.TTTaskInfoImpl.ReduceTTTaskInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.system.DaemonProtocol;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapreduce.TaskAttemptID;

/**
 * Aspect that makes {@link TaskTracker} implement {@link TTProtocol} and
 * introduces the members backing that protocol: the last sent status, the
 * daemon configuration, per-task information and a process-tree liveness
 * check. It also advises the tracker's construction and its
 * <code>getProtocolVersion</code> method.
 */
public privileged aspect TaskTrackerAspect {

  declare parents : TaskTracker implements TTProtocol;

  // Add a last sent status field to the Tasktracker class.
  TaskTrackerStatus TaskTracker.lastSentStatus = null;
  static String TaskTracker.TASKJARDIR = TaskTracker.JARSDIR;

  /**
   * Returns the status captured by this aspect in <code>lastSentStatus</code>.
   *
   * @return the captured status, or <code>null</code> if none has been
   *         captured yet.
   * @throws IOException declared for the protocol; not thrown by this body.
   */
  public synchronized TaskTrackerStatus TaskTracker.getStatus()
      throws IOException {
    return lastSentStatus;
  }

  /**
   * Returns the configuration object held by the tracker.
   *
   * @return the tracker's <code>fConf</code>.
   * @throws IOException declared for the protocol; not thrown by this body.
   */
  public Configuration TaskTracker.getDaemonConf() throws IOException {
    return fConf;
  }

  /**
   * Builds a snapshot of information about every task currently held in the
   * tracker's <code>tasks</code> map.
   *
   * @return one {@link TTTaskInfo} per entry of <code>tasks</code>; empty if
   *         the map is empty.
   * @throws IOException declared for the protocol; not thrown by this body.
   */
  public TTTaskInfo[] TaskTracker.getTasks() throws IOException {
    List<TTTaskInfo> infoList = new ArrayList<TTTaskInfo>();
    // Hold the map's monitor so it cannot change while it is being walked.
    synchronized (tasks) {
      for (TaskInProgress tip : tasks.values()) {
        infoList.add(getTTTaskInfo(tip));
      }
    }
    return infoList.toArray(new TTTaskInfo[infoList.size()]);
  }

  /**
   * Looks up information about an attempt of the given task.
   *
   * @param id the task to look for.
   * @return the info of the first attempt found in <code>tasks</code> whose
   *         task id equals <code>id</code>, or <code>null</code> if there is
   *         no such attempt.
   * @throws IOException declared for the protocol; not thrown by this body.
   */
  public TTTaskInfo TaskTracker.getTask(org.apache.hadoop.mapreduce.TaskID id)
      throws IOException {
    TaskID old = org.apache.hadoop.mapred.TaskID.downgrade(id);
    synchronized (tasks) {
      for (TaskAttemptID attempt : tasks.keySet()) {
        if (old.equals(attempt.getTaskID())) {
          return getTTTaskInfo(tasks.get(attempt));
        }
      }
    }
    return null;
  }

  /**
   * Converts a {@link TaskInProgress} into the matching {@link TTTaskInfo}
   * flavour: a map info for map tasks, a reduce info otherwise.
   *
   * @param tip the task in progress to describe.
   * @return the populated task info.
   */
  private TTTaskInfo TaskTracker.getTTTaskInfo(TaskInProgress tip) {
    if (tip.task.isMapTask()) {
      return new MapTTTaskInfo(tip.slotTaken, tip.wasKilled,
          (MapTaskStatus) tip.getStatus(), tip.getJobConf(),
          tip.getTask().getUser(), tip.getTask().isTaskCleanupTask(),
          getPid(tip.getTask().getTaskID()));
    }
    return new ReduceTTTaskInfo(tip.slotTaken, tip.wasKilled,
        (ReduceTaskStatus) tip.getStatus(), tip.getJobConf(),
        tip.getTask().getUser(), tip.getTask().isTaskCleanupTask(),
        getPid(tip.getTask().getTaskID()));
  }

  /**
   * Runs before every assignment to <code>TaskTracker.status</code>. When the
   * value being assigned is <code>null</code>, the status that is about to be
   * discarded is remembered in <code>lastSentStatus</code>.
   */
  before(TaskTrackerStatus newStatus, TaskTracker tracker) :
    set(TaskTrackerStatus TaskTracker.status)
    && args(newStatus) && this(tracker) {
    // NOTE(review): presumably the tracker nulls its status once it has been
    // sent, which is what makes the old value the "last sent" one - confirm.
    if (newStatus == null) {
      tracker.lastSentStatus = tracker.status;
    }
  }

  pointcut ttConstructorPointCut(JobConf conf) :
    call(TaskTracker.new(JobConf))
    && args(conf);

  /**
   * Runs after a <code>TaskTracker(JobConf)</code> constructor call returns:
   * records the current user's short name on the tracker and marks the
   * tracker as ready. Failure to resolve the user is logged and does not
   * prevent the tracker from being marked ready.
   */
  after(JobConf conf) returning (TaskTracker tracker):
    ttConstructorPointCut(conf) {
    try {
      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
      tracker.setUser(ugi.getShortUserName());
    } catch (IOException e) {
      // Keep the cause in the log so the failure can be diagnosed.
      TaskTracker.LOG.warn("Unable to get the user information for the "
          + "TaskTracker", e);
    }
    tracker.setReady(true);
  }

  pointcut getVersionAspect(String protocol, long clientVersion) :
    execution(public long TaskTracker.getProtocolVersion(String ,
      long) throws IOException) && args(protocol, clientVersion);

  /**
   * Answers protocol version queries for {@link DaemonProtocol} and
   * {@link TTProtocol} directly and delegates every other protocol name
   * (including <code>null</code>) to the advised method.
   */
  long around(String protocol, long clientVersion) :
    getVersionAspect(protocol, clientVersion) {
    // Constant-first comparison so a null protocol falls through to proceed().
    if (DaemonProtocol.class.getName().equals(protocol)) {
      return DaemonProtocol.versionID;
    }
    if (TTProtocol.class.getName().equals(protocol)) {
      return TTProtocol.versionID;
    }
    return proceed(protocol, clientVersion);
  }

  /**
   * Checks whether any process related to the given pid is still listed by
   * the process checker command.
   *
   * The command prefix is read from the configuration key
   * <code>test.system.processgroup_checker_command</code>. The pid and a
   * closing double quote are appended to it, so a configured value must end
   * with an opening double quote, as the default does.
   *
   * @param pid the process id to look for.
   * @return <code>true</code> if the command exits successfully,
   *         <code>false</code> if it exits with an exit-code error or if
   *         <code>pid</code> is <code>null</code> or empty.
   * @throws IOException if the command fails for any other reason.
   */
  public boolean TaskTracker.isProcessTreeAlive(String pid) throws IOException {
    // An empty pattern makes grep match every line, which would report any
    // process tree as alive.
    if (pid == null || pid.length() == 0) {
      TaskTracker.LOG.info("No pid was supplied to the process tree check; "
          + "reporting the process tree as not alive.");
      return false;
    }
    // Command to be executed is as follows :
    // ps -o pid,ppid,sid,command -e | grep -v ps | grep -v grep | grep
    // "<pid>"
    // The pattern must not start with '$': bash would expand "$<pid>" as the
    // positional parameter $1 followed by the remaining digits, silently
    // dropping the first digit of the pid.
    String checkerCommand =
        getDaemonConf().get(
            "test.system.processgroup_checker_command",
            "ps -o pid,ppid,sid,command -e "
                + "| grep -v ps | grep -v grep | grep \"");
    // NOTE(review): pid is concatenated into a shell command line; callers
    // are presumed to pass a plain numeric pid - confirm.
    String[] command =
        new String[] { "bash", "-c", checkerCommand + pid + "\"" };
    ShellCommandExecutor shexec = new ShellCommandExecutor(command);
    try {
      shexec.execute();
    } catch (Shell.ExitCodeException e) {
      TaskTracker.LOG
          .info("The process tree grep threw a exitcode exception pointing "
              + "to process tree not being alive.");
      return false;
    }
    TaskTracker.LOG.info("The task grep command is : "
        + shexec.toString() + " the output from command is : "
        + shexec.getOutput());
    return true;
  }
}