1"""Common logic for the coverage subcommand."""
2from __future__ import (absolute_import, division, print_function)
3__metaclass__ = type
4
5import os
6import re
7
8from .. import types as t
9
10from ..encoding import (
11    to_bytes,
12)
13
14from ..io import (
15    open_binary_file,
16    read_json_file,
17)
18
19from ..util import (
20    ApplicationError,
21    common_environment,
22    display,
23    ANSIBLE_TEST_DATA_ROOT,
24)
25
26from ..util_common import (
27    intercept_command,
28    ResultType,
29)
30
31from ..config import (
32    EnvironmentConfig,
33)
34
35from ..executor import (
36    Delegate,
37    install_command_requirements,
38)
39
40from .. target import (
41    walk_module_targets,
42)
43
44from ..data import (
45    data_context,
46)
47
48if t.TYPE_CHECKING:
49    import coverage as coverage_module
50
# Names of the supported coverage grouping keys.
# NOTE(review): presumably the valid values for the `group_by` option -- confirm against the argument parser.
COVERAGE_GROUPS = ('command', 'target', 'environment', 'version')
# Path to the coveragerc file shipped in the ansible-test data directory; passed to the coverage CLI via --rcfile.
COVERAGE_CONFIG_PATH = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'coveragerc')
# Base name used for coverage output files.
# NOTE(review): not referenced in this part of the file -- confirm usage in the other coverage commands.
COVERAGE_OUTPUT_FILE_NAME = 'coverage'
54
55
class CoverageConfig(EnvironmentConfig):
    """Configuration for the coverage command.

    Every option is read defensively from the parsed arguments, falling back to a default
    when the attribute was not defined by the invoked sub-command.
    """
    def __init__(self, args):  # type: (t.Any) -> None
        super(CoverageConfig, self).__init__(args, 'coverage')

        # Always an immutable set, including when no grouping was requested, so the value matches its declared type.
        self.group_by = frozenset(args.group_by) if 'group_by' in args and args.group_by else frozenset()  # type: t.FrozenSet[str]
        self.all = args.all if 'all' in args else False  # type: bool
        self.stub = args.stub if 'stub' in args else False  # type: bool
        # None when the option is not present on the parsed arguments.
        self.export = args.export if 'export' in args else None  # type: t.Optional[str]
        self.coverage = False  # temporary work-around to support intercept_command in cover.py
66
67
def initialize_coverage(args):  # type: (CoverageConfig) -> coverage_module
    """Delegate execution if requested, install requirements, then import and return the coverage module.

    Raises Delegate when delegation was requested.
    Raises ApplicationError when the coverage module is missing or its version is outside the supported range.
    """
    if args.delegate:
        raise Delegate()

    if args.requirements:
        install_command_requirements(args)

    try:
        import coverage
    except ImportError:
        coverage = None

    if not coverage:
        raise ApplicationError('You must install the "coverage" python module to use this command.')

    coverage_version_string = coverage.__version__

    # Compare only the leading dotted-numeric portion of the version.
    # Pre-release versions such as "5.0a1" then reach the range check below instead of failing with a ValueError.
    version_match = re.match(r'\d+(?:\.\d+)*', coverage_version_string)
    coverage_version = tuple(int(v) for v in version_match.group(0).split('.')) if version_match else ()

    min_version = (4, 2)  # inclusive
    max_version = (5, 0)  # exclusive
    recommended_version = '4.5.4'

    # An unparseable version yields an empty tuple, which sorts below min_version and is rejected.
    if not min_version <= coverage_version < max_version:
        raise ApplicationError('Version %s of "coverage" is not supported. Version %s is known to work and is recommended.' % (
            coverage_version_string, recommended_version))

    return coverage
101
102
def run_coverage(args, output_file, command, cmd):  # type: (CoverageConfig, str, str, t.List[str]) -> None
    """Invoke the coverage command line tool for the given sub-command, using the bundled coveragerc."""
    coverage_env = common_environment()
    coverage_env['COVERAGE_FILE'] = output_file  # tells coverage which data file to operate on

    coverage_cmd = [
        'python',
        '-m',
        'coverage.__main__',
        command,
        '--rcfile',
        COVERAGE_CONFIG_PATH,
    ] + cmd

    intercept_command(args, target_name='coverage', env=coverage_env, cmd=coverage_cmd, disable_coverage=True)
111
112
def get_python_coverage_files(path=None):  # type: (t.Optional[str]) -> t.List[str]
    """Return the paths of the Python coverage files found in the given (or default) directory."""
    return get_coverage_files(language='python', path=path)
116
117
def get_powershell_coverage_files(path=None):  # type: (t.Optional[str]) -> t.List[str]
    """Return the paths of the PowerShell coverage files found in the given (or default) directory."""
    return get_coverage_files(language='powershell', path=path)
