1import logging
2import os
3import re
4from enum import IntEnum
5from pathlib import Path
6from typing import TYPE_CHECKING, Dict, Optional, Pattern, Set, Tuple, Union
7
# Public names of this module (what `import *` exposes).
__all__ = 'Change', 'AllWatcher', 'DefaultDirWatcher', 'DefaultWatcher', 'PythonWatcher', 'RegExpWatcher'
# Module-level logger; AllWatcher.check() uses it to warn when walking the file system fails.
logger = logging.getLogger('watchgod.watcher')
10
11
class Change(IntEnum):
    """Kind of change a watcher reports for a file path between two checks."""
    # path was not present in the previous snapshot
    added = 1
    # path was present before but its modification time differs
    modified = 2
    # path was in the previous snapshot but was not found by the latest walk
    deleted = 3
16
17
# Aliases referenced only from quoted annotations below. TYPE_CHECKING is False at
# runtime, so these names exist for static type checkers only.
if TYPE_CHECKING:
    # one reported change: (kind of change, file path)
    FileChange = Tuple[Change, str]
    # entry produced by os.scandir() when scanning a str path
    DirEntry = os.DirEntry[str]
    # result object of os.stat() / DirEntry.stat()
    StatResult = os.stat_result
22
23
class AllWatcher:
    """Watch every file beneath ``root_path`` by polling modification times.

    Each call to :meth:`check` walks the tree, compares the ``st_mtime`` of every
    watched file with the snapshot taken by the previous call and returns the
    differences as ``(Change, path)`` tuples.  Subclasses narrow what is watched
    by overriding :meth:`should_watch_dir` and :meth:`should_watch_file`.
    """

    def __init__(self, root_path: Union[Path, str], ignored_paths: Optional[Set[str]] = None) -> None:
        """Create the watcher and take the initial snapshot.

        :param root_path: file or directory to watch; stored as ``str``.
        :param ignored_paths: optional set of paths to skip entirely.  Entries are
            compared against ``DirEntry.path`` (``root_path`` joined with the
            location below it), so they must be spelled the same way as
            ``root_path``.  An ignored directory is not descended into.
        """
        # path -> st_mtime recorded by the most recent check()
        self.files: Dict[str, float] = {}
        self.root_path = str(root_path)
        self.ignored_paths = ignored_paths
        # Prime the snapshot so the caller's first check() only reports real changes.
        self.check()

    def should_watch_dir(self, entry: 'DirEntry') -> bool:
        """Return whether the directory ``entry`` should be descended into (always ``True`` here)."""
        return True

    def should_watch_file(self, entry: 'DirEntry') -> bool:
        """Return whether the file ``entry`` should be tracked (always ``True`` here)."""
        return True

    def _walk(self, path: str, changes: Set['FileChange'], new_files: Dict[str, float]) -> None:
        """Record ``path`` itself if it is a regular file, otherwise walk it as a directory."""
        if os.path.isfile(path):
            self._watch_file(path, changes, new_files, os.stat(path))
        else:
            self._walk_dir(path, changes, new_files)

    def _watch_file(
        self, path: str, changes: Set['FileChange'], new_files: Dict[str, float], stat: 'StatResult'
    ) -> None:
        """Store the file's mtime in ``new_files`` and add an added/modified change if applicable.

        :param path: path of the file, used as the snapshot key.
        :param changes: set that detected changes are added to (mutated in place).
        :param new_files: snapshot being built by the current walk (mutated in place).
        :param stat: stat result for ``path``; only ``st_mtime`` is read.
        """
        mtime = stat.st_mtime
        new_files[path] = mtime
        old_mtime = self.files.get(path)
        # Compare against None rather than truthiness: a stored mtime of 0.0 is a
        # legitimate value and must not make the file look newly added on every check.
        if old_mtime is None:
            changes.add((Change.added, path))
        elif old_mtime != mtime:
            changes.add((Change.modified, path))

    def _walk_dir(self, dir_path: str, changes: Set['FileChange'], new_files: Dict[str, float]) -> None:
        """Recursively scan ``dir_path``, honouring ``ignored_paths`` and the ``should_watch_*`` hooks."""
        # The context manager closes the scandir handle even when walking raises part-way through.
        with os.scandir(dir_path) as entries:
            for entry in entries:
                # entry.path already is dir_path joined with the entry's name; joining it
                # onto dir_path a second time doubles the prefix for relative roots, which
                # made the ignore list silently ineffective.
                if self.ignored_paths is not None and entry.path in self.ignored_paths:
                    continue

                if entry.is_dir():
                    if self.should_watch_dir(entry):
                        self._walk_dir(entry.path, changes, new_files)
                elif self.should_watch_file(entry):
                    self._watch_file(entry.path, changes, new_files, entry.stat())

    def check(self) -> Set['FileChange']:
        """Walk ``root_path`` and return the changes since the previous call.

        An ``OSError`` raised while walking is logged as a warning instead of being
        propagated; files not reached before the error are absent from the new
        snapshot and are therefore reported as deleted.

        :return: set of ``(Change, path)`` tuples; empty when nothing changed.
        """
        changes: Set['FileChange'] = set()
        new_files: Dict[str, float] = {}
        try:
            self._walk(self.root_path, changes, new_files)
        except OSError as e:
            # happens when a directory has been deleted between checks
            logger.warning('error walking file system: %s %s', e.__class__.__name__, e)

        # anything in the previous snapshot that this walk did not find has been deleted
        deleted = self.files.keys() - new_files.keys()
        changes.update((Change.deleted, path) for path in deleted)

        self.files = new_files
        return changes
