1""" 2Enables supports for running tests simultaneously by processing them 3from a multi-consumer queue. 4""" 5 6from __future__ import absolute_import 7 8import sys 9 10from .. import config 11from .. import errors 12from ..testing.hooks import stepdown 13from ..utils import queue as _queue 14 15 16class Job(object): 17 """ 18 Runs tests from a queue. 19 """ 20 21 def __init__(self, logger, fixture, hooks, report, archival, suite_options): 22 """ 23 Initializes the job with the specified fixture and custom 24 behaviors. 25 """ 26 27 self.logger = logger 28 self.fixture = fixture 29 self.hooks = hooks 30 self.report = report 31 self.archival = archival 32 self.suite_options = suite_options 33 34 # Don't check fixture.is_running() when using the ContinuousStepdown hook, which kills 35 # and restarts the primary. Even if the fixture is still running as expected, there is a 36 # race where fixture.is_running() could fail if called after the primary was killed but 37 # before it was restarted. 38 self._check_if_fixture_running = not any( 39 isinstance(hook, stepdown.ContinuousStepdown) for hook in self.hooks) 40 41 def __call__(self, queue, interrupt_flag, teardown_flag=None): 42 """ 43 Continuously executes tests from 'queue' and records their 44 details in 'report'. 45 46 If 'teardown_flag' is not None, then 'self.fixture.teardown()' 47 will be called before this method returns. If an error occurs 48 while destroying the fixture, then the 'teardown_flag' will be 49 set. 50 """ 51 52 should_stop = False 53 try: 54 self._run(queue, interrupt_flag) 55 except errors.StopExecution as err: 56 # Stop running tests immediately. 57 self.logger.error("Received a StopExecution exception: %s.", err) 58 should_stop = True 59 except: 60 # Unknown error, stop execution. 61 self.logger.exception("Encountered an error during test execution.") 62 should_stop = True 63 64 if should_stop: 65 # Set the interrupt flag so that other jobs do not start running more tests. 66 interrupt_flag.set() 67 # Drain the queue to unblock the main thread. 68 Job._drain_queue(queue) 69 70 if teardown_flag is not None: 71 try: 72 if not self.fixture.teardown(finished=True): 73 self.logger.warn("Teardown of %s was not successful.", self.fixture) 74 teardown_flag.set() 75 except: 76 self.logger.exception("Encountered an error while tearing down %s.", self.fixture) 77 teardown_flag.set() 78 79 def _run(self, queue, interrupt_flag): 80 """ 81 Calls the before/after suite hooks and continuously executes 82 tests from 'queue'. 83 """ 84 85 for hook in self.hooks: 86 hook.before_suite(self.report) 87 88 while not interrupt_flag.is_set(): 89 test = queue.get_nowait() 90 try: 91 if test is None: 92 # Sentinel value received, so exit. 93 break 94 self._execute_test(test) 95 finally: 96 queue.task_done() 97 98 for hook in self.hooks: 99 hook.after_suite(self.report) 100 101 def _execute_test(self, test): 102 """ 103 Calls the before/after test hooks and executes 'test'. 104 """ 105 106 test.configure(self.fixture, config.NUM_CLIENTS_PER_FIXTURE) 107 self._run_hooks_before_tests(test) 108 109 test(self.report) 110 try: 111 if self.suite_options.fail_fast and not self.report.wasSuccessful(): 112 self.logger.info("%s failed, so stopping..." % (test.shortDescription())) 113 raise errors.StopExecution("%s failed" % (test.shortDescription())) 114 115 if self._check_if_fixture_running and not self.fixture.is_running(): 116 self.logger.error( 117 "%s marked as a failure because the fixture crashed during the test.", 118 test.shortDescription()) 119 self.report.setFailure(test, return_code=2) 120 # Always fail fast if the fixture fails. 121 raise errors.StopExecution("%s not running after %s" % 122 (self.fixture, test.shortDescription())) 123 124 finally: 125 success = self.report.find_test_info(test).status == "pass" 126 if self.archival: 127 self.archival.archive(self.logger, test, success) 128 129 self._run_hooks_after_tests(test) 130 131 def _run_hook(self, hook, hook_function, test): 132 """ Helper to run hook and archival. """ 133 try: 134 success = False 135 hook_function(test, self.report) 136 success = True 137 finally: 138 if self.archival: 139 self.archival.archive(self.logger, test, success, hook=hook) 140 141 def _run_hooks_before_tests(self, test): 142 """ 143 Runs the before_test method on each of the hooks. 144 145 Swallows any TestFailure exceptions if set to continue on 146 failure, and reraises any other exceptions. 147 """ 148 149 try: 150 for hook in self.hooks: 151 self._run_hook(hook, hook.before_test, test) 152 153 except errors.StopExecution: 154 raise 155 156 except errors.ServerFailure: 157 self.logger.exception("%s marked as a failure by a hook's before_test.", 158 test.shortDescription()) 159 self._fail_test(test, sys.exc_info(), return_code=2) 160 raise errors.StopExecution("A hook's before_test failed") 161 162 except errors.TestFailure: 163 self.logger.exception("%s marked as a failure by a hook's before_test.", 164 test.shortDescription()) 165 self._fail_test(test, sys.exc_info(), return_code=1) 166 if self.suite_options.fail_fast: 167 raise errors.StopExecution("A hook's before_test failed") 168 169 except: 170 # Record the before_test() error in 'self.report'. 171 self.report.startTest(test) 172 self.report.addError(test, sys.exc_info()) 173 self.report.stopTest(test) 174 raise 175 176 def _run_hooks_after_tests(self, test): 177 """ 178 Runs the after_test method on each of the hooks. 179 180 Swallows any TestFailure exceptions if set to continue on 181 failure, and reraises any other exceptions. 182 """ 183 try: 184 for hook in self.hooks: 185 self._run_hook(hook, hook.after_test, test) 186 187 except errors.StopExecution: 188 raise 189 190 except errors.ServerFailure: 191 self.logger.exception("%s marked as a failure by a hook's after_test.", 192 test.shortDescription()) 193 self.report.setFailure(test, return_code=2) 194 raise errors.StopExecution("A hook's after_test failed") 195 196 except errors.TestFailure: 197 self.logger.exception("%s marked as a failure by a hook's after_test.", 198 test.shortDescription()) 199 self.report.setFailure(test, return_code=1) 200 if self.suite_options.fail_fast: 201 raise errors.StopExecution("A hook's after_test failed") 202 203 except: 204 self.report.setError(test) 205 raise 206 207 def _fail_test(self, test, exc_info, return_code=1): 208 """ 209 Helper to record a test as a failure with the provided return 210 code. 211 212 This method should not be used if 'test' has already been 213 started, instead use TestReport.setFailure(). 214 """ 215 216 self.report.startTest(test) 217 test.return_code = return_code 218 self.report.addFailure(test, exc_info) 219 self.report.stopTest(test) 220 221 @staticmethod 222 def _drain_queue(queue): 223 """ 224 Removes all elements from 'queue' without actually doing 225 anything to them. Necessary to unblock the main thread that is 226 waiting for 'queue' to be empty. 227 """ 228 229 try: 230 while not queue.empty(): 231 queue.get_nowait() 232 queue.task_done() 233 except _queue.Empty: 234 # Multiple threads may be draining the queue simultaneously, so just ignore the 235 # exception from the race between queue.empty() being false and failing to get an item. 236 pass 237