121
122
def get_coverage_files(language, path=None):  # type: (str, t.Optional[str]) -> t.List[str]
    """Return the list of coverage file paths for the given language.

    The directory searched is `path` when given, otherwise the default coverage results directory.
    A file is selected when its name contains both '=coverage.' and '=<language>'.
    An empty list is returned when the directory does not exist.
    """
    import errno  # local import: only needed to recognize a missing directory below

    coverage_dir = path or ResultType.COVERAGE.path

    try:
        names = os.listdir(coverage_dir)
    except OSError as ex:
        if ex.errno == errno.ENOENT:
            # No coverage directory simply means no coverage files were produced.
            return []

        raise

    language_marker = '=%s' % language

    return [os.path.join(coverage_dir, name) for name in names
            if '=coverage.' in name and language_marker in name]
130
131
def get_collection_path_regexes():  # type: () -> t.Tuple[t.Optional[t.Pattern], t.Optional[t.Pattern]]
    """Return a pair of regexes used for identifying and manipulating collection paths.

    The first pattern finds the collection directory anywhere in a path.
    The second matches everything up to and including the first occurrence of that directory.
    Both are None when the current content is not a collection.
    """
    collection = data_context().content.collection

    if not collection:
        return None, None

    # Escape the directory so any regex metacharacters in it are matched literally.
    directory = re.escape(collection.directory)

    collection_search_re = re.compile(r'/%s/' % directory)
    collection_sub_re = re.compile(r'^.*?/%s/' % directory)

    return collection_search_re, collection_sub_re
142
143
def get_python_modules():  # type: () -> t.Dict[str, str]
    """Map each Ansible module name to its path, considering only module targets whose path ends with '.py'."""
    python_modules = {}

    for module_target in walk_module_targets():
        if module_target.path.endswith('.py'):
            python_modules[module_target.module] = module_target.path

    return python_modules
147
148
def enumerate_python_arcs(
        path,  # type: str
        coverage,  # type: coverage_module
        modules,  # type: t.Dict[str, str]
        collection_search_re,  # type: t.Optional[t.Pattern]
        collection_sub_re,  # type: t.Optional[t.Pattern]
):  # type: (...) -> t.Generator[t.Tuple[str, t.Set[t.Tuple[int, int]]]]
    """Yield (local filename, set of arcs) pairs for each measured file recorded in the given Python coverage file.

    Empty and unreadable coverage files are reported and produce no results.
    Measured files without arcs, or whose path cannot be sanitized, are skipped.
    """
    if not os.path.getsize(path):
        display.warning('Empty coverage file: %s' % path, verbosity=2)
        return

    coverage_data = coverage.CoverageData()

    try:
        coverage_data.read_file(path)
    except Exception as ex:  # pylint: disable=locally-disabled, broad-except
        # Peek at the file header to give a more helpful message for the SQLite based format.
        with open_binary_file(path) as file_obj:
            header = file_obj.read(6)

        if header == b'SQLite':
            message = 'File created by "coverage" 5.0+: %s' % os.path.relpath(path)
        else:
            message = u'%s' % ex

        display.error(message)

        return

    for measured_file in coverage_data.measured_files():
        arcs = coverage_data.arcs(measured_file)

        if not arcs:
            # This is most likely due to using an unsupported version of coverage.
            display.warning('No arcs found for "%s" in coverage file: %s' % (measured_file, path))
            continue

        local_path = sanitize_filename(
            measured_file,
            modules=modules,
            collection_search_re=collection_search_re,
            collection_sub_re=collection_sub_re,
        )

        if local_path:
            yield local_path, set(arcs)
190
191
def enumerate_powershell_lines(
        path,  # type: str
        collection_search_re,  # type: t.Optional[t.Pattern]
        collection_sub_re,  # type: t.Optional[t.Pattern]
):  # type: (...) -> t.Generator[t.Tuple[str, t.Dict[int, int]]]
    """Yield (local filename, {line: hit count}) pairs for each file recorded in the given PowerShell coverage file.

    Empty and unreadable coverage files are reported and produce no results.
    Entries whose path cannot be sanitized are skipped.
    """
    if not os.path.getsize(path):
        display.warning('Empty coverage file: %s' % path, verbosity=2)
        return

    try:
        coverage_run = read_json_file(path)
    except Exception as ex:  # pylint: disable=locally-disabled, broad-except
        display.error(u'%s' % ex)
        return

    for raw_name, hits in coverage_run.items():
        filename = sanitize_filename(raw_name, collection_search_re=collection_search_re, collection_sub_re=collection_sub_re)

        if not filename:
            continue

        if isinstance(hits, dict) and not hits.get('Line'):
            # Input data was previously aggregated and thus uses the standard ansible-test output format for PowerShell coverage.
            # This format differs from the more verbose format of raw coverage data from the remote Windows hosts.
            yield filename, dict((int(line), count) for line, count in hits.items())
            continue

        # PowerShell unpacks arrays if there's only a single entry so this is a defensive check on that
        entries = hits if isinstance(hits, list) else [hits]

        yield filename, dict((entry['Line'], entry['HitCount']) for entry in entries if entry)
