1"""
2Wild Card Match.
3
4A module for performing wild card matches.
5
6Licensed under MIT
7Copyright (c) 2018 - 2020 Isaac Muse <isaacmuse@gmail.com>
8
9Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
10documentation files (the "Software"), to deal in the Software without restriction, including without limitation
11the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
12and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
13
14The above copyright notice and this permission notice shall be included in all copies or substantial portions
15of the Software.
16
17THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
18TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
19THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
20CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21IN THE SOFTWARE.
22"""
23import os
24import re
25from . import _wcparse
26from . import _wcmatch
27from . import util
28from typing import Optional, Any, Iterator, List, Generic, AnyStr
29
30
# Public API: long flag names, their short aliases, and the walker class.
__all__ = (
    "CASE",
    "IGNORECASE",
    "RAWCHARS",
    "FILEPATHNAME",
    "DIRPATHNAME",
    "PATHNAME",
    "EXTMATCH",
    "GLOBSTAR",
    "BRACE",
    "MINUSNEGATE",
    "SYMLINKS",
    "HIDDEN",
    "RECURSIVE",
    "MATCHBASE",
    "C",
    "I",
    "R",
    "P",
    "E",
    "G",
    "M",
    "DP",
    "FP",
    "SL",
    "HD",
    "RV",
    "X",
    "B",
    "WcMatch"
)

# Flags re-exported from the pattern parser (long name first, then its short alias)
CASE = _wcparse.CASE
C = CASE
IGNORECASE = _wcparse.IGNORECASE
I = IGNORECASE
RAWCHARS = _wcparse.RAWCHARS
R = RAWCHARS
EXTMATCH = _wcparse.EXTMATCH
E = EXTMATCH
GLOBSTAR = _wcparse.GLOBSTAR
G = GLOBSTAR
BRACE = _wcparse.BRACE
B = BRACE
MINUSNEGATE = _wcparse.MINUSNEGATE
M = MINUSNEGATE
MATCHBASE = _wcparse.MATCHBASE
X = MATCHBASE

# Flags defined by this module.
# `DIRPATHNAME` and `FILEPATHNAME` control `PATHNAME` individually for folder exclude and files.
DIRPATHNAME = 0x1000000
DP = DIRPATHNAME
FILEPATHNAME = 0x2000000
FP = FILEPATHNAME
SYMLINKS = 0x4000000
SL = SYMLINKS
HIDDEN = 0x8000000
HD = HIDDEN
RECURSIVE = 0x10000000
RV = RECURSIVE

# Internal parser flags (not part of the public API)
_ANCHOR = _wcparse._ANCHOR
_NEGATE = _wcparse.NEGATE
_DOTMATCH = _wcparse.DOTMATCH
_NEGATEALL = _wcparse.NEGATEALL
_SPLIT = _wcparse.SPLIT
_FORCEWIN = _wcparse.FORCEWIN
_PATHNAME = _wcparse.PATHNAME

# Control `PATHNAME` for file and folder at the same time
PATHNAME = DIRPATHNAME | FILEPATHNAME
P = PATHNAME

