"""Utility functions related to file operations."""
import copy
import logging
import os
import pathlib
import subprocess
import sys
from argparse import Namespace
from collections import OrderedDict
from contextlib import contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Union

# import wcmatch
import wcmatch.pathlib
from wcmatch.wcmatch import RECURSIVE, WcMatch

from ansiblelint.config import BASE_KINDS, options
from ansiblelint.constants import FileType

if TYPE_CHECKING:
    # https://github.com/PyCQA/pylint/issues/3979
    BasePathLike = os.PathLike[Any]  # pylint: disable=unsubscriptable-object
else:
    BasePathLike = os.PathLike

_logger = logging.getLogger(__package__)


def abspath(path: str, base_dir: str) -> str:
    """Make relative path absolute relative to given directory.

    Args:
        path (str): the path to make absolute
        base_dir (str): the directory from which make relative paths absolute

    Returns:
        The normalized path. Paths that are already absolute are only
        normalized; ``base_dir`` is ignored for them.
    """
    if not os.path.isabs(path):
        # Don't use abspath as it assumes path is relative to cwd.
        # We want it relative to base_dir.
        path = os.path.join(base_dir, path)

    return os.path.normpath(path)


def normpath(path: Union[str, BasePathLike]) -> str:
    """
    Normalize a path in order to provide a more consistent output.

    Currently it generates a relative path but in the future we may want to
    make this user configurable.
    """
    # conversion to string in order to allow receiving non string objects
    text = str(path)
    relative = os.path.relpath(text)
    # Named `absolute` (not `abspath`) so the module-level abspath() function
    # is not shadowed inside this function.
    absolute = os.path.abspath(text)
    # we avoid returning relative paths that end up at root level: when the
    # relative form climbs all the way up (e.g. "../../etc/x"), the absolute
    # form ("/etc/x") is a substring of it and is the shorter, clearer choice.
    if absolute in relative:
        return absolute
    return relative


@contextmanager
def cwd(path: Union[str, BasePathLike]) -> Iterator[None]:
    """Context manager for temporary changing current working directory.

    The previous working directory is restored on exit, even if the body
    of the ``with`` statement raises.
    """
    old_pwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(old_pwd)


def expand_path_vars(path: str) -> str:
    """Expand the environment or ~ variables in a path string.

    Surrounding whitespace is stripped before expansion.
    """
    # It may be possible for function to be called with a Path object
    path = str(path).strip()
    path = os.path.expanduser(path)
    path = os.path.expandvars(path)
    return path


def expand_paths_vars(paths: List[str]) -> List[str]:
    """Expand the environment or ~ variables in a list.

    Returns a new list; the input list is not modified.
    """
    return [expand_path_vars(p) for p in paths]


def kind_from_path(path: Path, base: bool = False) -> FileType:
    """Determine the file kind based on its name.

    When called with base=True, it will return the base file type instead
    of the explicit one. That is expected to return 'yaml' for any yaml files.

    The first matching pattern wins, so the order of the configured kinds
    matters. An empty string is returned when nothing matches.
    """
    # pathlib.Path.match patterns are very limited, they do not support *a*.yml
    # glob.glob supports **/foo.yml but not multiple extensions
    pathex = wcmatch.pathlib.PurePath(str(path.absolute().resolve()))
    kinds = options.kinds if not base else BASE_KINDS
    # Flags are loop-invariant, so build them once.
    glob_flags = (
        wcmatch.pathlib.GLOBSTAR | wcmatch.pathlib.BRACE | wcmatch.pathlib.DOTGLOB
    )
    for entry in kinds:
        for k, v in entry.items():
            if pathex.globmatch(v, flags=glob_flags):
                return str(k)  # type: ignore

    if base:
        # Unknown base file type is default
        return ""

    if path.is_dir():
        return "role"

    if str(path) == '/dev/stdin':
        return "playbook"

    # Unknown file types report a empty string (evaluated as False)
    return ""


