#!/usr/bin/env python
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Custom swarming triggering script.

This script does custom swarming triggering logic, to enable device affinity
for our bots, while lumping all trigger calls under one logical step.

For the perf use case of device affinity, this script now enables soft device
affinity. This means that it tries to smartly allocate jobs to bots based
on what is currently alive and what bot the task was last triggered on,
preferring that last triggered bot if available. If the
--multiple-trigger-configs flag is specified then this script overrides
the soft device affinity functionality in favor of the provided ids.

The algorithm is roughly the following:

Find eligible bots, healthy or not.
  * Query swarming for eligible bots based on the dimensions passed in
    on the swarming call. Determine their health status based on
    is not quarantined and is not is_dead.

Of the eligible bots determine what bot id to run the shard on.
(Implementation in _select_config_indices_with_soft_affinity)
  * First query swarming for the last task that ran that shard with
    given dimensions. Assuming they are returned with most recent first.
  * Check if the bot id that ran that task is alive, if so trigger
    on that bot again.
  * If that bot isn't alive, allocate to another alive bot or if no
    other alive bots exist, trigger on the same dead one.

Scripts inheriting must have roughly the same command line interface as
swarming.py trigger. It modifies it in the following ways:

  * Intercepts the dump-json argument, and creates its own by combining the
    results from each trigger call.
  * Intercepts the dimensions from the swarming call and determines what bots
    are healthy based on the above device affinity algorithm, and triggers
    on them.
  * Adds a tag to the swarming trigger job with the shard so we know the last
    bot that ran this shard.

This script is normally called from the swarming recipe module in tools/build.

