# Copyright (c) 2013-2014 Will Thames <will@thames.id.au>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Generic utility helpers."""

import contextlib
import inspect
import logging
import os
from argparse import Namespace
from collections.abc import ItemsView
from functools import lru_cache
from pathlib import Path
from typing import (
    Any,
    Callable,
    Dict,
    Generator,
    List,
    Optional,
    Sequence,
    Tuple,
    Union,
)

import yaml
from ansible import constants
from ansible.errors import AnsibleError, AnsibleParserError
from ansible.parsing.dataloader import DataLoader
from ansible.parsing.mod_args import ModuleArgsParser
from ansible.parsing.splitter import split_args
from ansible.parsing.yaml.constructor import AnsibleConstructor, AnsibleMapping
from ansible.parsing.yaml.loader import AnsibleLoader
from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject, AnsibleSequence
from ansible.plugins.loader import add_all_plugin_dirs
from ansible.template import Templar

try:
    from ansible.module_utils.parsing.convert_bool import boolean
except ImportError:
    try:
        from ansible.utils.boolean import boolean
    except ImportError:
        try:
            from ansible.utils import boolean
        except ImportError:
            boolean = constants.mk_boolean

from yaml.composer import Composer
from yaml.representer import RepresenterError

from ansiblelint._internal.rules import (
    AnsibleParserErrorRule,
    LoadingFailureRule,
    RuntimeErrorRule,
)
from ansiblelint.config import options
from ansiblelint.constants import FileType
from ansiblelint.errors import MatchError
from ansiblelint.file_utils import Lintable, discover_lintables
from ansiblelint.text import removeprefix

# ansible-lint doesn't need/want to know about encrypted secrets, so we pass a
# string as the password to enable such yaml files to be opened and parsed
# successfully.
DEFAULT_VAULT_PASSWORD = 'x'

# Optional override for the `playbook_dir` variable used when templating.
PLAYBOOK_DIR = os.environ.get('ANSIBLE_PLAYBOOK_DIR', None)


_logger = logging.getLogger(__name__)


def parse_yaml_from_file(filepath: str) -> AnsibleBaseYAMLObject:
    """Extract a decrypted YAML object from file.

    A dummy vault password is handed to the loader (when it supports
    ``set_vault_password``) so that vaulted files can still be opened.
    """
    dl = DataLoader()
    if hasattr(dl, 'set_vault_password'):
        dl.set_vault_password(DEFAULT_VAULT_PASSWORD)
    return dl.load_from_file(filepath)


def path_dwim(basedir: str, given: str) -> str:
    """Convert a given path do-what-I-mean style.

    :param basedir: directory the loader resolves ``given`` against.
    :param given: path as written in the playbook/task file.
    :returns: the path computed by ``DataLoader.path_dwim`` as a string.
    """
    dl = DataLoader()
    dl.set_basedir(basedir)
    return str(dl.path_dwim(given))


def ansible_template(
    basedir: str, varname: Any, templatevars: Any, **kwargs: Any
) -> Any:
    """Render a templated string.

    :param basedir: directory containing the lintable file.
    :param varname: value to render.
    :param templatevars: variables made available to the templar.
    :param kwargs: forwarded unchanged to ``Templar.template``.
    :returns: whatever ``Templar.template`` returns for ``varname``.
    """
    # `basedir` is the directory containing the lintable file.
    # Therefore, for tasks in a role, `basedir` has the form
    # `roles/some_role/tasks`. On the other hand, the search path
    # is `roles/some_role/{files,templates}`. As a result, the
    # `tasks` part in the basedir should be stripped.
    if os.path.basename(basedir) == 'tasks':
        basedir = os.path.dirname(basedir)

    dl = DataLoader()
    dl.set_basedir(basedir)
    templar = Templar(dl, variables=templatevars)
    return templar.template(varname, **kwargs)


# Keys injected into parsed mappings by parse_yaml_linenumbers().
LINE_NUMBER_KEY = '__line__'
FILENAME_KEY = '__file__'

