1#!/usr/bin/env python
2# Copyright 2017 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""Custom swarming triggering script.
6
7This script does custom swarming triggering logic, to enable device affinity
8for our bots, while lumping all trigger calls under one logical step.
9
10For the perf use case of device affinity, this script now enables soft device
11affinity.  This means that it tries to smartly allocate jobs to bots based
12on what is currently alive and what bot the task was last triggered on,
13preferring that last triggered bot if available.  If the
--multiple-trigger-configs flag is specified then this script overrides
15the soft device affinity functionality in favor of the provided ids.
16
17The algorithm is roughly the following:
18
19Find eligible bots, healthy or not.
20  * Query swarming for eligible bots based on the dimensions passed in
    on the swarming call.  A bot is healthy when it is neither
    quarantined nor is_dead.
23
24Of the eligible bots determine what bot id to run the shard on.
25(Implementation in _select_config_indices_with_soft_affinity)
26  * First query swarming for the last task that ran that shard with
27    given dimensions.  Assuming they are returned with most recent first.
28  * Check if the bot id that ran that task is alive, if so trigger
29    on that bot again.
30  * If that bot isn't alive, allocate to another alive bot or if no
31    other alive bots exist, trigger on the same dead one.
32
33Scripts inheriting must have roughly the same command line interface as
34swarming.py trigger. It modifies it in the following ways:
35
36 * Intercepts the dump-json argument, and creates its own by combining the
37   results from each trigger call.
38 * Intercepts the dimensions from the swarming call and determines what bots
   are healthy based on the above device affinity algorithm, and triggers
   each shard on the bot selected for it.
40 * Adds a tag to the swarming trigger job with the shard so we know the last
41   bot that ran this shard.
42
43This script is normally called from the swarming recipe module in tools/build.
44
45"""
46
47from __future__ import print_function
48
49import argparse
50import copy
51import json
52import os
53import subprocess
54import sys
55import tempfile
56import urllib
57import logging
58
59import base_test_triggerer
60
class Bot(object):
  """A swarming bot that is eligible to run the task.

  Holds the bot's id together with the liveness flag it was constructed
  with.
  """

  def __init__(self, bot_id, is_alive):
    self._is_alive = is_alive
    self._bot_id = bot_id

  def as_json_config(self):
    """Returns a config dict whose only entry is this bot's id."""
    return dict(id=self._bot_id)

  def is_alive(self):
    """Returns the liveness flag passed to the constructor."""
    return self._is_alive

  def id(self):
    """Returns the bot id passed to the constructor."""
    return self._bot_id
