1"""Coverage controllers for use by pytest-cov and nose-cov."""
2import contextlib
3import copy
4import functools
5import os
6import random
7import socket
8import sys
9
10import coverage
11from coverage.data import CoverageData
12
13from .compat import StringIO
14from .compat import workerinput
15from .compat import workeroutput
16from .embed import cleanup
17
18
19class _NullFile(object):
20    @staticmethod
21    def write(v):
22        pass
23
24
25@contextlib.contextmanager
26def _backup(obj, attr):
27    backup = getattr(obj, attr)
28    try:
29        setattr(obj, attr, copy.copy(backup))
30        yield
31    finally:
32        setattr(obj, attr, backup)
33
34
35def _ensure_topdir(meth):
36    @functools.wraps(meth)
37    def ensure_topdir_wrapper(self, *args, **kwargs):
38        try:
39            original_cwd = os.getcwd()
40        except OSError:
41            # Looks like it's gone, this is non-ideal because a side-effect will
42            # be introduced in the tests here but we can't do anything about it.
43            original_cwd = None
44        os.chdir(self.topdir)
45        try:
46            return meth(self, *args, **kwargs)
47        finally:
48            if original_cwd is not None:
49                os.chdir(original_cwd)
50
51    return ensure_topdir_wrapper
52
53
54class CovController(object):
55    """Base class for different plugin implementations."""
56
57    def __init__(self, cov_source, cov_report, cov_config, cov_append, cov_branch, config=None, nodeid=None):
58        """Get some common config used by multiple derived classes."""
59        self.cov_source = cov_source
60        self.cov_report = cov_report
61        self.cov_config = cov_config
62        self.cov_append = cov_append
63        self.cov_branch = cov_branch
64        self.config = config
65        self.nodeid = nodeid
66
67        self.cov = None
68        self.combining_cov = None
69        self.data_file = None
70        self.node_descs = set()
71        self.failed_workers = []
72        self.topdir = os.getcwd()
73        self.is_collocated = None
74
75    @contextlib.contextmanager
76    def ensure_topdir(self):
77        original_cwd = os.getcwd()
78        os.chdir(self.topdir)
79        yield
80        os.chdir(original_cwd)
81
82    @_ensure_topdir
83    def pause(self):
84        self.cov.stop()
85        self.unset_env()
86
87    @_ensure_topdir
88    def resume(self):
89        self.cov.start()
90        self.set_env()
91
92    @_ensure_topdir
93    def set_env(self):
94        """Put info about coverage into the env so that subprocesses can activate coverage."""
95        if self.cov_source is None:
96            os.environ['COV_CORE_SOURCE'] = os.pathsep
97        else:
98            os.environ['COV_CORE_SOURCE'] = os.pathsep.join(self.cov_source)
99        config_file = os.path.abspath(self.cov_config)
100        if os.path.exists(config_file):
101            os.environ['COV_CORE_CONFIG'] = config_file
102        else:
103            os.environ['COV_CORE_CONFIG'] = os.pathsep
104        os.environ['COV_CORE_DATAFILE'] = os.path.abspath(self.cov.config.data_file)
105        if self.cov_branch:
106            os.environ['COV_CORE_BRANCH'] = 'enabled'
107
108    @staticmethod
109    def unset_env():
110        """Remove coverage info from env."""
111        os.environ.pop('COV_CORE_SOURCE', None)
112        os.environ.pop('COV_CORE_CONFIG', None)
113        os.environ.pop('COV_CORE_DATAFILE', None)
114        os.environ.pop('COV_CORE_BRANCH', None)
115
116    @staticmethod
117    def get_node_desc(platform, version_info):
118        """Return a description of this node."""
119
120        return 'platform %s, python %s' % (platform, '%s.%s.%s-%s-%s' % version_info[:5])
121
122    @staticmethod
123    def sep(stream, s, txt):
124        if hasattr(stream, 'sep'):
125            stream.sep(s, txt)
126        else:
127            sep_total = max((70 - 2 - len(txt)), 2)
128            sep_len = sep_total // 2
129            sep_extra = sep_total % 2
130            out = '%s %s %s\n' % (s * sep_len, txt, s * (sep_len + sep_extra))
131            stream.write(out)
132
133    @_ensure_topdir
134    def summary(self, stream):
135        """Produce coverage reports."""
136        total = None
137
138        if not self.cov_report:
139            with _backup(self.cov, "config"):
140                return self.cov.report(show_missing=True, ignore_errors=True, file=_NullFile)
141
142        # Output coverage section header.
143        if len(self.node_descs) == 1:
144            self.sep(stream, '-', 'coverage: %s' % ''.join(self.node_descs))
145        else:
146            self.sep(stream, '-', 'coverage')
147            for node_desc in sorted(self.node_descs):
148                self.sep(stream, ' ', '%s' % node_desc)
149
150        # Report on any failed workers.
151        if self.failed_workers:
152            self.sep(stream, '-', 'coverage: failed workers')
153            stream.write('The following workers failed to return coverage data, '
154                         'ensure that pytest-cov is installed on these workers.\n')
155            for node in self.failed_workers:
156                stream.write('%s\n' % node.gateway.id)
157
158        # Produce terminal report if wanted.
159        if any(x in self.cov_report for x in ['term', 'term-missing']):
160            options = {
161                'show_missing': ('term-missing' in self.cov_report) or None,
162                'ignore_errors': True,
163                'file': stream,
164            }
165            skip_covered = isinstance(self.cov_report, dict) and 'skip-covered' in self.cov_report.values()
166            options.update({'skip_covered': skip_covered or None})
167            with _backup(self.cov, "config"):
168                total = self.cov.report(**options)
169
170        # Produce annotated source code report if wanted.
171        if 'annotate' in self.cov_report:
172            annotate_dir = self.cov_report['annotate']
173
174            with _backup(self.cov, "config"):
175                self.cov.annotate(ignore_errors=True, directory=annotate_dir)
176            # We need to call Coverage.report here, just to get the total
177            # Coverage.annotate don't return any total and we need it for --cov-fail-under.
178
179            with _backup(self.cov, "config"):
180                total = self.cov.report(ignore_errors=True, file=_NullFile)
181            if annotate_dir:
182                stream.write('Coverage annotated source written to dir %s\n' % annotate_dir)
183            else:
184                stream.write('Coverage annotated source written next to source\n')
185
186        # Produce html report if wanted.
187        if 'html' in self.cov_report:
188            output = self.cov_report['html']
189            with _backup(self.cov, "config"):
190                total = self.cov.html_report(ignore_errors=True, directory=output)
191            stream.write('Coverage HTML written to dir %s\n' % (self.cov.config.html_dir if output is None else output))
192
193        # Produce xml report if wanted.
194        if 'xml' in self.cov_report:
195            output = self.cov_report['xml']
196            with _backup(self.cov, "config"):
197                total = self.cov.xml_report(ignore_errors=True, outfile=output)
198            stream.write('Coverage XML written to file %s\n' % (self.cov.config.xml_output if output is None else output))
199
200        return total
201
202
203class Central(CovController):
204    """Implementation for centralised operation."""
205
206    @_ensure_topdir
207    def start(self):
208        cleanup()
209
210        self.cov = coverage.Coverage(source=self.cov_source,
211                                     branch=self.cov_branch,
212                                     data_suffix=True,
213                                     config_file=self.cov_config)
214        self.combining_cov = coverage.Coverage(source=self.cov_source,
215                                               branch=self.cov_branch,
216                                               data_suffix=True,
217                                               data_file=os.path.abspath(self.cov.config.data_file),
218                                               config_file=self.cov_config)
219
220        # Erase or load any previous coverage data and start coverage.
221        if not self.cov_append:
222            self.cov.erase()
223        self.cov.start()
224        self.set_env()
225
226    @_ensure_topdir
227    def finish(self):
228        """Stop coverage, save data to file and set the list of coverage objects to report on."""
229
230        self.unset_env()
231        self.cov.stop()
232        self.cov.save()
233
234        self.cov = self.combining_cov
235        self.cov.load()
236        self.cov.combine()
237        self.cov.save()
238
239        node_desc = self.get_node_desc(sys.platform, sys.version_info)
240        self.node_descs.add(node_desc)
241
242
243class DistMaster(CovController):
244    """Implementation for distributed master."""
245
246    @_ensure_topdir
247    def start(self):
248        cleanup()
249
250        # Ensure coverage rc file rsynced if appropriate.
251        if self.cov_config and os.path.exists(self.cov_config):
252            self.config.option.rsyncdir.append(self.cov_config)
253
254        self.cov = coverage.Coverage(source=self.cov_source,
255                                     branch=self.cov_branch,
256                                     data_suffix=True,
257                                     config_file=self.cov_config)
258        self.cov._warn_no_data = False
259        self.cov._warn_unimported_source = False
260        self.cov._warn_preimported_source = False
261        self.combining_cov = coverage.Coverage(source=self.cov_source,
262                                               branch=self.cov_branch,
263                                               data_suffix=True,
264                                               data_file=os.path.abspath(self.cov.config.data_file),
265                                               config_file=self.cov_config)
266        if not self.cov_append:
267            self.cov.erase()
268        self.cov.start()
269        self.cov.config.paths['source'] = [self.topdir]
270
271    def configure_node(self, node):
272        """Workers need to know if they are collocated and what files have moved."""
273
274        workerinput(node).update({
275            'cov_master_host': socket.gethostname(),
276            'cov_master_topdir': self.topdir,
277            'cov_master_rsync_roots': [str(root) for root in node.nodemanager.roots],
278        })
279
280    def testnodedown(self, node, error):
281        """Collect data file name from worker."""
282
283        # If worker doesn't return any data then it is likely that this
284        # plugin didn't get activated on the worker side.
285        output = workeroutput(node, {})
286        if 'cov_worker_node_id' not in output:
287            self.failed_workers.append(node)
288            return
289
290        # If worker is not collocated then we must save the data file
291        # that it returns to us.
292        if 'cov_worker_data' in output:
293            data_suffix = '%s.%s.%06d.%s' % (
294                socket.gethostname(), os.getpid(),
295                random.randint(0, 999999),
296                output['cov_worker_node_id']
297            )
298
299            cov = coverage.Coverage(source=self.cov_source,
300                                    branch=self.cov_branch,
301                                    data_suffix=data_suffix,
302                                    config_file=self.cov_config)
303            cov.start()
304            if coverage.version_info < (5, 0):
305                data = CoverageData()
306                data.read_fileobj(StringIO(output['cov_worker_data']))
307                cov.data.update(data)
308            else:
309                data = CoverageData(no_disk=True)
310                data.loads(output['cov_worker_data'])
311                cov.get_data().update(data)
312            cov.stop()
313            cov.save()
314            path = output['cov_worker_path']
315            self.cov.config.paths['source'].append(path)
316
317        # Record the worker types that contribute to the data file.
318        rinfo = node.gateway._rinfo()
319        node_desc = self.get_node_desc(rinfo.platform, rinfo.version_info)
320        self.node_descs.add(node_desc)
321
322    @_ensure_topdir
323    def finish(self):
324        """Combines coverage data and sets the list of coverage objects to report on."""
325
326        # Combine all the suffix files into the data file.
327        self.cov.stop()
328        self.cov.save()
329        self.cov = self.combining_cov
330        self.cov.load()
331        self.cov.combine()
332        self.cov.save()
333
334
335class DistWorker(CovController):
336    """Implementation for distributed workers."""
337
338    @_ensure_topdir
339    def start(self):
340
341        cleanup()
342
343        # Determine whether we are collocated with master.
344        self.is_collocated = (socket.gethostname() == workerinput(self.config)['cov_master_host'] and
345                              self.topdir == workerinput(self.config)['cov_master_topdir'])
346
347        # If we are not collocated then rewrite master paths to worker paths.
348        if not self.is_collocated:
349            master_topdir = workerinput(self.config)['cov_master_topdir']
350            worker_topdir = self.topdir
351            if self.cov_source is not None:
352                self.cov_source = [source.replace(master_topdir, worker_topdir)
353                                   for source in self.cov_source]
354            self.cov_config = self.cov_config.replace(master_topdir, worker_topdir)
355
356        # Erase any previous data and start coverage.
357        self.cov = coverage.Coverage(source=self.cov_source,
358                                     branch=self.cov_branch,
359                                     data_suffix=True,
360                                     config_file=self.cov_config)
361        self.cov.start()
362        self.set_env()
363
364    @_ensure_topdir
365    def finish(self):
366        """Stop coverage and send relevant info back to the master."""
367        self.unset_env()
368        self.cov.stop()
369
370        if self.is_collocated:
371            # We don't combine data if we're collocated - we can get
372            # race conditions in the .combine() call (it's not atomic)
373            # The data is going to be combined in the master.
374            self.cov.save()
375
376            # If we are collocated then just inform the master of our
377            # data file to indicate that we have finished.
378            workeroutput(self.config)['cov_worker_node_id'] = self.nodeid
379        else:
380            self.cov.combine()
381            self.cov.save()
382            # If we are not collocated then add the current path
383            # and coverage data to the output so we can combine
384            # it on the master node.
385
386            # Send all the data to the master over the channel.
387            if coverage.version_info < (5, 0):
388                buff = StringIO()
389                self.cov.data.write_fileobj(buff)
390                data = buff.getvalue()
391            else:
392                data = self.cov.get_data().dumps()
393
394            workeroutput(self.config).update({
395                'cov_worker_path': self.topdir,
396                'cov_worker_node_id': self.nodeid,
397                'cov_worker_data': data,
398            })
399
400    def summary(self, stream):
401        """Only the master reports so do nothing."""
402
403        pass
404