1# -*- coding: utf-8 -*- 2# (c) 2018 Matt Martz <matt@sivel.net> 3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 4 5# Make coding more python3-ish 6from __future__ import (absolute_import, division, print_function) 7__metaclass__ = type 8 9ANSIBLE_METADATA = {'metadata_version': '1.1', 10 'status': ['preview'], 11 'supported_by': 'community'} 12 13DOCUMENTATION = ''' 14 callback: cgroup_perf_recap 15 callback_type: aggregate 16 requirements: 17 - whitelist in configuration 18 - cgroups 19 short_description: Profiles system activity of tasks and full execution using cgroups 20 version_added: "2.8" 21 description: 22 - This is an ansible callback plugin utilizes cgroups to profile system activity of ansible and 23 individual tasks, and display a recap at the end of the playbook execution 24 notes: 25 - Requires ansible to be run from within a cgroup, such as with 26 C(cgexec -g cpuacct,memory,pids:ansible_profile ansible-playbook ...) 27 - This cgroup should only be used by ansible to get accurate results 28 - To create the cgroup, first use a command such as 29 C(sudo cgcreate -a ec2-user:ec2-user -t ec2-user:ec2-user -g cpuacct,memory,pids:ansible_profile) 30 options: 31 control_group: 32 required: True 33 description: Name of cgroups control group 34 env: 35 - name: CGROUP_CONTROL_GROUP 36 ini: 37 - section: callback_cgroup_perf_recap 38 key: control_group 39 cpu_poll_interval: 40 description: Interval between CPU polling for determining CPU usage. A lower value may produce inaccurate 41 results, a higher value may not be short enough to collect results for short tasks. 42 default: 0.25 43 type: float 44 env: 45 - name: CGROUP_CPU_POLL_INTERVAL 46 ini: 47 - section: callback_cgroup_perf_recap 48 key: cpu_poll_interval 49 memory_poll_interval: 50 description: Interval between memory polling for determining memory usage. A lower value may produce inaccurate 51 results, a higher value may not be short enough to collect results for short tasks. 52 default: 0.25 53 type: float 54 env: 55 - name: CGROUP_MEMORY_POLL_INTERVAL 56 ini: 57 - section: callback_cgroup_perf_recap 58 key: memory_poll_interval 59 pid_poll_interval: 60 description: Interval between PID polling for determining PID count. A lower value may produce inaccurate 61 results, a higher value may not be short enough to collect results for short tasks. 62 default: 0.25 63 type: float 64 env: 65 - name: CGROUP_PID_POLL_INTERVAL 66 ini: 67 - section: callback_cgroup_perf_recap 68 key: pid_poll_interval 69 display_recap: 70 description: Controls whether the recap is printed at the end, useful if you will automatically 71 process the output files 72 env: 73 - name: CGROUP_DISPLAY_RECAP 74 ini: 75 - section: callback_cgroup_perf_recap 76 key: display_recap 77 type: bool 78 default: true 79 file_name_format: 80 description: Format of filename. Accepts C(%(counter)s), C(%(task_uuid)s), 81 C(%(feature)s), C(%(ext)s). Defaults to C(%(feature)s.%(ext)s) when C(file_per_task) is C(False) 82 and C(%(counter)s-%(task_uuid)s-%(feature)s.%(ext)s) when C(True) 83 env: 84 - name: CGROUP_FILE_NAME_FORMAT 85 ini: 86 - section: callback_cgroup_perf_recap 87 key: file_name_format 88 type: str 89 default: '%(feature)s.%(ext)s' 90 output_dir: 91 description: Output directory for files containing recorded performance readings. If the value contains a 92 single %s, the start time of the playbook run will be inserted in that space. Only the deepest 93 level directory will be created if it does not exist, parent directories will not be created. 94 type: path 95 default: /tmp/ansible-perf-%s 96 env: 97 - name: CGROUP_OUTPUT_DIR 98 ini: 99 - section: callback_cgroup_perf_recap 100 key: output_dir 101 output_format: 102 description: Output format, either CSV or JSON-seq 103 env: 104 - name: CGROUP_OUTPUT_FORMAT 105 ini: 106 - section: callback_cgroup_perf_recap 107 key: output_format 108 type: str 109 default: csv 110 choices: 111 - csv 112 - json 113 file_per_task: 114 description: When set as C(True) along with C(write_files), this callback will write 1 file per task 115 instead of 1 file for the entire playbook run 116 env: 117 - name: CGROUP_FILE_PER_TASK 118 ini: 119 - section: callback_cgroup_perf_recap 120 key: file_per_task 121 type: bool 122 default: False 123 write_files: 124 description: Dictates whether files will be written containing performance readings 125 env: 126 - name: CGROUP_WRITE_FILES 127 ini: 128 - section: callback_cgroup_perf_recap 129 key: write_files 130 type: bool 131 default: false 132''' 133 134import csv 135import datetime 136import os 137import time 138import threading 139 140from abc import ABCMeta, abstractmethod 141 142from functools import partial 143 144from ansible.module_utils._text import to_bytes, to_text 145from ansible.module_utils.six import with_metaclass 146from ansible.parsing.ajson import AnsibleJSONEncoder, json 147from ansible.plugins.callback import CallbackBase 148 149 150RS = '\x1e' # RECORD SEPARATOR 151LF = '\x0a' # LINE FEED 152 153 154def dict_fromkeys(keys, default=None): 155 d = {} 156 for key in keys: 157 d[key] = default() if callable(default) else default 158 return d 159 160 161class BaseProf(with_metaclass(ABCMeta, threading.Thread)): 162 def __init__(self, path, obj=None, writer=None): 163 threading.Thread.__init__(self) # pylint: disable=non-parent-init-called 164 self.obj = obj 165 self.path = path 166 self.max = 0 167 self.running = True 168 self.writer = writer 169 170 def run(self): 171 while self.running: 172 self.poll() 173 174 @abstractmethod 175 def poll(self): 176 pass 177 178 179class MemoryProf(BaseProf): 180 """Python thread for recording memory usage""" 181 def __init__(self, path, poll_interval=0.25, obj=None, writer=None): 182 super(MemoryProf, self).__init__(path, obj=obj, writer=writer) 183 self._poll_interval = poll_interval 184 185 def poll(self): 186 with open(self.path) as f: 187 val = int(f.read().strip()) / 1024**2 188 if val > self.max: 189 self.max = val 190 if self.writer: 191 try: 192 self.writer(time.time(), self.obj.get_name(), self.obj._uuid, val) 193 except ValueError: 194 # We may be profiling after the playbook has ended 195 self.running = False 196 time.sleep(self._poll_interval) 197 198 199class CpuProf(BaseProf): 200 def __init__(self, path, poll_interval=0.25, obj=None, writer=None): 201 super(CpuProf, self).__init__(path, obj=obj, writer=writer) 202 self._poll_interval = poll_interval 203 204 def poll(self): 205 with open(self.path) as f: 206 start_time = time.time() * 1000**2 207 start_usage = int(f.read().strip()) / 1000 208 time.sleep(self._poll_interval) 209 with open(self.path) as f: 210 end_time = time.time() * 1000**2 211 end_usage = int(f.read().strip()) / 1000 212 val = (end_usage - start_usage) / (end_time - start_time) * 100 213 if val > self.max: 214 self.max = val 215 if self.writer: 216 try: 217 self.writer(time.time(), self.obj.get_name(), self.obj._uuid, val) 218 except ValueError: 219 # We may be profiling after the playbook has ended 220 self.running = False 221 222 223class PidsProf(BaseProf): 224 def __init__(self, path, poll_interval=0.25, obj=None, writer=None): 225 super(PidsProf, self).__init__(path, obj=obj, writer=writer) 226 self._poll_interval = poll_interval 227 228 def poll(self): 229 with open(self.path) as f: 230 val = int(f.read().strip()) 231 if val > self.max: 232 self.max = val 233 if self.writer: 234 try: 235 self.writer(time.time(), self.obj.get_name(), self.obj._uuid, val) 236 except ValueError: 237 # We may be profiling after the playbook has ended 238 self.running = False 239 time.sleep(self._poll_interval) 240 241 242def csv_writer(writer, timestamp, task_name, task_uuid, value): 243 writer.writerow([timestamp, task_name, task_uuid, value]) 244 245 246def json_writer(writer, timestamp, task_name, task_uuid, value): 247 data = { 248 'timestamp': timestamp, 249 'task_name': task_name, 250 'task_uuid': task_uuid, 251 'value': value, 252 } 253 writer.write('%s%s%s' % (RS, json.dumps(data, cls=AnsibleJSONEncoder), LF)) 254 255 256class CallbackModule(CallbackBase): 257 CALLBACK_VERSION = 2.0 258 CALLBACK_TYPE = 'aggregate' 259 CALLBACK_NAME = 'cgroup_perf_recap' 260 CALLBACK_NEEDS_WHITELIST = True 261 262 def __init__(self, display=None): 263 super(CallbackModule, self).__init__(display) 264 265 self._features = ('memory', 'cpu', 'pids') 266 267 self._units = { 268 'memory': 'MB', 269 'cpu': '%', 270 'pids': '', 271 } 272 273 self.task_results = dict_fromkeys(self._features, default=list) 274 self._profilers = dict.fromkeys(self._features) 275 self._files = dict.fromkeys(self._features) 276 self._writers = dict.fromkeys(self._features) 277 278 self._file_per_task = False 279 self._counter = 0 280 self.write_files = False 281 282 def _open_files(self, task_uuid=None): 283 output_format = self._output_format 284 output_dir = self._output_dir 285 286 for feature in self._features: 287 data = { 288 b'counter': to_bytes(self._counter), 289 b'task_uuid': to_bytes(task_uuid), 290 b'feature': to_bytes(feature), 291 b'ext': to_bytes(output_format) 292 } 293 294 if self._files.get(feature): 295 try: 296 self._files[feature].close() 297 except Exception: 298 pass 299 300 if self.write_files: 301 filename = self._file_name_format % data 302 303 self._files[feature] = open(os.path.join(output_dir, filename), 'w+') 304 if output_format == b'csv': 305 self._writers[feature] = partial(csv_writer, csv.writer(self._files[feature])) 306 elif output_format == b'json': 307 self._writers[feature] = partial(json_writer, self._files[feature]) 308 309 def set_options(self, task_keys=None, var_options=None, direct=None): 310 super(CallbackModule, self).set_options(task_keys=task_keys, var_options=var_options, direct=direct) 311 312 cpu_poll_interval = self.get_option('cpu_poll_interval') 313 memory_poll_interval = self.get_option('memory_poll_interval') 314 pid_poll_interval = self.get_option('pid_poll_interval') 315 self._display_recap = self.get_option('display_recap') 316 317 control_group = to_bytes(self.get_option('control_group'), errors='surrogate_or_strict') 318 self.mem_max_file = b'/sys/fs/cgroup/memory/%s/memory.max_usage_in_bytes' % control_group 319 mem_current_file = b'/sys/fs/cgroup/memory/%s/memory.usage_in_bytes' % control_group 320 cpu_usage_file = b'/sys/fs/cgroup/cpuacct/%s/cpuacct.usage' % control_group 321 pid_current_file = b'/sys/fs/cgroup/pids/%s/pids.current' % control_group 322 323 for path in (self.mem_max_file, mem_current_file, cpu_usage_file, pid_current_file): 324 try: 325 with open(path) as f: 326 pass 327 except Exception as e: 328 self._display.warning( 329 u'Cannot open %s for reading (%s). Disabling %s' % (to_text(path), to_text(e), self.CALLBACK_NAME) 330 ) 331 self.disabled = True 332 return 333 334 try: 335 with open(self.mem_max_file, 'w+') as f: 336 f.write('0') 337 except Exception as e: 338 self._display.warning( 339 u'Unable to reset max memory value in %s: %s' % (to_text(self.mem_max_file), to_text(e)) 340 ) 341 self.disabled = True 342 return 343 344 try: 345 with open(cpu_usage_file, 'w+') as f: 346 f.write('0') 347 except Exception as e: 348 self._display.warning( 349 u'Unable to reset CPU usage value in %s: %s' % (to_text(cpu_usage_file), to_text(e)) 350 ) 351 self.disabled = True 352 return 353 354 self._profiler_map = { 355 'memory': partial(MemoryProf, mem_current_file, poll_interval=memory_poll_interval), 356 'cpu': partial(CpuProf, cpu_usage_file, poll_interval=cpu_poll_interval), 357 'pids': partial(PidsProf, pid_current_file, poll_interval=pid_poll_interval), 358 } 359 360 self.write_files = self.get_option('write_files') 361 file_per_task = self.get_option('file_per_task') 362 self._output_format = to_bytes(self.get_option('output_format')) 363 output_dir = to_bytes(self.get_option('output_dir'), errors='surrogate_or_strict') 364 try: 365 output_dir %= to_bytes(datetime.datetime.now().isoformat()) 366 except TypeError: 367 pass 368 369 self._output_dir = output_dir 370 371 file_name_format = to_bytes(self.get_option('file_name_format')) 372 373 if self.write_files: 374 if file_per_task: 375 self._file_per_task = True 376 if file_name_format == b'%(feature)s.%(ext)s': 377 file_name_format = b'%(counter)s-%(task_uuid)s-%(feature)s.%(ext)s' 378 else: 379 file_name_format = to_bytes(self.get_option('file_name_format')) 380 381 self._file_name_format = file_name_format 382 383 if not os.path.exists(output_dir): 384 try: 385 os.mkdir(output_dir) 386 except Exception as e: 387 self._display.warning( 388 u'Could not create the output directory at %s: %s' % (to_text(output_dir), to_text(e)) 389 ) 390 self.disabled = True 391 return 392 393 if not self._file_per_task: 394 self._open_files() 395 396 def _profile(self, obj=None): 397 prev_task = None 398 results = dict.fromkeys(self._features) 399 if not obj or self._file_per_task: 400 for dummy, f in self._files.items(): 401 if f is None: 402 continue 403 try: 404 f.close() 405 except Exception: 406 pass 407 408 try: 409 for name, prof in self._profilers.items(): 410 prof.running = False 411 412 for name, prof in self._profilers.items(): 413 results[name] = prof.max 414 prev_task = prof.obj 415 except AttributeError: 416 pass 417 418 for name, result in results.items(): 419 if result is not None: 420 try: 421 self.task_results[name].append((prev_task, result)) 422 except ValueError: 423 pass 424 425 if obj is not None: 426 if self._file_per_task or self._counter == 0: 427 self._open_files(task_uuid=obj._uuid) 428 429 for feature in self._features: 430 self._profilers[feature] = self._profiler_map[feature](obj=obj, writer=self._writers[feature]) 431 self._profilers[feature].start() 432 433 self._counter += 1 434 435 def v2_playbook_on_task_start(self, task, is_conditional): 436 self._profile(task) 437 438 def v2_playbook_on_stats(self, stats): 439 self._profile() 440 441 if not self._display_recap: 442 return 443 444 with open(self.mem_max_file) as f: 445 max_results = int(f.read().strip()) / 1024 / 1024 446 447 self._display.banner('CGROUP PERF RECAP') 448 self._display.display('Memory Execution Maximum: %0.2fMB\n' % max_results) 449 for name, data in self.task_results.items(): 450 if name == 'memory': 451 continue 452 try: 453 self._display.display( 454 '%s Execution Maximum: %0.2f%s\n' % (name, max((t[1] for t in data)), self._units[name]) 455 ) 456 except Exception as e: 457 self._display.display('%s profiling error: no results collected: %s\n' % (name, e)) 458 459 self._display.display('\n') 460 461 for name, data in self.task_results.items(): 462 if data: 463 self._display.display('%s:\n' % name) 464 for task, value in data: 465 self._display.display('%s (%s): %0.2f%s' % (task.get_name(), task._uuid, value, self._units[name])) 466 self._display.display('\n') 467