1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.util;
20 
21 import java.io.IOException;
22 import java.util.HashMap;
23 import java.util.Map;
24 
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.classification.InterfaceAudience.Private;
28 import org.apache.hadoop.util.Shell;
29 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
30 import org.apache.hadoop.util.StringUtils;
31 
32 @Private
33 public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree {
34 
35   static final Log LOG = LogFactory
36       .getLog(WindowsBasedProcessTree.class);
37 
38   static class ProcessInfo {
39     String pid; // process pid
40     long vmem; // virtual memory
41     long workingSet; // working set, RAM used
42     long cpuTimeMs; // total cpuTime in millisec
43     long cpuTimeMsDelta; // delta of cpuTime since last update
44     int age = 1;
45   }
46 
47   private String taskProcessId = null;
48   private long cpuTimeMs = UNAVAILABLE;
49   private Map<String, ProcessInfo> processTree =
50       new HashMap<String, ProcessInfo>();
51 
isAvailable()52   public static boolean isAvailable() {
53     if (Shell.WINDOWS) {
54       ShellCommandExecutor shellExecutor = new ShellCommandExecutor(
55           new String[] { Shell.WINUTILS, "help" });
56       try {
57         shellExecutor.execute();
58       } catch (IOException e) {
59         LOG.error(StringUtils.stringifyException(e));
60       } finally {
61         String output = shellExecutor.getOutput();
62         if (output != null &&
63             output.contains("Prints to stdout a list of processes in the task")) {
64           return true;
65         }
66       }
67     }
68     return false;
69   }
70 
WindowsBasedProcessTree(String pid)71   public WindowsBasedProcessTree(String pid) {
72     super(pid);
73     taskProcessId = pid;
74   }
75 
76   // helper method to override while testing
getAllProcessInfoFromShell()77   String getAllProcessInfoFromShell() {
78     ShellCommandExecutor shellExecutor = new ShellCommandExecutor(
79         new String[] { Shell.WINUTILS, "task", "processList", taskProcessId });
80     try {
81       shellExecutor.execute();
82       return shellExecutor.getOutput();
83     } catch (IOException e) {
84       LOG.error(StringUtils.stringifyException(e));
85     }
86     return null;
87   }
88 
89   /**
90    * Parses string of process info lines into ProcessInfo objects
91    * @param processesInfoStr
92    * @return Map of pid string to ProcessInfo objects
93    */
createProcessInfo(String processesInfoStr)94   Map<String, ProcessInfo> createProcessInfo(String processesInfoStr) {
95     String[] processesStr = processesInfoStr.split("\r\n");
96     Map<String, ProcessInfo> allProcs = new HashMap<String, ProcessInfo>();
97     final int procInfoSplitCount = 4;
98     for (String processStr : processesStr) {
99       if (processStr != null) {
100         String[] procInfo = processStr.split(",");
101         if (procInfo.length == procInfoSplitCount) {
102           try {
103             ProcessInfo pInfo = new ProcessInfo();
104             pInfo.pid = procInfo[0];
105             pInfo.vmem = Long.parseLong(procInfo[1]);
106             pInfo.workingSet = Long.parseLong(procInfo[2]);
107             pInfo.cpuTimeMs = Long.parseLong(procInfo[3]);
108             allProcs.put(pInfo.pid, pInfo);
109           } catch (NumberFormatException nfe) {
110             LOG.debug("Error parsing procInfo." + nfe);
111           }
112         } else {
113           LOG.debug("Expected split length of proc info to be "
114               + procInfoSplitCount + ". Got " + procInfo.length);
115         }
116       }
117     }
118     return allProcs;
119   }
120 
121   @Override
updateProcessTree()122   public void updateProcessTree() {
123     if(taskProcessId != null) {
124       // taskProcessId can be null in some tests
125       String processesInfoStr = getAllProcessInfoFromShell();
126       if (processesInfoStr != null && processesInfoStr.length() > 0) {
127         Map<String, ProcessInfo> allProcessInfo = createProcessInfo(processesInfoStr);
128 
129         for (Map.Entry<String, ProcessInfo> entry : allProcessInfo.entrySet()) {
130           String pid = entry.getKey();
131           ProcessInfo pInfo = entry.getValue();
132           ProcessInfo oldInfo = processTree.get(pid);
133           if (oldInfo != null) {
134             // existing process, update age and replace value
135             pInfo.age += oldInfo.age;
136             // calculate the delta since the last refresh. totals are being kept
137             // in the WindowsBasedProcessTree object
138             pInfo.cpuTimeMsDelta = pInfo.cpuTimeMs - oldInfo.cpuTimeMs;
139           } else {
140             // new process. delta cpu == total cpu
141             pInfo.cpuTimeMsDelta = pInfo.cpuTimeMs;
142           }
143         }
144         processTree.clear();
145         processTree = allProcessInfo;
146       } else {
147         // clearing process tree to mimic semantics of existing Procfs impl
148         processTree.clear();
149       }
150     }
151   }
152 
153   @Override
checkPidPgrpidForMatch()154   public boolean checkPidPgrpidForMatch() {
155     // This is always true on Windows, because the pid doubles as a job object
156     // name for task management.
157     return true;
158   }
159 
160   @Override
getProcessTreeDump()161   public String getProcessTreeDump() {
162     StringBuilder ret = new StringBuilder();
163     // The header.
164     ret.append(String.format("\t|- PID " + "CPU_TIME(MILLIS) "
165         + "VMEM(BYTES) WORKING_SET(BYTES)%n"));
166     for (ProcessInfo p : processTree.values()) {
167       if (p != null) {
168         ret.append(String.format("\t|- %s %d %d %d%n", p.pid,
169             p.cpuTimeMs, p.vmem, p.workingSet));
170       }
171     }
172     return ret.toString();
173   }
174 
175   @Override
getVirtualMemorySize(int olderThanAge)176   public long getVirtualMemorySize(int olderThanAge) {
177     long total = UNAVAILABLE;
178     for (ProcessInfo p : processTree.values()) {
179       if (p != null) {
180         if (total == UNAVAILABLE) {
181           total = 0;
182         }
183         if (p.age > olderThanAge) {
184           total += p.vmem;
185         }
186       }
187     }
188     return total;
189   }
190 
191   @Override
192   @SuppressWarnings("deprecation")
getCumulativeVmem(int olderThanAge)193   public long getCumulativeVmem(int olderThanAge) {
194     return getVirtualMemorySize(olderThanAge);
195   }
196 
197   @Override
getRssMemorySize(int olderThanAge)198   public long getRssMemorySize(int olderThanAge) {
199     long total = UNAVAILABLE;
200     for (ProcessInfo p : processTree.values()) {
201       if (p != null) {
202         if (total == UNAVAILABLE) {
203           total = 0;
204         }
205         if (p.age > olderThanAge) {
206           total += p.workingSet;
207         }
208       }
209     }
210     return total;
211   }
212 
213   @Override
214   @SuppressWarnings("deprecation")
getCumulativeRssmem(int olderThanAge)215   public long getCumulativeRssmem(int olderThanAge) {
216     return getRssMemorySize(olderThanAge);
217   }
218 
219   @Override
getCumulativeCpuTime()220   public long getCumulativeCpuTime() {
221     for (ProcessInfo p : processTree.values()) {
222       if (cpuTimeMs == UNAVAILABLE) {
223         cpuTimeMs = 0;
224       }
225       cpuTimeMs += p.cpuTimeMsDelta;
226     }
227     return cpuTimeMs;
228   }
229 
230   @Override
getCpuUsagePercent()231   public float getCpuUsagePercent() {
232     return CpuTimeTracker.UNAVAILABLE;
233   }
234 
235 }
236