1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapred;
20 
21 import java.io.DataOutputStream;
22 import java.net.URI;
23 import java.util.Collection;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.commons.logging.Log;
26 import org.apache.hadoop.mapreduce.test.system.JTProtocol;
27 import org.apache.hadoop.mapreduce.test.system.TTClient;
28 import org.apache.hadoop.mapreduce.test.system.JobInfo;
29 import org.apache.hadoop.mapreduce.test.system.TaskInfo;
30 import org.apache.hadoop.mapreduce.test.system.MRCluster;
31 
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.mapred.UtilsForTests;
34 
35 import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
36 import org.apache.hadoop.filecache.DistributedCache;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.permission.FsPermission;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.fs.FileStatus;
41 import org.apache.hadoop.examples.SleepJob;
42 
43 import org.junit.Assert;
44 import org.junit.BeforeClass;
45 import org.junit.AfterClass;
46 import org.junit.Test;
47 
48 /**
49  * Verify the Distributed Cache functionality.
50  * This test scenario is for a distributed cache file behaviour
51  * when the file is private. Once a job uses a distributed
52  * cache file with private permissions that file is stored in the
53  * mapred.local.dir, under the directory which has the same name
54  * as job submitter's username. The directory has 700 permission
55  * and the file under it, should have 777 permissions.
56 */
57 
58 public class TestDistributedCachePrivateFile {
59 
60   private static MRCluster cluster = null;
61   private static FileSystem dfs = null;
62   private static JobClient client = null;
63   private static FsPermission permission = new FsPermission((short)00770);
64 
65   private static String uriPath = "hdfs:///tmp/test.txt";
66   private static final Path URIPATH = new Path(uriPath);
67   private String distributedFileName = "test.txt";
68 
69   static final Log LOG = LogFactory.
70                            getLog(TestDistributedCachePrivateFile.class);
71 
TestDistributedCachePrivateFile()72   public TestDistributedCachePrivateFile() throws Exception {
73   }
74 
75   @BeforeClass
setUp()76   public static void setUp() throws Exception {
77     cluster = MRCluster.createCluster(new Configuration());
78     cluster.setUp();
79     client = cluster.getJTClient().getClient();
80     dfs = client.getFs();
81     //Deleting the file if it already exists
82     dfs.delete(URIPATH, true);
83 
84     Collection<TTClient> tts = cluster.getTTClients();
85     //Stopping all TTs
86     for (TTClient tt : tts) {
87       tt.kill();
88       tt.waitForTTStop();
89     }
90     //Starting all TTs
91     for (TTClient tt : tts) {
92       tt.start();
93       tt.waitForTTStart();
94     }
95 
96     String input = "This will be the content of\n" + "distributed cache\n";
97     //Creating the path with the file
98     DataOutputStream file =
99         UtilsForTests.createTmpFileDFS(dfs, URIPATH, permission, input);
100   }
101 
102   @AfterClass
tearDown()103   public static void tearDown() throws Exception {
104     cluster.tearDown();
105     dfs.delete(URIPATH, true);
106 
107     Collection<TTClient> tts = cluster.getTTClients();
108     //Stopping all TTs
109     for (TTClient tt : tts) {
110       tt.kill();
111       tt.waitForTTStop();
112     }
113     //Starting all TTs
114     for (TTClient tt : tts) {
115       tt.start();
116       tt.waitForTTStart();
117     }
118   }
119 
120   @Test
121   /**
122    * This tests Distributed Cache for private file
123    * @param none
124    * @return void
125    */
testDistributedCache()126   public void testDistributedCache() throws Exception {
127     Configuration conf = new Configuration(cluster.getConf());
128     JTProtocol wovenClient = cluster.getJTClient().getProxy();
129 
130     String jobTrackerUserName = wovenClient.getDaemonUser();
131 
132     LOG.info("jobTrackerUserName is :" + jobTrackerUserName);
133 
134     //This counter will check for count of a loop,
135     //which might become infinite.
136     int count = 0;
137 
138     SleepJob job = new SleepJob();
139     job.setConf(conf);
140     conf = job.setupJobConf(5, 1, 1000, 1000, 100, 100);
141 
142     DistributedCache.createSymlink(conf);
143     URI uri = URI.create(uriPath);
144     DistributedCache.addCacheFile(uri, conf);
145     JobConf jconf = new JobConf(conf);
146 
147     //Controls the job till all verification is done
148     FinishTaskControlAction.configureControlActionForJob(conf);
149 
150     //Submitting the job
151     RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
152 
153     JobStatus[] jobStatus = client.getAllJobs();
154     String userName = jobStatus[0].getUsername();
155 
156     TTClient tClient = null;
157     JobInfo jInfo = wovenClient.getJobInfo(rJob.getID());
158     LOG.info("jInfo is :" + jInfo);
159 
160     //Assert if jobInfo is null
161     Assert.assertNotNull("jobInfo is null", jInfo);
162 
163     //Wait for the job to start running.
164     count = 0;
165     while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
166       UtilsForTests.waitFor(10000);
167       count++;
168       jInfo = wovenClient.getJobInfo(rJob.getID());
169       //If the count goes beyond a point, then Assert; This is to avoid
170       //infinite loop under unforeseen circumstances.
171       if (count > 10) {
172         Assert.fail("job has not reached running state for more than" +
173             "100 seconds. Failing at this point");
174       }
175     }
176 
177     LOG.info("job id is :" + rJob.getID().toString());
178 
179     TaskInfo[] taskInfos = cluster.getJTClient().getProxy()
180            .getTaskInfo(rJob.getID());
181 
182     boolean distCacheFileIsFound;
183 
184     for (TaskInfo taskInfo : taskInfos) {
185       distCacheFileIsFound = false;
186       String[] taskTrackers = taskInfo.getTaskTrackers();
187 
188       for(String taskTracker : taskTrackers) {
189         //Getting the exact FQDN of the tasktracker from
190         //the tasktracker string.
191         taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
192         tClient =  cluster.getTTClient(taskTracker);
193         String[] localDirs = tClient.getMapredLocalDirs();
194         int distributedFileCount = 0;
195         String localDirOnly = null;
196 
197         boolean FileNotPresentForThisDirectoryPath = false;
198 
199         //Go to every single path
200         for (String localDir : localDirs) {
201           FileNotPresentForThisDirectoryPath = false;
202           localDirOnly = localDir;
203 
204           //Public Distributed cache will always be stored under
205           //mapred.local.dir/tasktracker/archive
206           localDirOnly = localDir + Path.SEPARATOR + TaskTracker.SUBDIR +
207               Path.SEPARATOR +  userName;
208 
209           //Private Distributed cache will always be stored under
210           //mapre.local.dir/taskTracker/<username>/distcache
211           //Checking for username directory to check if it has the
212           //proper permissions
213           localDir = localDir + Path.SEPARATOR +
214                   TaskTracker.getPrivateDistributedCacheDir(userName);
215 
216           FileStatus fileStatusMapredLocalDirUserName = null;
217 
218           try {
219             fileStatusMapredLocalDirUserName = tClient.
220                             getFileStatus(localDirOnly, true);
221           } catch (Exception e) {
222             LOG.info("LocalDirOnly :" + localDirOnly + " not found");
223             FileNotPresentForThisDirectoryPath = true;
224           }
225 
226           //File will only be stored under one of the mapred.lcoal.dir
227           //If other paths were hit, just continue
228           if (FileNotPresentForThisDirectoryPath)
229             continue;
230 
231           Path pathMapredLocalDirUserName =
232               fileStatusMapredLocalDirUserName.getPath();
233           FsPermission fsPermMapredLocalDirUserName =
234               fileStatusMapredLocalDirUserName.getPermission();
235           //If userName of Jobtracker is same as username
236           //of jobSubmission, then the permissions are 770.
237           //Otherwise 570
238           if ( userName.compareTo(jobTrackerUserName) == 0 ) {
239             Assert.assertTrue("Directory Permission is not 770",
240               fsPermMapredLocalDirUserName.equals(new FsPermission("770")));
241           } else {
242             Assert.assertTrue("Directory Permission is not 570",
243               fsPermMapredLocalDirUserName.equals(new FsPermission("570")));
244           }
245 
246           //Get file status of all the directories
247           //and files under that path.
248           FileStatus[] fileStatuses = tClient.listStatus(localDir,
249               true, true);
250           for (FileStatus  fileStatus : fileStatuses) {
251             Path path = fileStatus.getPath();
252             LOG.info("path is :" + path.toString());
253             //Checking if the received path ends with
254             //the distributed filename
255             distCacheFileIsFound = (path.toString()).
256                 endsWith(distributedFileName);
257             //If file is found, check for its permission.
258             //Since the file is found break out of loop
259             if (distCacheFileIsFound){
260               LOG.info("PATH found is :" + path.toString());
261               distributedFileCount++;
262               String filename = path.getName();
263               FsPermission fsPerm = fileStatus.getPermission();
264               //If userName of Jobtracker is same as username
265               //of jobSubmission, then the permissions are 770.
266               //Otherwise 570
267               if ( userName.compareTo(jobTrackerUserName) == 0 ) {
268                 Assert.assertTrue("File Permission is not 770",
269                   fsPerm.equals(new FsPermission("770")));
270               } else {
271                 Assert.assertTrue("File Permission is not 570",
272                   fsPerm.equals(new FsPermission("570")));
273               }
274             }
275           }
276         }
277 
278         LOG.info("Distributed File count is :" + distributedFileCount);
279 
280         if (distributedFileCount > 1) {
281           Assert.fail("The distributed cache file is more than one");
282         } else if (distributedFileCount < 1)
283           Assert.fail("The distributed cache file is less than one");
284         if (!distCacheFileIsFound) {
285           Assert.assertEquals("The distributed cache file does not exist",
286               distCacheFileIsFound, false);
287         }
288       }
289 
290       //Allow the job to continue through MR control job.
291       for (TaskInfo taskInfoRemaining : taskInfos) {
292         FinishTaskControlAction action = new FinishTaskControlAction(TaskID
293            .downgrade(taskInfoRemaining.getTaskID()));
294         Collection<TTClient> tts = cluster.getTTClients();
295         for (TTClient cli : tts) {
296           cli.getProxy().sendAction(action);
297         }
298       }
299 
300       //Killing the job because all the verification needed
301       //for this testcase is completed.
302       rJob.killJob();
303     }
304   }
305 }
306