class Lintable:
    """Defines a file/folder that can be linted.

    Providing file content when creating the object allow creation of in-memory
    instances that do not need files to be present on disk.
    """

    def __init__(
        self,
        name: Union[str, Path],
        content: Optional[str] = None,
        kind: Optional[FileType] = None,
    ):
        """Create a Lintable instance.

        Args:
            name: path of the file or folder; ``'-'`` or ``'/dev/stdin'``
                makes the instance read its content from standard input.
            content: optional in-memory content, used instead of reading
                the file from disk.
            kind: optional explicit file kind; when omitted it is detected
                from the path. It is ignored for stdin, which is always
                treated as a playbook.
        """
        # Filename is effective file on disk, for stdin is a namedtempfile
        self.filename: str = str(name)
        self.dir: str = ""
        self.kind: Optional[FileType] = None

        if isinstance(name, str):
            self.name = normpath(name)
            self.path = Path(self.name)
        else:
            self.name = str(name)
            self.path = name
        self._content = content

        # if the lintable is part of a role, we save role folder name
        self.role = ""
        parts = self.path.parent.parts
        if 'roles' in parts:
            role = self.path
            # Walk up until we reach the direct child of a "roles" folder;
            # `role.name` becomes empty at the filesystem root, which stops
            # the loop if no such ancestor is found.
            while role.parent.name != "roles" and role.name:
                role = role.parent
            # exists must be *called*: the bare bound method is always truthy
            # and would record a role even for folders that are not on disk.
            if role.exists():
                self.role = role.name

        if str(self.path) in ['/dev/stdin', '-']:
            # The temporary file is kept as an attribute so it stays on disk
            # for as long as this object references it.
            # pylint: disable=consider-using-with
            self.file = NamedTemporaryFile(mode="w+", suffix="playbook.yml")
            self.filename = self.file.name
            self._content = sys.stdin.read()
            self.file.write(self._content)
            self.file.flush()
            self.path = Path(self.file.name)
            self.name = 'stdin'
            self.kind = 'playbook'
            self.dir = '/'
        else:
            self.kind = kind or kind_from_path(self.path)
        # We store absolute directory in dir
        if not self.dir:
            if self.kind == "role":
                self.dir = str(self.path.resolve())
            else:
                self.dir = str(self.path.parent.resolve())

        # determine base file kind (yaml, xml, ini, ...)
        self.base_kind = kind_from_path(self.path, base=True)

    def __getitem__(self, key: Any) -> Any:
        """Provide compatibility subscriptable support.

        Only the keys ``'path'`` and ``'type'`` are supported; any other key
        raises NotImplementedError.
        """
        if key == 'path':
            return str(self.path)
        if key == 'type':
            return str(self.kind)
        raise NotImplementedError()

    def get(self, key: Any, default: Any = None) -> Any:
        """Provide compatibility subscriptable support.

        Returns ``default`` for keys that ``__getitem__`` does not support.
        """
        try:
            return self.__getitem__(key)
        except NotImplementedError:
            return default

    @property
    def content(self) -> str:
        """Retrieve file content, from internal cache or disk."""
        if self._content is None:
            with open(self.path.resolve(), mode='r', encoding='utf-8') as f:
                self._content = f.read()
        return self._content

    def __hash__(self) -> int:
        """Return a hash value of the lintables."""
        return hash((self.name, self.kind))

    def __eq__(self, other: object) -> bool:
        """Identify whether the other object represents the same lintable.

        Two lintables are equal when both their name and kind match, which
        is consistent with ``__hash__``.
        """
        if isinstance(other, Lintable):
            return bool(self.name == other.name and self.kind == other.kind)
        return False

    def __repr__(self) -> str:
        """Return user friendly representation of a lintable."""
        return f"{self.name} ({self.kind})"


