1# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
2#
3# Permission is hereby granted, free of charge, to any person obtaining a copy
4# of this software and associated documentation files (the "Software"), to deal
5# in the Software without restriction, including without limitation the rights
6# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7# copies of the Software, and to permit persons to whom the Software is
8# furnished to do so, subject to the following conditions:
9#
10# The above copyright notice and this permission notice shall be included in
11# all copies or substantial portions of the Software.
12#
13# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19# THE SOFTWARE.
20"""Generic utility helpers."""
21
22import contextlib
23import inspect
24import logging
25import os
26from argparse import Namespace
27from collections.abc import ItemsView
28from functools import lru_cache
29from pathlib import Path
30from typing import (
31    Any,
32    Callable,
33    Dict,
34    Generator,
35    List,
36    Optional,
37    Sequence,
38    Tuple,
39    Union,
40)
41
42import yaml
43from ansible import constants
44from ansible.errors import AnsibleError, AnsibleParserError
45from ansible.parsing.dataloader import DataLoader
46from ansible.parsing.mod_args import ModuleArgsParser
47from ansible.parsing.splitter import split_args
48from ansible.parsing.yaml.constructor import AnsibleConstructor, AnsibleMapping
49from ansible.parsing.yaml.loader import AnsibleLoader
50from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject, AnsibleSequence
51from ansible.plugins.loader import add_all_plugin_dirs
52from ansible.template import Templar
53
54try:
55    from ansible.module_utils.parsing.convert_bool import boolean
56except ImportError:
57    try:
58        from ansible.utils.boolean import boolean
59    except ImportError:
60        try:
61            from ansible.utils import boolean
62        except ImportError:
63            boolean = constants.mk_boolean
64
65from yaml.composer import Composer
66from yaml.representer import RepresenterError
67
68from ansiblelint._internal.rules import (
69    AnsibleParserErrorRule,
70    LoadingFailureRule,
71    RuntimeErrorRule,
72)
73from ansiblelint.config import options
74from ansiblelint.constants import FileType
75from ansiblelint.errors import MatchError
76from ansiblelint.file_utils import Lintable, discover_lintables
77from ansiblelint.text import removeprefix
78
79# ansible-lint doesn't need/want to know about encrypted secrets, so we pass a
80# string as the password to enable such yaml files to be opened and parsed
81# successfully.
82DEFAULT_VAULT_PASSWORD = 'x'
83
84PLAYBOOK_DIR = os.environ.get('ANSIBLE_PLAYBOOK_DIR', None)
85
86
87_logger = logging.getLogger(__name__)
88
89
90def parse_yaml_from_file(filepath: str) -> AnsibleBaseYAMLObject:
91    """Extract a decrypted YAML object from file."""
92    dl = DataLoader()
93    if hasattr(dl, 'set_vault_password'):
94        dl.set_vault_password(DEFAULT_VAULT_PASSWORD)
95    return dl.load_from_file(filepath)
96
97
98def path_dwim(basedir: str, given: str) -> str:
99    """Convert a given path do-what-I-mean style."""
100    dl = DataLoader()
101    dl.set_basedir(basedir)
102    return str(dl.path_dwim(given))
103
104
105def ansible_template(
106    basedir: str, varname: Any, templatevars: Any, **kwargs: Any
107) -> Any:
108    """Render a templated string."""
109    # `basedir` is the directory containing the lintable file.
110    # Therefore, for tasks in a role, `basedir` has the form
111    # `roles/some_role/tasks`. On the other hand, the search path
112    # is `roles/some_role/{files,templates}`. As a result, the
113    # `tasks` part in the basedir should be stripped stripped.
114    if os.path.basename(basedir) == 'tasks':
115        basedir = os.path.dirname(basedir)
116
117    dl = DataLoader()
118    dl.set_basedir(basedir)
119    templar = Templar(dl, variables=templatevars)
120    return templar.template(varname, **kwargs)
121
122
123LINE_NUMBER_KEY = '__line__'
124FILENAME_KEY = '__file__'
125
126VALID_KEYS = [
127    'name',
128    'action',
129    'when',
130    'async',
131    'poll',
132    'notify',
133    'first_available_file',
134    'include',
135    'include_tasks',
136    'import_tasks',
137    'import_playbook',
138    'tags',
139    'register',
140    'ignore_errors',
141    'delegate_to',
142    'local_action',
143    'transport',
144    'remote_user',
145    'sudo',
146    'sudo_user',
147    'sudo_pass',
148    'when',
149    'connection',
150    'environment',
151    'args',
152    'any_errors_fatal',
153    'changed_when',
154    'failed_when',
155    'check_mode',
156    'delay',
157    'retries',
158    'until',
159    'su',
160    'su_user',
161    'su_pass',
162    'no_log',
163    'run_once',
164    'become',
165    'become_user',
166    'become_method',
167    FILENAME_KEY,
168]
169
170BLOCK_NAME_TO_ACTION_TYPE_MAP = {
171    'tasks': 'task',
172    'handlers': 'handler',
173    'pre_tasks': 'task',
174    'post_tasks': 'task',
175    'block': 'meta',
176    'rescue': 'meta',
177    'always': 'meta',
178}
179
180
181def tokenize(line: str) -> Tuple[str, List[str], Dict[str, str]]:
182    """Parse a string task invocation."""
183    tokens = line.lstrip().split(" ")
184    if tokens[0] == '-':
185        tokens = tokens[1:]
186    if tokens[0] == 'action:' or tokens[0] == 'local_action:':
187        tokens = tokens[1:]
188    command = tokens[0].replace(":", "")
189
190    args = list()
191    kwargs = dict()
192    nonkvfound = False
193    for arg in tokens[1:]:
194        if "=" in arg and not nonkvfound:
195            kv = arg.split("=", 1)
196            kwargs[kv[0]] = kv[1]
197        else:
198            nonkvfound = True
199            args.append(arg)
200    return (command, args, kwargs)
201
202
203def _playbook_items(pb_data: AnsibleBaseYAMLObject) -> ItemsView:  # type: ignore
204    if isinstance(pb_data, dict):
205        return pb_data.items()
206    if not pb_data:
207        return []  # type: ignore
208
209    # "if play" prevents failure if the play sequence contains None,
210    # which is weird but currently allowed by Ansible
211    # https://github.com/ansible-community/ansible-lint/issues/849
212    return [item for play in pb_data if play for item in play.items()]  # type: ignore
213
214
215def _set_collections_basedir(basedir: str) -> None:
216    # Sets the playbook directory as playbook_paths for the collection loader
217    try:
218        # Ansible 2.10+
219        # noqa: # pylint:disable=cyclic-import,import-outside-toplevel
220        from ansible.utils.collection_loader import AnsibleCollectionConfig
221
222        AnsibleCollectionConfig.playbook_paths = basedir
223    except ImportError:
224        # Ansible 2.8 or 2.9
225        # noqa: # pylint:disable=cyclic-import,import-outside-toplevel
226        from ansible.utils.collection_loader import set_collection_playbook_paths
227
228        set_collection_playbook_paths(basedir)
229
230
231def find_children(lintable: Lintable) -> List[Lintable]:  # noqa: C901
232    """Traverse children of a single file or folder."""
233    if not lintable.path.exists():
234        return []
235    playbook_dir = str(lintable.path.parent)
236    _set_collections_basedir(playbook_dir or os.path.abspath('.'))
237    add_all_plugin_dirs(playbook_dir or '.')
238    if lintable.kind == 'role':
239        playbook_ds = AnsibleMapping({'roles': [{'role': str(lintable.path)}]})
240    elif lintable.kind not in ("playbook", "tasks"):
241        return []
242    else:
243        try:
244            playbook_ds = parse_yaml_from_file(str(lintable.path))
245        except AnsibleError as e:
246            raise SystemExit(str(e))
247    results = []
248    basedir = os.path.dirname(str(lintable.path))
249    # playbook_ds can be an AnsibleUnicode string, which we consider invalid
250    if isinstance(playbook_ds, str):
251        raise MatchError(filename=str(lintable.path), rule=LoadingFailureRule())
252    for item in _playbook_items(playbook_ds):
253        # if lintable.kind not in ["playbook"]:
254        #     continue
255        for child in play_children(basedir, item, lintable.kind, playbook_dir):
256            # We avoid processing parametrized children
257            path_str = str(child.path)
258            if "$" in path_str or "{{" in path_str:
259                continue
260
261            # Repair incorrect paths obtained when old syntax was used, like:
262            # - include: simpletask.yml tags=nginx
263            valid_tokens = list()
264            for token in split_args(path_str):
265                if '=' in token:
266                    break
267                valid_tokens.append(token)
268            path = ' '.join(valid_tokens)
269            if path != path_str:
270                child.path = Path(path)
271                child.name = child.path.name
272
273            results.append(child)
274    return results
275
276
277def template(
278    basedir: str,
279    value: Any,
280    variables: Any,
281    fail_on_undefined: bool = False,
282    **kwargs: str,
283) -> Any:
284    """Attempt rendering a value with known vars."""
285    try:
286        value = ansible_template(
287            os.path.abspath(basedir),
288            value,
289            variables,
290            **dict(kwargs, fail_on_undefined=fail_on_undefined),
291        )
292        # Hack to skip the following exception when using to_json filter on a variable.
293        # I guess the filter doesn't like empty vars...
294    except (AnsibleError, ValueError, RepresenterError):
295        # templating failed, so just keep value as is.
296        pass
297    return value
298
299
300def play_children(
301    basedir: str, item: Tuple[str, Any], parent_type: FileType, playbook_dir: str
302) -> List[Lintable]:
303    """Flatten the traversed play tasks."""
304    delegate_map: Dict[str, Callable[[str, Any, Any, FileType], List[Lintable]]] = {
305        'tasks': _taskshandlers_children,
306        'pre_tasks': _taskshandlers_children,
307        'post_tasks': _taskshandlers_children,
308        'block': _taskshandlers_children,
309        'include': _include_children,
310        'import_playbook': _include_children,
311        'roles': _roles_children,
312        'dependencies': _roles_children,
313        'handlers': _taskshandlers_children,
314        'include_tasks': _include_children,
315        'import_tasks': _include_children,
316    }
317    (k, v) = item
318    add_all_plugin_dirs(os.path.abspath(basedir))
319
320    if k in delegate_map:
321        if v:
322            v = template(
323                os.path.abspath(basedir),
324                v,
325                dict(playbook_dir=PLAYBOOK_DIR or os.path.abspath(basedir)),
326                fail_on_undefined=False,
327            )
328            return delegate_map[k](basedir, k, v, parent_type)
329    return []
330
331
332def _include_children(
333    basedir: str, k: str, v: Any, parent_type: FileType
334) -> List[Lintable]:
335    # handle special case include_tasks: name=filename.yml
336    if k == 'include_tasks' and isinstance(v, dict) and 'file' in v:
337        v = v['file']
338
339    # handle include: filename.yml tags=blah
340    (command, args, kwargs) = tokenize("{0}: {1}".format(k, v))
341
342    result = path_dwim(basedir, args[0])
343    if not os.path.exists(result):
344        result = path_dwim(os.path.join(os.path.dirname(basedir)), v)
345    return [Lintable(result, kind=parent_type)]
346
347
348def _taskshandlers_children(
349    basedir: str, k: str, v: Union[None, Any], parent_type: FileType
350) -> List[Lintable]:
351    results: List[Lintable] = []
352    if v is None:
353        raise MatchError(
354            message="A malformed block was encountered while loading a block.",
355            rule=RuntimeErrorRule(),
356        )
357    for th in v:
358
359        # ignore empty tasks, `-`
360        if not th:
361            continue
362
363        with contextlib.suppress(LookupError):
364            children = _get_task_handler_children_for_tasks_or_playbooks(
365                th,
366                basedir,
367                k,
368                parent_type,
369            )
370            results.append(children)
371            continue
372
373        if (
374            'include_role' in th or 'import_role' in th
375        ):  # lgtm [py/unreachable-statement]
376            th = normalize_task_v2(th)
377            _validate_task_handler_action_for_role(th['action'])
378            results.extend(
379                _roles_children(
380                    basedir,
381                    k,
382                    [th['action'].get("name")],
383                    parent_type,
384                    main=th['action'].get('tasks_from', 'main'),
385                )
386            )
387            continue
388
389        if 'block' not in th:
390            continue
391
392        results.extend(_taskshandlers_children(basedir, k, th['block'], parent_type))
393        if 'rescue' in th:
394            results.extend(
395                _taskshandlers_children(basedir, k, th['rescue'], parent_type)
396            )
397        if 'always' in th:
398            results.extend(
399                _taskshandlers_children(basedir, k, th['always'], parent_type)
400            )
401
402    return results
403
404
405def _get_task_handler_children_for_tasks_or_playbooks(
406    task_handler: Dict[str, Any],
407    basedir: str,
408    k: Any,
409    parent_type: FileType,
410) -> Lintable:
411    """Try to get children of taskhandler for include/import tasks/playbooks."""
412    child_type = k if parent_type == 'playbook' else parent_type
413
414    task_include_keys = 'include', 'include_tasks', 'import_playbook', 'import_tasks'
415    for task_handler_key in task_include_keys:
416
417        with contextlib.suppress(KeyError):
418
419            # ignore empty tasks
420            if not task_handler:
421                continue
422
423            # import pdb; pdb.set_trace()
424            return Lintable(
425                path_dwim(basedir, task_handler[task_handler_key]), kind=child_type
426            )
427
428    raise LookupError(
429        f'The node contains none of: {", ".join(task_include_keys)}',
430    )
431
432
433def _validate_task_handler_action_for_role(th_action: Dict[str, Any]) -> None:
434    """Verify that the task handler action is valid for role include."""
435    module = th_action['__ansible_module__']
436
437    if 'name' not in th_action:
438        raise MatchError(message=f"Failed to find required 'name' key in {module!s}")
439
440    if not isinstance(th_action['name'], str):
441        raise MatchError(
442            message=f"Value assigned to 'name' key on '{module!s}' is not a string.",
443        )
444
445
446def _roles_children(
447    basedir: str, k: str, v: Sequence[Any], parent_type: FileType, main: str = 'main'
448) -> List[Lintable]:
449    results: List[Lintable] = []
450    for role in v:
451        if isinstance(role, dict):
452            if 'role' in role or 'name' in role:
453                if 'tags' not in role or 'skip_ansible_lint' not in role['tags']:
454                    results.extend(
455                        _look_for_role_files(
456                            basedir, role.get('role', role.get('name')), main=main
457                        )
458                    )
459            elif k != 'dependencies':
460                raise SystemExit(
461                    'role dict {0} does not contain a "role" '
462                    'or "name" key'.format(role)
463                )
464        else:
465            results.extend(_look_for_role_files(basedir, role, main=main))
466    return results
467
468
469def _rolepath(basedir: str, role: str) -> Optional[str]:
470    role_path = None
471
472    possible_paths = [
473        # if included from a playbook
474        path_dwim(basedir, os.path.join('roles', role)),
475        path_dwim(basedir, role),
476        # if included from roles/[role]/meta/main.yml
477        path_dwim(basedir, os.path.join('..', '..', '..', 'roles', role)),
478        path_dwim(basedir, os.path.join('..', '..', role)),
479        # if checking a role in the current directory
480        path_dwim(basedir, os.path.join('..', role)),
481    ]
482
483    if constants.DEFAULT_ROLES_PATH:
484        search_locations = constants.DEFAULT_ROLES_PATH
485        if isinstance(search_locations, str):
486            search_locations = search_locations.split(os.pathsep)
487        for loc in search_locations:
488            loc = os.path.expanduser(loc)
489            possible_paths.append(path_dwim(loc, role))
490
491    possible_paths.append(path_dwim(basedir, ''))
492
493    for path_option in possible_paths:
494        if os.path.isdir(path_option):
495            role_path = path_option
496            break
497
498    if role_path:
499        add_all_plugin_dirs(role_path)
500
501    return role_path
502
503
504def _look_for_role_files(
505    basedir: str, role: str, main: Optional[str] = 'main'
506) -> List[Lintable]:
507    role_path = _rolepath(basedir, role)
508    if not role_path:
509        return []
510
511    results = []
512
513    for kind in ['tasks', 'meta', 'handlers', 'vars', 'defaults']:
514        current_path = os.path.join(role_path, kind)
515        for folder, subdirs, files in os.walk(current_path):
516            for file in files:
517                file_ignorecase = file.lower()
518                if file_ignorecase.endswith(('.yml', '.yaml')):
519                    thpath = os.path.join(folder, file)
520                    results.append(Lintable(thpath))
521
522    return results
523
524
525def _kv_to_dict(v: str) -> Dict[str, Any]:
526    (command, args, kwargs) = tokenize(v)
527    return dict(__ansible_module__=command, __ansible_arguments__=args, **kwargs)
528
529
530def _sanitize_task(task: Dict[str, Any]) -> Dict[str, Any]:
531    """Return a stripped-off task structure compatible with new Ansible.
532
533    This helper takes a copy of the incoming task and drops
534    any internally used keys from it.
535    """
536    result = task.copy()
537    # task is an AnsibleMapping which inherits from OrderedDict, so we need
538    # to use `del` to remove unwanted keys.
539    for k in ['skipped_rules', FILENAME_KEY, LINE_NUMBER_KEY]:
540        if k in result:
541            del result[k]
542    return result
543
544
545def normalize_task_v2(task: Dict[str, Any]) -> Dict[str, Any]:
546    """Ensure tasks have a normalized action key and strings are converted to python objects."""
547    result = dict()
548
549    sanitized_task = _sanitize_task(task)
550    mod_arg_parser = ModuleArgsParser(sanitized_task)
551    try:
552        action, arguments, result['delegate_to'] = mod_arg_parser.parse(
553            skip_action_validation=options.skip_action_validation
554        )
555    except AnsibleParserError as e:
556        raise MatchError(
557            rule=AnsibleParserErrorRule(),
558            message=e.message,
559            filename=task.get(FILENAME_KEY, "Unknown"),
560            linenumber=task.get(LINE_NUMBER_KEY, 0),
561        )
562
563    # denormalize shell -> command conversion
564    if '_uses_shell' in arguments:
565        action = 'shell'
566        del arguments['_uses_shell']
567
568    for (k, v) in list(task.items()):
569        if k in ('action', 'local_action', 'args', 'delegate_to') or k == action:
570            # we don't want to re-assign these values, which were
571            # determined by the ModuleArgsParser() above
572            continue
573        result[k] = v
574
575    if not isinstance(action, str):
576        raise RuntimeError("Task actions can only be strings, got %s" % action)
577    action_unnormalized = action
578    # convert builtin fqn calls to short forms because most rules know only
579    # about short calls but in the future we may switch the normalization to do
580    # the opposite. Mainly we currently consider normalized the module listing
581    # used by `ansible-doc -t module -l 2>/dev/null`
582    action = removeprefix(action, "ansible.builtin.")
583    result['action'] = dict(
584        __ansible_module__=action, __ansible_module_original__=action_unnormalized
585    )
586
587    if '_raw_params' in arguments:
588        result['action']['__ansible_arguments__'] = arguments['_raw_params'].split(' ')
589        del arguments['_raw_params']
590    else:
591        result['action']['__ansible_arguments__'] = list()
592
593    if 'argv' in arguments and not result['action']['__ansible_arguments__']:
594        result['action']['__ansible_arguments__'] = arguments['argv']
595        del arguments['argv']
596
597    result['action'].update(arguments)
598    return result
599
600
601def normalize_task(task: Dict[str, Any], filename: str) -> Dict[str, Any]:
602    """Unify task-like object structures."""
603    ansible_action_type = task.get('__ansible_action_type__', 'task')
604    if '__ansible_action_type__' in task:
605        del task['__ansible_action_type__']
606    task = normalize_task_v2(task)
607    task[FILENAME_KEY] = filename
608    task['__ansible_action_type__'] = ansible_action_type
609    return task
610
611
612def task_to_str(task: Dict[str, Any]) -> str:
613    """Make a string identifier for the given task."""
614    name = task.get("name")
615    if name:
616        return str(name)
617    action = task.get("action")
618    if isinstance(action, str) or not isinstance(action, dict):
619        return str(action)
620    args = " ".join(
621        [
622            "{0}={1}".format(k, v)
623            for (k, v) in action.items()
624            if k
625            not in [
626                "__ansible_module__",
627                "__ansible_module_original__",
628                "__ansible_arguments__",
629                "__line__",
630                "__file__",
631            ]
632        ]
633    )
634    for item in action.get("__ansible_arguments__", []):
635        args += f" {item}"
636    return u"{0} {1}".format(action["__ansible_module__"], args)
637
638
639def extract_from_list(
640    blocks: AnsibleBaseYAMLObject, candidates: List[str]
641) -> List[Any]:
642    """Get action tasks from block structures."""
643    results = list()
644    for block in blocks:
645        for candidate in candidates:
646            if isinstance(block, dict) and candidate in block:
647                if isinstance(block[candidate], list):
648                    results.extend(add_action_type(block[candidate], candidate))
649                elif block[candidate] is not None:
650                    raise RuntimeError(
651                        "Key '%s' defined, but bad value: '%s'"
652                        % (candidate, str(block[candidate]))
653                    )
654    return results
655
656
657def add_action_type(actions: AnsibleBaseYAMLObject, action_type: str) -> List[Any]:
658    """Add action markers to task objects."""
659    results = list()
660    for action in actions:
661        # ignore empty task
662        if not action:
663            continue
664        action['__ansible_action_type__'] = BLOCK_NAME_TO_ACTION_TYPE_MAP[action_type]
665        results.append(action)
666    return results
667
668
669def get_action_tasks(yaml: AnsibleBaseYAMLObject, file: Lintable) -> List[Any]:
670    """Get a flattened list of action tasks from the file."""
671    tasks = list()
672    if file.kind in ['tasks', 'handlers']:
673        tasks = add_action_type(yaml, file.kind)
674    else:
675        tasks.extend(
676            extract_from_list(yaml, ['tasks', 'handlers', 'pre_tasks', 'post_tasks'])
677        )
678
679    # Add sub-elements of block/rescue/always to tasks list
680    tasks.extend(extract_from_list(tasks, ['block', 'rescue', 'always']))
681    # Remove block/rescue/always elements from tasks list
682    block_rescue_always = ('block', 'rescue', 'always')
683    tasks[:] = [
684        task for task in tasks if all(k not in task for k in block_rescue_always)
685    ]
686
687    return [
688        task
689        for task in tasks
690        if set(
691            ['include', 'include_tasks', 'import_playbook', 'import_tasks']
692        ).isdisjoint(task.keys())
693    ]
694
695
696def get_normalized_tasks(
697    yaml: "AnsibleBaseYAMLObject", file: Lintable
698) -> List[Dict[str, Any]]:
699    """Extract normalized tasks from a file."""
700    tasks = get_action_tasks(yaml, file)
701    res = []
702    for task in tasks:
703        # An empty `tags` block causes `None` to be returned if
704        # the `or []` is not present - `task.get('tags', [])`
705        # does not suffice.
706        if 'skip_ansible_lint' in (task.get('tags') or []):
707            # No need to normalize_task is we are skipping it.
708            continue
709        res.append(normalize_task(task, str(file.path)))
710
711    return res
712
713
714@lru_cache(maxsize=128)
715def parse_yaml_linenumbers(lintable: Lintable) -> AnsibleBaseYAMLObject:
716    """Parse yaml as ansible.utils.parse_yaml but with linenumbers.
717
718    The line numbers are stored in each node's LINE_NUMBER_KEY key.
719    """
720
721    def compose_node(parent: yaml.nodes.Node, index: int) -> yaml.nodes.Node:
722        # the line number where the previous token has ended (plus empty lines)
723        line = loader.line
724        node = Composer.compose_node(loader, parent, index)
725        if not isinstance(node, yaml.nodes.Node):
726            raise RuntimeError("Unexpected yaml data.")
727        setattr(node, '__line__', line + 1)
728        return node
729
730    def construct_mapping(
731        node: AnsibleBaseYAMLObject, deep: bool = False
732    ) -> AnsibleMapping:
733        mapping = AnsibleConstructor.construct_mapping(loader, node, deep=deep)
734        if hasattr(node, '__line__'):
735            mapping[LINE_NUMBER_KEY] = node.__line__
736        else:
737            mapping[LINE_NUMBER_KEY] = mapping._line_number
738        mapping[FILENAME_KEY] = lintable.path
739        return mapping
740
741    try:
742        kwargs = {}
743        if 'vault_password' in inspect.getfullargspec(AnsibleLoader.__init__).args:
744            kwargs['vault_password'] = DEFAULT_VAULT_PASSWORD
745        loader = AnsibleLoader(lintable.content, **kwargs)
746        loader.compose_node = compose_node
747        loader.construct_mapping = construct_mapping
748        data = loader.get_single_data()
749    except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
750        logging.exception(e)
751        raise SystemExit("Failed to parse YAML in %s: %s" % (lintable.path, str(e)))
752    return data
753
754
755def get_first_cmd_arg(task: Dict[str, Any]) -> Any:
756    """Extract the first arg from a cmd task."""
757    try:
758        if 'cmd' in task['action']:
759            first_cmd_arg = task['action']['cmd'].split()[0]
760        else:
761            first_cmd_arg = task['action']['__ansible_arguments__'][0]
762    except IndexError:
763        return None
764    return first_cmd_arg
765
766
767def get_second_cmd_arg(task: Dict[str, Any]) -> Any:
768    """Extract the second arg from a cmd task."""
769    try:
770        if 'cmd' in task['action']:
771            second_cmd_arg = task['action']['cmd'].split()[1]
772        else:
773            second_cmd_arg = task['action']['__ansible_arguments__'][1]
774    except IndexError:
775        return None
776    return second_cmd_arg
777
778
779def is_playbook(filename: str) -> bool:
780    """
781    Check if the file is a playbook.
782
783    Given a filename, it should return true if it looks like a playbook. The
784    function is not supposed to raise exceptions.
785    """
786    # we assume is a playbook if we loaded a sequence of dictionaries where
787    # at least one of these keys is present:
788    playbooks_keys = {
789        "gather_facts",
790        "hosts",
791        "import_playbook",
792        "post_tasks",
793        "pre_tasks",
794        "roles",
795        "tasks",
796    }
797
798    # makes it work with Path objects by converting them to strings
799    if not isinstance(filename, str):
800        filename = str(filename)
801
802    try:
803        f = parse_yaml_from_file(filename)
804    except Exception as e:
805        _logger.warning(
806            "Failed to load %s with %s, assuming is not a playbook.", filename, e
807        )
808    else:
809        if (
810            isinstance(f, AnsibleSequence)
811            and hasattr(next(iter(f), {}), 'keys')
812            and playbooks_keys.intersection(next(iter(f), {}).keys())
813        ):
814            return True
815    return False
816
817
818# pylint: disable=too-many-statements
819def get_lintables(
820    options: Namespace = Namespace(), args: Optional[List[str]] = None
821) -> List[Lintable]:
822    """Detect files and directories that are lintable."""
823    lintables: List[Lintable] = []
824
825    # passing args bypass auto-detection mode
826    if args:
827        for arg in args:
828            lintable = Lintable(arg)
829            if lintable.kind in ("yaml", None):
830                _logger.warning(
831                    "Overriding detected file kind '%s' with 'playbook' "
832                    "for given positional argument: %s",
833                    lintable.kind,
834                    arg,
835                )
836                lintable = Lintable(arg, kind="playbook")
837            lintables.append(lintable)
838    else:
839
840        for filename in discover_lintables(options):
841
842            p = Path(filename)
843            # skip exclusions
844            try:
845                for file_path in options.exclude_paths:
846                    if str(p.resolve()).startswith(str(file_path)):
847                        raise FileNotFoundError(
848                            f'File {file_path} matched exclusion entry: {p}'
849                        )
850            except FileNotFoundError as e:
851                _logger.debug('Ignored %s due to: %s', p, e)
852                continue
853
854            lintables.append(Lintable(p))
855
856        # stage 2: guess roles from current lintables, as there is no unique
857        # file that must be present in any kind of role.
858        _extend_with_roles(lintables)
859
860    return lintables
861
862
863def _extend_with_roles(lintables: List[Lintable]) -> None:
864    """Detect roles among lintables and adds them to the list."""
865    for lintable in lintables:
866        parts = lintable.path.parent.parts
867        if 'roles' in parts:
868            role = lintable.path
869            while role.parent.name != "roles" and role.name:
870                role = role.parent
871            if role.exists and not role.is_file():
872                lintable = Lintable(role, kind="role")
873                if lintable not in lintables:
874                    _logger.debug("Added role: %s", lintable)
875                    lintables.append(lintable)
876
877
878def convert_to_boolean(value: Any) -> bool:
879    """Use Ansible to convert something to a boolean."""
880    return bool(boolean(value))
881
882
883def nested_items(
884    data: Union[Dict[Any, Any], List[Any]], parent: str = ""
885) -> Generator[Tuple[Any, Any, str], None, None]:
886    """Iterate a nested data structure."""
887    if isinstance(data, dict):
888        for k, v in data.items():
889            yield k, v, parent
890            for k, v, p in nested_items(v, k):
891                yield k, v, p
892    if isinstance(data, list):
893        for item in data:
894            yield "list-item", item, parent
895            for k, v, p in nested_items(item):
896                yield k, v, p
897