1#!/usr/local/bin/python3.8 2"""Wrapper around yamllint that supports YAML embedded in Ansible modules.""" 3from __future__ import (absolute_import, division, print_function) 4__metaclass__ = type 5 6import ast 7import json 8import os 9import sys 10 11from yamllint import linter 12from yamllint.config import YamlLintConfig 13 14 15def main(): 16 """Main program body.""" 17 paths = sys.argv[1:] or sys.stdin.read().splitlines() 18 19 checker = YamlChecker() 20 checker.check(paths) 21 checker.report() 22 23 24class YamlChecker: 25 """Wrapper around yamllint that supports YAML embedded in Ansible modules.""" 26 def __init__(self): 27 self.messages = [] 28 29 def report(self): 30 """Print yamllint report to stdout.""" 31 report = dict( 32 messages=self.messages, 33 ) 34 35 print(json.dumps(report, indent=4, sort_keys=True)) 36 37 def check(self, paths): 38 """ 39 :type paths: str 40 """ 41 config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config') 42 43 yaml_conf = YamlLintConfig(file=os.path.join(config_path, 'default.yml')) 44 module_conf = YamlLintConfig(file=os.path.join(config_path, 'modules.yml')) 45 plugin_conf = YamlLintConfig(file=os.path.join(config_path, 'plugins.yml')) 46 47 for path in paths: 48 extension = os.path.splitext(path)[1] 49 50 with open(path) as f: 51 contents = f.read() 52 53 if extension in ('.yml', '.yaml'): 54 self.check_yaml(yaml_conf, path, contents) 55 elif extension == '.py': 56 if path.startswith('lib/ansible/modules/') or path.startswith('plugins/modules/'): 57 conf = module_conf 58 else: 59 conf = plugin_conf 60 61 self.check_module(conf, path, contents) 62 else: 63 raise Exception('unsupported extension: %s' % extension) 64 65 def check_yaml(self, conf, path, contents): 66 """ 67 :type conf: YamlLintConfig 68 :type path: str 69 :type contents: str 70 """ 71 self.messages += [self.result_to_message(r, path) for r in linter.run(contents, conf, path)] 72 73 def check_module(self, conf, path, contents): 74 """ 75 :type conf: YamlLintConfig 76 :type path: str 77 :type contents: str 78 """ 79 docs = self.get_module_docs(path, contents) 80 81 for key, value in docs.items(): 82 yaml = value['yaml'] 83 lineno = value['lineno'] 84 85 if yaml.startswith('\n'): 86 yaml = yaml[1:] 87 lineno += 1 88 89 messages = list(linter.run(yaml, conf, path)) 90 91 self.messages += [self.result_to_message(r, path, lineno - 1, key) for r in messages] 92 93 @staticmethod 94 def result_to_message(result, path, line_offset=0, prefix=''): 95 """ 96 :type result: any 97 :type path: str 98 :type line_offset: int 99 :type prefix: str 100 :rtype: dict[str, any] 101 """ 102 if prefix: 103 prefix = '%s: ' % prefix 104 105 return dict( 106 code=result.rule or result.level, 107 message=prefix + result.desc, 108 path=path, 109 line=result.line + line_offset, 110 column=result.column, 111 level=result.level, 112 ) 113 114 def get_module_docs(self, path, contents): 115 """ 116 :type path: str 117 :type contents: str 118 :rtype: dict[str, any] 119 """ 120 module_doc_types = [ 121 'DOCUMENTATION', 122 'EXAMPLES', 123 'RETURN', 124 ] 125 126 docs = {} 127 128 def check_assignment(statement, doc_types=None): 129 """Check the given statement for a documentation assignment.""" 130 for target in statement.targets: 131 if not isinstance(target, ast.Name): 132 continue 133 134 if doc_types and target.id not in doc_types: 135 continue 136 137 docs[target.id] = dict( 138 yaml=statement.value.s, 139 lineno=statement.lineno, 140 end_lineno=statement.lineno + len(statement.value.s.splitlines()) 141 ) 142 143 module_ast = self.parse_module(path, contents) 144 145 if not module_ast: 146 return {} 147 148 is_plugin = path.startswith('lib/ansible/modules/') or path.startswith('lib/ansible/plugins/') or path.startswith('plugins/') 149 is_doc_fragment = path.startswith('lib/ansible/plugins/doc_fragments/') or path.startswith('plugins/doc_fragments/') 150 151 if is_plugin and not is_doc_fragment: 152 for body_statement in module_ast.body: 153 if isinstance(body_statement, ast.Assign): 154 check_assignment(body_statement, module_doc_types) 155 elif is_doc_fragment: 156 for body_statement in module_ast.body: 157 if isinstance(body_statement, ast.ClassDef): 158 for class_statement in body_statement.body: 159 if isinstance(class_statement, ast.Assign): 160 check_assignment(class_statement) 161 else: 162 raise Exception('unsupported path: %s' % path) 163 164 return docs 165 166 def parse_module(self, path, contents): 167 """ 168 :type path: str 169 :type contents: str 170 :rtype: ast.Module | None 171 """ 172 try: 173 return ast.parse(contents) 174 except SyntaxError as ex: 175 self.messages.append(dict( 176 code='python-syntax-error', 177 message=str(ex), 178 path=path, 179 line=ex.lineno, 180 column=ex.offset, 181 level='error', 182 )) 183 except Exception as ex: # pylint: disable=broad-except 184 self.messages.append(dict( 185 code='python-parse-error', 186 message=str(ex), 187 path=path, 188 line=0, 189 column=0, 190 level='error', 191 )) 192 193 return None 194 195 196if __name__ == '__main__': 197 main() 198