/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

/**
 * Base class for the node manager's container executors.
 *
 * <p>Besides declaring the operations every executor has to implement
 * (localizing, launching, signalling, deleting), this class keeps a registry
 * that maps each active container to its pid file. The registry is accessed
 * under a read/write lock and is what {@link #isContainerActive},
 * {@link #getProcessId} and {@link #reacquireContainer} consult.
 */
public abstract class ContainerExecutor implements Configurable {

  private static final Log LOG = LogFactory.getLog(ContainerExecutor.class);

  /** Permission (octal 0700) for the task launch script. */
  final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
    FsPermission.createImmutable((short) 0700);

  private Configuration conf;

  /** Pid file of every container that is currently marked active. */
  private final ConcurrentMap<ContainerId, Path> pidFiles =
      new ConcurrentHashMap<ContainerId, Path>();

  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final ReadLock readLock = lock.readLock();
  private final WriteLock writeLock = lock.writeLock();

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Run the executor initialization steps.
   * Verify that the necessary configs, permissions are in place.
   * @throws IOException
   */
  public abstract void init() throws IOException;

  /**
   * On Windows the ContainerLaunch creates a temporary special jar manifest of
   * other jars to workaround the CLASSPATH length. In a secure cluster this
   * jar must be localized so that the container has access to it.
   * This function localizes on-demand the jar.
   *
   * <p>This base implementation performs no localization and returns
   * {@code classPathJar} unchanged.
   *
   * @param classPathJar the classpath jar created by the launcher
   * @param pwd not used by this base implementation
   * @param owner not used by this base implementation
   * @return the path of the jar the container should use
   * @throws IOException
   */
  public Path localizeClasspathJar(Path classPathJar, Path pwd, String owner)
      throws IOException {
    // Non-secure executor simply use the classpath created
    // in the NM fprivate folder
    return classPathJar;
  }


  /**
   * Prepare the environment for containers in this application to execute.
   * <pre>
   * For $x in local.dirs
   *   create $x/$user/$appId
   * Copy $nmLocal/appTokens {@literal ->} $N/$user/$appId
   * For $rsrc in private resources
   *   Copy $rsrc {@literal ->} $N/$user/filecache/[idef]
   * For $rsrc in job resources
   *   Copy $rsrc {@literal ->} $N/$user/$appId/filecache/idef
   * </pre>
   * @param nmPrivateContainerTokens path to localized credentials, rsrc by NM
   * @param nmAddr RPC address to contact NM
   * @param user user name of application owner
   * @param appId id of the application
   * @param locId id of the localizer (NOTE(review): exact semantics are
   *          defined by the implementations; confirm against callers)
   * @param dirsHandler NM local dirs service, for nm-local-dirs and nm-log-dirs
   * @throws IOException For most application init failures
   * @throws InterruptedException If application init thread is halted by NM
   */
  public abstract void startLocalizer(Path nmPrivateContainerTokens,
      InetSocketAddress nmAddr, String user, String appId, String locId,
      LocalDirsHandlerService dirsHandler)
    throws IOException, InterruptedException;


  /**
   * Launch the container on the node. This is a blocking call and returns only
   * when the container exits.
   * @param container the container to be launched
   * @param nmPrivateContainerScriptPath the path for launch script
   * @param nmPrivateTokensPath the path for tokens for the container
   * @param user the user of the container
   * @param appId the appId of the container
   * @param containerWorkDir the work dir for the container
   * @param localDirs nm-local-dirs to be used for this container
   * @param logDirs nm-log-dirs to be used for this container
   * @return the return status of the launch
   * @throws IOException
   */
  public abstract int launchContainer(Container container,
      Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
      String user, String appId, Path containerWorkDir,
      List<String> localDirs, List<String> logDirs) throws IOException;

  /**
   * Send {@code signal} to the process {@code pid} on behalf of {@code user}.
   * Used by {@link DelayedProcessKiller} to deliver its delayed signal.
   *
   * @param user the user of the container
   * @param pid the process id to signal
   * @param signal the signal to deliver
   * @return NOTE(review): the meaning of the boolean result is defined by the
   *         implementations; confirm before relying on it
   * @throws IOException
   */
  public abstract boolean signalContainer(String user, String pid,
      Signal signal)
      throws IOException;

