# coding=utf-8
"""
Copyright 2013 LinkedIn Corp. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import gc
import os
import re
import logging
from naarad.metrics.metric import Metric
import naarad.utils

logger = logging.getLogger('naarad.metrics.top_metric')


class TopMetric(Metric):
  """
  Parses the output of the 'top' command (run in batch mode, with interleaved date lines)
  and writes one 'timestamp,value' CSV per sub-metric (load, tasks, cpu, mem, swap, and
  selected per-process stats).
  """

  def __init__(self, metric_type, infile, hostname, output_directory, resource_path, label, ts_start, ts_end,
               rule_strings, important_sub_metrics, anomaly_detection_metrics, **other_options):
    Metric.__init__(self, metric_type, infile, hostname, output_directory, resource_path, label, ts_start, ts_end,
                    rule_strings, important_sub_metrics, anomaly_detection_metrics)

    # Allow user to specify interested processes; in the format of 'PID=11 22' and 'COMMAND=firefox top'.
    # Any process matching either one of the listed PIDs or one of the listed commands is recorded
    # (it's a union, not an intersection, of the PIDs and commands).
    self.PID = []
    self.COMMAND = []
    self.ts_valid_lines = True  # Whether the most recent timestamp falls within [ts_start, ts_end]

    self.sub_metrics = None
    self.process_headers = []  # Header tokens of the per-process section (PID, USER, ..., COMMAND)
    self.ts = ''
    self.ts_date = ''
    self.ts_time = ''
    self.saw_pid = False  # Controls when to process individual command lines

    self.data = {}  # Maps output csv path -> list of 'timestamp,value' rows to be written out

    # BUG FIX: this loop appeared twice in the original __init__; once is enough.
    # .items() (instead of the Python-2-only .iteritems()) keeps this runnable on Python 2 and 3.
    for key, val in other_options.items():
      setattr(self, key, val.split())

    self.sub_metric_description = {
      'uptime_minute': 'uptime of the machine',
      'num_users': 'users sessions logged in',
      'load_aver_1_minute': 'average load on the system (last 1 minute)',
      'load_aver_5_minute': 'average load on the system (last 5 minutes)',
      'load_aver_15_minute': 'average load on the system (last 15 minutes)',
      'tasks_total': 'total processes',
      'tasks_running': 'processes running',
      'tasks_sleeping': 'processes sleeping',
      'tasks_stopped': 'processes stopped',
      'tasks_zombie': 'zombies',
      'cpu_us': 'cpu percentage of running user processes',
      'cpu_sy': 'cpu percentage of running system processes',
      'cpu_id': 'cpu percentage of idle time',
      'cpu_ni': 'cpu percentage of running niced processes',
      'cpu_wa': 'cpu percentage of waiting for IO',
      'cpu_hi': 'cpu percentage of serving hardware IRQ',
      'cpu_si': 'cpu percentage of serving software IRQ',
      'cpu_st': 'cpu percentage of being stolen',
      'mem_total': 'total memory in GB',
      'mem_used': 'total memory in use in GB',
      'mem_free': 'total free memory in GB',
      'mem_buffers': 'total buffers in GB',
      'swap_total': 'total swap size in GB',
      'swap_used': 'total swap in use in GB',
      'swap_free': 'total free swap in GB',
      'swap_cached': 'total swap cache in GB',
    }

  def put_values_into_data(self, values):
    """
    Take the (col, value) pairs in 'values', append 'timestamp,value' under 'col' in self.data[].

    :param values: dict mapping sub-metric column name to its string value at self.ts
    """
    for col, value in values.items():
      if col in self.column_csv_map:
        out_csv = self.column_csv_map[col]
      else:
        out_csv = self.get_csv(col)  # column_csv_map[] is assigned in get_csv()
        self.data[out_csv] = []
      self.data[out_csv].append(self.ts + "," + value)

  def process_top_line(self, words):
    """
    Process the line starting with "top".
    Example log: top - 00:00:02 up 32 days, 7:08, 19 users, load average: 0.00, 0.00, 0.00
    """
    self.ts_time = words[2]
    self.ts = self.ts_date + ' ' + self.ts_time
    # BUG FIX: dropped the stray unused 'ts =' local from the original assignment.
    self.ts = naarad.utils.get_standardized_timestamp(self.ts, None)

    self.ts_valid_lines = not self.ts_out_of_range(self.ts)

    up_days = int(words[4])
    up_hour_minute = words[6].split(':')  # E.g. '4:02,'
    up_minutes = int(up_hour_minute[0]) * 60 + int(up_hour_minute[1].split(',')[0])
    uptime_minute = up_days * 24 * 60 + up_minutes  # Converting days to minutes

    values = {}
    values['uptime_minute'] = str(uptime_minute)
    values['num_users'] = words[7]
    values['load_aver_1_minute'] = words[11][:-1]  # [:-1] strips the trailing comma
    values['load_aver_5_minute'] = words[12][:-1]
    values['load_aver_15_minute'] = words[13]
    self.put_values_into_data(values)

  def _process_pair_line(self, words, prefix, convert_to_g):
    """
    Parse a 'value key, value key, ...' token list and store each pair as '<prefix>_<key>'.

    :param words: tokens after the leading tag, e.g. ['446', 'total,', '1', 'running,', ...]
    :param prefix: sub-metric prefix ('tasks', 'mem' or 'swap')
    :param convert_to_g: whether each value should be converted to gigabytes first
    """
    values = {}
    # '//' (not '/') so the pair count stays an int under Python 3 as well.
    for offset in range(len(words) // 2):
      key = words[2 * offset + 1].strip(',')
      val = words[2 * offset]
      if convert_to_g:
        val = self.convert_to_G(val)
      values[prefix + '_' + key] = val
    self.put_values_into_data(values)

  def process_tasks_line(self, words):
    """
    Process the line starting with "Tasks:"
    Example log: Tasks: 446 total, 1 running, 442 sleeping, 2 stopped, 1 zombie
    """
    self._process_pair_line(words[1:], 'tasks', False)

  def process_cpu_line(self, words):
    """
    Process the line starting with "Cpu(s):"
    Example log: Cpu(s): 1.3%us, 0.5%sy, 0.0%ni, 98.2%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
    """
    values = {}
    for word in words[1:]:
      val, key = word.split('%')
      values['cpu_' + key.strip(',')] = val
    self.put_values_into_data(values)

  def convert_to_G(self, word):
    """
    Given a size such as '2333M', return the value converted to gigabytes, as a string.

    Units handled: G/g (unchanged), M/m, K/k; a bare number is assumed to be bytes.
    NOTE: conversions are decimal (1000-based), matching the original implementation.
    """
    if word[-1] == 'G' or word[-1] == 'g':
      value = float(word[:-1])
    elif word[-1] == 'M' or word[-1] == 'm':
      value = float(word[:-1]) / 1000.0
    elif word[-1] == 'K' or word[-1] == 'k':
      value = float(word[:-1]) / 1000.0 / 1000.0
    else:  # No unit; assume bytes
      value = float(word) / 1000.0 / 1000.0 / 1000.0
    return str(value)

  def process_mem_line(self, words):
    """
    Process the line starting with "Mem:"
    Example log: Mem: 62.841G total, 16.038G used, 46.803G free, 650.312M buffers
    Each value is converted to 'G' (handles cases of K, M).
    """
    self._process_pair_line(words[1:], 'mem', True)

  def process_swap_line(self, words):
    """
    Process the line starting with "Swap:"
    Example log: Swap: 63.998G total, 0.000k used, 63.998G free, 11.324G cached
    Each value is converted to 'G' (handles cases of K, M).
    """
    self._process_pair_line(words[1:], 'swap', True)

  def process_individual_command(self, words):
    """
    Process individual process lines such as:
    #PID USER     PR NI VIRT  RES  SHR S %CPU %MEM  TIME+    COMMAND
    29303 root    20 0  35300 2580 1664 R 3.9 0.0   0:00.02  top
    11 root       RT 0  0     0    0    S 1.9 0.0   0:18.87  migration/2
    3702 root     20 0  34884 4192 1692 S 1.9 0.0   31:40.47 cf-serverd
    It does not record all processes (to limit memory use); only processes matching the
    user-supplied PID or COMMAND lists are recorded.
    """
    pid_index = self.process_headers.index('PID')
    process_index = self.process_headers.index('COMMAND')

    pid = words[pid_index]
    process = words[process_index]
    if pid in self.PID or process in self.COMMAND:
      process_name = process.split('/')[0]  # e.g. 'migration/2' -> 'migration'

      values = {}
      # enumerate() avoids the original per-column .index() lookup (O(n^2), and wrong
      # if a header token ever repeats).
      for word_index, word_col in enumerate(self.process_headers):
        if word_col in ('VIRT', 'RES', 'SHR'):  # These sizes need converting to 'G'
          values[process_name + '_' + pid + '_' + word_col] = self.convert_to_G(words[word_index])
        elif word_col in ('PR', 'NI', '%CPU', '%MEM'):  # Recorded as-is (with '%' stripped from the key)
          values[process_name + '_' + pid + '_' + word_col.strip('%')] = words[word_index]

      uptime_index = self.process_headers.index('TIME+')
      uptime = words[uptime_index].split(':')
      uptime_sec = float(uptime[0]) * 60 + float(uptime[1])  # 'MM:SS.hh' -> seconds
      values[process_name + '_' + pid + '_' + 'TIME'] = str(uptime_sec)
      self.put_values_into_data(values)

  def parse(self):
    """
    Parse the top output file(s).

    :return: True if all input files were processed, False if any input file is invalid.

    The raw log file is like the following:
    2014-06-23
    top - 00:00:02 up 18 days, 7:08, 19 users, load average: 0.05, 0.03, 0.00
    Tasks: 447 total, 1 running, 443 sleeping, 2 stopped, 1 zombie
    Cpu(s): 1.6%us, 0.5%sy, 0.0%ni, 97.9%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
    Mem: 62.841G total, 15.167G used, 47.675G free, 643.434M buffers
    Swap: 63.998G total, 0.000k used, 63.998G free, 11.324G cached

    PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
    1730 root 20 0 4457m 10m 3328 S 1.9 0.0 80:13.45 lwregd
    The log lines can be generated by:
    echo $t >> $RESULT/top.out &; top -b -n $COUNT -d $INTERVAL | grep -A 40 '^top' >> $RESULT/top.out &
    """
    # BUG FIX: initialize before the loop; the original set it inside the loop, so an
    # empty infile_list raised NameError at 'return status'.
    status = True

    for infile in self.infile_list:
      logger.info('Processing : %s', infile)
      if not naarad.utils.is_valid_file(infile):
        return False

      with open(infile) as fh:
        for line in fh:
          words = line.split()
          if not words:
            continue

          # Pattern matches a date-only line such as '2014-02-03'
          if re.match(r'^\d\d\d\d-\d\d-\d\d$', line):
            self.ts_date = words[0]
            continue

          prefix_word = words[0].strip()
          if prefix_word == 'top':
            self.process_top_line(words)
            self.saw_pid = False  # Turn off the processing of individual process lines
          elif self.ts_valid_lines:
            if prefix_word == 'Tasks:':
              self.process_tasks_line(words)
            elif prefix_word == 'Cpu(s):':
              self.process_cpu_line(words)
            elif prefix_word == 'Mem:':
              self.process_mem_line(words)
            elif prefix_word == 'Swap:':
              self.process_swap_line(words)
            elif prefix_word == 'PID':
              self.saw_pid = True  # Turn on the processing of individual process lines
              self.process_headers = words
            elif self.saw_pid and len(words) >= len(self.process_headers):  # Only valid process lines
              self.process_individual_command(words)

    # Putting data in csv files, one per sub-metric
    for out_csv in self.data.keys():
      self.csv_files.append(out_csv)
      with open(out_csv, 'w') as fh:
        fh.write('\n'.join(self.data[out_csv]))

    gc.collect()
    return status