1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.nodemanager; 20 21 import com.google.common.annotations.VisibleForTesting; 22 import com.google.common.base.Joiner; 23 import com.google.common.base.Preconditions; 24 import com.google.common.base.Strings; 25 26 import org.apache.commons.lang.math.RandomUtils; 27 import org.apache.commons.logging.Log; 28 import org.apache.commons.logging.LogFactory; 29 import org.apache.hadoop.fs.CommonConfigurationKeys; 30 import org.apache.hadoop.fs.FileContext; 31 import org.apache.hadoop.fs.Path; 32 import org.apache.hadoop.fs.UnsupportedFileSystemException; 33 import org.apache.hadoop.fs.permission.FsPermission; 34 import org.apache.hadoop.io.IOUtils; 35 import org.apache.hadoop.util.Shell; 36 import org.apache.hadoop.util.Shell.ShellCommandExecutor; 37 import org.apache.hadoop.util.StringUtils; 38 import org.apache.hadoop.yarn.api.ApplicationConstants; 39 import org.apache.hadoop.yarn.api.records.ContainerId; 40 import org.apache.hadoop.yarn.conf.YarnConfiguration; 41 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; 42 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; 43 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; 44 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; 45 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; 46 import org.apache.hadoop.yarn.util.ConverterUtils; 47 48 import java.io.ByteArrayOutputStream; 49 import java.io.DataOutputStream; 50 import java.io.File; 51 import java.io.FileNotFoundException; 52 import java.io.IOException; 53 import java.io.OutputStream; 54 import java.io.PrintStream; 55 import java.util.ArrayList; 56 import java.util.Collections; 57 import java.util.EnumSet; 58 import java.util.HashSet; 59 import java.util.List; 60 import java.util.Map; 61 import java.util.Random; 62 import java.util.Set; 63 import java.util.regex.Pattern; 64 import java.net.InetSocketAddress; 65 import static org.apache.hadoop.fs.CreateFlag.CREATE; 66 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; 67 68 /** 69 * This executor will launch a docker container and run the task inside the container. 70 */ 71 public class DockerContainerExecutor extends ContainerExecutor { 72 73 private static final Log LOG = LogFactory 74 .getLog(DockerContainerExecutor.class); 75 public static final String DOCKER_CONTAINER_EXECUTOR_SCRIPT = "docker_container_executor"; 76 public static final String DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT = "docker_container_executor_session"; 77 78 // This validates that the image is a proper docker image and would not crash docker. 79 public static final String DOCKER_IMAGE_PATTERN = "^(([\\w\\.-]+)(:\\d+)*\\/)?[\\w\\.:-]+$"; 80 81 82 private final FileContext lfs; 83 private final Pattern dockerImagePattern; 84 DockerContainerExecutor()85 public DockerContainerExecutor() { 86 try { 87 this.lfs = FileContext.getLocalFSFileContext(); 88 this.dockerImagePattern = Pattern.compile(DOCKER_IMAGE_PATTERN); 89 } catch (UnsupportedFileSystemException e) { 90 throw new RuntimeException(e); 91 } 92 } 93 copyFile(Path src, Path dst, String owner)94 protected void copyFile(Path src, Path dst, String owner) throws IOException { 95 lfs.util().copy(src, dst); 96 } 97 98 @Override init()99 public void init() throws IOException { 100 String auth = getConf().get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION); 101 if (auth != null && !auth.equals("simple")) { 102 throw new IllegalStateException("DockerContainerExecutor only works with simple authentication mode"); 103 } 104 String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, 105 YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME); 106 if (!new File(dockerExecutor).exists()) { 107 throw new IllegalStateException("Invalid docker exec path: " + dockerExecutor); 108 } 109 } 110 111 @Override startLocalizer(Path nmPrivateContainerTokensPath, InetSocketAddress nmAddr, String user, String appId, String locId, LocalDirsHandlerService dirsHandler)112 public synchronized void startLocalizer(Path nmPrivateContainerTokensPath, 113 InetSocketAddress nmAddr, String user, String appId, String locId, 114 LocalDirsHandlerService dirsHandler) 115 throws IOException, InterruptedException { 116 117 List<String> localDirs = dirsHandler.getLocalDirs(); 118 List<String> logDirs = dirsHandler.getLogDirs(); 119 120 ContainerLocalizer localizer = 121 new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs), 122 RecordFactoryProvider.getRecordFactory(getConf())); 123 124 createUserLocalDirs(localDirs, user); 125 createUserCacheDirs(localDirs, user); 126 createAppDirs(localDirs, user, appId); 127 createAppLogDirs(appId, logDirs, user); 128 129 // randomly choose the local directory 130 Path appStorageDir = getWorkingDir(localDirs, user, appId); 131 132 String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); 133 Path tokenDst = new Path(appStorageDir, tokenFn); 134 copyFile(nmPrivateContainerTokensPath, tokenDst, user); 135 LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst); 136 lfs.setWorkingDirectory(appStorageDir); 137 LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory()); 138 // TODO: DO it over RPC for maintaining similarity? 139 localizer.runLocalization(nmAddr); 140 } 141 142 143 @Override launchContainer(Container container, Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String userName, String appId, Path containerWorkDir, List<String> localDirs, List<String> logDirs)144 public int launchContainer(Container container, 145 Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, 146 String userName, String appId, Path containerWorkDir, 147 List<String> localDirs, List<String> logDirs) throws IOException { 148 String containerImageName = container.getLaunchContext().getEnvironment() 149 .get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME); 150 if (LOG.isDebugEnabled()) { 151 LOG.debug("containerImageName from launchContext: " + containerImageName); 152 } 153 Preconditions.checkArgument(!Strings.isNullOrEmpty(containerImageName), "Container image must not be null"); 154 containerImageName = containerImageName.replaceAll("['\"]", ""); 155 156 Preconditions.checkArgument(saneDockerImage(containerImageName), "Image: " + containerImageName + " is not a proper docker image"); 157 String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, 158 YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME); 159 160 FsPermission dirPerm = new FsPermission(APPDIR_PERM); 161 ContainerId containerId = container.getContainerId(); 162 163 // create container dirs on all disks 164 String containerIdStr = ConverterUtils.toString(containerId); 165 String appIdStr = 166 ConverterUtils.toString( 167 containerId.getApplicationAttemptId(). 168 getApplicationId()); 169 for (String sLocalDir : localDirs) { 170 Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE); 171 Path userdir = new Path(usersdir, userName); 172 Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE); 173 Path appDir = new Path(appCacheDir, appIdStr); 174 Path containerDir = new Path(appDir, containerIdStr); 175 createDir(containerDir, dirPerm, true, userName); 176 } 177 178 // Create the container log-dirs on all disks 179 createContainerLogDirs(appIdStr, containerIdStr, logDirs, userName); 180 181 Path tmpDir = new Path(containerWorkDir, 182 YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); 183 createDir(tmpDir, dirPerm, false, userName); 184 185 // copy launch script to work dir 186 Path launchDst = 187 new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT); 188 lfs.util().copy(nmPrivateContainerScriptPath, launchDst); 189 190 // copy container tokens to work dir 191 Path tokenDst = 192 new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE); 193 lfs.util().copy(nmPrivateTokensPath, tokenDst); 194 195 196 197 String localDirMount = toMount(localDirs); 198 String logDirMount = toMount(logDirs); 199 String containerWorkDirMount = toMount(Collections.singletonList(containerWorkDir.toUri().getPath())); 200 StringBuilder commands = new StringBuilder(); 201 String commandStr = commands.append(dockerExecutor) 202 .append(" ") 203 .append("run") 204 .append(" ") 205 .append("--rm --net=host") 206 .append(" ") 207 .append(" --name " + containerIdStr) 208 .append(localDirMount) 209 .append(logDirMount) 210 .append(containerWorkDirMount) 211 .append(" ") 212 .append(containerImageName) 213 .toString(); 214 String dockerPidScript = "`" + dockerExecutor + " inspect --format {{.State.Pid}} " + containerIdStr + "`"; 215 // Create new local launch wrapper script 216 LocalWrapperScriptBuilder sb = 217 new UnixLocalWrapperScriptBuilder(containerWorkDir, commandStr, dockerPidScript); 218 Path pidFile = getPidFilePath(containerId); 219 if (pidFile != null) { 220 sb.writeLocalWrapperScript(launchDst, pidFile); 221 } else { 222 LOG.info("Container " + containerIdStr 223 + " was marked as inactive. Returning terminated error"); 224 return ExitCode.TERMINATED.getExitCode(); 225 } 226 227 ShellCommandExecutor shExec = null; 228 try { 229 lfs.setPermission(launchDst, 230 ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); 231 lfs.setPermission(sb.getWrapperScriptPath(), 232 ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); 233 234 // Setup command to run 235 String[] command = getRunCommand(sb.getWrapperScriptPath().toString(), 236 containerIdStr, userName, pidFile, this.getConf()); 237 if (LOG.isDebugEnabled()) { 238 LOG.debug("launchContainer: " + commandStr + " " + Joiner.on(" ").join(command)); 239 } 240 shExec = new ShellCommandExecutor( 241 command, 242 new File(containerWorkDir.toUri().getPath()), 243 container.getLaunchContext().getEnvironment()); // sanitized env 244 if (isContainerActive(containerId)) { 245 shExec.execute(); 246 } else { 247 LOG.info("Container " + containerIdStr + 248 " was marked as inactive. Returning terminated error"); 249 return ExitCode.TERMINATED.getExitCode(); 250 } 251 } catch (IOException e) { 252 if (null == shExec) { 253 return -1; 254 } 255 int exitCode = shExec.getExitCode(); 256 LOG.warn("Exit code from container " + containerId + " is : " + exitCode); 257 // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was 258 // terminated/killed forcefully. In all other cases, log the 259 // container-executor's output 260 if (exitCode != ExitCode.FORCE_KILLED.getExitCode() 261 && exitCode != ExitCode.TERMINATED.getExitCode()) { 262 LOG.warn("Exception from container-launch with container ID: " 263 + containerId + " and exit code: " + exitCode, e); 264 logOutput(shExec.getOutput()); 265 String diagnostics = "Exception from container-launch: \n" 266 + StringUtils.stringifyException(e) + "\n" + shExec.getOutput(); 267 container.handle(new ContainerDiagnosticsUpdateEvent(containerId, 268 diagnostics)); 269 } else { 270 container.handle(new ContainerDiagnosticsUpdateEvent(containerId, 271 "Container killed on request. Exit code is " + exitCode)); 272 } 273 return exitCode; 274 } finally { 275 if (shExec != null) { 276 shExec.close(); 277 } 278 } 279 return 0; 280 } 281 282 @Override writeLaunchEnv(OutputStream out, Map<String, String> environment, Map<Path, List<String>> resources, List<String> command)283 public void writeLaunchEnv(OutputStream out, Map<String, String> environment, Map<Path, List<String>> resources, List<String> command) throws IOException { 284 ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder.create(); 285 286 Set<String> exclusionSet = new HashSet<String>(); 287 exclusionSet.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME); 288 exclusionSet.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name()); 289 exclusionSet.add(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name()); 290 exclusionSet.add(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name()); 291 exclusionSet.add(ApplicationConstants.Environment.HADOOP_CONF_DIR.name()); 292 exclusionSet.add(ApplicationConstants.Environment.JAVA_HOME.name()); 293 294 if (environment != null) { 295 for (Map.Entry<String,String> env : environment.entrySet()) { 296 if (!exclusionSet.contains(env.getKey())) { 297 sb.env(env.getKey().toString(), env.getValue().toString()); 298 } 299 } 300 } 301 if (resources != null) { 302 for (Map.Entry<Path,List<String>> entry : resources.entrySet()) { 303 for (String linkName : entry.getValue()) { 304 sb.symlink(entry.getKey(), new Path(linkName)); 305 } 306 } 307 } 308 309 sb.command(command); 310 311 PrintStream pout = null; 312 PrintStream ps = null; 313 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 314 try { 315 pout = new PrintStream(out, false, "UTF-8"); 316 if (LOG.isDebugEnabled()) { 317 ps = new PrintStream(baos, false, "UTF-8"); 318 sb.write(ps); 319 } 320 sb.write(pout); 321 322 } finally { 323 if (out != null) { 324 out.close(); 325 } 326 if (ps != null) { 327 ps.close(); 328 } 329 } 330 if (LOG.isDebugEnabled()) { 331 LOG.debug("Script: " + baos.toString("UTF-8")); 332 } 333 } 334 saneDockerImage(String containerImageName)335 private boolean saneDockerImage(String containerImageName) { 336 return dockerImagePattern.matcher(containerImageName).matches(); 337 } 338 339 @Override signalContainer(String user, String pid, Signal signal)340 public boolean signalContainer(String user, String pid, Signal signal) 341 throws IOException { 342 if (LOG.isDebugEnabled()) { 343 LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid 344 + " as user " + user); 345 } 346 if (!containerIsAlive(pid)) { 347 return false; 348 } 349 try { 350 killContainer(pid, signal); 351 } catch (IOException e) { 352 if (!containerIsAlive(pid)) { 353 return false; 354 } 355 throw e; 356 } 357 return true; 358 } 359 360 @Override isContainerProcessAlive(String user, String pid)361 public boolean isContainerProcessAlive(String user, String pid) 362 throws IOException { 363 return containerIsAlive(pid); 364 } 365 366 /** 367 * Returns true if the process with the specified pid is alive. 368 * 369 * @param pid String pid 370 * @return boolean true if the process is alive 371 */ 372 @VisibleForTesting containerIsAlive(String pid)373 public static boolean containerIsAlive(String pid) throws IOException { 374 try { 375 new ShellCommandExecutor(Shell.getCheckProcessIsAliveCommand(pid)) 376 .execute(); 377 // successful execution means process is alive 378 return true; 379 } 380 catch (Shell.ExitCodeException e) { 381 // failure (non-zero exit code) means process is not alive 382 return false; 383 } 384 } 385 386 /** 387 * Send a specified signal to the specified pid 388 * 389 * @param pid the pid of the process [group] to signal. 390 * @param signal signal to send 391 * (for logging). 392 */ killContainer(String pid, Signal signal)393 protected void killContainer(String pid, Signal signal) throws IOException { 394 new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid)) 395 .execute(); 396 } 397 398 @Override deleteAsUser(String user, Path subDir, Path... baseDirs)399 public void deleteAsUser(String user, Path subDir, Path... baseDirs) 400 throws IOException, InterruptedException { 401 if (baseDirs == null || baseDirs.length == 0) { 402 LOG.info("Deleting absolute path : " + subDir); 403 if (!lfs.delete(subDir, true)) { 404 //Maybe retry 405 LOG.warn("delete returned false for path: [" + subDir + "]"); 406 } 407 return; 408 } 409 for (Path baseDir : baseDirs) { 410 Path del = subDir == null ? baseDir : new Path(baseDir, subDir); 411 LOG.info("Deleting path : " + del); 412 try { 413 if (!lfs.delete(del, true)) { 414 LOG.warn("delete returned false for path: [" + del + "]"); 415 } 416 } catch (FileNotFoundException e) { 417 continue; 418 } 419 } 420 } 421 422 /** 423 * Converts a directory list to a docker mount string 424 * @param dirs 425 * @return a string of mounts for docker 426 */ toMount(List<String> dirs)427 private String toMount(List<String> dirs) { 428 StringBuilder builder = new StringBuilder(); 429 for (String dir : dirs) { 430 builder.append(" -v " + dir + ":" + dir); 431 } 432 return builder.toString(); 433 } 434 435 private abstract class LocalWrapperScriptBuilder { 436 437 private final Path wrapperScriptPath; 438 getWrapperScriptPath()439 public Path getWrapperScriptPath() { 440 return wrapperScriptPath; 441 } 442 writeLocalWrapperScript(Path launchDst, Path pidFile)443 public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException { 444 DataOutputStream out = null; 445 PrintStream pout = null; 446 447 try { 448 out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE)); 449 pout = new PrintStream(out, false, "UTF-8"); 450 writeLocalWrapperScript(launchDst, pidFile, pout); 451 } finally { 452 IOUtils.cleanup(LOG, pout, out); 453 } 454 } 455 writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout)456 protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile, 457 PrintStream pout); 458 LocalWrapperScriptBuilder(Path containerWorkDir)459 protected LocalWrapperScriptBuilder(Path containerWorkDir) { 460 this.wrapperScriptPath = new Path(containerWorkDir, 461 Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SCRIPT)); 462 } 463 } 464 465 private final class UnixLocalWrapperScriptBuilder 466 extends LocalWrapperScriptBuilder { 467 private final Path sessionScriptPath; 468 private final String dockerCommand; 469 private final String dockerPidScript; 470 UnixLocalWrapperScriptBuilder(Path containerWorkDir, String dockerCommand, String dockerPidScript)471 public UnixLocalWrapperScriptBuilder(Path containerWorkDir, String dockerCommand, String dockerPidScript) { 472 super(containerWorkDir); 473 this.dockerCommand = dockerCommand; 474 this.dockerPidScript = dockerPidScript; 475 this.sessionScriptPath = new Path(containerWorkDir, 476 Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT)); 477 } 478 479 @Override writeLocalWrapperScript(Path launchDst, Path pidFile)480 public void writeLocalWrapperScript(Path launchDst, Path pidFile) 481 throws IOException { 482 writeSessionScript(launchDst, pidFile); 483 super.writeLocalWrapperScript(launchDst, pidFile); 484 } 485 486 @Override writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout)487 public void writeLocalWrapperScript(Path launchDst, Path pidFile, 488 PrintStream pout) { 489 490 String exitCodeFile = ContainerLaunch.getExitCodeFile( 491 pidFile.toString()); 492 String tmpFile = exitCodeFile + ".tmp"; 493 pout.println("#!/usr/bin/env bash"); 494 pout.println("bash \"" + sessionScriptPath.toString() + "\""); 495 pout.println("rc=$?"); 496 pout.println("echo $rc > \"" + tmpFile + "\""); 497 pout.println("mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\""); 498 pout.println("exit $rc"); 499 } 500 writeSessionScript(Path launchDst, Path pidFile)501 private void writeSessionScript(Path launchDst, Path pidFile) 502 throws IOException { 503 DataOutputStream out = null; 504 PrintStream pout = null; 505 try { 506 out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE)); 507 pout = new PrintStream(out, false, "UTF-8"); 508 // We need to do a move as writing to a file is not atomic 509 // Process reading a file being written to may get garbled data 510 // hence write pid to tmp file first followed by a mv 511 pout.println("#!/usr/bin/env bash"); 512 pout.println(); 513 pout.println("echo "+ dockerPidScript +" > " + pidFile.toString() + ".tmp"); 514 pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); 515 pout.println(dockerCommand + " bash \"" + 516 launchDst.toUri().getPath().toString() + "\""); 517 } finally { 518 IOUtils.cleanup(LOG, pout, out); 519 } 520 lfs.setPermission(sessionScriptPath, 521 ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); 522 } 523 } 524 createDir(Path dirPath, FsPermission perms, boolean createParent, String user)525 protected void createDir(Path dirPath, FsPermission perms, 526 boolean createParent, String user) throws IOException { 527 lfs.mkdir(dirPath, perms, createParent); 528 if (!perms.equals(perms.applyUMask(lfs.getUMask()))) { 529 lfs.setPermission(dirPath, perms); 530 } 531 } 532 533 /** 534 * Initialize the local directories for a particular user. 535 * <ul>.mkdir 536 * <li>$local.dir/usercache/$user</li> 537 * </ul> 538 */ createUserLocalDirs(List<String> localDirs, String user)539 void createUserLocalDirs(List<String> localDirs, String user) 540 throws IOException { 541 boolean userDirStatus = false; 542 FsPermission userperms = new FsPermission(USER_PERM); 543 for (String localDir : localDirs) { 544 // create $local.dir/usercache/$user and its immediate parent 545 try { 546 createDir(getUserCacheDir(new Path(localDir), user), userperms, true, user); 547 } catch (IOException e) { 548 LOG.warn("Unable to create the user directory : " + localDir, e); 549 continue; 550 } 551 userDirStatus = true; 552 } 553 if (!userDirStatus) { 554 throw new IOException("Not able to initialize user directories " 555 + "in any of the configured local directories for user " + user); 556 } 557 } 558 559 560 /** 561 * Initialize the local cache directories for a particular user. 562 * <ul> 563 * <li>$local.dir/usercache/$user</li> 564 * <li>$local.dir/usercache/$user/appcache</li> 565 * <li>$local.dir/usercache/$user/filecache</li> 566 * </ul> 567 */ createUserCacheDirs(List<String> localDirs, String user)568 void createUserCacheDirs(List<String> localDirs, String user) 569 throws IOException { 570 LOG.info("Initializing user " + user); 571 572 boolean appcacheDirStatus = false; 573 boolean distributedCacheDirStatus = false; 574 FsPermission appCachePerms = new FsPermission(APPCACHE_PERM); 575 FsPermission fileperms = new FsPermission(FILECACHE_PERM); 576 577 for (String localDir : localDirs) { 578 // create $local.dir/usercache/$user/appcache 579 Path localDirPath = new Path(localDir); 580 final Path appDir = getAppcacheDir(localDirPath, user); 581 try { 582 createDir(appDir, appCachePerms, true, user); 583 appcacheDirStatus = true; 584 } catch (IOException e) { 585 LOG.warn("Unable to create app cache directory : " + appDir, e); 586 } 587 // create $local.dir/usercache/$user/filecache 588 final Path distDir = getFileCacheDir(localDirPath, user); 589 try { 590 createDir(distDir, fileperms, true, user); 591 distributedCacheDirStatus = true; 592 } catch (IOException e) { 593 LOG.warn("Unable to create file cache directory : " + distDir, e); 594 } 595 } 596 if (!appcacheDirStatus) { 597 throw new IOException("Not able to initialize app-cache directories " 598 + "in any of the configured local directories for user " + user); 599 } 600 if (!distributedCacheDirStatus) { 601 throw new IOException( 602 "Not able to initialize distributed-cache directories " 603 + "in any of the configured local directories for user " 604 + user); 605 } 606 } 607 608 /** 609 * Initialize the local directories for a particular user. 610 * <ul> 611 * <li>$local.dir/usercache/$user/appcache/$appid</li> 612 * </ul> 613 * @param localDirs 614 */ createAppDirs(List<String> localDirs, String user, String appId)615 void createAppDirs(List<String> localDirs, String user, String appId) 616 throws IOException { 617 boolean initAppDirStatus = false; 618 FsPermission appperms = new FsPermission(APPDIR_PERM); 619 for (String localDir : localDirs) { 620 Path fullAppDir = getApplicationDir(new Path(localDir), user, appId); 621 // create $local.dir/usercache/$user/appcache/$appId 622 try { 623 createDir(fullAppDir, appperms, true, user); 624 initAppDirStatus = true; 625 } catch (IOException e) { 626 LOG.warn("Unable to create app directory " + fullAppDir.toString(), e); 627 } 628 } 629 if (!initAppDirStatus) { 630 throw new IOException("Not able to initialize app directories " 631 + "in any of the configured local directories for app " 632 + appId.toString()); 633 } 634 } 635 636 637 /** 638 * Create application log directories on all disks. 639 */ createContainerLogDirs(String appId, String containerId, List<String> logDirs, String user)640 void createContainerLogDirs(String appId, String containerId, 641 List<String> logDirs, String user) throws IOException { 642 643 boolean containerLogDirStatus = false; 644 FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM); 645 for (String rootLogDir : logDirs) { 646 // create $log.dir/$appid/$containerid 647 Path appLogDir = new Path(rootLogDir, appId); 648 Path containerLogDir = new Path(appLogDir, containerId); 649 try { 650 createDir(containerLogDir, containerLogDirPerms, true, user); 651 } catch (IOException e) { 652 LOG.warn("Unable to create the container-log directory : " 653 + appLogDir, e); 654 continue; 655 } 656 containerLogDirStatus = true; 657 } 658 if (!containerLogDirStatus) { 659 throw new IOException( 660 "Not able to initialize container-log directories " 661 + "in any of the configured local directories for container " 662 + containerId); 663 } 664 } 665 666 /** 667 * Permissions for user dir. 668 * $local.dir/usercache/$user 669 */ 670 static final short USER_PERM = (short) 0750; 671 /** 672 * Permissions for user appcache dir. 673 * $local.dir/usercache/$user/appcache 674 */ 675 static final short APPCACHE_PERM = (short) 0710; 676 /** 677 * Permissions for user filecache dir. 678 * $local.dir/usercache/$user/filecache 679 */ 680 static final short FILECACHE_PERM = (short) 0710; 681 /** 682 * Permissions for user app dir. 683 * $local.dir/usercache/$user/appcache/$appId 684 */ 685 static final short APPDIR_PERM = (short) 0710; 686 /** 687 * Permissions for user log dir. 688 * $logdir/$user/$appId 689 */ 690 static final short LOGDIR_PERM = (short) 0710; 691 getDiskFreeSpace(Path base)692 private long getDiskFreeSpace(Path base) throws IOException { 693 return lfs.getFsStatus(base).getRemaining(); 694 } 695 getApplicationDir(Path base, String user, String appId)696 private Path getApplicationDir(Path base, String user, String appId) { 697 return new Path(getAppcacheDir(base, user), appId); 698 } 699 getUserCacheDir(Path base, String user)700 private Path getUserCacheDir(Path base, String user) { 701 return new Path(new Path(base, ContainerLocalizer.USERCACHE), user); 702 } 703 getAppcacheDir(Path base, String user)704 private Path getAppcacheDir(Path base, String user) { 705 return new Path(getUserCacheDir(base, user), 706 ContainerLocalizer.APPCACHE); 707 } 708 getFileCacheDir(Path base, String user)709 private Path getFileCacheDir(Path base, String user) { 710 return new Path(getUserCacheDir(base, user), 711 ContainerLocalizer.FILECACHE); 712 } 713 getWorkingDir(List<String> localDirs, String user, String appId)714 protected Path getWorkingDir(List<String> localDirs, String user, 715 String appId) throws IOException { 716 Path appStorageDir = null; 717 long totalAvailable = 0L; 718 long[] availableOnDisk = new long[localDirs.size()]; 719 int i = 0; 720 // randomly choose the app directory 721 // the chance of picking a directory is proportional to 722 // the available space on the directory. 723 // firstly calculate the sum of all available space on these directories 724 for (String localDir : localDirs) { 725 Path curBase = getApplicationDir(new Path(localDir), 726 user, appId); 727 long space = 0L; 728 try { 729 space = getDiskFreeSpace(curBase); 730 } catch (IOException e) { 731 LOG.warn("Unable to get Free Space for " + curBase.toString(), e); 732 } 733 availableOnDisk[i++] = space; 734 totalAvailable += space; 735 } 736 737 // throw an IOException if totalAvailable is 0. 738 if (totalAvailable <= 0L) { 739 throw new IOException("Not able to find a working directory for " 740 + user); 741 } 742 743 // make probability to pick a directory proportional to 744 // the available space on the directory. 745 long randomPosition = RandomUtils.nextLong() % totalAvailable; 746 int dir = 0; 747 // skip zero available space directory, 748 // because totalAvailable is greater than 0 and randomPosition 749 // is less than totalAvailable, we can find a valid directory 750 // with nonzero available space. 751 while (availableOnDisk[dir] == 0L) { 752 dir++; 753 } 754 while (randomPosition > availableOnDisk[dir]) { 755 randomPosition -= availableOnDisk[dir++]; 756 } 757 appStorageDir = getApplicationDir(new Path(localDirs.get(dir)), 758 user, appId); 759 760 return appStorageDir; 761 } 762 763 /** 764 * Create application log directories on all disks. 765 */ createAppLogDirs(String appId, List<String> logDirs, String user)766 void createAppLogDirs(String appId, List<String> logDirs, String user) 767 throws IOException { 768 769 boolean appLogDirStatus = false; 770 FsPermission appLogDirPerms = new FsPermission(LOGDIR_PERM); 771 for (String rootLogDir : logDirs) { 772 // create $log.dir/$appid 773 Path appLogDir = new Path(rootLogDir, appId); 774 try { 775 createDir(appLogDir, appLogDirPerms, true, user); 776 } catch (IOException e) { 777 LOG.warn("Unable to create the app-log directory : " + appLogDir, e); 778 continue; 779 } 780 appLogDirStatus = true; 781 } 782 if (!appLogDirStatus) { 783 throw new IOException("Not able to initialize app-log directories " 784 + "in any of the configured local directories for app " + appId); 785 } 786 } 787 788 /** 789 * @return the list of paths of given local directories 790 */ getPaths(List<String> dirs)791 private static List<Path> getPaths(List<String> dirs) { 792 List<Path> paths = new ArrayList<Path>(dirs.size()); 793 for (int i = 0; i < dirs.size(); i++) { 794 paths.add(new Path(dirs.get(i))); 795 } 796 return paths; 797 } 798 799 }