# NOTE: 'when' is listed twice; the list is kept exactly as it was so that
# any consumer relying on its contents sees no change.
VALID_KEYS = [
    'name',
    'action',
    'when',
    'async',
    'poll',
    'notify',
    'first_available_file',
    'include',
    'include_tasks',
    'import_tasks',
    'import_playbook',
    'tags',
    'register',
    'ignore_errors',
    'delegate_to',
    'local_action',
    'transport',
    'remote_user',
    'sudo',
    'sudo_user',
    'sudo_pass',
    'when',
    'connection',
    'environment',
    'args',
    'any_errors_fatal',
    'changed_when',
    'failed_when',
    'check_mode',
    'delay',
    'retries',
    'until',
    'su',
    'su_user',
    'su_pass',
    'no_log',
    'run_once',
    'become',
    'become_user',
    'become_method',
    FILENAME_KEY,
]

# Maps the play/block key a task was found under to the value stored in the
# task's '__ansible_action_type__' marker (see add_action_type()).
BLOCK_NAME_TO_ACTION_TYPE_MAP = {
    'tasks': 'task',
    'handlers': 'handler',
    'pre_tasks': 'task',
    'post_tasks': 'task',
    'block': 'meta',
    'rescue': 'meta',
    'always': 'meta',
}


def tokenize(line: str) -> Tuple[str, List[str], Dict[str, str]]:
    """Parse a string task invocation.

    The line is split on single spaces. A leading ``-`` token and a leading
    ``action:`` / ``local_action:`` token are dropped; the next token, with
    every ``:`` removed, is the command. ``key=value`` tokens are collected
    into kwargs until the first token without ``=`` is seen; that token and
    everything after it are positional args.

    :returns: ``(command, args, kwargs)``.
    """
    tokens = line.lstrip().split(" ")
    if tokens[0] == '-':
        tokens = tokens[1:]
    if tokens[0] in ('action:', 'local_action:'):
        tokens = tokens[1:]
    command = tokens[0].replace(":", "")

    args: List[str] = []
    kwargs: Dict[str, str] = {}
    nonkvfound = False
    for arg in tokens[1:]:
        if "=" in arg and not nonkvfound:
            key, value = arg.split("=", 1)
            kwargs[key] = value
        else:
            # once a positional token shows up, later `k=v` tokens are
            # treated as positional too
            nonkvfound = True
            args.append(arg)
    return (command, args, kwargs)


def _playbook_items(pb_data: AnsibleBaseYAMLObject) -> ItemsView:  # type: ignore
    """Return the (key, value) pairs of a play mapping or of every play in a list."""
    if isinstance(pb_data, dict):
        return pb_data.items()
    if not pb_data:
        return []  # type: ignore

    # "if play" prevents failure if the play sequence contains None,
    # which is weird but currently allowed by Ansible
    # https://github.com/ansible-community/ansible-lint/issues/849
    return [item for play in pb_data if play for item in play.items()]  # type: ignore


def _set_collections_basedir(basedir: str) -> None:
    """Set the playbook directory as playbook_paths for the collection loader."""
    try:
        # Ansible 2.10+
        # noqa: # pylint:disable=cyclic-import,import-outside-toplevel
        from ansible.utils.collection_loader import AnsibleCollectionConfig

        AnsibleCollectionConfig.playbook_paths = basedir
    except ImportError:
        # Ansible 2.8 or 2.9
        # noqa: # pylint:disable=cyclic-import,import-outside-toplevel
        from ansible.utils.collection_loader import set_collection_playbook_paths

        set_collection_playbook_paths(basedir)


