1"""Sanity test using validate-modules."""
2from __future__ import (absolute_import, division, print_function)
3__metaclass__ = type
4
5import json
6import os
7
8from .. import types as t
9
10from ..sanity import (
11    SanitySingleVersion,
12    SanityMessage,
13    SanityFailure,
14    SanitySuccess,
15    SANITY_ROOT,
16)
17
18from ..target import (
19    TestTarget,
20)
21
22from ..util import (
23    SubprocessError,
24    display,
25    find_python,
26)
27
28from ..util_common import (
29    run_command,
30)
31
32from ..ansible_util import (
33    ansible_environment,
34)
35
36from ..config import (
37    SanityConfig,
38)
39
40from ..ci import (
41    get_ci_provider,
42)
43
44from ..data import (
45    data_context,
46)
47
48
class ValidateModulesTest(SanitySingleVersion):
    """Sanity test which runs the validate-modules program against module targets."""

    @property
    def error_code(self):  # type: () -> t.Optional[str]
        """Return an example error code in the format emitted by the underlying program, or None if it has no error codes."""
        return 'A100'

    def filter_targets(self, targets):  # type: (t.List[TestTarget]) -> t.List[TestTarget]
        """Return only those targets from the given list which have a truthy ``module`` attribute."""
        selected = []

        for candidate in targets:
            if candidate.module:
                selected.append(candidate)

        return selected

    def test(self, args, targets, python_version):
        """
        Run validate-modules on the included targets and convert its JSON report into a test result.

        The program is accepted only when it writes nothing to stderr and exits with status 0 or 3;
        any other outcome is raised as a SubprocessError carrying the captured output.

        :type args: SanityConfig
        :type targets: SanityTargets
        :type python_version: str
        :rtype: TestResult
        """
        env = ansible_environment(args, color=False)
        settings = self.load_processor(args)
        paths = [target.path for target in targets.include]

        # Fixed portion of the command line, followed by every path to validate.
        cmd = [
            find_python(python_version),
            os.path.join(SANITY_ROOT, 'validate-modules', 'validate-modules'),
            '--format', 'json',
            '--arg-spec',
        ]
        cmd.extend(paths)

        if not data_context().content.collection:
            # Outside a collection: compare against the base branch when one can be determined.
            base_branch = args.base_branch or get_ci_provider().get_base_branch()

            if not base_branch:
                display.warning('Cannot perform module comparison against the base branch because the base branch was not detected.')
            else:
                cmd += ['--base-branch', base_branch]
        else:
            cmd += ['--collection', data_context().content.collection.directory]

        try:
            stdout, stderr = run_command(args, cmd, env=env, capture=True)
        except SubprocessError as ex:
            # Keep the captured output so the status can be inspected below.
            stdout, stderr, status = ex.stdout, ex.stderr, ex.status
        else:
            status = 0

        acceptable_status = status in (0, 3)

        if stderr or not acceptable_status:
            raise SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout)

        if args.explain:
            # Nothing to parse in explain mode; return success before touching stdout.
            return SanitySuccess(self.name)

        messages = json.loads(stdout)

        # The report maps each filename to a dict whose 'errors' list holds the individual findings.
        # 'line' and 'column' are optional in a finding and default to 0.
        errors = [
            SanityMessage(
                path=filename,
                line=int(item.get('line', 0)),
                column=int(item.get('column', 0)),
                level='error',
                code='%s' % item['code'],
                message=item['msg'],
            )
            for filename in messages
            for item in messages[filename]['errors']
        ]

        errors = settings.process_errors(errors, paths)

        if not errors:
            return SanitySuccess(self.name)

        return SanityFailure(self.name, messages=errors)
129