1# (c) 2019–2020, Ansible by Red Hat 2# 3# Permission is hereby granted, free of charge, to any person obtaining a copy 4# of this software and associated documentation files (the "Software"), to deal 5# in the Software without restriction, including without limitation the rights 6# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 7# copies of the Software, and to permit persons to whom the Software is 8# furnished to do so, subject to the following conditions: 9# 10# The above copyright notice and this permission notice shall be included in 11# all copies or substantial portions of the Software. 12# 13# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 19# THE SOFTWARE. 20 21"""Utils related to inline skipping of rules.""" 22import logging 23from functools import lru_cache 24from itertools import product 25from typing import TYPE_CHECKING, Any, Generator, List, Optional, Sequence 26 27# Module 'ruamel.yaml' does not explicitly export attribute 'YAML'; implicit reexport disabled 28from ruamel.yaml import YAML # type: ignore 29 30from ansiblelint.config import used_old_tags 31from ansiblelint.constants import RENAMED_TAGS 32from ansiblelint.file_utils import Lintable 33 34if TYPE_CHECKING: 35 from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject 36 37_logger = logging.getLogger(__name__) 38 39 40# playbook: Sequence currently expects only instances of one of the two 41# classes below but we should consider avoiding this chimera. 42# ruamel.yaml.comments.CommentedSeq 43# ansible.parsing.yaml.objects.AnsibleSequence 44 45 46def get_rule_skips_from_line(line: str) -> List[str]: 47 """Return list of rule ids skipped via comment on the line of yaml.""" 48 _before_noqa, _noqa_marker, noqa_text = line.partition("# noqa") 49 noqa_text = noqa_text.lstrip(" :") 50 return noqa_text.split() 51 52 53def append_skipped_rules( 54 pyyaml_data: "AnsibleBaseYAMLObject", lintable: Lintable 55) -> "AnsibleBaseYAMLObject": 56 """Append 'skipped_rules' to individual tasks or single metadata block. 57 58 For a file, uses 2nd parser (ruamel.yaml) to pull comments out of 59 yaml subsets, check for '# noqa' skipped rules, and append any skips to the 60 original parser (pyyaml) data relied on by remainder of ansible-lint. 61 62 :param pyyaml_data: file text parsed via ansible and pyyaml. 63 :param file_text: raw file text. 64 :param file_type: type of file: tasks, handlers or meta. 65 :returns: original pyyaml_data altered with a 'skipped_rules' list added \ 66 to individual tasks, or added to the single metadata block. 67 """ 68 try: 69 yaml_skip = _append_skipped_rules(pyyaml_data, lintable) 70 except RuntimeError: 71 # Notify user of skip error, do not stop, do not change exit code 72 _logger.error('Error trying to append skipped rules', exc_info=True) 73 return pyyaml_data 74 75 if not yaml_skip: 76 return pyyaml_data 77 78 return yaml_skip 79 80 81@lru_cache(maxsize=128) 82def load_data(file_text: str) -> Any: 83 """Parse ``file_text`` as yaml and return parsed structure. 84 85 This is the main culprit for slow performance, each rule asks for loading yaml again and again 86 ideally the ``maxsize`` on the decorator above MUST be great or equal total number of rules 87 :param file_text: raw text to parse 88 :return: Parsed yaml 89 """ 90 yaml = YAML() 91 return yaml.load(file_text) 92 93 94def _append_skipped_rules( 95 pyyaml_data: "AnsibleBaseYAMLObject", lintable: Lintable 96) -> Optional["AnsibleBaseYAMLObject"]: 97 # parse file text using 2nd parser library 98 ruamel_data = load_data(lintable.content) 99 100 if lintable.kind == 'meta': 101 pyyaml_data[0]['skipped_rules'] = _get_rule_skips_from_yaml(ruamel_data) 102 return pyyaml_data 103 104 # create list of blocks of tasks or nested tasks 105 if lintable.kind in ('tasks', 'handlers'): 106 ruamel_task_blocks = ruamel_data 107 pyyaml_task_blocks = pyyaml_data 108 elif lintable.kind == 'playbook': 109 try: 110 pyyaml_task_blocks = _get_task_blocks_from_playbook(pyyaml_data) 111 ruamel_task_blocks = _get_task_blocks_from_playbook(ruamel_data) 112 except (AttributeError, TypeError): 113 # TODO(awcrosby): running ansible-lint on any .yml file will 114 # assume it is a playbook, check needs to be added higher in the 115 # call stack, and can remove this except 116 return pyyaml_data 117 elif lintable.kind in ['yaml', 'requirements', 'vars', 'meta', 'reno']: 118 return pyyaml_data 119 else: 120 # For unsupported file types, we return empty skip lists 121 return None 122 123 # get tasks from blocks of tasks 124 pyyaml_tasks = _get_tasks_from_blocks(pyyaml_task_blocks) 125 ruamel_tasks = _get_tasks_from_blocks(ruamel_task_blocks) 126 127 # append skipped_rules for each task 128 for ruamel_task, pyyaml_task in zip(ruamel_tasks, pyyaml_tasks): 129 130 # ignore empty tasks 131 if not pyyaml_task and not ruamel_task: 132 continue 133 134 if pyyaml_task.get('name') != ruamel_task.get('name'): 135 raise RuntimeError('Error in matching skip comment to a task') 136 pyyaml_task['skipped_rules'] = _get_rule_skips_from_yaml(ruamel_task) 137 138 return pyyaml_data 139 140 141def _get_task_blocks_from_playbook(playbook: Sequence[Any]) -> List[Any]: 142 """Return parts of playbook that contains tasks, and nested tasks. 143 144 :param playbook: playbook yaml from yaml parser. 145 :returns: list of task dictionaries. 146 """ 147 PLAYBOOK_TASK_KEYWORDS = [ 148 'tasks', 149 'pre_tasks', 150 'post_tasks', 151 'handlers', 152 ] 153 154 task_blocks = [] 155 for play, key in product(playbook, PLAYBOOK_TASK_KEYWORDS): 156 task_blocks.extend(play.get(key, [])) 157 return task_blocks 158 159 160def _get_tasks_from_blocks(task_blocks: Sequence[Any]) -> Generator[Any, None, None]: 161 """Get list of tasks from list made of tasks and nested tasks.""" 162 NESTED_TASK_KEYS = [ 163 'block', 164 'always', 165 'rescue', 166 ] 167 168 def get_nested_tasks(task: Any) -> Generator[Any, None, None]: 169 for k in NESTED_TASK_KEYS: 170 if task and k in task and task[k]: 171 for subtask in task[k]: 172 yield subtask 173 174 for task in task_blocks: 175 for sub_task in get_nested_tasks(task): 176 yield sub_task 177 yield task 178 179 180def _get_rule_skips_from_yaml(yaml_input: Sequence[Any]) -> Sequence[Any]: 181 """Traverse yaml for comments with rule skips and return list of rules.""" 182 yaml_comment_obj_strs = [] 183 184 def traverse_yaml(obj: Any) -> None: 185 yaml_comment_obj_strs.append(str(obj.ca.items)) 186 if isinstance(obj, dict): 187 for key, val in obj.items(): 188 if isinstance(val, (dict, list)): 189 traverse_yaml(val) 190 elif isinstance(obj, list): 191 for e in obj: 192 if isinstance(e, (dict, list)): 193 traverse_yaml(e) 194 else: 195 return 196 197 traverse_yaml(yaml_input) 198 199 rule_id_list = [] 200 for comment_obj_str in yaml_comment_obj_strs: 201 for line in comment_obj_str.split(r'\n'): 202 rule_id_list.extend(get_rule_skips_from_line(line)) 203 204 return [normalize_tag(tag) for tag in rule_id_list] 205 206 207def normalize_tag(tag: str) -> str: 208 """Return current name of tag.""" 209 if tag in RENAMED_TAGS: 210 used_old_tags[tag] = RENAMED_TAGS[tag] 211 return RENAMED_TAGS[tag] 212 return tag 213