1# -*- coding: utf-8 -*-
2# (c) 2018 Matt Martz <matt@sivel.net>
3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
4
5# Make coding more python3-ish
6from __future__ import (absolute_import, division, print_function)
7__metaclass__ = type
8
# Standard plugin metadata consumed by Ansible tooling (ansible-doc, CI checks).
ANSIBLE_METADATA = {'metadata_version': '1.1',
                    'status': ['preview'],
                    'supported_by': 'community'}

# Plugin documentation: YAML embedded in a Python string.  Ansible's config
# system parses it to expose each option via the listed env var / ini key,
# which get_option() reads in set_options() below.  Do not edit the option
# names here without updating the get_option() calls.
DOCUMENTATION = '''
    callback: cgroup_perf_recap
    callback_type: aggregate
    requirements:
      - whitelist in configuration
      - cgroups
    short_description: Profiles system activity of tasks and full execution using cgroups
    version_added: "2.8"
    description:
        - This is an ansible callback plugin utilizes cgroups to profile system activity of ansible and
          individual tasks, and display a recap at the end of the playbook execution
    notes:
        - Requires ansible to be run from within a cgroup, such as with
          C(cgexec -g cpuacct,memory,pids:ansible_profile ansible-playbook ...)
        - This cgroup should only be used by ansible to get accurate results
        - To create the cgroup, first use a command such as
          C(sudo cgcreate -a ec2-user:ec2-user -t ec2-user:ec2-user -g cpuacct,memory,pids:ansible_profile)
    options:
      control_group:
        required: True
        description: Name of cgroups control group
        env:
          - name: CGROUP_CONTROL_GROUP
        ini:
          - section: callback_cgroup_perf_recap
            key: control_group
      cpu_poll_interval:
        description: Interval between CPU polling for determining CPU usage. A lower value may produce inaccurate
                     results, a higher value may not be short enough to collect results for short tasks.
        default: 0.25
        type: float
        env:
          - name: CGROUP_CPU_POLL_INTERVAL
        ini:
          - section: callback_cgroup_perf_recap
            key: cpu_poll_interval
      memory_poll_interval:
        description: Interval between memory polling for determining memory usage. A lower value may produce inaccurate
                     results, a higher value may not be short enough to collect results for short tasks.
        default: 0.25
        type: float
        env:
          - name: CGROUP_MEMORY_POLL_INTERVAL
        ini:
          - section: callback_cgroup_perf_recap
            key: memory_poll_interval
      pid_poll_interval:
        description: Interval between PID polling for determining PID count. A lower value may produce inaccurate
                     results, a higher value may not be short enough to collect results for short tasks.
        default: 0.25
        type: float
        env:
          - name: CGROUP_PID_POLL_INTERVAL
        ini:
          - section: callback_cgroup_perf_recap
            key: pid_poll_interval
      display_recap:
        description: Controls whether the recap is printed at the end, useful if you will automatically
                     process the output files
        env:
          - name: CGROUP_DISPLAY_RECAP
        ini:
          - section: callback_cgroup_perf_recap
            key: display_recap
        type: bool
        default: true
      file_name_format:
        description: Format of filename. Accepts C(%(counter)s), C(%(task_uuid)s),
                     C(%(feature)s), C(%(ext)s). Defaults to C(%(feature)s.%(ext)s) when C(file_per_task) is C(False)
                     and C(%(counter)s-%(task_uuid)s-%(feature)s.%(ext)s) when C(True)
        env:
          - name: CGROUP_FILE_NAME_FORMAT
        ini:
          - section: callback_cgroup_perf_recap
            key: file_name_format
        type: str
        default: '%(feature)s.%(ext)s'
      output_dir:
        description: Output directory for files containing recorded performance readings. If the value contains a
                     single %s, the start time of the playbook run will be inserted in that space. Only the deepest
                     level directory will be created if it does not exist, parent directories will not be created.
        type: path
        default: /tmp/ansible-perf-%s
        env:
          - name: CGROUP_OUTPUT_DIR
        ini:
          - section: callback_cgroup_perf_recap
            key: output_dir
      output_format:
        description: Output format, either CSV or JSON-seq
        env:
          - name: CGROUP_OUTPUT_FORMAT
        ini:
          - section: callback_cgroup_perf_recap
            key: output_format
        type: str
        default: csv
        choices:
          - csv
          - json
      file_per_task:
        description: When set as C(True) along with C(write_files), this callback will write 1 file per task
                     instead of 1 file for the entire playbook run
        env:
          - name: CGROUP_FILE_PER_TASK
        ini:
          - section: callback_cgroup_perf_recap
            key: file_per_task
        type: bool
        default: False
      write_files:
        description: Dictates whether files will be written containing performance readings
        env:
          - name: CGROUP_WRITE_FILES
        ini:
          - section: callback_cgroup_perf_recap
            key: write_files
        type: bool
        default: false
'''
133
134import csv
135import datetime
136import os
137import time
138import threading
139
140from abc import ABCMeta, abstractmethod
141
142from functools import partial
143
144from ansible.module_utils._text import to_bytes, to_text
145from ansible.module_utils.six import with_metaclass
146from ansible.parsing.ajson import AnsibleJSONEncoder, json
147from ansible.plugins.callback import CallbackBase
148
149
# Framing bytes for the JSON-seq output format (see json_writer below):
# each record is written as RS + JSON + LF, matching RFC 7464 text sequences.
RS = '\x1e'  # RECORD SEPARATOR
LF = '\x0a'  # LINE FEED
152
153
def dict_fromkeys(keys, default=None):
    """Like ``dict.fromkeys`` but, when *default* is callable, invoke it once
    per key so every key gets its own distinct value object."""
    return dict(
        (key, default() if callable(default) else default) for key in keys
    )
