1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.server.nodemanager.util;
20 
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.annotations.VisibleForTesting;

import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.SystemClock;
55 
56 public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
57 
58   final static Log LOG = LogFactory
59       .getLog(CgroupsLCEResourcesHandler.class);
60 
61   private Configuration conf;
62   private String cgroupPrefix;
63   private boolean cgroupMount;
64   private String cgroupMountPath;
65 
66   private boolean cpuWeightEnabled = true;
67   private boolean strictResourceUsageMode = false;
68 
69   private final String MTAB_FILE = "/proc/mounts";
70   private final String CGROUPS_FSTYPE = "cgroup";
71   private final String CONTROLLER_CPU = "cpu";
72   private final String CPU_PERIOD_US = "cfs_period_us";
73   private final String CPU_QUOTA_US = "cfs_quota_us";
74   private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
75   private final int MAX_QUOTA_US = 1000 * 1000;
76   private final int MIN_PERIOD_US = 1000;
77   private final Map<String, String> controllerPaths; // Controller -> path
78 
79   private long deleteCgroupTimeout;
80   private long deleteCgroupDelay;
81   // package private for testing purposes
82   Clock clock;
83 
84   private float yarnProcessors;
85 
CgroupsLCEResourcesHandler()86   public CgroupsLCEResourcesHandler() {
87     this.controllerPaths = new HashMap<String, String>();
88     clock = new SystemClock();
89   }
90 
91   @Override
setConf(Configuration conf)92   public void setConf(Configuration conf) {
93     this.conf = conf;
94   }
95 
96   @Override
getConf()97   public Configuration getConf() {
98     return conf;
99   }
100 
101   @VisibleForTesting
initConfig()102   void initConfig() throws IOException {
103 
104     this.cgroupPrefix = conf.get(YarnConfiguration.
105             NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, "/hadoop-yarn");
106     this.cgroupMount = conf.getBoolean(YarnConfiguration.
107             NM_LINUX_CONTAINER_CGROUPS_MOUNT, false);
108     this.cgroupMountPath = conf.get(YarnConfiguration.
109             NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, null);
110 
111     this.deleteCgroupTimeout = conf.getLong(
112         YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT,
113         YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT);
114     this.deleteCgroupDelay =
115         conf.getLong(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY,
116             YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY);
117     // remove extra /'s at end or start of cgroupPrefix
118     if (cgroupPrefix.charAt(0) == '/') {
119       cgroupPrefix = cgroupPrefix.substring(1);
120     }
121 
122     this.strictResourceUsageMode =
123         conf
124           .getBoolean(
125             YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
126             YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
127 
128     int len = cgroupPrefix.length();
129     if (cgroupPrefix.charAt(len - 1) == '/') {
130       cgroupPrefix = cgroupPrefix.substring(0, len - 1);
131     }
132   }
133 
init(LinuxContainerExecutor lce)134   public void init(LinuxContainerExecutor lce) throws IOException {
135     this.init(lce,
136         ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf));
137   }
138 
139   @VisibleForTesting
init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)140   void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)
141       throws IOException {
142     initConfig();
143 
144     // mount cgroups if requested
145     if (cgroupMount && cgroupMountPath != null) {
146       ArrayList<String> cgroupKVs = new ArrayList<String>();
147       cgroupKVs.add(CONTROLLER_CPU + "=" + cgroupMountPath + "/" +
148                     CONTROLLER_CPU);
149       lce.mountCgroups(cgroupKVs, cgroupPrefix);
150     }
151 
152     initializeControllerPaths();
153 
154     // cap overall usage to the number of cores allocated to YARN
155     yarnProcessors = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
156     int systemProcessors = plugin.getNumProcessors();
157     if (systemProcessors != (int) yarnProcessors) {
158       LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
159       int[] limits = getOverallLimits(yarnProcessors);
160       updateCgroup(CONTROLLER_CPU, "", CPU_PERIOD_US, String.valueOf(limits[0]));
161       updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1]));
162     } else if (cpuLimitsExist()) {
163       LOG.info("Removing CPU constraints for YARN containers.");
164       updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1));
165     }
166   }
167 
cpuLimitsExist()168   boolean cpuLimitsExist() throws IOException {
169     String path = pathForCgroup(CONTROLLER_CPU, "");
170     File quotaFile = new File(path, CONTROLLER_CPU + "." + CPU_QUOTA_US);
171     if (quotaFile.exists()) {
172       String contents = FileUtils.readFileToString(quotaFile, "UTF-8");
173       int quotaUS = Integer.parseInt(contents.trim());
174       if (quotaUS != -1) {
175         return true;
176       }
177     }
178     return false;
179   }
180 
181   @VisibleForTesting
getOverallLimits(float yarnProcessors)182   int[] getOverallLimits(float yarnProcessors) {
183 
184     int[] ret = new int[2];
185 
186     if (yarnProcessors < 0.01f) {
187       throw new IllegalArgumentException("Number of processors can't be <= 0.");
188     }
189 
190     int quotaUS = MAX_QUOTA_US;
191     int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
192     if (yarnProcessors < 1.0f) {
193       periodUS = MAX_QUOTA_US;
194       quotaUS = (int) (periodUS * yarnProcessors);
195       if (quotaUS < MIN_PERIOD_US) {
196         LOG
197           .warn("The quota calculated for the cgroup was too low. The minimum value is "
198               + MIN_PERIOD_US + ", calculated value is " + quotaUS
199               + ". Setting quota to minimum value.");
200         quotaUS = MIN_PERIOD_US;
201       }
202     }
203 
204     // cfs_period_us can't be less than 1000 microseconds
205     // if the value of periodUS is less than 1000, we can't really use cgroups
206     // to limit cpu
207     if (periodUS < MIN_PERIOD_US) {
208       LOG
209         .warn("The period calculated for the cgroup was too low. The minimum value is "
210             + MIN_PERIOD_US + ", calculated value is " + periodUS
211             + ". Using all available CPU.");
212       periodUS = MAX_QUOTA_US;
213       quotaUS = -1;
214     }
215 
216     ret[0] = periodUS;
217     ret[1] = quotaUS;
218     return ret;
219   }
220 
isCpuWeightEnabled()221   boolean isCpuWeightEnabled() {
222     return this.cpuWeightEnabled;
223   }
224 
225   /*
226    * Next four functions are for an individual cgroup.
227    */
228 
pathForCgroup(String controller, String groupName)229   private String pathForCgroup(String controller, String groupName) {
230     String controllerPath = controllerPaths.get(controller);
231     return controllerPath + "/" + cgroupPrefix + "/" + groupName;
232   }
233 
createCgroup(String controller, String groupName)234   private void createCgroup(String controller, String groupName)
235         throws IOException {
236     String path = pathForCgroup(controller, groupName);
237 
238     if (LOG.isDebugEnabled()) {
239       LOG.debug("createCgroup: " + path);
240     }
241 
242     if (! new File(path).mkdir()) {
243       throw new IOException("Failed to create cgroup at " + path);
244     }
245   }
246 
updateCgroup(String controller, String groupName, String param, String value)247   private void updateCgroup(String controller, String groupName, String param,
248                             String value) throws IOException {
249     String path = pathForCgroup(controller, groupName);
250     param = controller + "." + param;
251 
252     if (LOG.isDebugEnabled()) {
253       LOG.debug("updateCgroup: " + path + ": " + param + "=" + value);
254     }
255 
256     PrintWriter pw = null;
257     try {
258       File file = new File(path + "/" + param);
259       Writer w = new OutputStreamWriter(new FileOutputStream(file), "UTF-8");
260       pw = new PrintWriter(w);
261       pw.write(value);
262     } catch (IOException e) {
263       throw new IOException("Unable to set " + param + "=" + value +
264           " for cgroup at: " + path, e);
265     } finally {
266       if (pw != null) {
267         boolean hasError = pw.checkError();
268         pw.close();
269         if(hasError) {
270           throw new IOException("Unable to set " + param + "=" + value +
271                 " for cgroup at: " + path);
272         }
273         if(pw.checkError()) {
274           throw new IOException("Error while closing cgroup file " + path);
275         }
276       }
277     }
278   }
279 
280   /*
281    * Utility routine to print first line from cgroup tasks file
282    */
logLineFromTasksFile(File cgf)283   private void logLineFromTasksFile(File cgf) {
284     String str;
285     if (LOG.isDebugEnabled()) {
286       try (BufferedReader inl =
287             new BufferedReader(new InputStreamReader(new FileInputStream(cgf
288               + "/tasks"), "UTF-8"))) {
289         if ((str = inl.readLine()) != null) {
290           LOG.debug("First line in cgroup tasks file: " + cgf + " " + str);
291         }
292       } catch (IOException e) {
293         LOG.warn("Failed to read cgroup tasks file. ", e);
294       }
295     }
296   }
297 
298   /**
299    * If tasks file is empty, delete the cgroup.
300    *
301    * @param file object referring to the cgroup to be deleted
302    * @return Boolean indicating whether cgroup was deleted
303    */
304   @VisibleForTesting
checkAndDeleteCgroup(File cgf)305   boolean checkAndDeleteCgroup(File cgf) throws InterruptedException {
306     boolean deleted = false;
307     // FileInputStream in = null;
308     try (FileInputStream in = new FileInputStream(cgf + "/tasks")) {
309       if (in.read() == -1) {
310         /*
311          * "tasks" file is empty, sleep a bit more and then try to delete the
312          * cgroup. Some versions of linux will occasionally panic due to a race
313          * condition in this area, hence the paranoia.
314          */
315         Thread.sleep(deleteCgroupDelay);
316         deleted = cgf.delete();
317         if (!deleted) {
318           LOG.warn("Failed attempt to delete cgroup: " + cgf);
319         }
320       } else {
321         logLineFromTasksFile(cgf);
322       }
323     } catch (IOException e) {
324       LOG.warn("Failed to read cgroup tasks file. ", e);
325     }
326     return deleted;
327   }
328 
329   @VisibleForTesting
deleteCgroup(String cgroupPath)330   boolean deleteCgroup(String cgroupPath) {
331     boolean deleted = false;
332 
333     if (LOG.isDebugEnabled()) {
334       LOG.debug("deleteCgroup: " + cgroupPath);
335     }
336     long start = clock.getTime();
337     do {
338       try {
339         deleted = checkAndDeleteCgroup(new File(cgroupPath));
340         if (!deleted) {
341           Thread.sleep(deleteCgroupDelay);
342         }
343       } catch (InterruptedException ex) {
344         // NOP
345       }
346     } while (!deleted && (clock.getTime() - start) < deleteCgroupTimeout);
347 
348     if (!deleted) {
349       LOG.warn("Unable to delete cgroup at: " + cgroupPath +
350           ", tried to delete for " + deleteCgroupTimeout + "ms");
351     }
352     return deleted;
353   }
354 
355   /*
356    * Next three functions operate on all the resources we are enforcing.
357    */
358 
setupLimits(ContainerId containerId, Resource containerResource)359   private void setupLimits(ContainerId containerId,
360                            Resource containerResource) throws IOException {
361     String containerName = containerId.toString();
362 
363     if (isCpuWeightEnabled()) {
364       int containerVCores = containerResource.getVirtualCores();
365       createCgroup(CONTROLLER_CPU, containerName);
366       int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
367       updateCgroup(CONTROLLER_CPU, containerName, "shares",
368           String.valueOf(cpuShares));
369       if (strictResourceUsageMode) {
370         int nodeVCores =
371             conf.getInt(YarnConfiguration.NM_VCORES,
372               YarnConfiguration.DEFAULT_NM_VCORES);
373         if (nodeVCores != containerVCores) {
374           float containerCPU =
375               (containerVCores * yarnProcessors) / (float) nodeVCores;
376           int[] limits = getOverallLimits(containerCPU);
377           updateCgroup(CONTROLLER_CPU, containerName, CPU_PERIOD_US,
378             String.valueOf(limits[0]));
379           updateCgroup(CONTROLLER_CPU, containerName, CPU_QUOTA_US,
380             String.valueOf(limits[1]));
381         }
382       }
383     }
384   }
385 
clearLimits(ContainerId containerId)386   private void clearLimits(ContainerId containerId) {
387     if (isCpuWeightEnabled()) {
388       deleteCgroup(pathForCgroup(CONTROLLER_CPU, containerId.toString()));
389     }
390   }
391 
392   /*
393    * LCE Resources Handler interface
394    */
395 
preExecute(ContainerId containerId, Resource containerResource)396   public void preExecute(ContainerId containerId, Resource containerResource)
397               throws IOException {
398     setupLimits(containerId, containerResource);
399   }
400 
postExecute(ContainerId containerId)401   public void postExecute(ContainerId containerId) {
402     clearLimits(containerId);
403   }
404 
getResourcesOption(ContainerId containerId)405   public String getResourcesOption(ContainerId containerId) {
406     String containerName = containerId.toString();
407 
408     StringBuilder sb = new StringBuilder("cgroups=");
409 
410     if (isCpuWeightEnabled()) {
411       sb.append(pathForCgroup(CONTROLLER_CPU, containerName) + "/tasks");
412       sb.append(",");
413     }
414 
415     if (sb.charAt(sb.length() - 1) == ',') {
416       sb.deleteCharAt(sb.length() - 1);
417     }
418 
419     return sb.toString();
420   }
421 
422   /* We are looking for entries of the form:
423    * none /cgroup/path/mem cgroup rw,memory 0 0
424    *
425    * Use a simple pattern that splits on the five spaces, and
426    * grabs the 2, 3, and 4th fields.
427    */
428 
429   private static final Pattern MTAB_FILE_FORMAT = Pattern.compile(
430       "^[^\\s]+\\s([^\\s]+)\\s([^\\s]+)\\s([^\\s]+)\\s[^\\s]+\\s[^\\s]+$");
431 
432   /*
433    * Returns a map: path -> mount options
434    * for mounts with type "cgroup". Cgroup controllers will
435    * appear in the list of options for a path.
436    */
parseMtab()437   private Map<String, List<String>> parseMtab() throws IOException {
438     Map<String, List<String>> ret = new HashMap<String, List<String>>();
439     BufferedReader in = null;
440 
441     try {
442       FileInputStream fis = new FileInputStream(new File(getMtabFileName()));
443       in = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
444 
445       for (String str = in.readLine(); str != null;
446           str = in.readLine()) {
447         Matcher m = MTAB_FILE_FORMAT.matcher(str);
448         boolean mat = m.find();
449         if (mat) {
450           String path = m.group(1);
451           String type = m.group(2);
452           String options = m.group(3);
453 
454           if (type.equals(CGROUPS_FSTYPE)) {
455             List<String> value = Arrays.asList(options.split(","));
456             ret.put(path, value);
457           }
458         }
459       }
460     } catch (IOException e) {
461       throw new IOException("Error while reading " + getMtabFileName(), e);
462     } finally {
463       IOUtils.cleanup(LOG, in);
464     }
465 
466     return ret;
467   }
468 
findControllerInMtab(String controller, Map<String, List<String>> entries)469   private String findControllerInMtab(String controller,
470                                       Map<String, List<String>> entries) {
471     for (Entry<String, List<String>> e : entries.entrySet()) {
472       if (e.getValue().contains(controller))
473         return e.getKey();
474     }
475 
476     return null;
477   }
478 
initializeControllerPaths()479   private void initializeControllerPaths() throws IOException {
480     String controllerPath;
481     Map<String, List<String>> parsedMtab = parseMtab();
482 
483     // CPU
484 
485     controllerPath = findControllerInMtab(CONTROLLER_CPU, parsedMtab);
486 
487     if (controllerPath != null) {
488       File f = new File(controllerPath + "/" + this.cgroupPrefix);
489 
490       if (FileUtil.canWrite(f)) {
491         controllerPaths.put(CONTROLLER_CPU, controllerPath);
492       } else {
493         throw new IOException("Not able to enforce cpu weights; cannot write "
494             + "to cgroup at: " + controllerPath);
495       }
496     } else {
497       throw new IOException("Not able to enforce cpu weights; cannot find "
498           + "cgroup for cpu controller in " + getMtabFileName());
499     }
500   }
501 
502   @VisibleForTesting
getMtabFileName()503   String getMtabFileName() {
504     return MTAB_FILE;
505   }
506 }
507