1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
4
5import atexit
6import json
7import os
8import re
9from contextlib import suppress
10from functools import partial
11from gettext import gettext as _
12from typing import (
13    Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple, Union,
14    cast
15)
16from weakref import WeakValueDictionary
17
18from .child import (
19    cached_process_data, cwd_of_process, default_env, set_default_env
20)
21from .cli import create_opts, parse_args
22from .cli_stub import CLIOptions
23from .conf.utils import BadLine, KeyAction, to_cmdline
24from .config import common_opts_as_dict, prepare_config_file_for_editing
25from .constants import (
26    appname, config_dir, is_macos, is_wayland, kitty_exe,
27    supports_primary_selection, website_url
28)
29from .fast_data_types import (
30    CLOSE_BEING_CONFIRMED, IMPERATIVE_CLOSE_REQUESTED, NO_CLOSE_REQUESTED,
31    ChildMonitor, KeyEvent, add_timer, apply_options_update,
32    background_opacity_of, change_background_opacity, change_os_window_state,
33    cocoa_set_menubar_title, create_os_window,
34    current_application_quit_request, current_os_window, destroy_global_data,
35    focus_os_window, get_clipboard_string, get_options, get_os_window_size,
36    global_font_size, mark_os_window_for_close, os_window_font_size,
37    patch_global_colors, safe_pipe, set_application_quit_request,
38    set_background_image, set_boss, set_clipboard_string, set_in_sequence_mode,
39    set_options, set_os_window_size, thread_write, toggle_fullscreen,
40    toggle_maximized
41)
42from .keys import get_shortcut, shortcut_matches
43from .layout.base import set_layout_options
44from .notify import notification_activated
45from .options.types import Options
46from .options.utils import MINIMUM_FONT_SIZE, SubSequenceMap
47from .os_window_size import initial_window_size_func
48from .rgb import Color, color_from_int
49from .session import Session, create_sessions, get_os_window_sizing_data
50from .tabs import (
51    SpecialWindow, SpecialWindowInstance, Tab, TabDict, TabManager
52)
53from .types import SingleKey, ac
54from .typing import PopenType, TypedDict
55from .utils import (
56    func_name, get_editor, get_new_os_window_size, get_primary_selection,
57    is_path_in_temp_dir, log_error, open_url, parse_address_spec,
58    parse_uri_list, platform_window_id, read_shell_environment,
59    remove_socket_file, safe_print, set_primary_selection, single_instance,
60    startup_notification_handler
61)
62from .window import MatchPatternType, Window
63
64
class OSWindowDict(TypedDict):
    """Shape of the per-OS-window records yielded by ``Boss.list_os_windows()``."""
    # key of the OS window in Boss.os_window_map
    id: int
    # result of platform_window_id() for this OS window
    platform_window_id: Optional[int]
    # True when this OS window's TabManager is Boss.active_tab_manager
    is_focused: bool
    # materialised output of TabManager.list_tabs()
    tabs: List[TabDict]
    # copied from TabManager.wm_class
    wm_class: str
    # copied from TabManager.wm_name
    wm_name: str
72
73
def listen_on(spec: str) -> int:
    """Open a listening socket described by *spec* and return its file descriptor.

    *spec* is decoded by ``parse_address_spec`` into a socket family, a bind
    address and a socket path. ``remove_socket_file`` is registered with
    ``atexit`` (before binding) and is handed the socket object and that path.
    """
    from socket import socket as make_socket
    family, address, socket_path = parse_address_spec(spec)
    listener = make_socket(family)
    atexit.register(remove_socket_file, listener, socket_path)
    listener.bind(address)
    listener.listen()
    return listener.fileno()
82
83
def data_for_at(w: Optional[Window], arg: str, add_wrap_markers: bool = False) -> Optional[str]:
    """Resolve an ``@`` placeholder (for example ``@screen``) to text from *w*.

    Returns ``None`` when *w* is falsy or *arg* is not one of the known
    placeholders. ``@selection`` comes from ``w.text_for_selection()``; every
    other placeholder is a call to ``w.as_text()`` with the flags listed below
    plus ``add_wrap_markers``.
    """
    if not w:
        return None
    if arg == '@selection':
        return w.text_for_selection()

    # Keyword flags passed to as_text() for each recognised placeholder.
    flags_for: Dict[str, Dict[str, bool]] = {
        '@ansi': {'as_ansi': True, 'add_history': True},
        '@ansi_screen_scrollback': {'as_ansi': True, 'add_history': True},
        '@text': {'add_history': True},
        '@screen_scrollback': {'add_history': True},
        '@screen': {},
        '@ansi_screen': {'as_ansi': True},
        '@alternate': {'alternate_screen': True},
        '@alternate_scrollback': {'alternate_screen': True, 'add_history': True},
        '@ansi_alternate': {'as_ansi': True, 'alternate_screen': True},
        '@ansi_alternate_scrollback': {'as_ansi': True, 'alternate_screen': True, 'add_history': True},
    }
    flags = flags_for.get(arg)
    if flags is None:
        return None
    kw = dict(flags)
    kw['add_wrap_markers'] = add_wrap_markers
    return w.as_text(**kw)
111
112
class DumpCommands:  # {{{
    """Callable that prints or records the events it is called with.

    ``Boss.__init__`` creates an instance when ``args.dump_commands`` or
    ``args.dump_bytes`` is set and passes it to ``ChildMonitor``. Consecutive
    ``'draw'`` events are buffered and printed as one line; ``'bytes'`` events
    are appended to the file named by ``args.dump_bytes``.
    """

    def __init__(self, args: CLIOptions):
        self.draw_dump_buf: List[str] = []
        # Binary file object, or None when no byte dump file was requested.
        # Always defined so that a 'bytes' event cannot raise AttributeError
        # when only args.dump_commands was set.
        self.dump_bytes_to: Optional[Any] = None
        if args.dump_bytes:
            self.dump_bytes_to = open(args.dump_bytes, 'wb')

    def _flush_draw_buf(self) -> None:
        # Print the buffered draw text, if any, as a single 'draw' line.
        if self.draw_dump_buf:
            safe_print('draw', ''.join(self.draw_dump_buf))
            self.draw_dump_buf = []

    def __call__(self, *a: Any) -> None:
        if not a:
            return
        kind = a[0]
        if kind == 'draw':
            # A None payload marks the end of a run of draw events.
            if a[1] is None:
                self._flush_draw_buf()
            else:
                self.draw_dump_buf.append(a[1])
        elif kind == 'bytes':
            if self.dump_bytes_to is not None:
                self.dump_bytes_to.write(a[1])
                # flush after every write so the file is current at all times
                self.dump_bytes_to.flush()
        else:
            # Emit pending draw text first to keep the output in event order.
            self._flush_draw_buf()
            safe_print(*a)
# }}}
138
139
140class Boss:
141
    def __init__(
        self,
        opts: Options,
        args: CLIOptions,
        cached_values: Dict[str, Any],
        global_shortcuts: Dict[str, SingleKey]
    ) -> None:
        """Initialise application-wide state and create the ChildMonitor.

        :param opts: options used for layout setup, the startup color snapshot
            and the remote control policy.
        :param args: parsed command line options; kept as ``self.args``.
        :param cached_values: kept as ``self.cached_values``.
        :param global_shortcuts: mapping whose values are keys; those keys are
            dropped from ``self.keymap`` by ``update_keymap()``.
        """
        set_layout_options(opts)
        self.cocoa_application_launched = False
        self.clipboard_buffers: Dict[str, str] = {}
        self.update_check_process: Optional[PopenType] = None
        # Weak values: this map alone does not keep a Window object alive.
        self.window_id_map: WeakValueDictionary[int, Window] = WeakValueDictionary()
        # Snapshot of every option whose value is a Color at startup.
        self.startup_colors = {k: opts[k] for k in opts if isinstance(opts[k], Color)}
        self.startup_cursor_text_color = opts.cursor_text_color
        self.pending_sequences: Optional[SubSequenceMap] = None
        self.cached_values = cached_values
        self.os_window_map: Dict[int, TabManager] = {}
        self.os_window_death_actions: Dict[int, Callable[[], None]] = {}
        self.cursor_blinking = True
        self.shutting_down = False
        # fd of single_instance.socket when that attribute exists, else -1
        talk_fd = getattr(single_instance, 'socket', None)
        talk_fd = -1 if talk_fd is None else talk_fd.fileno()
        listen_fd = -1
        # We don't allow reloading the config file to change
        # allow_remote_control, so its value is captured once here.
        self.allow_remote_control = opts.allow_remote_control
        if args.listen_on and (self.allow_remote_control in ('y', 'socket-only')):
            listen_fd = listen_on(args.listen_on)
        self.child_monitor = ChildMonitor(
            self.on_child_death,
            DumpCommands(args) if args.dump_commands or args.dump_bytes else None,
            talk_fd, listen_fd
        )
        set_boss(self)
        self.args = args
        # Inverse of global_shortcuts: key -> KeyAction built from the name.
        self.global_shortcuts_map = {v: KeyAction(k) for k, v in global_shortcuts.items()}
        self.global_shortcuts = global_shortcuts
        # Reads self.global_shortcuts, so it must run after the line above.
        self.update_keymap()
        if is_macos:
            from .fast_data_types import (
                cocoa_set_notification_activated_callback
            )
            cocoa_set_notification_activated_callback(notification_activated)