def find_children(lintable: Lintable) -> List[Lintable]:  # noqa: C901
    """Traverse children of a single file or folder.

    :param lintable: file or role directory to inspect.
    :returns: lintables referenced from ``lintable``; empty when the path
        does not exist or its kind is not role/playbook/tasks.
    :raises SystemExit: when Ansible fails to load the YAML file.
    :raises MatchError: when the file content is a plain string.
    """
    if not lintable.path.exists():
        return []
    playbook_dir = str(lintable.path.parent)
    _set_collections_basedir(playbook_dir or os.path.abspath('.'))
    add_all_plugin_dirs(playbook_dir or '.')
    if lintable.kind == 'role':
        playbook_ds = AnsibleMapping({'roles': [{'role': str(lintable.path)}]})
    elif lintable.kind not in ("playbook", "tasks"):
        return []
    else:
        try:
            playbook_ds = parse_yaml_from_file(str(lintable.path))
        except AnsibleError as e:
            raise SystemExit(str(e)) from e
    results = []
    basedir = os.path.dirname(str(lintable.path))
    # playbook_ds can be an AnsibleUnicode string, which we consider invalid
    if isinstance(playbook_ds, str):
        raise MatchError(filename=str(lintable.path), rule=LoadingFailureRule())
    for item in _playbook_items(playbook_ds):
        for child in play_children(basedir, item, lintable.kind, playbook_dir):
            # We avoid processing parametrized children
            path_str = str(child.path)
            if "$" in path_str or "{{" in path_str:
                continue

            # Repair incorrect paths obtained when old syntax was used, like:
            # - include: simpletask.yml tags=nginx
            valid_tokens = []
            for token in split_args(path_str):
                if '=' in token:
                    break
                valid_tokens.append(token)
            path = ' '.join(valid_tokens)
            if path != path_str:
                child.path = Path(path)
                child.name = child.path.name

            results.append(child)
    return results


def template(
    basedir: str,
    value: Any,
    variables: Any,
    fail_on_undefined: bool = False,
    **kwargs: str,
) -> Any:
    """Attempt rendering a value with known vars.

    :returns: the rendered value, or ``value`` unchanged when templating
        raises ``AnsibleError``, ``ValueError`` or ``RepresenterError``.
    """
    try:
        value = ansible_template(
            os.path.abspath(basedir),
            value,
            variables,
            **dict(kwargs, fail_on_undefined=fail_on_undefined),
        )
        # Hack to skip the following exception when using to_json filter on a variable.
        # I guess the filter doesn't like empty vars...
    except (AnsibleError, ValueError, RepresenterError):
        # templating failed, so just keep value as is.
        pass
    return value


def play_children(
    basedir: str, item: Tuple[str, Any], parent_type: FileType, playbook_dir: str
) -> List[Lintable]:
    """Flatten the traversed play tasks.

    :param basedir: directory of the file ``item`` was read from.
    :param item: one ``(key, value)`` pair of a play.
    :param parent_type: kind of the file that contains the play.
    :param playbook_dir: accepted for the callers' sake; not used here.
    :returns: lintables found under ``item``; empty for unhandled keys.
    """
    delegate_map: Dict[str, Callable[[str, Any, Any, FileType], List[Lintable]]] = {
        'tasks': _taskshandlers_children,
        'pre_tasks': _taskshandlers_children,
        'post_tasks': _taskshandlers_children,
        'block': _taskshandlers_children,
        'include': _include_children,
        'import_playbook': _include_children,
        'roles': _roles_children,
        'dependencies': _roles_children,
        'handlers': _taskshandlers_children,
        'include_tasks': _include_children,
        'import_tasks': _include_children,
    }
    (k, v) = item
    add_all_plugin_dirs(os.path.abspath(basedir))

    if k in delegate_map:
        if v:
            v = template(
                os.path.abspath(basedir),
                v,
                dict(playbook_dir=PLAYBOOK_DIR or os.path.abspath(basedir)),
                fail_on_undefined=False,
            )
            return delegate_map[k](basedir, k, v, parent_type)
    return []


def _include_children(
    basedir: str, k: str, v: Any, parent_type: FileType
) -> List[Lintable]:
    """Resolve the file referenced by an include/import statement."""
    # handle special case include_tasks: name=filename.yml
    if k == 'include_tasks' and isinstance(v, dict) and 'file' in v:
        v = v['file']

    # handle include: filename.yml tags=blah
    (_command, args, _kwargs) = tokenize("{0}: {1}".format(k, v))

    result = path_dwim(basedir, args[0])
    if not os.path.exists(result):
        # fall back to resolving the raw value against the parent directory
        result = path_dwim(os.path.join(os.path.dirname(basedir)), v)
    return [Lintable(result, kind=parent_type)]


