1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
4
5import os
6import re
7import shlex
8from typing import (
9    Any, Callable, Dict, Generator, Iterable, List, NamedTuple, Optional,
10    Sequence, Set, Tuple, TypeVar, Union
11)
12
13from ..rgb import Color, to_color as as_color
14from ..types import ConvertibleToNumbers, ParsedShortcut
15from ..typing import Protocol
16from ..utils import expandvars, log_error
17
# Matches a config line of the form "<key> <value>": the key starts with a
# letter and continues with letters, digits, underscores or hyphens; the value
# is the rest of the line after the separating whitespace.
key_pat = re.compile(r'([a-zA-Z][a-zA-Z0-9_-]*)\s+(.+)$')
# Signature of the per-item callback: (key, value, result dict) -> bool.
# parse_line() logs the key as unknown when the callback returns a falsy value.
ItemParser = Callable[[str, str, Dict[str, Any]], bool]
# Generic element type used by uniq()
T = TypeVar('T')
21
22
class OptionsProtocol(Protocol):
    """Structural type for option containers that can be dumped to a dict.

    load_config() calls ``_asdict()`` on such an object to obtain the
    initial values that config files are merged into.
    """

    def _asdict(self) -> Dict[str, Any]:
        """Return the contained values as a dict keyed by name."""
27
28
class BadLine(NamedTuple):
    """Record of a line whose parsing raised an exception.

    _parse() appends these to ``accumulate_bad_lines`` instead of re-raising.
    """
    # 1-based index of the line within the iterable that was being parsed
    number: int
    # the line text with trailing whitespace removed
    line: str
    # the exception raised while parsing the line
    exception: Exception
33
34
def positive_int(x: ConvertibleToNumbers) -> int:
    """Convert *x* with int() and replace a negative result with 0."""
    value = int(x)
    return value if value > 0 else 0
37
38
def positive_float(x: ConvertibleToNumbers) -> float:
    """Convert *x* with float() and clamp negative results to 0.0.

    The lower bound is written as ``0.0`` so that the clamped result is a
    float, as the return annotation promises, rather than the int ``0``.
    """
    return max(0.0, float(x))
41
42
def to_color(x: str) -> Color:
    """Convert *x* to a Color by calling as_color() with validate=True.

    Should as_color() return None, ``Color(0, 0, 0)`` is returned instead.
    """
    parsed = as_color(x, validate=True)
    # The None fallback is only for type-checking
    return Color(0, 0, 0) if parsed is None else parsed
48
49
def to_color_or_none(x: str) -> Optional[Color]:
    """Return None when *x* is 'none' (in any letter case), else to_color(x)."""
    if x.lower() == 'none':
        return None
    return to_color(x)
52
53
def unit_float(x: ConvertibleToNumbers) -> float:
    """Convert *x* with float() and clamp the result to the range [0.0, 1.0].

    Both bounds are float literals so that a clamped result is a float, as
    the return annotation promises, rather than the int ``0`` or ``1``.
    """
    return max(0.0, min(float(x), 1.0))
56
57
def to_bool(x: str) -> bool:
    """Return True only when *x* is 'y', 'yes' or 'true', ignoring letter case."""
    lowered = x.lower()
    return lowered == 'y' or lowered == 'yes' or lowered == 'true'
60
61
class ToCmdline:
    """Callable that converts a command line string into a list of arguments.

    The string is split with shlex.split(); each word then has a leading
    ``~`` expanded via os.path.expanduser() and is passed through
    expandvars(). Used as a context manager together with filter_env_vars(),
    it expands against a filtered copy of the environment until the
    with-block exits.
    """

    def __init__(self) -> None:
        # Environment handed to expandvars(); None selects os.environ
        self.override_env: Optional[Dict[str, str]] = None

    def __enter__(self) -> 'ToCmdline':
        return self

    def __exit__(self, *a: Any) -> None:
        # Leaving the with-block discards any filtered environment
        self.override_env = None

    def filter_env_vars(self, *a: str) -> 'ToCmdline':
        """Use a copy of os.environ without the variables named in *a*.

        Returns self so the call can be used directly in a with statement.
        """
        excluded = frozenset(a)
        filtered: Dict[str, str] = {}
        for name, value in os.environ.items():
            if name not in excluded:
                filtered[name] = value
        self.override_env = filtered
        return self

    def __call__(self, x: str) -> List[str]:
        """Split *x* into words and expand ``~`` and variables in each one."""
        env = os.environ if self.override_env is None else self.override_env
        return [
            expandvars(os.path.expanduser(word), env, fallback_to_os_env=False)
            for word in shlex.split(x)
        ]