185
186    def update_keymap(self) -> None:
187        self.keymap = get_options().keymap.copy()
188        for sc in self.global_shortcuts.values():
189            self.keymap.pop(sc, None)
190
191    def startup_first_child(self, os_window_id: Optional[int]) -> None:
192        startup_sessions = create_sessions(get_options(), self.args, default_session=get_options().startup_session)
193        for startup_session in startup_sessions:
194            self.add_os_window(startup_session, os_window_id=os_window_id)
195            os_window_id = None
196            if self.args.start_as != 'normal':
197                if self.args.start_as == 'fullscreen':
198                    self.toggle_fullscreen()
199                else:
200                    change_os_window_state(self.args.start_as)
201
202    def add_os_window(
203        self,
204        startup_session: Optional[Session] = None,
205        os_window_id: Optional[int] = None,
206        wclass: Optional[str] = None,
207        wname: Optional[str] = None,
208        opts_for_size: Optional[Options] = None,
209        startup_id: Optional[str] = None
210    ) -> int:
211        if os_window_id is None:
212            size_data = get_os_window_sizing_data(opts_for_size or get_options(), startup_session)
213            wclass = wclass or getattr(startup_session, 'os_window_class', None) or self.args.cls or appname
214            wname = wname or self.args.name or wclass
215            with startup_notification_handler(do_notify=startup_id is not None, startup_id=startup_id) as pre_show_callback:
216                os_window_id = create_os_window(
217                        initial_window_size_func(size_data, self.cached_values),
218                        pre_show_callback,
219                        self.args.title or appname, wname, wclass, disallow_override_title=bool(self.args.title))
220        else:
221            wname = self.args.name or self.args.cls or appname
222            wclass = self.args.cls or appname
223        tm = TabManager(os_window_id, self.args, wclass, wname, startup_session)
224        self.os_window_map[os_window_id] = tm
225        return os_window_id
226
227    def list_os_windows(self, self_window: Optional[Window] = None) -> Generator[OSWindowDict, None, None]:
228        with cached_process_data():
229            active_tab, active_window = self.active_tab, self.active_window
230            active_tab_manager = self.active_tab_manager
231            for os_window_id, tm in self.os_window_map.items():
232                yield {
233                    'id': os_window_id,
234                    'platform_window_id': platform_window_id(os_window_id),
235                    'is_focused': tm is active_tab_manager,
236                    'tabs': list(tm.list_tabs(active_tab, active_window, self_window)),
237                    'wm_class': tm.wm_class,
238                    'wm_name': tm.wm_name
239                }
240
241    @property
242    def all_tab_managers(self) -> Generator[TabManager, None, None]:
243        yield from self.os_window_map.values()
244
245    @property
246    def all_tabs(self) -> Generator[Tab, None, None]:
247        for tm in self.all_tab_managers:
248            yield from tm
249
250    @property
251    def all_windows(self) -> Generator[Window, None, None]:
252        for tab in self.all_tabs:
253            yield from tab
254
255    def match_windows(self, match: str) -> Generator[Window, None, None]:
256        try:
257            field, exp = match.split(':', 1)
258        except ValueError:
259            return
260        if field == 'num':
261            tab = self.active_tab
262            if tab is not None:
263                try:
264                    w = tab.get_nth_window(int(exp))
265                except Exception:
266                    return
267                if w is not None:
268                    yield w
269            return
270        if field == 'recent':
271            tab = self.active_tab
272            if tab is not None:
273                try:
274                    num = int(exp)
275                except Exception:
276                    return
277                w = self.window_id_map.get(tab.nth_active_window_id(num))
278                if w is not None:
279                    yield w
280            return
281        if field != 'env':
282            pat: MatchPatternType = re.compile(exp)
283        else:
284            kp, vp = exp.partition('=')[::2]
285            if vp:
286                pat = re.compile(kp), re.compile(vp)
287            else:
288                pat = re.compile(kp), None
289        for window in self.all_windows:
290            if window.matches(field, pat):
291                yield window
292
293    def tab_for_window(self, window: Window) -> Optional[Tab]:
294        for tab in self.all_tabs:
295            for w in tab:
296                if w.id == window.id:
297                    return tab
298
299    def match_tabs(self, match: str) -> Generator[Tab, None, None]:
300        try:
301            field, exp = match.split(':', 1)
302        except ValueError:
303            return
304        pat = re.compile(exp)
305        found = False
306        if field in ('title', 'id'):
307            for tab in self.all_tabs:
308                if tab.matches(field, pat):
309                    yield tab
310                    found = True
311        elif field in ('window_id', 'window_title'):
312            wf = field.split('_')[1]
313            tabs = {self.tab_for_window(w) for w in self.match_windows(f'{wf}:{exp}')}
314            for q in tabs:
315                if q:
316                    found = True
317                    yield q
318        elif field == 'index':
319            tm = self.active_tab_manager
320            if tm is not None and len(tm.tabs) > 0:
321                idx = (int(pat.pattern) + len(tm.tabs)) % len(tm.tabs)
322                found = True
323                yield tm.tabs[idx]
324        elif field == 'recent':
325            tm = self.active_tab_manager
326            if tm is not None and len(tm.tabs) > 0:
327                try:
328                    num = int(exp)
329                except Exception:
330                    return
331                q = tm.nth_active_tab(num)
332                if q is not None:
333                    found = True
334                    yield q
335        if not found:
336            tabs = {self.tab_for_window(w) for w in self.match_windows(match)}
337            for q in tabs:
338                if q:
339                    yield q
340
341    def set_active_window(self, window: Window, switch_os_window_if_needed: bool = False) -> Optional[int]:
342        for os_window_id, tm in self.os_window_map.items():
343            for tab in tm:
344                for w in tab:
345                    if w.id == window.id:
346                        if tab is not self.active_tab:
347                            tm.set_active_tab(tab)
348                        tab.set_active_window(w)
349                        if switch_os_window_if_needed and current_os_window() != os_window_id:
350                            focus_os_window(os_window_id, True)
351                        return os_window_id
352
353    def _new_os_window(self, args: Union[SpecialWindowInstance, Iterable[str]], cwd_from: Optional[int] = None) -> int:
354        if isinstance(args, SpecialWindowInstance):
355            sw: Optional[SpecialWindowInstance] = args
356        else:
357            sw = self.args_to_special_window(args, cwd_from) if args else None
358        startup_session = next(create_sessions(get_options(), special_window=sw, cwd_from=cwd_from))
359        return self.add_os_window(startup_session)
360
    @ac('win', 'New OS Window')
    def new_os_window(self, *args: str) -> None:
        # Forwards the positional arguments as a tuple to _new_os_window()
        # and discards the OS window id it returns.
        self._new_os_window(args)
364
365    @property
366    def active_window_for_cwd(self) -> Optional[Window]:
367        t = self.active_tab
368        if t is not None:
369            return t.active_window_for_cwd
370
371    @ac('win', 'New OS Window with the same working directory as the currently active window')
372    def new_os_window_with_cwd(self, *args: str) -> None:
373        w = self.active_window_for_cwd
374        cwd_from = w.child.pid_for_cwd if w is not None else None
375        self._new_os_window(args, cwd_from)
376
377    def new_os_window_with_wd(self, wd: str) -> None:
378        special_window = SpecialWindow(None, cwd=wd)
379        self._new_os_window(special_window)
380
381    def add_child(self, window: Window) -> None:
382        assert window.child.pid is not None and window.child.child_fd is not None
383        self.child_monitor.add_child(window.id, window.child.pid, window.child.child_fd, window.screen)
384        self.window_id_map[window.id] = window
385
386    def _handle_remote_command(self, cmd: str, window: Optional[Window] = None, from_peer: bool = False) -> Optional[Dict[str, Any]]:
387        from .remote_control import handle_cmd
388        response = None
389        window = window or None
390        if self.allow_remote_control == 'y' or from_peer or getattr(window, 'allow_remote_control', False):
391            try:
392                response = handle_cmd(self, window, cmd)
393            except Exception as err:
394                import traceback
395                response = {'ok': False, 'error': str(err)}
396                if not getattr(err, 'hide_traceback', False):
397                    response['tb'] = traceback.format_exc()
398        else:
399            no_response = False
400            try:
401                no_response = json.loads(cmd).get('no_response')
402            except Exception:
403                pass
404            if not no_response:
405                response = {'ok': False, 'error': 'Remote control is disabled. Add allow_remote_control to your kitty.conf'}
406        return response
407
408    @ac('misc', '''
409        Run a remote control command
410
411        For example::
412
413            map F1 remote_control set-spacing margin=30
414
415        See :ref:`rc_mapping` for details.
416        ''')
417    def remote_control(self, *args: str) -> None:
418        from .rc.base import (
419            PayloadGetter, command_for_name, parse_subcommand_cli
420        )
421        from .remote_control import parse_rc_args
422        try:
423            global_opts, items = parse_rc_args(['@'] + list(args))
424            if not items:
425                return
426            cmd = items[0]
427            c = command_for_name(cmd)
428            opts, items = parse_subcommand_cli(c, items)
429            payload = c.message_to_kitty(global_opts, opts, items)
430            import types
431            if isinstance(cast(types.GeneratorType, payload), types.GeneratorType):
432                payloads = cast(types.GeneratorType, payload)
433                for x in payloads:
434                    c.response_from_kitty(self, self.active_window, PayloadGetter(c, x if isinstance(x, dict) else {}))
435            else:
436                c.response_from_kitty(self, self.active_window, PayloadGetter(c, payload if isinstance(payload, dict) else {}))
437        except (Exception, SystemExit):
438            import traceback
439            tb = traceback.format_exc()
440            self.show_error(_('remote_control mapping failed'), tb)
441
    def peer_message_received(self, msg_bytes: bytes) -> Optional[bytes]:
        """Handle a message received from a peer.

        Two message forms are recognised:

        * a remote control command wrapped in ``ESC P @kitty-cmd ... ESC \\``;
          the response, if any, is returned in the same wrapping;
        * a JSON object with ``cmd == 'new_instance'``, which opens the OS
          windows described by the contained command line.

        :return: the encoded response to send back, or ``None``.
        """
        cmd_prefix = b'\x1bP@kitty-cmd'
        terminator = b'\x1b\\'
        if msg_bytes.startswith(cmd_prefix) and msg_bytes.endswith(terminator):
            cmd = msg_bytes[len(cmd_prefix):-len(terminator)].decode('utf-8')
            response = self._handle_remote_command(cmd, from_peer=True)
            if response is None:
                return None
            return cmd_prefix + json.dumps(response).encode('utf-8') + terminator

        # NOTE(review): a decode or JSON error here propagates to the caller;
        # how the caller handles it is not visible in this file section.
        data = json.loads(msg_bytes.decode('utf-8'))
        if isinstance(data, dict) and data.get('cmd') == 'new_instance':
            from .cli_stub import CLIOptions
            startup_id = data.get('startup_id')
            # data['args'][0] is skipped; only the rest is parsed as options
            args, rest = parse_args(data['args'][1:], result_class=CLIOptions)
            args.args = rest
            opts = create_opts(args)
            # resolve a relative directory against the cwd sent by the peer
            if not os.path.isabs(args.directory):
                args.directory = os.path.join(data['cwd'], args.directory)
            for session in create_sessions(opts, args, respect_cwd=True):
                os_window_id = self.add_os_window(session, wclass=args.cls, wname=args.name, opts_for_size=opts, startup_id=startup_id)
                # apply the peer's opacity only when it differs from ours
                if opts.background_opacity != get_options().background_opacity:
                    self._set_os_window_background_opacity(os_window_id, opts.background_opacity)
                if data.get('notify_on_os_window_death'):
                    self.os_window_death_actions[os_window_id] = partial(self.notify_on_os_window_death, data['notify_on_os_window_death'])
        else:
            log_error('Unknown message received from peer, ignoring')
        return None