81
82
class DefaultDirWatcher(AllWatcher):
    """Watcher that refuses to descend into directories whose name is in ``ignored_dirs``.

    File filtering is inherited unchanged from the parent class.
    """

    # directory *names* (not full paths) that are never walked
    ignored_dirs = {'.git', '__pycache__', 'site-packages', '.idea', 'node_modules'}

    def should_watch_dir(self, entry: 'DirEntry') -> bool:
        """Return ``False`` for a directory named in ``ignored_dirs``, ``True`` otherwise."""
        if entry.name in self.ignored_dirs:
            return False
        return True
88
89
class DefaultWatcher(DefaultDirWatcher):
    """Directory-filtering watcher that also skips files matching ``ignored_file_regexes``.

    The patterns are applied to the bare file name (``entry.name``) with
    ``re.search``, so unanchored patterns may match anywhere in the name.
    """

    # regex sources; compiled once per instance in __init__
    ignored_file_regexes = r'\.py[cod]$', r'\.___jb_...___$', r'\.sw.$', '~$', r'^\.\#', r'^flycheck_'

    def __init__(self, root_path: Union[Path, str], ignored_paths: Optional[Set[str]] = None) -> None:
        """Compile the ignore patterns, then initialise the underlying watcher.

        :param root_path: file or directory to watch.
        :param ignored_paths: optional set of paths to skip, forwarded unchanged to
            the parent initialiser.  Defaults to ``None`` (nothing ignored), which
            keeps the previous single-argument call working.
        """
        # Must happen before super().__init__(): the parent initialiser runs the first
        # check(), which already calls should_watch_file().
        self._ignored_file_regexes = tuple(re.compile(r) for r in self.ignored_file_regexes)
        super().__init__(root_path, ignored_paths)

    def should_watch_file(self, entry: 'DirEntry') -> bool:
        """Return ``True`` unless the file name matches one of the ignore patterns."""
        return not any(r.search(entry.name) for r in self._ignored_file_regexes)
99
100
class PythonWatcher(DefaultDirWatcher):
    """Directory-filtering watcher that only tracks ``.py``, ``.pyx`` and ``.pyd`` files."""

    def should_watch_file(self, entry: 'DirEntry') -> bool:
        """Return ``True`` when the file name ends with one of the Python suffixes."""
        python_suffixes = ('.py', '.pyx', '.pyd')
        file_name = entry.name
        return file_name.endswith(python_suffixes)
104
105
class RegExpWatcher(AllWatcher):
    """Watcher whose file and directory filters are user-supplied regular expressions.

    Each pattern is applied with ``match`` to ``entry.path`` (the full path as
    produced while scanning, not just the name), so it is anchored at the start
    of that path.  A filter left as ``None`` falls back to the parent behaviour.
    """

    def __init__(
        self,
        root_path: Union[Path, str],
        re_files: Optional[str] = None,
        re_dirs: Optional[str] = None,
        ignored_paths: Optional[Set[str]] = None,
    ) -> None:
        """Compile the optional patterns, then initialise the underlying watcher.

        :param root_path: file or directory to watch.
        :param re_files: regex source a file path must match to be watched, or ``None``.
        :param re_dirs: regex source a directory path must match to be walked, or ``None``.
        :param ignored_paths: optional set of paths to skip, forwarded unchanged to
            the parent initialiser.  Added as a trailing, defaulted parameter so
            existing positional calls keep working.
        """
        # Compile before super().__init__(): the parent initialiser runs the first
        # check(), which already consults both filters.
        self.re_files: Optional[Pattern[str]] = None if re_files is None else re.compile(re_files)
        self.re_dirs: Optional[Pattern[str]] = None if re_dirs is None else re.compile(re_dirs)
        super().__init__(root_path, ignored_paths)

    def should_watch_file(self, entry: 'DirEntry') -> bool:
        """Return whether the file's path matches ``re_files`` (parent behaviour if unset)."""
        if self.re_files is None:
            return super().should_watch_file(entry)
        return bool(self.re_files.match(entry.path))

    def should_watch_dir(self, entry: 'DirEntry') -> bool:
        """Return whether the directory's path matches ``re_dirs`` (parent behaviour if unset)."""
        if self.re_dirs is None:
            return super().should_watch_dir(entry)
        return bool(self.re_dirs.match(entry.path))
123