1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.util; 20 21 import java.io.IOException; 22 import java.util.HashMap; 23 import java.util.Map; 24 25 import org.apache.commons.logging.Log; 26 import org.apache.commons.logging.LogFactory; 27 import org.apache.hadoop.classification.InterfaceAudience.Private; 28 import org.apache.hadoop.util.Shell; 29 import org.apache.hadoop.util.Shell.ShellCommandExecutor; 30 import org.apache.hadoop.util.StringUtils; 31 32 @Private 33 public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree { 34 35 static final Log LOG = LogFactory 36 .getLog(WindowsBasedProcessTree.class); 37 38 static class ProcessInfo { 39 String pid; // process pid 40 long vmem; // virtual memory 41 long workingSet; // working set, RAM used 42 long cpuTimeMs; // total cpuTime in millisec 43 long cpuTimeMsDelta; // delta of cpuTime since last update 44 int age = 1; 45 } 46 47 private String taskProcessId = null; 48 private long cpuTimeMs = UNAVAILABLE; 49 private Map<String, ProcessInfo> processTree = 50 new HashMap<String, ProcessInfo>(); 51 isAvailable()52 public static boolean isAvailable() { 53 if (Shell.WINDOWS) { 54 ShellCommandExecutor shellExecutor = new ShellCommandExecutor( 55 new String[] { Shell.WINUTILS, "help" }); 56 try { 57 shellExecutor.execute(); 58 } catch (IOException e) { 59 LOG.error(StringUtils.stringifyException(e)); 60 } finally { 61 String output = shellExecutor.getOutput(); 62 if (output != null && 63 output.contains("Prints to stdout a list of processes in the task")) { 64 return true; 65 } 66 } 67 } 68 return false; 69 } 70 WindowsBasedProcessTree(String pid)71 public WindowsBasedProcessTree(String pid) { 72 super(pid); 73 taskProcessId = pid; 74 } 75 76 // helper method to override while testing getAllProcessInfoFromShell()77 String getAllProcessInfoFromShell() { 78 ShellCommandExecutor shellExecutor = new ShellCommandExecutor( 79 new String[] { Shell.WINUTILS, "task", "processList", taskProcessId }); 80 try { 81 shellExecutor.execute(); 82 return shellExecutor.getOutput(); 83 } catch (IOException e) { 84 LOG.error(StringUtils.stringifyException(e)); 85 } 86 return null; 87 } 88 89 /** 90 * Parses string of process info lines into ProcessInfo objects 91 * @param processesInfoStr 92 * @return Map of pid string to ProcessInfo objects 93 */ createProcessInfo(String processesInfoStr)94 Map<String, ProcessInfo> createProcessInfo(String processesInfoStr) { 95 String[] processesStr = processesInfoStr.split("\r\n"); 96 Map<String, ProcessInfo> allProcs = new HashMap<String, ProcessInfo>(); 97 final int procInfoSplitCount = 4; 98 for (String processStr : processesStr) { 99 if (processStr != null) { 100 String[] procInfo = processStr.split(","); 101 if (procInfo.length == procInfoSplitCount) { 102 try { 103 ProcessInfo pInfo = new ProcessInfo(); 104 pInfo.pid = procInfo[0]; 105 pInfo.vmem = Long.parseLong(procInfo[1]); 106 pInfo.workingSet = Long.parseLong(procInfo[2]); 107 pInfo.cpuTimeMs = Long.parseLong(procInfo[3]); 108 allProcs.put(pInfo.pid, pInfo); 109 } catch (NumberFormatException nfe) { 110 LOG.debug("Error parsing procInfo." + nfe); 111 } 112 } else { 113 LOG.debug("Expected split length of proc info to be " 114 + procInfoSplitCount + ". Got " + procInfo.length); 115 } 116 } 117 } 118 return allProcs; 119 } 120 121 @Override updateProcessTree()122 public void updateProcessTree() { 123 if(taskProcessId != null) { 124 // taskProcessId can be null in some tests 125 String processesInfoStr = getAllProcessInfoFromShell(); 126 if (processesInfoStr != null && processesInfoStr.length() > 0) { 127 Map<String, ProcessInfo> allProcessInfo = createProcessInfo(processesInfoStr); 128 129 for (Map.Entry<String, ProcessInfo> entry : allProcessInfo.entrySet()) { 130 String pid = entry.getKey(); 131 ProcessInfo pInfo = entry.getValue(); 132 ProcessInfo oldInfo = processTree.get(pid); 133 if (oldInfo != null) { 134 // existing process, update age and replace value 135 pInfo.age += oldInfo.age; 136 // calculate the delta since the last refresh. totals are being kept 137 // in the WindowsBasedProcessTree object 138 pInfo.cpuTimeMsDelta = pInfo.cpuTimeMs - oldInfo.cpuTimeMs; 139 } else { 140 // new process. delta cpu == total cpu 141 pInfo.cpuTimeMsDelta = pInfo.cpuTimeMs; 142 } 143 } 144 processTree.clear(); 145 processTree = allProcessInfo; 146 } else { 147 // clearing process tree to mimic semantics of existing Procfs impl 148 processTree.clear(); 149 } 150 } 151 } 152 153 @Override checkPidPgrpidForMatch()154 public boolean checkPidPgrpidForMatch() { 155 // This is always true on Windows, because the pid doubles as a job object 156 // name for task management. 157 return true; 158 } 159 160 @Override getProcessTreeDump()161 public String getProcessTreeDump() { 162 StringBuilder ret = new StringBuilder(); 163 // The header. 164 ret.append(String.format("\t|- PID " + "CPU_TIME(MILLIS) " 165 + "VMEM(BYTES) WORKING_SET(BYTES)%n")); 166 for (ProcessInfo p : processTree.values()) { 167 if (p != null) { 168 ret.append(String.format("\t|- %s %d %d %d%n", p.pid, 169 p.cpuTimeMs, p.vmem, p.workingSet)); 170 } 171 } 172 return ret.toString(); 173 } 174 175 @Override getVirtualMemorySize(int olderThanAge)176 public long getVirtualMemorySize(int olderThanAge) { 177 long total = UNAVAILABLE; 178 for (ProcessInfo p : processTree.values()) { 179 if (p != null) { 180 if (total == UNAVAILABLE) { 181 total = 0; 182 } 183 if (p.age > olderThanAge) { 184 total += p.vmem; 185 } 186 } 187 } 188 return total; 189 } 190 191 @Override 192 @SuppressWarnings("deprecation") getCumulativeVmem(int olderThanAge)193 public long getCumulativeVmem(int olderThanAge) { 194 return getVirtualMemorySize(olderThanAge); 195 } 196 197 @Override getRssMemorySize(int olderThanAge)198 public long getRssMemorySize(int olderThanAge) { 199 long total = UNAVAILABLE; 200 for (ProcessInfo p : processTree.values()) { 201 if (p != null) { 202 if (total == UNAVAILABLE) { 203 total = 0; 204 } 205 if (p.age > olderThanAge) { 206 total += p.workingSet; 207 } 208 } 209 } 210 return total; 211 } 212 213 @Override 214 @SuppressWarnings("deprecation") getCumulativeRssmem(int olderThanAge)215 public long getCumulativeRssmem(int olderThanAge) { 216 return getRssMemorySize(olderThanAge); 217 } 218 219 @Override getCumulativeCpuTime()220 public long getCumulativeCpuTime() { 221 for (ProcessInfo p : processTree.values()) { 222 if (cpuTimeMs == UNAVAILABLE) { 223 cpuTimeMs = 0; 224 } 225 cpuTimeMs += p.cpuTimeMsDelta; 226 } 227 return cpuTimeMs; 228 } 229 230 @Override getCpuUsagePercent()231 public float getCpuUsagePercent() { 232 return CpuTimeTracker.UNAVAILABLE; 233 } 234 235 } 236