"""
Wild Card Match.

A module for performing wild card matches.

Licensed under MIT
Copyright (c) 2018 - 2020 Isaac Muse <isaacmuse@gmail.com>

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions
of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
"""
import os
import re
from . import _wcparse
from . import _wcmatch
from . import util
from typing import Optional, Any, Iterator, List, Generic, AnyStr


__all__ = (
    "CASE", "IGNORECASE", "RAWCHARS", "FILEPATHNAME", "DIRPATHNAME", "PATHNAME",
    "EXTMATCH", "GLOBSTAR", "BRACE", "MINUSNEGATE", "SYMLINKS", "HIDDEN", "RECURSIVE",
    "MATCHBASE",
    "C", "I", "R", "P", "E", "G", "M", "DP", "FP", "SL", "HD", "RV", "X", "B",
    "WcMatch"
)

# Public flags that are shared with the pattern parser (long name and short alias).
C = CASE = _wcparse.CASE
I = IGNORECASE = _wcparse.IGNORECASE
R = RAWCHARS = _wcparse.RAWCHARS
E = EXTMATCH = _wcparse.EXTMATCH
G = GLOBSTAR = _wcparse.GLOBSTAR
B = BRACE = _wcparse.BRACE
M = MINUSNEGATE = _wcparse.MINUSNEGATE
X = MATCHBASE = _wcparse.MATCHBASE

# Control `PATHNAME` individually for folder exclude and files
DP = DIRPATHNAME = 0x1000000
FP = FILEPATHNAME = 0x2000000
SL = SYMLINKS = 0x4000000
HD = HIDDEN = 0x8000000
RV = RECURSIVE = 0x10000000

# Internal flags
_ANCHOR = _wcparse._ANCHOR
_NEGATE = _wcparse.NEGATE
_DOTMATCH = _wcparse.DOTMATCH
_NEGATEALL = _wcparse.NEGATEALL
_SPLIT = _wcparse.SPLIT
_FORCEWIN = _wcparse.FORCEWIN
_PATHNAME = _wcparse.PATHNAME

# Control `PATHNAME` for file and folder
P = PATHNAME = DIRPATHNAME | FILEPATHNAME

# Every flag a caller is allowed to pass to `WcMatch`; anything else is silently dropped.
FLAG_MASK = (
    CASE |
    IGNORECASE |
    RAWCHARS |
    EXTMATCH |
    GLOBSTAR |
    BRACE |
    MINUSNEGATE |
    DIRPATHNAME |
    FILEPATHNAME |
    SYMLINKS |
    HIDDEN |
    RECURSIVE |
    MATCHBASE
)


