/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

import javax.security.auth.login.LoginException;

import junit.framework.TestCase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.QueueManager.QueueACL;
import org.apache.hadoop.security.UserGroupInformation;

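/**
 * Tests for {@link QueueManager}: queue configuration parsing, scheduler
 * info, queue ACL enforcement for job submission, kill and priority change,
 * and refreshing of queue state, run against MiniDFS/MiniMR clusters.
 */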
public class TestQueueManager extends TestCase {

  private static final Log LOG = LogFactory.getLog(TestQueueManager.class);

  String submitAcl = QueueACL.SUBMIT_JOB.getAclName();
  String adminAcl  = QueueACL.ADMINISTER_JOBS.getAclName();

  MiniDFSCluster miniDFSCluster;
  MiniMRCluster miniMRCluster = null;

  UserGroupInformation createNecessaryUsers() throws IOException {
    // Create a fake user for all processes to execute within
    UserGroupInformation ugi = UserGroupInformation.createUserForTesting("Zork",
                                                 new String [] {"ZorkGroup"});
    return ugi;
  }

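
  /**
   * With no queue configuration, only the "default" queue should exist and
   * ACLs should be disabled.
   */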
  public void testDefaultQueueConfiguration() {
    JobConf conf = new JobConf();
    QueueManager qMgr = new QueueManager(conf);
    Set<String> expQueues = new TreeSet<String>();
    expQueues.add("default");
    verifyQueues(expQueues, qMgr.getQueues());
    // Pass true as the default so the assertion fails if the key is not set.
    assertFalse(conf.getBoolean(JobConf.MR_ACLS_ENABLED, true));
  }

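
  /**
   * Queues listed in mapred.queue.names should be exposed verbatim,
   * preserving case.
   */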
  public void testMultipleQueues() {
    JobConf conf = new JobConf();
    conf.set("mapred.queue.names", "q1,q2,Q3");
    QueueManager qMgr = new QueueManager(conf);
    Set<String> expQueues = new TreeSet<String>();
    expQueues.add("q1");
    expQueues.add("q2");
    expQueues.add("Q3");
    verifyQueues(expQueues, qMgr.getQueues());
  }

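
  /**
   * Scheduler info set on a queue should be returned for that same queue.
   */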
  public void testSchedulerInfo() {
    JobConf conf = new JobConf();
    conf.set("mapred.queue.names", "qq1,qq2");
    QueueManager qMgr = new QueueManager(conf);
    qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
    qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
    assertEquals("queueInfoForqq2", qMgr.getSchedulerInfo("qq2"));
    assertEquals("queueInfoForqq1", qMgr.getSchedulerInfo("qq1"));
  }

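
  /**
   * With the submit-job ACL set to "*", any user should be able to submit a
   * job to the default queue.
   */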
  public void testAllEnabledACLForJobSubmission()
      throws IOException, InterruptedException {
    try {
      JobConf conf = setupConf(QueueManager.toFullPropertyName(
          "default", submitAcl), "*");
      UserGroupInformation ugi = createNecessaryUsers();
      String[] groups = ugi.getGroupNames();
      verifyJobSubmissionToDefaultQueue(conf, true,
          ugi.getShortUserName() + "," + groups[groups.length - 1]);
    } finally {
      tearDownCluster();
    }
  }

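
  /**
   * With an empty submit-job ACL, ordinary users should be rejected, while
   * cluster administrators and the MR owner should still be able to submit.
   */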
  public void testAllDisabledACLForJobSubmission()
      throws IOException, InterruptedException {
    try {
      createNecessaryUsers();
      JobConf conf = setupConf(QueueManager.toFullPropertyName(
          "default", submitAcl), " ");
      String userName = "user1";
      String groupName = "group1";
      verifyJobSubmissionToDefaultQueue(conf, false, userName + "," + groupName);

      // Check if admins can submit a job
      String user2 = "user2";
      String group2 = "group2";
      conf.set(JobConf.MR_ADMINS, user2 + " " + groupName);
      tearDownCluster();
      verifyJobSubmissionToDefaultQueue(conf, true, userName + "," + groupName);
      verifyJobSubmissionToDefaultQueue(conf, true, user2 + "," + group2);

      // Check if the MROwner (the user who started the mapreduce cluster) can
      // submit a job. Passing null as userInfo falls back to
      // UserGroupInformation.getCurrentUser(), which is the intent of the test.
      verifyJobSubmissionToDefaultQueue(conf, true, null);
    } finally {
      tearDownCluster();
    }
  }

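
  /**
   * A user who is not listed in the submit-job ACL should not be able to
   * submit a job to the default queue.
   */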
  public void testUserDisabledACLForJobSubmission()
      throws IOException, InterruptedException {
    try {
      JobConf conf = setupConf(QueueManager.toFullPropertyName(
          "default", submitAcl), "3698-non-existent-user");
      verifyJobSubmissionToDefaultQueue(conf, false, "user1,group1");
    } finally {
      tearDownCluster();
    }
  }

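
  /**
   * Submitting a job to a queue that is not configured should fail with an
   * exception naming the missing queue.
   */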
  public void testSubmissionToInvalidQueue()
      throws IOException, InterruptedException {
    try {
      JobConf conf = new JobConf();
      conf.set("mapred.queue.names", "default");
      setUpCluster(conf);
      String queueName = "q1";
      try {
        submitSleepJob(1, 1, 100, 100, true, null, queueName);
      } catch (IOException ioe) {
        assertTrue(ioe.getMessage().contains(
            "Queue \"" + queueName + "\" does not exist"));
        return;
      } finally {
        tearDownCluster();
      }
      fail("Job submission to an invalid queue should not complete; "
          + "it should fail with a proper exception.");
    } finally {
      tearDownCluster();
    }
  }

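
  /**
   * A user explicitly listed in the submit-job ACL should be able to submit
   * a job to the default queue.
   */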
  public void testUserEnabledACLForJobSubmission()
      throws IOException, LoginException, InterruptedException {
    try {
      String userName = "user1";
      JobConf conf
        = setupConf(QueueManager.toFullPropertyName("default", submitAcl),
                    "3698-junk-user," + userName
                    + " 3698-junk-group1,3698-junk-group2");
      verifyJobSubmissionToDefaultQueue(conf, true, userName + ",group1");
    } finally {
      tearDownCluster();
    }
  }

  /**
   * Test to verify that queue state (RUNNING/STOPPED) changes take effect
   * when the queue properties are refreshed.
   *
   * @throws Exception
   */
  public void testStateRefresh() throws Exception {
    String queueConfigPath =
        System.getProperty("test.build.extraconf", "build/test/extraconf");
    File queueConfigFile =
        new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
    try {
      // Set up the default mapred-site.xml
      Properties queueConfProps = new Properties();
      // These properties should be retained.
      queueConfProps.put("mapred.queue.names", "default,qu1");
      queueConfProps.put("mapred.acls.enabled", "true");
      // These properties are overridden later in the test.
      queueConfProps.put("mapred.queue.default.state", "RUNNING");
      queueConfProps.put("mapred.queue.qu1.state", "STOPPED");
      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);

      // Create a new configuration to be used with QueueManager
      JobConf conf = new JobConf();
      setUpCluster(conf);
      QueueManager queueManager =
        miniMRCluster.getJobTrackerRunner().getJobTracker().getQueueManager();

      RunningJob job = submitSleepJob(1, 1, 100, 100, true, null, "default");
      assertTrue(job.isSuccessful());

      try {
        submitSleepJob(1, 1, 100, 100, true, null, "qu1");
        fail("Submitting a job to the stopped queue qu1 should have failed.");
      } catch (Exception e) {
        assertTrue(e.getMessage().contains(
              "Queue \"" + "qu1" + "\" is not running"));
      }

      // Verify the state of the queues before the refresh
      JobQueueInfo queueInfo = queueManager.getJobQueueInfo("default");
      assertEquals(Queue.QueueState.RUNNING.getStateName(),
                    queueInfo.getQueueState());
      queueInfo = queueManager.getJobQueueInfo("qu1");
      assertEquals(Queue.QueueState.STOPPED.getStateName(),
                    queueInfo.getQueueState());

      // Swap the queue states in the config file
      queueConfProps.put("mapred.queue.default.state", "STOPPED");
      queueConfProps.put("mapred.queue.qu1.state", "RUNNING");
      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);

      // Refresh the configuration
      queueManager.refreshQueues(conf);

      // Job submission to qu1 should pass now that the queue is RUNNING.
      try {
        submitSleepJob(1, 1, 100, 100, true, null, "qu1");
      } catch (Exception e) {
        fail("Submitting a job to queue qu1 should have succeeded.");
      }

      try {
        submitSleepJob(1, 1, 100, 100, true, null, "default");
        fail("Submitting a job to the stopped default queue should have failed.");
      } catch (Exception e) {
        assertTrue(e.getMessage().contains(
              "Queue \"" + "default" + "\" is not running"));
      }

      // Verify the state of the queues after the refresh
      queueInfo = queueManager.getJobQueueInfo("default");
      assertEquals(Queue.QueueState.STOPPED.getStateName(),
                    queueInfo.getQueueState());
      queueInfo = queueManager.getJobQueueInfo("qu1");
      assertEquals(Queue.QueueState.RUNNING.getStateName(),
                    queueInfo.getQueueState());
    } finally {
      if (queueConfigFile.exists()) {
        queueConfigFile.delete();
      }
      this.tearDownCluster();
    }
  }

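
  /**
   * Build a JobConf with ACLs enabled and the given ACL property set.
   */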
  JobConf setupConf(String aclName, String aclValue) {
    JobConf conf = new JobConf();
    conf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
    conf.set(aclName, aclValue);
    return conf;
  }

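
  /**
   * Assert that the actual queue set matches the expected queue set exactly.
   */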
  void verifyQueues(Set<String> expectedQueues,
                    Set<String> actualQueues) {
    assertEquals(expectedQueues.size(), actualQueues.size());
    for (String queue : expectedQueues) {
      assertTrue(actualQueues.contains(queue));
    }
  }

  /**
   * Verify job submission as the given user to the default queue.
   */
  void verifyJobSubmissionToDefaultQueue(JobConf conf, boolean shouldSucceed,
      String userInfo) throws IOException, InterruptedException {
    verifyJobSubmission(conf, shouldSucceed, userInfo, "default");
  }

  /**
   * Verify job submission as the given user to the given queue.
   */
  void verifyJobSubmission(JobConf conf, boolean shouldSucceed,
      String userInfo, String queue) throws IOException, InterruptedException {
    setUpCluster(conf);
    try {
      runAndVerifySubmission(conf, shouldSucceed, queue, userInfo);
    } finally {
      // tearDownCluster();
    }
  }

  /**
   * Verify whether submission of a job to the given queue succeeds or fails
   * as expected.
   */
  void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
      String queue, String userInfo)
      throws IOException, InterruptedException {
    try {
      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, userInfo, queue);
      if (shouldSucceed) {
        assertTrue(rjob.isSuccessful());
      } else {
        fail("Job submission should have failed.");
      }
    } catch (IOException ioe) {
      if (shouldSucceed) {
        throw ioe;
      } else {
        LOG.info("exception while submitting job: " + ioe.getMessage());
        assertTrue(ioe.getMessage().
            contains("cannot perform operation " +
            "SUBMIT_JOB on queue " + queue));
        // Check that the job files in the system directory get cleaned up.
        JobTracker jobtracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
        Path sysDir = new Path(jobtracker.getSystemDir());
        FileSystem fs = sysDir.getFileSystem(conf);
        int size = fs.listStatus(sysDir).length;
        while (size > 1) { // ignore the jobtracker.info file
          System.out.println("Waiting for the job files in sys directory to be cleaned up");
          UtilsForTests.waitFor(100);
          size = fs.listStatus(sysDir).length;
        }
      }
    } finally {
      // tearDownCluster();
    }
  }

  /**
   * Submit a job as the current user and kill the job as the user of ugi.
   * @param ugi {@link UserGroupInformation} of the user who tries to kill the job
   * @param conf JobConf for the job
   * @param shouldSucceed whether killing the job should succeed
   * @throws IOException
   * @throws InterruptedException
   */
  void verifyJobKill(UserGroupInformation ugi, JobConf conf,
      boolean shouldSucceed) throws IOException, InterruptedException {
    setUpCluster(conf);
    try {
      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false);
      assertFalse(rjob.isComplete());
      while (rjob.mapProgress() == 0.0f) {
        try {
          Thread.sleep(10);
        } catch (InterruptedException ie) {
          break;
        }
      }
      conf.set("mapred.job.tracker", "localhost:"
              + miniMRCluster.getJobTrackerPort());
      final String jobId = rjob.getJobID();
      ugi.doAs(new PrivilegedExceptionAction<Object>() {

        @Override
        public Object run() throws Exception {
          RunningJob runningJob =
              new JobClient(miniMRCluster.createJobConf()).getJob(jobId);
          runningJob.killJob();
          return null;
        }
      });

      while (rjob.cleanupProgress() == 0.0f) {
        try {
          Thread.sleep(10);
        } catch (InterruptedException ie) {
          break;
        }
      }
      if (shouldSucceed) {
        assertTrue(rjob.isComplete());
      } else {
        fail("Job kill should have failed.");
      }
    } catch (IOException ioe) {
      if (shouldSucceed) {
        throw ioe;
      } else {
        LOG.info("exception while submitting/killing job: " + ioe.getMessage());
        assertTrue(ioe.getMessage().
            contains(" cannot perform operation KILL_JOB on "));
      }
    } finally {
      // tearDownCluster();
    }
  }

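
  /**
   * Submit a job as another user and try to kill it as the current user,
   * verifying whether the kill succeeds or fails as expected.
   */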
  void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
      String otherUserInfo)
      throws IOException, InterruptedException {
    setUpCluster(conf);
    try {
      // submit a job as another user.
      String userInfo = otherUserInfo;
      RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
      assertFalse(rjob.isComplete());

      // try to kill as self
      try {
        conf.set("mapred.job.tracker", "localhost:"
            + miniMRCluster.getJobTrackerPort());
        JobClient client = new JobClient(miniMRCluster.createJobConf());
        client.getJob(rjob.getID()).killJob();
        if (!shouldSucceed) {
          fail("Kill operation should have failed.");
        }
      } catch (IOException ioe) {
        if (shouldSucceed) {
          throw ioe;
        }
        // verify it fails
        LOG.info("exception while killing job: " + ioe.getMessage());
        assertTrue(ioe.getMessage().
                        contains("cannot perform operation " +
                                    "KILL_JOB on queue default"));
      }
      // wait for the job to complete on its own
      while (!rjob.isComplete()) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
          break;
        }
      }
    } finally {
      // tearDownCluster();
    }
  }

  /**
   * Submit a job as the current user and try to change the priority of that
   * job as another user.
   * @param otherUGI user who will try to change the priority of the job
   * @param conf JobConf for the job
   * @param shouldSucceed whether changing the priority of the job should succeed
   * @throws IOException
   * @throws InterruptedException
   */
  void verifyJobPriorityChangeAsOtherUser(UserGroupInformation otherUGI,
      JobConf conf, final boolean shouldSucceed)
      throws IOException, InterruptedException {
    setUpCluster(conf);
    try {
      // submit a job as the current user.
      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
      String[] groups = ugi.getGroupNames();
      String userInfo = ugi.getShortUserName() + "," +
                        groups[groups.length - 1];
      final RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
      assertFalse(rjob.isComplete());

      conf.set("mapred.job.tracker", "localhost:"
          + miniMRCluster.getJobTrackerPort());
      // try to change the priority as the other user
      otherUGI.doAs(new PrivilegedExceptionAction<Object>() {

        @Override
        public Object run() throws Exception {
          try {
            JobClient client = new JobClient(miniMRCluster.createJobConf());
            client.getJob(rjob.getID()).setJobPriority("VERY_LOW");
            if (!shouldSucceed) {
              fail("Changing the priority should have failed.");
            }
          } catch (IOException ioe) {
            // verify it fails
            LOG.info("exception while changing priority of job: " +
                     ioe.getMessage());
            assertTrue(ioe.getMessage().
                contains(" cannot perform operation SET_JOB_PRIORITY on "));
          }
          return null;
        }
      });
      // wait for the job to complete on its own
      while (!rjob.isComplete()) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
          break;
        }
      }
    } finally {
      // tearDownCluster();
    }
  }

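
  /**
   * Start a single-node MiniDFS and MiniMR cluster with the given
   * configuration, if one is not already running.
   */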
  void setUpCluster(JobConf conf) throws IOException {
    if (miniMRCluster == null) {
      miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
      FileSystem fileSys = miniDFSCluster.getFileSystem();
      TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys, "/user");
      TestMiniMRWithDFSWithDistinctUsers.mkdir
        (fileSys, conf.get("mapreduce.jobtracker.staging.root.dir",
                           "/tmp/hadoop/mapred/staging"));
      String namenode = fileSys.getUri().toString();
      miniMRCluster = new MiniMRCluster(1, namenode, 3,
                                        null, null, conf);
    }
  }

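
  /**
   * Shut down the MiniMR and MiniDFS clusters, if running, and report how
   * long each teardown took.
   */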
  void tearDownCluster() throws IOException {
    if (miniMRCluster != null) {
      long mrTeardownStart = new java.util.Date().getTime();
      miniMRCluster.shutdown();
      long mrTeardownEnd = new java.util.Date().getTime();
      if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
      long dfsTeardownEnd = new java.util.Date().getTime();
      miniMRCluster = null;
      miniDFSCluster = null;
      System.err.println("An MR teardown took "
                         + (mrTeardownEnd - mrTeardownStart)
                         + " milliseconds.  A DFS teardown took "
                         + (dfsTeardownEnd - mrTeardownEnd)
                         + " milliseconds.");
    }
  }

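
  /**
   * Submit a SleepJob to the default queue as the current user.
   */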
  RunningJob submitSleepJob(int numMappers, int numReducers,
                            long mapSleepTime, long reduceSleepTime,
                            boolean shouldComplete)
      throws IOException, InterruptedException {
    return submitSleepJob(numMappers, numReducers, mapSleepTime,
                          reduceSleepTime, shouldComplete, null);
  }

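
  /**
   * Submit a SleepJob to the default queue as the given user.
   */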
  RunningJob submitSleepJob(int numMappers, int numReducers,
                            long mapSleepTime, long reduceSleepTime,
                            boolean shouldComplete, String userInfo)
      throws IOException, InterruptedException {
    return submitSleepJob(numMappers, numReducers, mapSleepTime,
                          reduceSleepTime, shouldComplete, userInfo, null);
  }

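
  /**
   * Submit a SleepJob to the given queue as the given user. userInfo is a
   * comma-separated "user,group1,group2,..." string; null means the current
   * user. If shouldComplete is true the call waits for the job to finish,
   * otherwise the job is only submitted.
   */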
  RunningJob submitSleepJob(final int numMappers, final int numReducers,
      final long mapSleepTime, final long reduceSleepTime,
      final boolean shouldComplete, String userInfo, String queueName)
      throws IOException, InterruptedException {
    JobConf clientConf = new JobConf();
    clientConf.set("mapred.job.tracker", "localhost:"
        + miniMRCluster.getJobTrackerPort());
    UserGroupInformation ugi;
    SleepJob job = new SleepJob();
    job.setConf(clientConf);
    clientConf = job.setupJobConf(numMappers, numReducers,
        mapSleepTime, (int) mapSleepTime / 100,
        reduceSleepTime, (int) reduceSleepTime / 100);
    if (queueName != null) {
      clientConf.setQueueName(queueName);
    }
    final JobConf jc = new JobConf(clientConf);
    if (userInfo != null) {
      String[] splits = userInfo.split(",");
      String[] groups = new String[splits.length - 1];
      System.arraycopy(splits, 1, groups, 0, splits.length - 1);
      ugi = UserGroupInformation.createUserForTesting(splits[0], groups);
    } else {
      ugi = UserGroupInformation.getCurrentUser();
    }
    RunningJob rJob = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
      public RunningJob run() throws IOException {
        if (shouldComplete) {
          return JobClient.runJob(jc);
        } else {
          // The job should be submitted as 'userInfo', so both the client and
          // the configuration must point to the same UGI.
          return new JobClient(jc).submitJob(jc);
        }
      }
    });
    return rJob;
  }

}