/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

import javax.security.auth.login.LoginException;

import junit.framework.TestCase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.QueueManager.QueueACL;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Tests for {@link QueueManager}: configured queue names, scheduler info,
 * job-submission ACLs on the default queue and refreshing of queue state.
 *
 * <p>Tests that submit jobs start a {@link MiniDFSCluster} and a
 * {@link MiniMRCluster} through {@link #setUpCluster(JobConf)}. The
 * {@code verify*} helpers leave that cluster running, so every test that uses
 * them calls {@link #tearDownCluster()} in a {@code finally} block.
 */
public class TestQueueManager extends TestCase {

  private static final Log LOG = LogFactory.getLog(TestQueueManager.class);

  /** ACL name of the submit-job operation. */
  String submitAcl = QueueACL.SUBMIT_JOB.getAclName();
  /** ACL name of the administer-jobs operation. */
  String adminAcl = QueueACL.ADMINISTER_JOBS.getAclName();

  /** DFS cluster backing the MR cluster; null while no cluster is running. */
  MiniDFSCluster miniDFSCluster;
  /** MR cluster jobs are submitted to; null while no cluster is running. */
  MiniMRCluster miniMRCluster = null;

  /**
   * Creates the fake test user "Zork" (member of "ZorkGroup").
   *
   * @return the UGI of the created test user
   * @throws IOException declared for callers; not thrown by the visible code
   */
  UserGroupInformation createNecessaryUsers() throws IOException {
    // Create a fake user for all processes to execute within
    return UserGroupInformation.createUserForTesting("Zork",
        new String[] {"ZorkGroup"});
  }

  /**
   * With an untouched configuration only the "default" queue exists and ACLs
   * are disabled.
   */
  public void testDefaultQueueConfiguration() {
    JobConf conf = new JobConf();
    QueueManager qMgr = new QueueManager(conf);
    Set<String> expQueues = new TreeSet<String>();
    expQueues.add("default");
    verifyQueues(expQueues, qMgr.getQueues());
    // pass true so it will fail if the key is not found.
    assertFalse(conf.getBoolean(JobConf.MR_ACLS_ENABLED, true));
  }

  /** Every queue listed in "mapred.queue.names" is reported, case preserved. */
  public void testMultipleQueues() {
    JobConf conf = new JobConf();
    conf.set("mapred.queue.names", "q1,q2,Q3");
    QueueManager qMgr = new QueueManager(conf);
    Set<String> expQueues = new TreeSet<String>();
    expQueues.add("q1");
    expQueues.add("q2");
    expQueues.add("Q3");
    verifyQueues(expQueues, qMgr.getQueues());
  }

  /** Scheduler info set for a queue is returned for that same queue. */
  public void testSchedulerInfo() {
    JobConf conf = new JobConf();
    conf.set("mapred.queue.names", "qq1,qq2");
    QueueManager qMgr = new QueueManager(conf);
    qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
    qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
    // JUnit's contract is assertEquals(expected, actual).
    assertEquals("queueInfoForqq2", qMgr.getSchedulerInfo("qq2"));
    assertEquals("queueInfoForqq1", qMgr.getSchedulerInfo("qq1"));
  }

  /** A submit ACL of "*" lets the test user submit to the default queue. */
  public void testAllEnabledACLForJobSubmission()
      throws IOException, InterruptedException {
    try {
      JobConf conf = setupConf(QueueManager.toFullPropertyName(
          "default", submitAcl), "*");
      UserGroupInformation ugi = createNecessaryUsers();
      String[] groups = ugi.getGroupNames();
      verifyJobSubmissionToDefaultQueue(conf, true,
          ugi.getShortUserName() + "," + groups[groups.length - 1]);
    } finally {
      tearDownCluster();
    }
  }

  /**
   * A submit ACL of " " rejects ordinary users, while users/groups configured
   * in {@code JobConf.MR_ADMINS} and the user running the cluster can still
   * submit.
   */
  public void testAllDisabledACLForJobSubmission()
      throws IOException, InterruptedException {
    try {
      createNecessaryUsers();
      JobConf conf = setupConf(QueueManager.toFullPropertyName(
          "default", submitAcl), " ");
      String userName = "user1";
      String groupName = "group1";
      verifyJobSubmissionToDefaultQueue(conf, false, userName + "," + groupName);

      // Check if admins can submit job
      String user2 = "user2";
      String group2 = "group2";
      // Admin ACL value is "user2 group1" (presumably users first, then
      // groups, per Hadoop's ACL string format).
      conf.set(JobConf.MR_ADMINS, user2 + " " + groupName);
      // Restart the cluster so that it is started with the changed conf.
      tearDownCluster();
      verifyJobSubmissionToDefaultQueue(conf, true, userName + "," + groupName);
      verifyJobSubmissionToDefaultQueue(conf, true, user2 + "," + group2);

      // Check if MROwner (user who started the mapreduce cluster) can submit
      // a job. Passing null as userInfo falls back to
      // UserGroupInformation.getCurrentUser(), which is the intent of the test.
      verifyJobSubmissionToDefaultQueue(conf, true, null);
    } finally {
      tearDownCluster();
    }
  }

  /** A submit ACL naming only some other user rejects "user1". */
  public void testUserDisabledACLForJobSubmission()
      throws IOException, InterruptedException {
    try {
      JobConf conf = setupConf(QueueManager.toFullPropertyName(
          "default", submitAcl), "3698-non-existent-user");
      verifyJobSubmissionToDefaultQueue(conf, false, "user1,group1");
    } finally {
      tearDownCluster();
    }
  }

  /**
   * Submitting to a queue that is not configured fails with an
   * {@link IOException} saying that the queue does not exist.
   */
  public void testSubmissionToInvalidQueue()
      throws IOException, InterruptedException {
    try {
      JobConf conf = new JobConf();
      conf.set("mapred.queue.names", "default");
      setUpCluster(conf);
      String queueName = "q1";
      try {
        submitSleepJob(1, 1, 100, 100, true, null, queueName);
      } catch (IOException ioe) {
        assertTrue(ioe.getMessage().contains("Queue \"" + queueName + "\" does not exist"));
        return;
      }
      fail("Job submission to invalid queue job shouldnot complete , it should fail with proper exception ");
    } finally {
      // Runs on every exit path, including the early return above.
      tearDownCluster();
    }
  }

  /** A submit ACL whose user list contains "user1" lets that user submit. */
  public void testUserEnabledACLForJobSubmission()
      throws IOException, LoginException, InterruptedException {
    try {
      String userName = "user1";
      JobConf conf = setupConf(
          QueueManager.toFullPropertyName("default", submitAcl),
          "3698-junk-user," + userName
              + " 3698-junk-group1,3698-junk-group2");
      verifyJobSubmissionToDefaultQueue(conf, true, userName + ",group1");
    } finally {
      tearDownCluster();
    }
  }

  /**
   * Test to verify refreshing of queue properties: queue states written to
   * the queue configuration file are picked up by
   * {@code QueueManager.refreshQueues}, and job submission honours the new
   * states.
   *
   * @throws Exception
   */
  public void testStateRefresh() throws Exception {
    String queueConfigPath =
        System.getProperty("test.build.extraconf", "build/test/extraconf");
    File queueConfigFile =
        new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
    try {
      // Write the initial queue configuration file.
      Properties queueConfProps = new Properties();
      // these properties should be retained.
      queueConfProps.put("mapred.queue.names", "default,qu1");
      queueConfProps.put("mapred.acls.enabled", "true");
      // These properties should always be overridden
      queueConfProps.put("mapred.queue.default.state", "RUNNING");
      queueConfProps.put("mapred.queue.qu1.state", "STOPPED");
      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);

      // Create a new configuration to be used with QueueManager
      JobConf conf = new JobConf();
      setUpCluster(conf);
      QueueManager queueManager =
          miniMRCluster.getJobTrackerRunner().getJobTracker().getQueueManager();

      RunningJob job = submitSleepJob(1, 1, 100, 100, true, null, "default");
      assertTrue(job.isSuccessful());

      try {
        submitSleepJob(1, 1, 100, 100, true, null, "qu1");
        // qu1 is STOPPED at this point, so the submission must be rejected.
        fail("submit job in qu1 queue should be failed ");
      } catch (Exception e) {
        assertTrue(e.getMessage().contains(
            "Queue \"" + "qu1" + "\" is not running"));
      }

      // verify state of queues before refresh
      JobQueueInfo queueInfo = queueManager.getJobQueueInfo("default");
      assertEquals(Queue.QueueState.RUNNING.getStateName(),
          queueInfo.getQueueState());
      queueInfo = queueManager.getJobQueueInfo("qu1");
      assertEquals(Queue.QueueState.STOPPED.getStateName(),
          queueInfo.getQueueState());

      // Swap the two queue states and rewrite the configuration file.
      queueConfProps.put("mapred.queue.default.state", "STOPPED");
      queueConfProps.put("mapred.queue.qu1.state", "RUNNING");
      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);

      // refresh configuration
      queueManager.refreshQueues(conf);

      // Job submission to qu1 should pass now: the refreshed configuration
      // marks qu1 as RUNNING.
      try {
        submitSleepJob(1, 1, 100, 100, true, null, "qu1");
      } catch (Exception e) {
        // Keep the cause visible; fail(String) cannot carry it.
        LOG.error("Job submission to qu1 failed after refresh", e);
        fail("submit job in qu1 queue should be sucessful ");
      }

      try {
        submitSleepJob(1, 1, 100, 100, true, null, "default");
        fail("submit job in default queue should be failed ");
      } catch (Exception e) {
        assertTrue(e.getMessage().contains(
            "Queue \"" + "default" + "\" is not running"));
      }

      // verify state of queues after refresh
      queueInfo = queueManager.getJobQueueInfo("default");
      assertEquals(Queue.QueueState.STOPPED.getStateName(),
          queueInfo.getQueueState());
      queueInfo = queueManager.getJobQueueInfo("qu1");
      assertEquals(Queue.QueueState.RUNNING.getStateName(),
          queueInfo.getQueueState());
    } finally {
      if (queueConfigFile.exists()) {
        queueConfigFile.delete();
      }
      this.tearDownCluster();
    }
  }

  /**
   * Builds a configuration with ACLs enabled and a single ACL property set.
   *
   * @param aclName full property name of the ACL
   * @param aclValue value of the ACL
   * @return the new configuration
   */
  JobConf setupConf(String aclName, String aclValue) {
    JobConf conf = new JobConf();
    conf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
    conf.set(aclName, aclValue);
    return conf;
  }

  /**
   * Asserts that both sets have the same size and that every expected queue
   * is among the actual queues.
   */
  void verifyQueues(Set<String> expectedQueues,
      Set<String> actualQueues) {
    assertEquals(expectedQueues.size(), actualQueues.size());
    for (String queue : expectedQueues) {
      assertTrue(actualQueues.contains(queue));
    }
  }

  /**
   * Verify job submission as given user to the default queue
   */
  void verifyJobSubmissionToDefaultQueue(JobConf conf, boolean shouldSucceed,
      String userInfo) throws IOException, InterruptedException {
    verifyJobSubmission(conf, shouldSucceed, userInfo, "default");
  }

  /**
   * Verify job submission as given user to the given queue. Starts the
   * cluster if none is running and leaves it running; the caller is
   * responsible for calling {@link #tearDownCluster()}.
   */
  void verifyJobSubmission(JobConf conf, boolean shouldSucceed,
      String userInfo, String queue) throws IOException, InterruptedException {
    setUpCluster(conf);
    runAndVerifySubmission(conf, shouldSucceed, queue, userInfo);
  }

  /**
   * Verify if submission of job to the given queue will succeed or not.
   * Requires a running cluster and does not tear it down.
   *
   * @param conf configuration used to resolve the system directory's
   *             file system
   * @param shouldSucceed whether the submission is expected to succeed
   * @param queue queue to submit to
   * @param userInfo "user,group1,..." to submit as, or null for the current
   *                 user
   */
  void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
      String queue, String userInfo)
      throws IOException, InterruptedException {
    try {
      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, userInfo, queue);
      if (shouldSucceed) {
        assertTrue(rjob.isSuccessful());
      } else {
        fail("Job submission should have failed.");
      }
    } catch (IOException ioe) {
      if (shouldSucceed) {
        throw ioe;
      }
      LOG.info("exception while submitting job: " + ioe.getMessage());
      assertTrue(ioe.getMessage().
          contains("cannot perform operation " +
              "SUBMIT_JOB on queue " + queue));
      // check if the system directory gets cleaned up or not
      JobTracker jobtracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
      Path sysDir = new Path(jobtracker.getSystemDir());
      FileSystem fs = sysDir.getFileSystem(conf);
      int size = fs.listStatus(sysDir).length;
      // NOTE(review): this wait has no upper bound; it relies on the test
      // harness timeout if the directory is never cleaned up.
      while (size > 1) { // ignore the jobtracker.info file
        System.out.println("Waiting for the job files in sys directory to be cleaned up");
        UtilsForTests.waitFor(100);
        size = fs.listStatus(sysDir).length;
      }
    }
  }

  /**
   * Submit job as current user and kill the job as user of ugi. Starts the
   * cluster if none is running and leaves it running; the caller is
   * responsible for calling {@link #tearDownCluster()}.
   *
   * @param ugi {@link UserGroupInformation} of user who tries to kill the job
   * @param conf JobConf for the job
   * @param shouldSucceed Should the killing of job be succeeded ?
   * @throws IOException
   * @throws InterruptedException
   */
  void verifyJobKill(UserGroupInformation ugi, JobConf conf,
      boolean shouldSucceed) throws IOException, InterruptedException {
    setUpCluster(conf);
    try {
      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false);
      assertFalse(rjob.isComplete());
      // Wait until the map phase reports progress before killing.
      while (rjob.mapProgress() == 0.0f) {
        try {
          Thread.sleep(10);
        } catch (InterruptedException ie) {
          break;
        }
      }
      conf.set("mapred.job.tracker", "localhost:"
          + miniMRCluster.getJobTrackerPort());
      final String jobId = rjob.getJobID();
      ugi.doAs(new PrivilegedExceptionAction<Object>() {

        @Override
        public Object run() throws Exception {
          RunningJob runningJob =
              new JobClient(miniMRCluster.createJobConf()).getJob(jobId);
          runningJob.killJob();
          return null;
        }
      });

      // Wait until the cleanup phase reports progress.
      while (rjob.cleanupProgress() == 0.0f) {
        try {
          Thread.sleep(10);
        } catch (InterruptedException ie) {
          break;
        }
      }
      if (shouldSucceed) {
        assertTrue(rjob.isComplete());
      } else {
        fail("Job kill should have failed.");
      }
    } catch (IOException ioe) {
      if (shouldSucceed) {
        throw ioe;
      }
      LOG.info("exception while submitting/killing job: " + ioe.getMessage());
      assertTrue(ioe.getMessage().
          contains(" cannot perform operation KILL_JOB on "));
    }
  }

  /**
   * Submit a job as another user and try to kill it as the current user.
   * Starts the cluster if none is running and leaves it running; the caller
   * is responsible for calling {@link #tearDownCluster()}.
   *
   * @param conf JobConf for the cluster and job
   * @param shouldSucceed whether the kill is expected to succeed
   * @param otherUserInfo "user,group1,..." of the user who submits the job
   * @throws IOException
   * @throws InterruptedException
   */
  void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
      String otherUserInfo)
      throws IOException, InterruptedException {
    setUpCluster(conf);
    // submit a job as another user.
    RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, otherUserInfo);
    assertFalse(rjob.isComplete());

    // try to kill as self
    try {
      conf.set("mapred.job.tracker", "localhost:"
          + miniMRCluster.getJobTrackerPort());
      JobClient client = new JobClient(miniMRCluster.createJobConf());
      client.getJob(rjob.getID()).killJob();
      if (!shouldSucceed) {
        fail("should fail kill operation");
      }
    } catch (IOException ioe) {
      if (shouldSucceed) {
        throw ioe;
      }
      // verify it fails
      LOG.info("exception while killing job: " + ioe.getMessage());
      assertTrue(ioe.getMessage().
          contains("cannot perform operation " +
              "KILL_JOB on queue default"));
    }
    // wait for job to complete on its own
    while (!rjob.isComplete()) {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ie) {
        break;
      }
    }
  }

  /**
   * Submit job as current user and try to change priority of that job as
   * another user. Starts the cluster if none is running and leaves it
   * running; the caller is responsible for calling
   * {@link #tearDownCluster()}.
   *
   * @param otherUGI user who will try to change priority of job
   * @param conf jobConf for the job
   * @param shouldSucceed Should the changing of priority of job be succeeded ?
   * @throws IOException
   * @throws InterruptedException
   */
  void verifyJobPriorityChangeAsOtherUser(UserGroupInformation otherUGI,
      JobConf conf, final boolean shouldSucceed)
      throws IOException, InterruptedException {
    setUpCluster(conf);
    // submit job as current user.
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    String[] groups = ugi.getGroupNames();
    String userInfo = ugi.getShortUserName() + "," +
        groups[groups.length - 1];
    final RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
    assertFalse(rjob.isComplete());

    conf.set("mapred.job.tracker", "localhost:"
        + miniMRCluster.getJobTrackerPort());
    // try to change priority as other user
    otherUGI.doAs(new PrivilegedExceptionAction<Object>() {

      @Override
      public Object run() throws Exception {
        try {
          JobClient client = new JobClient(miniMRCluster.createJobConf());
          client.getJob(rjob.getID()).setJobPriority("VERY_LOW");
          if (!shouldSucceed) {
            fail("changing priority should fail.");
          }
        } catch (IOException ioe) {
          if (shouldSucceed) {
            // An expected-to-succeed change must not be masked by the
            // "operation denied" check below (same as the kill helpers).
            throw ioe;
          }
          // verify it fails
          LOG.info("exception while changing priority of job: " +
              ioe.getMessage());
          assertTrue(ioe.getMessage().
              contains(" cannot perform operation SET_JOB_PRIORITY on "));
        }
        return null;
      }
    });
    // wait for job to complete on its own
    while (!rjob.isComplete()) {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ie) {
        break;
      }
    }
  }

  /**
   * Starts a one-datanode DFS cluster and an MR cluster on top of it, unless
   * an MR cluster is already running. Also creates "/user" and the staging
   * root directory on the DFS.
   *
   * @param conf configuration used for both clusters
   * @throws IOException
   */
  void setUpCluster(JobConf conf) throws IOException {
    if (miniMRCluster != null) {
      return;
    }
    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
    FileSystem fileSys = miniDFSCluster.getFileSystem();
    TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys, "/user");
    TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys,
        conf.get("mapreduce.jobtracker.staging.root.dir",
            "/tmp/hadoop/mapred/staging"));
    String namenode = fileSys.getUri().toString();
    miniMRCluster = new MiniMRCluster(1, namenode, 3,
        null, null, conf);
  }

  /**
   * Shuts down the MR cluster and then the DFS cluster, and reports how long
   * each shutdown took on stderr. Does nothing when no cluster is running, so
   * it is safe to call repeatedly.
   *
   * <p>The DFS cluster is shut down and both fields are reset even when the
   * MR cluster was never started or its shutdown throws, so a failed start-up
   * or shutdown does not leak a cluster into the next test.
   *
   * @throws IOException declared for callers
   */
  void tearDownCluster() throws IOException {
    if (miniMRCluster == null && miniDFSCluster == null) {
      return;
    }
    long mrTeardownStart = System.currentTimeMillis();
    long mrTeardownEnd = mrTeardownStart;
    try {
      if (miniMRCluster != null) {
        miniMRCluster.shutdown();
      }
    } finally {
      miniMRCluster = null;
      mrTeardownEnd = System.currentTimeMillis();
      try {
        if (miniDFSCluster != null) {
          miniDFSCluster.shutdown();
        }
      } finally {
        miniDFSCluster = null;
      }
    }
    long dfsTeardownEnd = System.currentTimeMillis();
    System.err.println("An MR teardown took "
        + (mrTeardownEnd - mrTeardownStart)
        + " milliseconds.  A DFS teardown took "
        + (dfsTeardownEnd - mrTeardownEnd)
        + " milliseconds.");
  }

  /**
   * Submits a sleep job as the current user to the queue configured by
   * default.
   *
   * @see #submitSleepJob(int, int, long, long, boolean, String, String)
   */
  RunningJob submitSleepJob(int numMappers, int numReducers,
      long mapSleepTime, long reduceSleepTime,
      boolean shouldComplete)
      throws IOException, InterruptedException {
    return submitSleepJob(numMappers, numReducers, mapSleepTime,
        reduceSleepTime, shouldComplete, null);
  }

  /**
   * Submits a sleep job as the given user to the queue configured by default.
   *
   * @see #submitSleepJob(int, int, long, long, boolean, String, String)
   */
  RunningJob submitSleepJob(int numMappers, int numReducers,
      long mapSleepTime, long reduceSleepTime,
      boolean shouldComplete, String userInfo)
      throws IOException, InterruptedException {
    return submitSleepJob(numMappers, numReducers, mapSleepTime,
        reduceSleepTime, shouldComplete, userInfo, null);
  }

  /**
   * Submits a {@link SleepJob} to the running MR cluster.
   *
   * @param numMappers number of map tasks
   * @param numReducers number of reduce tasks
   * @param mapSleepTime map sleep time passed to the sleep job
   * @param reduceSleepTime reduce sleep time passed to the sleep job
   * @param shouldComplete if true the job is run to completion via
   *        {@code JobClient.runJob}; otherwise it is only submitted
   * @param userInfo comma-separated "user,group1,group2,..." to submit as, or
   *        null to submit as the current user
   * @param queueName queue to submit to, or null to leave the queue name of
   *        the job configuration untouched
   * @return handle of the submitted job
   * @throws IOException
   * @throws InterruptedException
   */
  RunningJob submitSleepJob(final int numMappers, final int numReducers,
      final long mapSleepTime,
      final long reduceSleepTime, final boolean shouldComplete, String userInfo,
      String queueName)
      throws IOException, InterruptedException {
    JobConf clientConf = new JobConf();
    clientConf.set("mapred.job.tracker", "localhost:"
        + miniMRCluster.getJobTrackerPort());
    SleepJob job = new SleepJob();
    job.setConf(clientConf);
    clientConf = job.setupJobConf(numMappers, numReducers,
        mapSleepTime, (int) mapSleepTime / 100,
        reduceSleepTime, (int) reduceSleepTime / 100);
    if (queueName != null) {
      clientConf.setQueueName(queueName);
    }
    final JobConf jc = new JobConf(clientConf);

    UserGroupInformation ugi;
    if (userInfo != null) {
      // First element is the user name, the remaining ones are its groups.
      String[] splits = userInfo.split(",");
      String[] groups = new String[splits.length - 1];
      System.arraycopy(splits, 1, groups, 0, splits.length - 1);
      ugi = UserGroupInformation.createUserForTesting(splits[0], groups);
    } else {
      ugi = UserGroupInformation.getCurrentUser();
    }

    return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
      @Override
      public RunningJob run() throws IOException {
        if (shouldComplete) {
          return JobClient.runJob(jc);
        }
        // Job should be submitted as 'userInfo'. So both the client as well as
        // the configuration should point to the same UGI.
        return new JobClient(jc).submitJob(jc);
      }
    });
  }

}