def discover_lintables(options: Namespace) -> Dict[str, Any]:
    """Find all files that we know how to lint.

    Files are listed with git when possible; otherwise the current directory
    is walked, honouring ``options.exclude_paths``.

    Returns:
        An ordered mapping whose keys are the sorted discovered file names
        and whose values are all ``None``.
    """
    # git is preferred as it also considers .gitignore
    git_command_present = [
        'git',
        'ls-files',
        '--cached',
        '--others',
        '--exclude-standard',
        '-z',
    ]
    git_command_absent = ['git', 'ls-files', '--deleted', '-z']
    out = None

    try:
        # Output is NUL separated (-z) with a trailing NUL, hence the [:-1].
        out_present = subprocess.check_output(
            git_command_present, stderr=subprocess.STDOUT, universal_newlines=True
        ).split("\x00")[:-1]
        _logger.info(
            "Discovered files to lint using: %s", ' '.join(git_command_present)
        )

        out_absent = subprocess.check_output(
            git_command_absent, stderr=subprocess.STDOUT, universal_newlines=True
        ).split("\x00")[:-1]
        _logger.info("Excluded removed files using: %s", ' '.join(git_command_absent))

        # Files still tracked in the index but deleted from the working tree
        # cannot be linted.
        out = set(out_present) - set(out_absent)
    except subprocess.CalledProcessError as exc:
        # Running outside a git repository is expected and stays silent.
        if not (exc.returncode == 128 and 'fatal: not a git repository' in exc.output):
            _logger.warning(
                "Failed to discover lintable files using git: %s",
                exc.output.rstrip('\n'),
            )
    except FileNotFoundError as exc:
        if options.verbosity:
            _logger.warning("Failed to locate command: %s", exc)

    if out is None:
        # git was unusable: fall back to walking the current directory.
        exclude_pattern = "|".join(str(x) for x in options.exclude_paths)
        _logger.info("Looking up for files, excluding %s ...", exclude_pattern)
        out = set(
            WcMatch(
                '.', exclude_pattern=exclude_pattern, flags=RECURSIVE, limit=256
            ).match()
        )

    return OrderedDict.fromkeys(sorted(out))


def guess_project_dir(config_file: Optional[str]) -> str:
    """Return detected project dir or current working directory.

    Detection order: the folder containing ``config_file`` (when that file
    exists), then the git top-level directory, then the current working
    directory.
    """
    path = None
    if config_file is not None:
        target = pathlib.Path(config_file)
        if target.exists():
            path = str(target.parent.absolute())

    if path is None:
        try:
            result = subprocess.run(
                ["git", "rev-parse", "--show-toplevel"],
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
                universal_newlines=True,
                check=True,
            )
        except subprocess.CalledProcessError as exc:
            # Running outside a git repository is expected and stays silent.
            if not (
                exc.returncode == 128 and 'fatal: not a git repository' in exc.stderr
            ):
                _logger.warning(
                    "Failed to guess project directory using git: %s",
                    exc.stderr.rstrip('\n'),
                )
        except FileNotFoundError as exc:
            _logger.warning("Failed to locate command: %s", exc)
        else:
            # Guard against empty output instead of failing with IndexError;
            # the cwd fallback below then applies.
            lines = result.stdout.splitlines()
            if lines:
                path = lines[0]

    if path is None:
        path = os.getcwd()

    _logger.info(
        "Guessed %s as project root directory",
        path,
    )

    return path


def expand_dirs_in_lintables(lintables: Set[Lintable]) -> None:
    """Add all recognized lintables found within the given directories.

    The set is modified in place: for every directory lintable, each
    discovered file located under it is added as a new Lintable. Nothing is
    returned and the directory entries themselves are kept.
    """
    # this relies on git and we do not want to call unless needed
    if not any(item.path.is_dir() for item in lintables):
        return

    all_files = discover_lintables(options)

    # Iterate over a copy because the set grows while we walk it.
    for item in copy.copy(lintables):
        if item.path.is_dir():
            # NOTE(review): this is a plain string-prefix match, so a folder
            # named "foo" also matches files under "foobar/" - confirm whether
            # that is intended before tightening it.
            prefix = str(item.path)
            for filename in all_files:
                if filename.startswith(prefix):
                    lintables.add(Lintable(filename))