1# -*- coding: utf-8 -*-
2# (c) 2018 Matt Martz <matt@sivel.net>
3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
4
5# Make coding more python3-ish
6from __future__ import (absolute_import, division, print_function)
7__metaclass__ = type
8
9ANSIBLE_METADATA = {'metadata_version': '1.1',
10                    'status': ['preview'],
11                    'supported_by': 'community'}
12
13DOCUMENTATION = '''
14    callback: cgroup_perf_recap
15    callback_type: aggregate
16    requirements:
17      - whitelist in configuration
18      - cgroups
19    short_description: Profiles system activity of tasks and full execution using cgroups
20    description:
        - This is an ansible callback plugin that utilizes cgroups to profile system activity of ansible and
          individual tasks, and display a recap at the end of the playbook execution
23    notes:
24        - Requires ansible to be run from within a cgroup, such as with
25          C(cgexec -g cpuacct,memory,pids:ansible_profile ansible-playbook ...)
26        - This cgroup should only be used by ansible to get accurate results
27        - To create the cgroup, first use a command such as
28          C(sudo cgcreate -a ec2-user:ec2-user -t ec2-user:ec2-user -g cpuacct,memory,pids:ansible_profile)
29    options:
30      control_group:
31        required: True
32        description: Name of cgroups control group
33        env:
34          - name: CGROUP_CONTROL_GROUP
35        ini:
36          - section: callback_cgroup_perf_recap
37            key: control_group
38      cpu_poll_interval:
39        description: Interval between CPU polling for determining CPU usage. A lower value may produce inaccurate
40                     results, a higher value may not be short enough to collect results for short tasks.
41        default: 0.25
42        type: float
43        env:
44          - name: CGROUP_CPU_POLL_INTERVAL
45        ini:
46          - section: callback_cgroup_perf_recap
47            key: cpu_poll_interval
48      memory_poll_interval:
49        description: Interval between memory polling for determining memory usage. A lower value may produce inaccurate
50                     results, a higher value may not be short enough to collect results for short tasks.
51        default: 0.25
52        type: float
53        env:
54          - name: CGROUP_MEMORY_POLL_INTERVAL
55        ini:
56          - section: callback_cgroup_perf_recap
57            key: memory_poll_interval
58      pid_poll_interval:
59        description: Interval between PID polling for determining PID count. A lower value may produce inaccurate
60                     results, a higher value may not be short enough to collect results for short tasks.
61        default: 0.25
62        type: float
63        env:
64          - name: CGROUP_PID_POLL_INTERVAL
65        ini:
66          - section: callback_cgroup_perf_recap
67            key: pid_poll_interval
68      display_recap:
69        description: Controls whether the recap is printed at the end, useful if you will automatically
70                     process the output files
71        env:
72          - name: CGROUP_DISPLAY_RECAP
73        ini:
74          - section: callback_cgroup_perf_recap
75            key: display_recap
76        type: bool
77        default: true
78      file_name_format:
79        description: Format of filename. Accepts C(%(counter)s), C(%(task_uuid)s),
80                     C(%(feature)s), C(%(ext)s). Defaults to C(%(feature)s.%(ext)s) when C(file_per_task) is C(False)
81                     and C(%(counter)s-%(task_uuid)s-%(feature)s.%(ext)s) when C(True)
82        env:
83          - name: CGROUP_FILE_NAME_FORMAT
84        ini:
85          - section: callback_cgroup_perf_recap
86            key: file_name_format
87        type: str
88        default: '%(feature)s.%(ext)s'
89      output_dir:
90        description: Output directory for files containing recorded performance readings. If the value contains a
91                     single %s, the start time of the playbook run will be inserted in that space. Only the deepest
92                     level directory will be created if it does not exist, parent directories will not be created.
93        type: path
94        default: /tmp/ansible-perf-%s
95        env:
96          - name: CGROUP_OUTPUT_DIR
97        ini:
98          - section: callback_cgroup_perf_recap
99            key: output_dir
100      output_format:
101        description: Output format, either CSV or JSON-seq
102        env:
103          - name: CGROUP_OUTPUT_FORMAT
104        ini:
105          - section: callback_cgroup_perf_recap
106            key: output_format
107        type: str
108        default: csv
109        choices:
110          - csv
111          - json
112      file_per_task:
113        description: When set as C(True) along with C(write_files), this callback will write 1 file per task
114                     instead of 1 file for the entire playbook run
115        env:
116          - name: CGROUP_FILE_PER_TASK
117        ini:
118          - section: callback_cgroup_perf_recap
119            key: file_per_task
120        type: bool
121        default: False
122      write_files:
123        description: Dictates whether files will be written containing performance readings
124        env:
125          - name: CGROUP_WRITE_FILES
126        ini:
127          - section: callback_cgroup_perf_recap
128            key: write_files
129        type: bool
130        default: false
131'''
132
133import csv
134import datetime
135import os
136import time
137import threading
138
139from abc import ABCMeta, abstractmethod
140
141from functools import partial
142
143from ansible.module_utils._text import to_bytes, to_text
144from ansible.module_utils.six import with_metaclass
145from ansible.parsing.ajson import AnsibleJSONEncoder, json
146from ansible.plugins.callback import CallbackBase
147
148
RS = '\x1e'  # RECORD SEPARATOR -- frames each record in JSON text sequence output
LF = '\x0a'  # LINE FEED -- terminates each JSON text sequence record
151
152
def dict_fromkeys(keys, default=None):
    """Build a dict mapping each key to ``default``.

    Unlike ``dict.fromkeys``, a callable ``default`` is invoked once per
    key, so every key receives its own fresh value (e.g. a new list)
    instead of one shared object.
    """
    if callable(default):
        return {key: default() for key in keys}
    return {key: default for key in keys}
