"""Coverage plugin for pytest."""
import argparse
import os
import re
import warnings

import coverage
import pytest

from . import compat
from . import embed


def _parse_version(version):
    """Return the leading numeric release components of ``version`` as a tuple.

    Only the first three dot-separated components are considered, and only the
    leading digits of each one are used, so pre-release or development
    suffixes (``'7.0.0rc1'`` -> ``(7, 0, 0)``, ``'5.0.dev3'`` -> ``(5, 0)``)
    do not raise ``ValueError`` the way a plain ``int()`` conversion would.
    Parsing stops at the first component that does not start with a digit.
    """
    release = []
    for component in version.split('.')[:3]:
        match = re.match(r'\d+', component)
        if match is None:
            break
        release.append(int(match.group()))
    return tuple(release)


PYTEST_VERSION = _parse_version(pytest.__version__)


class CoverageError(Exception):
    """Indicates that our coverage is too low"""


def validate_report(arg):
    """Parse and validate a ``--cov-report`` value of the form ``TYPE[:MODIFIER]``.

    Returns a two-item sequence ``(report_type, modifier_or_dest)``; the second
    item is ``None`` when no ``:`` suffix was given. An empty ``TYPE`` is
    accepted (it is used to request "no report").

    Raises ``argparse.ArgumentTypeError`` when the type is unknown, or when a
    suffix is supplied that is neither a known terminal modifier for a terminal
    report nor an output destination for a file report.
    """
    file_choices = ['annotate', 'html', 'xml']
    term_choices = ['term', 'term-missing']
    term_modifier_choices = ['skip-covered']
    all_choices = term_choices + file_choices
    values = arg.split(":", 1)
    report_type = values[0]
    if report_type not in all_choices + ['']:
        msg = 'invalid choice: "{}" (choose from "{}")'.format(arg, all_choices)
        raise argparse.ArgumentTypeError(msg)

    if len(values) == 1:
        return report_type, None

    report_modifier = values[1]
    if report_type in term_choices and report_modifier in term_modifier_choices:
        return report_type, report_modifier

    if report_type not in file_choices:
        msg = 'output specifier not supported for: "{}" (choose from "{}")'.format(arg,
                                                                                   file_choices)
        raise argparse.ArgumentTypeError(msg)

    # File report with an explicit destination: [report_type, dest].
    return values


def validate_fail_under(num_str):
    """Convert a ``--cov-fail-under`` value to ``int`` if possible, else ``float``.

    A ``ValueError`` from the final ``float()`` conversion is left to propagate.
    """
    try:
        return int(num_str)
    except ValueError:
        return float(num_str)


def validate_context(arg):
    """Validate a ``--cov-context`` value; only ``"test"`` is accepted.

    Raises ``argparse.ArgumentTypeError`` when the installed coverage version
    compares ``<= (5, 0)`` or when ``arg`` is anything other than ``"test"``.
    """
    if coverage.version_info <= (5, 0):
        raise argparse.ArgumentTypeError('Contexts are only supported with coverage.py >= 5.x')
    if arg != "test":
        raise argparse.ArgumentTypeError('--cov-context=test is the only supported value')
    return arg


class StoreReport(argparse.Action):
    """argparse action that records each ``--cov-report`` value in a mapping."""

    def __call__(self, parser, namespace, values, option_string=None):
        # ``values`` is the two-item result of ``validate_report``.
        report_type, dest = values
        namespace.cov_report[report_type] = dest


def pytest_addoption(parser):
    """Add options to control coverage."""

    group = parser.getgroup(
        'cov', 'coverage reporting with distributed testing support')
    group.addoption('--cov', action='append', default=[], metavar='SOURCE',
                    nargs='?', const=True, dest='cov_source',
                    help='Path or package name to measure during execution (multi-allowed). '
                         'Use --cov= to not do any source filtering and record everything.')
    group.addoption('--cov-report', action=StoreReport, default={},
                    metavar='TYPE', type=validate_report,
                    help='Type of report to generate: term, term-missing, '
                         'annotate, html, xml (multi-allowed). '
                         'term, term-missing may be followed by ":skip-covered". '
                         'annotate, html and xml may be followed by ":DEST" '
                         'where DEST specifies the output location. '
                         'Use --cov-report= to not generate any output.')
    group.addoption('--cov-config', action='store', default='.coveragerc',
                    metavar='PATH',
                    help='Config file for coverage. Default: .coveragerc')
    group.addoption('--no-cov-on-fail', action='store_true', default=False,
                    help='Do not report coverage if test run fails. '
                         'Default: False')
    group.addoption('--no-cov', action='store_true', default=False,
                    help='Disable coverage report completely (useful for debuggers). '
                         'Default: False')
    group.addoption('--cov-fail-under', action='store', metavar='MIN',
                    type=validate_fail_under,
                    help='Fail if the total coverage is less than MIN.')
    group.addoption('--cov-append', action='store_true', default=False,
                    help='Do not delete coverage but append to current. '
                         'Default: False')
    group.addoption('--cov-branch', action='store_true', default=None,
                    help='Enable branch coverage.')
    group.addoption('--cov-context', action='store', metavar='CONTEXT',
                    type=validate_context,
                    help='Dynamic contexts to use. "test" for now.')


