1# -*- coding: utf-8 -*- 2"""The :program:`celery logtool` command. 3 4.. program:: celery logtool 5""" 6 7from __future__ import absolute_import, unicode_literals 8 9import re 10from collections import Counter 11from fileinput import FileInput 12 13from .base import Command 14 15__all__ = ('logtool',) 16 17RE_LOG_START = re.compile(r'^\[\d\d\d\d\-\d\d-\d\d ') 18RE_TASK_RECEIVED = re.compile(r'.+?\] Received') 19RE_TASK_READY = re.compile(r'.+?\] Task') 20RE_TASK_INFO = re.compile(r'.+?([\w\.]+)\[(.+?)\].+') 21RE_TASK_RESULT = re.compile(r'.+?[\w\.]+\[.+?\] (.+)') 22 23REPORT_FORMAT = """ 24Report 25====== 26 27Task total: {task[total]} 28Task errors: {task[errors]} 29Task success: {task[succeeded]} 30Task completed: {task[completed]} 31 32Tasks 33===== 34{task[types].format} 35""" 36 37 38class _task_counts(list): 39 40 @property 41 def format(self): 42 return '\n'.join('{0}: {1}'.format(*i) for i in self) 43 44 45def task_info(line): 46 m = RE_TASK_INFO.match(line) 47 return m.groups() 48 49 50class Audit(object): 51 52 def __init__(self, on_task_error=None, on_trace=None, on_debug=None): 53 self.ids = set() 54 self.names = {} 55 self.results = {} 56 self.ready = set() 57 self.task_types = Counter() 58 self.task_errors = 0 59 self.on_task_error = on_task_error 60 self.on_trace = on_trace 61 self.on_debug = on_debug 62 self.prev_line = None 63 64 def run(self, files): 65 for line in FileInput(files): 66 self.feed(line) 67 return self 68 69 def task_received(self, line, task_name, task_id): 70 self.names[task_id] = task_name 71 self.ids.add(task_id) 72 self.task_types[task_name] += 1 73 74 def task_ready(self, line, task_name, task_id, result): 75 self.ready.add(task_id) 76 self.results[task_id] = result 77 if 'succeeded' not in result: 78 self.task_error(line, task_name, task_id, result) 79 80 def task_error(self, line, task_name, task_id, result): 81 self.task_errors += 1 82 if self.on_task_error: 83 self.on_task_error(line, task_name, task_id, result) 84 85 def feed(self, line): 86 if RE_LOG_START.match(line): 87 if RE_TASK_RECEIVED.match(line): 88 task_name, task_id = task_info(line) 89 self.task_received(line, task_name, task_id) 90 elif RE_TASK_READY.match(line): 91 task_name, task_id = task_info(line) 92 result = RE_TASK_RESULT.match(line) 93 if result: 94 result, = result.groups() 95 self.task_ready(line, task_name, task_id, result) 96 else: 97 if self.on_debug: 98 self.on_debug(line) 99 self.prev_line = line 100 else: 101 if self.on_trace: 102 self.on_trace('\n'.join(filter(None, [self.prev_line, line]))) 103 self.prev_line = None 104 105 def incomplete_tasks(self): 106 return self.ids ^ self.ready 107 108 def report(self): 109 return { 110 'task': { 111 'types': _task_counts(self.task_types.most_common()), 112 'total': len(self.ids), 113 'errors': self.task_errors, 114 'completed': len(self.ready), 115 'succeeded': len(self.ready) - self.task_errors, 116 } 117 } 118 119 120class logtool(Command): 121 """The ``celery logtool`` command.""" 122 123 args = """<action> [arguments] 124 ..... stats [file1|- [file2 [...]]] 125 ..... traces [file1|- [file2 [...]]] 126 ..... errors [file1|- [file2 [...]]] 127 ..... incomplete [file1|- [file2 [...]]] 128 ..... debug [file1|- [file2 [...]]] 129 """ 130 131 def run(self, what=None, *files, **kwargs): 132 map = { 133 'stats': self.stats, 134 'traces': self.traces, 135 'errors': self.errors, 136 'incomplete': self.incomplete, 137 'debug': self.debug, 138 } 139 if not what: 140 raise self.UsageError('missing action') 141 elif what not in map: 142 raise self.Error( 143 'action {0} not in {1}'.format(what, '|'.join(map)), 144 ) 145 146 return map[what](files) 147 148 def stats(self, files): 149 self.out(REPORT_FORMAT.format( 150 **Audit().run(files).report() 151 )) 152 153 def traces(self, files): 154 Audit(on_trace=self.out).run(files) 155 156 def errors(self, files): 157 Audit(on_task_error=self.say1).run(files) 158 159 def incomplete(self, files): 160 audit = Audit() 161 audit.run(files) 162 for task_id in audit.incomplete_tasks(): 163 self.error('Did not complete: %r' % (task_id,)) 164 165 def debug(self, files): 166 Audit(on_debug=self.out).run(files) 167 168 def say1(self, line, *_): 169 self.out(line) 170