158
159
class BaseProf(with_metaclass(ABCMeta, threading.Thread)):
    """Abstract polling thread that repeatedly samples a cgroup file.

    Subclasses implement ``poll``, which reads ``self.path``, tracks the
    largest observed value in ``self.max``, and optionally forwards each
    sample to ``self.writer``.
    """

    def __init__(self, path, obj=None, writer=None):
        threading.Thread.__init__(self)  # pylint: disable=non-parent-init-called
        self.obj = obj          # task object being profiled (may be None)
        self.path = path        # cgroup file this profiler samples
        self.max = 0            # highest value observed so far
        self.running = True     # cleared externally to stop the thread
        self.writer = writer    # optional per-sample output callable

    def run(self):
        # Keep sampling until self.running is flipped off by the callback.
        while self.running:
            self.poll()

    @abstractmethod
    def poll(self):
        """Take one reading; implemented by concrete profilers."""
176
177
178class MemoryProf(BaseProf):
179    """Python thread for recording memory usage"""
180    def __init__(self, path, poll_interval=0.25, obj=None, writer=None):
181        super(MemoryProf, self).__init__(path, obj=obj, writer=writer)
182        self._poll_interval = poll_interval
183
184    def poll(self):
185        with open(self.path) as f:
186            val = int(f.read().strip()) / 1024**2
187        if val > self.max:
188            self.max = val
189        if self.writer:
190            try:
191                self.writer(time.time(), self.obj.get_name(), self.obj._uuid, val)
192            except ValueError:
193                # We may be profiling after the playbook has ended
194                self.running = False
195        time.sleep(self._poll_interval)
196
197
class CpuProf(BaseProf):
    """Polling thread that derives CPU utilisation (%) from usage deltas."""

    def __init__(self, path, poll_interval=0.25, obj=None, writer=None):
        super(CpuProf, self).__init__(path, obj=obj, writer=writer)
        self._poll_interval = poll_interval

    def poll(self):
        # Two readings taken poll_interval apart; cpuacct.usage is in
        # nanoseconds and wall time is scaled to microseconds, so the
        # ratio of the deltas times 100 yields a percentage.
        with open(self.path) as f:
            start_time = time.time() * 1000**2
            start_usage = int(f.read().strip()) / 1000
        time.sleep(self._poll_interval)
        with open(self.path) as f:
            end_time = time.time() * 1000**2
            end_usage = int(f.read().strip()) / 1000
        pct = (end_usage - start_usage) / (end_time - start_time) * 100
        self.max = max(self.max, pct)
        if self.writer:
            try:
                self.writer(time.time(), self.obj.get_name(), self.obj._uuid, pct)
            except ValueError:
                # Output file may already be closed once the playbook ends
                self.running = False