89
90
# Shared module-level ToCmdline instance that to_cmdline() delegates to
to_cmdline_implementation = ToCmdline()
92
93
def to_cmdline(x: str) -> List[str]:
    """Convert *x* to an argument list via the shared to_cmdline_implementation."""
    return to_cmdline_implementation(x)
96
97
def python_string(text: str) -> str:
    r"""Evaluate *text* as the body of a Python string literal.

    The text is wrapped in triple single quotes and handed to
    ast.literal_eval(), so backslash escapes such as ``\t`` are processed.

    Every single quote that is not already backslash-escaped is escaped
    first. Escaping only runs of three quotes is not enough: text ending in a
    quote (for example ``'foo'``) would then merge with the closing
    delimiter and fail to parse, or silently lose its trailing quotes.

    Raises whatever ast.literal_eval() raises for text that is still not a
    valid literal body, e.g. a trailing lone backslash.
    """
    from ast import literal_eval
    # A quote preceded by an even number of backslashes (including zero) is
    # unescaped; keep those backslashes (group 1) and add one before the quote.
    escaped = re.sub(r"(?<!\\)((?:\\\\)*)'", r"\1\\'", text)
    ans: str = literal_eval("'''" + escaped + "'''")
    return ans
102
103
class Choice:
    """Callable validator that accepts only one of a fixed set of strings."""

    def __init__(self, choices: Sequence[str]):
        # The first entry is kept separately as the default value
        self.defval = choices[0]
        self.all_choices = frozenset(choices)

    def __call__(self, x: str) -> str:
        """Return *x* lower-cased if it is a known choice.

        Raises ValueError when the lower-cased value is not among the choices.
        """
        x = x.lower()
        if x in self.all_choices:
            return x
        raise ValueError(f'The value {x} is not a known choice')
115
116
def choices(*choices: str) -> Choice:
    """Build a Choice validator from the given strings; the first is its default."""
    return Choice(choices)
119
120
def parse_line(
    line: str,
    parse_conf_item: ItemParser,
    ans: Dict[str, Any],
    base_path_for_includes: str,
    accumulate_bad_lines: Optional[List[BadLine]] = None
) -> None:
    """Process a single config line.

    Blank lines and lines starting with ``#`` are skipped. A line that does
    not match key_pat is logged and ignored. The key ``include`` names
    another file, resolved against *base_path_for_includes* when relative,
    whose lines are parsed into the same *ans*. Any other key is handed to
    *parse_conf_item*; a falsy return from it is logged as an unknown key.
    """
    line = line.strip()
    if line == '' or line[0] == '#':
        return
    match = key_pat.match(line)
    if match is None:
        log_error('Ignoring invalid config line: {}'.format(line))
        return
    key, val = match.group(1), match.group(2)
    if key != 'include':
        if not parse_conf_item(key, val, ans):
            log_error('Ignoring unknown config key: {}'.format(key))
        return

    # include directive: expand ~ and environment variables, then resolve
    # relative paths against the directory of the including file
    val = os.path.expandvars(os.path.expanduser(val.strip()))
    if not os.path.isabs(val):
        val = os.path.join(base_path_for_includes, val)
    try:
        with open(val, encoding='utf-8', errors='replace') as include:
            _parse(include, parse_conf_item, ans, accumulate_bad_lines)
    except FileNotFoundError:
        log_error(
            'Could not find included config file: {}, ignoring'.
            format(val)
        )
    except OSError:
        log_error(
            'Could not read from included config file: {}, ignoring'.
            format(val)
        )
