1# -*- coding: utf-8 -*-
2"""The :program:`celery logtool` command.
3
4.. program:: celery logtool
5"""
6
7from __future__ import absolute_import, unicode_literals
8
9import re
10from collections import Counter
11from fileinput import FileInput
12
13from .base import Command
14
__all__ = ('logtool',)

# Start of a log record: the line begins with ``[YYYY-MM-DD `` (an opening
# bracket, a dash-separated date made of digits, then a space).
# ``Audit.feed`` treats lines that do not match as trace/continuation lines.
RE_LOG_START = re.compile(r'^\[\d\d\d\d\-\d\d-\d\d ')
# A record whose first ``] `` separator is followed by ``Received``.
RE_TASK_RECEIVED = re.compile(r'.+?\] Received')
# A record whose first ``] `` separator is followed by ``Task``.
RE_TASK_READY = re.compile(r'.+?\] Task')
# Captures ``(name, id)`` from the first ``name[id]`` occurrence, where name
# consists of word characters and dots; at least one character must precede
# the name and at least one must follow the closing bracket.
RE_TASK_INFO = re.compile(r'.+?([\w\.]+)\[(.+?)\].+')
# Captures the text that follows ``name[id] `` (the closing bracket and a
# single space) up to the end of the line.
RE_TASK_RESULT = re.compile(r'.+?[\w\.]+\[.+?\] (.+)')
22
# Template for the ``stats`` action.  It is rendered with ``str.format`` and
# a ``task`` keyword holding the mapping built by ``Audit.report()``:
# ``{task[key]}`` looks up a key in that mapping, and ``{task[types].format}``
# reads the ``format`` attribute of the ``types`` entry (a ``_task_counts``).
REPORT_FORMAT = """
Report
======

Task total: {task[total]}
Task errors: {task[errors]}
Task success: {task[succeeded]}
Task completed: {task[completed]}

Tasks
=====
{task[types].format}
"""
36
37
class _task_counts(list):
    """List of count entries that can render itself as report text."""

    @property
    def format(self):
        """Return one ``first: second`` line per entry, newline-separated.

        Exposed as a property so that a ``str.format`` template can reach
        it through attribute access (``{value.format}``).
        """
        rendered = ['{0}: {1}'.format(*entry) for entry in self]
        return '\n'.join(rendered)
43
44
def task_info(line):
    """Return the groups captured by ``RE_TASK_INFO`` for ``line``.

    The pattern has two groups, so the result is a ``(name, id)`` tuple.
    The match is not checked: a line that does not match the pattern
    raises :exc:`AttributeError`.
    """
    return RE_TASK_INFO.match(line).groups()