159
160
class BaseProf(with_metaclass(ABCMeta, threading.Thread)):
    """Common scaffolding for the cgroup polling threads.

    Subclasses implement ``poll`` to take a single reading; ``run`` keeps
    polling until ``running`` is cleared by the callback.
    """

    def __init__(self, path, obj=None, writer=None):
        threading.Thread.__init__(self)  # pylint: disable=non-parent-init-called
        self.path = path        # cgroup accounting file to sample
        self.obj = obj          # task object the readings belong to
        self.writer = writer    # optional callable that persists each reading
        self.max = 0            # highest value observed so far
        self.running = True     # flipped to False externally to stop the thread

    def run(self):
        # Loop until asked to stop; each poll() handles its own sleep/timing.
        while self.running:
            self.poll()

    @abstractmethod
    def poll(self):
        pass
177
178
class MemoryProf(BaseProf):
    """Polling thread that samples current memory usage of the control group."""

    def __init__(self, path, poll_interval=0.25, obj=None, writer=None):
        super(MemoryProf, self).__init__(path, obj=obj, writer=writer)
        self._poll_interval = poll_interval

    def poll(self):
        # memory.usage_in_bytes reports bytes; convert to MB.
        with open(self.path) as f:
            usage_mb = int(f.read().strip()) / 1024**2
        self.max = max(self.max, usage_mb)
        if self.writer:
            try:
                self.writer(time.time(), self.obj.get_name(), self.obj._uuid, usage_mb)
            except ValueError:
                # We may be profiling after the playbook has ended
                self.running = False
        time.sleep(self._poll_interval)
