#!/usr/local/bin/python3.8
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>

"""Helpers for parsing line-oriented ``key value`` configuration files."""

import os
import re
import shlex
from typing import (
    Any, Callable, Dict, Generator, Iterable, List, NamedTuple, Optional,
    Sequence, Set, Tuple, TypeVar, Union
)

from ..rgb import Color, to_color as as_color
from ..types import ConvertibleToNumbers, ParsedShortcut
from ..typing import Protocol
from ..utils import expandvars, log_error

# A config line is an identifier-like key, whitespace, then a non-empty value.
key_pat = re.compile(r'([a-zA-Z][a-zA-Z0-9_-]*)\s+(.+)$')
# Called as parse_conf_item(key, val, ans); a falsy return is reported by
# parse_line() as an unknown config key.
ItemParser = Callable[[str, str, Dict[str, Any]], bool]
T = TypeVar('T')


class OptionsProtocol(Protocol):
    """Structural type for option objects that can be dumped to a dict."""

    def _asdict(self) -> Dict[str, Any]:
        """Return the options as a mapping of name to value."""
        pass


class BadLine(NamedTuple):
    """Record of a config line whose parsing raised an exception."""

    # 1-based position of the line within the iterable that was being parsed
    number: int
    # the offending line with trailing whitespace removed
    line: str
    # the exception raised while parsing the line
    exception: Exception


def positive_int(x: ConvertibleToNumbers) -> int:
    """Convert *x* to an int, clamping negative values to 0."""
    return max(0, int(x))


def positive_float(x: ConvertibleToNumbers) -> float:
    """Convert *x* to a float, clamping negative values to 0.0."""
    # Float literal so that the clamped result is a float, as annotated,
    # rather than the int 0.
    return max(0.0, float(x))


def to_color(x: str) -> Color:
    """Parse *x* into a Color using the rgb module's converter.

    The converter is called with ``validate=True``; presumably it raises on
    invalid input in that mode (defined elsewhere, not verified here).
    """
    ans = as_color(x, validate=True)
    if ans is None:  # this is only for type-checking
        ans = Color(0, 0, 0)
    return ans


def to_color_or_none(x: str) -> Optional[Color]:
    """Return None for the (case-insensitive) value 'none', else a Color."""
    if x.lower() == 'none':
        return None
    return to_color(x)


def unit_float(x: ConvertibleToNumbers) -> float:
    """Convert *x* to a float clamped to the closed interval [0.0, 1.0]."""
    # Float literals so that the clamped bounds are floats, as annotated,
    # rather than the ints 0 or 1.
    return max(0.0, min(float(x), 1.0))


def to_bool(x: str) -> bool:
    """Return True if *x* is 'y', 'yes' or 'true' (case-insensitive)."""
    return x.lower() in ('y', 'yes', 'true')


class ToCmdline:
    """Callable that splits a string into a command line with expansion.

    Each word gets ``~`` and environment variable expansion. Variables come
    from ``os.environ`` unless an override mapping has been installed with
    filter_env_vars(). Used as a context manager, the override is cleared
    when the ``with`` block exits.
    """

    def __init__(self) -> None:
        # When not None, used in place of os.environ for variable expansion.
        self.override_env: Optional[Dict[str, str]] = None

    def __enter__(self) -> 'ToCmdline':
        return self

    def __exit__(self, *a: Any) -> None:
        self.override_env = None

    def filter_env_vars(self, *a: str) -> 'ToCmdline':
        """Expand against a copy of os.environ without the variables in *a*.

        Returns self so that the call can be used directly in a ``with``.
        """
        remove = frozenset(a)
        self.override_env = {k: v for k, v in os.environ.items() if k not in remove}
        return self

    def __call__(self, x: str) -> List[str]:
        """Split *x* with shlex and expand ``~`` and variables in each word."""
        words = shlex.split(x)
        env = os.environ if self.override_env is None else self.override_env
        return [
            expandvars(os.path.expanduser(word), env, fallback_to_os_env=False)
            for word in words
        ]


to_cmdline_implementation = ToCmdline()


def to_cmdline(x: str) -> List[str]:
    """Split *x* into a command line using the shared ToCmdline instance."""
    return to_cmdline_implementation(x)


def python_string(text: str) -> str:
    """Interpret *text* as the body of a Python string literal.

    The text is wrapped in triple single quotes and evaluated with
    ast.literal_eval(), so escape sequences such as ``\\n`` are processed.
    """
    from ast import literal_eval
    # Break up any embedded ''' so it cannot terminate the wrapping literal.
    ans: str = literal_eval("'''" + text.replace("'''", "'\\''") + "'''")
    return ans