def _taskshandlers_children(
    basedir: str, k: str, v: Union[None, Any], parent_type: FileType
) -> List[Lintable]:
    """Collect lintables referenced from a list of tasks or handlers.

    Handles task/playbook includes, role includes/imports and recurses into
    ``block`` / ``rescue`` / ``always`` sections.

    :raises MatchError: when ``v`` is ``None``.
    """
    results: List[Lintable] = []
    if v is None:
        raise MatchError(
            message="A malformed block was encountered while loading a block.",
            rule=RuntimeErrorRule(),
        )
    for th in v:

        # ignore empty tasks, `-`
        if not th:
            continue

        # LookupError means "no include key in this task": fall through to
        # the role and block handling below.
        with contextlib.suppress(LookupError):
            children = _get_task_handler_children_for_tasks_or_playbooks(
                th,
                basedir,
                k,
                parent_type,
            )
            results.append(children)
            continue

        if (
            'include_role' in th or 'import_role' in th
        ):  # lgtm [py/unreachable-statement]
            th = normalize_task_v2(th)
            _validate_task_handler_action_for_role(th['action'])
            results.extend(
                _roles_children(
                    basedir,
                    k,
                    [th['action'].get("name")],
                    parent_type,
                    main=th['action'].get('tasks_from', 'main'),
                )
            )
            continue

        if 'block' not in th:
            continue

        results.extend(_taskshandlers_children(basedir, k, th['block'], parent_type))
        if 'rescue' in th:
            results.extend(
                _taskshandlers_children(basedir, k, th['rescue'], parent_type)
            )
        if 'always' in th:
            results.extend(
                _taskshandlers_children(basedir, k, th['always'], parent_type)
            )

    return results


def _get_task_handler_children_for_tasks_or_playbooks(
    task_handler: Dict[str, Any],
    basedir: str,
    k: Any,
    parent_type: FileType,
) -> Lintable:
    """Try to get children of taskhandler for include/import tasks/playbooks.

    :returns: a lintable for the first include/import key found in the task.
    :raises LookupError: when the task contains none of those keys.
    """
    child_type = k if parent_type == 'playbook' else parent_type

    task_include_keys = 'include', 'include_tasks', 'import_playbook', 'import_tasks'
    for task_handler_key in task_include_keys:

        # a missing key raises KeyError, which simply moves on to the next key
        with contextlib.suppress(KeyError):

            # ignore empty tasks
            if not task_handler:
                continue

            return Lintable(
                path_dwim(basedir, task_handler[task_handler_key]), kind=child_type
            )

    raise LookupError(
        f'The node contains none of: {", ".join(task_include_keys)}',
    )


def _validate_task_handler_action_for_role(th_action: Dict[str, Any]) -> None:
    """Verify that the task handler action is valid for role include.

    :raises MatchError: when ``name`` is missing or is not a string.
    """
    module = th_action['__ansible_module__']

    if 'name' not in th_action:
        raise MatchError(message=f"Failed to find required 'name' key in {module!s}")

    if not isinstance(th_action['name'], str):
        raise MatchError(
            message=f"Value assigned to 'name' key on '{module!s}' is not a string.",
        )


def _roles_children(
    basedir: str, k: str, v: Sequence[Any], parent_type: FileType, main: str = 'main'
) -> List[Lintable]:
    """Collect the YAML files of every role listed in ``v``.

    Role dicts tagged ``skip_ansible_lint`` are ignored.

    :raises SystemExit: for a role dict lacking both ``role`` and ``name``,
        unless it was found under ``dependencies``.
    """
    results: List[Lintable] = []
    for role in v:
        if isinstance(role, dict):
            if 'role' in role or 'name' in role:
                if 'tags' not in role or 'skip_ansible_lint' not in role['tags']:
                    results.extend(
                        _look_for_role_files(
                            basedir, role.get('role', role.get('name')), main=main
                        )
                    )
            elif k != 'dependencies':
                raise SystemExit(
                    'role dict {0} does not contain a "role" '
                    'or "name" key'.format(role)
                )
        else:
            results.extend(_look_for_role_files(basedir, role, main=main))
    return results