48
49
class Audit(object):
    """Accumulate task statistics from worker log lines.

    Lines are supplied one at a time through :meth:`feed` (or in bulk via
    :meth:`run`); the collected numbers are available from :meth:`report`
    and :meth:`incomplete_tasks`.

    Arguments:
        on_task_error (Callable): Optional; called with
            ``(line, task_name, task_id, result)`` for every finished task
            whose result text does not contain ``succeeded``.
        on_trace (Callable): Optional; called with the text of lines that
            do not start a new log record (joined with the preceding
            record line when there is one).
        on_debug (Callable): Optional; called with every log record line
            that is not recognized as a task event.
    """

    def __init__(self, on_task_error=None, on_trace=None, on_debug=None):
        self.ids = set()             # ids of tasks seen as received
        self.names = {}              # task id -> task name
        self.results = {}            # task id -> last result text (or None)
        self.ready = set()           # ids of tasks seen as finished
        self.task_types = Counter()  # task name -> number of times received
        self.task_errors = 0         # number of non-successful finish events
        self.on_task_error = on_task_error
        self.on_trace = on_trace
        self.on_debug = on_debug
        self.prev_line = None        # last record line, or None after a trace

    def run(self, files):
        """Feed every line read from ``files`` and return ``self``."""
        stream = FileInput(files)
        # try/finally rather than ``with`` so the input is closed even when
        # a callback raises, without requiring context-manager support.
        try:
            for line in stream:
                self.feed(line)
        finally:
            stream.close()
        return self

    def task_received(self, line, task_name, task_id):
        """Record that task ``task_id`` named ``task_name`` was received."""
        self.names[task_id] = task_name
        self.ids.add(task_id)
        self.task_types[task_name] += 1

    def task_ready(self, line, task_name, task_id, result):
        """Record that task ``task_id`` finished with ``result``.

        ``result`` is the text following the task id, or :const:`None` when
        that text could not be parsed.  Anything that does not contain
        ``succeeded`` (including a missing result) is counted as an error.
        """
        self.ready.add(task_id)
        self.results[task_id] = result
        # Guard against None: ``in`` on None would raise TypeError.
        if not result or 'succeeded' not in result:
            self.task_error(line, task_name, task_id, result)

    def task_error(self, line, task_name, task_id, result):
        """Count a failed task and notify ``on_task_error`` if set."""
        self.task_errors += 1
        if self.on_task_error:
            self.on_task_error(line, task_name, task_id, result)

    def feed(self, line):
        """Classify a single log line and update the statistics.

        Record lines (matching ``RE_LOG_START``) are handled as a received
        task, a finished task, or passed to ``on_debug``.  All other lines
        are passed to ``on_trace``.
        """
        if RE_LOG_START.match(line):
            received = RE_TASK_RECEIVED.match(line)
            ready = None if received else RE_TASK_READY.match(line)
            info = RE_TASK_INFO.match(line) if (received or ready) else None
            if info is None:
                # Either not a task event, or a task-looking line without
                # a parsable ``name[id]``: treat it as an ordinary record
                # instead of failing on the missing match.
                if self.on_debug:
                    self.on_debug(line)
            else:
                task_name, task_id = info.groups()
                if received:
                    self.task_received(line, task_name, task_id)
                else:
                    result = RE_TASK_RESULT.match(line)
                    self.task_ready(
                        line, task_name, task_id,
                        result.group(1) if result else None,
                    )
            self.prev_line = line
        else:
            if self.on_trace:
                self.on_trace('\n'.join(filter(None, [self.prev_line, line])))
            # Only the first continuation line is prefixed with its record.
            self.prev_line = None

    def incomplete_tasks(self):
        """Return the ids of tasks that were received but never finished."""
        # Set difference, not symmetric difference: a task that finished
        # without a visible "received" line did complete.
        return self.ids - self.ready

    def report(self):
        """Return the collected statistics as a nested mapping.

        The single top-level key ``task`` maps to a dict with the keys
        ``types``, ``total``, ``errors``, ``completed`` and ``succeeded``
        (computed as completed minus errors).
        """
        return {
            'task': {
                'types': _task_counts(self.task_types.most_common()),
                'total': len(self.ids),
                'errors': self.task_errors,
                'completed': len(self.ready),
                'succeeded': len(self.ready) - self.task_errors,
            }
        }
118
119
class logtool(Command):
    """The ``celery logtool`` command."""

    args = """<action> [arguments]
            .....  stats      [file1|- [file2 [...]]]
            .....  traces     [file1|- [file2 [...]]]
            .....  errors     [file1|- [file2 [...]]]
            .....  incomplete [file1|- [file2 [...]]]
            .....  debug      [file1|- [file2 [...]]]
    """

    def run(self, what=None, *files, **kwargs):
        """Run the action named ``what`` on ``files`` and return its result.

        Raises ``self.UsageError`` when no action is given and
        ``self.Error`` when the action name is not known.
        """
        handlers = {
            'stats': self.stats,
            'traces': self.traces,
            'errors': self.errors,
            'incomplete': self.incomplete,
            'debug': self.debug,
        }
        if not what:
            raise self.UsageError('missing action')
        if what not in handlers:
            raise self.Error(
                'action {0} not in {1}'.format(what, '|'.join(handlers)),
            )

        handler = handlers[what]
        return handler(files)

    def stats(self, files):
        """Output the summary report for ``files``."""
        report = Audit().run(files).report()
        self.out(REPORT_FORMAT.format(**report))

    def traces(self, files):
        """Output the lines that do not start a new log record."""
        auditor = Audit(on_trace=self.out)
        auditor.run(files)

    def errors(self, files):
        """Output the log line of every task that did not succeed."""
        auditor = Audit(on_task_error=self.say1)
        auditor.run(files)

    def incomplete(self, files):
        """Report every task id returned by ``Audit.incomplete_tasks()``."""
        pending = Audit().run(files).incomplete_tasks()
        for task_id in pending:
            self.error('Did not complete: %r' % (task_id,))

    def debug(self, files):
        """Output the record lines not recognized as task events."""
        auditor = Audit(on_debug=self.out)
        auditor.run(files)

    def say1(self, line, *_):
        """Output ``line`` only, ignoring any further callback arguments."""
        self.out(line)
170