470
471    def handle_remote_cmd(self, cmd: str, window: Optional[Window] = None) -> None:
472        response = self._handle_remote_command(cmd, window)
473        if response is not None:
474            if window is not None:
475                window.send_cmd_response(response)
476
477    def _cleanup_tab_after_window_removal(self, src_tab: Tab) -> None:
478        if len(src_tab) < 1:
479            tm = src_tab.tab_manager_ref()
480            if tm is not None:
481                tm.remove(src_tab)
482                src_tab.destroy()
483                if len(tm) == 0:
484                    if not self.shutting_down:
485                        mark_os_window_for_close(src_tab.os_window_id)
486
    def on_child_death(self, window_id: int) -> None:
        """Tear down the window with id *window_id*.

        Passed to ``ChildMonitor`` as its first argument in ``__init__``.
        Unknown ids are ignored. The ``action_on_close`` and
        ``action_on_removal`` callbacks each run inside a guard that prints
        the traceback instead of propagating the exception.
        """
        # remember who had focus so a focus change can be reported at the end
        prev_active_window = self.active_window
        window = self.window_id_map.pop(window_id, None)
        if window is None:
            return
        if window.action_on_close:
            try:
                window.action_on_close(window)
            except Exception:
                import traceback
                traceback.print_exc()
        # read before destroy(); used below to find the owning tab manager
        os_window_id = window.os_window_id
        window.destroy()
        tm = self.os_window_map.get(os_window_id)
        tab = None
        if tm is not None:
            for q in tm:
                if window in q:
                    tab = q
                    break
        if tab is not None:
            tab.remove_window(window)
            self._cleanup_tab_after_window_removal(tab)
        # runs after the window has been removed from its tab
        if window.action_on_removal:
            try:
                window.action_on_removal(window)
            except Exception:
                import traceback
                traceback.print_exc()
        # clear both callbacks on the dead window
        window.action_on_close = window.action_on_removal = None
        # from here on `window` is the newly active window, not the dead one
        window = self.active_window
        if window is not prev_active_window:
            if prev_active_window is not None:
                prev_active_window.focus_changed(False)
            if window is not None:
                window.focus_changed(True)
523
524    def close_window(self, window: Optional[Window] = None) -> None:
525        window = window or self.active_window
526        if window:
527            self.child_monitor.mark_for_close(window.id)
528
529    @ac('tab', 'Close the current tab')
530    def close_tab(self, tab: Optional[Tab] = None) -> None:
531        tab = tab or self.active_tab
532        if tab:
533            self.confirm_tab_close(tab)
534
535    def confirm_tab_close(self, tab: Tab) -> None:
536        windows = tuple(tab)
537        needs_confirmation = get_options().confirm_os_window_close > 0 and len(windows) >= get_options().confirm_os_window_close
538        if not needs_confirmation:
539            self.close_tab_no_confirm(tab)
540            return
541        self._run_kitten('ask', ['--type=yesno', '--message', _(
542            'Are you sure you want to close this tab, it has {}'
543            ' windows running?').format(len(windows))],
544            window=tab.active_window,
545            custom_callback=partial(self.handle_close_tab_confirmation, tab.id)
546        )
547
548    def handle_close_tab_confirmation(self, tab_id: int, data: Dict[str, Any], *a: Any) -> None:
549        if data['response'] != 'y':
550            return
551        for tab in self.all_tabs:
552            if tab.id == tab_id:
553                break
554        else:
555            return
556        self.close_tab_no_confirm(tab)
557
    def close_tab_no_confirm(self, tab: Tab) -> None:
        """Call ``close_window()`` on every window in *tab*, without asking."""
        for window in tab:
            self.close_window(window)
561
    @ac('win', 'Toggle the fullscreen status of the active OS Window')
    def toggle_fullscreen(self, os_window_id: int = 0) -> None:
        # Inside the body the name refers to the module-level
        # toggle_fullscreen imported from fast_data_types, not this method.
        toggle_fullscreen(os_window_id)
565
    @ac('win', 'Toggle the maximized status of the active OS Window')
    def toggle_maximized(self, os_window_id: int = 0) -> None:
        # Inside the body the name refers to the module-level
        # toggle_maximized imported from fast_data_types, not this method.
        toggle_maximized(os_window_id)
569
570    def start(self, first_os_window_id: int) -> None:
571        if not getattr(self, 'io_thread_started', False):
572            self.child_monitor.start()
573            self.io_thread_started = True
574            self.startup_first_child(first_os_window_id)
575
576        if get_options().update_check_interval > 0 and not hasattr(self, 'update_check_started'):
577            from .update_check import run_update_check
578            run_update_check(get_options().update_check_interval * 60 * 60)
579            self.update_check_started = True
580
    def activate_tab_at(self, os_window_id: int, x: int, is_double: bool = False) -> None:
        """Forward a tab bar activation at *x* to the OS window's tab manager.

        Does nothing for an unknown *os_window_id*. The method has no return
        statement, so it always returns ``None``.
        """
        tm = self.os_window_map.get(os_window_id)
        if tm is not None:
            tm.activate_tab_at(x, is_double)
585
586    def on_window_resize(self, os_window_id: int, w: int, h: int, dpi_changed: bool) -> None:
587        if dpi_changed:
588            self.on_dpi_change(os_window_id)
589        else:
590            tm = self.os_window_map.get(os_window_id)
591            if tm is not None:
592                tm.resize()
593
594    @ac('misc', '''
595        Clear the terminal
596
597        See :sc:`reset_terminal` for details. For example::
598
599            # Reset the terminal
600            map kitty_mod+f9 clear_terminal reset active
601            # Clear the terminal screen by erasing all contents
602            map kitty_mod+f10 clear_terminal clear active
603            # Clear the terminal scrollback by erasing it
604            map kitty_mod+f11 clear_terminal scrollback active
605            # Scroll the contents of the screen into the scrollback
606            map kitty_mod+f12 clear_terminal scroll active
607
608        ''')
609    def clear_terminal(self, action: str, only_active: bool) -> None:
610        if only_active:
611            windows = []
612            w = self.active_window
613            if w is not None:
614                windows.append(w)
615        else:
616            windows = list(self.all_windows)
617        reset = action == 'reset'
618        how = 3 if action == 'scrollback' else 2
619        for w in windows:
620            if action == 'scroll':
621                w.screen.scroll_until_cursor()
622                continue
623            w.screen.cursor.x = w.screen.cursor.y = 0
624            if reset:
625                w.screen.reset()
626            else:
627                w.screen.erase_in_display(how, False)
628
629    def increase_font_size(self) -> None:  # legacy
630        cfs = global_font_size()
631        self.set_font_size(min(get_options().font_size * 5, cfs + 2.0))
632
633    def decrease_font_size(self) -> None:  # legacy
634        cfs = global_font_size()
635        self.set_font_size(max(MINIMUM_FONT_SIZE, cfs - 2.0))
636
    def restore_font_size(self) -> None:  # legacy
        """Set the font size back to the configured ``font_size`` option."""
        self.set_font_size(get_options().font_size)
639
    def set_font_size(self, new_size: float) -> None:  # legacy
        """Set *new_size* as an absolute font size for all OS windows.

        Equivalent to ``change_font_size(True, None, new_size)``.
        """
        self.change_font_size(True, None, new_size)