def _rolepath(basedir: str, role: str) -> Optional[str]:
    """Return the first existing directory that could hold ``role``, if any.

    The directory found is also registered with ``add_all_plugin_dirs``.
    """
    role_path = None

    possible_paths = [
        # if included from a playbook
        path_dwim(basedir, os.path.join('roles', role)),
        path_dwim(basedir, role),
        # if included from roles/[role]/meta/main.yml
        path_dwim(basedir, os.path.join('..', '..', '..', 'roles', role)),
        path_dwim(basedir, os.path.join('..', '..', role)),
        # if checking a role in the current directory
        path_dwim(basedir, os.path.join('..', role)),
    ]

    if constants.DEFAULT_ROLES_PATH:
        search_locations = constants.DEFAULT_ROLES_PATH
        if isinstance(search_locations, str):
            search_locations = search_locations.split(os.pathsep)
        for loc in search_locations:
            loc = os.path.expanduser(loc)
            possible_paths.append(path_dwim(loc, role))

    # last resort: the base directory itself
    possible_paths.append(path_dwim(basedir, ''))

    for path_option in possible_paths:
        if os.path.isdir(path_option):
            role_path = path_option
            break

    if role_path:
        add_all_plugin_dirs(role_path)

    return role_path


def _look_for_role_files(
    basedir: str, role: str, main: Optional[str] = 'main'
) -> List[Lintable]:
    """List every ``.yml`` / ``.yaml`` file under the role's standard folders.

    ``main`` is accepted for the callers' sake but is not used here.
    """
    role_path = _rolepath(basedir, role)
    if not role_path:
        return []

    results = []

    for kind in ['tasks', 'meta', 'handlers', 'vars', 'defaults']:
        current_path = os.path.join(role_path, kind)
        for folder, _subdirs, files in os.walk(current_path):
            for file in files:
                file_ignorecase = file.lower()
                if file_ignorecase.endswith(('.yml', '.yaml')):
                    thpath = os.path.join(folder, file)
                    results.append(Lintable(thpath))

    return results


def _kv_to_dict(v: str) -> Dict[str, Any]:
    """Turn a ``module arg k=v`` string into an action-style dictionary."""
    (command, args, kwargs) = tokenize(v)
    return dict(__ansible_module__=command, __ansible_arguments__=args, **kwargs)


def _sanitize_task(task: Dict[str, Any]) -> Dict[str, Any]:
    """Return a stripped-off task structure compatible with new Ansible.

    This helper takes a copy of the incoming task and drops
    any internally used keys from it.
    """
    result = task.copy()
    # task is an AnsibleMapping which inherits from OrderedDict, so we need
    # to use `del` to remove unwanted keys.
    for k in ['skipped_rules', FILENAME_KEY, LINE_NUMBER_KEY]:
        if k in result:
            del result[k]
    return result