197
198
class CpuProf(BaseProf):
    """Polling thread that derives CPU utilisation from cpuacct.usage deltas."""

    def __init__(self, path, poll_interval=0.25, obj=None, writer=None):
        super(CpuProf, self).__init__(path, obj=obj, writer=writer)
        self._poll_interval = poll_interval

    def poll(self):
        # Take two snapshots of the cumulative usage counter poll_interval
        # apart and turn the delta into a percentage.  Wall time is scaled to
        # microseconds and the counter divided by 1000, as in the original
        # scaling, so the two deltas are in comparable units.
        with open(self.path) as f:
            t_start = time.time() * 1000**2
            usage_start = int(f.read().strip()) / 1000
        time.sleep(self._poll_interval)
        with open(self.path) as f:
            t_end = time.time() * 1000**2
            usage_end = int(f.read().strip()) / 1000
        pct = (usage_end - usage_start) / (t_end - t_start) * 100
        self.max = max(self.max, pct)
        if self.writer:
            try:
                self.writer(time.time(), self.obj.get_name(), self.obj._uuid, pct)
            except ValueError:
                # We may be profiling after the playbook has ended
                self.running = False
221
222
class PidsProf(BaseProf):
    """Polling thread that samples the process count of the control group."""

    def __init__(self, path, poll_interval=0.25, obj=None, writer=None):
        super(PidsProf, self).__init__(path, obj=obj, writer=writer)
        self._poll_interval = poll_interval

    def poll(self):
        # pids.current is already a plain integer count.
        with open(self.path) as f:
            pid_count = int(f.read().strip())
        self.max = max(self.max, pid_count)
        if self.writer:
            try:
                self.writer(time.time(), self.obj.get_name(), self.obj._uuid, pid_count)
            except ValueError:
                # We may be profiling after the playbook has ended
                self.running = False
        time.sleep(self._poll_interval)
240
241
def csv_writer(writer, timestamp, task_name, task_uuid, value):
    """Append one profiling reading as a row via the supplied ``csv.writer``."""
    row = [timestamp, task_name, task_uuid, value]
    writer.writerow(row)
244
245
def json_writer(writer, timestamp, task_name, task_uuid, value):
    """Append one profiling reading as a JSON-seq record (RS + JSON + LF)."""
    record = json.dumps(
        {
            'timestamp': timestamp,
            'task_name': task_name,
            'task_uuid': task_uuid,
            'value': value,
        },
        cls=AnsibleJSONEncoder,
    )
    writer.write(RS + record + LF)