220
221
class PidsProf(BaseProf):
    """Polling thread that records the cgroup's current PID count."""

    def __init__(self, path, poll_interval=0.25, obj=None, writer=None):
        super(PidsProf, self).__init__(path, obj=obj, writer=writer)
        self._poll_interval = poll_interval

    def poll(self):
        # pids.current is a plain integer count; no unit conversion needed
        with open(self.path) as f:
            count = int(f.read().strip())
        self.max = max(self.max, count)
        if self.writer:
            try:
                self.writer(time.time(), self.obj.get_name(), self.obj._uuid, count)
            except ValueError:
                # Output file may already be closed once the playbook ends
                self.running = False
        time.sleep(self._poll_interval)
239
240
def csv_writer(writer, timestamp, task_name, task_uuid, value):
    """Append one profiling sample as a row via the supplied csv.writer."""
    row = [timestamp, task_name, task_uuid, value]
    writer.writerow(row)
243
244
def json_writer(writer, timestamp, task_name, task_uuid, value):
    """Write one profiling sample as a JSON text sequence record (RS ... LF)."""
    record = json.dumps(
        {
            'timestamp': timestamp,
            'task_name': task_name,
            'task_uuid': task_uuid,
            'value': value,
        },
        cls=AnsibleJSONEncoder,
    )
    writer.write('%s%s%s' % (RS, record, LF))
