1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPLv3 Copyright: 2020, Kovid Goyal <kovid at kovidgoyal.net>
4
5from contextlib import suppress
6from typing import (
7    TYPE_CHECKING, Any, Callable, Dict, FrozenSet, Generator, Iterable, List,
8    NoReturn, Optional, Tuple, Type, Union, cast
9)
10
11from kitty.cli import get_defaults_from_seq, parse_args, parse_option_spec
12from kitty.cli_stub import RCOptions as R
13from kitty.constants import appname, running_in_kitty
14
# Boss, Tab and Window are imported only while type checking; at runtime the
# three names are bound to None, so annotations in this module refer to them
# as quoted strings ('Boss', 'Window', 'Tab').
# NOTE(review): presumably this avoids importing kitty.boss/kitty.tabs/
# kitty.window at runtime (e.g. import cycles) -- confirm.
if TYPE_CHECKING:
    from kitty.boss import Boss as B
    from kitty.tabs import Tab
    from kitty.window import Window as W
    Window = W
    Boss = B
    # Bare reference to the imported name; presumably keeps linters from
    # flagging the Tab import as unused -- confirm.
    Tab
else:
    Boss = Window = Tab = None
# Public alias for the RCOptions stub class imported above as R.
RCOptions = R
25
26
class NoResponse:
    """Empty marker type; a module-level instance is exposed as ``no_response``."""
29
30
class RemoteControlError(Exception):
    """Error raised by RemoteCommand.fatal() when running inside kitty."""
33
34
class MatchError(ValueError):
    """Raised when a match expression selects nothing.

    The message has the form ``No matching <target> for expression: <expression>``,
    where ``target`` defaults to ``'windows'``.
    """

    # NOTE(review): presumably read by the error reporting code to suppress
    # the traceback for this exception type -- confirm against the consumer.
    hide_traceback = True

    def __init__(self, expression: str, target: str = 'windows'):
        message = 'No matching {} for expression: {}'.format(target, expression)
        super().__init__(message)
41
42
class OpacityError(ValueError):
    """ValueError subclass for opacity related failures (raised outside this module)."""

    # NOTE(review): presumably read by the error reporting code to suppress
    # the traceback for this exception type -- confirm against the consumer.
    hide_traceback = True
46
47
class UnknownLayout(ValueError):
    """ValueError subclass for unknown layout names (raised outside this module)."""

    # NOTE(review): presumably read by the error reporting code to suppress
    # the traceback for this exception type -- confirm against the consumer.
    hide_traceback = True
51
52
class PayloadGetter:
    """Callable that reads values from a command payload, with option defaults as fallback.

    Calling the instance with ``key`` returns ``payload[key]`` when the key is
    present (even if the stored value is None). Otherwise the lookup is
    delegated to ``cmd.get_default(opt_name or key, missing=missing)``.
    """

    def __init__(self, cmd: 'RemoteCommand', payload: Dict[str, Any]):
        self.cmd = cmd
        self.payload = payload

    def __call__(self, key: str, opt_name: Optional[str] = None, missing: Any = None) -> Any:
        # The module-level payload_get sentinel distinguishes "key absent"
        # from a key that is present with a falsy/None value.
        value = self.payload.get(key, payload_get)
        if value is payload_get:
            default_name = opt_name or key
            return self.cmd.get_default(default_name, missing=missing)
        return value