def normalize_task_v2(task: Dict[str, Any]) -> Dict[str, Any]:
    """Ensure tasks have a normalized action key and strings are converted to python objects.

    :returns: a new dict whose ``action`` entry is a mapping holding
        ``__ansible_module__``, ``__ansible_module_original__``,
        ``__ansible_arguments__`` and the parsed module arguments.
    :raises MatchError: when Ansible cannot parse the task.
    :raises RuntimeError: when the parsed action is not a string.
    """
    result: Dict[str, Any] = {}

    sanitized_task = _sanitize_task(task)
    mod_arg_parser = ModuleArgsParser(sanitized_task)
    try:
        action, arguments, result['delegate_to'] = mod_arg_parser.parse(
            skip_action_validation=options.skip_action_validation
        )
    except AnsibleParserError as e:
        raise MatchError(
            rule=AnsibleParserErrorRule(),
            message=e.message,
            filename=task.get(FILENAME_KEY, "Unknown"),
            linenumber=task.get(LINE_NUMBER_KEY, 0),
        ) from e

    # denormalize shell -> command conversion
    if '_uses_shell' in arguments:
        action = 'shell'
        del arguments['_uses_shell']

    for (k, v) in list(task.items()):
        if k in ('action', 'local_action', 'args', 'delegate_to') or k == action:
            # we don't want to re-assign these values, which were
            # determined by the ModuleArgsParser() above
            continue
        result[k] = v

    if not isinstance(action, str):
        raise RuntimeError("Task actions can only be strings, got %s" % action)
    action_unnormalized = action
    # convert builtin fqn calls to short forms because most rules know only
    # about short calls but in the future we may switch the normalization to do
    # the opposite. Mainly we currently consider normalized the module listing
    # used by `ansible-doc -t module -l 2>/dev/null`
    action = removeprefix(action, "ansible.builtin.")
    result['action'] = dict(
        __ansible_module__=action, __ansible_module_original__=action_unnormalized
    )

    if '_raw_params' in arguments:
        result['action']['__ansible_arguments__'] = arguments['_raw_params'].split(' ')
        del arguments['_raw_params']
    else:
        result['action']['__ansible_arguments__'] = []

    # `argv` only supplies the positional arguments when none came from
    # `_raw_params`
    if 'argv' in arguments and not result['action']['__ansible_arguments__']:
        result['action']['__ansible_arguments__'] = arguments['argv']
        del arguments['argv']

    result['action'].update(arguments)
    return result


def normalize_task(task: Dict[str, Any], filename: str) -> Dict[str, Any]:
    """Unify task-like object structures.

    The ``__ansible_action_type__`` marker is removed from ``task`` before
    normalization and restored on the result, together with ``filename``.
    """
    ansible_action_type = task.get('__ansible_action_type__', 'task')
    if '__ansible_action_type__' in task:
        del task['__ansible_action_type__']
    task = normalize_task_v2(task)
    task[FILENAME_KEY] = filename
    task['__ansible_action_type__'] = ansible_action_type
    return task


def task_to_str(task: Dict[str, Any]) -> str:
    """Make a string identifier for the given task.

    The task name wins when present; otherwise the identifier is built from
    the normalized action: module name, ``k=v`` pairs, positional arguments.
    """
    name = task.get("name")
    if name:
        return str(name)
    action = task.get("action")
    if not isinstance(action, dict):
        return str(action)
    args = " ".join(
        [
            "{0}={1}".format(k, v)
            for (k, v) in action.items()
            if k
            not in [
                "__ansible_module__",
                "__ansible_module_original__",
                "__ansible_arguments__",
                "__line__",
                "__file__",
            ]
        ]
    )
    for item in action.get("__ansible_arguments__", []):
        args += f" {item}"
    return u"{0} {1}".format(action["__ansible_module__"], args)


def extract_from_list(
    blocks: AnsibleBaseYAMLObject, candidates: List[str]
) -> List[Any]:
    """Get action tasks from block structures.

    :param blocks: sequence of plays or tasks.
    :param candidates: keys whose list values hold tasks.
    :raises RuntimeError: when a candidate key holds a value that is neither
        a list nor ``None``.
    """
    results = []
    for block in blocks:
        for candidate in candidates:
            if isinstance(block, dict) and candidate in block:
                if isinstance(block[candidate], list):
                    results.extend(add_action_type(block[candidate], candidate))
                elif block[candidate] is not None:
                    raise RuntimeError(
                        "Key '%s' defined, but bad value: '%s'"
                        % (candidate, str(block[candidate]))
                    )
    return results


def add_action_type(actions: AnsibleBaseYAMLObject, action_type: str) -> List[Any]:
    """Add action markers to task objects.

    Each non-empty action is tagged in place with ``__ansible_action_type__``
    looked up from ``BLOCK_NAME_TO_ACTION_TYPE_MAP``.
    """
    results = []
    for action in actions:
        # ignore empty task
        if not action:
            continue
        action['__ansible_action_type__'] = BLOCK_NAME_TO_ACTION_TYPE_MAP[action_type]
        results.append(action)
    return results