254
255
class CallbackModule(CallbackBase):
    """Aggregate callback that profiles system activity of a playbook run.

    Requires ansible itself to run inside a cgroup.  For each task a set of
    profiler threads samples the cgroup accounting files (memory, CPU and
    PID count), records maxima, optionally streams readings to per-feature
    output files, and prints a recap when the playbook finishes.
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'aggregate'
    CALLBACK_NAME = 'cgroup_perf_recap'
    CALLBACK_NEEDS_WHITELIST = True

    def __init__(self, display=None):
        super(CallbackModule, self).__init__(display)

        # Features being profiled; also the key set of every mapping below.
        self._features = ('memory', 'cpu', 'pids')

        # Unit suffix appended to each feature's values in the recap output.
        self._units = {
            'memory': 'MB',
            'cpu': '%',
            'pids': '',
        }

        # Per-feature list of (task, max_value) tuples collected over the run.
        self.task_results = dict_fromkeys(self._features, default=list)
        # Per-feature profiler thread, open output file handle, and writer
        # callable; populated once options are resolved / tasks start.
        self._profilers = dict.fromkeys(self._features)
        self._files = dict.fromkeys(self._features)
        self._writers = dict.fromkeys(self._features)

        self._file_per_task = False
        # Number of tasks profiled so far; available as %(counter)s in names.
        self._counter = 0
        self.write_files = False

    def _open_files(self, task_uuid=None):
        """Close any currently open per-feature output files and, if file
        writing is enabled, open fresh ones named via ``file_name_format``
        and rebuild the matching per-feature writer callables."""
        output_format = self._output_format
        output_dir = self._output_dir

        for feature in self._features:
            # Substitution values for the (bytes) file_name_format template.
            data = {
                b'counter': to_bytes(self._counter),
                b'task_uuid': to_bytes(task_uuid),
                b'feature': to_bytes(feature),
                b'ext': to_bytes(output_format)
            }

            if self._files.get(feature):
                # Best-effort close of the previous file; profiler threads may
                # still be winding down, so failures are ignored.
                try:
                    self._files[feature].close()
                except Exception:
                    pass

            if self.write_files:
                filename = self._file_name_format % data

                self._files[feature] = open(os.path.join(output_dir, filename), 'w+')
                if output_format == b'csv':
                    self._writers[feature] = partial(csv_writer, csv.writer(self._files[feature]))
                elif output_format == b'json':
                    self._writers[feature] = partial(json_writer, self._files[feature])

    def set_options(self, task_keys=None, var_options=None, direct=None):
        """Resolve plugin options, validate cgroup file access, reset kernel
        counters, and prepare profiler factories and output files.

        Sets ``self.disabled`` and returns early if any cgroup file cannot be
        read/reset or the output directory cannot be created.
        """
        super(CallbackModule, self).set_options(task_keys=task_keys, var_options=var_options, direct=direct)

        cpu_poll_interval = self.get_option('cpu_poll_interval')
        memory_poll_interval = self.get_option('memory_poll_interval')
        pid_poll_interval = self.get_option('pid_poll_interval')
        self._display_recap = self.get_option('display_recap')

        # Build the cgroup-v1 accounting file paths as bytes.
        control_group = to_bytes(self.get_option('control_group'), errors='surrogate_or_strict')
        self.mem_max_file = b'/sys/fs/cgroup/memory/%s/memory.max_usage_in_bytes' % control_group
        mem_current_file = b'/sys/fs/cgroup/memory/%s/memory.usage_in_bytes' % control_group
        cpu_usage_file = b'/sys/fs/cgroup/cpuacct/%s/cpuacct.usage' % control_group
        pid_current_file = b'/sys/fs/cgroup/pids/%s/pids.current' % control_group

        # Probe each file up front so the plugin disables itself cleanly
        # instead of failing mid-run.
        for path in (self.mem_max_file, mem_current_file, cpu_usage_file, pid_current_file):
            try:
                with open(path) as f:
                    pass
            except Exception as e:
                self._display.warning(
                    u'Cannot open %s for reading (%s). Disabling %s' % (to_text(path), to_text(e), self.CALLBACK_NAME)
                )
                self.disabled = True
                return

        # Reset the kernel-maintained memory high-water mark so the recap
        # reflects this run only.
        try:
            with open(self.mem_max_file, 'w+') as f:
                f.write('0')
        except Exception as e:
            self._display.warning(
                u'Unable to reset max memory value in %s: %s' % (to_text(self.mem_max_file), to_text(e))
            )
            self.disabled = True
            return

        # Reset the cumulative CPU usage counter for the same reason.
        try:
            with open(cpu_usage_file, 'w+') as f:
                f.write('0')
        except Exception as e:
            self._display.warning(
                u'Unable to reset CPU usage value in %s: %s' % (to_text(cpu_usage_file), to_text(e))
            )
            self.disabled = True
            return

        # Factories binding each feature to its cgroup file and poll interval;
        # invoked per task in _profile() to create fresh profiler threads.
        self._profiler_map = {
            'memory': partial(MemoryProf, mem_current_file, poll_interval=memory_poll_interval),
            'cpu': partial(CpuProf, cpu_usage_file, poll_interval=cpu_poll_interval),
            'pids': partial(PidsProf, pid_current_file, poll_interval=pid_poll_interval),
        }

        self.write_files = self.get_option('write_files')
        file_per_task = self.get_option('file_per_task')
        self._output_format = to_bytes(self.get_option('output_format'))
        output_dir = to_bytes(self.get_option('output_dir'), errors='surrogate_or_strict')
        try:
            # A single %s in output_dir is replaced with the run start time.
            output_dir %= to_bytes(datetime.datetime.now().isoformat())
        except TypeError:
            # No %s placeholder present; use the directory as given.
            pass

        self._output_dir = output_dir

        file_name_format = to_bytes(self.get_option('file_name_format'))

        if self.write_files:
            if file_per_task:
                self._file_per_task = True
                if file_name_format == b'%(feature)s.%(ext)s':
                    # The run-wide default name would collide across tasks;
                    # switch to the documented per-task default instead.
                    file_name_format = b'%(counter)s-%(task_uuid)s-%(feature)s.%(ext)s'
            else:
                file_name_format = to_bytes(self.get_option('file_name_format'))

            self._file_name_format = file_name_format

            if not os.path.exists(output_dir):
                # Only the leaf directory is created; parents must exist
                # (mkdir, not makedirs), as documented for output_dir.
                try:
                    os.mkdir(output_dir)
                except Exception as e:
                    self._display.warning(
                        u'Could not create the output directory at %s: %s' % (to_text(output_dir), to_text(e))
                    )
                    self.disabled = True
                    return

            if not self._file_per_task:
                # One set of files for the whole run, opened once here.
                self._open_files()

    def _profile(self, obj=None):
        """Stop the previous task's profiler threads, harvest their maxima,
        and (when *obj* is a task) start a fresh set of profilers for it.

        Called with no *obj* from v2_playbook_on_stats to flush the final
        task at the end of the run.
        """
        prev_task = None
        results = dict.fromkeys(self._features)
        if not obj or self._file_per_task:
            # Close the previous task's files (per-task mode) or the run's
            # files (playbook end); best-effort, errors ignored.
            for dummy, f in self._files.items():
                if f is None:
                    continue
                try:
                    f.close()
                except Exception:
                    pass

        try:
            # Signal every profiler thread to stop before reading maxima.
            for name, prof in self._profilers.items():
                prof.running = False

            for name, prof in self._profilers.items():
                results[name] = prof.max
            # All profilers were started with the same task object (see
            # below), so any of them identifies the task just finished.
            prev_task = prof.obj
        except AttributeError:
            # First call: no profilers exist yet (values are still None).
            pass

        for name, result in results.items():
            if result is not None:
                try:
                    self.task_results[name].append((prev_task, result))
                except ValueError:
                    pass

        if obj is not None:
            # Rotate output files per task, or open them once on first task.
            if self._file_per_task or self._counter == 0:
                self._open_files(task_uuid=obj._uuid)

            for feature in self._features:
                self._profilers[feature] = self._profiler_map[feature](obj=obj, writer=self._writers[feature])
                self._profilers[feature].start()

            self._counter += 1

    def v2_playbook_on_task_start(self, task, is_conditional):
        """Rotate the profiler threads at the start of every task."""
        self._profile(task)

    def v2_playbook_on_stats(self, stats):
        """Finish profiling and, unless disabled, print the recap."""
        self._profile()

        if not self._display_recap:
            return

        # Kernel-tracked memory high-water mark for the whole run, in MB.
        with open(self.mem_max_file) as f:
            max_results = int(f.read().strip()) / 1024 / 1024

        self._display.banner('CGROUP PERF RECAP')
        self._display.display('Memory Execution Maximum: %0.2fMB\n' % max_results)
        # Execution-wide maxima for the remaining features (memory already
        # shown above from the kernel counter).
        for name, data in self.task_results.items():
            if name == 'memory':
                continue
            try:
                self._display.display(
                    '%s Execution Maximum: %0.2f%s\n' % (name, max((t[1] for t in data)), self._units[name])
                )
            except Exception as e:
                # max() raises on an empty sequence if nothing was collected.
                self._display.display('%s profiling error: no results collected: %s\n' % (name, e))

        self._display.display('\n')

        # Per-task breakdown for every feature.
        for name, data in self.task_results.items():
            if data:
                self._display.display('%s:\n' % name)
            for task, value in data:
                self._display.display('%s (%s): %0.2f%s' % (task.get_name(), task._uuid, value, self._units[name]))
            self._display.display('\n')
467