def _prepare_cov_source(cov_source):
    """
    Prepare cov_source so that:

     --cov --cov=foobar is equivalent to --cov (cov_source=None)
     --cov=foo --cov=bar is equivalent to cov_source=['foo', 'bar']
    """
    return None if True in cov_source else [path for path in cov_source if path is not True]


@pytest.mark.tryfirst
def pytest_load_initial_conftests(early_config, parser, args):
    """Register the coverage plugin early when any ``--cov`` option was given."""
    if early_config.known_args_namespace.cov_source:
        plugin = CovPlugin(early_config.known_args_namespace, early_config.pluginmanager)
        early_config.pluginmanager.register(plugin, '_cov')


class CovPlugin(object):
    """Use coverage package to produce code coverage reports.

    Delegates all work to a particular implementation based on whether
    this test process is centralised, a distributed master or a
    distributed worker.
    """

    def __init__(self, options, pluginmanager, start=True):
        """Creates a coverage pytest plugin.

        We read the rc file that coverage uses to get the data file
        name.  This is needed since we give coverage through its API
        the data file name.
        """

        # Our implementation is unknown at this time.
        self.pid = None
        self.cov_controller = None
        self.cov_report = compat.StringIO()
        self.cov_total = None
        self.failed = False
        self._started = False
        self._start_path = None
        self._disabled = False
        self.options = options

        is_dist = (getattr(options, 'numprocesses', False) or
                   getattr(options, 'distload', False) or
                   getattr(options, 'dist', 'no') != 'no')
        if getattr(options, 'no_cov', False):
            # Disabled: leave cov_controller as None and do not start anything.
            self._disabled = True
            return

        if not self.options.cov_report:
            self.options.cov_report = ['term']
        elif len(self.options.cov_report) == 1 and '' in self.options.cov_report:
            # ``--cov-report=`` alone means "generate no output".
            self.options.cov_report = {}
        self.options.cov_source = _prepare_cov_source(self.options.cov_source)

        # import engine lazily here to avoid importing
        # it for unit tests that don't need it
        from . import engine

        if is_dist and start:
            self.start(engine.DistMaster)
        elif start:
            self.start(engine.Central)

        # worker is started in pytest hook

    def start(self, controller_cls, config=None, nodeid=None):
        """Instantiate ``controller_cls``, start it and record the start state.

        When ``config`` is omitted a minimal stand-in exposing ``option`` is
        passed to the controller instead. If ``--cov-fail-under`` was not
        given, the ``fail_under`` value of the coverage config (when that
        attribute exists) is adopted.
        """

        if config is None:
            # fake config option for engine
            class Config(object):
                option = self.options

            config = Config()

        self.cov_controller = controller_cls(
            self.options.cov_source,
            self.options.cov_report,
            self.options.cov_config,
            self.options.cov_append,
            self.options.cov_branch,
            config,
            nodeid
        )
        self.cov_controller.start()
        self._started = True
        self._start_path = os.getcwd()
        cov_config = self.cov_controller.cov.config
        if self.options.cov_fail_under is None and hasattr(cov_config, 'fail_under'):
            self.options.cov_fail_under = cov_config.fail_under

    def _is_worker(self, session):
        """Return True when ``compat.workerinput`` yields a value for this session."""
        return compat.workerinput(session.config, None) is not None

    def pytest_sessionstart(self, session):
        """At session start determine our implementation and delegate to it."""

        if self.options.no_cov:
            # Coverage can be disabled because it does not cooperate with debuggers well.
            self._disabled = True
            return

        # import engine lazily here to avoid importing
        # it for unit tests that don't need it
        from . import engine

        self.pid = os.getpid()
        if self._is_worker(session):
            nodeid = (
                compat.workerinput(session.config)
                .get(compat.workerid, session.nodeid)
            )
            self.start(engine.DistWorker, session.config, nodeid)
        elif not self._started:
            self.start(engine.Central)

        if self.options.cov_context == 'test':
            session.config.pluginmanager.register(TestContextPlugin(self.cov_controller.cov), '_cov_contexts')

    def pytest_configure_node(self, node):
        """Delegate to our implementation.

        Mark this hook as optional in case xdist is not installed.
        """
        if not self._disabled:
            self.cov_controller.configure_node(node)
    pytest_configure_node.optionalhook = True

    def pytest_testnodedown(self, node, error):
        """Delegate to our implementation.

        Mark this hook as optional in case xdist is not installed.
        """
        if not self._disabled:
            self.cov_controller.testnodedown(node, error)
    pytest_testnodedown.optionalhook = True

    def _should_report(self):
        """Return False only when tests failed and ``--no-cov-on-fail`` is set."""
        return not (self.failed and self.options.no_cov_on_fail)

    def _failed_cov_total(self):
        """Return True when a fail-under threshold is set and the total is below it."""
        cov_fail_under = self.options.cov_fail_under
        return cov_fail_under is not None and self.cov_total < cov_fail_under

    # we need to wrap pytest_runtestloop. by the time pytest_sessionfinish
    # runs, it's too late to set testsfailed
    @compat.hookwrapper
    def pytest_runtestloop(self, session):
        """After the test loop: finish coverage, build the summary, apply fail-under."""
        yield

        if self._disabled:
            return

        compat_session = compat.SessionWrapper(session)

        self.failed = bool(compat_session.testsfailed)
        if self.cov_controller is None:
            # Nothing was started, so there is nothing to finish or summarise.
            return
        self.cov_controller.finish()

        if not self._is_worker(session) and self._should_report():

            # import coverage lazily here to avoid importing
            # it for unit tests that don't need it
            from coverage.misc import CoverageException

            try:
                self.cov_total = self.cov_controller.summary(self.cov_report)
            except CoverageException as exc:
                message = 'Failed to generate report: %s\n' % exc
                session.config.pluginmanager.getplugin("terminalreporter").write(
                    'WARNING: %s\n' % message, red=True, bold=True)
                if PYTEST_VERSION >= (3, 8):
                    warnings.warn(pytest.PytestWarning(message))
                else:
                    session.config.warn(code='COV-2', message=message)
                self.cov_total = 0
            assert self.cov_total is not None, 'Test coverage should never be `None`'
            if self._failed_cov_total():
                # make sure we get the EXIT_TESTSFAILED exit code
                compat_session.testsfailed += 1

    def pytest_terminal_summary(self, terminalreporter):
        """Write the coverage report and the fail-under verdict to the terminal."""
        if self._disabled:
            message = 'Coverage disabled via --no-cov switch!'
            terminalreporter.write('WARNING: %s\n' % message, red=True, bold=True)
            if PYTEST_VERSION >= (3, 8):
                warnings.warn(pytest.PytestWarning(message))
            else:
                terminalreporter.config.warn(code='COV-1', message=message)
            return
        if self.cov_controller is None:
            return

        if self.cov_total is None:
            # we shouldn't report, or report generation failed (error raised above)
            return

        terminalreporter.write('\n' + self.cov_report.getvalue() + '\n')

        if self.options.cov_fail_under is not None and self.options.cov_fail_under > 0:
            failed = self.cov_total < self.options.cov_fail_under
            markup = {'red': True, 'bold': True} if failed else {'green': True}
            message = (
                '{fail}Required test coverage of {required}% {reached}. '
                'Total coverage: {actual:.2f}%\n'
                .format(
                    required=self.options.cov_fail_under,
                    actual=self.cov_total,
                    fail="FAIL " if failed else "",
                    reached="not reached" if failed else "reached"
                )
            )
            terminalreporter.write(message, **markup)

    def pytest_runtest_setup(self, item):
        """Initialise embedded coverage when the test runs outside the session process."""
        if os.getpid() != self.pid:
            # test is run in another process than session, run
            # coverage manually
            embed.init()

    def pytest_runtest_teardown(self, item):
        """Clean up embedded coverage after each test."""
        embed.cleanup()

    @compat.hookwrapper
    def pytest_runtest_call(self, item):
        """Pause coverage around tests that use the ``no_cover`` marker or fixture."""
        no_cover = (item.get_closest_marker('no_cover')
                    or 'no_cover' in getattr(item, 'fixturenames', ()))
        # The controller is None when coverage is disabled (--no-cov); there is
        # nothing to pause in that case, so just run the test.
        if no_cover and self.cov_controller is not None:
            self.cov_controller.pause()
            yield
            self.cov_controller.resume()
        else:
            yield


