1"""Coverage controllers for use by pytest-cov and nose-cov.""" 2import contextlib 3import copy 4import functools 5import os 6import random 7import socket 8import sys 9 10import coverage 11from coverage.data import CoverageData 12 13from .compat import StringIO 14from .compat import workerinput 15from .compat import workeroutput 16from .embed import cleanup 17 18 19class _NullFile(object): 20 @staticmethod 21 def write(v): 22 pass 23 24 25@contextlib.contextmanager 26def _backup(obj, attr): 27 backup = getattr(obj, attr) 28 try: 29 setattr(obj, attr, copy.copy(backup)) 30 yield 31 finally: 32 setattr(obj, attr, backup) 33 34 35def _ensure_topdir(meth): 36 @functools.wraps(meth) 37 def ensure_topdir_wrapper(self, *args, **kwargs): 38 try: 39 original_cwd = os.getcwd() 40 except OSError: 41 # Looks like it's gone, this is non-ideal because a side-effect will 42 # be introduced in the tests here but we can't do anything about it. 43 original_cwd = None 44 os.chdir(self.topdir) 45 try: 46 return meth(self, *args, **kwargs) 47 finally: 48 if original_cwd is not None: 49 os.chdir(original_cwd) 50 51 return ensure_topdir_wrapper 52 53 54class CovController(object): 55 """Base class for different plugin implementations.""" 56 57 def __init__(self, cov_source, cov_report, cov_config, cov_append, cov_branch, config=None, nodeid=None): 58 """Get some common config used by multiple derived classes.""" 59 self.cov_source = cov_source 60 self.cov_report = cov_report 61 self.cov_config = cov_config 62 self.cov_append = cov_append 63 self.cov_branch = cov_branch 64 self.config = config 65 self.nodeid = nodeid 66 67 self.cov = None 68 self.combining_cov = None 69 self.data_file = None 70 self.node_descs = set() 71 self.failed_workers = [] 72 self.topdir = os.getcwd() 73 self.is_collocated = None 74 75 @contextlib.contextmanager 76 def ensure_topdir(self): 77 original_cwd = os.getcwd() 78 os.chdir(self.topdir) 79 yield 80 os.chdir(original_cwd) 81 82 @_ensure_topdir 83 def pause(self): 84 self.cov.stop() 85 self.unset_env() 86 87 @_ensure_topdir 88 def resume(self): 89 self.cov.start() 90 self.set_env() 91 92 @_ensure_topdir 93 def set_env(self): 94 """Put info about coverage into the env so that subprocesses can activate coverage.""" 95 if self.cov_source is None: 96 os.environ['COV_CORE_SOURCE'] = os.pathsep 97 else: 98 os.environ['COV_CORE_SOURCE'] = os.pathsep.join(self.cov_source) 99 config_file = os.path.abspath(self.cov_config) 100 if os.path.exists(config_file): 101 os.environ['COV_CORE_CONFIG'] = config_file 102 else: 103 os.environ['COV_CORE_CONFIG'] = os.pathsep 104 os.environ['COV_CORE_DATAFILE'] = os.path.abspath(self.cov.config.data_file) 105 if self.cov_branch: 106 os.environ['COV_CORE_BRANCH'] = 'enabled' 107 108 @staticmethod 109 def unset_env(): 110 """Remove coverage info from env.""" 111 os.environ.pop('COV_CORE_SOURCE', None) 112 os.environ.pop('COV_CORE_CONFIG', None) 113 os.environ.pop('COV_CORE_DATAFILE', None) 114 os.environ.pop('COV_CORE_BRANCH', None) 115 116 @staticmethod 117 def get_node_desc(platform, version_info): 118 """Return a description of this node.""" 119 120 return 'platform %s, python %s' % (platform, '%s.%s.%s-%s-%s' % version_info[:5]) 121 122 @staticmethod 123 def sep(stream, s, txt): 124 if hasattr(stream, 'sep'): 125 stream.sep(s, txt) 126 else: 127 sep_total = max((70 - 2 - len(txt)), 2) 128 sep_len = sep_total // 2 129 sep_extra = sep_total % 2 130 out = '%s %s %s\n' % (s * sep_len, txt, s * (sep_len + sep_extra)) 131 stream.write(out) 132 133 @_ensure_topdir 134 def summary(self, stream): 135 """Produce coverage reports.""" 136 total = None 137 138 if not self.cov_report: 139 with _backup(self.cov, "config"): 140 return self.cov.report(show_missing=True, ignore_errors=True, file=_NullFile) 141 142 # Output coverage section header. 143 if len(self.node_descs) == 1: 144 self.sep(stream, '-', 'coverage: %s' % ''.join(self.node_descs)) 145 else: 146 self.sep(stream, '-', 'coverage') 147 for node_desc in sorted(self.node_descs): 148 self.sep(stream, ' ', '%s' % node_desc) 149 150 # Report on any failed workers. 151 if self.failed_workers: 152 self.sep(stream, '-', 'coverage: failed workers') 153 stream.write('The following workers failed to return coverage data, ' 154 'ensure that pytest-cov is installed on these workers.\n') 155 for node in self.failed_workers: 156 stream.write('%s\n' % node.gateway.id) 157 158 # Produce terminal report if wanted. 159 if any(x in self.cov_report for x in ['term', 'term-missing']): 160 options = { 161 'show_missing': ('term-missing' in self.cov_report) or None, 162 'ignore_errors': True, 163 'file': stream, 164 } 165 skip_covered = isinstance(self.cov_report, dict) and 'skip-covered' in self.cov_report.values() 166 options.update({'skip_covered': skip_covered or None}) 167 with _backup(self.cov, "config"): 168 total = self.cov.report(**options) 169 170 # Produce annotated source code report if wanted. 171 if 'annotate' in self.cov_report: 172 annotate_dir = self.cov_report['annotate'] 173 174 with _backup(self.cov, "config"): 175 self.cov.annotate(ignore_errors=True, directory=annotate_dir) 176 # We need to call Coverage.report here, just to get the total 177 # Coverage.annotate don't return any total and we need it for --cov-fail-under. 178 179 with _backup(self.cov, "config"): 180 total = self.cov.report(ignore_errors=True, file=_NullFile) 181 if annotate_dir: 182 stream.write('Coverage annotated source written to dir %s\n' % annotate_dir) 183 else: 184 stream.write('Coverage annotated source written next to source\n') 185 186 # Produce html report if wanted. 187 if 'html' in self.cov_report: 188 output = self.cov_report['html'] 189 with _backup(self.cov, "config"): 190 total = self.cov.html_report(ignore_errors=True, directory=output) 191 stream.write('Coverage HTML written to dir %s\n' % (self.cov.config.html_dir if output is None else output)) 192 193 # Produce xml report if wanted. 194 if 'xml' in self.cov_report: 195 output = self.cov_report['xml'] 196 with _backup(self.cov, "config"): 197 total = self.cov.xml_report(ignore_errors=True, outfile=output) 198 stream.write('Coverage XML written to file %s\n' % (self.cov.config.xml_output if output is None else output)) 199 200 return total 201 202 203class Central(CovController): 204 """Implementation for centralised operation.""" 205 206 @_ensure_topdir 207 def start(self): 208 cleanup() 209 210 self.cov = coverage.Coverage(source=self.cov_source, 211 branch=self.cov_branch, 212 data_suffix=True, 213 config_file=self.cov_config) 214 self.combining_cov = coverage.Coverage(source=self.cov_source, 215 branch=self.cov_branch, 216 data_suffix=True, 217 data_file=os.path.abspath(self.cov.config.data_file), 218 config_file=self.cov_config) 219 220 # Erase or load any previous coverage data and start coverage. 221 if not self.cov_append: 222 self.cov.erase() 223 self.cov.start() 224 self.set_env() 225 226 @_ensure_topdir 227 def finish(self): 228 """Stop coverage, save data to file and set the list of coverage objects to report on.""" 229 230 self.unset_env() 231 self.cov.stop() 232 self.cov.save() 233 234 self.cov = self.combining_cov 235 self.cov.load() 236 self.cov.combine() 237 self.cov.save() 238 239 node_desc = self.get_node_desc(sys.platform, sys.version_info) 240 self.node_descs.add(node_desc) 241 242 243class DistMaster(CovController): 244 """Implementation for distributed master.""" 245 246 @_ensure_topdir 247 def start(self): 248 cleanup() 249 250 # Ensure coverage rc file rsynced if appropriate. 251 if self.cov_config and os.path.exists(self.cov_config): 252 self.config.option.rsyncdir.append(self.cov_config) 253 254 self.cov = coverage.Coverage(source=self.cov_source, 255 branch=self.cov_branch, 256 data_suffix=True, 257 config_file=self.cov_config) 258 self.cov._warn_no_data = False 259 self.cov._warn_unimported_source = False 260 self.cov._warn_preimported_source = False 261 self.combining_cov = coverage.Coverage(source=self.cov_source, 262 branch=self.cov_branch, 263 data_suffix=True, 264 data_file=os.path.abspath(self.cov.config.data_file), 265 config_file=self.cov_config) 266 if not self.cov_append: 267 self.cov.erase() 268 self.cov.start() 269 self.cov.config.paths['source'] = [self.topdir] 270 271 def configure_node(self, node): 272 """Workers need to know if they are collocated and what files have moved.""" 273 274 workerinput(node).update({ 275 'cov_master_host': socket.gethostname(), 276 'cov_master_topdir': self.topdir, 277 'cov_master_rsync_roots': [str(root) for root in node.nodemanager.roots], 278 }) 279 280 def testnodedown(self, node, error): 281 """Collect data file name from worker.""" 282 283 # If worker doesn't return any data then it is likely that this 284 # plugin didn't get activated on the worker side. 285 output = workeroutput(node, {}) 286 if 'cov_worker_node_id' not in output: 287 self.failed_workers.append(node) 288 return 289 290 # If worker is not collocated then we must save the data file 291 # that it returns to us. 292 if 'cov_worker_data' in output: 293 data_suffix = '%s.%s.%06d.%s' % ( 294 socket.gethostname(), os.getpid(), 295 random.randint(0, 999999), 296 output['cov_worker_node_id'] 297 ) 298 299 cov = coverage.Coverage(source=self.cov_source, 300 branch=self.cov_branch, 301 data_suffix=data_suffix, 302 config_file=self.cov_config) 303 cov.start() 304 if coverage.version_info < (5, 0): 305 data = CoverageData() 306 data.read_fileobj(StringIO(output['cov_worker_data'])) 307 cov.data.update(data) 308 else: 309 data = CoverageData(no_disk=True) 310 data.loads(output['cov_worker_data']) 311 cov.get_data().update(data) 312 cov.stop() 313 cov.save() 314 path = output['cov_worker_path'] 315 self.cov.config.paths['source'].append(path) 316 317 # Record the worker types that contribute to the data file. 318 rinfo = node.gateway._rinfo() 319 node_desc = self.get_node_desc(rinfo.platform, rinfo.version_info) 320 self.node_descs.add(node_desc) 321 322 @_ensure_topdir 323 def finish(self): 324 """Combines coverage data and sets the list of coverage objects to report on.""" 325 326 # Combine all the suffix files into the data file. 327 self.cov.stop() 328 self.cov.save() 329 self.cov = self.combining_cov 330 self.cov.load() 331 self.cov.combine() 332 self.cov.save() 333 334 335class DistWorker(CovController): 336 """Implementation for distributed workers.""" 337 338 @_ensure_topdir 339 def start(self): 340 341 cleanup() 342 343 # Determine whether we are collocated with master. 344 self.is_collocated = (socket.gethostname() == workerinput(self.config)['cov_master_host'] and 345 self.topdir == workerinput(self.config)['cov_master_topdir']) 346 347 # If we are not collocated then rewrite master paths to worker paths. 348 if not self.is_collocated: 349 master_topdir = workerinput(self.config)['cov_master_topdir'] 350 worker_topdir = self.topdir 351 if self.cov_source is not None: 352 self.cov_source = [source.replace(master_topdir, worker_topdir) 353 for source in self.cov_source] 354 self.cov_config = self.cov_config.replace(master_topdir, worker_topdir) 355 356 # Erase any previous data and start coverage. 357 self.cov = coverage.Coverage(source=self.cov_source, 358 branch=self.cov_branch, 359 data_suffix=True, 360 config_file=self.cov_config) 361 self.cov.start() 362 self.set_env() 363 364 @_ensure_topdir 365 def finish(self): 366 """Stop coverage and send relevant info back to the master.""" 367 self.unset_env() 368 self.cov.stop() 369 370 if self.is_collocated: 371 # We don't combine data if we're collocated - we can get 372 # race conditions in the .combine() call (it's not atomic) 373 # The data is going to be combined in the master. 374 self.cov.save() 375 376 # If we are collocated then just inform the master of our 377 # data file to indicate that we have finished. 378 workeroutput(self.config)['cov_worker_node_id'] = self.nodeid 379 else: 380 self.cov.combine() 381 self.cov.save() 382 # If we are not collocated then add the current path 383 # and coverage data to the output so we can combine 384 # it on the master node. 385 386 # Send all the data to the master over the channel. 387 if coverage.version_info < (5, 0): 388 buff = StringIO() 389 self.cov.data.write_fileobj(buff) 390 data = buff.getvalue() 391 else: 392 data = self.cov.get_data().dumps() 393 394 workeroutput(self.config).update({ 395 'cov_worker_path': self.topdir, 396 'cov_worker_node_id': self.nodeid, 397 'cov_worker_data': data, 398 }) 399 400 def summary(self, stream): 401 """Only the master reports so do nothing.""" 402 403 pass 404