1"""Coverage plugin for pytest."""
2import argparse
3import os
4import warnings
5
6import coverage
7import pytest
8
9from . import compat
10from . import embed
11
# Numeric (major, minor, patch) prefix of the installed pytest version, used
# for feature checks such as ``PYTEST_VERSION >= (3, 8)``.
# Components that are not purely numeric (e.g. the "0rc1" in "7.0.0rc1" or a
# "dev1+g123" suffix) are dropped instead of being fed to int(), which would
# raise ValueError at import time for pre-release pytest builds.
PYTEST_VERSION = tuple(
    int(part) for part in pytest.__version__.split('.')[:3] if part.isdigit()
)
13
14
class CoverageError(Exception):
    """Signals that the measured coverage is too low."""
17
18
def validate_report(arg):
    """Parse and validate one ``--cov-report`` value of the form ``TYPE[:SUFFIX]``.

    ``TYPE`` must be one of ``term``, ``term-missing``, ``annotate``, ``html``,
    ``xml`` or the empty string.  The optional ``SUFFIX`` (everything after
    the first ``:``) is either the ``skip-covered`` modifier for the terminal
    reports or an output destination for the file-based reports.

    Returns a ``(report_type, suffix)`` tuple; ``suffix`` is ``None`` when
    *arg* contains no ``:``.  The result is always a tuple so that every
    successful path has the same return type.

    Raises ``argparse.ArgumentTypeError`` for an unknown report type, or when
    a suffix is given for a report type that does not accept it.
    """
    file_choices = ['annotate', 'html', 'xml']
    term_choices = ['term', 'term-missing']
    term_modifier_choices = ['skip-covered']
    all_choices = term_choices + file_choices

    # Split on the first ":" only, so destinations may themselves contain ":".
    report_type, separator, report_modifier = arg.partition(":")

    # The empty type is accepted here: "--cov-report=" means "no output".
    if report_type not in all_choices + ['']:
        msg = 'invalid choice: "{}" (choose from "{}")'.format(arg, all_choices)
        raise argparse.ArgumentTypeError(msg)

    if not separator:
        return report_type, None

    if report_type in term_choices and report_modifier in term_modifier_choices:
        return report_type, report_modifier

    if report_type not in file_choices:
        msg = 'output specifier not supported for: "{}" (choose from "{}")'.format(arg,
                                                                                   file_choices)
        raise argparse.ArgumentTypeError(msg)

    return report_type, report_modifier
43
44
def validate_fail_under(num_str):
    """Convert a ``--cov-fail-under`` value to a number.

    An ``int`` is returned when *num_str* parses as one; otherwise the value
    is parsed as a ``float``.  A ``ValueError`` from the float conversion is
    left to propagate.
    """
    try:
        threshold = int(num_str)
    except ValueError:
        # Not an integer literal: fall back to a floating point threshold.
        threshold = float(num_str)
    return threshold
50
51
def validate_context(arg):
    """Validate the ``--cov-context`` value and return it unchanged.

    Raises ``argparse.ArgumentTypeError`` when the installed coverage version
    compares ``<= (5, 0)``, or when *arg* is anything other than ``"test"``.
    The version check runs first.
    """
    contexts_unsupported = coverage.version_info <= (5, 0)
    if contexts_unsupported:
        raise argparse.ArgumentTypeError('Contexts are only supported with coverage.py >= 5.x')
    if arg == "test":
        return arg
    raise argparse.ArgumentTypeError('--cov-context=test is the only supported value')
58
59
class StoreReport(argparse.Action):
    """Argparse action that records one report request in ``namespace.cov_report``."""

    def __call__(self, parser, namespace, values, option_string=None):
        # ``values`` is a two-item sequence: the report type and its
        # suffix (destination/modifier, possibly None).
        kind, target = values
        namespace.cov_report[kind] = target