642
643    @ac('win', '''
644        Change the font size for the current or all OS Windows
645
646        See :ref:`conf-kitty-shortcuts.fonts` for details.
647        ''')
648    def change_font_size(self, all_windows: bool, increment_operation: Optional[str], amt: float) -> None:
649        def calc_new_size(old_size: float) -> float:
650            new_size = old_size
651            if amt == 0:
652                new_size = get_options().font_size
653            else:
654                if increment_operation:
655                    new_size += (1 if increment_operation == '+' else -1) * amt
656                else:
657                    new_size = amt
658                new_size = max(MINIMUM_FONT_SIZE, min(new_size, get_options().font_size * 5))
659            return new_size
660
661        if all_windows:
662            current_global_size = global_font_size()
663            new_size = calc_new_size(current_global_size)
664            if new_size != current_global_size:
665                global_font_size(new_size)
666            os_windows = list(self.os_window_map.keys())
667        else:
668            os_windows = []
669            w = self.active_window
670            if w is not None:
671                os_windows.append(w.os_window_id)
672        if os_windows:
673            final_windows = {}
674            for wid in os_windows:
675                current_size = os_window_font_size(wid)
676                if current_size:
677                    new_size = calc_new_size(current_size)
678                    if new_size != current_size:
679                        final_windows[wid] = new_size
680            if final_windows:
681                self._change_font_size(final_windows)
682
683    def _change_font_size(self, sz_map: Dict[int, float]) -> None:
684        for os_window_id, sz in sz_map.items():
685            tm = self.os_window_map.get(os_window_id)
686            if tm is not None:
687                os_window_font_size(os_window_id, sz)
688                tm.resize()
689
690    def on_dpi_change(self, os_window_id: int) -> None:
691        tm = self.os_window_map.get(os_window_id)
692        if tm is not None:
693            sz = os_window_font_size(os_window_id)
694            if sz:
695                os_window_font_size(os_window_id, sz, True)
696                for tab in tm:
697                    for window in tab:
698                        window.on_dpi_change(sz)
699                tm.resize()
700
701    def _set_os_window_background_opacity(self, os_window_id: int, opacity: float) -> None:
702        change_background_opacity(os_window_id, max(0.1, min(opacity, 1.0)))
703
704    @ac('win', '''
705        Set the background opacity for the active OS Window
706
707        For example::
708
709            map f1 set_background_opacity +0.1
710            map f2 set_background_opacity -0.1
711            map f3 set_background_opacity 0.5
712        ''')
713    def set_background_opacity(self, opacity: str) -> None:
714        window = self.active_window
715        if window is None or not opacity:
716            return
717        if not get_options().dynamic_background_opacity:
718            self.show_error(
719                    _('Cannot change background opacity'),
720                    _('You must set the dynamic_background_opacity option in kitty.conf to be able to change background opacity'))
721            return
722        os_window_id = window.os_window_id
723        if opacity[0] in '+-':
724            old_opacity = background_opacity_of(os_window_id)
725            if old_opacity is None:
726                return
727            fin_opacity = old_opacity + float(opacity)
728        elif opacity == 'default':
729            fin_opacity = get_options().background_opacity
730        else:
731            fin_opacity = float(opacity)
732        self._set_os_window_background_opacity(os_window_id, fin_opacity)
733
734    @property
735    def active_tab_manager(self) -> Optional[TabManager]:
736        os_window_id = current_os_window()
737        if os_window_id is not None:
738            return self.os_window_map.get(os_window_id)
739
740    @property
741    def active_tab(self) -> Optional[Tab]:
742        tm = self.active_tab_manager
743        if tm is not None:
744            return tm.active_tab
745
746    @property
747    def active_window(self) -> Optional[Window]:
748        t = self.active_tab
749        if t is not None:
750            return t.active_window
751
752    def dispatch_possible_special_key(self, ev: KeyEvent) -> bool:
753        # Handles shortcuts, return True if the key was consumed
754        key_action = get_shortcut(self.keymap, ev)
755        if key_action is None:
756            sequences = get_shortcut(get_options().sequence_map, ev)
757            if sequences and not isinstance(sequences, KeyAction):
758                self.pending_sequences = sequences
759                set_in_sequence_mode(True)
760                return True
761            if self.global_shortcuts_map and get_shortcut(self.global_shortcuts_map, ev):
762                return True
763        elif isinstance(key_action, KeyAction):
764            return self.dispatch_action(key_action)
765
766    def process_sequence(self, ev: KeyEvent) -> None:
767        if not self.pending_sequences:
768            set_in_sequence_mode(False)
769            return
770
771        remaining = {}
772        matched_action = None
773        for seq, key_action in self.pending_sequences.items():
774            if shortcut_matches(seq[0], ev):
775                seq = seq[1:]
776                if seq:
777                    remaining[seq] = key_action
778                else:
779                    matched_action = key_action
780
781        if remaining:
782            self.pending_sequences = remaining
783        else:
784            self.pending_sequences = None
785            set_in_sequence_mode(False)
786            if matched_action is not None:
787                self.dispatch_action(matched_action)
788
789    @ac('win', '''
790        Resize the active window interactively
791
792        See :ref:`window_resizing` for details.
793        ''')
794    def start_resizing_window(self) -> None:
795        w = self.active_window
796        if w is None:
797            return
798        overlay_window = self._run_kitten('resize_window', args=[
799            '--horizontal-increment={}'.format(get_options().window_resize_step_cells),
800            '--vertical-increment={}'.format(get_options().window_resize_step_lines)
801        ])
802        if overlay_window is not None:
803            overlay_window.allow_remote_control = True
804
805    def resize_layout_window(self, window: Window, increment: float, is_horizontal: bool, reset: bool = False) -> Union[bool, None, str]:
806        tab = window.tabref()
807        if tab is None or not increment:
808            return False
809        if reset:
810            tab.reset_window_sizes()
811            return None
812        return tab.resize_window_by(window.id, increment, is_horizontal)
813
814    def resize_os_window(self, os_window_id: int, width: int, height: int, unit: str, incremental: bool = False) -> None:
815        if not incremental and (width < 0 or height < 0):
816            return
817        metrics = get_os_window_size(os_window_id)
818        if metrics is None:
819            return
820        has_window_scaling = is_macos or is_wayland()
821        w, h = get_new_os_window_size(metrics, width, height, unit, incremental, has_window_scaling)
822        set_os_window_size(os_window_id, w, h)
823
824    def default_bg_changed_for(self, window_id: int) -> None:
825        w = self.window_id_map.get(window_id)
826        if w is not None:
827            tm = self.os_window_map.get(w.os_window_id)
828            if tm is not None:
829                tm.update_tab_bar_data()
830                tm.mark_tab_bar_dirty()
831                t = tm.tab_for_id(w.tab_id)
832                if t is not None:
833                    t.relayout_borders()
834
    def dispatch_action(
        self,
        key_action: KeyAction,
        window_for_dispatch: Optional[Window] = None,
        dispatch_type: str = 'KeyPress'
    ) -> bool:
        # Run the action named by key_action.func with key_action.args.
        #
        # The action is looked up first on this object and then on the tab,
        # falling back to the window. The tab and window are the active ones
        # unless window_for_dispatch is given, in which case that window and
        # its tab are used.
        #
        # A handler returning exactly True is treated as "passthrough": the
        # action is considered not handled and the search continues. Any
        # other return value means the action was handled and True is
        # returned. False is returned when no handler handled the action.

        def report_match(f: Callable) -> None:
            # Only prints when keyboard debugging was requested
            if self.args.debug_keyboard:
                prefix = '\n' if dispatch_type == 'KeyPress' else ''
                print(f'{prefix}\x1b[35m{dispatch_type}\x1b[m matched action:', func_name(f), flush=True)

        if key_action is not None:
            f = getattr(self, key_action.func, None)
            if f is not None:
                # Here the match is reported before the handler runs
                report_match(f)
                passthrough = f(*key_action.args)
                if passthrough is not True:
                    return True
        if window_for_dispatch is None:
            tab = self.active_tab
            window = self.active_window
        else:
            window = window_for_dispatch
            tab = window.tabref()
        if tab is None or window is None:
            return False
        if key_action is not None:
            # A tab attribute wins over a window attribute of the same name
            f = getattr(tab, key_action.func, getattr(window, key_action.func, None))
            if f is not None:
                passthrough = f(*key_action.args)
                # Here the match is reported after the handler has run
                report_match(f)
                if passthrough is not True:
                    return True
        return False
