1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
4
5import asyncio
6import codecs
7import io
8import os
9import re
10import selectors
11import signal
12import sys
13import termios
14from contextlib import contextmanager
15from functools import partial
16from typing import Any, Callable, Dict, Generator, List, NamedTuple, Optional
17
18from kitty.constants import is_macos
19from kitty.fast_data_types import (
20    close_tty, normal_tty, open_tty, parse_input_from_terminal, raw_tty
21)
22from kitty.key_encoding import (
23    ALT, CTRL, PRESS, RELEASE, REPEAT, SHIFT, backspace_key, decode_key_event,
24    enter_key
25)
26from kitty.typing import ImageManagerType, KeyEventType, Protocol
27from kitty.utils import ScreenSizeGetter, screen_size_function, write_all
28
29from .handler import Handler
30from .operations import init_state, reset_state
31
32
33class BinaryWrite(Protocol):
34
35    def write(self, data: bytes) -> None:
36        pass
37
38    def flush(self) -> None:
39        pass
40
41
42def debug_write(*a: Any, **kw: Any) -> None:
43    from base64 import standard_b64encode
44    fobj = kw.pop('file', sys.stderr.buffer)
45    buf = io.StringIO()
46    kw['file'] = buf
47    print(*a, **kw)
48    stext = buf.getvalue()
49    for i in range(0, len(stext), 256):
50        chunk = stext[i:i + 256]
51        text = b'\x1bP@kitty-print|' + standard_b64encode(chunk.encode('utf-8')) + b'\x1b\\'
52        fobj.write(text)
53    fobj.flush()
54
55
56class Debug:
57
58    fobj: Optional[BinaryWrite] = None
59
60    def __call__(self, *a: Any, **kw: Any) -> None:
61        kw['file'] = self.fobj or sys.stdout.buffer
62        debug_write(*a, **kw)
63
64
65debug = Debug()
66
67
68class TermManager:
69
70    def __init__(self, optional_actions: int = termios.TCSANOW) -> None:
71        self.extra_finalize: Optional[str] = None
72        self.optional_actions = optional_actions
73
74    def set_state_for_loop(self, set_raw: bool = True) -> None:
75        if set_raw:
76            raw_tty(self.tty_fd, self.original_termios)
77        write_all(self.tty_fd, init_state())
78
79    def reset_state_to_original(self) -> None:
80        normal_tty(self.tty_fd, self.original_termios)
81        if self.extra_finalize:
82            write_all(self.tty_fd, self.extra_finalize)
83        write_all(self.tty_fd, reset_state())
84
85    @contextmanager
86    def suspend(self) -> Generator['TermManager', None, None]:
87        self.reset_state_to_original()
88        yield self
89        self.set_state_for_loop()
90
91    def __enter__(self) -> 'TermManager':
92        self.tty_fd, self.original_termios = open_tty(False, self.optional_actions)
93        self.set_state_for_loop(set_raw=False)
94        return self
95
96    def __exit__(self, *a: object) -> None:
97        self.reset_state_to_original()
98        close_tty(self.tty_fd, self.original_termios)
99        del self.tty_fd, self.original_termios
100
101
102LEFT, MIDDLE, RIGHT, FOURTH, FIFTH = 1, 2, 4, 8, 16
103DRAG = REPEAT
104bmap = {0: LEFT, 1: MIDDLE, 2: RIGHT}
105MOTION_INDICATOR = 1 << 5
106EXTRA_BUTTON_INDICATOR = 1 << 6
107SHIFT_INDICATOR = 1 << 2
108ALT_INDICATOR = 1 << 3
109CTRL_INDICATOR = 1 << 4
110
111
112class MouseEvent(NamedTuple):
113    x: int
114    y: int
115    type: int
116    buttons: int
117    mods: int
118
119
120def decode_sgr_mouse(text: str) -> MouseEvent:
121    cb_, x_, y_ = text.split(';')
122    m, y_ = y_[-1], y_[:-1]
123    cb, x, y = map(int, (cb_, x_, y_))
124    typ = RELEASE if m == 'm' else (DRAG if cb & MOTION_INDICATOR else PRESS)
125    buttons = 0
126    cb3 = cb & 3
127    if cb3 != 3:
128        if cb & EXTRA_BUTTON_INDICATOR:
129            buttons |= FIFTH if cb3 & 1 else FOURTH
130        else:
131            buttons |= bmap[cb3]
132    mods = 0
133    if cb & SHIFT_INDICATOR:
134        mods |= SHIFT
135    if cb & ALT_INDICATOR:
136        mods |= ALT
137    if cb & CTRL_INDICATOR:
138        mods |= CTRL
139    return MouseEvent(x, y, typ, buttons, mods)
140
141
142class UnhandledException(Handler):
143
144    def __init__(self, tb: str) -> None:
145        self.tb = tb
146
147    def initialize(self) -> None:
148        self.cmd.clear_screen()
149        self.cmd.set_scrolling_region()
150        self.cmd.set_cursor_visible(True)
151        self.cmd.set_default_colors()
152        self.write(self.tb.replace('\n', '\r\n'))
153        self.write('\r\n')
154        self.write('Press the Enter key to quit')
155
156    def on_key(self, key_event: KeyEventType) -> None:
157        if key_event.key == 'ENTER':
158            self.quit_loop(1)
159
160    def on_interrupt(self) -> None:
161        self.quit_loop(1)
162    on_eot = on_term = on_interrupt
163
164
165class SignalManager:
166
167    def __init__(
168        self,
169        loop: asyncio.AbstractEventLoop,
170        on_winch: Callable,
171        on_interrupt: Callable,
172        on_term: Callable
173    ) -> None:
174        self.asycio_loop = loop
175        self.on_winch, self.on_interrupt, self.on_term = on_winch, on_interrupt, on_term
176
177    def __enter__(self) -> None:
178        tuple(map(lambda x: self.asycio_loop.add_signal_handler(*x), (
179            (signal.SIGWINCH, self.on_winch),
180            (signal.SIGINT, self.on_interrupt),
181            (signal.SIGTERM, self.on_term)
182        )))
183
184    def __exit__(self, *a: Any) -> None:
185        tuple(map(self.asycio_loop.remove_signal_handler, (
186            signal.SIGWINCH, signal.SIGINT, signal.SIGTERM)))
187
188
189sanitize_bracketed_paste: str = '[\x03\x04\x0e\x0f\r\x07\x7f\x8d\x8e\x8f\x90\x9b\x9d\x9e\x9f]'
190
191
192class Loop:
193
194    def __init__(
195        self,
196        sanitize_bracketed_paste: str = sanitize_bracketed_paste,
197        optional_actions: int = termios.TCSADRAIN
198    ):
199        if is_macos:
200            # On macOS PTY devices are not supported by the KqueueSelector and
201            # the PollSelector is broken, causes 100% CPU usage
202            self.asycio_loop: asyncio.AbstractEventLoop = asyncio.SelectorEventLoop(selectors.SelectSelector())
203            asyncio.set_event_loop(self.asycio_loop)
204        else:
205            self.asycio_loop = asyncio.get_event_loop()
206        self.return_code = 0
207        self.optional_actions = optional_actions
208        self.read_buf = ''
209        self.decoder = codecs.getincrementaldecoder('utf-8')('ignore')
210        try:
211            self.iov_limit = max(os.sysconf('SC_IOV_MAX') - 1, 255)
212        except Exception:
213            self.iov_limit = 255
214        self.parse_input_from_terminal = partial(parse_input_from_terminal, self._on_text, self._on_dcs, self._on_csi, self._on_osc, self._on_pm, self._on_apc)
215        self.ebs_pat = re.compile('([\177\r\x03\x04])')
216        self.in_bracketed_paste = False
217        self.sanitize_bracketed_paste = bool(sanitize_bracketed_paste)
218        if self.sanitize_bracketed_paste:
219            self.sanitize_ibp_pat = re.compile(sanitize_bracketed_paste)
220
221    def _read_ready(self, handler: Handler, fd: int) -> None:
222        try:
223            bdata = os.read(fd, io.DEFAULT_BUFFER_SIZE)
224        except BlockingIOError:
225            return
226        if not bdata:
227            raise EOFError('The input stream is closed')
228        data = self.decoder.decode(bdata)
229        if self.read_buf:
230            data = self.read_buf + data
231        self.read_buf = data
232        self.handler = handler
233        try:
234            self.read_buf = self.parse_input_from_terminal(self.read_buf, self.in_bracketed_paste)
235        except Exception:
236            self.read_buf = ''
237            raise
238        finally:
239            del self.handler
240
241    # terminal input callbacks {{{
242    def _on_text(self, text: str) -> None:
243        if self.in_bracketed_paste and self.sanitize_bracketed_paste:
244            text = self.sanitize_ibp_pat.sub('', text)
245
246        for chunk in self.ebs_pat.split(text):
247            if len(chunk) == 1:
248                if chunk == '\r':
249                    self.handler.on_key(enter_key)
250                elif chunk == '\177':
251                    self.handler.on_key(backspace_key)
252                elif chunk == '\x03':
253                    self.handler.on_interrupt()
254                elif chunk == '\x04':
255                    self.handler.on_eot()
256                else:
257                    self.handler.on_text(chunk, self.in_bracketed_paste)
258            elif chunk:
259                self.handler.on_text(chunk, self.in_bracketed_paste)
260
261    def _on_dcs(self, dcs: str) -> None:
262        if dcs.startswith('@kitty-cmd'):
263            import json
264            self.handler.on_kitty_cmd_response(json.loads(dcs[len('@kitty-cmd'):]))
265        elif dcs.startswith('1+r'):
266            from binascii import unhexlify
267            vals = dcs[3:].split(';')
268            for q in vals:
269                parts = q.split('=', 1)
270                try:
271                    name, val = parts[0], unhexlify(parts[1]).decode('utf-8', 'replace')
272                except Exception:
273                    continue
274                self.handler.on_capability_response(name, val)
275
276    def _on_csi(self, csi: str) -> None:
277        q = csi[-1]
278        if q in 'mM':
279            if csi.startswith('<'):
280                # SGR mouse event
281                try:
282                    ev = decode_sgr_mouse(csi[1:])
283                except Exception:
284                    pass
285                else:
286                    self.handler.on_mouse(ev)
287        elif q in 'u~ABCDEHFPQRS':
288            if csi == '200~':
289                self.in_bracketed_paste = True
290                return
291            elif csi == '201~':
292                self.in_bracketed_paste = False
293                return
294            try:
295                k = decode_key_event(csi[:-1], q)
296            except Exception:
297                pass
298            else:
299                if k.matches('ctrl+c'):
300                    self.handler.on_interrupt()
301                    return
302                if k.matches('ctrl+d'):
303                    self.handler.on_eot()
304                    return
305                self.handler.on_key_event(k)
306
307    def _on_pm(self, pm: str) -> None:
308        pass
309
310    def _on_osc(self, osc: str) -> None:
311        m = re.match(r'(\d+);', osc)
312        if m is not None:
313            code = int(m.group(1))
314            rest = osc[m.end():]
315            if code == 52:
316                where, rest = rest.partition(';')[::2]
317                from_primary = 'p' in where
318                from base64 import standard_b64decode
319                self.handler.on_clipboard_response(standard_b64decode(rest).decode('utf-8'), from_primary)
320
321    def _on_apc(self, apc: str) -> None:
322        if apc.startswith('G'):
323            if self.handler.image_manager is not None:
324                self.handler.image_manager.handle_response(apc)
325    # }}}
326
327    def _write_ready(self, handler: Handler, fd: int) -> None:
328        if len(self.write_buf) > self.iov_limit:
329            self.write_buf[self.iov_limit - 1] = b''.join(self.write_buf[self.iov_limit - 1:])
330            del self.write_buf[self.iov_limit:]
331        sizes = tuple(map(len, self.write_buf))
332        total_size = sum(sizes)
333        if total_size:
334            try:
335                written = os.writev(fd, self.write_buf)
336            except BlockingIOError:
337                return
338            if not written:
339                raise EOFError('The output stream is closed')
340        else:
341            written = 0
342        if written >= total_size:
343            self.write_buf: List[bytes] = []
344            self.asycio_loop.remove_writer(fd)
345            self.waiting_for_writes = False
346            handler.on_writing_finished()
347        else:
348            consumed = 0
349            for i, buf in enumerate(self.write_buf):
350                if not written:
351                    break
352                if len(buf) <= written:
353                    written -= len(buf)
354                    consumed += 1
355                    continue
356                self.write_buf[i] = buf[written:]
357                break
358            del self.write_buf[:consumed]
359
360    def quit(self, return_code: Optional[int] = None) -> None:
361        if return_code is not None:
362            self.return_code = return_code
363        self.asycio_loop.stop()
364
365    def loop_impl(self, handler: Handler, term_manager: TermManager, image_manager: Optional[ImageManagerType] = None) -> Optional[str]:
366        self.write_buf = []
367        tty_fd = term_manager.tty_fd
368        tb = None
369        self.waiting_for_writes = True
370
371        def schedule_write(data: bytes) -> None:
372            self.write_buf.append(data)
373            if not self.waiting_for_writes:
374                self.asycio_loop.add_writer(tty_fd, self._write_ready, handler, tty_fd)
375                self.waiting_for_writes = True
376
377        def handle_exception(loop: asyncio.AbstractEventLoop, context: Dict[str, Any]) -> None:
378            nonlocal tb
379            loop.stop()
380            tb = context['message']
381            exc = context.get('exception')
382            if exc is not None:
383                import traceback
384                tb += '\n' + ''.join(traceback.format_exception(exc.__class__, exc, exc.__traceback__))
385
386        self.asycio_loop.set_exception_handler(handle_exception)
387        handler._initialize(self._get_screen_size(), term_manager, schedule_write, self, debug, image_manager)
388        with handler:
389            self.asycio_loop.add_reader(
390                    tty_fd, self._read_ready, handler, tty_fd)
391            self.asycio_loop.add_writer(
392                    tty_fd, self._write_ready, handler, tty_fd)
393            self.asycio_loop.run_forever()
394            self.asycio_loop.remove_reader(tty_fd)
395            if self.waiting_for_writes:
396                self.asycio_loop.remove_writer(tty_fd)
397        return tb
398
399    def loop(self, handler: Handler) -> None:
400        tb: Optional[str] = None
401
402        def _on_sigwinch() -> None:
403            self._get_screen_size.changed = True
404            handler.screen_size = self._get_screen_size()
405            handler.on_resize(handler.screen_size)
406
407        signal_manager = SignalManager(self.asycio_loop, _on_sigwinch, handler.on_interrupt, handler.on_term)
408        with TermManager(self.optional_actions) as term_manager, signal_manager:
409            self._get_screen_size: ScreenSizeGetter = screen_size_function(term_manager.tty_fd)
410            image_manager = None
411            if handler.image_manager_class is not None:
412                image_manager = handler.image_manager_class(handler)
413            try:
414                tb = self.loop_impl(handler, term_manager, image_manager)
415            except Exception:
416                import traceback
417                tb = traceback.format_exc()
418
419            term_manager.extra_finalize = b''.join(self.write_buf).decode('utf-8')
420            if tb is not None:
421                self.return_code = 1
422                self._report_error_loop(tb, term_manager)
423
424    def _report_error_loop(self, tb: str, term_manager: TermManager) -> None:
425        self.loop_impl(UnhandledException(tb), term_manager)
426