1""" 2Driver of the test execution framework. 3""" 4 5from __future__ import absolute_import 6 7import threading 8import time 9 10from . import fixtures 11from . import hook_test_archival as archival 12from . import hooks as _hooks 13from . import job as _job 14from . import report as _report 15from . import testcases 16from .. import config as _config 17from .. import errors 18from .. import utils 19from ..core import network 20from ..utils import queue as _queue 21 22 23class TestSuiteExecutor(object): 24 """ 25 Executes a test suite. 26 27 Responsible for setting up and tearing down the fixtures that the 28 tests execute against. 29 """ 30 31 _TIMEOUT = 24 * 60 * 60 # =1 day (a long time to have tests run) 32 33 def __init__(self, 34 exec_logger, 35 suite, 36 config=None, 37 fixture=None, 38 hooks=None, 39 archive_instance=None, 40 archive=None): 41 """ 42 Initializes the TestSuiteExecutor with the test suite to run. 43 """ 44 self.logger = exec_logger 45 46 if _config.SHELL_CONN_STRING is not None: 47 # Specifying the shellConnString command line option should override the fixture 48 # specified in the YAML configuration to be the no-op fixture. 49 self.fixture_config = {"class": fixtures.NOOP_FIXTURE_CLASS} 50 else: 51 self.fixture_config = fixture 52 53 self.hooks_config = utils.default_if_none(hooks, []) 54 self.test_config = utils.default_if_none(config, {}) 55 56 self.archival = None 57 if archive_instance: 58 self.archival = archival.HookTestArchival( 59 suite, self.hooks_config, archive_instance, archive) 60 61 self._suite = suite 62 63 # Only start as many jobs as we need. Note this means that the number of jobs we run may not 64 # actually be _config.JOBS or self._suite.options.num_jobs. 65 jobs_to_start = self._suite.options.num_jobs 66 num_tests = len(suite.tests) 67 68 if num_tests < jobs_to_start: 69 self.logger.info( 70 "Reducing the number of jobs from %d to %d since there are only %d test(s) to run.", 71 self._suite.options.num_jobs, num_tests, num_tests) 72 jobs_to_start = num_tests 73 74 # Must be done after getting buildlogger configuration. 75 self._jobs = [self._make_job(job_num) for job_num in xrange(jobs_to_start)] 76 77 def run(self): 78 """ 79 Executes the test suite. 80 81 Any exceptions that occur during setting up or tearing down a 82 fixture are propagated. 83 """ 84 85 self.logger.info("Starting execution of %ss...", self._suite.test_kind) 86 87 return_code = 0 88 teardown_flag = None 89 try: 90 if not self._setup_fixtures(): 91 return_code = 2 92 return 93 94 num_repeats = self._suite.options.num_repeats 95 while num_repeats > 0: 96 test_queue = self._make_test_queue() 97 98 partial_reports = [job.report for job in self._jobs] 99 self._suite.record_test_start(partial_reports) 100 101 # Have the Job threads destroy their fixture during the final repetition after they 102 # finish running their last test. This avoids having a large number of processes 103 # still running if an Evergreen task were to time out from a hang/deadlock being 104 # triggered. 105 teardown_flag = threading.Event() if num_repeats == 1 else None 106 (report, interrupted) = self._run_tests(test_queue, teardown_flag) 107 108 self._suite.record_test_end(report) 109 110 # If the user triggered a KeyboardInterrupt, then we should stop. 111 if interrupted: 112 raise errors.UserInterrupt("Received interrupt from user") 113 114 if teardown_flag and teardown_flag.is_set(): 115 return_code = 2 116 117 sb = [] # String builder. 118 self._suite.summarize_latest(sb) 119 self.logger.info("Summary: %s", "\n ".join(sb)) 120 121 if not report.wasSuccessful(): 122 return_code = 1 123 if self._suite.options.fail_fast: 124 break 125 126 # Clear the report so it can be reused for the next execution. 127 for job in self._jobs: 128 job.report.reset() 129 num_repeats -= 1 130 finally: 131 if not teardown_flag: 132 if not self._teardown_fixtures(): 133 return_code = 2 134 self._suite.return_code = return_code 135 136 def _setup_fixtures(self): 137 """ 138 Sets up a fixture for each job. 139 """ 140 141 # We reset the internal state of the PortAllocator before calling job.fixture.setup() so 142 # that ports used by the fixture during a test suite run earlier can be reused during this 143 # current test suite. 144 network.PortAllocator.reset() 145 146 for job in self._jobs: 147 try: 148 job.fixture.setup() 149 except: 150 self.logger.exception( 151 "Encountered an error while setting up %s.", job.fixture) 152 return False 153 154 # Once they have all been started, wait for them to become available. 155 for job in self._jobs: 156 try: 157 job.fixture.await_ready() 158 except: 159 self.logger.exception( 160 "Encountered an error while waiting for %s to be ready", job.fixture) 161 return False 162 return True 163 164 def _run_tests(self, test_queue, teardown_flag): 165 """ 166 Starts a thread for each Job instance and blocks until all of 167 the tests are run. 168 169 Returns a (combined report, user interrupted) pair, where the 170 report contains the status and timing information of tests run 171 by all of the threads. 172 """ 173 174 threads = [] 175 interrupt_flag = threading.Event() 176 user_interrupted = False 177 try: 178 # Run each Job instance in its own thread. 179 for job in self._jobs: 180 t = threading.Thread(target=job, 181 args=(test_queue, interrupt_flag), 182 kwargs=dict(teardown_flag=teardown_flag)) 183 # Do not wait for tests to finish executing if interrupted by the user. 184 t.daemon = True 185 t.start() 186 threads.append(t) 187 # SERVER-24729 Need to stagger when jobs start to reduce I/O load if there 188 # are many of them. Both the 5 and the 10 are arbitrary. 189 # Currently only enabled on Evergreen. 190 if _config.STAGGER_JOBS and len(threads) >= 5: 191 time.sleep(10) 192 193 joined = False 194 while not joined: 195 # Need to pass a timeout to join() so that KeyboardInterrupt exceptions 196 # are propagated. 197 joined = test_queue.join(TestSuiteExecutor._TIMEOUT) 198 except (KeyboardInterrupt, SystemExit): 199 interrupt_flag.set() 200 user_interrupted = True 201 else: 202 # Only wait for all the Job instances if not interrupted by the user. 203 for t in threads: 204 t.join() 205 206 reports = [job.report for job in self._jobs] 207 combined_report = _report.TestReport.combine(*reports) 208 209 # We cannot return 'interrupt_flag.is_set()' because the interrupt flag can be set by a Job 210 # instance if a test fails and it decides to drain the queue. We only want to raise a 211 # StopExecution exception in TestSuiteExecutor.run() if the user triggered the interrupt. 212 return (combined_report, user_interrupted) 213 214 def _teardown_fixtures(self): 215 """ 216 Tears down all of the fixtures. 217 218 Returns true if all fixtures were torn down successfully, and 219 false otherwise. 220 """ 221 success = True 222 for job in self._jobs: 223 try: 224 if not job.fixture.teardown(finished=True): 225 self.logger.warn("Teardown of %s was not successful.", job.fixture) 226 success = False 227 except: 228 self.logger.exception("Encountered an error while tearing down %s.", job.fixture) 229 success = False 230 return success 231 232 def _make_fixture(self, job_num, job_logger): 233 """ 234 Creates a fixture for a job. 235 """ 236 237 fixture_config = {} 238 fixture_class = fixtures.NOOP_FIXTURE_CLASS 239 240 if self.fixture_config is not None: 241 fixture_config = self.fixture_config.copy() 242 fixture_class = fixture_config.pop("class") 243 244 fixture_logger = job_logger.new_fixture_logger(fixture_class) 245 246 return fixtures.make_fixture(fixture_class, fixture_logger, job_num, **fixture_config) 247 248 def _make_hooks(self, job_num, fixture): 249 """ 250 Creates the custom behaviors for the job's fixture. 251 """ 252 253 behaviors = [] 254 255 for behavior_config in self.hooks_config: 256 behavior_config = behavior_config.copy() 257 behavior_class = behavior_config.pop("class") 258 259 hook_logger = self.logger.new_hook_logger(behavior_class, fixture.logger) 260 behavior = _hooks.make_custom_behavior(behavior_class, 261 hook_logger, 262 fixture, 263 **behavior_config) 264 behaviors.append(behavior) 265 266 return behaviors 267 268 def _make_job(self, job_num): 269 """ 270 Returns a Job instance with its own fixture, hooks, and test 271 report. 272 """ 273 job_logger = self.logger.new_job_logger(self._suite.test_kind, job_num) 274 275 fixture = self._make_fixture(job_num, job_logger) 276 hooks = self._make_hooks(job_num, fixture) 277 278 report = _report.TestReport(job_logger, self._suite.options) 279 280 return _job.Job(job_logger, fixture, hooks, report, self.archival, self._suite.options) 281 282 def _make_test_queue(self): 283 """ 284 Returns a queue of TestCase instances. 285 286 Use a multi-consumer queue instead of a unittest.TestSuite so 287 that the test cases can be dispatched to multiple threads. 288 """ 289 290 test_queue_logger = self.logger.new_testqueue_logger(self._suite.test_kind) 291 # Put all the test cases in a queue. 292 queue = _queue.Queue() 293 for test_name in self._suite.tests: 294 test_case = testcases.make_test_case(self._suite.test_kind, 295 test_queue_logger, 296 test_name, 297 **self.test_config) 298 queue.put(test_case) 299 300 # Add sentinel value for each job to indicate when there are no more items to process. 301 for _ in xrange(len(self._jobs)): 302 queue.put(None) 303 304 return queue 305