1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapreduce;
20 
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.security.UserGroupInformation;
33 
34 /**
35  * A utility to manage job submission files.<br/>
36  * <b><i>Note that this class is for framework internal usage only and is
37  * not to be used by users directly.</i></b>
38  */
39 public class JobSubmissionFiles {
40 
41   private final static Log LOG = LogFactory.getLog(JobSubmissionFiles.class);
42 
43   // job submission directory is private!
44   final public static FsPermission JOB_DIR_PERMISSION =
45     FsPermission.createImmutable((short) 0700); // rwx--------
46   //job files are world-wide readable and owner writable
47   final public static FsPermission JOB_FILE_PERMISSION =
48     FsPermission.createImmutable((short) 0644); // rw-r--r--
49 
getJobSplitFile(Path jobSubmissionDir)50   public static Path getJobSplitFile(Path jobSubmissionDir) {
51     return new Path(jobSubmissionDir, "job.split");
52   }
53 
getJobSplitMetaFile(Path jobSubmissionDir)54   public static Path getJobSplitMetaFile(Path jobSubmissionDir) {
55     return new Path(jobSubmissionDir, "job.splitmetainfo");
56   }
57 
58   /**
59    * Get the job conf path.
60    */
getJobConfPath(Path jobSubmitDir)61   public static Path getJobConfPath(Path jobSubmitDir) {
62     return new Path(jobSubmitDir, "job.xml");
63   }
64 
65   /**
66    * Get the job jar path.
67    */
getJobJar(Path jobSubmitDir)68   public static Path getJobJar(Path jobSubmitDir) {
69     return new Path(jobSubmitDir, "job.jar");
70   }
71 
72   /**
73    * Get the job distributed cache files path.
74    * @param jobSubmitDir
75    */
getJobDistCacheFiles(Path jobSubmitDir)76   public static Path getJobDistCacheFiles(Path jobSubmitDir) {
77     return new Path(jobSubmitDir, "files");
78   }
79   /**
80    * Get the job distributed cache archives path.
81    * @param jobSubmitDir
82    */
getJobDistCacheArchives(Path jobSubmitDir)83   public static Path getJobDistCacheArchives(Path jobSubmitDir) {
84     return new Path(jobSubmitDir, "archives");
85   }
86   /**
87    * Get the job distributed cache libjars path.
88    * @param jobSubmitDir
89    */
getJobDistCacheLibjars(Path jobSubmitDir)90   public static Path getJobDistCacheLibjars(Path jobSubmitDir) {
91     return new Path(jobSubmitDir, "libjars");
92   }
93 
94   /**
95    * Initializes the staging directory and returns the path. It also
96    * keeps track of all necessary ownership & permissions
97    * @param client
98    * @param conf
99    */
getStagingDir(JobClient client, Configuration conf)100   public static Path getStagingDir(JobClient client, Configuration conf)
101   throws IOException, InterruptedException {
102     Path stagingArea = client.getStagingAreaDir();
103     FileSystem fs = stagingArea.getFileSystem(conf);
104     String realUser;
105     String currentUser;
106     UserGroupInformation ugi = UserGroupInformation.getLoginUser();
107     realUser = ugi.getShortUserName();
108     currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
109     if (fs.exists(stagingArea)) {
110       FileStatus fsStatus = fs.getFileStatus(stagingArea);
111       String owner = fsStatus.getOwner();
112       if (!(owner.equals(currentUser) || owner.equals(realUser))) {
113          throw new IOException("The ownership on the staging directory " +
114                       stagingArea + " is not as expected. " +
115                       "It is owned by " + owner + ". The directory must " +
116                       "be owned by the submitter " + currentUser + " or " +
117                       "by " + realUser);
118       }
119       if (!fsStatus.getPermission().equals(JOB_DIR_PERMISSION)) {
120         LOG.info("Permissions on staging directory " + stagingArea + " are " +
121           "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
122           "to correct value " + JOB_DIR_PERMISSION);
123         fs.setPermission(stagingArea, JOB_DIR_PERMISSION);
124       }
125     } else {
126       fs.mkdirs(stagingArea,
127           new FsPermission(JOB_DIR_PERMISSION));
128     }
129     return stagingArea;
130   }
131 
132 }
133