class TestContextPlugin(object):
    """Switch the coverage context to ``<nodeid>|<phase>`` for each test phase."""

    def __init__(self, cov):
        self.cov = cov

    def pytest_runtest_setup(self, item):
        self.switch_context(item, 'setup')

    def pytest_runtest_teardown(self, item):
        self.switch_context(item, 'teardown')

    def pytest_runtest_call(self, item):
        self.switch_context(item, 'run')

    def switch_context(self, item, when):
        """Tell coverage that measurement now belongs to ``item`` in phase ``when``."""
        context = "{item.nodeid}|{when}".format(item=item, when=when)
        self.cov.switch_context(context)


@pytest.fixture
def no_cover():
    """A pytest fixture to disable coverage."""
    pass


@pytest.fixture
def cov(request):
    """A pytest fixture to provide access to the underlying coverage object."""

    # Check with hasplugin to avoid getplugin exception in older pytest.
    if request.config.pluginmanager.hasplugin('_cov'):
        plugin = request.config.pluginmanager.getplugin('_cov')
        if plugin.cov_controller:
            return plugin.cov_controller.cov
    return None


def pytest_configure(config):
    """Register the ``no_cover`` marker so pytest does not flag it as unknown."""
    config.addinivalue_line("markers", "no_cover: disable coverage for this test.")