64
65
def pytest_addoption(parser):
    """Register the coverage command line options in the ``cov`` option group."""

    cov_group = parser.getgroup(
        'cov',
        'coverage reporting with distributed testing support',
    )

    # What to measure.
    cov_group.addoption(
        '--cov',
        action='append',
        default=[],
        metavar='SOURCE',
        nargs='?',
        const=True,
        dest='cov_source',
        help=(
            'Path or package name to measure during execution (multi-allowed). '
            'Use --cov= to not do any source filtering and record everything.'
        ),
    )

    # Which reports to produce; values are parsed by validate_report and
    # collected by the StoreReport action.
    cov_group.addoption(
        '--cov-report',
        action=StoreReport,
        default={},
        metavar='TYPE',
        type=validate_report,
        help=(
            'Type of report to generate: term, term-missing, '
            'annotate, html, xml (multi-allowed). '
            'term, term-missing may be followed by ":skip-covered". '
            'annotate, html and xml may be followed by ":DEST" '
            'where DEST specifies the output location. '
            'Use --cov-report= to not generate any output.'
        ),
    )

    cov_group.addoption(
        '--cov-config',
        action='store',
        default='.coveragerc',
        metavar='PATH',
        help='Config file for coverage. Default: .coveragerc',
    )

    cov_group.addoption(
        '--no-cov-on-fail',
        action='store_true',
        default=False,
        help=(
            'Do not report coverage if test run fails. '
            'Default: False'
        ),
    )

    cov_group.addoption(
        '--no-cov',
        action='store_true',
        default=False,
        help=(
            'Disable coverage report completely (useful for debuggers). '
            'Default: False'
        ),
    )

    cov_group.addoption(
        '--cov-fail-under',
        action='store',
        metavar='MIN',
        type=validate_fail_under,
        help='Fail if the total coverage is less than MIN.',
    )

    cov_group.addoption(
        '--cov-append',
        action='store_true',
        default=False,
        help=(
            'Do not delete coverage but append to current. '
            'Default: False'
        ),
    )

    cov_group.addoption(
        '--cov-branch',
        action='store_true',
        default=None,
        help='Enable branch coverage.',
    )

    cov_group.addoption(
        '--cov-context',
        action='store',
        metavar='CONTEXT',
        type=validate_context,
        help='Dynamic contexts to use. "test" for now.',
    )
103
104
def _prepare_cov_source(cov_source):
    """
    Normalise the values collected from repeated ``--cov`` options:

     --cov --cov=foobar is equivalent to --cov (cov_source=None)
     --cov=foo --cov=bar is equivalent to cov_source=['foo', 'bar']
    """
    # A bare ``--cov`` is stored as True and overrides every explicit source.
    if True in cov_source:
        return None
    return [source for source in cov_source if source is not True]
113
114
@pytest.mark.tryfirst
def pytest_load_initial_conftests(early_config, parser, args):
    """Register the coverage plugin as ``_cov`` when a ``--cov`` source was given."""
    namespace = early_config.known_args_namespace
    if not namespace.cov_source:
        return

    manager = early_config.pluginmanager
    manager.register(CovPlugin(namespace, manager), '_cov')