870
871    @ac('misc', '''
872        Combine multiple actions and map to a single keypress
873
874        The syntax is::
875
876            map key combine <separator> action1 <separator> action2 <separator> action3 ...
877
878        For example::
879
880            map kitty_mod+e combine : new_window : next_layout
881        ''')
882    def combine(self, *actions: KeyAction) -> None:
883        for key_action in actions:
884            self.dispatch_action(key_action)
885
886    def on_focus(self, os_window_id: int, focused: bool) -> None:
887        tm = self.os_window_map.get(os_window_id)
888        if tm is not None:
889            w = tm.active_window
890            if w is not None:
891                w.focus_changed(focused)
892                if is_macos and focused:
893                    cocoa_set_menubar_title(w.title or '')
894            tm.mark_tab_bar_dirty()
895
896    def on_activity_since_last_focus(self, window: Window) -> None:
897        os_window_id = window.os_window_id
898        tm = self.os_window_map.get(os_window_id)
899        if tm is not None:
900            tm.mark_tab_bar_dirty()
901
902    def update_tab_bar_data(self, os_window_id: int) -> None:
903        tm = self.os_window_map.get(os_window_id)
904        if tm is not None:
905            tm.update_tab_bar_data()
906
907    def on_drop(self, os_window_id: int, mime: str, data: bytes) -> None:
908        tm = self.os_window_map.get(os_window_id)
909        if tm is not None:
910            w = tm.active_window
911            if w is not None:
912                text = data.decode('utf-8', 'replace')
913                if mime == 'text/uri-list':
914                    text = '\n'.join(parse_uri_list(text))
915                w.paste(text)
916
917    @ac('win', 'Close the currently active OS Window')
918    def close_os_window(self) -> None:
919        tm = self.active_tab_manager
920        if tm is not None:
921            self.confirm_os_window_close(tm.os_window_id)
922
923    def confirm_os_window_close(self, os_window_id: int) -> None:
924        tm = self.os_window_map.get(os_window_id)
925        needs_confirmation = tm is not None and get_options().confirm_os_window_close > 0 and tm.number_of_windows >= get_options().confirm_os_window_close
926        if not needs_confirmation:
927            mark_os_window_for_close(os_window_id)
928            return
929        if tm is not None:
930            w = tm.active_window
931            self._run_kitten('ask', ['--type=yesno', '--message', _(
932                'Are you sure you want to close this OS window, it has {}'
933                ' windows running?').format(tm.number_of_windows)],
934                window=w,
935                custom_callback=partial(self.handle_close_os_window_confirmation, os_window_id)
936            )
937
938    def handle_close_os_window_confirmation(self, os_window_id: int, data: Dict[str, Any], *a: Any) -> None:
939        if data['response'] == 'y':
940            mark_os_window_for_close(os_window_id)
941        else:
942            mark_os_window_for_close(os_window_id, NO_CLOSE_REQUESTED)
943
944    def on_os_window_closed(self, os_window_id: int, viewport_width: int, viewport_height: int) -> None:
945        self.cached_values['window-size'] = viewport_width, viewport_height
946        tm = self.os_window_map.pop(os_window_id, None)
947        if tm is not None:
948            tm.destroy()
949        for window_id in tuple(w.id for w in self.window_id_map.values() if getattr(w, 'os_window_id', None) == os_window_id):
950            self.window_id_map.pop(window_id, None)
951        if not self.os_window_map and is_macos:
952            cocoa_set_menubar_title('')
953        action = self.os_window_death_actions.pop(os_window_id, None)
954        if action is not None:
955            action()
956
957    @ac('win', 'Quit, closing all windows')
958    def quit(self, *args: Any) -> None:
959        tm = self.active_tab
960        num = 0
961        for q in self.os_window_map.values():
962            num += q.number_of_windows
963        needs_confirmation = tm is not None and get_options().confirm_os_window_close > 0 and num >= get_options().confirm_os_window_close
964        if not needs_confirmation:
965            set_application_quit_request(IMPERATIVE_CLOSE_REQUESTED)
966            return
967        if current_application_quit_request() == CLOSE_BEING_CONFIRMED:
968            return
969        assert tm is not None
970        self._run_kitten('ask', ['--type=yesno', '--message', _(
971            'Are you sure you want to quit kitty, it has {} windows running?').format(num)],
972            window=tm.active_window,
973            custom_callback=self.handle_quit_confirmation
974        )
975        set_application_quit_request(CLOSE_BEING_CONFIRMED)
976
977    def handle_quit_confirmation(self, data: Dict[str, Any], *a: Any) -> None:
978        set_application_quit_request(IMPERATIVE_CLOSE_REQUESTED if data['response'] == 'y' else NO_CLOSE_REQUESTED)
979
980    def notify_on_os_window_death(self, address: str) -> None:
981        import socket
982        s = socket.socket(family=socket.AF_UNIX)
983        with suppress(Exception):
984            s.connect(address)
985            s.sendall(b'c')
986            with suppress(OSError):
987                s.shutdown(socket.SHUT_RDWR)
988            s.close()
989
990    def display_scrollback(self, window: Window, data: Union[bytes, str], input_line_number: int = 0, title: str = '') -> None:
991        def prepare_arg(x: str) -> str:
992            x = x.replace('INPUT_LINE_NUMBER', str(input_line_number))
993            x = x.replace('CURSOR_LINE', str(window.screen.cursor.y + 1))
994            x = x.replace('CURSOR_COLUMN', str(window.screen.cursor.x + 1))
995            return x
996
997        cmd = list(map(prepare_arg, get_options().scrollback_pager))
998        if not os.path.isabs(cmd[0]):
999            import shutil
1000            exe = shutil.which(cmd[0])
1001            if not exe:
1002                env = read_shell_environment(get_options())
1003                if env and 'PATH' in env:
1004                    exe = shutil.which(cmd[0], path=env['PATH'])
1005                    if exe:
1006                        cmd[0] = exe
1007
1008        if os.path.basename(cmd[0]) == 'less':
1009            cmd.append('-+F')  # reset --quit-if-one-screen
1010        tab = self.active_tab
1011        if tab is not None:
1012            bdata = data.encode('utf-8') if isinstance(data, str) else data
1013            tab.new_special_window(
1014                SpecialWindow(cmd, bdata, title or _('History'), overlay_for=window.id, cwd=window.cwd_of_child),
1015                copy_colors_from=self.active_window
1016                )
1017
1018    @ac('misc', 'Edit the kitty.conf config file in your favorite text editor')
1019    def edit_config_file(self, *a: Any) -> None:
1020        confpath = prepare_config_file_for_editing()
1021        # On macOS vim fails to handle SIGWINCH if it occurs early, so add a
1022        # small delay.
1023        cmd = [kitty_exe(), '+runpy', 'import os, sys, time; time.sleep(0.05); os.execvp(sys.argv[1], sys.argv[1:])'] + get_editor(get_options()) + [confpath]
1024        self.new_os_window(*cmd)
1025
1026    def get_output(self, source_window: Window, num_lines: Optional[int] = 1) -> str:
1027        output = ''
1028        s = source_window.screen
1029        if num_lines is None:
1030            num_lines = s.lines
1031        for i in range(min(num_lines, s.lines)):
1032            output += str(s.linebuf.line(i))
1033        return output
1034
1035    def _run_kitten(
1036        self,
1037        kitten: str,
1038        args: Iterable[str] = (),
1039        input_data: Optional[Union[bytes, str]] = None,
1040        window: Optional[Window] = None,
1041        custom_callback: Optional[Callable] = None,
1042        action_on_removal: Optional[Callable] = None
1043    ) -> Any:
1044        orig_args, args = list(args), list(args)
1045        from kittens.runner import create_kitten_handler
1046        end_kitten = create_kitten_handler(kitten, orig_args)
1047        if window is None:
1048            w = self.active_window
1049            tab = self.active_tab
1050        else:
1051            w = window
1052            tab = w.tabref() if w else None
1053        if end_kitten.no_ui:
1054            return end_kitten(None, getattr(w, 'id', None), self)
1055
1056        if w is not None and tab is not None:
1057            args[0:0] = [config_dir, kitten]
1058            if input_data is None:
1059                type_of_input = end_kitten.type_of_input
1060                if type_of_input in ('text', 'history', 'ansi', 'ansi-history', 'screen', 'screen-history', 'screen-ansi', 'screen-ansi-history'):
1061                    data: Optional[bytes] = w.as_text(
1062                            as_ansi='ansi' in type_of_input,
1063                            add_history='history' in type_of_input,
1064                            add_wrap_markers='screen' in type_of_input
1065                    ).encode('utf-8')
1066                elif type_of_input == 'selection':
1067                    sel = self.data_for_at(which='@selection', window=w)
1068                    data = sel.encode('utf-8') if sel else None
1069                elif type_of_input is None:
1070                    data = None
1071                else:
1072                    raise ValueError('Unknown type_of_input: {}'.format(type_of_input))
1073            else:
1074                data = input_data if isinstance(input_data, bytes) else input_data.encode('utf-8')
1075            copts = common_opts_as_dict(get_options())
1076            final_args: List[str] = []
1077            for x in args:
1078                if x == '@selection':
1079                    sel = self.data_for_at(which='@selection', window=w)
1080                    if sel:
1081                        x = sel
1082                final_args.append(x)
1083            overlay_window = tab.new_special_window(
1084                SpecialWindow(
1085                    [kitty_exe(), '+runpy', 'from kittens.runner import main; main()'] + final_args,
1086                    stdin=data,
1087                    env={
1088                        'KITTY_COMMON_OPTS': json.dumps(copts),
1089                        'KITTY_CHILD_PID': str(w.child.pid),
1090                        'PYTHONWARNINGS': 'ignore',
1091                        'OVERLAID_WINDOW_LINES': str(w.screen.lines),
1092                        'OVERLAID_WINDOW_COLS': str(w.screen.columns),
1093                    },
1094                    cwd=w.cwd_of_child,
1095                    overlay_for=w.id
1096                ),
1097                copy_colors_from=w
1098            )
1099            wid = w.id
1100            overlay_window.action_on_close = partial(self.on_kitten_finish, wid, custom_callback or end_kitten)
1101            if action_on_removal is not None:
1102
1103                def callback_wrapper(*a: Any) -> None:
1104                    if action_on_removal is not None:
1105                        action_on_removal(wid, self)
1106                overlay_window.action_on_removal = callback_wrapper
1107            return overlay_window
1108
1109    @ac('misc', 'Run the specified kitten. See :doc:`/kittens/custom` for details')
1110    def kitten(self, kitten: str, *args: str) -> None:
1111        import shlex
1112        cmdline = args[0] if args else ''
1113        kargs = shlex.split(cmdline) if cmdline else []
1114        self._run_kitten(kitten, kargs)
1115
1116    def run_kitten(self, kitten: str, *args: str) -> None:
1117        self._run_kitten(kitten, args)
1118
1119    def on_kitten_finish(self, target_window_id: str, end_kitten: Callable, source_window: Window) -> None:
1120        output = self.get_output(source_window, num_lines=None)
1121        from kittens.runner import deserialize
1122        data = deserialize(output)
1123        if data is not None:
1124            end_kitten(data, target_window_id, self)
1125
1126    @ac('misc', 'Input an arbitrary unicode character. See :doc:`/kittens/unicode-input` for details.')
1127    def input_unicode_character(self) -> None:
1128        self._run_kitten('unicode_input')
1129
1130    @ac('tab', 'Change the title of the active tab')
1131    def set_tab_title(self) -> None:
1132        tab = self.active_tab
1133        if tab:
1134            args = ['--name=tab-title', '--message', _('Enter the new title for this tab below.'), 'do_set_tab_title', str(tab.id)]
1135            self._run_kitten('ask', args)
1136
1137    def do_set_tab_title(self, title: str, tab_id: int) -> None:
1138        tm = self.active_tab_manager
1139        if tm is not None and title:
1140            tab_id = int(tab_id)
1141            for tab in tm.tabs:
1142                if tab.id == tab_id:
1143                    tab.set_title(title)
1144                    break
1145
1146    def show_error(self, title: str, msg: str) -> None:
1147        self._run_kitten('show_error', args=['--title', title], input_data=msg)
1148
1149    @ac('mk', 'Create a new marker')
1150    def create_marker(self) -> None:
1151        w = self.active_window
1152        if w:
1153            spec = None
1154
1155            def done(data: Dict[str, Any], target_window_id: int, self: Boss) -> None:
1156                nonlocal spec
1157                spec = data['response']
1158
1159            def done2(target_window_id: int, self: Boss) -> None:
1160                w = self.window_id_map.get(target_window_id)
1161                if w is not None and spec:
1162                    try:
1163                        w.set_marker(spec)
1164                    except Exception as err:
1165                        self.show_error(_('Invalid marker specification'), str(err))
1166
1167            self._run_kitten('ask', [
1168                '--name=create-marker', '--message',
1169                _('Create marker, for example:\ntext 1 ERROR\nSee {}\n').format(website_url('marks'))
1170                ],
1171                custom_callback=done, action_on_removal=done2)
1172
1173    @ac('misc', 'Run the kitty shell to control kitty with commands')
1174    def kitty_shell(self, window_type: str = 'window') -> None:
1175        kw: Dict[str, Any] = {}
1176        cmd = [kitty_exe(), '@']
1177        aw = self.active_window
1178        if aw is not None:
1179            kw['env'] = {'KITTY_SHELL_ACTIVE_WINDOW_ID': str(aw.id)}
1180        if window_type == 'tab':
1181            tab = self._new_tab(SpecialWindow(cmd, **kw))
1182            if tab is not None:
1183                for w in tab:
1184                    w.allow_remote_control = True
1185        elif window_type == 'os_window':
1186            os_window_id = self._new_os_window(SpecialWindow(cmd, **kw))
1187            for tab in self.os_window_map[os_window_id]:
1188                for w in tab:
1189                    w.allow_remote_control = True
1190        elif window_type == 'overlay':
1191            tab = self.active_tab
1192            if aw is not None and tab is not None:
1193                kw['overlay_for'] = aw.id
1194                tab.new_special_window(SpecialWindow(cmd, **kw), allow_remote_control=True)
1195        else:
1196            tab = self.active_tab
1197            if tab is not None:
1198                tab.new_special_window(SpecialWindow(cmd, **kw), allow_remote_control=True)
1199
1200    def switch_focus_to(self, window_id: int) -> None:
1201        tab = self.active_tab
1202        if tab:
1203            tab.set_active_window(window_id)
1204
1205    def open_url(self, url: str, program: Optional[Union[str, List[str]]] = None, cwd: Optional[str] = None) -> None:
1206        if not url:
1207            return
1208        if isinstance(program, str):
1209            program = to_cmdline(program)
1210        found_action = False
1211        if program is None:
1212            from .open_actions import actions_for_url
1213            actions = list(actions_for_url(url))
1214            if actions:
1215                found_action = True
1216                self.dispatch_action(actions.pop(0))
1217                if actions:
1218                    self.drain_actions(actions)
1219        if not found_action:
1220            open_url(url, program or get_options().open_url_with, cwd=cwd)
1221
1222    @ac('misc', 'Click a URL using the keyboard')
1223    def open_url_with_hints(self) -> None:
1224        self._run_kitten('hints')
1225
1226    def drain_actions(self, actions: List) -> None:
1227
1228        def callback(timer_id: Optional[int]) -> None:
1229            self.dispatch_action(actions.pop(0))
1230            if actions:
1231                self.drain_actions(actions)
1232        add_timer(callback, 0, False)
1233
    def destroy(self) -> None:
        # Tear down this object: stop the child monitor, destroy every tab
        # manager and release the global data.
        self.shutting_down = True
        self.child_monitor.shutdown_monitor()
        # Called with no arguments; presumably clears any running update
        # check process -- TODO confirm against set_update_check_process()
        self.set_update_check_process()
        self.update_check_process = None
        del self.child_monitor
        for tm in self.os_window_map.values():
            tm.destroy()
        # Drop the references to the destroyed tab managers
        self.os_window_map = {}
        destroy_global_data()
