1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.mapred; 19 import java.io.DataOutputStream; 20 import java.io.IOException; 21 22 import org.apache.hadoop.util.RemoteExecution; 23 import org.junit.Test; 24 import org.junit.Assert; 25 import org.junit.AfterClass; 26 import org.junit.BeforeClass; 27 import org.apache.hadoop.conf.Configuration; 28 import org.apache.hadoop.fs.FileSystem; 29 import org.apache.hadoop.fs.Path; 30 import org.apache.hadoop.fs.permission.FsAction; 31 import org.apache.hadoop.fs.permission.FsPermission; 32 import org.apache.commons.logging.LogFactory; 33 import org.apache.commons.logging.Log; 34 import org.apache.hadoop.util.SSHRemoteExecution; 35 36 import java.util.Collection; 37 import java.util.Hashtable; 38 39 import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction; 40 import org.apache.hadoop.mapreduce.test.system.JTProtocol; 41 import org.apache.hadoop.mapreduce.test.system.JobInfo; 42 import org.apache.hadoop.mapreduce.test.system.MRCluster; 43 import org.apache.hadoop.mapreduce.test.system.TTClient; 44 import org.apache.hadoop.mapreduce.test.system.JTClient; 45 import org.apache.hadoop.mapreduce.test.system.TTProtocol; 46 import org.apache.hadoop.mapreduce.test.system.TTTaskInfo; 47 import org.apache.hadoop.mapreduce.test.system.TaskInfo; 48 import testjar.GenerateTaskChildProcess; 49 50 public class TestChildsKillingOfSuspendTask { 51 private static final Log LOG = LogFactory 52 .getLog(TestChildsKillingOfSuspendTask.class); 53 private static Configuration conf = new Configuration(); 54 private static MRCluster cluster; 55 private static Path inputDir = new Path("input"); 56 private static Path outputDir = new Path("output"); 57 private static String confFile = "mapred-site.xml"; 58 59 @BeforeClass before()60 public static void before() throws Exception { 61 Hashtable<String,Object> prop = new Hashtable<String,Object>(); 62 prop.put("mapred.map.max.attempts",1L); 63 prop.put("mapred.task.timeout",30000L); 64 prop.put("mapreduce.job.complete.cancel.delegation.tokens", false); 65 String [] expExcludeList = {"java.net.ConnectException", 66 "java.io.IOException","org.apache.hadoop.metrics2.MetricsException"}; 67 cluster = MRCluster.createCluster(conf); 68 cluster.setExcludeExpList(expExcludeList); 69 cluster.setUp(); 70 cluster.restartClusterWithNewConfig(prop, confFile); 71 UtilsForTests.waitFor(1000); 72 conf = cluster.getJTClient().getProxy().getDaemonConf(); 73 createInput(inputDir, conf); 74 } 75 @AfterClass after()76 public static void after() throws Exception { 77 cleanup(inputDir, conf); 78 cleanup(outputDir, conf); 79 cluster.tearDown(); 80 // cluster.restart(); 81 } 82 83 /** 84 * Verify the process tree clean up of a task after 85 * task is suspended and wait till the task is 86 * terminated based on timeout. 87 */ 88 @Test testProcessTreeCleanupOfSuspendTask()89 public void testProcessTreeCleanupOfSuspendTask() throws 90 Exception { 91 TaskInfo taskInfo = null; 92 TaskID tID = null; 93 TTTaskInfo [] ttTaskinfo = null; 94 String pid = null; 95 TTProtocol ttIns = null; 96 TTClient ttClientIns = null; 97 int counter = 0; 98 99 JobConf jobConf = new JobConf(conf); 100 jobConf.setJobName("Message Display"); 101 jobConf.setJarByClass(GenerateTaskChildProcess.class); 102 jobConf.setMapperClass(GenerateTaskChildProcess.StrDisplayMapper.class); 103 jobConf.setNumMapTasks(1); 104 jobConf.setNumReduceTasks(0); 105 jobConf.setMaxMapAttempts(1); 106 cleanup(outputDir, conf); 107 FileInputFormat.setInputPaths(jobConf, inputDir); 108 FileOutputFormat.setOutputPath(jobConf, outputDir); 109 110 JTClient jtClient = cluster.getJTClient(); 111 JobClient client = jtClient.getClient(); 112 JTProtocol wovenClient = cluster.getJTClient().getProxy(); 113 RunningJob runJob = client.submitJob(jobConf); 114 JobID id = runJob.getID(); 115 JobInfo jInfo = wovenClient.getJobInfo(id); 116 Assert.assertNotNull("Job information is null",jInfo); 117 118 Assert.assertTrue("Job has not been started for 1 min.", 119 jtClient.isJobStarted(id)); 120 JobStatus[] jobStatus = client.getAllJobs(); 121 String userName = jobStatus[0].getUsername(); 122 123 TaskInfo[] taskInfos = wovenClient.getTaskInfo(id); 124 for (TaskInfo taskinfo : taskInfos) { 125 if (!taskinfo.isSetupOrCleanup()) { 126 taskInfo = taskinfo; 127 break; 128 } 129 } 130 131 Assert.assertTrue("Task has not been started for 1 min.", 132 jtClient.isTaskStarted(taskInfo)); 133 134 tID = TaskID.downgrade(taskInfo.getTaskID()); 135 TaskAttemptID tAttID = new TaskAttemptID(tID,0); 136 FinishTaskControlAction action = new FinishTaskControlAction(tID); 137 138 Collection<TTClient> ttClients = cluster.getTTClients(); 139 for (TTClient ttClient : ttClients) { 140 TTProtocol tt = ttClient.getProxy(); 141 tt.sendAction(action); 142 ttTaskinfo = tt.getTasks(); 143 for (TTTaskInfo tttInfo : ttTaskinfo) { 144 if (!tttInfo.isTaskCleanupTask()) { 145 pid = tttInfo.getPid(); 146 ttClientIns = ttClient; 147 ttIns = tt; 148 break; 149 } 150 } 151 if (ttClientIns != null) { 152 break; 153 } 154 } 155 Assert.assertTrue("Map process tree is not alive before task suspend.", 156 ttIns.isProcessTreeAlive(pid)); 157 LOG.info("Suspend the task of process id " + pid); 158 ExecuteShellCommand execcmd = new ExecuteShellCommand(userName, 159 ttClientIns.getHostName(), "kill -SIGSTOP " + pid); 160 execcmd.start(); 161 execcmd.join(); 162 UtilsForTests.waitFor(30000); 163 Assert.assertTrue("Process(" + pid + ") has not been suspended", 164 execcmd.getStatus()); 165 ttIns = ttClientIns.getProxy(); 166 UtilsForTests.waitFor(1000); 167 Assert.assertTrue("Map process is still alive after task has been failed.", 168 !ttIns.isProcessTreeAlive(pid)); 169 } 170 171 /** 172 * Verify the process tree cleanup of task after task 173 * is suspended and resumed the task before the timeout. 174 */ 175 @Test testProcessTreeCleanupOfSuspendAndResumeTask()176 public void testProcessTreeCleanupOfSuspendAndResumeTask() throws 177 Exception { 178 TaskInfo taskInfo = null; 179 TaskID tID = null; 180 TTTaskInfo [] ttTaskinfo = null; 181 String pid = null; 182 TTProtocol ttIns = null; 183 TTClient ttClientIns = null; 184 int counter = 0; 185 186 JobConf jobConf = new JobConf(conf); 187 jobConf.setJobName("Message Display"); 188 jobConf.setJarByClass(GenerateTaskChildProcess.class); 189 jobConf.setMapperClass(GenerateTaskChildProcess.StrDisplayMapper.class); 190 jobConf.setNumMapTasks(1); 191 jobConf.setNumReduceTasks(0); 192 jobConf.setMaxMapAttempts(1); 193 cleanup(outputDir, conf); 194 FileInputFormat.setInputPaths(jobConf, inputDir); 195 FileOutputFormat.setOutputPath(jobConf, outputDir); 196 197 JTClient jtClient = cluster.getJTClient(); 198 JobClient client = jtClient.getClient(); 199 JTProtocol wovenClient = cluster.getJTClient().getProxy(); 200 RunningJob runJob = client.submitJob(jobConf); 201 JobID id = runJob.getID(); 202 JobInfo jInfo = wovenClient.getJobInfo(id); 203 Assert.assertNotNull("Job information is null",jInfo); 204 205 Assert.assertTrue("Job has not been started for 1 min.", 206 jtClient.isJobStarted(id)); 207 208 JobStatus[] jobStatus = client.getAllJobs(); 209 String userName = jobStatus[0].getUsername(); 210 211 TaskInfo[] taskInfos = wovenClient.getTaskInfo(id); 212 for (TaskInfo taskinfo : taskInfos) { 213 if (!taskinfo.isSetupOrCleanup()) { 214 taskInfo = taskinfo; 215 break; 216 } 217 } 218 219 Assert.assertTrue("Task has not been started for 1 min.", 220 jtClient.isTaskStarted(taskInfo)); 221 222 tID = TaskID.downgrade(taskInfo.getTaskID()); 223 TaskAttemptID tAttID = new TaskAttemptID(tID,0); 224 FinishTaskControlAction action = new FinishTaskControlAction(tID); 225 226 Collection<TTClient> ttClients = cluster.getTTClients(); 227 for (TTClient ttClient : ttClients) { 228 TTProtocol tt = ttClient.getProxy(); 229 tt.sendAction(action); 230 ttTaskinfo = tt.getTasks(); 231 for (TTTaskInfo tttInfo : ttTaskinfo) { 232 if (!tttInfo.isTaskCleanupTask()) { 233 pid = tttInfo.getPid(); 234 ttClientIns = ttClient; 235 ttIns = tt; 236 break; 237 } 238 } 239 if (ttClientIns != null) { 240 break; 241 } 242 } 243 Assert.assertTrue("Map process tree is not alive before task suspend.", 244 ttIns.isProcessTreeAlive(pid)); 245 LOG.info("Suspend the task of process id " + pid); 246 ExecuteShellCommand execcmd = new ExecuteShellCommand(userName, 247 ttClientIns.getHostName(), "kill -SIGSTOP " + pid); 248 execcmd.start(); 249 execcmd.join(); 250 251 Assert.assertTrue("Process(" + pid + ") has not been suspended", 252 execcmd.getStatus()); 253 Assert.assertTrue("Map process is not alive after task " 254 + "has been suspended.", ttIns.isProcessTreeAlive(pid)); 255 UtilsForTests.waitFor(5000); 256 ExecuteShellCommand execcmd1 = new ExecuteShellCommand(userName, 257 ttClientIns.getHostName(), "kill -SIGCONT " + pid); 258 execcmd1.start(); 259 execcmd1.join(); 260 Assert.assertTrue("Suspended process(" + pid + ") has not been resumed", 261 execcmd1.getStatus()); 262 UtilsForTests.waitFor(5000); 263 Assert.assertTrue("Map process tree is not alive after task is resumed.", 264 ttIns.isProcessTreeAlive(pid)); 265 } 266 cleanup(Path dir, Configuration conf)267 private static void cleanup(Path dir, Configuration conf) throws 268 IOException { 269 FileSystem fs = dir.getFileSystem(conf); 270 fs.delete(dir, true); 271 } 272 createInput(Path inDir, Configuration conf)273 private static void createInput(Path inDir, Configuration conf) throws 274 IOException { 275 String input = "Hadoop is framework for data intensive distributed " 276 + "applications.\n Hadoop enables applications " 277 + "to work with thousands of nodes."; 278 FileSystem fs = inDir.getFileSystem(conf); 279 if (!fs.mkdirs(inDir)) { 280 throw new IOException("Failed to create the input directory:" 281 + inDir.toString()); 282 } 283 fs.setPermission(inDir, new FsPermission(FsAction.ALL, 284 FsAction.ALL, FsAction.ALL)); 285 DataOutputStream file = fs.create(new Path(inDir, "data.txt")); 286 int i = 0; 287 while(i < 10) { 288 file.writeBytes(input); 289 i++; 290 } 291 file.close(); 292 } 293 294 class ExecuteShellCommand extends Thread { 295 String userName; 296 String cmd; 297 String hostName; 298 boolean exitStatus; ExecuteShellCommand(String userName, String hostName, String cmd)299 public ExecuteShellCommand(String userName, String hostName, String cmd) { 300 this.userName = userName; 301 this.hostName = hostName; 302 this.cmd = cmd; 303 } run()304 public void run() { 305 try { 306 RemoteExecution rExec = new SSHRemoteExecution(); 307 rExec.executeCommand(hostName, userName, cmd); 308 exitStatus = true; 309 } catch(InterruptedException iexp) { 310 LOG.warn("Thread is interrupted:" + iexp.getMessage()); 311 exitStatus = false; 312 } catch(Exception exp) { 313 LOG.warn("Exception:" + exp.getMessage()); 314 exitStatus = false; 315 } 316 } getStatus()317 public boolean getStatus(){ 318 return exitStatus; 319 } 320 } 321 } 322