1#!/usr/local/bin/python3.8 2# vim:fileencoding=utf-8 3# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net> 4 5import os 6import readline 7import shlex 8import sys 9import traceback 10from contextlib import suppress 11from functools import lru_cache 12from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple 13 14from .cli import ( 15 OptionDict, emph, green, italic, parse_option_spec, print_help_for_seq, 16 title 17) 18from .cli_stub import RCOptions 19from .constants import cache_dir, version, kitty_face 20from .rc.base import ( 21 RemoteCommand, all_command_names, command_for_name, 22 display_subcommand_help, parse_subcommand_cli 23) 24from .types import run_once 25 26 27@run_once 28def match_commands() -> Tuple[str, ...]: 29 all_commands = tuple(sorted(x.replace('_', '-') for x in all_command_names())) 30 return tuple(sorted(all_commands + ('exit', 'help', 'quit'))) 31 32 33def init_readline(readline: Any) -> None: 34 with suppress(OSError): 35 readline.read_init_file() 36 if 'libedit' in readline.__doc__: 37 readline.parse_and_bind("bind ^I rl_complete") 38 else: 39 readline.parse_and_bind('tab: complete') 40 41 42def cmd_names_matching(prefix: str) -> Generator[str, None, None]: 43 for cmd in match_commands(): 44 if not prefix or cmd.startswith(prefix): 45 yield cmd + ' ' 46 47 48@lru_cache() 49def options_for_cmd(cmd: str) -> Tuple[Tuple[str, ...], Dict[str, OptionDict]]: 50 alias_map: Dict[str, OptionDict] = {} 51 try: 52 func = command_for_name(cmd) 53 except KeyError: 54 return (), alias_map 55 if not func.options_spec: 56 return (), alias_map 57 seq, disabled = parse_option_spec(func.options_spec) 58 ans = [] 59 for opt in seq: 60 if isinstance(opt, str): 61 continue 62 for alias in opt['aliases']: 63 ans.append(alias) 64 alias_map[alias] = opt 65 return tuple(sorted(ans)), alias_map 66 67 68def options_matching(prefix: str, cmd: str, last_word: str, aliases: Iterable[str], alias_map: Dict[str, OptionDict]) -> Generator[str, None, None]: 69 for alias in aliases: 70 if (not prefix or alias.startswith(prefix)) and alias.startswith('--'): 71 yield alias + ' ' 72 73 74class Completer: 75 76 def __init__(self) -> None: 77 self.matches: List[str] = [] 78 ddir = cache_dir() 79 os.makedirs(ddir, exist_ok=True) 80 self.history_path = os.path.join(ddir, 'shell.history') 81 82 def complete(self, text: str, state: int) -> Optional[str]: 83 if state == 0: 84 line = readline.get_line_buffer() 85 cmdline = shlex.split(line) 86 if len(cmdline) < 2 and not line.endswith(' '): 87 self.matches = list(cmd_names_matching(text)) 88 else: 89 self.matches = list(options_matching(text, cmdline[0], cmdline[-1], *options_for_cmd(cmdline[0]))) 90 if state < len(self.matches): 91 return self.matches[state] 92 return None 93 94 def __enter__(self) -> 'Completer': 95 with suppress(Exception): 96 readline.read_history_file(self.history_path) 97 readline.set_completer(self.complete) 98 delims = readline.get_completer_delims() 99 readline.set_completer_delims(delims.replace('-', '')) 100 return self 101 102 def __exit__(self, *a: Any) -> None: 103 readline.write_history_file(self.history_path) 104 105 106def print_err(*a: Any, **kw: Any) -> None: 107 kw['file'] = sys.stderr 108 print(*a, **kw) 109 110 111def print_help(which: Optional[str] = None) -> None: 112 if which is None: 113 print('Control kitty by sending it commands.') 114 print() 115 print(title('Commands') + ':') 116 for cmd in all_command_names(): 117 c = command_for_name(cmd) 118 print(' ', green(c.name)) 119 print(' ', c.short_desc) 120 print(' ', green('exit')) 121 print(' ', 'Exit this shell') 122 print('\nUse help {} for help on individual commands'.format(italic('command'))) 123 else: 124 try: 125 func = command_for_name(which) 126 except KeyError: 127 if which == 'exit': 128 print('Exit this shell') 129 elif which == 'help': 130 print('Show help') 131 else: 132 print('Unknown command: {}'.format(emph(which))) 133 return 134 display_subcommand_help(func) 135 136 137def run_cmd(global_opts: RCOptions, cmd: str, func: RemoteCommand, opts: Any, items: List[str]) -> None: 138 from .remote_control import do_io 139 payload = func.message_to_kitty(global_opts, opts, items) 140 send = { 141 'cmd': cmd, 142 'version': version, 143 'no_response': False, 144 } 145 if payload is not None: 146 send['payload'] = payload 147 response = do_io(global_opts.to, send, func.no_response) 148 if not response.get('ok'): 149 if response.get('tb'): 150 print_err(response['tb']) 151 print_err(response['error']) 152 return 153 if 'data' in response: 154 print(response['data']) 155 156 157def real_main(global_opts: RCOptions) -> None: 158 init_readline(readline) 159 print_help_for_seq.allow_pager = False 160 print('Welcome to the kitty shell!') 161 print('Use {} for assistance or {} to quit'.format(green('help'), green('exit'))) 162 awid = os.environ.pop('KITTY_SHELL_ACTIVE_WINDOW_ID', None) 163 if awid is not None: 164 print('The ID of the previously active window is: {}'.format(awid)) 165 166 while True: 167 try: 168 try: 169 scmdline = input(f'{kitty_face} ') 170 except UnicodeEncodeError: 171 scmdline = input('kitty> ') 172 except EOFError: 173 break 174 except KeyboardInterrupt: 175 print() 176 continue 177 if not scmdline: 178 continue 179 cmdline = shlex.split(scmdline) 180 cmd = cmdline[0].lower() 181 182 try: 183 func = command_for_name(cmd) 184 except KeyError: 185 if cmd in ('exit', 'quit'): 186 break 187 if cmd == 'help': 188 print_help(cmdline[1] if len(cmdline) > 1 else None) 189 continue 190 print_err('"{}" is an unknown command. Use "help" to see a list of commands.'.format(emph(cmd))) 191 continue 192 193 try: 194 opts, items = parse_subcommand_cli(func, cmdline) 195 except SystemExit as e: 196 if e.code != 0: 197 print_err(e) 198 print_err('Use "{}" to see how to use this command.'.format(emph('help ' + cmd))) 199 continue 200 except Exception: 201 print_err('Unhandled error:') 202 traceback.print_exc() 203 continue 204 else: 205 try: 206 run_cmd(global_opts, cmd, func, opts, items) 207 except SystemExit as e: 208 print_err(e) 209 continue 210 except KeyboardInterrupt: 211 print() 212 continue 213 except Exception: 214 print_err('Unhandled error:') 215 traceback.print_exc() 216 continue 217 218 219def main(global_opts: RCOptions) -> None: 220 try: 221 with Completer(): 222 real_main(global_opts) 223 except Exception: 224 traceback.print_exc() 225 input('Press enter to quit...') 226 raise SystemExit(1) 227