class Choice:
    """Validator that accepts only one of a fixed set of string values."""

    def __init__(self, choices: Sequence[str]):
        # The first choice is recorded as the default value.
        self.defval = choices[0]
        self.all_choices = frozenset(choices)

    def __call__(self, x: str) -> str:
        """Return *x* lower-cased, raising ValueError if it is not a choice."""
        x = x.lower()
        if x not in self.all_choices:
            raise ValueError(f'The value {x} is not a known choice')
        return x


def choices(*choices: str) -> Choice:
    """Build a Choice validator from the given values."""
    return Choice(choices)


def parse_line(
    line: str,
    parse_conf_item: ItemParser,
    ans: Dict[str, Any],
    base_path_for_includes: str,
    accumulate_bad_lines: Optional[List[BadLine]] = None
) -> None:
    """Parse a single config line, updating *ans* via *parse_conf_item*.

    Blank lines and lines starting with ``#`` are skipped. Lines that do not
    match ``key_pat`` are logged and ignored. The ``include`` key causes the
    named file to be parsed recursively; relative paths are resolved against
    *base_path_for_includes*. Missing or unreadable include files are logged
    and ignored.
    """
    line = line.strip()
    if not line or line.startswith('#'):
        return
    m = key_pat.match(line)
    if m is None:
        log_error('Ignoring invalid config line: {}'.format(line))
        return
    key, val = m.groups()
    if key == 'include':
        val = os.path.expandvars(os.path.expanduser(val.strip()))
        if not os.path.isabs(val):
            val = os.path.join(base_path_for_includes, val)
        try:
            with open(val, encoding='utf-8', errors='replace') as include:
                _parse(include, parse_conf_item, ans, accumulate_bad_lines)
        except FileNotFoundError:
            log_error(
                'Could not find included config file: {}, ignoring'.
                format(val)
            )
        except OSError:
            log_error(
                'Could not read from included config file: {}, ignoring'.
                format(val)
            )
        return
    if not parse_conf_item(key, val, ans):
        log_error('Ignoring unknown config key: {}'.format(key))


def _parse(
    lines: Iterable[str],
    parse_conf_item: ItemParser,
    ans: Dict[str, Any],
    accumulate_bad_lines: Optional[List[BadLine]] = None
) -> None:
    """Parse every line of *lines* into *ans*.

    If *lines* has a ``name`` attribute (as open files do), includes are
    resolved relative to that file's directory, otherwise relative to the
    config directory from the constants module. When *accumulate_bad_lines*
    is a list, exceptions raised while parsing a line are recorded in it as
    BadLine entries; otherwise they propagate.
    """
    name = getattr(lines, 'name', None)
    if name:
        base_path_for_includes = os.path.dirname(os.path.abspath(name))
    else:
        from ..constants import config_dir
        base_path_for_includes = config_dir
    for i, line in enumerate(lines):
        try:
            parse_line(
                line, parse_conf_item, ans, base_path_for_includes, accumulate_bad_lines
            )
        except Exception as e:
            if accumulate_bad_lines is None:
                raise
            accumulate_bad_lines.append(BadLine(i + 1, line.rstrip(), e))


def parse_config_base(
    lines: Iterable[str],
    parse_conf_item: ItemParser,
    ans: Dict[str, Any],
    accumulate_bad_lines: Optional[List[BadLine]] = None
) -> None:
    """Public entry point for parsing config *lines* into *ans*; see _parse()."""
    _parse(
        lines, parse_conf_item, ans, accumulate_bad_lines
    )


def merge_dicts(defaults: Dict, newvals: Dict) -> Dict:
    """Return a shallow copy of *defaults* updated with *newvals*."""
    ans = defaults.copy()
    ans.update(newvals)
    return ans


def resolve_config(SYSTEM_CONF: str, defconf: str, config_files_on_cmd_line: Sequence[str]) -> Generator[str, None, None]:
    """Yield the config file paths to load, in load order.

    Without command line config files, yields the system config followed by
    the default user config. With them, yields the system config followed by
    each command line file, unless one of them is the literal 'NONE', in
    which case nothing is yielded.
    """
    if config_files_on_cmd_line:
        # NOTE(review): block nesting was reconstructed from a
        # whitespace-collapsed source; confirm that 'NONE' is meant to
        # suppress the command line files as well as SYSTEM_CONF.
        if 'NONE' not in config_files_on_cmd_line:
            yield SYSTEM_CONF
            yield from config_files_on_cmd_line
    else:
        yield SYSTEM_CONF
        yield defconf


