1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
4
5import os
6import readline
7import shlex
8import sys
9import traceback
10from contextlib import suppress
11from functools import lru_cache
12from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple
13
14from .cli import (
15    OptionDict, emph, green, italic, parse_option_spec, print_help_for_seq,
16    title
17)
18from .cli_stub import RCOptions
19from .constants import cache_dir, version, kitty_face
20from .rc.base import (
21    RemoteCommand, all_command_names, command_for_name,
22    display_subcommand_help, parse_subcommand_cli
23)
24from .types import run_once
25
26
27@run_once
28def match_commands() -> Tuple[str, ...]:
29    all_commands = tuple(sorted(x.replace('_', '-') for x in all_command_names()))
30    return tuple(sorted(all_commands + ('exit', 'help', 'quit')))
31
32
33def init_readline(readline: Any) -> None:
34    with suppress(OSError):
35        readline.read_init_file()
36    if 'libedit' in readline.__doc__:
37        readline.parse_and_bind("bind ^I rl_complete")
38    else:
39        readline.parse_and_bind('tab: complete')
40
41
42def cmd_names_matching(prefix: str) -> Generator[str, None, None]:
43    for cmd in match_commands():
44        if not prefix or cmd.startswith(prefix):
45            yield cmd + ' '
46
47
48@lru_cache()
49def options_for_cmd(cmd: str) -> Tuple[Tuple[str, ...], Dict[str, OptionDict]]:
50    alias_map: Dict[str, OptionDict] = {}
51    try:
52        func = command_for_name(cmd)
53    except KeyError:
54        return (), alias_map
55    if not func.options_spec:
56        return (), alias_map
57    seq, disabled = parse_option_spec(func.options_spec)
58    ans = []
59    for opt in seq:
60        if isinstance(opt, str):
61            continue
62        for alias in opt['aliases']:
63            ans.append(alias)
64            alias_map[alias] = opt
65    return tuple(sorted(ans)), alias_map
66
67
68def options_matching(prefix: str, cmd: str, last_word: str, aliases: Iterable[str], alias_map: Dict[str, OptionDict]) -> Generator[str, None, None]:
69    for alias in aliases:
70        if (not prefix or alias.startswith(prefix)) and alias.startswith('--'):
71            yield alias + ' '
72
73
74class Completer:
75
76    def __init__(self) -> None:
77        self.matches: List[str] = []
78        ddir = cache_dir()
79        os.makedirs(ddir, exist_ok=True)
80        self.history_path = os.path.join(ddir, 'shell.history')
81
82    def complete(self, text: str, state: int) -> Optional[str]:
83        if state == 0:
84            line = readline.get_line_buffer()
85            cmdline = shlex.split(line)
86            if len(cmdline) < 2 and not line.endswith(' '):
87                self.matches = list(cmd_names_matching(text))
88            else:
89                self.matches = list(options_matching(text, cmdline[0], cmdline[-1], *options_for_cmd(cmdline[0])))
90        if state < len(self.matches):
91            return self.matches[state]
92        return None
93
94    def __enter__(self) -> 'Completer':
95        with suppress(Exception):
96            readline.read_history_file(self.history_path)
97        readline.set_completer(self.complete)
98        delims = readline.get_completer_delims()
99        readline.set_completer_delims(delims.replace('-', ''))
100        return self
101
102    def __exit__(self, *a: Any) -> None:
103        readline.write_history_file(self.history_path)
104
105
106def print_err(*a: Any, **kw: Any) -> None:
107    kw['file'] = sys.stderr
108    print(*a, **kw)
109
110
111def print_help(which: Optional[str] = None) -> None:
112    if which is None:
113        print('Control kitty by sending it commands.')
114        print()
115        print(title('Commands') + ':')
116        for cmd in all_command_names():
117            c = command_for_name(cmd)
118            print(' ', green(c.name))
119            print('   ', c.short_desc)
120        print(' ', green('exit'))
121        print('   ', 'Exit this shell')
122        print('\nUse help {} for help on individual commands'.format(italic('command')))
123    else:
124        try:
125            func = command_for_name(which)
126        except KeyError:
127            if which == 'exit':
128                print('Exit this shell')
129            elif which == 'help':
130                print('Show help')
131            else:
132                print('Unknown command: {}'.format(emph(which)))
133            return
134        display_subcommand_help(func)
135
136
137def run_cmd(global_opts: RCOptions, cmd: str, func: RemoteCommand, opts: Any, items: List[str]) -> None:
138    from .remote_control import do_io
139    payload = func.message_to_kitty(global_opts, opts, items)
140    send = {
141        'cmd': cmd,
142        'version': version,
143        'no_response': False,
144    }
145    if payload is not None:
146        send['payload'] = payload
147    response = do_io(global_opts.to, send, func.no_response)
148    if not response.get('ok'):
149        if response.get('tb'):
150            print_err(response['tb'])
151        print_err(response['error'])
152        return
153    if 'data' in response:
154        print(response['data'])
155
156
157def real_main(global_opts: RCOptions) -> None:
158    init_readline(readline)
159    print_help_for_seq.allow_pager = False
160    print('Welcome to the kitty shell!')
161    print('Use {} for assistance or {} to quit'.format(green('help'), green('exit')))
162    awid = os.environ.pop('KITTY_SHELL_ACTIVE_WINDOW_ID', None)
163    if awid is not None:
164        print('The ID of the previously active window is: {}'.format(awid))
165
166    while True:
167        try:
168            try:
169                scmdline = input(f'{kitty_face} ')
170            except UnicodeEncodeError:
171                scmdline = input('kitty> ')
172        except EOFError:
173            break
174        except KeyboardInterrupt:
175            print()
176            continue
177        if not scmdline:
178            continue
179        cmdline = shlex.split(scmdline)
180        cmd = cmdline[0].lower()
181
182        try:
183            func = command_for_name(cmd)
184        except KeyError:
185            if cmd in ('exit', 'quit'):
186                break
187            if cmd == 'help':
188                print_help(cmdline[1] if len(cmdline) > 1 else None)
189                continue
190            print_err('"{}" is an unknown command. Use "help" to see a list of commands.'.format(emph(cmd)))
191            continue
192
193        try:
194            opts, items = parse_subcommand_cli(func, cmdline)
195        except SystemExit as e:
196            if e.code != 0:
197                print_err(e)
198                print_err('Use "{}" to see how to use this command.'.format(emph('help ' + cmd)))
199            continue
200        except Exception:
201            print_err('Unhandled error:')
202            traceback.print_exc()
203            continue
204        else:
205            try:
206                run_cmd(global_opts, cmd, func, opts, items)
207            except SystemExit as e:
208                print_err(e)
209                continue
210            except KeyboardInterrupt:
211                print()
212                continue
213            except Exception:
214                print_err('Unhandled error:')
215                traceback.print_exc()
216                continue
217
218
219def main(global_opts: RCOptions) -> None:
220    try:
221        with Completer():
222            real_main(global_opts)
223    except Exception:
224        traceback.print_exc()
225        input('Press enter to quit...')
226        raise SystemExit(1)
227