  /**
   * Delete paths as the given user.
   * NOTE(review): how {@code subDir} combines with {@code basedirs} is defined
   * by the implementations and is not visible in this class; confirm there.
   *
   * @param user the user to perform the deletion as
   * @param subDir the sub directory to delete
   * @param basedirs the base directories
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract void deleteAsUser(String user, Path subDir, Path... basedirs)
      throws IOException, InterruptedException;

  /**
   * Check whether the process {@code pid} is still alive.
   * {@link #reacquireContainer} polls this until it returns {@code false}.
   *
   * @param user the user of the container
   * @param pid the process id to check
   * @return true while the process is alive
   * @throws IOException
   */
  public abstract boolean isContainerProcessAlive(String user, String pid)
      throws IOException;

  /**
   * Recover an already existing container. This is a blocking call and returns
   * only when the container exits.  Note that the container must have been
   * activated prior to this call.
   *
   * <p>The method polls the container process once per second until it is
   * gone, then waits for the exit code file to appear (polling every 100 ms
   * with a budget of roughly two seconds) and parses the exit code from it.
   *
   * @param user the user of the container
   * @param containerId The ID of the container to reacquire
   * @return The exit code of the pre-existing container, or
   *         {@link ExitCode#TERMINATED} if the container is not active or is
   *         deactivated while waiting for the exit code file
   * @throws IOException if the pid cannot be determined, the exit code file
   *           does not appear in time, or its content is not an integer
   * @throws InterruptedException if interrupted while waiting
   */
  public int reacquireContainer(String user, ContainerId containerId)
      throws IOException, InterruptedException {
    Path pidPath = getPidFilePath(containerId);
    if (pidPath == null) {
      LOG.warn(containerId + " is not active, returning terminated error");
      return ExitCode.TERMINATED.getExitCode();
    }

    String pid = ProcessIdFileReader.getProcessId(pidPath);
    if (pid == null) {
      throw new IOException("Unable to determine pid for " + containerId);
    }

    LOG.info("Reacquiring " + containerId + " with pid " + pid);
    while (isContainerProcessAlive(user, pid)) {
      Thread.sleep(1000);
    }

    // wait for exit code file to appear
    String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString());
    File file = new File(exitCodeFile);
    final int sleepMsec = 100;
    int msecLeft = 2000;
    while (!file.exists() && msecLeft >= 0) {
      if (!isContainerActive(containerId)) {
        LOG.info(containerId + " was deactivated");
        return ExitCode.TERMINATED.getExitCode();
      }

      Thread.sleep(sleepMsec);

      msecLeft -= sleepMsec;
    }
    // Decide on the file itself rather than on the remaining budget: the file
    // may have appeared during the very last sleep, in which case the budget
    // is already negative although the exit code is available.
    if (!file.exists()) {
      throw new IOException("Timeout while waiting for exit code from "
          + containerId);
    }

