1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.applications.distributedshell;
20 
21 import java.io.BufferedReader;
22 import java.io.DataInputStream;
23 import java.io.File;
24 import java.io.FileInputStream;
25 import java.io.IOException;
26 import java.io.StringReader;
27 import java.lang.reflect.UndeclaredThrowableException;
28 import java.net.URI;
29 import java.net.URISyntaxException;
30 import java.nio.ByteBuffer;
31 import java.security.PrivilegedExceptionAction;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Vector;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.ConcurrentMap;
40 import java.util.concurrent.atomic.AtomicInteger;
41 
42 import org.apache.commons.cli.CommandLine;
43 import org.apache.commons.cli.GnuParser;
44 import org.apache.commons.cli.HelpFormatter;
45 import org.apache.commons.cli.Options;
46 import org.apache.commons.cli.ParseException;
47 import org.apache.commons.logging.Log;
48 import org.apache.commons.logging.LogFactory;
49 import org.apache.hadoop.classification.InterfaceAudience;
50 import org.apache.hadoop.classification.InterfaceAudience.Private;
51 import org.apache.hadoop.classification.InterfaceStability;
52 import org.apache.hadoop.conf.Configuration;
53 import org.apache.hadoop.fs.FileSystem;
54 import org.apache.hadoop.fs.Path;
55 import org.apache.hadoop.io.DataOutputBuffer;
56 import org.apache.hadoop.io.IOUtils;
57 import org.apache.hadoop.net.NetUtils;
58 import org.apache.hadoop.security.Credentials;
59 import org.apache.hadoop.security.UserGroupInformation;
60 import org.apache.hadoop.security.token.Token;
61 import org.apache.hadoop.util.ExitUtil;
62 import org.apache.hadoop.util.Shell;
63 import org.apache.hadoop.yarn.api.ApplicationConstants;
64 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
65 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
66 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
67 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
68 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
69 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
70 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
71 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
72 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
73 import org.apache.hadoop.yarn.api.records.Container;
74 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
75 import org.apache.hadoop.yarn.api.records.ContainerId;
76 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
77 import org.apache.hadoop.yarn.api.records.ContainerState;
78 import org.apache.hadoop.yarn.api.records.ContainerStatus;
79 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
80 import org.apache.hadoop.yarn.api.records.LocalResource;
81 import org.apache.hadoop.yarn.api.records.LocalResourceType;
82 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
83 import org.apache.hadoop.yarn.api.records.NodeReport;
84 import org.apache.hadoop.yarn.api.records.Priority;
85 import org.apache.hadoop.yarn.api.records.Resource;
86 import org.apache.hadoop.yarn.api.records.ResourceRequest;
87 import org.apache.hadoop.yarn.api.records.URL;
88 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
89 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
90 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
91 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
92 import org.apache.hadoop.yarn.client.api.TimelineClient;
93 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
94 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
95 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
96 import org.apache.hadoop.yarn.conf.YarnConfiguration;
97 import org.apache.hadoop.yarn.exceptions.YarnException;
98 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
99 import org.apache.hadoop.yarn.util.ConverterUtils;
100 import org.apache.log4j.LogManager;
101 
102 import com.google.common.annotations.VisibleForTesting;
103 
104 /**
105  * An ApplicationMaster for executing shell commands on a set of launched
106  * containers using the YARN framework.
107  *
108  * <p>
109  * This class is meant to act as an example on how to write yarn-based
110  * application masters.
111  * </p>
112  *
113  * <p>
114  * The ApplicationMaster is started on a container by the
115  * <code>ResourceManager</code>'s launcher. The first thing that the
116  * <code>ApplicationMaster</code> needs to do is to connect and register itself
117  * with the <code>ResourceManager</code>. The registration sets up information
118  * within the <code>ResourceManager</code> regarding what host:port the
119  * ApplicationMaster is listening on to provide any form of functionality to a
120  * client as well as a tracking url that a client can use to keep track of
 * status/job history if needed. However, the distributed shell supports
 * neither a tracking URL nor an appMasterHost:appMasterRpcPort endpoint.
123  * </p>
124  *
125  * <p>
126  * The <code>ApplicationMaster</code> needs to send a heartbeat to the
127  * <code>ResourceManager</code> at regular intervals to inform the
128  * <code>ResourceManager</code> that it is up and alive. The
 * {@link ApplicationMasterProtocol#allocate} call to the <code>ResourceManager</code>
 * from the <code>ApplicationMaster</code> acts as a heartbeat.
 * </p>
131  *
132  * <p>
133  * For the actual handling of the job, the <code>ApplicationMaster</code> has to
134  * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
 * required number of containers using {@link ResourceRequest} with the necessary
136  * resource specifications such as node location, computational
137  * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
138  * responds with an {@link AllocateResponse} that informs the
139  * <code>ApplicationMaster</code> of the set of newly allocated containers,
140  * completed containers as well as current state of available resources.
141  * </p>
142  *
143  * <p>
144  * For each allocated container, the <code>ApplicationMaster</code> can then set
145  * up the necessary launch context via {@link ContainerLaunchContext} to specify
146  * the allocated container id, local resources required by the executable, the
147  * environment to be setup for the executable, commands to execute, etc. and
148  * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
149  * launch and execute the defined commands on the given allocated container.
150  * </p>
151  *
152  * <p>
153  * The <code>ApplicationMaster</code> can monitor the launched container by
154  * either querying the <code>ResourceManager</code> using
155  * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
156  * the {@link ContainerManagementProtocol} by querying for the status of the allocated
 * container's {@link ContainerId}.
 * </p>
158  *
159  * <p>
160  * After the job has been completed, the <code>ApplicationMaster</code> has to
161  * send a {@link FinishApplicationMasterRequest} to the
162  * <code>ResourceManager</code> to inform it that the
 * <code>ApplicationMaster</code> has completed.
 * </p>
164  */
165 @InterfaceAudience.Public
166 @InterfaceStability.Unstable
167 public class ApplicationMaster {
168 
  // Logger shared by the application master and its nested callback handlers.
  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
170 
171   @VisibleForTesting
172   @Private
173   public static enum DSEvent {
174     DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
175   }
176 
177   @VisibleForTesting
178   @Private
179   public static enum DSEntity {
180     DS_APP_ATTEMPT, DS_CONTAINER
181   }
182 
  // Configuration used to initialize the RM, NM and timeline clients
  // (created in the constructor).
  private Configuration conf;

  // Asynchronous client used to register with, request containers from and
  // unregister from the Resource Manager. Declared with a raw type, hence the
  // "unchecked" suppressions on the methods that call it.
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;

  // In both secure and non-secure modes, this points to the job-submitter.
  // Built in run() from the USER environment variable; timeline calls are
  // made as this user.
  @VisibleForTesting
  UserGroupInformation appSubmitterUgi;

  // Asynchronous client used to communicate with the Node Managers.
  private NMClientAsync nmClientAsync;
  // Callback handler that receives the responses from the Node Managers.
  private NMCallbackHandler containerListener;

  // Attempt id of this application master, derived in init() from the
  // CONTAINER_ID environment variable, or from the app_attempt_id option
  // when that variable is absent.
  @VisibleForTesting
  protected ApplicationAttemptId appAttemptID;

  // TODO
  // Status updates for clients are not implemented yet; these values are
  // passed to the RM when registering in run().
  // Hostname of the container running this application master.
  private String appMasterHostname = "";
  // Port on which the app master would listen for status updates from
  // clients (-1: no RPC server).
  private int appMasterRpcPort = -1;
  // Tracking URL to which the app master would publish info for clients to
  // monitor (empty: none).
  private String appMasterTrackingUrl = "";

  // App Master configuration, populated from the command line in init().
  // No. of containers to run the shell command on.
  @VisibleForTesting
  protected int numTotalContainers = 1;
  // Memory (MB) to request for each container running the shell command;
  // capped at the cluster maximum in run().
  private int containerMemory = 10;
  // Virtual cores to request for each container running the shell command;
  // capped at the cluster maximum in run().
  private int containerVirtualCores = 1;
  // Priority of the request (priority option, default 0).
  private int requestPriority;

  // Counter for completed containers (complete denotes successful or failed).
  private AtomicInteger numCompletedContainers = new AtomicInteger();
  // Number of containers the RM has allocated to us, including those carried
  // over from previous attempts; decremented when a container is aborted.
  @VisibleForTesting
  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
  // Count of failed containers (non-zero exit status other than ABORTED).
  private AtomicInteger numFailedContainers = new AtomicInteger();
  // Count of containers already requested from the RM.
  // Needed because, once requested, containers should not be requested again;
  // more are requested only if the original requirement changes (e.g. a
  // container is aborted by the framework).
  @VisibleForTesting
  protected AtomicInteger numRequestedContainers = new AtomicInteger();

  // Shell command to be executed (read from the local shellCommands file).
  private String shellCommand = "";
  // Args to be passed to the shell command (read from the local shellArgs file).
  private String shellArgs = "";
  // Env variables to be set up for the shell command (shell_env option).
  private Map<String, String> shellEnv = new HashMap<String, String>();

  // Location of the shell script in the file system, obtained from the
  // environment in init().
  private String scriptPath = "";
  // Timestamp needed for creating a local resource
  private long shellScriptPathTimestamp = 0;
  // File length needed for local resource
  private long shellScriptPathLen = 0;

  // Timeline domain ID (optional, read from the environment in init()).
  private String domainId = null;

  // Hardcoded path to the shell script in the launched container's local env
  // (.sh and .bat variants).
  // NOTE(review): "ExecBatScripStringtPath" is misspelled; presumably kept
  // because it is referenced outside this view.
  private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
  private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
      + ".bat";

  // Hardcoded path to an optional custom log4j properties file.
  private static final String log4jPath = "log4j.properties";

  // Names of the local files holding the shell command and its arguments.
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";

  // Set once all containers have completed (see the RM callback handler) and
  // polled by finish(); volatile presumably because the callback runs on a
  // different thread from finish().
  private volatile boolean done;

  // Serialized credentials of the current user, built in run(); presumably
  // handed to the launched containers — TODO confirm.
  private ByteBuffer allTokens;

  // Container launch threads; finish() joins each with a 10 second timeout.
  private List<Thread> launchThreads = new ArrayList<Thread>();

  // Timeline Client (null when the timeline service is disabled).
  @VisibleForTesting
  TimelineClient timelineClient;

  // Command prefixes, presumably used to run the shell script on Linux and
  // Windows respectively — usage is outside this view.
  private final String linux_bash_command = "bash";
  private final String windows_command = "cmd /c";
279 
280   /**
281    * @param args Command line args
282    */
main(String[] args)283   public static void main(String[] args) {
284     boolean result = false;
285     try {
286       ApplicationMaster appMaster = new ApplicationMaster();
287       LOG.info("Initializing ApplicationMaster");
288       boolean doRun = appMaster.init(args);
289       if (!doRun) {
290         System.exit(0);
291       }
292       appMaster.run();
293       result = appMaster.finish();
294     } catch (Throwable t) {
295       LOG.fatal("Error running ApplicationMaster", t);
296       LogManager.shutdown();
297       ExitUtil.terminate(1, t);
298     }
299     if (result) {
300       LOG.info("Application Master completed successfully. exiting");
301       System.exit(0);
302     } else {
303       LOG.info("Application Master failed. exiting");
304       System.exit(2);
305     }
306   }
307 
308   /**
309    * Dump out contents of $CWD and the environment to stdout for debugging
310    */
dumpOutDebugInfo()311   private void dumpOutDebugInfo() {
312 
313     LOG.info("Dump debug output");
314     Map<String, String> envs = System.getenv();
315     for (Map.Entry<String, String> env : envs.entrySet()) {
316       LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
317       System.out.println("System env: key=" + env.getKey() + ", val="
318           + env.getValue());
319     }
320 
321     BufferedReader buf = null;
322     try {
323       String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
324         Shell.execCommand("ls", "-al");
325       buf = new BufferedReader(new StringReader(lines));
326       String line = "";
327       while ((line = buf.readLine()) != null) {
328         LOG.info("System CWD content: " + line);
329         System.out.println("System CWD content: " + line);
330       }
331     } catch (IOException e) {
332       e.printStackTrace();
333     } finally {
334       IOUtils.cleanup(LOG, buf);
335     }
336   }
337 
ApplicationMaster()338   public ApplicationMaster() {
339     // Set up the configuration
340     conf = new YarnConfiguration();
341   }
342 
343   /**
344    * Parse command line options
345    *
346    * @param args Command line args
347    * @return Whether init successful and run should be invoked
348    * @throws ParseException
349    * @throws IOException
350    */
init(String[] args)351   public boolean init(String[] args) throws ParseException, IOException {
352     Options opts = new Options();
353     opts.addOption("app_attempt_id", true,
354         "App Attempt ID. Not to be used unless for testing purposes");
355     opts.addOption("shell_env", true,
356         "Environment for shell script. Specified as env_key=env_val pairs");
357     opts.addOption("container_memory", true,
358         "Amount of memory in MB to be requested to run the shell command");
359     opts.addOption("container_vcores", true,
360         "Amount of virtual cores to be requested to run the shell command");
361     opts.addOption("num_containers", true,
362         "No. of containers on which the shell command needs to be executed");
363     opts.addOption("priority", true, "Application Priority. Default 0");
364     opts.addOption("debug", false, "Dump out debug information");
365 
366     opts.addOption("help", false, "Print usage");
367     CommandLine cliParser = new GnuParser().parse(opts, args);
368 
369     if (args.length == 0) {
370       printUsage(opts);
371       throw new IllegalArgumentException(
372           "No args specified for application master to initialize");
373     }
374 
375     //Check whether customer log4j.properties file exists
376     if (fileExist(log4jPath)) {
377       try {
378         Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
379             log4jPath);
380       } catch (Exception e) {
381         LOG.warn("Can not set up custom log4j properties. " + e);
382       }
383     }
384 
385     if (cliParser.hasOption("help")) {
386       printUsage(opts);
387       return false;
388     }
389 
390     if (cliParser.hasOption("debug")) {
391       dumpOutDebugInfo();
392     }
393 
394     Map<String, String> envs = System.getenv();
395 
396     if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
397       if (cliParser.hasOption("app_attempt_id")) {
398         String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
399         appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
400       } else {
401         throw new IllegalArgumentException(
402             "Application Attempt Id not set in the environment");
403       }
404     } else {
405       ContainerId containerId = ConverterUtils.toContainerId(envs
406           .get(Environment.CONTAINER_ID.name()));
407       appAttemptID = containerId.getApplicationAttemptId();
408     }
409 
410     if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
411       throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
412           + " not set in the environment");
413     }
414     if (!envs.containsKey(Environment.NM_HOST.name())) {
415       throw new RuntimeException(Environment.NM_HOST.name()
416           + " not set in the environment");
417     }
418     if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
419       throw new RuntimeException(Environment.NM_HTTP_PORT
420           + " not set in the environment");
421     }
422     if (!envs.containsKey(Environment.NM_PORT.name())) {
423       throw new RuntimeException(Environment.NM_PORT.name()
424           + " not set in the environment");
425     }
426 
427     LOG.info("Application master for app" + ", appId="
428         + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
429         + appAttemptID.getApplicationId().getClusterTimestamp()
430         + ", attemptId=" + appAttemptID.getAttemptId());
431 
432     if (!fileExist(shellCommandPath)
433         && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
434       throw new IllegalArgumentException(
435           "No shell command or shell script specified to be executed by application master");
436     }
437 
438     if (fileExist(shellCommandPath)) {
439       shellCommand = readContent(shellCommandPath);
440     }
441 
442     if (fileExist(shellArgsPath)) {
443       shellArgs = readContent(shellArgsPath);
444     }
445 
446     if (cliParser.hasOption("shell_env")) {
447       String shellEnvs[] = cliParser.getOptionValues("shell_env");
448       for (String env : shellEnvs) {
449         env = env.trim();
450         int index = env.indexOf('=');
451         if (index == -1) {
452           shellEnv.put(env, "");
453           continue;
454         }
455         String key = env.substring(0, index);
456         String val = "";
457         if (index < (env.length() - 1)) {
458           val = env.substring(index + 1);
459         }
460         shellEnv.put(key, val);
461       }
462     }
463 
464     if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
465       scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
466 
467       if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
468         shellScriptPathTimestamp = Long.parseLong(envs
469             .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
470       }
471       if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
472         shellScriptPathLen = Long.parseLong(envs
473             .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
474       }
475       if (!scriptPath.isEmpty()
476           && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
477         LOG.error("Illegal values in env for shell script path" + ", path="
478             + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
479             + shellScriptPathTimestamp);
480         throw new IllegalArgumentException(
481             "Illegal values in env for shell script path");
482       }
483     }
484 
485     if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
486       domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
487     }
488 
489     containerMemory = Integer.parseInt(cliParser.getOptionValue(
490         "container_memory", "10"));
491     containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
492         "container_vcores", "1"));
493     numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
494         "num_containers", "1"));
495     if (numTotalContainers == 0) {
496       throw new IllegalArgumentException(
497           "Cannot run distributed shell with no containers");
498     }
499     requestPriority = Integer.parseInt(cliParser
500         .getOptionValue("priority", "0"));
501     return true;
502   }
503 
504   /**
505    * Helper function to print usage
506    *
507    * @param opts Parsed command line options
508    */
printUsage(Options opts)509   private void printUsage(Options opts) {
510     new HelpFormatter().printHelp("ApplicationMaster", opts);
511   }
512 
513   /**
514    * Main run function for the application master
515    *
516    * @throws YarnException
517    * @throws IOException
518    */
519   @SuppressWarnings({ "unchecked" })
run()520   public void run() throws YarnException, IOException, InterruptedException {
521     LOG.info("Starting ApplicationMaster");
522 
523     // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
524     // are marked as LimitedPrivate
525     Credentials credentials =
526         UserGroupInformation.getCurrentUser().getCredentials();
527     DataOutputBuffer dob = new DataOutputBuffer();
528     credentials.writeTokenStorageToStream(dob);
529     // Now remove the AM->RM token so that containers cannot access it.
530     Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
531     LOG.info("Executing with tokens:");
532     while (iter.hasNext()) {
533       Token<?> token = iter.next();
534       LOG.info(token);
535       if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
536         iter.remove();
537       }
538     }
539     allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
540 
541     // Create appSubmitterUgi and add original tokens to it
542     String appSubmitterUserName =
543         System.getenv(ApplicationConstants.Environment.USER.name());
544     appSubmitterUgi =
545         UserGroupInformation.createRemoteUser(appSubmitterUserName);
546     appSubmitterUgi.addCredentials(credentials);
547 
548 
549     AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
550     amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
551     amRMClient.init(conf);
552     amRMClient.start();
553 
554     containerListener = createNMCallbackHandler();
555     nmClientAsync = new NMClientAsyncImpl(containerListener);
556     nmClientAsync.init(conf);
557     nmClientAsync.start();
558 
559     startTimelineClient(conf);
560     if(timelineClient != null) {
561       publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
562           DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
563     }
564 
565     // Setup local RPC Server to accept status requests directly from clients
566     // TODO need to setup a protocol for client to be able to communicate to
567     // the RPC server
568     // TODO use the rpc port info to register with the RM for the client to
569     // send requests to this app master
570 
571     // Register self with ResourceManager
572     // This will start heartbeating to the RM
573     appMasterHostname = NetUtils.getHostname();
574     RegisterApplicationMasterResponse response = amRMClient
575         .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
576             appMasterTrackingUrl);
577     // Dump out information about cluster capability as seen by the
578     // resource manager
579     int maxMem = response.getMaximumResourceCapability().getMemory();
580     LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
581 
582     int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
583     LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
584 
585     // A resource ask cannot exceed the max.
586     if (containerMemory > maxMem) {
587       LOG.info("Container memory specified above max threshold of cluster."
588           + " Using max value." + ", specified=" + containerMemory + ", max="
589           + maxMem);
590       containerMemory = maxMem;
591     }
592 
593     if (containerVirtualCores > maxVCores) {
594       LOG.info("Container virtual cores specified above max threshold of cluster."
595           + " Using max value." + ", specified=" + containerVirtualCores + ", max="
596           + maxVCores);
597       containerVirtualCores = maxVCores;
598     }
599 
600     List<Container> previousAMRunningContainers =
601         response.getContainersFromPreviousAttempts();
602     LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
603       + " previous attempts' running containers on AM registration.");
604     numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
605 
606     int numTotalContainersToRequest =
607         numTotalContainers - previousAMRunningContainers.size();
608     // Setup ask for containers from RM
609     // Send request for containers to RM
610     // Until we get our fully allocated quota, we keep on polling RM for
611     // containers
612     // Keep looping until all the containers are launched and shell script
613     // executed on them ( regardless of success/failure).
614     for (int i = 0; i < numTotalContainersToRequest; ++i) {
615       ContainerRequest containerAsk = setupContainerAskForRM();
616       amRMClient.addContainerRequest(containerAsk);
617     }
618     numRequestedContainers.set(numTotalContainers);
619   }
620 
621   @VisibleForTesting
startTimelineClient(final Configuration conf)622   void startTimelineClient(final Configuration conf)
623       throws YarnException, IOException, InterruptedException {
624     try {
625       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
626         @Override
627         public Void run() throws Exception {
628           if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
629               YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
630             // Creating the Timeline Client
631             timelineClient = TimelineClient.createTimelineClient();
632             timelineClient.init(conf);
633             timelineClient.start();
634           } else {
635             timelineClient = null;
636             LOG.warn("Timeline service is not enabled");
637           }
638           return null;
639         }
640       });
641     } catch (UndeclaredThrowableException e) {
642       throw new YarnException(e.getCause());
643     }
644   }
645 
646   @VisibleForTesting
createNMCallbackHandler()647   NMCallbackHandler createNMCallbackHandler() {
648     return new NMCallbackHandler(this);
649   }
650 
651   @VisibleForTesting
finish()652   protected boolean finish() {
653     // wait for completion.
654     while (!done
655         && (numCompletedContainers.get() != numTotalContainers)) {
656       try {
657         Thread.sleep(200);
658       } catch (InterruptedException ex) {}
659     }
660 
661     if(timelineClient != null) {
662       publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
663           DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
664     }
665 
666     // Join all launched threads
667     // needed for when we time out
668     // and we need to release containers
669     for (Thread launchThread : launchThreads) {
670       try {
671         launchThread.join(10000);
672       } catch (InterruptedException e) {
673         LOG.info("Exception thrown in thread join: " + e.getMessage());
674         e.printStackTrace();
675       }
676     }
677 
678     // When the application completes, it should stop all running containers
679     LOG.info("Application completed. Stopping running containers");
680     nmClientAsync.stop();
681 
682     // When the application completes, it should send a finish application
683     // signal to the RM
684     LOG.info("Application completed. Signalling finish to RM");
685 
686     FinalApplicationStatus appStatus;
687     String appMessage = null;
688     boolean success = true;
689     if (numFailedContainers.get() == 0 &&
690         numCompletedContainers.get() == numTotalContainers) {
691       appStatus = FinalApplicationStatus.SUCCEEDED;
692     } else {
693       appStatus = FinalApplicationStatus.FAILED;
694       appMessage = "Diagnostics." + ", total=" + numTotalContainers
695           + ", completed=" + numCompletedContainers.get() + ", allocated="
696           + numAllocatedContainers.get() + ", failed="
697           + numFailedContainers.get();
698       LOG.info(appMessage);
699       success = false;
700     }
701     try {
702       amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
703     } catch (YarnException ex) {
704       LOG.error("Failed to unregister application", ex);
705     } catch (IOException e) {
706       LOG.error("Failed to unregister application", e);
707     }
708 
709     amRMClient.stop();
710 
711     // Stop Timeline Client
712     if(timelineClient != null) {
713       timelineClient.stop();
714     }
715 
716     return success;
717   }
718 
719   private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
720     @SuppressWarnings("unchecked")
721     @Override
onContainersCompleted(List<ContainerStatus> completedContainers)722     public void onContainersCompleted(List<ContainerStatus> completedContainers) {
723       LOG.info("Got response from RM for container ask, completedCnt="
724           + completedContainers.size());
725       for (ContainerStatus containerStatus : completedContainers) {
726         LOG.info(appAttemptID + " got container status for containerID="
727             + containerStatus.getContainerId() + ", state="
728             + containerStatus.getState() + ", exitStatus="
729             + containerStatus.getExitStatus() + ", diagnostics="
730             + containerStatus.getDiagnostics());
731 
732         // non complete containers should not be here
733         assert (containerStatus.getState() == ContainerState.COMPLETE);
734 
735         // increment counters for completed/failed containers
736         int exitStatus = containerStatus.getExitStatus();
737         if (0 != exitStatus) {
738           // container failed
739           if (ContainerExitStatus.ABORTED != exitStatus) {
740             // shell script failed
741             // counts as completed
742             numCompletedContainers.incrementAndGet();
743             numFailedContainers.incrementAndGet();
744           } else {
745             // container was killed by framework, possibly preempted
746             // we should re-try as the container was lost for some reason
747             numAllocatedContainers.decrementAndGet();
748             numRequestedContainers.decrementAndGet();
749             // we do not need to release the container as it would be done
750             // by the RM
751           }
752         } else {
753           // nothing to do
754           // container completed successfully
755           numCompletedContainers.incrementAndGet();
756           LOG.info("Container completed successfully." + ", containerId="
757               + containerStatus.getContainerId());
758         }
759         if(timelineClient != null) {
760           publishContainerEndEvent(
761               timelineClient, containerStatus, domainId, appSubmitterUgi);
762         }
763       }
764 
765       // ask for more containers if any failed
766       int askCount = numTotalContainers - numRequestedContainers.get();
767       numRequestedContainers.addAndGet(askCount);
768 
769       if (askCount > 0) {
770         for (int i = 0; i < askCount; ++i) {
771           ContainerRequest containerAsk = setupContainerAskForRM();
772           amRMClient.addContainerRequest(containerAsk);
773         }
774       }
775 
776       if (numCompletedContainers.get() == numTotalContainers) {
777         done = true;
778       }
779     }
780 
781     @Override
onContainersAllocated(List<Container> allocatedContainers)782     public void onContainersAllocated(List<Container> allocatedContainers) {
783       LOG.info("Got response from RM for container ask, allocatedCnt="
784           + allocatedContainers.size());
785       numAllocatedContainers.addAndGet(allocatedContainers.size());
786       for (Container allocatedContainer : allocatedContainers) {
787         LOG.info("Launching shell command on a new container."
788             + ", containerId=" + allocatedContainer.getId()
789             + ", containerNode=" + allocatedContainer.getNodeId().getHost()
790             + ":" + allocatedContainer.getNodeId().getPort()
791             + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
792             + ", containerResourceMemory"
793             + allocatedContainer.getResource().getMemory()
794             + ", containerResourceVirtualCores"
795             + allocatedContainer.getResource().getVirtualCores());
796         // + ", containerToken"
797         // +allocatedContainer.getContainerToken().getIdentifier().toString());
798 
799         LaunchContainerRunnable runnableLaunchContainer =
800             new LaunchContainerRunnable(allocatedContainer, containerListener);
801         Thread launchThread = new Thread(runnableLaunchContainer);
802 
803         // launch and start the container on a separate thread to keep
804         // the main thread unblocked
805         // as all containers may not be allocated at one go.
806         launchThreads.add(launchThread);
807         launchThread.start();
808       }
809     }
810 
811     @Override
onShutdownRequest()812     public void onShutdownRequest() {
813       done = true;
814     }
815 
816     @Override
onNodesUpdated(List<NodeReport> updatedNodes)817     public void onNodesUpdated(List<NodeReport> updatedNodes) {}
818 
819     @Override
getProgress()820     public float getProgress() {
821       // set progress to deliver to RM on next heartbeat
822       float progress = (float) numCompletedContainers.get()
823           / numTotalContainers;
824       return progress;
825     }
826 
827     @Override
onError(Throwable e)828     public void onError(Throwable e) {
829       done = true;
830       amRMClient.stop();
831     }
832   }
833 
834   @VisibleForTesting
835   static class NMCallbackHandler
836     implements NMClientAsync.CallbackHandler {
837 
838     private ConcurrentMap<ContainerId, Container> containers =
839         new ConcurrentHashMap<ContainerId, Container>();
840     private final ApplicationMaster applicationMaster;
841 
NMCallbackHandler(ApplicationMaster applicationMaster)842     public NMCallbackHandler(ApplicationMaster applicationMaster) {
843       this.applicationMaster = applicationMaster;
844     }
845 
addContainer(ContainerId containerId, Container container)846     public void addContainer(ContainerId containerId, Container container) {
847       containers.putIfAbsent(containerId, container);
848     }
849 
850     @Override
onContainerStopped(ContainerId containerId)851     public void onContainerStopped(ContainerId containerId) {
852       if (LOG.isDebugEnabled()) {
853         LOG.debug("Succeeded to stop Container " + containerId);
854       }
855       containers.remove(containerId);
856     }
857 
858     @Override
onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus)859     public void onContainerStatusReceived(ContainerId containerId,
860         ContainerStatus containerStatus) {
861       if (LOG.isDebugEnabled()) {
862         LOG.debug("Container Status: id=" + containerId + ", status=" +
863             containerStatus);
864       }
865     }
866 
867     @Override
onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse)868     public void onContainerStarted(ContainerId containerId,
869         Map<String, ByteBuffer> allServiceResponse) {
870       if (LOG.isDebugEnabled()) {
871         LOG.debug("Succeeded to start Container " + containerId);
872       }
873       Container container = containers.get(containerId);
874       if (container != null) {
875         applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
876       }
877       if(applicationMaster.timelineClient != null) {
878         ApplicationMaster.publishContainerStartEvent(
879             applicationMaster.timelineClient, container,
880             applicationMaster.domainId, applicationMaster.appSubmitterUgi);
881       }
882     }
883 
884     @Override
onStartContainerError(ContainerId containerId, Throwable t)885     public void onStartContainerError(ContainerId containerId, Throwable t) {
886       LOG.error("Failed to start Container " + containerId);
887       containers.remove(containerId);
888       applicationMaster.numCompletedContainers.incrementAndGet();
889       applicationMaster.numFailedContainers.incrementAndGet();
890     }
891 
892     @Override
onGetContainerStatusError( ContainerId containerId, Throwable t)893     public void onGetContainerStatusError(
894         ContainerId containerId, Throwable t) {
895       LOG.error("Failed to query the status of Container " + containerId);
896     }
897 
898     @Override
onStopContainerError(ContainerId containerId, Throwable t)899     public void onStopContainerError(ContainerId containerId, Throwable t) {
900       LOG.error("Failed to stop Container " + containerId);
901       containers.remove(containerId);
902     }
903   }
904 
905   /**
906    * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
907    * that will execute the shell command.
908    */
909   private class LaunchContainerRunnable implements Runnable {
910 
911     // Allocated container
912     Container container;
913 
914     NMCallbackHandler containerListener;
915 
916     /**
917      * @param lcontainer Allocated container
918      * @param containerListener Callback handler of the container
919      */
LaunchContainerRunnable( Container lcontainer, NMCallbackHandler containerListener)920     public LaunchContainerRunnable(
921         Container lcontainer, NMCallbackHandler containerListener) {
922       this.container = lcontainer;
923       this.containerListener = containerListener;
924     }
925 
926     @Override
927     /**
928      * Connects to CM, sets up container launch context
929      * for shell command and eventually dispatches the container
930      * start request to the CM.
931      */
run()932     public void run() {
933       LOG.info("Setting up container launch container for containerid="
934           + container.getId());
935 
936       // Set the local resources
937       Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
938 
939       // The container for the eventual shell commands needs its own local
940       // resources too.
941       // In this scenario, if a shell script is specified, we need to have it
942       // copied and made available to the container.
943       if (!scriptPath.isEmpty()) {
944         Path renamedScriptPath = null;
945         if (Shell.WINDOWS) {
946           renamedScriptPath = new Path(scriptPath + ".bat");
947         } else {
948           renamedScriptPath = new Path(scriptPath + ".sh");
949         }
950 
951         try {
952           // rename the script file based on the underlying OS syntax.
953           renameScriptFile(renamedScriptPath);
954         } catch (Exception e) {
955           LOG.error(
956               "Not able to add suffix (.bat/.sh) to the shell script filename",
957               e);
958           // We know we cannot continue launching the container
959           // so we should release it.
960           numCompletedContainers.incrementAndGet();
961           numFailedContainers.incrementAndGet();
962           return;
963         }
964 
965         URL yarnUrl = null;
966         try {
967           yarnUrl = ConverterUtils.getYarnUrlFromURI(
968             new URI(renamedScriptPath.toString()));
969         } catch (URISyntaxException e) {
970           LOG.error("Error when trying to use shell script path specified"
971               + " in env, path=" + renamedScriptPath, e);
972           // A failure scenario on bad input such as invalid shell script path
973           // We know we cannot continue launching the container
974           // so we should release it.
975           // TODO
976           numCompletedContainers.incrementAndGet();
977           numFailedContainers.incrementAndGet();
978           return;
979         }
980         LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
981           LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
982           shellScriptPathLen, shellScriptPathTimestamp);
983         localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
984             ExecShellStringPath, shellRsrc);
985         shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
986       }
987 
988       // Set the necessary command to execute on the allocated container
989       Vector<CharSequence> vargs = new Vector<CharSequence>(5);
990 
991       // Set executable command
992       vargs.add(shellCommand);
993       // Set shell script path
994       if (!scriptPath.isEmpty()) {
995         vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
996             : ExecShellStringPath);
997       }
998 
999       // Set args for the shell command if any
1000       vargs.add(shellArgs);
1001       // Add log redirect params
1002       vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
1003       vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
1004 
1005       // Get final commmand
1006       StringBuilder command = new StringBuilder();
1007       for (CharSequence str : vargs) {
1008         command.append(str).append(" ");
1009       }
1010 
1011       List<String> commands = new ArrayList<String>();
1012       commands.add(command.toString());
1013 
1014       // Set up ContainerLaunchContext, setting local resource, environment,
1015       // command and token for constructor.
1016 
1017       // Note for tokens: Set up tokens for the container too. Today, for normal
1018       // shell commands, the container in distribute-shell doesn't need any
1019       // tokens. We are populating them mainly for NodeManagers to be able to
1020       // download anyfiles in the distributed file-system. The tokens are
1021       // otherwise also useful in cases, for e.g., when one is running a
1022       // "hadoop dfs" command inside the distributed shell.
1023       ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
1024         localResources, shellEnv, commands, null, allTokens.duplicate(), null);
1025       containerListener.addContainer(container.getId(), container);
1026       nmClientAsync.startContainerAsync(container, ctx);
1027     }
1028   }
1029 
renameScriptFile(final Path renamedScriptPath)1030   private void renameScriptFile(final Path renamedScriptPath)
1031       throws IOException, InterruptedException {
1032     appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
1033       @Override
1034       public Void run() throws IOException {
1035         FileSystem fs = renamedScriptPath.getFileSystem(conf);
1036         fs.rename(new Path(scriptPath), renamedScriptPath);
1037         return null;
1038       }
1039     });
1040     LOG.info("User " + appSubmitterUgi.getUserName()
1041         + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
1042   }
1043 
1044   /**
1045    * Setup the request that will be sent to the RM for the container ask.
1046    *
1047    * @return the setup ResourceRequest to be sent to RM
1048    */
setupContainerAskForRM()1049   private ContainerRequest setupContainerAskForRM() {
1050     // setup requirements for hosts
1051     // using * as any host will do for the distributed shell app
1052     // set the priority for the request
1053     // TODO - what is the range for priority? how to decide?
1054     Priority pri = Priority.newInstance(requestPriority);
1055 
1056     // Set up resource type requirements
1057     // For now, memory and CPU are supported so we set memory and cpu requirements
1058     Resource capability = Resource.newInstance(containerMemory,
1059       containerVirtualCores);
1060 
1061     ContainerRequest request = new ContainerRequest(capability, null, null,
1062         pri);
1063     LOG.info("Requested container ask: " + request.toString());
1064     return request;
1065   }
1066 
fileExist(String filePath)1067   private boolean fileExist(String filePath) {
1068     return new File(filePath).exists();
1069   }
1070 
readContent(String filePath)1071   private String readContent(String filePath) throws IOException {
1072     DataInputStream ds = null;
1073     try {
1074       ds = new DataInputStream(new FileInputStream(filePath));
1075       return ds.readUTF();
1076     } finally {
1077       org.apache.commons.io.IOUtils.closeQuietly(ds);
1078     }
1079   }
1080 
publishContainerStartEvent( final TimelineClient timelineClient, Container container, String domainId, UserGroupInformation ugi)1081   private static void publishContainerStartEvent(
1082       final TimelineClient timelineClient, Container container, String domainId,
1083       UserGroupInformation ugi) {
1084     final TimelineEntity entity = new TimelineEntity();
1085     entity.setEntityId(container.getId().toString());
1086     entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1087     entity.setDomainId(domainId);
1088     entity.addPrimaryFilter("user", ugi.getShortUserName());
1089     TimelineEvent event = new TimelineEvent();
1090     event.setTimestamp(System.currentTimeMillis());
1091     event.setEventType(DSEvent.DS_CONTAINER_START.toString());
1092     event.addEventInfo("Node", container.getNodeId().toString());
1093     event.addEventInfo("Resources", container.getResource().toString());
1094     entity.addEvent(event);
1095 
1096     try {
1097       ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
1098         @Override
1099         public TimelinePutResponse run() throws Exception {
1100           return timelineClient.putEntities(entity);
1101         }
1102       });
1103     } catch (Exception e) {
1104       LOG.error("Container start event could not be published for "
1105           + container.getId().toString(),
1106           e instanceof UndeclaredThrowableException ? e.getCause() : e);
1107     }
1108   }
1109 
publishContainerEndEvent( final TimelineClient timelineClient, ContainerStatus container, String domainId, UserGroupInformation ugi)1110   private static void publishContainerEndEvent(
1111       final TimelineClient timelineClient, ContainerStatus container,
1112       String domainId, UserGroupInformation ugi) {
1113     final TimelineEntity entity = new TimelineEntity();
1114     entity.setEntityId(container.getContainerId().toString());
1115     entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1116     entity.setDomainId(domainId);
1117     entity.addPrimaryFilter("user", ugi.getShortUserName());
1118     TimelineEvent event = new TimelineEvent();
1119     event.setTimestamp(System.currentTimeMillis());
1120     event.setEventType(DSEvent.DS_CONTAINER_END.toString());
1121     event.addEventInfo("State", container.getState().name());
1122     event.addEventInfo("Exit Status", container.getExitStatus());
1123     entity.addEvent(event);
1124     try {
1125       timelineClient.putEntities(entity);
1126     } catch (YarnException | IOException e) {
1127       LOG.error("Container end event could not be published for "
1128           + container.getContainerId().toString(), e);
1129     }
1130   }
1131 
publishApplicationAttemptEvent( final TimelineClient timelineClient, String appAttemptId, DSEvent appEvent, String domainId, UserGroupInformation ugi)1132   private static void publishApplicationAttemptEvent(
1133       final TimelineClient timelineClient, String appAttemptId,
1134       DSEvent appEvent, String domainId, UserGroupInformation ugi) {
1135     final TimelineEntity entity = new TimelineEntity();
1136     entity.setEntityId(appAttemptId);
1137     entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
1138     entity.setDomainId(domainId);
1139     entity.addPrimaryFilter("user", ugi.getShortUserName());
1140     TimelineEvent event = new TimelineEvent();
1141     event.setEventType(appEvent.toString());
1142     event.setTimestamp(System.currentTimeMillis());
1143     entity.addEvent(event);
1144     try {
1145       timelineClient.putEntities(entity);
1146     } catch (YarnException | IOException e) {
1147       LOG.error("App Attempt "
1148           + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
1149           + " event could not be published for "
1150           + appAttemptId.toString(), e);
1151     }
1152   }
1153 }
1154