64
65
# Shared NoResponse instance (its consumers are outside this module).
no_response = NoResponse()
# Unique sentinel used by PayloadGetter.__call__ to detect a key that is
# absent from the payload, as opposed to one stored with a None value.
payload_get = object()
# Return type of RemoteCommand.response_from_kitty().
ResponseType = Optional[Union[bool, str]]
# A single value a command can produce.
CmdReturnType = Union[Dict[str, Any], List, Tuple, str, int, float, bool]
# A generator yielding CmdReturnType values.
CmdGenerator = Generator[CmdReturnType, None, None]
# Return type of RemoteCommand.message_to_kitty(): one value, a generator of
# values, or None.
PayloadType = Optional[Union[CmdReturnType, CmdGenerator]]
# Type of the payload_get argument accepted by the RemoteCommand methods.
PayloadGetType = PayloadGetter
# Type of a command line argument list.
ArgsType = List[str]
74
75
# Help text for a ``--match``/``-m`` option that selects windows. The first
# line lists the option names and the remainder is the description.
# NOTE(review): presumably embedded into the options_spec of individual
# commands defined elsewhere -- confirm against the command modules.
MATCH_WINDOW_OPTION = '''\
--match -m
The window to match. Match specifications are of the form:
:italic:`field:regexp`. Where field can be one of: id, title, pid, cwd, cmdline, num, env and recent.
You can use the :italic:`ls` command to get a list of windows. Note that for
numeric fields such as id, pid, recent and num the expression is interpreted as a number,
not a regular expression. The field num refers to the window position in the current tab,
starting from zero and counting clockwise (this is the same as the order in which the
windows are reported by the :italic:`ls` command). The window id of the current window
is available as the KITTY_WINDOW_ID environment variable. The field recent refers to recently
active windows in the currently active tab, with zero being the currently active window, one being the previously active
window and so on. When using the :italic:`env` field
to match on environment variables you can specify only the environment variable name or a name
and value, for example, :italic:`env:MY_ENV_VAR=2`
'''
# Help text for a ``--match``/``-m`` option that selects tabs. The first line
# lists the option names and the remainder is the description.
# NOTE(review): presumably embedded into the options_spec of individual
# commands defined elsewhere -- confirm against the command modules.
MATCH_TAB_OPTION = '''\
--match -m
The tab to match. Match specifications are of the form:
:italic:`field:regexp`. Where field can be one of:
id, index, title, window_id, window_title, pid, cwd, env, cmdline and recent.
You can use the :italic:`ls` command to get a list of tabs. Note that for
numeric fields such as id, recent, index and pid the expression is interpreted as a number,
not a regular expression. When using title or id, first a matching tab is
looked for and if not found a matching window is looked for, and the tab
for that window is used. You can also use window_id and window_title to match
the tab that contains the window with the specified id or title. The index number
is used to match the nth tab in the currently active OS window. The recent number
matches recently active tabs in the currently active OS window, with zero being the currently
active tab, one the previously active tab and so on.
'''
106
107
class RemoteCommand:
    """Base class for remote control commands.

    Subclasses configure themselves through the class attributes below and
    must override message_to_kitty() and response_from_kitty(), both of which
    raise NotImplementedError here.
    """

    # Command name; replaced per instance in __init__ with the last component
    # of the defining module's name, underscores turned into hyphens.
    name: str = ''
    # Short description; also used as desc when desc is empty.
    short_desc: str = ''
    # Long description.
    desc: str = ''
    # Description of the positional arguments; an empty value makes __init__
    # force args_count to 0.
    argspec: str = '...'
    # Option specification text, parsed by parse_option_spec() to obtain defaults.
    options_spec: Optional[str] = None
    no_response: bool = False
    string_return_is_error: bool = False
    # Exact number of positional arguments required; None means "not checked".
    args_count: Optional[int] = None
    args_completion: Optional[Dict[str, Tuple[str, Union[Callable[[], Iterable[str]], Tuple[str, ...]]]]] = None
    # Lazily filled cache of option defaults, see get_default().
    defaults: Optional[Dict[str, Any]] = None
    # Class used to hold the parsed command line options.
    options_class: Type = RCOptions

    def __init__(self) -> None:
        module_leaf = self.__class__.__module__.rpartition('.')[2]
        self.desc = self.desc if self.desc else self.short_desc
        self.name = module_leaf.replace('_', '-')
        # A command without an argspec accepts no positional arguments.
        self.args_count = self.args_count if self.argspec else 0

    def fatal(self, msg: str) -> NoReturn:
        """Abort with ``msg``: RemoteControlError inside kitty, SystemExit otherwise."""
        error_type = RemoteControlError if running_in_kitty() else SystemExit
        raise error_type(msg)

    def get_default(self, name: str, missing: Any = None) -> Any:
        """Return the default value of option ``name``, or ``missing`` if unknown.

        The defaults are parsed from options_spec on first use and cached on
        the instance. Commands without an options_spec always yield ``missing``.
        """
        if not self.options_spec:
            return missing
        if self.defaults is None:
            option_seq = parse_option_spec(self.options_spec)[0]
            self.defaults = get_defaults_from_seq(option_seq)
        return self.defaults.get(name, missing)

    def windows_for_match_payload(self, boss: 'Boss', window: Optional['Window'], payload_get: PayloadGetType) -> List['Window']:
        """Resolve the windows selected by the ``all``, ``self`` and ``match`` payload keys.

        ``all`` selects every window known to ``boss``. Otherwise a ``match``
        expression is resolved with boss.match_windows() and MatchError is
        raised when it matches nothing. Without a match expression a single
        fallback window is returned (or an empty list when there is none).
        """
        if payload_get('all'):
            return list(boss.all_windows)
        # When 'self' is None or True the supplied window wins over the
        # active one, otherwise the active window is preferred.
        if payload_get('self') in (None, True):
            fallback = window or boss.active_window
        else:
            fallback = boss.active_window or window
        spec = payload_get('match')
        if not spec:
            return [fallback] if fallback else []
        matched = list(boss.match_windows(spec))
        if not matched:
            raise MatchError(spec)
        return matched

    def tabs_for_match_payload(self, boss: 'Boss', window: Optional['Window'], payload_get: PayloadGetType) -> List['Tab']:
        """Resolve the tabs selected by the ``all``, ``match`` and ``self`` payload keys.

        ``all`` selects every tab known to ``boss``. A ``match`` expression is
        resolved with boss.match_tabs() and MatchError is raised when it
        matches nothing. Otherwise the tab containing ``window`` is used when
        ``self`` is None or True, falling back to the active tab; the result
        is empty when no tab can be found.
        """
        if payload_get('all'):
            return list(boss.all_tabs)
        spec = payload_get('match')
        if spec:
            matched = list(boss.match_tabs(spec))
            if matched:
                return matched
            raise MatchError(spec, 'tabs')
        if window and payload_get('self') in (None, True):
            own_tab = boss.tab_for_window(window)
            if own_tab:
                return [own_tab]
        active = boss.active_tab
        return [active] if active else []

    def windows_for_payload(self, boss: 'Boss', window: Optional['Window'], payload_get: PayloadGetType) -> List['Window']:
        """Resolve the windows selected by ``all``, ``match_window`` and ``match_tab``.

        ``all`` selects every window known to ``boss``. Otherwise the result
        starts as the supplied window (or the active one), is replaced by the
        ``match_window`` matches when that key is set, and is then extended
        with the windows of every tab matched by ``match_tab``. MatchError is
        raised when either expression matches nothing.
        """
        if payload_get('all'):
            return list(boss.all_windows)
        current = window or boss.active_window
        selected = [current] if current else []
        window_spec = payload_get('match_window')
        if window_spec:
            selected = list(boss.match_windows(window_spec))
            if not selected:
                raise MatchError(window_spec)
        tab_spec = payload_get('match_tab')
        if tab_spec:
            tabs = tuple(boss.match_tabs(tab_spec))
            if not tabs:
                raise MatchError(tab_spec, 'tabs')
            for tab in tabs:
                # Iterating a tab yields its windows.
                selected.extend(tab)
        return selected

    def message_to_kitty(self, global_opts: RCOptions, opts: Any, args: ArgsType) -> PayloadType:
        """Produce the payload for this command; must be overridden by subclasses."""
        raise NotImplementedError()

    def response_from_kitty(self, boss: 'Boss', window: Optional['Window'], payload_get: PayloadGetType) -> ResponseType:
        """Handle the command given its payload; must be overridden by subclasses."""
        raise NotImplementedError()
