1""" 2Driver of the test execution framework. 3""" 4 5from __future__ import absolute_import 6 7import threading 8 9from . import fixtures 10from . import hooks as _hooks 11from . import job as _job 12from . import report as _report 13from . import testcases 14from .. import config as _config 15from .. import errors 16from .. import logging 17from .. import utils 18from ..utils import queue as _queue 19 20 21class TestGroupExecutor(object): 22 """ 23 Executes a test group. 24 25 Responsible for setting up and tearing down the fixtures that the 26 tests execute against. 27 """ 28 29 _TIMEOUT = 24 * 60 * 60 # =1 day (a long time to have tests run) 30 31 def __init__(self, 32 exec_logger, 33 test_group, 34 logging_config, 35 config=None, 36 fixture=None, 37 hooks=None): 38 """ 39 Initializes the TestGroupExecutor with the test group to run. 40 """ 41 42 # Build a logger for executing this group of tests. 43 logger_name = "%s:%s" % (exec_logger.name, test_group.test_kind) 44 self.logger = logging.loggers.new_logger(logger_name, parent=exec_logger) 45 46 self.logging_config = logging_config 47 self.fixture_config = fixture 48 self.hooks_config = utils.default_if_none(hooks, []) 49 self.test_config = utils.default_if_none(config, {}) 50 51 self._test_group = test_group 52 53 self._using_buildlogger = logging.config.using_buildlogger(logging_config) 54 self._build_config = None 55 56 if self._using_buildlogger: 57 self._build_config = logging.buildlogger.get_config() 58 59 # Must be done after getting buildlogger configuration. 60 self._jobs = [self._make_job(job_num) for job_num in xrange(_config.JOBS)] 61 62 def run(self): 63 """ 64 Executes the test group. 65 66 Any exceptions that occur during setting up or tearing down a 67 fixture are propagated. 68 """ 69 70 self.logger.info("Starting execution of %ss...", self._test_group.test_kind) 71 72 return_code = 0 73 try: 74 if not self._setup_fixtures(): 75 return_code = 2 76 return 77 78 num_repeats = _config.REPEAT 79 while num_repeats > 0: 80 test_queue = self._make_test_queue() 81 self._test_group.record_start() 82 (report, interrupted) = self._run_tests(test_queue) 83 self._test_group.record_end(report) 84 85 # If the user triggered a KeyboardInterrupt, then we should stop. 86 if interrupted: 87 raise errors.UserInterrupt("Received interrupt from user") 88 89 sb = [] # String builder. 90 self._test_group.summarize_latest(sb) 91 self.logger.info("Summary: %s", "\n ".join(sb)) 92 93 if not report.wasSuccessful(): 94 return_code = 1 95 if _config.FAIL_FAST: 96 break 97 98 # Clear the report so it can be reused for the next execution. 99 for job in self._jobs: 100 job.report.reset() 101 num_repeats -= 1 102 finally: 103 if not self._teardown_fixtures(): 104 return_code = 2 105 self._test_group.return_code = return_code 106 107 def _setup_fixtures(self): 108 """ 109 Sets up a fixture for each job. 110 """ 111 112 for job in self._jobs: 113 try: 114 job.fixture.setup() 115 except: 116 self.logger.exception("Encountered an error while setting up %s.", job.fixture) 117 return False 118 119 # Once they have all been started, wait for them to become available. 120 for job in self._jobs: 121 try: 122 job.fixture.await_ready() 123 except: 124 self.logger.exception("Encountered an error while waiting for %s to be ready", 125 job.fixture) 126 return False 127 128 return True 129 130 def _run_tests(self, test_queue): 131 """ 132 Starts a thread for each Job instance and blocks until all of 133 the tests are run. 134 135 Returns a (combined report, user interrupted) pair, where the 136 report contains the status and timing information of tests run 137 by all of the threads. 138 """ 139 140 threads = [] 141 interrupt_flag = threading.Event() 142 user_interrupted = False 143 try: 144 # Run each Job instance in its own thread. 145 for job in self._jobs: 146 t = threading.Thread(target=job, args=(test_queue, interrupt_flag)) 147 # Do not wait for tests to finish executing if interrupted by the user. 148 t.daemon = True 149 t.start() 150 threads.append(t) 151 152 joined = False 153 while not joined: 154 # Need to pass a timeout to join() so that KeyboardInterrupt exceptions 155 # are propagated. 156 joined = test_queue.join(TestGroupExecutor._TIMEOUT) 157 except (KeyboardInterrupt, SystemExit): 158 interrupt_flag.set() 159 user_interrupted = True 160 else: 161 # Only wait for all the Job instances if not interrupted by the user. 162 for t in threads: 163 t.join() 164 165 reports = [job.report for job in self._jobs] 166 combined_report = _report.TestReport.combine(*reports) 167 168 # We cannot return 'interrupt_flag.is_set()' because the interrupt flag can be set by a Job 169 # instance if a test fails and it decides to drain the queue. We only want to raise a 170 # StopExecution exception in TestGroupExecutor.run() if the user triggered the interrupt. 171 return (combined_report, user_interrupted) 172 173 def _teardown_fixtures(self): 174 """ 175 Tears down all of the fixtures. 176 177 Returns true if all fixtures were torn down successfully, and 178 false otherwise. 179 """ 180 181 success = True 182 for job in self._jobs: 183 try: 184 if not job.fixture.teardown(): 185 self.logger.warn("Teardown of %s was not successful.", job.fixture) 186 success = False 187 except: 188 self.logger.exception("Encountered an error while tearing down %s.", job.fixture) 189 success = False 190 191 return success 192 193 def _get_build_id(self, job_num): 194 """ 195 Returns a unique build id for a job. 196 """ 197 198 build_config = self._build_config 199 200 if self._using_buildlogger: 201 # Use a distinct "builder" for each job in order to separate their logs. 202 if build_config is not None and "builder" in build_config: 203 build_config = build_config.copy() 204 build_config["builder"] = "%s_job%d" % (build_config["builder"], job_num) 205 206 build_id = logging.buildlogger.new_build_id(build_config) 207 208 if build_config is None or build_id is None: 209 self.logger.info("Encountered an error configuring buildlogger for job #%d, falling" 210 " back to stderr.", job_num) 211 212 return build_id, build_config 213 214 return None, build_config 215 216 def _make_fixture(self, job_num, build_id, build_config): 217 """ 218 Creates a fixture for a job. 219 """ 220 221 fixture_config = {} 222 fixture_class = fixtures.NOOP_FIXTURE_CLASS 223 224 if self.fixture_config is not None: 225 fixture_config = self.fixture_config.copy() 226 fixture_class = fixture_config.pop("class") 227 228 logger_name = "%s:job%d" % (fixture_class, job_num) 229 logger = logging.loggers.new_logger(logger_name, parent=logging.loggers.FIXTURE) 230 logging.config.apply_buildlogger_global_handler(logger, 231 self.logging_config, 232 build_id=build_id, 233 build_config=build_config) 234 235 return fixtures.make_fixture(fixture_class, logger, job_num, **fixture_config) 236 237 def _make_hooks(self, job_num, fixture): 238 """ 239 Creates the custom behaviors for the job's fixture. 240 """ 241 242 behaviors = [] 243 244 for behavior_config in self.hooks_config: 245 behavior_config = behavior_config.copy() 246 behavior_class = behavior_config.pop("class") 247 248 logger_name = "%s:job%d" % (behavior_class, job_num) 249 logger = logging.loggers.new_logger(logger_name, parent=self.logger) 250 behavior = _hooks.make_custom_behavior(behavior_class, 251 logger, 252 fixture, 253 **behavior_config) 254 behaviors.append(behavior) 255 256 return behaviors 257 258 def _make_job(self, job_num): 259 """ 260 Returns a Job instance with its own fixture, hooks, and test 261 report. 262 """ 263 264 build_id, build_config = self._get_build_id(job_num) 265 fixture = self._make_fixture(job_num, build_id, build_config) 266 hooks = self._make_hooks(job_num, fixture) 267 268 logger_name = "%s:job%d" % (self.logger.name, job_num) 269 logger = logging.loggers.new_logger(logger_name, parent=self.logger) 270 271 if build_id is not None: 272 endpoint = logging.buildlogger.APPEND_GLOBAL_LOGS_ENDPOINT % {"build_id": build_id} 273 url = "%s/%s/" % (_config.BUILDLOGGER_URL.rstrip("/"), endpoint.strip("/")) 274 logger.info("Writing output of job #%d to %s.", job_num, url) 275 276 report = _report.TestReport(logger, 277 self.logging_config, 278 build_id=build_id, 279 build_config=build_config) 280 281 return _job.Job(logger, fixture, hooks, report) 282 283 def _make_test_queue(self): 284 """ 285 Returns a queue of TestCase instances. 286 287 Use a multi-consumer queue instead of a unittest.TestSuite so 288 that the test cases can be dispatched to multiple threads. 289 """ 290 291 test_kind_logger = logging.loggers.new_logger(self._test_group.test_kind, 292 parent=logging.loggers.TESTS) 293 294 # Put all the test cases in a queue. 295 queue = _queue.Queue() 296 for test_name in self._test_group.tests: 297 test_case = testcases.make_test_case(self._test_group.test_kind, 298 test_kind_logger, 299 test_name, 300 **self.test_config) 301 queue.put(test_case) 302 303 # Add sentinel value for each job to indicate when there are no more items to process. 304 for _ in xrange(_config.JOBS): 305 queue.put(None) 306 307 return queue 308