#!/usr/bin/env python
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Calculate statistics about tasks, counts per day.

Saves the data fetched from the server into a json file to enable reprocessing
the data without having to always fetch from the server.
"""

import collections
import datetime
import json
import logging
import optparse
import os
import subprocess
import sys

CLIENT_DIR = os.path.dirname(
    os.path.dirname(
        os.path.abspath(__file__.decode(sys.getfilesystemencoding()))))
sys.path.insert(0, CLIENT_DIR)

from utils import tools
tools.force_local_third_party()

# third_party/
import colorama
from six.moves import urllib

# pylint: disable=ungrouped-imports
from utils import graph
from utils import threading_utils


_EPOCH = datetime.datetime.utcfromtimestamp(0)


def parse_time_option(value):
  """Converts time as an option into a datetime.datetime.

  Returns None if not specified.
  """
  if not value:
    return None
  try:
    return _EPOCH + datetime.timedelta(seconds=int(value))
  except ValueError:
    pass
  for fmt in ('%Y-%m-%d',):
    try:
      return datetime.datetime.strptime(value, fmt)
    except ValueError:
      pass
  raise ValueError('Failed to parse %s' % value)


def _get_epoch(t):
  return int((t - _EPOCH).total_seconds())


def _run_json(key, process, cmd):
  """Runs cmd and calls process with the decoded json."""
  logging.info('Running %s', ' '.join(cmd))
  raw = subprocess.check_output(cmd)
  logging.info('- returned %d bytes', len(raw))
  return key, process(json.loads(raw))


def _get_cmd(swarming, endpoint, start, end, state, tags):
  """Returns the command line to query this endpoint."""
  cmd = [
      sys.executable,
      os.path.join(CLIENT_DIR, 'swarming.py'),
      'query',
      '-S',
      swarming,
      '--limit',
      '0',
  ]
  data = [('start', start), ('end', end), ('state', state)]
  data.extend(('tags', tag) for tag in tags)
  return cmd + [endpoint + '?' + urllib.parse.urlencode(data)]


def _flatten_dimensions(dimensions):
  items = {i['key']: i['value'] for i in dimensions}
  return ','.join('%s=%s' % (k, v) for k, v in sorted(items.items()))


def fetch_tasks(swarming, start, end, state, tags, parallel):
  """Fetches the data for each task.

  Fetch per hour because otherwise it's too slow.
  """
  def process(data):
    """Returns the list of flattened dimensions for these tasks."""
    items = data.get('items', [])
    logging.info('- processing %d items', len(items))
    return [_flatten_dimensions(t['properties']['dimensions']) for t in items]

  delta = datetime.timedelta(hours=1)
  return _fetch_daily_internal(
      delta, swarming, process, 'tasks/requests', start, end, state, tags,
      parallel)


def fetch_counts(swarming, start, end, state, tags, parallel):
  """Fetches counts from swarming and returns it."""

  def process(data):
    return int(data['count'])

  delta = datetime.timedelta(days=1)
  return _fetch_daily_internal(delta, swarming, process, 'tasks/count', start,
                               end, state, tags, parallel)


def _fetch_daily_internal(delta, swarming, process, endpoint, start, end, state,
                          tags, parallel):
  """Executes 'process' by parallelizing it once per day."""
  out = {}
  with threading_utils.ThreadPool(1, parallel, 0) as pool:
    while start < end:
      cmd = _get_cmd(swarming, endpoint, _get_epoch(start),
                     _get_epoch(start + delta), state, tags)
      pool.add_task(0, _run_json, start.strftime('%Y-%m-%d'), process, cmd)
      start += delta
    for k, v in pool.iter_results():
      sys.stdout.write('.')
      sys.stdout.flush()
      out[k] = v
  print('')
  return out


def present_dimensions(items, daily_count):
  # Split items per group.
  per_dimension = collections.defaultdict(lambda: collections.defaultdict(int))
  for date, dimensions in items.items():
    for d in dimensions:
      per_dimension[d][date] += 1
  for i, (dimension, data) in enumerate(sorted(per_dimension.items())):
    print(
        '%s%s%s' % (
            colorama.Style.BRIGHT + colorama.Fore.MAGENTA,
            dimension,
            colorama.Fore.RESET))
    present_counts(data, daily_count)
    if i != len(per_dimension) - 1:
      print('')


def present_counts(items, daily_count):
  months = collections.defaultdict(int)
  for day, count in sorted(items.items()):
    month = day.rsplit('-', 1)[0]
    months[month] += count

  years = collections.defaultdict(int)
  for month, count in months.items():
    year = month.rsplit('-', 1)[0]
    years[year] += count
  total = sum(months.values())
  maxlen = len(str(total))

  if daily_count:
    for day, count in sorted(items.items()):
      print('%s: %*d' % (day, maxlen, count))

  if len(items) > 1:
    for month, count in sorted(months.items()):
      print('%s : %*d' % (month, maxlen, count))
  if len(months) > 1:
    for year, count in sorted(years.items()):
      print('%s : %*d' % (year, maxlen, count))
  if len(years) > 1:
    print('Total : %*d' % (maxlen, total))
  if not daily_count:
    print('')
    graph.print_histogram(items)


STATES = ('PENDING', 'RUNNING', 'PENDING_RUNNING', 'COMPLETED',
          'COMPLETED_SUCCESS', 'COMPLETED_FAILURE', 'EXPIRED', 'TIMED_OUT',
          'BOT_DIED', 'CANCELED', 'ALL', 'DEDUPED')


def main():
  colorama.init()
  parser = optparse.OptionParser(description=sys.modules['__main__'].__doc__)
  tomorrow = datetime.datetime.utcnow().date() + datetime.timedelta(days=1)
  year = datetime.datetime(tomorrow.year, 1, 1)
  parser.add_option(
      '-S',
      '--swarming',
      metavar='URL',
      default=os.environ.get('SWARMING_SERVER', ''),
      help='Swarming server to use')

  group = optparse.OptionGroup(parser, 'Filtering')
  group.add_option(
      '--start',
      default=year.strftime('%Y-%m-%d'),
      help='Starting date in UTC; defaults to start of year: %default')
  group.add_option(
      '--end',
      default=tomorrow.strftime('%Y-%m-%d'),
      help='End date in UTC (excluded); defaults to tomorrow: %default')
  group.add_option(
      '--state',
      default='ALL',
      type='choice',
      choices=STATES,
      help='State to filter on. Values are: %s' % ', '.join(STATES))
  group.add_option(
      '--tags',
      action='append',
      default=[],
      help='Tags to filter on; use this to filter on dimensions')
  parser.add_option_group(group)

  group = optparse.OptionGroup(parser, 'Presentation')
  group.add_option(
      '--show-dimensions',
      action='store_true',
      help='Show the dimensions; slower')
  group.add_option(
      '--daily-count',
      action='store_true',
      help='Show the daily count in raw number instead of histogram')
  parser.add_option_group(group)

  parser.add_option(
      '--json', default='counts.json',
      help='File containing raw data; default: %default')
  parser.add_option(
      '--parallel',
      default=100,
      type='int',
      help='Concurrent queries; default: %default')
  parser.add_option(
      '-v', '--verbose', action='count', default=0, help='Log')
  options, args = parser.parse_args()

  if args:
    parser.error('Unsupported argument %s' % args)
  logging.basicConfig(
      level=logging.DEBUG if options.verbose else logging.ERROR)
  start = parse_time_option(options.start)
  end = parse_time_option(options.end)
  print('From %s (%d) to %s (%d)' % (
      start, int((start - _EPOCH).total_seconds()),
      end, int((end - _EPOCH).total_seconds())))
  if options.swarming:
    if options.show_dimensions:
      data = fetch_tasks(options.swarming, start, end, options.state,
                         options.tags, options.parallel)
    else:
      data = fetch_counts(options.swarming, start, end, options.state,
                          options.tags, options.parallel)
    with open(options.json, 'wb') as f:
      json.dump(data, f)
  elif not os.path.isfile(options.json):
    parser.error('--swarming is required.')
  else:
    with open(options.json, 'rb') as f:
      data = json.load(f)

  print('')
  if options.show_dimensions:
    present_dimensions(data, options.daily_count)
  else:
    present_counts(data, options.daily_count)
  return 0


if __name__ == '__main__':
  sys.exit(main())
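
# Example invocations (a hedged sketch, not part of the original tool: the
# script name, server URL and pool tag below are placeholders; the flags are
# the options defined in main() above).
#
#   Daily task counts since the start of the year, cached in counts.json:
#     ./task_stats.py -S https://example-swarming.appspot.com
#
#   Per-dimension breakdown for a date range, filtered by a tag:
#     ./task_stats.py -S https://example-swarming.appspot.com \
#         --show-dimensions --start 2015-06-01 --end 2015-07-01 \
#         --tags pool:default
#
#   Re-render previously fetched data without querying the server:
#     ./task_stats.py --json counts.json --daily-count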