1#!/usr/bin/env python
2# Copyright 2015 The LUCI Authors. All rights reserved.
3# Use of this source code is governed under the Apache License, Version 2.0
4# that can be found in the LICENSE file.
5
6"""Calculate statistics about tasks, counts per day.
7
8Saves the data fetched from the server into a json file to enable reprocessing
9the data without having to always fetch from the server.
10"""
11
12import collections
13import datetime
14import json
15import logging
16import optparse
17import os
18import subprocess
19import sys
20
21CLIENT_DIR = os.path.dirname(
22    os.path.dirname(
23        os.path.abspath(__file__.decode(sys.getfilesystemencoding()))))
24sys.path.insert(0, CLIENT_DIR)
25
26from utils import tools
27tools.force_local_third_party()
28
29# third_party/
30import colorama
31from six.moves import urllib
32
33# pylint: disable=ungrouped-imports
34from utils import graph
35from utils import threading_utils
36
37
38_EPOCH = datetime.datetime.utcfromtimestamp(0)
39
40
41def parse_time_option(value):
42  """Converts time as an option into a datetime.datetime.
43
44  Returns None if not specified.
45  """
46  if not value:
47    return None
48  try:
49    return _EPOCH + datetime.timedelta(seconds=int(value))
50  except ValueError:
51    pass
52  for fmt in ('%Y-%m-%d',):
53    try:
54      return datetime.datetime.strptime(value, fmt)
55    except ValueError:
56      pass
57  raise ValueError('Failed to parse %s' % value)
58
59
60def _get_epoch(t):
61  return int((t - _EPOCH).total_seconds())
62
63
64def _run_json(key, process, cmd):
65  """Runs cmd and calls process with the decoded json."""
66  logging.info('Running %s', ' '.join(cmd))
67  raw = subprocess.check_output(cmd)
68  logging.info('- returned %d bytes', len(raw))
69  return key, process(json.loads(raw))
70
71
72def _get_cmd(swarming, endpoint, start, end, state, tags):
73  """Returns the command line to query this endpoint."""
74  cmd = [
75      sys.executable,
76      os.path.join(CLIENT_DIR, 'swarming.py'),
77      'query',
78      '-S',
79      swarming,
80      '--limit',
81      '0',
82  ]
83  data = [('start', start), ('end', end), ('state', state)]
84  data.extend(('tags', tag) for tag in tags)
85  return cmd + [endpoint + '?' + urllib.parse.urlencode(data)]
86
87
88def _flatten_dimensions(dimensions):
89  items = {i['key']: i['value'] for i in dimensions}
90  return ','.join('%s=%s' % (k, v) for k, v in sorted(items.items()))
91
92
93def fetch_tasks(swarming, start, end, state, tags, parallel):
94  """Fetches the data for each task.
95
96  Fetch per hour because otherwise it's too slow.
97  """
98  def process(data):
99    """Returns the list of flattened dimensions for these tasks."""
100    items = data.get('items', [])
101    logging.info('- processing %d items', len(items))
102    return [_flatten_dimensions(t['properties']['dimensions']) for t in items]
103
104  delta = datetime.timedelta(hours=1)
105  return _fetch_daily_internal(
106      delta, swarming, process, 'tasks/requests', start, end, state, tags,
107      parallel)
108
109
110def fetch_counts(swarming, start, end, state, tags, parallel):
111  """Fetches counts from swarming and returns it."""
112
113  def process(data):
114    return int(data['count'])
115  delta = datetime.timedelta(days=1)
116  return _fetch_daily_internal(delta, swarming, process, 'tasks/count', start,
117                               end, state, tags, parallel)
118
119
120def _fetch_daily_internal(delta, swarming, process, endpoint, start, end, state,
121                          tags, parallel):
122  """Executes 'process' by parallelizing it once per day."""
123  out = {}
124  with threading_utils.ThreadPool(1, parallel, 0) as pool:
125    while start < end:
126      cmd = _get_cmd(swarming, endpoint, _get_epoch(start),
127                     _get_epoch(start + delta), state, tags)
128      pool.add_task(0, _run_json, start.strftime('%Y-%m-%d'), process, cmd)
129      start += delta
130    for k, v in pool.iter_results():
131      sys.stdout.write('.')
132      sys.stdout.flush()
133      out[k] = v
134  print('')
135  return out
136
137
138def present_dimensions(items, daily_count):
139  # Split items per group.
140  per_dimension = collections.defaultdict(lambda: collections.defaultdict(int))
141  for date, dimensions in items.items():
142    for d in dimensions:
143      per_dimension[d][date] += 1
144  for i, (dimension, data) in enumerate(sorted(per_dimension.items())):
145    print(
146        '%s%s%s' % (
147          colorama.Style.BRIGHT + colorama.Fore.MAGENTA,
148          dimension,
149          colorama.Fore.RESET))
150    present_counts(data, daily_count)
151    if i != len(per_dimension) - 1:
152      print('')
153
154
155def present_counts(items, daily_count):
156  months = collections.defaultdict(int)
157  for day, count in sorted(items.items()):
158    month = day.rsplit('-', 1)[0]
159    months[month] += count
160
161  years = collections.defaultdict(int)
162  for month, count in months.items():
163    year = month.rsplit('-', 1)[0]
164    years[year] += count
165  total = sum(months.values())
166  maxlen = len(str(total))
167
168  if daily_count:
169    for day, count in sorted(items.items()):
170      print('%s: %*d' % (day, maxlen, count))
171
172  if len(items) > 1:
173    for month, count in sorted(months.items()):
174      print('%s   : %*d' % (month, maxlen, count))
175  if len(months) > 1:
176    for year, count in sorted(years.items()):
177      print('%s      : %*d' % (year, maxlen, count))
178  if len(years) > 1:
179    print('Total     : %*d' % (maxlen, total))
180  if not daily_count:
181    print('')
182    graph.print_histogram(items)
183
184
185STATES = ('PENDING', 'RUNNING', 'PENDING_RUNNING', 'COMPLETED',
186          'COMPLETED_SUCCESS', 'COMPLETED_FAILURE', 'EXPIRED', 'TIMED_OUT',
187          'BOT_DIED', 'CANCELED', 'ALL', 'DEDUPED')
188
189
190def main():
191  colorama.init()
192  parser = optparse.OptionParser(description=sys.modules['__main__'].__doc__)
193  tomorrow = datetime.datetime.utcnow().date() + datetime.timedelta(days=1)
194  year = datetime.datetime(tomorrow.year, 1, 1)
195  parser.add_option(
196      '-S',
197      '--swarming',
198      metavar='URL',
199      default=os.environ.get('SWARMING_SERVER', ''),
200      help='Swarming server to use')
201
202  group = optparse.OptionGroup(parser, 'Filtering')
203  group.add_option(
204      '--start',
205      default=year.strftime('%Y-%m-%d'),
206      help='Starting date in UTC; defaults to start of year: %default')
207  group.add_option(
208      '--end',
209      default=tomorrow.strftime('%Y-%m-%d'),
210      help='End date in UTC (excluded); defaults to tomorrow: %default')
211  group.add_option(
212      '--state',
213      default='ALL',
214      type='choice',
215      choices=STATES,
216      help='State to filter on. Values are: %s' % ', '.join(STATES))
217  group.add_option(
218      '--tags',
219      action='append',
220      default=[],
221      help='Tags to filter on; use this to filter on dimensions')
222  parser.add_option_group(group)
223
224  group = optparse.OptionGroup(parser, 'Presentation')
225  group.add_option(
226      '--show-dimensions',
227      action='store_true',
228      help='Show the dimensions; slower')
229  group.add_option(
230      '--daily-count',
231      action='store_true',
232      help='Show the daily count in raw number instead of histogram')
233  parser.add_option_group(group)
234
235  parser.add_option(
236      '--json', default='counts.json',
237      help='File containing raw data; default: %default')
238  parser.add_option(
239      '--parallel',
240      default=100,
241      type='int',
242      help='Concurrent queries; default: %default')
243  parser.add_option(
244      '-v', '--verbose', action='count', default=0, help='Log')
245  options, args = parser.parse_args()
246
247  if args:
248    parser.error('Unsupported argument %s' % args)
249  logging.basicConfig(level=logging.DEBUG if options.verbose else logging.ERROR)
250  start = parse_time_option(options.start)
251  end = parse_time_option(options.end)
252  print('From %s (%d) to %s (%d)' % (
253      start, int((start- _EPOCH).total_seconds()),
254      end, int((end - _EPOCH).total_seconds())))
255  if options.swarming:
256    if options.show_dimensions:
257      data = fetch_tasks(options.swarming, start, end, options.state,
258                         options.tags, options.parallel)
259    else:
260      data = fetch_counts(options.swarming, start, end, options.state,
261                          options.tags, options.parallel)
262    with open(options.json, 'wb') as f:
263      json.dump(data, f)
264  elif not os.path.isfile(options.json):
265    parser.error('--swarming is required.')
266  else:
267    with open(options.json, 'rb') as f:
268      data = json.load(f)
269
270  print('')
271  if options.show_dimensions:
272    present_dimensions(data, options.daily_count)
273  else:
274    present_counts(data, options.daily_count)
275  return 0
276
277
278if __name__ == '__main__':
279  sys.exit(main())
280