1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.nodemanager.util; 20 21 import java.io.BufferedReader; 22 import java.io.File; 23 import java.io.FileInputStream; 24 import java.io.FileOutputStream; 25 import java.io.FileReader; 26 import java.io.IOException; 27 import java.io.InputStreamReader; 28 import java.io.OutputStreamWriter; 29 import java.io.PrintWriter; 30 import java.io.Writer; 31 import java.util.ArrayList; 32 import java.util.Arrays; 33 import java.util.HashMap; 34 import java.util.List; 35 import java.util.Map; 36 import java.util.Map.Entry; 37 import java.util.regex.Matcher; 38 import java.util.regex.Pattern; 39 40 import com.google.common.annotations.VisibleForTesting; 41 42 import org.apache.commons.io.FileUtils; 43 import org.apache.commons.logging.Log; 44 import org.apache.commons.logging.LogFactory; 45 import org.apache.hadoop.conf.Configuration; 46 import org.apache.hadoop.fs.FileUtil; 47 import org.apache.hadoop.io.IOUtils; 48 import org.apache.hadoop.yarn.api.records.ContainerId; 49 import org.apache.hadoop.yarn.api.records.Resource; 50 import org.apache.hadoop.yarn.conf.YarnConfiguration; 51 import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; 52 import org.apache.hadoop.yarn.util.Clock; 53 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; 54 import org.apache.hadoop.yarn.util.SystemClock; 55 56 public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { 57 58 final static Log LOG = LogFactory 59 .getLog(CgroupsLCEResourcesHandler.class); 60 61 private Configuration conf; 62 private String cgroupPrefix; 63 private boolean cgroupMount; 64 private String cgroupMountPath; 65 66 private boolean cpuWeightEnabled = true; 67 private boolean strictResourceUsageMode = false; 68 69 private final String MTAB_FILE = "/proc/mounts"; 70 private final String CGROUPS_FSTYPE = "cgroup"; 71 private final String CONTROLLER_CPU = "cpu"; 72 private final String CPU_PERIOD_US = "cfs_period_us"; 73 private final String CPU_QUOTA_US = "cfs_quota_us"; 74 private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel 75 private final int MAX_QUOTA_US = 1000 * 1000; 76 private final int MIN_PERIOD_US = 1000; 77 private final Map<String, String> controllerPaths; // Controller -> path 78 79 private long deleteCgroupTimeout; 80 private long deleteCgroupDelay; 81 // package private for testing purposes 82 Clock clock; 83 84 private float yarnProcessors; 85 CgroupsLCEResourcesHandler()86 public CgroupsLCEResourcesHandler() { 87 this.controllerPaths = new HashMap<String, String>(); 88 clock = new SystemClock(); 89 } 90 91 @Override setConf(Configuration conf)92 public void setConf(Configuration conf) { 93 this.conf = conf; 94 } 95 96 @Override getConf()97 public Configuration getConf() { 98 return conf; 99 } 100 101 @VisibleForTesting initConfig()102 void initConfig() throws IOException { 103 104 this.cgroupPrefix = conf.get(YarnConfiguration. 105 NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, "/hadoop-yarn"); 106 this.cgroupMount = conf.getBoolean(YarnConfiguration. 107 NM_LINUX_CONTAINER_CGROUPS_MOUNT, false); 108 this.cgroupMountPath = conf.get(YarnConfiguration. 109 NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, null); 110 111 this.deleteCgroupTimeout = conf.getLong( 112 YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT, 113 YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT); 114 this.deleteCgroupDelay = 115 conf.getLong(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY, 116 YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY); 117 // remove extra /'s at end or start of cgroupPrefix 118 if (cgroupPrefix.charAt(0) == '/') { 119 cgroupPrefix = cgroupPrefix.substring(1); 120 } 121 122 this.strictResourceUsageMode = 123 conf 124 .getBoolean( 125 YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, 126 YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE); 127 128 int len = cgroupPrefix.length(); 129 if (cgroupPrefix.charAt(len - 1) == '/') { 130 cgroupPrefix = cgroupPrefix.substring(0, len - 1); 131 } 132 } 133 init(LinuxContainerExecutor lce)134 public void init(LinuxContainerExecutor lce) throws IOException { 135 this.init(lce, 136 ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf)); 137 } 138 139 @VisibleForTesting init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)140 void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin) 141 throws IOException { 142 initConfig(); 143 144 // mount cgroups if requested 145 if (cgroupMount && cgroupMountPath != null) { 146 ArrayList<String> cgroupKVs = new ArrayList<String>(); 147 cgroupKVs.add(CONTROLLER_CPU + "=" + cgroupMountPath + "/" + 148 CONTROLLER_CPU); 149 lce.mountCgroups(cgroupKVs, cgroupPrefix); 150 } 151 152 initializeControllerPaths(); 153 154 // cap overall usage to the number of cores allocated to YARN 155 yarnProcessors = NodeManagerHardwareUtils.getContainersCores(plugin, conf); 156 int systemProcessors = plugin.getNumProcessors(); 157 if (systemProcessors != (int) yarnProcessors) { 158 LOG.info("YARN containers restricted to " + yarnProcessors + " cores"); 159 int[] limits = getOverallLimits(yarnProcessors); 160 updateCgroup(CONTROLLER_CPU, "", CPU_PERIOD_US, String.valueOf(limits[0])); 161 updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1])); 162 } else if (cpuLimitsExist()) { 163 LOG.info("Removing CPU constraints for YARN containers."); 164 updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1)); 165 } 166 } 167 cpuLimitsExist()168 boolean cpuLimitsExist() throws IOException { 169 String path = pathForCgroup(CONTROLLER_CPU, ""); 170 File quotaFile = new File(path, CONTROLLER_CPU + "." + CPU_QUOTA_US); 171 if (quotaFile.exists()) { 172 String contents = FileUtils.readFileToString(quotaFile, "UTF-8"); 173 int quotaUS = Integer.parseInt(contents.trim()); 174 if (quotaUS != -1) { 175 return true; 176 } 177 } 178 return false; 179 } 180 181 @VisibleForTesting getOverallLimits(float yarnProcessors)182 int[] getOverallLimits(float yarnProcessors) { 183 184 int[] ret = new int[2]; 185 186 if (yarnProcessors < 0.01f) { 187 throw new IllegalArgumentException("Number of processors can't be <= 0."); 188 } 189 190 int quotaUS = MAX_QUOTA_US; 191 int periodUS = (int) (MAX_QUOTA_US / yarnProcessors); 192 if (yarnProcessors < 1.0f) { 193 periodUS = MAX_QUOTA_US; 194 quotaUS = (int) (periodUS * yarnProcessors); 195 if (quotaUS < MIN_PERIOD_US) { 196 LOG 197 .warn("The quota calculated for the cgroup was too low. The minimum value is " 198 + MIN_PERIOD_US + ", calculated value is " + quotaUS 199 + ". Setting quota to minimum value."); 200 quotaUS = MIN_PERIOD_US; 201 } 202 } 203 204 // cfs_period_us can't be less than 1000 microseconds 205 // if the value of periodUS is less than 1000, we can't really use cgroups 206 // to limit cpu 207 if (periodUS < MIN_PERIOD_US) { 208 LOG 209 .warn("The period calculated for the cgroup was too low. The minimum value is " 210 + MIN_PERIOD_US + ", calculated value is " + periodUS 211 + ". Using all available CPU."); 212 periodUS = MAX_QUOTA_US; 213 quotaUS = -1; 214 } 215 216 ret[0] = periodUS; 217 ret[1] = quotaUS; 218 return ret; 219 } 220 isCpuWeightEnabled()221 boolean isCpuWeightEnabled() { 222 return this.cpuWeightEnabled; 223 } 224 225 /* 226 * Next four functions are for an individual cgroup. 227 */ 228 pathForCgroup(String controller, String groupName)229 private String pathForCgroup(String controller, String groupName) { 230 String controllerPath = controllerPaths.get(controller); 231 return controllerPath + "/" + cgroupPrefix + "/" + groupName; 232 } 233 createCgroup(String controller, String groupName)234 private void createCgroup(String controller, String groupName) 235 throws IOException { 236 String path = pathForCgroup(controller, groupName); 237 238 if (LOG.isDebugEnabled()) { 239 LOG.debug("createCgroup: " + path); 240 } 241 242 if (! new File(path).mkdir()) { 243 throw new IOException("Failed to create cgroup at " + path); 244 } 245 } 246 updateCgroup(String controller, String groupName, String param, String value)247 private void updateCgroup(String controller, String groupName, String param, 248 String value) throws IOException { 249 String path = pathForCgroup(controller, groupName); 250 param = controller + "." + param; 251 252 if (LOG.isDebugEnabled()) { 253 LOG.debug("updateCgroup: " + path + ": " + param + "=" + value); 254 } 255 256 PrintWriter pw = null; 257 try { 258 File file = new File(path + "/" + param); 259 Writer w = new OutputStreamWriter(new FileOutputStream(file), "UTF-8"); 260 pw = new PrintWriter(w); 261 pw.write(value); 262 } catch (IOException e) { 263 throw new IOException("Unable to set " + param + "=" + value + 264 " for cgroup at: " + path, e); 265 } finally { 266 if (pw != null) { 267 boolean hasError = pw.checkError(); 268 pw.close(); 269 if(hasError) { 270 throw new IOException("Unable to set " + param + "=" + value + 271 " for cgroup at: " + path); 272 } 273 if(pw.checkError()) { 274 throw new IOException("Error while closing cgroup file " + path); 275 } 276 } 277 } 278 } 279 280 /* 281 * Utility routine to print first line from cgroup tasks file 282 */ logLineFromTasksFile(File cgf)283 private void logLineFromTasksFile(File cgf) { 284 String str; 285 if (LOG.isDebugEnabled()) { 286 try (BufferedReader inl = 287 new BufferedReader(new InputStreamReader(new FileInputStream(cgf 288 + "/tasks"), "UTF-8"))) { 289 if ((str = inl.readLine()) != null) { 290 LOG.debug("First line in cgroup tasks file: " + cgf + " " + str); 291 } 292 } catch (IOException e) { 293 LOG.warn("Failed to read cgroup tasks file. ", e); 294 } 295 } 296 } 297 298 /** 299 * If tasks file is empty, delete the cgroup. 300 * 301 * @param file object referring to the cgroup to be deleted 302 * @return Boolean indicating whether cgroup was deleted 303 */ 304 @VisibleForTesting checkAndDeleteCgroup(File cgf)305 boolean checkAndDeleteCgroup(File cgf) throws InterruptedException { 306 boolean deleted = false; 307 // FileInputStream in = null; 308 try (FileInputStream in = new FileInputStream(cgf + "/tasks")) { 309 if (in.read() == -1) { 310 /* 311 * "tasks" file is empty, sleep a bit more and then try to delete the 312 * cgroup. Some versions of linux will occasionally panic due to a race 313 * condition in this area, hence the paranoia. 314 */ 315 Thread.sleep(deleteCgroupDelay); 316 deleted = cgf.delete(); 317 if (!deleted) { 318 LOG.warn("Failed attempt to delete cgroup: " + cgf); 319 } 320 } else { 321 logLineFromTasksFile(cgf); 322 } 323 } catch (IOException e) { 324 LOG.warn("Failed to read cgroup tasks file. ", e); 325 } 326 return deleted; 327 } 328 329 @VisibleForTesting deleteCgroup(String cgroupPath)330 boolean deleteCgroup(String cgroupPath) { 331 boolean deleted = false; 332 333 if (LOG.isDebugEnabled()) { 334 LOG.debug("deleteCgroup: " + cgroupPath); 335 } 336 long start = clock.getTime(); 337 do { 338 try { 339 deleted = checkAndDeleteCgroup(new File(cgroupPath)); 340 if (!deleted) { 341 Thread.sleep(deleteCgroupDelay); 342 } 343 } catch (InterruptedException ex) { 344 // NOP 345 } 346 } while (!deleted && (clock.getTime() - start) < deleteCgroupTimeout); 347 348 if (!deleted) { 349 LOG.warn("Unable to delete cgroup at: " + cgroupPath + 350 ", tried to delete for " + deleteCgroupTimeout + "ms"); 351 } 352 return deleted; 353 } 354 355 /* 356 * Next three functions operate on all the resources we are enforcing. 357 */ 358 setupLimits(ContainerId containerId, Resource containerResource)359 private void setupLimits(ContainerId containerId, 360 Resource containerResource) throws IOException { 361 String containerName = containerId.toString(); 362 363 if (isCpuWeightEnabled()) { 364 int containerVCores = containerResource.getVirtualCores(); 365 createCgroup(CONTROLLER_CPU, containerName); 366 int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores; 367 updateCgroup(CONTROLLER_CPU, containerName, "shares", 368 String.valueOf(cpuShares)); 369 if (strictResourceUsageMode) { 370 int nodeVCores = 371 conf.getInt(YarnConfiguration.NM_VCORES, 372 YarnConfiguration.DEFAULT_NM_VCORES); 373 if (nodeVCores != containerVCores) { 374 float containerCPU = 375 (containerVCores * yarnProcessors) / (float) nodeVCores; 376 int[] limits = getOverallLimits(containerCPU); 377 updateCgroup(CONTROLLER_CPU, containerName, CPU_PERIOD_US, 378 String.valueOf(limits[0])); 379 updateCgroup(CONTROLLER_CPU, containerName, CPU_QUOTA_US, 380 String.valueOf(limits[1])); 381 } 382 } 383 } 384 } 385 clearLimits(ContainerId containerId)386 private void clearLimits(ContainerId containerId) { 387 if (isCpuWeightEnabled()) { 388 deleteCgroup(pathForCgroup(CONTROLLER_CPU, containerId.toString())); 389 } 390 } 391 392 /* 393 * LCE Resources Handler interface 394 */ 395 preExecute(ContainerId containerId, Resource containerResource)396 public void preExecute(ContainerId containerId, Resource containerResource) 397 throws IOException { 398 setupLimits(containerId, containerResource); 399 } 400 postExecute(ContainerId containerId)401 public void postExecute(ContainerId containerId) { 402 clearLimits(containerId); 403 } 404 getResourcesOption(ContainerId containerId)405 public String getResourcesOption(ContainerId containerId) { 406 String containerName = containerId.toString(); 407 408 StringBuilder sb = new StringBuilder("cgroups="); 409 410 if (isCpuWeightEnabled()) { 411 sb.append(pathForCgroup(CONTROLLER_CPU, containerName) + "/tasks"); 412 sb.append(","); 413 } 414 415 if (sb.charAt(sb.length() - 1) == ',') { 416 sb.deleteCharAt(sb.length() - 1); 417 } 418 419 return sb.toString(); 420 } 421 422 /* We are looking for entries of the form: 423 * none /cgroup/path/mem cgroup rw,memory 0 0 424 * 425 * Use a simple pattern that splits on the five spaces, and 426 * grabs the 2, 3, and 4th fields. 427 */ 428 429 private static final Pattern MTAB_FILE_FORMAT = Pattern.compile( 430 "^[^\\s]+\\s([^\\s]+)\\s([^\\s]+)\\s([^\\s]+)\\s[^\\s]+\\s[^\\s]+$"); 431 432 /* 433 * Returns a map: path -> mount options 434 * for mounts with type "cgroup". Cgroup controllers will 435 * appear in the list of options for a path. 436 */ parseMtab()437 private Map<String, List<String>> parseMtab() throws IOException { 438 Map<String, List<String>> ret = new HashMap<String, List<String>>(); 439 BufferedReader in = null; 440 441 try { 442 FileInputStream fis = new FileInputStream(new File(getMtabFileName())); 443 in = new BufferedReader(new InputStreamReader(fis, "UTF-8")); 444 445 for (String str = in.readLine(); str != null; 446 str = in.readLine()) { 447 Matcher m = MTAB_FILE_FORMAT.matcher(str); 448 boolean mat = m.find(); 449 if (mat) { 450 String path = m.group(1); 451 String type = m.group(2); 452 String options = m.group(3); 453 454 if (type.equals(CGROUPS_FSTYPE)) { 455 List<String> value = Arrays.asList(options.split(",")); 456 ret.put(path, value); 457 } 458 } 459 } 460 } catch (IOException e) { 461 throw new IOException("Error while reading " + getMtabFileName(), e); 462 } finally { 463 IOUtils.cleanup(LOG, in); 464 } 465 466 return ret; 467 } 468 findControllerInMtab(String controller, Map<String, List<String>> entries)469 private String findControllerInMtab(String controller, 470 Map<String, List<String>> entries) { 471 for (Entry<String, List<String>> e : entries.entrySet()) { 472 if (e.getValue().contains(controller)) 473 return e.getKey(); 474 } 475 476 return null; 477 } 478 initializeControllerPaths()479 private void initializeControllerPaths() throws IOException { 480 String controllerPath; 481 Map<String, List<String>> parsedMtab = parseMtab(); 482 483 // CPU 484 485 controllerPath = findControllerInMtab(CONTROLLER_CPU, parsedMtab); 486 487 if (controllerPath != null) { 488 File f = new File(controllerPath + "/" + this.cgroupPrefix); 489 490 if (FileUtil.canWrite(f)) { 491 controllerPaths.put(CONTROLLER_CPU, controllerPath); 492 } else { 493 throw new IOException("Not able to enforce cpu weights; cannot write " 494 + "to cgroup at: " + controllerPath); 495 } 496 } else { 497 throw new IOException("Not able to enforce cpu weights; cannot find " 498 + "cgroup for cpu controller in " + getMtabFileName()); 499 } 500 } 501 502 @VisibleForTesting getMtabFileName()503 String getMtabFileName() { 504 return MTAB_FILE; 505 } 506 } 507