120
121
class CovPlugin(object):
    """Use coverage package to produce code coverage reports.

    Delegates all work to a particular implementation based on whether
    this test process is centralised, a distributed master or a
    distributed worker.
    """

    def __init__(self, options, pluginmanager, start=True):
        """Creates a coverage pytest plugin.

        We read the rc file that coverage uses to get the data file
        name.  This is needed since we give coverage through its API
        the data file name.

        ``options`` is the parsed option namespace; its ``cov_report`` and
        ``cov_source`` attributes are normalised in place.  ``pluginmanager``
        is accepted but not used by this constructor.  When ``start`` is
        true a controller is started immediately: ``engine.DistMaster`` if
        any distributed-testing option is set, ``engine.Central`` otherwise.
        """

        # Our implementation is unknown at this time.
        self.pid = None
        self.cov_controller = None
        self.cov_report = compat.StringIO()
        self.cov_total = None
        self.failed = False
        self._started = False
        self._start_path = None
        self._disabled = False
        self.options = options

        # These options may be absent from the namespace, hence the getattr
        # defaults.
        is_dist = (getattr(options, 'numprocesses', False) or
                   getattr(options, 'distload', False) or
                   getattr(options, 'dist', 'no') != 'no')
        if getattr(options, 'no_cov', False):
            # Coverage is switched off: leave cov_controller as None.
            self._disabled = True
            return

        if not self.options.cov_report:
            # No --cov-report given: default to the terminal report.
            self.options.cov_report = ['term']
        elif len(self.options.cov_report) == 1 and '' in self.options.cov_report:
            # Only "--cov-report=" was given: generate no output at all.
            self.options.cov_report = {}
        self.options.cov_source = _prepare_cov_source(self.options.cov_source)

        # import engine lazily here to avoid importing
        # it for unit tests that don't need it
        from . import engine

        if is_dist and start:
            self.start(engine.DistMaster)
        elif start:
            self.start(engine.Central)

        # worker is started in pytest hook

    def start(self, controller_cls, config=None, nodeid=None):
        """Instantiate ``controller_cls`` from the stored options and start it.

        When ``config`` is None a minimal stand-in exposing the options as
        its ``option`` attribute is passed to the controller instead.
        """

        if config is None:
            # fake config option for engine
            class Config(object):
                option = self.options

            config = Config()

        self.cov_controller = controller_cls(
            self.options.cov_source,
            self.options.cov_report,
            self.options.cov_config,
            self.options.cov_append,
            self.options.cov_branch,
            config,
            nodeid
        )
        self.cov_controller.start()
        self._started = True
        self._start_path = os.getcwd()
        cov_config = self.cov_controller.cov.config
        # Fall back to the coverage configuration's threshold when none was
        # given on the command line.
        if self.options.cov_fail_under is None and hasattr(cov_config, 'fail_under'):
            self.options.cov_fail_under = cov_config.fail_under

    def _is_worker(self, session):
        """Return True when ``compat.workerinput`` yields data for this session's config."""
        return compat.workerinput(session.config, None) is not None

    def pytest_sessionstart(self, session):
        """At session start determine our implementation and delegate to it."""

        if self.options.no_cov:
            # Coverage can be disabled because it does not cooperate with debuggers well.
            self._disabled = True
            return

        # import engine lazily here to avoid importing
        # it for unit tests that don't need it
        from . import engine

        self.pid = os.getpid()
        if self._is_worker(session):
            nodeid = (
                compat.workerinput(session.config)
                .get(compat.workerid, getattr(session, 'nodeid'))
            )
            self.start(engine.DistWorker, session.config, nodeid)
        elif not self._started:
            self.start(engine.Central)

        if self.options.cov_context == 'test':
            session.config.pluginmanager.register(TestContextPlugin(self.cov_controller.cov), '_cov_contexts')

    def pytest_configure_node(self, node):
        """Delegate to our implementation.

        Mark this hook as optional in case xdist is not installed.
        """
        if not self._disabled:
            self.cov_controller.configure_node(node)
    pytest_configure_node.optionalhook = True

    def pytest_testnodedown(self, node, error):
        """Delegate to our implementation.

        Mark this hook as optional in case xdist is not installed.
        """
        if not self._disabled:
            self.cov_controller.testnodedown(node, error)
    pytest_testnodedown.optionalhook = True

    def _should_report(self):
        """Return False only when tests failed and ``--no-cov-on-fail`` is set."""
        return not (self.failed and self.options.no_cov_on_fail)

    def _failed_cov_total(self):
        """Return True when a threshold is set and the total coverage is below it."""
        cov_fail_under = self.options.cov_fail_under
        return cov_fail_under is not None and self.cov_total < cov_fail_under

    # we need to wrap pytest_runtestloop. by the time pytest_sessionfinish
    # runs, it's too late to set testsfailed
    @compat.hookwrapper
    def pytest_runtestloop(self, session):
        """After the test loop, finish measurement and compute the coverage total.

        Counts one extra failed test when the total is below the required
        threshold, so the run exits with a failure code.
        """
        yield

        if self._disabled:
            return

        compat_session = compat.SessionWrapper(session)

        self.failed = bool(compat_session.testsfailed)
        if self.cov_controller is None:
            # No controller was ever started, so there is nothing to finish
            # or summarise (previously this fell through to an
            # AttributeError on ``None.summary``).
            return
        self.cov_controller.finish()

        if not self._is_worker(session) and self._should_report():

            # import coverage lazily here to avoid importing
            # it for unit tests that don't need it
            from coverage.misc import CoverageException

            try:
                self.cov_total = self.cov_controller.summary(self.cov_report)
            except CoverageException as exc:
                message = 'Failed to generate report: %s\n' % exc
                session.config.pluginmanager.getplugin("terminalreporter").write(
                    'WARNING: %s\n' % message, red=True, bold=True)
                if PYTEST_VERSION >= (3, 8):
                    warnings.warn(pytest.PytestWarning(message))
                else:
                    session.config.warn(code='COV-2', message=message)
                # Treat a failed report as zero coverage.
                self.cov_total = 0
            assert self.cov_total is not None, 'Test coverage should never be `None`'
            if self._failed_cov_total():
                # make sure we get the EXIT_TESTSFAILED exit code
                compat_session.testsfailed += 1

    def pytest_terminal_summary(self, terminalreporter):
        """Write the collected coverage report and the fail-under verdict to the terminal."""
        if self._disabled:
            message = 'Coverage disabled via --no-cov switch!'
            terminalreporter.write('WARNING: %s\n' % message, red=True, bold=True)
            if PYTEST_VERSION >= (3, 8):
                warnings.warn(pytest.PytestWarning(message))
            else:
                terminalreporter.config.warn(code='COV-1', message=message)
            return
        if self.cov_controller is None:
            return

        if self.cov_total is None:
            # we shouldn't report, or report generation failed (error raised above)
            return

        terminalreporter.write('\n' + self.cov_report.getvalue() + '\n')

        if self.options.cov_fail_under is not None and self.options.cov_fail_under > 0:
            failed = self.cov_total < self.options.cov_fail_under
            markup = {'red': True, 'bold': True} if failed else {'green': True}
            message = (
                '{fail}Required test coverage of {required}% {reached}. '
                'Total coverage: {actual:.2f}%\n'
                .format(
                    required=self.options.cov_fail_under,
                    actual=self.cov_total,
                    fail="FAIL " if failed else "",
                    reached="not reached" if failed else "reached"
                )
            )
            terminalreporter.write(message, **markup)

    def pytest_runtest_setup(self, item):
        """Initialise embedded coverage when the test runs outside the session's process."""
        if os.getpid() != self.pid:
            # test is run in another process than session, run
            # coverage manually
            embed.init()

    def pytest_runtest_teardown(self, item):
        """Run the embedded coverage cleanup after every test."""
        embed.cleanup()

    @compat.hookwrapper
    def pytest_runtest_call(self, item):
        """Pause coverage around tests that use the ``no_cover`` marker or fixture."""
        no_cover_requested = (item.get_closest_marker('no_cover')
                              or 'no_cover' in getattr(item, 'fixturenames', ()))
        # cov_controller stays None when coverage is disabled (--no-cov);
        # there is nothing to pause then, and dereferencing it would raise
        # AttributeError for every no_cover test.
        if no_cover_requested and self.cov_controller is not None:
            self.cov_controller.pause()
            yield
            self.cov_controller.resume()
        else:
            yield
