/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.QueueACL;

import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;

import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

import com.google.common.base.Charsets;

@InterfaceAudience.Private
@InterfaceStability.Unstable
class JobSubmitter {
  protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
  private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";
  private static final int SHUFFLE_KEY_LENGTH = 64;
  private FileSystem jtFs;
  private ClientProtocol submitClient;
  private String submitHostName;
  private String submitHostAddress;

  JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)
      throws IOException {
    this.submitClient = submitClient;
    this.jtFs = submitFs;
  }
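
  // A minimal sketch of how this class is typically driven (not part of this
  // class's API; shown for orientation only).  Client code goes through the
  // public Job API, and Job.submit() constructs a JobSubmitter internally:
  //
  //   Job job = Job.getInstance(new Configuration());
  //   job.setJarByClass(MyDriver.class);   // MyDriver is a hypothetical driver class
  //   FileInputFormat.addInputPath(job, new Path("in"));     // illustrative paths
  //   FileOutputFormat.setOutputPath(job, new Path("out"));
  //   job.waitForCompletion(true);  // submits via JobSubmitter.submitJobInternal()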

  /**
   * Configures the user's jobconf with the command line options
   * -libjars, -files, and -archives.
   * @param job the job whose resources are uploaded and configured
   * @throws IOException
   */
  private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
      throws IOException {
    JobResourceUploader rUploader = new JobResourceUploader(jtFs);
    rUploader.uploadFiles(job, jobSubmitDir);

    // Get the working directory. If not set, sets it to filesystem working dir
    // This code has been added so that the working directory is reset before
    // running the job. This is necessary for backward compatibility as other
    // systems might use the public API JobConf#setWorkingDirectory to reset
    // the working directory.
    job.getWorkingDirectory();
  }

  /**
   * Internal method for submitting jobs to the system.
   *
   * <p>The job submission process involves:
   * <ol>
   *   <li>
   *   Checking the input and output specifications of the job.
   *   </li>
   *   <li>
   *   Computing the {@link InputSplit}s for the job.
   *   </li>
   *   <li>
   *   Setting up the requisite accounting information for the
   *   {@link DistributedCache} of the job, if necessary.
   *   </li>
   *   <li>
   *   Copying the job's jar and configuration to the map-reduce system
   *   directory on the distributed file-system.
   *   </li>
   *   <li>
   *   Submitting the job to the <code>JobTracker</code> and optionally
   *   monitoring its status.
   *   </li>
   * </ol></p>
   * @param job the configuration to submit
   * @param cluster the handle to the Cluster
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  JobStatus submitJobInternal(Job job, Cluster cluster)
      throws ClassNotFoundException, InterruptedException, IOException {

    // validate the job's output specs
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    // configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers",
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);

      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
            " data spill is enabled");
      }

      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // remove jobtoken referrals before copying the jobconf to HDFS: the
      // tasks don't need this setting, and they may even break if it is
      // present, since the referral would point to a different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);

      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null) {
          jtFs.delete(submitJobDir, true);
        }
      }
    }
  }
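
  /**
   * Checks the output specification of the job by delegating to its
   * {@link OutputFormat}. For map-only jobs the new-API check is used when
   * the new mapper API is enabled; otherwise it is used when the new reducer
   * API is enabled. Jobs on the old API fall back to
   * {@code org.apache.hadoop.mapred.OutputFormat#checkOutputSpecs}.
   */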
  private void checkSpecs(Job job) throws ClassNotFoundException,
      InterruptedException, IOException {
    JobConf jConf = (JobConf) job.getConfiguration();
    // Check the output specification
    if (jConf.getNumReduceTasks() == 0 ?
        jConf.getUseNewMapper() : jConf.getUseNewReducer()) {
      org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
          ReflectionUtils.newInstance(job.getOutputFormatClass(),
              job.getConfiguration());
      output.checkOutputSpecs(job);
    } else {
      jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
    }
  }

  private void writeConf(Configuration conf, Path jobFile)
      throws IOException {
    // Write job file to JobTracker's fs
    FSDataOutputStream out =
        FileSystem.create(jtFs, jobFile,
            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
    try {
      conf.writeXml(out);
    } finally {
      out.close();
    }
  }

  private void printTokens(JobID jobId,
      Credentials credentials) throws IOException {
    LOG.info("Submitting tokens for job: " + jobId);
    for (Token<?> token : credentials.getAllTokens()) {
      LOG.info(token);
    }
  }

  @SuppressWarnings("unchecked")
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf) job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }
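
  // Note: both the new-API path above and the old-API path below delegate to
  // JobSplitWriter, which serializes the splits and their metadata into the
  // job submit directory (conventionally the job.split and job.splitmetainfo
  // files) for the framework to read back when scheduling map tasks.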

  // method to write splits for the old API mapper.
  private int writeOldSplits(JobConf job, Path jobSubmitDir)
      throws IOException {
    org.apache.hadoop.mapred.InputSplit[] splits =
        job.getInputFormat().getSplits(job, job.getNumMapTasks());
    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
      @Override
      public int compare(org.apache.hadoop.mapred.InputSplit a,
          org.apache.hadoop.mapred.InputSplit b) {
        try {
          long left = a.getLength();
          long right = b.getLength();
          if (left == right) {
            return 0;
          } else if (left < right) {
            return 1;
          } else {
            return -1;
          }
        } catch (IOException ie) {
          throw new RuntimeException("Problem getting input split size", ie);
        }
      }
    });
    JobSplitWriter.createSplitFiles(jobSubmitDir, job,
        jobSubmitDir.getFileSystem(job), splits);
    return splits.length;
  }

  private static class SplitComparator implements Comparator<InputSplit> {
    @Override
    public int compare(InputSplit o1, InputSplit o2) {
      try {
        long len1 = o1.getLength();
        long len2 = o2.getLength();
        if (len1 < len2) {
          return 1;
        } else if (len1 == len2) {
          return 0;
        } else {
          return -1;
        }
      } catch (IOException ie) {
        throw new RuntimeException("exception in compare", ie);
      } catch (InterruptedException ie) {
        throw new RuntimeException("exception in compare", ie);
      }
    }
  }

  @SuppressWarnings("unchecked")
  private void readTokensFromFiles(Configuration conf, Credentials credentials)
      throws IOException {
    // add tokens and secrets coming from a token storage file
    String binaryTokenFilename =
        conf.get("mapreduce.job.credentials.binary");
    if (binaryTokenFilename != null) {
      Credentials binary = Credentials.readTokenStorageFile(
          FileSystem.getLocal(conf).makeQualified(
              new Path(binaryTokenFilename)),
          conf);
      credentials.addAll(binary);
    }
    // add secret keys coming from a json file
    String tokensFileName = conf.get("mapreduce.job.credentials.json");
    if (tokensFileName != null) {
      LOG.info("loading user's secret keys from " + tokensFileName);
      String localFileName = new Path(tokensFileName).toUri().getPath();

      boolean jsonError = false;
      try {
        // read JSON
        ObjectMapper mapper = new ObjectMapper();
        Map<String, String> nm =
            mapper.readValue(new File(localFileName), Map.class);

        for (Map.Entry<String, String> ent : nm.entrySet()) {
          credentials.addSecretKey(new Text(ent.getKey()), ent.getValue()
              .getBytes(Charsets.UTF_8));
        }
      } catch (JsonMappingException e) {
        jsonError = true;
      } catch (JsonParseException e) {
        jsonError = true;
      }
      if (jsonError) {
        LOG.warn("couldn't parse Token Cache JSON file with user secret keys");
      }
    }
  }
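
  // Illustrative only: the JSON secrets file read above
  // (mapreduce.job.credentials.json) is expected to be a flat JSON object
  // mapping secret-key names to string values; each entry is added to the
  // job's Credentials.  A hypothetical example:
  //
  //   { "my.webservice.password": "s3cret", "my.api.key": "abc123" }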

  // get secret keys and tokens and store them into TokenCache
  private void populateTokenCache(Configuration conf, Credentials credentials)
      throws IOException {
    readTokensFromFiles(conf, credentials);
    // add the delegation tokens from configuration
    String[] nameNodes = conf.getStrings(MRJobConfig.JOB_NAMENODES);
    LOG.debug("adding the following namenodes' delegation tokens:" +
        Arrays.toString(nameNodes));
    if (nameNodes != null) {
      Path[] ps = new Path[nameNodes.length];
      for (int i = 0; i < nameNodes.length; i++) {
        ps[i] = new Path(nameNodes[i]);
      }
      TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
    }
  }

  @SuppressWarnings("deprecation")
  private static void addMRFrameworkToDistributedCache(Configuration conf)
      throws IOException {
    String framework =
        conf.get(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, "");
    if (!framework.isEmpty()) {
      URI uri;
      try {
        uri = new URI(framework);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException("Unable to parse '" + framework
            + "' as a URI, check the setting for "
            + MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, e);
      }

      String linkedName = uri.getFragment();

      // resolve any symlinks in the URI path so using a "current" symlink
      // to point to a specific version shows the specific version
      // in the distributed cache configuration
      FileSystem fs = FileSystem.get(conf);
      Path frameworkPath = fs.makeQualified(
          new Path(uri.getScheme(), uri.getAuthority(), uri.getPath()));
      FileContext fc = FileContext.getFileContext(frameworkPath.toUri(), conf);
      frameworkPath = fc.resolvePath(frameworkPath);
      uri = frameworkPath.toUri();
      try {
        uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(),
            null, linkedName);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException(e);
      }

      DistributedCache.addCacheArchive(uri, conf);
    }
  }
}