1"""Runner implementation."""
2import logging
3import multiprocessing
4import multiprocessing.pool
5import os
6from dataclasses import dataclass
7from fnmatch import fnmatch
8from pathlib import Path
9from typing import TYPE_CHECKING, Any, FrozenSet, Generator, List, Optional, Set, Union
10
11import ansiblelint.skip_utils
12import ansiblelint.utils
13from ansiblelint._internal.rules import LoadingFailureRule
14from ansiblelint.errors import MatchError
15from ansiblelint.file_utils import Lintable, expand_dirs_in_lintables
16from ansiblelint.rules.AnsibleSyntaxCheckRule import AnsibleSyntaxCheckRule
17
18if TYPE_CHECKING:
19    from argparse import Namespace
20
21    from ansiblelint.rules import RulesCollection
22
23_logger = logging.getLogger(__name__)
24
25
26@dataclass
27class LintResult:
28    """Class that tracks result of linting."""
29
30    matches: List[MatchError]
31    files: Set[Lintable]
32
33
34class Runner:
35    """Runner class performs the linting process."""
36
37    # pylint: disable=too-many-arguments
38    def __init__(
39        self,
40        *lintables: Union[Lintable, str],
41        rules: "RulesCollection",
42        tags: FrozenSet[Any] = frozenset(),
43        skip_list: List[str] = [],
44        exclude_paths: List[str] = [],
45        verbosity: int = 0,
46        checked_files: Optional[Set[Lintable]] = None
47    ) -> None:
48        """Initialize a Runner instance."""
49        self.rules = rules
50        self.lintables: Set[Lintable] = set()
51
52        # Assure consistent type
53        for item in lintables:
54            if not isinstance(item, Lintable):
55                item = Lintable(item)
56            self.lintables.add(item)
57
58        # Expand folders (roles) to their components
59        expand_dirs_in_lintables(self.lintables)
60
61        self.tags = tags
62        self.skip_list = skip_list
63        self._update_exclude_paths(exclude_paths)
64        self.verbosity = verbosity
65        if checked_files is None:
66            checked_files = set()
67        self.checked_files = checked_files
68
69    def _update_exclude_paths(self, exclude_paths: List[str]) -> None:
70        if exclude_paths:
71            # These will be (potentially) relative paths
72            paths = ansiblelint.file_utils.expand_paths_vars(exclude_paths)
73            # Since ansiblelint.utils.find_children returns absolute paths,
74            # and the list of files we create in `Runner.run` can contain both
75            # relative and absolute paths, we need to cover both bases.
76            self.exclude_paths = paths + [os.path.abspath(p) for p in paths]
77        else:
78            self.exclude_paths = []
79
80    def is_excluded(self, file_path: str) -> bool:
81        """Verify if a file path should be excluded."""
82        # Any will short-circuit as soon as something returns True, but will
83        # be poor performance for the case where the path under question is
84        # not excluded.
85
86        # Exclusions should be evaluated only using absolute paths in order
87        # to work correctly.
88        if not file_path:
89            return False
90
91        abs_path = os.path.abspath(file_path)
92        _file_path = Path(file_path)
93
94        return any(
95            abs_path.startswith(path)
96            or _file_path.match(path)
97            or fnmatch(str(abs_path), path)
98            or fnmatch(str(_file_path), path)
99            for path in self.exclude_paths
100        )
101
102    def run(self) -> List[MatchError]:
103        """Execute the linting process."""
104        files: List[Lintable] = list()
105        matches: List[MatchError] = list()
106
107        # remove exclusions
108        for lintable in self.lintables.copy():
109            if self.is_excluded(str(lintable.path.resolve())):
110                _logger.debug("Excluded %s", lintable)
111                self.lintables.remove(lintable)
112
113        # -- phase 1 : syntax check in parallel --
114        def worker(lintable: Lintable) -> List[MatchError]:
115            return AnsibleSyntaxCheckRule._get_ansible_syntax_check_matches(lintable)
116
117        # playbooks: List[Lintable] = []
118        for lintable in self.lintables:
119            if lintable.kind != 'playbook':
120                continue
121            files.append(lintable)
122
123        pool = multiprocessing.pool.ThreadPool(processes=multiprocessing.cpu_count())
124        return_list = pool.map(worker, files, chunksize=1)
125        pool.close()
126        pool.join()
127        for data in return_list:
128            matches.extend(data)
129
130        # -- phase 2 ---
131        if not matches:
132
133            # do our processing only when ansible syntax check passed in order
134            # to avoid causing runtime exceptions. Our processing is not as
135            # relisient to be able process garbage.
136            matches.extend(self._emit_matches(files))
137
138            # remove duplicates from files list
139            files = [value for n, value in enumerate(files) if value not in files[:n]]
140
141            for file in self.lintables:
142                if file in self.checked_files or not file.kind:
143                    continue
144                _logger.debug(
145                    "Examining %s of type %s",
146                    ansiblelint.file_utils.normpath(file.path),
147                    file.kind,
148                )
149
150                matches.extend(
151                    self.rules.run(file, tags=set(self.tags), skip_list=self.skip_list)
152                )
153
154        # update list of checked files
155        self.checked_files.update(self.lintables)
156
157        # remove any matches made inside excluded files
158        matches = list(
159            filter(lambda match: not self.is_excluded(match.filename), matches)
160        )
161
162        return sorted(set(matches))
163
164    def _emit_matches(self, files: List[Lintable]) -> Generator[MatchError, None, None]:
165        visited: Set[Lintable] = set()
166        while visited != self.lintables:
167            for lintable in self.lintables - visited:
168                try:
169                    for child in ansiblelint.utils.find_children(lintable):
170                        if self.is_excluded(str(child.path)):
171                            continue
172                        self.lintables.add(child)
173                        files.append(child)
174                except MatchError as e:
175                    if not e.filename:
176                        e.filename = str(lintable.path)
177                    e.rule = LoadingFailureRule()
178                    yield e
179                except AttributeError:
180                    yield MatchError(
181                        filename=str(lintable.path), rule=LoadingFailureRule()
182                    )
183                visited.add(lintable)
184
185
186def _get_matches(rules: "RulesCollection", options: "Namespace") -> LintResult:
187
188    lintables = ansiblelint.utils.get_lintables(options=options, args=options.lintables)
189
190    matches = list()
191    checked_files: Set[Lintable] = set()
192    runner = Runner(
193        *lintables,
194        rules=rules,
195        tags=options.tags,
196        skip_list=options.skip_list,
197        exclude_paths=options.exclude_paths,
198        verbosity=options.verbosity,
199        checked_files=checked_files
200    )
201    matches.extend(runner.run())
202
203    # Assure we do not print duplicates and the order is consistent
204    matches = sorted(set(matches))
205
206    # Convert reported filenames into human redable ones, so we hide the
207    # fact we used temporary files when processing input from stdin.
208    for match in matches:
209        for lintable in lintables:
210            if match.filename == lintable.filename:
211                match.filename = lintable.name
212                break
213
214    return LintResult(matches=matches, files=checked_files)
215