229
230
def sanitize_filename(
        filename,  # type: str
        modules=None,  # type: t.Optional[t.Dict[str, str]]
        collection_search_re=None,  # type: t.Optional[t.Pattern]
        collection_sub_re=None,  # type: t.Optional[t.Pattern]
):  # type: (...) -> t.Optional[str]
    """Convert the given code coverage path to a local absolute path and return it, or None if the path is not valid.

    The rewrite rules below are tried in order and only the first matching rule is applied.
    None is returned only when the path refers to a module which is not present in `modules`.
    """
    ansible_path = os.path.abspath('lib/ansible/') + '/'
    root_path = data_context().content.root + '/'
    integration_temp_path = os.path.sep + os.path.join(ResultType.TMP.relative_path, 'integration') + os.path.sep

    if modules is None:
        modules = {}

    new_name = None  # the rewritten path, set only when one of the rules below matches

    if '/ansible_modlib.zip/ansible/' in filename:
        # Rewrite the module_utils path from the remote host to match the controller. Ansible 2.6 and earlier.
        new_name = re.sub(r'^.*/ansible_modlib\.zip/ansible/', ansible_path, filename)
    elif collection_search_re and collection_search_re.search(filename):
        # Strip everything up to and including the collection directory, then resolve against the current directory.
        new_name = os.path.abspath(collection_sub_re.sub('', filename))
    elif re.search(r'/ansible_[^/]+_payload\.zip/ansible/', filename):
        # Rewrite the module_utils path from the remote host to match the controller. Ansible 2.7 and later.
        new_name = re.sub(r'^.*/ansible_[^/]+_payload\.zip/ansible/', ansible_path, filename)
    elif '/ansible_module_' in filename:
        # Rewrite the module path from the remote host to match the controller. Ansible 2.6 and earlier.
        module_name = re.sub(r'^.*/ansible_module_(?P<module>.*)\.py$', r'\g<module>', filename)
        if module_name not in modules:
            display.warning('Skipping coverage of unknown module: %s' % module_name)
            return None
        new_name = os.path.abspath(modules[module_name])
    elif re.search(r'/ansible_[^/]+_payload(_[^/]+|\.zip)/__main__\.py$', filename):
        # Rewrite the module path from the remote host to match the controller. Ansible 2.7 and later.
        # AnsiballZ versions using zipimporter will match the `.zip` portion of the regex.
        # AnsiballZ versions not using zipimporter will match the `_[^/]+` portion of the regex.
        module_name = re.sub(r'^.*/ansible_(?P<module>[^/]+)_payload(_[^/]+|\.zip)/__main__\.py$',
                             r'\g<module>', filename).rstrip('_')
        if module_name not in modules:
            display.warning('Skipping coverage of unknown module: %s' % module_name)
            return None
        new_name = os.path.abspath(modules[module_name])
    elif re.search('^(/.*?)?/root/ansible/', filename):
        # Rewrite the path of code running on a remote host or in a docker container as root.
        new_name = re.sub('^(/.*?)?/root/ansible/', root_path, filename)
    elif integration_temp_path in filename:
        # Rewrite the path of code running from an integration test temporary directory.
        new_name = re.sub(r'^.*' + re.escape(integration_temp_path) + '[^/]+/', root_path, filename)

    if new_name is not None:
        display.info('%s -> %s' % (filename, new_name), verbosity=3)
        filename = new_name

    # make sure path is absolute (will be relative if previously exported)
    return os.path.abspath(filename)
294
295
class PathChecker:
    """Validates code coverage paths, keeping a tally of the invalid ones so a summary can be reported."""
    def __init__(self, args, collection_search_re=None):  # type: (CoverageConfig, t.Optional[t.Pattern]) -> None
        self.args = args
        self.collection_search_re = collection_search_re
        self.invalid_paths = []
        self.invalid_path_chars = 0

    def check_path(self, path):  # type: (str) -> bool
        """Return True if the given coverage path is an existing file, otherwise record it as invalid and return False."""
        if os.path.isfile(to_bytes(path)):
            return True

        pattern = self.collection_search_re

        if pattern and pattern.search(path) and os.path.basename(path) == '__init__.py':
            # the collection loader uses implicit namespace packages, so __init__.py does not need to exist on disk
            # coverage is still reported for these non-existent files, but warnings are not needed
            return False

        self.invalid_paths.append(path)
        self.invalid_path_chars += len(path)

        # individual paths are only shown at higher verbosity, the summary comes from report()
        if self.args.verbosity > 1:
            display.warning('Invalid coverage path: %s' % path)

        return False

    def report(self):  # type: () -> None
        """Display a single summary warning if any invalid paths were recorded."""
        if not self.invalid_paths:
            return

        display.warning('Ignored %d characters from %d invalid coverage path(s).' % (self.invalid_path_chars, len(self.invalid_paths)))
326