    try {
      return Integer.parseInt(FileUtils.readFileToString(file).trim());
    } catch (NumberFormatException e) {
      throw new IOException("Error parsing exit code from pid " + pid, e);
    }
  }

  /**
   * Write the container launch script to {@code out} and close {@code out}.
   *
   * <p>The script is assembled with a
   * {@link ContainerLaunch.ShellScriptBuilder}: first one entry per
   * environment variable, then one symlink per resource link name, and
   * finally the command. It is written UTF-8 encoded.
   *
   * @param out the stream to write the script to; closed before returning
   * @param environment environment variables to set, may be null
   * @param resources maps a resource path to the link names to create for it,
   *          may be null
   * @param command the command to run
   * @throws IOException if the script cannot be written
   */
  public void writeLaunchEnv(OutputStream out, Map<String, String> environment,
      Map<Path, List<String>> resources, List<String> command)
      throws IOException {
    ContainerLaunch.ShellScriptBuilder sb =
        ContainerLaunch.ShellScriptBuilder.create();
    if (environment != null) {
      for (Map.Entry<String, String> env : environment.entrySet()) {
        sb.env(env.getKey().toString(), env.getValue().toString());
      }
    }
    if (resources != null) {
      for (Map.Entry<Path, List<String>> entry : resources.entrySet()) {
        for (String linkName : entry.getValue()) {
          sb.symlink(entry.getKey(), new Path(linkName));
        }
      }
    }

    sb.command(command);

    try {
      PrintStream pout = new PrintStream(out, false, "UTF-8");
      sb.write(pout);
      // PrintStream records I/O failures in an error flag instead of throwing
      // them; checkError() flushes and exposes that flag so that a failed
      // write is reported instead of leaving a truncated script behind.
      if (pout.checkError()) {
        throw new IOException("Error writing the container launch script");
      }
    } finally {
      if (out != null) {
        out.close();
      }
    }
  }

  /**
   * Exit codes used for containers. {@link #TERMINATED} is what
   * {@link ContainerExecutor#reacquireContainer} returns for a container that
   * is not, or no longer, active.
   */
  public enum ExitCode {
    FORCE_KILLED(137),
    TERMINATED(143),
    LOST(154);
    private final int code;

    private ExitCode(int exitCode) {
      this.code = exitCode;
    }

    /** @return the numeric exit code */
    public int getExitCode() {
      return code;
    }

    /** @return the numeric exit code as a decimal string */
    @Override
    public String toString() {
      return String.valueOf(code);
    }
  }

  /**
   * The constants for the signals.
   */
  public enum Signal {
    NULL(0, "NULL"), QUIT(3, "SIGQUIT"),
    KILL(9, "SIGKILL"), TERM(15, "SIGTERM");
    private final int value;
    private final String str;

    private Signal(int value, String str) {
      this.str = str;
      this.value = value;
    }

    /** @return the signal number */
    public int getValue() {
      return value;
    }

    /** @return the signal name */
    @Override
    public String toString() {
      return str;
    }
  }

  /**
   * Log the given output at info level, one log entry per line.
   *
   * @param output the text to log; nothing is logged when null
   */
  protected void logOutput(String output) {
    String shExecOutput = output;
    if (shExecOutput != null) {
      for (String str : shExecOutput.split("\n")) {
        LOG.info(str);
      }
    }
  }

  /**
   * Get the pidFile of the container.
   * @param containerId
   * @return the path of the pid-file for the given containerId, or null if
   *         the container is not active.
   */
  protected Path getPidFilePath(ContainerId containerId) {
    // Acquire outside the try block so that unlock() only ever runs for a
    // lock that is actually held.
    readLock.lock();
    try {
      return this.pidFiles.get(containerId);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Same as
   * {@link #getRunCommand(String, String, String, Path, Configuration, Resource)}
   * with a null resource, i.e. without any resource limits.
   */
  protected String[] getRunCommand(String command, String groupId,
      String userName, Path pidFile, Configuration conf) {
    return getRunCommand(command, groupId, userName, pidFile, conf, null);
  }

  /**
   * Return a command to execute the given command in OS shell.
   * On Windows, the passed in groupId can be used to launch
   * and associate the given groupId in a process group. On
   * non-Windows, groupId is ignored.
   *
   * <p>On Windows the command is wrapped in a winutils "task create" call
   * that carries a memory limit and a CPU rate; each is -1 unless
   * {@code resource} is non-null and the corresponding limit is enabled in
   * {@code conf}. On other platforms the command is run through bash, and
   * prefixed with "nice -n" when a scheduling priority is configured.
   *
   * @param command the command to run
   * @param groupId the Windows process group id; ignored elsewhere
   * @param userName not used by this base implementation
   * @param pidFile not used by this base implementation
   * @param conf the configuration to read priority and limit settings from
   * @param resource the resources of the container, may be null
   * @return the command line as an argument array
   * @throws IllegalArgumentException on Windows with the CPU limit enabled,
   *           if the configured physical CPU percentage is not positive
   */
  protected String[] getRunCommand(String command, String groupId,
      String userName, Path pidFile, Configuration conf, Resource resource) {
    boolean containerSchedPriorityIsSet = false;
    int containerSchedPriorityAdjustment =
        YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;

    if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) !=
        null) {
      containerSchedPriorityIsSet = true;
      containerSchedPriorityAdjustment = conf
          .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
          YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
    }

    if (Shell.WINDOWS) {
      int cpuRate = -1;
      int memory = -1;
      if (resource != null) {
        if (conf
            .getBoolean(
                YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED,
                YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) {
          memory = resource.getMemory();
        }

        if (conf.getBoolean(
            YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED,
            YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) {
          int containerVCores = resource.getVirtualCores();
          int nodeVCores = conf.getInt(YarnConfiguration.NM_VCORES,
              YarnConfiguration.DEFAULT_NM_VCORES);
          // cap overall usage to the number of cores allocated to YARN
          int nodeCpuPercentage = Math
              .min(
                  conf.getInt(
                      YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
                      YarnConfiguration.DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT),
                  100);
          nodeCpuPercentage = Math.max(0, nodeCpuPercentage);
          if (nodeCpuPercentage == 0) {
            String message = "Illegal value for "
                + YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
                + ". Value cannot be less than or equal to 0.";
            throw new IllegalArgumentException(message);
          }
          float yarnVCores = (nodeCpuPercentage * nodeVCores) / 100.0f;
          // CPU should be set to a percentage * 100, e.g. 20% cpu rate limit
          // should be set as 20 * 100. The following setting is equal to:
          // 100 * (100 * (vcores / Total # of cores allocated to YARN))
          cpuRate = Math.min(10000,
              (int) ((containerVCores * 10000) / yarnVCores));
        }
      }
      return new String[] { Shell.WINUTILS, "task", "create", "-m",
          String.valueOf(memory), "-c", String.valueOf(cpuRate), groupId,
          "cmd /c " + command };
    } else {
      List<String> retCommand = new ArrayList<String>();
      if (containerSchedPriorityIsSet) {
        retCommand.addAll(Arrays.asList("nice", "-n",
            Integer.toString(containerSchedPriorityAdjustment)));
      }
      retCommand.addAll(Arrays.asList("bash", command));
      return retCommand.toArray(new String[retCommand.size()]);
    }

  }

  /**
   * Is the container still active?
   * @param containerId
   * @return true if the container is active else false.
   */
  protected boolean isContainerActive(ContainerId containerId) {
    readLock.lock();
    try {
      return this.pidFiles.containsKey(containerId);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Mark the container as active
   *
   * @param containerId
   *          the ContainerId
   * @param pidFilePath
   *          Path where the executor should write the pid of the launched
   *          process
   */
  public void activateContainer(ContainerId containerId, Path pidFilePath) {
    writeLock.lock();
    try {
      this.pidFiles.put(containerId, pidFilePath);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Mark the container as inactive.
   * Done iff the container is still active. Else treat it as
   * a no-op
   *
   * @param containerId the ContainerId
   */
  public void deactivateContainer(ContainerId containerId) {
    writeLock.lock();
    try {
      this.pidFiles.remove(containerId);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Get the process-identifier for the container
   *
   * @param containerID
   * @return the processid of the container if it has already launched,
   *         otherwise return null. Null is also returned, after logging the
   *         error, when the pid file cannot be read.
   */
  public String getProcessId(ContainerId containerID) {
    // Look the pid file up through the same locked accessor the rest of the
    // class uses.
    Path pidFile = getPidFilePath(containerID);
    if (pidFile == null) {
      // This container isn't even launched yet.
      return null;
    }
    try {
      return ProcessIdFileReader.getProcessId(pidFile);
    } catch (IOException e) {
      LOG.error("Got exception reading pid from pid-file " + pidFile, e);
      return null;
    }
  }

  /**
   * Thread that sleeps for a given delay and then sends a signal to a process
   * through a {@link ContainerExecutor}. If signalling fails, the failure is
   * logged and reported to the container as a diagnostics update.
   */
  public static class DelayedProcessKiller extends Thread {
    private final Container container;
    private final String user;
    private final String pid;
    private final long delay;
    private final Signal signal;
    private final ContainerExecutor containerExecutor;

    /**
     * @param container the container that receives diagnostics on failure
     * @param user the user of the container
     * @param pid the process id to signal
     * @param delay time to wait before signalling, in milliseconds
     * @param signal the signal to send
     * @param containerExecutor the executor used to deliver the signal
     */
    public DelayedProcessKiller(Container container, String user, String pid,
        long delay, Signal signal, ContainerExecutor containerExecutor) {
      this.container = container;
      this.user = user;
      this.pid = pid;
      this.delay = delay;
      this.signal = signal;
      this.containerExecutor = containerExecutor;
      setName("Task killer for " + pid);
      setDaemon(false);
    }

    @Override
    public void run() {
      try {
        Thread.sleep(delay);
        containerExecutor.signalContainer(user, pid, signal);
      } catch (InterruptedException e) {
        // Interrupted before the delay elapsed: exit without signalling.
        return;
      } catch (IOException e) {
        String message = "Exception when user " + user + " killing task " + pid
            + " in DelayedProcessKiller: " + StringUtils.stringifyException(e);
        LOG.warn(message);
        container.handle(new ContainerDiagnosticsUpdateEvent(container
          .getContainerId(), message));
      }
    }
  }
}