1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapred; 20 21 import java.io.File; 22 import java.io.IOException; 23 import java.util.ArrayList; 24 import java.util.Arrays; 25 import java.util.List; 26 import java.util.regex.Pattern; 27 import java.util.regex.Matcher; 28 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 import org.apache.hadoop.fs.FileUtil; 32 import org.apache.hadoop.fs.Path; 33 import org.apache.hadoop.hdfs.MiniDFSCluster; 34 import org.apache.hadoop.hdfs.server.namenode.NameNode; 35 import org.apache.hadoop.examples.SleepJob; 36 import org.apache.hadoop.util.MemoryCalculatorPlugin; 37 import org.apache.hadoop.util.ProcfsBasedProcessTree; 38 import org.apache.hadoop.util.StringUtils; 39 import org.apache.hadoop.util.TestProcfsBasedProcessTree; 40 import org.apache.hadoop.util.ToolRunner; 41 import org.apache.hadoop.fs.FileSystem; 42 43 import junit.framework.TestCase; 44 45 /** 46 * Test class to verify memory management of tasks. 47 */ 48 public class TestTaskTrackerMemoryManager extends TestCase { 49 50 private static final Log LOG = 51 LogFactory.getLog(TestTaskTrackerMemoryManager.class); 52 private static String TEST_ROOT_DIR = new Path(System.getProperty( 53 "test.build.data", "/tmp")).toString().replace(' ', '+'); 54 55 private MiniMRCluster miniMRCluster; 56 57 private String taskOverLimitPatternString = 58 "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. " 59 + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task."; 60 startCluster(JobConf conf)61 private void startCluster(JobConf conf) 62 throws Exception { 63 conf.set("mapred.job.tracker.handler.count", "1"); 64 conf.set("mapred.tasktracker.map.tasks.maximum", "1"); 65 conf.set("mapred.tasktracker.reduce.tasks.maximum", "1"); 66 conf.set("mapred.tasktracker.tasks.sleeptime-before-sigkill", "0"); 67 miniMRCluster = new MiniMRCluster(1, "file:///", 1, null, null, conf); 68 } 69 70 @Override tearDown()71 protected void tearDown() { 72 if (miniMRCluster != null) { 73 miniMRCluster.shutdown(); 74 } 75 } 76 runSleepJob(JobConf conf)77 private void runSleepJob(JobConf conf) throws Exception { 78 String[] args = { "-m", "3", "-r", "1", "-mt", "3000", "-rt", "1000" }; 79 ToolRunner.run(conf, new SleepJob(), args); 80 } 81 runAndCheckSuccessfulJob(JobConf conf)82 private void runAndCheckSuccessfulJob(JobConf conf) 83 throws IOException { 84 Pattern taskOverLimitPattern = 85 Pattern.compile(String.format(taskOverLimitPatternString, "[0-9]*")); 86 Matcher mat = null; 87 88 // Start the job. 89 boolean success = true; 90 try { 91 runSleepJob(conf); 92 success = true; 93 } catch (Exception e) { 94 success = false; 95 } 96 97 // Job has to succeed 98 assertTrue(success); 99 100 JobClient jClient = new JobClient(conf); 101 JobStatus[] jStatus = jClient.getAllJobs(); 102 JobStatus js = jStatus[0]; // Our only job 103 RunningJob rj = jClient.getJob(js.getJobID()); 104 105 // All events 106 TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0); 107 108 for (TaskCompletionEvent tce : taskComplEvents) { 109 String[] diagnostics = 110 rj.getTaskDiagnostics(tce.getTaskAttemptId()); 111 112 if (diagnostics != null) { 113 for (String str : diagnostics) { 114 mat = taskOverLimitPattern.matcher(str); 115 // The error pattern shouldn't be there in any TIP's diagnostics 116 assertFalse(mat.find()); 117 } 118 } 119 } 120 } 121 isProcfsBasedTreeAvailable()122 private boolean isProcfsBasedTreeAvailable() { 123 try { 124 if (!ProcfsBasedProcessTree.isAvailable()) { 125 LOG.info("Currently ProcessTree has only one implementation " 126 + "ProcfsBasedProcessTree, which is not available on this " 127 + "system. Not testing"); 128 return false; 129 } 130 } catch (Exception e) { 131 LOG.info(StringUtils.stringifyException(e)); 132 return false; 133 } 134 return true; 135 } 136 137 /** 138 * Test for verifying that nothing is killed when memory management is 139 * disabled on the TT, even when the tasks run over their limits. 140 * 141 * @throws Exception 142 */ testTTLimitsDisabled()143 public void testTTLimitsDisabled() 144 throws Exception { 145 // Run the test only if memory management is enabled 146 if (!isProcfsBasedTreeAvailable()) { 147 return; 148 } 149 150 // Task-memory management disabled by default. 151 startCluster(new JobConf()); 152 long PER_TASK_LIMIT = 1L; // Doesn't matter how low. 153 JobConf conf = miniMRCluster.createJobConf(); 154 conf.setMemoryForMapTask(PER_TASK_LIMIT); 155 conf.setMemoryForReduceTask(PER_TASK_LIMIT); 156 runAndCheckSuccessfulJob(conf); 157 } 158 159 /** 160 * Test for verifying that tasks within limits, with the cumulative usage also 161 * under TT's limits succeed. 162 * 163 * @throws Exception 164 */ testTasksWithinLimits()165 public void testTasksWithinLimits() 166 throws Exception { 167 // Run the test only if memory management is enabled 168 if (!isProcfsBasedTreeAvailable()) { 169 return; 170 } 171 172 // Large so that sleepjob goes through and fits total TT usage 173 long PER_TASK_LIMIT = 2 * 1024L; 174 175 // Start cluster with proper configuration. 176 JobConf fConf = new JobConf(); 177 fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 178 2 * 1024L); 179 fConf.setLong( 180 JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 181 2 * 1024L); 182 startCluster(new JobConf()); 183 184 JobConf conf = new JobConf(miniMRCluster.createJobConf()); 185 conf.setMemoryForMapTask(PER_TASK_LIMIT); 186 conf.setMemoryForReduceTask(PER_TASK_LIMIT); 187 runAndCheckSuccessfulJob(conf); 188 } 189 190 /** 191 * Test for verifying that tasks that go beyond limits get killed. 192 * 193 * @throws Exception 194 */ testTasksBeyondLimits()195 public void testTasksBeyondLimits() 196 throws Exception { 197 198 // Run the test only if memory management is enabled 199 if (!isProcfsBasedTreeAvailable()) { 200 return; 201 } 202 203 // Start cluster with proper configuration. 204 JobConf fConf = new JobConf(); 205 // very small value, so that no task escapes to successful completion. 206 fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", 207 String.valueOf(300)); 208 fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 209 2 * 1024); 210 fConf.setLong( 211 JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 212 2 * 1024); 213 startCluster(fConf); 214 runJobExceedingMemoryLimit(); 215 } 216 217 /** 218 * Runs tests with tasks beyond limit and using old configuration values for 219 * the TaskTracker. 220 * 221 * @throws Exception 222 */ 223 testTaskMemoryMonitoringWithDeprecatedConfiguration()224 public void testTaskMemoryMonitoringWithDeprecatedConfiguration () 225 throws Exception { 226 227 // Run the test only if memory management is enabled 228 if (!isProcfsBasedTreeAvailable()) { 229 return; 230 } 231 // Start cluster with proper configuration. 232 JobConf fConf = new JobConf(); 233 // very small value, so that no task escapes to successful completion. 234 fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", 235 String.valueOf(300)); 236 //set old values, max vm property per task and upper limit on the tasks 237 //vm 238 //setting the default maximum vmem property to 2 GB 239 fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, 240 (2L * 1024L * 1024L * 1024L)); 241 fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, 242 (3L * 1024L * 1024L * 1024L)); 243 startCluster(fConf); 244 runJobExceedingMemoryLimit(); 245 } 246 247 /** 248 * Runs a job which should fail the when run by the memory monitor. 249 * 250 * @throws IOException 251 */ runJobExceedingMemoryLimit()252 private void runJobExceedingMemoryLimit() throws IOException { 253 long PER_TASK_LIMIT = 1L; // Low enough to kill off sleepJob tasks. 254 255 Pattern taskOverLimitPattern = 256 Pattern.compile(String.format(taskOverLimitPatternString, String 257 .valueOf(PER_TASK_LIMIT*1024*1024L))); 258 Matcher mat = null; 259 260 // Set up job. 261 JobConf conf = new JobConf(miniMRCluster.createJobConf()); 262 conf.setMemoryForMapTask(PER_TASK_LIMIT); 263 conf.setMemoryForReduceTask(PER_TASK_LIMIT); 264 conf.setMaxMapAttempts(1); 265 conf.setMaxReduceAttempts(1); 266 267 // Start the job. 268 boolean success = true; 269 try { 270 runSleepJob(conf); 271 success = true; 272 } catch (Exception e) { 273 success = false; 274 } 275 276 // Job has to fail 277 assertFalse(success); 278 279 JobClient jClient = new JobClient(conf); 280 JobStatus[] jStatus = jClient.getAllJobs(); 281 JobStatus js = jStatus[0]; // Our only job 282 RunningJob rj = jClient.getJob(js.getJobID()); 283 284 // All events 285 TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0); 286 287 for (TaskCompletionEvent tce : taskComplEvents) { 288 // Every task HAS to fail 289 assert (tce.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED || tce 290 .getTaskStatus() == TaskCompletionEvent.Status.FAILED); 291 292 String[] diagnostics = 293 rj.getTaskDiagnostics(tce.getTaskAttemptId()); 294 295 // Every task HAS to spit out the out-of-memory errors 296 assert (diagnostics != null); 297 298 for (String str : diagnostics) { 299 mat = taskOverLimitPattern.matcher(str); 300 // Every task HAS to spit out the out-of-memory errors in the same 301 // format. And these are the only diagnostic messages. 302 assertTrue(mat.find()); 303 } 304 } 305 } 306 307 /** 308 * Test for verifying that tasks causing cumulative usage to go beyond TT's 309 * limit get killed even though they all are under individual limits. Memory 310 * management for tasks with disabled task-limits also traverses the same 311 * code-path, so we don't need a separate testTaskLimitsDisabled. 312 * 313 * @throws Exception 314 */ testTasksCumulativelyExceedingTTLimits()315 public void testTasksCumulativelyExceedingTTLimits() 316 throws Exception { 317 318 // Run the test only if memory management is enabled 319 if (!isProcfsBasedTreeAvailable()) { 320 return; 321 } 322 323 // Large enough for SleepJob Tasks. 324 long PER_TASK_LIMIT = 100 * 1024L; 325 326 // Start cluster with proper configuration. 327 JobConf fConf = new JobConf(); 328 fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 329 1L); 330 fConf.setLong( 331 JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1L); 332 333 // Because of the above, the total tt limit is 2mb 334 long TASK_TRACKER_LIMIT = 2 * 1024 * 1024L; 335 336 // very small value, so that no task escapes to successful completion. 337 fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", 338 String.valueOf(300)); 339 340 startCluster(fConf); 341 342 Pattern taskOverLimitPattern = Pattern.compile( 343 String.format(taskOverLimitPatternString, String.valueOf(PER_TASK_LIMIT))); 344 345 Pattern trackerOverLimitPattern = 346 Pattern.compile("Killing one of the least progress tasks - .*, as " 347 + "the cumulative memory usage of all the tasks on the TaskTracker" 348 + " host0.foo.com exceeds virtual memory limit " + TASK_TRACKER_LIMIT 349 + "."); 350 Matcher mat = null; 351 352 // Set up job. 353 JobConf conf = new JobConf(miniMRCluster.createJobConf()); 354 conf.setMemoryForMapTask(PER_TASK_LIMIT); 355 conf.setMemoryForReduceTask(PER_TASK_LIMIT); 356 357 JobClient jClient = new JobClient(conf); 358 SleepJob sleepJob = new SleepJob(); 359 sleepJob.setConf(conf); 360 // Start the job 361 RunningJob job = 362 jClient.submitJob(sleepJob.setupJobConf(1, 1, 5000, 1, 1000, 1)); 363 boolean TTOverFlowMsgPresent = false; 364 while (true) { 365 List<TaskReport> allTaskReports = new ArrayList<TaskReport>(); 366 allTaskReports.addAll(Arrays.asList(jClient 367 .getSetupTaskReports((org.apache.hadoop.mapred.JobID) job.getID()))); 368 allTaskReports.addAll(Arrays.asList(jClient 369 .getMapTaskReports((org.apache.hadoop.mapred.JobID) job.getID()))); 370 for (TaskReport tr : allTaskReports) { 371 String[] diag = tr.getDiagnostics(); 372 for (String str : diag) { 373 mat = taskOverLimitPattern.matcher(str); 374 assertFalse(mat.find()); 375 mat = trackerOverLimitPattern.matcher(str); 376 if (mat.find()) { 377 TTOverFlowMsgPresent = true; 378 } 379 } 380 } 381 if (TTOverFlowMsgPresent) { 382 break; 383 } 384 try { 385 Thread.sleep(1000); 386 } catch (InterruptedException e) { 387 // nothing 388 } 389 } 390 // If it comes here without a test-timeout, it means there was a task that 391 // was killed because of crossing cumulative TT limit. 392 393 // Test succeeded, kill the job. 394 job.killJob(); 395 } 396 397 /** 398 * Test to verify the check for whether a process tree is over limit or not. 399 * @throws IOException if there was a problem setting up the 400 * fake procfs directories or files. 401 */ testProcessTreeLimits()402 public void testProcessTreeLimits() throws IOException { 403 404 // set up a dummy proc file system 405 File procfsRootDir = new File(TEST_ROOT_DIR, "proc"); 406 String[] pids = { "100", "200", "300", "400", "500", "600", "700" }; 407 try { 408 TestProcfsBasedProcessTree.setupProcfsRootDir(procfsRootDir); 409 410 // create pid dirs. 411 TestProcfsBasedProcessTree.setupPidDirs(procfsRootDir, pids); 412 413 // create process infos. 414 TestProcfsBasedProcessTree.ProcessStatInfo[] procs = 415 new TestProcfsBasedProcessTree.ProcessStatInfo[7]; 416 417 // assume pids 100, 500 are in 1 tree 418 // 200,300,400 are in another 419 // 600,700 are in a third 420 procs[0] = new TestProcfsBasedProcessTree.ProcessStatInfo( 421 new String[] {"100", "proc1", "1", "100", "100", "100000"}); 422 procs[1] = new TestProcfsBasedProcessTree.ProcessStatInfo( 423 new String[] {"200", "proc2", "1", "200", "200", "200000"}); 424 procs[2] = new TestProcfsBasedProcessTree.ProcessStatInfo( 425 new String[] {"300", "proc3", "200", "200", "200", "300000"}); 426 procs[3] = new TestProcfsBasedProcessTree.ProcessStatInfo( 427 new String[] {"400", "proc4", "200", "200", "200", "400000"}); 428 procs[4] = new TestProcfsBasedProcessTree.ProcessStatInfo( 429 new String[] {"500", "proc5", "100", "100", "100", "1500000"}); 430 procs[5] = new TestProcfsBasedProcessTree.ProcessStatInfo( 431 new String[] {"600", "proc6", "1", "600", "600", "100000"}); 432 procs[6] = new TestProcfsBasedProcessTree.ProcessStatInfo( 433 new String[] {"700", "proc7", "600", "600", "600", "100000"}); 434 // write stat files. 435 TestProcfsBasedProcessTree.writeStatFiles(procfsRootDir, pids, procs); 436 437 // vmem limit 438 long limit = 700000; 439 440 // Create TaskMemoryMonitorThread 441 TaskMemoryManagerThread test = new TaskMemoryManagerThread(1000000L, 442 5000L); 443 // create process trees 444 // tree rooted at 100 is over limit immediately, as it is 445 // twice over the mem limit. 446 ProcfsBasedProcessTree pTree = new ProcfsBasedProcessTree( 447 "100", 448 procfsRootDir.getAbsolutePath()); 449 pTree.getProcessTree(); 450 assertTrue("tree rooted at 100 should be over limit " + 451 "after first iteration.", 452 test.isProcessTreeOverLimit(pTree, "dummyId", limit)); 453 454 // the tree rooted at 200 is initially below limit. 455 pTree = new ProcfsBasedProcessTree("200", 456 procfsRootDir.getAbsolutePath()); 457 pTree.getProcessTree(); 458 assertFalse("tree rooted at 200 shouldn't be over limit " + 459 "after one iteration.", 460 test.isProcessTreeOverLimit(pTree, "dummyId", limit)); 461 // second iteration - now the tree has been over limit twice, 462 // hence it should be declared over limit. 463 pTree.getProcessTree(); 464 assertTrue("tree rooted at 200 should be over limit after 2 iterations", 465 test.isProcessTreeOverLimit(pTree, "dummyId", limit)); 466 467 // the tree rooted at 600 is never over limit. 468 pTree = new ProcfsBasedProcessTree("600", 469 procfsRootDir.getAbsolutePath()); 470 pTree.getProcessTree(); 471 assertFalse("tree rooted at 600 should never be over limit.", 472 test.isProcessTreeOverLimit(pTree, "dummyId", limit)); 473 474 // another iteration does not make any difference. 475 pTree.getProcessTree(); 476 assertFalse("tree rooted at 600 should never be over limit.", 477 test.isProcessTreeOverLimit(pTree, "dummyId", limit)); 478 } finally { 479 FileUtil.fullyDelete(procfsRootDir); 480 } 481 } 482 } 483