1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.yarn.server.nodemanager;
20 
21 import java.io.File;
22 import java.io.IOException;
23 import java.io.OutputStream;
24 import java.io.PrintStream;
25 import java.net.InetSocketAddress;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.locks.ReentrantReadWriteLock;
33 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
34 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
35 
36 import org.apache.commons.io.FileUtils;
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.conf.Configurable;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.fs.permission.FsPermission;
43 import org.apache.hadoop.yarn.api.records.ContainerId;
44 import org.apache.hadoop.yarn.api.records.Resource;
45 import org.apache.hadoop.yarn.conf.YarnConfiguration;
46 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
47 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
48 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
49 import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
50 import org.apache.hadoop.util.Shell;
51 import org.apache.hadoop.util.StringUtils;
52 
53 public abstract class ContainerExecutor implements Configurable {
54 
  // Class-wide logger.
  private static final Log LOG = LogFactory.getLog(ContainerExecutor.class);
  // Immutable permission 0700 (owner read/write/execute only). Judging by the
  // name it is intended for task launch scripts; it is not used in this class.
  final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
    FsPermission.createImmutable((short) 0700);

  // Configuration injected through setConf() and returned by getConf().
  private Configuration conf;

  // Pid-file path of every container currently marked active: entries are
  // added by activateContainer() and removed by deactivateContainer().
  private ConcurrentMap<ContainerId, Path> pidFiles =
      new ConcurrentHashMap<ContainerId, Path>();

  // Read/write lock pair taken around the pidFiles accesses made by
  // getPidFilePath(), isContainerActive(), activateContainer() and
  // deactivateContainer().
  private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final ReadLock readLock = lock.readLock();
  private final WriteLock writeLock = lock.writeLock();
67 
  /**
   * Store the configuration for this executor; it is handed back unchanged by
   * {@link #getConf()}.
   * @param conf the configuration to keep
   */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
72 
  /**
   * Return the configuration previously supplied via
   * {@link #setConf(Configuration)}.
   * @return the stored configuration, or null if none has been set yet
   */
  @Override
  public Configuration getConf() {
    return conf;
  }
77 
  /**
   * Run the executor initialization steps and verify that the necessary
   * configuration and permissions are in place.
   * @throws IOException if initialization fails; the exact conditions are
   *         defined by the concrete executor
   */
  public abstract void init() throws IOException;
84 
  /**
   * On Windows the ContainerLaunch creates a temporary special jar manifest of
   * other jars to work around the CLASSPATH length limit. In a secure cluster
   * this jar must be localized so that the container has access to it.
   * This function localizes the jar on demand.
   * <p>
   * This base implementation performs no localization and returns
   * {@code classPathJar} as is.
   *
   * @param classPathJar path of the classpath jar
   * @param pwd not used by this base implementation
   * @param owner not used by this base implementation
   * @return the path of the jar the container should use; here always
   *         {@code classPathJar}
   * @throws IOException never thrown by this base implementation; declared so
   *         that overriding executors may report localization failures
   */
  public Path localizeClasspathJar(Path classPathJar, Path pwd, String owner)
      throws IOException {
    // Non-secure executors simply use the classpath jar created
    // in the NM private folder
    return classPathJar;
  }
101 
102 
  /**
   * Prepare the environment for containers in this application to execute.
   * <pre>
   * For $x in local.dirs
   *   create $x/$user/$appId
   * Copy $nmLocal/appTokens {@literal ->} $N/$user/$appId
   * For $rsrc in private resources
   *   Copy $rsrc {@literal ->} $N/$user/filecache/[idef]
   * For $rsrc in job resources
   *   Copy $rsrc {@literal ->} $N/$user/$appId/filecache/idef
   * </pre>
   * @param nmPrivateContainerTokens path to localized credentials, rsrc by NM
   * @param nmAddr RPC address to contact NM
   * @param user user name of application owner
   * @param appId id of the application
   * @param locId identifier of the localizer (NOTE(review): exact use is
   *        defined by the implementations - confirm there)
   * @param dirsHandler NM local dirs service, for nm-local-dirs and nm-log-dirs
   * @throws IOException For most application init failures
   * @throws InterruptedException If application init thread is halted by NM
   */
  public abstract void startLocalizer(Path nmPrivateContainerTokens,
      InetSocketAddress nmAddr, String user, String appId, String locId,
      LocalDirsHandlerService dirsHandler)
    throws IOException, InterruptedException;
126 
127 
  /**
   * Launch the container on the node. This is a blocking call and returns only
   * when the container exits.
   * @param container the container to be launched
   * @param nmPrivateContainerScriptPath the path for launch script
   * @param nmPrivateTokensPath the path for tokens for the container
   * @param user the user of the container
   * @param appId the appId of the container
   * @param containerWorkDir the work dir for the container
   * @param localDirs nm-local-dirs to be used for this container
   * @param logDirs nm-log-dirs to be used for this container
   * @return the return status of the launch
   * @throws IOException if the launch fails; the exact conditions are defined
   *         by the concrete executor
   */
  public abstract int launchContainer(Container container,
      Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
      String user, String appId, Path containerWorkDir,
      List<String> localDirs, List<String> logDirs) throws IOException;
146 
  /**
   * Send {@code signal} to the process identified by {@code pid} on behalf of
   * {@code user}.
   * @param user the user of the container
   * @param pid the process id to signal
   * @param signal the signal to deliver
   * @return implementation-defined status (NOTE(review): presumably true when
   *         the signal was delivered - confirm against the implementations)
   * @throws IOException if the signal could not be sent
   */
  public abstract boolean signalContainer(String user, String pid,
      Signal signal)
      throws IOException;
150 
  /**
   * Delete files on behalf of {@code user}.
   * NOTE(review): how {@code subDir} is combined with {@code basedirs} (and
   * what happens when either is null or empty) is defined by the
   * implementations - confirm there.
   * @param user the user to perform the deletion as
   * @param subDir the sub directory to delete
   * @param basedirs the base directories the deletion applies to
   * @throws IOException if the deletion fails
   * @throws InterruptedException if the deletion is interrupted
   */
  public abstract void deleteAsUser(String user, Path subDir, Path... basedirs)
      throws IOException, InterruptedException;
153 
  /**
   * Check whether the process identified by {@code pid} is still running.
   * {@link #reacquireContainer(String, ContainerId)} polls this method until
   * it returns false.
   * @param user the user of the container
   * @param pid the process id to check
   * @return true while the process is alive, false otherwise
   * @throws IOException if the liveness check itself fails
   */
  public abstract boolean isContainerProcessAlive(String user, String pid)
      throws IOException;
156 
157   /**
158    * Recover an already existing container. This is a blocking call and returns
159    * only when the container exits.  Note that the container must have been
160    * activated prior to this call.
161    * @param user the user of the container
162    * @param containerId The ID of the container to reacquire
163    * @return The exit code of the pre-existing container
164    * @throws IOException
165    * @throws InterruptedException
166    */
reacquireContainer(String user, ContainerId containerId)167   public int reacquireContainer(String user, ContainerId containerId)
168       throws IOException, InterruptedException {
169     Path pidPath = getPidFilePath(containerId);
170     if (pidPath == null) {
171       LOG.warn(containerId + " is not active, returning terminated error");
172       return ExitCode.TERMINATED.getExitCode();
173     }
174 
175     String pid = null;
176     pid = ProcessIdFileReader.getProcessId(pidPath);
177     if (pid == null) {
178       throw new IOException("Unable to determine pid for " + containerId);
179     }
180 
181     LOG.info("Reacquiring " + containerId + " with pid " + pid);
182     while(isContainerProcessAlive(user, pid)) {
183       Thread.sleep(1000);
184     }
185 
186     // wait for exit code file to appear
187     String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString());
188     File file = new File(exitCodeFile);
189     final int sleepMsec = 100;
190     int msecLeft = 2000;
191     while (!file.exists() && msecLeft >= 0) {
192       if (!isContainerActive(containerId)) {
193         LOG.info(containerId + " was deactivated");
194         return ExitCode.TERMINATED.getExitCode();
195       }
196 
197       Thread.sleep(sleepMsec);
198 
199       msecLeft -= sleepMsec;
200     }
201     if (msecLeft < 0) {
202       throw new IOException("Timeout while waiting for exit code from "
203           + containerId);
204     }
205 
206     try {
207       return Integer.parseInt(FileUtils.readFileToString(file).trim());
208     } catch (NumberFormatException e) {
209       throw new IOException("Error parsing exit code from pid " + pid, e);
210     }
211   }
212 
writeLaunchEnv(OutputStream out, Map<String, String> environment, Map<Path, List<String>> resources, List<String> command)213   public void writeLaunchEnv(OutputStream out, Map<String, String> environment, Map<Path, List<String>> resources, List<String> command) throws IOException{
214     ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder.create();
215     if (environment != null) {
216       for (Map.Entry<String,String> env : environment.entrySet()) {
217         sb.env(env.getKey().toString(), env.getValue().toString());
218       }
219     }
220     if (resources != null) {
221       for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
222         for (String linkName : entry.getValue()) {
223           sb.symlink(entry.getKey(), new Path(linkName));
224         }
225       }
226     }
227 
228     sb.command(command);
229 
230     PrintStream pout = null;
231     try {
232       pout = new PrintStream(out, false, "UTF-8");
233       sb.write(pout);
234     } finally {
235       if (out != null) {
236         out.close();
237       }
238     }
239   }
240 
241   public enum ExitCode {
242     FORCE_KILLED(137),
243     TERMINATED(143),
244     LOST(154);
245     private final int code;
246 
ExitCode(int exitCode)247     private ExitCode(int exitCode) {
248       this.code = exitCode;
249     }
250 
getExitCode()251     public int getExitCode() {
252       return code;
253     }
254 
255     @Override
toString()256     public String toString() {
257       return String.valueOf(code);
258     }
259   }
260 
261   /**
262    * The constants for the signals.
263    */
264   public enum Signal {
265     NULL(0, "NULL"), QUIT(3, "SIGQUIT"),
266     KILL(9, "SIGKILL"), TERM(15, "SIGTERM");
267     private final int value;
268     private final String str;
Signal(int value, String str)269     private Signal(int value, String str) {
270       this.str = str;
271       this.value = value;
272     }
getValue()273     public int getValue() {
274       return value;
275     }
276     @Override
toString()277     public String toString() {
278       return str;
279     }
280   }
281 
logOutput(String output)282   protected void logOutput(String output) {
283     String shExecOutput = output;
284     if (shExecOutput != null) {
285       for (String str : shExecOutput.split("\n")) {
286         LOG.info(str);
287       }
288     }
289   }
290 
291   /**
292    * Get the pidFile of the container.
293    * @param containerId
294    * @return the path of the pid-file for the given containerId.
295    */
getPidFilePath(ContainerId containerId)296   protected Path getPidFilePath(ContainerId containerId) {
297     try {
298       readLock.lock();
299       return (this.pidFiles.get(containerId));
300     } finally {
301       readLock.unlock();
302     }
303   }
304 
  /**
   * Return a command to execute the given command in the OS shell without any
   * resource limits: delegates to the six-argument overload with a null
   * {@code resource}.
   * @param command the command (script) to run
   * @param groupId process group id, only used on Windows
   * @param userName passed through to the six-argument overload
   * @param pidFile passed through to the six-argument overload
   * @param conf configuration consulted for the launch settings
   * @return the command line as an argument array
   */
  protected String[] getRunCommand(String command, String groupId,
      String userName, Path pidFile, Configuration conf) {
    return getRunCommand(command, groupId, userName, pidFile, conf, null);
  }
