1import argparse
2import contextlib
3import logging
4import os
5import sys
6from importlib import import_module
7from multiprocessing import set_start_method
8from pathlib import Path
9from typing import Any, Generator, List, Optional, Sized
10
11from .main import run_process
12
13logger = logging.getLogger('watchgod.cli')
14
15
def import_string(dotted_path: str) -> Any:
    """
    Resolve a dotted path such as ``package.module.attribute`` to the object it names (approach borrowed
    from django).

    Everything before the last dot is imported as a module, the last component is looked up on that module.
    Surrounding spaces in ``dotted_path`` are ignored.

    :param dotted_path: dotted path of the attribute to import
    :return: the attribute found on the imported module
    :raises ImportError: if the path contains no dot, the module cannot be imported, or the module has no
        attribute with the requested name
    """
    pieces = dotted_path.strip(' ').rsplit('.', 1)
    try:
        # unpacking fails with ValueError when there was no dot to split on
        module_name, attr_name = pieces
    except ValueError as e:
        raise ImportError('"{}" doesn\'t look like a module path'.format(dotted_path)) from e

    module = import_module(module_name)
    try:
        attribute = getattr(module, attr_name)
    except AttributeError as e:
        raise ImportError('Module "{}" does not define a "{}" attribute'.format(module_name, attr_name)) from e
    return attribute
31
32
@contextlib.contextmanager
def set_tty(tty_path: Optional[str]) -> Generator[None, None, None]:
    """
    Context manager which points ``sys.stdin`` at the terminal device ``tty_path`` while the block runs.

    If ``tty_path`` is falsy, or the device cannot be opened, the block runs with ``sys.stdin`` untouched.
    Otherwise the opened device is closed and the previous ``sys.stdin`` is restored when the block exits.
    Exceptions raised inside the block always propagate unchanged.

    :param tty_path: path of the tty to use as stdin, or ``None`` when no tty is available
    """
    if not tty_path:
        # currently on windows tty_path is None and there's nothing we can do here
        yield
        return

    try:
        tty = open(tty_path)
    except OSError:
        # eg. "No such device or address: '/dev/tty'", see https://github.com/samuelcolvin/watchgod/issues/40
        yield
        return

    # The yield below is deliberately outside the try/except above: an OSError raised by the managed block
    # must reach the caller rather than be mistaken for a failure to open the tty. Yielding a second time
    # from the handler would make contextlib raise RuntimeError and hide the real error.
    previous_stdin = sys.stdin
    with tty:  # pragma: no cover
        sys.stdin = tty
        try:
            yield
        finally:
            # don't leave sys.stdin pointing at a file which is about to be closed
            sys.stdin = previous_stdin
46
47
def run_function(function: str, tty_path: Optional[str]) -> None:
    """
    Import the function named by the dotted path ``function`` and call it without arguments.

    Both the import and the call happen inside ``set_tty(tty_path)``.
    """
    with set_tty(tty_path):
        target = import_string(function)
        target()
52
53
def callback(changes: Sized) -> None:
    """
    Log, at info level, how many files changed.
    """
    change_count = len(changes)
    logger.info('%d files changed, reloading', change_count)
56
57
def sys_argv(function: str) -> List[str]:
    """
    Build the argument list for the executed function: the absolute path of the script containing it, followed
    by whatever comes after the first ``-a``/``--args`` in the current ``sys.argv``.

    All watchgod-related arguments are dropped; if no ``-a``/``--args`` is present only the script path is
    returned.
    """
    # everything except the last dotted component is the module, i.e. the file path without its extension
    *module_parts, _ = function.split('.')
    script = os.path.abspath(os.path.join(*module_parts) + '.py')

    for position, token in enumerate(sys.argv, start=1):
        if token in ('-a', '--args'):
            # position is already one past the flag, so the slice holds only the forwarded arguments
            return [script] + sys.argv[position:]
    return [script]
69
70
def cli(*args_: str) -> None:
    """
    Entry point of the ``watchgod`` command.

    Parses the command line, attaches a stream handler to the ``watchgod`` logger, checks that the target
    function can be imported and that the path to watch exists, replaces ``sys.argv`` with the result of
    ``sys_argv`` and finally calls ``run_process`` with ``run_function`` as the target and ``callback`` as the
    change callback.

    Problems are reported on stderr followed by ``sys.exit``: status 1 for a function which cannot be imported
    or a path which does not exist, argparse's usage error for an invalid command line or verbosity.

    :param args_: command line arguments to parse; when none are given ``sys.argv[1:]`` is used
    """
    args = args_ or sys.argv[1:]
    parser = argparse.ArgumentParser(
        prog='watchgod', description='Watch a directory and execute a python function on changes.'
    )
    parser.add_argument('function', help='Path to python function to execute.')
    parser.add_argument('path', nargs='?', default='.', help='Filesystem path to watch, defaults to current directory.')
    parser.add_argument('--verbosity', nargs='?', type=int, default=1, help='0, 1 (default) or 2')
    parser.add_argument(
        '--ignore-paths',
        nargs='*',
        type=str,
        default=[],
        help='Specify paths to files or directories to ignore their updates',
    )
    parser.add_argument(
        '--args',
        '-a',
        nargs=argparse.REMAINDER,
        help='Arguments for argparser inside executed function. Ex.: module.func path --args --inner arg -v',
    )
    arg_namespace = parser.parse_args(args)

    log_levels = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG}
    log_level = log_levels.get(arg_namespace.verbosity)
    if log_level is None:
        # "--verbosity" without a value parses to None (nargs='?' with no const) and type=int accepts any
        # integer, so report anything outside 0-2 as a usage error instead of an uncaught KeyError
        parser.error('--verbosity must be 0, 1 or 2, got {!r}'.format(arg_namespace.verbosity))
        # only reached if sys.exit has been replaced by something which does not raise
        return

    hdlr = logging.StreamHandler()
    hdlr.setLevel(log_level)
    hdlr.setFormatter(logging.Formatter(fmt='[%(asctime)s] %(message)s', datefmt='%H:%M:%S'))
    wg_logger = logging.getLogger('watchgod')
    wg_logger.addHandler(hdlr)
    wg_logger.setLevel(log_level)

    # make modules in the current directory importable by dotted path
    sys.path.append(os.getcwd())
    try:
        # import now purely to fail early with a readable message; the result is not used here
        import_string(arg_namespace.function)
    except ImportError as e:
        print('ImportError: {}'.format(e), file=sys.stderr)
        sys.exit(1)
        # only reached if sys.exit has been replaced by something which does not raise
        return

    path = Path(arg_namespace.path)
    if not path.exists():
        print('path "{}" does not exist'.format(path), file=sys.stderr)
        sys.exit(1)
        # only reached if sys.exit has been replaced by something which does not raise
        return

    path = path.resolve()

    try:
        tty_path: Optional[str] = os.ttyname(sys.stdin.fileno())
    except OSError:
        # fileno() always fails with pytest
        tty_path = '/dev/tty'
    except AttributeError:
        # on windows. No idea of a better solution
        tty_path = None
    logger.info('watching "%s" and reloading "%s" on changes...', path, arg_namespace.function)
    set_start_method('spawn')
    # from here on sys.argv holds the script path plus anything given after -a/--args
    sys.argv = sys_argv(arg_namespace.function)

    ignored_paths = {str(Path(p).resolve()) for p in arg_namespace.ignore_paths}

    run_process(
        path,
        run_function,
        args=(arg_namespace.function, tty_path),
        callback=callback,
        watcher_kwargs={'ignored_paths': ignored_paths},
    )
139