1# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
2# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
3
4"""Miscellaneous stuff for coverage.py."""
5
6import errno
7import hashlib
8import inspect
9import locale
10import os
11import os.path
12import random
13import re
14import socket
15import sys
16import types
17
18from coverage import env
19from coverage.backward import to_bytes, unicode_class
20
# Cache of already-isolated modules, keyed by the original module object.
ISOLATED_MODULES = {}


def isolate_module(mod):
    """Return a private copy of `mod`, shielded from aggressive mocking.

    Test suites sometimes mock things like os.path.exists.  If coverage.py
    used the same module object, it would end up calling their mock.  Working
    from a copy made at import time keeps coverage.py out of that tangle.

    Attributes that are themselves modules are copied recursively.  Copies
    are cached, so isolating the same module twice returns the same object.
    """
    cached = ISOLATED_MODULES.get(mod)
    if cached is not None:
        return cached

    clone = types.ModuleType(mod.__name__)
    # Register the copy before filling it in, so that modules which refer to
    # each other (directly or indirectly) don't recurse forever.
    ISOLATED_MODULES[mod] = clone
    for attr_name in dir(mod):
        attr = getattr(mod, attr_name)
        if isinstance(attr, types.ModuleType):
            attr = isolate_module(attr)
        setattr(clone, attr_name, attr)
    return clone

os = isolate_module(os)
43
44
def dummy_decorator_with_args(*args_unused, **kwargs_unused):
    """Stand-in for a decorator factory: accept any arguments, change nothing.

    Whatever arguments are given are ignored.  The returned decorator hands
    back the decorated function itself, unwrapped.
    """
    def _passthrough(func):
        return func
    return _passthrough
50
51
# Environment COVERAGE_NO_CONTRACTS=1 can turn off contracts while debugging
# tests to remove noise from stack traces.
# $set_env.py: COVERAGE_NO_CONTRACTS - Disable PyContracts to simplify stack traces.
# The variable's value is passed through int(), so it must be an integer
# string; it defaults to 0 (contracts stay on) when unset.
USE_CONTRACTS = env.TESTING and not bool(int(os.environ.get("COVERAGE_NO_CONTRACTS", 0)))

# Use PyContracts for assertion testing on parameters and returns, but only if
# we are running our own test suite.
if USE_CONTRACTS:
    from contracts import contract              # pylint: disable=unused-import
    from contracts import new_contract as raw_new_contract

    def new_contract(*args, **kwargs):
        """A proxy for contracts.new_contract that doesn't mind happening twice.

        Arguments are passed straight through to PyContracts' `new_contract`.
        Returns whatever it returns, or None if it raised ValueError.
        """
        try:
            return raw_new_contract(*args, **kwargs)
        except ValueError:
            # During meta-coverage, this module is imported twice, and
            # PyContracts doesn't like redefining contracts. It's OK.
            pass

    # Define contract words that PyContracts doesn't have.
    new_contract('bytes', lambda v: isinstance(v, bytes))
    if env.PY3:
        new_contract('unicode', lambda v: isinstance(v, unicode_class))

    def one_of(argnames):
        """Ensure that only one of the argnames is non-None.

        `argnames` is a comma-separated string of argument names; whitespace
        around each name is ignored.  The check is an `assert` made on every
        call of the decorated function, and only looks at arguments passed by
        keyword: positional arguments are not examined.
        """
        def _decorator(func):
            # Parse the names once, at decoration time.
            argnameset = set(name.strip() for name in argnames.split(","))
            def _wrapper(*args, **kwargs):
                # Names not passed by keyword count as None here.
                vals = [kwargs.get(name) for name in argnameset]
                assert sum(val is not None for val in vals) == 1
                return func(*args, **kwargs)
            return _wrapper
        return _decorator
else:                                           # pragma: not testing
    # We aren't using real PyContracts, so just define our decorators as
    # stunt-double no-ops.
    contract = dummy_decorator_with_args
    one_of = dummy_decorator_with_args

    def new_contract(*args_unused, **kwargs_unused):
        """Dummy no-op implementation of `new_contract`; always returns None."""
        pass
96
97
def nice_pair(pair):
    """Format a (start, end) pair of numbers as a compact string.

    When both numbers are equal, the result is just that number.  Otherwise
    it is "start-end", a dash-separated range.

    """
    first, last = pair
    return ("%d" % first) if first == last else ("%d-%d" % (first, last))
