1"""
2Driver of the test execution framework.
3"""
4
5from __future__ import absolute_import
6
7import threading
8
9from . import fixtures
10from . import hooks as _hooks
11from . import job as _job
12from . import report as _report
13from . import testcases
14from .. import config as _config
15from .. import errors
16from .. import logging
17from .. import utils
18from ..utils import queue as _queue
19
20
21class TestGroupExecutor(object):
22    """
23    Executes a test group.
24
25    Responsible for setting up and tearing down the fixtures that the
26    tests execute against.
27    """
28
29    _TIMEOUT = 24 * 60 * 60  # =1 day (a long time to have tests run)
30
31    def __init__(self,
32                 exec_logger,
33                 test_group,
34                 logging_config,
35                 config=None,
36                 fixture=None,
37                 hooks=None):
38        """
39        Initializes the TestGroupExecutor with the test group to run.
40        """
41
42        # Build a logger for executing this group of tests.
43        logger_name = "%s:%s" % (exec_logger.name, test_group.test_kind)
44        self.logger = logging.loggers.new_logger(logger_name, parent=exec_logger)
45
46        self.logging_config = logging_config
47        self.fixture_config = fixture
48        self.hooks_config = utils.default_if_none(hooks, [])
49        self.test_config = utils.default_if_none(config, {})
50
51        self._test_group = test_group
52
53        self._using_buildlogger = logging.config.using_buildlogger(logging_config)
54        self._build_config = None
55
56        if self._using_buildlogger:
57            self._build_config = logging.buildlogger.get_config()
58
59        # Must be done after getting buildlogger configuration.
60        self._jobs = [self._make_job(job_num) for job_num in xrange(_config.JOBS)]
61
62    def run(self):
63        """
64        Executes the test group.
65
66        Any exceptions that occur during setting up or tearing down a
67        fixture are propagated.
68        """
69
70        self.logger.info("Starting execution of %ss...", self._test_group.test_kind)
71
72        return_code = 0
73        try:
74            if not self._setup_fixtures():
75                return_code = 2
76                return
77
78            num_repeats = _config.REPEAT
79            while num_repeats > 0:
80                test_queue = self._make_test_queue()
81                self._test_group.record_start()
82                (report, interrupted) = self._run_tests(test_queue)
83                self._test_group.record_end(report)
84
85                # If the user triggered a KeyboardInterrupt, then we should stop.
86                if interrupted:
87                    raise errors.UserInterrupt("Received interrupt from user")
88
89                sb = []  # String builder.
90                self._test_group.summarize_latest(sb)
91                self.logger.info("Summary: %s", "\n    ".join(sb))
92
93                if not report.wasSuccessful():
94                    return_code = 1
95                    if _config.FAIL_FAST:
96                        break
97
98                # Clear the report so it can be reused for the next execution.
99                for job in self._jobs:
100                    job.report.reset()
101                num_repeats -= 1
102        finally:
103            if not self._teardown_fixtures():
104                return_code = 2
105            self._test_group.return_code = return_code
106
107    def _setup_fixtures(self):
108        """
109        Sets up a fixture for each job.
110        """
111
112        for job in self._jobs:
113            try:
114                job.fixture.setup()
115            except:
116                self.logger.exception("Encountered an error while setting up %s.", job.fixture)
117                return False
118
119        # Once they have all been started, wait for them to become available.
120        for job in self._jobs:
121            try:
122                job.fixture.await_ready()
123            except:
124                self.logger.exception("Encountered an error while waiting for %s to be ready",
125                                      job.fixture)
126                return False
127
128        return True
129
130    def _run_tests(self, test_queue):
131        """
132        Starts a thread for each Job instance and blocks until all of
133        the tests are run.
134
135        Returns a (combined report, user interrupted) pair, where the
136        report contains the status and timing information of tests run
137        by all of the threads.
138        """
139
140        threads = []
141        interrupt_flag = threading.Event()
142        user_interrupted = False
143        try:
144            # Run each Job instance in its own thread.
145            for job in self._jobs:
146                t = threading.Thread(target=job, args=(test_queue, interrupt_flag))
147                # Do not wait for tests to finish executing if interrupted by the user.
148                t.daemon = True
149                t.start()
150                threads.append(t)
151
152            joined = False
153            while not joined:
154                # Need to pass a timeout to join() so that KeyboardInterrupt exceptions
155                # are propagated.
156                joined = test_queue.join(TestGroupExecutor._TIMEOUT)
157        except (KeyboardInterrupt, SystemExit):
158            interrupt_flag.set()
159            user_interrupted = True
160        else:
161            # Only wait for all the Job instances if not interrupted by the user.
162            for t in threads:
163                t.join()
164
165        reports = [job.report for job in self._jobs]
166        combined_report = _report.TestReport.combine(*reports)
167
168        # We cannot return 'interrupt_flag.is_set()' because the interrupt flag can be set by a Job
169        # instance if a test fails and it decides to drain the queue. We only want to raise a
170        # StopExecution exception in TestGroupExecutor.run() if the user triggered the interrupt.
171        return (combined_report, user_interrupted)
172
173    def _teardown_fixtures(self):
174        """
175        Tears down all of the fixtures.
176
177        Returns true if all fixtures were torn down successfully, and
178        false otherwise.
179        """
180
181        success = True
182        for job in self._jobs:
183            try:
184                if not job.fixture.teardown():
185                    self.logger.warn("Teardown of %s was not successful.", job.fixture)
186                    success = False
187            except:
188                self.logger.exception("Encountered an error while tearing down %s.", job.fixture)
189                success = False
190
191        return success
192
193    def _get_build_id(self, job_num):
194        """
195        Returns a unique build id for a job.
196        """
197
198        build_config = self._build_config
199
200        if self._using_buildlogger:
201            # Use a distinct "builder" for each job in order to separate their logs.
202            if build_config is not None and "builder" in build_config:
203                build_config = build_config.copy()
204                build_config["builder"] = "%s_job%d" % (build_config["builder"], job_num)
205
206            build_id = logging.buildlogger.new_build_id(build_config)
207
208            if build_config is None or build_id is None:
209                self.logger.info("Encountered an error configuring buildlogger for job #%d, falling"
210                                 " back to stderr.", job_num)
211
212            return build_id, build_config
213
214        return None, build_config
215
216    def _make_fixture(self, job_num, build_id, build_config):
217        """
218        Creates a fixture for a job.
219        """
220
221        fixture_config = {}
222        fixture_class = fixtures.NOOP_FIXTURE_CLASS
223
224        if self.fixture_config is not None:
225            fixture_config = self.fixture_config.copy()
226            fixture_class = fixture_config.pop("class")
227
228        logger_name = "%s:job%d" % (fixture_class, job_num)
229        logger = logging.loggers.new_logger(logger_name, parent=logging.loggers.FIXTURE)
230        logging.config.apply_buildlogger_global_handler(logger,
231                                                        self.logging_config,
232                                                        build_id=build_id,
233                                                        build_config=build_config)
234
235        return fixtures.make_fixture(fixture_class, logger, job_num, **fixture_config)
236
237    def _make_hooks(self, job_num, fixture):
238        """
239        Creates the custom behaviors for the job's fixture.
240        """
241
242        behaviors = []
243
244        for behavior_config in self.hooks_config:
245            behavior_config = behavior_config.copy()
246            behavior_class = behavior_config.pop("class")
247
248            logger_name = "%s:job%d" % (behavior_class, job_num)
249            logger = logging.loggers.new_logger(logger_name, parent=self.logger)
250            behavior = _hooks.make_custom_behavior(behavior_class,
251                                                   logger,
252                                                   fixture,
253                                                   **behavior_config)
254            behaviors.append(behavior)
255
256        return behaviors
257
258    def _make_job(self, job_num):
259        """
260        Returns a Job instance with its own fixture, hooks, and test
261        report.
262        """
263
264        build_id, build_config = self._get_build_id(job_num)
265        fixture = self._make_fixture(job_num, build_id, build_config)
266        hooks = self._make_hooks(job_num, fixture)
267
268        logger_name = "%s:job%d" % (self.logger.name, job_num)
269        logger = logging.loggers.new_logger(logger_name, parent=self.logger)
270
271        if build_id is not None:
272            endpoint = logging.buildlogger.APPEND_GLOBAL_LOGS_ENDPOINT % {"build_id": build_id}
273            url = "%s/%s/" % (_config.BUILDLOGGER_URL.rstrip("/"), endpoint.strip("/"))
274            logger.info("Writing output of job #%d to %s.", job_num, url)
275
276        report = _report.TestReport(logger,
277                                    self.logging_config,
278                                    build_id=build_id,
279                                    build_config=build_config)
280
281        return _job.Job(logger, fixture, hooks, report)
282
283    def _make_test_queue(self):
284        """
285        Returns a queue of TestCase instances.
286
287        Use a multi-consumer queue instead of a unittest.TestSuite so
288        that the test cases can be dispatched to multiple threads.
289        """
290
291        test_kind_logger = logging.loggers.new_logger(self._test_group.test_kind,
292                                                      parent=logging.loggers.TESTS)
293
294        # Put all the test cases in a queue.
295        queue = _queue.Queue()
296        for test_name in self._test_group.tests:
297            test_case = testcases.make_test_case(self._test_group.test_kind,
298                                                 test_kind_logger,
299                                                 test_name,
300                                                 **self.test_config)
301            queue.put(test_case)
302
303        # Add sentinel value for each job to indicate when there are no more items to process.
304        for _ in xrange(_config.JOBS):
305            queue.put(None)
306
307        return queue
308