def get_action_tasks(yaml: AnsibleBaseYAMLObject, file: Lintable) -> List[Any]:
    """Get a flattened list of action tasks from the file.

    Block containers and include/import statements are left out of the
    returned list.
    """
    tasks = []
    if file.kind in ['tasks', 'handlers']:
        tasks = add_action_type(yaml, file.kind)
    else:
        tasks.extend(
            extract_from_list(yaml, ['tasks', 'handlers', 'pre_tasks', 'post_tasks'])
        )

    # Add sub-elements of block/rescue/always to tasks list
    tasks.extend(extract_from_list(tasks, ['block', 'rescue', 'always']))
    # Remove block/rescue/always elements from tasks list
    block_rescue_always = ('block', 'rescue', 'always')
    tasks[:] = [
        task for task in tasks if all(k not in task for k in block_rescue_always)
    ]

    include_keys = {'include', 'include_tasks', 'import_playbook', 'import_tasks'}
    return [task for task in tasks if include_keys.isdisjoint(task.keys())]


def get_normalized_tasks(
    yaml: "AnsibleBaseYAMLObject", file: Lintable
) -> List[Dict[str, Any]]:
    """Extract normalized tasks from a file.

    Tasks tagged ``skip_ansible_lint`` are left out.
    """
    tasks = get_action_tasks(yaml, file)
    res = []
    for task in tasks:
        # An empty `tags` block causes `None` to be returned if
        # the `or []` is not present - `task.get('tags', [])`
        # does not suffice.
        if 'skip_ansible_lint' in (task.get('tags') or []):
            # No need to normalize_task if we are skipping it.
            continue
        res.append(normalize_task(task, str(file.path)))

    return res


@lru_cache(maxsize=128)
def parse_yaml_linenumbers(lintable: Lintable) -> AnsibleBaseYAMLObject:
    """Parse yaml as ansible.utils.parse_yaml but with linenumbers.

    The line numbers are stored in each node's LINE_NUMBER_KEY key.

    :raises SystemExit: when the YAML parser or scanner rejects the content.
    """

    def compose_node(parent: yaml.nodes.Node, index: int) -> yaml.nodes.Node:
        # the line number where the previous token has ended (plus empty lines)
        line = loader.line
        node = Composer.compose_node(loader, parent, index)
        if not isinstance(node, yaml.nodes.Node):
            raise RuntimeError("Unexpected yaml data.")
        setattr(node, '__line__', line + 1)
        return node

    def construct_mapping(
        node: AnsibleBaseYAMLObject, deep: bool = False
    ) -> AnsibleMapping:
        mapping = AnsibleConstructor.construct_mapping(loader, node, deep=deep)
        if hasattr(node, '__line__'):
            mapping[LINE_NUMBER_KEY] = node.__line__
        else:
            mapping[LINE_NUMBER_KEY] = mapping._line_number
        mapping[FILENAME_KEY] = lintable.path
        return mapping

    try:
        kwargs = {}
        # only pass the vault password to loaders whose __init__ accepts it
        if 'vault_password' in inspect.getfullargspec(AnsibleLoader.__init__).args:
            kwargs['vault_password'] = DEFAULT_VAULT_PASSWORD
        loader = AnsibleLoader(lintable.content, **kwargs)
        loader.compose_node = compose_node
        loader.construct_mapping = construct_mapping
        data = loader.get_single_data()
    except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
        _logger.exception(e)
        raise SystemExit(
            "Failed to parse YAML in %s: %s" % (lintable.path, str(e))
        ) from e
    return data


def get_first_cmd_arg(task: Dict[str, Any]) -> Any:
    """Extract the first arg from a cmd task.

    :returns: the first word of ``cmd`` when given, otherwise the first
        positional argument; ``None`` when there is none.
    """
    try:
        if 'cmd' in task['action']:
            first_cmd_arg = task['action']['cmd'].split()[0]
        else:
            first_cmd_arg = task['action']['__ansible_arguments__'][0]
    except IndexError:
        return None
    return first_cmd_arg


def get_second_cmd_arg(task: Dict[str, Any]) -> Any:
    """Extract the second arg from a cmd task.

    :returns: the second word of ``cmd`` when given, otherwise the second
        positional argument; ``None`` when there is none.
    """
    try:
        if 'cmd' in task['action']:
            second_cmd_arg = task['action']['cmd'].split()[1]
        else:
            second_cmd_arg = task['action']['__ansible_arguments__'][1]
    except IndexError:
        return None
    return second_cmd_arg