110
111
def expensive(fn):
    """Decorator marking a method that should run at most once per object.

    Outside of testing, `fn` is returned unchanged.  When `env.TESTING` is
    set, the method is wrapped so that a second call on the same object
    raises AssertionError.  The wrapper passes only `self` to `fn`, so this
    is for methods that take no other arguments.

    """
    if not env.TESTING:
        return fn                   # pragma: not testing

    # The first call leaves this attribute on the instance as a marker.
    marker = "_once_" + fn.__name__

    def _wrapper(self):
        already_called = hasattr(self, marker)
        if already_called:
            raise AssertionError("Shouldn't have called %s more than once" % fn.__name__)
        setattr(self, marker, True)
        return fn(self)
    return _wrapper
130
131
def bool_or_none(b):
    """Coerce `b` to a bool, except that None stays None."""
    return None if b is None else bool(b)
138
139
def join_regex(regexes):
    """Build one regex that matches whatever any of `regexes` matches.

    Each regex is wrapped in a non-capturing group, and the groups are joined
    as alternatives.  An empty `regexes` produces the empty string.
    """
    wrapped = ["(?:%s)" % regex for regex in regexes]
    return "|".join(wrapped)
143
144
def file_be_gone(path):
    """Delete the file at `path`, quietly doing nothing if it isn't there.

    Any OSError other than "no such file" is propagated to the caller.
    """
    try:
        os.remove(path)
    except OSError as exc:
        if exc.errno == errno.ENOENT:
            # It was already gone: that's the outcome we wanted.
            return
        raise
152
153
def ensure_dir(directory):
    """Make sure the directory exists.

    If `directory` is None or empty, do nothing.

    Missing parent directories are created as needed.  It is not an error if
    another process creates the directory while this call is in progress.
    Any other OSError, including the path existing but not being a directory,
    is propagated.
    """
    if directory and not os.path.isdir(directory):
        try:
            os.makedirs(directory)
        except OSError as exc:
            # Someone else (a parallel process, for example) may have made the
            # directory between our isdir() check and our makedirs() call.
            # That's fine, as long as what's there now is really a directory.
            if exc.errno != errno.EEXIST or not os.path.isdir(directory):
                raise
161
162
def ensure_dir_for_file(path):
    """Create the directory that would contain `path`, if it doesn't exist.

    A `path` with no directory part is left alone: its dirname is empty, and
    `ensure_dir` does nothing for an empty directory name.
    """
    parent = os.path.dirname(path)
    ensure_dir(parent)
166
167
def output_encoding(outfile=None):
    """Pick the encoding for output written to `outfile`, or stdout if None.

    The first non-empty answer wins, in this order: the `encoding` attribute
    of `outfile`, the `encoding` attribute of the original stdout, and
    finally the locale's preferred encoding.
    """
    stream = sys.stdout if outfile is None else outfile
    for candidate in (stream, sys.__stdout__):
        found = getattr(candidate, "encoding", None)
        if found:
            return found
    return locale.getpreferredencoding()
178
179
def filename_suffix(suffix):
    """Work out the suffix to use on a data file name.

    A `suffix` that is a string or None is returned untouched.  If `suffix`
    is exactly True, a distinguishing suffix is built from the hostname, the
    process id, and a six-digit random number.

    Returns a string or None.

    """
    if suffix is not True:
        return suffix

    # A simple true value asks for a suffix with plenty of distinguishing
    # information.  The pid is read at the moment of the call, so it is the
    # pid of the process that is actually calling, even after a fork.
    dice = random.Random(os.urandom(8)).randint(0, 999999)
    parts = (socket.gethostname(), os.getpid(), dice)
    return "%s.%s.%06d" % parts