156
157
158def _parse(
159    lines: Iterable[str],
160    parse_conf_item: ItemParser,
161    ans: Dict[str, Any],
162    accumulate_bad_lines: Optional[List[BadLine]] = None
163) -> None:
164    name = getattr(lines, 'name', None)
165    if name:
166        base_path_for_includes = os.path.dirname(os.path.abspath(name))
167    else:
168        from ..constants import config_dir
169        base_path_for_includes = config_dir
170    for i, line in enumerate(lines):
171        try:
172            parse_line(
173                line, parse_conf_item, ans, base_path_for_includes, accumulate_bad_lines
174            )
175        except Exception as e:
176            if accumulate_bad_lines is None:
177                raise
178            accumulate_bad_lines.append(BadLine(i + 1, line.rstrip(), e))
179
180
def parse_config_base(
    lines: Iterable[str],
    parse_conf_item: ItemParser,
    ans: Dict[str, Any],
    accumulate_bad_lines: Optional[List[BadLine]] = None
) -> None:
    """Parse *lines*, passing each key/value to *parse_conf_item* with *ans*.

    Public entry point that forwards all arguments unchanged to _parse().
    When *accumulate_bad_lines* is None, exceptions raised while parsing a
    line propagate; otherwise they are appended to it as BadLine records.
    """
    _parse(
        lines, parse_conf_item, ans, accumulate_bad_lines
    )
190
191
def merge_dicts(defaults: Dict, newvals: Dict) -> Dict:
    """Return a copy of *defaults* updated with the entries of *newvals*.

    The copy is shallow and *defaults* itself is left unmodified.
    """
    merged = defaults.copy()
    merged.update(newvals)
    return merged
196
197
def resolve_config(SYSTEM_CONF: str, defconf: str, config_files_on_cmd_line: Sequence[str]) -> Generator[str, None, None]:
    """Yield the config file paths to load, in order.

    Without command line config files: *SYSTEM_CONF* then *defconf*.
    With command line config files: *SYSTEM_CONF* followed by each of them,
    unless one of them is the literal 'NONE', in which case nothing is
    yielded.
    """
    if not config_files_on_cmd_line:
        yield SYSTEM_CONF
        yield defconf
        return
    if 'NONE' in config_files_on_cmd_line:
        return
    yield SYSTEM_CONF
    yield from config_files_on_cmd_line
207
208
def load_config(
    defaults: OptionsProtocol,
    parse_config: Callable[[Iterable[str]], Dict[str, Any]],
    merge_configs: Callable[[Dict, Dict], Dict],
    *paths: str,
    overrides: Optional[Iterable[str]] = None
) -> Tuple[Dict[str, Any], Tuple[str, ...]]:
    """Build the final option values from defaults, config files and overrides.

    Starting from ``defaults._asdict()``, each non-empty path in *paths* is
    opened as UTF-8 (undecodable bytes replaced), parsed with *parse_config*
    and folded in with *merge_configs*. Paths that raise FileNotFoundError or
    PermissionError are skipped. *overrides*, when given, are parsed and
    merged last.

    Returns the merged dict and a tuple of the paths that were actually read.
    """
    merged = defaults._asdict()
    loaded: List[str] = []
    for path in filter(None, paths):
        try:
            with open(path, encoding='utf-8', errors='replace') as conf_file:
                parsed = parse_config(conf_file)
        except (FileNotFoundError, PermissionError):
            # missing or unreadable files are skipped
            continue
        loaded.append(path)
        merged = merge_configs(merged, parsed)
    if overrides is not None:
        merged = merge_configs(merged, parse_config(overrides))
    return merged, tuple(loaded)
232
233
def key_func() -> Tuple[Callable[..., Callable], Dict[str, Callable]]:
    """Create a decorator factory together with the registry it fills.

    The returned ``func_with_args(*names)`` decorator stores the decorated
    function in the returned dict under each of *names* and returns the
    function unchanged. Registering a different function under a name that
    is already taken raises ValueError.
    """
    registry: Dict[str, Callable] = {}

    def func_with_args(*names: str) -> Callable:

        def register(f: Callable) -> Callable:
            for name in names:
                # setdefault stores f for a new name and otherwise hands
                # back whatever was registered earlier
                previous = registry.setdefault(name, f)
                if previous is not f:
                    raise ValueError(
                        'the args_func {} is being redefined'.format(name)
                    )
            return f

        return register

    return func_with_args, registry