1244
1245    def paste_to_active_window(self, text: str) -> None:
1246        if text:
1247            w = self.active_window
1248            if w is not None:
1249                w.paste(text)
1250
1251    @ac('cp', 'Paste from the clipboard to the active window')
1252    def paste_from_clipboard(self) -> None:
1253        text = get_clipboard_string()
1254        self.paste_to_active_window(text)
1255
1256    def current_primary_selection(self) -> str:
1257        return get_primary_selection() if supports_primary_selection else ''
1258
1259    def current_primary_selection_or_clipboard(self) -> str:
1260        return get_primary_selection() if supports_primary_selection else get_clipboard_string()
1261
1262    @ac('cp', 'Paste from the clipboard to the active window')
1263    def paste_from_selection(self) -> None:
1264        text = self.current_primary_selection_or_clipboard()
1265        self.paste_to_active_window(text)
1266
1267    def set_primary_selection(self) -> None:
1268        w = self.active_window
1269        if w is not None and not w.destroyed:
1270            text = w.text_for_selection()
1271            if text:
1272                set_primary_selection(text)
1273                if get_options().copy_on_select:
1274                    self.copy_to_buffer(get_options().copy_on_select)
1275
1276    @ac('cp', '''
1277        Copy the selection from the active window to the specified buffer
1278
1279        See :ref:`cpbuf` for details.
1280        ''')
1281    def copy_to_buffer(self, buffer_name: str) -> None:
1282        w = self.active_window
1283        if w is not None and not w.destroyed:
1284            text = w.text_for_selection()
1285            if text:
1286                if buffer_name == 'clipboard':
1287                    set_clipboard_string(text)
1288                elif buffer_name == 'primary':
1289                    set_primary_selection(text)
1290                else:
1291                    self.clipboard_buffers[buffer_name] = text
1292
1293    @ac('cp', '''
1294        Paste from the specified buffer to the active window
1295
1296        See :ref:`cpbuf` for details.
1297        ''')
1298    def paste_from_buffer(self, buffer_name: str) -> None:
1299        if buffer_name == 'clipboard':
1300            text: Optional[str] = get_clipboard_string()
1301        elif buffer_name == 'primary':
1302            text = get_primary_selection()
1303        else:
1304            text = self.clipboard_buffers.get(buffer_name)
1305        if text:
1306            self.paste_to_active_window(text)
1307
1308    @ac('tab', '''
1309        Go to the specified tab, by number, starting with 1
1310
1311        Zero and negative numbers go to previously active tabs
1312        ''')
1313    def goto_tab(self, tab_num: int) -> None:
1314        tm = self.active_tab_manager
1315        if tm is not None:
1316            tm.goto_tab(tab_num - 1)
1317
1318    def set_active_tab(self, tab: Tab) -> bool:
1319        tm = self.active_tab_manager
1320        if tm is not None:
1321            return tm.set_active_tab(tab)
1322        return False
1323
1324    @ac('tab', 'Make the next tab active')
1325    def next_tab(self) -> None:
1326        tm = self.active_tab_manager
1327        if tm is not None:
1328            tm.next_tab()
1329
1330    @ac('tab', 'Make the previous tab active')
1331    def previous_tab(self) -> None:
1332        tm = self.active_tab_manager
1333        if tm is not None:
1334            tm.next_tab(-1)
1335
1336    prev_tab = previous_tab
1337
1338    def process_stdin_source(
1339        self, window: Optional[Window] = None,
1340        stdin: Optional[str] = None, copy_pipe_data: Optional[Dict] = None
1341    ) -> Tuple[Optional[Dict[str, str]], Optional[bytes]]:
1342        w = window or self.active_window
1343        if not w:
1344            return None, None
1345        env = None
1346        input_data = None
1347        if stdin:
1348            add_wrap_markers = stdin.endswith('_wrap')
1349            if add_wrap_markers:
1350                stdin = stdin[:-len('_wrap')]
1351            stdin = data_for_at(w, stdin, add_wrap_markers=add_wrap_markers)
1352            if stdin is not None:
1353                pipe_data = w.pipe_data(stdin, has_wrap_markers=add_wrap_markers) if w else None
1354                if pipe_data:
1355                    if copy_pipe_data is not None:
1356                        copy_pipe_data.update(pipe_data)
1357                    env = {
1358                        'KITTY_PIPE_DATA':
1359                        '{scrolled_by}:{cursor_x},{cursor_y}:{lines},{columns}'.format(**pipe_data)
1360                    }
1361                input_data = stdin.encode('utf-8')
1362        return env, input_data
1363
1364    def data_for_at(self, which: str, window: Optional[Window] = None, add_wrap_markers: bool = False) -> Optional[str]:
1365        window = window or self.active_window
1366        if not window:
1367            return None
1368        return data_for_at(window, which, add_wrap_markers=add_wrap_markers)
1369
1370    def special_window_for_cmd(
1371        self, cmd: List[str],
1372        window: Optional[Window] = None,
1373        stdin: Optional[str] = None,
1374        cwd_from: Optional[int] = None,
1375        as_overlay: bool = False
1376    ) -> SpecialWindowInstance:
1377        w = window or self.active_window
1378        env, input_data = self.process_stdin_source(w, stdin)
1379        cmdline = []
1380        for arg in cmd:
1381            if arg == '@selection' and w:
1382                q = data_for_at(w, arg)
1383                if not q:
1384                    continue
1385                arg = q
1386            cmdline.append(arg)
1387        overlay_for = w.id if w and as_overlay else None
1388        return SpecialWindow(cmd, input_data, cwd_from=cwd_from, overlay_for=overlay_for, env=env)
1389
1390    def run_background_process(
1391        self,
1392        cmd: List[str],
1393        cwd: Optional[str] = None,
1394        env: Optional[Dict[str, str]] = None,
1395        stdin: Optional[bytes] = None,
1396        cwd_from: Optional[int] = None
1397    ) -> None:
1398        import subprocess
1399        env = env or None
1400        if env:
1401            env_ = default_env().copy()
1402            env_.update(env)
1403            env = env_
1404        if cwd_from:
1405            with suppress(Exception):
1406                cwd = cwd_of_process(cwd_from)
1407
1408        if stdin:
1409            r, w = safe_pipe(False)
1410            try:
1411                subprocess.Popen(cmd, env=env, stdin=r, cwd=cwd)
1412            except Exception:
1413                os.close(w)
1414            else:
1415                thread_write(w, stdin)
1416            finally:
1417                os.close(r)
1418        else:
1419            subprocess.Popen(cmd, env=env, cwd=cwd)
1420
1421    def pipe(self, source: str, dest: str, exe: str, *args: str) -> Window:
1422        cmd = [exe] + list(args)
1423        window = self.active_window
1424        cwd_from = window.child.pid_for_cwd if window else None
1425
1426        def create_window() -> SpecialWindowInstance:
1427            return self.special_window_for_cmd(
1428                cmd, stdin=source, as_overlay=dest == 'overlay', cwd_from=cwd_from)
1429
1430        if dest == 'overlay' or dest == 'window':
1431            tab = self.active_tab
1432            if tab is not None:
1433                return tab.new_special_window(create_window())
1434        elif dest == 'tab':
1435            tm = self.active_tab_manager
1436            if tm is not None:
1437                tm.new_tab(special_window=create_window(), cwd_from=cwd_from)
1438        elif dest == 'os_window':
1439            self._new_os_window(create_window(), cwd_from=cwd_from)
1440        elif dest in ('clipboard', 'primary'):
1441            env, stdin = self.process_stdin_source(stdin=source, window=window)
1442            if stdin:
1443                if dest == 'clipboard':
1444                    set_clipboard_string(stdin)
1445                else:
1446                    set_primary_selection(stdin)
1447        else:
1448            env, stdin = self.process_stdin_source(stdin=source, window=window)
1449            self.run_background_process(cmd, cwd_from=cwd_from, stdin=stdin, env=env)
1450
1451    def args_to_special_window(self, args: Iterable[str], cwd_from: Optional[int] = None) -> SpecialWindowInstance:
1452        args = list(args)
1453        stdin = None
1454        w = self.active_window
1455
1456        if args[0].startswith('@') and args[0] != '@':
1457            q = data_for_at(w, args[0]) or None
1458            if q is not None:
1459                stdin = q.encode('utf-8')
1460            del args[0]
1461
1462        cmd = []
1463        for arg in args:
1464            if arg == '@selection':
1465                q = data_for_at(w, arg)
1466                if not q:
1467                    continue
1468                arg = q
1469            cmd.append(arg)
1470        return SpecialWindow(cmd, stdin, cwd_from=cwd_from)
1471
1472    def _new_tab(self, args: Union[SpecialWindowInstance, Iterable[str]], cwd_from: Optional[int] = None, as_neighbor: bool = False) -> Optional[Tab]:
1473        special_window = None
1474        if args:
1475            if isinstance(args, SpecialWindowInstance):
1476                special_window = args
1477            else:
1478                special_window = self.args_to_special_window(args, cwd_from=cwd_from)
1479        tm = self.active_tab_manager
1480        if tm is not None:
1481            return tm.new_tab(special_window=special_window, cwd_from=cwd_from, as_neighbor=as_neighbor)
1482
1483    def _create_tab(self, args: List[str], cwd_from: Optional[int] = None) -> None:
1484        as_neighbor = False
1485        if args and args[0].startswith('!'):
1486            as_neighbor = 'neighbor' in args[0][1:].split(',')
1487            args = args[1:]
1488        self._new_tab(args, as_neighbor=as_neighbor, cwd_from=cwd_from)
1489
1490    @ac('tab', 'Create a new tab')
1491    def new_tab(self, *args: str) -> None:
1492        self._create_tab(list(args))
1493
1494    @ac('tab', 'Create a new tab with working directory for the window in it set to the same as the active window')
1495    def new_tab_with_cwd(self, *args: str) -> None:
1496        w = self.active_window_for_cwd
1497        cwd_from = w.child.pid_for_cwd if w is not None else None
1498        self._create_tab(list(args), cwd_from=cwd_from)
1499
1500    def new_tab_with_wd(self, wd: str) -> None:
1501        special_window = SpecialWindow(None, cwd=wd)
1502        self._new_tab(special_window)
1503
1504    def _new_window(self, args: List[str], cwd_from: Optional[int] = None) -> Optional[Window]:
1505        tab = self.active_tab
1506        if tab is not None:
1507            allow_remote_control = False
1508            location = None
1509            if args and args[0].startswith('!'):
1510                location = args[0][1:].lower()
1511                args = args[1:]
1512            if args and args[0] == '@':
1513                args = args[1:]
1514                allow_remote_control = True
1515            if args:
1516                return tab.new_special_window(
1517                    self.args_to_special_window(args, cwd_from=cwd_from),
1518                    location=location, allow_remote_control=allow_remote_control)
1519            else:
1520                return tab.new_window(cwd_from=cwd_from, location=location, allow_remote_control=allow_remote_control)
1521
1522    @ac('win', 'Create a new window')
1523    def new_window(self, *args: str) -> None:
1524        self._new_window(list(args))
1525
1526    @ac('win', 'Create a new window with working directory same as that of the active window')
1527    def new_window_with_cwd(self, *args: str) -> None:
1528        w = self.active_window_for_cwd
1529        if w is None:
1530            return self.new_window(*args)
1531        cwd_from = w.child.pid_for_cwd
1532        self._new_window(list(args), cwd_from=cwd_from)
1533
1534    @ac('misc', '''
1535        Launch the specified program in a new window/tab/etc.
1536
1537        See :doc:`launch` for details
1538        ''')
1539    def launch(self, *args: str) -> None:
1540        from kitty.launch import launch, parse_launch_args
1541        opts, args_ = parse_launch_args(args)
1542        launch(self, opts, args_)
1543
1544    @ac('tab', 'Move the active tab forward')
1545    def move_tab_forward(self) -> None:
1546        tm = self.active_tab_manager
1547        if tm is not None:
1548            tm.move_tab(1)
1549
1550    @ac('tab', 'Move the active tab backward')
1551    def move_tab_backward(self) -> None:
1552        tm = self.active_tab_manager
1553        if tm is not None:
1554            tm.move_tab(-1)
1555
1556    @ac('misc', '''
1557        Turn on/off ligatures in the specified window
1558
1559        See :opt:`disable_ligatures` for details
1560        ''')
1561    def disable_ligatures_in(self, where: Union[str, Iterable[Window]], strategy: int) -> None:
1562        if isinstance(where, str):
1563            windows: List[Window] = []
1564            if where == 'active':
1565                if self.active_window is not None:
1566                    windows = [self.active_window]
1567            elif where == 'all':
1568                windows = list(self.all_windows)
1569            elif where == 'tab':
1570                if self.active_tab is not None:
1571                    windows = list(self.active_tab)
1572        else:
1573            windows = list(where)
1574        for window in windows:
1575            window.screen.disable_ligatures = strategy
1576            window.refresh()
1577
1578    def patch_colors(self, spec: Dict[str, int], cursor_text_color: Union[bool, int, Color], configured: bool = False) -> None:
1579        opts = get_options()
1580        if configured:
1581            for k, v in spec.items():
1582                if hasattr(opts, k):
1583                    setattr(opts, k, color_from_int(v))
1584            if cursor_text_color is not False:
1585                if isinstance(cursor_text_color, int):
1586                    cursor_text_color = color_from_int(cursor_text_color)
1587                opts.cursor_text_color = cursor_text_color
1588        for tm in self.all_tab_managers:
1589            tm.tab_bar.patch_colors(spec)
1590        patch_global_colors(spec, configured)
1591
    def apply_new_options(self, opts: Options) -> None:
        """Install ``opts`` as the current options and propagate them.

        In order: the global options storage, layout options and default
        environment are replaced; font data is reloaded and every OS window
        that has a tab manager gets the new font size and is resized; the
        keymap is rebuilt; every tab manager re-applies its options; finally
        every window is notified of a possible default background change and
        refreshed.
        """
        from .fonts.box_drawing import set_scale

        # Update options storage
        set_options(opts, is_wayland(), self.args.debug_rendering, self.args.debug_font_fallback)
        apply_options_update()
        set_layout_options(opts)
        # A copy is handed over so the default env does not alias opts.env
        set_default_env(opts.env.copy())
        # Update font data
        set_scale(opts.box_drawing_scale)
        from .fonts.render import set_font_family
        set_font_family(opts, debug_font_matching=self.args.debug_font_fallback)
        for os_window_id, tm in self.os_window_map.items():
            if tm is not None:
                # NOTE(review): the meaning of the trailing True is defined by
                # os_window_font_size in fast_data_types -- confirm there
                os_window_font_size(os_window_id, opts.font_size, True)
                tm.resize()
        # Update key bindings
        self.update_keymap()
        # Update misc options
        for tm in self.all_tab_managers:
            tm.apply_options()
        # Update colors
        for w in self.all_windows:
            self.default_bg_changed_for(w.id)
            w.refresh()