class WcMatch(Generic[AnyStr]):
    """
    Finds files by wildcard.

    Walks a directory tree with `os.walk`, pruning folders that match the exclude pattern
    and yielding files that match the file pattern. Behavior can be customized by
    overriding the `on_*` hooks.
    """

    def __init__(
        self,
        root_dir: AnyStr,
        file_pattern: Optional[AnyStr] = None,
        exclude_pattern: Optional[AnyStr] = None,
        flags: int = 0,
        limit: int = _wcparse.PATTERN_LIMIT,
        **kwargs: Any
    ) -> None:
        """
        Initialize the directory walker object.

        Parameters:
            root_dir: Directory to walk (`str` or `bytes`). When empty, `os.curdir` is used.
            file_pattern: Wildcard pattern files must match. `None` or empty matches every file.
            exclude_pattern: Wildcard pattern for folders to skip. `None` or empty excludes nothing.
            flags: Bitwise OR of this module's public flags; bits outside `FLAG_MASK` are ignored.
            limit: Pattern limit forwarded to `_wcparse.compile`.
            **kwargs: Passed untouched to `on_init` for subclasses.
        """

        self.is_bytes = isinstance(root_dir, bytes)
        self._directory = self._norm_slash(root_dir)  # type: AnyStr
        self._abort = False
        self._skipped = 0
        self._parse_flags(flags)
        self._sep = os.fsencode(os.sep) if isinstance(root_dir, bytes) else os.sep  # type: AnyStr
        self._root_dir = self._add_sep(self._get_cwd(), True)  # type: AnyStr
        # Length of the root prefix that is stripped when matching against relative paths.
        # Set here so the validators work even before a walk has started.
        self._base_len = len(self._root_dir)
        self.limit = limit
        empty = os.fsencode('') if isinstance(root_dir, bytes) else ''
        self.pattern_file = file_pattern if file_pattern is not None else empty  # type: AnyStr
        self.pattern_folder_exclude = exclude_pattern if exclude_pattern is not None else empty  # type: AnyStr
        self.file_check = None  # type: Optional[_wcmatch.WcRegexp[AnyStr]]
        self.folder_exclude_check = None  # type: Optional[_wcmatch.WcRegexp[AnyStr]]
        # `on_init` runs before compilation so a subclass may pre-populate `file_check` /
        # `folder_exclude_check`; `_compile` only fills in the ones that are still `None`.
        self.on_init(**kwargs)
        self._compile(self.pattern_file, self.pattern_folder_exclude)

    def _norm_slash(self, name: AnyStr) -> AnyStr:
        """
        Normalize path slashes.

        When `util.is_case_sensitive()` is true the name is returned unchanged; otherwise
        every forward slash is replaced with a backslash.
        """

        if util.is_case_sensitive():
            return name
        elif isinstance(name, bytes):
            return name.replace(b'/', b"\\")
        else:
            return name.replace('/', "\\")

    def _add_sep(self, path: AnyStr, check: bool = False) -> AnyStr:
        """
        Add separator.

        With `check` false the separator is always appended. With `check` true it is only
        appended when `path` does not already end with it.
        """

        return (path + self._sep) if not check or not path.endswith(self._sep) else path

    def _get_cwd(self) -> AnyStr:
        """Get current working directory: the configured root, or `os.curdir` in the matching string type."""

        if self._directory:
            return self._directory
        elif isinstance(self._directory, bytes):
            return bytes(os.curdir, 'ASCII')
        else:
            return os.curdir

    def _parse_flags(self, flags: int) -> None:
        """
        Parse flags.

        Stores the walker options as boolean attributes and leaves `self.flags` holding
        the flags that are handed to the pattern compiler.
        """

        self.flags = flags & FLAG_MASK
        # These parser behaviors are always enabled for the walker.
        self.flags |= _NEGATE | _DOTMATCH | _NEGATEALL | _SPLIT
        self.follow_links = bool(self.flags & SYMLINKS)
        self.show_hidden = bool(self.flags & HIDDEN)
        self.recursive = bool(self.flags & RECURSIVE)
        self.dir_pathname = bool(self.flags & DIRPATHNAME)
        self.file_pathname = bool(self.flags & FILEPATHNAME)
        self.matchbase = bool(self.flags & MATCHBASE)
        if util.platform() == "windows":
            self.flags |= _FORCEWIN
        # Keep only bits the parser knows about, minus `MATCHBASE`, which is re-applied
        # per pattern in `_compile_wildcard` (presumably `MATCHBASE` is part of
        # `_wcparse.FLAG_MASK`, so the XOR clears it).
        self.flags = self.flags & (_wcparse.FLAG_MASK ^ MATCHBASE)

    def _compile_wildcard(self, pattern: AnyStr, pathname: bool = False) -> Optional[_wcmatch.WcRegexp[AnyStr]]:
        """
        Compile or format the wildcard inclusion/exclusion pattern.

        Parameters:
            pattern: The wildcard pattern; an empty pattern yields `None`.
            pathname: When true the pattern is compiled as an anchored path pattern, and
                `MATCHBASE` is honored if it was requested.
        """

        flags = self.flags
        if pathname:
            flags |= _PATHNAME | _ANCHOR
            if self.matchbase:
                flags |= MATCHBASE

        return _wcparse.compile([pattern], flags, self.limit) if pattern else None

    def _compile(self, file_pattern: AnyStr, folder_exclude_pattern: AnyStr) -> None:
        """
        Compile patterns.

        Only matchers that are still `None` are created, so values assigned earlier
        (e.g. from `on_init`) are left alone.
        """

        if self.file_check is None:
            if not file_pattern:
                # No file pattern: accept any file name.
                self.file_check = _wcmatch.WcRegexp(
                    (re.compile(br'^.*$' if isinstance(file_pattern, bytes) else r'^.*$', re.DOTALL),)
                )
            else:
                self.file_check = self._compile_wildcard(file_pattern, self.file_pathname)

        if self.folder_exclude_check is None:
            if not folder_exclude_pattern:
                # No exclude pattern: a matcher with no patterns.
                self.folder_exclude_check = _wcmatch.WcRegexp(tuple())
            else:
                self.folder_exclude_check = self._compile_wildcard(folder_exclude_pattern, self.dir_pathname)

    def _valid_file(self, base: AnyStr, name: AnyStr) -> bool:
        """
        Return whether a file can be searched.

        The file must match the file pattern, must not be hidden (unless `HIDDEN` was
        given), and must be accepted by `on_validate_file`.
        """

        valid = False
        fullpath = os.path.join(base, name)
        # With `FILEPATHNAME` the pattern is matched against the path relative to the root.
        if self.file_check is not None and self.compare_file(fullpath[self._base_len:] if self.file_pathname else name):
            valid = True
        if valid and (not self.show_hidden and util.is_hidden(fullpath)):
            valid = False
        return self.on_validate_file(base, name) if valid else valid

    def compare_file(self, filename: AnyStr) -> bool:
        """Compare filename against the compiled file pattern."""

        return self.file_check.match(filename)  # type: ignore[union-attr]

    def on_validate_file(self, base: AnyStr, name: AnyStr) -> bool:
        """Validate file override. Return `False` to reject a file that already matched; default accepts."""

        return True

    def _valid_folder(self, base: AnyStr, name: AnyStr) -> bool:
        """
        Return whether a folder can be searched.

        A folder is rejected when recursion is disabled, when it matches the exclude
        pattern, when it is hidden (unless `HIDDEN` was given), or when
        `on_validate_directory` returns false.
        """

        valid = True
        fullpath = os.path.join(base, name)
        if (
            not self.recursive or
            (
                self.folder_exclude_check and
                not self.compare_directory(fullpath[self._base_len:] if self.dir_pathname else name)
            )
        ):
            valid = False
        if valid and (not self.show_hidden and util.is_hidden(fullpath)):
            valid = False
        return self.on_validate_directory(base, name) if valid else valid

    def compare_directory(self, directory: AnyStr) -> bool:
        """
        Compare folder.

        Returns `True` when the folder is NOT matched by the exclude pattern. With
        `DIRPATHNAME` a trailing separator is appended before matching.
        """

        return not self.folder_exclude_check.match(  # type: ignore[union-attr]
            self._add_sep(directory) if self.dir_pathname else directory
        )

    def on_init(self, **kwargs: Any) -> None:
        """Handle custom initialization. Receives the extra keyword arguments given to the constructor."""

    def on_validate_directory(self, base: AnyStr, name: AnyStr) -> bool:
        """Validate folder override. Return `False` to prune a folder that was not excluded; default accepts."""

        return True

    def on_skip(self, base: AnyStr, name: AnyStr) -> Any:
        """On skip. Called for each file that is not matched; a non-`None` return value is yielded by the walk."""

        return None

    def on_error(self, base: AnyStr, name: AnyStr) -> Any:
        """On error. Called when validating an entry raises; a non-`None` return value is yielded by the walk."""

        return None

    def on_match(self, base: AnyStr, name: AnyStr) -> Any:
        """On match. The return value is yielded by the walk; by default the joined path of the file."""

        return os.path.join(base, name)

    def on_reset(self) -> None:
        """On reset. Called at the start of every `imatch` run."""

    def get_skipped(self) -> int:
        """Get number of skipped files counted since the last run started."""

        return self._skipped

    def kill(self) -> None:
        """Abort process. The walk stops at its next abort check."""

        self._abort = True

    def is_aborted(self) -> bool:
        """Check if process has been aborted."""

        return self._abort

    def reset(self) -> None:
        """Revive class from a killed state."""

        self._abort = False

    def _walk(self) -> Iterator[Any]:
        """
        Start search for valid files.

        Yields the `on_match` result for every accepted file, plus any non-`None` value
        returned by `on_skip` or `on_error`.
        """

        # Recomputed on each walk in case `_root_dir` was changed after construction.
        self._base_len = len(self._root_dir)

        for base, dirs, files in os.walk(self._root_dir, followlinks=self.follow_links):
            if self.is_aborted():
                break

            # Remove child folders based on exclude rules
            # (iterate over a copy: `dirs` is pruned in place so `os.walk` skips them).
            for name in dirs[:]:
                try:
                    if not self._valid_folder(base, name):
                        dirs.remove(name)
                except Exception:
                    dirs.remove(name)
                    value = self.on_error(base, name)
                    if value is not None:  # pragma: no cover
                        yield value

                if self.is_aborted():  # pragma: no cover
                    break

            # Search files if they were found
            if files:
                # Only search files that are in the include rules
                for name in files:
                    try:
                        valid = self._valid_file(base, name)
                    except Exception:
                        # A failed validation is reported, then handled as a skipped file below.
                        valid = False
                        value = self.on_error(base, name)
                        if value is not None:
                            yield value

                    if valid:
                        yield self.on_match(base, name)
                    else:
                        self._skipped += 1
                        value = self.on_skip(base, name)
                        if value is not None:
                            yield value

                    if self.is_aborted():
                        break

    def match(self) -> List[Any]:
        """Run the directory walker and return all results as a list."""

        return list(self.imatch())

    def imatch(self) -> Iterator[Any]:
        """Run the directory walker as iterator. Calls `on_reset` and zeroes the skipped counter first."""

        self.on_reset()
        self._skipped = 0
        yield from self._walk()