250
251
class KeyAction(NamedTuple):
    """An action name together with the arguments parsed for it."""
    # name of the action
    func: str
    # parsed arguments; empty when the action takes none
    args: Tuple[Union[str, float, bool, int, None], ...] = ()

    def __repr__(self) -> str:
        # Omit the args tuple from the repr when it is empty
        if not self.args:
            return f'KeyAction({self.func!r})'
        return f'KeyAction({self.func!r}, {self.args!r})'
260
261
def parse_kittens_func_args(action: str, args_funcs: Dict[str, Callable]) -> KeyAction:
    """Parse an action string of the form ``func [args...]`` into a KeyAction.

    The text before the first space is the action name. Without further
    text, a KeyAction with no arguments is returned. Otherwise the parser
    registered for that name in *args_funcs* is called as
    ``parser(func, rest)`` and must return a ``(func, args)`` pair; a
    non-list/tuple ``args`` is wrapped into a one-element tuple.

    Raises:
        KeyError: no parser is registered for the action name.
        ValueError: the parser raised; the parser's exception is chained as
            ``__cause__`` so the underlying failure stays visible.
    """
    parts = action.strip().split(' ', 1)
    func = parts[0]
    if len(parts) == 1:
        return KeyAction(func, ())
    rest = parts[1]

    try:
        parser = args_funcs[func]
    except KeyError as e:
        raise KeyError(
            'Unknown action: {}. Check if map action: '
            '{} is valid'.format(func, action)
        ) from e

    try:
        func, args = parser(func, rest)
    except Exception as e:
        raise ValueError('Unknown key action: {}'.format(action)) from e

    if not isinstance(args, (list, tuple)):
        args = (args, )

    return KeyAction(func, tuple(args))
286
287
# A (parsed shortcut, action) pair, as returned by parse_kittens_key()
KittensKeyDefinition = Tuple[ParsedShortcut, KeyAction]
# Mapping from parsed shortcut to the action bound to it
KittensKeyMap = Dict[ParsedShortcut, KeyAction]
290
291
def parse_kittens_key(
    val: str, funcs_with_args: Dict[str, Callable]
) -> Optional[KittensKeyDefinition]:
    """Parse a ``<shortcut> <action>`` definition.

    *val* is split at its first space. Returns None when either side is
    empty, otherwise the parsed shortcut paired with the KeyAction produced
    by parse_kittens_func_args() for the action text.
    """
    from ..key_encoding import parse_shortcut
    sc, _, action = val.partition(' ')
    if not (sc and action):
        return None
    # The action is parsed before the shortcut
    key_action = parse_kittens_func_args(action, funcs_with_args)
    return parse_shortcut(sc), key_action
301
302
def uniq(vals: Iterable[T]) -> List[T]:
    """Return the items of *vals* with duplicates removed, keeping first-seen order.

    Items must be hashable because they are tracked in a set.
    """
    seen: Set[T] = set()
    ordered: List[T] = []
    for item in vals:
        if item not in seen:
            seen.add(item)
            ordered.append(item)
    return ordered
307
308
def save_type_stub(text: str, fpath: str) -> None:
    """Write *text* as the type stub belonging to the module at *fpath*.

    The stub path is *fpath* with ``i`` appended (``foo.py`` -> ``foo.pyi``)
    and its content is a fixed preamble followed by *text*. The file is only
    rewritten when its current content differs.

    The file is read and written as UTF-8 explicitly, so the result does not
    depend on the locale's default encoding.
    """
    fpath += 'i'
    preamble = '# Update this file by running: ./test.py mypy\n\n'
    try:
        with open(fpath, encoding='utf-8') as fs:
            existing = fs.read()
    except (FileNotFoundError, UnicodeDecodeError):
        # A missing stub, or one that is not valid UTF-8, is regenerated
        existing = ''
    current = preamble + text
    if existing != current:
        with open(fpath, 'w', encoding='utf-8') as f:
            f.write(current)
321