def load_config(
    defaults: OptionsProtocol,
    parse_config: Callable[[Iterable[str]], Dict[str, Any]],
    merge_configs: Callable[[Dict, Dict], Dict],
    *paths: str,
    overrides: Optional[Iterable[str]] = None
) -> Tuple[Dict[str, Any], Tuple[str, ...]]:
    """Load and merge config files on top of *defaults*.

    Each non-empty path in *paths* is opened and parsed with *parse_config*,
    and the result merged into the accumulated options with *merge_configs*.
    Files that do not exist or cannot be opened for lack of permission are
    skipped. *overrides*, if given, is parsed and merged last.

    Returns the merged options and a tuple of the paths that were loaded.
    """
    ans = defaults._asdict()
    found_paths = []
    for path in paths:
        if not path:
            continue
        try:
            with open(path, encoding='utf-8', errors='replace') as f:
                vals = parse_config(f)
        except (FileNotFoundError, PermissionError):
            continue
        found_paths.append(path)
        ans = merge_configs(ans, vals)
    if overrides is not None:
        vals = parse_config(overrides)
        ans = merge_configs(ans, vals)
    return ans, tuple(found_paths)


def key_func() -> Tuple[Callable[..., Callable], Dict[str, Callable]]:
    """Create a registry of argument parsers and a decorator to fill it.

    Returns ``(func_with_args, registry)``. ``func_with_args(*names)`` is a
    decorator factory that registers the decorated function under every
    given name. Registering a different function under an already used name
    raises ValueError.
    """
    ans: Dict[str, Callable] = {}

    def func_with_args(*names: str) -> Callable:

        def w(f: Callable) -> Callable:
            for name in names:
                # setdefault() returns the existing entry if there is one,
                # so a mismatch means the name is already taken.
                if ans.setdefault(name, f) is not f:
                    raise ValueError(
                        'the args_func {} is being redefined'.format(name)
                    )
            return f

        return w

    return func_with_args, ans


class KeyAction(NamedTuple):
    """An action name together with its parsed arguments."""

    func: str
    args: Tuple[Union[str, float, bool, int, None], ...] = ()

    def __repr__(self) -> str:
        if self.args:
            return f'KeyAction({self.func!r}, {self.args!r})'
        return f'KeyAction({self.func!r})'


def parse_kittens_func_args(action: str, args_funcs: Dict[str, Callable]) -> KeyAction:
    """Parse an action string of the form ``name [arguments]``.

    An action without arguments is returned as is, without consulting
    *args_funcs*. Otherwise the parser registered for the action name is
    called as ``parser(name, rest)`` and must return ``(name, args)``; a
    non-sequence ``args`` is wrapped into a one element tuple.

    Raises KeyError if no parser is registered for the action name and
    ValueError if the parser itself fails.
    """
    parts = action.strip().split(' ', 1)
    func = parts[0]
    if len(parts) == 1:
        return KeyAction(func, ())
    rest = parts[1]

    try:
        parser = args_funcs[func]
    except KeyError as e:
        raise KeyError(
            'Unknown action: {}. Check if map action: '
            '{} is valid'.format(func, action)
        ) from e

    try:
        func, args = parser(func, rest)
    except Exception as e:
        # Chain explicitly, matching the KeyError path above, so the parser's
        # own failure is reported as the direct cause.
        raise ValueError('Unknown key action: {}'.format(action)) from e

    if not isinstance(args, (list, tuple)):
        args = (args, )

    return KeyAction(func, tuple(args))


KittensKeyDefinition = Tuple[ParsedShortcut, KeyAction]
KittensKeyMap = Dict[ParsedShortcut, KeyAction]


def parse_kittens_key(
    val: str, funcs_with_args: Dict[str, Callable]
) -> Optional[KittensKeyDefinition]:
    """Parse ``shortcut action...`` into a (shortcut, KeyAction) pair.

    Returns None if either the shortcut or the action is missing.
    """
    from ..key_encoding import parse_shortcut
    # partition() yields (head, sep, tail); the [::2] slice drops the separator.
    sc, action = val.partition(' ')[::2]
    if not sc or not action:
        return None
    ans = parse_kittens_func_args(action, funcs_with_args)
    return parse_shortcut(sc), ans


def uniq(vals: Iterable[T]) -> List[T]:
    """Return the items of *vals* with duplicates removed, keeping order."""
    seen: Set[T] = set()
    seen_add = seen.add
    # seen_add() returns None, so "not seen_add(x)" is always true and only
    # serves to record x as seen.
    return [x for x in vals if x not in seen and not seen_add(x)]


def save_type_stub(text: str, fpath: str) -> None:
    """Write *text*, prefixed with a preamble, to the path *fpath* + 'i'.

    The file is rewritten only when its current contents differ from what
    would be written.
    """
    fpath += 'i'
    preamble = '# Update this file by running: ./test.py mypy\n\n'
    try:
        with open(fpath) as fs:
            existing = fs.read()
    except FileNotFoundError:
        existing = ''
    current = preamble + text
    if existing != current:
        with open(fpath, 'w') as f:
            f.write(current)