1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapreduce; 20 21 import java.io.IOException; 22 23 import org.apache.hadoop.fs.FileStatus; 24 import org.apache.hadoop.fs.FileSystem; 25 import org.apache.hadoop.fs.Path; 26 import org.apache.hadoop.fs.permission.FsPermission; 27 import org.apache.hadoop.mapred.JobClient; 28 import org.apache.hadoop.security.UserGroupInformation; 29 import org.apache.hadoop.conf.Configuration; 30 31 import org.apache.commons.logging.Log; 32 import org.apache.commons.logging.LogFactory; 33 34 /** 35 * A utility to manage job submission files.<br/> 36 * <b><i>Note that this class is for framework internal usage only and is 37 * not to be used by users directly.</i></b> 38 */ 39 public class JobSubmissionFiles { 40 41 private final static Log LOG = LogFactory.getLog(JobSubmissionFiles.class); 42 43 // job submission directory is private! 44 final public static FsPermission JOB_DIR_PERMISSION = 45 FsPermission.createImmutable((short) 0700); // rwx-------- 46 //job files are world-wide readable and owner writable 47 final public static FsPermission JOB_FILE_PERMISSION = 48 FsPermission.createImmutable((short) 0644); // rw-r--r-- 49 getJobSplitFile(Path jobSubmissionDir)50 public static Path getJobSplitFile(Path jobSubmissionDir) { 51 return new Path(jobSubmissionDir, "job.split"); 52 } 53 getJobSplitMetaFile(Path jobSubmissionDir)54 public static Path getJobSplitMetaFile(Path jobSubmissionDir) { 55 return new Path(jobSubmissionDir, "job.splitmetainfo"); 56 } 57 58 /** 59 * Get the job conf path. 60 */ getJobConfPath(Path jobSubmitDir)61 public static Path getJobConfPath(Path jobSubmitDir) { 62 return new Path(jobSubmitDir, "job.xml"); 63 } 64 65 /** 66 * Get the job jar path. 67 */ getJobJar(Path jobSubmitDir)68 public static Path getJobJar(Path jobSubmitDir) { 69 return new Path(jobSubmitDir, "job.jar"); 70 } 71 72 /** 73 * Get the job distributed cache files path. 74 * @param jobSubmitDir 75 */ getJobDistCacheFiles(Path jobSubmitDir)76 public static Path getJobDistCacheFiles(Path jobSubmitDir) { 77 return new Path(jobSubmitDir, "files"); 78 } 79 /** 80 * Get the job distributed cache archives path. 81 * @param jobSubmitDir 82 */ getJobDistCacheArchives(Path jobSubmitDir)83 public static Path getJobDistCacheArchives(Path jobSubmitDir) { 84 return new Path(jobSubmitDir, "archives"); 85 } 86 /** 87 * Get the job distributed cache libjars path. 88 * @param jobSubmitDir 89 */ getJobDistCacheLibjars(Path jobSubmitDir)90 public static Path getJobDistCacheLibjars(Path jobSubmitDir) { 91 return new Path(jobSubmitDir, "libjars"); 92 } 93 94 /** 95 * Initializes the staging directory and returns the path. It also 96 * keeps track of all necessary ownership & permissions 97 * @param client 98 * @param conf 99 */ getStagingDir(JobClient client, Configuration conf)100 public static Path getStagingDir(JobClient client, Configuration conf) 101 throws IOException, InterruptedException { 102 Path stagingArea = client.getStagingAreaDir(); 103 FileSystem fs = stagingArea.getFileSystem(conf); 104 String realUser; 105 String currentUser; 106 UserGroupInformation ugi = UserGroupInformation.getLoginUser(); 107 realUser = ugi.getShortUserName(); 108 currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); 109 if (fs.exists(stagingArea)) { 110 FileStatus fsStatus = fs.getFileStatus(stagingArea); 111 String owner = fsStatus.getOwner(); 112 if (!(owner.equals(currentUser) || owner.equals(realUser))) { 113 throw new IOException("The ownership on the staging directory " + 114 stagingArea + " is not as expected. " + 115 "It is owned by " + owner + ". The directory must " + 116 "be owned by the submitter " + currentUser + " or " + 117 "by " + realUser); 118 } 119 if (!fsStatus.getPermission().equals(JOB_DIR_PERMISSION)) { 120 LOG.info("Permissions on staging directory " + stagingArea + " are " + 121 "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " + 122 "to correct value " + JOB_DIR_PERMISSION); 123 fs.setPermission(stagingArea, JOB_DIR_PERMISSION); 124 } 125 } else { 126 fs.mkdirs(stagingArea, 127 new FsPermission(JOB_DIR_PERMISSION)); 128 } 129 return stagingArea; 130 } 131 132 } 133