1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.yarn.server.nodemanager;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Optional;
23 
24 import java.io.File;
25 import java.io.IOException;
26 import java.net.InetSocketAddress;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.List;
30 import java.util.regex.Pattern;
31 
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.security.UserGroupInformation;
37 import org.apache.hadoop.util.ReflectionUtils;
38 import org.apache.hadoop.util.Shell.ExitCodeException;
39 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
40 import org.apache.hadoop.util.StringUtils;
41 import org.apache.hadoop.yarn.api.ApplicationConstants;
42 import org.apache.hadoop.yarn.api.records.ContainerId;
43 import org.apache.hadoop.yarn.conf.YarnConfiguration;
44 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
45 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
46 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
47 import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
48 import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
49 import org.apache.hadoop.yarn.util.ConverterUtils;
50 
51 public class LinuxContainerExecutor extends ContainerExecutor {
52 
  // Logger shared by all instances of this executor.
  private static final Log LOG = LogFactory
      .getLog(LinuxContainerExecutor.class);

  // Local account returned by getRunAsUser() when security is disabled and
  // containerLimitUsers is true.
  private String nonsecureLocalUser;
  // Pattern a user name must match when security is disabled; enforced by
  // verifyUsernamePattern().
  private Pattern nonsecureLocalUserPattern;
  // Path of the container-executor binary placed at the head of every
  // command built by this class.
  private String containerExecutorExe;
  // Handler instantiated in setConf(); its preExecute/postExecute hooks are
  // invoked around container launches.
  private LCEResourcesHandler resourcesHandler;
  // True once setConf() finds the scheduling priority key in the config.
  private boolean containerSchedPriorityIsSet = false;
  // Adjustment passed to "nice -n" by addSchedPriorityCommand().
  private int containerSchedPriorityAdjustment = 0;
  // When true (and security is disabled) getRunAsUser() substitutes
  // nonsecureLocalUser for the requesting user.
  private boolean containerLimitUsers;
63 
64   @Override
setConf(Configuration conf)65   public void setConf(Configuration conf) {
66     super.setConf(conf);
67     containerExecutorExe = getContainerExecutorExecutablePath(conf);
68 
69     resourcesHandler = ReflectionUtils.newInstance(
70             conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
71               DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
72     resourcesHandler.setConf(conf);
73 
74     if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) != null) {
75      containerSchedPriorityIsSet = true;
76      containerSchedPriorityAdjustment = conf
77          .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
78          YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
79     }
80     nonsecureLocalUser = conf.get(
81         YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY,
82         YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);
83     nonsecureLocalUserPattern = Pattern.compile(
84         conf.get(YarnConfiguration.NM_NONSECURE_MODE_USER_PATTERN_KEY,
85             YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_USER_PATTERN));
86     containerLimitUsers = conf.getBoolean(
87       YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS,
88       YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LIMIT_USERS);
89     if (!containerLimitUsers) {
90       LOG.warn(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS +
91           ": impersonation without authentication enabled");
92     }
93   }
94 
verifyUsernamePattern(String user)95   void verifyUsernamePattern(String user) {
96     if (!UserGroupInformation.isSecurityEnabled() &&
97         !nonsecureLocalUserPattern.matcher(user).matches()) {
98         throw new IllegalArgumentException("Invalid user name '" + user + "'," +
99             " it must match '" + nonsecureLocalUserPattern.pattern() + "'");
100       }
101   }
102 
getRunAsUser(String user)103   String getRunAsUser(String user) {
104     if (UserGroupInformation.isSecurityEnabled() ||
105        !containerLimitUsers) {
106       return user;
107     } else {
108       return nonsecureLocalUser;
109     }
110   }
111 
112 
113 
114   /**
115    * List of commands that the setuid script will execute.
116    */
117   enum Commands {
118     INITIALIZE_CONTAINER(0),
119     LAUNCH_CONTAINER(1),
120     SIGNAL_CONTAINER(2),
121     DELETE_AS_USER(3);
122 
123     private int value;
Commands(int value)124     Commands(int value) {
125       this.value = value;
126     }
getValue()127     int getValue() {
128       return value;
129     }
130   }
131 
132   /**
133    * Result codes returned from the C container-executor.
134    * These must match the values in container-executor.h.
135    */
136   enum ResultCode {
137     OK(0),
138     INVALID_USER_NAME(2),
139     UNABLE_TO_EXECUTE_CONTAINER_SCRIPT(7),
140     INVALID_CONTAINER_PID(9),
141     INVALID_CONTAINER_EXEC_PERMISSIONS(22),
142     INVALID_CONFIG_FILE(24),
143     WRITE_CGROUP_FAILED(27);
144 
145     private final int value;
ResultCode(int value)146     ResultCode(int value) {
147       this.value = value;
148     }
getValue()149     int getValue() {
150       return value;
151     }
152   }
153 
getContainerExecutorExecutablePath(Configuration conf)154   protected String getContainerExecutorExecutablePath(Configuration conf) {
155     String yarnHomeEnvVar =
156         System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
157     File hadoopBin = new File(yarnHomeEnvVar, "bin");
158     String defaultPath =
159       new File(hadoopBin, "container-executor").getAbsolutePath();
160     return null == conf
161       ? defaultPath
162       : conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, defaultPath);
163   }
164 
addSchedPriorityCommand(List<String> command)165   protected void addSchedPriorityCommand(List<String> command) {
166     if (containerSchedPriorityIsSet) {
167       command.addAll(Arrays.asList("nice", "-n",
168           Integer.toString(containerSchedPriorityAdjustment)));
169     }
170   }
171 
172   @Override
init()173   public void init() throws IOException {
174     // Send command to executor which will just start up,
175     // verify configuration/permissions and exit
176     List<String> command = new ArrayList<String>(
177         Arrays.asList(containerExecutorExe,
178             "--checksetup"));
179     String[] commandArray = command.toArray(new String[command.size()]);
180     ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
181     if (LOG.isDebugEnabled()) {
182       LOG.debug("checkLinuxExecutorSetup: " + Arrays.toString(commandArray));
183     }
184     try {
185       shExec.execute();
186     } catch (ExitCodeException e) {
187       int exitCode = shExec.getExitCode();
188       LOG.warn("Exit code from container executor initialization is : "
189           + exitCode, e);
190       logOutput(shExec.getOutput());
191       throw new IOException("Linux container executor not configured properly"
192           + " (error=" + exitCode + ")", e);
193     }
194 
195     resourcesHandler.init(this);
196   }
197 
198   @Override
startLocalizer(Path nmPrivateContainerTokensPath, InetSocketAddress nmAddr, String user, String appId, String locId, LocalDirsHandlerService dirsHandler)199   public void startLocalizer(Path nmPrivateContainerTokensPath,
200       InetSocketAddress nmAddr, String user, String appId, String locId,
201       LocalDirsHandlerService dirsHandler)
202       throws IOException, InterruptedException {
203 
204     List<String> localDirs = dirsHandler.getLocalDirs();
205     List<String> logDirs = dirsHandler.getLogDirs();
206 
207     verifyUsernamePattern(user);
208     String runAsUser = getRunAsUser(user);
209     List<String> command = new ArrayList<String>();
210     addSchedPriorityCommand(command);
211     command.addAll(Arrays.asList(containerExecutorExe,
212                    runAsUser,
213                    user,
214                    Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
215                    appId,
216                    nmPrivateContainerTokensPath.toUri().getPath().toString(),
217                    StringUtils.join(",", localDirs),
218                    StringUtils.join(",", logDirs)));
219 
220     File jvm =                                  // use same jvm as parent
221       new File(new File(System.getProperty("java.home"), "bin"), "java");
222     command.add(jvm.toString());
223     command.add("-classpath");
224     command.add(System.getProperty("java.class.path"));
225     String javaLibPath = System.getProperty("java.library.path");
226     if (javaLibPath != null) {
227       command.add("-Djava.library.path=" + javaLibPath);
228     }
229     buildMainArgs(command, user, appId, locId, nmAddr, localDirs);
230     String[] commandArray = command.toArray(new String[command.size()]);
231     ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
232     if (LOG.isDebugEnabled()) {
233       LOG.debug("initApplication: " + Arrays.toString(commandArray));
234     }
235     try {
236       shExec.execute();
237       if (LOG.isDebugEnabled()) {
238         logOutput(shExec.getOutput());
239       }
240     } catch (ExitCodeException e) {
241       int exitCode = shExec.getExitCode();
242       LOG.warn("Exit code from container " + locId + " startLocalizer is : "
243           + exitCode, e);
244       logOutput(shExec.getOutput());
245       throw new IOException("Application " + appId + " initialization failed" +
246       		" (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
247     }
248   }
249 
250   @VisibleForTesting
buildMainArgs(List<String> command, String user, String appId, String locId, InetSocketAddress nmAddr, List<String> localDirs)251   public void buildMainArgs(List<String> command, String user, String appId,
252       String locId, InetSocketAddress nmAddr, List<String> localDirs) {
253     ContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
254       localDirs);
255   }
256 
257   @Override
launchContainer(Container container, Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath, String user, String appId, Path containerWorkDir, List<String> localDirs, List<String> logDirs)258   public int launchContainer(Container container,
259       Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath,
260       String user, String appId, Path containerWorkDir,
261       List<String> localDirs, List<String> logDirs) throws IOException {
262 
263     verifyUsernamePattern(user);
264     String runAsUser = getRunAsUser(user);
265 
266     ContainerId containerId = container.getContainerId();
267     String containerIdStr = ConverterUtils.toString(containerId);
268 
269     resourcesHandler.preExecute(containerId,
270             container.getResource());
271     String resourcesOptions = resourcesHandler.getResourcesOption(
272             containerId);
273 
274     ShellCommandExecutor shExec = null;
275 
276     try {
277       Path pidFilePath = getPidFilePath(containerId);
278       if (pidFilePath != null) {
279         List<String> command = new ArrayList<String>();
280         addSchedPriorityCommand(command);
281         command.addAll(Arrays.asList(
282             containerExecutorExe, runAsUser, user, Integer
283                 .toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
284             containerIdStr, containerWorkDir.toString(),
285             nmPrivateCotainerScriptPath.toUri().getPath().toString(),
286             nmPrivateTokensPath.toUri().getPath().toString(),
287             pidFilePath.toString(),
288             StringUtils.join(",", localDirs),
289             StringUtils.join(",", logDirs),
290             resourcesOptions));
291         String[] commandArray = command.toArray(new String[command.size()]);
292         shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
293             container.getLaunchContext().getEnvironment()); // sanitized env
294         if (LOG.isDebugEnabled()) {
295           LOG.debug("launchContainer: " + Arrays.toString(commandArray));
296         }
297         shExec.execute();
298         if (LOG.isDebugEnabled()) {
299           logOutput(shExec.getOutput());
300         }
301       } else {
302         LOG.info("Container was marked as inactive. Returning terminated error");
303         return ExitCode.TERMINATED.getExitCode();
304       }
305     } catch (ExitCodeException e) {
306       int exitCode = shExec.getExitCode();
307       LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
308       // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
309       // terminated/killed forcefully. In all other cases, log the
310       // container-executor's output
311       if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
312           && exitCode != ExitCode.TERMINATED.getExitCode()) {
313         LOG.warn("Exception from container-launch with container ID: "
314             + containerId + " and exit code: " + exitCode , e);
315 
316         StringBuilder builder = new StringBuilder();
317         builder.append("Exception from container-launch.\n");
318         builder.append("Container id: " + containerId + "\n");
319         builder.append("Exit code: " + exitCode + "\n");
320         if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) {
321           builder.append("Exception message: " + e.getMessage() + "\n");
322         }
323         builder.append("Stack trace: "
324             + StringUtils.stringifyException(e) + "\n");
325         if (!shExec.getOutput().isEmpty()) {
326           builder.append("Shell output: " + shExec.getOutput() + "\n");
327         }
328         String diagnostics = builder.toString();
329         logOutput(diagnostics);
330         container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
331             diagnostics));
332       } else {
333         container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
334             "Container killed on request. Exit code is " + exitCode));
335       }
336       return exitCode;
337     } finally {
338       resourcesHandler.postExecute(containerId);
339     }
340     if (LOG.isDebugEnabled()) {
341       LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
342       logOutput(shExec.getOutput());
343     }
344     return 0;
345   }
346 
347   @Override
reacquireContainer(String user, ContainerId containerId)348   public int reacquireContainer(String user, ContainerId containerId)
349       throws IOException, InterruptedException {
350     try {
351       return super.reacquireContainer(user, containerId);
352     } finally {
353       resourcesHandler.postExecute(containerId);
354     }
355   }
356 
357   @Override
signalContainer(String user, String pid, Signal signal)358   public boolean signalContainer(String user, String pid, Signal signal)
359       throws IOException {
360 
361     verifyUsernamePattern(user);
362     String runAsUser = getRunAsUser(user);
363 
364     String[] command =
365         new String[] { containerExecutorExe,
366                    runAsUser,
367                    user,
368                    Integer.toString(Commands.SIGNAL_CONTAINER.getValue()),
369                    pid,
370                    Integer.toString(signal.getValue()) };
371     ShellCommandExecutor shExec = new ShellCommandExecutor(command);
372     if (LOG.isDebugEnabled()) {
373       LOG.debug("signalContainer: " + Arrays.toString(command));
374     }
375     try {
376       shExec.execute();
377     } catch (ExitCodeException e) {
378       int ret_code = shExec.getExitCode();
379       if (ret_code == ResultCode.INVALID_CONTAINER_PID.getValue()) {
380         return false;
381       }
382       LOG.warn("Error in signalling container " + pid + " with " + signal
383           + "; exit = " + ret_code, e);
384       logOutput(shExec.getOutput());
385       throw new IOException("Problem signalling container " + pid + " with "
386           + signal + "; output: " + shExec.getOutput() + " and exitCode: "
387           + ret_code, e);
388     }
389     return true;
390   }
391 
392   @Override
deleteAsUser(String user, Path dir, Path... baseDirs)393   public void deleteAsUser(String user, Path dir, Path... baseDirs) {
394     verifyUsernamePattern(user);
395     String runAsUser = getRunAsUser(user);
396 
397     String dirString = dir == null ? "" : dir.toUri().getPath();
398 
399     List<String> command = new ArrayList<String>(
400         Arrays.asList(containerExecutorExe,
401                     runAsUser,
402                     user,
403                     Integer.toString(Commands.DELETE_AS_USER.getValue()),
404                     dirString));
405     List<String> pathsToDelete = new ArrayList<String>();
406     if (baseDirs == null || baseDirs.length == 0) {
407       LOG.info("Deleting absolute path : " + dir);
408       pathsToDelete.add(dirString);
409     } else {
410       for (Path baseDir : baseDirs) {
411         Path del = dir == null ? baseDir : new Path(baseDir, dir);
412         LOG.info("Deleting path : " + del);
413         pathsToDelete.add(del.toString());
414         command.add(baseDir.toUri().getPath());
415       }
416     }
417     String[] commandArray = command.toArray(new String[command.size()]);
418     ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
419     if (LOG.isDebugEnabled()) {
420       LOG.debug("deleteAsUser: " + Arrays.toString(commandArray));
421     }
422     try {
423       shExec.execute();
424       if (LOG.isDebugEnabled()) {
425         logOutput(shExec.getOutput());
426       }
427     } catch (IOException e) {
428       int exitCode = shExec.getExitCode();
429       LOG.error("DeleteAsUser for " + StringUtils.join(" ", pathsToDelete)
430           + " returned with exit code: " + exitCode, e);
431       LOG.error("Output from LinuxContainerExecutor's deleteAsUser follows:");
432       logOutput(shExec.getOutput());
433     }
434   }
435 
436   @Override
isContainerProcessAlive(String user, String pid)437   public boolean isContainerProcessAlive(String user, String pid)
438       throws IOException {
439     // Send a test signal to the process as the user to see if it's alive
440     return signalContainer(user, pid, Signal.NULL);
441   }
442 
mountCgroups(List<String> cgroupKVs, String hierarchy)443   public void mountCgroups(List<String> cgroupKVs, String hierarchy)
444          throws IOException {
445     List<String> command = new ArrayList<String>(
446             Arrays.asList(containerExecutorExe, "--mount-cgroups", hierarchy));
447     command.addAll(cgroupKVs);
448 
449     String[] commandArray = command.toArray(new String[command.size()]);
450     ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
451 
452     if (LOG.isDebugEnabled()) {
453         LOG.debug("mountCgroups: " + Arrays.toString(commandArray));
454     }
455 
456     try {
457         shExec.execute();
458     } catch (IOException e) {
459         int ret_code = shExec.getExitCode();
460         LOG.warn("Exception in LinuxContainerExecutor mountCgroups ", e);
461         logOutput(shExec.getOutput());
462         throw new IOException("Problem mounting cgroups " + cgroupKVs +
463           "; exit code = " + ret_code + " and output: " + shExec.getOutput(), e);
464     }
465   }
466 }
467