1617
1618    @ac('misc', '''
1619        Reload the config file
1620
1621        If mapped without arguments reloads the default config file, otherwise loads
1622        the specified config files, in order. Loading a config file *replaces* all
1623        config options. For example::
1624
1625            map f5 load_config_file /path/to/some/kitty.conf
1626        ''')
1627    def load_config_file(self, *paths: str, apply_overrides: bool = True) -> None:
1628        from .config import load_config
1629        old_opts = get_options()
1630        paths = paths or old_opts.config_paths
1631        bad_lines: List[BadLine] = []
1632        opts = load_config(*paths, overrides=old_opts.config_overrides if apply_overrides else None, accumulate_bad_lines=bad_lines)
1633        if bad_lines:
1634            self.show_bad_config_lines(bad_lines)
1635        self.apply_new_options(opts)
1636
1637    def safe_delete_temp_file(self, path: str) -> None:
1638        if is_path_in_temp_dir(path):
1639            with suppress(FileNotFoundError):
1640                os.remove(path)
1641
1642    def set_update_check_process(self, process: Optional[PopenType] = None) -> None:
1643        if self.update_check_process is not None:
1644            with suppress(Exception):
1645                if self.update_check_process.poll() is None:
1646                    self.update_check_process.kill()
1647        self.update_check_process = process
1648
1649    def on_monitored_pid_death(self, pid: int, exit_status: int) -> None:
1650        update_check_process = self.update_check_process
1651        if update_check_process is not None and pid == update_check_process.pid:
1652            self.update_check_process = None
1653            from .update_check import process_current_release
1654            try:
1655                assert update_check_process.stdout is not None
1656                raw = update_check_process.stdout.read().decode('utf-8')
1657            except Exception as e:
1658                log_error('Failed to read data from update check process, with error: {}'.format(e))
1659            else:
1660                try:
1661                    process_current_release(raw)
1662                except Exception as e:
1663                    log_error('Failed to process update check data {!r}, with error: {}'.format(raw, e))
1664
1665    def dbus_notification_callback(self, activated: bool, a: int, b: Union[int, str]) -> None:
1666        from .notify import (
1667            dbus_notification_activated, dbus_notification_created
1668        )
1669        if activated:
1670            assert isinstance(b, str)
1671            dbus_notification_activated(a, b)
1672        else:
1673            assert isinstance(b, int)
1674            dbus_notification_created(a, b)
1675
1676    def show_bad_config_lines(self, bad_lines: Iterable[BadLine]) -> None:
1677
1678        def format_bad_line(bad_line: BadLine) -> str:
1679            return '{}:{} in line: {}\n'.format(bad_line.number, bad_line.exception, bad_line.line)
1680
1681        msg = '\n'.join(map(format_bad_line, bad_lines)).rstrip()
1682        self.show_error(_('Errors in kitty.conf'), msg)
1683
1684    @ac('misc', '''
1685        Change colors in the specified windows
1686
1687        For details, see :ref:`at_set-colors`. For example::
1688
1689            map f5 set_colors --configured /path/to/some/config/file/colors.conf
1690        ''')
1691    def set_colors(self, *args: str) -> None:
1692        from kitty.rc.base import (
1693            PayloadGetter, command_for_name, parse_subcommand_cli
1694        )
1695        from kitty.remote_control import parse_rc_args
1696        c = command_for_name('set_colors')
1697        opts, items = parse_subcommand_cli(c, ['set-colors'] + list(args))
1698        payload = c.message_to_kitty(parse_rc_args([])[0], opts, items)
1699        c.response_from_kitty(self, self.active_window, PayloadGetter(c, payload if isinstance(payload, dict) else {}))
1700
    def _move_window_to(
        self,
        window: Optional[Window] = None,
        target_tab_id: Optional[Union[str, int]] = None,
        target_os_window_id: Optional[Union[str, int]] = None
    ) -> None:
        """Detach ``window`` from its tab and attach it to a target tab.

        ``window`` defaults to the active window. Nothing happens when there
        is no window or no tab can be found for it.

        The target tab is chosen as follows:

        * ``target_os_window_id == 'new'``: a new OS window is added and an
          empty tab is created in it.
        * ``target_tab_id`` is a string: the tab manager is the one for
          ``target_os_window_id`` (defaulting to the current OS window), or
          the active tab manager when that id is not an integer. ``'new'``
          creates an empty tab; any other string is resolved with
          ``tab_at_location``, falling back to a new empty tab.
        * otherwise: the tab whose ``id`` equals ``target_tab_id``; when no
          tab matches the method returns without moving anything.

        After the move the source tab is passed to
        ``_cleanup_tab_after_window_removal`` and the target tab is made
        active.
        """
        window = window or self.active_window
        if not window:
            return
        src_tab = self.tab_for_window(window)
        if src_tab is None:
            return
        if target_os_window_id == 'new':
            target_os_window_id = self.add_os_window()
            tm = self.os_window_map[target_os_window_id]
            target_tab = tm.new_tab(empty_tab=True)
        else:
            target_os_window_id = target_os_window_id or current_os_window()
            if isinstance(target_tab_id, str):
                if not isinstance(target_os_window_id, int):
                    # No usable OS window id, so fall back to the active tab manager
                    q = self.active_tab_manager
                    assert q is not None
                    tm = q
                else:
                    tm = self.os_window_map[target_os_window_id]
                if target_tab_id == 'new':
                    target_tab = tm.new_tab(empty_tab=True)
                else:
                    target_tab = tm.tab_at_location(target_tab_id) or tm.new_tab(empty_tab=True)
            else:
                for tab in self.all_tabs:
                    if tab.id == target_tab_id:
                        target_tab = tab
                        target_os_window_id = tab.os_window_id
                        break
                else:
                    # No tab with the requested id exists
                    return

        # Attach everything that detach_window hands back to the target tab
        for detached_window in src_tab.detach_window(window):
            target_tab.attach_window(detached_window)
        self._cleanup_tab_after_window_removal(src_tab)
        target_tab.make_active()