309 
310   /**
311    *  Return a command to execute the given command in OS shell.
312    *  On Windows, the passed in groupId can be used to launch
313    *  and associate the given groupId in a process group. On
314    *  non-Windows, groupId is ignored.
315    */
getRunCommand(String command, String groupId, String userName, Path pidFile, Configuration conf, Resource resource)316   protected String[] getRunCommand(String command, String groupId,
317       String userName, Path pidFile, Configuration conf, Resource resource) {
318     boolean containerSchedPriorityIsSet = false;
319     int containerSchedPriorityAdjustment =
320         YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;
321 
322     if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) !=
323         null) {
324       containerSchedPriorityIsSet = true;
325       containerSchedPriorityAdjustment = conf
326           .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
327           YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
328     }
329 
330     if (Shell.WINDOWS) {
331       int cpuRate = -1;
332       int memory = -1;
333       if (resource != null) {
334         if (conf
335             .getBoolean(
336                 YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED,
337                 YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) {
338           memory = resource.getMemory();
339         }
340 
341         if (conf.getBoolean(
342             YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED,
343             YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) {
344           int containerVCores = resource.getVirtualCores();
345           int nodeVCores = conf.getInt(YarnConfiguration.NM_VCORES,
346               YarnConfiguration.DEFAULT_NM_VCORES);
347           // cap overall usage to the number of cores allocated to YARN
348           int nodeCpuPercentage = Math
349               .min(
350                   conf.getInt(
351                       YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
352                       YarnConfiguration.DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT),
353                   100);
354           nodeCpuPercentage = Math.max(0, nodeCpuPercentage);
355           if (nodeCpuPercentage == 0) {
356             String message = "Illegal value for "
357                 + YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
358                 + ". Value cannot be less than or equal to 0.";
359             throw new IllegalArgumentException(message);
360           }
361           float yarnVCores = (nodeCpuPercentage * nodeVCores) / 100.0f;
362           // CPU should be set to a percentage * 100, e.g. 20% cpu rate limit
363           // should be set as 20 * 100. The following setting is equal to:
364           // 100 * (100 * (vcores / Total # of cores allocated to YARN))
365           cpuRate = Math.min(10000,
366               (int) ((containerVCores * 10000) / yarnVCores));
367         }
368       }
369       return new String[] { Shell.WINUTILS, "task", "create", "-m",
370           String.valueOf(memory), "-c", String.valueOf(cpuRate), groupId,
371           "cmd /c " + command };
372     } else {
373       List<String> retCommand = new ArrayList<String>();
374       if (containerSchedPriorityIsSet) {
375         retCommand.addAll(Arrays.asList("nice", "-n",
376             Integer.toString(containerSchedPriorityAdjustment)));
377       }
378       retCommand.addAll(Arrays.asList("bash", command));
379       return retCommand.toArray(new String[retCommand.size()]);
380     }
381 
382   }
383 
384   /**
385    * Is the container still active?
386    * @param containerId
387    * @return true if the container is active else false.
388    */
isContainerActive(ContainerId containerId)389   protected boolean isContainerActive(ContainerId containerId) {
390     try {
391       readLock.lock();
392       return (this.pidFiles.containsKey(containerId));
393     } finally {
394       readLock.unlock();
395     }
396   }
397 
398   /**
399    * Mark the container as active
400    *
401    * @param containerId
402    *          the ContainerId
403    * @param pidFilePath
404    *          Path where the executor should write the pid of the launched
405    *          process
406    */
activateContainer(ContainerId containerId, Path pidFilePath)407   public void activateContainer(ContainerId containerId, Path pidFilePath) {
408     try {
409       writeLock.lock();
410       this.pidFiles.put(containerId, pidFilePath);
411     } finally {
412       writeLock.unlock();
413     }
414   }
415 
416   /**
417    * Mark the container as inactive.
418    * Done iff the container is still active. Else treat it as
419    * a no-op
420    */
deactivateContainer(ContainerId containerId)421   public void deactivateContainer(ContainerId containerId) {
422     try {
423       writeLock.lock();
424       this.pidFiles.remove(containerId);
425     } finally {
426       writeLock.unlock();
427     }
428   }
429 
430   /**
431    * Get the process-identifier for the container
432    *
433    * @param containerID
434    * @return the processid of the container if it has already launched,
435    *         otherwise return null
436    */
getProcessId(ContainerId containerID)437   public String getProcessId(ContainerId containerID) {
438     String pid = null;
439     Path pidFile = pidFiles.get(containerID);
440     if (pidFile == null) {
441       // This container isn't even launched yet.
442       return pid;
443     }
444     try {
445       pid = ProcessIdFileReader.getProcessId(pidFile);
446     } catch (IOException e) {
447       LOG.error("Got exception reading pid from pid-file " + pidFile, e);
448     }
449     return pid;
450   }
451 
452   public static class DelayedProcessKiller extends Thread {
453     private Container container;
454     private final String user;
455     private final String pid;
456     private final long delay;
457     private final Signal signal;
458     private final ContainerExecutor containerExecutor;
459 
DelayedProcessKiller(Container container, String user, String pid, long delay, Signal signal, ContainerExecutor containerExecutor)460     public DelayedProcessKiller(Container container, String user, String pid,
461         long delay, Signal signal, ContainerExecutor containerExecutor) {
462       this.container = container;
463       this.user = user;
464       this.pid = pid;
465       this.delay = delay;
466       this.signal = signal;
467       this.containerExecutor = containerExecutor;
468       setName("Task killer for " + pid);
469       setDaemon(false);
470     }
471     @Override
run()472     public void run() {
473       try {
474         Thread.sleep(delay);
475         containerExecutor.signalContainer(user, pid, signal);
476       } catch (InterruptedException e) {
477         return;
478       } catch (IOException e) {
479         String message = "Exception when user " + user + " killing task " + pid
480             + " in DelayedProcessKiller: " + StringUtils.stringifyException(e);
481         LOG.warn(message);
482         container.handle(new ContainerDiagnosticsUpdateEvent(container
483           .getContainerId(), message));
484       }
485     }
486   }
487 }
488