75
class PerfDeviceTriggerer(base_test_triggerer.BaseTestTriggerer):
  """Triggers perf swarming shards with soft device affinity.

  Without --multiple-trigger-configs, the constructor queries swarming for
  all bots matching the requested dimensions, and each shard is assigned to
  the bot that last ran it when that bot is alive, otherwise to another
  eligible bot.  With --multiple-trigger-configs, the provided configs are
  used as-is and swarming is not queried for bots.

  query_swarming, indices_to_trigger and _bot_configs are not defined in
  this class; they are expected to come from the base class.
  """

  def __init__(self, args, swarming_args):
    """Collects swarming settings and the eligible bots.

    Args:
      args: Parsed arguments; only multiple_trigger_configs is read here.
      swarming_args: List of the remaining command-line strings, scanned
        for the dimensions, swarming server and service account flags.
    """
    super(PerfDeviceTriggerer, self).__init__()
    # Defaults so that every attribute exists even when explicit trigger
    # configs are supplied and swarming is never queried.
    self._dimensions = {}
    self._swarming_server = None
    self._service_account = None
    self._eligible_bots_by_ids = {}
    if not args.multiple_trigger_configs:
      # Represents the list of current dimensions requested
      # by the parent swarming job.
      self._dimensions = self._get_swarming_dimensions(swarming_args)

      # Store what swarming server we need and whether or not we need
      # to send down authentication with it
      self._swarming_server = self._get_swarming_server(swarming_args)
      self._service_account = self._get_service_account(swarming_args)

      # Map of all existing bots in swarming that satisfy the current
      # set of dimensions indexed by bot id.
      # Note: this assumes perf bot dimensions are unique between
      # configurations.
      self._eligible_bots_by_ids = (
          self._query_swarming_for_eligible_bot_configs(self._dimensions))

  def append_additional_args(self, args, shard_index):
    """Returns a copy of args with a '--tag shard:<index>' pair added.

    The tag lets a later run query for the last bot that ran this shard.

    Args:
      args: List of command-line strings for the trigger call.
      shard_index: Integer index of the shard being triggered.

    Returns:
      A new list; args itself is not modified.
    """
    shard_tag = ['--tag', 'shard:%d' % shard_index]
    # Need to insert this before the dash if present so it gets fed to
    # the swarming task itself.
    if '--' in args:
      dash_ind = args.index('--')
      return args[:dash_ind] + shard_tag + args[dash_ind:]
    return args + shard_tag

  def parse_bot_configs(self, args):
    """Populates self._bot_configs.

    With --multiple-trigger-configs this defers to the base class.
    Otherwise one {'id': <bot id>} config is created per eligible bot.
    """
    if args.multiple_trigger_configs:
      super(PerfDeviceTriggerer, self).parse_bot_configs(args)
    else:
      self._bot_configs = [
          bot.as_json_config()
          for bot in self._eligible_bots_by_ids.values()]

  def select_config_indices(self, args, verbose):
    """Chooses which bot config each shard is triggered with.

    Returns:
      A list of (shard index, index into self._bot_configs) tuples.

    Raises:
      ValueError: in soft-affinity mode, when more shards are requested
        than there are eligible bots.
    """
    if args.multiple_trigger_configs:
      # If specific bot ids were passed in, we want to trigger a job for
      # every valid config regardless of health status since
      # each config represents exactly one bot in the perf swarming pool.
      # Returning here matters: the soft-affinity path below relies on bot
      # data that is only queried when no explicit configs were given.
      return [(index, index)
              for index in range(len(self.indices_to_trigger(args)))]
    return self._select_config_indices_with_soft_affinity(args, verbose)

  def _select_config_indices_with_soft_affinity(self, args, verbose):
    """Assigns each shard to a bot, preferring the bot that last ran it.

    Returns:
      A list of (shard index, index into self._bot_configs) tuples.

    Raises:
      ValueError: when more shards are requested than eligible bots exist.
    """
    shard_indices = list(self.indices_to_trigger(args))
    trigger_count = len(shard_indices)
    # First make sure the number of shards doesn't exceed the
    # number of eligible bots.  This means there is a config error somewhere.
    if trigger_count > len(self._eligible_bots_by_ids):
      if verbose:
        self._print_device_affinity_info(
            {}, {}, self._eligible_bots_by_ids, trigger_count)
      raise ValueError('Not enough available machines exist in swarming '
                       'pool.  Shards requested (%d) exceeds available bots '
                       '(%d).' % (
                           trigger_count, len(self._eligible_bots_by_ids)))

    # Step 1: give every shard the bot it last ran on, if that bot is
    # eligible and has not already been claimed by an earlier shard.
    shard_to_bot_assignment_map = {}
    unallocated_bots_by_ids = copy.deepcopy(self._eligible_bots_by_ids)
    for shard_index in shard_indices:
      bot_id = self._query_swarming_for_last_shard_id(shard_index)
      if bot_id and bot_id in unallocated_bots_by_ids:
        shard_to_bot_assignment_map[shard_index] = (
            unallocated_bots_by_ids.pop(bot_id))
      else:
        shard_to_bot_assignment_map[shard_index] = None

    # Maintain the current map for debugging purposes
    existing_shard_bot_to_shard_map = copy.deepcopy(shard_to_bot_assignment_map)
    # Now create sets of remaining healthy and bad bots
    unallocated_healthy_bots = {
        b for b in unallocated_bots_by_ids.values() if b.is_alive()}
    unallocated_bad_bots = {
        b for b in unallocated_bots_by_ids.values() if not b.is_alive()}

    # Step 2: try assigning healthy bots for new shards first.  The size
    # check above guarantees enough unallocated bots remain for every
    # shard that is still unassigned.
    for shard_index, bot in sorted(shard_to_bot_assignment_map.items()):
      if bot:
        continue
      if unallocated_healthy_bots:
        shard_to_bot_assignment_map[shard_index] = (
            unallocated_healthy_bots.pop())
        if verbose:
          print('First time shard %d has been triggered' % shard_index)
      else:
        shard_to_bot_assignment_map[shard_index] = unallocated_bad_bots.pop()

    # Step 3: handle the rest of shards that were assigned dead bots by
    # moving them to any healthy bots that are still left over.
    for shard_index, bot in sorted(shard_to_bot_assignment_map.items()):
      if not bot.is_alive() and unallocated_healthy_bots:
        dead_bot = bot
        healthy_bot = unallocated_healthy_bots.pop()
        shard_to_bot_assignment_map[shard_index] = healthy_bot
        if verbose:
          print('Device affinity broken for shard #%d. bot %s is dead, new '
                'mapping to bot %s' % (
                    shard_index, dead_bot.id(), healthy_bot.id()))

    # Now populate the indices into the bot_configs array
    selected_configs = [
        (shard_index, self._find_bot_config_index(
            shard_to_bot_assignment_map[shard_index].id()))
        for shard_index in shard_indices]
    if verbose:
      self._print_device_affinity_info(
          shard_to_bot_assignment_map,
          existing_shard_bot_to_shard_map,
          self._eligible_bots_by_ids, trigger_count)
    return selected_configs

  def _print_device_affinity_info(
      self, new_map, existing_map, health_map, num_shards):
    """Prints the previous and new bot per shard plus a bot health summary.

    Args:
      new_map: Dict of shard index to the Bot now assigned (or None).
      existing_map: Dict of shard index to the Bot that previously ran the
        shard (or None).
      health_map: Dict of bot id to Bot for all eligible bots.
      num_shards: Number of shards being triggered.
    """
    print()
    # Report the shard indices that were actually assigned; they need not
    # be 0..num_shards-1 if only a subset of shards is triggered.  When
    # both maps are empty (the not-enough-bots error path), fall back to
    # listing num_shards empty entries.
    shard_indices = (
        sorted(set(new_map) | set(existing_map)) or range(num_shards))
    for shard_index in shard_indices:
      existing = existing_map.get(shard_index, None)
      new = new_map.get(shard_index, None)
      existing_id = existing.id() if existing else ''
      new_id = new.id() if new else ''
      print('Shard %d\n\tprevious: %s\n\tnew: %s' % (
          shard_index, existing_id, new_id))

    healthy_bots = []
    dead_bots = []
    for b in health_map.values():
      if b.is_alive():
        healthy_bots.append(b.id())
      else:
        dead_bots.append(b.id())
    print('Shards needed: %d' % num_shards)
    print('Total bots (dead + healthy): %d' % (
        len(dead_bots) + len(healthy_bots)))
    print('Healthy bots, %d: %s' % (len(healthy_bots), healthy_bots))
    print('Dead Bots, %d: %s' % (len(dead_bots), dead_bots))
    print()

  def _query_swarming_for_eligible_bot_configs(self, dimensions):
    """Query Swarming to figure out which bots are available.

    Args:
      dimensions: Dict of dimension key to value used to filter the bots.

    Returns:
      A dictionary in which the keys are the bot id and the values are Bot
      objects that indicate the health status of the bots.  Empty when the
      query result has no 'items' entry.
    """
    values = [('dimensions', '%s:%s' % (key, value))
              for key, value in sorted(dimensions.items())]

    query_result = self.query_swarming(
        'bots/list', values, True, server=self._swarming_server,
        service_account=self._service_account)
    if 'items' not in query_result:
      return {}
    perf_bots = {}
    for bot in query_result['items']:
      # Treat a missing flag the same as a false one instead of raising
      # KeyError.
      alive = not bot.get('is_dead') and not bot.get('quarantined')
      perf_bots[bot['bot_id']] = Bot(bot['bot_id'], alive)
    return perf_bots

  def _find_bot_config_index(self, bot_id):
    """Returns the index in self._bot_configs whose 'id' equals bot_id.

    Returns None when no config matches.
    """
    for i, dimensions in enumerate(self._bot_configs):
      if dimensions['id'] == bot_id:
        return i
    return None

  def _query_swarming_for_last_shard_id(self, shard_index):
    """Returns the id of the bot that last ran the given shard, or None.

    Args:
      shard_index: Index of the shard to look up.
    """
    # Per shard, query swarming for the last bot that ran the task
    # Example: swarming.py query -S server-url.com --limit 1 \
    #  'tasks/list?tags=os:Windows&tags=pool:chrome.tests.perf&tags=shard:12'
    values = [
        ('tags', '%s:%s' % (k, v)) for k, v in self._dimensions.items()
    ]
    # Append the shard as a tag
    values.append(('tags', '%s:%s' % ('shard', str(shard_index))))
    values.sort()
    # TODO(eyaich): For now we are ignoring the state of the returned
    # task (ie completed, timed_out, bot_died, etc) as we are just
    # answering the question "What bot did we last trigger this shard on?"
    # Evaluate if this is the right decision going forward.

    # Query for the last task that ran with these dimensions and this shard
    query_result = self.query_swarming(
        'tasks/list', values, True, limit='1', server=self._swarming_server,
        service_account=self._service_account)
    tasks = query_result.get('items')
    if tasks:
      # We queried with a limit of 1 so we could only get back
      # the most recent which is what we care about.
      task = tasks[0]
      if 'bot_id' in task:
        return task['bot_id']
      # Otherwise fall back to an 'id:<bot id>' tag on the task, if any.
      for tag in task.get('tags', []):
        if tag.startswith('id:'):
          return tag[len('id:'):]
    # No previous bot could be determined for this shard
    return None

  def _get_swarming_dimensions(self, args):
    """Collects every '--dimension KEY VALUE' triple in args into a dict."""
    dimensions = {}
    # Stop two short of the end so that both the key and the value exist.
    for i in range(len(args) - 2):
      if args[i] == '--dimension':
        dimensions[args[i+1]] = args[i+2]
    return dimensions

  def _get_swarming_server(self, args):
    """Returns the swarming server from args with its protocol stripped.

    The server is the argument following the first one that contains
    '--swarming'.  Returns None when there is no such flag or it has no
    value after it.
    """
    # Stop one short of the end so that the flag always has a value.
    for i in range(len(args) - 1):
      if '--swarming' in args[i]:
        server = args[i+1]
        slashes_index = server.find('//')
        if slashes_index == -1:
          # No protocol prefix to strip.
          return server
        # Strip out the protocol
        return server[slashes_index + 2:]
    return None

  def _get_service_account(self, args):
    """Returns the argument following the service account flag, or None."""
    for i in range(len(args) - 1):
      if '--auth-service-account-json' in args[i]:
        return args[i+1]
    return None
306
def main():
  """Parses the command line and triggers the swarming tasks.

  Returns:
    The value returned by PerfDeviceTriggerer.trigger_tasks.
  """
  log_format = ('(%(levelname)s) %(asctime)s pid=%(process)d'
                '  %(module)s.%(funcName)s:%(lineno)d  %(message)s')
  logging.basicConfig(level=logging.INFO, format=log_format)

  # Setup args for common contract of base class; arguments the parser does
  # not recognise are kept and handed to the triggerer unchanged.
  base_parser = argparse.ArgumentParser(description=__doc__)
  parser = base_test_triggerer.BaseTestTriggerer.setup_parser_contract(
      base_parser)
  known_args, swarming_args = parser.parse_known_args()

  return PerfDeviceTriggerer(known_args, swarming_args).trigger_tasks(
      known_args, swarming_args)
319
if __name__ == '__main__':
  # Use main()'s return value as the process exit status.
  exit_status = main()
  sys.exit(exit_status)
322
323