1# (c) 2019–2020, Ansible by Red Hat
2#
3# Permission is hereby granted, free of charge, to any person obtaining a copy
4# of this software and associated documentation files (the "Software"), to deal
5# in the Software without restriction, including without limitation the rights
6# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7# copies of the Software, and to permit persons to whom the Software is
8# furnished to do so, subject to the following conditions:
9#
10# The above copyright notice and this permission notice shall be included in
11# all copies or substantial portions of the Software.
12#
13# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19# THE SOFTWARE.
20
21"""Utils related to inline skipping of rules."""
22import logging
23from functools import lru_cache
24from itertools import product
25from typing import TYPE_CHECKING, Any, Generator, List, Optional, Sequence
26
27# Module 'ruamel.yaml' does not explicitly export attribute 'YAML'; implicit reexport disabled
28from ruamel.yaml import YAML  # type: ignore
29
30from ansiblelint.config import used_old_tags
31from ansiblelint.constants import RENAMED_TAGS
32from ansiblelint.file_utils import Lintable
33
34if TYPE_CHECKING:
35    from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject
36
37_logger = logging.getLogger(__name__)
38
39
40# playbook: Sequence currently expects only instances of one of the two
41# classes below but we should consider avoiding this chimera.
42# ruamel.yaml.comments.CommentedSeq
43# ansible.parsing.yaml.objects.AnsibleSequence
44
45
def get_rule_skips_from_line(line: str) -> List[str]:
    """Extract the rule ids listed after a ``# noqa`` marker on one line.

    Everything following the first ``# noqa`` is taken, leading spaces and
    colons are dropped, and the remainder is split on whitespace.

    :param line: a single line of yaml text.
    :returns: the whitespace separated tokens after the marker, or an empty
              list when the line has no marker or nothing follows it.
    """
    # partition() yields an empty tail when the marker is absent, which
    # naturally produces an empty result below.
    text_after_marker = line.partition("# noqa")[-1]
    return text_after_marker.lstrip(" :").split()
51
52
def append_skipped_rules(
    pyyaml_data: "AnsibleBaseYAMLObject", lintable: Lintable
) -> "AnsibleBaseYAMLObject":
    """Append 'skipped_rules' to individual tasks or single metadata block.

    The file content is parsed a second time with ruamel.yaml so that
    comments can be inspected for '# noqa' rule skips; any skips found are
    stored on the data produced by the original parser (pyyaml), which the
    remainder of ansible-lint relies on.

    :param pyyaml_data: file text parsed via ansible and pyyaml.
    :param lintable: the file being linted; its content and kind drive how
                     the skips are collected.
    :returns: the data with 'skipped_rules' added where applicable, or the \
              untouched ``pyyaml_data`` when matching failed or the helper \
              produced nothing.
    """
    try:
        annotated = _append_skipped_rules(pyyaml_data, lintable)
    except RuntimeError:
        # Tell the user about the failure, but keep linting and leave the
        # exit code alone.
        _logger.error('Error trying to append skipped rules', exc_info=True)
        return pyyaml_data
    else:
        # A falsy result (e.g. None) means nothing usable came back, so fall
        # back to the data we were given.
        return annotated if annotated else pyyaml_data
79
80
@lru_cache(maxsize=128)
def load_data(file_text: str) -> Any:
    """Parse ``file_text`` as yaml with ruamel.yaml and return the result.

    Parsing is costly and the same text is requested repeatedly, so results
    are memoized by ``lru_cache``: up to 128 distinct texts are remembered
    and a repeated request for the same text returns the earlier parse.

    :param file_text: raw text to parse
    :return: Parsed yaml
    """
    parser = YAML()
    parsed = parser.load(file_text)
    return parsed