# Every flag a caller may pass in; anything else is masked off when flags are parsed
FLAG_MASK = (
    CASE | IGNORECASE | RAWCHARS | EXTMATCH | GLOBSTAR | BRACE | MINUSNEGATE | MATCHBASE |
    DIRPATHNAME | FILEPATHNAME | SYMLINKS | HIDDEN | RECURSIVE
)
82
83
class WcMatch(Generic[AnyStr]):
    """
    Finds files by wildcard.

    Walks a directory tree with `os.walk`, prunes folders that match the exclude pattern,
    and reports the files that match the file pattern. The `on_*` methods are hooks that
    subclasses may override to customize validation and the values that get yielded.
    """

    def __init__(
        self,
        root_dir: AnyStr,
        file_pattern: Optional[AnyStr] = None,
        exclude_pattern: Optional[AnyStr] = None,
        flags: int = 0,
        limit: int = _wcparse.PATHNAME,
        **kwargs: Any
    ):
        """
        Initialize the directory walker object.

        `root_dir` is the directory to walk (`str` or `bytes`); an empty value falls back to `os.curdir`.
        `file_pattern` selects files; `None` or an empty pattern matches every file.
        `exclude_pattern` selects folders to prune; `None` or an empty pattern compiles to an empty matcher.
        `flags` is a combination of this module's flags; bits outside `FLAG_MASK` are discarded.
        `limit` is forwarded to `_wcparse.compile` when patterns are compiled.
        `kwargs` are forwarded untouched to `on_init`.
        """

        # NOTE(review): the default for `limit` is the `PATHNAME` flag constant rather than a dedicated
        # pattern limit value. It is left unchanged here to keep the interface stable -- confirm the intent.
        self.is_bytes = isinstance(root_dir, bytes)
        self._directory = self._norm_slash(root_dir)  # type: AnyStr
        self._abort = False
        self._skipped = 0
        self._parse_flags(flags)
        self._sep = os.fsencode(os.sep) if isinstance(root_dir, bytes) else os.sep  # type: AnyStr
        self._root_dir = self._add_sep(self._get_cwd(), True)  # type: AnyStr
        # Length of the root prefix that is stripped from full paths when matching with a pathname flag.
        # Set here (not only in `_walk`) so the validators are usable before a walk has started.
        self._base_len = len(self._root_dir)
        self.limit = limit
        empty = os.fsencode('') if isinstance(root_dir, bytes) else ''
        self.pattern_file = file_pattern if file_pattern is not None else empty  # type: AnyStr
        self.pattern_folder_exclude = exclude_pattern if exclude_pattern is not None else empty  # type: AnyStr
        self.file_check = None  # type: Optional[_wcmatch.WcRegexp[AnyStr]]
        self.folder_exclude_check = None  # type: Optional[_wcmatch.WcRegexp[AnyStr]]
        # The hook runs before compilation, so a subclass may install its own matchers;
        # `_compile` only fills in the ones that are still `None`.
        self.on_init(**kwargs)
        self._compile(self.pattern_file, self.pattern_folder_exclude)

    def _norm_slash(self, name: AnyStr) -> AnyStr:
        """Normalize path slashes: `/` becomes a backslash unless `util.is_case_sensitive()` is true."""

        if util.is_case_sensitive():
            return name
        elif isinstance(name, bytes):
            return name.replace(b'/', b"\\")
        else:
            return name.replace('/', "\\")

    def _add_sep(self, path: AnyStr, check: bool = False) -> AnyStr:
        """Append the separator to `path`; with `check` set, a path already ending in it is left alone."""

        if check and path.endswith(self._sep):
            return path
        return path + self._sep

    def _get_cwd(self) -> AnyStr:
        """Get the directory to walk: the given root, or `os.curdir` (same string type) when it is empty."""

        if self._directory:
            return self._directory
        elif isinstance(self._directory, bytes):
            return bytes(os.curdir, 'ASCII')
        else:
            return os.curdir

    def _parse_flags(self, flags: int) -> None:
        """Parse flags into walker options (booleans) and the flag set handed to the pattern parser."""

        self.flags = flags & FLAG_MASK
        # These parser features are always enabled for the walker
        self.flags |= _NEGATE | _DOTMATCH | _NEGATEALL | _SPLIT
        self.follow_links = bool(self.flags & SYMLINKS)
        self.show_hidden = bool(self.flags & HIDDEN)
        self.recursive = bool(self.flags & RECURSIVE)
        self.dir_pathname = bool(self.flags & DIRPATHNAME)
        self.file_pathname = bool(self.flags & FILEPATHNAME)
        self.matchbase = bool(self.flags & MATCHBASE)
        if util.platform() == "windows":
            self.flags |= _FORCEWIN
        # Keep only parser flags. Presumably `MATCHBASE` is part of `_wcparse.FLAG_MASK`, so the XOR drops it
        # here; `_compile_wildcard` adds it back, but only for patterns compiled in pathname mode.
        self.flags = self.flags & (_wcparse.FLAG_MASK ^ MATCHBASE)

    def _compile_wildcard(self, pattern: AnyStr, pathname: bool = False) -> Optional[_wcmatch.WcRegexp[AnyStr]]:
        """Compile the wildcard inclusion/exclusion pattern; an empty pattern yields `None`."""

        flags = self.flags
        if pathname:
            flags |= _PATHNAME | _ANCHOR
            if self.matchbase:
                flags |= MATCHBASE

        return _wcparse.compile([pattern], flags, self.limit) if pattern else None

    def _compile(self, file_pattern: AnyStr, folder_exclude_pattern: AnyStr) -> None:
        """Compile patterns, skipping any matcher that has already been set."""

        if self.file_check is None:
            if not file_pattern:
                # No file pattern: accept every name
                self.file_check = _wcmatch.WcRegexp(
                    (re.compile(br'^.*$' if isinstance(file_pattern, bytes) else r'^.*$', re.DOTALL),)
                )
            else:
                self.file_check = self._compile_wildcard(file_pattern, self.file_pathname)

        if self.folder_exclude_check is None:
            if not folder_exclude_pattern:
                # No exclude pattern: a matcher with no patterns
                self.folder_exclude_check = _wcmatch.WcRegexp(tuple())
            else:
                self.folder_exclude_check = self._compile_wildcard(folder_exclude_pattern, self.dir_pathname)

    def _valid_file(self, base: AnyStr, name: AnyStr) -> bool:
        """
        Return whether a file can be searched.

        The file must match the file pattern (matched against the path relative to the root when
        `FILEPATHNAME` is set, otherwise against the bare name), must not be hidden unless `HIDDEN`
        is set, and must be accepted by `on_validate_file`.
        """

        fullpath = os.path.join(base, name)
        if self.file_check is None:
            return False
        if not self.compare_file(fullpath[self._base_len:] if self.file_pathname else name):
            return False
        if not self.show_hidden and util.is_hidden(fullpath):
            return False
        return self.on_validate_file(base, name)

    def compare_file(self, filename: AnyStr) -> bool:
        """Compare filename against the compiled file pattern."""

        return self.file_check.match(filename)  # type: ignore[union-attr]

    def on_validate_file(self, base: AnyStr, name: AnyStr) -> bool:
        """Validate file override: called last for a file that passed every other check."""

        return True

    def _valid_folder(self, base: AnyStr, name: AnyStr) -> bool:
        """
        Return whether a folder can be searched.

        Nothing is descended into unless `RECURSIVE` is set. Otherwise the folder must not match the
        exclude pattern, must not be hidden unless `HIDDEN` is set, and must be accepted by
        `on_validate_directory`.
        """

        fullpath = os.path.join(base, name)
        if not self.recursive:
            return False
        # A falsy exclude matcher skips the comparison entirely
        if (
            self.folder_exclude_check and
            not self.compare_directory(fullpath[self._base_len:] if self.dir_pathname else name)
        ):
            return False
        if not self.show_hidden and util.is_hidden(fullpath):
            return False
        return self.on_validate_directory(base, name)

    def compare_directory(self, directory: AnyStr) -> bool:
        """Compare folder: true when the folder does NOT match the exclude pattern."""

        # In pathname mode the folder is matched with a trailing separator appended
        return not self.folder_exclude_check.match(  # type: ignore[union-attr]
            self._add_sep(directory) if self.dir_pathname else directory
        )

    def on_init(self, **kwargs: Any) -> None:
        """Handle custom initialization; receives the extra keyword arguments given to the constructor."""

    def on_validate_directory(self, base: AnyStr, name: AnyStr) -> bool:
        """Validate folder override: called last for a folder that passed every other check."""

        return True

    def on_skip(self, base: AnyStr, name: AnyStr) -> Any:
        """On skip: called for each file that is not reported; a non-`None` return value is yielded."""

        return None

    def on_error(self, base: AnyStr, name: AnyStr) -> Any:
        """On error: called when validating an entry raised; a non-`None` return value is yielded."""

        return None

    def on_match(self, base: AnyStr, name: AnyStr) -> Any:
        """On match: return the value to yield for a matched file (the joined path by default)."""

        return os.path.join(base, name)

    def on_reset(self) -> None:
        """On reset: called at the start of every `imatch` run."""

    def get_skipped(self) -> int:
        """Get number of skipped files counted since the current/last run started."""

        return self._skipped

    def kill(self) -> None:
        """Abort process."""

        self._abort = True

    def is_aborted(self) -> bool:
        """Check if process has been aborted."""

        return self._abort

    def reset(self) -> None:
        """Revive class from a killed state."""

        self._abort = False

    def _walk(self) -> Iterator[Any]:
        """
        Start search for valid files.

        Yields the `on_match` value for every valid file, plus any non-`None` value returned by
        `on_skip` and `on_error`. The walk stops as soon as an abort is noticed.
        """

        self._base_len = len(self._root_dir)

        for base, dirs, files in os.walk(self._root_dir, followlinks=self.follow_links):
            if self.is_aborted():
                return

            # Remove child folders based on exclude rules.
            # `dirs` is edited in place so `os.walk` does not descend into removed folders.
            for name in dirs[:]:
                try:
                    if not self._valid_folder(base, name):
                        dirs.remove(name)
                except Exception:
                    dirs.remove(name)
                    value = self.on_error(base, name)
                    if value is not None:  # pragma: no cover
                        yield value

                # Stop the whole walk here: do not fall through and process this folder's files
                if self.is_aborted():  # pragma: no cover
                    return

            # Only report files that are in the include rules
            for name in files:
                try:
                    valid = self._valid_file(base, name)
                except Exception:
                    # A file whose validation raised is reported via `on_error` and then treated as skipped
                    valid = False
                    value = self.on_error(base, name)
                    if value is not None:
                        yield value

                if valid:
                    yield self.on_match(base, name)
                else:
                    self._skipped += 1
                    value = self.on_skip(base, name)
                    if value is not None:
                        yield value

                # Return instead of only leaving the loop so `os.walk` does not scan another folder
                if self.is_aborted():
                    return

    def match(self) -> List[Any]:
        """Run the directory walker and return all results as a list."""

        return list(self.imatch())

    def imatch(self) -> Iterator[Any]:
        """Run the directory walker as iterator; the skipped count is reset first."""

        self.on_reset()
        self._skipped = 0
        yield from self._walk()
331