1import logging 2import os 3import re 4from enum import IntEnum 5from pathlib import Path 6from typing import TYPE_CHECKING, Dict, Optional, Pattern, Set, Tuple, Union 7 8__all__ = 'Change', 'AllWatcher', 'DefaultDirWatcher', 'DefaultWatcher', 'PythonWatcher', 'RegExpWatcher' 9logger = logging.getLogger('watchgod.watcher') 10 11 12class Change(IntEnum): 13 added = 1 14 modified = 2 15 deleted = 3 16 17 18if TYPE_CHECKING: 19 FileChange = Tuple[Change, str] 20 DirEntry = os.DirEntry[str] 21 StatResult = os.stat_result 22 23 24class AllWatcher: 25 def __init__(self, root_path: Union[Path, str], ignored_paths: Optional[Set[str]] = None) -> None: 26 self.files: Dict[str, float] = {} 27 self.root_path = str(root_path) 28 self.ignored_paths = ignored_paths 29 self.check() 30 31 def should_watch_dir(self, entry: 'DirEntry') -> bool: 32 return True 33 34 def should_watch_file(self, entry: 'DirEntry') -> bool: 35 return True 36 37 def _walk(self, path: str, changes: Set['FileChange'], new_files: Dict[str, float]) -> None: 38 if os.path.isfile(path): 39 self._watch_file(path, changes, new_files, os.stat(path)) 40 else: 41 self._walk_dir(path, changes, new_files) 42 43 def _watch_file( 44 self, path: str, changes: Set['FileChange'], new_files: Dict[str, float], stat: 'StatResult' 45 ) -> None: 46 mtime = stat.st_mtime 47 new_files[path] = mtime 48 old_mtime = self.files.get(path) 49 if not old_mtime: 50 changes.add((Change.added, path)) 51 elif old_mtime != mtime: 52 changes.add((Change.modified, path)) 53 54 def _walk_dir(self, dir_path: str, changes: Set['FileChange'], new_files: Dict[str, float]) -> None: 55 for entry in os.scandir(dir_path): 56 if self.ignored_paths is not None and os.path.join(dir_path, entry) in self.ignored_paths: 57 continue 58 59 if entry.is_dir(): 60 if self.should_watch_dir(entry): 61 self._walk_dir(entry.path, changes, new_files) 62 elif self.should_watch_file(entry): 63 self._watch_file(entry.path, changes, new_files, entry.stat()) 64 65 def check(self) -> Set['FileChange']: 66 changes: Set['FileChange'] = set() 67 new_files: Dict[str, float] = {} 68 try: 69 self._walk(self.root_path, changes, new_files) 70 except OSError as e: 71 # happens when a directory has been deleted between checks 72 logger.warning('error walking file system: %s %s', e.__class__.__name__, e) 73 74 # look for deleted 75 deleted = self.files.keys() - new_files.keys() 76 if deleted: 77 changes |= {(Change.deleted, entry) for entry in deleted} 78 79 self.files = new_files 80 return changes 81 82 83class DefaultDirWatcher(AllWatcher): 84 ignored_dirs = {'.git', '__pycache__', 'site-packages', '.idea', 'node_modules'} 85 86 def should_watch_dir(self, entry: 'DirEntry') -> bool: 87 return entry.name not in self.ignored_dirs 88 89 90class DefaultWatcher(DefaultDirWatcher): 91 ignored_file_regexes = r'\.py[cod]$', r'\.___jb_...___$', r'\.sw.$', '~$', r'^\.\#', r'^flycheck_' 92 93 def __init__(self, root_path: str) -> None: 94 self._ignored_file_regexes = tuple(re.compile(r) for r in self.ignored_file_regexes) 95 super().__init__(root_path) 96 97 def should_watch_file(self, entry: 'DirEntry') -> bool: 98 return not any(r.search(entry.name) for r in self._ignored_file_regexes) 99 100 101class PythonWatcher(DefaultDirWatcher): 102 def should_watch_file(self, entry: 'DirEntry') -> bool: 103 return entry.name.endswith(('.py', '.pyx', '.pyd')) 104 105 106class RegExpWatcher(AllWatcher): 107 def __init__(self, root_path: str, re_files: Optional[str] = None, re_dirs: Optional[str] = None): 108 self.re_files: Optional[Pattern[str]] = re.compile(re_files) if re_files is not None else re_files 109 self.re_dirs: Optional[Pattern[str]] = re.compile(re_dirs) if re_dirs is not None else re_dirs 110 super().__init__(root_path) 111 112 def should_watch_file(self, entry: 'DirEntry') -> bool: 113 if self.re_files is not None: 114 return bool(self.re_files.match(entry.path)) 115 else: 116 return super().should_watch_file(entry) 117 118 def should_watch_dir(self, entry: 'DirEntry') -> bool: 119 if self.re_dirs is not None: 120 return bool(self.re_dirs.match(entry.path)) 121 else: 122 return super().should_watch_dir(entry) 123