1"""Utility functions related to file operations."""
2import copy
3import logging
4import os
5import pathlib
6import subprocess
7import sys
8from argparse import Namespace
9from collections import OrderedDict
10from contextlib import contextmanager
11from pathlib import Path
12from tempfile import NamedTemporaryFile
13from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Union
14
15# import wcmatch
16import wcmatch.pathlib
17from wcmatch.wcmatch import RECURSIVE, WcMatch
18
19from ansiblelint.config import BASE_KINDS, options
20from ansiblelint.constants import FileType
21
22if TYPE_CHECKING:
23    # https://github.com/PyCQA/pylint/issues/3979
24    BasePathLike = os.PathLike[Any]  # pylint: disable=unsubscriptable-object
25else:
26    BasePathLike = os.PathLike
27
28_logger = logging.getLogger(__package__)
29
30
31def abspath(path: str, base_dir: str) -> str:
32    """Make relative path absolute relative to given directory.
33
34    Args:
35       path (str): the path to make absolute
36       base_dir (str): the directory from which make \
37                       relative paths absolute
38    """
39    if not os.path.isabs(path):
40        # Don't use abspath as it assumes path is relative to cwd.
41        # We want it relative to base_dir.
42        path = os.path.join(base_dir, path)
43
44    return os.path.normpath(path)
45
46
47def normpath(path: Union[str, BasePathLike]) -> str:
48    """
49    Normalize a path in order to provide a more consistent output.
50
51    Currently it generates a relative path but in the future we may want to
52    make this user configurable.
53    """
54    # conversion to string in order to allow receiving non string objects
55    relpath = os.path.relpath(str(path))
56    abspath = os.path.abspath(str(path))
57    # we avoid returning relative paths that endup at root level
58    if abspath in relpath:
59        return abspath
60    return relpath
61
62
63@contextmanager
64def cwd(path: Union[str, BasePathLike]) -> Iterator[None]:
65    """Context manager for temporary changing current working directory."""
66    old_pwd = os.getcwd()
67    os.chdir(path)
68    try:
69        yield
70    finally:
71        os.chdir(old_pwd)
72
73
74def expand_path_vars(path: str) -> str:
75    """Expand the environment or ~ variables in a path string."""
76    # It may be possible for function to be called with a Path object
77    path = str(path).strip()
78    path = os.path.expanduser(path)
79    path = os.path.expandvars(path)
80    return path
81
82
83def expand_paths_vars(paths: List[str]) -> List[str]:
84    """Expand the environment or ~ variables in a list."""
85    paths = [expand_path_vars(p) for p in paths]
86    return paths
87
88
89def kind_from_path(path: Path, base: bool = False) -> FileType:
90    """Determine the file kind based on its name.
91
92    When called with base=True, it will return the base file type instead
93    of the explicit one. That is expected to return 'yaml' for any yaml files.
94    """
95    # pathlib.Path.match patterns are very limited, they do not support *a*.yml
96    # glob.glob supports **/foo.yml but not multiple extensions
97    pathex = wcmatch.pathlib.PurePath(str(path.absolute().resolve()))
98    kinds = options.kinds if not base else BASE_KINDS
99    for entry in kinds:
100        for k, v in entry.items():
101            if pathex.globmatch(
102                v,
103                flags=(
104                    wcmatch.pathlib.GLOBSTAR
105                    | wcmatch.pathlib.BRACE
106                    | wcmatch.pathlib.DOTGLOB
107                ),
108            ):
109                return str(k)  # type: ignore
110
111    if base:
112        # Unknown base file type is default
113        return ""
114
115    if path.is_dir():
116        return "role"
117
118    if str(path) == '/dev/stdin':
119        return "playbook"
120
121    # Unknown file types report a empty string (evaluated as False)
122    return ""
123
124
125class Lintable:
126    """Defines a file/folder that can be linted.
127
128    Providing file content when creating the object allow creation of in-memory
129    instances that do not need files to be present on disk.
130    """
131
132    def __init__(
133        self,
134        name: Union[str, Path],
135        content: Optional[str] = None,
136        kind: Optional[FileType] = None,
137    ):
138        """Create a Lintable instance."""
139        # Filename is effective file on disk, for stdin is a namedtempfile
140        self.filename: str = str(name)
141        self.dir: str = ""
142        self.kind: Optional[FileType] = None
143
144        if isinstance(name, str):
145            self.name = normpath(name)
146            self.path = Path(self.name)
147        else:
148            self.name = str(name)
149            self.path = name
150        self._content = content
151
152        # if the lintable is part of a role, we save role folder name
153        self.role = ""
154        parts = self.path.parent.parts
155        if 'roles' in parts:
156            role = self.path
157            while role.parent.name != "roles" and role.name:
158                role = role.parent
159            if role.exists:
160                self.role = role.name
161
162        if str(self.path) in ['/dev/stdin', '-']:
163            # pylint: disable=consider-using-with
164            self.file = NamedTemporaryFile(mode="w+", suffix="playbook.yml")
165            self.filename = self.file.name
166            self._content = sys.stdin.read()
167            self.file.write(self._content)
168            self.file.flush()
169            self.path = Path(self.file.name)
170            self.name = 'stdin'
171            self.kind = 'playbook'
172            self.dir = '/'
173        else:
174            self.kind = kind or kind_from_path(self.path)
175        # We store absolute directory in dir
176        if not self.dir:
177            if self.kind == "role":
178                self.dir = str(self.path.resolve())
179            else:
180                self.dir = str(self.path.parent.resolve())
181
182        # determine base file kind (yaml, xml, ini, ...)
183        self.base_kind = kind_from_path(self.path, base=True)
184
185    def __getitem__(self, key: Any) -> Any:
186        """Provide compatibility subscriptable support."""
187        if key == 'path':
188            return str(self.path)
189        if key == 'type':
190            return str(self.kind)
191        raise NotImplementedError()
192
193    def get(self, key: Any, default: Any = None) -> Any:
194        """Provide compatibility subscriptable support."""
195        try:
196            return self.__getitem__(key)
197        except NotImplementedError:
198            return default
199
200    @property
201    def content(self) -> str:
202        """Retried file content, from internal cache or disk."""
203        if self._content is None:
204            with open(self.path.resolve(), mode='r', encoding='utf-8') as f:
205                self._content = f.read()
206        return self._content
207
208    def __hash__(self) -> int:
209        """Return a hash value of the lintables."""
210        return hash((self.name, self.kind))
211
212    def __eq__(self, other: object) -> bool:
213        """Identify whether the other object represents the same rule match."""
214        if isinstance(other, Lintable):
215            return bool(self.name == other.name and self.kind == other.kind)
216        return False
217
218    def __repr__(self) -> str:
219        """Return user friendly representation of a lintable."""
220        return f"{self.name} ({self.kind})"
221
222
223def discover_lintables(options: Namespace) -> Dict[str, Any]:
224    """Find all files that we know how to lint."""
225    # git is preferred as it also considers .gitignore
226    git_command_present = [
227        'git',
228        'ls-files',
229        '--cached',
230        '--others',
231        '--exclude-standard',
232        '-z',
233    ]
234    git_command_absent = ['git', 'ls-files', '--deleted', '-z']
235    out = None
236
237    try:
238        out_present = subprocess.check_output(
239            git_command_present, stderr=subprocess.STDOUT, universal_newlines=True
240        ).split("\x00")[:-1]
241        _logger.info(
242            "Discovered files to lint using: %s", ' '.join(git_command_present)
243        )
244
245        out_absent = subprocess.check_output(
246            git_command_absent, stderr=subprocess.STDOUT, universal_newlines=True
247        ).split("\x00")[:-1]
248        _logger.info("Excluded removed files using: %s", ' '.join(git_command_absent))
249
250        out = set(out_present) - set(out_absent)
251    except subprocess.CalledProcessError as exc:
252        if not (exc.returncode == 128 and 'fatal: not a git repository' in exc.output):
253            _logger.warning(
254                "Failed to discover lintable files using git: %s",
255                exc.output.rstrip('\n'),
256            )
257    except FileNotFoundError as exc:
258        if options.verbosity:
259            _logger.warning("Failed to locate command: %s", exc)
260
261    if out is None:
262        exclude_pattern = "|".join(str(x) for x in options.exclude_paths)
263        _logger.info("Looking up for files, excluding %s ...", exclude_pattern)
264        out = set(
265            WcMatch(
266                '.', exclude_pattern=exclude_pattern, flags=RECURSIVE, limit=256
267            ).match()
268        )
269
270    return OrderedDict.fromkeys(sorted(out))
271
272
273def guess_project_dir(config_file: Optional[str]) -> str:
274    """Return detected project dir or current working directory."""
275    path = None
276    if config_file is not None:
277        target = pathlib.Path(config_file)
278        if target.exists():
279            path = str(target.parent.absolute())
280
281    if path is None:
282        try:
283            result = subprocess.run(
284                ["git", "rev-parse", "--show-toplevel"],
285                stderr=subprocess.PIPE,
286                stdout=subprocess.PIPE,
287                universal_newlines=True,
288                check=True,
289            )
290
291            path = result.stdout.splitlines()[0]
292        except subprocess.CalledProcessError as exc:
293            if not (
294                exc.returncode == 128 and 'fatal: not a git repository' in exc.stderr
295            ):
296                _logger.warning(
297                    "Failed to guess project directory using git: %s",
298                    exc.stderr.rstrip('\n'),
299                )
300        except FileNotFoundError as exc:
301            _logger.warning("Failed to locate command: %s", exc)
302
303    if path is None:
304        path = os.getcwd()
305
306    _logger.info(
307        "Guessed %s as project root directory",
308        path,
309    )
310
311    return path
312
313
314def expand_dirs_in_lintables(lintables: Set[Lintable]) -> None:
315    """Return all recognized lintables within given directory."""
316    should_expand = False
317
318    for item in lintables:
319        if item.path.is_dir():
320            should_expand = True
321            break
322
323    if should_expand:
324        # this relies on git and we do not want to call unless needed
325        all_files = discover_lintables(options)
326
327        for item in copy.copy(lintables):
328            if item.path.is_dir():
329                for filename in all_files:
330                    if filename.startswith(str(item.path)):
331                        lintables.add(Lintable(filename))
332