1"""
2Extension to the unittest.TestResult to support additional test status
3and timing information for the report.json file.
4"""
5
6from __future__ import absolute_import
7
8import copy
9import time
10import unittest
11
12from .. import config
13from .. import logging
14
15
16class TestReport(unittest.TestResult):
17    """
18    Records test status and timing information.
19    """
20
21    def __init__(self, logger, logging_config, build_id=None, build_config=None):
22        """
23        Initializes the TestReport with the buildlogger configuration.
24        """
25
26        unittest.TestResult.__init__(self)
27
28        self.logger = logger
29        self.logging_config = logging_config
30        self.build_id = build_id
31        self.build_config = build_config
32
33        self.reset()
34
35    @classmethod
36    def combine(cls, *reports):
37        """
38        Merges the results from multiple TestReport instances into one.
39
40        If the same test is present in multiple reports, then one that
41        failed or errored is more preferred over one that succeeded.
42        This behavior is useful for when running multiple jobs that
43        dynamically add a #dbhash# test case.
44        """
45
46        combined_report = cls(logging.loggers.EXECUTOR, {})
47        combining_time = time.time()
48
49        for report in reports:
50            if not isinstance(report, TestReport):
51                raise TypeError("reports must be a list of TestReport instances")
52
53            for test_info in report.test_infos:
54                # If the user triggers a KeyboardInterrupt exception while a test is running, then
55                # it is possible for 'test_info' to be modified by a job thread later on. We make a
56                # shallow copy in order to ensure 'num_failed' is consistent with the actual number
57                # of tests that have status equal to "failed".
58                test_info = copy.copy(test_info)
59
60                # TestReport.addXX() may not have been called.
61                if test_info.status is None or test_info.return_code is None:
62                    # Mark the test as having failed if it was interrupted. It might have passed if
63                    # the suite ran to completion, but we wouldn't know for sure.
64                    test_info.status = "fail"
65                    test_info.return_code = -2
66
67                # TestReport.stopTest() may not have been called.
68                if test_info.end_time is None:
69                    # Use the current time as the time that the test finished running.
70                    test_info.end_time = combining_time
71
72                combined_report.test_infos.append(test_info)
73
74            combined_report.num_dynamic += report.num_dynamic
75
76        # Recompute number of success, failures, and errors.
77        combined_report.num_succeeded = len(combined_report.get_successful())
78        combined_report.num_failed = len(combined_report.get_failed())
79        combined_report.num_errored = len(combined_report.get_errored())
80
81        return combined_report
82
83    def startTest(self, test, dynamic=False):
84        """
85        Called immediately before 'test' is run.
86        """
87
88        unittest.TestResult.startTest(self, test)
89
90        test_info = _TestInfo(test.id(), dynamic)
91        test_info.start_time = time.time()
92        self.test_infos.append(test_info)
93
94        basename = test.basename()
95        if dynamic:
96            command = "(dynamic test case)"
97            self.num_dynamic += 1
98        else:
99            command = test.as_command()
100        self.logger.info("Running %s...\n%s", basename, command)
101
102        test_id = logging.buildlogger.new_test_id(self.build_id,
103                                                  self.build_config,
104                                                  basename,
105                                                  command)
106
107        if self.build_id is not None:
108            endpoint = logging.buildlogger.APPEND_TEST_LOGS_ENDPOINT % {
109                "build_id": self.build_id,
110                "test_id": test_id,
111            }
112
113            test_info.url_endpoint = "%s/%s/" % (config.BUILDLOGGER_URL.rstrip("/"),
114                                                 endpoint.strip("/"))
115
116            self.logger.info("Writing output of %s to %s.",
117                             test.shortDescription(),
118                             test_info.url_endpoint)
119
120        # Set up the test-specific logger.
121        logger_name = "%s:%s" % (test.logger.name, test.short_name())
122        logger = logging.loggers.new_logger(logger_name, parent=test.logger)
123        logging.config.apply_buildlogger_test_handler(logger,
124                                                      self.logging_config,
125                                                      build_id=self.build_id,
126                                                      build_config=self.build_config,
127                                                      test_id=test_id)
128
129        self.__original_loggers[test_info.test_id] = test.logger
130        test.logger = logger
131
132    def stopTest(self, test):
133        """
134        Called immediately after 'test' has run.
135        """
136
137        unittest.TestResult.stopTest(self, test)
138
139        test_info = self._find_test_info(test)
140        test_info.end_time = time.time()
141
142        time_taken = test_info.end_time - test_info.start_time
143        self.logger.info("%s ran in %0.2f seconds.", test.basename(), time_taken)
144
145        # Asynchronously closes the buildlogger test handler to avoid having too many threads open
146        # on 32-bit systems.
147        logging.flush.close_later(test.logger)
148
149        # Restore the original logger for the test.
150        test.logger = self.__original_loggers.pop(test.id())
151
152    def addError(self, test, err):
153        """
154        Called when a non-failureException was raised during the
155        execution of 'test'.
156        """
157
158        unittest.TestResult.addError(self, test, err)
159        self.num_errored += 1
160
161        test_info = self._find_test_info(test)
162        test_info.status = "error"
163        test_info.return_code = test.return_code
164
165    def setError(self, test):
166        """
167        Used to change the outcome of an existing test to an error.
168        """
169
170        test_info = self._find_test_info(test)
171        if test_info.end_time is None:
172            raise ValueError("stopTest was not called on %s" % (test.basename()))
173
174        test_info.status = "error"
175        test_info.return_code = 2
176
177        # Recompute number of success, failures, and errors.
178        self.num_succeeded = len(self.get_successful())
179        self.num_failed = len(self.get_failed())
180        self.num_errored = len(self.get_errored())
181
182    def addFailure(self, test, err):
183        """
184        Called when a failureException was raised during the execution
185        of 'test'.
186        """
187
188        unittest.TestResult.addFailure(self, test, err)
189        self.num_failed += 1
190
191        test_info = self._find_test_info(test)
192        test_info.status = "fail"
193        test_info.return_code = test.return_code
194
195    def setFailure(self, test, return_code=1):
196        """
197        Used to change the outcome of an existing test to a failure.
198        """
199
200        test_info = self._find_test_info(test)
201        if test_info.end_time is None:
202            raise ValueError("stopTest was not called on %s" % (test.basename()))
203
204        test_info.status = "fail"
205        test_info.return_code = return_code
206
207        # Recompute number of success, failures, and errors.
208        self.num_succeeded = len(self.get_successful())
209        self.num_failed = len(self.get_failed())
210        self.num_errored = len(self.get_errored())
211
212    def addSuccess(self, test):
213        """
214        Called when 'test' executed successfully.
215        """
216
217        unittest.TestResult.addSuccess(self, test)
218        self.num_succeeded += 1
219
220        test_info = self._find_test_info(test)
221        test_info.status = "pass"
222        test_info.return_code = test.return_code
223
224    def wasSuccessful(self):
225        """
226        Returns true if all tests executed successfully.
227        """
228        return self.num_failed == self.num_errored == 0
229
230    def get_successful(self):
231        """
232        Returns the status and timing information of the tests that
233        executed successfully.
234        """
235        return [test_info for test_info in self.test_infos if test_info.status == "pass"]
236
237    def get_failed(self):
238        """
239        Returns the status and timing information of the tests that
240        raised a failureException during their execution.
241        """
242        return [test_info for test_info in self.test_infos if test_info.status == "fail"]
243
244    def get_errored(self):
245        """
246        Returns the status and timing information of the tests that
247        raised a non-failureException during their execution.
248        """
249        return [test_info for test_info in self.test_infos if test_info.status == "error"]
250
251    def as_dict(self):
252        """
253        Return the test result information as a dictionary.
254
255        Used to create the report.json file.
256        """
257
258        results = []
259        for test_info in self.test_infos:
260            # Don't distinguish between failures and errors.
261            status = "pass" if test_info.status == "pass" else "fail"
262
263            result = {
264                "test_file": test_info.test_id,
265                "status": status,
266                "exit_code": test_info.return_code,
267                "start": test_info.start_time,
268                "end": test_info.end_time,
269                "elapsed": test_info.end_time - test_info.start_time,
270            }
271
272            if test_info.url_endpoint is not None:
273                result["url"] = test_info.url_endpoint
274
275            results.append(result)
276
277        return {
278            "results": results,
279            "failures": self.num_failed + self.num_errored,
280        }
281
282    def reset(self):
283        """
284        Resets the test report back to its initial state.
285        """
286
287        self.test_infos = []
288
289        self.num_dynamic = 0
290        self.num_succeeded = 0
291        self.num_failed = 0
292        self.num_errored = 0
293
294        self.__original_loggers = {}
295
296    def _find_test_info(self, test):
297        """
298        Returns the status and timing information associated with
299        'test'.
300        """
301
302        test_id = test.id()
303
304        # Search the list backwards to efficiently find the status and timing information of a test
305        # that was recently started.
306        for test_info in reversed(self.test_infos):
307            if test_info.test_id == test_id:
308                return test_info
309
310        raise ValueError("Details for %s not found in the report" % (test.basename()))
311
312
313class _TestInfo(object):
314    """
315    Holder for the test status and timing information.
316    """
317
318    def __init__(self, test_id, dynamic):
319        """
320        Initializes the _TestInfo instance.
321        """
322
323        self.test_id = test_id
324        self.dynamic = dynamic
325
326        self.start_time = None
327        self.end_time = None
328        self.status = None
329        self.return_code = None
330        self.url_endpoint = None
331