1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.server.nodemanager;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Joiner;
23 import com.google.common.base.Preconditions;
24 import com.google.common.base.Strings;
25 
26 import org.apache.commons.lang.math.RandomUtils;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.fs.CommonConfigurationKeys;
30 import org.apache.hadoop.fs.FileContext;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.fs.UnsupportedFileSystemException;
33 import org.apache.hadoop.fs.permission.FsPermission;
34 import org.apache.hadoop.io.IOUtils;
35 import org.apache.hadoop.util.Shell;
36 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
37 import org.apache.hadoop.util.StringUtils;
38 import org.apache.hadoop.yarn.api.ApplicationConstants;
39 import org.apache.hadoop.yarn.api.records.ContainerId;
40 import org.apache.hadoop.yarn.conf.YarnConfiguration;
41 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
42 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
43 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
44 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
45 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
46 import org.apache.hadoop.yarn.util.ConverterUtils;
47 
48 import java.io.ByteArrayOutputStream;
49 import java.io.DataOutputStream;
50 import java.io.File;
51 import java.io.FileNotFoundException;
52 import java.io.IOException;
53 import java.io.OutputStream;
54 import java.io.PrintStream;
55 import java.util.ArrayList;
56 import java.util.Collections;
57 import java.util.EnumSet;
58 import java.util.HashSet;
59 import java.util.List;
60 import java.util.Map;
61 import java.util.Random;
62 import java.util.Set;
63 import java.util.regex.Pattern;
64 import java.net.InetSocketAddress;
65 import static org.apache.hadoop.fs.CreateFlag.CREATE;
66 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
67 
68 /**
69  * This executor will launch a docker container and run the task inside the container.
70  */
71 public class DockerContainerExecutor extends ContainerExecutor {
72 
73   private static final Log LOG = LogFactory
74       .getLog(DockerContainerExecutor.class);
75   public static final String DOCKER_CONTAINER_EXECUTOR_SCRIPT = "docker_container_executor";
76   public static final String DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT = "docker_container_executor_session";
77 
78   // This validates that the image is a proper docker image and would not crash docker.
79   public static final String DOCKER_IMAGE_PATTERN = "^(([\\w\\.-]+)(:\\d+)*\\/)?[\\w\\.:-]+$";
80 
81 
82   private final FileContext lfs;
83   private final Pattern dockerImagePattern;
84 
DockerContainerExecutor()85   public DockerContainerExecutor() {
86     try {
87       this.lfs = FileContext.getLocalFSFileContext();
88       this.dockerImagePattern = Pattern.compile(DOCKER_IMAGE_PATTERN);
89     } catch (UnsupportedFileSystemException e) {
90       throw new RuntimeException(e);
91     }
92   }
93 
copyFile(Path src, Path dst, String owner)94   protected void copyFile(Path src, Path dst, String owner) throws IOException {
95     lfs.util().copy(src, dst);
96   }
97 
98   @Override
init()99   public void init() throws IOException {
100     String auth = getConf().get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);
101     if (auth != null && !auth.equals("simple")) {
102       throw new IllegalStateException("DockerContainerExecutor only works with simple authentication mode");
103     }
104     String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
105       YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
106     if (!new File(dockerExecutor).exists()) {
107       throw new IllegalStateException("Invalid docker exec path: " + dockerExecutor);
108     }
109   }
110 
111   @Override
startLocalizer(Path nmPrivateContainerTokensPath, InetSocketAddress nmAddr, String user, String appId, String locId, LocalDirsHandlerService dirsHandler)112   public synchronized void startLocalizer(Path nmPrivateContainerTokensPath,
113                                           InetSocketAddress nmAddr, String user, String appId, String locId,
114                                           LocalDirsHandlerService dirsHandler)
115     throws IOException, InterruptedException {
116 
117     List<String> localDirs = dirsHandler.getLocalDirs();
118     List<String> logDirs = dirsHandler.getLogDirs();
119 
120     ContainerLocalizer localizer =
121       new ContainerLocalizer(lfs, user, appId, locId, getPaths(localDirs),
122         RecordFactoryProvider.getRecordFactory(getConf()));
123 
124     createUserLocalDirs(localDirs, user);
125     createUserCacheDirs(localDirs, user);
126     createAppDirs(localDirs, user, appId);
127     createAppLogDirs(appId, logDirs, user);
128 
129     // randomly choose the local directory
130     Path appStorageDir = getWorkingDir(localDirs, user, appId);
131 
132     String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
133     Path tokenDst = new Path(appStorageDir, tokenFn);
134     copyFile(nmPrivateContainerTokensPath, tokenDst, user);
135     LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
136     lfs.setWorkingDirectory(appStorageDir);
137     LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
138     // TODO: DO it over RPC for maintaining similarity?
139     localizer.runLocalization(nmAddr);
140   }
141 
142 
143   @Override
launchContainer(Container container, Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String userName, String appId, Path containerWorkDir, List<String> localDirs, List<String> logDirs)144   public int launchContainer(Container container,
145                              Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
146                              String userName, String appId, Path containerWorkDir,
147                              List<String> localDirs, List<String> logDirs) throws IOException {
148     String containerImageName = container.getLaunchContext().getEnvironment()
149         .get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
150     if (LOG.isDebugEnabled()) {
151       LOG.debug("containerImageName from launchContext: " + containerImageName);
152     }
153     Preconditions.checkArgument(!Strings.isNullOrEmpty(containerImageName), "Container image must not be null");
154     containerImageName = containerImageName.replaceAll("['\"]", "");
155 
156     Preconditions.checkArgument(saneDockerImage(containerImageName), "Image: " + containerImageName + " is not a proper docker image");
157     String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME,
158         YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME);
159 
160     FsPermission dirPerm = new FsPermission(APPDIR_PERM);
161     ContainerId containerId = container.getContainerId();
162 
163     // create container dirs on all disks
164     String containerIdStr = ConverterUtils.toString(containerId);
165     String appIdStr =
166         ConverterUtils.toString(
167             containerId.getApplicationAttemptId().
168                 getApplicationId());
169     for (String sLocalDir : localDirs) {
170       Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
171       Path userdir = new Path(usersdir, userName);
172       Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
173       Path appDir = new Path(appCacheDir, appIdStr);
174       Path containerDir = new Path(appDir, containerIdStr);
175       createDir(containerDir, dirPerm, true, userName);
176     }
177 
178     // Create the container log-dirs on all disks
179     createContainerLogDirs(appIdStr, containerIdStr, logDirs, userName);
180 
181     Path tmpDir = new Path(containerWorkDir,
182         YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
183     createDir(tmpDir, dirPerm, false, userName);
184 
185     // copy launch script to work dir
186     Path launchDst =
187         new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
188     lfs.util().copy(nmPrivateContainerScriptPath, launchDst);
189 
190     // copy container tokens to work dir
191     Path tokenDst =
192         new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
193     lfs.util().copy(nmPrivateTokensPath, tokenDst);
194 
195 
196 
197     String localDirMount = toMount(localDirs);
198     String logDirMount = toMount(logDirs);
199     String containerWorkDirMount = toMount(Collections.singletonList(containerWorkDir.toUri().getPath()));
200     StringBuilder commands = new StringBuilder();
201     String commandStr = commands.append(dockerExecutor)
202         .append(" ")
203         .append("run")
204         .append(" ")
205         .append("--rm --net=host")
206         .append(" ")
207         .append(" --name " + containerIdStr)
208         .append(localDirMount)
209         .append(logDirMount)
210         .append(containerWorkDirMount)
211         .append(" ")
212         .append(containerImageName)
213         .toString();
214     String dockerPidScript = "`" + dockerExecutor + " inspect --format {{.State.Pid}} " + containerIdStr + "`";
215     // Create new local launch wrapper script
216     LocalWrapperScriptBuilder sb =
217       new UnixLocalWrapperScriptBuilder(containerWorkDir, commandStr, dockerPidScript);
218     Path pidFile = getPidFilePath(containerId);
219     if (pidFile != null) {
220       sb.writeLocalWrapperScript(launchDst, pidFile);
221     } else {
222       LOG.info("Container " + containerIdStr
223           + " was marked as inactive. Returning terminated error");
224       return ExitCode.TERMINATED.getExitCode();
225     }
226 
227     ShellCommandExecutor shExec = null;
228     try {
229       lfs.setPermission(launchDst,
230           ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
231       lfs.setPermission(sb.getWrapperScriptPath(),
232           ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
233 
234       // Setup command to run
235       String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
236         containerIdStr, userName, pidFile, this.getConf());
237       if (LOG.isDebugEnabled()) {
238         LOG.debug("launchContainer: " + commandStr + " " + Joiner.on(" ").join(command));
239       }
240       shExec = new ShellCommandExecutor(
241           command,
242           new File(containerWorkDir.toUri().getPath()),
243           container.getLaunchContext().getEnvironment());      // sanitized env
244       if (isContainerActive(containerId)) {
245         shExec.execute();
246       } else {
247         LOG.info("Container " + containerIdStr +
248             " was marked as inactive. Returning terminated error");
249         return ExitCode.TERMINATED.getExitCode();
250       }
251     } catch (IOException e) {
252       if (null == shExec) {
253         return -1;
254       }
255       int exitCode = shExec.getExitCode();
256       LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
257       // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
258       // terminated/killed forcefully. In all other cases, log the
259       // container-executor's output
260       if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
261           && exitCode != ExitCode.TERMINATED.getExitCode()) {
262         LOG.warn("Exception from container-launch with container ID: "
263             + containerId + " and exit code: " + exitCode, e);
264         logOutput(shExec.getOutput());
265         String diagnostics = "Exception from container-launch: \n"
266             + StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
267         container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
268             diagnostics));
269       } else {
270         container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
271             "Container killed on request. Exit code is " + exitCode));
272       }
273       return exitCode;
274     } finally {
275       if (shExec != null) {
276         shExec.close();
277       }
278     }
279     return 0;
280   }
281 
282   @Override
writeLaunchEnv(OutputStream out, Map<String, String> environment, Map<Path, List<String>> resources, List<String> command)283   public void writeLaunchEnv(OutputStream out, Map<String, String> environment, Map<Path, List<String>> resources, List<String> command) throws IOException {
284     ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder.create();
285 
286     Set<String> exclusionSet = new HashSet<String>();
287     exclusionSet.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME);
288     exclusionSet.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name());
289     exclusionSet.add(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name());
290     exclusionSet.add(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name());
291     exclusionSet.add(ApplicationConstants.Environment.HADOOP_CONF_DIR.name());
292     exclusionSet.add(ApplicationConstants.Environment.JAVA_HOME.name());
293 
294     if (environment != null) {
295       for (Map.Entry<String,String> env : environment.entrySet()) {
296         if (!exclusionSet.contains(env.getKey())) {
297           sb.env(env.getKey().toString(), env.getValue().toString());
298         }
299       }
300     }
301     if (resources != null) {
302       for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
303         for (String linkName : entry.getValue()) {
304           sb.symlink(entry.getKey(), new Path(linkName));
305         }
306       }
307     }
308 
309     sb.command(command);
310 
311     PrintStream pout = null;
312     PrintStream ps = null;
313     ByteArrayOutputStream baos = new ByteArrayOutputStream();
314     try {
315       pout = new PrintStream(out, false, "UTF-8");
316       if (LOG.isDebugEnabled()) {
317         ps = new PrintStream(baos, false, "UTF-8");
318         sb.write(ps);
319       }
320       sb.write(pout);
321 
322     } finally {
323       if (out != null) {
324         out.close();
325       }
326       if (ps != null) {
327         ps.close();
328       }
329     }
330     if (LOG.isDebugEnabled()) {
331       LOG.debug("Script: " + baos.toString("UTF-8"));
332     }
333   }
334 
saneDockerImage(String containerImageName)335   private boolean saneDockerImage(String containerImageName) {
336     return dockerImagePattern.matcher(containerImageName).matches();
337   }
338 
339   @Override
signalContainer(String user, String pid, Signal signal)340   public boolean signalContainer(String user, String pid, Signal signal)
341     throws IOException {
342     if (LOG.isDebugEnabled()) {
343       LOG.debug("Sending signal " + signal.getValue() + " to pid " + pid
344         + " as user " + user);
345     }
346     if (!containerIsAlive(pid)) {
347       return false;
348     }
349     try {
350       killContainer(pid, signal);
351     } catch (IOException e) {
352       if (!containerIsAlive(pid)) {
353         return false;
354       }
355       throw e;
356     }
357     return true;
358   }
359 
360   @Override
isContainerProcessAlive(String user, String pid)361   public boolean isContainerProcessAlive(String user, String pid)
362     throws IOException {
363     return containerIsAlive(pid);
364   }
365 
366   /**
367    * Returns true if the process with the specified pid is alive.
368    *
369    * @param pid String pid
370    * @return boolean true if the process is alive
371    */
372   @VisibleForTesting
containerIsAlive(String pid)373   public static boolean containerIsAlive(String pid) throws IOException {
374     try {
375       new ShellCommandExecutor(Shell.getCheckProcessIsAliveCommand(pid))
376         .execute();
377       // successful execution means process is alive
378       return true;
379     }
380     catch (Shell.ExitCodeException e) {
381       // failure (non-zero exit code) means process is not alive
382       return false;
383     }
384   }
385 
386   /**
387    * Send a specified signal to the specified pid
388    *
389    * @param pid the pid of the process [group] to signal.
390    * @param signal signal to send
391    * (for logging).
392    */
killContainer(String pid, Signal signal)393   protected void killContainer(String pid, Signal signal) throws IOException {
394     new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid))
395       .execute();
396   }
397 
398   @Override
deleteAsUser(String user, Path subDir, Path... baseDirs)399   public void deleteAsUser(String user, Path subDir, Path... baseDirs)
400     throws IOException, InterruptedException {
401     if (baseDirs == null || baseDirs.length == 0) {
402       LOG.info("Deleting absolute path : " + subDir);
403       if (!lfs.delete(subDir, true)) {
404         //Maybe retry
405         LOG.warn("delete returned false for path: [" + subDir + "]");
406       }
407       return;
408     }
409     for (Path baseDir : baseDirs) {
410       Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
411       LOG.info("Deleting path : " + del);
412       try {
413         if (!lfs.delete(del, true)) {
414           LOG.warn("delete returned false for path: [" + del + "]");
415         }
416       } catch (FileNotFoundException e) {
417         continue;
418       }
419     }
420   }
421 
422   /**
423    * Converts a directory list to a docker mount string
424    * @param dirs
425    * @return a string of mounts for docker
426    */
toMount(List<String> dirs)427   private String toMount(List<String> dirs) {
428     StringBuilder builder = new StringBuilder();
429     for (String dir : dirs) {
430       builder.append(" -v " + dir + ":" + dir);
431     }
432     return builder.toString();
433   }
434 
435   private abstract class LocalWrapperScriptBuilder {
436 
437     private final Path wrapperScriptPath;
438 
getWrapperScriptPath()439     public Path getWrapperScriptPath() {
440       return wrapperScriptPath;
441     }
442 
writeLocalWrapperScript(Path launchDst, Path pidFile)443     public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException {
444       DataOutputStream out = null;
445       PrintStream pout = null;
446 
447       try {
448         out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE));
449         pout = new PrintStream(out, false, "UTF-8");
450         writeLocalWrapperScript(launchDst, pidFile, pout);
451       } finally {
452         IOUtils.cleanup(LOG, pout, out);
453       }
454     }
455 
writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout)456     protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile,
457                                                     PrintStream pout);
458 
LocalWrapperScriptBuilder(Path containerWorkDir)459     protected LocalWrapperScriptBuilder(Path containerWorkDir) {
460       this.wrapperScriptPath = new Path(containerWorkDir,
461           Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SCRIPT));
462     }
463   }
464 
465   private final class UnixLocalWrapperScriptBuilder
466       extends LocalWrapperScriptBuilder {
467     private final Path sessionScriptPath;
468     private final String dockerCommand;
469     private final String dockerPidScript;
470 
UnixLocalWrapperScriptBuilder(Path containerWorkDir, String dockerCommand, String dockerPidScript)471     public UnixLocalWrapperScriptBuilder(Path containerWorkDir, String dockerCommand, String dockerPidScript) {
472       super(containerWorkDir);
473       this.dockerCommand = dockerCommand;
474       this.dockerPidScript = dockerPidScript;
475       this.sessionScriptPath = new Path(containerWorkDir,
476         Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT));
477     }
478 
479     @Override
writeLocalWrapperScript(Path launchDst, Path pidFile)480     public void writeLocalWrapperScript(Path launchDst, Path pidFile)
481       throws IOException {
482       writeSessionScript(launchDst, pidFile);
483       super.writeLocalWrapperScript(launchDst, pidFile);
484     }
485 
486     @Override
writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout)487     public void writeLocalWrapperScript(Path launchDst, Path pidFile,
488                                         PrintStream pout) {
489 
490       String exitCodeFile = ContainerLaunch.getExitCodeFile(
491         pidFile.toString());
492       String tmpFile = exitCodeFile + ".tmp";
493       pout.println("#!/usr/bin/env bash");
494       pout.println("bash \"" + sessionScriptPath.toString() + "\"");
495       pout.println("rc=$?");
496       pout.println("echo $rc > \"" + tmpFile + "\"");
497       pout.println("mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\"");
498       pout.println("exit $rc");
499     }
500 
writeSessionScript(Path launchDst, Path pidFile)501     private void writeSessionScript(Path launchDst, Path pidFile)
502       throws IOException {
503       DataOutputStream out = null;
504       PrintStream pout = null;
505       try {
506         out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
507         pout = new PrintStream(out, false, "UTF-8");
508         // We need to do a move as writing to a file is not atomic
509         // Process reading a file being written to may get garbled data
510         // hence write pid to tmp file first followed by a mv
511         pout.println("#!/usr/bin/env bash");
512         pout.println();
513         pout.println("echo "+ dockerPidScript +" > " + pidFile.toString() + ".tmp");
514         pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
515         pout.println(dockerCommand + " bash \"" +
516           launchDst.toUri().getPath().toString() + "\"");
517       } finally {
518         IOUtils.cleanup(LOG, pout, out);
519       }
520       lfs.setPermission(sessionScriptPath,
521         ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
522     }
523   }
524 
createDir(Path dirPath, FsPermission perms, boolean createParent, String user)525   protected void createDir(Path dirPath, FsPermission perms,
526                            boolean createParent, String user) throws IOException {
527     lfs.mkdir(dirPath, perms, createParent);
528     if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
529       lfs.setPermission(dirPath, perms);
530     }
531   }
532 
533   /**
534    * Initialize the local directories for a particular user.
535    * <ul>.mkdir
536    * <li>$local.dir/usercache/$user</li>
537    * </ul>
538    */
createUserLocalDirs(List<String> localDirs, String user)539   void createUserLocalDirs(List<String> localDirs, String user)
540       throws IOException {
541     boolean userDirStatus = false;
542     FsPermission userperms = new FsPermission(USER_PERM);
543     for (String localDir : localDirs) {
544       // create $local.dir/usercache/$user and its immediate parent
545       try {
546         createDir(getUserCacheDir(new Path(localDir), user), userperms, true, user);
547       } catch (IOException e) {
548         LOG.warn("Unable to create the user directory : " + localDir, e);
549         continue;
550       }
551       userDirStatus = true;
552     }
553     if (!userDirStatus) {
554       throw new IOException("Not able to initialize user directories "
555           + "in any of the configured local directories for user " + user);
556     }
557   }
558 
559 
560   /**
561    * Initialize the local cache directories for a particular user.
562    * <ul>
563    * <li>$local.dir/usercache/$user</li>
564    * <li>$local.dir/usercache/$user/appcache</li>
565    * <li>$local.dir/usercache/$user/filecache</li>
566    * </ul>
567    */
createUserCacheDirs(List<String> localDirs, String user)568   void createUserCacheDirs(List<String> localDirs, String user)
569     throws IOException {
570     LOG.info("Initializing user " + user);
571 
572     boolean appcacheDirStatus = false;
573     boolean distributedCacheDirStatus = false;
574     FsPermission appCachePerms = new FsPermission(APPCACHE_PERM);
575     FsPermission fileperms = new FsPermission(FILECACHE_PERM);
576 
577     for (String localDir : localDirs) {
578       // create $local.dir/usercache/$user/appcache
579       Path localDirPath = new Path(localDir);
580       final Path appDir = getAppcacheDir(localDirPath, user);
581       try {
582         createDir(appDir, appCachePerms, true, user);
583         appcacheDirStatus = true;
584       } catch (IOException e) {
585         LOG.warn("Unable to create app cache directory : " + appDir, e);
586       }
587       // create $local.dir/usercache/$user/filecache
588       final Path distDir = getFileCacheDir(localDirPath, user);
589       try {
590         createDir(distDir, fileperms, true, user);
591         distributedCacheDirStatus = true;
592       } catch (IOException e) {
593         LOG.warn("Unable to create file cache directory : " + distDir, e);
594       }
595     }
596     if (!appcacheDirStatus) {
597       throw new IOException("Not able to initialize app-cache directories "
598         + "in any of the configured local directories for user " + user);
599     }
600     if (!distributedCacheDirStatus) {
601       throw new IOException(
602         "Not able to initialize distributed-cache directories "
603           + "in any of the configured local directories for user "
604           + user);
605     }
606   }
607 
608   /**
609    * Initialize the local directories for a particular user.
610    * <ul>
611    * <li>$local.dir/usercache/$user/appcache/$appid</li>
612    * </ul>
613    * @param localDirs
614    */
createAppDirs(List<String> localDirs, String user, String appId)615   void createAppDirs(List<String> localDirs, String user, String appId)
616     throws IOException {
617     boolean initAppDirStatus = false;
618     FsPermission appperms = new FsPermission(APPDIR_PERM);
619     for (String localDir : localDirs) {
620       Path fullAppDir = getApplicationDir(new Path(localDir), user, appId);
621       // create $local.dir/usercache/$user/appcache/$appId
622       try {
623         createDir(fullAppDir, appperms, true, user);
624         initAppDirStatus = true;
625       } catch (IOException e) {
626         LOG.warn("Unable to create app directory " + fullAppDir.toString(), e);
627       }
628     }
629     if (!initAppDirStatus) {
630       throw new IOException("Not able to initialize app directories "
631         + "in any of the configured local directories for app "
632         + appId.toString());
633     }
634   }
635 
636 
637   /**
638    * Create application log directories on all disks.
639    */
createContainerLogDirs(String appId, String containerId, List<String> logDirs, String user)640   void createContainerLogDirs(String appId, String containerId,
641                               List<String> logDirs, String user) throws IOException {
642 
643     boolean containerLogDirStatus = false;
644     FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM);
645     for (String rootLogDir : logDirs) {
646       // create $log.dir/$appid/$containerid
647       Path appLogDir = new Path(rootLogDir, appId);
648       Path containerLogDir = new Path(appLogDir, containerId);
649       try {
650         createDir(containerLogDir, containerLogDirPerms, true, user);
651       } catch (IOException e) {
652         LOG.warn("Unable to create the container-log directory : "
653           + appLogDir, e);
654         continue;
655       }
656       containerLogDirStatus = true;
657     }
658     if (!containerLogDirStatus) {
659       throw new IOException(
660         "Not able to initialize container-log directories "
661           + "in any of the configured local directories for container "
662           + containerId);
663     }
664   }
665 
666   /**
667    * Permissions for user dir.
668    * $local.dir/usercache/$user
669    */
670   static final short USER_PERM = (short) 0750;
671   /**
672    * Permissions for user appcache dir.
673    * $local.dir/usercache/$user/appcache
674    */
675   static final short APPCACHE_PERM = (short) 0710;
676   /**
677    * Permissions for user filecache dir.
678    * $local.dir/usercache/$user/filecache
679    */
680   static final short FILECACHE_PERM = (short) 0710;
681   /**
682    * Permissions for user app dir.
683    * $local.dir/usercache/$user/appcache/$appId
684    */
685   static final short APPDIR_PERM = (short) 0710;
686   /**
687    * Permissions for user log dir.
688    * $logdir/$user/$appId
689    */
690   static final short LOGDIR_PERM = (short) 0710;
691 
getDiskFreeSpace(Path base)692   private long getDiskFreeSpace(Path base) throws IOException {
693     return lfs.getFsStatus(base).getRemaining();
694   }
695 
getApplicationDir(Path base, String user, String appId)696   private Path getApplicationDir(Path base, String user, String appId) {
697     return new Path(getAppcacheDir(base, user), appId);
698   }
699 
getUserCacheDir(Path base, String user)700   private Path getUserCacheDir(Path base, String user) {
701     return new Path(new Path(base, ContainerLocalizer.USERCACHE), user);
702   }
703 
getAppcacheDir(Path base, String user)704   private Path getAppcacheDir(Path base, String user) {
705     return new Path(getUserCacheDir(base, user),
706         ContainerLocalizer.APPCACHE);
707   }
708 
getFileCacheDir(Path base, String user)709   private Path getFileCacheDir(Path base, String user) {
710     return new Path(getUserCacheDir(base, user),
711         ContainerLocalizer.FILECACHE);
712   }
713 
getWorkingDir(List<String> localDirs, String user, String appId)714   protected Path getWorkingDir(List<String> localDirs, String user,
715                                String appId) throws IOException {
716     Path appStorageDir = null;
717     long totalAvailable = 0L;
718     long[] availableOnDisk = new long[localDirs.size()];
719     int i = 0;
720     // randomly choose the app directory
721     // the chance of picking a directory is proportional to
722     // the available space on the directory.
723     // firstly calculate the sum of all available space on these directories
724     for (String localDir : localDirs) {
725       Path curBase = getApplicationDir(new Path(localDir),
726         user, appId);
727       long space = 0L;
728       try {
729         space = getDiskFreeSpace(curBase);
730       } catch (IOException e) {
731         LOG.warn("Unable to get Free Space for " + curBase.toString(), e);
732       }
733       availableOnDisk[i++] = space;
734       totalAvailable += space;
735     }
736 
737     // throw an IOException if totalAvailable is 0.
738     if (totalAvailable <= 0L) {
739       throw new IOException("Not able to find a working directory for "
740         + user);
741     }
742 
743     // make probability to pick a directory proportional to
744     // the available space on the directory.
745     long randomPosition = RandomUtils.nextLong() % totalAvailable;
746     int dir = 0;
747     // skip zero available space directory,
748     // because totalAvailable is greater than 0 and randomPosition
749     // is less than totalAvailable, we can find a valid directory
750     // with nonzero available space.
751     while (availableOnDisk[dir] == 0L) {
752       dir++;
753     }
754     while (randomPosition > availableOnDisk[dir]) {
755       randomPosition -= availableOnDisk[dir++];
756     }
757     appStorageDir = getApplicationDir(new Path(localDirs.get(dir)),
758       user, appId);
759 
760     return appStorageDir;
761   }
762 
763   /**
764    * Create application log directories on all disks.
765    */
createAppLogDirs(String appId, List<String> logDirs, String user)766   void createAppLogDirs(String appId, List<String> logDirs, String user)
767     throws IOException {
768 
769     boolean appLogDirStatus = false;
770     FsPermission appLogDirPerms = new FsPermission(LOGDIR_PERM);
771     for (String rootLogDir : logDirs) {
772       // create $log.dir/$appid
773       Path appLogDir = new Path(rootLogDir, appId);
774       try {
775         createDir(appLogDir, appLogDirPerms, true, user);
776       } catch (IOException e) {
777         LOG.warn("Unable to create the app-log directory : " + appLogDir, e);
778         continue;
779       }
780       appLogDirStatus = true;
781     }
782     if (!appLogDirStatus) {
783       throw new IOException("Not able to initialize app-log directories "
784         + "in any of the configured local directories for app " + appId);
785     }
786   }
787 
788   /**
789    * @return the list of paths of given local directories
790    */
getPaths(List<String> dirs)791   private static List<Path> getPaths(List<String> dirs) {
792     List<Path> paths = new ArrayList<Path>(dirs.size());
793     for (int i = 0; i < dirs.size(); i++) {
794       paths.add(new Path(dirs.get(i)));
795     }
796     return paths;
797   }
798 
799 }