1import asyncio 2import functools 3import logging 4import os 5import signal 6from concurrent.futures import ThreadPoolExecutor 7from functools import partial 8from multiprocessing import Process 9from pathlib import Path 10from time import time 11from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Generator, Optional, Set, Tuple, Type, Union, cast 12 13from .watcher import DefaultWatcher, PythonWatcher 14 15__all__ = 'watch', 'awatch', 'run_process', 'arun_process' 16logger = logging.getLogger('watchgod.main') 17 18if TYPE_CHECKING: 19 from .watcher import AllWatcher, FileChange 20 21 FileChanges = Set[FileChange] 22 AnyCallable = Callable[..., Any] 23 24 25def unix_ms() -> int: 26 return int(round(time() * 1000)) 27 28 29def watch(path: Union[Path, str], **kwargs: Any) -> Generator['FileChanges', None, None]: 30 """ 31 Watch a directory and yield a set of changes whenever files change in that directory or its subdirectories. 32 """ 33 loop = asyncio.new_event_loop() 34 try: 35 _awatch = awatch(path, loop=loop, **kwargs) 36 while True: 37 try: 38 yield loop.run_until_complete(_awatch.__anext__()) 39 except StopAsyncIteration: 40 break 41 except KeyboardInterrupt: 42 logger.debug('KeyboardInterrupt, exiting') 43 finally: 44 loop.close() 45 46 47class awatch: 48 """ 49 asynchronous equivalent of watch using a threaded executor. 50 51 3.5 doesn't support yield in coroutines so we need all this fluff. Yawwwwn. 52 """ 53 54 __slots__ = ( 55 '_loop', 56 '_path', 57 '_watcher_cls', 58 '_watcher_kwargs', 59 '_debounce', 60 '_min_sleep', 61 '_stop_event', 62 '_normal_sleep', 63 '_w', 64 'lock', 65 '_executor', 66 ) 67 68 def __init__( 69 self, 70 path: Union[Path, str], 71 *, 72 watcher_cls: Type['AllWatcher'] = DefaultWatcher, 73 watcher_kwargs: Optional[Dict[str, Any]] = None, 74 debounce: int = 1600, 75 normal_sleep: int = 400, 76 min_sleep: int = 50, 77 stop_event: Optional[asyncio.Event] = None, 78 loop: Optional[asyncio.AbstractEventLoop] = None, 79 ) -> None: 80 self._loop = loop or asyncio.get_event_loop() 81 self._executor = ThreadPoolExecutor(max_workers=4) 82 self._path = path 83 self._watcher_cls = watcher_cls 84 self._watcher_kwargs = watcher_kwargs or dict() 85 self._debounce = debounce 86 self._normal_sleep = normal_sleep 87 self._min_sleep = min_sleep 88 self._stop_event = stop_event 89 self._w: Optional['AllWatcher'] = None 90 asyncio.set_event_loop(self._loop) 91 self.lock = asyncio.Lock() 92 93 def __aiter__(self) -> 'awatch': 94 return self 95 96 async def __anext__(self) -> 'FileChanges': 97 if self._w: 98 watcher = self._w 99 else: 100 watcher = self._w = await self.run_in_executor( 101 functools.partial(self._watcher_cls, self._path, **self._watcher_kwargs) 102 ) 103 check_time = 0 104 changes: 'FileChanges' = set() 105 last_change = 0 106 while True: 107 if self._stop_event and self._stop_event.is_set(): 108 raise StopAsyncIteration() 109 async with self.lock: 110 if not changes: 111 last_change = unix_ms() 112 113 if check_time: 114 if changes: 115 sleep_time = self._min_sleep 116 else: 117 sleep_time = max(self._normal_sleep - check_time, self._min_sleep) 118 await asyncio.sleep(sleep_time / 1000) 119 120 s = unix_ms() 121 new_changes = await self.run_in_executor(watcher.check) 122 changes.update(new_changes) 123 now = unix_ms() 124 check_time = now - s 125 debounced = now - last_change 126 if logger.isEnabledFor(logging.DEBUG) and changes: 127 logger.debug( 128 '%s time=%0.0fms debounced=%0.0fms files=%d changes=%d (%d)', 129 self._path, 130 check_time, 131 debounced, 132 len(watcher.files), 133 len(changes), 134 len(new_changes), 135 ) 136 137 if changes and (not new_changes or debounced > self._debounce): 138 logger.debug('%s changes released debounced=%0.0fms', self._path, debounced) 139 return changes 140 141 async def run_in_executor(self, func: 'AnyCallable', *args: Any) -> Any: 142 return await self._loop.run_in_executor(self._executor, func, *args) 143 144 def __del__(self) -> None: 145 self._executor.shutdown() 146 147 148def _start_process(target: 'AnyCallable', args: Tuple[Any, ...], kwargs: Optional[Dict[str, Any]]) -> Process: 149 process = Process(target=target, args=args, kwargs=kwargs or {}) 150 process.start() 151 return process 152 153 154def _stop_process(process: Process) -> None: 155 if process.is_alive(): 156 logger.debug('stopping process...') 157 pid = cast(int, process.pid) 158 os.kill(pid, signal.SIGINT) 159 process.join(5) 160 if process.exitcode is None: 161 logger.warning('process has not terminated, sending SIGKILL') 162 os.kill(pid, signal.SIGKILL) 163 process.join(1) 164 else: 165 logger.debug('process stopped') 166 else: 167 logger.warning('process already dead, exit code: %d', process.exitcode) 168 169 170def run_process( 171 path: Union[Path, str], 172 target: 'AnyCallable', 173 *, 174 args: Tuple[Any, ...] = (), 175 kwargs: Optional[Dict[str, Any]] = None, 176 callback: Optional[Callable[[Set['FileChange']], None]] = None, 177 watcher_cls: Type['AllWatcher'] = PythonWatcher, 178 watcher_kwargs: Optional[Dict[str, Any]] = None, 179 debounce: int = 400, 180 min_sleep: int = 100, 181) -> int: 182 """ 183 Run a function in a subprocess using multiprocessing.Process, restart it whenever files change in path. 184 """ 185 186 process = _start_process(target=target, args=args, kwargs=kwargs) 187 reloads = 0 188 189 try: 190 for changes in watch( 191 path, watcher_cls=watcher_cls, debounce=debounce, min_sleep=min_sleep, watcher_kwargs=watcher_kwargs 192 ): 193 callback and callback(changes) 194 _stop_process(process) 195 process = _start_process(target=target, args=args, kwargs=kwargs) 196 reloads += 1 197 finally: 198 _stop_process(process) 199 return reloads 200 201 202async def arun_process( 203 path: Union[Path, str], 204 target: 'AnyCallable', 205 *, 206 args: Tuple[Any, ...] = (), 207 kwargs: Optional[Dict[str, Any]] = None, 208 callback: Optional[Callable[['FileChanges'], Awaitable[None]]] = None, 209 watcher_cls: Type['AllWatcher'] = PythonWatcher, 210 debounce: int = 400, 211 min_sleep: int = 100, 212) -> int: 213 """ 214 Run a function in a subprocess using multiprocessing.Process, restart it whenever files change in path. 215 """ 216 watcher = awatch(path, watcher_cls=watcher_cls, debounce=debounce, min_sleep=min_sleep) 217 start_process = partial(_start_process, target=target, args=args, kwargs=kwargs) 218 process = await watcher.run_in_executor(start_process) 219 reloads = 0 220 221 async for changes in watcher: 222 callback and await callback(changes) 223 await watcher.run_in_executor(_stop_process, process) 224 process = await watcher.run_in_executor(start_process) 225 reloads += 1 226 await watcher.run_in_executor(_stop_process, process) 227 return reloads 228