339
340
class TestContextPlugin(object):
    """Switches the coverage context at each phase of a test item."""

    def __init__(self, cov):
        # Object whose ``switch_context`` receives the per-phase label.
        self.cov = cov

    def switch_context(self, item, when):
        """Switch to the context labelled ``<item.nodeid>|<when>``."""
        label = "{item.nodeid}|{when}".format(item=item, when=when)
        self.cov.switch_context(label)

    def pytest_runtest_setup(self, item):
        self.switch_context(item, 'setup')

    def pytest_runtest_call(self, item):
        self.switch_context(item, 'run')

    def pytest_runtest_teardown(self, item):
        self.switch_context(item, 'teardown')
357
358
@pytest.fixture
def no_cover():
    """A pytest fixture to disable coverage.

    The fixture does no work and provides no value; only its presence in a
    test's fixture names matters.
    """
363
364
@pytest.fixture
def cov(request):
    """A pytest fixture to provide access to the underlying coverage object.

    Yields ``None`` when the ``_cov`` plugin is not registered or has no
    controller.
    """
    manager = request.config.pluginmanager

    # Check with hasplugin to avoid getplugin exception in older pytest.
    if not manager.hasplugin('_cov'):
        return None

    controller = manager.getplugin('_cov').cov_controller
    if not controller:
        return None
    return controller.cov
375
376
def pytest_configure(config):
    """Add the ``no_cover`` marker description to the ``markers`` ini value."""
    marker_line = "no_cover: disable coverage for this test."
    config.addinivalue_line("markers", marker_line)
379