def is_playbook(filename: str) -> bool:
    """
    Check if the file is a playbook.

    Given a filename, it should return true if it looks like a playbook. The
    function is not supposed to raise exceptions.
    """
    # we assume is a playbook if we loaded a sequence of dictionaries where
    # at least one of these keys is present:
    playbooks_keys = {
        "gather_facts",
        "hosts",
        "import_playbook",
        "post_tasks",
        "pre_tasks",
        "roles",
        "tasks",
    }

    # makes it work with Path objects by converting them to strings
    if not isinstance(filename, str):
        filename = str(filename)

    try:
        f = parse_yaml_from_file(filename)
    except Exception as e:
        _logger.warning(
            "Failed to load %s with %s, assuming is not a playbook.", filename, e
        )
    else:
        if (
            isinstance(f, AnsibleSequence)
            and hasattr(next(iter(f), {}), 'keys')
            and playbooks_keys.intersection(next(iter(f), {}).keys())
        ):
            return True
    return False


# pylint: disable=too-many-statements
def get_lintables(
    options: Namespace = Namespace(), args: Optional[List[str]] = None
) -> List[Lintable]:
    """Detect files and directories that are lintable.

    :param options: parsed options; ``exclude_paths`` is read for every
        auto-discovered file.
    :param args: explicit paths; when given, auto-detection is bypassed.
    :returns: the lintables, extended with any role directories detected
        among them.
    """
    lintables: List[Lintable] = []

    # passing args bypass auto-detection mode
    if args:
        for arg in args:
            lintable = Lintable(arg)
            if lintable.kind in ("yaml", None):
                _logger.warning(
                    "Overriding detected file kind '%s' with 'playbook' "
                    "for given positional argument: %s",
                    lintable.kind,
                    arg,
                )
                lintable = Lintable(arg, kind="playbook")
            lintables.append(lintable)
    else:

        for filename in discover_lintables(options):

            p = Path(filename)
            # skip exclusions
            for file_path in options.exclude_paths:
                if str(p.resolve()).startswith(str(file_path)):
                    reason = f'File {file_path} matched exclusion entry: {p}'
                    _logger.debug('Ignored %s due to: %s', p, reason)
                    break
            else:
                # no exclusion entry matched
                lintables.append(Lintable(p))

    # stage 2: guess roles from current lintables, as there is no unique
    # file that must be present in any kind of role.
    _extend_with_roles(lintables)

    return lintables


def _extend_with_roles(lintables: List[Lintable]) -> None:
    """Detect roles among lintables and adds them to the list.

    The list is extended in place while it is being iterated; roles that
    get appended are visited too but are already present, so nothing is
    added for them.
    """
    for lintable in lintables:
        parts = lintable.path.parent.parts
        if 'roles' in parts:
            role = lintable.path
            # climb until `role` is the direct child of a `roles` folder
            while role.parent.name != "roles" and role.name:
                role = role.parent
            # `exists` must be called: the bare bound method is always truthy
            if role.exists() and not role.is_file():
                lintable = Lintable(role, kind="role")
                if lintable not in lintables:
                    _logger.debug("Added role: %s", lintable)
                    lintables.append(lintable)


def convert_to_boolean(value: Any) -> bool:
    """Use Ansible to convert something to a boolean."""
    return bool(boolean(value))


def nested_items(
    data: Union[Dict[Any, Any], List[Any]], parent: str = ""
) -> Generator[Tuple[Any, Any, str], None, None]:
    """Iterate a nested data structure.

    Yields ``(key, value, parent_key)`` depth-first. List elements are
    reported under the key ``"list-item"``, and the parent key is reset to
    ``""`` when descending into a list element.
    """
    if isinstance(data, dict):
        for k, v in data.items():
            yield k, v, parent
            yield from nested_items(v, k)
    if isinstance(data, list):
        for item in data:
            yield "list-item", item, parent
            yield from nested_items(item)