92
93
def _append_skipped_rules(
    pyyaml_data: "AnsibleBaseYAMLObject", lintable: Lintable
) -> Optional["AnsibleBaseYAMLObject"]:
    """Store '# noqa' skips found by ruamel.yaml on the pyyaml data.

    :param pyyaml_data: data produced by the original (pyyaml) parser.
    :param lintable: file being linted; ``content`` is re-parsed and
                     ``kind`` selects how tasks are located.
    :returns: ``pyyaml_data`` (modified in place for meta, tasks, handlers
              and playbook kinds, untouched for the other known kinds), or
              ``None`` when the kind is not recognised.
    :raises RuntimeError: when a task from one parser cannot be paired by
                          name with the task from the other parser.
    """
    # Second parse of the same text, this time keeping the comments.
    ruamel_data = load_data(lintable.content)

    # Metadata files carry one skip list, stored on the first element.
    if lintable.kind == 'meta':
        meta_skips = _get_rule_skips_from_yaml(ruamel_data)
        pyyaml_data[0]['skipped_rules'] = meta_skips
        return pyyaml_data

    # Locate the blocks of tasks (possibly nested) in both parse results.
    if lintable.kind == 'playbook':
        try:
            pyyaml_task_blocks = _get_task_blocks_from_playbook(pyyaml_data)
            ruamel_task_blocks = _get_task_blocks_from_playbook(ruamel_data)
        except (AttributeError, TypeError):
            # TODO(awcrosby): running ansible-lint on any .yml file will
            # assume it is a playbook, check needs to be added higher in the
            # call stack, and can remove this except
            return pyyaml_data
    elif lintable.kind in ('tasks', 'handlers'):
        # The whole document already is the list of task blocks.
        pyyaml_task_blocks, ruamel_task_blocks = pyyaml_data, ruamel_data
    elif lintable.kind in ['yaml', 'requirements', 'vars', 'meta', 'reno']:
        # Known kinds without tasks: nothing to annotate.
        return pyyaml_data
    else:
        # Unrecognised kind: signal "nothing done" to the caller.
        return None

    # Walk both task streams in lockstep (ruamel first, then pyyaml).
    paired_tasks = zip(
        _get_tasks_from_blocks(ruamel_task_blocks),
        _get_tasks_from_blocks(pyyaml_task_blocks),
    )
    for ruamel_task, pyyaml_task in paired_tasks:
        # Empty on both sides: nothing to annotate.
        if not (pyyaml_task or ruamel_task):
            continue

        # The name is the only check that both parsers are looking at the
        # same task before comments are transferred.
        if pyyaml_task.get('name') != ruamel_task.get('name'):
            raise RuntimeError('Error in matching skip comment to a task')

        task_skips = _get_rule_skips_from_yaml(ruamel_task)
        pyyaml_task['skipped_rules'] = task_skips

    return pyyaml_data
139
140
141def _get_task_blocks_from_playbook(playbook: Sequence[Any]) -> List[Any]:
142    """Return parts of playbook that contains tasks, and nested tasks.
143
144    :param playbook: playbook yaml from yaml parser.
145    :returns: list of task dictionaries.
146    """
147    PLAYBOOK_TASK_KEYWORDS = [
148        'tasks',
149        'pre_tasks',
150        'post_tasks',
151        'handlers',
152    ]
153
154    task_blocks = []
155    for play, key in product(playbook, PLAYBOOK_TASK_KEYWORDS):
156        task_blocks.extend(play.get(key, []))
157    return task_blocks
158
159
160def _get_tasks_from_blocks(task_blocks: Sequence[Any]) -> Generator[Any, None, None]:
161    """Get list of tasks from list made of tasks and nested tasks."""
162    NESTED_TASK_KEYS = [
163        'block',
164        'always',
165        'rescue',
166    ]
167
168    def get_nested_tasks(task: Any) -> Generator[Any, None, None]:
169        for k in NESTED_TASK_KEYS:
170            if task and k in task and task[k]:
171                for subtask in task[k]:
172                    yield subtask
173
174    for task in task_blocks:
175        for sub_task in get_nested_tasks(task):
176            yield sub_task
177        yield task
178
179
def _get_rule_skips_from_yaml(yaml_input: Sequence[Any]) -> Sequence[Any]:
    """Traverse yaml for comments with rule skips and return list of rules.

    The comment container (``.ca.items``) of ``yaml_input`` and of every
    dict or list nested inside it is rendered to text, that text is scanned
    for ``# noqa`` markers, and each rule id found is mapped to its current
    name.

    :param yaml_input: node that exposes ``.ca.items`` (as do the dicts and
                       lists nested in it).
    :returns: list of rule ids, in the order the comments were visited.
    """

    def comment_dumps(node: Any) -> Generator[str, None, None]:
        # Pre-order walk: the node's own comments first, then its children.
        yield str(node.ca.items)
        if isinstance(node, dict):
            children = [value for _key, value in node.items()]
        elif isinstance(node, list):
            children = list(node)
        else:
            return
        for child in children:
            # Scalars are skipped; only containers are descended into.
            if isinstance(child, (dict, list)):
                yield from comment_dumps(child)

    # Finish the whole walk before looking at any of the collected text.
    dumps = list(comment_dumps(yaml_input))

    # The text is a repr, so line breaks inside comments show up as the two
    # characters backslash + n; split on those rather than on real newlines.
    found_ids = [
        rule_id
        for dump in dumps
        for segment in dump.split(r'\n')
        for rule_id in get_rule_skips_from_line(segment)
    ]

    return list(map(normalize_tag, found_ids))
205
206
def normalize_tag(tag: str) -> str:
    """Return current name of tag.

    A tag listed in ``RENAMED_TAGS`` is translated to its new name and the
    old-to-new pair is recorded in ``used_old_tags``; any other tag is
    returned unchanged.
    """
    if tag not in RENAMED_TAGS:
        return tag

    current_name = RENAMED_TAGS[tag]
    # Remember that an outdated name was used.
    used_old_tags[tag] = current_name
    return current_name
213