1#!/usr/local/bin/python3.8
2"""Wrapper around yamllint that supports YAML embedded in Ansible modules."""
3from __future__ import (absolute_import, division, print_function)
4__metaclass__ = type
5
6import ast
7import json
8import os
9import sys
10
11from yamllint import linter
12from yamllint.config import YamlLintConfig
13
14
def main():
    """Lint the paths named on the command line, falling back to one path per line on stdin."""
    paths = sys.argv[1:]

    if not paths:
        # No command line arguments were given, so the path list comes from stdin instead.
        paths = sys.stdin.read().splitlines()

    yaml_checker = YamlChecker()
    yaml_checker.check(paths)
    yaml_checker.report()
22
23
class YamlChecker:
    """Wrapper around yamllint that supports YAML embedded in Ansible modules."""
    def __init__(self):
        # Lint findings collected so far, one dict per finding (see result_to_message).
        self.messages = []

    def report(self):
        """Print yamllint report to stdout."""
        report = dict(
            messages=self.messages,
        )

        print(json.dumps(report, indent=4, sort_keys=True))

    def check(self, paths):
        """
        Lint each of the given files, appending any findings to self.messages.

        YAML files are linted directly. Python files are parsed and the YAML held
        in their documentation variables is linted. Any other extension raises.

        :type paths: list[str]
        """
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config')

        yaml_conf = YamlLintConfig(file=os.path.join(config_path, 'default.yml'))
        module_conf = YamlLintConfig(file=os.path.join(config_path, 'modules.yml'))
        plugin_conf = YamlLintConfig(file=os.path.join(config_path, 'plugins.yml'))

        for path in paths:
            extension = os.path.splitext(path)[1]

            # Decode explicitly as UTF-8 so results do not depend on the locale's default encoding.
            with open(path, encoding='utf-8') as f:
                contents = f.read()

            if extension in ('.yml', '.yaml'):
                self.check_yaml(yaml_conf, path, contents)
            elif extension == '.py':
                if path.startswith(('lib/ansible/modules/', 'plugins/modules/')):
                    conf = module_conf
                else:
                    conf = plugin_conf

                self.check_module(conf, path, contents)
            else:
                raise Exception('unsupported extension: %s' % extension)

    def check_yaml(self, conf, path, contents):
        """
        Lint the contents of a YAML file and record the findings.

        :type conf: YamlLintConfig
        :type path: str
        :type contents: str
        """
        self.messages += [self.result_to_message(r, path) for r in linter.run(contents, conf, path)]

    def check_module(self, conf, path, contents):
        """
        Lint the YAML embedded in the documentation variables of a Python file.

        :type conf: YamlLintConfig
        :type path: str
        :type contents: str
        """
        docs = self.get_module_docs(path, contents)

        for key, value in docs.items():
            yaml = value['yaml']
            lineno = value['lineno']

            # A string which opens with a newline has its first line of YAML on the line
            # after the assignment, so drop the newline and shift the starting line to match.
            if yaml.startswith('\n'):
                yaml = yaml[1:]
                lineno += 1

            messages = list(linter.run(yaml, conf, path))

            # The reported line numbers start at 1, so the offset into the file is lineno - 1.
            self.messages += [self.result_to_message(r, path, lineno - 1, key) for r in messages]

    @staticmethod
    def result_to_message(result, path, line_offset=0, prefix=''):
        """
        Convert a yamllint result into a message dict suitable for the JSON report.

        :type result: any
        :type path: str
        :type line_offset: int
        :type prefix: str
        :rtype: dict[str, any]
        """
        if prefix:
            prefix = '%s: ' % prefix

        return dict(
            code=result.rule or result.level,
            message=prefix + result.desc,
            path=path,
            line=result.line + line_offset,
            column=result.column,
            level=result.level,
        )

    @staticmethod
    def _get_string_value(node):
        """
        Return the text of a plain string literal AST node, or None for any other node.

        :type node: ast.AST
        :rtype: str | None
        """
        if isinstance(node, ast.Constant):
            value = node.value
        else:
            # Python versions before 3.8 parse string literals as ast.Str, which exposes the text as `s`.
            value = getattr(node, 's', None)

        return value if isinstance(value, str) else None

    def get_module_docs(self, path, contents):
        """
        Extract the documentation strings assigned in a Python module, plugin or doc fragment.

        Returns a dict keyed by variable name. Each value holds the string (`yaml`), the line of
        the assignment (`lineno`) and that line plus the number of lines in the string (`end_lineno`).
        An empty dict is returned when the file cannot be parsed (the parse error is recorded
        in self.messages). Paths which are not a module, plugin or doc fragment raise.

        :type path: str
        :type contents: str
        :rtype: dict[str, any]
        """
        module_doc_types = [
            'DOCUMENTATION',
            'EXAMPLES',
            'RETURN',
        ]

        docs = {}

        def check_assignment(statement, doc_types=None):
            """Check the given statement for a documentation assignment."""
            text = self._get_string_value(statement.value)

            if text is None:
                # Only string literals can hold YAML; anything else (None, numbers, calls) is skipped.
                return

            for target in statement.targets:
                if not isinstance(target, ast.Name):
                    continue

                if doc_types and target.id not in doc_types:
                    continue

                docs[target.id] = dict(
                    yaml=text,
                    lineno=statement.lineno,
                    end_lineno=statement.lineno + len(text.splitlines()),
                )

        module_ast = self.parse_module(path, contents)

        if not module_ast:
            return {}

        is_plugin = path.startswith(('lib/ansible/modules/', 'lib/ansible/plugins/', 'plugins/'))
        is_doc_fragment = path.startswith(('lib/ansible/plugins/doc_fragments/', 'plugins/doc_fragments/'))

        if is_plugin and not is_doc_fragment:
            # Modules and plugins keep their documentation in well-known top-level variables.
            for body_statement in module_ast.body:
                if isinstance(body_statement, ast.Assign):
                    check_assignment(body_statement, module_doc_types)
        elif is_doc_fragment:
            # Doc fragments keep theirs in class attributes, and every attribute name is accepted.
            for body_statement in module_ast.body:
                if isinstance(body_statement, ast.ClassDef):
                    for class_statement in body_statement.body:
                        if isinstance(class_statement, ast.Assign):
                            check_assignment(class_statement)
        else:
            raise Exception('unsupported path: %s' % path)

        return docs

    def parse_module(self, path, contents):
        """
        Parse Python source into an AST, recording a message instead of raising on failure.

        :type path: str
        :type contents: str
        :rtype: ast.Module | None
        """
        try:
            return ast.parse(contents)
        except SyntaxError as ex:
            self.messages.append(dict(
                code='python-syntax-error',
                message=str(ex),
                path=path,
                # The position attributes may be None; report 0 like the generic handler below.
                line=ex.lineno or 0,
                column=ex.offset or 0,
                level='error',
            ))
        except Exception as ex:  # pylint: disable=broad-except
            self.messages.append(dict(
                code='python-parse-error',
                message=str(ex),
                path=path,
                line=0,
                column=0,
                level='error',
            ))

        return None
194
195
# Run the linter only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()
198