1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapred; 20 21 import java.io.DataOutputStream; 22 import java.net.URI; 23 import java.util.Collection; 24 import org.apache.commons.logging.LogFactory; 25 import org.apache.commons.logging.Log; 26 import org.apache.hadoop.mapreduce.test.system.JTProtocol; 27 import org.apache.hadoop.mapreduce.test.system.TTClient; 28 import org.apache.hadoop.mapreduce.test.system.JobInfo; 29 import org.apache.hadoop.mapreduce.test.system.TaskInfo; 30 import org.apache.hadoop.mapreduce.test.system.MRCluster; 31 32 import org.apache.hadoop.conf.Configuration; 33 import org.apache.hadoop.mapred.UtilsForTests; 34 35 import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction; 36 import org.apache.hadoop.filecache.DistributedCache; 37 import org.apache.hadoop.fs.FileSystem; 38 import org.apache.hadoop.fs.permission.FsPermission; 39 import org.apache.hadoop.fs.Path; 40 import org.apache.hadoop.fs.FileStatus; 41 import org.apache.hadoop.examples.SleepJob; 42 43 import org.junit.Assert; 44 import org.junit.BeforeClass; 45 import org.junit.AfterClass; 46 import org.junit.Test; 47 48 /** 49 * Verify the Distributed Cache functionality. 50 * This test scenario is for a distributed cache file behaviour 51 * when the file is private. Once a job uses a distributed 52 * cache file with private permissions that file is stored in the 53 * mapred.local.dir, under the directory which has the same name 54 * as job submitter's username. The directory has 700 permission 55 * and the file under it, should have 777 permissions. 56 */ 57 58 public class TestDistributedCachePrivateFile { 59 60 private static MRCluster cluster = null; 61 private static FileSystem dfs = null; 62 private static JobClient client = null; 63 private static FsPermission permission = new FsPermission((short)00770); 64 65 private static String uriPath = "hdfs:///tmp/test.txt"; 66 private static final Path URIPATH = new Path(uriPath); 67 private String distributedFileName = "test.txt"; 68 69 static final Log LOG = LogFactory. 70 getLog(TestDistributedCachePrivateFile.class); 71 TestDistributedCachePrivateFile()72 public TestDistributedCachePrivateFile() throws Exception { 73 } 74 75 @BeforeClass setUp()76 public static void setUp() throws Exception { 77 cluster = MRCluster.createCluster(new Configuration()); 78 cluster.setUp(); 79 client = cluster.getJTClient().getClient(); 80 dfs = client.getFs(); 81 //Deleting the file if it already exists 82 dfs.delete(URIPATH, true); 83 84 Collection<TTClient> tts = cluster.getTTClients(); 85 //Stopping all TTs 86 for (TTClient tt : tts) { 87 tt.kill(); 88 tt.waitForTTStop(); 89 } 90 //Starting all TTs 91 for (TTClient tt : tts) { 92 tt.start(); 93 tt.waitForTTStart(); 94 } 95 96 String input = "This will be the content of\n" + "distributed cache\n"; 97 //Creating the path with the file 98 DataOutputStream file = 99 UtilsForTests.createTmpFileDFS(dfs, URIPATH, permission, input); 100 } 101 102 @AfterClass tearDown()103 public static void tearDown() throws Exception { 104 cluster.tearDown(); 105 dfs.delete(URIPATH, true); 106 107 Collection<TTClient> tts = cluster.getTTClients(); 108 //Stopping all TTs 109 for (TTClient tt : tts) { 110 tt.kill(); 111 tt.waitForTTStop(); 112 } 113 //Starting all TTs 114 for (TTClient tt : tts) { 115 tt.start(); 116 tt.waitForTTStart(); 117 } 118 } 119 120 @Test 121 /** 122 * This tests Distributed Cache for private file 123 * @param none 124 * @return void 125 */ testDistributedCache()126 public void testDistributedCache() throws Exception { 127 Configuration conf = new Configuration(cluster.getConf()); 128 JTProtocol wovenClient = cluster.getJTClient().getProxy(); 129 130 String jobTrackerUserName = wovenClient.getDaemonUser(); 131 132 LOG.info("jobTrackerUserName is :" + jobTrackerUserName); 133 134 //This counter will check for count of a loop, 135 //which might become infinite. 136 int count = 0; 137 138 SleepJob job = new SleepJob(); 139 job.setConf(conf); 140 conf = job.setupJobConf(5, 1, 1000, 1000, 100, 100); 141 142 DistributedCache.createSymlink(conf); 143 URI uri = URI.create(uriPath); 144 DistributedCache.addCacheFile(uri, conf); 145 JobConf jconf = new JobConf(conf); 146 147 //Controls the job till all verification is done 148 FinishTaskControlAction.configureControlActionForJob(conf); 149 150 //Submitting the job 151 RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf); 152 153 JobStatus[] jobStatus = client.getAllJobs(); 154 String userName = jobStatus[0].getUsername(); 155 156 TTClient tClient = null; 157 JobInfo jInfo = wovenClient.getJobInfo(rJob.getID()); 158 LOG.info("jInfo is :" + jInfo); 159 160 //Assert if jobInfo is null 161 Assert.assertNotNull("jobInfo is null", jInfo); 162 163 //Wait for the job to start running. 164 count = 0; 165 while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) { 166 UtilsForTests.waitFor(10000); 167 count++; 168 jInfo = wovenClient.getJobInfo(rJob.getID()); 169 //If the count goes beyond a point, then Assert; This is to avoid 170 //infinite loop under unforeseen circumstances. 171 if (count > 10) { 172 Assert.fail("job has not reached running state for more than" + 173 "100 seconds. Failing at this point"); 174 } 175 } 176 177 LOG.info("job id is :" + rJob.getID().toString()); 178 179 TaskInfo[] taskInfos = cluster.getJTClient().getProxy() 180 .getTaskInfo(rJob.getID()); 181 182 boolean distCacheFileIsFound; 183 184 for (TaskInfo taskInfo : taskInfos) { 185 distCacheFileIsFound = false; 186 String[] taskTrackers = taskInfo.getTaskTrackers(); 187 188 for(String taskTracker : taskTrackers) { 189 //Getting the exact FQDN of the tasktracker from 190 //the tasktracker string. 191 taskTracker = UtilsForTests.getFQDNofTT(taskTracker); 192 tClient = cluster.getTTClient(taskTracker); 193 String[] localDirs = tClient.getMapredLocalDirs(); 194 int distributedFileCount = 0; 195 String localDirOnly = null; 196 197 boolean FileNotPresentForThisDirectoryPath = false; 198 199 //Go to every single path 200 for (String localDir : localDirs) { 201 FileNotPresentForThisDirectoryPath = false; 202 localDirOnly = localDir; 203 204 //Public Distributed cache will always be stored under 205 //mapred.local.dir/tasktracker/archive 206 localDirOnly = localDir + Path.SEPARATOR + TaskTracker.SUBDIR + 207 Path.SEPARATOR + userName; 208 209 //Private Distributed cache will always be stored under 210 //mapre.local.dir/taskTracker/<username>/distcache 211 //Checking for username directory to check if it has the 212 //proper permissions 213 localDir = localDir + Path.SEPARATOR + 214 TaskTracker.getPrivateDistributedCacheDir(userName); 215 216 FileStatus fileStatusMapredLocalDirUserName = null; 217 218 try { 219 fileStatusMapredLocalDirUserName = tClient. 220 getFileStatus(localDirOnly, true); 221 } catch (Exception e) { 222 LOG.info("LocalDirOnly :" + localDirOnly + " not found"); 223 FileNotPresentForThisDirectoryPath = true; 224 } 225 226 //File will only be stored under one of the mapred.lcoal.dir 227 //If other paths were hit, just continue 228 if (FileNotPresentForThisDirectoryPath) 229 continue; 230 231 Path pathMapredLocalDirUserName = 232 fileStatusMapredLocalDirUserName.getPath(); 233 FsPermission fsPermMapredLocalDirUserName = 234 fileStatusMapredLocalDirUserName.getPermission(); 235 //If userName of Jobtracker is same as username 236 //of jobSubmission, then the permissions are 770. 237 //Otherwise 570 238 if ( userName.compareTo(jobTrackerUserName) == 0 ) { 239 Assert.assertTrue("Directory Permission is not 770", 240 fsPermMapredLocalDirUserName.equals(new FsPermission("770"))); 241 } else { 242 Assert.assertTrue("Directory Permission is not 570", 243 fsPermMapredLocalDirUserName.equals(new FsPermission("570"))); 244 } 245 246 //Get file status of all the directories 247 //and files under that path. 248 FileStatus[] fileStatuses = tClient.listStatus(localDir, 249 true, true); 250 for (FileStatus fileStatus : fileStatuses) { 251 Path path = fileStatus.getPath(); 252 LOG.info("path is :" + path.toString()); 253 //Checking if the received path ends with 254 //the distributed filename 255 distCacheFileIsFound = (path.toString()). 256 endsWith(distributedFileName); 257 //If file is found, check for its permission. 258 //Since the file is found break out of loop 259 if (distCacheFileIsFound){ 260 LOG.info("PATH found is :" + path.toString()); 261 distributedFileCount++; 262 String filename = path.getName(); 263 FsPermission fsPerm = fileStatus.getPermission(); 264 //If userName of Jobtracker is same as username 265 //of jobSubmission, then the permissions are 770. 266 //Otherwise 570 267 if ( userName.compareTo(jobTrackerUserName) == 0 ) { 268 Assert.assertTrue("File Permission is not 770", 269 fsPerm.equals(new FsPermission("770"))); 270 } else { 271 Assert.assertTrue("File Permission is not 570", 272 fsPerm.equals(new FsPermission("570"))); 273 } 274 } 275 } 276 } 277 278 LOG.info("Distributed File count is :" + distributedFileCount); 279 280 if (distributedFileCount > 1) { 281 Assert.fail("The distributed cache file is more than one"); 282 } else if (distributedFileCount < 1) 283 Assert.fail("The distributed cache file is less than one"); 284 if (!distCacheFileIsFound) { 285 Assert.assertEquals("The distributed cache file does not exist", 286 distCacheFileIsFound, false); 287 } 288 } 289 290 //Allow the job to continue through MR control job. 291 for (TaskInfo taskInfoRemaining : taskInfos) { 292 FinishTaskControlAction action = new FinishTaskControlAction(TaskID 293 .downgrade(taskInfoRemaining.getTaskID())); 294 Collection<TTClient> tts = cluster.getTTClients(); 295 for (TTClient cli : tts) { 296 cli.getProxy().sendAction(action); 297 } 298 } 299 300 //Killing the job because all the verification needed 301 //for this testcase is completed. 302 rJob.killJob(); 303 } 304 } 305 } 306