1"""
2Driver of the test execution framework.
3"""
4
5from __future__ import absolute_import
6
7import threading
8import time
9
10from . import fixtures
11from . import hook_test_archival as archival
12from . import hooks as _hooks
13from . import job as _job
14from . import report as _report
15from . import testcases
16from .. import config as _config
17from .. import errors
18from .. import utils
19from ..core import network
20from ..utils import queue as _queue
21
22
class TestSuiteExecutor(object):
    """
    Executes a test suite.

    Responsible for setting up and tearing down the fixtures that the
    tests execute against. The tests are distributed over one or more
    Job instances, each of which has its own fixture, hooks, and test
    report and is run in its own thread.
    """

    # Timeout (in seconds) given to each test_queue.join() call in _run_tests().
    _TIMEOUT = 24 * 60 * 60  # =1 day (a long time to have tests run)

    def __init__(self,
                 exec_logger,
                 suite,
                 config=None,
                 fixture=None,
                 hooks=None,
                 archive_instance=None,
                 archive=None):
        """
        Initializes the TestSuiteExecutor with the test suite to run.

        'exec_logger' is the logger used by the executor itself and from
        which the job, hook, and test queue loggers are created.

        'suite' is the test suite to run; its 'tests', 'test_kind', and
        'options' attributes are read by the executor.

        'config' is a dict of keyword arguments forwarded to
        testcases.make_test_case() for every test (defaults to {}).

        'fixture' is the fixture configuration dict, which must contain
        a "class" key. It is replaced by the no-op fixture when
        _config.SHELL_CONN_STRING is set, and the no-op fixture is also
        used when it is None.

        'hooks' is a list of hook configuration dicts, each of which
        must contain a "class" key (defaults to []).

        'archive_instance' and 'archive' are forwarded to
        HookTestArchival; no archival object is created when
        'archive_instance' is falsy.
        """
        self.logger = exec_logger

        if _config.SHELL_CONN_STRING is not None:
            # Specifying the shellConnString command line option should override the fixture
            # specified in the YAML configuration to be the no-op fixture.
            self.fixture_config = {"class": fixtures.NOOP_FIXTURE_CLASS}
        else:
            self.fixture_config = fixture

        self.hooks_config = utils.default_if_none(hooks, [])
        self.test_config = utils.default_if_none(config, {})

        self.archival = None
        if archive_instance:
            self.archival = archival.HookTestArchival(
                suite, self.hooks_config, archive_instance, archive)

        self._suite = suite

        # Only start as many jobs as we need. Note this means that the number of jobs we run may not
        # actually be _config.JOBS or self._suite.options.num_jobs.
        jobs_to_start = self._suite.options.num_jobs
        num_tests = len(suite.tests)

        if num_tests < jobs_to_start:
            self.logger.info(
                "Reducing the number of jobs from %d to %d since there are only %d test(s) to run.",
                self._suite.options.num_jobs, num_tests, num_tests)
            jobs_to_start = num_tests

        # Must be done after getting buildlogger configuration.
        # range() is used rather than xrange() so that this also works on Python 3.
        self._jobs = [self._make_job(job_num) for job_num in range(jobs_to_start)]

    def run(self):
        """
        Executes the test suite.

        The tests are run 'options.num_repeats' times and the outcome is
        stored in 'self._suite.return_code':
          0 - every repetition was successful,
          1 - the report of a repetition was not successful,
          2 - a fixture failed to be set up or torn down.

        Errors raised while setting up or tearing down a fixture are
        caught and logged by _setup_fixtures() and _teardown_fixtures(),
        and are reported through the return code rather than propagated.

        Raises errors.UserInterrupt if the user interrupted the tests.
        """

        self.logger.info("Starting execution of %ss...", self._suite.test_kind)

        return_code = 0
        teardown_flag = None
        try:
            if not self._setup_fixtures():
                return_code = 2
                # The 'finally' block below still tears down the fixtures and records the
                # return code.
                return

            num_repeats = self._suite.options.num_repeats
            while num_repeats > 0:
                test_queue = self._make_test_queue()

                partial_reports = [job.report for job in self._jobs]
                self._suite.record_test_start(partial_reports)

                # Have the Job threads destroy their fixture during the final repetition after they
                # finish running their last test. This avoids having a large number of processes
                # still running if an Evergreen task were to time out from a hang/deadlock being
                # triggered.
                teardown_flag = threading.Event() if num_repeats == 1 else None
                (report, interrupted) = self._run_tests(test_queue, teardown_flag)

                self._suite.record_test_end(report)

                # If the user triggered a KeyboardInterrupt, then we should stop.
                if interrupted:
                    raise errors.UserInterrupt("Received interrupt from user")

                # The flag being set signals that a fixture was not torn down successfully.
                if teardown_flag and teardown_flag.is_set():
                    return_code = 2

                sb = []  # String builder.
                self._suite.summarize_latest(sb)
                self.logger.info("Summary: %s", "\n    ".join(sb))

                # NOTE: this overwrites a return code of 2 assigned just above.
                if not report.wasSuccessful():
                    return_code = 1
                    if self._suite.options.fail_fast:
                        break

                # Clear the report so it can be reused for the next execution.
                for job in self._jobs:
                    job.report.reset()
                num_repeats -= 1
        finally:
            # A threading.Event instance is always truthy, so the fixtures are only torn down here
            # when the final repetition (which hands the teardown to the Job threads) never started.
            if not teardown_flag:
                if not self._teardown_fixtures():
                    return_code = 2
            self._suite.return_code = return_code

    def _setup_fixtures(self):
        """
        Sets up a fixture for each job.

        Returns true if all fixtures were set up and became ready, and
        false as soon as one of them raises an error. The error is
        logged and the remaining fixtures are not attempted.
        """

        # We reset the internal state of the PortAllocator before calling job.fixture.setup() so
        # that ports used by the fixture during a test suite run earlier can be reused during this
        # current test suite.
        network.PortAllocator.reset()

        for job in self._jobs:
            try:
                job.fixture.setup()
            except:
                # Deliberately catches everything so that any failure is reported to the caller
                # as an unsuccessful setup.
                self.logger.exception(
                    "Encountered an error while setting up %s.", job.fixture)
                return False

        # Once they have all been started, wait for them to become available.
        for job in self._jobs:
            try:
                job.fixture.await_ready()
            except:
                self.logger.exception(
                    "Encountered an error while waiting for %s to be ready", job.fixture)
                return False
        return True

    def _run_tests(self, test_queue, teardown_flag):
        """
        Starts a thread for each Job instance and blocks until all of
        the tests are run.

        'test_queue' is the queue of tests shared by all of the jobs.
        'teardown_flag' is forwarded to every job as a keyword argument;
        run() passes a threading.Event during the final repetition and
        None otherwise.

        Returns a (combined report, user interrupted) pair, where the
        report contains the status and timing information of tests run
        by all of the threads.
        """

        threads = []
        interrupt_flag = threading.Event()
        user_interrupted = False
        try:
            # Run each Job instance in its own thread.
            for job in self._jobs:
                t = threading.Thread(target=job,
                                     args=(test_queue, interrupt_flag),
                                     kwargs=dict(teardown_flag=teardown_flag))
                # Do not wait for tests to finish executing if interrupted by the user.
                t.daemon = True
                t.start()
                threads.append(t)
                # SERVER-24729 Need to stagger when jobs start to reduce I/O load if there
                # are many of them.  Both the 5 and the 10 are arbitrary.
                # Currently only enabled on Evergreen.
                if _config.STAGGER_JOBS and len(threads) >= 5:
                    time.sleep(10)

            joined = False
            while not joined:
                # Need to pass a timeout to join() so that KeyboardInterrupt exceptions
                # are propagated.
                joined = test_queue.join(TestSuiteExecutor._TIMEOUT)
        except (KeyboardInterrupt, SystemExit):
            interrupt_flag.set()
            user_interrupted = True
        else:
            # Only wait for all the Job instances if not interrupted by the user.
            for t in threads:
                t.join()

        reports = [job.report for job in self._jobs]
        combined_report = _report.TestReport.combine(*reports)

        # We cannot return 'interrupt_flag.is_set()' because the interrupt flag can be set by a Job
        # instance if a test fails and it decides to drain the queue. We only want to raise a
        # UserInterrupt exception in TestSuiteExecutor.run() if the user triggered the interrupt.
        return (combined_report, user_interrupted)

    def _teardown_fixtures(self):
        """
        Tears down all of the fixtures.

        A failure to tear down one fixture is logged and does not
        prevent the remaining fixtures from being torn down.

        Returns true if all fixtures were torn down successfully, and
        false otherwise.
        """
        success = True
        for job in self._jobs:
            try:
                if not job.fixture.teardown(finished=True):
                    self.logger.warn("Teardown of %s was not successful.", job.fixture)
                    success = False
            except:
                # Deliberately catches everything so that the other fixtures still get torn down.
                self.logger.exception("Encountered an error while tearing down %s.", job.fixture)
                success = False
        return success

    def _make_fixture(self, job_num, job_logger):
        """
        Creates a fixture for a job.

        The fixture class is taken from the "class" key of the fixture
        configuration and the remaining keys are passed to the fixture
        as keyword arguments. The no-op fixture is created when there is
        no fixture configuration.
        """

        fixture_config = {}
        fixture_class = fixtures.NOOP_FIXTURE_CLASS

        if self.fixture_config is not None:
            # Work on a copy so that the shared configuration keeps its "class" key for the
            # other jobs.
            fixture_config = self.fixture_config.copy()
            fixture_class = fixture_config.pop("class")

        fixture_logger = job_logger.new_fixture_logger(fixture_class)

        return fixtures.make_fixture(fixture_class, fixture_logger, job_num, **fixture_config)

    def _make_hooks(self, job_num, fixture):
        """
        Creates the custom behaviors for the job's fixture.

        Returns a list with one behavior per entry of the hooks
        configuration, in the same order. 'job_num' is currently unused.
        """

        behaviors = []

        for behavior_config in self.hooks_config:
            # Work on a copy so that the shared configuration keeps its "class" key for the
            # other jobs.
            behavior_config = behavior_config.copy()
            behavior_class = behavior_config.pop("class")

            hook_logger = self.logger.new_hook_logger(behavior_class, fixture.logger)
            behavior = _hooks.make_custom_behavior(behavior_class,
                                                   hook_logger,
                                                   fixture,
                                                   **behavior_config)
            behaviors.append(behavior)

        return behaviors

    def _make_job(self, job_num):
        """
        Returns a Job instance with its own fixture, hooks, and test
        report.
        """
        job_logger = self.logger.new_job_logger(self._suite.test_kind, job_num)

        fixture = self._make_fixture(job_num, job_logger)
        hooks = self._make_hooks(job_num, fixture)

        report = _report.TestReport(job_logger, self._suite.options)

        return _job.Job(job_logger, fixture, hooks, report, self.archival, self._suite.options)

    def _make_test_queue(self):
        """
        Returns a queue of TestCase instances.

        Use a multi-consumer queue instead of a unittest.TestSuite so
        that the test cases can be dispatched to multiple threads.

        The test cases are followed by one None sentinel per job.
        """

        test_queue_logger = self.logger.new_testqueue_logger(self._suite.test_kind)
        # Put all the test cases in a queue.
        queue = _queue.Queue()
        for test_name in self._suite.tests:
            test_case = testcases.make_test_case(self._suite.test_kind,
                                                 test_queue_logger,
                                                 test_name,
                                                 **self.test_config)
            queue.put(test_case)

        # Add sentinel value for each job to indicate when there are no more items to process.
        for _ in self._jobs:
            queue.put(None)

        return queue
305