198
199
class Hasher(object):
    """Accumulates an md5 hash over Python data, including nested data."""
    def __init__(self):
        self.md5 = hashlib.md5()

    def update(self, v):
        """Fold `v` into the hash, descending into containers and objects.

        Text, bytes, None, ints and floats are hashed directly.  Tuples and
        lists are hashed item by item, dicts by key and value in sorted key
        order.  Any other object is hashed through its attributes, skipping
        names that start with a double underscore and values that are
        routines.
        """
        digest = self.md5
        # The type goes in first, ahead of the value itself.
        digest.update(to_bytes(str(type(v))))
        if isinstance(v, unicode_class):
            digest.update(v.encode('utf8'))
        elif isinstance(v, bytes):
            digest.update(v)
        elif v is None:
            pass
        elif isinstance(v, (int, float)):
            digest.update(to_bytes(str(v)))
        elif isinstance(v, (tuple, list)):
            for item in v:
                self.update(item)
        elif isinstance(v, dict):
            # Sorted keys make the hash independent of dict ordering.
            for key in sorted(v.keys()):
                self.update(key)
                self.update(v[key])
        else:
            for attr_name in dir(v):
                if attr_name.startswith('__'):
                    continue
                attr_value = getattr(v, attr_name)
                if not inspect.isroutine(attr_value):
                    self.update(attr_name)
                    self.update(attr_value)
        # Every value, at every nesting level, is closed with a terminator.
        digest.update(b'.')

    def hexdigest(self):
        """Return the hash accumulated so far, as a hex digest string."""
        return self.md5.hexdigest()
238
239
240def _needs_to_implement(that, func_name):
241    """Helper to raise NotImplementedError in interface stubs."""
242    if hasattr(that, "_coverage_plugin_name"):
243        thing = "Plugin"
244        name = that._coverage_plugin_name
245    else:
246        thing = "Class"
247        klass = that.__class__
248        name = "{klass.__module__}.{klass.__name__}".format(klass=klass)
249
250    raise NotImplementedError(
251        "{thing} {name!r} needs to implement {func_name}()".format(
252            thing=thing, name=name, func_name=func_name
253            )
254        )
255
256
class DefaultValue(object):
    """A sentinel for arguments that need an out-of-the-ordinary default.

    The string given to the constructor becomes the object's repr, which is
    what shows up in help and Sphinx output.

    """
    def __init__(self, display_as):
        """Remember `display_as`, the text to show as this object's repr."""
        self.display_as = display_as

    def __repr__(self):
        """Return the display text exactly as it was given."""
        return self.display_as
269
270
def substitute_variables(text, variables):
    """Substitute ``${VAR}`` variables in `text` with their values.

    Variables in the text can take a number of shell-inspired forms::

        $VAR
        ${VAR}
        ${VAR?}             strict: an error if VAR isn't defined.
        ${VAR-missing}      defaulted: "missing" if VAR isn't defined.
        $$                  just a dollar sign.

    `variables` is a dictionary of variable values.

    An undefined variable that is neither strict nor defaulted is replaced
    with the empty string.

    Returns the resulting text with values substituted.  Raises
    CoverageException if a strict variable is undefined.

    """
    dollar_pattern = r"""(?x)   # Use extended regex syntax
        \$                      # A dollar sign,
        (?:                     # then
            (?P<dollar>\$) |        # a dollar sign, or
            (?P<word1>\w+) |        # a plain word, or
            {                       # a {-wrapped
                (?P<word2>\w+)          # word,
                (?:
                    (?P<strict>\?) |        # with a strict marker
                    -(?P<defval>[^}]*)      # or a default value
                )?                      # maybe.
            }
        )
        """

    def dollar_replace(match):
        """Called for each $replacement; returns the text to put in its place."""
        if match.group('dollar'):
            return "$"
        # Exactly one of the two word groups matched: use whichever did.
        word = match.group('word1') or match.group('word2')
        if word in variables:
            return variables[word]
        if match.group('strict'):
            msg = "Variable {} is undefined: {!r}".format(word, text)
            raise CoverageException(msg)
        # No default was given if 'defval' is None.  A replacement function
        # for re.sub is supposed to return a string, so be explicit about
        # the empty result rather than handing back None.
        return match.group('defval') or ""

    text = re.sub(dollar_pattern, dollar_replace, text)
    return text
318
319
class BaseCoverageException(Exception):
    """The root of the coverage.py exception hierarchy."""


class CoverageException(BaseCoverageException):
    """An error raised by a coverage.py function."""


class NoSource(CoverageException):
    """The source for a module couldn't be found."""


class NoCode(NoSource):
    """No code at all could be found."""


class NotPython(CoverageException):
    """A source file turned out not to be parsable Python."""


class ExceptionDuringRun(CoverageException):
    """An exception happened while running customer code.

    Construct it with three arguments, the values from `sys.exc_info`.

    """


class StopEverything(BaseCoverageException):
    """An exception that means everything should stop.

    The CoverageTest class converts these to SkipTest, so that when running
    tests, raising this exception will automatically skip the test.

    """
362