1import asyncio
2import functools
3import logging
4import os
5import signal
6from concurrent.futures import ThreadPoolExecutor
7from functools import partial
8from multiprocessing import Process
9from pathlib import Path
10from time import time
11from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Generator, Optional, Set, Tuple, Type, Union, cast
12
13from .watcher import DefaultWatcher, PythonWatcher
14
15__all__ = 'watch', 'awatch', 'run_process', 'arun_process'
16logger = logging.getLogger('watchgod.main')
17
18if TYPE_CHECKING:
19    from .watcher import AllWatcher, FileChange
20
21    FileChanges = Set[FileChange]
22    AnyCallable = Callable[..., Any]
23
24
25def unix_ms() -> int:
26    return int(round(time() * 1000))
27
28
29def watch(path: Union[Path, str], **kwargs: Any) -> Generator['FileChanges', None, None]:
30    """
31    Watch a directory and yield a set of changes whenever files change in that directory or its subdirectories.
32    """
33    loop = asyncio.new_event_loop()
34    try:
35        _awatch = awatch(path, loop=loop, **kwargs)
36        while True:
37            try:
38                yield loop.run_until_complete(_awatch.__anext__())
39            except StopAsyncIteration:
40                break
41    except KeyboardInterrupt:
42        logger.debug('KeyboardInterrupt, exiting')
43    finally:
44        loop.close()
45
46
47class awatch:
48    """
49    asynchronous equivalent of watch using a threaded executor.
50
51    3.5 doesn't support yield in coroutines so we need all this fluff. Yawwwwn.
52    """
53
54    __slots__ = (
55        '_loop',
56        '_path',
57        '_watcher_cls',
58        '_watcher_kwargs',
59        '_debounce',
60        '_min_sleep',
61        '_stop_event',
62        '_normal_sleep',
63        '_w',
64        'lock',
65        '_executor',
66    )
67
68    def __init__(
69        self,
70        path: Union[Path, str],
71        *,
72        watcher_cls: Type['AllWatcher'] = DefaultWatcher,
73        watcher_kwargs: Optional[Dict[str, Any]] = None,
74        debounce: int = 1600,
75        normal_sleep: int = 400,
76        min_sleep: int = 50,
77        stop_event: Optional[asyncio.Event] = None,
78        loop: Optional[asyncio.AbstractEventLoop] = None,
79    ) -> None:
80        self._loop = loop or asyncio.get_event_loop()
81        self._executor = ThreadPoolExecutor(max_workers=4)
82        self._path = path
83        self._watcher_cls = watcher_cls
84        self._watcher_kwargs = watcher_kwargs or dict()
85        self._debounce = debounce
86        self._normal_sleep = normal_sleep
87        self._min_sleep = min_sleep
88        self._stop_event = stop_event
89        self._w: Optional['AllWatcher'] = None
90        asyncio.set_event_loop(self._loop)
91        self.lock = asyncio.Lock()
92
93    def __aiter__(self) -> 'awatch':
94        return self
95
96    async def __anext__(self) -> 'FileChanges':
97        if self._w:
98            watcher = self._w
99        else:
100            watcher = self._w = await self.run_in_executor(
101                functools.partial(self._watcher_cls, self._path, **self._watcher_kwargs)
102            )
103        check_time = 0
104        changes: 'FileChanges' = set()
105        last_change = 0
106        while True:
107            if self._stop_event and self._stop_event.is_set():
108                raise StopAsyncIteration()
109            async with self.lock:
110                if not changes:
111                    last_change = unix_ms()
112
113                if check_time:
114                    if changes:
115                        sleep_time = self._min_sleep
116                    else:
117                        sleep_time = max(self._normal_sleep - check_time, self._min_sleep)
118                    await asyncio.sleep(sleep_time / 1000)
119
120                s = unix_ms()
121                new_changes = await self.run_in_executor(watcher.check)
122                changes.update(new_changes)
123                now = unix_ms()
124                check_time = now - s
125                debounced = now - last_change
126                if logger.isEnabledFor(logging.DEBUG) and changes:
127                    logger.debug(
128                        '%s time=%0.0fms debounced=%0.0fms files=%d changes=%d (%d)',
129                        self._path,
130                        check_time,
131                        debounced,
132                        len(watcher.files),
133                        len(changes),
134                        len(new_changes),
135                    )
136
137                if changes and (not new_changes or debounced > self._debounce):
138                    logger.debug('%s changes released debounced=%0.0fms', self._path, debounced)
139                    return changes
140
141    async def run_in_executor(self, func: 'AnyCallable', *args: Any) -> Any:
142        return await self._loop.run_in_executor(self._executor, func, *args)
143
144    def __del__(self) -> None:
145        self._executor.shutdown()
146
147
148def _start_process(target: 'AnyCallable', args: Tuple[Any, ...], kwargs: Optional[Dict[str, Any]]) -> Process:
149    process = Process(target=target, args=args, kwargs=kwargs or {})
150    process.start()
151    return process
152
153
154def _stop_process(process: Process) -> None:
155    if process.is_alive():
156        logger.debug('stopping process...')
157        pid = cast(int, process.pid)
158        os.kill(pid, signal.SIGINT)
159        process.join(5)
160        if process.exitcode is None:
161            logger.warning('process has not terminated, sending SIGKILL')
162            os.kill(pid, signal.SIGKILL)
163            process.join(1)
164        else:
165            logger.debug('process stopped')
166    else:
167        logger.warning('process already dead, exit code: %d', process.exitcode)
168
169
170def run_process(
171    path: Union[Path, str],
172    target: 'AnyCallable',
173    *,
174    args: Tuple[Any, ...] = (),
175    kwargs: Optional[Dict[str, Any]] = None,
176    callback: Optional[Callable[[Set['FileChange']], None]] = None,
177    watcher_cls: Type['AllWatcher'] = PythonWatcher,
178    watcher_kwargs: Optional[Dict[str, Any]] = None,
179    debounce: int = 400,
180    min_sleep: int = 100,
181) -> int:
182    """
183    Run a function in a subprocess using multiprocessing.Process, restart it whenever files change in path.
184    """
185
186    process = _start_process(target=target, args=args, kwargs=kwargs)
187    reloads = 0
188
189    try:
190        for changes in watch(
191            path, watcher_cls=watcher_cls, debounce=debounce, min_sleep=min_sleep, watcher_kwargs=watcher_kwargs
192        ):
193            callback and callback(changes)
194            _stop_process(process)
195            process = _start_process(target=target, args=args, kwargs=kwargs)
196            reloads += 1
197    finally:
198        _stop_process(process)
199    return reloads
200
201
202async def arun_process(
203    path: Union[Path, str],
204    target: 'AnyCallable',
205    *,
206    args: Tuple[Any, ...] = (),
207    kwargs: Optional[Dict[str, Any]] = None,
208    callback: Optional[Callable[['FileChanges'], Awaitable[None]]] = None,
209    watcher_cls: Type['AllWatcher'] = PythonWatcher,
210    debounce: int = 400,
211    min_sleep: int = 100,
212) -> int:
213    """
214    Run a function in a subprocess using multiprocessing.Process, restart it whenever files change in path.
215    """
216    watcher = awatch(path, watcher_cls=watcher_cls, debounce=debounce, min_sleep=min_sleep)
217    start_process = partial(_start_process, target=target, args=args, kwargs=kwargs)
218    process = await watcher.run_in_executor(start_process)
219    reloads = 0
220
221    async for changes in watcher:
222        callback and await callback(changes)
223        await watcher.run_in_executor(_stop_process, process)
224        process = await watcher.run_in_executor(start_process)
225        reloads += 1
226    await watcher.run_in_executor(_stop_process, process)
227    return reloads
228