"""Runner implementation."""
import logging
import multiprocessing
import multiprocessing.pool
import os
from dataclasses import dataclass
from fnmatch import fnmatch
from pathlib import Path
from typing import TYPE_CHECKING, Any, FrozenSet, Generator, List, Optional, Set, Union

import ansiblelint.file_utils
import ansiblelint.skip_utils
import ansiblelint.utils
from ansiblelint._internal.rules import LoadingFailureRule
from ansiblelint.errors import MatchError
from ansiblelint.file_utils import Lintable, expand_dirs_in_lintables
from ansiblelint.rules.AnsibleSyntaxCheckRule import AnsibleSyntaxCheckRule

if TYPE_CHECKING:
    from argparse import Namespace

    from ansiblelint.rules import RulesCollection

_logger = logging.getLogger(__name__)


@dataclass
class LintResult:
    """Class that tracks result of linting."""

    # Matches reported by the linting run.
    matches: List[MatchError]
    # Files tracked as checked by the run.
    files: Set[Lintable]


class Runner:
    """Runner class performs the linting process."""

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        *lintables: Union[Lintable, str],
        rules: "RulesCollection",
        tags: FrozenSet[Any] = frozenset(),
        skip_list: Optional[List[str]] = None,
        exclude_paths: Optional[List[str]] = None,
        verbosity: int = 0,
        checked_files: Optional[Set[Lintable]] = None
    ) -> None:
        """Initialize a Runner instance.

        :param lintables: Files to lint; anything that is not already a
            ``Lintable`` is wrapped into one.
        :param rules: Rules collection used to examine each file.
        :param tags: Tags forwarded to ``rules.run``.
        :param skip_list: Skip list forwarded to ``rules.run``; ``None``
            means an empty list.
        :param exclude_paths: Paths or patterns to exclude; ``None`` means
            nothing is excluded.
        :param verbosity: Stored on the instance as ``self.verbosity``.
        :param checked_files: Set of already checked files, updated in place
            by ``run``; a fresh set is created when ``None``.
        """
        self.rules = rules
        self.lintables: Set[Lintable] = set()

        # Assure consistent type
        for item in lintables:
            if not isinstance(item, Lintable):
                item = Lintable(item)
            self.lintables.add(item)

        # Expand folders (roles) to their components
        expand_dirs_in_lintables(self.lintables)

        self.tags = tags
        # A None sentinel replaces the former mutable ``[]`` default so that
        # instances never share (and accidentally mutate) one list object.
        self.skip_list: List[str] = skip_list if skip_list is not None else []
        self._update_exclude_paths(exclude_paths or [])
        self.verbosity = verbosity
        if checked_files is None:
            checked_files = set()
        self.checked_files = checked_files

    def _update_exclude_paths(self, exclude_paths: List[str]) -> None:
        """Store ``exclude_paths`` in both their given and absolute forms."""
        if exclude_paths:
            # These will be (potentially) relative paths
            paths = ansiblelint.file_utils.expand_paths_vars(exclude_paths)
            # Since ansiblelint.utils.find_children returns absolute paths,
            # and the list of files we create in `Runner.run` can contain both
            # relative and absolute paths, we need to cover both bases.
            self.exclude_paths = paths + [os.path.abspath(p) for p in paths]
        else:
            self.exclude_paths = []

    def is_excluded(self, file_path: str) -> bool:
        """Verify if a file path should be excluded.

        An empty ``file_path`` is never excluded. Otherwise the path is
        excluded when any entry of ``self.exclude_paths`` is a prefix of its
        absolute form, or matches it (or the path as given) as a pattern.
        """
        # Any will short-circuit as soon as something returns True, but will
        # be poor performance for the case where the path under question is
        # not excluded.

        # Exclusions should be evaluated only using absolute paths in order
        # to work correctly.
        if not file_path:
            return False

        abs_path = os.path.abspath(file_path)
        _file_path = Path(file_path)

        return any(
            abs_path.startswith(path)
            or _file_path.match(path)
            or fnmatch(abs_path, path)
            or fnmatch(str(_file_path), path)
            for path in self.exclude_paths
        )

    def run(self) -> List[MatchError]:
        """Execute the linting process.

        Phase 1 runs the ansible syntax check on every playbook in a thread
        pool. Phase 2 (children discovery and rule evaluation) runs only when
        phase 1 reported nothing.

        :returns: Sorted, de-duplicated matches, without those whose filename
            is excluded.
        """
        files: List[Lintable] = []
        matches: List[MatchError] = []

        # remove exclusions (iterate over a copy as the set is mutated)
        for lintable in self.lintables.copy():
            if self.is_excluded(str(lintable.path.resolve())):
                _logger.debug("Excluded %s", lintable)
                self.lintables.remove(lintable)

        # -- phase 1 : syntax check in parallel --
        def worker(lintable: Lintable) -> List[MatchError]:
            # pylint: disable=protected-access
            return AnsibleSyntaxCheckRule._get_ansible_syntax_check_matches(lintable)

        files.extend(
            lintable for lintable in self.lintables if lintable.kind == 'playbook'
        )

        pool = multiprocessing.pool.ThreadPool(processes=multiprocessing.cpu_count())
        try:
            return_list = pool.map(worker, files, chunksize=1)
        finally:
            # Always release the worker threads, even if a worker raised.
            pool.close()
            pool.join()
        for data in return_list:
            matches.extend(data)

        # -- phase 2 ---
        if not matches:

            # do our processing only when ansible syntax check passed in order
            # to avoid causing runtime exceptions. Our processing is not
            # resilient enough to be able to process garbage.
            matches.extend(self._emit_matches(files))

            for file in self.lintables:
                if file in self.checked_files or not file.kind:
                    continue
                _logger.debug(
                    "Examining %s of type %s",
                    ansiblelint.file_utils.normpath(file.path),
                    file.kind,
                )

                matches.extend(
                    self.rules.run(file, tags=set(self.tags), skip_list=self.skip_list)
                )

        # update list of checked files
        self.checked_files.update(self.lintables)

        # remove any matches made inside excluded files
        matches = [
            match for match in matches if not self.is_excluded(match.filename)
        ]

        return sorted(set(matches))

    def _emit_matches(self, files: List[Lintable]) -> Generator[MatchError, None, None]:
        """Discover children of the known lintables and yield loading failures.

        Every non-excluded child returned by ``find_children`` is added to
        ``self.lintables`` and appended to ``files``. The loop repeats until
        every lintable, including newly found ones, has been visited.

        :param files: List that receives each discovered child (mutated).
        :yields: A ``MatchError`` carrying ``LoadingFailureRule`` for each
            lintable whose children could not be determined.
        """
        visited: Set[Lintable] = set()
        while visited != self.lintables:
            # The set difference is a snapshot, so adding to self.lintables
            # inside the loop is safe; new entries are handled on the next
            # pass of the outer while loop.
            for lintable in self.lintables - visited:
                try:
                    for child in ansiblelint.utils.find_children(lintable):
                        if self.is_excluded(str(child.path)):
                            continue
                        self.lintables.add(child)
                        files.append(child)
                except MatchError as e:
                    if not e.filename:
                        e.filename = str(lintable.path)
                    e.rule = LoadingFailureRule()
                    yield e
                except AttributeError:
                    yield MatchError(
                        filename=str(lintable.path), rule=LoadingFailureRule()
                    )
                visited.add(lintable)


def _get_matches(rules: "RulesCollection", options: "Namespace") -> LintResult:
    """Lint the files selected by ``options`` and return the result.

    :param rules: Rules collection handed to the ``Runner``.
    :param options: Parsed options; reads ``lintables``, ``tags``,
        ``skip_list``, ``exclude_paths`` and ``verbosity``.
    :returns: A ``LintResult`` with the sorted, de-duplicated matches and the
        set of files the runner marked as checked.
    """
    lintables = ansiblelint.utils.get_lintables(options=options, args=options.lintables)

    checked_files: Set[Lintable] = set()
    runner = Runner(
        *lintables,
        rules=rules,
        tags=options.tags,
        skip_list=options.skip_list,
        exclude_paths=options.exclude_paths,
        verbosity=options.verbosity,
        checked_files=checked_files
    )

    # Assure we do not print duplicates and the order is consistent
    matches: List[MatchError] = sorted(set(runner.run()))

    # Convert reported filenames into human readable ones, so we hide the
    # fact we used temporary files when processing input from stdin.
    for match in matches:
        for lintable in lintables:
            if match.filename == lintable.filename:
                match.filename = lintable.name
                break

    return LintResult(matches=matches, files=checked_files)