253
254
class CallbackModule(CallbackBase):
    """Profile memory/CPU/PID usage of ansible and its tasks via cgroups.

    For each feature ('memory', 'cpu', 'pids') a polling thread is started
    when a task starts and stopped when the next task starts (or at stats
    time).  Per-feature maxima are kept for the end-of-run recap, and each
    sample can optionally be written to csv/json output files.
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'aggregate'
    CALLBACK_NAME = 'ansible.posix.cgroup_perf_recap'
    CALLBACK_NEEDS_WHITELIST = True

    def __init__(self, display=None):
        super(CallbackModule, self).__init__(display)

        # The three cgroup metrics this callback samples
        self._features = ('memory', 'cpu', 'pids')

        # Display units used when printing the recap
        self._units = {
            'memory': 'MB',
            'cpu': '%',
            'pids': '',
        }

        # Per-feature list of (task, max_value) tuples for the recap.
        # dict_fromkeys with a callable gives each feature its own list.
        self.task_results = dict_fromkeys(self._features, default=list)
        self._profilers = dict.fromkeys(self._features)  # running profiler threads
        self._files = dict.fromkeys(self._features)      # open output file objects
        self._writers = dict.fromkeys(self._features)    # per-sample writer callables

        self._file_per_task = False
        self._counter = 0  # number of tasks profiled so far
        self.write_files = False

    def _open_files(self, task_uuid=None):
        """Close any currently open output files and, if enabled, open new ones.

        One file per feature is opened in the output directory, named via the
        bytes ``file_name_format`` template, and a matching csv/json writer
        partial is installed in ``self._writers``.
        """
        output_format = self._output_format
        output_dir = self._output_dir

        for feature in self._features:
            # Substitution values for the bytes filename template; keys are
            # bytes because the template itself is bytes
            data = {
                b'counter': to_bytes(self._counter),
                b'task_uuid': to_bytes(task_uuid),
                b'feature': to_bytes(feature),
                b'ext': to_bytes(output_format)
            }

            if self._files.get(feature):
                # Best effort: a previous file may already be closed
                try:
                    self._files[feature].close()
                except Exception:
                    pass

            if self.write_files:
                filename = self._file_name_format % data

                self._files[feature] = open(os.path.join(output_dir, filename), 'w+')
                if output_format == b'csv':
                    self._writers[feature] = partial(csv_writer, csv.writer(self._files[feature]))
                elif output_format == b'json':
                    self._writers[feature] = partial(json_writer, self._files[feature])

    def set_options(self, task_keys=None, var_options=None, direct=None):
        """Read options, validate cgroup access and prepare profiling state.

        Sets ``self.disabled = True`` (after a warning) if any cgroup file is
        unreadable, the kernel counters cannot be reset, or the output
        directory cannot be created.
        """
        super(CallbackModule, self).set_options(task_keys=task_keys, var_options=var_options, direct=direct)

        cpu_poll_interval = self.get_option('cpu_poll_interval')
        memory_poll_interval = self.get_option('memory_poll_interval')
        pid_poll_interval = self.get_option('pid_poll_interval')
        self._display_recap = self.get_option('display_recap')

        # Paths are bytes so they can be %-formatted with the bytes control
        # group name (cgroup v1 hierarchy layout)
        control_group = to_bytes(self.get_option('control_group'), errors='surrogate_or_strict')
        self.mem_max_file = b'/sys/fs/cgroup/memory/%s/memory.max_usage_in_bytes' % control_group
        mem_current_file = b'/sys/fs/cgroup/memory/%s/memory.usage_in_bytes' % control_group
        cpu_usage_file = b'/sys/fs/cgroup/cpuacct/%s/cpuacct.usage' % control_group
        pid_current_file = b'/sys/fs/cgroup/pids/%s/pids.current' % control_group

        # Verify up front that every cgroup file we depend on is readable
        for path in (self.mem_max_file, mem_current_file, cpu_usage_file, pid_current_file):
            try:
                with open(path) as f:
                    pass
            except Exception as e:
                self._display.warning(
                    u'Cannot open %s for reading (%s). Disabling %s' % (to_text(path), to_text(e), self.CALLBACK_NAME)
                )
                self.disabled = True
                return

        # Reset the kernel's memory high-water mark so this run starts clean
        try:
            with open(self.mem_max_file, 'w+') as f:
                f.write('0')
        except Exception as e:
            self._display.warning(
                u'Unable to reset max memory value in %s: %s' % (to_text(self.mem_max_file), to_text(e))
            )
            self.disabled = True
            return

        # Reset the accumulated CPU usage counter as well
        try:
            with open(cpu_usage_file, 'w+') as f:
                f.write('0')
        except Exception as e:
            self._display.warning(
                u'Unable to reset CPU usage value in %s: %s' % (to_text(cpu_usage_file), to_text(e))
            )
            self.disabled = True
            return

        # Factories that build a fresh profiler thread for each task
        self._profiler_map = {
            'memory': partial(MemoryProf, mem_current_file, poll_interval=memory_poll_interval),
            'cpu': partial(CpuProf, cpu_usage_file, poll_interval=cpu_poll_interval),
            'pids': partial(PidsProf, pid_current_file, poll_interval=pid_poll_interval),
        }

        self.write_files = self.get_option('write_files')
        file_per_task = self.get_option('file_per_task')
        self._output_format = to_bytes(self.get_option('output_format'))
        output_dir = to_bytes(self.get_option('output_dir'), errors='surrogate_or_strict')
        try:
            # A single %s placeholder in output_dir is replaced with the
            # playbook start timestamp
            output_dir %= to_bytes(datetime.datetime.now().isoformat())
        except TypeError:
            # No %s placeholder present; use the configured path as-is
            pass

        self._output_dir = output_dir

        file_name_format = to_bytes(self.get_option('file_name_format'))

        if self.write_files:
            if file_per_task:
                self._file_per_task = True
                if file_name_format == b'%(feature)s.%(ext)s':
                    # The whole-run default template lacks task context;
                    # switch to the documented per-task default
                    file_name_format = b'%(counter)s-%(task_uuid)s-%(feature)s.%(ext)s'
            else:
                # NOTE(review): redundant -- file_name_format was already
                # fetched just above, so this re-fetch is a no-op
                file_name_format = to_bytes(self.get_option('file_name_format'))

            self._file_name_format = file_name_format

            # Only the leaf directory is created; parents must already exist
            # (documented behavior of the output_dir option)
            if not os.path.exists(output_dir):
                try:
                    os.mkdir(output_dir)
                except Exception as e:
                    self._display.warning(
                        u'Could not create the output directory at %s: %s' % (to_text(output_dir), to_text(e))
                    )
                    self.disabled = True
                    return

            if not self._file_per_task:
                # One set of files covers the entire run; open them now
                self._open_files()

    def _profile(self, obj=None):
        """Stop the running profilers, record their maxima, start new ones.

        Called with the new task at each task start, and with no ``obj`` at
        stats time to flush the final task's results.
        """
        prev_task = None
        results = dict.fromkeys(self._features)
        if not obj or self._file_per_task:
            # End of run, or per-task files: close current output files
            for dummy, f in self._files.items():
                if f is None:
                    continue
                try:
                    f.close()
                except Exception:
                    pass

        try:
            # Signal every running profiler thread to stop polling
            for name, prof in self._profilers.items():
                prof.running = False

            for name, prof in self._profilers.items():
                results[name] = prof.max
            # All profilers were started with the same task object, so the
            # loop variable left over from the loop above identifies the
            # task these results belong to
            prev_task = prof.obj
        except AttributeError:
            # No profilers started yet (first task): values are still None
            pass

        for name, result in results.items():
            if result is not None:
                try:
                    self.task_results[name].append((prev_task, result))
                except ValueError:
                    pass

        if obj is not None:
            # Per-task mode reopens files for every task; whole-run mode
            # opened them in set_options (counter == 0 covers the fallback)
            if self._file_per_task or self._counter == 0:
                self._open_files(task_uuid=obj._uuid)

            # Start one fresh profiler thread per feature for this task
            for feature in self._features:
                self._profilers[feature] = self._profiler_map[feature](obj=obj, writer=self._writers[feature])
                self._profilers[feature].start()

            self._counter += 1

    def v2_playbook_on_task_start(self, task, is_conditional):
        # Roll the profilers over to the task that is about to run
        self._profile(task)

    def v2_playbook_on_stats(self, stats):
        """Stop profiling and, unless disabled, display the recap."""
        self._profile()

        if not self._display_recap:
            return

        # The kernel-tracked high-water mark covers the whole execution,
        # converted from bytes to MB
        with open(self.mem_max_file) as f:
            max_results = int(f.read().strip()) / 1024 / 1024

        self._display.banner('CGROUP PERF RECAP')
        self._display.display('Memory Execution Maximum: %0.2fMB\n' % max_results)
        for name, data in self.task_results.items():
            if name == 'memory':
                # Memory maximum was taken from the cgroup file above instead
                continue
            try:
                self._display.display(
                    '%s Execution Maximum: %0.2f%s\n' % (name, max((t[1] for t in data)), self._units[name])
                )
            except Exception as e:
                # max() raises on an empty sequence if nothing was collected
                self._display.display('%s profiling error: no results collected: %s\n' % (name, e))

        self._display.display('\n')

        # Per-task breakdown for every feature
        for name, data in self.task_results.items():
            if data:
                self._display.display('%s:\n' % name)
            for task, value in data:
                self._display.display('%s (%s): %0.2f%s' % (task.get_name(), task._uuid, value, self._units[name]))
            self._display.display('\n')
466