1"""
2Enables supports for running tests simultaneously by processing them
3from a multi-consumer queue.
4"""
5
6from __future__ import absolute_import
7
8import sys
9
10from .. import config
11from .. import errors
12from ..testing.hooks import stepdown
13from ..utils import queue as _queue
14
15
16class Job(object):
17    """
18    Runs tests from a queue.
19    """
20
21    def __init__(self, logger, fixture, hooks, report, archival, suite_options):
22        """
23        Initializes the job with the specified fixture and custom
24        behaviors.
25        """
26
27        self.logger = logger
28        self.fixture = fixture
29        self.hooks = hooks
30        self.report = report
31        self.archival = archival
32        self.suite_options = suite_options
33
34        # Don't check fixture.is_running() when using the ContinuousStepdown hook, which kills
35        # and restarts the primary. Even if the fixture is still running as expected, there is a
36        # race where fixture.is_running() could fail if called after the primary was killed but
37        # before it was restarted.
38        self._check_if_fixture_running = not any(
39            isinstance(hook, stepdown.ContinuousStepdown) for hook in self.hooks)
40
41    def __call__(self, queue, interrupt_flag, teardown_flag=None):
42        """
43        Continuously executes tests from 'queue' and records their
44        details in 'report'.
45
46        If 'teardown_flag' is not None, then 'self.fixture.teardown()'
47        will be called before this method returns. If an error occurs
48        while destroying the fixture, then the 'teardown_flag' will be
49        set.
50        """
51
52        should_stop = False
53        try:
54            self._run(queue, interrupt_flag)
55        except errors.StopExecution as err:
56            # Stop running tests immediately.
57            self.logger.error("Received a StopExecution exception: %s.", err)
58            should_stop = True
59        except:
60            # Unknown error, stop execution.
61            self.logger.exception("Encountered an error during test execution.")
62            should_stop = True
63
64        if should_stop:
65            # Set the interrupt flag so that other jobs do not start running more tests.
66            interrupt_flag.set()
67            # Drain the queue to unblock the main thread.
68            Job._drain_queue(queue)
69
70        if teardown_flag is not None:
71            try:
72                if not self.fixture.teardown(finished=True):
73                    self.logger.warn("Teardown of %s was not successful.", self.fixture)
74                    teardown_flag.set()
75            except:
76                self.logger.exception("Encountered an error while tearing down %s.", self.fixture)
77                teardown_flag.set()
78
79    def _run(self, queue, interrupt_flag):
80        """
81        Calls the before/after suite hooks and continuously executes
82        tests from 'queue'.
83        """
84
85        for hook in self.hooks:
86            hook.before_suite(self.report)
87
88        while not interrupt_flag.is_set():
89            test = queue.get_nowait()
90            try:
91                if test is None:
92                    # Sentinel value received, so exit.
93                    break
94                self._execute_test(test)
95            finally:
96                queue.task_done()
97
98        for hook in self.hooks:
99            hook.after_suite(self.report)
100
101    def _execute_test(self, test):
102        """
103        Calls the before/after test hooks and executes 'test'.
104        """
105
106        test.configure(self.fixture, config.NUM_CLIENTS_PER_FIXTURE)
107        self._run_hooks_before_tests(test)
108
109        test(self.report)
110        try:
111            if self.suite_options.fail_fast and not self.report.wasSuccessful():
112                self.logger.info("%s failed, so stopping..." % (test.shortDescription()))
113                raise errors.StopExecution("%s failed" % (test.shortDescription()))
114
115            if self._check_if_fixture_running and not self.fixture.is_running():
116                self.logger.error(
117                    "%s marked as a failure because the fixture crashed during the test.",
118                    test.shortDescription())
119                self.report.setFailure(test, return_code=2)
120                # Always fail fast if the fixture fails.
121                raise errors.StopExecution("%s not running after %s" %
122                                           (self.fixture, test.shortDescription()))
123
124        finally:
125            success = self.report.find_test_info(test).status == "pass"
126            if self.archival:
127                self.archival.archive(self.logger, test, success)
128
129        self._run_hooks_after_tests(test)
130
131    def _run_hook(self, hook, hook_function, test):
132        """ Helper to run hook and archival. """
133        try:
134            success = False
135            hook_function(test, self.report)
136            success = True
137        finally:
138            if self.archival:
139                self.archival.archive(self.logger, test, success, hook=hook)
140
141    def _run_hooks_before_tests(self, test):
142        """
143        Runs the before_test method on each of the hooks.
144
145        Swallows any TestFailure exceptions if set to continue on
146        failure, and reraises any other exceptions.
147        """
148
149        try:
150            for hook in self.hooks:
151                self._run_hook(hook, hook.before_test, test)
152
153        except errors.StopExecution:
154            raise
155
156        except errors.ServerFailure:
157            self.logger.exception("%s marked as a failure by a hook's before_test.",
158                                  test.shortDescription())
159            self._fail_test(test, sys.exc_info(), return_code=2)
160            raise errors.StopExecution("A hook's before_test failed")
161
162        except errors.TestFailure:
163            self.logger.exception("%s marked as a failure by a hook's before_test.",
164                                  test.shortDescription())
165            self._fail_test(test, sys.exc_info(), return_code=1)
166            if self.suite_options.fail_fast:
167                raise errors.StopExecution("A hook's before_test failed")
168
169        except:
170            # Record the before_test() error in 'self.report'.
171            self.report.startTest(test)
172            self.report.addError(test, sys.exc_info())
173            self.report.stopTest(test)
174            raise
175
176    def _run_hooks_after_tests(self, test):
177        """
178        Runs the after_test method on each of the hooks.
179
180        Swallows any TestFailure exceptions if set to continue on
181        failure, and reraises any other exceptions.
182        """
183        try:
184            for hook in self.hooks:
185                self._run_hook(hook, hook.after_test, test)
186
187        except errors.StopExecution:
188            raise
189
190        except errors.ServerFailure:
191            self.logger.exception("%s marked as a failure by a hook's after_test.",
192                                  test.shortDescription())
193            self.report.setFailure(test, return_code=2)
194            raise errors.StopExecution("A hook's after_test failed")
195
196        except errors.TestFailure:
197            self.logger.exception("%s marked as a failure by a hook's after_test.",
198                                  test.shortDescription())
199            self.report.setFailure(test, return_code=1)
200            if self.suite_options.fail_fast:
201                raise errors.StopExecution("A hook's after_test failed")
202
203        except:
204            self.report.setError(test)
205            raise
206
207    def _fail_test(self, test, exc_info, return_code=1):
208        """
209        Helper to record a test as a failure with the provided return
210        code.
211
212        This method should not be used if 'test' has already been
213        started, instead use TestReport.setFailure().
214        """
215
216        self.report.startTest(test)
217        test.return_code = return_code
218        self.report.addFailure(test, exc_info)
219        self.report.stopTest(test)
220
221    @staticmethod
222    def _drain_queue(queue):
223        """
224        Removes all elements from 'queue' without actually doing
225        anything to them. Necessary to unblock the main thread that is
226        waiting for 'queue' to be empty.
227        """
228
229        try:
230            while not queue.empty():
231                queue.get_nowait()
232                queue.task_done()
233        except _queue.Empty:
234            # Multiple threads may be draining the queue simultaneously, so just ignore the
235            # exception from the race between queue.empty() being false and failing to get an item.
236            pass
237