195
196
def cli_params_for(command: RemoteCommand) -> Tuple[Callable[[], str], str, str, str]:
    """Return the positional parameters passed to parse_args() for ``command``.

    The tuple holds: a callable producing the option spec text (a lone
    newline when the command has no options), the argspec, the description
    and the usage name ``<appname> @ <command name>``.
    """
    spec_formatter = (command.options_spec or '\n').format
    usage_name = '{} @ {}'.format(appname, command.name)
    return spec_formatter, command.argspec, command.desc, usage_name
199
200
def parse_subcommand_cli(command: RemoteCommand, args: ArgsType) -> Tuple[Any, ArgsType]:
    """Parse the command line of ``command`` and validate its argument count.

    ``args[0]` is skipped (only ``args[1:]`` is parsed). Returns the parsed
    options and the remaining positional arguments. Raises SystemExit when
    command.args_count is set and differs from the number of positional
    arguments supplied.
    """
    opts, items = parse_args(args[1:], *cli_params_for(command), result_class=command.options_class)
    expected = command.args_count
    if expected is None or expected == len(items):
        return opts, items
    if expected == 0:
        raise SystemExit('Unknown extra argument(s) supplied to {}'.format(command.name))
    raise SystemExit('Must specify exactly {} argument(s) for {}'.format(expected, command.name))
208
209
def display_subcommand_help(func: RemoteCommand) -> None:
    """Run the argument parser for ``func`` with ``--help``, ignoring SystemExit."""
    spec_formatter = (func.options_spec or '\n').format
    try:
        parse_args(['--help'], spec_formatter, func.argspec, func.desc, func.name)
    except SystemExit:
        # NOTE(review): presumably parse_args exits after printing the help
        # text; the exit is swallowed so the caller keeps running -- confirm.
        pass
213
214
def command_for_name(cmd_name: str) -> 'RemoteCommand':
    """Return the command object for the remote control command ``cmd_name``.

    Hyphens in ``cmd_name`` are converted to underscores, the module
    ``kitty.rc.<cmd_name>`` is imported and its attribute of the same name is
    returned.

    Raises:
        KeyError: if the module cannot be imported. The underlying
            ImportError is attached as ``__cause__`` so that a genuine import
            failure inside an existing command module stays visible instead
            of looking like an unknown command.
    """
    from importlib import import_module
    cmd_name = cmd_name.replace('-', '_')
    try:
        m = import_module(f'kitty.rc.{cmd_name}')
    except ImportError as err:
        raise KeyError(f'{cmd_name} is not a known kitty remote control command') from err
    return cast(RemoteCommand, getattr(m, cmd_name))
223
224
def all_command_names() -> FrozenSet[str]:
    """Return the names of all command modules found in the ``kitty.rc`` package.

    A resource counts as a command module when it has a ``py``, ``pyc`` or
    ``pyo`` extension and a non-empty stem other than ``base`` or
    ``__init__``. The returned names are the stems.
    """
    try:
        from importlib.resources import contents
    except ImportError:
        from importlib_resources import contents  # type:ignore

    module_extensions = ('py', 'pyc', 'pyo')
    excluded_stems = ('base', '__init__')
    names = set()
    for entry in contents('kitty.rc'):
        stem, _, ext = entry.rpartition('.')
        if ext in module_extensions and stem and stem not in excluded_stems:
            names.add(stem)
    return frozenset(names)
236