1# -*- coding: utf-8 -*- 2# (c) 2018 Matt Martz <matt@sivel.net> 3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 4 5# Make coding more python3-ish 6from __future__ import (absolute_import, division, print_function) 7__metaclass__ = type 8 9ANSIBLE_METADATA = {'metadata_version': '1.1', 10 'status': ['preview'], 11 'supported_by': 'community'} 12 13DOCUMENTATION = ''' 14 callback: cgroup_perf_recap 15 callback_type: aggregate 16 requirements: 17 - whitelist in configuration 18 - cgroups 19 short_description: Profiles system activity of tasks and full execution using cgroups 20 description: 21 - This is an ansible callback plugin utilizes cgroups to profile system activity of ansible and 22 individual tasks, and display a recap at the end of the playbook execution 23 notes: 24 - Requires ansible to be run from within a cgroup, such as with 25 C(cgexec -g cpuacct,memory,pids:ansible_profile ansible-playbook ...) 26 - This cgroup should only be used by ansible to get accurate results 27 - To create the cgroup, first use a command such as 28 C(sudo cgcreate -a ec2-user:ec2-user -t ec2-user:ec2-user -g cpuacct,memory,pids:ansible_profile) 29 options: 30 control_group: 31 required: True 32 description: Name of cgroups control group 33 env: 34 - name: CGROUP_CONTROL_GROUP 35 ini: 36 - section: callback_cgroup_perf_recap 37 key: control_group 38 cpu_poll_interval: 39 description: Interval between CPU polling for determining CPU usage. A lower value may produce inaccurate 40 results, a higher value may not be short enough to collect results for short tasks. 41 default: 0.25 42 type: float 43 env: 44 - name: CGROUP_CPU_POLL_INTERVAL 45 ini: 46 - section: callback_cgroup_perf_recap 47 key: cpu_poll_interval 48 memory_poll_interval: 49 description: Interval between memory polling for determining memory usage. A lower value may produce inaccurate 50 results, a higher value may not be short enough to collect results for short tasks. 51 default: 0.25 52 type: float 53 env: 54 - name: CGROUP_MEMORY_POLL_INTERVAL 55 ini: 56 - section: callback_cgroup_perf_recap 57 key: memory_poll_interval 58 pid_poll_interval: 59 description: Interval between PID polling for determining PID count. A lower value may produce inaccurate 60 results, a higher value may not be short enough to collect results for short tasks. 61 default: 0.25 62 type: float 63 env: 64 - name: CGROUP_PID_POLL_INTERVAL 65 ini: 66 - section: callback_cgroup_perf_recap 67 key: pid_poll_interval 68 display_recap: 69 description: Controls whether the recap is printed at the end, useful if you will automatically 70 process the output files 71 env: 72 - name: CGROUP_DISPLAY_RECAP 73 ini: 74 - section: callback_cgroup_perf_recap 75 key: display_recap 76 type: bool 77 default: true 78 file_name_format: 79 description: Format of filename. Accepts C(%(counter)s), C(%(task_uuid)s), 80 C(%(feature)s), C(%(ext)s). Defaults to C(%(feature)s.%(ext)s) when C(file_per_task) is C(False) 81 and C(%(counter)s-%(task_uuid)s-%(feature)s.%(ext)s) when C(True) 82 env: 83 - name: CGROUP_FILE_NAME_FORMAT 84 ini: 85 - section: callback_cgroup_perf_recap 86 key: file_name_format 87 type: str 88 default: '%(feature)s.%(ext)s' 89 output_dir: 90 description: Output directory for files containing recorded performance readings. If the value contains a 91 single %s, the start time of the playbook run will be inserted in that space. Only the deepest 92 level directory will be created if it does not exist, parent directories will not be created. 93 type: path 94 default: /tmp/ansible-perf-%s 95 env: 96 - name: CGROUP_OUTPUT_DIR 97 ini: 98 - section: callback_cgroup_perf_recap 99 key: output_dir 100 output_format: 101 description: Output format, either CSV or JSON-seq 102 env: 103 - name: CGROUP_OUTPUT_FORMAT 104 ini: 105 - section: callback_cgroup_perf_recap 106 key: output_format 107 type: str 108 default: csv 109 choices: 110 - csv 111 - json 112 file_per_task: 113 description: When set as C(True) along with C(write_files), this callback will write 1 file per task 114 instead of 1 file for the entire playbook run 115 env: 116 - name: CGROUP_FILE_PER_TASK 117 ini: 118 - section: callback_cgroup_perf_recap 119 key: file_per_task 120 type: bool 121 default: False 122 write_files: 123 description: Dictates whether files will be written containing performance readings 124 env: 125 - name: CGROUP_WRITE_FILES 126 ini: 127 - section: callback_cgroup_perf_recap 128 key: write_files 129 type: bool 130 default: false 131''' 132 133import csv 134import datetime 135import os 136import time 137import threading 138 139from abc import ABCMeta, abstractmethod 140 141from functools import partial 142 143from ansible.module_utils._text import to_bytes, to_text 144from ansible.module_utils.six import with_metaclass 145from ansible.parsing.ajson import AnsibleJSONEncoder, json 146from ansible.plugins.callback import CallbackBase 147 148 149RS = '\x1e' # RECORD SEPARATOR 150LF = '\x0a' # LINE FEED 151 152 153def dict_fromkeys(keys, default=None): 154 d = {} 155 for key in keys: 156 d[key] = default() if callable(default) else default 157 return d 158 159 160class BaseProf(with_metaclass(ABCMeta, threading.Thread)): 161 def __init__(self, path, obj=None, writer=None): 162 threading.Thread.__init__(self) # pylint: disable=non-parent-init-called 163 self.obj = obj 164 self.path = path 165 self.max = 0 166 self.running = True 167 self.writer = writer 168 169 def run(self): 170 while self.running: 171 self.poll() 172 173 @abstractmethod 174 def poll(self): 175 pass 176 177 178class MemoryProf(BaseProf): 179 """Python thread for recording memory usage""" 180 def __init__(self, path, poll_interval=0.25, obj=None, writer=None): 181 super(MemoryProf, self).__init__(path, obj=obj, writer=writer) 182 self._poll_interval = poll_interval 183 184 def poll(self): 185 with open(self.path) as f: 186 val = int(f.read().strip()) / 1024**2 187 if val > self.max: 188 self.max = val 189 if self.writer: 190 try: 191 self.writer(time.time(), self.obj.get_name(), self.obj._uuid, val) 192 except ValueError: 193 # We may be profiling after the playbook has ended 194 self.running = False 195 time.sleep(self._poll_interval) 196 197 198class CpuProf(BaseProf): 199 def __init__(self, path, poll_interval=0.25, obj=None, writer=None): 200 super(CpuProf, self).__init__(path, obj=obj, writer=writer) 201 self._poll_interval = poll_interval 202 203 def poll(self): 204 with open(self.path) as f: 205 start_time = time.time() * 1000**2 206 start_usage = int(f.read().strip()) / 1000 207 time.sleep(self._poll_interval) 208 with open(self.path) as f: 209 end_time = time.time() * 1000**2 210 end_usage = int(f.read().strip()) / 1000 211 val = (end_usage - start_usage) / (end_time - start_time) * 100 212 if val > self.max: 213 self.max = val 214 if self.writer: 215 try: 216 self.writer(time.time(), self.obj.get_name(), self.obj._uuid, val) 217 except ValueError: 218 # We may be profiling after the playbook has ended 219 self.running = False 220 221 222class PidsProf(BaseProf): 223 def __init__(self, path, poll_interval=0.25, obj=None, writer=None): 224 super(PidsProf, self).__init__(path, obj=obj, writer=writer) 225 self._poll_interval = poll_interval 226 227 def poll(self): 228 with open(self.path) as f: 229 val = int(f.read().strip()) 230 if val > self.max: 231 self.max = val 232 if self.writer: 233 try: 234 self.writer(time.time(), self.obj.get_name(), self.obj._uuid, val) 235 except ValueError: 236 # We may be profiling after the playbook has ended 237 self.running = False 238 time.sleep(self._poll_interval) 239 240 241def csv_writer(writer, timestamp, task_name, task_uuid, value): 242 writer.writerow([timestamp, task_name, task_uuid, value]) 243 244 245def json_writer(writer, timestamp, task_name, task_uuid, value): 246 data = { 247 'timestamp': timestamp, 248 'task_name': task_name, 249 'task_uuid': task_uuid, 250 'value': value, 251 } 252 writer.write('%s%s%s' % (RS, json.dumps(data, cls=AnsibleJSONEncoder), LF)) 253 254 255class CallbackModule(CallbackBase): 256 CALLBACK_VERSION = 2.0 257 CALLBACK_TYPE = 'aggregate' 258 CALLBACK_NAME = 'ansible.posix.cgroup_perf_recap' 259 CALLBACK_NEEDS_WHITELIST = True 260 261 def __init__(self, display=None): 262 super(CallbackModule, self).__init__(display) 263 264 self._features = ('memory', 'cpu', 'pids') 265 266 self._units = { 267 'memory': 'MB', 268 'cpu': '%', 269 'pids': '', 270 } 271 272 self.task_results = dict_fromkeys(self._features, default=list) 273 self._profilers = dict.fromkeys(self._features) 274 self._files = dict.fromkeys(self._features) 275 self._writers = dict.fromkeys(self._features) 276 277 self._file_per_task = False 278 self._counter = 0 279 self.write_files = False 280 281 def _open_files(self, task_uuid=None): 282 output_format = self._output_format 283 output_dir = self._output_dir 284 285 for feature in self._features: 286 data = { 287 b'counter': to_bytes(self._counter), 288 b'task_uuid': to_bytes(task_uuid), 289 b'feature': to_bytes(feature), 290 b'ext': to_bytes(output_format) 291 } 292 293 if self._files.get(feature): 294 try: 295 self._files[feature].close() 296 except Exception: 297 pass 298 299 if self.write_files: 300 filename = self._file_name_format % data 301 302 self._files[feature] = open(os.path.join(output_dir, filename), 'w+') 303 if output_format == b'csv': 304 self._writers[feature] = partial(csv_writer, csv.writer(self._files[feature])) 305 elif output_format == b'json': 306 self._writers[feature] = partial(json_writer, self._files[feature]) 307 308 def set_options(self, task_keys=None, var_options=None, direct=None): 309 super(CallbackModule, self).set_options(task_keys=task_keys, var_options=var_options, direct=direct) 310 311 cpu_poll_interval = self.get_option('cpu_poll_interval') 312 memory_poll_interval = self.get_option('memory_poll_interval') 313 pid_poll_interval = self.get_option('pid_poll_interval') 314 self._display_recap = self.get_option('display_recap') 315 316 control_group = to_bytes(self.get_option('control_group'), errors='surrogate_or_strict') 317 self.mem_max_file = b'/sys/fs/cgroup/memory/%s/memory.max_usage_in_bytes' % control_group 318 mem_current_file = b'/sys/fs/cgroup/memory/%s/memory.usage_in_bytes' % control_group 319 cpu_usage_file = b'/sys/fs/cgroup/cpuacct/%s/cpuacct.usage' % control_group 320 pid_current_file = b'/sys/fs/cgroup/pids/%s/pids.current' % control_group 321 322 for path in (self.mem_max_file, mem_current_file, cpu_usage_file, pid_current_file): 323 try: 324 with open(path) as f: 325 pass 326 except Exception as e: 327 self._display.warning( 328 u'Cannot open %s for reading (%s). Disabling %s' % (to_text(path), to_text(e), self.CALLBACK_NAME) 329 ) 330 self.disabled = True 331 return 332 333 try: 334 with open(self.mem_max_file, 'w+') as f: 335 f.write('0') 336 except Exception as e: 337 self._display.warning( 338 u'Unable to reset max memory value in %s: %s' % (to_text(self.mem_max_file), to_text(e)) 339 ) 340 self.disabled = True 341 return 342 343 try: 344 with open(cpu_usage_file, 'w+') as f: 345 f.write('0') 346 except Exception as e: 347 self._display.warning( 348 u'Unable to reset CPU usage value in %s: %s' % (to_text(cpu_usage_file), to_text(e)) 349 ) 350 self.disabled = True 351 return 352 353 self._profiler_map = { 354 'memory': partial(MemoryProf, mem_current_file, poll_interval=memory_poll_interval), 355 'cpu': partial(CpuProf, cpu_usage_file, poll_interval=cpu_poll_interval), 356 'pids': partial(PidsProf, pid_current_file, poll_interval=pid_poll_interval), 357 } 358 359 self.write_files = self.get_option('write_files') 360 file_per_task = self.get_option('file_per_task') 361 self._output_format = to_bytes(self.get_option('output_format')) 362 output_dir = to_bytes(self.get_option('output_dir'), errors='surrogate_or_strict') 363 try: 364 output_dir %= to_bytes(datetime.datetime.now().isoformat()) 365 except TypeError: 366 pass 367 368 self._output_dir = output_dir 369 370 file_name_format = to_bytes(self.get_option('file_name_format')) 371 372 if self.write_files: 373 if file_per_task: 374 self._file_per_task = True 375 if file_name_format == b'%(feature)s.%(ext)s': 376 file_name_format = b'%(counter)s-%(task_uuid)s-%(feature)s.%(ext)s' 377 else: 378 file_name_format = to_bytes(self.get_option('file_name_format')) 379 380 self._file_name_format = file_name_format 381 382 if not os.path.exists(output_dir): 383 try: 384 os.mkdir(output_dir) 385 except Exception as e: 386 self._display.warning( 387 u'Could not create the output directory at %s: %s' % (to_text(output_dir), to_text(e)) 388 ) 389 self.disabled = True 390 return 391 392 if not self._file_per_task: 393 self._open_files() 394 395 def _profile(self, obj=None): 396 prev_task = None 397 results = dict.fromkeys(self._features) 398 if not obj or self._file_per_task: 399 for dummy, f in self._files.items(): 400 if f is None: 401 continue 402 try: 403 f.close() 404 except Exception: 405 pass 406 407 try: 408 for name, prof in self._profilers.items(): 409 prof.running = False 410 411 for name, prof in self._profilers.items(): 412 results[name] = prof.max 413 prev_task = prof.obj 414 except AttributeError: 415 pass 416 417 for name, result in results.items(): 418 if result is not None: 419 try: 420 self.task_results[name].append((prev_task, result)) 421 except ValueError: 422 pass 423 424 if obj is not None: 425 if self._file_per_task or self._counter == 0: 426 self._open_files(task_uuid=obj._uuid) 427 428 for feature in self._features: 429 self._profilers[feature] = self._profiler_map[feature](obj=obj, writer=self._writers[feature]) 430 self._profilers[feature].start() 431 432 self._counter += 1 433 434 def v2_playbook_on_task_start(self, task, is_conditional): 435 self._profile(task) 436 437 def v2_playbook_on_stats(self, stats): 438 self._profile() 439 440 if not self._display_recap: 441 return 442 443 with open(self.mem_max_file) as f: 444 max_results = int(f.read().strip()) / 1024 / 1024 445 446 self._display.banner('CGROUP PERF RECAP') 447 self._display.display('Memory Execution Maximum: %0.2fMB\n' % max_results) 448 for name, data in self.task_results.items(): 449 if name == 'memory': 450 continue 451 try: 452 self._display.display( 453 '%s Execution Maximum: %0.2f%s\n' % (name, max((t[1] for t in data)), self._units[name]) 454 ) 455 except Exception as e: 456 self._display.display('%s profiling error: no results collected: %s\n' % (name, e)) 457 458 self._display.display('\n') 459 460 for name, data in self.task_results.items(): 461 if data: 462 self._display.display('%s:\n' % name) 463 for task, value in data: 464 self._display.display('%s (%s): %0.2f%s' % (task.get_name(), task._uuid, value, self._units[name])) 465 self._display.display('\n') 466