/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.QueueACL;

import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;

import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

import com.google.common.base.Charsets;

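/**
 * Internal helper that implements the client side of MapReduce job
 * submission: staging job resources, writing input splits and the job
 * configuration to the submit directory, collecting credentials, and
 * handing the job off to the cluster through the {@link ClientProtocol}.
 */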
@InterfaceAudience.Private
@InterfaceStability.Unstable
class JobSubmitter {
  protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
  private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";
  private static final int SHUFFLE_KEY_LENGTH = 64;
  private FileSystem jtFs;
  private ClientProtocol submitClient;
  private String submitHostName;
  private String submitHostAddress;

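  /**
   * Creates a submitter that stages job files on the given filesystem and
   * submits jobs through the given client protocol.
   */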
  JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)
  throws IOException {
    this.submitClient = submitClient;
    this.jtFs = submitFs;
  }

  /**
   * Configures the user's jobconf with the command line options
   * -libjars, -files, and -archives.
   * @param job the job whose resources are uploaded and configured
   * @param jobSubmitDir the staging directory for the job
   * @throws IOException
   */
  private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
  throws IOException {
    JobResourceUploader rUploader = new JobResourceUploader(jtFs);
    rUploader.uploadFiles(job, jobSubmitDir);

    // Get the working directory. If not set, sets it to the filesystem's
    // working dir. This call ensures the working directory is set before the
    // job runs, which is necessary for backward compatibility as other
    // systems might use the public API JobConf#setWorkingDirectory to reset
    // the working directory.
    job.getWorkingDirectory();
  }

  /**
   * Internal method for submitting jobs to the system.
   *
   * <p>The job submission process involves:
   * <ol>
   *   <li>
   *   Checking the input and output specifications of the job.
   *   </li>
   *   <li>
   *   Computing the {@link InputSplit}s for the job.
   *   </li>
   *   <li>
   *   Setting up the requisite accounting information for the
   *   {@link DistributedCache} of the job, if necessary.
   *   </li>
   *   <li>
   *   Copying the job's jar and configuration to the map-reduce system
   *   directory on the distributed file-system.
   *   </li>
   *   <li>
   *   Submitting the job to the <code>JobTracker</code> and optionally
   *   monitoring its status.
   *   </li>
   * </ol></p>
   * @param job the configuration to submit
   * @param cluster the handle to the Cluster
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  JobStatus submitJobInternal(Job job, Cluster cluster)
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the job's output specs
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers",
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);

      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                " data spill is enabled");
      }

      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);

      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null) {
          jtFs.delete(submitJobDir, true);
        }
      }
    }
  }

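  /**
   * Checks the output specification of the job, using the new or old API
   * {@link OutputFormat} depending on how the job is configured.
   */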
  private void checkSpecs(Job job) throws ClassNotFoundException,
      InterruptedException, IOException {
    JobConf jConf = (JobConf)job.getConfiguration();
    // Check the output specification
    if (jConf.getNumReduceTasks() == 0 ?
        jConf.getUseNewMapper() : jConf.getUseNewReducer()) {
      org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
        ReflectionUtils.newInstance(job.getOutputFormatClass(),
          job.getConfiguration());
      output.checkOutputSpecs(job);
    } else {
      jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
    }
  }

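  /**
   * Writes the job configuration as XML to the given path on the submit
   * filesystem.
   */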
  private void writeConf(Configuration conf, Path jobFile)
      throws IOException {
    // Write job file to JobTracker's fs
    FSDataOutputStream out =
      FileSystem.create(jtFs, jobFile,
                        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
    try {
      conf.writeXml(out);
    } finally {
      out.close();
    }
  }

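  /**
   * Logs the tokens that are submitted along with the job.
   */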
  private void printTokens(JobID jobId,
      Credentials credentials) throws IOException {
    LOG.info("Submitting tokens for job: " + jobId);
    for (Token<?> token: credentials.getAllTokens()) {
      LOG.info(token);
    }
  }

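  /**
   * Computes the input splits with the new (mapreduce) API
   * {@link InputFormat}, writes them to the submit directory, and returns
   * the number of splits, which becomes the number of map tasks.
   */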
  @SuppressWarnings("unchecked")
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

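  /**
   * Writes the job's input splits to the submit directory, dispatching to
   * the new or old API based on the job configuration, and returns the
   * number of map tasks.
   */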
  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

  // Writes splits for jobs that use the old (mapred) API.
  private int writeOldSplits(JobConf job, Path jobSubmitDir)
  throws IOException {
    org.apache.hadoop.mapred.InputSplit[] splits =
    job.getInputFormat().getSplits(job, job.getNumMapTasks());
    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
      public int compare(org.apache.hadoop.mapred.InputSplit a,
                         org.apache.hadoop.mapred.InputSplit b) {
        try {
          long left = a.getLength();
          long right = b.getLength();
          if (left == right) {
            return 0;
          } else if (left < right) {
            return 1;
          } else {
            return -1;
          }
        } catch (IOException ie) {
          throw new RuntimeException("Problem getting input split size", ie);
        }
      }
    });
    JobSplitWriter.createSplitFiles(jobSubmitDir, job,
        jobSubmitDir.getFileSystem(job), splits);
    return splits.length;
  }

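  /**
   * Orders splits by length in descending order so that the largest
   * splits go first.
   */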
  private static class SplitComparator implements Comparator<InputSplit> {
    @Override
    public int compare(InputSplit o1, InputSplit o2) {
      try {
        long len1 = o1.getLength();
        long len2 = o2.getLength();
        if (len1 < len2) {
          return 1;
        } else if (len1 == len2) {
          return 0;
        } else {
          return -1;
        }
      } catch (IOException ie) {
        throw new RuntimeException("exception in compare", ie);
      } catch (InterruptedException ie) {
        throw new RuntimeException("exception in compare", ie);
      }
    }
  }

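  /**
   * Loads tokens and secret keys from the binary token storage file and/or
   * the JSON secrets file named in the configuration and adds them to the
   * given credentials.
   */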
  @SuppressWarnings("unchecked")
  private void readTokensFromFiles(Configuration conf, Credentials credentials)
  throws IOException {
    // add tokens and secrets coming from a token storage file
    String binaryTokenFilename =
      conf.get("mapreduce.job.credentials.binary");
    if (binaryTokenFilename != null) {
      Credentials binary = Credentials.readTokenStorageFile(
          FileSystem.getLocal(conf).makeQualified(
              new Path(binaryTokenFilename)),
          conf);
      credentials.addAll(binary);
    }
    // add secret keys coming from a json file
    String tokensFileName = conf.get("mapreduce.job.credentials.json");
    if (tokensFileName != null) {
      LOG.info("loading user's secret keys from " + tokensFileName);
      String localFileName = new Path(tokensFileName).toUri().getPath();

      boolean json_error = false;
      try {
        // read JSON
        ObjectMapper mapper = new ObjectMapper();
        Map<String, String> nm =
          mapper.readValue(new File(localFileName), Map.class);

        for (Map.Entry<String, String> ent: nm.entrySet()) {
          credentials.addSecretKey(new Text(ent.getKey()), ent.getValue()
              .getBytes(Charsets.UTF_8));
        }
      } catch (JsonMappingException e) {
        json_error = true;
      } catch (JsonParseException e) {
        json_error = true;
      }
      if (json_error) {
        LOG.warn("couldn't parse Token Cache JSON file with user secret keys");
      }
    }
  }

  // Get secret keys and tokens and store them into the TokenCache.
  private void populateTokenCache(Configuration conf, Credentials credentials)
  throws IOException {
    readTokensFromFiles(conf, credentials);
    // add the delegation tokens from configuration
    String[] nameNodes = conf.getStrings(MRJobConfig.JOB_NAMENODES);
    LOG.debug("adding the following namenodes' delegation tokens:" +
        Arrays.toString(nameNodes));
    if (nameNodes != null) {
      Path[] ps = new Path[nameNodes.length];
      for (int i = 0; i < nameNodes.length; i++) {
        ps[i] = new Path(nameNodes[i]);
      }
      TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
    }
  }

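  /**
   * If a MapReduce framework archive is configured via
   * {@link MRJobConfig#MAPREDUCE_APPLICATION_FRAMEWORK_PATH}, resolves its
   * path (following symlinks) and adds it to the distributed cache as an
   * archive.
   */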
  @SuppressWarnings("deprecation")
  private static void addMRFrameworkToDistributedCache(Configuration conf)
      throws IOException {
    String framework =
        conf.get(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, "");
    if (!framework.isEmpty()) {
      URI uri;
      try {
        uri = new URI(framework);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException("Unable to parse '" + framework
            + "' as a URI, check the setting for "
            + MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, e);
      }

      String linkedName = uri.getFragment();

      // resolve any symlinks in the URI path so using a "current" symlink
      // to point to a specific version shows the specific version
      // in the distributed cache configuration
      FileSystem fs = FileSystem.get(conf);
      Path frameworkPath = fs.makeQualified(
          new Path(uri.getScheme(), uri.getAuthority(), uri.getPath()));
      FileContext fc = FileContext.getFileContext(frameworkPath.toUri(), conf);
      frameworkPath = fc.resolvePath(frameworkPath);
      uri = frameworkPath.toUri();
      try {
        uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(),
            null, linkedName);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException(e);
      }

      DistributedCache.addCacheArchive(uri, conf);
    }
  }
}