"""

from __future__ import print_function

import argparse
import copy
import json
import os
import subprocess
import sys
import tempfile
import urllib
import logging

import base_test_triggerer


class Bot(object):
    """Eligible bots to run the task."""

    def __init__(self, bot_id, is_alive):
        self._bot_id = bot_id
        self._is_alive = is_alive

    def id(self):
        """Return the bot id this object was created with."""
        return self._bot_id

    def is_alive(self):
        """Return the health flag this object was created with."""
        return self._is_alive

    def as_json_config(self):
        """Return the bot as a trigger config: a dict holding only its id."""
        return {'id': self._bot_id}


class PerfDeviceTriggerer(base_test_triggerer.BaseTestTriggerer):
    """Triggerer that picks a bot per shard, preferring the last bot used.

    With --multiple-trigger-configs the caller-provided configs are used
    as-is; otherwise the eligible bots are discovered by querying swarming
    with the dimensions found on the command line.
    """

    def __init__(self, args, swarming_args):
        """Collect swarming parameters and the eligible bots.

        Args:
          args: parsed arguments; only `multiple_trigger_configs` is read.
          swarming_args: the remaining raw command line (list of strings)
            that is scanned for --dimension, --swarming and
            --auth-service-account-json.
        """
        super(PerfDeviceTriggerer, self).__init__()
        if not args.multiple_trigger_configs:
            # Represents the list of current dimensions requested
            # by the parent swarming job.
            self._dimensions = self._get_swarming_dimensions(swarming_args)

            # Store what swarming server we need and whether or not we need
            # to send down authentication with it
            self._swarming_server = self._get_swarming_server(swarming_args)
            self._service_account = self._get_service_account(swarming_args)

            # Map of all existing bots in swarming that satisfy the current
            # set of dimensions indexed by bot id.
            # Note: this assumes perf bot dimensions are unique between
            # configurations.
            self._eligible_bots_by_ids = (
                self._query_swarming_for_eligible_bot_configs(
                    self._dimensions))

    def append_additional_args(self, args, shard_index):
        """Return a copy of `args` with a `--tag shard:<index>` pair added.

        The tag lets a later run query for the last bot that ran a specific
        shard. It is inserted before a literal '--' if one is present so
        that it is fed to the swarming task itself; otherwise it is
        appended at the end.
        """
        shard_tag = ['--tag', 'shard:%d' % shard_index]
        if '--' in args:
            dash_ind = args.index('--')
            return args[:dash_ind] + shard_tag + args[dash_ind:]
        return args + shard_tag

    def parse_bot_configs(self, args):
        """Populate `self._bot_configs`.

        Defers to the base class when --multiple-trigger-configs was given;
        otherwise builds one `{'id': ...}` config per eligible bot.
        """
        if args.multiple_trigger_configs:
            super(PerfDeviceTriggerer, self).parse_bot_configs(args)
        else:
            self._bot_configs = [
                bot.as_json_config()
                for bot in self._eligible_bots_by_ids.values()]

    def select_config_indices(self, args, verbose):
        """Return a list of (shard_index, bot_config_index) pairs.

        Args:
          args: parsed arguments (`multiple_trigger_configs` is read here,
            the rest is forwarded to `indices_to_trigger`).
          verbose: when true, the soft-affinity path prints its decisions.
        """
        if args.multiple_trigger_configs:
            # If specific bot ids were passed in, we want to trigger a job
            # for every valid config regardless of health status since
            # each config represents exactly one bot in the perf swarming
            # pool.
            # This branch must return here: the soft-affinity path below
            # relies on attributes that __init__ only sets when
            # multiple_trigger_configs is NOT given.
            trigger_count = len(self.indices_to_trigger(args))
            return [(index, index) for index in range(trigger_count)]
        return self._select_config_indices_with_soft_affinity(args, verbose)

    def _select_config_indices_with_soft_affinity(self, args, verbose):
        """Assign every shard a bot, preferring the bot that last ran it.

        Returns:
          A list of (shard_index, index into self._bot_configs) tuples, in
          the order of `indices_to_trigger(args)`.

        Raises:
          ValueError: more shards were requested than eligible bots exist.
        """
        shard_indices = list(self.indices_to_trigger(args))
        trigger_count = len(shard_indices)
        # First make sure the number of shards doesn't exceed the
        # number of eligible bots. This means there is a config error
        # somewhere.
        if trigger_count > len(self._eligible_bots_by_ids):
            if verbose:
                self._print_device_affinity_info(
                    {}, {}, self._eligible_bots_by_ids, trigger_count)
            raise ValueError(
                'Not enough available machines exist in swarming '
                'pool. Shards requested (%d) exceeds available bots '
                '(%d).' % (trigger_count, len(self._eligible_bots_by_ids)))

        # Step 1: keep a shard on its previous bot when that bot is still
        # eligible and has not already been claimed by another shard.
        shard_to_bot_assignment_map = {}
        unallocated_bots_by_ids = copy.deepcopy(self._eligible_bots_by_ids)
        for shard_index in shard_indices:
            bot_id = self._query_swarming_for_last_shard_id(shard_index)
            if bot_id and bot_id in unallocated_bots_by_ids:
                shard_to_bot_assignment_map[shard_index] = (
                    unallocated_bots_by_ids.pop(bot_id))
            else:
                shard_to_bot_assignment_map[shard_index] = None

        # Maintain the current map for debugging purposes
        existing_shard_bot_to_shard_map = copy.deepcopy(
            shard_to_bot_assignment_map)
        # Now create sets of remaining healthy and bad bots
        unallocated_healthy_bots = {
            b for b in unallocated_bots_by_ids.values() if b.is_alive()}
        unallocated_bad_bots = {
            b for b in unallocated_bots_by_ids.values() if not b.is_alive()}

        # Step 2: try assigning healthy bots for new shards first, falling
        # back to a dead bot only when no healthy one is left. The size
        # check above guarantees that a bot is always available here.
        for shard_index in sorted(shard_to_bot_assignment_map):
            if shard_to_bot_assignment_map[shard_index] is not None:
                continue
            if unallocated_healthy_bots:
                shard_to_bot_assignment_map[shard_index] = (
                    unallocated_healthy_bots.pop())
                if verbose:
                    print('First time shard %d has been triggered'
                          % shard_index)
            else:
                shard_to_bot_assignment_map[shard_index] = (
                    unallocated_bad_bots.pop())

        # Step 3: handle the rest of shards that were assigned dead bots by
        # moving them onto any healthy bot that is still unclaimed.
        for shard_index in sorted(shard_to_bot_assignment_map):
            dead_bot = shard_to_bot_assignment_map[shard_index]
            if dead_bot.is_alive() or not unallocated_healthy_bots:
                continue
            healthy_bot = unallocated_healthy_bots.pop()
            shard_to_bot_assignment_map[shard_index] = healthy_bot
            if verbose:
                print('Device affinity broken for shard #%d. bot %s is '
                      'dead, new mapping to bot %s' % (
                          shard_index, dead_bot.id(), healthy_bot.id()))

        # Now populate the indices into the bot_configs array
        selected_configs = [
            (shard_index, self._find_bot_config_index(
                shard_to_bot_assignment_map[shard_index].id()))
            for shard_index in shard_indices]
        if verbose:
            self._print_device_affinity_info(
                shard_to_bot_assignment_map,
                existing_shard_bot_to_shard_map,
                self._eligible_bots_by_ids, trigger_count)
        return selected_configs

    def _print_device_affinity_info(
            self, new_map, existing_map, health_map, num_shards):
        """Print the previous/new bot of each shard and a pool summary.

        Args:
          new_map: shard index -> Bot (or None) after assignment.
          existing_map: shard index -> Bot (or None) before reassignment.
          health_map: bot id -> Bot for every eligible bot.
          num_shards: number of shards being triggered.
        """
        print()
        # The maps are keyed by the shard indices actually being triggered,
        # which need not be 0..num_shards-1, so report on the real keys.
        # When both maps are empty (the "not enough bots" path) fall back
        # to the plain range so one line per requested shard is printed.
        shard_indices = sorted(set(new_map) | set(existing_map))
        if not shard_indices:
            shard_indices = range(num_shards)
        for shard_index in shard_indices:
            existing = existing_map.get(shard_index)
            new = new_map.get(shard_index)
            existing_id = existing.id() if existing else ''
            new_id = new.id() if new else ''
            print('Shard %d\n\tprevious: %s\n\tnew: %s' % (
                shard_index, existing_id, new_id))

        healthy_bots = [
            b.id() for b in health_map.values() if b.is_alive()]
        dead_bots = [
            b.id() for b in health_map.values() if not b.is_alive()]
        print('Shards needed: %d' % num_shards)
        print('Total bots (dead + healthy): %d' % (
            len(dead_bots) + len(healthy_bots)))
        print('Healthy bots, %d: %s' % (len(healthy_bots), healthy_bots))
        print('Dead Bots, %d: %s' % (len(dead_bots), dead_bots))
        print()

    def _query_swarming_for_eligible_bot_configs(self, dimensions):
        """Query Swarming to figure out which bots are available.

        Args:
          dimensions: dict of dimension key -> value to filter bots by.

        Returns: a dictionary in which the keys are the bot id and
          the values are Bot object that indicate the health status
          of the bots. Empty when the query result has no items.
        """
        values = [
            ('dimensions', '%s:%s' % (key, value))
            for key, value in sorted(dimensions.items())]

        query_result = self.query_swarming(
            'bots/list', values, True, server=self._swarming_server,
            service_account=self._service_account)
        perf_bots = {}
        for bot in query_result.get('items') or []:
            # A bot entry lacking one of these flags is treated as if the
            # flag were false.
            # NOTE(review): confirm the server may omit false booleans.
            alive = (not bot.get('is_dead') and not bot.get('quarantined'))
            perf_bots[bot['bot_id']] = Bot(bot['bot_id'], alive)
        return perf_bots

    def _find_bot_config_index(self, bot_id):
        """Return the index in self._bot_configs whose 'id' is `bot_id`.

        Returns None when no config carries that id.
        """
        for i, dimensions in enumerate(self._bot_configs):
            if dimensions['id'] == bot_id:
                return i
        return None

    def _query_swarming_for_last_shard_id(self, shard_index):
        """Return the id of the bot that last ran `shard_index`, or None.

        Example of the equivalent manual query:
          swarming.py query -S server-url.com --limit 1 \\
            'tasks/list?tags=os:Windows&tags=pool:chrome.tests.perf&tags=shard:12'
        """
        values = [
            ('tags', '%s:%s' % (k, v)) for k, v in self._dimensions.items()]
        # Append the shard as a tag
        values.append(('tags', 'shard:%s' % (shard_index,)))
        values.sort()
        # TODO(eyaich): For now we are ignoring the state of the returned
        # task (ie completed, timed_out, bot_died, etc) as we are just
        # answering the question "What bot did we last trigger this shard
        # on?" Evaluate if this is the right decision going forward.

        # Query for the last task that ran with these dimensions and this
        # shard
        query_result = self.query_swarming(
            'tasks/list', values, True, limit='1',
            server=self._swarming_server,
            service_account=self._service_account)
        tasks = query_result.get('items')
        if tasks:
            # We queried with a limit of 1 so we could only get back
            # the most recent which is what we care about.
            task = tasks[0]
            if 'bot_id' in task:
                return task['bot_id']
            # No bot_id on the task: fall back to an 'id:<bot>' tag.
            for tag in task.get('tags') or []:
                if tag.startswith('id:'):
                    return tag[len('id:'):]
        # No eligible shard for this bot
        return None

    def _get_swarming_dimensions(self, args):
        """Collect every `--dimension KEY VALUE` triple in `args` as a dict."""
        dimensions = {}
        for i in range(len(args) - 2):
            if args[i] == '--dimension':
                dimensions[args[i + 1]] = args[i + 2]
        return dimensions

    def _get_swarming_server(self, args):
        """Return the value following the swarming flag, without protocol.

        The first argument containing '--swarming' is taken as the flag.
        Everything up to and including the first '//' of its value is
        stripped; a value without '//' is returned unchanged. Returns None
        when the flag is absent or is the last argument.
        """
        # Stop one short of the end so a trailing flag with no value
        # cannot index past the list.
        for i in range(len(args) - 1):
            if '--swarming' in args[i]:
                server = args[i + 1]
                # Strip out the protocol
                _, separator, host = server.partition('//')
                return host if separator else server
        return None

    def _get_service_account(self, args):
        """Return the value following --auth-service-account-json, or None."""
        for i in range(len(args) - 1):
            if '--auth-service-account-json' in args[i]:
                return args[i + 1]
        return None


def main():
    """Parse the command line, trigger the tasks and return the result."""
    logging.basicConfig(
        level=logging.INFO,
        format='(%(levelname)s) %(asctime)s pid=%(process)d'
               '  %(module)s.%(funcName)s:%(lineno)d  %(message)s')
    # Setup args for common contract of base class
    parser = base_test_triggerer.BaseTestTriggerer.setup_parser_contract(
        argparse.ArgumentParser(description=__doc__))
    args, remaining = parser.parse_known_args()

    triggerer = PerfDeviceTriggerer(args, remaining)
    return triggerer.trigger_tasks(args, remaining)


if __name__ == '__main__':
    sys.exit(main())