1743
1744    def _move_tab_to(self, tab: Optional[Tab] = None, target_os_window_id: Optional[int] = None) -> None:
1745        tab = tab or self.active_tab
1746        if tab is None:
1747            return
1748        if target_os_window_id is None:
1749            target_os_window_id = self.add_os_window()
1750        tm = self.os_window_map[target_os_window_id]
1751        target_tab = tm.new_tab(empty_tab=True)
1752        target_tab.take_over_from(tab)
1753        self._cleanup_tab_after_window_removal(tab)
1754        target_tab.make_active()
1755
1756    @ac('tab', 'Interactively select a tab to switch to')
1757    def select_tab(self) -> None:
1758        title = 'Choose a tab to switch to'
1759        lines = [title, '']
1760        fmt = ': {1}'
1761        tab_id_map: Dict[int, Optional[Union[str, int]]] = {}
1762        current_tab = self.active_tab
1763        done_tab_id: Optional[Union[str, int]] = None
1764
1765        for i, tab in enumerate(self.all_tabs):
1766            if tab is not current_tab:
1767                tab_id_map[len(tab_id_map)] = tab.id
1768                lines.append(fmt.format(i + 1, tab.title))
1769
1770        def done(data: Dict[str, Any], target_window_id: int, self: Boss) -> None:
1771            nonlocal done_tab_id
1772            done_tab_id = tab_id_map[int(data['groupdicts'][0]['index'])]
1773
1774        def done2(target_window_id: int, self: Boss) -> None:
1775            tab_id = done_tab_id
1776            if tab_id is not None:
1777                for i, tab in enumerate(self.all_tabs):
1778                    if tab.id == tab_id:
1779                        self.set_active_tab(tab)
1780                        break
1781
1782        self._run_kitten(
1783            'hints', args=(
1784                '--ascending', '--customize-processing=::import::kitty.choose_entry',
1785                r'--regex=(?m)^:\s+.+$', '--window-title', title,
1786            ), input_data='\r\n'.join(lines).encode('utf-8'), custom_callback=done, action_on_removal=done2
1787        )
1788
1789    @ac('win', '''
1790        Detach a window, moving it to another tab or OS Window
1791
1792        See :ref:`detaching windows <detach_window>` for details.
1793        ''')
1794    def detach_window(self, *args: str) -> None:
1795        if not args or args[0] == 'new':
1796            return self._move_window_to(target_os_window_id='new')
1797        if args[0] in ('new-tab', 'tab-prev', 'tab-left', 'tab-right'):
1798            where = 'new' if args[0] == 'new-tab' else args[0][4:]
1799            return self._move_window_to(target_tab_id=where)
1800        title = 'Choose a tab to move the window to'
1801        lines = [title, '']
1802        fmt = ': {1}'
1803        tab_id_map: Dict[int, Optional[Union[str, int]]] = {}
1804        current_tab = self.active_tab
1805        done_tab_id: Optional[Union[str, int]] = None
1806        done_called = False
1807
1808        for i, tab in enumerate(self.all_tabs):
1809            if tab is not current_tab:
1810                tab_id_map[len(tab_id_map)] = tab.id
1811                lines.append(fmt.format(i + 1, tab.title))
1812        new_idx = len(tab_id_map) + 1
1813        tab_id_map[new_idx - 1] = 'new'
1814        lines.append(fmt.format(new_idx, 'New tab'))
1815        new_idx = len(tab_id_map) + 1
1816        tab_id_map[new_idx - 1] = None
1817        lines.append(fmt.format(new_idx, 'New OS Window'))
1818
1819        def done(data: Dict[str, Any], target_window_id: int, self: Boss) -> None:
1820            nonlocal done_tab_id, done_called
1821            done_tab_id = tab_id_map[int(data['groupdicts'][0]['index'])]
1822            done_called = True
1823
1824        def done2(target_window_id: int, self: Boss) -> None:
1825            if not done_called:
1826                return
1827            tab_id = done_tab_id
1828            target_window = None
1829            for w in self.all_windows:
1830                if w.id == target_window_id:
1831                    target_window = w
1832                    break
1833            if tab_id is None:
1834                self._move_window_to(window=target_window, target_os_window_id='new')
1835            else:
1836                self._move_window_to(window=target_window, target_tab_id=tab_id)
1837
1838        self._run_kitten(
1839            'hints', args=(
1840                '--ascending', '--customize-processing=::import::kitty.choose_entry',
1841                r'--regex=(?m)^:\s+.+$', '--window-title', title,
1842            ), input_data='\r\n'.join(lines).encode('utf-8'), custom_callback=done, action_on_removal=done2
1843        )
1844
1845    @ac('tab', '''
1846        Detach a tab, moving it to another OS Window
1847
1848        See :ref:`detaching windows <detach_window>` for details.
1849        ''')
1850    def detach_tab(self, *args: str) -> None:
1851        if not args or args[0] == 'new':
1852            return self._move_tab_to()
1853
1854        title = 'Choose an OS window to move the tab to'
1855        lines = [title, '']
1856        fmt = ': {1}'
1857        os_window_id_map: Dict[int, Optional[int]] = {}
1858        current_os_window = getattr(self.active_tab, 'os_window_id', 0)
1859        done_osw: Optional[int] = None
1860        done_called = False
1861
1862        for i, osw in enumerate(self.os_window_map):
1863            tm = self.os_window_map[osw]
1864            if current_os_window != osw and tm.active_tab and tm.active_tab:
1865                os_window_id_map[len(os_window_id_map)] = osw
1866                lines.append(fmt.format(i + 1, tm.active_tab.title))
1867        new_idx = len(os_window_id_map) + 1
1868        os_window_id_map[new_idx - 1] = None
1869        lines.append(fmt.format(new_idx, 'New OS Window'))
1870
1871        def done(data: Dict[str, Any], target_window_id: int, self: Boss) -> None:
1872            nonlocal done_called, done_osw
1873            done_osw = os_window_id_map[int(data['groupdicts'][0]['index'])]
1874            done_called = True
1875
1876        def done2(target_window_id: int, self: Boss) -> None:
1877            if not done_called:
1878                return
1879            os_window_id = done_osw
1880            target_tab = self.active_tab
1881            for w in self.all_windows:
1882                if w.id == target_window_id:
1883                    target_tab = w.tabref()
1884                    break
1885            if target_tab and target_tab.os_window_id == os_window_id:
1886                return
1887            self._move_tab_to(tab=target_tab, target_os_window_id=os_window_id)
1888
1889        self._run_kitten(
1890            'hints', args=(
1891                '--ascending', '--customize-processing=::import::kitty.choose_entry',
1892                r'--regex=(?m)^:\s+.+$', '--window-title', title,
1893            ), input_data='\r\n'.join(lines).encode('utf-8'), custom_callback=done, action_on_removal=done2
1894        )
1895
1896    def set_background_image(self, path: Optional[str], os_windows: Tuple[int, ...], configured: bool, layout: Optional[str]) -> None:
1897        if layout:
1898            set_background_image(path, os_windows, configured, layout)
1899        else:
1900            set_background_image(path, os_windows, configured)
1901        for os_window_id in os_windows:
1902            self.default_bg_changed_for(os_window_id)
1903
1904    # Can be called with kitty -o "map f1 send_test_notification"
1905    def send_test_notification(self) -> None:
1906        from time import monotonic
1907
1908        from .notify import notify
1909        now = monotonic()
1910        ident = f'test-notify-{now}'
1911        notify(f'Test {now}', f'At: {now}', identifier=ident, subtitle=f'Test subtitle {now}')
1912
1913    def notification_activated(self, identifier: str, window_id: int, focus: bool, report: bool) -> None:
1914        w = self.window_id_map.get(window_id)
1915        if w is None:
1916            return
1917        if focus:
1918            self.set_active_window(w, switch_os_window_if_needed=True)
1919        if report:
1920            w.report_notification_activated(identifier)
1921
1922    @ac('misc', 'Show the environment variables that the kitty process sees')
1923    def show_kitty_env_vars(self) -> None:
1924        w = self.active_window
1925        if w:
1926            output = '\n'.join(f'{k}={v}' for k, v in os.environ.items())
1927            self.display_scrollback(w, output, title=_('Current kitty env vars'))
1928
1929    def open_file(self, path: str) -> None:
1930        if path == ":cocoa::application launched::":
1931            self.cocoa_application_launched = True
1932            return
1933
1934        def new_os_window() -> None:
1935            self.new_os_window(path)
1936
1937        if self.cocoa_application_launched or not self.os_window_map:
1938            return new_os_window()
1939        tab = self.active_tab
1940        if tab is None:
1941            return new_os_window()
1942        w = tab.active_window
1943        self.new_window(path)
1944        if w is not None:
1945            tab.remove_window(w)
1946
1947    @ac('misc', 'Show the effective configuration kitty is running with')
1948    def debug_config(self) -> None:
1949        from .debug_config import debug_config
1950        w = self.active_window
1951        if w is not None:
1952            output = debug_config(get_options())
1953            set_clipboard_string(re.sub(r'\x1b.+?m', '', output))
1954            output += '\n\x1b[35mThis debug output has been copied to the clipboard\x1b[m'
1955            self.display_scrollback(w, output, title=_('Current kitty options'))
1956
1957    @ac('misc', 'Discard this event completely ignoring it')
1958    def discard_event(self) -> None:
1959        pass
1960    mouse_discard_event = discard_event
1961